Thanks for the test case. I will look at it in more detail later this evening. Might I suggest allocating the direct ByteBuffer outside the loop and trying something like this:
    ...
    while (true) {
        try {
            socket.recvZeroCopy(buffer, buffer.remaining(), ZMQ.DONTWAIT);
        } catch (ZMQException ignore) {
            // EAGAIN: nothing to read yet, so back off briefly and retry
            Thread.sleep(10);
        }
    }

JZMQ throws a ZMQException when an error occurs. In your case you are receiving EAGAIN, which should be retried.

-Trev

On Tue, Sep 17, 2013 at 10:05 AM, Alex Suo <alex....@gmail.com> wrote:

> Hi there,
>
> Thank you for your time in advance. I am building a high-performance system
> based on ZeroMQ and jzmq in Java. While trying to use a direct buffer, I
> encountered a problem. I have summarized it in the following test code
> on Windows.
>
> My unit test environment is:
>
> Windows 7 64bit
> ZeroMQ 3.2.3
> jzmq master from GIT, checked out in July 2013
> JDK 1.7.0_25
>
> The test code is as follows. You can see in the subscriber thread
> that if I use recv(ZMQ.DONTWAIT) it works perfectly; but if I use
> the direct buffer, I get no response while running and the
> following error on exit:
>
> Exception in thread "Thread-0" org.zeromq.ZMQException: Resource temporarily unavailable(0xb)
>     at org.zeromq.ZMQ$Socket.recvZeroCopy(Native Method)
>     at org.as.algo.messaging.bus.zmq.ZMQReadynessTest$1.run(ZMQReadynessTest.java:48)
>
> If I use an indirect ByteBuffer, I get no exception on exit but
> still cannot receive anything.
>
> Please kindly assist me. Many thanks.
>
> Alex
>
> import static org.junit.Assert.assertTrue;
> import static org.junit.Assert.fail;
>
> import java.nio.ByteBuffer;
> import java.util.concurrent.atomic.AtomicInteger;
>
> import org.junit.Before;
> import org.junit.Test;
> import org.zeromq.ZMQ;
>
> /**
>  * Test if ZMQ is ready on this box.
>  *
>  * @author Alex Suo
>  *
>  */
> public class ZMQReadynessTest {
>
>     private ZMQ.Context context;
>
>     @Before
>     public void setUp() {
>         context = ZMQ.context(1);
>     }
>
>     @Test
>     public void testSimpleMessage() {
>         String topic = "tcp://127.0.0.1:31216";
>         final AtomicInteger counter = new AtomicInteger();
>
>         // create a simple subscriber
>         final ZMQ.Socket subscribeSocket = context.socket(ZMQ.SUB);
>         subscribeSocket.connect(topic);
>         subscribeSocket.subscribe("TestTopic".getBytes());
>
>         Thread subThread = new Thread() {
>
>             @Override
>             public void run() {
>                 while (true) {
>                     String value = null;
>
>                     // This would result in trouble
>                     {
>                         ByteBuffer buffer = ByteBuffer.allocateDirect(100);
>                         if (subscribeSocket.recvZeroCopy(buffer, buffer.remaining(), ZMQ.DONTWAIT) > 0) {
>
>                             buffer.flip();
>
>                             value = buffer.asCharBuffer().toString();
>
>                             System.out.println(buffer.asCharBuffer().toString());
>                         }
>                     }
>
>                     // This works perfectly
>                     /*
>                     {
>                         byte[] bytes = subscribeSocket.recv(ZMQ.DONTWAIT);
>                         if (bytes == null || bytes.length == 0) {
>                             continue;
>                         }
>
>                         value = new String(bytes);
>                     }
>                     */
>
>                     if (value != null && value.length() > 0) {
>                         counter.incrementAndGet();
>                         System.out.println(value);
>                         break;
>                     }
>                 }
>             }
>         };
>         subThread.start();
>
>         // create a simple publisher - wait 3 sec to make sure it's ready
>         ZMQ.Socket publishSocket = context.socket(ZMQ.PUB);
>         publishSocket.bind("tcp://*:31216");
>
>         try {
>             Thread.sleep(3000);
>         } catch (InterruptedException e) {
>             e.printStackTrace();
>             fail();
>         }
>
>         // publish a sample message
>         try {
>             publishSocket.send("TestTopic".getBytes(), ZMQ.SNDMORE);
>             publishSocket.send("This is test string".getBytes(), 0);
>             subThread.join(100);
>         } catch (InterruptedException e) {
>             e.printStackTrace();
>             fail();
>         }
>
>         assertTrue(counter.get() > 0);
>         System.out.println(counter.get());
>     }
>
> }
>
> _______________________________________________
> zeromq-dev mailing list
> zeromq-dev@lists.zeromq.org
> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
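
For reference, the retry-on-EAGAIN pattern suggested at the top of this reply can be tried without a ZeroMQ installation. The sketch below is only an illustration under stated assumptions: `NonBlockingReceiver` and `WouldBlockException` are hypothetical stand-ins for a non-blocking receive and for the EAGAIN ZMQException, not part of the jzmq API, and the retry limit is an addition so the loop cannot spin forever. It also assumes the receive call reports the message length and writes at the start of the buffer, which should be checked against the real binding.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

final class RetryReceiveSketch {

    /** Hypothetical stand-in for the EAGAIN case (not a jzmq class). */
    static class WouldBlockException extends RuntimeException {}

    /** Hypothetical stand-in for a non-blocking receive into a caller-owned buffer. */
    interface NonBlockingReceiver {
        /** Writes one message at offset 0 of the buffer and returns its length in bytes. */
        int receive(ByteBuffer buffer);
    }

    /**
     * Reuses one buffer across attempts. Returns the decoded message, or null
     * if nothing arrived within maxAttempts or the thread was interrupted.
     */
    static String receiveWithRetry(NonBlockingReceiver receiver, ByteBuffer buffer,
                                   int maxAttempts, long sleepMillis) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            buffer.clear(); // reset position and limit before each attempt
            try {
                int length = receiver.receive(buffer);
                buffer.position(0);
                buffer.limit(length);
                // Decode with an explicit charset instead of viewing the bytes as UTF-16 chars.
                return StandardCharsets.UTF_8.decode(buffer).toString();
            } catch (WouldBlockException nothingYet) {
                try {
                    Thread.sleep(sleepMillis); // back off, then retry
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return null;
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        final int[] calls = {0};
        // Fake receiver: reports "would block" twice, then delivers a message.
        NonBlockingReceiver fake = buf -> {
            if (calls[0]++ < 2) {
                throw new WouldBlockException();
            }
            byte[] payload = "This is test string".getBytes(StandardCharsets.UTF_8);
            for (int i = 0; i < payload.length; i++) {
                buf.put(i, payload[i]); // absolute put, leaves the position untouched
            }
            return payload.length;
        };
        ByteBuffer buffer = ByteBuffer.allocateDirect(100); // allocated once, outside the loop
        System.out.println(receiveWithRetry(fake, buffer, 10, 10L));
    }
}
```

With a real socket, the fake receiver would be replaced by the actual receive call, and the catch clause would test for the EAGAIN error code rather than swallowing every exception.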