Re: spark.cleaner.ttl for 1.4.1

2015-11-30 Thread Josh Rosen
AFAIK the ContextCleaner should perform all of the cleaning *as long as
garbage collection is performed frequently enough on the driver*. See
https://issues.apache.org/jira/browse/SPARK-7689 and
https://github.com/apache/spark/pull/6220#issuecomment-102950055 for
discussion of this technicality.
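
As a rough illustration of that caveat (this is not from the thread itself): if the driver has plenty of free heap, GC may run so rarely that the ContextCleaner's weak references are never processed, so one crude workaround is to schedule a periodic GC on the driver. A minimal sketch in Scala, with an arbitrary interval and a made-up helper name:

import java.util.concurrent.{Executors, TimeUnit}

// Hypothetical helper (not a Spark API): periodically hint a full GC on the driver
// so that unreferenced RDDs/broadcasts/shuffles become eligible for ContextCleaner cleanup.
object DriverGcTicker {
  def start(intervalMinutes: Long = 30): Unit = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    scheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = System.gc()
    }, intervalMinutes, intervalMinutes, TimeUnit.MINUTES)
  }
}

DriverGcTicker.start()   // call once after creating the SparkContext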

On Mon, Nov 30, 2015 at 8:46 AM Michal Čizmazia  wrote:

> Does *spark.cleaner.ttl* still need to be used for Spark *1.4.1* long-running
> streaming jobs? Or does *ContextCleaner* alone do all the cleaning?
>


spark.cleaner.ttl for 1.4.1

2015-11-30 Thread Michal Čizmazia
Does *spark.cleaner.ttl* still need to be used for Spark *1.4.1* long-running
streaming jobs? Or does *ContextCleaner* alone do all the cleaning?


RE: [SparkStreaming 1.3.0] Broadcast failure after setting "spark.cleaner.ttl"

2015-06-09 Thread Shao, Saisai
Shuffle data can be deleted through the weak-reference mechanism; you can check the ContextCleaner code for the details. You could also trigger a full GC manually with JVisualVM or a similar tool and then see whether the shuffle files get deleted.
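
For example (a rough sketch, not from Jerry's mail): on a local-mode driver you could count the files under the spark-local directories, force a full GC, give the ContextCleaner a moment, and count again. The /tmp location, the directory-name prefix and the sleep are assumptions:

import java.io.File

def countSparkLocalFiles(parent: String = "/tmp"): Int = {
  // Recursively count files under every /tmp/spark-local* directory
  def walk(f: File): Int =
    if (f.isDirectory) Option(f.listFiles()).getOrElse(Array.empty[File]).map(walk).sum
    else 1
  Option(new File(parent).listFiles()).getOrElse(Array.empty[File])
    .filter(_.getName.startsWith("spark-local"))
    .map(walk)
    .sum
}

val before = countSparkLocalFiles()
System.gc()              // same effect as "Perform GC" in JVisualVM
Thread.sleep(30000)      // give the ContextCleaner time to remove unreferenced blocks
val after = countSparkLocalFiles()
println(s"spark-local files: before=$before after=$after")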

Thanks
Jerry

From: Haopu Wang [mailto:hw...@qilinsoft.com]
Sent: Tuesday, June 9, 2015 5:28 PM
To: Shao, Saisai; user
Subject: RE: [SparkStreaming 1.3.0] Broadcast failure after setting 
"spark.cleaner.ttl"


Jerry, I agree with you.

However, in my case I kept monitoring the "blockmanager" folder. I do see that the number of files sometimes decreases, but the folder's size keeps increasing.

And below is a screenshot of the folder. You can see that some old files are somehow not deleted.

[inline screenshot attachment not preserved in the archive]



-Original Message-
From: Shao, Saisai [mailto:saisai.s...@intel.com]
Sent: Tuesday, June 09, 2015 4:33 PM
To: Haopu Wang; user
Subject: RE: [SparkStreaming 1.3.0] Broadcast failure after setting 
"spark.cleaner.ttl"



From the stack trace, I think this problem is due to the deletion of the broadcast variable: since you set spark.cleaner.ttl, the old broadcast variable is deleted once that timeout expires, and you hit this exception when you try to use it again after the time limit.

Basically, I don't think you need this configuration. Spark Streaming automatically deletes old, unused data, and Spark itself also cleans up this metadata via weak references. This configuration will also be deprecated in the coming release.





Thanks

Jerry



-Original Message-

From: Haopu Wang [mailto:hw...@qilinsoft.com]

Sent: Tuesday, June 9, 2015 3:30 PM

To: user

Subject: [SparkStreaming 1.3.0] Broadcast failure after setting 
"spark.cleaner.ttl"



When I ran a Spark Streaming application for a longer period, I noticed that the local directory's size kept increasing.

I set "spark.cleaner.ttl" to 1800 seconds in order to clean the metadata.

The Spark Streaming batch duration is 10 seconds and the checkpoint duration is 10 minutes.

The setting took effect, but after that the exception below happened.

Do you have any idea about this error? Thank you!







15/06/09 12:57:30 WARN TaskSetManager: Lost task 3.0 in stage 5038.0 (TID 27045, host2): java.io.IOException:
org.apache.spark.SparkException: Failed to get broadcast_82_piece0 of broadcast_82
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1155)
        at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
        at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
        at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
        at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
        at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
        at org.apache.spark.streaming.dstream.HashmapEnrichDStream$$anonfun$compute$3.apply(HashmapEnrichDStream.scala:39)
        at org.apache.spark.streaming.dstream.HashmapEnrichDStream$$anonfun$compute$3.apply(HashmapEnrichDStream.scala:39)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:202)
        at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Failed to get broadcast_82_piece0 of broadcast_82
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBr

RE: [SparkStreaming 1.3.0] Broadcast failure after setting "spark.cleaner.ttl"

2015-06-09 Thread Haopu Wang
Jerry, I agree with you.

However, in my case I kept monitoring the "blockmanager" folder. I do see that the number of files sometimes decreases, but the folder's size keeps increasing.

And below is a screenshot of the folder. You can see that some old files are somehow not deleted.

[inline screenshot attachment not preserved in the archive]

-Original Message-
From: Shao, Saisai [mailto:saisai.s...@intel.com] 
Sent: Tuesday, June 09, 2015 4:33 PM
To: Haopu Wang; user
Subject: RE: [SparkStreaming 1.3.0] Broadcast failure after setting
"spark.cleaner.ttl"

 

From the stack trace, I think this problem is due to the deletion of the broadcast variable: since you set spark.cleaner.ttl, the old broadcast variable is deleted once that timeout expires, and you hit this exception when you try to use it again after the time limit.

Basically, I don't think you need this configuration. Spark Streaming automatically deletes old, unused data, and Spark itself also cleans up this metadata via weak references. This configuration will also be deprecated in the coming release.

 

Thanks

Jerry

 

-Original Message-

From: Haopu Wang [mailto:hw...@qilinsoft.com] 

Sent: Tuesday, June 9, 2015 3:30 PM

To: user

Subject: [SparkStreaming 1.3.0] Broadcast failure after setting
"spark.cleaner.ttl"

 

When I ran a Spark Streaming application for a longer period, I noticed that the local directory's size kept increasing.

I set "spark.cleaner.ttl" to 1800 seconds in order to clean the metadata.

The Spark Streaming batch duration is 10 seconds and the checkpoint duration is 10 minutes.

The setting took effect, but after that the exception below happened.

Do you have any idea about this error? Thank you!

 



 

15/06/09 12:57:30 WARN TaskSetManager: Lost task 3.0 in stage 5038.0 (TID 27045, host2): java.io.IOException:
org.apache.spark.SparkException: Failed to get broadcast_82_piece0 of broadcast_82
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1155)
        at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
        at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
        at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
        at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
        at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
        at org.apache.spark.streaming.dstream.HashmapEnrichDStream$$anonfun$compute$3.apply(HashmapEnrichDStream.scala:39)
        at org.apache.spark.streaming.dstream.HashmapEnrichDStream$$anonfun$compute$3.apply(HashmapEnrichDStream.scala:39)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:202)
        at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Failed to get broadcast_82_piece0 of broadcast_82
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:136)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apa

RE: [SparkStreaming 1.3.0] Broadcast failure after setting "spark.cleaner.ttl"

2015-06-09 Thread Shao, Saisai
From the stack trace, I think this problem is due to the deletion of the broadcast variable: since you set spark.cleaner.ttl, the old broadcast variable is deleted once that timeout expires, and you hit this exception when you try to use it again after the time limit.

Basically, I don't think you need this configuration. Spark Streaming automatically deletes old, unused data, and Spark itself also cleans up this metadata via weak references. This configuration will also be deprecated in the coming release.
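
To make the failure mode concrete, here is a hedged sketch (app name, port and the lookup map are made up) of the pattern that typically hits "Failed to get broadcast_XX_pieceY" when spark.cleaner.ttl is set: a broadcast created once on the driver and then referenced by every batch, long after the TTL has expired.

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setAppName("broadcast-ttl-sketch")
// .set("spark.cleaner.ttl", "1800")
// ^ with this set, the broadcast below can be dropped after 30 minutes,
//   and later batches then fail with the exception shown in this thread
val ssc = new StreamingContext(conf, Seconds(10))
val lookup = ssc.sparkContext.broadcast(Map("key" -> "value"))  // created once at startup

ssc.socketTextStream("localhost", 9999)
  .map(line => lookup.value.getOrElse(line, "unknown"))         // dereferenced on every batch
  .print()

ssc.start()
ssc.awaitTermination()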

Thanks
Jerry

-Original Message-
From: Haopu Wang [mailto:hw...@qilinsoft.com] 
Sent: Tuesday, June 9, 2015 3:30 PM
To: user
Subject: [SparkStreaming 1.3.0] Broadcast failure after setting 
"spark.cleaner.ttl"

When I ran a Spark Streaming application for a longer period, I noticed that the local directory's size kept increasing.

I set "spark.cleaner.ttl" to 1800 seconds in order to clean the metadata.

The Spark Streaming batch duration is 10 seconds and the checkpoint duration is 10 minutes.

The setting took effect, but after that the exception below happened.

Do you have any idea about this error? Thank you!



15/06/09 12:57:30 WARN TaskSetManager: Lost task 3.0 in stage 5038.0 (TID 
27045, host2): java.io.IOException:
org.apache.spark.SparkException: Failed to get broadcast_82_piece0 of
broadcast_82
at
org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1155)
at
org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBr
oadcast.scala:164)
at
org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBro
adcast.scala:64)
at
org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scal
a:64)
at
org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.sc
ala:87)
at
org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
at
org.apache.spark.streaming.dstream.HashmapEnrichDStream$$anonfun$compute
$3.apply(HashmapEnrichDStream.scala:39)
at
org.apache.spark.streaming.dstream.HashmapEnrichDStream$$anonfun$compute
$3.apply(HashmapEnrichDStream.scala:39)
at
scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
at
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at
scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
at
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at
scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
at
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
at
org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter
.scala:202)
at
org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.
scala:56)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:6
8)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:4
1)
at org.apache.spark.scheduler.Task.run(Task.scala:64)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.jav
a:1145)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.ja
va:615)
at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Failed to get
broadcast_82_piece0 of broadcast_82
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$br
oadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast
.scala:137)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$br
oadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast
.scala:137)
at scala.Option.getOrElse(Option.scala:120)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$br
oadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.sc
ala:136)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$br
oadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$br
oadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
at scala.collection.immutable.List.foreach(List.scala:318)
at
org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$T
orrentBroadcast$$readBlocks(TorrentBroadcast.scala:119)
at
org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$
1.apply(TorrentBroadcast.scala:174)
at
org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:11

Re: [SparkStreaming 1.3.0] Broadcast failure after setting "spark.cleaner.ttl"

2015-06-09 Thread Benjamin Fradet
Hi,

Are you restarting your Spark streaming context through getOrCreate?
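
For anyone unfamiliar with that pattern, a minimal sketch of getOrCreate-based recovery follows; the checkpoint directory, app name and batch interval are placeholders:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val checkpointDir = "hdfs:///tmp/streaming-checkpoint"   // placeholder path

def createContext(): StreamingContext = {
  val conf = new SparkConf().setAppName("my-streaming-app")
  val ssc = new StreamingContext(conf, Seconds(10))
  ssc.checkpoint(checkpointDir)
  // ... define the DStream graph here ...
  ssc
}

// On restart this rebuilds the context from the checkpoint if one exists,
// instead of constructing a brand-new StreamingContext every time.
val ssc = StreamingContext.getOrCreate(checkpointDir, createContext _)
ssc.start()
ssc.awaitTermination()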
On 9 Jun 2015 09:30, "Haopu Wang"  wrote:

> When I ran a spark streaming application longer, I noticed the local
> directory's size was kept increasing.
>
> I set "spark.cleaner.ttl" to 1800 seconds in order clean the metadata.
>
> The spark streaming batch duration is 10 seconds and checkpoint duration
> is 10 minutes.
>
> The setting took effect but after that, below exception happened.
>
> Do you have any idea about this error? Thank you!
>
> 
>
> 15/06/09 12:57:30 WARN TaskSetManager: Lost task 3.0 in stage 5038.0
> (TID 27045, host2): java.io.IOException:
> org.apache.spark.SparkException: Failed to get broadcast_82_piece0 of
> broadcast_82
> at
> org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1155)
> at
> org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBr
> oadcast.scala:164)
> at
> org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBro
> adcast.scala:64)
> at
> org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scal
> a:64)
> at
> org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.sc
> ala:87)
> at
> org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
> at
> org.apache.spark.streaming.dstream.HashmapEnrichDStream$$anonfun$compute
> $3.apply(HashmapEnrichDStream.scala:39)
> at
> org.apache.spark.streaming.dstream.HashmapEnrichDStream$$anonfun$compute
> $3.apply(HashmapEnrichDStream.scala:39)
> at
> scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
> at
> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at
> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at
> scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
> at
> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at
> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at
> scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
> at
> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at
> scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> at
> org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter
> .scala:202)
> at
> org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.
> scala:56)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:6
> 8)
> at
> org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:4
> 1)
> at org.apache.spark.scheduler.Task.run(Task.scala:64)
> at
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.jav
> a:1145)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.ja
> va:615)
> at java.lang.Thread.run(Thread.java:745)
> Caused by: org.apache.spark.SparkException: Failed to get
> broadcast_82_piece0 of broadcast_82
> at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$br
> oadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast
> .scala:137)
> at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$br
> oadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast
> .scala:137)
> at scala.Option.getOrElse(Option.scala:120)
> at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$br
> oadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.sc
> ala:136)
> at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$br
> oadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
> at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$br
> oadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
> at scala.collection.immutable.List.foreach(List.scala:318)
> at
> org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$T
> orrentBroadcast$$readBlocks(TorrentBroadcast.scala:119)
> at
> org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$
> 1.apply(TorrentBroadcast.scala:174)
> at
> org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1152)
> ... 25 more
>
> 15/06/09 12:57:30 ERROR TaskSetManager: Task 2 in stage 5038.0 failed 4
> times; aborting job
> 15/06/09 12:57:30 ERROR JobScheduler: Error running job streaming job
> 143382585 ms.0
>
>
>
>
>
>
>


[SparkStreaming 1.3.0] Broadcast failure after setting "spark.cleaner.ttl"

2015-06-09 Thread Haopu Wang
When I ran a Spark Streaming application for a longer period, I noticed that the local directory's size kept increasing.

I set "spark.cleaner.ttl" to 1800 seconds in order to clean the metadata.

The Spark Streaming batch duration is 10 seconds and the checkpoint duration is 10 minutes.

The setting took effect, but after that the exception below happened.

Do you have any idea about this error? Thank you!



15/06/09 12:57:30 WARN TaskSetManager: Lost task 3.0 in stage 5038.0 (TID 27045, host2): java.io.IOException:
org.apache.spark.SparkException: Failed to get broadcast_82_piece0 of broadcast_82
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1155)
        at org.apache.spark.broadcast.TorrentBroadcast.readBroadcastBlock(TorrentBroadcast.scala:164)
        at org.apache.spark.broadcast.TorrentBroadcast._value$lzycompute(TorrentBroadcast.scala:64)
        at org.apache.spark.broadcast.TorrentBroadcast._value(TorrentBroadcast.scala:64)
        at org.apache.spark.broadcast.TorrentBroadcast.getValue(TorrentBroadcast.scala:87)
        at org.apache.spark.broadcast.Broadcast.value(Broadcast.scala:70)
        at org.apache.spark.streaming.dstream.HashmapEnrichDStream$$anonfun$compute$3.apply(HashmapEnrichDStream.scala:39)
        at org.apache.spark.streaming.dstream.HashmapEnrichDStream$$anonfun$compute$3.apply(HashmapEnrichDStream.scala:39)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:390)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
        at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:202)
        at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:56)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.spark.SparkException: Failed to get broadcast_82_piece0 of broadcast_82
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1$$anonfun$2.apply(TorrentBroadcast.scala:137)
        at scala.Option.getOrElse(Option.scala:120)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply$mcVI$sp(TorrentBroadcast.scala:136)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$org$apache$spark$broadcast$TorrentBroadcast$$readBlocks$1.apply(TorrentBroadcast.scala:119)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at org.apache.spark.broadcast.TorrentBroadcast.org$apache$spark$broadcast$TorrentBroadcast$$readBlocks(TorrentBroadcast.scala:119)
        at org.apache.spark.broadcast.TorrentBroadcast$$anonfun$readBroadcastBlock$1.apply(TorrentBroadcast.scala:174)
        at org.apache.spark.util.Utils$.tryOrIOException(Utils.scala:1152)
        ... 25 more

15/06/09 12:57:30 ERROR TaskSetManager: Task 2 in stage 5038.0 failed 4 times; aborting job
15/06/09 12:57:30 ERROR JobScheduler: Error running job streaming job 143382585 ms.0








spark.cleaner.ttl

2014-10-01 Thread SK
Hi,

I am using Spark v1.1.0. The default value of spark.cleaner.ttl is infinite as per the online docs. Since a lot of shuffle files are generated in /tmp/spark-local* and the disk is running out of space, we tested with a smaller value of ttl. However, even when the job has completed and the timer has expired, the files remain; instead of being deleted, the timestamps of the files keep changing. How can we automatically delete these shuffle files, say every 24 hours?

thanks







Re: spark.cleaner.ttl and spark.streaming.unpersist

2014-09-10 Thread Tim Smith
I switched from Yarn to standalone mode and haven't hit the OOM issue yet.
However, now I have Akka issues killing the executor:

2014-09-11 02:43:34,543 INFO akka.actor.LocalActorRef: Message
[akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying]
from Actor[akka://sparkWorker/deadLetters] to
Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkWorker%4010.2.16.8%3A44405-6#1549270895]
was not delivered. [2] dead letters encountered. This logging can be
turned off or adjusted with configuration settings
'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

Before I switched from Yarn to standalone, I tried looking at the heaps of running executors. What I found odd was that while both "jmap -histo:live" and "jmap -histo" showed heap usage of a few hundred MBytes, Yarn kept reporting memory utilization of several gigabytes, eventually leading to the container being killed.

I would appreciate it if someone could reproduce what I am seeing. Basically:
1. Tail your Yarn container logs and see what they report as memory used by the JVM.
2. In parallel, run "jmap -histo:live " or "jmap -histo " on the executor process.

They should be about the same, right?

Also, in the heap dump, 99% of the heap seems to be occupied with
"unreachable objects" (and most of it is byte arrays).




On Wed, Sep 10, 2014 at 12:06 PM, Tim Smith  wrote:
> Actually, I am not doing any explicit shuffle/updateByKey or other
> transform functions. In my program flow, I take in data from Kafka,
> match each message against a list of regex and then if a msg matches a
> regex then extract groups, stuff them in json and push out back to
> kafka (different topic). So there is really no dependency between two
> messages in terms of processing. Here's my container histogram:
> http://pastebin.com/s3nAT3cY
>
> Essentially, my app is a cluster grep on steroids.
>
>
>
> On Wed, Sep 10, 2014 at 11:34 AM, Yana Kadiyska  
> wrote:
>> Tim, I asked a similar question twice:
>> here
>> http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Cannot-get-executors-to-stay-alive-tt12940.html
>> and here
>> http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Executor-OOM-tt12383.html
>>
>> and have not yet received any responses. I noticed that the heapdump only
>> contains a very large byte array consuming about 66%(the second link
>> contains a picture of my heap -- I ran with a small heap to be able to get
>> the failure quickly)
>>
>> I don't have solutions but wanted to affirm that I've observed a similar
>> situation...
>>
>> On Wed, Sep 10, 2014 at 2:24 PM, Tim Smith  wrote:
>>>
>>> I am using Spark 1.0.0 (on CDH 5.1) and have a similar issue. In my case,
>>> the receivers die within an hour because Yarn kills the containers for high
>>> memory usage. I set ttl.cleaner to 30 seconds but that didn't help. So I
>>> don't think stale RDDs are an issue here. I did a "jmap -histo" on a couple
>>> of running receiver processes and in a heap of 30G, roughly ~16G is taken by
>>> "[B" which is byte arrays.
>>>
>>> Still investigating more and would appreciate pointers for
>>> troubleshooting. I have dumped the heap of a receiver and will try to go
>>> over it.
>>>
>>>
>>>
>>>
>>> On Wed, Sep 10, 2014 at 1:43 AM, Luis Ángel Vicente Sánchez
>>>  wrote:
>>>>
>>>> I somehow missed that parameter when I was reviewing the documentation,
>>>> that should do the trick! Thank you!
>>>>
>>>> 2014-09-10 2:10 GMT+01:00 Shao, Saisai :
>>>>
>>>>> Hi Luis,
>>>>>
>>>>>
>>>>>
>>>>> The parameter “spark.cleaner.ttl” and “spark.streaming.unpersist” can be
>>>>> used to remove useless timeout streaming data, the difference is that
>>>>> “spark.cleaner.ttl” is time-based cleaner, it does not only clean 
>>>>> streaming
>>>>> input data, but also Spark’s useless metadata; while
>>>>> “spark.streaming.unpersist” is reference-based cleaning mechanism, 
>>>>> streaming
>>>>> data will be removed when out of slide duration.
>>>>>
>>>>>
>>>>>
>>>>> Both these two parameter can alleviate the memory occupation of Spark
>>>>> Streaming. But if the data is flooded into Spark Streaming when start up
>>>>> like your situation using Kafka, these two parameters cannot well mitigate
>>>

Re: spark.cleaner.ttl and spark.streaming.unpersist

2014-09-10 Thread Tim Smith
Actually, I am not doing any explicit shuffle/updateByKey or other transform functions. In my program flow, I take in data from Kafka and match each message against a list of regexes; if a message matches a regex, I extract the groups, stuff them into JSON, and push the result back out to Kafka (on a different topic). So there is really no dependency between two messages in terms of processing. Here's my container histogram:
http://pastebin.com/s3nAT3cY

Essentially, my app is a cluster grep on steroids.
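
For context, here is a rough sketch of that kind of "cluster grep" flow, assuming the spark-streaming-kafka dependency; the topic names, ZooKeeper address, regex list and the output step are placeholders, not taken from Tim's actual job:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.kafka.KafkaUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(new SparkConf().setAppName("cluster-grep"), Seconds(5))
val patterns = Seq("""ERROR (\w+)""".r, """WARN (\w+)""".r)   // stand-in regex list

val lines = KafkaUtils
  .createStream(ssc, "zkhost:2181", "grep-group", Map("in-topic" -> 4))
  .map(_._2)                                                  // keep the message, drop the key

val matched = lines.flatMap { msg =>
  patterns.flatMap(_.findFirstMatchIn(msg)).headOption
    .map(m => s"""{"group":"${m.group(1)}","raw":"$msg"}""")  // extracted groups as JSON
    .toSeq
}

// Stand-in for the "push back to Kafka (different topic)" step.
matched.foreachRDD(rdd => rdd.foreachPartition(part => part.foreach(println)))

ssc.start()
ssc.awaitTermination()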



On Wed, Sep 10, 2014 at 11:34 AM, Yana Kadiyska  wrote:
> Tim, I asked a similar question twice:
> here
> http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Cannot-get-executors-to-stay-alive-tt12940.html
> and here
> http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Executor-OOM-tt12383.html
>
> and have not yet received any responses. I noticed that the heapdump only
> contains a very large byte array consuming about 66%(the second link
> contains a picture of my heap -- I ran with a small heap to be able to get
> the failure quickly)
>
> I don't have solutions but wanted to affirm that I've observed a similar
> situation...
>
> On Wed, Sep 10, 2014 at 2:24 PM, Tim Smith  wrote:
>>
>> I am using Spark 1.0.0 (on CDH 5.1) and have a similar issue. In my case,
>> the receivers die within an hour because Yarn kills the containers for high
>> memory usage. I set ttl.cleaner to 30 seconds but that didn't help. So I
>> don't think stale RDDs are an issue here. I did a "jmap -histo" on a couple
>> of running receiver processes and in a heap of 30G, roughly ~16G is taken by
>> "[B" which is byte arrays.
>>
>> Still investigating more and would appreciate pointers for
>> troubleshooting. I have dumped the heap of a receiver and will try to go
>> over it.
>>
>>
>>
>>
>> On Wed, Sep 10, 2014 at 1:43 AM, Luis Ángel Vicente Sánchez
>>  wrote:
>>>
>>> I somehow missed that parameter when I was reviewing the documentation,
>>> that should do the trick! Thank you!
>>>
>>> 2014-09-10 2:10 GMT+01:00 Shao, Saisai :
>>>
>>>> Hi Luis,
>>>>
>>>>
>>>>
>>>> The parameter “spark.cleaner.ttl” and “spark.streaming.unpersist” can be
>>>> used to remove useless timeout streaming data, the difference is that
>>>> “spark.cleaner.ttl” is time-based cleaner, it does not only clean streaming
>>>> input data, but also Spark’s useless metadata; while
>>>> “spark.streaming.unpersist” is reference-based cleaning mechanism, 
>>>> streaming
>>>> data will be removed when out of slide duration.
>>>>
>>>>
>>>>
>>>> Both these two parameter can alleviate the memory occupation of Spark
>>>> Streaming. But if the data is flooded into Spark Streaming when start up
>>>> like your situation using Kafka, these two parameters cannot well mitigate
>>>> the problem. Actually you need to control the input data rate to not inject
>>>> so fast, you can try “spark.straming.receiver.maxRate” to control the 
>>>> inject
>>>> rate.
>>>>
>>>>
>>>>
>>>> Thanks
>>>>
>>>> Jerry
>>>>
>>>>
>>>>
>>>> From: Luis Ángel Vicente Sánchez [mailto:langel.gro...@gmail.com]
>>>> Sent: Wednesday, September 10, 2014 5:21 AM
>>>> To: user@spark.apache.org
>>>> Subject: spark.cleaner.ttl and spark.streaming.unpersist
>>>>
>>>>
>>>>
>>>> The executors of my spark streaming application are being killed due to
>>>> memory issues. The memory consumption is quite high on startup because is
>>>> the first run and there are quite a few events on the kafka queues that are
>>>> consumed at a rate of 100K events per sec.
>>>>
>>>> I wonder if it's recommended to use spark.cleaner.ttl and
>>>> spark.streaming.unpersist together to mitigate that problem. And I also
>>>> wonder if new RDD are being batched while a RDD is being processed.
>>>>
>>>> Regards,
>>>>
>>>> Luis
>>>
>>>
>>
>




Re: spark.cleaner.ttl and spark.streaming.unpersist

2014-09-10 Thread Yana Kadiyska
Tim, I asked a similar question twice:
here
http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Cannot-get-executors-to-stay-alive-tt12940.html
and here
http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Executor-OOM-tt12383.html

and have not yet received any responses. I noticed that the heap dump only contains a very large byte array consuming about 66% (the second link contains a picture of my heap; I ran with a small heap to be able to hit the failure quickly).

I don't have solutions but wanted to affirm that I've observed a similar
situation...

On Wed, Sep 10, 2014 at 2:24 PM, Tim Smith  wrote:

> I am using Spark 1.0.0 (on CDH 5.1) and have a similar issue. In my case,
> the receivers die within an hour because Yarn kills the containers for high
> memory usage. I set ttl.cleaner to 30 seconds but that didn't help. So I
> don't think stale RDDs are an issue here. I did a "jmap -histo" on a couple
> of running receiver processes and in a heap of 30G, roughly ~16G is taken
> by "[B" which is byte arrays.
>
> Still investigating more and would appreciate pointers for
> troubleshooting. I have dumped the heap of a receiver and will try to go
> over it.
>
>
>
>
> On Wed, Sep 10, 2014 at 1:43 AM, Luis Ángel Vicente Sánchez <
> langel.gro...@gmail.com> wrote:
>
>> I somehow missed that parameter when I was reviewing the documentation,
>> that should do the trick! Thank you!
>>
>> 2014-09-10 2:10 GMT+01:00 Shao, Saisai :
>>
>>  Hi Luis,
>>>
>>>
>>>
>>> The parameter “spark.cleaner.ttl” and “spark.streaming.unpersist” can be
>>> used to remove useless timeout streaming data, the difference is that
>>> “spark.cleaner.ttl” is time-based cleaner, it does not only clean streaming
>>> input data, but also Spark’s useless metadata; while
>>> “spark.streaming.unpersist” is reference-based cleaning mechanism,
>>> streaming data will be removed when out of slide duration.
>>>
>>>
>>>
>>> Both these two parameter can alleviate the memory occupation of Spark
>>> Streaming. But if the data is flooded into Spark Streaming when start up
>>> like your situation using Kafka, these two parameters cannot well mitigate
>>> the problem. Actually you need to control the input data rate to not inject
>>> so fast, you can try “spark.straming.receiver.maxRate” to control the
>>> inject rate.
>>>
>>>
>>>
>>> Thanks
>>>
>>> Jerry
>>>
>>>
>>>
>>> *From:* Luis Ángel Vicente Sánchez [mailto:langel.gro...@gmail.com]
>>> *Sent:* Wednesday, September 10, 2014 5:21 AM
>>> *To:* user@spark.apache.org
>>> *Subject:* spark.cleaner.ttl and spark.streaming.unpersist
>>>
>>>
>>>
>>> The executors of my spark streaming application are being killed due to
>>> memory issues. The memory consumption is quite high on startup because is
>>> the first run and there are quite a few events on the kafka queues that are
>>> consumed at a rate of 100K events per sec.
>>>
>>> I wonder if it's recommended to use spark.cleaner.ttl and
>>> spark.streaming.unpersist together to mitigate that problem. And I also
>>> wonder if new RDD are being batched while a RDD is being processed.
>>>
>>> Regards,
>>>
>>> Luis
>>>
>>
>>
>


Re: spark.cleaner.ttl and spark.streaming.unpersist

2014-09-10 Thread Tim Smith
I am using Spark 1.0.0 (on CDH 5.1) and have a similar issue. In my case, the receivers die within an hour because Yarn kills the containers for high memory usage. I set spark.cleaner.ttl to 30 seconds but that didn't help, so I don't think stale RDDs are the issue here. I did a "jmap -histo" on a couple of running receiver processes, and in a heap of 30G, roughly ~16G is taken by "[B", which is byte arrays.

Still investigating more and would appreciate pointers for troubleshooting.
I have dumped the heap of a receiver and will try to go over it.




On Wed, Sep 10, 2014 at 1:43 AM, Luis Ángel Vicente Sánchez <
langel.gro...@gmail.com> wrote:

> I somehow missed that parameter when I was reviewing the documentation,
> that should do the trick! Thank you!
>
> 2014-09-10 2:10 GMT+01:00 Shao, Saisai :
>
>  Hi Luis,
>>
>>
>>
>> The parameter “spark.cleaner.ttl” and “spark.streaming.unpersist” can be
>> used to remove useless timeout streaming data, the difference is that
>> “spark.cleaner.ttl” is time-based cleaner, it does not only clean streaming
>> input data, but also Spark’s useless metadata; while
>> “spark.streaming.unpersist” is reference-based cleaning mechanism,
>> streaming data will be removed when out of slide duration.
>>
>>
>>
>> Both these two parameter can alleviate the memory occupation of Spark
>> Streaming. But if the data is flooded into Spark Streaming when start up
>> like your situation using Kafka, these two parameters cannot well mitigate
>> the problem. Actually you need to control the input data rate to not inject
>> so fast, you can try “spark.straming.receiver.maxRate” to control the
>> inject rate.
>>
>>
>>
>> Thanks
>>
>> Jerry
>>
>>
>>
>> *From:* Luis Ángel Vicente Sánchez [mailto:langel.gro...@gmail.com]
>> *Sent:* Wednesday, September 10, 2014 5:21 AM
>> *To:* user@spark.apache.org
>> *Subject:* spark.cleaner.ttl and spark.streaming.unpersist
>>
>>
>>
>> The executors of my spark streaming application are being killed due to
>> memory issues. The memory consumption is quite high on startup because is
>> the first run and there are quite a few events on the kafka queues that are
>> consumed at a rate of 100K events per sec.
>>
>> I wonder if it's recommended to use spark.cleaner.ttl and
>> spark.streaming.unpersist together to mitigate that problem. And I also
>> wonder if new RDD are being batched while a RDD is being processed.
>>
>> Regards,
>>
>> Luis
>>
>
>


Re: spark.cleaner.ttl and spark.streaming.unpersist

2014-09-10 Thread Luis Ángel Vicente Sánchez
I somehow missed that parameter when I was reviewing the documentation,
that should do the trick! Thank you!

2014-09-10 2:10 GMT+01:00 Shao, Saisai :

>  Hi Luis,
>
>
>
> The parameter “spark.cleaner.ttl” and “spark.streaming.unpersist” can be
> used to remove useless timeout streaming data, the difference is that
> “spark.cleaner.ttl” is time-based cleaner, it does not only clean streaming
> input data, but also Spark’s useless metadata; while
> “spark.streaming.unpersist” is reference-based cleaning mechanism,
> streaming data will be removed when out of slide duration.
>
>
>
> Both these two parameter can alleviate the memory occupation of Spark
> Streaming. But if the data is flooded into Spark Streaming when start up
> like your situation using Kafka, these two parameters cannot well mitigate
> the problem. Actually you need to control the input data rate to not inject
> so fast, you can try “spark.straming.receiver.maxRate” to control the
> inject rate.
>
>
>
> Thanks
>
> Jerry
>
>
>
> *From:* Luis Ángel Vicente Sánchez [mailto:langel.gro...@gmail.com]
> *Sent:* Wednesday, September 10, 2014 5:21 AM
> *To:* user@spark.apache.org
> *Subject:* spark.cleaner.ttl and spark.streaming.unpersist
>
>
>
> The executors of my spark streaming application are being killed due to
> memory issues. The memory consumption is quite high on startup because is
> the first run and there are quite a few events on the kafka queues that are
> consumed at a rate of 100K events per sec.
>
> I wonder if it's recommended to use spark.cleaner.ttl and
> spark.streaming.unpersist together to mitigate that problem. And I also
> wonder if new RDD are being batched while a RDD is being processed.
>
> Regards,
>
> Luis
>


RE: spark.cleaner.ttl and spark.streaming.unpersist

2014-09-09 Thread Shao, Saisai
Hi Luis,

The parameters “spark.cleaner.ttl” and “spark.streaming.unpersist” can both be used to remove stale streaming data that has timed out; the difference is that “spark.cleaner.ttl” is a time-based cleaner which cleans not only streaming input data but also Spark’s stale metadata, while “spark.streaming.unpersist” is a reference-based cleaning mechanism under which streaming data is removed once it falls out of the slide duration.

Both of these parameters can alleviate the memory occupation of Spark Streaming. But if data floods into Spark Streaming at startup, as in your situation with Kafka, these two parameters cannot mitigate the problem well. You actually need to control the input data rate so data is not injected too fast; you can try “spark.streaming.receiver.maxRate” to control the ingestion rate.
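
As a concrete (but purely illustrative) example of where these settings go, with arbitrary values rather than recommendations from this thread:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("streaming-memory-tuning")
  .set("spark.streaming.unpersist", "true")           // reference-based cleanup of old batch data
  .set("spark.streaming.receiver.maxRate", "10000")   // cap each receiver at 10K records/sec
  // .set("spark.cleaner.ttl", "3600")                // time-based cleaner; usually unnecessary

val ssc = new StreamingContext(conf, Seconds(2))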

Thanks
Jerry

From: Luis Ángel Vicente Sánchez [mailto:langel.gro...@gmail.com]
Sent: Wednesday, September 10, 2014 5:21 AM
To: user@spark.apache.org
Subject: spark.cleaner.ttl and spark.streaming.unpersist

The executors of my Spark Streaming application are being killed due to memory issues. The memory consumption is quite high on startup because it is the first run and there are quite a few events on the Kafka queues, which are consumed at a rate of 100K events per second.

I wonder whether it's recommended to use spark.cleaner.ttl and spark.streaming.unpersist together to mitigate that problem. I also wonder whether new RDDs are being batched while an RDD is being processed.
Regards,

Luis


spark.cleaner.ttl and spark.streaming.unpersist

2014-09-09 Thread Luis Ángel Vicente Sánchez
The executors of my Spark Streaming application are being killed due to memory issues. The memory consumption is quite high on startup because it is the first run and there are quite a few events on the Kafka queues, which are consumed at a rate of 100K events per second.

I wonder whether it's recommended to use spark.cleaner.ttl and spark.streaming.unpersist together to mitigate that problem. I also wonder whether new RDDs are being batched while an RDD is being processed.

Regards,

Luis


Re: "spark.streaming.unpersist" and "spark.cleaner.ttl"

2014-07-26 Thread Tathagata Das
Yeah, I wrote those lines a while back; I wanted to contrast storage levels with and without serialization. I should have realized that StorageLevel.MEMORY_ONLY_SER could be mistaken for the default level.

TD

On Wed, Jul 23, 2014 at 5:12 AM, Shao, Saisai  wrote:
> Yeah, the document may not be precisely aligned with latest code, so the best 
> way is to check the code.
>
> -Original Message-
> From: Haopu Wang [mailto:hw...@qilinsoft.com]
> Sent: Wednesday, July 23, 2014 5:56 PM
> To: user@spark.apache.org
> Subject: RE: "spark.streaming.unpersist" and "spark.cleaner.ttl"
>
> Jerry, thanks for the response.
>
> For the default storage level of DStream, it looks like Spark's document is 
> wrong. In this link: 
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#memory-tuning
> It mentions:
> "Default persistence level of DStreams: Unlike RDDs, the default persistence 
> level of DStreams serializes the data in memory (that is, 
> StorageLevel.MEMORY_ONLY_SER for DStream compared to StorageLevel.MEMORY_ONLY 
> for RDDs). Even though keeping the data serialized incurs higher 
> serialization/deserialization overheads, it significantly reduces GC pauses."
>
> I will take a look at DStream.scala although I have no Scala experience.
>
> -Original Message-
> From: Shao, Saisai [mailto:saisai.s...@intel.com]
> Sent: 2014年7月23日 15:13
> To: user@spark.apache.org
> Subject: RE: "spark.streaming.unpersist" and "spark.cleaner.ttl"
>
> Hi Haopu,
>
> Please see the inline comments.
>
> Thanks
> Jerry
>
> -Original Message-
> From: Haopu Wang [mailto:hw...@qilinsoft.com]
> Sent: Wednesday, July 23, 2014 3:00 PM
> To: user@spark.apache.org
> Subject: "spark.streaming.unpersist" and "spark.cleaner.ttl"
>
> I have a DStream receiving data from a socket. I'm using local mode.
> I set "spark.streaming.unpersist" to "false" and leave "
> spark.cleaner.ttl" to be infinite.
> I can see files for input and shuffle blocks under "spark.local.dir"
> folder and the size of folder keeps increasing, although JVM's memory usage 
> seems to be stable.
>
> [question] In this case, because input RDDs are persisted but they don't fit 
> into memory, so write to disk, right? And where can I see the details about 
> these RDDs? I don't see them in web UI.
>
> [answer] Yes, if memory is not enough to put input RDDs, this data will be 
> flush to disk, because the default storage level is "MEMORY_AND_DISK_SER_2" 
> as you can see in StreamingContext.scala. Actually you cannot not see the 
> input RDD in web UI, you can only see the cached RDD in web UI.
>
> Then I set "spark.streaming.unpersist" to "true", the size of 
> "spark.local.dir" folder and JVM's used heap size are reduced regularly.
>
> [question] In this case, because I didn't change "spark.cleaner.ttl", which 
> component is doing the cleanup? And what's the difference if I set 
> "spark.cleaner.ttl" to some duration in this case?
>
> [answer] If you set "spark.streaming.unpersist" to true, old unused rdd will 
> be deleted, as you can see in DStream.scala. While "spark.cleaner.ttl" is 
> timer-based spark cleaner, not only clean streaming data, but also broadcast, 
> shuffle and other data.
>
> Thank you!
>


RE: "spark.streaming.unpersist" and "spark.cleaner.ttl"

2014-07-23 Thread Shao, Saisai
Yeah, the documentation may not be precisely aligned with the latest code, so the best way is to check the code.

-Original Message-
From: Haopu Wang [mailto:hw...@qilinsoft.com] 
Sent: Wednesday, July 23, 2014 5:56 PM
To: user@spark.apache.org
Subject: RE: "spark.streaming.unpersist" and "spark.cleaner.ttl"

Jerry, thanks for the response.

For the default storage level of DStream, it looks like Spark's document is 
wrong. In this link: 
http://spark.apache.org/docs/latest/streaming-programming-guide.html#memory-tuning
It mentions:
"Default persistence level of DStreams: Unlike RDDs, the default persistence 
level of DStreams serializes the data in memory (that is, 
StorageLevel.MEMORY_ONLY_SER for DStream compared to StorageLevel.MEMORY_ONLY 
for RDDs). Even though keeping the data serialized incurs higher 
serialization/deserialization overheads, it significantly reduces GC pauses."

I will take a look at DStream.scala although I have no Scala experience.

-Original Message-
From: Shao, Saisai [mailto:saisai.s...@intel.com] 
Sent: 2014年7月23日 15:13
To: user@spark.apache.org
Subject: RE: "spark.streaming.unpersist" and "spark.cleaner.ttl"

Hi Haopu, 

Please see the inline comments.

Thanks
Jerry

-Original Message-
From: Haopu Wang [mailto:hw...@qilinsoft.com] 
Sent: Wednesday, July 23, 2014 3:00 PM
To: user@spark.apache.org
Subject: "spark.streaming.unpersist" and "spark.cleaner.ttl"

I have a DStream receiving data from a socket. I'm using local mode.
I set "spark.streaming.unpersist" to "false" and leave "
spark.cleaner.ttl" to be infinite.
I can see files for input and shuffle blocks under "spark.local.dir"
folder and the size of folder keeps increasing, although JVM's memory usage 
seems to be stable.

[question] In this case, because input RDDs are persisted but they don't fit 
into memory, so write to disk, right? And where can I see the details about 
these RDDs? I don't see them in web UI.

[answer] Yes, if memory is not enough to put input RDDs, this data will be 
flush to disk, because the default storage level is "MEMORY_AND_DISK_SER_2" as 
you can see in StreamingContext.scala. Actually you cannot not see the input 
RDD in web UI, you can only see the cached RDD in web UI.

Then I set "spark.streaming.unpersist" to "true", the size of "spark.local.dir" 
folder and JVM's used heap size are reduced regularly.

[question] In this case, because I didn't change "spark.cleaner.ttl", which 
component is doing the cleanup? And what's the difference if I set 
"spark.cleaner.ttl" to some duration in this case?

[answer] If you set "spark.streaming.unpersist" to true, old unused rdd will be 
deleted, as you can see in DStream.scala. While "spark.cleaner.ttl" is 
timer-based spark cleaner, not only clean streaming data, but also broadcast, 
shuffle and other data.

Thank you!



RE: "spark.streaming.unpersist" and "spark.cleaner.ttl"

2014-07-23 Thread Haopu Wang
Jerry, thanks for the response.

Regarding the default storage level of DStreams, it looks like Spark's documentation is wrong. This link:
http://spark.apache.org/docs/latest/streaming-programming-guide.html#memory-tuning
mentions:
"Default persistence level of DStreams: Unlike RDDs, the default persistence 
level of DStreams serializes the data in memory (that is, 
StorageLevel.MEMORY_ONLY_SER for DStream compared to StorageLevel.MEMORY_ONLY 
for RDDs). Even though keeping the data serialized incurs higher 
serialization/deserialization overheads, it significantly reduces GC pauses."

I will take a look at DStream.scala although I have no Scala experience.

-Original Message-
From: Shao, Saisai [mailto:saisai.s...@intel.com] 
Sent: 2014年7月23日 15:13
To: user@spark.apache.org
Subject: RE: "spark.streaming.unpersist" and "spark.cleaner.ttl"

Hi Haopu, 

Please see the inline comments.

Thanks
Jerry

-Original Message-
From: Haopu Wang [mailto:hw...@qilinsoft.com] 
Sent: Wednesday, July 23, 2014 3:00 PM
To: user@spark.apache.org
Subject: "spark.streaming.unpersist" and "spark.cleaner.ttl"

I have a DStream receiving data from a socket. I'm using local mode.
I set "spark.streaming.unpersist" to "false" and leave "
spark.cleaner.ttl" to be infinite.
I can see files for input and shuffle blocks under "spark.local.dir"
folder and the size of folder keeps increasing, although JVM's memory usage 
seems to be stable.

[question] In this case, because input RDDs are persisted but they don't fit 
into memory, so write to disk, right? And where can I see the details about 
these RDDs? I don't see them in web UI.

[answer] Yes, if memory is not enough to put input RDDs, this data will be 
flush to disk, because the default storage level is "MEMORY_AND_DISK_SER_2" as 
you can see in StreamingContext.scala. Actually you cannot not see the input 
RDD in web UI, you can only see the cached RDD in web UI.

Then I set "spark.streaming.unpersist" to "true", the size of "spark.local.dir" 
folder and JVM's used heap size are reduced regularly.

[question] In this case, because I didn't change "spark.cleaner.ttl", which 
component is doing the cleanup? And what's the difference if I set 
"spark.cleaner.ttl" to some duration in this case?

[answer] If you set "spark.streaming.unpersist" to true, old unused rdd will be 
deleted, as you can see in DStream.scala. While "spark.cleaner.ttl" is 
timer-based spark cleaner, not only clean streaming data, but also broadcast, 
shuffle and other data.

Thank you!



RE: "spark.streaming.unpersist" and "spark.cleaner.ttl"

2014-07-23 Thread Shao, Saisai
Hi Haopu, 

Please see the inline comments.

Thanks
Jerry

-Original Message-
From: Haopu Wang [mailto:hw...@qilinsoft.com] 
Sent: Wednesday, July 23, 2014 3:00 PM
To: user@spark.apache.org
Subject: "spark.streaming.unpersist" and "spark.cleaner.ttl"

I have a DStream receiving data from a socket. I'm using local mode.
I set "spark.streaming.unpersist" to "false" and leave "
spark.cleaner.ttl" to be infinite.
I can see files for input and shuffle blocks under "spark.local.dir"
folder and the size of folder keeps increasing, although JVM's memory usage 
seems to be stable.

[question] In this case, because input RDDs are persisted but they don't fit 
into memory, so write to disk, right? And where can I see the details about 
these RDDs? I don't see them in web UI.

[answer] Yes, if there is not enough memory to hold the input RDDs, the data will be flushed to disk, because the default storage level is "MEMORY_AND_DISK_SER_2", as you can see in StreamingContext.scala. Note that you cannot see the input RDDs in the web UI; you can only see cached RDDs there.

Then I set "spark.streaming.unpersist" to "true", the size of "spark.local.dir" 
folder and JVM's used heap size are reduced regularly.

[question] In this case, because I didn't change "spark.cleaner.ttl", which 
component is doing the cleanup? And what's the difference if I set 
"spark.cleaner.ttl" to some duration in this case?

[answer] If you set "spark.streaming.unpersist" to true, old unused RDDs are deleted, as you can see in DStream.scala. "spark.cleaner.ttl", by contrast, is a timer-based Spark cleaner that cleans not only streaming data but also broadcast, shuffle and other data.

Thank you!
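
A small sketch pulling together the storage-level points from the answers above (the app name, host and ports are placeholders): receiver input DStreams default to MEMORY_AND_DISK_SER_2, which is why blocks end up under spark.local.dir when memory runs short, while persist() on a derived DStream defaults to MEMORY_ONLY_SER.

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

val ssc = new StreamingContext(new SparkConf().setAppName("storage-levels"), Seconds(10))

val lines = ssc.socketTextStream("localhost", 9999)        // default: MEMORY_AND_DISK_SER_2
val inMemoryOnly = ssc.socketTextStream("localhost", 9998,
  StorageLevel.MEMORY_ONLY_SER)                            // explicit override, no disk spill

val words = lines.flatMap(_.split(" ")).persist()          // DStream.persist() defaults to MEMORY_ONLY_SER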



"spark.streaming.unpersist" and "spark.cleaner.ttl"

2014-07-23 Thread Haopu Wang
I have a DStream receiving data from a socket, and I'm using local mode.
I set "spark.streaming.unpersist" to "false" and leave "spark.cleaner.ttl" at its infinite default.
I can see files for input and shuffle blocks under the "spark.local.dir" folder, and the size of the folder keeps increasing, although the JVM's memory usage seems to be stable.

[question] In this case, the input RDDs are persisted but don't fit into memory, so they are written to disk, right? And where can I see the details about these RDDs? I don't see them in the web UI.

Then I set "spark.streaming.unpersist" to "true", and the size of the "spark.local.dir" folder and the JVM's used heap size are reduced regularly.

[question] In this case, since I didn't change "spark.cleaner.ttl", which component is doing the cleanup? And what is the difference if I set "spark.cleaner.ttl" to some duration in this case?

Thank you!



Re: is spark.cleaner.ttl safe?

2014-03-11 Thread Sourav Chandra
Yes, we are also facing the same problem. The workaround we came up with is:

 - store the broadcast variable's id when it is first created
 - then create a scheduled job which runs every (spark.cleaner.ttl - 1 minute) interval and re-creates the same broadcast variable using the same id; this way Spark is happy because it keeps finding the same broadcast file (broadcast_), as sketched below:

val httpBroadcastFactory = new HttpBroadcastFactory()
httpBroadcastFactory.newBroadcast(bcastVariable.value, false, id)
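
To make the scheduling part of that workaround explicit, here is a hedged reconstruction. It leans on Spark-internal classes (HttpBroadcastFactory) and assumes an existing SparkContext sc, a lookupTable value, and that the application actually uses the HTTP broadcast factory, so treat it as an old-1.x-era hack rather than a supported pattern:

import java.util.concurrent.{Executors, TimeUnit}
import org.apache.spark.broadcast.HttpBroadcastFactory

val bcastVariable = sc.broadcast(lookupTable)   // created once; sc and lookupTable assumed to exist
val id = bcastVariable.id                       // remember the id of the first creation
val ttlSeconds = 1800L                          // whatever spark.cleaner.ttl is set to

val refresher = Executors.newSingleThreadScheduledExecutor()
refresher.scheduleAtFixedRate(new Runnable {
  override def run(): Unit = {
    // Re-create the broadcast under the same id so executors keep finding broadcast_<id>
    val factory = new HttpBroadcastFactory()
    factory.newBroadcast(bcastVariable.value, false, id)
  }
}, ttlSeconds - 60, ttlSeconds - 60, TimeUnit.SECONDS)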




On Wed, Mar 12, 2014 at 2:38 AM, Aaron Davidson  wrote:

> And to answer your original question, spark.cleaner.ttl is not safe for
> the exact reason you brought up. The PR Mark linked intends to provide a
> much cleaner (and safer) solution.
>
>
> On Tue, Mar 11, 2014 at 2:01 PM, Mark Hamstra wrote:
>
>> Actually, TD's work-in-progress is probably more what you want:
>> https://github.com/apache/spark/pull/126
>>
>>
>> On Tue, Mar 11, 2014 at 1:58 PM, Michael Allman  wrote:
>>
>>> Hello,
>>>
>>> I've been trying to run an iterative spark job that spills 1+ GB to disk
>>> per iteration on a system with limited disk space. I believe there's enough
>>> space if spark would clean up unused data from previous iterations, but as
>>> it stands the number of iterations I can run is limited by available disk
>>> space.
>>>
>>> I found a thread on the usage of spark.cleaner.ttl on the old Spark
>>> Users Google group here:
>>>
>>> https://groups.google.com/forum/#!topic/spark-users/9ebKcNCDih4
>>>
>>> I think this setting may be what I'm looking for, however the cleaner
>>> seems to delete data that's still in use. The effect is I get bizarre
>>> exceptions from Spark complaining about missing broadcast data or
>>> ArrayIndexOutOfBounds. When is spark.cleaner.ttl safe to use? Is it
>>> supposed to delete in-use data or is this a bug/shortcoming?
>>>
>>> Cheers,
>>>
>>> Michael
>>>
>>>
>>>
>>
>


-- 

Sourav Chandra

Senior Software Engineer

· · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·

sourav.chan...@livestream.com

o: +91 80 4121 8723

m: +91 988 699 3746

skype: sourav.chandra

Livestream

"Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd
Block, Koramangala Industrial Area,

Bangalore 560034

www.livestream.com


Re: is spark.cleaner.ttl safe?

2014-03-11 Thread Aaron Davidson
And to answer your original question, spark.cleaner.ttl is not safe for the
exact reason you brought up. The PR Mark linked intends to provide a much
cleaner (and safer) solution.
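
For comparison, here is a sketch (not from this thread) of the reference-based alternative that the ContextCleaner work enables for Michael's iterative case: instead of a TTL, explicitly drop references to, and unpersist, the previous iteration's RDD so its cached and spilled data can be cleaned without touching anything still in use. The job body is a trivial stand-in.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

val sc = new SparkContext(new SparkConf().setAppName("iterative-cleanup-sketch"))

def step(r: RDD[Int]): RDD[Int] = r.map(_ + 1)   // stand-in for the real per-iteration work

var current: RDD[Int] = sc.parallelize(1 to 1000000).persist()
for (_ <- 1 to 50) {
  val next = step(current).persist()
  next.count()                         // materialize before releasing the parent
  current.unpersist(blocking = false)  // old iteration data becomes eligible for cleanup
  current = next
}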


On Tue, Mar 11, 2014 at 2:01 PM, Mark Hamstra wrote:

> Actually, TD's work-in-progress is probably more what you want:
> https://github.com/apache/spark/pull/126
>
>
> On Tue, Mar 11, 2014 at 1:58 PM, Michael Allman  wrote:
>
>> Hello,
>>
>> I've been trying to run an iterative spark job that spills 1+ GB to disk
>> per iteration on a system with limited disk space. I believe there's enough
>> space if spark would clean up unused data from previous iterations, but as
>> it stands the number of iterations I can run is limited by available disk
>> space.
>>
>> I found a thread on the usage of spark.cleaner.ttl on the old Spark Users
>> Google group here:
>>
>> https://groups.google.com/forum/#!topic/spark-users/9ebKcNCDih4
>>
>> I think this setting may be what I'm looking for, however the cleaner
>> seems to delete data that's still in use. The effect is I get bizarre
>> exceptions from Spark complaining about missing broadcast data or
>> ArrayIndexOutOfBounds. When is spark.cleaner.ttl safe to use? Is it
>> supposed to delete in-use data or is this a bug/shortcoming?
>>
>> Cheers,
>>
>> Michael
>>
>>
>>
>


Re: is spark.cleaner.ttl safe?

2014-03-11 Thread Mark Hamstra
Actually, TD's work-in-progress is probably more what you want:
https://github.com/apache/spark/pull/126


On Tue, Mar 11, 2014 at 1:58 PM, Michael Allman  wrote:

> Hello,
>
> I've been trying to run an iterative spark job that spills 1+ GB to disk
> per iteration on a system with limited disk space. I believe there's enough
> space if spark would clean up unused data from previous iterations, but as
> it stands the number of iterations I can run is limited by available disk
> space.
>
> I found a thread on the usage of spark.cleaner.ttl on the old Spark Users
> Google group here:
>
> https://groups.google.com/forum/#!topic/spark-users/9ebKcNCDih4
>
> I think this setting may be what I'm looking for, however the cleaner
> seems to delete data that's still in use. The effect is I get bizarre
> exceptions from Spark complaining about missing broadcast data or
> ArrayIndexOutOfBounds. When is spark.cleaner.ttl safe to use? Is it
> supposed to delete in-use data or is this a bug/shortcoming?
>
> Cheers,
>
> Michael
>
>
>


is spark.cleaner.ttl safe?

2014-03-11 Thread Michael Allman

Hello,

I've been trying to run an iterative spark job that spills 1+ GB to disk 
per iteration on a system with limited disk space. I believe there's 
enough space if spark would clean up unused data from previous iterations, 
but as it stands the number of iterations I can run is limited by 
available disk space.


I found a thread on the usage of spark.cleaner.ttl on the old Spark Users 
Google group here:


https://groups.google.com/forum/#!topic/spark-users/9ebKcNCDih4

I think this setting may be what I'm looking for, however the cleaner 
seems to delete data that's still in use. The effect is I get bizarre 
exceptions from Spark complaining about missing broadcast data or 
ArrayIndexOutOfBounds. When is spark.cleaner.ttl safe to use? Is it 
supposed to delete in-use data or is this a bug/shortcoming?


Cheers,

Michael