Hi Guillaume,

Interesting that you brought up Shuffle. In fact we are experiencing this
issue of shuffle files being left behind and not being cleaned up. Since
this is a Spark streaming application, it is expected to stay up
indefinitely, so shuffle files being left is a big problem right now. Since
our max window size is 6 hours, we have set a cron job to clean up shuffle
files older than 12 hours otherwise it will eat up all our disk space.

Please see the following. It seems the non-cleaning of shuffle files is
being documented in 1.3.1.

https://github.com/apache/spark/pull/5074/files
https://issues.apache.org/jira/browse/SPARK-5836


Also, for some reason, the following issues had been opened but were closed
as Duplicates of the above Documentation bug.

https://issues.apache.org/jira/browse/SPARK-3563
https://issues.apache.org/jira/browse/SPARK-4796
https://issues.apache.org/jira/browse/SPARK-6011

Any further insight into how to handle shuffle files will be greatly
appreciated.

Thanks
NB




On Fri, Apr 10, 2015 at 12:33 AM, Guillaume Pitel <
guillaume.pi...@exensa.com> wrote:

>  Hi,
>
> From my experience, the main causes of timeout are related to file
> cleanup, for instance after a shuffle. If your parallelism is very high and
> you didn't set the consolidate option, there are many files created by
> Spark, and when they are cleaned up, the calls starts timeouting. So you
> may find your solution by monitoring the folder where spark store its
> shuffles.
>
> Guillaume
>
> Thanks TD. I believe that might have been the issue. Will try for a few
> days after passing in the GC option on the java command line when we start
> the process.
>
>  Thanks for your timely help.
> NB
>
> On Wed, Apr 8, 2015 at 6:08 PM, Tathagata Das <t...@databricks.com> wrote:
>
>> Yes, in local mode they the driver and executor will be same the process.
>> And in that case the Java options in  SparkConf configuration will not
>> work.
>>
>> On Wed, Apr 8, 2015 at 1:44 PM, N B <nb.nos...@gmail.com> wrote:
>>
>>> Since we are running in local mode, won't all the executors be in the
>>> same JVM as the driver?
>>>
>>>  Thanks
>>>  NB
>>>
>>> On Wed, Apr 8, 2015 at 1:29 PM, Tathagata Das <t...@databricks.com>
>>> wrote:
>>>
>>>> Its does take effect on the executors, not on the driver. Which is okay
>>>> because executors have all the data and therefore have GC issues, not so
>>>> usually for the driver. If you want to double-sure, print the JVM flag
>>>> (e.g. http://stackoverflow.com/questions/10486375/print-all-jvm-flags)
>>>>
>>>> However, the GC i was referring to that initiates the RDD and shuffle
>>>> cleanup was the GC on the driver. Thought I would clarify.
>>>>
>>>>  TD
>>>>
>>>> On Wed, Apr 8, 2015 at 1:23 PM, N B <nb.nos...@gmail.com> wrote:
>>>>
>>>>> Hi TD,
>>>>>
>>>>> Thanks for the response. Since you mentioned GC, this got me thinking.
>>>>>
>>>>>  Given that we are running in local mode (all in a single JVM) for
>>>>> now, does the option "spark.executor.extraJavaOptions" set to
>>>>> "-XX:+UseConcMarkSweepGC" inside SparkConf object take effect at all 
>>>>> before
>>>>> we use it to create the StreamingContext? I ask because that is what we 
>>>>> are
>>>>> doing right now. If not, perhaps we have not been running with the
>>>>> Concurrent Mark Sweep at all and is that recommended instead of forcing GC
>>>>> periodically?
>>>>>
>>>>>  Thanks
>>>>>  NB
>>>>>
>>>>>
>>>>> On Wed, Apr 8, 2015 at 10:20 AM, Tathagata Das <t...@databricks.com>
>>>>> wrote:
>>>>>
>>>>>> There are a couple of options. Increase timeout (see Spark
>>>>>> configuration).
>>>>>>
>>>>>>  Also see past mails in the mailing list.
>>>>>>
>>>>>>  Another option you may try (I have gut feeling that may work, but I
>>>>>> am not sure) is calling GC on the driver periodically. The cleaning up of
>>>>>> stuff is tied to GCing of RDD objects and regular cleaning may help keep
>>>>>> things clean more rigorously rather than in unpredictable bursts of GC
>>>>>> activity.
>>>>>>
>>>>>>  Let us know how it works out.
>>>>>>
>>>>>>  TD
>>>>>>
>>>>>> On Tue, Apr 7, 2015 at 6:00 PM, Nikunj Bansal <nb.nos...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> I have a standalone and local Spark streaming process where we are
>>>>>>> reading inputs using FlumeUtils. Our longest window size is 6 hours. 
>>>>>>> After
>>>>>>> about a day and a half of running without any issues, we start seeing
>>>>>>> Timeout errors while cleaning up input blocks. This seems to cause 
>>>>>>> reading
>>>>>>> from Flume to cease.
>>>>>>>
>>>>>>>
>>>>>>>  ERROR sparkDriver-akka.actor.default-dispatcher-78
>>>>>>> BlockManagerSlaveActor.logError - Error in removing block
>>>>>>> input-0-1428182594000
>>>>>>> org.apache.spark.SparkException: Error sending message [message =
>>>>>>> UpdateBlockInfo(BlockManagerId(<driver>, localhost,
>>>>>>> 55067),input-0-1428182594000,StorageLevel(false, false, false, false,
>>>>>>> 1),0,0,0)]
>>>>>>>  at
>>>>>>> org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:201)
>>>>>>>  at
>>>>>>> org.apache.spark.storage.BlockManagerMaster.askDriverWithReply(BlockManagerMaster.scala:221)
>>>>>>>  at
>>>>>>> org.apache.spark.storage.BlockManagerMaster.updateBlockInfo(BlockManagerMaster.scala:62)
>>>>>>>  at org.apache.spark.storage.BlockManager.org
>>>>>>> $apache$spark$storage$BlockManager$$tryToReportBlockStatus(BlockManager.scala:385)
>>>>>>>  at
>>>>>>> org.apache.spark.storage.BlockManager.reportBlockStatus(BlockManager.scala:361)
>>>>>>>  at
>>>>>>> org.apache.spark.storage.BlockManager.removeBlock(BlockManager.scala:1105)
>>>>>>>  at
>>>>>>> org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply$mcZ$sp(BlockManagerSlaveActor.scala:44)
>>>>>>>  at
>>>>>>> org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply(BlockManagerSlaveActor.scala:43)
>>>>>>>  at
>>>>>>> org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$receiveWithLogging$1$$anonfun$applyOrElse$1.apply(BlockManagerSlaveActor.scala:43)
>>>>>>>  at
>>>>>>> org.apache.spark.storage.BlockManagerSlaveActor$$anonfun$1.apply(BlockManagerSlaveActor.scala:76)
>>>>>>>  at
>>>>>>> scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
>>>>>>>  at
>>>>>>> scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
>>>>>>>  at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:41)
>>>>>>>  at
>>>>>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
>>>>>>>  at
>>>>>>> scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>>  at
>>>>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>>  at
>>>>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>>  at
>>>>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>> Caused by: java.util.concurrent.TimeoutException: Futures timed out
>>>>>>> after [30 seconds]
>>>>>>>  at
>>>>>>> scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
>>>>>>>  at
>>>>>>> scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
>>>>>>>  at
>>>>>>> scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
>>>>>>>  at
>>>>>>> akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread$$anon$3.block(ThreadPoolBuilder.scala:169)
>>>>>>>  at
>>>>>>> scala.concurrent.forkjoin.ForkJoinPool.managedBlock(ForkJoinPool.java:3640)
>>>>>>>  at
>>>>>>> akka.dispatch.MonitorableThreadFactory$AkkaForkJoinWorkerThread.blockOn(ThreadPoolBuilder.scala:167)
>>>>>>>  at scala.concurrent.Await$.result(package.scala:107)
>>>>>>>  at
>>>>>>> org.apache.spark.util.AkkaUtils$.askWithReply(AkkaUtils.scala:187)
>>>>>>>  ... 17 more
>>>>>>>
>>>>>>>  There was a similar query posted here
>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Block-removal-causes-Akka-timeouts-td15632.html
>>>>>>> but did not find any resolution to that issue.
>>>>>>>
>>>>>>>
>>>>>>>  Thanks in advance,
>>>>>>> NB
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
>
> --
>    [image: eXenSa]
>  *Guillaume PITEL, Président*
> +33(0)626 222 431
>
> eXenSa S.A.S. <http://www.exensa.com/>
>  41, rue Périer - 92120 Montrouge - FRANCE
> Tel +33(0)184 163 677 / Fax +33(0)972 283 705
>

Reply via email to