Re: Timeout errors from Akka in Spark 1.2.1

2015-04-16 Thread N B
Hi Guillaume,

Interesting that you brought up shuffle. In fact, we are experiencing this
issue of shuffle files being left behind and not cleaned up. Since this is
a Spark Streaming application, it is expected to stay up indefinitely, so
leftover shuffle files are a big problem right now. Our max window size is
6 hours, so we have set up a cron job to clean up shuffle files older than
12 hours; otherwise they would eat up all our disk space.
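
For reference, the check that cron job performs is roughly the following
(a sketch in Scala; the directory is a placeholder for wherever
spark.local.dir points, not our actual path):

import java.nio.file.{Files, Path, Paths}
import java.util.concurrent.TimeUnit
import scala.collection.JavaConverters._

object ShuffleCleanup {
  def main(args: Array[String]): Unit = {
    // Placeholder for the Spark local/shuffle directory (spark.local.dir).
    val shuffleDir: Path = Paths.get("/tmp/spark-local")
    // Delete shuffle files older than 12 hours (twice our 6-hour max window).
    val cutoff = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(12)

    Files.walk(shuffleDir).iterator().asScala
      .filter(p => Files.isRegularFile(p))
      .filter(p => Files.getLastModifiedTime(p).toMillis < cutoff)
      .foreach(p => Files.deleteIfExists(p))
  }
}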

Please see the following; it seems the non-cleaning of shuffle files is
being documented for 1.3.1:

https://github.com/apache/spark/pull/5074/files
https://issues.apache.org/jira/browse/SPARK-5836


Also, for some reason, the following issues had been opened but were closed
as duplicates of the above documentation bug:

https://issues.apache.org/jira/browse/SPARK-3563
https://issues.apache.org/jira/browse/SPARK-4796
https://issues.apache.org/jira/browse/SPARK-6011

Any further insight into how to handle shuffle files will be greatly
appreciated.

Thanks
NB




On Fri, Apr 10, 2015 at 12:33 AM, Guillaume Pitel 
guillaume.pi...@exensa.com wrote:

  Hi,

 In my experience, the main causes of timeouts are related to file cleanup,
 for instance after a shuffle. If your parallelism is very high and you
 didn't set the consolidate option, Spark creates a great many files, and
 when they are cleaned up, the calls start timing out. So you may find your
 solution by monitoring the folder where Spark stores its shuffle files.
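
 For example, something like this when building the context (a sketch;
 verify the exact option name against the configuration page for your
 Spark version):

 import org.apache.spark.SparkConf

 // Sketch: enable shuffle file consolidation so that very high parallelism
 // does not create one shuffle file per map task per reducer.
 val conf = new SparkConf()
   .setAppName("streaming-app") // placeholder app name
   .set("spark.shuffle.consolidateFiles", "true")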

 Guillaume

 Thanks TD. I believe that might have been the issue. We will try it for a
 few days after passing the GC option on the java command line when we start
 the process.

  Thanks for your timely help.
 NB

 On Wed, Apr 8, 2015 at 6:08 PM, Tathagata Das t...@databricks.com wrote:

 Yes, in local mode the driver and executor will be in the same process, and
 in that case the Java options in the SparkConf configuration will not work.

 On Wed, Apr 8, 2015 at 1:44 PM, N B nb.nos...@gmail.com wrote:

 Since we are running in local mode, won't all the executors be in the
 same JVM as the driver?

  Thanks
  NB

 On Wed, Apr 8, 2015 at 1:29 PM, Tathagata Das t...@databricks.com
 wrote:

 It does take effect on the executors, not on the driver, which is okay
 because the executors hold all the data and are therefore the ones with GC
 issues; the driver usually is not. If you want to be doubly sure, print the
 JVM flags (e.g. http://stackoverflow.com/questions/10486375/print-all-jvm-flags)

 However, the GC I was referring to, the one that initiates the RDD and
 shuffle cleanup, is the GC on the driver. Thought I would clarify.

  TD

 On Wed, Apr 8, 2015 at 1:23 PM, N B nb.nos...@gmail.com wrote:

 Hi TD,

 Thanks for the response. Since you mentioned GC, this got me thinking.

 Given that we are running in local mode (all in a single JVM) for now,
 does the option spark.executor.extraJavaOptions set to
 -XX:+UseConcMarkSweepGC inside the SparkConf object take effect at all
 before we use it to create the StreamingContext? I ask because that is what
 we are doing right now. If not, perhaps we have not been running with
 Concurrent Mark Sweep at all, and would that be recommended instead of
 forcing GC periodically?
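
 Concretely, what we do today is roughly the following (a sketch of the
 setup described above, not our exact source; the batch interval is
 illustrative):

 import org.apache.spark.SparkConf
 import org.apache.spark.streaming.{Seconds, StreamingContext}

 // Sketch: the executor GC option is set on the SparkConf before the
 // StreamingContext is created. Whether it takes effect in local mode
 // (driver and executor in one JVM) is exactly the question above.
 val conf = new SparkConf()
   .setMaster("local[*]")
   .setAppName("flume-streaming") // placeholder app name
   .set("spark.executor.extraJavaOptions", "-XX:+UseConcMarkSweepGC")

 val ssc = new StreamingContext(conf, Seconds(30))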

  Thanks
  NB


 On Wed, Apr 8, 2015 at 10:20 AM, Tathagata Das t...@databricks.com
 wrote:

 There are a couple of options. Increase timeout (see Spark
 configuration).

  Also see past mails in the mailing list.

 Another option you may try (I have a gut feeling that it may work, but I
 am not sure) is calling GC on the driver periodically. The cleanup is tied
 to the GCing of RDD objects, and regular GC may help keep things clean more
 steadily rather than in unpredictable bursts of GC activity.

  Let us know how it works out.

  TD

 On Tue, Apr 7, 2015 at 6:00 PM, Nikunj Bansal nb.nos...@gmail.com
 wrote:

 I have a standalone and local Spark streaming process where we are reading
 inputs using FlumeUtils. Our longest window size is 6 hours. After about a
 day and a half of running without any issues, we start seeing Timeout
 errors while cleaning up input blocks. This seems to cause reading from
 Flume to cease.
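
 The receiving side is set up roughly like this (a sketch; the host, port,
 batch interval and window sizes are illustrative, not our actual values):

 import org.apache.spark.SparkConf
 import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}
 import org.apache.spark.streaming.flume.FlumeUtils

 // Sketch of a local-mode streaming job reading from Flume with a long window.
 val conf = new SparkConf().setMaster("local[*]").setAppName("flume-streaming")
 val ssc = new StreamingContext(conf, Seconds(30))

 val events = FlumeUtils.createStream(ssc, "localhost", 41414)
 // 6-hour window sliding every 5 minutes (illustrative values).
 events.window(Minutes(6 * 60), Minutes(5)).count().print()

 ssc.start()
 ssc.awaitTermination()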


  ERROR sparkDriver-akka.actor.default-dispatcher-78
 BlockManagerSlaveActor.logError - Error in removing block
 input-0-1428182594000
 org.apache.spark.SparkException: Error sending message [message =
 UpdateBlockInfo(BlockManagerId(driver, localhost,
 55067),input-0-1428182594000,StorageLevel(false, false, false, false,
 1),0,0,0)]
  at
 org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:201)
  at
 org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:221)
  at
 org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:62)
  at org.apache.spark.storage.BlockManager.org
 $apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:385)
  at
 org.apache.spark.storage.BlockManager.reportBlockStatus(BlockManager.scala:361)

Re: Timeout errors from Akka in Spark 1.2.1

2015-04-08 Thread Tathagata Das
There are a couple of options. Increase timeout (see Spark configuration).

Also see past mails in the mailing list.

Another option you may try (I have a gut feeling that it may work, but I am
not sure) is calling GC on the driver periodically. The cleanup is tied to
the GCing of RDD objects, and regular GC may help keep things clean more
steadily rather than in unpredictable bursts of GC activity.
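
A rough sketch of both ideas together (the timeout key name is from memory
for this Spark version, so verify it against the configuration page, and
treat the periodic GC purely as an experiment):

import java.util.concurrent.{Executors, TimeUnit}
import org.apache.spark.SparkConf

// Raise the Akka ask timeout (in seconds). The default of 30s matches the
// "Futures timed out after [30 seconds]" in the stack trace below.
val conf = new SparkConf().set("spark.akka.askTimeout", "120")

// Periodically force a GC on the driver so that RDD/shuffle cleanup (which
// is triggered when RDD objects get collected) happens in small regular
// steps rather than in unpredictable bursts.
val gcScheduler = Executors.newSingleThreadScheduledExecutor()
gcScheduler.scheduleAtFixedRate(new Runnable {
  def run(): Unit = System.gc()
}, 10, 10, TimeUnit.MINUTES)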

Let us know how it works out.

TD

On Tue, Apr 7, 2015 at 6:00 PM, Nikunj Bansal nb.nos...@gmail.com wrote:

 I have a standalone and local Spark streaming process where we are reading
 inputs using FlumeUtils. Our longest window size is 6 hours. After about a
 day and a half of running without any issues, we start seeing Timeout
 errors while cleaning up input blocks. This seems to cause reading from
 Flume to cease.


 ERROR sparkDriver-akka.actor.default-dispatcher-78
 BlockManagerSlaveActor.logError - Error in removing block
 input-0-1428182594000
 org.apache.spark.SparkException: Error sending message [message =
 UpdateBlockInfo(BlockManagerId(driver, localhost,
 55067),input-0-1428182594000,StorageLevel(false, false, false, false,
 1),0,0,0)]
 at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:201)
 at
 org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:221)
 at
 org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:62)
 at org.apache.spark.storage.BlockManager.org
 $apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:385)
 at
 org.apache.spark.storage.BlockManager.reportBlockStatus(BlockManager.scala:361)
 at
 org.apache.spark.storage.BlockManager.removeBlock(BlockManager.scala:1105)
 at
 org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply$mcZ$sp(BlockManagerSlaveActor.scala:44)
 at
 org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply(BlockManagerSlaveActor.scala:43)
 at
 org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply(BlockManagerSlaveActor.scala:43)
 at
 org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$1.apply(BlockManagerSlaveActor.scala:76)
 at
 scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
 at
 scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
 at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 Caused by: java.util.concurrent.TimeoutException: Futures timed out after
 [30 seconds]
 at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
 at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
 at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
 at
 akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:169)
 at
 scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640)
 at
 akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:167)
 at scala.concurrent.Await$.result(package.scala:107)
 at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187)
 ... 17 more

 There was a similar query posted here:
 http://apache-spark-user-list.1001560.n3.nabble.com/Block-removal-causes-Akka-timeouts-td15632.html
 but I did not find any resolution to that issue.


 Thanks in advance,
 NB




Re: Timeout errors from Akka in Spark 1.2.1

2015-04-08 Thread N B
Since we are running in local mode, won't all the executors be in the same
JVM as the driver?

Thanks
NB

On Wed, Apr 8, 2015 at 1:29 PM, Tathagata Das t...@databricks.com wrote:

 It does take effect on the executors, not on the driver, which is okay
 because the executors hold all the data and are therefore the ones with GC
 issues; the driver usually is not. If you want to be doubly sure, print the
 JVM flags (e.g. http://stackoverflow.com/questions/10486375/print-all-jvm-flags)

 However, the GC I was referring to, the one that initiates the RDD and
 shuffle cleanup, is the GC on the driver. Thought I would clarify.

 TD

 On Wed, Apr 8, 2015 at 1:23 PM, N B nb.nos...@gmail.com wrote:

 Hi TD,

 Thanks for the response. Since you mentioned GC, this got me thinking.

 Given that we are running in local mode (all in a single JVM) for now,
 does the option spark.executor.extraJavaOptions set to
 -XX:+UseConcMarkSweepGC inside the SparkConf object take effect at all
 before we use it to create the StreamingContext? I ask because that is what
 we are doing right now. If not, perhaps we have not been running with
 Concurrent Mark Sweep at all, and would that be recommended instead of
 forcing GC periodically?

 Thanks
 NB


 On Wed, Apr 8, 2015 at 10:20 AM, Tathagata Das t...@databricks.com
 wrote:

 There are a couple of options. Increase timeout (see Spark
 configuration).

 Also see past mails in the mailing list.

 Another option you may try (I have a gut feeling that it may work, but I am
 not sure) is calling GC on the driver periodically. The cleanup is tied to
 the GCing of RDD objects, and regular GC may help keep things clean more
 steadily rather than in unpredictable bursts of GC activity.

 Let us know how it works out.

 TD

 On Tue, Apr 7, 2015 at 6:00 PM, Nikunj Bansal nb.nos...@gmail.com
 wrote:

 I have a standalone and local Spark streaming process where we are
 reading inputs using FlumeUtils. Our longest window size is 6 hours. After
 about a day and a half of running without any issues, we start seeing
 Timeout errors while cleaning up input blocks. This seems to cause reading
 from Flume to cease.


 ERROR sparkDriver-akka.actor.default-dispatcher-78
 BlockManagerSlaveActor.logError - Error in removing block
 input-0-1428182594000
 org.apache.spark.SparkException: Error sending message [message =
 UpdateBlockInfo(BlockManagerId(driver, localhost,
 55067),input-0-1428182594000,StorageLevel(false, false, false, false,
 1),0,0,0)]
 at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:201)
 at
 org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:221)
 at
 org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:62)
 at org.apache.spark.storage.BlockManager.org
 $apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:385)
 at
 org.apache.spark.storage.BlockManager.reportBlockStatus(BlockManager.scala:361)
 at
 org.apache.spark.storage.BlockManager.removeBlock(BlockManager.scala:1105)
 at
 org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply$mcZ$sp(BlockManagerSlaveActor.scala:44)
 at
 org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply(BlockManagerSlaveActor.scala:43)
 at
 org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply(BlockManagerSlaveActor.scala:43)
 at
 org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$1.apply(BlockManagerSlaveActor.scala:76)
 at
 scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
 at
 scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
 at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 Caused by: java.util.concurrent.TimeoutException: Futures timed out
 after [30 seconds]
 at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
 at
 scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
 at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
 at
 akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:169)
 at
 scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640)
 at
 akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:167)
 at scala.concurrent.Await$.result(package.scala:107)
 at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187)
 ... 17 more

 There was a similar query posted here
 http://apache-spark-user-list.1001560.n3.nabble.com/Block-removal-causes-Akka-timeouts-td15632.html
 but I did not find any resolution to that issue.

Re: Timeout errors from Akka in Spark 1.2.1

2015-04-08 Thread N B
Thanks TD. I believe that might have been the issue. We will try it for a
few days after passing the GC option on the java command line when we start
the process.

Thanks for your timely help.
NB

On Wed, Apr 8, 2015 at 6:08 PM, Tathagata Das t...@databricks.com wrote:

 Yes, in local mode the driver and executor will be in the same process, and
 in that case the Java options in the SparkConf configuration will not work.

 On Wed, Apr 8, 2015 at 1:44 PM, N B nb.nos...@gmail.com wrote:

 Since we are running in local mode, won't all the executors be in the
 same JVM as the driver?

 Thanks
 NB

 On Wed, Apr 8, 2015 at 1:29 PM, Tathagata Das t...@databricks.com
 wrote:

 It does take effect on the executors, not on the driver, which is okay
 because the executors hold all the data and are therefore the ones with GC
 issues; the driver usually is not. If you want to be doubly sure, print the
 JVM flags (e.g. http://stackoverflow.com/questions/10486375/print-all-jvm-flags)

 However, the GC I was referring to, the one that initiates the RDD and
 shuffle cleanup, is the GC on the driver. Thought I would clarify.

 TD

 On Wed, Apr 8, 2015 at 1:23 PM, N B nb.nos...@gmail.com wrote:

 Hi TD,

 Thanks for the response. Since you mentioned GC, this got me thinking.

 Given that we are running in local mode (all in a single JVM) for now,
 does the option spark.executor.extraJavaOptions set to
 -XX:+UseConcMarkSweepGC inside the SparkConf object take effect at all
 before we use it to create the StreamingContext? I ask because that is what
 we are doing right now. If not, perhaps we have not been running with
 Concurrent Mark Sweep at all, and would that be recommended instead of
 forcing GC periodically?

 Thanks
 NB


 On Wed, Apr 8, 2015 at 10:20 AM, Tathagata Das t...@databricks.com
 wrote:

 There are a couple of options. Increase timeout (see Spark
 configuration).

 Also see past mails in the mailing list.

 Another option you may try (I have a gut feeling that it may work, but I am
 not sure) is calling GC on the driver periodically. The cleanup is tied to
 the GCing of RDD objects, and regular GC may help keep things clean more
 steadily rather than in unpredictable bursts of GC activity.

 Let us know how it works out.

 TD

 On Tue, Apr 7, 2015 at 6:00 PM, Nikunj Bansal nb.nos...@gmail.com
 wrote:

 I have a standalone and local Spark streaming process where we are reading
 inputs using FlumeUtils. Our longest window size is 6 hours. After about a
 day and a half of running without any issues, we start seeing Timeout
 errors while cleaning up input blocks. This seems to cause reading from
 Flume to cease.


 ERROR sparkDriver-akka.actor.default-dispatcher-78
 BlockManagerSlaveActor.logError - Error in removing block
 input-0-1428182594000
 org.apache.spark.SparkException: Error sending message [message =
 UpdateBlockInfo(BlockManagerId(driver, localhost,
 55067),input-0-1428182594000,StorageLevel(false, false, false, false,
 1),0,0,0)]
 at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:201)
 at
 org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:221)
 at
 org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:62)
 at org.apache.spark.storage.BlockManager.org
 $apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:385)
 at
 org.apache.spark.storage.BlockManager.reportBlockStatus(BlockManager.scala:361)
 at
 org.apache.spark.storage.BlockManager.removeBlock(BlockManager.scala:1105)
 at
 org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply$mcZ$sp(BlockManagerSlaveActor.scala:44)
 at
 org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply(BlockManagerSlaveActor.scala:43)
 at
 org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply(BlockManagerSlaveActor.scala:43)
 at
 org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$1.apply(BlockManagerSlaveActor.scala:76)
 at
 scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
 at
 scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
 at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
 at
 scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 Caused by: java.util.concurrent.TimeoutException: Futures timed out
 after [30 seconds]
 at
 scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
 at
 scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)

Re: Timeout errors from Akka in Spark 1.2.1

2015-04-08 Thread Tathagata Das
Yes, in local mode the driver and executor will be in the same process, and
in that case the Java options in the SparkConf configuration will not work.

On Wed, Apr 8, 2015 at 1:44 PM, N B nb.nos...@gmail.com wrote:

 Since we are running in local mode, won't all the executors be in the same
 JVM as the driver?

 Thanks
 NB

 On Wed, Apr 8, 2015 at 1:29 PM, Tathagata Das t...@databricks.com wrote:

 It does take effect on the executors, not on the driver, which is okay
 because the executors hold all the data and are therefore the ones with GC
 issues; the driver usually is not. If you want to be doubly sure, print the
 JVM flags (e.g. http://stackoverflow.com/questions/10486375/print-all-jvm-flags)

 However, the GC I was referring to, the one that initiates the RDD and
 shuffle cleanup, is the GC on the driver. Thought I would clarify.

 TD

 On Wed, Apr 8, 2015 at 1:23 PM, N B nb.nos...@gmail.com wrote:

 Hi TD,

 Thanks for the response. Since you mentioned GC, this got me thinking.

 Given that we are running in local mode (all in a single JVM) for now,
 does the option spark.executor.extraJavaOptions set to
 -XX:+UseConcMarkSweepGC inside the SparkConf object take effect at all
 before we use it to create the StreamingContext? I ask because that is what
 we are doing right now. If not, perhaps we have not been running with
 Concurrent Mark Sweep at all, and would that be recommended instead of
 forcing GC periodically?

 Thanks
 NB


 On Wed, Apr 8, 2015 at 10:20 AM, Tathagata Das t...@databricks.com
 wrote:

 There are a couple of options. Increase timeout (see Spark
 configuration).

 Also see past mails in the mailing list.

 Another option you may try (I have a gut feeling that it may work, but I am
 not sure) is calling GC on the driver periodically. The cleanup is tied to
 the GCing of RDD objects, and regular GC may help keep things clean more
 steadily rather than in unpredictable bursts of GC activity.

 Let us know how it works out.

 TD

 On Tue, Apr 7, 2015 at 6:00 PM, Nikunj Bansal nb.nos...@gmail.com
 wrote:

 I have a standalone and local Spark streaming process where we are
 reading inputs using FlumeUtils. Our longest window size is 6 hours. After
 about a day and a half of running without any issues, we start seeing
 Timeout errors while cleaning up input blocks. This seems to cause reading
 from Flume to cease.


 ERROR sparkDriver-akka.actor.default-dispatcher-78
 BlockManagerSlaveActor.logError - Error in removing block
 input-0-1428182594000
 org.apache.spark.SparkException: Error sending message [message =
 UpdateBlockInfo(BlockManagerId(driver, localhost,
 55067),input-0-1428182594000,StorageLevel(false, false, false, false,
 1),0,0,0)]
 at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:201)
 at
 org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:221)
 at
 org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:62)
 at org.apache.spark.storage.BlockManager.org
 $apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:385)
 at
 org.apache.spark.storage.BlockManager.reportBlockStatus(BlockManager.scala:361)
 at
 org.apache.spark.storage.BlockManager.removeBlock(BlockManager.scala:1105)
 at
 org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply$mcZ$sp(BlockManagerSlaveActor.scala:44)
 at
 org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply(BlockManagerSlaveActor.scala:43)
 at
 org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply(BlockManagerSlaveActor.scala:43)
 at
 org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$1.apply(BlockManagerSlaveActor.scala:76)
 at
 scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
 at
 scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
 at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
 at
 akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
 at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
 at
 scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
 at
 scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
 at
 scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
 Caused by: java.util.concurrent.TimeoutException: Futures timed out
 after [30 seconds]
 at
 scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
 at
 scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
 at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
 at
 akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:169)
 at
 scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640)
 at