hello,
I have a simple test case here that mirrors what i am doing in my main
project. I need to publish messages from several worker threads, so I
have a socket to connect to an ipc pipe from each thread and then a
forwarder device to forward from the backend socket to the main
publisher socket in the main thread. The problem is that when I monitor
the port, no messages are forthcoming.
I monitor the port with the line: tshark -f "tcp port 4000" -i eth1
(and with -i lo)
The name of the interface is correct.
Does anyone have any ideas why this isnt outputing any messages?
Thanks,
dgs.
#include <stdlib.h>
#include <stdio.h>
#include <stdint.h>
#include <unistd.h>
#include <string.h>
#include <zmq.h>
#include <pthread.h>
#define NB_WORKER_THREADS 10
static int thr_n = 0;
static int cmd = 0;
static char *topic_str = "MSG";
void* pub_worker(void* arg){
char addr[32];
void *ctx = arg;
int threadnum = thr_n++;
void *pubskt = zmq_socket(ctx, ZMQ_PUB);
if (pubskt == NULL){
fprintf(stderr,"unable to get pub skt\n");
exit(1);
}
snprintf(addr, 32, "inproc://publish%d",threadnum);
fprintf(stdout,"bind to %s\n", addr);
int rc = zmq_bind(pubskt, addr);
if (rc){
fprintf(stderr,"unable to bind to %s\n", addr);
exit(1);
}
sleep(10);
fprintf(stdout,"worker%d: start sending messages ...\n", threadnum);
zmq_msg_t topic_msg, cmd_msg;
while (1){
zmq_msg_init_data(&topic_msg, topic_str, 3, NULL, NULL);
zmq_send(pubskt, &topic_msg, ZMQ_SNDMORE);
zmq_msg_close(&topic_msg);
zmq_msg_init_size(&cmd_msg, sizeof(uint8_t));
memcpy(zmq_msg_data(&cmd_msg), &cmd, sizeof(uint8_t));
cmd++;
zmq_send(pubskt, &cmd_msg, 0);
zmq_msg_close(&cmd_msg);
fprintf(stdout, "publish: %s %u\n", topic_str, cmd);
sleep(rand()%10);
}
return NULL;
}
typedef struct device_param_t {
void *skt1;
void *skt2;
} DeviceParam;
void* doforward(void *arg){
DeviceParam *dp = (DeviceParam*)arg;
int rc = zmq_device(ZMQ_FORWARDER, dp->skt1, dp->skt2);
if (rc){
fprintf(stderr,"unable to set up device\n");
}
return NULL;
}
int main(int argc, char **argv){
void *ctx = zmq_init(1);
char addr[32];
if (ctx == NULL){
printf("unable to create zmq context\n");
exit(1);
}
void *pubskt = zmq_socket(ctx, ZMQ_PUB);
if (pubskt == NULL){
printf("unable to get socket\n");
exit(1);
}
fprintf(stdout,"bind to eth1:4000\n");
int rc = zmq_bind(pubskt, "tcp://eth1:4000");
if (rc){
printf("unable to bind to address, eth1\n");
exit(1);
}
fprintf(stdout,"bind to lo:4000\n");
rc = zmq_bind(pubskt, "tcp://lo:4000");
if (rc){
printf("unable to bind to address, lo\n");
exit(1);
}
void *workerskt = zmq_socket(ctx, ZMQ_SUB);
if (workerskt == NULL){
printf("unable to create worker socket\n");
exit(1);
}
int i;
pthread_t workerthrs[NB_WORKER_THREADS];
for (i=0;i<NB_WORKER_THREADS;i++){
fprintf(stdout,"create worker thread %d\n", i);
snprintf(addr, 32, "inproc://publish%d", i);
if (pthread_create(&workerthrs[i], NULL, pub_worker, ctx) != 0){
fprintf(stderr,"unable to create worker thread, %d\n", i);
exit(1);
}
fprintf(stdout,"subscribe\n");
rc = zmq_setsockopt(workerskt, ZMQ_SUBSCRIBE, "", 0);
if (rc){
fprintf(stderr,"unable to subscribe worker skt\n");
exit(1);
}
fprintf(stdout,"connect to %s\n", addr);
rc = zmq_connect(workerskt, addr);
if (rc){
fprintf(stderr,"unable to connect socket to %s\n", addr);
exit(1);
}
}
fprintf(stdout,"set up fwd'er device\n");
rc = zmq_device(ZMQ_FORWARDER, workerskt, pubskt);
if (rc){
printf("unable to set up forwarder device\n");
exit(1);
}
return 0;
}
_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev