Re: spark.cleaner.ttl for 1.4.1
AFAIK the ContextCleaner should perform all of the cleaning *as long as garbage collection is performed frequently enough on the driver*. See https://issues.apache.org/jira/browse/SPARK-7689 and https://github.com/apache/spark/pull/6220#issuecomment-102950055 for discussion of this technicality.

On Mon, Nov 30, 2015 at 8:46 AM Michal Čizmazia <mici...@gmail.com> wrote:
> Does *spark.cleaner.ttl* still need to be used for Spark *1.4.1* long-running streaming jobs? Or does *ContextCleaner* alone do all the cleaning?
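A minimal sketch of the kind of workaround discussed in SPARK-7689, assuming the root cause is that full GCs (and hence the ContextCleaner's weak-reference processing) run too rarely on the driver; the scheduler, the 30-minute interval, and the use of System.gc() are illustrative choices, not an official Spark API:

    import java.util.concurrent.{Executors, TimeUnit}

    // Periodically hint a full GC on the driver so that weakly referenced
    // RDD/broadcast/shuffle handles get enqueued and the ContextCleaner can
    // clean them up. System.gc() is only a hint to the JVM, not a guarantee.
    val gcScheduler = Executors.newSingleThreadScheduledExecutor()
    gcScheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = System.gc()
    }, 30, 30, TimeUnit.MINUTES)

(Later Spark releases added a built-in spark.cleaner.periodicGC.interval setting for much the same purpose.)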
RE: [SparkStreaming 1.3.0] Broadcast failure after setting spark.cleaner.ttl
The shuffle data can be deleted through the weak-reference mechanism; you can check the code of ContextCleaner. You can also trigger a full GC manually with JVisualVM or some other tool to see whether the shuffle files get deleted.

Thanks
Jerry

From: Haopu Wang [mailto:hw...@qilinsoft.com]
Sent: Tuesday, June 9, 2015 5:28 PM
To: Shao, Saisai; user
Subject: RE: [SparkStreaming 1.3.0] Broadcast failure after setting spark.cleaner.ttl

Jerry, I agree with you. However, in my case, I kept monitoring the blockmanager folder. I do see the number of files decrease sometimes, but the folder's size keeps increasing, and some old files are somehow not deleted. [Screenshot of the folder omitted from the archive.]

-----Original Message-----
From: Shao, Saisai [mailto:saisai.s...@intel.com]
Sent: Tuesday, June 09, 2015 4:33 PM
To: Haopu Wang; user
Subject: RE: [SparkStreaming 1.3.0] Broadcast failure after setting spark.cleaner.ttl

From the stack trace, I think this problem may be due to the deletion of a broadcast variable: since you set spark.cleaner.ttl, the old broadcast variable is deleted after that timeout, and you hit this exception when you try to use it again after the time limit. Basically, I don't think you need this configuration: Spark Streaming automatically deletes old, unused data, and Spark itself deletes this metadata using weak references. This configuration will also be deprecated in the coming release.

Thanks
Jerry

-----Original Message-----
From: Haopu Wang [mailto:hw...@qilinsoft.com]
Sent: Tuesday, June 9, 2015 3:30 PM
To: user
Subject: [SparkStreaming 1.3.0] Broadcast failure after setting spark.cleaner.ttl

When I ran a Spark Streaming application for longer, I noticed that the local directory's size kept increasing. I set spark.cleaner.ttl to 1800 seconds in order to clean the metadata. The Spark Streaming batch duration is 10 seconds and the checkpoint duration is 10 minutes. The setting took effect, but after that the exception below occurred. Do you have any idea about this error? Thank you!
15/06/09 12:57:30 WARN TaskSetManager: Lost task 3.0 in stage 5038.0 (TID 27045, host2): java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_82_piece0 of broadcast_82
[full stack trace omitted; it is identical to the one in the original message at the end of this thread]
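One pattern that sidesteps the expired-broadcast problem described above is to stop reusing a single long-lived broadcast across batches. A sketch, assuming `inputStream` is a DStream[String] and `loadLookup()` is a hypothetical function that builds the lookup map on the driver:

    // Re-create the broadcast on the driver for every batch inside
    // transform(), so no task ever dereferences a broadcast that is older
    // than one batch interval (and thus older than any reasonable TTL).
    val enriched = inputStream.transform { rdd =>
      val lookup = rdd.sparkContext.broadcast(loadLookup())
      rdd.map(event => (event, lookup.value.getOrElse(event, "unknown")))
    }

This trades the cost of re-broadcasting each batch for immunity to TTL-based deletion; whether that is acceptable depends on the size of the lookup data.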
RE: [SparkStreaming 1.3.0] Broadcast failure after setting spark.cleaner.ttl
Jerry, I agree with you. However, in my case, I kept monitoring the blockmanager folder. I do see the number of files decrease sometimes, but the folder's size keeps increasing, and some old files are somehow not deleted.

-----Original Message-----
From: Shao, Saisai [mailto:saisai.s...@intel.com]
Sent: Tuesday, June 09, 2015 4:33 PM
To: Haopu Wang; user
Subject: RE: [SparkStreaming 1.3.0] Broadcast failure after setting spark.cleaner.ttl

[quoted reply and original message omitted; they appear in full elsewhere in this thread]
Re: [SparkStreaming 1.3.0] Broadcast failure after setting spark.cleaner.ttl
Hi, are you restarting your Spark Streaming context through getOrCreate?

On 9 Jun 2015 09:30, Haopu Wang hw...@qilinsoft.com wrote:
[quoted original message and stack trace omitted; they appear in full below]
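For reference, a minimal sketch of the getOrCreate pattern the question above refers to; the checkpoint path, app name, and batch interval are illustrative:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    def createContext(): StreamingContext = {
      val conf = new SparkConf().setAppName("enrich-stream")
      val ssc = new StreamingContext(conf, Seconds(10))
      ssc.checkpoint("hdfs:///checkpoints/enrich-stream")
      // ... build the DStream graph here ...
      ssc
    }

    // Recovers the context (and its DStream graph) from the checkpoint if
    // one exists, otherwise builds a fresh one via createContext().
    val ssc = StreamingContext.getOrCreate("hdfs:///checkpoints/enrich-stream",
                                           () => createContext())
    ssc.start()
    ssc.awaitTermination()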
RE: [SparkStreaming 1.3.0] Broadcast failure after setting spark.cleaner.ttl
From the stack trace, I think this problem may be due to the deletion of a broadcast variable: since you set spark.cleaner.ttl, the old broadcast variable is deleted after that timeout, and you hit this exception when you try to use it again after the time limit. Basically, I don't think you need this configuration: Spark Streaming automatically deletes old, unused data, and Spark itself deletes this metadata using weak references. This configuration will also be deprecated in the coming release.

Thanks
Jerry

-----Original Message-----
From: Haopu Wang [mailto:hw...@qilinsoft.com]
Sent: Tuesday, June 9, 2015 3:30 PM
To: user
Subject: [SparkStreaming 1.3.0] Broadcast failure after setting spark.cleaner.ttl

[quoted original message and stack trace omitted; they appear in full below]
[SparkStreaming 1.3.0] Broadcast failure after setting spark.cleaner.ttl
When I ran a Spark Streaming application for longer, I noticed that the local directory's size kept increasing. I set spark.cleaner.ttl to 1800 seconds in order to clean the metadata. The Spark Streaming batch duration is 10 seconds and the checkpoint duration is 10 minutes. The setting took effect, but after that the exception below occurred. Do you have any idea about this error? Thank you!

15/06/09 12:57:30 WARN TaskSetManager: Lost task 3.0 in stage 5038.0 (TID 27045, host2): java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_82_piece0 of broadcast_82
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1155)
    at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
    at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
    at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
    at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
    at org.apache.spark.streaming.dstream.HashmapEnrichDStream$$anonfun$compute$3.apply(HashmapEnrichDStream.scala:39)
    at org.apache.spark.streaming.dstream.HashmapEnrichDStream$$anonfun$compute$3.apply(HashmapEnrichDStream.scala:39)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:202)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
    at org.apache.spark.scheduler.Task.run(Task.scala:64)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Failed to get broadcast_82_piece0 of broadcast_82
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
    at scala.Option.getOrElse(Option.scala:120)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:136)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:119)
    at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:174)
    at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1152)
    ... 25 more
15/06/09 12:57:30 ERROR TaskSetManager: Task 2 in stage 5038.0 failed 4 times; aborting job
15/06/09 12:57:30 ERROR JobScheduler: Error running job streaming job 143382585 ms.0
spark.cleaner.ttl
Hi, I am using Spark v1.1.0. The default value of spark.cleaner.ttl is infinite as per the online docs. Since a lot of shuffle files are generated in /tmp/spark-local* and the disk is running out of space, we tested with a smaller TTL value. However, even after the job has completed and the timer has expired, the files remain; instead of being deleted, their timestamps just keep changing. How can we automatically delete these shuffle files, say every 24 hours?

thanks

-- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-cleaner-ttl-tp15574.html
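For what it's worth, a sketch of configuring a 24-hour TTL, with the caveat (discussed elsewhere in this digest) that anything older than the TTL is deleted even if it is still referenced, so this is only safe when no RDD or broadcast is reused for longer than the TTL:

    import org.apache.spark.SparkConf

    // spark.cleaner.ttl is specified in seconds; 24 hours = 86400 seconds.
    val conf = new SparkConf()
      .setAppName("long-running-job")
      .set("spark.cleaner.ttl", (24 * 3600).toString)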
Re: spark.cleaner.ttl and spark.streaming.unpersist
I somehow missed that parameter when I was reviewing the documentation; that should do the trick! Thank you!

2014-09-10 2:10 GMT+01:00 Shao, Saisai saisai.s...@intel.com:

Hi Luis,

The parameters "spark.cleaner.ttl" and "spark.streaming.unpersist" can both be used to remove stale streaming data. The difference is that "spark.cleaner.ttl" is a time-based cleaner that cleans not only streaming input data but also Spark's stale metadata, while "spark.streaming.unpersist" is a reference-based cleaning mechanism under which streaming data is removed once it falls outside the slide duration. Both parameters can alleviate the memory pressure of Spark Streaming. But if data floods into Spark Streaming at startup, as in your situation using Kafka, these two parameters cannot fully mitigate the problem. You actually need to control the input rate so that data is not injected so fast; you can try "spark.streaming.receiver.maxRate" to control the ingest rate.

Thanks
Jerry

From: Luis Ángel Vicente Sánchez [mailto:langel.gro...@gmail.com]
Sent: Wednesday, September 10, 2014 5:21 AM
To: user@spark.apache.org
Subject: spark.cleaner.ttl and spark.streaming.unpersist

The executors of my Spark Streaming application are being killed due to memory issues. Memory consumption is quite high on startup because it is the first run and there are quite a few events on the Kafka queues, which are consumed at a rate of 100K events per second. I wonder if it's recommended to use spark.cleaner.ttl and spark.streaming.unpersist together to mitigate that problem. I also wonder whether new RDDs are batched while an RDD is being processed.

Regards,
Luis
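A sketch of the three settings mentioned in the reply above, with illustrative values:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // reference-based cleanup: unpersist input data once it falls out of
      // the slide duration
      .set("spark.streaming.unpersist", "true")
      // time-based cleanup of data and metadata, in seconds
      .set("spark.cleaner.ttl", "3600")
      // throttle ingest to at most 10000 records/sec per receiver
      .set("spark.streaming.receiver.maxRate", "10000")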
Re: spark.cleaner.ttl and spark.streaming.unpersist
I am using Spark 1.0.0 (on CDH 5.1) and have a similar issue. In my case, the receivers die within an hour because YARN kills the containers for high memory usage. I set spark.cleaner.ttl to 30 seconds, but that didn't help, so I don't think stale RDDs are the issue here. I did a jmap -histo on a couple of running receiver processes, and in a heap of 30 GB, roughly ~16 GB is taken by [B, which is byte arrays. Still investigating, and I would appreciate pointers for troubleshooting. I have dumped the heap of a receiver and will try to go over it.

On Wed, Sep 10, 2014 at 1:43 AM, Luis Ángel Vicente Sánchez langel.gro...@gmail.com wrote:
[earlier messages in this thread omitted; they appear in full above]
Re: spark.cleaner.ttl and spark.streaming.unpersist
Tim, I asked a similar question twice: here http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Cannot-get-executors-to-stay-alive-tt12940.html and here http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Executor-OOM-tt12383.html and have not yet received any responses. I noticed that the heap dump contains one very large byte array consuming about 66% of the heap (the second link contains a picture of my heap; I ran with a small heap to be able to reproduce the failure quickly). I don't have solutions, but I wanted to affirm that I've observed a similar situation.

On Wed, Sep 10, 2014 at 2:24 PM, Tim Smith secs...@gmail.com wrote:
[earlier messages in this thread omitted; they appear in full above]
Re: spark.cleaner.ttl and spark.streaming.unpersist
I switched from YARN to standalone mode and haven't had an OOM issue yet. However, now I have Akka issues killing the executor:

2014-09-11 02:43:34,543 INFO akka.actor.LocalActorRef: Message [akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying] from Actor[akka://sparkWorker/deadLetters] to Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkWorker%4010.2.16.8%3A44405-6#1549270895] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

Before I switched from YARN to standalone, I tried looking at the heaps of running executors. What I found odd was that while both jmap -histo:live and jmap -histo showed heap usage of a few hundred megabytes, YARN kept reporting memory utilization of several gigabytes, eventually leading to the container being killed. I would appreciate it if someone could duplicate what I am seeing. Basically:

1. Tail your YARN container logs and see what they report as memory used by the JVM.
2. In parallel, run jmap -histo:live <pid> or jmap -histo <pid> on the executor process.

They should be about the same, right? Also, in the heap dump, 99% of the heap seems to be occupied by unreachable objects (and most of it is byte arrays).

On Wed, Sep 10, 2014 at 12:06 PM, Tim Smith secs...@gmail.com wrote:

Actually, I am not doing any explicit shuffle/updateByKey or other transform functions. In my program flow, I take in data from Kafka, match each message against a list of regexes, and if a message matches a regex, extract the groups, stuff them into JSON, and push them back out to Kafka (a different topic). So there is really no dependency between two messages in terms of processing. Here's my container histogram: http://pastebin.com/s3nAT3cY. Essentially, my app is a cluster grep on steroids.

On Wed, Sep 10, 2014 at 11:34 AM, Yana Kadiyska yana.kadiy...@gmail.com wrote:
[earlier messages in this thread omitted; they appear in full above]
spark.cleaner.ttl and spark.streaming.unpersist
The executors of my Spark Streaming application are being killed due to memory issues. Memory consumption is quite high on startup because it is the first run and there are quite a few events on the Kafka queues, which are consumed at a rate of 100K events per second. I wonder if it's recommended to use spark.cleaner.ttl and spark.streaming.unpersist together to mitigate that problem. I also wonder whether new RDDs are batched while an RDD is being processed.

Regards,
Luis
spark.streaming.unpersist and spark.cleaner.ttl
I have a DStream receiving data from a socket. I'm using local mode. I set spark.streaming.unpersist to false and leave spark.cleaner.ttl at infinite. I can see files for input and shuffle blocks under the spark.local.dir folder, and the size of the folder keeps increasing, although the JVM's memory usage seems to be stable.

[question] In this case, the input RDDs are persisted, but because they don't fit into memory they are written to disk, right? And where can I see the details about these RDDs? I don't see them in the web UI.

Then I set spark.streaming.unpersist to true, and the size of the spark.local.dir folder and the JVM's used heap size are reduced regularly.

[question] In this case, because I didn't change spark.cleaner.ttl, which component is doing the cleanup? And what's the difference if I set spark.cleaner.ttl to some duration in this case?

Thank you!
RE: spark.streaming.unpersist and spark.cleaner.ttl
Hi Haopu, please see the inline comments.

Thanks
Jerry

-----Original Message-----
From: Haopu Wang [mailto:hw...@qilinsoft.com]
Sent: Wednesday, July 23, 2014 3:00 PM
To: user@spark.apache.org
Subject: spark.streaming.unpersist and spark.cleaner.ttl

I have a DStream receiving data from a socket. I'm using local mode. I set spark.streaming.unpersist to false and leave spark.cleaner.ttl at infinite. I can see files for input and shuffle blocks under the spark.local.dir folder, and the size of the folder keeps increasing, although the JVM's memory usage seems to be stable.

[question] In this case, the input RDDs are persisted, but because they don't fit into memory they are written to disk, right? And where can I see the details about these RDDs? I don't see them in the web UI.

[answer] Yes, if there is not enough memory to hold the input RDDs, the data is flushed to disk, because the default storage level is MEMORY_AND_DISK_SER_2, as you can see in StreamingContext.scala. You cannot actually see the input RDDs in the web UI; you can only see cached RDDs there.

Then I set spark.streaming.unpersist to true, and the size of the spark.local.dir folder and the JVM's used heap size are reduced regularly.

[question] In this case, because I didn't change spark.cleaner.ttl, which component is doing the cleanup? And what's the difference if I set spark.cleaner.ttl to some duration in this case?

[answer] If you set spark.streaming.unpersist to true, old unused RDDs are deleted, as you can see in DStream.scala. spark.cleaner.ttl, on the other hand, is the timer-based Spark cleaner, which cleans not only streaming data but also broadcast, shuffle, and other data.

Thank you!
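A sketch illustrating the storage levels discussed in the inline answers above; `ssc` is assumed to be an existing StreamingContext, and the host and port are placeholders:

    import org.apache.spark.storage.StorageLevel

    // socketTextStream accepts the input storage level explicitly; the
    // default is MEMORY_AND_DISK_SER_2, which is why receiver blocks spill
    // into spark.local.dir when memory fills up.
    val lines = ssc.socketTextStream("localhost", 9999,
                                     StorageLevel.MEMORY_AND_DISK_SER_2)

    // A derived DStream that is reused can be cached at a different level:
    val words = lines.flatMap(_.split(" "))
    words.persist(StorageLevel.MEMORY_ONLY_SER)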
RE: spark.streaming.unpersist and spark.cleaner.ttl
Jerry, thanks for the response.

Regarding the default storage level of a DStream, it looks like Spark's documentation is wrong. This link: http://spark.apache.org/docs/latest/streaming-programming-guide.html#memory-tuning mentions:

"Default persistence level of DStreams: Unlike RDDs, the default persistence level of DStreams serializes the data in memory (that is, StorageLevel.MEMORY_ONLY_SER for DStream compared to StorageLevel.MEMORY_ONLY for RDDs). Even though keeping the data serialized incurs higher serialization/deserialization overheads, it significantly reduces GC pauses."

I will take a look at DStream.scala, although I have no Scala experience.

-----Original Message-----
From: Shao, Saisai [mailto:saisai.s...@intel.com]
Sent: July 23, 2014 15:13
To: user@spark.apache.org
Subject: RE: spark.streaming.unpersist and spark.cleaner.ttl

[quoted reply and original message omitted; they appear in full above]
RE: spark.streaming.unpersist and spark.cleaner.ttl
Yeah, the document may not be precisely aligned with the latest code, so the best way is to check the code.

-----Original Message-----
From: Haopu Wang [mailto:hw...@qilinsoft.com]
Sent: Wednesday, July 23, 2014 5:56 PM
To: user@spark.apache.org
Subject: RE: spark.streaming.unpersist and spark.cleaner.ttl

[quoted messages omitted; they appear in full above]
is spark.cleaner.ttl safe?
Hello, I've been trying to run an iterative Spark job that spills 1+ GB to disk per iteration on a system with limited disk space. I believe there would be enough space if Spark cleaned up unused data from previous iterations, but as it stands the number of iterations I can run is limited by the available disk space. I found a thread on the usage of spark.cleaner.ttl on the old Spark Users Google group here: https://groups.google.com/forum/#!topic/spark-users/9ebKcNCDih4

I think this setting may be what I'm looking for; however, the cleaner seems to delete data that's still in use. The effect is that I get bizarre exceptions from Spark complaining about missing broadcast data or ArrayIndexOutOfBounds. When is spark.cleaner.ttl safe to use? Is it supposed to delete in-use data, or is this a bug/shortcoming?

Cheers,
Michael
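A TTL-free alternative sketch for an iterative job like the one described above: keep only the current iteration (and briefly its parent) cached, and unpersist explicitly so the reference-based ContextCleaner can drop old blocks and shuffle files. `initialRdd`, `step`, and `numIterations` are placeholders for the job's own logic, and checkpointing requires sc.setCheckpointDir to have been called:

    var current = initialRdd.cache()
    for (i <- 1 to numIterations) {
      val next = step(current).cache()
      if (i % 10 == 0) next.checkpoint()  // occasionally truncate the lineage
      next.count()                        // materialize before dropping the parent
      current.unpersist(blocking = false) // let the cleaner reclaim old blocks
      current = next
    }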