I solved the issue by configuring the same heartbeat interval and acceptable heartbeat pause in both actor systems:
akka {
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = DEBUG
  logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
  log-dead-letters = on
  log-dead-letters-during-shutdown = on
  daemonic = on
  jvm-exit-on-fatal-error = off

  actor {
    provider = "akka.remote.RemoteActorRefProvider"
    default-dispatcher.throughput = 15
  }

  remote {
    enabled-transports = ["akka.remote.netty.tcp"]
    log-remote-lifecycle-events = on
    require-cookie = off
    secure-cookie = off

    netty.tcp {
      hostname = "spark-engine"
      port = 9083
      tcp-nodelay = on
      transport-class = "akka.remote.transport.netty.NettyTransport"
      connection-timeout = 120 s
      execution-pool-size = 4
    }

    transport-failure-detector {
      heartbeat-interval = 4 s
      acceptable-heartbeat-pause = 16 s
    }
  }
}

.set("spark.akka.heartbeat.interval", "4s")
.set("spark.akka.heartbeat.pauses", "16s")

On Tue, Mar 15, 2016 at 9:50 PM, David Gomez Saavedra <mikr...@gmail.com> wrote:
> hi there,
>
> I'm trying to set up a simple spark streaming app using akka actors as
> receivers. I followed the example provided and created two apps. One
> creating an actor system and another one subscribing to it. I can see the
> subscription message but a few seconds later I get an error:
>
> [info] 20:37:40.296 [INFO ] Slf4jLogger started
> [info] 20:37:40.466 [INFO ] Starting remoting
> [info] 20:37:40.871 [INFO ] Remoting started; listening on addresses :[akka.tcp://spark-engine@spark-engine:9083]
> [info] 20:37:40.876 [INFO ] Remoting now listens on addresses: [akka.tcp://spark-engine@spark-engine:9083]
> [info] 20:37:40.913 [INFO ] starting actor on akka://spark-engine/user/integrationActor
> [info] received subscribe from Actor[akka.tcp://sparkExecutorActorSystem@172.18.0.2:6006/user/Supervisor0/TagsReceiver#536081036]
> [info] 20:38:34.125 [INFO ] No response from remote. Handshake timed out or transport failure detector triggered.
> [info] 20:38:34.226 [WARN ] Association with remote system [akka.tcp://sparkExecutorActorSystem@172.18.0.2:6006] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
> [info] received unsubscribe from Actor[akka.tcp://sparkExecutorActorSystem@172.18.0.2:6006/user/Supervisor0/TagsReceiver#536081036]
>
> I'm running the master and worker on docker. The two apps are running on
> my laptop for testing. Here's the code of both:
>
> def main(args: Array[String]) {
>   val conf = new SparkConf()
>     .setMaster(sparkMaster)
>     .setAppName(sparkApp)
>     .set("spark.logConf", "true")
>     .set("spark.driver.port", "7001")
>     .set("spark.fileserver.port", "6002")
>     .set("spark.broadcast.port", "6003")
>     .set("spark.replClassServer.port", "6004")
>     .set("spark.blockManager.port", "6005")
>     .set("spark.executor.port", "6006")
>     .set("spark.akka.heartbeat.interval", "100")
>     .set("spark.akka.logLifecycleEvents", "true")
>     .set("spark.rpc.netty.dispatcher.numThreads", "2")
>     .setJars(sparkJars)
>
>   val ssc = new StreamingContext(conf, Seconds(5))
>
>   ssc.checkpoint("/tmp")
>
>   val tags = ssc.actorStream[Tuple2[UUID, Tuple4[Set[String], Int, Int, Int]]](
>     Props(new GifteeTagStreamingActor("akka.tcp://spark-engine@spark-engine:9083/user/integrationActor")),
>     "TagsReceiver")
>
>   tags.print()
>
>   ssc.start()
>   ssc.awaitTermination()
> }
>
> def main(args: Array[String]) {
>   val config = ConfigFactory.load()
>   val system = ActorSystem("spark-engine", config.getConfig("spark-engine"))
>
>   val integrationActor = system.actorOf(Props(new IntegrationActor()), "integrationActor")
>
>   log.info("starting actor on " + integrationActor.path)
>
>   system.awaitTermination()
> }
>
> This is my config for the remote actor system to which spark subscribes:
>
> spark-engine {
>   akka {
>     loggers = ["akka.event.slf4j.Slf4jLogger"]
>     loglevel = DEBUG
>     logging-filter = "akka.event.slf4j.Slf4jLoggingFilter"
>     log-dead-letters = 10
>     log-dead-letters-during-shutdown = on
>
>     actor {
>       provider = "akka.remote.RemoteActorRefProvider"
>     }
>     remote {
>       enabled-transports = ["akka.remote.netty.tcp"]
>       log-remote-lifecycle-events = on
>       netty.tcp {
>         hostname = "spark-engine"
>         port = 9083
>       }
>     }
>   }
> }
>
> These are the logs from the executor:
>
> 16/03/15 20:47:36 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
> 16/03/15 20:48:12 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://spark-engine@spark-engine:9083] has failed, address is now gated for [5000] ms. Reason: [Disassociated]
>
> Any idea why the two actor systems get disassociated?
>
> Thank you very much in advance.
>
> Best
> David
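P.S. For anyone landing on this thread later: the subscribe/unsubscribe traffic in the logs suggests the receiver actor follows the pattern of the ActorWordCount example shipped with Spark 1.x. Since GifteeTagStreamingActor itself isn't shown above, here is a rough sketch of what such a receiver typically looks like; the class name TagStreamingActor and the SubscribeReceiver/UnsubscribeReceiver case classes are my assumptions (the example defines similar shared messages), not code from this thread:

```scala
import akka.actor.{Actor, ActorRef}
import org.apache.spark.streaming.receiver.ActorHelper

// Messages exchanged with the publisher. These are NOT part of Spark or
// Akka; like in the ActorWordCount example, they would be plain case
// classes shared between the publisher and the receiving application.
case class SubscribeReceiver(receiver: ActorRef)
case class UnsubscribeReceiver(receiver: ActorRef)

// Hypothetical receiver actor: subscribes to the remote publisher on
// start, pushes every incoming message into Spark via ActorHelper.store,
// and unsubscribes on stop.
class TagStreamingActor(publisherUrl: String) extends Actor with ActorHelper {

  private lazy val publisher = context.actorSelection(publisherUrl)

  override def preStart(): Unit =
    publisher ! SubscribeReceiver(context.self)

  def receive: Receive = {
    // Whatever the publisher sends is handed to Spark's block manager.
    case msg => store(msg)
  }

  override def postStop(): Unit =
    publisher ! UnsubscribeReceiver(context.self)
}
```

On the publisher side, the IntegrationActor would keep the ActorRefs delivered in SubscribeReceiver and send data to them. The Disassociated errors in the original post came from the transport failure detector on one side timing out before the other side's heartbeats arrived, which is why aligning heartbeat-interval and acceptable-heartbeat-pause on both actor systems resolved it.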