A 0MQ context is thread-safe, but individual sockets are not by default; see http://api.zeromq.org/2-1:zmq#toc4
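Because a socket must not be shared between threads, the usual remedy is to confine each socket to a single thread. Wrapping every socket in its own actor, as the ZeroMQ integration does, achieves this. Below is a minimal, stdlib-only sketch of that idea. It assumes a hypothetical `FakeSocket` class as a stand-in for a real 0MQ socket, and `ConfinedSocket` and `ConfinementDemo` are illustrative names. None of this is the akka-zeromq implementation.

```scala
import java.util.concurrent.{ExecutorService, Executors, TimeUnit}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical stand-in for a 0MQ socket: mutable state with no locking,
// so it is only safe if a single thread ever touches it.
final class FakeSocket {
  private var sent = Vector.empty[String]
  def send(msg: String): Unit = sent = sent :+ msg
  def sentCount: Int = sent.size
}

// Confines one FakeSocket to one dedicated thread: callers on any thread
// submit work, and the single-thread executor runs it in submission order.
final class ConfinedSocket {
  private val ioThread: ExecutorService = Executors.newSingleThreadExecutor()
  private implicit val ioEc: ExecutionContext = ExecutionContext.fromExecutor(ioThread)
  private val socket = new FakeSocket

  def send(msg: String): Future[Unit] = Future(socket.send(msg))
  def sentCount: Future[Int] = Future(socket.sentCount)

  def close(): Unit = {
    ioThread.shutdown()
    ioThread.awaitTermination(5, TimeUnit.SECONDS)
    ()
  }
}

object ConfinementDemo {
  def main(args: Array[String]): Unit = {
    val confined = new ConfinedSocket
    // Four caller threads with 250 sends each, all funneled through one thread.
    val callers = (1 to 4).map { t =>
      new Thread(new Runnable {
        def run(): Unit = (1 to 250).foreach(i => confined.send(s"$t-$i"))
      })
    }
    callers.foreach(_.start())
    callers.foreach(_.join())
    println(Await.result(confined.sentCount, 5.seconds))
    confined.close()
  }
}
```

All four callers' sends are queued before `sentCount` is submitted, so the count is always 1000. The mutable state is touched only by the executor's thread.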

On Fri 25 Jul 2014 12:47:22 AST, Tommaso Paba wrote:
Hi,
I'm using Akka with the ZeroMQ integration.
I have noticed that if an actor is restarted by its supervisor because
it throws an exception, ZeroMQExtension cannot recreate the socket and
throws a fatal error. That error in turn terminates the JVM if the
akka.jvm-exit-on-fatal-error option is enabled, even though the
exception originally thrown was not fatal at all.
I ran a couple of tests, and I think this happens because the actor
that manages the ZMQ socket has not yet terminated when the crashed
actor is restarted. As a result, it has not yet released its resources
(i.e. closed the ZMQ socket) by the time the new instance binds again.
I'll explain later how I came to this conclusion.
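One way to check this hypothesis is to record Akka's lifecycle hooks during a restart. The sketch below assumes Akka 2.3.x, which is the line that still shipped akka-zeromq; `system.shutdown()` is the 2.3 API. It uses a hypothetical `LifecycleTracer` actor that appends each hook to a thread-safe queue and throws when it receives "boom". The default `preRestart` and `postRestart` are still invoked through `super`.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import akka.actor.{Actor, ActorSystem, Props}
import scala.concurrent.duration._

// Records each lifecycle hook, in order, into a shared thread-safe queue.
class LifecycleTracer(events: ConcurrentLinkedQueue[String]) extends Actor {
  events.add("constructor")

  override def preStart(): Unit = events.add("preStart")

  // The default preRestart stops this actor's own children, then calls postStop.
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    events.add("preRestart")
    super.preRestart(reason, message)
  }

  // The default postRestart calls preStart on the fresh instance.
  override def postRestart(reason: Throwable): Unit = {
    events.add("postRestart")
    super.postRestart(reason)
  }

  override def postStop(): Unit = events.add("postStop")

  def receive: Receive = {
    case "boom" => throw new IllegalStateException("boom")
  }
}

object LifecycleTraceDemo {
  def main(args: Array[String]): Unit = {
    val events = new ConcurrentLinkedQueue[String]()
    val system = ActorSystem("trace")
    system.actorOf(Props(new LifecycleTracer(events)), "tracer") ! "boom"
    // Wait (bounded) until the restart has run all seven hooks.
    val deadline = 5.seconds.fromNow
    while (events.size < 7 && deadline.hasTimeLeft()) Thread.sleep(10)
    println(events)
    system.shutdown()
  }
}
```

With the default hooks, the recorded order is constructor, preStart, preRestart, postStop, constructor, postRestart, preStart. The fresh instance's constructor therefore runs immediately after the old instance's postStop. In the example that follows, that constructor calls `newSocket` and binds. The default `preRestart` only stops the actor's own children. The stack trace suggests that the socket actor is supervised by ZeroMQExtension's own guardian rather than by the crashing actor, so nothing in the restart sequence waits for it to close its socket.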
Here's an example that can be used to trigger the problem and to check
my assumptions:

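import java.util.concurrent.TimeUnit

import akka.actor.{Actor, ActorSystem, Cancellable, PoisonPill, Props}
import akka.zeromq.{Bind, Listener, SocketType, ZeroMQExtension}
import scala.concurrent.duration.FiniteDuration

case object ClockTick

object ZmqTest extends App {

  val system = ActorSystem("system")
  val actor = system.actorOf(Props(new ZmqTestActor), name = "actor")
}

class ZmqTestActor extends Actor {

  import context.dispatcher

  // Created in the constructor, so every restart (a fresh instance) binds
  // the same address again.
  val socket = ZeroMQExtension(context.system).newSocket(SocketType.Router,
    Bind("tcp://127.0.0.1:9666"), Listener(self))

  // preStart runs again after each restart, so keep the timer handle and
  // cancel it in postStop; otherwise every restart adds another timer.
  private var tick: Option[Cancellable] = None

  override def preStart(): Unit = {
    println("preStart called")
    tick = Some(context.system.scheduler.schedule(
      FiniteDuration(1000, TimeUnit.MILLISECONDS),
      FiniteDuration(1000, TimeUnit.MILLISECONDS),
      self, ClockTick))
  }

  override def postStop(): Unit = {
    println("postStop called")
    tick.foreach(_.cancel())
    // Uncomment the following and it won't crash!
    // socket ! PoisonPill
    // Thread.sleep(1000)
  }

  override def receive = {
    case ClockTick =>
      println("Generating exception...")
      val arr = Array[Int](1, 2, 3)
      arr(99) // throws ArrayIndexOutOfBoundsException
  }
}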

As you can see, the program is very simple: it creates a ZMQ socket and
then, when the first ClockTick arrives, throws an
ArrayIndexOutOfBoundsException.
The supervisor stops and restarts the actor, but upon restart the
following error is thrown:

[ERROR] [07/25/2014 18:16:58.266] [system-akka.actor.default-dispatcher-6] [ActorSystem(system)] Uncaught error from thread [system-akka.actor.default-dispatcher-6] shutting down JVM since 'akka.jvm-exit-on-fatal-error' is enabled
java.lang.NoClassDefFoundError: org/zeromq/ZeroMQ$
    at akka.zeromq.ZeroMQExtension$$anonfun$1$$anon$1.akka$zeromq$ZeroMQExtension$$anonfun$$anon$$nonfatal(ZeroMQExtension.scala:253)
    at akka.zeromq.ZeroMQExtension$$anonfun$1$$anon$1$$anonfun$supervisorStrategy$1.applyOrElse(ZeroMQExtension.scala:248)
    at akka.zeromq.ZeroMQExtension$$anonfun$1$$anon$1$$anonfun$supervisorStrategy$1.applyOrElse(ZeroMQExtension.scala:247)
    at akka.actor.SupervisorStrategy.handleFailure(FaultHandling.scala:295)
    (etc etc)

Of course, ZMQ is on the classpath, and so are the binaries; otherwise
it would not have worked even the first time I launched the program.
However, I noticed that you get the same error if you start two
instances of a program that binds a ZMQ socket to the same IP and port.
So I thought the problem could simply be that ZMQ cannot bind to the
given IP/port a second time because it's still busy.
So I added the following lines to postStop:

    socket ! PoisonPill
    Thread.sleep(1000)

This kills the ZMQ socket actor and waits a little to give it time to
release the resources it's using.
Guess what? In this case, the supervisor is able to restart the actor
correctly.
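The "port still busy" behavior can be reproduced without ZMQ at all. A plain TCP listener behaves the same way: a second bind to the same address fails while the first listener is open and succeeds once it has been closed. Below is a minimal sketch using only `java.net`. It is an analogy for the hypothesis, not a description of ZeroMQ internals, and `PortBusyDemo` and `canBind` are hypothetical names.

```scala
import java.net.{BindException, InetSocketAddress, ServerSocket}

object PortBusyDemo {
  // Tries to open a listener on 127.0.0.1:port and closes it again.
  // Returns false if the bind fails because the address is already in use.
  def canBind(port: Int): Boolean = {
    val s = new ServerSocket()
    try { s.bind(new InetSocketAddress("127.0.0.1", port)); true }
    catch { case _: BindException => false }
    finally s.close()
  }

  def main(args: Array[String]): Unit = {
    val first = new ServerSocket()
    first.bind(new InetSocketAddress("127.0.0.1", 0)) // 0 = pick any free port
    val port = first.getLocalPort
    println(s"while the first listener is open: ${canBind(port)}")
    first.close() // analogous to the socket actor releasing its resources
    println(s"after closing it: ${canBind(port)}")
  }
}
```

A listener that never accepted a connection leaves no TIME_WAIT state behind. The first check therefore reports false and the second reports true, which mirrors what the PoisonPill plus sleep in postStop achieves.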

Now my questions are:
1) Is my hypothesis correct? Could this (the port still being busy) be
the reason why we see that exception and ZMQ is not able to restart?
2) Is this happening because I'm not using the ZMQ integration
correctly? BTW, keep in mind that I'm not using JNA (we tested it and
found it was far too slow, so we're using the native binaries).
3) Regarding the solution I found, is there a better way to handle this
issue?
I could do something like:
    while (!socket.isTerminated)
      Thread.sleep(10)
and it works (I tried), but isTerminated is deprecated. I know that,
to find out whether an actor has terminated, I should watch it and
handle the Terminated message. I cannot do that here, though, because
the actor that owns the socket is itself being stopped by its
supervisor. So it seems to me that there's a sort of design flaw that
prevents ZMQ socket termination from being handled cleanly.
4) Shouldn't ZeroMQExtension show a better message when it tries to
bind to a port that is already in use, instead of throwing a
NoClassDefFoundError?
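For question 3, a way to wait for termination without isTerminated, and without handling Terminated yourself, is `akka.pattern.gracefulStop`. It sends a stop message (PoisonPill by default), watches the target on your behalf, and returns a `Future[Boolean]`. Below is a minimal sketch. It assumes Akka 2.3.x with akka-zeromq. It also assumes that the socket actor closes its 0MQ socket in its own postStop; the fact that the isTerminated loop above works suggests this. `RestartSafeZmqActor` and `Tick` are hypothetical names.

```scala
import akka.actor.{Actor, ActorRef, Cancellable}
import akka.pattern.gracefulStop
import akka.zeromq.{Bind, Listener, SocketType, ZeroMQExtension}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical tick message for this sketch.
case object Tick

class RestartSafeZmqActor extends Actor {
  import context.dispatcher

  private var socket: Option[ActorRef] = None
  private var tick: Option[Cancellable] = None

  // Runs for the first instance and, via the default postRestart, for every
  // instance created by a restart.
  override def preStart(): Unit = {
    socket = Some(ZeroMQExtension(context.system).newSocket(
      SocketType.Router, Bind("tcp://127.0.0.1:9666"), Listener(self)))
    tick = Some(context.system.scheduler.schedule(1.second, 1.second, self, Tick))
  }

  // Runs on final stop and, via the default preRestart, before a restart.
  override def postStop(): Unit = {
    tick.foreach(_.cancel())
    // gracefulStop sends PoisonPill to the socket actor and watches it for us;
    // its Future completes once that actor's Terminated arrives. Blocking is
    // deliberate here and bounded by the timeout.
    socket.foreach(s => Await.result(gracefulStop(s, 5.seconds), 6.seconds))
  }

  def receive: Receive = {
    case Tick => throw new IllegalStateException("simulated failure")
  }
}
```

This replaces the fixed Thread.sleep(1000) with a wait that ends as soon as the socket actor has actually stopped. If the socket actor does not stop within the timeout, the Await throws, so the worst case is still bounded. Blocking inside an actor is normally discouraged. Here it only happens while the actor is stopping or restarting.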

Thanks to anyone who'll be willing to help.
Tommaso.

--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ:
http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google
Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send
an email to akka-user+unsubscr...@googlegroups.com
<mailto:akka-user+unsubscr...@googlegroups.com>.
To post to this group, send email to akka-user@googlegroups.com
<mailto:akka-user@googlegroups.com>.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.