Re: spark.cleaner.ttl and spark.streaming.unpersist

2014-09-10 Thread Luis Ángel Vicente Sánchez
I somehow missed that parameter when I was reviewing the documentation;
that should do the trick! Thank you!

2014-09-10 2:10 GMT+01:00 Shao, Saisai saisai.s...@intel.com:

  Hi Luis,



 The parameters “spark.cleaner.ttl” and “spark.streaming.unpersist” can both
 be used to remove stale streaming data that has timed out. The difference
 is that “spark.cleaner.ttl” is a time-based cleaner: it cleans not only
 streaming input data but also Spark’s stale metadata. By contrast,
 “spark.streaming.unpersist” is a reference-based cleaning mechanism:
 streaming data is removed once it falls outside the slide duration.



 Both of these parameters can reduce the memory footprint of Spark
 Streaming. But if data floods into Spark Streaming at startup, as in your
 situation with Kafka, they cannot fully mitigate the problem. You actually
 need to throttle the input so that data is not injected too fast; you can
 try “spark.streaming.receiver.maxRate” to control the ingestion rate.
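
As a concrete illustration, here is a minimal Scala sketch of how these
three settings could be applied when building a StreamingContext. The values
shown are placeholder assumptions for illustration, not recommendations from
this thread:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
  .setAppName("streaming-cleanup-example")
  // Time-based cleanup of old RDDs and metadata, in seconds.
  .set("spark.cleaner.ttl", "3600")
  // Unpersist streaming input data once it falls out of the slide duration.
  .set("spark.streaming.unpersist", "true")
  // Cap each receiver's ingestion rate, in records per second.
  .set("spark.streaming.receiver.maxRate", "10000")

val ssc = new StreamingContext(conf, Seconds(10))

Note that spark.cleaner.ttl is a general Spark setting, while the other two
apply only to Spark Streaming.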



 Thanks

 Jerry



 *From:* Luis Ángel Vicente Sánchez [mailto:langel.gro...@gmail.com]
 *Sent:* Wednesday, September 10, 2014 5:21 AM
 *To:* user@spark.apache.org
 *Subject:* spark.cleaner.ttl and spark.streaming.unpersist



 The executors of my Spark Streaming application are being killed due to
 memory issues. The memory consumption is quite high on startup because it is
 the first run and there are quite a few events on the Kafka queues, which are
 consumed at a rate of 100K events per second.

 I wonder if it's recommended to use spark.cleaner.ttl and
 spark.streaming.unpersist together to mitigate that problem. I also wonder
 whether new RDDs are being batched while an RDD is being processed.

 Regards,

 Luis



Re: spark.cleaner.ttl and spark.streaming.unpersist

2014-09-10 Thread Tim Smith
I am using Spark 1.0.0 (on CDH 5.1) and have a similar issue. In my case,
the receivers die within an hour because Yarn kills the containers for high
memory usage. I set spark.cleaner.ttl to 30 seconds but that didn't help, so
I don't think stale RDDs are the issue here. I did a jmap -histo on a couple
of running receiver processes and, in a heap of 30G, roughly ~16G is taken
by [B, which is byte arrays.

Still investigating more and would appreciate pointers for troubleshooting.
I have dumped the heap of a receiver and will try to go over it.




On Wed, Sep 10, 2014 at 1:43 AM, Luis Ángel Vicente Sánchez 
langel.gro...@gmail.com wrote:

 I somehow missed that parameter when I was reviewing the documentation;
 that should do the trick! Thank you!

 2014-09-10 2:10 GMT+01:00 Shao, Saisai saisai.s...@intel.com:

  Hi Luis,



 The parameters “spark.cleaner.ttl” and “spark.streaming.unpersist” can both
 be used to remove stale streaming data that has timed out. The difference
 is that “spark.cleaner.ttl” is a time-based cleaner: it cleans not only
 streaming input data but also Spark’s stale metadata. By contrast,
 “spark.streaming.unpersist” is a reference-based cleaning mechanism:
 streaming data is removed once it falls outside the slide duration.



 Both of these parameters can reduce the memory footprint of Spark
 Streaming. But if data floods into Spark Streaming at startup, as in your
 situation with Kafka, they cannot fully mitigate the problem. You actually
 need to throttle the input so that data is not injected too fast; you can
 try “spark.streaming.receiver.maxRate” to control the ingestion rate.



 Thanks

 Jerry



 *From:* Luis Ángel Vicente Sánchez [mailto:langel.gro...@gmail.com]
 *Sent:* Wednesday, September 10, 2014 5:21 AM
 *To:* user@spark.apache.org
 *Subject:* spark.cleaner.ttl and spark.streaming.unpersist



 The executors of my Spark Streaming application are being killed due to
 memory issues. The memory consumption is quite high on startup because it is
 the first run and there are quite a few events on the Kafka queues, which are
 consumed at a rate of 100K events per second.

 I wonder if it's recommended to use spark.cleaner.ttl and
 spark.streaming.unpersist together to mitigate that problem. I also wonder
 whether new RDDs are being batched while an RDD is being processed.

 Regards,

 Luis





Re: spark.cleaner.ttl and spark.streaming.unpersist

2014-09-10 Thread Yana Kadiyska
Tim, I asked a similar question twice:
here
http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Cannot-get-executors-to-stay-alive-tt12940.html
and here
http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Executor-OOM-tt12383.html

and have not yet received any responses. I noticed that the heap dump is
dominated by a very large byte array consuming about 66% of the heap (the
second link contains a picture of my heap -- I ran with a small heap to be
able to reproduce the failure quickly).

I don't have solutions but wanted to affirm that I've observed a similar
situation...

On Wed, Sep 10, 2014 at 2:24 PM, Tim Smith secs...@gmail.com wrote:

 I am using Spark 1.0.0 (on CDH 5.1) and have a similar issue. In my case,
 the receivers die within an hour because Yarn kills the containers for high
 memory usage. I set spark.cleaner.ttl to 30 seconds but that didn't help, so
 I don't think stale RDDs are the issue here. I did a jmap -histo on a couple
 of running receiver processes and, in a heap of 30G, roughly ~16G is taken
 by [B, which is byte arrays.

 Still investigating more and would appreciate pointers for
 troubleshooting. I have dumped the heap of a receiver and will try to go
 over it.




 On Wed, Sep 10, 2014 at 1:43 AM, Luis Ángel Vicente Sánchez 
 langel.gro...@gmail.com wrote:

 I somehow missed that parameter when I was reviewing the documentation;
 that should do the trick! Thank you!

 2014-09-10 2:10 GMT+01:00 Shao, Saisai saisai.s...@intel.com:

  Hi Luis,



 The parameters “spark.cleaner.ttl” and “spark.streaming.unpersist” can both
 be used to remove stale streaming data that has timed out. The difference
 is that “spark.cleaner.ttl” is a time-based cleaner: it cleans not only
 streaming input data but also Spark’s stale metadata. By contrast,
 “spark.streaming.unpersist” is a reference-based cleaning mechanism:
 streaming data is removed once it falls outside the slide duration.



 Both of these parameters can reduce the memory footprint of Spark
 Streaming. But if data floods into Spark Streaming at startup, as in your
 situation with Kafka, they cannot fully mitigate the problem. You actually
 need to throttle the input so that data is not injected too fast; you can
 try “spark.streaming.receiver.maxRate” to control the ingestion rate.



 Thanks

 Jerry



 *From:* Luis Ángel Vicente Sánchez [mailto:langel.gro...@gmail.com]
 *Sent:* Wednesday, September 10, 2014 5:21 AM
 *To:* user@spark.apache.org
 *Subject:* spark.cleaner.ttl and spark.streaming.unpersist



 The executors of my Spark Streaming application are being killed due to
 memory issues. The memory consumption is quite high on startup because it is
 the first run and there are quite a few events on the Kafka queues, which are
 consumed at a rate of 100K events per second.

 I wonder if it's recommended to use spark.cleaner.ttl and
 spark.streaming.unpersist together to mitigate that problem. I also wonder
 whether new RDDs are being batched while an RDD is being processed.

 Regards,

 Luis






Re: spark.cleaner.ttl and spark.streaming.unpersist

2014-09-10 Thread Tim Smith
I switched from Yarn to standalone mode and haven't hit the OOM issue yet.
However, now I have Akka issues killing the executor:

2014-09-11 02:43:34,543 INFO akka.actor.LocalActorRef: Message
[akka.remote.transport.ActorTransportAdapter$DisassociateUnderlying]
from Actor[akka://sparkWorker/deadLetters] to
Actor[akka://sparkWorker/system/transports/akkaprotocolmanager.tcp0/akkaProtocol-tcp%3A%2F%2FsparkWorker%4010.2.16.8%3A44405-6#1549270895]
was not delivered. [2] dead letters encountered. This logging can be
turned off or adjusted with configuration settings
'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.

Before I switched from Yarn to standalone, I tried looking at the heaps of
running executors. What I found odd was that while both jmap -histo:live
and jmap -histo showed heap usage of a few hundred megabytes, Yarn kept
reporting memory utilization of several gigabytes, eventually leading to
the container being killed.

I would appreciate it if someone could reproduce what I am seeing. Basically:
1. Tail your Yarn container logs and see what they report as memory used
by the JVM.
2. In parallel, run jmap -histo:live pid or jmap -histo pid on the
executor process.

They should be about the same, right?

Also, in the heap dump, 99% of the heap seems to be occupied by
unreachable objects (and most of it is byte arrays).




On Wed, Sep 10, 2014 at 12:06 PM, Tim Smith secs...@gmail.com wrote:
 Actually, I am not doing any explicit shuffle/updateByKey or other
 transform functions. In my program flow, I take in data from Kafka and
 match each message against a list of regexes; if a message matches a
 regex, I extract the groups, stuff them into JSON and push the result back
 out to Kafka (a different topic). So there is really no dependency between
 two messages in terms of processing. Here's my container histogram:
 http://pastebin.com/s3nAT3cY

 Essentially, my app is a cluster grep on steroids.
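
For illustration, here is a rough Scala sketch of the kind of “cluster grep”
pipeline described above, written against the Spark 1.0-era receiver-based
Kafka API. The topic names, ZooKeeper/broker addresses, regexes and JSON
shape are placeholder assumptions, not details from the actual application:

import java.util.Properties
import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val conf = new SparkConf().setAppName("streaming-grep")
val ssc  = new StreamingContext(conf, Seconds(5))

// Regexes to grep for; any message that matches is turned into JSON.
val patterns = Seq("""ERROR\s+(\S+)""".r, """WARN\s+(\S+)""".r)

// Receiver-based Kafka input: (key, message) pairs; keep only the message.
val lines = KafkaUtils.createStream(
  ssc, "zkhost:2181", "grep-group", Map("input-topic" -> 4)).map(_._2)

lines.foreachRDD { rdd =>
  rdd.foreachPartition { messages =>
    // One producer per partition, created on the executor, so it is never
    // serialized with the closure.
    val props = new Properties()
    props.put("metadata.broker.list", "broker:9092")
    props.put("serializer.class", "kafka.serializer.StringEncoder")
    val producer = new Producer[String, String](new ProducerConfig(props))
    messages.foreach { msg =>
      patterns.flatMap(_.findFirstMatchIn(msg)).headOption.foreach { m =>
        val json = s"""{"match":"${m.group(1)}"}"""
        producer.send(new KeyedMessage[String, String]("output-topic", json))
      }
    }
    producer.close()
  }
}

ssc.start()
ssc.awaitTermination()

Since nothing here shuffles or keeps state across messages, the sketch
mirrors the point above that there is no dependency between messages in
terms of processing.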



 On Wed, Sep 10, 2014 at 11:34 AM, Yana Kadiyska yana.kadiy...@gmail.com 
 wrote:
 Tim, I asked a similar question twice:
 here
 http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Cannot-get-executors-to-stay-alive-tt12940.html
 and here
 http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Executor-OOM-tt12383.html

 and have not yet received any responses. I noticed that the heap dump is
 dominated by a very large byte array consuming about 66% of the heap (the
 second link contains a picture of my heap -- I ran with a small heap to be
 able to reproduce the failure quickly).

 I don't have solutions but wanted to affirm that I've observed a similar
 situation...

 On Wed, Sep 10, 2014 at 2:24 PM, Tim Smith secs...@gmail.com wrote:

 I am using Spark 1.0.0 (on CDH 5.1) and have a similar issue. In my case,
 the receivers die within an hour because Yarn kills the containers for high
 memory usage. I set spark.cleaner.ttl to 30 seconds but that didn't help, so
 I don't think stale RDDs are the issue here. I did a jmap -histo on a couple
 of running receiver processes and, in a heap of 30G, roughly ~16G is taken
 by [B, which is byte arrays.

 Still investigating more and would appreciate pointers for
 troubleshooting. I have dumped the heap of a receiver and will try to go
 over it.




 On Wed, Sep 10, 2014 at 1:43 AM, Luis Ángel Vicente Sánchez
 langel.gro...@gmail.com wrote:

 I somehow missed that parameter when I was reviewing the documentation;
 that should do the trick! Thank you!

 2014-09-10 2:10 GMT+01:00 Shao, Saisai saisai.s...@intel.com:

 Hi Luis,



 The parameters “spark.cleaner.ttl” and “spark.streaming.unpersist” can both
 be used to remove stale streaming data that has timed out. The difference
 is that “spark.cleaner.ttl” is a time-based cleaner: it cleans not only
 streaming input data but also Spark’s stale metadata. By contrast,
 “spark.streaming.unpersist” is a reference-based cleaning mechanism:
 streaming data is removed once it falls outside the slide duration.



 Both of these parameters can reduce the memory footprint of Spark
 Streaming. But if data floods into Spark Streaming at startup, as in your
 situation with Kafka, they cannot fully mitigate the problem. You actually
 need to throttle the input so that data is not injected too fast; you can
 try “spark.streaming.receiver.maxRate” to control the ingestion rate.



 Thanks

 Jerry



 From: Luis Ángel Vicente Sánchez [mailto:langel.gro...@gmail.com]
 Sent: Wednesday, September 10, 2014 5:21 AM
 To: user@spark.apache.org
 Subject: spark.cleaner.ttl and spark.streaming.unpersist



 The executors of my Spark Streaming application are being killed due to
 memory issues. The memory consumption is quite high on startup because it is
 the first run and there are quite a few events on the Kafka queues, which are
 consumed at a rate of 100K events per second.

 I wonder if it's recommended to use spark.cleaner.ttl and
 spark.streaming.unpersist together to mitigate that problem. I also wonder
 whether new RDDs are being batched while an RDD is being processed.

spark.cleaner.ttl and spark.streaming.unpersist

2014-09-09 Thread Luis Ángel Vicente Sánchez
The executors of my Spark Streaming application are being killed due to
memory issues. The memory consumption is quite high on startup because it is
the first run and there are quite a few events on the Kafka queues, which are
consumed at a rate of 100K events per second.

I wonder if it's recommended to use spark.cleaner.ttl and
spark.streaming.unpersist together to mitigate that problem. I also wonder
whether new RDDs are being batched while an RDD is being processed.

Regards,

Luis