On 20 May 2011 15:27, Attila-Mihaly Balazs <x_at_y_o...@yahoo.com> wrote:

> Hello everyone,
>
> I'm playing around with ZMQ + Java and I've run into a couple of problems.
> The test program which I use is attached [1]. What I try to do:
> - start a publisher and a subscriber on an IPC transport (ipc://zeromq_test)
> - publish a message on this transport and ensure that I receive it back
> - stop the transport
>
> However I've run into several problems (which I suspect are caused by me,
> since I'm a ZMQ noob):
> - sometimes I get the message: "Assertion failed: nbytes == sizeof
> (command_t) (mailbox.cpp:245)" [4] - I found a discussion related to this
> from last year [3], however it's not clear to me what the problem is or how
> I should resolve it. Is this a bug or am I making a mistake?
> - sometimes the termination thread hangs. I've attached a gdb stack trace
> [2] with the termination thread. From it, if I read correctly, it hangs in
> mailbox.cpp:204, which is in recv (which has the blocking parameter set to
> true). Again, am I incorrectly terminating or is there a bug?
>

Hi,

Try initializing the sockets inside the thread that uses them and avoid
passing around sockets between threads.

See the attached modified version of your test.

Cheers,
Marko
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import org.zeromq.ZMQ;
import org.zeromq.ZMQException;


/**
 * Round-trip smoke test for the JZMQ binding.
 *
 * <p>Binds a PUB socket on {@code ipc://zeromq_test}, runs a SUB socket on a
 * dedicated dispatcher thread, publishes one message, waits for the dispatcher
 * to report that it came back, and then shuts everything down.
 *
 * <p>Process exit codes:
 * <ul>
 *   <li>0 - the message made the round trip and shutdown completed</li>
 *   <li>1 - the message was not received within the timeout</li>
 *   <li>2 - a message was received but its payload was not the expected one</li>
 *   <li>3 - the stop thread did not finish within the timeout</li>
 *   <li>4 - the subscriber did not become ready within the timeout</li>
 * </ul>
 */
public class TestJZMQ {
	/** Endpoint shared by the publisher (bind) and the subscriber (connect). */
	private static final String ENDPOINT = "ipc://zeromq_test";

	/** Payload that is published and expected back on the subscriber side. */
	private static final String EXPECTED_MESSAGE = "foobar";

	/** Upper bound, in seconds, for every blocking wait done by the main thread. */
	private static final long TIMEOUT_SECONDS = 10;

	/**
	 * Runs the publish / receive / shutdown cycle.
	 *
	 * @param args ignored
	 * @throws Exception if a wait is interrupted or a ZeroMQ call fails
	 */
	public static void main(String... args) throws Exception {
		System.out.println("starting");
		// stop() always ends the JVM, so this loop body currently runs once.
		while (true) {
			RunInfo runInfo = start();
			System.out.println("sending");
			// Explicit charset so sender and receiver agree regardless of platform default.
			runInfo.publishSocket.send(EXPECTED_MESSAGE.getBytes(StandardCharsets.UTF_8), 0);

			System.out.println("try acquire");
			if (!runInfo.dispatcher.receivedMessages.tryAcquire(TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
				System.err.println("Message wasn't received!");
				System.exit(1);
			}
			System.out.println("try acquired");
			if (runInfo.dispatcher.error) {
				System.err.println("Invalid message received!");
				System.exit(2);
			}
			stop(runInfo);
		}
	}

	/**
	 * Creates the context, binds the publisher and starts the dispatcher thread.
	 *
	 * <p>Returns only after the dispatcher has connected and subscribed, so the
	 * caller publishes after the subscriber side has issued its connect/subscribe
	 * calls (the same ordering the test had when the SUB socket was set up on the
	 * main thread).
	 *
	 * @return handles needed to publish and to shut down later
	 * @throws InterruptedException if interrupted while waiting for the subscriber
	 */
	private static RunInfo start() throws InterruptedException {
		ZMQ.Context context = ZMQ.context(1);
		ZMQ.Socket publishSocket = context.socket(ZMQ.PUB);
		publishSocket.bind(ENDPOINT);

		ZeroMQDispatcher dispatcher = new ZeroMQDispatcher(context);
		Thread dispatchThread = new Thread(null, dispatcher,
			"ZeroMQDispatcher@" + System.currentTimeMillis());
		dispatchThread.setDaemon(true);
		dispatchThread.start();

		// The latch is only released after a successful connect + subscribe; if
		// socket setup fails on the dispatcher thread this wait times out.
		if (!dispatcher.ready.await(TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
			System.err.println("Subscriber wasn't ready!");
			System.exit(4);
		}

		return new RunInfo(dispatchThread, context, publishSocket, dispatcher);
	}

	/**
	 * Interrupts the dispatcher from a helper thread, then closes the publisher
	 * and terminates the context on the calling thread, and exits the JVM.
	 *
	 * @param runInfo handles returned by {@link #start()}
	 * @throws Exception if interrupted while waiting for the helper thread
	 */
	private static void stop(final RunInfo runInfo) throws Exception {
		System.out.println("stopping");
		final CountDownLatch stopLatch = new CountDownLatch(1);
		new Thread("StopThread@" + System.currentTimeMillis()) {
			@Override
			public void run() {
				// NOTE(review): the interrupt only sets the flag checked by the
				// dispatcher loop; presumably it does not wake a thread blocked in
				// the native recv - the ETERM raised by context.term() below is
				// what the dispatcher handles to leave recv. Confirm against jzmq.
				runInfo.dispatchThread.interrupt();
				System.out.println("dispatcher interrupted");
				stopLatch.countDown();
				System.out.println("stop thread finishing");
			}
		}.start();

		if (!stopLatch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
			System.err.println("Exit timeouted!");
			System.exit(3);
		}

		// The publisher was created on this (main) thread, so it is closed here.
		System.out.println("closing main thread sockets and context");
		runInfo.publishSocket.close();
		runInfo.context.term();

		System.exit(0);
	}

	/** Immutable bundle of everything {@link #start()} creates. */
	static class RunInfo {
		final Thread dispatchThread;
		final ZMQ.Context context;
		final ZMQ.Socket publishSocket;
		final ZeroMQDispatcher dispatcher;

		RunInfo(Thread dispatchThread, ZMQ.Context context,
				ZMQ.Socket publishSocket, ZeroMQDispatcher dispatcher) {
			this.dispatchThread = dispatchThread;
			this.context = context;
			this.publishSocket = publishSocket;
			this.dispatcher = dispatcher;
		}
	}

	/**
	 * Subscriber loop. The SUB socket is created, used and closed entirely
	 * inside {@link #run()}, i.e. on the thread that executes this runnable,
	 * so the socket is never handed from one thread to another.
	 */
	static class ZeroMQDispatcher implements Runnable {
		/** Context used to create the SUB socket on the dispatcher thread. */
		private final ZMQ.Context context;

		/** Released once the SUB socket has connected and subscribed. */
		final CountDownLatch ready;

		/** One permit is released per received message. */
		final Semaphore receivedMessages;

		/** Set when a received payload differs from the expected message. */
		volatile boolean error;

		ZeroMQDispatcher(ZMQ.Context context) {
			this.context = context;
			this.ready = new CountDownLatch(1);
			this.receivedMessages = new Semaphore(0);
			this.error = false;
		}

		@Override
		public void run() {
			ZMQ.Socket subscribeSocket = context.socket(ZMQ.SUB);
			try {
				subscribeSocket.connect(ENDPOINT);
				// Empty prefix: subscribe to every message.
				subscribeSocket.subscribe(new byte[0]);
				ready.countDown();

				Thread currentThread = Thread.currentThread();
				while (!currentThread.isInterrupted()) {
					byte[] messageBytes;
					try {
						messageBytes = subscribeSocket.recv(0);
						System.out.println("got some data");
					} catch (ZMQException ex) {
						if (ZMQ.Error.ETERM.getCode() == ex.getErrorCode()) {
							// termination requested
							break;
						}
						throw ex;
					}

					String message = new String(messageBytes, StandardCharsets.UTF_8);
					if (!EXPECTED_MESSAGE.equals(message)) {
						error = true;
					}
					System.out.println("releasing semaphore");
					receivedMessages.release();
					System.out.println("released semaphore");
				}
			} finally {
				// Close on every exit path, including an unexpected ZMQException,
				// so the socket is not left open when the loop ends abnormally.
				subscribeSocket.close();
			}
		}
	}
}
_______________________________________________
zeromq-dev mailing list
zeromq-dev@lists.zeromq.org
http://lists.zeromq.org/mailman/listinfo/zeromq-dev

Reply via email to