Code of my classes attached.
A main would look like this
int main (void)
{
Zmqcpp::Context mycontext(1);
Zmqcpp::Publisher mypubber(mycontext, "tcp://*:5678", ZMQCPP_BIND);
Zmqcpp::Subscriber mysubber(mycontext, "tcp://localhost:5678",
ZMQCPP_CONNECT);
mysubber.SubscribeTopic("B");
while(1)
{
char mystr[3];
sprintf(mystr,"%03d",counter);
mypubber.PubMessage(3,"A","We don't want to see this",mystr);
mypubber.PubMessage(3,"B","We would like to see this",mystr);
printf("Cycle [%s]\n",mystr);
usleep(100);
for (int i=0;i<3;i++)
cout << mysubber.receive() << std::endl;
counter++;
usleep(1500000);
}
mypubber.~Publisher();
mysubber.~Subscriber();
mycontext.~Context();
return (EXIT_SUCCESS);
}
Claudio
On 12/22/2012 02:14 PM, Claudio Carbone wrote:
To make debugging an application easier I often joined the server and
client in a single program.
Now I'm using my own c++ classes built on top of the c bindings.
This means I have a class instance for every object:
Context,socket,publisher and subscriber.
When used like this I find that somehow multiple threads are spawned
automatically effectively nullifying having both sides in a single
application.
In fact unless I start putting in callbacks,output redirection and
other stuff, having multiple threads makes my life a lot harder:gdb
starts having problems following the code (sometimes problems seem to
happen nowhere or just in disassembly), simple stdout doesn't work,
and so on.
Now unless I forgot something (quite plausible) ZMQ should create
threads just on context creation.
Why am I seeing multiple threads with a single context?
Being on the phone I don't have the code, will post it later.
Thank you
Claudio
-- Sent from my ParanoidAndroid Galaxy Nexus with K-9 Mail.
/*
* File: zmqcpp.cpp
* Author: erupter
*
* Created on December 14, 2012, 6:32 PM
*/
#include <zmq.h>
#include "zmqcpp.h"
namespace Zmqcpp {
Context::Context()
{
this->_zmq_context_ptr = zmq_init(1);
if (this->_zmq_context_ptr == NULL)
throw error_t ();
}
Context::Context(int threads)
{
this->_zmq_context_ptr = zmq_init(threads);
if (this->_zmq_context_ptr == NULL)
throw error_t ();
}
Context::~Context()
{
zmq_ctx_destroy(this->_zmq_context_ptr);
}
Context* Context::getPtr()
{
return this;
}
Socket::Socket(Context* context, int skt_type)
{
this->_zmq_socket_ptr = 0;
this->_zmq_socket_ptr = zmq_socket(context->_zmq_context_ptr, skt_type);
if (this->_zmq_socket_ptr == NULL)
throw error_t ();
}
Socket::Socket(Context* context, int skt_type, std::string ip_addr, int conn)
{
this->_zmq_socket_ptr = 0;
this->_zmq_socket_ptr = zmq_socket(context->_zmq_context_ptr, skt_type);
if (this->_zmq_socket_ptr == NULL)
throw error_t ();
if (conn == ZMQCPP_BIND)
zmq_bind(this->_zmq_socket_ptr, ip_addr.c_str());
else
{
if (conn == ZMQCPP_CONNECT)
zmq_connect(this->_zmq_socket_ptr, ip_addr.c_str());
else
throw error_t();
}
}
Socket::~Socket()
{
zmq_close(this->_zmq_socket_ptr);
//zmq_disconnect(this->_zmq_socket_ptr, this->_ip_addr);
}
Socket* Socket::getPtr()
{
return this;
}
int Socket::setsockopt(int option, const void *optval,size_t optvallen)
{
return zmq_setsockopt(this->_zmq_socket_ptr,option, optval,optvallen);
}
int Socket::Send(std::string msg)
{
return s_send(this->_zmq_socket_ptr,msg.c_str());
}
int Socket::SendMore(std::string msg)
{
return s_sendmore(this->_zmq_socket_ptr,msg.c_str());
}
int Socket::connect()
{
int rc = zmq_connect (this->_zmq_socket_ptr, "tcp://localhost:5678");
if (rc != 0)
throw error_t ();
this->_ip_addr = "tcp://localhost:5678";
return 0;
}
int Socket::connect(std::string ip_addr)
{
int rc = zmq_connect (this->_zmq_socket_ptr, ip_addr.c_str());
if (rc != 0)
throw error_t ();
this->_ip_addr = ip_addr;
return 0;
}
int Socket::bind()
{
int rc = zmq_bind (this->_zmq_socket_ptr, "tcp://*:5678");
if (rc != 0)
throw error_t ();
this->_ip_addr = "tcp://*:5678";
return 0;
}
int Socket::bind(std::string ip_addr)
{
int rc = zmq_bind (this->_zmq_socket_ptr, ip_addr.c_str());
if (rc != 0)
throw error_t ();
this->_ip_addr = ip_addr;
return 0;
}
char* Socket::Receive()
{
return s_recv (this->_zmq_socket_ptr);
}
void generic::classinit()
{
this->_counter=0;
this->_internal_context=0;
this->_zmq_context_ptr=0;
this->_zmq_socket_ptr=0;
}
void generic::contextcreate()
{
this->_zmq_context_ptr = zmq_init(1);
if (this->_zmq_context_ptr == NULL)
throw error_t ();
this->_internal_context = 1;
}
void generic::contextcreate(void* context)
{
this->_zmq_context_ptr = context;
if (this->_zmq_context_ptr == NULL)
throw error_t ();
this->_internal_context = 0;
}
void generic::contextcreate(Context context)
{
// this->_zmq_context_ptr = context._zmq_context_ptr;
if (this->_zmq_context_ptr == NULL)
throw error_t ();
this->_internal_context = 0;
}
void generic::socketcreate(int skt_type)
{
this->_zmq_socket_ptr = zmq_socket(this->_zmq_context_ptr, skt_type);
if (this->_zmq_socket_ptr == NULL)
throw error_t ();
}
void generic::socketbind()
{
int rc = zmq_bind (this->_zmq_socket_ptr, "tcp://*:5678");
if (rc != 0)
throw error_t ();
}
void generic::socketbind(char* ip_addr)
{
int rc = zmq_bind (this->_zmq_socket_ptr, ip_addr);
if (rc != 0)
throw error_t ();
}
void generic::socketconn()
{
int rc = zmq_connect (this->_zmq_socket_ptr, "tcp://localhost:5678");
if (rc != 0)
throw error_t ();
}
void generic::socketconn(char* ip_addr)
{
int rc = zmq_connect (this->_zmq_socket_ptr, ip_addr);
if (rc != 0)
throw error_t ();
}
void generic::close()
{
zmq_close(this->_zmq_socket_ptr);
if (this->_internal_context)
zmq_ctx_destroy(this->_zmq_context_ptr);
}
int generic2::Bind()
{
return zmq_bind(this->_socket,"tcp://*:5678");
}
int generic2::Bind(std::string ip_addr)
{
return zmq_bind(this->_socket,"epgm://192.168.127.253;239.192.1.1:5678");
}
int generic2::Connect()
{
return zmq_connect(this->_socket,"tcp://*:5678");
}
int generic2::Connect(std::string ip_addr)
{
return zmq_connect(this->_socket,"epgm://192.168.127.253;239.192.1.1:5678");
}
/*Publisher::Publisher(void * context)
{
this->_zmq_context_ptr=context;
this->_socket = new Socket(context,ZMQ_PUB);
this->_counter=0;
}
Publisher::Publisher(void* context, char* ip_str)
{
this->_zmq_context_ptr=context;
this->_socket = new Socket(context,ZMQ_PUB, ip_str);
this->_counter=0;
}*/
Publisher::Publisher(Context* context)
{
this->_counter=0;
this->_socket=0;
this->_zmq_context=0;
this->_zmq_context=context;
this->_socket = new Zmqcpp::Socket(this->_zmq_context,ZMQ_PUB);
this->_socket->bind();
}
Publisher::Publisher(Context* context, std::string ip_str, int conn)
{
this->_counter=0;
this->_socket=0;
this->_zmq_context=0;
this->_zmq_context=context;
this->_socket = new Zmqcpp::Socket(this->_zmq_context,ZMQ_PUB, ip_str, conn);
}
Publisher::~Publisher()
{
this->_socket->~Socket();
}
void Publisher::PubMessage (int count, ...)
{
va_list argptr;
va_start( argptr, count );
for( ; count > 1; count-- ) {
//char *mystr = va_arg(argptr, char*);
this->_socket->SendMore (va_arg(argptr, char*));
}
this->_socket->Send(va_arg(argptr, char*));
va_end( argptr );
}
void Publisher::worker()
{/*
char _count[3];
sprintf(_count,"%03d",this->_counter);
// Write two messages, each with an envelope and content
s_sendmore (this->_zmq_socket_ptr, "A");
s_sendmore (this->_zmq_socket_ptr, "We don't want to see this");
s_send (this->_zmq_socket_ptr, _count);
s_sendmore (this->_zmq_socket_ptr, "B");
s_sendmore (this->_zmq_socket_ptr, "We would like to see this");
s_send (this->_zmq_socket_ptr, _count);
this->_counter++;*/
}
Subscriber::Subscriber(Context* context)
{
this->_socket=0;
this->_zmq_context=0;
this->_zmq_context=context;
this->_socket = new Zmqcpp::Socket(this->_zmq_context,ZMQ_SUB);
}
Subscriber::Subscriber(Context* context, std::string ip_str, int conn)
{
this->_socket=0;
this->_zmq_context=0;
this->_zmq_context=context;
this->_socket = new Zmqcpp::Socket(this->_zmq_context,ZMQ_SUB, ip_str, conn);
}
Subscriber::~Subscriber()
{
//Subscriber::close();
}
int Subscriber::SubscribeTopic(std::string topic)
{
return (this->_socket->setsockopt(ZMQ_SUBSCRIBE, topic.c_str(), topic.length()));
}
std::string Subscriber::worker()
{
// Read envelope with address
std::string address = this->_socket->Receive();
// Read message contents
std::string contents = this->_socket->Receive();
std::string termination = this->_socket->Receive();
std::stringstream ss;
ss << "[" << address << "] " << contents << " | Termination [" << termination<< "]" << std::endl;
return (ss.str());
}
std::string Subscriber::receive()
{
return this->_socket->Receive();
}
}/*
* File: zmqcpp.h
* Author: erupter
*
* Created on December 14, 2012, 6:32 PM
*/
#ifndef ZMQCPP_H
#define ZMQCPP_H
extern "C" {
#include <zmq.h>
#include <zhelpers.h>
}
#include <string>
#include <sstream>
#include <iostream>
#define ZMQCPP_BIND 0
#define ZMQCPP_CONNECT 1
namespace Zmqcpp
{
class Context
{
public:
Context();
Context(int threads);
virtual ~Context();
Context* getPtr();
protected:
friend class Socket;
void* _zmq_context_ptr;
};
class Socket
{
public:
Socket(Context* context, int skt_type);
Socket(Context* context, int skt_type, std::string ip_str, int conn);
virtual ~Socket();
Socket* getPtr();
int setsockopt(int option, const void *optval,size_t optvallen);
int connect();
int connect(std::string ip_addr);
int bind();
int bind(std::string ip_addr);
int SendMore(std::string msg);
int Send(std::string msg);
char* Receive();
protected:
void *_zmq_socket_ptr;
std::string _ip_addr;
};
class generic
{
protected:
Context _zmq_context;
void *_zmq_context_ptr;
void *_zmq_socket_ptr;
int _internal_context;
int _counter;
inline void classinit();
inline void contextcreate();
inline void contextcreate(void* context);
inline void contextcreate(Context context);
inline void socketcreate(int skt_type);
inline void socketbind();
inline void socketbind(char* ip_addr);
inline void socketconn();
inline void socketconn(char* ip_addr);
inline void close();
};
class generic2
{
public:
int Bind();
int Bind(std::string ip_addr);
int Connect();
int Connect(std::string ip_addr);
protected:
Zmqcpp::Context* _zmq_context;
Zmqcpp::Socket* _socket;
};
class Publisher: public generic2
{
public:
//Publisher(void* context);
//Publisher(void* context, char* ip_str);
Publisher(Context* context);
Publisher(Context* context, std::string ip_str, int conn);
virtual ~Publisher();
void PubMessage(int count, ...);
void worker();
private:
unsigned int _counter;
};
class Subscriber: public generic2
{
public:
Subscriber(Context* context);
Subscriber(Context* context, std::string ip_str, int conn);
virtual ~Subscriber();
int SubscribeTopic(std::string topic);
std::string worker ();
std::string receive();
};
class Request: public generic
{
public:
Request(Context* context);
Request(Context* context, std::string ip_str, int conn);
virtual ~Request();
};
class Reply: public generic
{
public:
Reply(Context* context);
Reply(Context* context, char* ip_str, int conn);
virtual ~Reply();
};
}
#endif /* ZMQCPP_H */
_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev