Hello,

I'm very interested in porting my current transfer engine to 0MQ. The
current engine is written in pure BSD sockets and has certain limitations
that would be easily overcome by 0MQ's intelligent and versatile design.
However my main concern is performance on very long messages in excess of
1MB. The current backbone MT design is the following:


control node (client ) <---> server A--- worker node 1 <---> worker node 1
------ server B

|
|
                                       |------------ worker node 2 <--->
worker node 2 -----------|

|                                                                          |
                                       --------------worker node N <--->
worker node N ----------

So the control client controls whatever task needs to be performed by
submitting requests to a server, the actual work is done by the worker
nodes in each separate thread on the server. The worker nodes are
synchronized across the two servers but they work independently since they
are working on the same task. Each worker node has its own FD but connects
to the same TCP address and port. The main task of each node is to perform
some transformation on some large data buffer from a buffer pool then push
the finished result to the other server. My current benchmarks give me
3.5GBytes/s using TCP over the local loop when simply pushing the buffers
without doing any work.

I ran the 0MQ benchmarks local_thr and remote_thr, and the performance is
only 1.5GB/s at best, with large buffers (messages), and lower with small
ones. I'm also concerned looking at the benchmarks for the 10GE test. My
current engine can perform at a steady 1.1GBytes/s with large buffers over
10GE.

I've also tried a modified version of the two benchmarks to try to emulate
the above situation, but the performance is about the same. The modified MT
code is attached.

Is there something else I need to do to get the best performance out of 0MQ
using MT for this work flow engine?
/*
    Copyright (c) 2007-2012 iMatix Corporation
    Copyright (c) 2009-2011 250bpm s.r.o.
    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "../include/zmq.h"
#include "../include/zmq_utils.h"
#include <stdio.h>
#include <stdlib.h>

//  Parses a complete base-10 integer from text.
//  Returns 0 and stores the value in *out when the whole string is a number
//  that is at least min and fits in an int; returns -1 otherwise.
static int local_thr_parse_int (const char *text, int min, int *out)
{
    char *end;
    long value;

    errno = 0;
    value = strtol (text, &end, 10);
    if (end == text || *end != '\0' || errno == ERANGE)
        return -1;
    if (value < min || value != (long) (int) value)
        return -1;

    *out = (int) value;
    return 0;
}

//  Receiving side of the throughput benchmark.
//
//  Usage: local_thr <bind-to> <message-size> <message-count> <threads>
//
//  Binds a PULL socket to <bind-to>, receives <message-count> messages of
//  <message-size> bytes into one reusable buffer and prints the mean
//  throughput. Returns 0 on success, 1 on bad arguments and -1 when
//  an allocation or a 0MQ call fails.
int main (int argc, char *argv [])
{
    const char *bind_to;
    int message_count;
    int size_arg;
    size_t message_size;
    int threads;
    void *ctx = NULL;
    void *s = NULL;
    void *buf = NULL;
    void *watch = NULL;
    unsigned long elapsed;
    unsigned long throughput;
    double megabytes;
    int status = -1;
    int rc;
    int i;

    if (argc != 5) {
        printf ("usage: local_thr <bind-to> <message-size> <message-count> <threads>\n");
        return 1;
    }

    bind_to = argv [1];
    if (local_thr_parse_int (argv [2], 0, &size_arg) != 0
    ||  local_thr_parse_int (argv [3], 1, &message_count) != 0
    ||  local_thr_parse_int (argv [4], 0, &threads) != 0) {
        printf ("error: <message-size> and <threads> must be non-negative "
            "integers and <message-count> a positive integer\n");
        return 1;
    }
    message_size = (size_t) size_arg;

    //  malloc (0) is allowed to return NULL, so always ask for at least
    //  one byte to keep a NULL result meaning "out of memory".
    buf = malloc (message_size ? message_size : 1);
    if (!buf) {
        perror ("malloc");
        return -1;
    }

    ctx = zmq_init (threads);
    if (!ctx) {
        printf ("error in zmq_init: %s\n", zmq_strerror (errno));
        goto cleanup;
    }

    s = zmq_socket (ctx, ZMQ_PULL);
    if (!s) {
        printf ("error in zmq_socket: %s\n", zmq_strerror (errno));
        goto cleanup;
    }

    //  Add your socket options here.
    //  For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM.
    printf ("Threads: %d\n", zmq_ctx_get (ctx, ZMQ_IO_THREADS));

    rc = zmq_bind (s, bind_to);
    if (rc != 0) {
        printf ("error in zmq_bind: %s\n", zmq_strerror (errno));
        goto cleanup;
    }

    for (i = 0; i != message_count; i++) {
        rc = zmq_recv (s, buf, message_size, 0);
        if (rc < 0) {
            printf ("error in zmq_recv: %s\n", zmq_strerror (errno));
            goto cleanup;
        }
        //  zmq_recv reports the size of the message, which differs from
        //  message_size when the sender was started with another size.
        if ((size_t) rc != message_size) {
            printf ("message of incorrect size received\n");
            goto cleanup;
        }
        //  Start timing only once the first message has arrived, so that
        //  waiting for the sender to start and connect is not measured.
        if (i == 0)
            watch = zmq_stopwatch_start ();
    }

    elapsed = zmq_stopwatch_stop (watch);
    watch = NULL;
    if (elapsed == 0)
        elapsed = 1;

    //  As in the stock benchmark, the rate is computed over message_count;
    //  the untimed first message is negligible for realistic counts.
    throughput = (unsigned long)
        ((double) message_count / (double) elapsed * 1000000);
    //  Multiply in floating point so the product cannot wrap around.
    megabytes = (double) throughput * (double) message_size / 1000000;

    printf ("message size: %zu [B]\n", message_size);
    printf ("message count: %d\n", message_count);
    printf ("mean throughput: %lu [msg/s]\n", throughput);
    printf ("mean throughput: %.3f [MB/s]\n", megabytes);

    status = 0;

cleanup:
    //  Shared exit path: release whatever was acquired, in reverse order.
    if (watch)
        zmq_stopwatch_stop (watch);

    if (s && zmq_close (s) != 0) {
        printf ("error in zmq_close: %s\n", zmq_strerror (errno));
        status = -1;
    }

    if (ctx && zmq_term (ctx) != 0) {
        printf ("error in zmq_term: %s\n", zmq_strerror (errno));
        status = -1;
    }

    free (buf);

    return status;
}
/*
    Copyright (c) 2009-2011 250bpm s.r.o.
    Copyright (c) 2007-2009 iMatix Corporation
    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "../include/zmq.h"
#include "../include/zmq_utils.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>

//  Benchmark parameters shared between main () and the worker threads.
//  main () fills them in from the command line before any worker is
//  spawned; worker_routine () only reads them.
const char *connect_to;     //  Endpoint each worker passes to zmq_connect ()
int message_count;          //  Number of messages sent by each worker
int message_size;           //  Size in bytes of every message sent
int threads = 1;            //  Value handed to zmq_init () in main ()
int workers = 1;            //  Number of sender threads spawned by main ()
void *buf = NULL;           //  Payload buffer, allocated once in main ()

//  Thread entry point for one sender.
//
//  ctx is the 0MQ context created by main (). The worker opens its own PUSH
//  socket on it, connects to connect_to and sends message_count messages of
//  message_size bytes taken from the shared buffer buf. Errors are printed
//  and end the worker early. Always returns NULL.
//
//  NOTE(review): every worker sends message_count messages, so the receiving
//  side presumably has to expect workers * message_count in total - confirm.
static void *worker_routine (void *ctx)
{
    void *s;
    int rc;
    int i;

    s = zmq_socket (ctx, ZMQ_PUSH);
    if (!s) {
        printf ("error in zmq_socket: %s\n", zmq_strerror (errno));
        return NULL;
    }

    //  Add your socket options here.
    //  For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM.

    rc = zmq_connect (s, connect_to);
    if (rc != 0) {
        printf ("error in zmq_connect: %s\n", zmq_strerror (errno));
        goto done;
    }

    for (i = 0; i != message_count; i++) {
        rc = zmq_send (s, buf, (size_t) message_size, 0);
        if (rc < 0) {
            printf ("error in zmq_send: %s\n", zmq_strerror (errno));
            break;
        }
    }

done:
    //  Close the socket on every path, including after a failure:
    //  zmq_term () in main () waits until all sockets opened on the
    //  context are closed, so a leaked socket would hang the process.
    rc = zmq_close (s);
    if (rc != 0)
        printf ("error in zmq_close: %s\n", zmq_strerror (errno));

    return NULL;
}

int main (int argc, char *argv [])
{
    void *ctx;
    int rc;
    int i;
	void *p;

    if (argc != 6) {
        printf ("usage: remote_thr <connect-to> <message-size> <zmq-threads> <workers>"
            "<message-count>\n");
        return 1;
    }
    connect_to = argv [1];
    message_size = atoi (argv [2]);
    message_count = atoi (argv [3]);
    threads = atoi (argv [4]);
    workers = atoi (argv [5]);

    ctx = zmq_init( threads);
    if (!ctx) {
        printf ("error in zmq_init: %s\n", zmq_strerror (errno));
        return -1;
    }
	if( !(buf = malloc( message_size))){ perror("malloc"); return -1;}

	printf("Threads: %d, workers %d\n", zmq_ctx_get( ctx, ZMQ_IO_THREADS), workers);
	pthread_t worker[128];        

    for (i = 0; i < workers; i++) {
        pthread_create (&worker[i], NULL, worker_routine, ctx);
		printf("Worker %d spawned\n", i);
    }

    for (i = 0; i < workers; i++) {
        pthread_join( worker[i], &p);
		printf("Worker %d joined\n", i);
    }

    rc = zmq_term (ctx);
    if (rc != 0) {
        printf ("error in zmq_term: %s\n", zmq_strerror (errno));
        return -1;
    }

    return 0;
}
_______________________________________________
zeromq-dev mailing list
zeromq-dev@lists.zeromq.org
http://lists.zeromq.org/mailman/listinfo/zeromq-dev

Reply via email to