Hi Sparkers. I am very new to Spark, and I am occasionally getting an RpcTimeoutException with the following error:
15/11/01 22:19:46 WARN HeartbeatReceiver: Removing executor 0 with no recent heartbeats: 321792 ms exceeds timeout 300000 ms
15/11/01 22:19:46 ERROR TaskSchedulerImpl: Lost executor 0 on 172.31.11.1: Executor heartbeat timed out after 321792 ms
15/11/01 22:19:46 WARN TaskSetManager: Lost task 0.0 in stage 755.0 (TID 755, 172.31.11.1): ExecutorLostFailure (executor 0 lost)
15/11/01 22:20:18 ERROR ContextCleaner: Error cleaning RDD 1775
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [300 seconds]. This timeout is controlled by spark.network.timeout
...
15/11/01 22:20:18 WARN BlockManagerMaster: Failed to remove RDD 1775 - Ask timed out on [Actor[akka.tcp://sparkExecutor@172.31.11.1:34987/user/BlockManagerEndpoint1#-787212020]] after [300000 ms]. This timeout is controlled by spark.network.timeout
org.apache.spark.rpc.RpcTimeoutException: Ask timed out on [Actor[akka.tcp://sparkExecutor@172.31.11.1:34987/user/BlockManagerEndpoint1#-787212020]] after [300000 ms]. This timeout is controlled by spark.network.timeout

And the following is my code:

val sessionsRDD = sessions.mapPartitions { valueIterator =>
  // Rebuild the Couchbase config from the broadcast settings and open the bucket once per partition
  val conf = new SparkConf()
    .set("com.couchbase.nodes", confBd.value.get("com.couchbase.nodes").get)
    .set("com.couchbase.bucket.default", confBd.value.get("com.couchbase.bucket.default").get)
  val cbConf = CouchbaseConfig(conf)
  val bucket = CouchbaseConnection().bucket(cbConf, "default").async()
  if (valueIterator.isEmpty) {
    Iterator[JsonDocument]()
  } else LazyIterator {
    Observable
      .from(OnceIterable(valueIterator).toSeq)
      // Fetch each document asynchronously, retrying on backpressure with exponential delay
      .flatMap(id => {
        Observable.defer[JsonDocument](toScalaObservable(bucket.get(id, classOf[JsonDocument])))
          .retryWhen(RetryBuilder.anyOf(classOf[BackpressureException])
            .max(5)
            .delay(Delay.exponential(TimeUnit.MILLISECONDS, 500, 1))
            .build())
      })
      // Block until the async fetches complete and hand the results back as the partition iterator
      .toBlocking
      .toIterable
      .iterator
  }
}

sessionsRDD.cache()
val sessionInfo = sessionsRDD
  .map(doc => {
    (0, 0, 0)
  })
  .count()
println(sessionInfo)
sessionsRDD.unpersist()

And this part gives me the error:

val sessionInfo = sessionsRDD
  .map(doc => {
    (0, 0, 0)
  })
  .count()
println(sessionInfo)

I tried increasing the timeout, but it does not quite help. How could I solve this issue? Any hint would be helpful.

Jake
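
P.S. For reference, this is roughly how I was raising the timeouts (spark.network.timeout is the setting named in the log; the exact values below are just what I experimented with):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  // Raise the RPC/ask timeout named in the log (it was already at 300s when the error occurred)
  .set("spark.network.timeout", "600s")
  // Heartbeat more often so a busy executor is not marked lost before the timeout expires
  .set("spark.executor.heartbeatInterval", "30s")
val sc = new SparkContext(conf)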