// Web listing header (not part of the program):
// Sourcecode: dc-qt package
//
// commanddispatcher.cpp

/*
 *  CommandDispatcher.cpp
 *  RpcDriver
 *
 *  Created by Mikael Gransell on 1/31/06.
 *  Copyright 2006 __MyCompanyName__. All rights reserved.
 *
 */

#include <iostream>
using namespace std;

#include "commanddispatcher.h"
#include "rpcexception.h"
#include "types.h"

namespace rpc {

/**
 * Record or revoke the authenticated state of a client.
 *
 * @param clientId       id of the client whose state is changed
 * @param authenticated  true stores the id in authenticatedClients,
 *                       false removes it from authenticatedClients
 */
void CommandDispatcher::setClientAuthenticated(int clientId,bool authenticated)
{
      // Revocation first, so the common "grant" path falls through
      if( !authenticated ) {
            authenticatedClients.erase( clientId );
            return;
      }

      authenticatedClients.insert( clientId );
}

/**
 * Tell whether a client may be treated as authenticated.
 *
 * @param clientId  id of the client to look up
 * @return true when requireAuthentication is off, or when the id is present
 *         in authenticatedClients; false otherwise
 */
bool CommandDispatcher::isClientAuthenticated(int clientId)
{
      // The lookup is skipped entirely when authentication is not required
      return !requireAuthentication
            || authenticatedClients.find( clientId ) != authenticatedClients.end();
}

00031 void rpc::CommandDispatcher::registerCommand( RpcCommandHandlerPtr cmd )
{
      // Lock the mutex
      boost::mutex::scoped_lock lock( handlerListMutex );
      
      // Add the item. If the item does not exist a new list will be created
      // and the item added to it.
      registeredCommands[cmd->getCmdName()].push_back( cmd );
}

00041 void rpc::CommandDispatcher::handleCommand( int sender, CmdInputBufferPtr cmd )
{
      try {
            
            // Create a DataInputStream for easy reading
            DataInputStreamPtr cmdStream( new DataInputStream( cmd ) );
            
            // Will hold the command
            list<boost::any> params;
            
            // The the id of the sender
            params.push_back( sender );
            // Get name of the command. 
            // This should allways be the first param in the command stream
            string cmdName = retrieveCommandName( cmdStream );
            //cout << "handleCommand: " << cmdName << endl;

            // Add the name as the first param in the list
            params.push_back( cmdName );
            // Parse the params of the command. 
            // retriveCommandName should have advanced the buffer to the first param
            parseCommandParams( cmdStream, params );
            
            // Add the command to the buffer
            commandBuffer.enqueue( params );
            
            // Signal waiting thread
            commandAvailable.notify_one();
      }
      catch( const RpcException& rpcExcept ) {
            
            cout << "Could not handle command. Error: " << rpcExcept.getReason() << endl;
      }
}

00076 void rpc::CommandDispatcher::waitForCommand()
{
      try {
            
            // To monitor the addition of commands in the incomming buffer
            boost::mutex::scoped_lock lock( commandMonitor );
            
            bool stop = false;
            while( !stop ) {
                  
                  // Wait for something to get placed in the queue
                  commandAvailable.wait( lock );
                  
                  // Handle all the commands in the list
                  while( !commandBuffer.empty() ) {
                                                
                        try {
                              
                              // Get first command
                              list<boost::any> cmdParams = commandBuffer.dequeue();
                              
                              // Second should be the id of the client that sent this command
                              int sender = boost::any_cast<int>(cmdParams.front());
                              cmdParams.pop_front();
                              // First element should be the command name
                              string cmdName = boost::any_cast<string>(cmdParams.front());
                              cmdParams.pop_front();
                              if(!requireAuthentication || isClientAuthenticated(sender) || cmdName=="authenticate" )
                                    dispatchCommand( cmdName, sender, cmdParams );
                        }
                        catch( const boost::bad_any_cast& castExcept ) {
                              
                              cout << "Could not cast first parameter to command name. Error: " <<
                                          castExcept.what() << endl;
                        }
                        catch( const std::out_of_range& rangeExcept ) {
                              
                              cout << "Could not get command. Error: " << rangeExcept.what() << endl;
                        }
                        catch( const RpcException& except ) {
                              
                              cout << "Could not dispatch command. Error: " << except.getReason() << endl;
                        }
                  }
            }
      }
      catch( std::bad_alloc& allocExcept ) {
            
            cout << "Could not allocate memory for buffer. Error: " << allocExcept.what() << endl;
      }
      catch( const boost::lock_error& lkExcept ) {
            
            cout << "Could not lock command monitor mutex. Error: " << lkExcept.what() << endl;
      }
      catch(...) {
            // Make sure we catch everything so that we dont fuck up the entire app
            cout << "Unknown exception about to leave thread." << endl;
      }
}

/**
 * Read one typed parameter from the stream.
 *
 * The stream is expected to hold a one-byte type tag followed by the value.
 * A list value is encoded as an int element count followed by that many
 * tagged elements, which are read recursively and returned as a
 * list<boost::any>.
 *
 * @param paramStream  stream positioned at the type tag of the next param
 * @param params       not used by this function; kept for the existing
 *                     call signature
 * @return the decoded value wrapped in a boost::any
 * @throws RpcException if the type tag is unknown or a list announces a
 *         negative element count
 */
boost::any CommandDispatcher::getNextParam(DataInputStreamPtr paramStream, 
                                                               list<boost::any>& params) 
{
      // Hold the next argument
      boost::any nextParam;

      // Read the first byte containing the type of the next param
      const char type = paramStream->readByte();

      switch( type ) {

            case eRpcParamTypeInt:
                  nextParam = paramStream->readInt();
                  break;

            case eRpcParamTypeByte:
                  nextParam = paramStream->readByte();
                  break;

            case eRpcParamTypeShort:
                  nextParam = paramStream->readShort();
                  break;

            case eRpcParamTypeBoolean:
                  nextParam = paramStream->readBool();
                  break;

            case eRpcParamTypeLong:
                  nextParam = paramStream->readLong();
                  break;

            case eRpcParamTypeString:
                  nextParam = paramStream->readUTF();
                  break;

            case eRpcParamTypeList: {
                  // The count comes straight from the wire. A negative value
                  // would otherwise keep the read loop going until the stream
                  // fails or the counter overflows.
                  const int size = paramStream->readInt();
                  if( size < 0 ) {
                        throw RpcException("Invalid list size");
                  }

                  list<boost::any> listElements;
                  for( int i = 0; i < size; ++i ) {
                        listElements.push_back( getNextParam( paramStream, listElements ) );
                  }
                  nextParam = listElements;
                  break;
            }

            default:
                  // Could not match parameter type so we bail
                  throw RpcException("Invalid parameter type");
      }
      return nextParam;
}

/**
 * Decode the remaining parameters of a command and append them to a list.
 *
 * Parameters are read one by one with getNextParam() for as long as the
 * stream reports more than one remaining byte.
 *
 * @param paramStream  stream positioned at the first parameter
 * @param params       list that receives each decoded parameter, in order
 * @throws RpcException rethrown after being logged to stdout when a
 *         parameter cannot be read
 */
void CommandDispatcher::parseCommandParams( DataInputStreamPtr paramStream, 
                                                                  list<boost::any>& params )
{
      try {

            // Stop once no more than a single byte is left in the stream
            while( paramStream->remainingBytes() > 1 ) {
                  params.push_back( getNextParam( paramStream, params ) );
            }
      }
      catch( const RpcException& readError ) {

            // Log here, then let the caller decide what to do with the failure
            cout << "Could not read data. Error: " << readError.getReason() << endl;
            throw;
      }
}

00204 string CommandDispatcher::retrieveCommandName( DataInputStreamPtr cmdStream )
{
      string cmdName = "";
      
      // Read a string from the stream
      try {
            cmdName = cmdStream->readUTF();
      }
      catch( const RpcException& except ) {
            
            cout << "Could not read command name from stream" << endl;
            throw;
      }
      
      return cmdName;
}

void CommandDispatcher::dispatchCommand( const string& cmdName,
                                                             int sender,
                                                             const list<boost::any>& params )
{
      try {
            // Lock access to the list
            boost::mutex::scoped_lock lock( handlerListMutex );
            
            // Find the list of command handlers that correspond to this command
            map<string, RpcCommandHandlerList>::iterator it = registeredCommands.find( cmdName );
            
            if( it != registeredCommands.end() ) {
                  
                  // Iterate the list of commands and notify them of the command
                  RpcCommandHandlerList::iterator cmdIt = it->second.begin();
                  while( cmdIt != it->second.end() ) {
                        
                        // We need at least numParams amount of params so that 
                        // we dont index outside the list
                        if( (*cmdIt)->getNumParams() <= params.size() ) {
                              // Notify
                              (*cmdIt)->handleCommand( sender, params );
                        }
                        else {
                          cout << "Invalid number of params: " << cmdName << endl;
                        }
                        
                        // Next command
                        ++cmdIt;
                  }
            }
            else {
                  
                  // Maybe log something here.
                  cout << "Could not find any handlers for command: " << 
                              cmdName << 
                              endl;
            }
      }
      catch( const boost::lock_error& e ) {
            
            cout << "Could not lock mutex" << endl;
      }
}

}

// Generated by Doxygen 1.6.0