Hi,

In the stock trading industry it's pretty common to delay stock quote feeds so that they can be sold at a discounted price to clients that have no need for real-time quotes.

Attached is the source code for a simple application that delays messages by 10 seconds. It can be tested using the "prompt" and "display" applications from the "chat" example (examples/chat). Write an instant message and it gets delivered with a 10-second delay :)

I hope some of you will find this piece of code useful.

Martin
#include <assert.h>
#include <pthread.h>
#include <stdint.h>
#include <string.h>
#include <sys/time.h>
#include <unistd.h>

#include <zmq.hpp>

//  How long each message is held back before being forwarded, in seconds.
static const int DELAY = 10;

//  Current wall-clock time as reported by gettimeofday, truncated to whole
//  seconds (the microsecond part is discarded).
uint64_t now ()
{
    struct timeval stamp;
    const int status = gettimeofday (&stamp, NULL);

    //  gettimeofday is not expected to fail here; treat failure as a bug.
    assert (status == 0);

    return stamp.tv_sec;
}

//  Worker thread to receive messages and push them into internal pipe.
void *receive_routine (void *arg)
{
   zmq::context_t *ctx = (zmq::context_t*) arg;

    //  Socket to read messages from the network.
    zmq::socket_t ins (*ctx, ZMQ_SUB);
    ins.setsockopt (ZMQ_SUBSCRIBE, "", 0);
    ins.bind ("tcp://lo:5555");

    //  Socket to push message into internal pipe.
    zmq::socket_t outs (*ctx, ZMQ_PUB);
    outs.connect ("inproc://pipe");

    while (true) {

        //  Receive message.
        zmq::message_t msg;
        ins.recv (&msg);

        //  Tag the message with current time.
        zmq::message_t msgex (msg.size () + 8);
        *(uint64_t*) msgex.data () = now ();
        memcpy (((unsigned char*) msgex.data ()) + 8, msg.data (), msg.size ());

        //  Push it into the pipe (i.e. send it to the main thread).
        outs.send (msgex);
    }
}

//  Entry point of the delaying forwarder.
//
//  Starts receive_routine in a worker thread, then loops forever: takes each
//  timestamped message from inproc://pipe, waits until DELAY seconds have
//  passed since the timestamp, strips the timestamp and publishes the
//  original payload on tcp://lo:5556. Messages are processed strictly in the
//  order they are read from the pipe.
int main ()
{
    //  NOTE(review): the two arguments are presumably the number of
    //  application threads and I/O threads - confirm against the zmq.hpp
    //  version in use.
    zmq::context_t ctx (2, 1);

    //  Socket to pop the messages from the internal pipe. It is bound here,
    //  before the worker thread is created, so the endpoint already exists
    //  when the worker connects to it.
    zmq::socket_t ins (ctx, ZMQ_SUB);
    ins.setsockopt (ZMQ_SUBSCRIBE, "", 0);
    ins.bind ("inproc://pipe");

    //  Socket to send the messages to the network.
    zmq::socket_t outs (ctx, ZMQ_PUB);
    outs.bind ("tcp://lo:5556");

    //  Start the worker thread.
    pthread_t worker;
    int rc = pthread_create (&worker, NULL, receive_routine, (void*) &ctx);
    assert (rc == 0);

    while (true) {

        //  Get next message from the internal pipe.
        zmq::message_t msg;
        ins.recv (&msg);

        //  Get the message timestamp. A message too short to hold one is
        //  dropped; otherwise the size computation below would wrap around.
        //  The value is copied with memcpy because the message buffer is not
        //  guaranteed to be aligned for uint64_t.
        uint64_t arrived;
        if (msg.size () < sizeof arrived)
            continue;
        memcpy (&arrived, msg.data (), sizeof arrived);

        //  If the delay hasn't yet elapsed, wait for a while. sleep may
        //  return early when interrupted by a signal, so the clock is
        //  re-checked until the deadline has really been reached.
        const uint64_t deadline = arrived + DELAY;
        for (uint64_t current = now (); current < deadline; current = now ())
            sleep (static_cast <unsigned int> (deadline - current));

        //  Strip the timestamp from the message.
        zmq::message_t msgraw (msg.size () - sizeof arrived);
        memcpy (msgraw.data (),
            static_cast <unsigned char*> (msg.data ()) + sizeof arrived,
            msgraw.size ());

        //  Send the delayed message to the network.
        outs.send (msgraw);
    }
}
_______________________________________________
zeromq-dev mailing list
[email protected]
http://lists.zeromq.org/mailman/listinfo/zeromq-dev

Reply via email to