// rpcdriver.h
//
// Source code from the dc-qt package.
// (Listing originally extracted from a generated Doxygen source page.)

#ifndef _RPC_DRIVER_H_
#define _RPC_DRIVER_H_

#include <boost/utility.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread/condition.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/thread.hpp>

#include <queue>
#include <string>
#include <list>

#include "protectedbuffer.h"
#include "rpccommandhandler.h"
#include "commanddispatcher.h"

#include "socketlistener.h"
#include "inputbuffer.h"
#include "outputbuffer.h"
#include "dataoutputstream.h"


namespace rpc {

/**
 * 
 */
class RpcDriver : 
      public SocketListener, 
      private boost::noncopyable
{
public:
            
      /**
       * Create a server socket that listens for incoming connections
       * on the specified port. We also create a command dispatcher.
       * @param listeningPort Port that on wich we shold listen for incoming connections.
       */
      RpcDriver( );
      virtual ~RpcDriver();
      
      /**
       * This is the entry point for the thread that listens for
       * incomming data or connections. It will just keep on doing
       * select on the sockets that we have created and when there
       * is something available it will handle that request.
       */
      void receiver();
      
      /**
       * This is the entry point for the thread that does all the 
       * sending of commands. It will wait for an event to get signaled,
       * and when it is it will check the queue of commands and send them
       * all unti there is nothing more to send.
       */
      void sender();
      
      /**
       * Queues a command for sending and then signals the sender
       * thread that there are things available in the queue.
       * @param cmd The command that should be sent
       */
      void queueCommand( int socketId, const CmdPtr& cmd );
      
      /**
       * Register an class that implements the RpcCommandHandler interface
       * with the driver. This will result in the class instance receiving
       * notifications when a command that corresponds to the name it is
       * registered for arrives from a client.
       * @param pCmd Pointer to the command handler that is to be registered.
       */
      void registerCommand( RpcCommandHandlerPtr& pCmd );
            
      /**
       * Stop the sender thread. Do this by setting the abort member and 
       * signaling the thread that there is something available in the queue.
       */
      void stopSender();
      
      /**
       * This method will wait for all the threads to stop.
       */
      void waitForCompletion();
      
      //////////////////
      // Implement the SocketListener interface
      //////////////////
      
      /**
       * Called when there is data available to read one of the sockets
       * we are listening to.
       * @param socket The socket that provides the data.
       */
      virtual void onRead( Socket* socket );
      
      
      virtual void onWrite( Socket* ) {}
      virtual void onConnect( Socket* ) {}
      
      /**
       * Called when a socket gets disconnected. When this happens we will
       * remove it from the socket manager so that we dont select on it any more.
       * @param socket The socket that has been disconnected.
       */
      virtual void onDisconnect( Socket* socket );
      
      /**
       * When there is a connection incoming. What we do is to accept the
       * connection.
       * @param socket The socket that has an incomming connection.
       */
      virtual void onIncoming( Socket* socket );

      void enableAuthenticationChecking( bool yes );
      void setClientAuthenticated( int clientId, bool yes );
      
protected:
            
      /**
       * This method creates the neccesary threads for the server. it is 
       * needed since we can not create the threads in the ctor. This is because
       * sockets and such might not be initialized at this point and we dont 
       * want to start listening on them if thats the case.
       */
      void startServer();
      
      /// One of these entries sould be available for each socket we are listening to.
00130       typedef struct SocketData {
            boost::shared_ptr<InputBuffer<int> > sizeBuf;
            CmdInputBufferPtr dataBuf;
      } SocketData;
      
      /// Map a socket to its data
      typedef std::map< Socket*, SocketData> SocketDataMap;

      SocketDataMap socketDataMap;

private:
            
      /// Used to queue commands in the rpc driver
00143       typedef struct CmdEntry {
            int socket;
            CmdPtr cmd;
      } CmdEntry;
      
      /**
       * Convert a list of boost::any to a DataOutputStream
       */
      OutputBufferPtr convertCmdToBinaryBuffer( CmdPtr cmd ) const;
      void convertListToBinary(std::list< boost::any >::const_iterator cmdIt,
                                           std::list< boost::any >::const_iterator itEnd,
                                           DataOutputStream& stream) const;
      /**
       * Send the command to the client represented by the socket id.
       * @param buf Binary buffer that contains the command.
       * @param socketId Id of the socket/client this command should be sent to.
                                 If this value is -1 it will be sent to all sockets/clients
       */
      void sendCommandToClients( OutputBufferPtr buf, int socketId ) const;
            
      /// To handle the parsing if commands and dispatching them to the
      /// commands that have registered.
      CommandDispatcherPtr cmdDispatcher;
                  
      /// Mutex used in a condition to make the receiver thread wait
      /// for data to arrive in the outgoing queue
      boost::mutex commandMonitor;
      
      /// Protect the abort flag
      boost::mutex abortMutex;
      
      /// Condition used to wait for data to be present in the command queue.
      /// Used together with m_queueMonitor
      boost::condition commandAvailable;
      
      /// Queue of commands
      ProtectedBuffer<CmdEntry> commandBuffer;
      
      /// Thread that handles commands
      boost::thread_group threads;
      
      /// The sender thread
      boost::thread* senderThread;

      /// Set this to make the threads stop.
      bool abort;

      
};

/// Reference-counted (boost::shared_ptr) handle to an RpcDriver.
typedef boost::shared_ptr<RpcDriver> RpcDriverPtr;


/// Client-side flavour of RpcDriver: targets a server given by address and port.
class RpcClientDriver : public RpcDriver
{
public:
      /**
       * Initialise the parent class and try to connect to a server.
       * NOTE(review): a separate connect() method also exists; whether the
       * constructor itself connects is decided in the implementation - confirm.
       * @param addr Address of the server we should connect to.
       * @param port Port that the server listens on.
       */
      RpcClientDriver( const char* addr, int port );
      virtual ~RpcClientDriver();

      /// Connect to the server. NOTE(review): presumably uses the address
      /// and port handed to the constructor - confirm.
      void connect();

private:
      /// Socket used on the client side.
      Socket* clientSocket;
      /// Port of the server.
      int serverPort;
      /// Address of the server. NOTE(review): held as a raw pointer, so the
      /// caller presumably has to keep the string alive - confirm.
      const char* serverAddr;
};

/// Server-side flavour of RpcDriver, configured with a port number.
class RpcServerDriver : public RpcDriver
{
public:
      /**
       * Construct a server driver.
       * Declared explicit so that a plain int is never silently converted
       * into a driver object.
       * @param port Port number for the server. NOTE(review): presumably the
       *             port that startListening() listens on - confirm.
       */
      explicit RpcServerDriver( int port );

      /// Start listening. NOTE(review): presumably sets up the server socket
      /// on the configured port - confirm against the implementation.
      void startListening();
      virtual ~RpcServerDriver();
private:
      /// Server socket.
      Socket* server;
      /// Port number given to the constructor (presumably; confirm in the ctor).
      int serverPort;
};

}

#endif

// Listing generated by Doxygen 1.6.0