I regularly see requests for "how do I tell if ... disconnected" etc.
I'm currently struggling with a way to associate a message over a
PULL/PUSH and/or REQ/REP multiplexed zocket with the previous hop so
that I can locally associate incoming messages with cached data/etc,
without needing to send that information as part of a message payload.
I wonder if these two seemingly unrelated issues could simply be handled
by providing ZMQ with two additional setsockopts:
struct userdata_t { } ;
typedef userdata_t* (attach_fn)(void) ;
socket_t { ... setsockopt(zmq.ATTACHUSERDATA, attach_fn) ; ... }
typedef void (detach_fn)(userdata_t*) ;
socket_t { ... setsockopt(zmq.DETACHUSERDATA, detach_fn) ; ... }
When ZMQ internally handles a new incoming connection and has finished
with the set-up process and is ready to begin receiving messages on
said connection, it will call the registered attach_fn for that
socket_t. Where possible, the user can either derive their user data
class from userdata_t or else simply cast their data type to userdata_t.
When a connection terminates, the registered detach_fn is called with
the user data. I can't decide whether it would be a good idea to allow
this to happen regardless of the value of the user data: I can see use
cases where you might just want to know when connections are lost
without assigning userdata to them, in which case you might want your
detach_fn called with NULL.
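
To make the lifecycle above a bit more concrete, here is a rough, library-free sketch of the bookkeeping I imagine ZMQ doing per socket. Everything in it (MockSocket, onConnect, onDisconnect, userDataFor, the integer connection ids, the Counter callbacks) is a hypothetical stand-in and not an existing ZMQ API. It also assumes userdata_t gets a virtual destructor, and it takes the "call detach_fn even with NULL user data" option discussed above.

```cpp
#include <cassert>
#include <cstddef>
#include <map>

// Hypothetical stand-ins for the proposed API; none of this exists in ZMQ.
struct userdata_t { virtual ~userdata_t() {} };
typedef userdata_t* (*attach_fn)();
typedef void (*detach_fn)(userdata_t*);

// Mock of the per-socket bookkeeping the library would have to do.
class MockSocket
{
public:
    MockSocket() : attach_(NULL), detach_(NULL) {}

    void setAttach(attach_fn fn) { attach_ = fn; }
    void setDetach(detach_fn fn) { detach_ = fn; }

    // Called once a new connection is fully set up.
    void onConnect(int connId)
    {
        // Assumption: a connection id is never reused while still live.
        assert(conns_.find(connId) == conns_.end());
        conns_[connId] = attach_ ? attach_() : NULL;
    }

    // Called when a connection terminates. detach_fn runs even when the
    // user data is NULL, so it can double as a disconnect notification.
    void onDisconnect(int connId)
    {
        std::map<int, userdata_t*>::iterator it = conns_.find(connId);
        if ( it == conns_.end() ) return;
        if ( detach_ ) detach_(it->second);
        conns_.erase(it);
    }

    // What getUserData() might return for a message from connId.
    userdata_t* userDataFor(int connId) const
    {
        std::map<int, userdata_t*>::const_iterator it = conns_.find(connId);
        return it == conns_.end() ? NULL : it->second;
    }

private:
    attach_fn attach_;
    detach_fn detach_;
    std::map<int, userdata_t*> conns_;
};

// Example callbacks that only count live sessions and detach calls.
static int g_liveSessions = 0;
static int g_detachCalls = 0;

struct Counter : userdata_t
{
    Counter() { ++g_liveSessions; }
    ~Counter() { --g_liveSessions; }
};

userdata_t* countingAttach() { return new Counter; }
void countingDetach(userdata_t* ud) { ++g_detachCalls; delete ud; }
```

With both callbacks registered, each onConnect creates one Counter and each onDisconnect destroys it. With only countingDetach registered, the callback still fires on disconnect, with a NULL argument.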
Example code:
struct Session : public zmq::userdata_t
{
enum State { _Invalid, _Connected, _Authenticated } ;
Session() : connectedAt(time(NULL)), state(_Connected), crypt() {}
// The destructor could probably do something more...
// such as send a fake "hangup" zmq::message_t downstream
// if the sender has hung up unexpectedly, or put the session
// onto an expiry queue yadda yadda..
~Session() {}
time_t connectedAt ;
State state ;
EncryptionStuff crypt ; /// public/private keys for per-packet 'cryption.
bool authenticate(const zmq::message_t& msg) ;
void process(const zmq::message_t& msg) ;
} ;
zmq::userdata_t* myAttachFn()
{
Session* const session = new Session ;
return (zmq::userdata_t*)session ;
}
void myDetachFn(zmq::userdata_t* session)
{
if ( session == NULL ) return ;
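// userdata_t as sketched has no virtual destructor, so delete via the derived type.
delete static_cast<Session*>(session) ;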
}
bool Session::authenticate(const zmq::message_t& msg)
{
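// First packet should contain "hello world".
// Message data is not NUL-terminated, so compare by size rather than strcmp.
return ( msg.size() == strlen("hello world") && memcmp(msg.data(), "hello world", msg.size()) == 0 ) ;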
}
void Session::process(const zmq::message_t& msg)
{
switch ( state )
{
case _Invalid:
// Session has gone bad, ignore it.
return ;
case _Connected:
// This is the first message from this connection.
if ( authenticate(msg) )
state = _Authenticated ;
else
state = _Invalid ;
break ;
case _Authenticated:
// Message data is not NUL-terminated, so print exactly size() bytes.
printf("%.*s\n", (int)msg.size(), (const char*)msg.data()) ;
break ;
}
}
int main(const int argc, const char* const argv[])
{
zmq::context_t c(1) ;
zmq::socket_t s(c, ZMQ_PULL) ;
s.bind("tcp://0.0.0.0:54321") ;
s.setsockopt(zmq.ATTACHUSERDATA, myAttachFn) ;
s.setsockopt(zmq.DETACHUSERDATA, myDetachFn) ;
while ( true )
{
zmq::message_t msg ;
s.recv(msg) ;
Session* const session = (Session*)s.getUserData() ;
// or msg.getUserData?
assert( session != NULL ) ; // how could it be?
session->process(msg) ;
}
}
- Oliver