The issue is related to this


seem to fix the problem

On Tue, Mar 15, 2016 at 6:45 AM, David Gomez Saavedra wrote:

> I have updated the config since I realized the actor system was listening
> on driver port + 1, so I changed the ports in my program and in the Docker images:
> val conf = new SparkConf()
>   .setMaster(sparkMaster)
>   //.setMaster("local[2]")
>   .setAppName(sparkApp)
>   .set("",
>   .set("spark.logConf", "true")
>   .set("spark.driver.port","7001")
>   .set("","")
>   .set("spark.fileserver.port","6002")
>   .set("spark.broadcast.port","6003")
>   .set("spark.replClassServer.port","6004")
>   .set("spark.blockManager.port","6005")
>   .set("spark.executor.port","6006")
>   .set("spark.broadcast.factory","org.apache.spark.broadcast.HttpBroadcastFactory")
>   .setJars(sparkJars)
> Netstat of my stream app:
> tcp6       0      0 :::6002                 :::*                    LISTEN      9314/java
> tcp6       0      0 :::6003                 :::*                    LISTEN      9314/java
> tcp6       0      0 :::6005                 :::*                    LISTEN      9314/java
> tcp6       0      0                         :::*                    LISTEN      9314/java
> tcp6       0      0                         :::*                    LISTEN      9314/java
> tcp6       0      0 :::4040                 :::*                    LISTEN      9314/java
> Netstat of the master running on Docker:
> Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name
> tcp6       0      0                         :::*                    LISTEN      -
> tcp6       0      0 :::8080                 :::*                    LISTEN      -
> tcp6       0      0                         :::*                    LISTEN      -
> Netstat of the worker running on Docker:
> Proto Recv-Q Send-Q Local Address           Foreign Address         State       PID/Program name
> tcp6       0      0 :::8081                 :::*                    LISTEN      -
> tcp6       0      0 :::6005                 :::*                    LISTEN      -
> tcp6       0      0                         :::*                    LISTEN      -
> tcp6       0      0                         :::*                    LISTEN      -
> So far, still no success.
> On Mon, Mar 14, 2016 at 11:14 PM, Shixiong(Ryan) Zhu wrote:
>> Could you use netstat to show the ports that the driver is listening on?
>> On Mon, Mar 14, 2016 at 1:45 PM, David Gomez Saavedra wrote:
>>> Hi everyone,
>>> I'm trying to set up Spark Streaming with Akka, using an example similar
>>> to the provided word count. With the Spark master in local mode
>>> everything works, but when I run the driver and executors using
>>> Docker I get the following exception:
>>> 16/03/14 20:32:03 WARN NettyRpcEndpointRef: Error sending message [message = Heartbeat(0,[Lscala.Tuple2;@5ad3f40c,BlockManagerId(0,, 7005))] in 1 attempts
>>> org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply in 10 seconds. This timeout is controlled by spark.executor.heartbeatInterval
>>>     at
>>>     at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
>>>     at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
>>>     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36)
>>>     at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:216)
>>>     at scala.util.Try$.apply(Try.scala:192)
>>>     at scala.util.Failure.recover(Try.scala:216)
>>>     at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
>>>     at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324)
>>>     at
>>>     at org.spark-project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(
>>>     at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:136)
>>>     at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
>>>     at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
>>>     at scala.concurrent.Promise$class.complete(Promise.scala:55)
>>>     at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153)
>>>     at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
>>>     at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235)
>>>     at
>>>     at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.processBatch$1(BatchingExecutor.scala:63)
>>>     at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply$mcV$sp(BatchingExecutor.scala:78)
>>>     at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
>>>     at scala.concurrent.BatchingExecutor$Batch$$anonfun$run$1.apply(BatchingExecutor.scala:55)
>>>     at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72)
>>>     at scala.concurrent.BatchingExecutor$
>>>     at scala.concurrent.Future$InternalCallbackExecutor$.unbatchedExecute(Future.scala:599)
>>>     at scala.concurrent.BatchingExecutor$class.execute(BatchingExecutor.scala:106)
>>>     at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:597)
>>>     at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40)
>>>     at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248)
>>>     at scala.concurrent.Promise$class.tryFailure(Promise.scala:112)
>>>     at scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:153)
>>>     at org.apache.spark.rpc.netty.NettyRpcEnv$$anon$
>>>     at java.util.concurrent.Executors$
>>>     at
>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(
>>>     at java.util.concurrent.ScheduledThreadPoolExecutor$
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(
>>>     at java.util.concurrent.ThreadPoolExecutor$
>>>     at
>>> Caused by: java.util.concurrent.TimeoutException: Cannot receive any reply in 10 seconds
>>>     at org.apache.spark.rpc.netty.NettyRpcEnv$$anon$
>>>     ... 7 more
>>> Here is the config of the Spark Streaming app:
>>> val conf = new SparkConf()
>>>   .setMaster(sparkMaster)
>>>   .setAppName(sparkApp)
>>>   .set("",
>>>   .set("spark.logConf", "true")
>>>   .set("spark.fileserver.port","7002")
>>>   .set("spark.broadcast.port","7003")
>>>   .set("spark.replClassServer.port","7004")
>>>   .set("spark.blockManager.port","7005")
>>>   .set("spark.executor.port","7006")
>>>   .set("spark.broadcast.factory","org.apache.spark.broadcast.HttpBroadcastFactory")
>>>   .setJars(sparkJars)
>>> val sc = new SparkContext(conf)
>>> val ssc = new StreamingContext(sc, Seconds(5))
>>> val tags = ssc.actorStream[String](
>>>   Props(new GifteeTagStreamingActor("akka.tcp://spark-engine@spark-engine:9083/user/integrationActor")),
>>>   "TagsReceiver")
>>> The Docker images for the master and worker expose those ports:
>>> master ---> EXPOSE 8080 7077 4040 7001 7002 7003 7004 7005 7006
>>> worker ---> EXPOSE 8888 8081 4040 7001 7002 7003 7004 7005 7006
>>> I'm using those Docker images to run Spark jobs without a
>>> problem. I only get errors on the streaming app.
>>> Any pointers on what could be wrong?
>>> Thank you very much in advance.
>>> David
