On 20 May 2011 15:27, Attila-Mihaly Balazs <x_at_y_o...@yahoo.com> wrote:
> Hello everyone, > > I'm playing around with ZMQ + Java and I've run into a couple of problems. > The test program which I use is attached [1]. What I'm trying to do: > - start a publisher and a subscriber on an IPC transport (ipc://zeromq_test) > - publish a message on this transport and ensure that I receive it back > - stop the transport > > However I've run into several problems (which I suspect are caused by me, > since I'm a ZMQ noob): > - sometimes I get the message: "Assertion failed: nbytes == sizeof > (command_t) (mailbox.cpp:245)" [4] - I found a discussion related to this > from last year [3], however it's not clear to me what the problem is or how > I should resolve it. Is this a bug or am I making a mistake? > - sometimes the termination thread hangs. I've attached a gdb stack trace > [2] with the termination thread. From it, if I read correctly, it hangs in > mailbox.cpp:204, which is in recv (which has the blocking parameter set to > true). Again, am I incorrectly terminating or is there a bug? > Hi, Try initializing the sockets inside the thread that uses them and avoid passing around sockets between threads. See the attached modified version of your test. Cheers, Marko
import java.util.concurrent.CountDownLatch; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import org.zeromq.ZMQ; import org.zeromq.ZMQException; public class TestJZMQ { public static void main(String... args) throws Exception { System.out.println("starting"); while (true) { RunInfo runInfo = start(); System.out.println("sending"); runInfo.publishSocket.send("foobar".getBytes(), 0); System.out.println("try acquire"); if (!runInfo.dispatcher.receivedMessages.tryAcquire(10, TimeUnit.SECONDS)) { System.err.println("Message wasn't received!"); System.exit(1); } System.out.println("try acquired"); if (runInfo.dispatcher.error) { System.err.println("Invalid message received!"); System.exit(2); } stop(runInfo); } } private static RunInfo start() { ZMQ.Context context = ZMQ.context(1); ZMQ.Socket publishSocket = context.socket(ZMQ.PUB); publishSocket.bind("ipc://zeromq_test"); ZeroMQDispatcher dispatcher = new ZeroMQDispatcher(context); Thread dispatchThread = new Thread(null, dispatcher, "ZeroMQDispatcher@" + System.currentTimeMillis()); dispatchThread.setDaemon(true); dispatchThread.start(); return new RunInfo(dispatchThread, context, publishSocket, dispatcher); } private static void stop(final RunInfo runInfo) throws Exception { System.out.println("stopping"); final CountDownLatch stopLatch = new CountDownLatch(1); new Thread("StopThread@" + System.currentTimeMillis()) { @Override public void run() { runInfo.dispatchThread.interrupt(); System.out.println("dispatcher interrupted"); // runInfo.subscribeSocket.close(); stopLatch.countDown(); System.out.println("stop thread finishing"); } }.start(); if (!stopLatch.await(10, TimeUnit.SECONDS)) { System.err.println("Exit timeouted!"); System.exit(3); } System.out.println("closing main thread sockets and context"); runInfo.publishSocket.close(); runInfo.context.term(); System.exit(0); } static class RunInfo { final Thread dispatchThread; final ZMQ.Context context; final ZMQ.Socket publishSocket; 
final ZeroMQDispatcher dispatcher; RunInfo(Thread dispatchThread, ZMQ.Context context, ZMQ.Socket publishSocket, ZeroMQDispatcher dispatcher) { this.dispatchThread = dispatchThread; this.context = context; this.publishSocket = publishSocket; this.dispatcher = dispatcher; } } static class ZeroMQDispatcher implements Runnable { final ZMQ.Socket subscribeSocket; final Semaphore receivedMessages; volatile boolean error; ZeroMQDispatcher(ZMQ.Context context) { ZMQ.Socket subscribeSocket = context.socket(ZMQ.SUB); subscribeSocket.connect("ipc://zeromq_test"); subscribeSocket.subscribe(new byte[0]); this.subscribeSocket = subscribeSocket; this.receivedMessages = new Semaphore(0); this.error = false; } @Override public void run() { Thread currentThread = Thread.currentThread(); while (!currentThread.isInterrupted()) { byte[] messageBytes; try { messageBytes = subscribeSocket.recv(0); System.out.println("got some data"); } catch (ZMQException ex) { if (ZMQ.Error.ETERM.getCode() == ex.getErrorCode()) { // termination requested break; } throw ex; } String message = new String(messageBytes); if (!"foobar".equals(message)) { error = true; } System.out.println("releasing semaphore"); receivedMessages.release(); System.out.println("released semaphore"); } subscribeSocket.close(); } } }
_______________________________________________ zeromq-dev mailing list zeromq-dev@lists.zeromq.org http://lists.zeromq.org/mailman/listinfo/zeromq-dev