Hi Martin, On Mon, Dec 6, 2010 at 4:38 AM, Martin Sustrik <sust...@250bpm.com> wrote: > Hi Dhammika, > >>>> Patch attached. >>> >>> I've checked the patch. Shouldn't the 'dispatch' be called when the >>> engine >>> is actually being moved to another thread (inside finalise_initialisation >>> function) rather than in 'read' function? > > I still feel there's something wrong with the patch. Let me explain. > > The initialisation is ended when both your identity is sent and peer's > identity is received. The two events can happen in whatever order (in most > cases the send happens immediately, while recv happens later on, but that's > not 100% guaranteed). > > Thus, finalise_initialisation function is called both in read() and flush() > i.e. twice for each connection establishment. > > The code inside finalise_initialisation checks whether both events have > already happened: > > if (sent && received) ... > > and if so, it proceeds. Otherwise it does nothing. > > Now, in the patch the sending and receiving are handled differently. In one > case finalise_initialisation is called, in the other dispatch_engine. > > I don't see the reason for keeping the behaviour asymmetric. >
Thanks for pointing this out. I've fixed the asymmetric calls in the following patch. Both read() and write() now call finalise_initialisation(), and flush() dispatches the engine if initialisation has already been finalised. From 19cb14923805cfd51be3778ad7cf7395c14522c7 Mon Sep 17 00:00:00 2001 From: dhammika <dhamm...@gmail.com> Date: Tue, 7 Dec 2010 17:04:59 -0800 Subject: [PATCH] fix race condition in session init Signed-off-by: dhammika <dhamm...@gmail.com> --- src/zmq_engine.cpp | 21 +++++++++++++++++++-- src/zmq_engine.hpp | 2 ++ src/zmq_init.cpp | 43 ++++++++++++++++++++++++++----------------- src/zmq_init.hpp | 3 +++ 4 files changed, 50 insertions(+), 19 deletions(-) diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp index 0c1070d..e069e10 100644 --- a/src/zmq_engine.cpp +++ b/src/zmq_engine.cpp @@ -40,6 +40,7 @@ zmq::zmq_engine_t::zmq_engine_t (fd_t fd_, const options_t &options_) : outsize (0), encoder (out_batch_size), inout (NULL), + ephemeral_inout (NULL), options (options_), plugged (false) { @@ -57,8 +58,9 @@ void zmq::zmq_engine_t::plug (io_thread_t *io_thread_, i_inout *inout_) { zmq_assert (!plugged); plugged = true; + ephemeral_inout = NULL; - // Conncet to session/init object. + // Connect to session/init object. zmq_assert (!inout); zmq_assert (inout_); encoder.set_inout (inout_); @@ -89,6 +91,7 @@ void zmq::zmq_engine_t::unplug () // Disconnect from init/session object. encoder.set_inout (NULL); decoder.set_inout (NULL); + ephemeral_inout = inout; inout = NULL; } @@ -139,7 +142,13 @@ void zmq::zmq_engine_t::in_event () } // Flush all messages the decoder may have produced. - inout->flush (); + // If IO handler has unplugged engine, flush transient IO handler. 
+ if (unlikely (!plugged)) { + zmq_assert (ephemeral_inout); + ephemeral_inout->flush (); + } else { + inout->flush (); + } if (disconnection) error (); @@ -152,6 +161,13 @@ void zmq::zmq_engine_t::out_event () outpos = NULL; encoder.get_data (&outpos, &outsize); + + // If IO handler has unplugged engine, flush transient IO handler. + if (unlikely (!plugged)) { + zmq_assert (ephemeral_inout); + ephemeral_inout->flush (); + return; + } // If there is no data to send, stop polling for output. if (outsize == 0) { @@ -200,3 +216,4 @@ void zmq::zmq_engine_t::error () unplug (); delete this; } + diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp index c5f95dc..4847324 100644 --- a/src/zmq_engine.hpp +++ b/src/zmq_engine.hpp @@ -69,6 +69,8 @@ namespace zmq encoder_t encoder; i_inout *inout; + // Detached transient inout handler. + i_inout *ephemeral_inout; options_t options; diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp index a796faa..8f90065 100644 --- a/src/zmq_init.cpp +++ b/src/zmq_init.cpp @@ -34,6 +34,7 @@ zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_, socket_base_t *socket_, session_t *session_, fd_t fd_, const options_t &options_) : own_t (io_thread_, options_), + ephemeral_engine (NULL), sent (false), received (false), socket (socket_), @@ -64,8 +65,7 @@ bool zmq::zmq_init_t::read (::zmq_msg_t *msg_) options.identity.size ()); sent = true; - // If initialisation is done, pass the engine to the session and - // destroy the init object. + // Try finalize initialization. finalise_initialisation (); return true; @@ -91,6 +91,9 @@ bool zmq::zmq_init_t::write (::zmq_msg_t *msg_) } received = true; + + // Try finalize initialization. + finalise_initialisation (); return true; } @@ -101,9 +104,9 @@ void zmq::zmq_init_t::flush () if (!received) return; - // If initialisation is done, pass the engine to the session and - // destroy the init object. - finalise_initialisation (); + // Initialization is done, dispatch engine. 
+ if (ephemeral_engine) + dispatch_engine (); } void zmq::zmq_init_t::detach () @@ -136,6 +139,20 @@ void zmq::zmq_init_t::process_unplug () void zmq::zmq_init_t::finalise_initialisation () { if (sent && received) { + // Unplug and prepare to dispatch engine. + ephemeral_engine = engine; + engine = NULL; + ephemeral_engine->unplug (); + return; + } +} + +void zmq::zmq_init_t::dispatch_engine () +{ + if (sent && received) { + // Engine must be detached. + zmq_assert (!engine); + zmq_assert (ephemeral_engine); // If we know what session we belong to, it's easy, just send the // engine to that session and destroy the init object. Note that we @@ -143,9 +160,7 @@ void zmq::zmq_init_t::finalise_initialisation () // lifetime of this object in contained in the lifetime of the session // so the pointer cannot become invalid without notice. if (session) { - engine->unplug (); - send_attach (session, engine, peer_identity, true); - engine = NULL; + send_attach (session, ephemeral_engine, peer_identity, true); terminate (); return; } @@ -165,9 +180,7 @@ void zmq::zmq_init_t::finalise_initialisation () zmq_assert (session); session->inc_seqnum (); launch_sibling (session); - engine->unplug (); - send_attach (session, engine, peer_identity, false); - engine = NULL; + send_attach (session, ephemeral_engine, peer_identity, false); terminate (); return; } @@ -178,9 +191,7 @@ void zmq::zmq_init_t::finalise_initialisation () // than by send_attach. 
session = socket->find_session (peer_identity); if (session) { - engine->unplug (); - send_attach (session, engine, peer_identity, false); - engine = NULL; + send_attach (session, ephemeral_engine, peer_identity, false); terminate (); return; } @@ -194,9 +205,7 @@ void zmq::zmq_init_t::finalise_initialisation () zmq_assert (session); session->inc_seqnum (); launch_sibling (session); - engine->unplug (); - send_attach (session, engine, peer_identity, false); - engine = NULL; + send_attach (session, ephemeral_engine, peer_identity, false); terminate (); return; } diff --git a/src/zmq_init.hpp b/src/zmq_init.hpp index 511f141..c5aa459 100644 --- a/src/zmq_init.hpp +++ b/src/zmq_init.hpp @@ -44,6 +44,7 @@ namespace zmq private: void finalise_initialisation (); + void dispatch_engine (); // i_inout interface implementation. bool read (::zmq_msg_t *msg_); @@ -57,6 +58,8 @@ namespace zmq // Associated wire-protocol engine. i_engine *engine; + // Detached transient engine. + i_engine *ephemeral_engine; // True if our own identity was already sent to the peer. bool sent; -- 1.7.0.4 Dhammika _______________________________________________ zeromq-dev mailing list zeromq-dev@lists.zeromq.org http://lists.zeromq.org/mailman/listinfo/zeromq-dev