I've done a bit of work.

Here is an example main with the new class

// Example driver: one publisher and one subscriber built from a shared
// Saetta::Context. The subscriber filters on topic "B", so of the two
// messages published per iteration only the "B" one should be printed.
//
// NOTE(review): Publisher and Subscriber take the Context BY VALUE. Context
// declares no copy constructor and its destructor calls zmq_ctx_destroy(), so
// every temporary copy tears down the shared ZeroMQ context when it goes out
// of scope. That is the likely cause of the EFAULT reported for this example;
// the constructors should take the context by reference instead.
int main(int argc, char** argv) {
    Saetta::Context mycontext;
    Saetta::Publisher mypubber(mycontext); //=new Saetta::Server;
    Saetta::Subscriber mysubber(mycontext); //=new Saetta::Client;
    mysubber.SubscribeTopic("B");

    for(int i=0;i<10;i++)
    {
        mypubber.PubMessage(3,"A","We don't want to see this","1");
        mypubber.PubMessage(3,"B","We would like to see this","1");
        //mypuber.worker();
        usleep(100);
        cout << mysubber.worker();
        usleep(1500000);
    }
    // No explicit destructor calls here: mysubber, mypubber and mycontext have
    // automatic storage and are destroyed (in that order) when main returns.
    // Calling ~Publisher()/~Subscriber() by hand made each destructor run
    // twice, closing the same socket a second time.
    return (EXIT_SUCCESS);
}

I'm seeking help here: the constructors can be used as depicted above, or by providing them with an external context. The above-mentioned piece of code does not work: when the Publisher constructor attempts to create the socket, zmq_socket() fails with EFAULT. I checked with GDB and the context pointer is absolutely correct, so I can't see where the problem is.

A version that actually works is this

// Example driver (working variant): the publisher and subscriber are handed
// a raw ZeroMQ context pointer obtained from zmq_init(1) instead of a
// Saetta::Context object. The subscriber filters on topic "B".
//
// NOTE(review): the raw context created here is never passed to
// zmq_ctx_destroy()/zmq_term() in this snippet.
int main(int argc, char** argv) {
    void *mycontext = zmq_init(1);
    Saetta::Publisher mypubber(mycontext); //=new Saetta::Server;
    Saetta::Subscriber mysubber(mycontext); //=new Saetta::Client;
    mysubber.SubscribeTopic("B");

    for(int i=0;i<10;i++)
    {
        mypubber.PubMessage(3,"A","We don't want to see this","1");
        mypubber.PubMessage(3,"B","We would like to see this","1");
        //mypuber.worker();
        usleep(100);
        cout << mysubber.worker();
        usleep(1500000);
    }
    // No explicit destructor calls here: mysubber and mypubber have automatic
    // storage and are destroyed when main returns. Calling the destructors by
    // hand made each one run twice, closing the same socket a second time.
    return (EXIT_SUCCESS);
}

Attached you'll find the source of the class (currently testing only Pub and Sub). As you can see the variables passed are the same, yet when I just pass the pointer it works, when I pass the whole class it doesn't work.
Hope someone can chime in.

Claudio


On 16/12/12 15:01, Claudio Carbone wrote:
Hello all.

Recently I've been moving from C to C++ and I'd really like to exploit
the capabilities of this language.
One of the scenarios I've been thinking about is a generic ZMQ class able
to do all the work.

Generally speaking abstracting from the socket and context creation is
easy: you just need to tell the constructor what kind of socket you
want, and where you want it.
What would be much more useful and powerful (and complex) would be to
design a way to pass message syntax in an abstract form, so you can add
as many messages as you want to the class.

That way a sender subclass would create as many sending hooks as message
types, while a receiver class would create as many callbacks as message
types.
I think this would be a nice design, so I'm asking here for advice
and ideas about such an implementation.

Thank you
Claudio
_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev

/* 
 * File:   saetta.cpp
 * Author: erupter
 * 
 * Created on December 14, 2012, 6:32 PM
 */

#include "saetta.h"

#include <cstdarg>
#include <cstdio>
#include <cstdlib>
namespace Saetta {

    /**
     * Creates a new ZeroMQ context through zmq_init(1) and stores its handle.
     *
     * @throws error_t (default-constructed) when zmq_init returns NULL.
     */
    Context::Context()
        : _zmq_context_ptr(zmq_init(1))
    {
        if (!_zmq_context_ptr) {
            throw error_t();
        }
    }
    
    /**
     * Hands the stored context handle to zmq_ctx_destroy.
     *
     * NOTE(review): Context declares no copy constructor, so an implicit copy
     * holds the same handle and its destructor performs this call as well.
     */
    Context::~Context()
    {
        void* const handle = _zmq_context_ptr;
        zmq_ctx_destroy(handle);
    }
    
    void generic::classinit()
    {
        this->_counter=0;
        this->_internal_context=0;
        this->_zmq_context_ptr=0;
        this->_zmq_socket_ptr=0;
    }
    
    void generic::contextcreate()
    {
        this->_zmq_context_ptr = zmq_init(1);
        if (this->_zmq_context_ptr == NULL)
            throw error_t ();      
        this->_internal_context = 1;     
    }
    void generic::contextcreate(void* context)
    {
        this->_zmq_context_ptr = context;
        if (this->_zmq_context_ptr == NULL)
            throw error_t ();          
        this->_internal_context = 0;        
    }
    void generic::contextcreate(Context context)
    {
        this->_zmq_context_ptr = context._zmq_context_ptr;
        if (this->_zmq_context_ptr == NULL)
            throw error_t ();          
        this->_internal_context = 0;
    }


    void generic::socketcreate(int skt_type)
    {
        this->_zmq_socket_ptr = zmq_socket(this->_zmq_context_ptr, skt_type);    
        if (this->_zmq_socket_ptr == NULL)
            throw error_t ();    
    }

    void generic::socketbind()
    {
    int rc = zmq_bind (this->_zmq_socket_ptr, "tcp://*:5678");
        if (rc != 0)
            throw error_t ();    
    }

    void generic::socketbind(char* ip_addr)
    {
    int rc = zmq_bind (this->_zmq_socket_ptr, ip_addr);
        if (rc != 0)
            throw error_t ();    
    }
    void generic::socketconn()
    {
        int rc = zmq_connect (this->_zmq_socket_ptr, "tcp://localhost:5678");
        if (rc != 0)
            throw error_t ();
    }
    void generic::socketconn(char* ip_addr)
    {
        int rc = zmq_connect (this->_zmq_socket_ptr, ip_addr);
        if (rc != 0)
            throw error_t ();    
    }

    void generic::close()
    {
    zmq_close(this->_zmq_socket_ptr);
        if (this->_internal_context)
            zmq_ctx_destroy(this->_zmq_context_ptr);      
    }

    // All Publisher constructors follow the same sequence: reset members,
    // obtain a context, open a ZMQ_PUB socket, bind it. If any step throws,
    // the object is never fully constructed and ~Publisher() will not run, so
    // each constructor calls close() itself before re-throwing; otherwise the
    // socket (and an internally created context) would leak.
    // classinit() already zeroes _counter, so it is not reset again.

    /**
     * Publisher with its own private context, bound to the default endpoint.
     *
     * @throws error_t when context creation, socket creation or bind fails.
     */
    Publisher::Publisher(void)
    {
        classinit();
        try {
            contextcreate();
            socketcreate(ZMQ_PUB);
            socketbind();
        } catch (...) {
            close();
            throw;
        }
    }

    /**
     * Publisher on an externally owned raw context, bound to the default
     * endpoint.
     *
     * @param context raw ZeroMQ context handle; must not be NULL.
     * @throws error_t when the handle is NULL or socket creation/bind fails.
     */
    Publisher::Publisher(void * context)
    {
        classinit();
        try {
            contextcreate(context);
            socketcreate(ZMQ_PUB);
            socketbind();
        } catch (...) {
            close();
            throw;
        }
    }

    /**
     * Publisher sharing the handle of a Context object, bound to the default
     * endpoint.
     *
     * NOTE(review): the Context is received (and forwarded) by value; every
     * copy's destructor calls zmq_ctx_destroy() on the shared handle.
     *
     * @param context Context whose handle is used.
     * @throws error_t when the handle is NULL or socket creation/bind fails.
     */
    Publisher::Publisher(Context context)
    {
        classinit();
        try {
            contextcreate(context);
            socketcreate(ZMQ_PUB);
            socketbind();
        } catch (...) {
            close();
            throw;
        }
    }

    /**
     * Publisher sharing the handle of a Context object, bound to a
     * caller-supplied endpoint.
     *
     * NOTE(review): same by-value Context hazard as the overload above.
     *
     * @param context Context whose handle is used.
     * @param ip_str  endpoint string passed to zmq_bind.
     * @throws error_t when the handle is NULL or socket creation/bind fails.
     */
    Publisher::Publisher(Context context, char* ip_str)
    {
        classinit();
        try {
            contextcreate(context);
            socketcreate(ZMQ_PUB);
            socketbind(ip_str);
        } catch (...) {
            close();
            throw;
        }
    }
    /** Releases the publisher's socket (and internal context, if any) via close(). */
    Publisher::~Publisher()
    {
        this->close();
    }

    /**
     * Publishes one multipart message built from C strings.
     *
     * The first count-1 strings are sent with s_sendmore and the last one
     * with s_send, which terminates the message.
     *
     * @param count number of string arguments that follow. Values below 1
     *              send nothing (previously one variadic argument was read
     *              even when none had been passed, which is undefined).
     * @param ...   exactly `count` arguments, each read as `char*`.
     */
    void Publisher::PubMessage (int count, ...)
    {
        if (count < 1) {
            return;
        }

        va_list argptr;
        va_start( argptr, count );

        // Every frame except the last is flagged "more to follow".
        for (int remaining = count; remaining > 1; --remaining) {
            s_sendmore (this->_zmq_socket_ptr, va_arg(argptr, char*));
        }
        s_send(this->_zmq_socket_ptr, va_arg(argptr, char*));

        va_end( argptr );
    }

    /**
     * Publishes two fixed three-frame messages, one on topic "A" and one on
     * topic "B", each ending with the current counter formatted as "%03d",
     * then increments the counter.
     */
    void Publisher::worker()
    {
        // "%03d" produces at least three characters plus the terminating NUL,
        // so the former char[3] buffer was overrun on every call. The buffer
        // is now large enough for any int, and snprintf bounds the write.
        char _count[16];
        snprintf(_count, sizeof(_count), "%03d", this->_counter);

        // Write two messages, each with an envelope and content
        s_sendmore (this->_zmq_socket_ptr, "A");
        s_sendmore (this->_zmq_socket_ptr, "We don't want to see this");
        s_send     (this->_zmq_socket_ptr, _count);
        s_sendmore (this->_zmq_socket_ptr, "B");
        s_sendmore (this->_zmq_socket_ptr, "We would like to see this");
        s_send     (this->_zmq_socket_ptr, _count);
        this->_counter++;
    }

    // All Subscriber constructors follow the same sequence: reset members,
    // obtain a context, open a ZMQ_SUB socket, connect it. If any step
    // throws, the object is never fully constructed and ~Subscriber() will
    // not run, so each constructor calls close() itself before re-throwing;
    // otherwise the socket (and an internally created context) would leak.

    /**
     * Subscriber with its own private context, connected to the default
     * endpoint.
     *
     * @throws error_t when context creation, socket creation or connect fails.
     */
    Subscriber::Subscriber()
    {
        classinit();
        try {
            contextcreate();
            socketcreate(ZMQ_SUB);
            socketconn();
        } catch (...) {
            close();
            throw;
        }
    }

    /**
     * Subscriber on an externally owned raw context, connected to the default
     * endpoint.
     *
     * @param context raw ZeroMQ context handle; must not be NULL.
     * @throws error_t when the handle is NULL or socket creation/connect fails.
     */
    Subscriber::Subscriber(void* context)
    {
        classinit();
        try {
            contextcreate(context);
            socketcreate(ZMQ_SUB);
            socketconn();
        } catch (...) {
            close();
            throw;
        }
    }

    /**
     * Subscriber sharing the handle of a Context object, connected to the
     * default endpoint.
     *
     * NOTE(review): the Context is received (and forwarded) by value; every
     * copy's destructor calls zmq_ctx_destroy() on the shared handle.
     *
     * @param context Context whose handle is used.
     * @throws error_t when the handle is NULL or socket creation/connect fails.
     */
    Subscriber::Subscriber(Context context)
    {
        classinit();
        try {
            contextcreate(context);
            socketcreate(ZMQ_SUB);
            socketconn();
        } catch (...) {
            close();
            throw;
        }
    }

    /**
     * Subscriber sharing the handle of a Context object, connected to a
     * caller-supplied endpoint.
     *
     * NOTE(review): same by-value Context hazard as the overload above.
     *
     * @param context Context whose handle is used.
     * @param ip_str  endpoint string passed to zmq_connect.
     * @throws error_t when the handle is NULL or socket creation/connect fails.
     */
    Subscriber::Subscriber(Context context, char* ip_str)
    {
        classinit();
        try {
            contextcreate(context);
            socketcreate(ZMQ_SUB);
            socketconn(ip_str);
        } catch (...) {
            close();
            throw;
        }
    }

    /** Releases the subscriber's socket (and internal context, if any) via close(). */
    Subscriber::~Subscriber()
    {
        this->close();
    }

    int Subscriber::SubscribeTopic(std::string topic)
    {
        return (zmq_setsockopt(this->_zmq_socket_ptr, ZMQ_SUBSCRIBE, topic.c_str(), topic.length()));    
    }

    /**
     * Receives one frame with s_recv and returns it as a std::string.
     *
     * The zhelpers.h s_recv hands back a heap buffer that the caller must
     * free, or NULL on failure (NOTE(review): per the ZeroMQ guide's
     * zhelpers.h — confirm against the version in use). The buffer is copied
     * and released here; a NULL result becomes an empty string instead of
     * being passed to the std::string constructor.
     */
    static std::string recv_frame_as_string(void* socket)
    {
        char* raw = s_recv(socket);
        if (raw == NULL) {
            return std::string();
        }
        std::string frame(raw);
        std::free(raw);
        return frame;
    }

    /**
     * Receives one three-frame message (address, contents, termination) and
     * formats it for display.
     *
     * @return "[<address>] <contents>  | Termination [<termination>]"
     *         followed by a newline. Frames that could not be received are
     *         rendered as empty strings.
     */
    std::string Subscriber::worker()
    {
        // Read envelope with address
        const std::string address = recv_frame_as_string(this->_zmq_socket_ptr);
        // Read message contents
        const std::string contents = recv_frame_as_string(this->_zmq_socket_ptr);
        // Read the trailing frame
        const std::string termination = recv_frame_as_string(this->_zmq_socket_ptr);

        std::stringstream ss;
        ss << "[" << address << "] " << contents << "  | Termination ["<< termination<<"]"<< std::endl;
        return (ss.str());
    }


}
/* 
 * File:   saetta.h
 * Author: erupter
 *
 * Created on December 14, 2012, 6:32 PM
 */

#ifndef SAETTA_H
#define	SAETTA_H
extern "C" {
    #include <zmq.h>
    #include <zhelpers.h>
}
#include <string>
#include <sstream>
#include <iostream>


namespace Saetta
{
    /**
     * Wrapper around a ZeroMQ context handle: the constructor creates the
     * context and the destructor destroys it.
     *
     * NOTE(review): no copy constructor or assignment operator is declared,
     * so copies share the same handle and each copy's destructor destroys it.
     * Passing a Context by value therefore invalidates the original.
     */
    class Context
    {
    public:
        Context();
        virtual ~Context();

    protected:
        void* _zmq_context_ptr;  // handle returned by zmq_init

        // generic reads the handle directly in contextcreate(Context).
        friend class generic;
    };
    
    /**
     * Common base of the socket wrapper classes. It holds the context and
     * socket handles and the helper steps (create context, create socket,
     * bind/connect, close) that the derived constructors and destructors
     * combine. Everything is protected: only derived classes use it.
     */
    class generic
    {
    protected:
        void *_zmq_context_ptr;   // context the socket lives in
        void *_zmq_socket_ptr;    // socket handle, 0 until socketcreate()
        int _internal_context;    // 1 when contextcreate() made the context itself
        int _counter;             // running number used by Publisher::worker()

        // Reset all members to zero.
        inline void classinit();

        // Obtain a context: create one, adopt a raw handle, or share a Context's.
        inline void contextcreate();
        inline void contextcreate(void* context);
        inline void contextcreate(Context context);

        // Open a socket of the given type on the stored context.
        inline void socketcreate(int skt_type);

        // Bind / connect to the default endpoint or to ip_addr.
        inline void socketbind();
        inline void socketbind(char* ip_addr);
        inline void socketconn();
        inline void socketconn(char* ip_addr);

        // Close the socket and destroy an internally created context.
        inline void close();
    };
    
    /**
     * Publishing endpoint: each constructor opens a ZMQ_PUB socket and binds
     * it, either on a private context, on a raw context handle, or on the
     * handle of a Context object.
     *
     * NOTE(review): the Context overloads take their argument by value; see
     * Context's destructor before relying on them.
     */
    class Publisher : public generic
    {
        friend class Context;

    public:
        Publisher();
        Publisher(void* context);
        Publisher(Context context);
        Publisher(Context context, char* ip_str);
        virtual ~Publisher();

        // Sends `count` C strings as one multipart message.
        void PubMessage(int count, ...);
        // Sends the two fixed demo messages and bumps the counter.
        void worker();
    };
    
    /**
     * Subscribing endpoint: each constructor opens a ZMQ_SUB socket and
     * connects it, either on a private context, on a raw context handle, or
     * on the handle of a Context object.
     *
     * NOTE(review): the Context overloads take their argument by value; see
     * Context's destructor before relying on them.
     */
    class Subscriber : public generic
    {
        friend class Context;

    public:
        Subscriber();
        Subscriber(void* context);
        Subscriber(Context context);
        Subscriber(Context context, char* ip_str);
        virtual ~Subscriber();

        // Adds a ZMQ_SUBSCRIBE filter; returns zmq_setsockopt's result.
        int SubscribeTopic(std::string topic);
        // Receives one three-frame message and returns it formatted as text.
        std::string worker ();
    };
    
    /**
     * Request-side wrapper with the same constructor set as Publisher and
     * Subscriber.
     *
     * NOTE(review): only declared here; no definitions appear in the
     * accompanying saetta.cpp.
     */
    class Request : public generic
    {
        friend class Context;

    public:
        Request();
        Request(void* context);
        Request(Context context);
        Request(Context context, char* ip_str);
        virtual ~Request();
    };
    
    /**
     * Reply-side wrapper with the same constructor set as Publisher and
     * Subscriber.
     *
     * NOTE(review): only declared here; no definitions appear in the
     * accompanying saetta.cpp.
     */
    class Reply : public generic
    {
        friend class Context;

    public:
        Reply();
        Reply(void* context);
        Reply(Context context);
        Reply(Context context, char* ip_str);
        virtual ~Reply();
    };

}


#endif	/* SAETTA_H */

_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev

Reply via email to