Hi Martin,

On Mon, Dec 6, 2010 at 4:38 AM, Martin Sustrik <sust...@250bpm.com> wrote:
> Hi Dhammika,
>
>>>> Patch attached.
>>>
>>> I've checked the patch. Shouldn't the 'dispatch' be called when the
>>> engine
>>> is actually being moved to another thread (inside finalise_initialisation
>>> function) rather than in 'read' function?
>
> I still feel there's something wrong with the patch. Let me explain.
>
> The initialisation is ended when both your identity is sent and peer's
> identity is received. The two events can happen in whatever order (in most
> cases the send happens immediately, while recv happens later on, but that's
> not 100% guaranteed).
>
> Thus, finalise_initialisation function is called both in read() and flush()
> i.e. twice for each connection establishment.
>
> The code inside finalise_initialisation checks whether both events have
> already happened:
>
>    if (sent && received) ...
>
> and if so, it proceeds. Otherwise it does nothing.
>
> Now, in the patch the sending are receiving are handled differently. In one
> case finalise_initialisation is called, in the other dispatch_engine.
>
> I don't see the reason for keeping the behaviour asymetric.
>


Thanks for pointing this out.
I've fixed asymmetric calls in following patch.
Both read and write call finalize_initialization(), and flush()
dispatches engine, if it's already finalized.



From 19cb14923805cfd51be3778ad7cf7395c14522c7 Mon Sep 17 00:00:00 2001
From: dhammika <dhamm...@gmail.com>
Date: Tue, 7 Dec 2010 17:04:59 -0800
Subject: [PATCH] fix race condition in session init


Signed-off-by: dhammika <dhamm...@gmail.com>
---
 src/zmq_engine.cpp |   21 +++++++++++++++++++--
 src/zmq_engine.hpp |    2 ++
 src/zmq_init.cpp   |   43 ++++++++++++++++++++++++++-----------------
 src/zmq_init.hpp   |    3 +++
 4 files changed, 50 insertions(+), 19 deletions(-)

diff --git a/src/zmq_engine.cpp b/src/zmq_engine.cpp
index 0c1070d..e069e10 100644
--- a/src/zmq_engine.cpp
+++ b/src/zmq_engine.cpp
@@ -40,6 +40,7 @@ zmq::zmq_engine_t::zmq_engine_t (fd_t fd_, const
options_t &options_) :
     outsize (0),
     encoder (out_batch_size),
     inout (NULL),
+    ephemeral_inout (NULL),
     options (options_),
     plugged (false)
 {
@@ -57,8 +58,9 @@ void zmq::zmq_engine_t::plug (io_thread_t
*io_thread_, i_inout *inout_)
 {
     zmq_assert (!plugged);
     plugged = true;
+    ephemeral_inout = NULL;

-    //  Conncet to session/init object.
+    //  Connect to session/init object.
     zmq_assert (!inout);
     zmq_assert (inout_);
     encoder.set_inout (inout_);
@@ -89,6 +91,7 @@ void zmq::zmq_engine_t::unplug ()
     //  Disconnect from init/session object.
     encoder.set_inout (NULL);
     decoder.set_inout (NULL);
+    ephemeral_inout = inout;
     inout = NULL;
 }

@@ -139,7 +142,13 @@ void zmq::zmq_engine_t::in_event ()
     }

     //  Flush all messages the decoder may have produced.
-    inout->flush ();
+    //  If IO handler has unplugged engine, flush transient IO handler.
+    if (unlikely (!plugged)) {
+        zmq_assert (ephemeral_inout);
+        ephemeral_inout->flush ();
+    } else {
+        inout->flush ();
+    }

     if (disconnection)
         error ();
@@ -152,6 +161,13 @@ void zmq::zmq_engine_t::out_event ()

         outpos = NULL;
         encoder.get_data (&outpos, &outsize);
+
+        //  If IO handler has unplugged engine, flush transient IO handler.
+        if (unlikely (!plugged)) {
+            zmq_assert (ephemeral_inout);
+            ephemeral_inout->flush ();
+            return;
+        }

         //  If there is no data to send, stop polling for output.
         if (outsize == 0) {
@@ -200,3 +216,4 @@ void zmq::zmq_engine_t::error ()
     unplug ();
     delete this;
 }
+
diff --git a/src/zmq_engine.hpp b/src/zmq_engine.hpp
index c5f95dc..4847324 100644
--- a/src/zmq_engine.hpp
+++ b/src/zmq_engine.hpp
@@ -69,6 +69,8 @@ namespace zmq
         encoder_t encoder;

         i_inout *inout;
+        //  Detached transient inout handler.
+        i_inout *ephemeral_inout;

         options_t options;

diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp
index a796faa..8f90065 100644
--- a/src/zmq_init.cpp
+++ b/src/zmq_init.cpp
@@ -34,6 +34,7 @@ zmq::zmq_init_t::zmq_init_t (io_thread_t *io_thread_,
       socket_base_t *socket_, session_t *session_, fd_t fd_,
       const options_t &options_) :
     own_t (io_thread_, options_),
+    ephemeral_engine (NULL),
     sent (false),
     received (false),
     socket (socket_),
@@ -64,8 +65,7 @@ bool zmq::zmq_init_t::read (::zmq_msg_t *msg_)
         options.identity.size ());
     sent = true;

-    //  If initialisation is done, pass the engine to the session and
-    //  destroy the init object.
+    //  Try finalize initialization.
     finalise_initialisation ();

     return true;
@@ -91,6 +91,9 @@ bool zmq::zmq_init_t::write (::zmq_msg_t *msg_)
     }

     received = true;
+
+    //  Try finalize initialization.
+    finalise_initialisation ();

     return true;
 }
@@ -101,9 +104,9 @@ void zmq::zmq_init_t::flush ()
     if (!received)
         return;

-    //  If initialisation is done, pass the engine to the session and
-    //  destroy the init object.
-    finalise_initialisation ();
+    //  Initialization is done, dispatch engine.
+    if (ephemeral_engine)
+        dispatch_engine ();
 }

 void zmq::zmq_init_t::detach ()
@@ -136,6 +139,20 @@ void zmq::zmq_init_t::process_unplug ()
 void zmq::zmq_init_t::finalise_initialisation ()
 {
     if (sent && received) {
+        //  Unplug and prepare to dispatch engine.
+        ephemeral_engine = engine;
+        engine = NULL;
+        ephemeral_engine->unplug ();
+        return;
+    }
+}
+
+void zmq::zmq_init_t::dispatch_engine ()
+{
+    if (sent && received) {
+        //  Engine must be detached.
+        zmq_assert (!engine);
+        zmq_assert (ephemeral_engine);

         //  If we know what session we belong to, it's easy, just send the
         //  engine to that session and destroy the init object. Note that we
@@ -143,9 +160,7 @@ void zmq::zmq_init_t::finalise_initialisation ()
         //  lifetime of this object in contained in the lifetime of the session
         //  so the pointer cannot become invalid without notice.
         if (session) {
-            engine->unplug ();
-            send_attach (session, engine, peer_identity, true);
-            engine = NULL;
+            send_attach (session, ephemeral_engine, peer_identity, true);
             terminate ();
             return;
         }
@@ -165,9 +180,7 @@ void zmq::zmq_init_t::finalise_initialisation ()
             zmq_assert (session);
             session->inc_seqnum ();
             launch_sibling (session);
-            engine->unplug ();
-            send_attach (session, engine, peer_identity, false);
-            engine = NULL;
+            send_attach (session, ephemeral_engine, peer_identity, false);
             terminate ();
             return;
         }
@@ -178,9 +191,7 @@ void zmq::zmq_init_t::finalise_initialisation ()
         //  than by send_attach.
         session = socket->find_session (peer_identity);
         if (session) {
-            engine->unplug ();
-            send_attach (session, engine, peer_identity, false);
-            engine = NULL;
+            send_attach (session, ephemeral_engine, peer_identity, false);
             terminate ();
             return;
         }
@@ -194,9 +205,7 @@ void zmq::zmq_init_t::finalise_initialisation ()
         zmq_assert (session);
         session->inc_seqnum ();
         launch_sibling (session);
-        engine->unplug ();
-        send_attach (session, engine, peer_identity, false);
-        engine = NULL;
+        send_attach (session, ephemeral_engine, peer_identity, false);
         terminate ();
         return;
     }
diff --git a/src/zmq_init.hpp b/src/zmq_init.hpp
index 511f141..c5aa459 100644
--- a/src/zmq_init.hpp
+++ b/src/zmq_init.hpp
@@ -44,6 +44,7 @@ namespace zmq
     private:

         void finalise_initialisation ();
+        void dispatch_engine ();

         //  i_inout interface implementation.
         bool read (::zmq_msg_t *msg_);
@@ -57,6 +58,8 @@ namespace zmq

         //  Associated wire-protocol engine.
         i_engine *engine;
+        //  Detached transient engine.
+        i_engine *ephemeral_engine;

         //  True if our own identity was already sent to the peer.
         bool sent;
-- 
1.7.0.4



Dhammika
_______________________________________________
zeromq-dev mailing list
zeromq-dev@lists.zeromq.org
http://lists.zeromq.org/mailman/listinfo/zeromq-dev

Reply via email to