Thanks for the test case. I will look at it in more detail later this
evening. Might I suggest allocating the direct ByteBuffer outside the loop
and trying something like this:

...

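while (true) {
    buffer.clear();  // reuse the single direct buffer allocated before the loop
    try {
        socket.recvZeroCopy(buffer, buffer.remaining(), ZMQ.DONTWAIT);
    } catch (ZMQException ignore) {
        // typically EAGAIN when nothing is queued yet: back off briefly, then retry
        Thread.sleep(10);
    }
}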

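JZMQ throws a ZMQException when a call fails. In your case the error is
EAGAIN (the "Resource temporarily unavailable" in your trace), which only
means no message was ready for the non-blocking receive, so the call should
simply be retried.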
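
To make the retry idea concrete without needing a socket, here is a minimal
sketch that runs on the plain JDK. Everything named in it (RetrySketch,
NonBlockingSource, WouldBlockException, fakeSource, receiveOne) is
hypothetical and only stands in for the jzmq pieces: recvInto plays the role
of socket.recvZeroCopy(buffer, buffer.remaining(), ZMQ.DONTWAIT), and
WouldBlockException plays the role of a ZMQException carrying EAGAIN. It is
not jzmq code, just the shape of the loop.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class RetrySketch {

    /** Hypothetical stand-in for a ZMQException whose error is EAGAIN. */
    public static class WouldBlockException extends RuntimeException {}

    /** Hypothetical stand-in for a non-blocking receive into a caller-owned buffer. */
    public interface NonBlockingSource {
        /** Writes one message into dst and returns its length, or throws if nothing is ready. */
        int recvInto(ByteBuffer dst);
    }

    /** Fake source: reports "nothing ready" emptyPolls times, then delivers message on every call. */
    public static NonBlockingSource fakeSource(final int emptyPolls, final String message) {
        return new NonBlockingSource() {
            private int polls = 0;

            @Override
            public int recvInto(ByteBuffer dst) {
                if (polls++ < emptyPolls) {
                    throw new WouldBlockException();
                }
                byte[] bytes = message.getBytes(StandardCharsets.UTF_8);
                dst.put(bytes);
                return bytes.length;
            }
        };
    }

    /** Polls until one non-empty message arrives; returns null if the thread is interrupted. */
    public static String receiveOne(NonBlockingSource source, ByteBuffer buffer, long sleepMillis) {
        while (true) {
            buffer.clear();  // reset position and limit so the whole buffer is writable again
            try {
                if (source.recvInto(buffer) > 0) {
                    buffer.flip();  // switch from writing to reading the received bytes
                    return StandardCharsets.UTF_8.decode(buffer).toString();
                }
            } catch (WouldBlockException retry) {
                try {
                    Thread.sleep(sleepMillis);  // back off briefly before polling again
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return null;
                }
            }
        }
    }

    public static void main(String[] args) {
        ByteBuffer buffer = ByteBuffer.allocateDirect(100);  // allocated once, outside the loop
        System.out.println(receiveOne(fakeSource(3, "This is test string"), buffer, 10));
    }
}
```

Two things in the sketch are deliberate. The buffer is cleared at the top of
every iteration, so one allocation can serve any number of receives. The
bytes are decoded with an explicit charset rather than
buffer.asCharBuffer(), which reinterprets the raw bytes as 16-bit chars and
would garble text produced by String.getBytes(). With the real library you
would presumably also inspect the exception's error code and rethrow
anything that is not EAGAIN, instead of ignoring every ZMQException.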

-Trev


On Tue, Sep 17, 2013 at 10:05 AM, Alex Suo <alex....@gmail.com> wrote:

> Hi there,
>
> Thank you for your time in advance. I am building a high-performance system
> based on ZeroMQ and jzmq in Java. While trying to use a direct buffer, I
> ran into a problem, which I have reduced to the following test code
> on Windows.
>
> My unit test environment is:
>
> Windows 7 64bit
> ZeroMQ 3.2.3
> jzmq master from GIT, checked out in July 2013
> JDK 1.7.0_25
>
> The test code is as follows. You can see in the subscriber thread
> that if I use recv(ZMQ.DONTWAIT) it works perfectly; but if I use
> the direct buffer, I get no response while running and the
> following error on exit:
>
> Exception in thread "Thread-0" org.zeromq.ZMQException: Resource temporarily unavailable(0xb)
>     at org.zeromq.ZMQ$Socket.recvZeroCopy(Native Method)
>     at org.as.algo.messaging.bus.zmq.ZMQReadynessTest$1.run(ZMQReadynessTest.java:48)
>
>
> If I use a non-direct ByteBuffer, I don't get an exception on exit but
> still cannot receive anything.
>
> Please kindly assist me. Lots of thanks.
>
> Alex
>
> import static org.junit.Assert.assertTrue;
> import static org.junit.Assert.fail;
>
> import java.nio.ByteBuffer;
> import java.util.concurrent.atomic.AtomicInteger;
>
> import org.junit.Before;
> import org.junit.Test;
> import org.zeromq.ZMQ;
>
> /**
>  * Test if ZMQ is ready on this box.
>  *
>  * @author Alex Suo
>  *
>  */
> public class ZMQReadynessTest {
>
>     private ZMQ.Context context;
>
>     @Before
>     public void setUp() {
>         context = ZMQ.context(1);
>     }
>
>     @Test
>     public void testSimpleMessage() {
>         String topic = "tcp://127.0.0.1:31216";
>         final AtomicInteger counter = new AtomicInteger();
>
>
>         // create a simple subscriber
>         final ZMQ.Socket subscribeSocket = context.socket(ZMQ.SUB);
>         subscribeSocket.connect(topic);
>         subscribeSocket.subscribe("TestTopic".getBytes());
>
>         Thread subThread = new Thread() {
>
>             @Override
>             public void run() {
>                 while (true) {
>                     String value = null;
>
>                     // This would result in trouble
>                     {
>                         ByteBuffer buffer = ByteBuffer.allocateDirect(100);
>                         if (subscribeSocket.recvZeroCopy(buffer, buffer.remaining(), ZMQ.DONTWAIT) > 0) {
>
>                             buffer.flip();
>
>                             value = buffer.asCharBuffer().toString();
>
>                             System.out.println(buffer.asCharBuffer().toString());
>                         }
>                     }
>
>                     // This works perfectly
>                     /*
>                     {
>                         byte[] bytes = subscribeSocket.recv(ZMQ.DONTWAIT);
>                         if (bytes == null || bytes.length == 0) {
>                             continue;
>                         }
>
>                         value = new String(bytes);
>                     }
>                      */
>
>                     if (value != null && value.length() > 0) {
>                         counter.incrementAndGet();
>                         System.out.println(value);
>                         break;
>                     }
>                 }
>             }
>         };
>         subThread.start();
>
>         // create a simple publisher - wait 3 sec to make sure its ready
>         ZMQ.Socket publishSocket = context.socket(ZMQ.PUB);
>         publishSocket.bind("tcp://*:31216");
>
>         try {
>             Thread.sleep(3000);
>         } catch (InterruptedException e) {
>             e.printStackTrace();
>             fail();
>         }
>
>         // publish a sample message
>         try {
>             publishSocket.send("TestTopic".getBytes(), ZMQ.SNDMORE);
>             publishSocket.send("This is test string".getBytes(), 0);
>             subThread.join(100);
>         } catch (InterruptedException e) {
>             e.printStackTrace();
>             fail();
>         }
>
>         assertTrue(counter.get() > 0);
>         System.out.println(counter.get());
>     }
>
> }
>
>
> _______________________________________________
> zeromq-dev mailing list
> zeromq-dev@lists.zeromq.org
> http://lists.zeromq.org/mailman/listinfo/zeromq-dev
>
>