Hi Sparkers.

I am very new to Spark, and I am occasionally getting RpCTimeoutException
with the following error.

15/11/01 22:19:46 WARN HeartbeatReceiver: Removing executor 0 with no
> recent heartbeats: 321792 ms exceeds timeout 300000 ms
> 15/11/01 22:19:46 ERROR TaskSchedulerImpl: Lost executor 0 on 172.31.11.1:
> Executor heartbeat timed out after 321792 ms
> 15/11/01 22:19:46 WARN TaskSetManager: Lost task 0.0 in stage 755.0 (TID
> 755, 172.31.11.1): ExecutorLostFailure (executor 0 lost)
> 15/11/01 22:20:18 ERROR ContextCleaner: Error cleaning RDD 1775
> org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [300
> seconds]. This timeout is controlled by spark.network.timeout
> ...
> ...
> ...
> 15/11/01 22:20:18 WARN BlockManagerMaster: Failed to remove RDD 1775 - Ask
> timed out on [Actor[akka.tcp://
> sparkExecutor@172.31.11.1:34987/user/BlockManagerEndpoint1#-787212020]]
> after [300000 ms]. This timeout is controlled by spark.network.timeout
> org.apache.spark.rpc.RpcTimeoutException: Ask timed out on
> [Actor[akka.tcp://
> sparkExecutor@172.31.11.1:34987/user/BlockManagerEndpoint1#-787212020]]
> after [300000 ms]. This timeout is controlled by spark.network.timeout


And the following is my code:

            val sessionsRDD = sessions.mapPartitions { valueIterator =>
>             val conf = new SparkConf()
>               .set("com.couchbase.nodes",
> confBd.value.get("com.couchbase.nodes").get)
>               .set("com.couchbase.bucket.default",
> confBd.value.get("com.couchbase.bucket.default").get)
>             val cbConf = CouchbaseConfig(conf)
>             val bucket = CouchbaseConnection().bucket(cbConf,
> "default").async()
>             if (valueIterator.isEmpty) {
>               Iterator[JsonDocument]()
>             } else LazyIterator {
>               Observable
>                 .from(OnceIterable(valueIterator).toSeq)
>                 .flatMap(id => {
>
> Observable.defer[JsonDocument](toScalaObservable(bucket.get(id,
> classOf[JsonDocument])))
>
> .retryWhen(RetryBuilder.anyOf(classOf[BackpressureException])
>                       .max(5)
>                       .delay(Delay.exponential(TimeUnit.MILLISECONDS, 500,
> 1)).build())
>                 })
>                 .toBlocking
>                 .toIterable
>                 .iterator
>             }
>           }
>           sessionsRDD.cache()
>           val sessionInfo = sessionsRDD
>           .map(doc => {
>               (0, 0, 0)
>             })
>           .count()
>           println(sessionInfo)
>           sessionsRDD.unpersist()



And this part gives me the error:

          val sessionInfo = sessionsRDD
>           .map(doc => {
>               (0, 0, 0)
>             })
>           .count()
>           println(sessionInfo)


I tried increasing timeout, but it does not quite helping me.
How could I solve such issue?
Any hint would be helpful.

Jake

Reply via email to