Hi Guillaume,

Interesting that you brought up shuffles. We are in fact experiencing exactly this issue of shuffle files being left behind and never cleaned up. Since this is a Spark Streaming application that is expected to stay up indefinitely, the leftover shuffle files are a big problem right now. Because our maximum window size is 6 hours, we have set up a cron job that deletes shuffle files older than 12 hours; otherwise they would eventually eat up all of our disk space.
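For reference, the cron essentially does the equivalent of the sketch below. This is illustrative only: /tmp/spark-local is a placeholder for whatever spark.local.dir resolves to in your deployment, the "shuffle" file-name filter is just what we observe on disk, and 12 hours is simply double our largest window.

import java.io.File

object StaleShuffleFileCleaner {
  // 12 hours, i.e. twice our maximum window size.
  private val MaxAgeMillis = 12L * 60 * 60 * 1000

  // Recursively delete shuffle files not touched since the cutoff timestamp.
  def cleanOldFiles(dir: File, cutoff: Long): Unit = {
    val entries = Option(dir.listFiles()).getOrElse(Array.empty[File])
    entries.foreach { f =>
      if (f.isDirectory) cleanOldFiles(f, cutoff)
      else if (f.getName.contains("shuffle") && f.lastModified() < cutoff) f.delete()
    }
  }

  def main(args: Array[String]): Unit = {
    cleanOldFiles(new File("/tmp/spark-local"), System.currentTimeMillis() - MaxAgeMillis)
  }
}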
Please see the following. It seems that the non-cleanup of shuffle files is being documented as known behavior in 1.3.1:

https://github.com/apache/spark/pull/5074/files
https://issues.apache.org/jira/browse/SPARK-5836

Also, for some reason, the following issues had been opened but were closed as duplicates of the documentation bug above:

https://issues.apache.org/jira/browse/SPARK-3563
https://issues.apache.org/jira/browse/SPARK-4796
https://issues.apache.org/jira/browse/SPARK-6011

Any further insight into how to handle shuffle files would be greatly appreciated.

Thanks
NB

On Fri, Apr 10, 2015 at 12:33 AM, Guillaume Pitel <guillaume.pi...@exensa.com> wrote:

> Hi,
>
> In my experience, the main causes of timeouts are related to file cleanup,
> for instance after a shuffle. If your parallelism is very high and you did
> not set the consolidate option, Spark creates a very large number of
> files, and the calls start timing out when those files are cleaned up. So
> you may find your answer by monitoring the folder where Spark stores its
> shuffle files.
>
> Guillaume
>
> Thanks TD. I believe that might have been the issue. Will try for a few
> days after passing in the GC option on the java command line when we start
> the process.
>
> Thanks for your timely help.
> NB
>
> On Wed, Apr 8, 2015 at 6:08 PM, Tathagata Das <t...@databricks.com> wrote:
>
>> Yes, in local mode the driver and executor will be the same process, and
>> in that case the Java options in the SparkConf configuration will not
>> take effect.
>>
>> On Wed, Apr 8, 2015 at 1:44 PM, N B <nb.nos...@gmail.com> wrote:
>>
>>> Since we are running in local mode, won't all the executors be in the
>>> same JVM as the driver?
>>>
>>> Thanks
>>> NB
>>>
>>> On Wed, Apr 8, 2015 at 1:29 PM, Tathagata Das <t...@databricks.com>
>>> wrote:
>>>
>>>> It does take effect on the executors, not on the driver, which is okay
>>>> because it is usually the executors that hold all the data and hence
>>>> have GC issues, not the driver. If you want to be doubly sure, print
>>>> the JVM flags (e.g.
>>>> http://stackoverflow.com/questions/10486375/print-all-jvm-flags).
>>>>
>>>> However, the GC I was referring to, the one that initiates the RDD and
>>>> shuffle cleanup, is the GC on the driver. Thought I would clarify.
>>>>
>>>> TD
>>>>
>>>> On Wed, Apr 8, 2015 at 1:23 PM, N B <nb.nos...@gmail.com> wrote:
>>>>
>>>>> Hi TD,
>>>>>
>>>>> Thanks for the response. Since you mentioned GC, this got me thinking.
>>>>>
>>>>> Given that we are running in local mode (all in a single JVM) for
>>>>> now, does setting "spark.executor.extraJavaOptions" to
>>>>> "-XX:+UseConcMarkSweepGC" in the SparkConf object take effect at all
>>>>> before we use it to create the StreamingContext? I ask because that
>>>>> is what we are doing right now. If not, perhaps we have not been
>>>>> running with Concurrent Mark Sweep at all. Is that recommended
>>>>> instead of forcing GC periodically?
>>>>>
>>>>> Thanks
>>>>> NB
>>>>>
>>>>> On Wed, Apr 8, 2015 at 10:20 AM, Tathagata Das <t...@databricks.com>
>>>>> wrote:
>>>>>
>>>>>> There are a couple of options. Increase the timeout (see the Spark
>>>>>> configuration docs), and also see past mails on this mailing list.
>>>>>>
>>>>>> Another option you may try (I have a gut feeling it may work, but I
>>>>>> am not sure) is calling GC on the driver periodically. The cleanup
>>>>>> of RDDs and shuffles is tied to the GC of the corresponding RDD
>>>>>> objects, so regular GC may keep things clean more rigorously rather
>>>>>> than in unpredictable bursts of GC activity.
>>>>>>
>>>>>> Let us know how it works out.
>>>>>>
>>>>>> TD
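For anyone who finds this thread later: as far as I can tell, the "consolidate option" Guillaume mentions is spark.shuffle.consolidateFiles, which only applies to the hash-based shuffle. A minimal sketch of setting it when building the context; the master, app name and batch interval below are placeholders, not our actual values:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setMaster("local[4]")                          // we run in local mode
  .setAppName("flume-streaming")                  // placeholder name
  .set("spark.shuffle.consolidateFiles", "true")  // fewer shuffle files at high parallelism
val ssc = new StreamingContext(conf, Seconds(10)) // placeholder batch interval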
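And this is roughly what we plan to try for TD's periodic-GC suggestion: a plain ScheduledExecutorService running on the driver. The 30-minute period is a guess on our part, and System.gc() is only a hint to the JVM. Note also that, per TD's point above, in local mode a collector flag such as -XX:+UseConcMarkSweepGC has to go on the java command line that launches the process; setting it through SparkConf is too late because the JVM is already running.

import java.util.concurrent.{Executors, TimeUnit}

// Periodically hint the driver JVM to GC so that Spark's reference-based
// cleanup of RDDs and shuffles runs regularly rather than in bursts.
val gcExecutor = Executors.newSingleThreadScheduledExecutor()
gcExecutor.scheduleAtFixedRate(new Runnable {
  override def run(): Unit = System.gc()
}, 30, 30, TimeUnit.MINUTES)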
>>>>>>
>>>>>> On Tue, Apr 7, 2015 at 6:00 PM, Nikunj Bansal <nb.nos...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I have a standalone, local-mode Spark Streaming process that reads
>>>>>>> its input using FlumeUtils. Our longest window size is 6 hours.
>>>>>>> After about a day and a half of running without any issues, we
>>>>>>> start seeing Timeout errors while cleaning up input blocks. This
>>>>>>> seems to cause reading from Flume to cease.
>>>>>>>
>>>>>>> ERROR sparkDriver-akka.actor.default-dispatcher-78 BlockManagerSlaveActor.logError - Error in removing block input-0-1428182594000
>>>>>>> org.apache.spark.SparkException: Error sending message [message = UpdateBlockInfo(BlockManagerId(<driver>, localhost, 55067),input-0-1428182594000,StorageLevel(false, false, false, false, 1),0,0,0)]
>>>>>>>   at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:201)
>>>>>>>   at org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:221)
>>>>>>>   at org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:62)
>>>>>>>   at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:385)
>>>>>>>   at org.apache.spark.storage.BlockManager.reportBlockStatus(BlockManager.scala:361)
>>>>>>>   at org.apache.spark.storage.BlockManager.removeBlock(BlockManager.scala:1105)
>>>>>>>   at org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply$mcZ$sp(BlockManagerSlaveActor.scala:44)
>>>>>>>   at org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply(BlockManagerSlaveActor.scala:43)
>>>>>>>   at org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply(BlockManagerSlaveActor.scala:43)
>>>>>>>   at org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$1.apply(BlockManagerSlaveActor.scala:76)
>>>>>>>   at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>>>>>>>   at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>>>>>>>   at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>>>>>>>   at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>>>>>>>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>> Caused by: java.util.concurrent.TimeoutException: Futures timed out after [30 seconds]
>>>>>>>   at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>>>>>>>   at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>>>>>>>   at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>>>>>>>   at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:169)
>>>>>>>   at scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640)
>>>>>>>   at akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:167)
>>>>>>>   at scala.concurrent.Await$.result(package.scala:107)
>>>>>>>   at org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187)
>>>>>>>   ... 17 more
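For later readers: the "Futures timed out after [30 seconds]" above appears to match the default Akka ask timeout in Spark 1.x, so the "increase timeout" option TD mentions would look something like the sketch below. The value is illustrative, and raising it treats the symptom rather than the underlying cleanup cost.

import org.apache.spark.SparkConf

// Illustrative: raise the Akka ask timeout from its 30-second default.
val conf = new SparkConf().set("spark.akka.askTimeout", "120")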
>>>>>>>
>>>>>>> There was a similar query posted at
>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Block-removal-causes-Akka-timeouts-td15632.html,
>>>>>>> but I did not find any resolution to that issue there.
>>>>>>>
>>>>>>> Thanks in advance,
>>>>>>> NB
>
> --
> [image: eXenSa]
> *Guillaume PITEL, Président*
> +33(0)626 222 431
>
> eXenSa S.A.S. <http://www.exensa.com/>
> 41, rue Périer - 92120 Montrouge - FRANCE
> Tel +33(0)184 163 677 / Fax +33(0)972 283 705