Re: spark.cleaner.ttl for 1.4.1
AFAIK the ContextCleaner should perform all of the cleaning, *as long as garbage collection is performed frequently enough on the driver*. See https://issues.apache.org/jira/browse/SPARK-7689 and https://github.com/apache/spark/pull/6220#issuecomment-102950055 for discussion of this technicality.

On Mon, Nov 30, 2015 at 8:46 AM Michal Čizmazia wrote:
> Does *spark.cleaner.ttl* still need to be used for Spark *1.4.1* long-running
> streaming jobs? Or does *ContextCleaner* alone do all the cleaning?
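The dependence on driver GC can be illustrated with a minimal stand-alone sketch — plain Python, not Spark code, and the Broadcast class and id below are invented for illustration. Like ContextCleaner, it registers a weak-reference cleanup hook per driver-side handle; the hook only fires once the collector actually reclaims the handle, which is why infrequent driver GC delays cleaning:

```python
# Analogy for ContextCleaner's mechanism: cleanup is driven by the garbage
# collector, not by a timer. A weak reference is registered per resource, and
# the backing data becomes eligible for deletion only when GC collects the
# driver-side handle.
import gc
import weakref

cleaned = []

class Broadcast:
    """Stand-in for a driver-side handle (broadcast variable, RDD, ...)."""
    def __init__(self, broadcast_id):
        self.broadcast_id = broadcast_id
        # When this object is garbage-collected, the cleaner callback runs.
        weakref.finalize(self, cleaned.append, broadcast_id)

b = Broadcast(82)
del b           # the handle goes out of scope on the "driver"
gc.collect()    # until a collection actually runs, cleanup may not happen
print(cleaned)  # ids whose backing data can now be deleted
```

This is also why, on a driver with a large and mostly idle heap, the JVM may go a long time without a full GC and the files accumulate in the meantime.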
spark.cleaner.ttl for 1.4.1
Does *spark.cleaner.ttl* still need to be used for Spark *1.4.1* long-running streaming jobs? Or does *ContextCleaner* alone do all the cleaning?
RE: [SparkStreaming 1.3.0] Broadcast failure after setting "spark.cleaner.ttl"
The shuffle data can be deleted through the weak-reference mechanism; you could check the code of ContextCleaner. You could also trigger a full GC manually with JVisualVM or some other tool to see whether the shuffle files are deleted.

Thanks
Jerry

From: Haopu Wang [mailto:hw...@qilinsoft.com]
Sent: Tuesday, June 9, 2015 5:28 PM
To: Shao, Saisai; user
Subject: RE: [SparkStreaming 1.3.0] Broadcast failure after setting "spark.cleaner.ttl"

Jerry, I agree with you. However, in my case, I kept monitoring the "blockmanager" folder. I do see the number of files decrease sometimes, but the folder's size keeps increasing. Below is a screenshot of the folder; you can see that some old files are somehow not deleted.

[screenshot of the blockmanager folder omitted]

-----Original Message-----
From: Shao, Saisai [mailto:saisai.s...@intel.com]
Sent: Tuesday, June 09, 2015 4:33 PM
To: Haopu Wang; user
Subject: RE: [SparkStreaming 1.3.0] Broadcast failure after setting "spark.cleaner.ttl"

From the stack, I think this problem may be due to the deletion of the broadcast variable: since you set spark.cleaner.ttl, the old broadcast variable is deleted after that timeout, and you hit this exception when you try to use it again after the time limit. Basically, I think you don't need to use this configuration: Spark Streaming automatically deletes old, unused data, and Spark itself deletes this metadata using weak references. This configuration will also be deprecated in the coming release.

Thanks
Jerry

-----Original Message-----
From: Haopu Wang [mailto:hw...@qilinsoft.com]
Sent: Tuesday, June 9, 2015 3:30 PM
To: user
Subject: [SparkStreaming 1.3.0] Broadcast failure after setting "spark.cleaner.ttl"

When I ran a spark streaming application for longer, I noticed that the local directory's size kept increasing. I set "spark.cleaner.ttl" to 1800 seconds in order to clean the metadata. The spark streaming batch duration is 10 seconds and the checkpoint duration is 10 minutes.
The setting took effect, but after that the exception below happened. Do you have any idea about this error? Thank you!

15/06/09 12:57:30 WARN TaskSetManager: Lost task 3.0 in stage 5038.0 (TID 27045, host2): java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_82_piece0 of broadcast_82
[stack trace snipped; the full trace appears in the original message at the bottom of this thread]
RE: [SparkStreaming 1.3.0] Broadcast failure after setting "spark.cleaner.ttl"
Jerry, I agree with you. However, in my case, I kept monitoring the "blockmanager" folder. I do see the number of files decrease sometimes, but the folder's size keeps increasing. Below is a screenshot of the folder; you can see that some old files are somehow not deleted.

-----Original Message-----
From: Shao, Saisai [mailto:saisai.s...@intel.com]
Sent: Tuesday, June 09, 2015 4:33 PM
To: Haopu Wang; user
Subject: RE: [SparkStreaming 1.3.0] Broadcast failure after setting "spark.cleaner.ttl"

From the stack, I think this problem may be due to the deletion of the broadcast variable: since you set spark.cleaner.ttl, the old broadcast variable is deleted after that timeout, and you hit this exception when you try to use it again after the time limit. Basically, I think you don't need to use this configuration: Spark Streaming automatically deletes old, unused data, and Spark itself deletes this metadata using weak references. This configuration will also be deprecated in the coming release.

Thanks
Jerry

-----Original Message-----
From: Haopu Wang [mailto:hw...@qilinsoft.com]
Sent: Tuesday, June 9, 2015 3:30 PM
To: user
Subject: [SparkStreaming 1.3.0] Broadcast failure after setting "spark.cleaner.ttl"

When I ran a spark streaming application for longer, I noticed that the local directory's size kept increasing. I set "spark.cleaner.ttl" to 1800 seconds in order to clean the metadata. The spark streaming batch duration is 10 seconds and the checkpoint duration is 10 minutes. The setting took effect, but after that the exception below happened. Do you have any idea about this error? Thank you!
15/06/09 12:57:30 WARN TaskSetManager: Lost task 3.0 in stage 5038.0 (TID 27045, host2): java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_82_piece0 of broadcast_82
[stack trace snipped; the full trace appears in the original message at the bottom of this thread]
RE: [SparkStreaming 1.3.0] Broadcast failure after setting "spark.cleaner.ttl"
From the stack, I think this problem may be due to the deletion of the broadcast variable: since you set spark.cleaner.ttl, the old broadcast variable is deleted after that timeout, and you hit this exception when you try to use it again after the time limit. Basically, I think you don't need to use this configuration: Spark Streaming automatically deletes old, unused data, and Spark itself deletes this metadata using weak references. This configuration will also be deprecated in the coming release.

Thanks
Jerry

-----Original Message-----
From: Haopu Wang [mailto:hw...@qilinsoft.com]
Sent: Tuesday, June 9, 2015 3:30 PM
To: user
Subject: [SparkStreaming 1.3.0] Broadcast failure after setting "spark.cleaner.ttl"

When I ran a spark streaming application for longer, I noticed that the local directory's size kept increasing. I set "spark.cleaner.ttl" to 1800 seconds in order to clean the metadata. The spark streaming batch duration is 10 seconds and the checkpoint duration is 10 minutes. The setting took effect, but after that the exception below happened. Do you have any idea about this error? Thank you!
15/06/09 12:57:30 WARN TaskSetManager: Lost task 3.0 in stage 5038.0 (TID 27045, host2): java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_82_piece0 of broadcast_82
[stack trace snipped; the full trace appears in the original message at the bottom of this thread]
Re: [SparkStreaming 1.3.0] Broadcast failure after setting "spark.cleaner.ttl"
Hi, are you restarting your Spark streaming context through getOrCreate?

On 9 Jun 2015 09:30, "Haopu Wang" wrote:
> When I ran a spark streaming application for longer, I noticed that the local
> directory's size kept increasing.
>
> I set "spark.cleaner.ttl" to 1800 seconds in order to clean the metadata.
>
> The spark streaming batch duration is 10 seconds and the checkpoint duration
> is 10 minutes.
>
> The setting took effect, but after that the exception below happened.
>
> Do you have any idea about this error? Thank you!
>
> 15/06/09 12:57:30 WARN TaskSetManager: Lost task 3.0 in stage 5038.0
> (TID 27045, host2): java.io.IOException:
> org.apache.spark.SparkException: Failed to get broadcast_82_piece0 of
> broadcast_82
> [stack trace snipped; the full trace appears in the original message at the
> bottom of this thread]
>
> 15/06/09 12:57:30 ERROR TaskSetManager: Task 2 in stage 5038.0 failed 4
> times; aborting job
> 15/06/09 12:57:30 ERROR JobScheduler: Error running job streaming job
> 143382585 ms.0

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
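The getOrCreate pattern this reply asks about can be sketched stand-alone. This is a hedged illustration in plain Python — the file layout and function names are invented, not Spark's API; in Spark itself the equivalent is StreamingContext.getOrCreate(checkpointDir, factory), where the factory is only invoked when no usable checkpoint exists:

```python
# Sketch of checkpoint-based "get or create": on restart, recover state from
# the checkpoint directory if present; otherwise call the factory to build a
# fresh context and write the initial checkpoint.
import json
import os
import tempfile

CHECKPOINT_DIR = tempfile.mkdtemp()  # illustrative stand-in path

def make_context():
    """Factory: only called when no usable checkpoint exists."""
    return {"batch_interval_s": 10, "restored": False}

def get_or_create(checkpoint_dir, factory):
    path = os.path.join(checkpoint_dir, "checkpoint.json")
    if os.path.exists(path):
        with open(path) as f:
            ctx = json.load(f)
        ctx["restored"] = True   # recovered from checkpoint, not rebuilt
        return ctx
    ctx = factory()
    with open(path, "w") as f:
        json.dump(ctx, f)
    return ctx

first = get_or_create(CHECKPOINT_DIR, make_context)   # fresh start
second = get_or_create(CHECKPOINT_DIR, make_context)  # simulated restart
print(first["restored"], second["restored"])
```

The relevance to this thread: without checkpoint recovery, each restart rebuilds broadcast state from scratch, while a restarted context that still references TTL-expired broadcasts will fail exactly as in the trace above.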
[SparkStreaming 1.3.0] Broadcast failure after setting "spark.cleaner.ttl"
When I ran a spark streaming application for longer, I noticed that the local directory's size kept increasing. I set "spark.cleaner.ttl" to 1800 seconds in order to clean the metadata. The spark streaming batch duration is 10 seconds and the checkpoint duration is 10 minutes. The setting took effect, but after that the exception below happened. Do you have any idea about this error? Thank you!

15/06/09 12:57:30 WARN TaskSetManager: Lost task 3.0 in stage 5038.0 (TID 27045, host2): java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_82_piece0 of broadcast_82
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1155)
        at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
        at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
        at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
        at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
        at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
        at org.apache.spark.streaming.dstream.HashmapEnrichDStream$$anonfun$compute$3.apply(HashmapEnrichDStream.scala:39)
        at org.apache.spark.streaming.dstream.HashmapEnrichDStream$$anonfun$compute$3.apply(HashmapEnrichDStream.scala:39)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:202)
        at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Failed to get broadcast_82_piece0 of broadcast_82
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:136)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:119)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:174)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1152)
        ... 25 more

15/06/09 12:57:30 ERROR TaskSetManager: Task 2 in stage 5038.0 failed 4 times; aborting job
15/06/09 12:57:30 ERROR JobScheduler: Error running job streaming job 143382585 ms.0
spark.cleaner.ttl
Hi, I am using Spark v1.1.0. The default value of spark.cleaner.ttl is infinite, as per the online docs. Since a lot of shuffle files are generated in /tmp/spark-local* and the disk is running out of space, we tested with a smaller TTL value. However, even when the job has completed and the timer expires, the files remain; instead of being deleted, the timestamps of the files keep changing. How can we automatically delete these shuffle files, say after every 24 hours?

thanks

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-cleaner-ttl-tp15574.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
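One common workaround for the "delete after 24 hours" requirement is an external sweep run from cron, outside Spark entirely. The sketch below is a hedged illustration — the path pattern and age threshold are assumptions, and you must verify that no live executors still need the files before deleting, since running jobs read their own shuffle output:

```python
# Sweep spark-local-* directories whose newest file hasn't been modified in
# max_age_seconds. Using the newest mtime anywhere under the directory matters
# because, as observed in this thread, Spark may keep touching files in a
# directory that is otherwise stale.
import glob
import os
import shutil
import time

def sweep(pattern, max_age_seconds, now=None):
    """Remove directories matching `pattern` that are older than the cutoff."""
    now = time.time() if now is None else now
    removed = []
    for d in glob.glob(pattern):
        mtimes = [os.path.getmtime(d)]
        for root, _, files in os.walk(d):
            mtimes += [os.path.getmtime(os.path.join(root, f)) for f in files]
        if now - max(mtimes) > max_age_seconds:
            shutil.rmtree(d)
            removed.append(d)
    return removed
```

Run hourly from cron as, say, sweep("/tmp/spark-local-*", 24 * 3600). The touched-timestamps behavior described above would defeat a naive find -mtime +1 for the same reason.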
Re: spark.cleaner.ttl and spark.streaming.unpersist
I switched from YARN to standalone mode and haven't had the OOM issue yet. However, now I have Akka issues killing the executor:

2014-09-11 02:43:34,543 INFO akka.actor.LocalActorRef: Message [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from Actor[akka://sparkWorker/deadLetters] to Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkWorker%4010.2.16.8%3A44405-6#1549270895] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

Before I switched from YARN to standalone, I tried looking at the heaps of running executors. What I found odd was that while both "jmap -histo:live" and "jmap -histo" showed heap usage of a few hundred MBytes, YARN kept showing memory utilization of several gigabytes, eventually leading to the container being killed. I would appreciate it if someone could reproduce what I am seeing. Basically:

1. Tail your YARN container logs and see what they report as memory used by the JVM.
2. In parallel, run "jmap -histo:live <pid>" or "jmap -histo <pid>" on the executor process.

They should be about the same, right? Also, in the heap dump, 99% of the heap seems to be occupied by "unreachable objects" (and most of it is byte arrays).

On Wed, Sep 10, 2014 at 12:06 PM, Tim Smith wrote:
> Actually, I am not doing any explicit shuffle/updateByKey or other
> transform functions. In my program flow, I take in data from Kafka,
> match each message against a list of regexes, and if a message matches a
> regex I extract the groups, stuff them into JSON, and push them back out to
> Kafka (on a different topic). So there is really no dependency between two
> messages in terms of processing. Here's my container histogram:
> http://pastebin.com/s3nAT3cY
>
> Essentially, my app is a cluster grep on steroids.
>
> On Wed, Sep 10, 2014 at 11:34 AM, Yana Kadiyska wrote:
>> [deeper quoted messages snipped; they appear in full later in the thread]
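The jmap-versus-YARN cross-check described above can be partly automated by totaling the bytes column of a "jmap -histo" dump and comparing it with the container's reported usage. This is a hedged sketch; the sample text is a made-up fragment in jmap's usual column layout (num, #instances, #bytes, class name), and a real dump would be piped in instead:

```python
# Sum the #bytes column of jmap -histo output. A large gap between this total
# and the container's reported memory points at off-heap usage (or, as in this
# thread, heap dominated by not-yet-collected "unreachable objects").
import re

def histo_total_bytes(histo_text):
    """Sum the #bytes column of jmap -histo output."""
    total = 0
    for line in histo_text.splitlines():
        m = re.match(r"\s*\d+:\s+(\d+)\s+(\d+)\s+\S+", line)
        if m:
            total += int(m.group(2))
    return total

sample = """\
 num     #instances         #bytes  class name
----------------------------------------------
   1:        102400      167772160  [B
   2:         51200       16777216  [C
   3:         10000        1600000  java.lang.String
"""
print(histo_total_bytes(sample))  # heap bytes accounted for by the histogram
```

Note that "jmap -histo:live" forces a full GC before counting, while plain "jmap -histo" also counts unreachable objects — running both, as suggested above, separates garbage from live data.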
Re: spark.cleaner.ttl and spark.streaming.unpersist
Actually, I am not doing any explicit shuffle/updateByKey or other transform functions. In my program flow, I take in data from Kafka, match each message against a list of regexes, and if a message matches a regex I extract the groups, stuff them into JSON, and push them back out to Kafka (on a different topic). So there is really no dependency between two messages in terms of processing. Here's my container histogram: http://pastebin.com/s3nAT3cY

Essentially, my app is a cluster grep on steroids.

On Wed, Sep 10, 2014 at 11:34 AM, Yana Kadiyska wrote:
> [quoted earlier messages snipped; they appear in full later in the thread]
Re: spark.cleaner.ttl and spark.streaming.unpersist
Tim, I asked a similar question twice: here http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Cannot-get-executors-to-stay-alive-tt12940.html and here http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Executor-OOM-tt12383.html and have not yet received any responses. I noticed that the heap dump only contains a very large byte array consuming about 66% (the second link contains a picture of my heap -- I ran with a small heap to be able to get the failure quickly).

I don't have solutions but wanted to affirm that I've observed a similar situation...

On Wed, Sep 10, 2014 at 2:24 PM, Tim Smith wrote:
> [...]
Re: spark.cleaner.ttl and spark.streaming.unpersist
I am using Spark 1.0.0 (on CDH 5.1) and have a similar issue. In my case, the receivers die within an hour because Yarn kills the containers for high memory usage. I set spark.cleaner.ttl to 30 seconds but that didn't help, so I don't think stale RDDs are an issue here. I did a "jmap -histo" on a couple of running receiver processes and in a heap of 30G, roughly ~16G is taken by "[B", which is byte arrays.

Still investigating more and would appreciate pointers for troubleshooting. I have dumped the heap of a receiver and will try to go over it.

On Wed, Sep 10, 2014 at 1:43 AM, Luis Ángel Vicente Sánchez < langel.gro...@gmail.com> wrote:
> [...]
Re: spark.cleaner.ttl and spark.streaming.unpersist
I somehow missed that parameter when I was reviewing the documentation, that should do the trick! Thank you!

2014-09-10 2:10 GMT+01:00 Shao, Saisai :
> [...]
RE: spark.cleaner.ttl and spark.streaming.unpersist
Hi Luis,

The parameters "spark.cleaner.ttl" and "spark.streaming.unpersist" can both be used to remove stale streaming data. The difference is that "spark.cleaner.ttl" is a time-based cleaner that cleans not only streaming input data but also Spark's stale metadata, while "spark.streaming.unpersist" is a reference-based cleaning mechanism: streaming data is removed once it falls out of the slide duration.

Both of these parameters can reduce the memory occupied by Spark Streaming. But if data floods into Spark Streaming at startup, as in your situation with Kafka, these two parameters cannot fully mitigate the problem. You actually need to control the input rate so data is not injected too fast; you can try "spark.streaming.receiver.maxRate" to control the injection rate.

Thanks
Jerry

From: Luis Ángel Vicente Sánchez [mailto:langel.gro...@gmail.com] Sent: Wednesday, September 10, 2014 5:21 AM To: user@spark.apache.org Subject: spark.cleaner.ttl and spark.streaming.unpersist
[...]
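For reference, a minimal sketch of how the three settings Jerry mentions are applied (the TTL and rate values here are illustrative, not recommendations, and note the correct spelling of the receiver rate key):

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("streaming-app")
  // Time-based cleaner: periodically drops metadata and data older than the TTL (seconds).
  .set("spark.cleaner.ttl", "1800")
  // Reference-based cleanup: unpersist input data once it leaves the slide window.
  .set("spark.streaming.unpersist", "true")
  // Cap each receiver's ingest rate (records/sec) so a startup backlog cannot flood memory.
  .set("spark.streaming.receiver.maxRate", "10000")
```

The first two only control how long data is retained; only the rate cap limits how fast data enters in the first place, which is why it is the one that helps with a startup flood.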
spark.cleaner.ttl and spark.streaming.unpersist
The executors of my spark streaming application are being killed due to memory issues. The memory consumption is quite high on startup because it is the first run and there are quite a few events on the kafka queues, which are consumed at a rate of 100K events per sec.

I wonder if it's recommended to use spark.cleaner.ttl and spark.streaming.unpersist together to mitigate that problem. I also wonder whether new RDDs are being batched while an RDD is being processed.

Regards,

Luis
Re: "spark.streaming.unpersist" and "spark.cleaner.ttl"
Yeah, I wrote those lines a while back; I wanted to contrast storage levels with and without serialization. I should have realized that StorageLevel.MEMORY_ONLY_SER can be mistaken for the default level.

TD

On Wed, Jul 23, 2014 at 5:12 AM, Shao, Saisai wrote:
> [...]
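To make the contrast concrete, the persistence level can also be set explicitly per DStream rather than relying on any default (a sketch; `lines` stands in for whatever DStream you have):

```scala
import org.apache.spark.storage.StorageLevel

// Deserialized objects in memory, like the RDD default:
lines.persist(StorageLevel.MEMORY_ONLY)

// ...or, alternatively, the serialized form the streaming guide describes
// (smaller footprint, shorter GC pauses, but extra ser/deser CPU cost):
// lines.persist(StorageLevel.MEMORY_ONLY_SER)
```

Setting it explicitly sidesteps the documentation ambiguity entirely, since the level in effect is then visible in the code.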
RE: "spark.streaming.unpersist" and "spark.cleaner.ttl"
Yeah, the document may not be precisely aligned with the latest code, so the best way is to check the code.

-Original Message- From: Haopu Wang [mailto:hw...@qilinsoft.com] Sent: Wednesday, July 23, 2014 5:56 PM To: user@spark.apache.org Subject: RE: "spark.streaming.unpersist" and "spark.cleaner.ttl"
[...]
RE: "spark.streaming.unpersist" and "spark.cleaner.ttl"
Jerry, thanks for the response.

For the default storage level of DStream, it looks like Spark's documentation is wrong. In this link: http://spark.apache.org/docs/latest/streaming-programming-guide.html#memory-tuning it mentions: "Default persistence level of DStreams: Unlike RDDs, the default persistence level of DStreams serializes the data in memory (that is, StorageLevel.MEMORY_ONLY_SER for DStream compared to StorageLevel.MEMORY_ONLY for RDDs). Even though keeping the data serialized incurs higher serialization/deserialization overheads, it significantly reduces GC pauses."

I will take a look at DStream.scala although I have no Scala experience.

-Original Message- From: Shao, Saisai [mailto:saisai.s...@intel.com] Sent: 2014年7月23日 15:13 To: user@spark.apache.org Subject: RE: "spark.streaming.unpersist" and "spark.cleaner.ttl"
[...]
RE: "spark.streaming.unpersist" and "spark.cleaner.ttl"
Hi Haopu,

Please see the inline comments.

Thanks
Jerry

-Original Message- From: Haopu Wang [mailto:hw...@qilinsoft.com] Sent: Wednesday, July 23, 2014 3:00 PM To: user@spark.apache.org Subject: "spark.streaming.unpersist" and "spark.cleaner.ttl"

I have a DStream receiving data from a socket. I'm using local mode. I set "spark.streaming.unpersist" to "false" and leave "spark.cleaner.ttl" at its infinite default. I can see files for input and shuffle blocks under the "spark.local.dir" folder, and the size of the folder keeps increasing, although the JVM's memory usage seems to be stable.

[question] In this case, because input RDDs are persisted but don't fit into memory, they are written to disk, right? And where can I see the details about these RDDs? I don't see them in the web UI.

[answer] Yes, if there is not enough memory to hold the input RDDs, the data will be flushed to disk, because the default storage level is "MEMORY_AND_DISK_SER_2", as you can see in StreamingContext.scala. You cannot see the input RDDs in the web UI; you can only see cached RDDs there.

Then I set "spark.streaming.unpersist" to "true", and the size of the "spark.local.dir" folder and the JVM's used heap size are reduced regularly.

[question] In this case, because I didn't change "spark.cleaner.ttl", which component is doing the cleanup? And what's the difference if I set "spark.cleaner.ttl" to some duration in this case?

[answer] If you set "spark.streaming.unpersist" to true, old unused RDDs will be deleted, as you can see in DStream.scala. "spark.cleaner.ttl", by contrast, is the timer-based Spark cleaner, which cleans not only streaming data but also broadcast, shuffle, and other data.

Thank you!
"spark.streaming.unpersist" and "spark.cleaner.ttl"
I have a DStream receiving data from a socket. I'm using local mode. I set "spark.streaming.unpersist" to "false" and leave "spark.cleaner.ttl" at its infinite default. I can see files for input and shuffle blocks under the "spark.local.dir" folder, and the size of the folder keeps increasing, although the JVM's memory usage seems to be stable.

[question] In this case, because input RDDs are persisted but don't fit into memory, they are written to disk, right? And where can I see the details about these RDDs? I don't see them in the web UI.

Then I set "spark.streaming.unpersist" to "true", and the size of the "spark.local.dir" folder and the JVM's used heap size are reduced regularly.

[question] In this case, because I didn't change "spark.cleaner.ttl", which component is doing the cleanup? And what's the difference if I set "spark.cleaner.ttl" to some duration in this case?

Thank you!
Re: is spark.cleaner.ttl safe?
Yes, we are also facing the same problem. The workaround we came up with is:
- store the broadcast variable's id when it is first created
- then run a scheduled job every (spark.cleaner.ttl - 1 minute) interval that re-creates the same broadcast variable using the same id

This way spark is happy finding the same broadcast file (broadcast_):

    val httpBroadcastFactory = new HttpBroadcastFactory()
    httpBroadcastFactory.newBroadcast(bcastVariable.value, false, id)

On Wed, Mar 12, 2014 at 2:38 AM, Aaron Davidson wrote:
> [...]

--
Sourav Chandra
Senior Software Engineer
Livestream
sourav.chan...@livestream.com
www.livestream.com
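A sketch of how that scheduled re-creation might be wired up. Caveats: HttpBroadcastFactory is Spark 1.x-internal API that normally requires Spark-side initialization before use, and the executor wiring, `ttlSeconds`, and variable names here are illustrative assumptions rather than a tested recipe:

```scala
import java.util.concurrent.{Executors, TimeUnit}
import org.apache.spark.broadcast.HttpBroadcastFactory

val bcastId = bcastVariable.id                 // remember the id at creation time
val factory = new HttpBroadcastFactory()
val scheduler = Executors.newSingleThreadScheduledExecutor()

// Re-register the same value under the same id shortly before the TTL expires,
// so the broadcast file the executors look up is recreated before deletion.
scheduler.scheduleAtFixedRate(new Runnable {
  def run(): Unit = factory.newBroadcast(bcastVariable.value, false, bcastId)
}, 0L, ttlSeconds - 60L, TimeUnit.SECONDS)
```

This is a race against the cleaner by construction; the ContextCleaner work referenced below removed the need for such workarounds.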
Re: is spark.cleaner.ttl safe?
And to answer your original question, spark.cleaner.ttl is not safe for the exact reason you brought up. The PR Mark linked intends to provide a much cleaner (and safer) solution.

On Tue, Mar 11, 2014 at 2:01 PM, Mark Hamstra wrote:
> [...]
Re: is spark.cleaner.ttl safe?
Actually, TD's work-in-progress is probably more what you want: https://github.com/apache/spark/pull/126

On Tue, Mar 11, 2014 at 1:58 PM, Michael Allman wrote:
> [...]
is spark.cleaner.ttl safe?
Hello, I've been trying to run an iterative spark job that spills 1+ GB to disk per iteration on a system with limited disk space. I believe there's enough space if spark would clean up unused data from previous iterations, but as it stands the number of iterations I can run is limited by available disk space. I found a thread on the usage of spark.cleaner.ttl on the old Spark Users Google group here: https://groups.google.com/forum/#!topic/spark-users/9ebKcNCDih4 I think this setting may be what I'm looking for, however the cleaner seems to delete data that's still in use. The effect is I get bizarre exceptions from Spark complaining about missing broadcast data or ArrayIndexOutOfBounds. When is spark.cleaner.ttl safe to use? Is it supposed to delete in-use data or is this a bug/shortcoming? Cheers, Michael
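For the iterative pattern described above, one TTL-free alternative is to unpersist each iteration's data explicitly once the next iteration has been materialized, so only in-use blocks stay on disk. A sketch, where `initialRDD`, `step`, and `numIterations` are hypothetical stand-ins for the job's own pieces:

```scala
// Free the previous iteration explicitly instead of relying on a TTL cleaner.
var current = initialRDD.cache()
for (i <- 1 to numIterations) {
  val next = step(current).cache()
  next.count()          // materialize `next` while `current` is still available
  current.unpersist()   // now safe: nothing references the old iteration's blocks
  current = next
}
```

Unlike the TTL, this never deletes data that a future computation still depends on, because the caller decides exactly when each RDD is no longer needed.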