On 11/24/2014 8:08 PM, Arnaud Kapp wrote:
Currently, the patch is written for 3.2.4. I'll wait to put it in libzmq master
The first patch for 3.2.4 had an issue in zmq_poll(): I had tried a slightly too aggressive optimization by bypassing the "first_pass" processing. It is fixed in the current one.
The patch for the current head is clean.
Cheers,
Francis

Oh okay. This is the commit that added the flag:
https://github.com/zeromq/libzmq/commit/779c37abc433cb6595ddeedaf86b280317656bdd

libzmq was at 4.1 at the time, I believe.

I'll probably look at it this weekend then :)

On Mon, Nov 24, 2014 at 12:10 PM, Francis Le Bourse
<[email protected]> wrote:
Hi,
On 11/24/2014 11:35 AM, Arnaud Kapp wrote:
Hello,

I recently added support for the POLLPRI flag.
It looks like it's not handled in your patch
No, it isn't handled. In which version did you add this flag?
Currently, the patch is written for 3.2.4. I'll wait to put it in libzmq
master.
   and that it needs custom
support. Since there is no test related to this flag, you wouldn't
notice.

I can give it a look if you want.
That would be nice.

Cheers,
Francis


On Sat, Nov 22, 2014 at 2:16 PM, Pieter Hintjens <[email protected]> wrote:
I suggest you send the patch to libzmq master, and ensure all test
cases pass. Then we can get this into the next version.

On Fri, Nov 21, 2014 at 2:50 PM, Francis Le Bourse
<[email protected]> wrote:
Hi,

On 11/6/2014 3:18 PM, Pieter Hintjens wrote:
Oh, ok. Sounds like you have a good candidate for some before/after
measurement and optimization. Are you going to try to make a patch for
this?
I have a patch candidate for this optimization. The performance improvement
is very good and it doesn't seem to introduce any new instability.
What is modified:
      - zmq_poll(), there is only one poll() now,
      - and epoll() from epoll.cpp
Other calls to poll() and select() are left unmodified.
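
A minimal sketch of the kind of translation the epoll.cpp change performs, assuming Linux and <sys/epoll.h>. The helper name to_pollfd and its parameters are hypothetical and are not part of libzmq; the idea is only that the events already reported by epoll_wait() can be carried along in a pollfd so that later code need not poll the descriptor again.

```cpp
#include <cassert>
#include <cstdint>
#include <poll.h>
#include <sys/epoll.h>

// Hypothetical helper (not libzmq API): build a pollfd for fd from the
// events that were requested and the events epoll_wait() reported.
inline pollfd to_pollfd (int fd, std::uint32_t requested, std::uint32_t reported)
{
    assert (fd >= 0);

    pollfd pfd;
    pfd.fd = fd;
    pfd.events = 0;
    pfd.revents = 0;

    //  What the caller asked to be notified about.
    if (requested & EPOLLIN)
        pfd.events |= POLLIN;
    if (requested & EPOLLOUT)
        pfd.events |= POLLOUT;

    //  What the kernel actually reported.
    if (reported & EPOLLIN)
        pfd.revents |= POLLIN;
    if (reported & EPOLLOUT)
        pfd.revents |= POLLOUT;
    if (reported & EPOLLERR)
        pfd.revents |= POLLERR;
    if (reported & EPOLLHUP)
        pfd.revents |= POLLHUP;

    return pfd;
}
```

Only the four flags above are mapped in this sketch; any other flag would need its own mapping.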

I would be happy to have any feedback.


Cheers,
Francis

On Thu, Nov 6, 2014 at 2:09 PM, Francis Le Bourse
<[email protected]> wrote:
On 11/6/2014 11:47 AM, Pieter Hintjens wrote:
A simple optimization is, when you are polling sockets for input, to
continue reading from an active socket using a non-blocking read. So
you process all waiting messages on a socket and then only switch back
to poll when needed.
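
A minimal sketch of this drain-then-poll pattern, assuming plain POSIX descriptors (for example pipes) stand in for zmq sockets. The names set_nonblocking, drain and poll_and_drain are hypothetical; with real zmq sockets the read would presumably be a non-blocking receive instead of read().

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <fcntl.h>
#include <poll.h>
#include <unistd.h>

// Hypothetical helper: switch a descriptor to non-blocking mode.
inline bool set_nonblocking (int fd)
{
    int flags = fcntl (fd, F_GETFL, 0);
    return flags != -1 && fcntl (fd, F_SETFL, flags | O_NONBLOCK) != -1;
}

// Hypothetical helper: consume everything currently queued on fd and
// return the number of bytes read. Stops on EAGAIN, EOF or a real error.
inline std::size_t drain (int fd)
{
    char buf [256];
    std::size_t total = 0;
    for (;;) {
        ssize_t n = read (fd, buf, sizeof buf);
        if (n > 0) {
            total += static_cast<std::size_t> (n);
            continue;
        }
        if (n == -1 && errno == EINTR)
            continue;   //  Interrupted, retry.
        break;          //  Queue empty (EAGAIN), EOF or error.
    }
    return total;
}

// One loop iteration: a single poll(), then every readable descriptor is
// drained completely before the caller goes back to poll().
inline std::size_t poll_and_drain (pollfd *fds, nfds_t count, int timeout_ms)
{
    assert (fds != nullptr);
    if (poll (fds, count, timeout_ms) <= 0)
        return 0;       //  Timeout or error: nothing consumed.
    std::size_t total = 0;
    for (nfds_t i = 0; i < count; ++i)
        if (fds [i].revents & POLLIN)
            total += drain (fds [i].fd);
    return total;
}
```

The descriptors must be non-blocking, otherwise the last read() in drain() would block instead of returning EAGAIN.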
Thank you for your quick reply.

Yes, but the question was more about the zmq_poll() internals.
For 600+ file descriptors, zmq_poll() calls poll() a huge number of times
for only a few that will trigger a POLLIN, and the relevant information is
already known / present in the pollfds array. The performance hit is there.
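
To illustrate the point, here is a minimal sketch, assuming plain pipes instead of zmq sockets, in which one poll() call covers all descriptors and readiness is then read from the revents fields without any further system call. The names ready_for_input and demo_ready_indices are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <poll.h>
#include <unistd.h>
#include <vector>

// Hypothetical helper: poll all descriptors once and return the indices
// of those that reported POLLIN. Readiness comes from the revents array
// filled by that single call; no descriptor is polled a second time.
inline std::vector<std::size_t> ready_for_input (std::vector<pollfd> &fds, int timeout_ms)
{
    std::vector<std::size_t> ready;
    int rc = poll (fds.data (), static_cast<nfds_t> (fds.size ()), timeout_ms);
    if (rc <= 0)
        return ready;   //  Timeout or error: nothing is ready.
    for (std::size_t i = 0; i < fds.size (); ++i)
        if (fds [i].revents & POLLIN)
            ready.push_back (i);
    return ready;
}

// Demo: eight pipes, data written to pipes 2 and 5 only.
inline std::vector<std::size_t> demo_ready_indices ()
{
    std::vector<pollfd> fds;
    std::vector<int> writers;
    for (int i = 0; i < 8; ++i) {
        int p [2];
        int rc = pipe (p);
        assert (rc == 0);
        (void) rc;
        pollfd pfd;
        pfd.fd = p [0];
        pfd.events = POLLIN;
        pfd.revents = 0;
        fds.push_back (pfd);
        writers.push_back (p [1]);
    }
    ssize_t n2 = write (writers [2], "x", 1);
    ssize_t n5 = write (writers [5], "x", 1);
    assert (n2 == 1 && n5 == 1);
    (void) n2;
    (void) n5;

    std::vector<std::size_t> ready = ready_for_input (fds, 100);

    for (std::size_t i = 0; i < fds.size (); ++i) {
        close (fds [i].fd);
        close (writers [i]);
    }
    return ready;
}
```

The cost is one system call per loop iteration regardless of how many descriptors are idle.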

Cheers,
Francis



On Thu, Nov 6, 2014 at 11:28 AM, Francis Le Bourse
<[email protected]> wrote:
Hi,

I am looking at a performance issue in zmq, when the number of zsockets /
file descriptors becomes large.
The relevant calls are:
        poll+0x57
        zmq_poll+0x2e3
        zloop_start+0x1e8
        main+0xb40
        __libc_start_main+0xfd
immediately followed by a loop of
        poll+0x57
        zmq::signaler_t::wait(int)+0x33
        zmq::mailbox_t::recv(zmq::command_t*, int)+0x78
        zmq::socket_base_t::process_commands(int, bool)+0xbe
        zmq::socket_base_t::getsockopt(int, void*, unsigned long*)+0x135
        zmq_getsockopt+0x75
        zmq_poll+0x3da
        zloop_start+0x1e8
        main+0xb40
        __libc_start_main+0xfd

The code in the loop is executed once per file descriptor in the initial
poll array, redoing a poll() system call each time.
Is there a reason to proceed that way?
Would it be possible to reuse the results of the first poll() in order to
bypass the second set of system calls?
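
One way to picture such a reuse is a zero-timeout wait that consults a cached pollfd instead of calling poll() again. The sketch below is only an illustration under that assumption; wait_cached is a hypothetical name, loosely modelled on the zero-timeout branch of wait_ex in the attached patch.

```cpp
#include <cassert>
#include <cerrno>
#include <poll.h>

// Hypothetical zero-timeout wait: look at the revents stored in *cached
// by an earlier poll() instead of issuing another system call.
// Returns 0 once per reported POLLIN and clears the flag, so a second
// call reports "nothing pending" (-1 with errno set to EAGAIN).
inline int wait_cached (pollfd *cached)
{
    assert (cached != nullptr);
    if ((cached->revents & POLLIN) == 0) {
        errno = EAGAIN;     //  Same result a non-blocking wait would give.
        return -1;
    }
    cached->revents &= ~POLLIN;     //  Each cached result is used only once.
    return 0;
}
```

A cached result is only as fresh as the poll() that produced it, so this only makes sense directly after that call and before anything else reads from the descriptor.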

Cheers,
Francis


_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev

diff -urwB zeromq-3.2.4-interm/src/ctx.cpp zeromq-3.2.4/src/ctx.cpp
--- zeromq-3.2.4-interm/src/ctx.cpp     2014-11-14 11:50:34.679645346 +0100
+++ zeromq-3.2.4/src/ctx.cpp    2014-11-10 14:46:33.092394320 +0100
@@ -29,6 +29,8 @@
 #include <new>
 #include <string.h>
 
+#include <poll.h>
+
 #include "ctx.hpp"
 #include "socket_base.hpp"
 #include "io_thread.hpp"
diff -urwB zeromq-3.2.4-interm/src/epoll.cpp zeromq-3.2.4/src/epoll.cpp
--- zeromq-3.2.4-interm/src/epoll.cpp   2012-11-13 13:39:41.000000000 +0100
+++ zeromq-3.2.4/src/epoll.cpp  2014-11-20 13:19:52.958137819 +0100
@@ -29,6 +29,8 @@
 #include <algorithm>
 #include <new>
 
+#include <poll.h>
+
 #include "epoll.hpp"
 #include "err.hpp"
 #include "config.hpp"
@@ -148,18 +150,38 @@
         for (int i = 0; i < n; i ++) {
             poll_entry_t *pe = ((poll_entry_t*) ev_buf [i].data.ptr);
 
+           uint32_t events = ev_buf[i].events;
+           ev_buf[i].events = 0;
+           struct pollfd pfd;
+           pfd.fd = pe->fd;
+           pfd.events = 0;
+           if(pe->ev.events & EPOLLIN)
+               pfd.events |= POLLIN;
+           if(pe->ev.events & EPOLLOUT)
+               pfd.events |= POLLOUT;
+           pfd.revents = 0;
+           
+           if(events & EPOLLIN)
+               pfd.revents |= POLLIN;
+           if(events & EPOLLOUT)
+               pfd.revents |= POLLOUT;
+           if(events & EPOLLERR)
+               pfd.revents |= POLLERR;
+           if(events & EPOLLHUP)
+               pfd.revents |= POLLHUP;
+           
             if (pe->fd == retired_fd)
                 continue;
-            if (ev_buf [i].events & (EPOLLERR | EPOLLHUP))
+            if (events & (EPOLLERR | EPOLLHUP))
                 pe->events->in_event ();
             if (pe->fd == retired_fd)
                continue;
-            if (ev_buf [i].events & EPOLLOUT)
+            if (events & EPOLLOUT)
                 pe->events->out_event ();
             if (pe->fd == retired_fd)
                 continue;
-            if (ev_buf [i].events & EPOLLIN)
-                pe->events->in_event ();
+            if (events & EPOLLIN)
+               pe->events->in_event_ex (&pfd);
         }
 
         //  Destroy retired event sources.
diff -urwB zeromq-3.2.4-interm/src/i_poll_events.hpp zeromq-3.2.4/src/i_poll_events.hpp
--- zeromq-3.2.4-interm/src/i_poll_events.hpp   2012-10-15 05:58:09.000000000 +0200
+++ zeromq-3.2.4/src/i_poll_events.hpp  2014-11-13 11:45:22.509276876 +0100
@@ -22,6 +22,8 @@
 #ifndef __ZMQ_I_POLL_EVENTS_HPP_INCLUDED__
 #define __ZMQ_I_POLL_EVENTS_HPP_INCLUDED__
  
+#include <poll.h>
+
 namespace zmq
 {
  
@@ -34,6 +36,7 @@
  
         // Called by I/O thread when file descriptor is ready for reading.
         virtual void in_event () = 0;
+        virtual void in_event_ex (pollfd *) = 0;
  
         // Called by I/O thread when file descriptor is ready for writing.
         virtual void out_event () = 0;
diff -urwB zeromq-3.2.4-interm/src/io_object.cpp zeromq-3.2.4/src/io_object.cpp
--- zeromq-3.2.4-interm/src/io_object.cpp       2012-10-15 05:58:09.000000000 +0200
+++ zeromq-3.2.4/src/io_object.cpp      2014-11-13 12:12:41.171976993 +0100
@@ -97,6 +97,11 @@
     zmq_assert (false);
 }
 
+void zmq::io_object_t::in_event_ex (pollfd *pfd)
+{
+    zmq_assert (false);
+}
+
 void zmq::io_object_t::out_event ()
 {
     zmq_assert (false);
diff -urwB zeromq-3.2.4-interm/src/io_object.hpp zeromq-3.2.4/src/io_object.hpp
--- zeromq-3.2.4-interm/src/io_object.hpp       2012-10-15 05:58:09.000000000 +0200
+++ zeromq-3.2.4/src/io_object.hpp      2014-11-13 11:42:11.089118102 +0100
@@ -65,6 +65,7 @@
 
         //  i_poll_events interface implementation.
         void in_event ();
+        void in_event_ex (pollfd *);
         void out_event ();
         void timer_event (int id_);
 
diff -urwB zeromq-3.2.4-interm/src/io_thread.cpp zeromq-3.2.4/src/io_thread.cpp
--- zeromq-3.2.4-interm/src/io_thread.cpp       2012-10-15 05:58:09.000000000 +0200
+++ zeromq-3.2.4/src/io_thread.cpp      2014-11-14 11:57:40.836245798 +0100
@@ -79,6 +79,23 @@
     errno_assert (rc != 0 && errno == EAGAIN);
 }
 
+void zmq::io_thread_t::in_event_ex (pollfd *pfd)
+{
+    //  TODO: Do we want to limit number of commands I/O thread can
+    //  process in a single go?
+
+    command_t cmd;
+    int rc = mailbox.recv_ex (&cmd, 0, pfd); // will clear POLLIN
+    
+    while (rc == 0 || errno == EINTR) {
+        if (rc == 0)
+            cmd.destination->process_command (cmd);
+        rc = mailbox.recv_ex (&cmd, 0, pfd);
+    }
+
+    errno_assert (rc != 0 && errno == EAGAIN);
+}
+
 void zmq::io_thread_t::out_event ()
 {
     //  We are never polling for POLLOUT here. This function is never called.
diff -urwB zeromq-3.2.4-interm/src/io_thread.hpp zeromq-3.2.4/src/io_thread.hpp
--- zeromq-3.2.4-interm/src/io_thread.hpp       2012-10-15 05:58:09.000000000 +0200
+++ zeromq-3.2.4/src/io_thread.hpp      2014-11-13 13:03:40.680393409 +0100
@@ -59,6 +59,7 @@
 
         //  i_poll_events implementation.
         void in_event ();
+        void in_event_ex (pollfd *);
         void out_event ();
         void timer_event (int id_);
 
diff -urwB zeromq-3.2.4-interm/src/ipc_connecter.cpp zeromq-3.2.4/src/ipc_connecter.cpp
--- zeromq-3.2.4-interm/src/ipc_connecter.cpp   2012-11-23 08:35:33.000000000 +0100
+++ zeromq-3.2.4/src/ipc_connecter.cpp  2014-11-13 12:57:10.186264608 +0100
@@ -100,6 +100,14 @@
     out_event ();
 }
 
+void zmq::ipc_connecter_t::in_event_ex (pollfd *pfd)
+{
+    //  We are not polling for incoming data, so we are actually called
+    //  because of error here. However, we can get error on out event as well
+    //  on some platforms, so we'll simply handle both events in the same way.
+    out_event ();
+}
+
 void zmq::ipc_connecter_t::out_event ()
 {
     fd_t fd = connect ();
diff -urwB zeromq-3.2.4-interm/src/ipc_connecter.hpp zeromq-3.2.4/src/ipc_connecter.hpp
--- zeromq-3.2.4-interm/src/ipc_connecter.hpp   2012-10-15 05:58:09.000000000 +0200
+++ zeromq-3.2.4/src/ipc_connecter.hpp  2014-11-13 12:55:54.156407616 +0100
@@ -59,6 +59,7 @@
 
         //  Handlers for I/O events.
         void in_event ();
+        void in_event_ex (pollfd *);
         void out_event ();
         void timer_event (int id_);
 
diff -urwB zeromq-3.2.4-interm/src/ipc_listener.cpp zeromq-3.2.4/src/ipc_listener.cpp
--- zeromq-3.2.4-interm/src/ipc_listener.cpp    2012-11-23 08:35:52.000000000 +0100
+++ zeromq-3.2.4/src/ipc_listener.cpp   2014-11-13 12:59:05.723526546 +0100
@@ -99,6 +99,11 @@
     socket->event_accepted (endpoint, fd);
 }
 
+void zmq::ipc_listener_t::in_event_ex (pollfd *pfd)
+{
+    zmq::ipc_listener_t::in_event();
+}
+
 int zmq::ipc_listener_t::get_address (std::string &addr_)
 {
     struct sockaddr_storage ss;
diff -urwB zeromq-3.2.4-interm/src/ipc_listener.hpp zeromq-3.2.4/src/ipc_listener.hpp
--- zeromq-3.2.4-interm/src/ipc_listener.hpp    2012-10-15 05:58:09.000000000 +0200
+++ zeromq-3.2.4/src/ipc_listener.hpp   2014-11-13 12:57:44.364745769 +0100
@@ -60,6 +60,7 @@
 
         //  Handlers for I/O events.
         void in_event ();
+        void in_event_ex (pollfd *);
 
         //  Close the listening socket.
         int close ();
diff -urwB zeromq-3.2.4-interm/src/mailbox.cpp zeromq-3.2.4/src/mailbox.cpp
--- zeromq-3.2.4-interm/src/mailbox.cpp 2013-05-01 05:30:36.000000000 +0200
+++ zeromq-3.2.4/src/mailbox.cpp        2014-11-13 11:41:41.849549736 +0100
@@ -19,6 +19,10 @@
     along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */
 
+#if defined ZMQ_POLL_BASED_ON_POLL
+#include <poll.h>
+#endif
+
 #include "mailbox.hpp"
 #include "err.hpp"
 
@@ -85,3 +89,31 @@
     return 0;
 }
 
+int zmq::mailbox_t::recv_ex (command_t *cmd_, int timeout_, pollfd *pfd_)
+{
+    //  Try to get the command straight away.
+    if (active) {
+        bool ok = cpipe.read (cmd_);
+        if (ok)
+            return 0;
+
+        //  If there are no more commands available, switch into passive state.
+        active = false;
+        signaler.recv ();
+    }
+
+    //  Wait for signal from the command sender.
+    int rc = signaler.wait_ex (timeout_, pfd_);
+    if (rc != 0 && (errno == EAGAIN || errno == EINTR))
+        return -1;
+
+    //  We've got the signal. Now we can switch into active state.
+    active = true;
+
+    //  Get a command.
+    errno_assert (rc == 0);
+    bool ok = cpipe.read (cmd_);
+    zmq_assert (ok);
+    return 0;
+}
+
diff -urwB zeromq-3.2.4-interm/src/mailbox.hpp zeromq-3.2.4/src/mailbox.hpp
--- zeromq-3.2.4-interm/src/mailbox.hpp 2012-10-15 05:58:09.000000000 +0200
+++ zeromq-3.2.4/src/mailbox.hpp        2014-11-13 11:41:16.399927840 +0100
@@ -24,6 +24,8 @@
 
 #include <stddef.h>
 
+#include <poll.h>
+
 #include "platform.hpp"
 #include "signaler.hpp"
 #include "fd.hpp"
@@ -45,6 +47,7 @@
         fd_t get_fd ();
         void send (const command_t &cmd_);
         int recv (command_t *cmd_, int timeout_);
+        int recv_ex (command_t *cmd_, int timeout_, pollfd *pfd_);
         
     private:
 
diff -urwB zeromq-3.2.4-interm/src/pgm_receiver.cpp zeromq-3.2.4/src/pgm_receiver.cpp
--- zeromq-3.2.4-interm/src/pgm_receiver.cpp    2012-10-15 05:58:09.000000000 +0200
+++ zeromq-3.2.4/src/pgm_receiver.cpp   2014-11-13 13:01:34.993282854 +0100
@@ -262,6 +262,11 @@
     session->flush ();
 }
 
+void zmq::pgm_receiver_t::in_event_ex (pollfd *pfd)
+{
+    zmq::pgm_receiver_t::in_event();
+}
+
 void zmq::pgm_receiver_t::timer_event (int token)
 {
     zmq_assert (token == rx_timer_id);
diff -urwB zeromq-3.2.4-interm/src/pgm_receiver.hpp zeromq-3.2.4/src/pgm_receiver.hpp
--- zeromq-3.2.4-interm/src/pgm_receiver.hpp    2012-10-15 05:58:09.000000000 +0200
+++ zeromq-3.2.4/src/pgm_receiver.hpp   2014-11-13 12:59:45.474928619 +0100
@@ -65,6 +65,7 @@
 
         //  i_poll_events interface implementation.
         void in_event ();
+        void in_event_ex (pollfd *);
         void timer_event (int token);
 
     private:
diff -urwB zeromq-3.2.4-interm/src/pgm_sender.cpp zeromq-3.2.4/src/pgm_sender.cpp
--- zeromq-3.2.4-interm/src/pgm_sender.cpp      2012-10-15 05:58:09.000000000 +0200
+++ zeromq-3.2.4/src/pgm_sender.cpp     2014-11-13 13:03:08.792872122 +0100
@@ -153,6 +153,11 @@
     }
 }
 
+void zmq::pgm_sender_t::in_event_ex (pollfd *pfd)
+{
+    zmq::pgm_sender_t::in_event ();
+}
+
 void zmq::pgm_sender_t::out_event ()
 {
     //  POLLOUT event from send socket. If write buffer is empty, 
diff -urwB zeromq-3.2.4-interm/src/pgm_sender.hpp zeromq-3.2.4/src/pgm_sender.hpp
--- zeromq-3.2.4-interm/src/pgm_sender.hpp      2012-10-15 05:58:09.000000000 +0200
+++ zeromq-3.2.4/src/pgm_sender.hpp     2014-11-13 13:02:07.983786701 +0100
@@ -63,6 +63,7 @@
 
         //  i_poll_events interface implementation.
         void in_event ();
+        void in_event_ex (pollfd *);
         void out_event ();
         void timer_event (int token);
 
diff -urwB zeromq-3.2.4-interm/src/reaper.cpp zeromq-3.2.4/src/reaper.cpp
--- zeromq-3.2.4-interm/src/reaper.cpp  2014-03-11 12:09:15.000000000 +0100
+++ zeromq-3.2.4/src/reaper.cpp 2014-11-13 11:44:15.547267347 +0100
@@ -73,6 +73,24 @@
     }
 }
 
+void zmq::reaper_t::in_event_ex (pollfd *pfd)
+{
+    while (true) {
+
+        //  Get the next command. If there is none, exit.
+        command_t cmd;
+        int rc = mailbox.recv_ex (&cmd, 0, pfd);
+        if (rc != 0 && errno == EINTR)
+            continue;
+        if (rc != 0 && errno == EAGAIN)
+            break;
+        errno_assert (rc == 0);
+
+        //  Process the command.
+        cmd.destination->process_command (cmd);
+    }
+}
+
 void zmq::reaper_t::out_event ()
 {
     zmq_assert (false);
diff -urwB zeromq-3.2.4-interm/src/reaper.hpp zeromq-3.2.4/src/reaper.hpp
--- zeromq-3.2.4-interm/src/reaper.hpp  2012-10-15 05:58:09.000000000 +0200
+++ zeromq-3.2.4/src/reaper.hpp 2014-11-13 11:44:26.167111581 +0100
@@ -46,6 +46,7 @@
 
         //  i_poll_events implementation.
         void in_event ();
+        void in_event_ex (pollfd *);
         void out_event ();
         void timer_event (int id_);
 
diff -urwB zeromq-3.2.4-interm/src/signaler.cpp zeromq-3.2.4/src/signaler.cpp
--- zeromq-3.2.4-interm/src/signaler.cpp        2014-03-12 15:51:34.000000000 +0100
+++ zeromq-3.2.4/src/signaler.cpp       2014-11-10 15:54:40.453118275 +0100
@@ -193,6 +193,76 @@
 #endif
 }
 
+int zmq::signaler_t::wait_ex (int timeout_, pollfd *pfd_)
+{
+#ifdef ZMQ_SIGNALER_WAIT_BASED_ON_POLL
+
+    struct pollfd pfd;
+    pfd.fd = r;
+    pfd.events = POLLIN;
+    if(timeout_)
+    {
+       int rc = poll (&pfd, 1, timeout_);
+       if (unlikely (rc < 0)) {
+           errno_assert (errno == EINTR);
+           return -1;
+       }
+       else if (unlikely (rc == 0)) {
+           errno = EAGAIN;
+           return -1;
+       }
+       zmq_assert (rc == 1);
+       zmq_assert (pfd.revents & POLLIN);
+       return 0;
+    }
+    else
+    {
+       if((pfd_->revents & POLLIN) == 0)
+       {
+           // simulate EAGAIN
+           errno = EAGAIN;
+           return -1;
+       }
+       zmq_assert (pfd_->revents & POLLIN);
+       // MUST clear POLLIN
+       pfd_->revents &= ~POLLIN;
+       return 0;
+    }
+    
+#elif defined ZMQ_SIGNALER_WAIT_BASED_ON_SELECT
+
+    fd_set fds;
+    FD_ZERO (&fds);
+    FD_SET (r, &fds);
+    struct timeval timeout;
+    if (timeout_ >= 0) {
+        timeout.tv_sec = timeout_ / 1000;
+        timeout.tv_usec = timeout_ % 1000 * 1000;
+    }
+#ifdef ZMQ_HAVE_WINDOWS
+    int rc = select (0, &fds, NULL, NULL,
+        timeout_ >= 0 ? &timeout : NULL);
+    wsa_assert (rc != SOCKET_ERROR);
+#else
+    int rc = select (r + 1, &fds, NULL, NULL,
+        timeout_ >= 0 ? &timeout : NULL);
+    if (unlikely (rc < 0)) {
+        errno_assert (errno == EINTR);
+        return -1;
+    }
+#endif
+    if (unlikely (rc == 0)) {
+        errno = EAGAIN;
+        return -1;
+    }
+    zmq_assert (rc == 1);
+    return 0;
+
+#else
+#error
+#endif
+}
+
 void zmq::signaler_t::recv ()
 {
     //  Attempt to read a signal.
diff -urwB zeromq-3.2.4-interm/src/signaler.hpp zeromq-3.2.4/src/signaler.hpp
--- zeromq-3.2.4-interm/src/signaler.hpp        2012-10-15 05:58:09.000000000 +0200
+++ zeromq-3.2.4/src/signaler.hpp       2014-11-10 14:46:59.063018398 +0100
@@ -21,6 +21,8 @@
 #ifndef __ZMQ_SIGNALER_HPP_INCLUDED__
 #define __ZMQ_SIGNALER_HPP_INCLUDED__
 
+#include <poll.h>
+
 #include "fd.hpp"
 
 namespace zmq
@@ -41,6 +43,7 @@
         fd_t get_fd ();
         void send ();
         int wait (int timeout_);
+        int wait_ex (int timeout_, pollfd *pfd_);
         void recv ();
         
     private:
diff -urwB zeromq-3.2.4-interm/src/socket_base.cpp zeromq-3.2.4/src/socket_base.cpp
--- zeromq-3.2.4-interm/src/socket_base.cpp     2014-03-11 12:08:49.000000000 +0100
+++ zeromq-3.2.4/src/socket_base.cpp    2014-11-14 12:03:58.872633507 +0100
@@ -39,6 +39,8 @@
 #include <unistd.h>
 #endif
 
+#include <poll.h>
+
 #include "socket_base.hpp"
 #include "tcp_listener.hpp"
 #include "ipc_listener.hpp"
@@ -303,6 +305,38 @@
     return options.getsockopt (option_, optval_, optvallen_);
 }
 
+// only for ZMQ_EVENTS
+int zmq::socket_base_t::getsockopt_ex (int option_, void *optval_,
+                                      size_t *optvallen_, pollfd *pfd_)
+{
+    if (unlikely (ctx_terminated)) {
+        errno = ETERM;
+        return -1;
+    }
+
+    assert (option_ == ZMQ_EVENTS);
+
+    if (option_ == ZMQ_EVENTS) {
+        if (*optvallen_ < sizeof (int)) {
+            errno = EINVAL;
+            return -1;
+        }
+        int rc = process_commands_ex (0, false, pfd_);
+        if (rc != 0 && (errno == EINTR || errno == ETERM))
+            return -1;
+        errno_assert (rc == 0);
+        *((int*) optval_) = 0;
+        if (has_out ())
+            *((int*) optval_) |= ZMQ_POLLOUT;
+        if (has_in ())
+            *((int*) optval_) |= ZMQ_POLLIN;
+        *optvallen_ = sizeof (int);
+        return 0;
+    }
+
+    return 0;
+}
+
 int zmq::socket_base_t::bind (const char *addr_)
 {
     if (unlikely (ctx_terminated)) {
@@ -872,6 +906,62 @@
     return 0;
 }
 
+int zmq::socket_base_t::process_commands_ex (int timeout_, bool throttle_, pollfd *pfd_)
+{
+    int rc;
+    command_t cmd;
+    if (timeout_ != 0) {
+
+        //  If we are asked to wait, simply ask mailbox to wait.
+        rc = mailbox.recv_ex (&cmd, timeout_, pfd_);
+    }
+    else {
+
+        //  If we are asked not to wait, check whether we haven't processed
+        //  commands recently, so that we can throttle the new commands.
+
+        //  Get the CPU's tick counter. If 0, the counter is not available.
+        uint64_t tsc = zmq::clock_t::rdtsc ();
+
+        //  Optimised version of command processing - it doesn't have to check
+        //  for incoming commands each time. It does so only if certain time
+        //  elapsed since last command processing. Command delay varies
+        //  depending on CPU speed: It's ~1ms on 3GHz CPU, ~2ms on 1.5GHz CPU
+        //  etc. The optimisation makes sense only on platforms where getting
+        //  a timestamp is a very cheap operation (tens of nanoseconds).
+        if (tsc && throttle_) {
+
+            //  Check whether TSC haven't jumped backwards (in case of migration
+            //  between CPU cores) and whether certain time have elapsed since
+            //  last command processing. If it didn't do nothing.
+            if (tsc >= last_tsc && tsc - last_tsc <= max_command_delay)
+                return 0;
+            last_tsc = tsc;
+        }
+
+        //  Check whether there are any commands pending for this thread.
+        rc = mailbox.recv_ex (&cmd, 0, pfd_);
+    }
+
+    //  Process all available commands.
+    while (rc == 0) {
+        cmd.destination->process_command (cmd);
+        rc = mailbox.recv_ex (&cmd, 0, pfd_);
+    }
+
+    if (errno == EINTR)
+        return -1;
+
+    zmq_assert (errno == EAGAIN);
+
+    if (ctx_terminated) {
+        errno = ETERM;
+        return -1;
+    }
+
+    return 0;
+}
+
 void zmq::socket_base_t::process_stop ()
 {
     //  Here, someone have called zmq_term while the socket was still alive.
@@ -960,6 +1050,16 @@
     check_destroy ();
 }
 
+void zmq::socket_base_t::in_event_ex (pollfd *pfd)
+{
+    //  This function is invoked only once the socket is running in the context
+    //  of the reaper thread. Process any commands from other threads/sockets
+    //  that may be available at the moment. Ultimately, the socket will
+    //  be destroyed.
+    process_commands (0, false);
+    check_destroy ();
+}
+
 void zmq::socket_base_t::out_event ()
 {
     zmq_assert (false);
diff -urwB zeromq-3.2.4-interm/src/socket_base.hpp zeromq-3.2.4/src/socket_base.hpp
--- zeromq-3.2.4-interm/src/socket_base.hpp     2014-04-09 11:14:26.000000000 +0200
+++ zeromq-3.2.4/src/socket_base.hpp    2014-11-13 11:42:15.699047646 +0100
@@ -77,6 +77,7 @@
         //  Interface for communication with the API layer.
         int setsockopt (int option_, const void *optval_, size_t optvallen_);
         int getsockopt (int option_, void *optval_, size_t *optvallen_);
+        int getsockopt_ex (int option_, void *optval_, size_t *optvallen_, pollfd *pfd_);
         int bind (const char *addr_);
         int connect (const char *addr_);
         int term_endpoint (const char *addr_);
@@ -96,6 +97,7 @@
         //  i_poll_events implementation. This interface is used when socket
         //  is handled by the poller in the reaper thread.
         void in_event ();
+        void in_event_ex (pollfd *);
         void out_event ();
         void timer_event (int id_);
 
@@ -210,6 +212,7 @@
         //  If throttle argument is true, commands are processed at most once
         //  in a predefined time period.
         int process_commands (int timeout_, bool throttle_);
+        int process_commands_ex (int timeout_, bool throttle_, pollfd *pfd_);
 
         //  Handlers for incoming commands.
         void process_stop ();
diff -urwB zeromq-3.2.4-interm/src/stream_engine.cpp zeromq-3.2.4/src/stream_engine.cpp
--- zeromq-3.2.4-interm/src/stream_engine.cpp   2014-04-25 11:53:51.000000000 +0200
+++ zeromq-3.2.4/src/stream_engine.cpp  2014-11-13 15:29:55.984958203 +0100
@@ -244,6 +245,11 @@
     }
 }
 
+void zmq::stream_engine_t::in_event_ex (pollfd *pfd)
+{
+    zmq::stream_engine_t::in_event();
+}
+
 void zmq::stream_engine_t::out_event ()
 {
     //  If write buffer is empty, try to read new data from the encoder.
diff -urwB zeromq-3.2.4-interm/src/stream_engine.hpp zeromq-3.2.4/src/stream_engine.hpp
--- zeromq-3.2.4-interm/src/stream_engine.hpp   2013-02-01 10:03:55.000000000 +0100
+++ zeromq-3.2.4/src/stream_engine.hpp  2014-11-13 15:27:26.236236302 +0100
@@ -62,6 +62,7 @@
 
         //  i_poll_events interface implementation.
         void in_event ();
+        void in_event_ex (pollfd *);
         void out_event ();
 
     private:
diff -urwB zeromq-3.2.4-interm/src/tcp_connecter.cpp zeromq-3.2.4/src/tcp_connecter.cpp
--- zeromq-3.2.4-interm/src/tcp_connecter.cpp   2013-05-02 20:41:25.000000000 +0200
+++ zeromq-3.2.4/src/tcp_connecter.cpp  2014-11-13 13:05:15.161971274 +0100
@@ -110,6 +110,14 @@
     out_event ();
 }
 
+void zmq::tcp_connecter_t::in_event_ex (pollfd *pfd)
+{
+    //  We are not polling for incoming data, so we are actually called
+    //  because of error here. However, we can get error on out event as well
+    //  on some platforms, so we'll simply handle both events in the same way.
+    out_event ();
+}
+
 void zmq::tcp_connecter_t::out_event ()
 {
     fd_t fd = connect ();
diff -urwB zeromq-3.2.4-interm/src/tcp_connecter.hpp zeromq-3.2.4/src/tcp_connecter.hpp
--- zeromq-3.2.4-interm/src/tcp_connecter.hpp   2012-10-15 05:58:09.000000000 +0200
+++ zeromq-3.2.4/src/tcp_connecter.hpp  2014-11-13 13:04:30.041651724 +0100
@@ -57,6 +57,7 @@
 
         //  Handlers for I/O events.
         void in_event ();
+        void in_event_ex (pollfd *);
         void out_event ();
         void timer_event (int id_);
 
diff -urwB zeromq-3.2.4-interm/src/tcp_listener.cpp zeromq-3.2.4/src/tcp_listener.cpp
--- zeromq-3.2.4-interm/src/tcp_listener.cpp    2012-11-23 08:38:26.000000000 +0100
+++ zeromq-3.2.4/src/tcp_listener.cpp   2014-11-13 13:07:09.469256737 +0100
@@ -111,6 +111,11 @@
     socket->event_accepted (endpoint, fd);
 }
 
+void zmq::tcp_listener_t::in_event_ex (pollfd *pfd)
+{
+    zmq::tcp_listener_t::in_event ();
+}
+
 void zmq::tcp_listener_t::close ()
 {
     zmq_assert (s != retired_fd);
diff -urwB zeromq-3.2.4-interm/src/tcp_listener.hpp zeromq-3.2.4/src/tcp_listener.hpp
--- zeromq-3.2.4-interm/src/tcp_listener.hpp    2012-10-15 05:58:09.000000000 +0200
+++ zeromq-3.2.4/src/tcp_listener.hpp   2014-11-13 13:05:37.060644451 +0100
@@ -57,6 +57,7 @@
 
         //  Handlers for I/O events.
         void in_event ();
+        void in_event_ex (pollfd *);
 
         //  Close the listening socket.
         void close ();
diff -urwB zeromq-3.2.4-interm/src/zmq.cpp zeromq-3.2.4/src/zmq.cpp
--- zeromq-3.2.4-interm/src/zmq.cpp     2014-04-09 14:39:35.000000000 +0200
+++ zeromq-3.2.4/src/zmq.cpp    2014-11-20 13:21:23.465769599 +0100
@@ -264,6 +265,17 @@
     return result;
 }
 
+int zmq_getsockopt_ex (void *s_, int option_, void *optval_, size_t *optvallen_, pollfd *pfd_)
+{
+    if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
+        errno = ENOTSOCK;
+        return -1;
+    }
+    zmq::socket_base_t *s = (zmq::socket_base_t *) s_;
+    int result = s->getsockopt_ex (option_, optval_, optvallen_, pfd_);
+    return result;
+}
+
 int zmq_getsockopt (void *s_, int option_, void *optval_, size_t *optvallen_)
 {
     if (!s_ || !((zmq::socket_base_t*) s_)->check_tag ()) {
@@ -703,12 +720,15 @@
             //  using the ZMQ_EVENTS socket option.
             if (items_ [i].socket) {
                 size_t zmq_events_size = sizeof (uint32_t);
-                uint32_t zmq_events;
-                if (zmq_getsockopt (items_ [i].socket, ZMQ_EVENTS, &zmq_events,
-                    &zmq_events_size) == -1) {
+                uint32_t zmq_events = 0;
+
+               if (zmq_getsockopt_ex (items_ [i].socket, ZMQ_EVENTS,
+                                      &zmq_events,
+                                      &zmq_events_size, &pollfds [i]) == -1) {
                     free (pollfds);
                     return -1;
                 }
+               
                 if ((items_ [i].events & ZMQ_POLLOUT) &&
                       (zmq_events & ZMQ_POLLOUT))
                     items_ [i].revents |= ZMQ_POLLOUT;