OK, so I went back, fixed a couple of issues, and reattached the two modified test programs. I added RCV/SND buffer shaping, and the sender now uses zmq_msg_init_data (zero-copy) for better performance. I'm getting about 2.5 GB/s average at best, which is a lot better than with the stock remote_thr/local_thr, but still about 25% less than the 3.4 GB/s I'm expecting at minimum.
When I initiate 4 simultaneous processes (not threads) for each client and server, each pair on a separate port, the total does add up to ~3.3 GB/s as it should. The trouble is that for it to work that way I need to bind 4 ports, and the whole point of using accept() is traditionally to have multiple connections on the same port. Is there a way to achieve the desired throughput via 0MQ without using separate ports for each socket? I would think multiple connections (via separate threads) on the same ZMQ socket should naturally do it, but according to the results it doesn't happen.

On Mon, Jan 7, 2013 at 7:16 PM, A. Mark <gougol...@gmail.com> wrote:
> Hello,
>
> I'm very interested in porting my current transfer engine to 0MQ. The
> current engine is written in pure BSD sockets and has certain limitations
> that would be easily overcome by 0MQ's intelligent and versatile design.
> However, my main concern is performance on very long messages, in excess
> of 1MB. The current backbone MT design is the following:
>
>   control node (client) <---> server A                        server B
>                                 |--- worker node 1 <---> worker node 1 ---|
>                                 |--- worker node 2 <---> worker node 2 ---|
>                                 |--- worker node N <---> worker node N ---|
>
> So the control client controls whatever task needs to be performed by
> submitting requests to a server; the actual work is done by the worker
> nodes, each in a separate thread on the server. The worker nodes are
> synchronized across the two servers, but they work independently since
> they are working on the same task. Each worker node has its own FD, but
> all connect to the same TCP address and port. The main task of each node
> is to perform some transformation on a large data buffer from a buffer
> pool, then push the finished result to the other server. My current
> benchmark gives me 3.5 GBytes/s using TCP over the local loopback when
> simply pushing the buffers without doing any work.
>
> I ran the 0MQ benchmarks local_thr and remote_thr, and the performance is
> only 1.5 GB/s at best with large buffers (messages), and lower with small
> ones. I'm also concerned looking at the published benchmarks for the 10GE
> test. My current engine can perform at a steady 1.1 GBytes/s with large
> buffers over 10GE.
>
> I've also tried a modified version of the two benchmarks to try to
> emulate the above situation, but the performance is about the same. The
> modified MT code is attached.
>
> Is there something else I need to do to get the best performance out of
> 0MQ using MT for this work flow engine?
/*
    Copyright (c) 2009-2011 250bpm s.r.o.
    Copyright (c) 2007-2009 iMatix Corporation
    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "zmq.h"
#include "zmq_utils.h"
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <pthread.h>

const char *connect_to;
int message_count;
int message_size;
int threads = 1;
int workers = 1;

//  Free function for zmq_msg_init_data; currently unused because the
//  send loop passes NULL and keeps ownership of the buffer itself.
void my_free (void *data, void *hint)
{
    //free (data);
}

static void *worker_routine (void *ctx)
{
    int rc, i;
    void *buf = NULL;
    zmq_msg_t msg;

    if (!(buf = malloc (message_size))) {
        perror ("malloc");
        return NULL;
    }

    void *s = zmq_socket (ctx, ZMQ_PUSH);
    if (!s) {
        printf ("error in zmq_socket: %s\n", zmq_strerror (errno));
        return NULL;
    }

    //  Add your socket options here.
    //  For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM.

    //  Shape the kernel send buffer to 32 MB.
    int sndbuflen;
    size_t sndbuflenlen = sizeof sndbuflen;
    rc = zmq_getsockopt (s, ZMQ_SNDBUF, &sndbuflen, &sndbuflenlen);
    if (rc != 0) {
        printf ("error in zmq_getsockopt: %s\n", zmq_strerror (errno));
        return NULL;
    }
    printf ("SNDBUF=%d before\n", sndbuflen);

    sndbuflen = 1024 * 1024 * 32;
    rc = zmq_setsockopt (s, ZMQ_SNDBUF, &sndbuflen, sndbuflenlen);
    if (rc != 0) {
        printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno));
        return NULL;
    }
    rc = zmq_getsockopt (s, ZMQ_SNDBUF, &sndbuflen, &sndbuflenlen);
    if (rc != 0) {
        printf ("error in zmq_getsockopt: %s\n", zmq_strerror (errno));
        return NULL;
    }
    printf ("SNDBUF=%d after\n", sndbuflen);

    rc = zmq_connect (s, connect_to);
    if (rc != 0) {
        printf ("error in zmq_connect: %s\n", zmq_strerror (errno));
        return NULL;
    }

    for (i = 0; i != message_count; i++) {
        //  Zero-copy: hand the same buffer to 0MQ with no free function,
        //  so ownership stays here.  The buffer is reused while earlier
        //  sends may still be in flight, which is acceptable only because
        //  this throughput benchmark ignores the payload contents.
        rc = zmq_msg_init_data (&msg, buf, message_size, NULL, NULL);
        if (rc != 0) {
            printf ("error in zmq_msg_init_data: %s\n", zmq_strerror (errno));
            return NULL;
        }
        rc = zmq_msg_send (&msg, s, 0);
        if (rc < 0) {
            printf ("error in zmq_msg_send: %s\n", zmq_strerror (errno));
            return NULL;
        }
        rc = zmq_msg_close (&msg);
        if (rc != 0) {
            printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno));
            exit (1);
        }
    }

    rc = zmq_close (s);
    if (rc != 0) {
        printf ("error in zmq_close: %s\n", zmq_strerror (errno));
        return NULL;
    }

    free (buf);
    return NULL;
}

int main (int argc, char *argv [])
{
    void *ctx;
    int rc;
    int i;
    void *p;

    if (argc != 6) {
        printf ("usage: remote_thr <connect-to> <message-size> "
            "<message-count> <zmq-threads> <workers>\n");
        return 1;
    }
    connect_to = argv [1];
    message_size = atoi (argv [2]);
    message_count = atoi (argv [3]);
    threads = atoi (argv [4]);
    workers = atoi (argv [5]);

    ctx = zmq_init (threads);
    if (!ctx) {
        printf ("error in zmq_init: %s\n", zmq_strerror (errno));
        return -1;
    }
    printf ("Threads: %d, workers: %d\n",
        zmq_ctx_get (ctx, ZMQ_IO_THREADS), workers);

    pthread_t worker [128];
    for (i = 0; i < workers; i++) {
        pthread_create (&worker [i], NULL, worker_routine, ctx);
        printf ("Worker %d spawned\n", i);
    }
    sleep (1);
    for (i = 0; i < workers; i++) {
        pthread_join (worker [i], &p);
        printf ("Worker %d joined\n", i);
    }

    rc = zmq_term (ctx);
    if (rc != 0) {
        printf ("error in zmq_term: %s\n", zmq_strerror (errno));
        return -1;
    }
    return 0;
}
/*
    Copyright (c) 2007-2012 iMatix Corporation
    Copyright (c) 2009-2011 250bpm s.r.o.
    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "zmq.h"
#include "zmq_utils.h"
#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <limits.h>
#include <sys/time.h>

typedef struct US_TIMER US_TIMER;
struct US_TIMER {
    struct timeval time_was;
    struct timeval time_now;
};

/*  Records the current timer state.  */
void tm_init (US_TIMER *t)
{
    if (gettimeofday (&t->time_now, NULL) < 0)
        perror ("tm_init");
    t->time_was = t->time_now;
}

/*  Returns the seconds elapsed since the last init/read of the timer,
    with microsecond precision.  */
float tm_secs (US_TIMER *t)
{
    float seconds;

    if (gettimeofday (&t->time_now, NULL) < 0)
        perror ("tm_secs");
    seconds = (float) (t->time_now.tv_sec - t->time_was.tv_sec)
        + (float) (t->time_now.tv_usec - t->time_was.tv_usec) / 1000000.0;
    t->time_was = t->time_now;
    return seconds;
}

//  Free function for zmq_msg_init_data; unused on the receive side.
void my_free (void *data, void *hint)
{
    // free (data);
}

int main (int argc, char *argv [])
{
    US_TIMER timer;
    const char *bind_to;
    int message_count;
    size_t message_size;
    int threads;
    void *ctx;
    void *s;
    int rc;
    int i;
    void *buf = NULL;

    if (argc != 5) {
        printf ("usage: local_thr <bind-to> <message-size> "
            "<message-count> <threads>\n");
        return 1;
    }
    bind_to = argv [1];
    message_size = atoi (argv [2]);
    message_count = atoi (argv [3]);
    threads = atoi (argv [4]);

    if (!(buf = malloc (message_size))) {
        perror ("malloc");
        return -1;
    }

    ctx = zmq_init (threads);
    if (!ctx) {
        printf ("error in zmq_init: %s\n", zmq_strerror (errno));
        return -1;
    }

    s = zmq_socket (ctx, ZMQ_PULL);
    if (!s) {
        printf ("error in zmq_socket: %s\n", zmq_strerror (errno));
        return -1;
    }

    //  Add your socket options here.
    //  For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM.

    //  Shape the kernel receive buffer to 32 MB.
    int rcvbuflen;
    size_t rcvbuflenlen = sizeof rcvbuflen;
    rc = zmq_getsockopt (s, ZMQ_RCVBUF, &rcvbuflen, &rcvbuflenlen);
    if (rc != 0) {
        printf ("error in zmq_getsockopt: %s\n", zmq_strerror (errno));
        return -1;
    }
    printf ("RCVBUF=%d before\n", rcvbuflen);

    rcvbuflen = 1024 * 1024 * 32;
    rc = zmq_setsockopt (s, ZMQ_RCVBUF, &rcvbuflen, rcvbuflenlen);
    if (rc != 0) {
        printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno));
        return -1;
    }
    rc = zmq_getsockopt (s, ZMQ_RCVBUF, &rcvbuflen, &rcvbuflenlen);
    if (rc != 0) {
        printf ("error in zmq_getsockopt: %s\n", zmq_strerror (errno));
        return -1;
    }
    printf ("RCVBUF=%d after\n", rcvbuflen);

    printf ("Threads: %d\n", zmq_ctx_get (ctx, ZMQ_IO_THREADS));

    rc = zmq_bind (s, bind_to);
    if (rc != 0) {
        printf ("error in zmq_bind: %s\n", zmq_strerror (errno));
        return -1;
    }

    zmq_msg_t msg;
    rc = zmq_msg_init (&msg);
    if (rc != 0) {
        printf ("error in zmq_msg_init: %s\n", zmq_strerror (errno));
        return -1;
    }

    tm_init (&timer);
    for (i = 0; i != message_count; i++) {
        rc = zmq_msg_recv (&msg, s, 0);
        if (rc < 0) {
            printf ("error in zmq_msg_recv: %s\n", zmq_strerror (errno));
            return -1;
        }
        if (zmq_msg_size (&msg) != message_size) {
            printf ("message of incorrect size (%d) received in loop %d\n",
                (int) zmq_msg_size (&msg), i);
            return -1;
        }
    }
    float secs = tm_secs (&timer);
    float total = (float) message_count * (float) message_size / 1000000000.0;
    printf ("message size: %d Bytes, time: %f secs\n",
        (int) message_size, secs);
    printf ("Throughput: %.1f GB @ %.1f GB/s\n", total, total / secs);

    rc = zmq_msg_close (&msg);
    if (rc != 0) {
        printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno));
        return -1;
    }
    rc = zmq_close (s);
    if (rc != 0) {
        printf ("error in zmq_close: %s\n", zmq_strerror (errno));
        return -1;
    }
    rc = zmq_term (ctx);
    if (rc != 0) {
        printf ("error in zmq_term: %s\n", zmq_strerror (errno));
        return -1;
    }
    free (buf);
    return 0;
}
_______________________________________________
zeromq-dev mailing list
zeromq-dev@lists.zeromq.org
http://lists.zeromq.org/mailman/listinfo/zeromq-dev