Logo Search packages:      
Sourcecode: dc-qt version File versions  Download package

rpcdriver.cpp

/*
 *  RpcDriver.cpp
 *  RpcDriver
 *
 *  Created by Mikael Gransell on 1/31/06.
 *  Copyright 2006 __MyCompanyName__. All rights reserved.
 *
 */

#include <boost/bind.hpp>

#include <typeinfo>
#include <iostream>

#include <netinet/in.h>

#include "rpcdriver.h"

#include "socketmanager.h"
#include "inputbuffer.h"
#include "datainputstream.h"
#include "dataoutputstream.h"
#include "rpcexception.h"

using std::cout;
using std::endl;
using std::map;
using std::list;
using std::string;

namespace rpc {
      
RpcClientDriver::RpcClientDriver( const char* addr, int port ) : serverPort(port), serverAddr(addr)
{
      try {
            
            Socket* client = new Socket( this );
            //client->connect(addr, port);
            clientSocket = client;
            boost::shared_ptr< InputBuffer<int> > sizeBuf( new InputBuffer<int>( 4 ) );
            SocketData sd = { sizeBuf, CmdInputBufferPtr() };
            socketDataMap[clientSocket] = sd;
            //SocketManager::instance()->addSocket(clientSocket);
            enableAuthenticationChecking(false);            
      }
      catch( const SocketException& e ) {
            cout << "Could not create RpcClientDriver. " << e.getReason() << endl;
            throw;
      }
}

void RpcClientDriver::connect()
{
      clientSocket->connect(serverAddr, serverPort);
      SocketManager::instance()->addSocket(clientSocket);
      // Start the threads
      startServer();
}

RpcClientDriver::~RpcClientDriver()
{
      clientSocket->disconnect();
      SocketManager::instance()->removeSocket(clientSocket);
}


RpcServerDriver::RpcServerDriver( int port ) : serverPort(port)
{
      try {
            enableAuthenticationChecking(true);
            // Init the server listening socket
            server = new Socket( this );
            
      }
      catch( const SocketException& e ) {
            cout << "Could not init server socket. " << e.getReason() << endl;
            throw;
      }
}

void RpcServerDriver::startListening()
{
      try {
            server->listen( serverPort );
            SocketManager::instance()->addSocket( server );
      }
      catch( const SocketException& e ) {
            cout << "Could not init server socket. " << e.getReason() << endl;
            throw;
      }
      // Start the threads
      startServer();    
}

RpcServerDriver::~RpcServerDriver()
{
}

      
RpcDriver::RpcDriver( ) : abort( false )
{     
      // Create command dispatcher
      CommandDispatcherPtr tempDisp( new CommandDispatcher() );
      cmdDispatcher = tempDisp;
      
      threads.create_thread( boost::bind( &CommandDispatcher::waitForCommand,
                                                            cmdDispatcher.get() ) );
}

RpcDriver::~RpcDriver()
{
      // Disconnect all the sockets at this point
      std::set<Socket*> sockets = SocketManager::instance()->getSockets();
      std::set<Socket*>::iterator it = sockets.begin();
      for(;it!=sockets.end();it++) {
            SocketManager::instance()->removeSocket(*it);
            (*it)->disconnect();
            delete *it;
      }
      // Signal command handler thread to exit
      
      // Join with the command handler thread before destructing
      
      // Removed by rikard, this is really not a good place to stop
      // threads when you dont event know if they are running!!

      // threads.join_all();
}

void RpcDriver::receiver()
{
      cout << "enter receiver" << endl;
      try {
            
            // How should we signal the receiver thread that we should quit.
            while( !abort ) {
                  
                  // Perform a select on all the sockets
                  SocketManager::instance()->select();
            }
      }
      catch( const SocketException& e ) {
            
            cout << "Exception caught while selecting. " << e.getReason() << endl;
      }
      cout << "exit receiver" << endl;
}

void RpcDriver::sender()
{
      try {
            
            // Maybe we need another lock here so that no other threads execute thi method by misstake
            
            // Lock for use with the condition below
            boost::mutex::scoped_lock commandNotifyLock( commandMonitor );
            
            // Keep waiting for command until someone tells us to stop
            while( true ) {
                  
                  //cout << "Waiting for a command" << endl;
                  
                  // Wait for commands to be available in the
                  commandAvailable.wait( commandNotifyLock );
                  
                  // Lock abort flag
                  {
                        boost::mutex::scoped_lock lk( abortMutex );
                        
                        // Check abort flag
                        if( abort ) {
                              
                              // Stop execution
                              break;
                        }     
                  }
                  
                  // Keep sending all the commands in the buffer until it is empty
                  while( !commandBuffer.empty() ) {
                        
                    // Get next command
                    CmdEntry cmdEntry = commandBuffer.dequeue();
                    
                    
                    try {
                      
                      OutputBufferPtr buf = convertCmdToBinaryBuffer( cmdEntry.cmd );
                      
                      // Write to clients
                      sendCommandToClients( buf, cmdEntry.socket );
                    }
                    catch( const RpcException& rpcExcept ) {
                      cout << "--- RpcException in sender() ---" << endl;
                      cout << "Reason: " << rpcExcept.getReason() << endl;
                      string funk = boost::any_cast<string>(cmdEntry.cmd->front());
                      cout << "ProcedureName: " << funk << endl;

                    }
                    catch( const SocketException& socketExcept ) {
                      cout << "Could not write socket. " << socketExcept.getReason() << endl;
                    }
                  }
            }
      }
      catch ( const boost::lock_error& e ) {
            
            cout << "Could not lock mutex!" << endl;
      }
      catch( const std::bad_alloc& allocExcept ) {
            
            cout << "Could not allocate buffer. " << allocExcept.what() << endl;
      }
      
      cout << "Stoping sender thread" << endl;
}

void RpcDriver::queueCommand( int socketId, const CmdPtr& cmd )
{
      // Add item to buffer. It should handle the synch by itself
      CmdEntry entry = { socketId, cmd };
      commandBuffer.enqueue( entry );
      
      // Signal waiting thread that there is a command available
      commandAvailable.notify_one();
}

void RpcDriver::registerCommand( RpcCommandHandlerPtr& pCmd )
{
      // Just forward the command handler to the command dispatcher
      cmdDispatcher->registerCommand( pCmd );
}

void RpcDriver::startServer()
{
      threads.create_thread( boost::bind( &RpcDriver::receiver, this ) );
      senderThread = threads.create_thread( boost::bind( &RpcDriver::sender, this ) );
}

void RpcDriver::stopSender()
{
      
      try {
            
      {
            boost::mutex::scoped_lock lk( abortMutex );
            abort = true;
      }
            
            // Notify thread
            commandAvailable.notify_one();      
      }
      catch( const boost::lock_error& e ) {
            
            cout << "Could not lock mutex" << endl;
      }
}

void RpcDriver::waitForCompletion()
{
      cout << "Waiting for threads" << endl;
      //threads.join_all();
      senderThread->join();
      cout << "Done Waiting" << endl;
}


void RpcDriver::onRead( Socket* socket )
{
    // We can read some data from the socket now
      int bytesRead = 0;
      try {
            
            // Find the data for this socket and try to read into it
            map< Socket*, SocketData >::iterator socketData = socketDataMap.find( socket );
            if( socketData != socketDataMap.end() ) {
                  
                  // Read an int to get the size of the payload
                  if( !socketData->second.sizeBuf->filled() ) {
                        
                        bytesRead = socket->read( socketData->second.sizeBuf->curPtr(), 
                                                              socketData->second.sizeBuf->remaining() );
                        socketData->second.sizeBuf->advance( bytesRead );
                        
                        // If we have read the size now
                        if( socketData->second.sizeBuf->filled() ) {
                              
                              // Get available size
                              uint32_t size = ntohl(*(socketData->second.sizeBuf->bufPtr()));
                              // Reset old data
                              CmdInputBufferPtr tempBuffer( new InputBuffer<char>( size ) );
                              socketData->second.dataBuf = tempBuffer;
                        }
                  }
                  else {
                        
                        // We have read the size and there should be an input buffer available
                        bytesRead = socket->read( socketData->second.dataBuf->curPtr(), 
                                                              socketData->second.dataBuf->remaining() );
                        socketData->second.dataBuf->advance( bytesRead );
                        
                        // Check if we have read all the data that we are supposed to
                        if( socketData->second.dataBuf->filled() ) {
                              // Reset the buffer pointer so that it points to the beginning of the buffer
                              socketData->second.dataBuf->reset();
                              
                              // Send the command to the command dispatcher
                              cmdDispatcher->handleCommand( socket->getId(), socketData->second.dataBuf );
                              // Reset the sizeBuf so that we can start to receive a new size
                              socketData->second.sizeBuf->reset();
                        }
                  }     
            }
            else {
                  cout << "Could not find data corresponding to socket" << endl;
            }
      }
      catch( const RpcException& rpcExcept ) {
            
            cout << "Execption " << rpcExcept.getReason() << endl;
      }
      catch( const SocketException& socketExcept ) {
            
            cout << "Could not read socket. " << socketExcept.getReason() << endl;
      }
      catch( const std::bad_alloc& allocExcept ) {
            
            cout << "Could not allocate buffer. " << allocExcept.what() << endl;
      }
      
}

void RpcDriver::onDisconnect( Socket* socket )
{
      try {
            cout << "onDisconnect" << endl;
            // Remove the socket from the socket manager
            SocketManager::instance()->removeSocket( socket );
            
            map< Socket*, SocketData>::iterator socketData = socketDataMap.find( socket );
            socketDataMap.erase( socketData );
            setClientAuthenticated( socket->getId(), false );
      }
      catch( const SocketException& e ) {
            
            cout << "Exception caught when removing socket from socket manager" <<
                        e.getReason() <<
                        endl;
      }
}


void RpcDriver::onIncoming( Socket* socket )
{
      try {
            cout << "onIncoming" << endl;
            // Accept the conenction
            Socket* s = socket->accept( this );
            boost::shared_ptr< InputBuffer<int> > sizeBuf( new InputBuffer<int>( 4 ) );
            SocketData sd = { sizeBuf, CmdInputBufferPtr() };
            socketDataMap[s] = sd;
            cout << "Accepted new connection " << s->getFd() << endl;
            SocketManager::instance()->addSocket( s );
      }
      catch( const SocketException& e ) {
            
            cout << "Exception caught while accepting connection. " << 
            e.getReason() << 
            endl;
      }     
}


void RpcDriver::convertListToBinary(list< boost::any >::const_iterator cmdIt,
list< boost::any >::const_iterator itEnd,
DataOutputStream& stream) const
{
// Iterate through all the entries in the command and write the to the stream
            while( cmdIt != itEnd ) {
                                    
                  if( cmdIt->type() == typeid(int) ) {
                        stream.writeByte(eRpcParamTypeInt);
                        stream.writeInt( boost::any_cast<int>(*cmdIt) );
                  }
                  else if( cmdIt->type() == typeid(char) ) {
                        stream.writeByte(eRpcParamTypeByte);
                        stream.writeByte( boost::any_cast<char>(*cmdIt) );
                  }
                  else if( cmdIt->type() == typeid(short) ) {
                        stream.writeByte(eRpcParamTypeShort);
                        stream.writeShort( boost::any_cast<short>(*cmdIt) );
                  }
                  else if( cmdIt->type() == typeid(bool) ) {
                        stream.writeByte(eRpcParamTypeBoolean);
                        stream.writeBool( boost::any_cast<bool>(*cmdIt) );
                  }
                  else if( cmdIt->type() == typeid(int64_t) ) {
                        stream.writeByte(eRpcParamTypeLong);
                        stream.writeLong( boost::any_cast<int64_t>(*cmdIt) );
                  }
                  else if( cmdIt->type() == typeid(string) ) {
                        stream.writeByte(eRpcParamTypeString);
                        stream.writeUTF( boost::any_cast<string>(*cmdIt) );
                  }
                  else if( cmdIt->type() == typeid(std::list<boost::any>) ) {
                        stream.writeByte(eRpcParamTypeList);
                        stream.writeInt((boost::any_cast<std::list<boost::any> >(*cmdIt)).size());
                        const std::list<boost::any>& lst = boost::any_cast<const std::list<boost::any> >(*cmdIt);
                        convertListToBinary(lst.begin(),lst.end(),stream);
//(boost::any_cast<std::list<boost::any> >(*cmdIt)).begin(),
//(boost::any_cast<std::list<boost::any> >(*cmdIt)).end(),
//stream);
                  }
                  else {
                        cout << "unknown" << endl;
                        throw RpcException( "Invalid type in command" );
                  }
                  
                  ++cmdIt;
            }
}

OutputBufferPtr RpcDriver::convertCmdToBinaryBuffer( CmdPtr cmd ) const
{
      OutputBufferPtr buf( new OutputBuffer() );
      DataOutputStream stream( buf.get() );
      
      // Write a dummy int that will later hold the size
      stream.writeInt(0);
      
      try {
            
            // To iterate through the list of params
            list< boost::any >::const_iterator cmdIt = cmd->begin();
            
            // First parameter should be the name
            //cout << "Sending command: " << boost::any_cast<string>(*cmdIt) << endl;
            stream.writeUTF(boost::any_cast<string>(*cmdIt));
            ++cmdIt;
            
            convertListToBinary(cmdIt,cmd->end(),stream);
      }
      catch( const boost::bad_any_cast& e ) {
            cout << e.what() << endl;
            throw RpcException( "Could not parse command" );
      }
      
      // Finally we write the size of the buffer to the first pos
      // We have to do something about this crapy line of code
      int* sizePtr = reinterpret_cast<int*>(const_cast<char*>(buf->getByteArray()));
      // Subtract the size parameter from the size since this is not part of the command
      *sizePtr = htonl(buf->getSize()-4);
      
      return buf;
}

void RpcDriver::sendCommandToClients( OutputBufferPtr buf, int socketId ) const
{
      //cout << "sendCommandToClients" << endl;
      // Send to all connected clients
      if(socketId==-1)
      {
            const std::set<Socket*>& sockets = SocketManager::instance()->getSockets();
            std::set<Socket*>::iterator it = sockets.begin();
            while(it!=sockets.end()) {
                  if( !(*it)->getIsServer() && cmdDispatcher->isClientAuthenticated((*it)->getId())) {
                        
                        try {
                              int bytesWritten = 0;
                              while ( bytesWritten < buf->getSize() ) {
                                    bytesWritten += (*it)->write( buf->getByteArray() + bytesWritten,buf->getSize() - bytesWritten);
                              }           
                        }
                        catch( const SocketException& e ) {
                              cout << "Could not write to socket. Error: " << e.getReason() << endl;
                        }
                  }
                  it++;
            }
      } else {
            Socket* clientSocket = SocketManager::instance()->findSocket( socketId );
      
            if( clientSocket ) {
                  
             if(cmdDispatcher->isClientAuthenticated(clientSocket->getId())) {
                  int bytesWritten = 0;
                  while ( bytesWritten < buf->getSize() ) {
                        
                        bytesWritten += clientSocket->write( buf->getByteArray() + bytesWritten,
                                                                               buf->getSize() - bytesWritten);
                  }
            }
            }
            else {
                  throw RpcException("Could not find a socket to match id");
            }
      }
}

void RpcDriver::setClientAuthenticated( int clientId, bool yes )
{
      cmdDispatcher->setClientAuthenticated(clientId, yes);
}

void RpcDriver::enableAuthenticationChecking( bool yes )
{
      cmdDispatcher->enableAuthenticationChecking(yes);

}

}

Generated by  Doxygen 1.6.0   Back to index