Hi,

I've done some extensive benchmarks using local_thr and remote_thr over TCP in
the last couple of months and found some odd things. This line is slightly
wrong for the throughput calculation (apart from the overflow):

throughput = (unsigned long)((double) message_count / (double) elapsed *
1000000);

should be at least:

throughput = (unsigned long)((double) message_count / (double) elapsed *
double( 1024*1024));

There are a few more things to consider that will affect performance:
ZMQ_RCVBUF/ZMQ_SNDBUF/ZMQ_SNDHWM/ZMQ_RCVHWM.
Also, since ZMQ does its buffering in the background,
ZMQ_DELAY_ATTACH_ON_CONNECT can throw off your time measurement. With large
buffers and many messages the programs will actually crash, as ZMQ runs out
of memory queueing messages. I'm not sure if this is the intended behavior,
but setting the HWMs prevents that problem.

I had posted the corrections and improvements for the two programs earlier
to the mailing list. Here are the improved versions of local_thr and
remote_thr again, if you care to try some other options. There is also an mt
(multi-threaded) version of the server.

Mark


On Mon, Feb 25, 2013 at 4:32 AM, Erik Hugne <erik.hu...@ericsson.com> wrote:

> So, i now realize that the bw figures are actually in _megabit_..
> and shortly after sending, obviously i found the problem.
>
> As i posted on IRC some moments ago:
> [13:33] <haze_> it's an overflow on this line
> [13:33] <haze_> throughput = (unsigned long)((double) message_count /
> (double)
>  elapsed * 1000000);
> [13:33] <haze_> in local_thr
> [13:33] <haze_> more specifically, (double) elapsed*1000000
>
>
> //E
> _______________________________________________
> zeromq-dev mailing list
> zeromq-dev@lists.zeromq.org
> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
>
/*
    Copyright (c) 2007-2012 iMatix Corporation
    Copyright (c) 2009-2011 250bpm s.r.o.
    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "zmq.h"
#include "zmq_utils.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <limits.h>
#include <sys/time.h>

/*  Timer state: two gettimeofday() samples. time_was is the reference
    point and time_now the most recent reading.
*/
typedef struct US_TIMER {
    struct timeval  time_was;   /*  reference sample  */
    struct timeval  time_now;   /*  latest sample  */
} US_TIMER;
/*  Starts (or restarts) the timer.

    Samples the wall clock with gettimeofday() into time_now and copies the
    sample to time_was, so both fields hold the same instant afterwards.
    If gettimeofday() fails the error is reported with perror() and the
    sample is zeroed, so that no indeterminate value is copied.
*/
void tm_init( US_TIMER *t){

    if( gettimeofday( &t->time_now, NULL) < 0){
        perror( "tm_init()");
        t->time_now.tv_sec = 0;
        t->time_now.tv_usec = 0;
    }

    t->time_was = t->time_now;

}

/*  Returns the seconds elapsed, with microsecond resolution, since the
    timer's reference point (time_was).

    The call also moves the reference point to the new sample, so the next
    call measures from this one rather than from the original start.
    A gettimeofday() failure is reported with perror(); time_now is then
    used as gettimeofday() left it.
*/
float tm_secs( US_TIMER *t){

    double seconds;

    if( gettimeofday( &t->time_now, NULL) < 0){ perror( "tm_secs()");}

    /*  Whole seconds plus the (possibly negative) microsecond remainder.  */
    seconds = (double)( t->time_now.tv_sec - t->time_was.tv_sec) +
              (double)( t->time_now.tv_usec - t->time_was.tv_usec) / 1000000.0;

    t->time_was = t->time_now;

    return( (float) seconds);
}

/*  Benchmark parameters; main() overwrites most of them from argv.  */
const char *bind_to = NULL;         /*  endpoint to bind to  */
int message_count = 1000;           /*  number of messages to transfer  */
int message_size = 1024;            /*  size of one message in bytes  */
int threads = 1;                    /*  0MQ I/O threads  */
int workers = 1;                    /*  worker count  */
int sndbuflen = 128 * 256;          /*  requested ZMQ_SNDBUF value  */
int rcvbuflen = 128 * 256;          /*  requested ZMQ_RCVBUF value  */
int flow = ZMQ_PULL;                /*  socket type: ZMQ_PULL or ZMQ_PUSH  */

/*  Callback taking (data, hint) that intentionally releases nothing: the
    free() call is disabled, so both arguments are ignored.
*/
void my_free (void *data, void *hint)
{
    (void) data;
    (void) hint;
}

int main (int argc, char *argv [])
{
    US_TIMER timer;
    void *ctx;
    void *s;
    int rc;
    int i;
	void *buf = NULL;

    if (argc != 7) {
        printf ("usage: local_thr <bind-to> <message-size> <message-count> <SND/RCV buffer> <flow (PUSH/PULL)> <threads>\n");
        return 1;
    }

    bind_to = argv [1];
    message_size = atoi (argv [2]);
    message_count = atoi (argv [3]);
    if( !strcmp( argv [5], "PUSH")){
        flow = ZMQ_PUSH;
        rcvbuflen = sndbuflen = atoi (argv [4]);
    }
    if( !strcmp( argv [5], "PULL")){
        flow = ZMQ_PULL;
        sndbuflen = rcvbuflen = atoi (argv [4]);
    }
    threads = atoi (argv [6]);

	if( !(buf = malloc( message_size))){ perror("malloc"); return -1;}

    ctx = zmq_ctx_new ();
    if (!ctx) {
        printf ("error in zmq_ctx_new: %s\n", zmq_strerror (errno));
        return -1;
    }

    rc = zmq_ctx_set ( ctx, ZMQ_IO_THREADS, threads);
    if (rc) {
        printf ("error in zmq_ctx_set: %s\n", zmq_strerror (errno));
        return -1;
    }

    s = zmq_socket (ctx, flow);
    if (!s) {
        printf ("error in zmq_socket: %s\n", zmq_strerror (errno));
        return -1;
    }

    //  Add your socket options here.
    //  For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM.

	size_t rcvbuflenlen = (size_t)sizeof rcvbuflen;
	size_t sndbuflenlen = (size_t)sizeof sndbuflen;

	rc = zmq_setsockopt (s, ZMQ_RCVBUF, &rcvbuflen, rcvbuflenlen);
    if (rc != 0) {
        printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno));
        return -1;
    }
	rc = zmq_setsockopt (s, ZMQ_SNDBUF, &sndbuflen, sndbuflenlen);
    if (rc != 0) {
        printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno));
        return -1;
    }

	sndbuflen = 1;
	rc = zmq_setsockopt (s, ZMQ_DELAY_ATTACH_ON_CONNECT, &sndbuflen, sndbuflenlen);
    if (rc != 0) {
        printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno));
        return -1;
    }

	sndbuflen = 2;
	rc = zmq_setsockopt (s, ZMQ_SNDHWM, &sndbuflen, sndbuflenlen);
    if (rc != 0) {
        printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno));
        return -1;
    }

	sndbuflen = 2;
	rc = zmq_setsockopt (s, ZMQ_RCVHWM, &sndbuflen, sndbuflenlen);
    if (rc != 0) {
        printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno));
        return -1;
    }


	rc = zmq_getsockopt (s, ZMQ_RCVBUF, &rcvbuflen, &rcvbuflenlen);
    if (rc != 0) {
        printf ("error in zmq_getsockopt: %s\n", zmq_strerror (errno));
        return -1;
    }
	rc = zmq_getsockopt (s, ZMQ_SNDBUF, &sndbuflen, &sndbuflenlen);
    if (rc != 0) {
        printf ("error in zmq_getsockopt: %s\n", zmq_strerror (errno));
        return -1;
    }

	printf("RCVBUF=%d KB SNDBUF=%d KB adjusted\n", rcvbuflen/1024, sndbuflen/1024);

	printf("Threads: %d\n", zmq_ctx_get( ctx, ZMQ_IO_THREADS));

    rc = zmq_bind (s, bind_to);
    if (rc != 0) {
        printf ("error in zmq_bind: %s\n", zmq_strerror (errno));
        return -1;
    }

	if( !(buf = malloc( message_size))){ perror("malloc"); return -1;}

    tm_init( &timer);

    if( flow == ZMQ_PULL){

/*	DOIT USING ZMQ_MSG
        zmq_msg_t msg;

        rc = zmq_msg_init (&msg);
        if (rc != 0) {
            printf ("error in zmq_msg_init: %s\n", zmq_strerror (errno));
            return -1;
        }
        for (i = 0; i != message_count; i++) {
            rc = zmq_msg_recv (&msg, s, 0);
            if (rc < 0) {
                printf ("error in zmq_recv: %s\n", zmq_strerror (errno));
                return -1;
            }
            /*
            if ((rc = zmq_msg_size (&msg)) != message_size) {
                printf ("message of incorrect size (%d) received in loop %d\n", rc, i);
                return -1;
            }
        }
        rc = zmq_msg_close (&msg);
        if (rc != 0) {
            printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno));
            return -1;
        }
*/
        for (i = 0; i != message_count; i++) {
            rc = zmq_recv( s, buf, message_size, 0);
            if (rc < 0) {
                printf ("error in zmq_recv: %s\n", zmq_strerror (errno));
                return -1;
            }
        }
    }
    if( flow == ZMQ_PUSH){

        zmq_msg_t msg;

        for (i = 0; i != message_count; i++) {

            rc = zmq_msg_init_data (&msg, buf, message_size, NULL, NULL);
            if (rc != 0) {
                printf ("error in zmq_msg_init_data: %s\n", zmq_strerror (errno));
                return -1;
            }

            rc = zmq_msg_send( &msg, s, 0);
            if (rc < 0) {
                printf ("error in zmq_send: %s\n", zmq_strerror (errno));
                return -1;
            }
/*
            rc = zmq_msg_close (&msg);
            if (rc != 0) {
                printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno));
               return -1;
            }
            */
        }
    }

    float secs = tm_secs( &timer);
    float total = (((float) message_count) * ((float) message_size)) / (1024.0*1024.0*1024.0);

    printf ("Message size: %d KBytes, time: %f secs\n", (int) message_size/1024, secs);
    printf ("%sed %.3f GB @ %.3f GB/s\n", (flow == ZMQ_PULL) ? "Pull":"Push", total, total/secs);


    rc = zmq_close (s);
    if (rc != 0) {
        printf ("error in zmq_close: %s\n", zmq_strerror (errno));
        return -1;
    }

    rc = zmq_term (ctx);
    if (rc != 0) {
        printf ("error in zmq_term: %s\n", zmq_strerror (errno));
        return -1;
    }

    if( buf) free( buf);

    return 0;
}
/*
    Copyright (c) 2009-2011 250bpm s.r.o.
    Copyright (c) 2007-2009 iMatix Corporation
    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "zmq.h"
#include "zmq_utils.h"
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <time.h>
#include <limits.h>
#include <sys/time.h>
#include <pthread.h>

/*  Benchmark parameters; main() overwrites most of them from argv.  */
const char *connect_to = NULL;      /*  endpoint to connect to  */
int message_count = 1000;           /*  number of messages to transfer  */
int message_size = 1024;            /*  size of one message in bytes  */
int threads = 1;                    /*  0MQ I/O threads  */
int workers = 1;                    /*  worker thread count  */
int sndbuflen = 128 * 256;          /*  requested ZMQ_SNDBUF value  */
int rcvbuflen = 128 * 256;          /*  requested ZMQ_RCVBUF value  */
int flow = ZMQ_PUSH;                /*  socket type: ZMQ_PUSH or ZMQ_PULL  */

/*  Timer state: two gettimeofday() samples. time_was is the reference
    point and time_now the most recent reading.
*/
typedef struct US_TIMER {
    struct timeval  time_was;   /*  reference sample  */
    struct timeval  time_now;   /*  latest sample  */
} US_TIMER;
/*  Starts (or restarts) the timer.

    Samples the wall clock with gettimeofday() into time_now and copies the
    sample to time_was, so both fields hold the same instant afterwards.
    If gettimeofday() fails the error is reported with perror() and the
    sample is zeroed, so that no indeterminate value is copied.
*/
void tm_init( US_TIMER *t){

    if( gettimeofday( &t->time_now, NULL) < 0){
        perror( "tm_init()");
        t->time_now.tv_sec = 0;
        t->time_now.tv_usec = 0;
    }

    t->time_was = t->time_now;

}

/*  Returns the seconds elapsed, with microsecond resolution, since the
    timer's reference point (time_was).

    The call also moves the reference point to the new sample, so the next
    call measures from this one rather than from the original start.
    A gettimeofday() failure is reported with perror(); time_now is then
    used as gettimeofday() left it.
*/
float tm_secs( US_TIMER *t){

    double seconds;

    if( gettimeofday( &t->time_now, NULL) < 0){ perror( "tm_secs()");}

    /*  Whole seconds plus the (possibly negative) microsecond remainder.  */
    seconds = (double)( t->time_now.tv_sec - t->time_was.tv_sec) +
              (double)( t->time_now.tv_usec - t->time_was.tv_usec) / 1000000.0;

    t->time_was = t->time_now;

    return( (float) seconds);
}

/*  Callback taking (data, hint) that intentionally releases nothing: the
    free() call is disabled, so both arguments are ignored.
*/
void my_free (void *data, void *hint)
{
    (void) data;
    (void) hint;
}

static void *worker_routine (void *ctx) {

	int rc,i;
	void *buf = NULL;

	if( !(buf = malloc( message_size))){ perror("malloc"); return NULL;}

    void *s = zmq_socket (ctx, flow);

    if (!s) {
        printf ("error in zmq_socket: %s\n", zmq_strerror (errno));
        return NULL;
    }

    //  Add your socket options here.
    //  For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM.

	size_t rcvbuflenlen = (size_t)sizeof rcvbuflen;
	size_t sndbuflenlen = (size_t)sizeof sndbuflen;

	rc = zmq_setsockopt (s, ZMQ_RCVBUF, &rcvbuflen, rcvbuflenlen);
    if (rc != 0) {
        printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno));
        return NULL;
    }
	rc = zmq_setsockopt (s, ZMQ_SNDBUF, &sndbuflen, sndbuflenlen);
    if (rc != 0) {
        printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno));
        return NULL;
    }

	sndbuflen = 1;
	rc = zmq_setsockopt (s, ZMQ_DELAY_ATTACH_ON_CONNECT, &sndbuflen, sndbuflenlen);
    if (rc != 0) {
        printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno));
        return NULL;
    }

	sndbuflen = 2;
	rc = zmq_setsockopt (s, ZMQ_SNDHWM, &sndbuflen, sndbuflenlen);
    if (rc != 0) {
        printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno));
        return NULL;
    }

	sndbuflen = 2;
	rc = zmq_setsockopt (s, ZMQ_RCVHWM, &sndbuflen, sndbuflenlen);
    if (rc != 0) {
        printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno));
        return NULL;
    }

	rc = zmq_getsockopt (s, ZMQ_RCVBUF, &rcvbuflen, &rcvbuflenlen);
    if (rc != 0) {
        printf ("error in zmq_getsockopt: %s\n", zmq_strerror (errno));
        return NULL;
    }
	rc = zmq_getsockopt (s, ZMQ_SNDBUF, &sndbuflen, &sndbuflenlen);
    if (rc != 0) {
        printf ("error in zmq_getsockopt: %s\n", zmq_strerror (errno));
        return NULL;
    }

	printf("RCVBUF=%d KB SNDBUF=%d KB adjusted\n", rcvbuflen/1024, sndbuflen/1024);


    rc = zmq_connect (s, connect_to);
    if (rc != 0) {
        printf ("error in zmq_connect: %s\n", zmq_strerror (errno));
        return NULL;
    }

    if( flow == ZMQ_PUSH){

/*	DO IT USING ZMQ_MESSAGES
        zmq_msg_t msg;

        for (i = 0; i != message_count; i++) {

            rc = zmq_msg_init_data (&msg, buf, message_size, NULL, NULL);
            if (rc != 0) {
                printf ("error in zmq_msg_init_data: %s\n", zmq_strerror (errno));
                return NULL;
            }

            rc = zmq_msg_send( &msg, s, 0);
            if (rc < 0) {
                printf ("error in zmq_send: %s\n", zmq_strerror (errno));
            return NULL;
            }

            rc = zmq_msg_close (&msg);
            if (rc != 0) {
                printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno));
                exit (1);
            }
        }
*/
        for (i = 0; i != message_count; i++) {
            rc = zmq_send( s, buf, message_size, 0);
            if (rc < 0) {
                printf ("error in zmq_send: %s\n", zmq_strerror (errno));
            	return NULL;
            }
        }

    }
    if( flow == ZMQ_PULL){

        zmq_msg_t msg;

        rc = zmq_msg_init (&msg);
        if (rc != 0) {
            printf ("error in zmq_msg_init: %s\n", zmq_strerror (errno));
            return NULL;
        }
        for (i = 0; i != message_count; i++) {
            rc = zmq_msg_recv (&msg, s, 0);
            if (rc < 0) {
                printf ("error in zmq_recv: %s\n", zmq_strerror (errno));
                return NULL;
            }
        }
    }

    rc = zmq_close (s);
    if (rc != 0) {
        printf ("error in zmq_close: %s\n", zmq_strerror (errno));
		return NULL;
    }

	free( buf);

    return NULL;
}

int main (int argc, char *argv [])
{
    void *ctx;
    int rc;
    int i;
	void *p;

    if (argc != 8) {
        printf ("usage: remote_thr <connect-to> <message-size> <message-count> <SND/RCV buffer> <flow (PUSH/PULL)> <zmq-threads> <workers>\n");
        return 1;
    }

    connect_to = argv [1];
    message_size = atoi (argv [2]);
    message_count = atoi (argv [3]);
    if( !strcmp( argv [5], "PUSH")){
        flow = ZMQ_PUSH;
        rcvbuflen = sndbuflen = atoi (argv [4]);
    }
    if( !strcmp( argv [5], "PULL")){
        flow = ZMQ_PULL;
        sndbuflen = rcvbuflen = atoi (argv [4]);
    }
    threads = atoi (argv [6]);
    workers = atoi (argv [7]);


    ctx = zmq_ctx_new ();
    if (!ctx) {
        printf ("error in zmq_ctx_new: %s\n", zmq_strerror (errno));
        return -1;
    }

    rc = zmq_ctx_set ( ctx, ZMQ_IO_THREADS, threads);
    if (rc) {
        printf ("error in zmq_ctx_set: %s\n", zmq_strerror (errno));
        return -1;
    }

	printf("Threads: %d, workers %d\n", zmq_ctx_get( ctx, ZMQ_IO_THREADS), workers);
	pthread_t worker[128];

    US_TIMER timer;

    tm_init( &timer);

    for (i = 0; i < workers; i++) {
        pthread_create (&worker[i], NULL, worker_routine, ctx);
		printf("Worker %d spawned\n", i);
    }

    for (i = 0; i < workers; i++) {
        pthread_join( worker[i], &p);
		printf("Worker %d joined\n", i);
    }

    float secs = tm_secs( &timer);
    float total = ( (float)threads)*(((float) message_count) * ((float) message_size)) / (1024.0*1024.0*1024.0);

    printf ("Message size: %d KBytes, time: %f secs\n", (int) message_size/1024, secs);
    printf ("%sed %.3f GB @ %.3f GB/s\n", (flow == ZMQ_PULL) ? "Pull":"Push", total, total/secs);

    rc = zmq_term (ctx);
    if (rc != 0) {
        printf ("error in zmq_term: %s\n", zmq_strerror (errno));
        return -1;
    }

    return 0;
}

/*
    Copyright (c) 2007-2012 iMatix Corporation
    Copyright (c) 2009-2011 250bpm s.r.o.
    Copyright (c) 2007-2011 Other contributors as noted in the AUTHORS file

    This file is part of 0MQ.

    0MQ is free software; you can redistribute it and/or modify it under
    the terms of the GNU Lesser General Public License as published by
    the Free Software Foundation; either version 3 of the License, or
    (at your option) any later version.

    0MQ is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
    GNU Lesser General Public License for more details.

    You should have received a copy of the GNU Lesser General Public License
    along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/

#include "zmq.h"
#include "zmq_utils.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include <limits.h>
#include <sys/time.h>
#include <pthread.h>

/*  Timer state: two gettimeofday() samples. time_was is the reference
    point and time_now the most recent reading.
*/
typedef struct US_TIMER {
    struct timeval  time_was;   /*  reference sample  */
    struct timeval  time_now;   /*  latest sample  */
} US_TIMER;
/*  Starts (or restarts) the timer.

    Samples the wall clock with gettimeofday() into time_now and copies the
    sample to time_was, so both fields hold the same instant afterwards.
    If gettimeofday() fails the error is reported with perror() and the
    sample is zeroed, so that no indeterminate value is copied.
*/
void tm_init( US_TIMER *t){

    if( gettimeofday( &t->time_now, NULL) < 0){
        perror( "tm_init()");
        t->time_now.tv_sec = 0;
        t->time_now.tv_usec = 0;
    }

    t->time_was = t->time_now;

}

/*  Returns the seconds elapsed, with microsecond resolution, since the
    timer's reference point (time_was).

    The call also moves the reference point to the new sample, so the next
    call measures from this one rather than from the original start.
    A gettimeofday() failure is reported with perror(); time_now is then
    used as gettimeofday() left it.
*/
float tm_secs( US_TIMER *t){

    double seconds;

    if( gettimeofday( &t->time_now, NULL) < 0){ perror( "tm_secs()");}

    /*  Whole seconds plus the (possibly negative) microsecond remainder.  */
    seconds = (double)( t->time_now.tv_sec - t->time_was.tv_sec) +
              (double)( t->time_now.tv_usec - t->time_was.tv_usec) / 1000000.0;

    t->time_was = t->time_now;

    return( (float) seconds);
}

/*  Benchmark parameters; main() overwrites most of them from argv.  */
const char *bind_to = NULL;                     /*  <bind-to> argument  */
const char rsbind_to[] = "tcp://*:5031";        /*  fixed TCP endpoint  */
const char dsbind_to[] = "inproc://workers";    /*  in-process endpoint  */
int message_count = 1000;                       /*  messages per run  */
int message_size = 1024;                        /*  bytes per message  */
int threads = 1;                                /*  0MQ I/O threads  */
int workers = 1;                                /*  worker thread count  */
int sndbuflen = 128 * 256;                      /*  requested ZMQ_SNDBUF  */
int rcvbuflen = 128 * 256;                      /*  requested ZMQ_RCVBUF  */
int flow = ZMQ_PULL;                            /*  ZMQ_PULL or ZMQ_PUSH  */

/*  Callback taking (data, hint) that intentionally releases nothing: the
    free() call is disabled, so both arguments are ignored.
*/
void my_free (void *data, void *hint)
{
    (void) data;
    (void) hint;
}

static void *server_routine (void *ctx_) {

    US_TIMER timer;
    void *ctx;
    void *s;
    int rc;
    int i;
	void *buf = NULL;

	if( !(buf = malloc( message_size))){ perror("malloc"); return NULL;}

    ctx = zmq_ctx_new ();
    if (!ctx) {
        printf ("error in zmq_ctx_new: %s\n", zmq_strerror (errno));
        return NULL;
    }

    rc = zmq_ctx_set ( ctx, ZMQ_IO_THREADS, threads);
    if (rc) {
        printf ("error in zmq_ctx_set: %s\n", zmq_strerror (errno));
        return NULL;
    }

    s = zmq_socket (ctx, flow);
    if (!s) {
        printf ("error in zmq_socket: %s\n", zmq_strerror (errno));
        return NULL;
    }

    //  Add your socket options here.
    //  For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM.

	size_t rcvbuflenlen = (size_t)sizeof rcvbuflen;
	size_t sndbuflenlen = (size_t)sizeof sndbuflen;

	rc = zmq_setsockopt (s, ZMQ_RCVBUF, &rcvbuflen, rcvbuflenlen);
    if (rc != 0) {
        printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno));
        return NULL;
    }
	rc = zmq_setsockopt (s, ZMQ_SNDBUF, &sndbuflen, sndbuflenlen);
    if (rc != 0) {
        printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno));
        return NULL;
    }

	rc = zmq_getsockopt (s, ZMQ_RCVBUF, &rcvbuflen, &rcvbuflenlen);
    if (rc != 0) {
        printf ("error in zmq_getsockopt: %s\n", zmq_strerror (errno));
        return NULL;
    }
	rc = zmq_getsockopt (s, ZMQ_SNDBUF, &sndbuflen, &sndbuflenlen);
    if (rc != 0) {
        printf ("error in zmq_getsockopt: %s\n", zmq_strerror (errno));
        return NULL;
    }

	printf("RCVBUF=%d KB SNDBUF=%d KB adjusted\n", rcvbuflen/1024, sndbuflen/1024);

	printf("Threads: %d\n", zmq_ctx_get( ctx, ZMQ_IO_THREADS));

    zmq_connect( s, "inproc://workers");
    if (rc != 0) {
        printf ("error in zmq_connect: %s\n", zmq_strerror (errno));
        return NULL;
    }

	if( !(buf = malloc( message_size))){ perror("malloc"); return NULL;}

    tm_init( &timer);

    if( flow == ZMQ_PULL){

        zmq_msg_t msg;

        rc = zmq_msg_init (&msg);
        if (rc != 0) {
            printf ("error in zmq_msg_init: %s\n", zmq_strerror (errno));
            return NULL;
        }
        for (i = 0; i != message_count; i++) {
            rc = zmq_msg_recv (&msg, s, 0);
            if (rc < 0) {
                printf ("error in zmq_recv: %s\n", zmq_strerror (errno));
                return NULL;
            }
            /*
            if ((rc = zmq_msg_size (&msg)) != message_size) {
                printf ("message of incorrect size (%d) received in loop %d\n", rc, i);
                return NULL;
            }*/
        }
        rc = zmq_msg_close (&msg);
        if (rc != 0) {
            printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno));
            return NULL;
        }
    }
    if( flow == ZMQ_PUSH){

        zmq_msg_t msg;

        for (i = 0; i != message_count; i++) {

            rc = zmq_msg_init_data (&msg, buf, message_size, NULL, NULL);
            if (rc != 0) {
                printf ("error in zmq_msg_init_data: %s\n", zmq_strerror (errno));
                return NULL;
            }

            rc = zmq_msg_send( &msg, s, 0);
            if (rc < 0) {
                printf ("error in zmq_send: %s\n", zmq_strerror (errno));
                return NULL;
            }
/*
            rc = zmq_msg_close (&msg);
            if (rc != 0) {
                printf ("error in zmq_msg_close: %s\n", zmq_strerror (errno));
               return NULL;
            }
            */
        }
    }

    float secs = tm_secs( &timer);
    float total = (((float) message_count) * ((float) message_size)) / (1024.0*1024.0*1024.0);

    printf ("Message size: %d KBytes, time: %f secs\n", (int) message_size/1024, secs);
    printf ("%sed %.3f GB @ %.3f GB/s\n", (flow == ZMQ_PULL) ? "Pull":"Push", total, total/secs);


    rc = zmq_close (s);
    if (rc != 0) {
        printf ("error in zmq_close: %s\n", zmq_strerror (errno));
        return NULL;
    }

    rc = zmq_term (ctx);
    if (rc != 0) {
        printf ("error in zmq_term: %s\n", zmq_strerror (errno));
        return NULL;
    }

    if( buf) free( buf);

    return NULL;
}

int main (int argc, char *argv [])
{
    US_TIMER timer;
    void *ctx;
    void *rs, *ds;
    int rc;
    int i;
    void *p;

    if (argc != 8) {
        printf ("usage: local_thr <bind-to> <message-size> <message-count> <SND/RCV buffer> <flow (PUSH/PULL)> <threads> <workers>\n");
        return 1;
    }

    bind_to = argv [1];
    message_size = atoi (argv [2]);
    message_count = atoi (argv [3]);

    if( !strcmp( argv [5], "PUSH")){
        flow = ZMQ_PUSH;
        rcvbuflen = sndbuflen = atoi (argv [4]);
    }
    if( !strcmp( argv [5], "PULL")){
        flow = ZMQ_PULL;
        sndbuflen = rcvbuflen = atoi (argv [4]);
    }
    threads = atoi (argv [6]);
    workers = atoi (argv [7]);

    ctx = zmq_ctx_new ();
    if (!ctx) {
        printf ("error in zmq_ctx_new: %s\n", zmq_strerror (errno));
        return -1;
    }

    rc = zmq_ctx_set ( ctx, ZMQ_IO_THREADS, threads);
    if (rc) {
        printf ("error in zmq_ctx_set: %s\n", zmq_strerror (errno));
        return -1;
    }

    rs = zmq_socket (ctx, ZMQ_ROUTER);
    if (!rs) {
        printf ("error in zmq_socket: %s\n", zmq_strerror (errno));
        return -1;
    }

    ds = zmq_socket (ctx, ZMQ_DEALER);
    if (!ds) {
        printf ("error in zmq_socket: %s\n", zmq_strerror (errno));
        return -1;
    }

        //  Add your socket options here.
    //  For example ZMQ_RATE, ZMQ_RECOVERY_IVL and ZMQ_MCAST_LOOP for PGM.

/*	size_t rcvbuflenlen = (size_t)sizeof rcvbuflen;
	size_t sndbuflenlen = (size_t)sizeof sndbuflen;

	rc = zmq_setsockopt (s, ZMQ_RCVBUF, &rcvbuflen, rcvbuflenlen);
    if (rc != 0) {
        printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno));
        return -1;
    }
	rc = zmq_setsockopt (s, ZMQ_SNDBUF, &sndbuflen, sndbuflenlen);
    if (rc != 0) {
        printf ("error in zmq_setsockopt: %s\n", zmq_strerror (errno));
        return -1;
    }

	rc = zmq_getsockopt (s, ZMQ_RCVBUF, &rcvbuflen, &rcvbuflenlen);
    if (rc != 0) {
        printf ("error in zmq_getsockopt: %s\n", zmq_strerror (errno));
        return -1;
    }
	rc = zmq_getsockopt (s, ZMQ_SNDBUF, &sndbuflen, &sndbuflenlen);
    if (rc != 0) {
        printf ("error in zmq_getsockopt: %s\n", zmq_strerror (errno));
        return -1;
    }

	printf("RCVBUF=%d KB SNDBUF=%d KB adjusted\n", rcvbuflen/1024, sndbuflen/1024);

	printf("Threads: %d\n", zmq_ctx_get( ctx, ZMQ_IO_THREADS));

*/
    rc = zmq_bind (rs, rsbind_to);
    if (rc != 0) {
        printf ("error in zmq_bind: %s\n", zmq_strerror (errno));
        return -1;
    }

    rc = zmq_bind (ds, dsbind_to);
    if (rc != 0) {
        printf ("error in zmq_bind: %s\n", zmq_strerror (errno));
        return -1;
    }

	pthread_t worker[128];

    tm_init( &timer);

    for (i = 0; i < workers; i++) {
        pthread_create (&worker[i], NULL, server_routine, ctx);
		printf("Worker %d spawned\n", i);
    }

    rc = zmq_proxy( ds, rs, NULL);
    if (rc != 0) {
        printf ("error in zmq_proxy: %s\n", zmq_strerror (errno));
        return -1;
    }

    for (i = 0; i < workers; i++) {
        pthread_join( worker[i], &p);
		printf("Worker %d joined\n", i);
    }

    float secs = tm_secs( &timer);
    float total = ( (float)threads)*(((float) message_count) * ((float) message_size)) / (1024.0*1024.0*1024.0);

    tm_init( &timer);

    secs = tm_secs( &timer);
    total = (((float) message_count) * ((float) message_size)) / (1024.0*1024.0*1024.0);

    printf ("Message size: %d KBytes, time: %f secs\n", (int) message_size/1024, secs);
    printf ("%sed %.3f GB @ %.3f GB/s\n", (flow == ZMQ_PULL) ? "Pull":"Push", total, total/secs);


    rc = zmq_close (rs);
    if (rc != 0) {
        printf ("error in zmq_close: %s\n", zmq_strerror (errno));
        return -1;
    }
    rc = zmq_close (ds);
    if (rc != 0) {
        printf ("error in zmq_close: %s\n", zmq_strerror (errno));
        return -1;
    }

    rc = zmq_term (ctx);
    if (rc != 0) {
        printf ("error in zmq_term: %s\n", zmq_strerror (errno));
        return -1;
    }

    return 0;
}
_______________________________________________
zeromq-dev mailing list
zeromq-dev@lists.zeromq.org
http://lists.zeromq.org/mailman/listinfo/zeromq-dev

Reply via email to