hello,

I have a simple test case here that mirrors what i am doing in my main
project.  I need to publish messages from several worker threads, so I
have a socket to connect to an ipc pipe from each thread and then a
forwarder device to forward from the backend socket to the main
publisher socket in the main thread.  The problem is that when I monitor
the port, no messages are forthcoming. 

I monitor the port with the line: tshark -f "tcp port 4000" -i eth1

(and with -i lo) 

The name of the interface is correct. 

Does anyone have any ideas why this isnt outputing any messages?

Thanks,
dgs.

#include <stdlib.h>
#include <stdio.h>
#include <stdint.h>
#include <unistd.h>
#include <string.h>
#include <zmq.h>
#include <pthread.h>

#define NB_WORKER_THREADS 10

static int thr_n = 0;
static int cmd = 0;
static char *topic_str = "MSG";

void* pub_worker(void* arg){
    char addr[32];
    void *ctx = arg;
    int threadnum = thr_n++;

    void *pubskt = zmq_socket(ctx, ZMQ_PUB);
    if (pubskt == NULL){
        fprintf(stderr,"unable to get pub skt\n");
        exit(1);
    }

    snprintf(addr, 32, "inproc://publish%d",threadnum);
    fprintf(stdout,"bind to %s\n", addr);
    int rc = zmq_bind(pubskt, addr);
    if (rc){
        fprintf(stderr,"unable to bind to %s\n", addr);
        exit(1);
    }

    sleep(10);
    fprintf(stdout,"worker%d: start sending messages ...\n", threadnum);
    zmq_msg_t topic_msg, cmd_msg;
    while (1){
        zmq_msg_init_data(&topic_msg, topic_str, 3, NULL, NULL);
        zmq_send(pubskt, &topic_msg, ZMQ_SNDMORE);
        zmq_msg_close(&topic_msg);

        zmq_msg_init_size(&cmd_msg, sizeof(uint8_t));
        memcpy(zmq_msg_data(&cmd_msg), &cmd, sizeof(uint8_t));
        cmd++;

        zmq_send(pubskt, &cmd_msg, 0);
        zmq_msg_close(&cmd_msg);

        fprintf(stdout, "publish: %s %u\n", topic_str, cmd);

        sleep(rand()%10);
    }

    return NULL;
}

typedef struct device_param_t {
    void *skt1;
    void *skt2;
} DeviceParam;

void* doforward(void *arg){
    DeviceParam *dp = (DeviceParam*)arg;
    int rc = zmq_device(ZMQ_FORWARDER, dp->skt1, dp->skt2);
    if (rc){
        fprintf(stderr,"unable to set up device\n");
    }
    return NULL;
}


int main(int argc, char **argv){
    void *ctx = zmq_init(1);

    char addr[32];
    if (ctx == NULL){
        printf("unable to create zmq context\n");
        exit(1);
    }
    void *pubskt = zmq_socket(ctx, ZMQ_PUB);
    if (pubskt == NULL){
        printf("unable to get socket\n");
        exit(1);
    }

    fprintf(stdout,"bind to eth1:4000\n");
    int rc = zmq_bind(pubskt, "tcp://eth1:4000");
    if (rc){
        printf("unable to bind to address, eth1\n");
        exit(1);
    }

    fprintf(stdout,"bind to lo:4000\n");
    rc = zmq_bind(pubskt, "tcp://lo:4000");
    if (rc){
        printf("unable to bind to address, lo\n");
        exit(1);
    }

    void *workerskt = zmq_socket(ctx, ZMQ_SUB);
    if (workerskt == NULL){
        printf("unable to create worker socket\n");
        exit(1);
    }

    int i;
    pthread_t workerthrs[NB_WORKER_THREADS];
    for (i=0;i<NB_WORKER_THREADS;i++){
        fprintf(stdout,"create worker thread %d\n", i);

        snprintf(addr, 32, "inproc://publish%d", i);
        if (pthread_create(&workerthrs[i], NULL, pub_worker, ctx) != 0){
            fprintf(stderr,"unable to create worker thread, %d\n", i);
            exit(1);
        }

        fprintf(stdout,"subscribe\n");
        rc = zmq_setsockopt(workerskt, ZMQ_SUBSCRIBE, "", 0);
        if (rc){
            fprintf(stderr,"unable to subscribe worker skt\n");
            exit(1);
        }

        fprintf(stdout,"connect to %s\n", addr);
        rc = zmq_connect(workerskt, addr);
        if (rc){
            fprintf(stderr,"unable to connect socket to %s\n", addr);
            exit(1);
        }
    }

    fprintf(stdout,"set up fwd'er device\n");
    rc = zmq_device(ZMQ_FORWARDER, workerskt, pubskt); 
    if (rc){
        printf("unable to set up forwarder device\n");
        exit(1);
    }

    return 0;
}

_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev

Reply via email to