Re: spark.cleaner.ttl for 1.4.1

2015-11-30 Thread Josh Rosen
AFAIK the ContextCleaner should perform all of the cleaning *as long as
garbage collection is performed frequently enough on the driver*. See
https://issues.apache.org/jira/browse/SPARK-7689 and
https://github.com/apache/spark/pull/6220#issuecomment-102950055 for
discussion of this technicality.
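
For what it's worth, a minimal sketch (my own illustration, with an arbitrary
30-minute interval) of one way to keep driver-side GC frequent enough for
ContextCleaner's weak references to fire; later Spark releases added a
spark.cleaner.periodicGC.interval setting for the same purpose:

    import java.util.concurrent.{Executors, TimeUnit}

    // Run on the driver after the SparkContext is created: a periodic full GC
    // makes unreachable RDDs, broadcasts and shuffles eligible for cleanup by
    // ContextCleaner.
    val gcScheduler = Executors.newSingleThreadScheduledExecutor()
    gcScheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = System.gc()
    }, 30, 30, TimeUnit.MINUTES)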

On Mon, Nov 30, 2015 at 8:46 AM Michal Čizmazia <mici...@gmail.com> wrote:

> Does spark.cleaner.ttl still need to be used for Spark 1.4.1 long-running
> streaming jobs? Or does ContextCleaner alone do all the cleaning?
>


RE: [SparkStreaming 1.3.0] Broadcast failure after setting spark.cleaner.ttl

2015-06-09 Thread Shao, Saisai
The shuffle data is deleted through a weak-reference mechanism; you can check 
the code of ContextCleaner. You can also trigger a full GC manually with 
JVisualVM or another tool to see whether the shuffle files are deleted.

Thanks
Jerry
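
As an illustration of the check described above (a sketch under my own
assumptions -- the job, paths and sleep are not from this thread): create a
shuffle, drop the reference, force a GC, and then look under spark.local.dir
to see whether the shuffle files disappear.

    import org.apache.spark.{SparkConf, SparkContext}

    object ShuffleCleanupCheck {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("shuffle-cleanup-check").setMaster("local[2]"))

        var shuffled = sc.parallelize(1 to 1000000).map(i => (i % 100, i)).reduceByKey(_ + _)
        shuffled.count()    // materializes shuffle files under spark.local.dir

        shuffled = null     // drop the only strong reference to the shuffled RDD
        System.gc()         // a full GC lets ContextCleaner's weak references fire
        Thread.sleep(10000) // give the asynchronous cleaner time to delete the files

        // Now list spark.local.dir (or watch it in JVisualVM) to confirm the
        // shuffle files are gone.
        sc.stop()
      }
    }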


RE: [SparkStreaming 1.3.0] Broadcast failure after setting spark.cleaner.ttl

2015-06-09 Thread Haopu Wang
Jerry, I agree with you.

 

However, in my case, I kept monitoring the blockmanager folder. I do
see the number of files decrease sometimes, but the folder's size
kept increasing.

Below is a screenshot of the folder; you can see that some old files
are somehow not deleted.

 

 

 


Re: [SparkStreaming 1.3.0] Broadcast failure after setting spark.cleaner.ttl

2015-06-09 Thread Benjamin Fradet
Hi,

Are you restarting your Spark streaming context through getOrCreate?
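
For readers who haven't used it, a minimal sketch of the pattern being asked
about (checkpoint path, app name and batch interval are assumptions):
StreamingContext.getOrCreate rebuilds the context from the checkpoint
directory on restart instead of creating a fresh one.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val checkpointDir = "/tmp/checkpoints"   // hypothetical path

    def createContext(): StreamingContext = {
      val conf = new SparkConf().setAppName("my-streaming-app")
      val ssc = new StreamingContext(conf, Seconds(10))
      ssc.checkpoint(checkpointDir)
      // ... define the DStreams here ...
      ssc
    }

    // Recovers from the checkpoint if one exists, otherwise calls createContext()
    val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
    ssc.start()
    ssc.awaitTermination()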




RE: [SparkStreaming 1.3.0] Broadcast failure after setting spark.cleaner.ttl

2015-06-09 Thread Shao, Saisai
From the stack trace, I think this problem is due to the deletion of the 
broadcast variable. Since you set spark.cleaner.ttl, the old broadcast variable 
is deleted after that timeout, and you hit this exception when the job tries to 
use it again after the time limit.

Basically, I don't think you need this configuration: Spark Streaming 
automatically deletes old, unused data, and Spark itself cleans up this 
metadata using weak references. This configuration will also be deprecated in 
the coming release.

Thanks
Jerry
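
To make that concrete, a hedged sketch (my own illustration, not the poster's
code) of running without spark.cleaner.ttl and instead releasing a broadcast
explicitly once it has been replaced, so only variables that are genuinely
unused get removed:

    import org.apache.spark.{SparkConf, SparkContext}

    // Note: no spark.cleaner.ttl here -- unreachable broadcasts and shuffles are
    // cleaned up asynchronously by ContextCleaner via weak references.
    val sc = new SparkContext(new SparkConf().setAppName("streaming-enrich"))

    var lookup = sc.broadcast(Map("key" -> "value"))   // hypothetical enrichment table

    def refreshLookup(newTable: Map[String, String]): Unit = {
      val old = lookup
      lookup = sc.broadcast(newTable)
      old.unpersist(blocking = false)   // only the replaced broadcast is dropped from executors
    }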


[SparkStreaming 1.3.0] Broadcast failure after setting spark.cleaner.ttl

2015-06-09 Thread Haopu Wang
When I ran a Spark Streaming application for a longer time, I noticed that
the local directory's size kept increasing.

I set spark.cleaner.ttl to 1800 seconds in order to clean the metadata.

The spark streaming batch duration is 10 seconds and checkpoint duration
is 10 minutes.
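
(For reference, roughly how such a setup would be wired -- a sketch under my
own assumptions, with a hypothetical input source and checkpoint path, not the
actual application code:)

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

    val conf = new SparkConf()
      .setAppName("broadcast-enrich-stream")
      .set("spark.cleaner.ttl", "1800")                 // the TTL discussed in this thread
    val ssc = new StreamingContext(conf, Seconds(10))   // 10-second batches
    ssc.checkpoint("/tmp/checkpoints")                  // hypothetical checkpoint directory

    val lines = ssc.socketTextStream("localhost", 9999) // hypothetical input source
    lines.checkpoint(Minutes(10))                       // 10-minute checkpoint interval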

The setting took effect, but after that the exception below occurred.

Do you have any idea about this error? Thank you!



15/06/09 12:57:30 WARN TaskSetManager: Lost task 3.0 in stage 5038.0 (TID 27045, host2): java.io.IOException: org.apache.spark.SparkException: Failed to get broadcast_82_piece0 of broadcast_82
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1155)
        at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
        at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
        at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
        at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
        at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
        at org.apache.spark.streaming.dstream.HashmapEnrichDStream$$anonfun$compute$3.apply(HashmapEnrichDStream.scala:39)
        at org.apache.spark.streaming.dstream.HashmapEnrichDStream$$anonfun$compute$3.apply(HashmapEnrichDStream.scala:39)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:202)
        at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Failed to get broadcast_82_piece0 of broadcast_82
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:136)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:119)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:174)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1152)
        ... 25 more

15/06/09 12:57:30 ERROR TaskSetManager: Task 2 in stage 5038.0 failed 4 times; aborting job
15/06/09 12:57:30 ERROR JobScheduler: Error running job streaming job 143382585 ms.0








spark.cleaner.ttl

2014-10-01 Thread SK
Hi,

I am using Spark v1.1.0. The default value of spark.cleaner.ttl is infinite
as per the online docs. Since a lot of shuffle files are generated in
/tmp/spark-local* and the disk is running out of space, we tested with a
smaller value of the TTL. However, even when the job has completed and the
timer expires, the files remain; instead of being deleted, their timestamps
keep changing. How can we automatically delete these shuffle files, say
after every 24 hours?

thanks
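
For illustration, a sketch of the 24-hour TTL being asked about (values are
assumptions), with the caveat discussed elsewhere in this digest that the
weak-reference-based ContextCleaner is the safer mechanism on newer releases:

    import org.apache.spark.{SparkConf, SparkContext}

    val conf = new SparkConf()
      .setAppName("ttl-example")
      // seconds: metadata and shuffle data older than 24h become eligible for deletion
      .set("spark.cleaner.ttl", "86400")
      .set("spark.local.dir", "/tmp/spark-local")   // where the shuffle files in question live
    val sc = new SparkContext(conf)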







Re: spark.cleaner.ttl and spark.streaming.unpersist

2014-09-10 Thread Luis Ángel Vicente Sánchez
I somehow missed that parameter when I was reviewing the documentation,
that should do the trick! Thank you!

2014-09-10 2:10 GMT+01:00 Shao, Saisai saisai.s...@intel.com:

  Hi Luis,



 The parameters “spark.cleaner.ttl” and “spark.streaming.unpersist” can both
 be used to remove stale streaming data. The difference is that
 “spark.cleaner.ttl” is a time-based cleaner: it cleans not only streaming
 input data but also Spark’s unused metadata. “spark.streaming.unpersist” is a
 reference-based cleaning mechanism: streaming data is removed once it falls
 out of the slide duration.

 Both parameters can reduce the memory occupation of Spark Streaming. But if
 data floods into Spark Streaming at startup, as in your situation with Kafka,
 these two parameters cannot fully mitigate the problem. You actually need to
 control the input data rate so it is not injected too fast; you can try
 “spark.streaming.receiver.maxRate” to control the ingestion rate.



 Thanks

 Jerry
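
(As an illustration of the settings Jerry describes above -- a sketch with
assumed values, not configuration from this thread:)

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("kafka-stream")
      .set("spark.streaming.unpersist", "true")         // reference-based cleanup of old batch RDDs
      .set("spark.cleaner.ttl", "3600")                 // optional time-based cleanup of old metadata (seconds)
      .set("spark.streaming.receiver.maxRate", "10000") // cap records/sec per receiver to avoid startup floods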






Re: spark.cleaner.ttl and spark.streaming.unpersist

2014-09-10 Thread Tim Smith
I am using Spark 1.0.0 (on CDH 5.1) and have a similar issue. In my case,
the receivers die within an hour because Yarn kills the containers for high
memory usage. I set spark.cleaner.ttl to 30 seconds, but that didn't help. So I
don't think stale RDDs are an issue here. I did a jmap -histo on a couple
of running receiver processes and in a heap of 30G, roughly ~16G is taken
by [B which is byte arrays.

Still investigating more and would appreciate pointers for troubleshooting.
I have dumped the heap of a receiver and will try to go over it.









Re: spark.cleaner.ttl and spark.streaming.unpersist

2014-09-10 Thread Yana Kadiyska
Tim, I asked a similar question twice:
here
http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Cannot-get-executors-to-stay-alive-tt12940.html
and here
http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Executor-OOM-tt12383.html

and have not yet received any responses. I noticed that the heap dump only
contains a very large byte array consuming about 66% (the second link
contains a picture of my heap -- I ran with a small heap to be able to get
the failure quickly).

I don't have solutions but wanted to affirm that I've observed a similar
situation...







Re: spark.cleaner.ttl and spark.streaming.unpersist

2014-09-10 Thread Tim Smith
I switched from Yarn to standalone mode and haven't had an OOM issue yet.
However, now I have Akka issues killing the executor:

2014-09-11 02:43:34,543 INFO akka.actor.LocalActorRef: Message
[akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying]
from Actor[akka://sparkWorker/deadLetters] to
Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkWorker%4010.2.16.8%3A44405-6#1549270895]
was not delivered. [2] dead letters encountered. This logging can be
turned off or adjusted with configuration settings
'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

Before I switched from Yarn to standalone, I tried looking at the heaps of
running executors. What I found odd was that while both jmap -histo:live
and jmap -histo showed heap usage of a few hundred MBytes, Yarn kept
reporting memory utilization of several gigabytes, eventually leading to
the container being killed.

I would appreciate it if someone could reproduce what I am seeing. Basically:
1. Tail your Yarn container logs and see what they report as memory used
by the JVM.
2. In parallel, run jmap -histo:live <pid> or jmap -histo <pid> on
the executor process.

They should be about the same, right?

Also, in the heap dump, 99% of the heap seems to be occupied with
unreachable objects (and most of it is byte arrays).




On Wed, Sep 10, 2014 at 12:06 PM, Tim Smith secs...@gmail.com wrote:
 Actually, I am not doing any explicit shuffle/updateByKey or other
 transform functions. In my program flow, I take in data from Kafka,
 match each message against a list of regexes, and if a message matches a
 regex I extract the groups, stuff them into JSON, and push them back out
 to Kafka (a different topic). So there is really no dependency between
 two messages in terms of processing. Here's my container histogram:
 http://pastebin.com/s3nAT3cY

 Essentially, my app is a cluster grep on steroids.




spark.cleaner.ttl and spark.streaming.unpersist

2014-09-09 Thread Luis Ángel Vicente Sánchez
The executors of my Spark Streaming application are being killed due to
memory issues. The memory consumption is quite high on startup because it is
the first run and there are quite a few events on the Kafka queues, which are
consumed at a rate of 100K events per second.

I wonder if it's recommended to use spark.cleaner.ttl and
spark.streaming.unpersist together to mitigate that problem. I also wonder
whether new RDDs are being batched while an RDD is being processed.

Regards,

Luis


spark.streaming.unpersist and spark.cleaner.ttl

2014-07-23 Thread Haopu Wang
I have a DStream receiving data from a socket, and I'm using local mode.
I set spark.streaming.unpersist to false and leave spark.cleaner.ttl at 
its default (infinite).
I can see files for input and shuffle blocks under the spark.local.dir 
folder, and the folder's size keeps increasing, although the JVM's memory 
usage seems to be stable.

[question] In this case, because the input RDDs are persisted but don't fit 
into memory, they are written to disk, right? And where can I see the 
details about these RDDs? I don't see them in the web UI.

Then I set spark.streaming.unpersist to true, and the size of the 
spark.local.dir folder and the JVM's used heap size are reduced regularly.

[question] In this case, since I didn't change spark.cleaner.ttl, which 
component is doing the cleanup? And what's the difference if I set 
spark.cleaner.ttl to some duration in this case?

Thank you!



RE: spark.streaming.unpersist and spark.cleaner.ttl

2014-07-23 Thread Shao, Saisai
Hi Haopu, 

Please see the inline comments.

Thanks
Jerry

-Original Message-
From: Haopu Wang [mailto:hw...@qilinsoft.com] 
Sent: Wednesday, July 23, 2014 3:00 PM
To: user@spark.apache.org
Subject: spark.streaming.unpersist and spark.cleaner.ttl

I have a DStream receiving data from a socket, and I'm using local mode.
I set spark.streaming.unpersist to false and leave spark.cleaner.ttl at 
its default (infinite).
I can see files for input and shuffle blocks under the spark.local.dir 
folder, and the folder's size keeps increasing, although the JVM's memory 
usage seems to be stable.

[question] In this case, because the input RDDs are persisted but don't fit 
into memory, they are written to disk, right? And where can I see the 
details about these RDDs? I don't see them in the web UI.

[answer] Yes, if there is not enough memory to hold the input RDDs, the data 
is flushed to disk, because the default storage level is 
MEMORY_AND_DISK_SER_2, as you can see in StreamingContext.scala. Actually, 
you cannot see the input RDDs in the web UI; you can only see cached RDDs 
there.
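
(To make the storage-level point concrete, a small sketch -- my own
illustration with assumed host/port values, not code from this thread:)

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf().setAppName("storage-level-example").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))

    // The two-argument overload defaults to StorageLevel.MEMORY_AND_DISK_SER_2,
    // so received blocks that do not fit in memory are spilled under spark.local.dir:
    val withDefault = ssc.socketTextStream("localhost", 9999)

    // Passing a storage level explicitly overrides that default:
    val memoryOnly = ssc.socketTextStream("localhost", 9998, StorageLevel.MEMORY_ONLY_SER)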

Then I set spark.streaming.unpersist to true, and the size of the 
spark.local.dir folder and the JVM's used heap size are reduced regularly.

[question] In this case, since I didn't change spark.cleaner.ttl, which 
component is doing the cleanup? And what's the difference if I set 
spark.cleaner.ttl to some duration in this case?

[answer] If you set spark.streaming.unpersist to true, old unused RDDs are 
deleted, as you can see in DStream.scala. spark.cleaner.ttl, on the other 
hand, is a timer-based Spark cleaner that cleans not only streaming data but 
also broadcast, shuffle and other data.

Thank you!



RE: spark.streaming.unpersist and spark.cleaner.ttl

2014-07-23 Thread Haopu Wang
Jerry, thanks for the response.

For the default storage level of DStreams, it looks like Spark's documentation 
is wrong. This link: 
http://spark.apache.org/docs/latest/streaming-programming-guide.html#memory-tuning
mentions:
"Default persistence level of DStreams: Unlike RDDs, the default persistence 
level of DStreams serializes the data in memory (that is, 
StorageLevel.MEMORY_ONLY_SER for DStream compared to StorageLevel.MEMORY_ONLY 
for RDDs). Even though keeping the data serialized incurs higher 
serialization/deserialization overheads, it significantly reduces GC pauses."

I will take a look at DStream.scala although I have no Scala experience.




RE: spark.streaming.unpersist and spark.cleaner.ttl

2014-07-23 Thread Shao, Saisai
Yeah, the documentation may not be precisely aligned with the latest code, so 
the best way is to check the code.




is spark.cleaner.ttl safe?

2014-03-11 Thread Michael Allman

Hello,

I've been trying to run an iterative Spark job that spills 1+ GB to disk 
per iteration on a system with limited disk space. I believe there would be 
enough space if Spark cleaned up unused data from previous iterations, but as 
it stands the number of iterations I can run is limited by the available disk 
space.

I found a thread on the usage of spark.cleaner.ttl on the old Spark Users 
Google group here:

https://groups.google.com/forum/#!topic/spark-users/9ebKcNCDih4

I think this setting may be what I'm looking for; however, the cleaner 
seems to delete data that's still in use. The effect is that I get bizarre 
exceptions from Spark complaining about missing broadcast data or 
ArrayIndexOutOfBounds. When is spark.cleaner.ttl safe to use? Is it 
supposed to delete in-use data, or is this a bug/shortcoming?


Cheers,

Michael
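
(Not an answer from the thread, but a hedged sketch of the alternative that
avoids the TTL entirely: unpersist the previous iteration's RDD explicitly so
its blocks and shuffle files become eligible for cleanup. Names and the
per-iteration update are assumptions.)

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}

    val sc = new SparkContext(new SparkConf().setAppName("iterative-job"))

    var current: RDD[Double] = sc.parallelize(Seq.fill(1000)(1.0)).cache()
    for (i <- 1 to 100) {
      val next = current.map(_ * 0.99).cache()   // hypothetical per-iteration update
      next.count()                               // materialize before dropping the old iteration
      current.unpersist(blocking = false)        // release the previous iteration's cached blocks
      current = next
    }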