2011/5/23 Martin Sustrik <[email protected]>: > Hi Fabien, > >>> +Bit 1 (LABEL): _This message part is not part of the original >>> +message._ A value of 1 indicate a message part that weren't part >>> of +the original stream. They are used internally by the >>> transport to +indicate, for example, the origin of a message. + >>> +Bits 2-7: _Reserved_. Bits 1-7 are reserved for future expansion >>> and MUST be set to zero. >>> >>> It should be explained how LABEL flag interacts with MORE flag. I >>> would propose something like this: >>> >>> 1. Labels always occur at the beginning of the message. If label >>> appears elsewhere in the message, the message is malformed. >> >> In present use-case (routing), that's true. Not so sure about future >> use-case but I can't tell why it couldn't be always as start >> neither. > > It's matter of good design. Message envelope dispersed throughout the > message violates clean layering. Additionally, it is a nightmare to > parse at wire speed.
Ack. I attach a new patch with the comment. However, I don't think the code should check for this in all occasions, no ? >>> 2. Labels never have MORE flags set. Combination of LAVEL and MORE >>> on the same message part makes the message malformed. >> >> Make it harder to implement queuing device and require the flag to be >> either exposed externally or to change the behavior of the >> ZMQ_RCVMORE getsockopt. Also, mutually exclusive flags are usually a >> bad smell (mean they lack orthogonality). So, -1 for me here. > > The orthogonality is what should be deliberately avoided here. Note that > LABEL flag and MORE flag live at different layers. LABEL flag is 0MQ > flag, while MORE is an application flag. You can think of it as > user-defined multi-part message being taken as atomic unit and labeled > with various labels by 0MQ. The two should never overlap. > > Unfortunately, backward compatibility concerns force us to mix the two > flags in a single layer, which makes the above non-obvious. Right on this. I was thinking about this for the collector, but can't find a way to completly removed the flag from the user without giving up the frame division of ZMQ messages. May be the next generation of ZMQ API shoud avoid access to frames internally; basically, pushing all the frames on a msg_t and have an api to access only permitted frames ? Would be useful to check how many frames a message is composed and stuff like this, but probably required a more complex API for memory management. Fabien
From 17674c82dfa0b7c9e73f089b6651aea9cc76aeb7 Mon Sep 17 00:00:00 2001 From: Fabien Ninoles <[email protected]> Date: Fri, 20 May 2011 20:58:46 -0400 Subject: [PATCH] Add msg_t::label flag. Include in this change: - documentation of the flag. - add the flags to REQ and ROUTER added frames. - check and drop/assert for the flag in REP/ROUTER sockets. - add a convenient msg_t::check_flags method, making the check of multiple flags more readable. - add a test of the queue device to validate that's nothing broken. Signed-off-by: Fabien Ninoles <[email protected]> --- .gitignore | 1 + doc/zmq_tcp.txt | 10 ++- src/dist.cpp | 4 +- src/encoder.cpp | 4 +- src/fq.cpp | 2 +- src/lb.cpp | 4 +- src/msg.cpp | 6 ++ src/msg.hpp | 4 +- src/pipe.cpp | 8 +- src/rep.cpp | 6 +- src/req.cpp | 8 +- src/session.cpp | 2 +- src/socket_base.cpp | 6 +- src/xrep.cpp | 12 +-- src/xsub.cpp | 8 +- src/zmq_init.cpp | 2 +- tests/Makefile.am | 3 +- tests/test_queue_tcp.cpp | 192 ++++++++++++++++++++++++++++++++++++++++++++++ 18 files changed, 245 insertions(+), 37 deletions(-) create mode 100644 tests/test_queue_tcp.cpp diff --git a/.gitignore b/.gitignore index b309c73..da9f78c 100644 --- a/.gitignore +++ b/.gitignore @@ -26,6 +26,7 @@ tests/test_pair_tcp tests/test_reqrep_inproc tests/test_reqrep_ipc tests/test_reqrep_tcp +tests/test_queue_tcp tests/test_shutdown_stress tests/test_hwm src/platform.hpp* diff --git a/doc/zmq_tcp.txt b/doc/zmq_tcp.txt index 84ec6c8..7d00941 100644 --- a/doc/zmq_tcp.txt +++ b/doc/zmq_tcp.txt @@ -76,7 +76,15 @@ are no more message parts to follow; or that the message being sent is not a multi-part message. A value of 1 indicates that the message being sent is a multi-part message and more message parts are to follow. -Bits 1-7: _Reserved_. Bits 1-7 are reserved for future expansion and MUST be +Bit 1 (LABEL): _This message part is not part of the original +message._ A value of 1 indicate a message part that weren't part of +the original stream. They are used internally by the transport to +indicate, for example, the origin of a message. LABEL frames must only +appear at the beginning of a message. If a LABEL flag appear somewhere +after a frame without it in the same message, the message is considered +malformed. + +Bits 2-7: _Reserved_. Bits 1-7 are reserved for future expansion and MUST be set to zero. The following ABNF grammar represents a single 'frame': diff --git a/src/dist.cpp b/src/dist.cpp index 7c15bfd..aed7888 100644 --- a/src/dist.cpp +++ b/src/dist.cpp @@ -105,7 +105,7 @@ void zmq::dist_t::activated (writer_t *pipe_) int zmq::dist_t::send (msg_t *msg_, int flags_) { // Is this end of a multipart message? - bool msg_more = msg_->flags () & msg_t::more; + bool msg_more = msg_->check_flags (msg_t::more); // Push the message to active pipes. distribute (msg_, flags_); @@ -162,7 +162,7 @@ bool zmq::dist_t::write (class writer_t *pipe_, msg_t *msg_) eligible--; return false; } - if (!(msg_->flags () & msg_t::more)) + if (!msg_->check_flags (msg_t::more)) pipe_->flush (); return true; } diff --git a/src/encoder.cpp b/src/encoder.cpp index a42f06f..734893e 100644 --- a/src/encoder.cpp +++ b/src/encoder.cpp @@ -81,14 +81,14 @@ bool zmq::encoder_t::message_ready () tmpbuf [0] = (unsigned char) size; tmpbuf [1] = (in_progress.flags () & ~msg_t::shared); next_step (tmpbuf, 2, &encoder_t::size_ready, - !(in_progress.flags () & msg_t::more)); + !in_progress.check_flags (msg_t::more)); } else { tmpbuf [0] = 0xff; put_uint64 (tmpbuf + 1, size); tmpbuf [9] = (in_progress.flags () & ~msg_t::shared); next_step (tmpbuf, 10, &encoder_t::size_ready, - !(in_progress.flags () & msg_t::more)); + !in_progress.check_flags (msg_t::more)); } return true; } diff --git a/src/fq.cpp b/src/fq.cpp index 392e554..1f741d3 100644 --- a/src/fq.cpp +++ b/src/fq.cpp @@ -115,7 +115,7 @@ int zmq::fq_t::recv (msg_t *msg_, int flags_) // and replaced by another active pipe. Thus we don't have to increase // the 'current' pointer. if (fetched) { - more = msg_->flags () & msg_t::more; + more = msg_->check_flags (msg_t::more); if (!more) { current++; if (current >= active) diff --git a/src/lb.cpp b/src/lb.cpp index 8eb9157..30c502e 100644 --- a/src/lb.cpp +++ b/src/lb.cpp @@ -98,7 +98,7 @@ int zmq::lb_t::send (msg_t *msg_, int flags_) // switch back to non-dropping mode. if (dropping) { - more = msg_->flags () & msg_t::more; + more = msg_->check_flags (msg_t::more); if (!more) dropping = false; @@ -111,7 +111,7 @@ int zmq::lb_t::send (msg_t *msg_, int flags_) while (active > 0) { if (pipes [current]->write (msg_)) { - more = msg_->flags () & msg_t::more; + more = msg_->check_flags (msg_t::more); break; } diff --git a/src/msg.cpp b/src/msg.cpp index 84ca3e2..4e2f5cc 100644 --- a/src/msg.cpp +++ b/src/msg.cpp @@ -216,6 +216,12 @@ unsigned char zmq::msg_t::flags () return u.base.flags; } +bool zmq::msg_t::check_flags (unsigned char flags_) +{ + return (u.base.flags & flags_) == flags_; +} + + void zmq::msg_t::set_flags (unsigned char flags_) { u.base.flags |= flags_; diff --git a/src/msg.hpp b/src/msg.hpp index 466a96a..9e12a74 100644 --- a/src/msg.hpp +++ b/src/msg.hpp @@ -44,10 +44,11 @@ namespace zmq { public: - // Mesage flags. + // Message flags. enum { more = 1, + label = 2, shared = 128 }; @@ -63,6 +64,7 @@ namespace zmq void *data (); size_t size (); unsigned char flags (); + bool check_flags (unsigned char flags_); void set_flags (unsigned char flags_); void reset_flags (unsigned char flags_); bool is_delimiter (); diff --git a/src/pipe.cpp b/src/pipe.cpp index 36dc808..a17484e 100644 --- a/src/pipe.cpp +++ b/src/pipe.cpp @@ -116,7 +116,7 @@ bool zmq::reader_t::read (msg_t *msg_) return false; } - if (!(msg_->flags () & msg_t::more)) + if (!msg_->check_flags (msg_t::more)) msgs_read++; if (lwm > 0 && msgs_read % lwm == 0) @@ -203,8 +203,8 @@ bool zmq::writer_t::write (msg_t *msg_) if (unlikely (!check_write (msg_))) return false; - pipe->write (*msg_, msg_->flags () & msg_t::more); - if (!(msg_->flags () & msg_t::more)) + pipe->write (*msg_, msg_->check_flags (msg_t::more)); + if (!msg_->check_flags (msg_t::more)) msgs_written++; return true; @@ -215,7 +215,7 @@ void zmq::writer_t::rollback () // Remove incomplete message from the pipe. msg_t msg; while (pipe->unwrite (&msg)) { - zmq_assert (msg.flags () & msg_t::more); + zmq_assert (msg.check_flags (msg_t::more)); int rc = msg.close (); errno_assert (rc == 0); } diff --git a/src/rep.cpp b/src/rep.cpp index 8878bcd..e2c2877 100644 --- a/src/rep.cpp +++ b/src/rep.cpp @@ -42,7 +42,7 @@ int zmq::rep_t::xsend (msg_t *msg_, int flags_) return -1; } - bool more = (msg_->flags () & msg_t::more); + bool more = msg_->check_flags (msg_t::more); // Push message to the reply pipe. int rc = xrep_t::xsend (msg_, flags_); @@ -77,7 +77,7 @@ int zmq::rep_t::xrecv (msg_t *msg_, int flags_) if (rc != 0) return rc; - if (msg_->flags () & msg_t::more) { + if (msg_->check_flags (msg_t::label|msg_t::more)) { // Empty message part delimits the traceback stack. bool bottom = (msg_->size () == 0); @@ -111,7 +111,7 @@ int zmq::rep_t::xrecv (msg_t *msg_, int flags_) return rc; // If whole request is read, flip the FSM to reply-sending state. - if (!(msg_->flags () & msg_t::more)) { + if (!msg_->check_flags (msg_t::more)) { sending_reply = true; request_begins = true; } diff --git a/src/req.cpp b/src/req.cpp index 6bf502f..dcd5839 100644 --- a/src/req.cpp +++ b/src/req.cpp @@ -48,14 +48,14 @@ int zmq::req_t::xsend (msg_t *msg_, int flags_) msg_t prefix; int rc = prefix.init (); errno_assert (rc == 0); - prefix.set_flags (msg_t::more); + prefix.set_flags (msg_t::more|msg_t::label); rc = xreq_t::xsend (&prefix, flags_); if (rc != 0) return rc; message_begins = false; } - bool more = msg_->flags () & msg_t::more; + bool more = msg_->check_flags (msg_t::more); int rc = xreq_t::xsend (msg_, flags_); if (rc != 0) @@ -83,7 +83,7 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_) int rc = xreq_t::xrecv (msg_, flags_); if (rc != 0) return rc; - zmq_assert (msg_->flags () & msg_t::more); + zmq_assert (msg_->check_flags (msg_t::more|msg_t::label)); zmq_assert (msg_->size () == 0); message_begins = false; } @@ -93,7 +93,7 @@ int zmq::req_t::xrecv (msg_t *msg_, int flags_) return rc; // If the reply is fully received, flip the FSM into request-sending state. - if (!(msg_->flags () & msg_t::more)) { + if (!msg_->check_flags (msg_t::more)) { receiving_reply = false; message_begins = true; } diff --git a/src/session.cpp b/src/session.cpp index 499fe40..da10404 100644 --- a/src/session.cpp +++ b/src/session.cpp @@ -88,7 +88,7 @@ bool zmq::session_t::read (msg_t *msg_) if (!in_pipe->read (msg_)) return false; - incomplete_in = msg_->flags () & msg_t::more; + incomplete_in = msg_->check_flags (msg_t::more); return true; } diff --git a/src/socket_base.cpp b/src/socket_base.cpp index 3e104a8..24cb314 100644 --- a/src/socket_base.cpp +++ b/src/socket_base.cpp @@ -542,7 +542,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) // If we have the message, return immediately. if (rc == 0) { - rcvmore = msg_->flags () & msg_t::more; + rcvmore = msg_->check_flags (msg_t::more); if (rcvmore) msg_->reset_flags (msg_t::more); return 0; @@ -564,7 +564,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) rc = xrecv (msg_, flags_); if (rc == 0) { - rcvmore = msg_->flags () & msg_t::more; + rcvmore = msg_->check_flags (msg_t::more); if (rcvmore) msg_->reset_flags (msg_t::more); } @@ -584,7 +584,7 @@ int zmq::socket_base_t::recv (msg_t *msg_, int flags_) block = true; } - rcvmore = msg_->flags () & msg_t::more; + rcvmore = msg_->check_flags (msg_t::more); if (rcvmore) msg_->reset_flags (msg_t::more); return 0; diff --git a/src/xrep.cpp b/src/xrep.cpp index 2650f4e..a1e582b 100644 --- a/src/xrep.cpp +++ b/src/xrep.cpp @@ -168,7 +168,7 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_) // If we have malformed message (prefix with no subsequent message) // then just silently ignore it. - if (msg_->flags () & msg_t::more) { + if (msg_->check_flags (msg_t::more|msg_t::label)) { more_out = true; @@ -204,7 +204,7 @@ int zmq::xrep_t::xsend (msg_t *msg_, int flags_) } // Check whether this is the last part of the message. - more_out = msg_->flags () & msg_t::more; + more_out = msg_->check_flags (msg_t::more); // Push the message into the pipe. If there's no out pipe, just drop it. if (current_out) { @@ -233,7 +233,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) if (prefetched) { int rc = msg_->move (prefetched_msg); errno_assert (rc == 0); - more_in = msg_->flags () & msg_t::more; + more_in = msg_->check_flags (msg_t::more); prefetched = false; return 0; } @@ -247,7 +247,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) zmq_assert (inpipes [current_in].active); bool fetched = inpipes [current_in].reader->read (msg_); zmq_assert (fetched); - more_in = msg_->flags () & msg_t::more; + more_in = msg_->check_flags (msg_t::more); if (!more_in) { current_in++; if (current_in >= inpipes.size ()) @@ -269,7 +269,7 @@ int zmq::xrep_t::xrecv (msg_t *msg_, int flags_) errno_assert (rc == 0); memcpy (msg_->data (), inpipes [current_in].identity.data (), msg_->size ()); - msg_->set_flags (msg_t::more); + msg_->set_flags (msg_t::more|msg_t::label); return 0; } @@ -332,5 +332,3 @@ bool zmq::xrep_t::xhas_out () // to be routed to. return true; } - - diff --git a/src/xsub.cpp b/src/xsub.cpp index b0e8cd2..5defa2a 100644 --- a/src/xsub.cpp +++ b/src/xsub.cpp @@ -93,7 +93,7 @@ int zmq::xsub_t::xrecv (msg_t *msg_, int flags_) int rc = msg_->move (message); errno_assert (rc == 0); has_message = false; - more = msg_->flags () & msg_t::more; + more = msg_->check_flags (msg_t::more); return 0; } @@ -113,13 +113,13 @@ int zmq::xsub_t::xrecv (msg_t *msg_, int flags_) // Check whether the message matches at least one subscription. // Non-initial parts of the message are passed if (more || match (msg_)) { - more = msg_->flags () & msg_t::more; + more = msg_->check_flags (msg_t::more); return 0; } // Message doesn't match. Pop any remaining parts of the message // from the pipe. - while (msg_->flags () & msg_t::more) { + while (msg_->check_flags (msg_t::more)) { rc = fq.recv (msg_, ZMQ_DONTWAIT); zmq_assert (rc == 0); } @@ -159,7 +159,7 @@ bool zmq::xsub_t::xhas_in () // Message doesn't match. Pop any remaining parts of the message // from the pipe. - while (message.flags () & msg_t::more) { + while (message.check_flags (msg_t::more)) { rc = fq.recv (&message, ZMQ_DONTWAIT); zmq_assert (rc == 0); } diff --git a/src/zmq_init.cpp b/src/zmq_init.cpp index ea3b55d..a0ae608 100644 --- a/src/zmq_init.cpp +++ b/src/zmq_init.cpp @@ -137,7 +137,7 @@ bool zmq::zmq_init_t::write (msg_t *msg_) zmq_assert (false); } - if (!(msg_->flags () & msg_t::more)) { + if (!msg_->check_flags (msg_t::more)) { received = true; finalise_initialisation (); } diff --git a/tests/Makefile.am b/tests/Makefile.am index ebbc46c..f04ad0a 100644 --- a/tests/Makefile.am +++ b/tests/Makefile.am @@ -5,7 +5,8 @@ noinst_PROGRAMS = test_pair_inproc \ test_pair_tcp \ test_reqrep_inproc \ test_reqrep_tcp \ - test_hwm + test_hwm \ + test_queue_tcp if !ON_MINGW noinst_PROGRAMS += test_shutdown_stress \ diff --git a/tests/test_queue_tcp.cpp b/tests/test_queue_tcp.cpp new file mode 100644 index 0000000..c60b0f1 --- /dev/null +++ b/tests/test_queue_tcp.cpp @@ -0,0 +1,192 @@ +/* + Copyright (c) 2007-2011 iMatix Corporation + Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file + + This file is part of 0MQ. + + 0MQ is free software; you can redistribute it and/or modify it under + the terms of the GNU Lesser General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + 0MQ is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public License + along with this program. If not, see <http://www.gnu.org/licenses/>. +*/ + +#include "testutil.hpp" +#include "../include/zmq_utils.h" + +#include <pthread.h> + +const char *transport_fe = "tcp://127.0.0.1:5560" ; +const char *transport_be = "tcp://127.0.0.1:5561" ; + +static void copy_msg(void* from, void* to) +{ + zmq_msg_t msg; + + int more = 1; + int rc; + + while (more) + { + more = 0; + + rc = zmq_msg_init(&msg); + assert (rc == 0); + rc = zmq_recvmsg(from, &msg, 0); + assert (rc >= 0); + size_t size = sizeof more; + rc = zmq_getsockopt(from, ZMQ_RCVMORE, &more, &size); + assert (rc == 0); + + int flags = (more ? ZMQ_SNDMORE : 0); + rc = zmq_sendmsg(to, &msg, flags); + assert (rc >= 0); + rc = zmq_msg_close(&msg); + assert (rc == 0); + } +} + +extern "C" +{ + static void *queue (void *ctx) + { + int rc; + + void* be = zmq_socket(ctx, ZMQ_XREQ); + assert (be); + rc = zmq_bind(be, transport_be); + assert (rc == 0); + + void* fe = zmq_socket(ctx, ZMQ_XREP); + assert (fe); + rc = zmq_bind(fe, transport_fe); + assert (rc == 0); + + zmq_pollitem_t items[2]; + + items[0].socket = be; + items[0].events = ZMQ_POLLIN; + items[1].socket = fe; + items[1].events = ZMQ_POLLIN; + + while (true) + { + items[0].revents = 0; + items[1].revents = 0; + int rc = zmq_poll(items, 2, 5000); + if (rc < 0) + { + break; + } + if (rc > 0) + { + if (items[0].revents == ZMQ_POLLIN) + { + copy_msg(items[0].socket, items[1].socket); + } + if (items[1].revents == ZMQ_POLLIN) + { + copy_msg (items[1].socket, items[0].socket); + } + } + } + + zmq_close(fe); + zmq_close(be); + + return NULL; + } +} + +int main (int argc, char *argv []) +{ + + void *ctx = zmq_init (1); + assert (ctx); + + pthread_t thread; + + int rc = pthread_create (&thread, NULL, queue, ctx); + assert (rc == 0); + + void *sb = zmq_socket (ctx, ZMQ_REP); + assert (sb); + rc = zmq_connect (sb, transport_be); + assert (rc == 0); + + void *sc = zmq_socket (ctx, ZMQ_REQ); + assert (sc); + rc = zmq_connect (sc, transport_fe); + assert (rc == 0); + + bounce (sb, sc); + + void *sb2 = zmq_socket (ctx, ZMQ_REP); + assert (sb2); + rc = zmq_connect (sb2, transport_be); + assert (rc == 0); + + void *sc2 = zmq_socket (ctx, ZMQ_REQ); + assert (sc2); + rc = zmq_connect (sc2, transport_fe); + assert (rc == 0); + + zmq_sleep(1); + + const char *content = "12345678ABCDEFGH12345678abcdefgh"; + const char *content2 = "12345678NOPQRSTU12345678nopqrstu"; + + rc = zmq_send (sc, content, 32, 0); + assert (rc == 32); + + rc = zmq_send (sc2, content2, 32, 0); + assert (rc == 32); + + char buf [32]; + rc = zmq_recv (sb, buf, 32, 0); + assert (rc == 32); + + char buf2 [32]; + rc = zmq_recv (sb2, buf2, 32, 0); + assert (rc == 32); + + rc = zmq_send (sb2, buf2, 32, 0); + assert (rc == 32); + + rc = zmq_send (sb, buf, 32, 0); + assert (rc == 32); + + char reply [32]; + rc = zmq_recv (sc, reply, 32, 0); + assert (rc == 32); + assert (memcmp (reply, content, 32) == 0); + + char reply2 [32]; + rc = zmq_recv (sc2, reply2, 32, 0); + assert (rc == 32); + assert (memcmp (reply2, content2, 32) == 0); + + rc = zmq_close (sc); + assert (rc == 0); + + rc = zmq_close (sb); + assert (rc == 0); + + rc = zmq_close (sc2); + assert (rc == 0); + + rc = zmq_close (sb2); + assert (rc == 0); + + rc = zmq_term (ctx); + assert (rc == 0); + + return 0 ; +} -- 1.7.4.4
_______________________________________________ zeromq-dev mailing list [email protected] http://lists.zeromq.org/mailman/listinfo/zeromq-dev
