OK, so I went back, fixed a couple of issues and reattached the two
modified test programs. I added RCV/SND buffer sizing, and the sender now uses
zmq_msg_init_data (zero-copy) for better performance. I'm getting about
2.5GB/s on average at best, which is a lot better than with the stock
remote_thr/local_thr but still about 25% less than what I'm expecting
(at least 3.4GB/s).

When I start 4 simultaneous processes (not threads) for each of the client
and the server, using separate ports, the total does add up to ~3.3GB/s as it
should. The trouble is that for this to work I need to bind 4 ports, whereas
the whole point of using accept is, traditionally, to have multiple
connections on the same port.

Is there a way to achieve the desired throughput via 0MQ without using
separate ports for each socket? I think using multiple connections (via
separate threads) on the same ZMQ socket should naturally do it but
according to the results it doesn't happen.




On Mon, Jan 7, 2013 at 7:16 PM, A. Mark <gougol...@gmail.com> wrote:

> Hello,
>
> I'm very interested in porting my current transfer engine to 0MQ. The
> current engine is written in pure BSD sockets and has certain limitations
> that would be easily overcome by 0MQ's intelligent and versatile design.
> However, my main concern is performance on very long messages in excess of
> 1MB. The current backbone MT design is the following:
>
>
>   control node (client) <---> server A --+-- worker node 1 <---> worker node 1 --+-- server B
>                                          |                                       |
>                                          +-- worker node 2 <---> worker node 2 --+
>                                          |                                       |
>                                          +-- worker node N <---> worker node N --+
>
> So the control client controls whatever task needs to be performed by
> submitting requests to a server; the actual work is done by the worker
> nodes, each in a separate thread on the server. The worker nodes are
> synchronized across the two servers, but they work independently since they
> are working on the same task. Each worker node has its own FD but connects
> to the same TCP address and port. The main task of each node is to perform
> some transformation on a large data buffer from a buffer pool and then push
> the finished result to the other server. My current benchmarks give me
> 3.5GBytes/s using TCP over the loopback interface when simply pushing the
> buffers without doing any work.
>
> I ran the 0MQ benchmarks local_thr and remote_thr, and the performance is
> only 1.5GB/s at best with large buffers (messages), and lower with small
> ones. I'm also concerned by the published results of the 10GE test. My
> current engine can sustain a steady 1.1GBytes/s with large buffers over
> 10GE.
>
> I've also tried a modified version of the two benchmarks to try to emulate
> the above situation, but the performance is about the same. The modified MT
> code is attached.
>
> Is there something else I need to do to get the best performance out of
> 0MQ using MT for this work flow engine?
>
>
/*
    Copyright (c) 2009-2011 250bpm s.r.o.
    Copyright (c) 2007-2009 iMatix Corporation
    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "zmq.h"
#include "zmq_utils.h"
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <pthread.h>

/*  Benchmark parameters.  main() fills them in from the command line before
    any worker thread is created; the workers only read them.  */
const char *connect_to = NULL;   /* endpoint every worker connects to */
int message_size = 0;            /* bytes per message */
int message_count = 0;           /* messages pushed by each worker */
int workers = 1;                 /* number of sender threads */
int threads = 1;                 /* 0MQ I/O threads passed to zmq_init() */

/*  Deallocation callback with the zmq_free_fn signature.  It deliberately
    releases nothing; both arguments are ignored.  */
void my_free (void *data, void *hint)
{
    (void) data;
    (void) hint;
}

/*  Sender thread body.
 *
 *  Creates its own ZMQ_PUSH socket on the shared context `ctx`, enlarges the
 *  kernel send buffer, connects to the global `connect_to` endpoint and pushes
 *  `message_count` messages of `message_size` bytes each (both globals).
 *
 *  Every message wraps the SAME heap buffer via zmq_msg_init_data() with no
 *  deallocation function, so nothing is copied and 0MQ never frees it.
 *  Messages handed to zmq_msg_send() may still be read by the context's I/O
 *  threads after zmq_close() returns: with the default ZMQ_LINGER of -1 they
 *  are kept until transmitted, and zmq_term() waits for them.  Freeing the
 *  buffer right after zmq_close() would therefore be a use-after-free.
 *
 *  Returns: the buffer, whose ownership passes to the joiner.  The joiner
 *  must free() it only after zmq_term() has returned.  NULL is returned if
 *  the buffer could not be allocated or the socket could not be created; in
 *  that case nothing is left to free.
 *
 *  The socket is closed on every path, including errors, because an unclosed
 *  socket makes zmq_term() block forever.
 */
static void *worker_routine (void *ctx)
{
    void *buf;
    void *s;
    zmq_msg_t msg;
    int sndbuflen;
    size_t sndbuflenlen = sizeof sndbuflen;
    int rc;
    int i;

    buf = malloc (message_size);
    if (!buf) {
        perror ("malloc");
        return NULL;
    }

    s = zmq_socket (ctx, ZMQ_PUSH);
    if (!s) {
        printf ("error in zmq_socket: %s\n", zmq_strerror (errno));
        free (buf);     /*  no message can reference it yet  */
        return NULL;
    }

    /*  Enlarge the kernel send buffer (SO_SNDBUF) to help with large messages.  */
    rc = zmq_getsockopt (s, ZMQ_SNDBUF, &sndbuflen, &sndbuflenlen);
    if (rc != 0) {
        printf ("error in zmq_getsockopt: %s\n", zmq_strerror (errno));
        goto out;
    }
    printf ("SNDBUF=%d before\n", sndbuflen);

    sndbuflen = 1024 * 1024 * 32;
    rc = zmq_setsockopt (s, ZMQ_SNDBUF, &sndbuflen, sizeof sndbuflen);
    if (rc != 0) {
        printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno));
        goto out;
    }

    sndbuflenlen = sizeof sndbuflen;
    rc = zmq_getsockopt (s, ZMQ_SNDBUF, &sndbuflen, &sndbuflenlen);
    if (rc != 0) {
        printf ("error in zmq_getsockopt: %s\n", zmq_strerror (errno));
        goto out;
    }
    printf ("SNDBUF=%d after\n", sndbuflen);

    rc = zmq_connect (s, connect_to);
    if (rc != 0) {
        printf ("error in zmq_connect: %s\n", zmq_strerror (errno));
        goto out;
    }

    for (i = 0; i < message_count; i++) {
        /*  Zero-copy: the message only references `buf`; NULL ffn means
            0MQ will not free it.  */
        rc = zmq_msg_init_data (&msg, buf, message_size, NULL, NULL);
        if (rc != 0) {
            printf ("error in zmq_msg_init_data: %s\n", zmq_strerror (errno));
            goto out;
        }

        rc = zmq_msg_send (&msg, s, 0);
        if (rc < 0) {
            printf ("error in zmq_send: %s\n", zmq_strerror (errno));
            zmq_msg_close (&msg);   /*  not sent: still ours to release  */
            goto out;
        }

        rc = zmq_msg_close (&msg);
        if (rc != 0) {
            printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno));
            goto out;
        }
    }

out:
    rc = zmq_close (s);
    if (rc != 0)
        printf ("error in zmq_close: %s\n", zmq_strerror (errno));

    /*  Do NOT free here: queued messages may still reference the buffer.  */
    return buf;
}

int main (int argc, char *argv [])
{
    void *ctx;
    int rc;
    int i;
	void *p;

    if (argc != 6) {
        printf ("usage: remote_thr <connect-to> <message-size> <zmq-threads> <workers>"
            "<message-count>\n");
        return 1;
    }
    connect_to = argv [1];
    message_size = atoi (argv [2]);
    message_count = atoi (argv [3]);
    threads = atoi (argv [4]);
    workers = atoi (argv [5]);

    ctx = zmq_init( threads);
    if (!ctx) {
        printf ("error in zmq_init: %s\n", zmq_strerror (errno));
        return -1;
    }

	printf("Threads: %d, workers %d\n", zmq_ctx_get( ctx, ZMQ_IO_THREADS), workers);
	pthread_t worker[128];

    for (i = 0; i < workers; i++) {
        pthread_create (&worker[i], NULL, worker_routine, ctx);
		printf("Worker %d spawned\n", i);
    }
    sleep(1);
    for (i = 0; i < workers; i++) {
        pthread_join( worker[i], &p);
		printf("Worker %d joined\n", i);
    }

    rc = zmq_term (ctx);
    if (rc != 0) {
        printf ("error in zmq_term: %s\n", zmq_strerror (errno));
        return -1;
    }

    return 0;
}
/*
    Copyright (c) 2007-2012 iMatix Corporation
    Copyright (c) 2009-2011 250bpm s.r.o.
    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "zmq.h"
#include "zmq_utils.h"
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <limits.h>
#include <sys/time.h>

typedef struct US_TIMER US_TIMER;

/*  Wall-clock stopwatch with microsecond resolution, based on gettimeofday().  */
struct US_TIMER{

    struct timeval  time_was;   /* start of the current interval */
    struct timeval  time_now;   /* latest reading taken */
};

/*  Starts (or restarts) the stopwatch: takes a reading and makes it the start
    of the current interval.  If gettimeofday() fails, the error is reported
    with perror() and the reading is not reliable.
*/
void tm_init( US_TIMER *t){

    if( gettimeofday( &t->time_now, NULL) < 0){ perror( "tm_init()");}

    t->time_was = t->time_now;
}

/*  Returns the seconds elapsed (microsecond resolution) since the last call
    to tm_init() OR tm_secs().  It then restarts the interval at the current
    reading, so consecutive calls measure consecutive laps.
*/
float tm_secs( US_TIMER *t){

    double seconds;

    if( gettimeofday( &t->time_now, NULL) < 0){ perror( "tm_secs()");}

    /*  Subtract the integer fields first and only then convert, so the
        magnitude of the absolute epoch time never costs precision.  */
    seconds = (double)( t->time_now.tv_sec - t->time_was.tv_sec)
            + (double)( t->time_now.tv_usec - t->time_was.tv_usec) / 1000000.0;

    t->time_was = t->time_now;

    return( (float) seconds);
}

/*  Deallocation callback with the zmq_free_fn signature.  It intentionally
    does nothing; both arguments are ignored.  */
void my_free (void *data, void *hint)
{
    (void) hint;
    (void) data;
}

/*  Receiver side of the throughput test.
 *
 *  usage: local_thr <bind-to> <message-size> <message-count> <threads>
 *
 *  Binds a ZMQ_PULL socket to <bind-to> and enlarges its kernel receive
 *  buffer.  It then receives <message-count> messages that must each be
 *  exactly <message-size> bytes, and prints the elapsed time and throughput.
 *  When the sender runs several workers, <message-count> here must be the
 *  TOTAL across all of them (workers * per-worker count).  Otherwise the
 *  receive loop either blocks forever or finishes early.
 *
 *  The clock starts only once the first message has arrived.  Connection
 *  setup and the time spent waiting for the sender to be launched are
 *  therefore excluded, and the rate is computed over the remaining
 *  message_count - 1 messages.
 */
int main (int argc, char *argv [])
{
    US_TIMER timer;
    const char *bind_to;
    int message_count;
    int size_arg;
    size_t message_size;
    size_t received_size;
    int threads;
    void *ctx;
    void *s;
    int rc;
    int i;
    int rcvbuflen;
    size_t rcvbuflenlen = sizeof rcvbuflen;
    zmq_msg_t msg;
    float secs;
    double total;

    if (argc != 5) {
        printf ("usage: local_thr <bind-to> <message-size> <message-count> <threads>\n");
        return 1;
    }

    bind_to = argv [1];
    size_arg = atoi (argv [2]);
    message_count = atoi (argv [3]);
    threads = atoi (argv [4]);

    /*  A negative size would wrap to a huge size_t and a non-positive count
        would make the receive loop misbehave.  */
    if (size_arg <= 0 || message_count <= 0 || threads < 0) {
        printf ("invalid arguments: message-size and message-count must be "
            "positive and threads non-negative\n");
        return 1;
    }
    message_size = (size_t) size_arg;

    ctx = zmq_init (threads);
    if (!ctx) {
        printf ("error in zmq_init: %s\n", zmq_strerror (errno));
        return -1;
    }

    s = zmq_socket (ctx, ZMQ_PULL);
    if (!s) {
        printf ("error in zmq_socket: %s\n", zmq_strerror (errno));
        return -1;
    }

    /*  Enlarge the kernel receive buffer (SO_RCVBUF) to help with large messages.  */
    rc = zmq_getsockopt (s, ZMQ_RCVBUF, &rcvbuflen, &rcvbuflenlen);
    if (rc != 0) {
        printf ("error in zmq_getsockopt: %s\n", zmq_strerror (errno));
        return -1;
    }
    printf ("RCVBUF=%d before\n", rcvbuflen);

    rcvbuflen = 1024 * 1024 * 32;
    rc = zmq_setsockopt (s, ZMQ_RCVBUF, &rcvbuflen, sizeof rcvbuflen);
    if (rc != 0) {
        printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno));
        return -1;
    }

    rcvbuflenlen = sizeof rcvbuflen;
    rc = zmq_getsockopt (s, ZMQ_RCVBUF, &rcvbuflen, &rcvbuflenlen);
    if (rc != 0) {
        printf ("error in zmq_getsockopt: %s\n", zmq_strerror (errno));
        return -1;
    }
    printf ("RCVBUF=%d after\n", rcvbuflen);

    printf ("Threads: %d\n", zmq_ctx_get (ctx, ZMQ_IO_THREADS));

    rc = zmq_bind (s, bind_to);
    if (rc != 0) {
        printf ("error in zmq_bind: %s\n", zmq_strerror (errno));
        return -1;
    }

    rc = zmq_msg_init (&msg);
    if (rc != 0) {
        printf ("error in zmq_msg_init: %s\n", zmq_strerror (errno));
        return -1;
    }

    tm_init (&timer);
    for (i = 0; i < message_count; i++) {
        rc = zmq_msg_recv (&msg, s, 0);
        if (rc < 0) {
            printf ("error in zmq_recv: %s\n", zmq_strerror (errno));
            return -1;
        }
        received_size = zmq_msg_size (&msg);
        if (received_size != message_size) {
            printf ("message of incorrect size (%zu) received in loop %d\n",
                received_size, i);
            return -1;
        }
        /*  Restart the clock once the first message is in, so the wait for
            the sender to connect is not counted as transfer time.  */
        if (i == 0)
            tm_init (&timer);
    }

    secs = tm_secs (&timer);
    total = (double) (message_count - 1) * (double) message_size / 1000000000.0;

    printf ("message size: %d Bytes, time: %f secs\n", (int) message_size, secs);
    if (secs > 0.0f)
        printf ("Through %.1f GB @ %.1f GB/s\n", total, total / secs);
    else
        printf ("Through %.1f GB (interval too short to compute a rate)\n", total);

    rc = zmq_msg_close (&msg);
    if (rc != 0) {
        printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno));
        return -1;
    }

    rc = zmq_close (s);
    if (rc != 0) {
        printf ("error in zmq_close: %s\n", zmq_strerror (errno));
        return -1;
    }

    rc = zmq_term (ctx);
    if (rc != 0) {
        printf ("error in zmq_term: %s\n", zmq_strerror (errno));
        return -1;
    }

    return 0;
}
_______________________________________________
zeromq-dev mailing list
zeromq-dev@lists.zeromq.org
http://lists.zeromq.org/mailman/listinfo/zeromq-dev

Reply via email to