Re: Spark streaming Processing time keeps increasing

2015-07-19 Thread N B
Hi TD,

Yay! Thanks for the help. That solved our issue of ever-increasing
processing time. I added filter functions to all our reduceByKeyAndWindow()
operations and now it's been stable for over 2 days already! :-).

One small piece of feedback about the API though. The overload that accepts
the filter function also expects either a partitioner or a number of
partitions. There should be another call that just accepts the filter function
in addition to the other parameters but uses the default parallelism already
set on the SparkContext. I ended up doing that in our code, but it would be
nice to have that as an overloaded call as well.
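
Roughly, the workaround looks like this (a minimal sketch only; 'pairs',
'mergeSets', 'removeSets' and 'jssc' stand in for our actual stream, reduce
functions and JavaStreamingContext, and the usual Spark streaming imports are
assumed):

// Pass the SparkContext's default parallelism explicitly, since the
// filter-accepting overload has no variant that omits the partition count.
// 'pairs' is a JavaPairDStream<String, HashSet<Long>>.
JavaPairDStream<String, HashSet<Long>> windowed = pairs.reduceByKeyAndWindow(
    mergeSets,                                // reduce function (adds incoming values)
    removeSets,                               // inverse reduce function (removes outgoing values)
    Durations.minutes(60),                    // window duration
    Durations.minutes(1),                     // slide duration
    jssc.sparkContext().defaultParallelism(), // the argument I'd rather not have to pass
    p -> !p._2().isEmpty());                  // filter function: drop keys whose set is empty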

Also, I am still trying to fix the issue in the reduceByKeyAndWindow
operation that throws those exceptions. However, I believe I know what is
going on there, as the logic itself may be wrong. I am looking for an
alternative way of doing it (preferably using pure Spark transformations
only) and will pose that question on this mailing list separately.

Thanks
Nikunj


On Fri, Jul 17, 2015 at 2:45 AM, N B nb.nos...@gmail.com wrote:

 Hi TD,

 Thanks for the response. I do believe I understand the concept and the
 need for the filter function now. I made the requisite code changes and am
 keeping it running overnight to see the effect. Hopefully this should
 fix our issue.

 However, there was one place where I encountered a followup issue and had
 to disable that reduce operation for the moment in order to proceed with my
 testing for the rest of the changes.

 This particular reduceByKeyAndWindow operates on key-value pairs of type
 <String, HashSet<Long>>. Once the size of a HashSet drops to 0, we remove the
 corresponding key with the filter function specified as

 (p -> !p._2().isEmpty())

 That looks about right to me. However, soon after the first slide occurs
 in this window, it's throwing the following exceptions and aborting that
 batch. The stack trace is below. I am not quite sure what to make of it
 (perhaps partly due to the late hour :-D ). Any idea what could be wrong
 here? As far as I know, String and HashSet<Long> should hash quite
 consistently.

 Also, if there is no way to avoid this issue, I am thinking of rewriting
 that part of the code to use a foldByKey or combineByKey operation instead
 of reduceByKey.

 Thanks
 Nikunj


 java.lang.Exception: Neither previous window has value for key, nor new
 values found. Are you sure your key class hashes consistently?
 at
 org.apache.spark.streaming.dstream.ReducedWindowedDStream$$anonfun$4.apply(ReducedWindowedDStream.scala:147)
 at
 org.apache.spark.streaming.dstream.ReducedWindowedDStream$$anonfun$4.apply(ReducedWindowedDStream.scala:134)
 at
 org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$41$$anonfun$apply$42.apply(PairRDDFunctions.scala:700)
 at
 org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$41$$anonfun$apply$42.apply(PairRDDFunctions.scala:700)
 at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
 at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
 at
 org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:276)
 at
 org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
 at
 org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
 at
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:139)
 at
 org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:135)
 at
 scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
 at scala.collection.immutable.List.foreach(List.scala:318)
 at
 scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
 at
 org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:135)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at
 org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
 at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
 at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
 at
 

Re: Spark streaming Processing time keeps increasing

2015-07-17 Thread Tathagata Das
Responses inline.

On Thu, Jul 16, 2015 at 9:27 PM, N B nb.nos...@gmail.com wrote:

 Hi TD,

 Yes, we do have the invertible function provided. However, I am not sure I
 understood how to use the filterFunction. Is there an example somewhere
 showing its usage?

 The header comment on the function says :

 * @param filterFunc function to filter expired key-value pairs;
 *   only pairs that satisfy the function are retained
 *   set this to null if you do not want to filter

 These are the questions I am confused about:

 1. The code comment seems to imply that the filterFunc is only used to figure
 out which key-value pairs are used to form the window, but how does it actually
 help expire the old data?

 It applies the filter and retains only the keys that pass through it.
Underneath, it's all RDDs, so only the filtered (K, V) pairs are retained (and
cached) for future batches.


 2. Shouldn't the values that are falling off of the window period 
 automatically be removed without the need for an additional filter function?

 It cannot figure out the "falling off" by itself in this incremental version.
For example, if you are counting over the window by adding (reduceFunc) and
subtracting (invReduceFunc), unless you provide the concept of a zero,
it will not know when to throw away the keys that have become 0. Over a
window, the count may increase from nothing to 10, and then reduce back to 0
when the window moves forward, but it does not know that 0 means "don't track
it any more". The filter function introduces that concept of zero.
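
To make that concrete, here is a rough sketch of a windowed count that uses
the zero concept (illustrative Java; 'events' is assumed to be a
JavaPairDStream<String, Long> of (key, 1L) records and 'numPartitions' is
whatever partition count you use):

// Add what enters the window, subtract what leaves it, and drop keys once
// their count reaches the zero value.
JavaPairDStream<String, Long> windowedCounts = events.reduceByKeyAndWindow(
    (a, b) -> a + b,          // reduceFunc: count values entering the window
    (a, b) -> a - b,          // invReduceFunc: uncount values leaving the window
    Durations.minutes(60),    // window duration
    Durations.minutes(1),     // slide duration
    numPartitions,
    p -> p._2() > 0);         // filterFunc: 0 is the zero, so stop tracking such keys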



 3. Which side of the key-value pairs are passed to this function? The ones 
 that are coming in or the ones that are going out of window or both?

 All of the (K, V) pairs that are being tracked.



 4. The key-value pairs in use in a particular reduceByKeyAndWindow operation
 may not have the requisite info (such as a timestamp or similar, e.g. if it is
 aggregated data) to help determine whether to return true or false. What is
 the expected semantic here?


 I am not sure I get your question. It is up to you to provide sufficient
information as part of the value so that you can make that decision in
the filter function.
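
For example (a sketch only, not from your code): if the value is an aggregate
such as a sum, you can carry a contributing-record count alongside it and have
the filter look only at that count ('records' is assumed to be a
JavaPairDStream<String, Tuple2<Double, Long>> of (key, (amount, 1L)) pairs):

// Value = (sum, numberOfRecords); both parts are added on the way in and
// subtracted on the way out, so the pair stays consistent with the window.
JavaPairDStream<String, Tuple2<Double, Long>> sums = records.reduceByKeyAndWindow(
    (a, b) -> new Tuple2<Double, Long>(a._1() + b._1(), a._2() + b._2()),  // reduceFunc
    (a, b) -> new Tuple2<Double, Long>(a._1() - b._1(), a._2() - b._2()),  // invReduceFunc
    Durations.minutes(60),
    Durations.minutes(1),
    numPartitions,
    p -> p._2()._2() > 0);    // expire a key once no records remain in the window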



 As always, thanks for your help

 Nikunj





 On Thu, Jul 16, 2015 at 1:16 AM, Tathagata Das t...@databricks.com
 wrote:

 Make sure you provide the filterFunction with the invertible
 reduceByKeyAndWindow. Otherwise none of the keys will get removed, and the
 key space will continue to increase. This is what is leading to the lag. So
 use the filtering function to filter out the keys that are not needed any
 more.

 On Thu, Jul 16, 2015 at 12:44 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 What is your data volume? Do you have checkpointing/WAL enabled? In
 that case, make sure you are using SSD disks, as this behavior is mainly due
 to IO wait.

 Thanks
 Best Regards

 On Thu, Jul 16, 2015 at 8:43 AM, N B nb.nos...@gmail.com wrote:

 Hello,

 We have a Spark streaming application and the problem that we are
 encountering is that the batch processing time keeps on increasing and
 eventually causes the application to start lagging. I am hoping that
 someone here can point me to any underlying cause of why this might happen.

 The batch interval is 1 minute as of now and the app does some maps,
 filters, joins and reduceByKeyAndWindow operations. All the reduces are
 invertible functions and so we do provide the inverse-reduce functions in
 all those. The largest window size we have is 1 hour right now. When the
 app is started, we see that the batch processing time is between 20 and 30
 seconds. It keeps creeping up slowly and by the time it hits the 1 hour
 mark, it is somewhere around 35-40 seconds. Somewhat expected and still not
 bad!

 I would expect that since the largest window we have is 1 hour long,
 the application should stabilize around the 1 hour mark and start
 processing subsequent batches within that 35-40 second zone. However, that
 is not what is happening. The processing time still keeps increasing and
 eventually in a few hours it exceeds the 1 minute mark and then starts lagging.
 Eventually the lag builds up to minutes, at which point we have
 to restart the system.

 Any pointers on why this could be happening and what we can do to
 troubleshoot further?

 Thanks
 Nikunj







Re: Spark streaming Processing time keeps increasing

2015-07-17 Thread N B
Hi TD,

Thanks for the response. I do believe I understand the concept and the need
for the filter function now. I made the requisite code changes and am keeping
it running overnight to see the effect. Hopefully this should fix our
issue.

However, there was one place where I encountered a followup issue and had
to disable that reduce operation for the moment in order to proceed with my
testing for the rest of the changes.

This particular reduceByKeyAndWindow operates on key-value pairs of type
<String, HashSet<Long>>. Once the size of a HashSet drops to 0, we remove the
corresponding key with the filter function specified as

(p -> !p._2().isEmpty())

That looks about right to me. However, soon after the first slide occurs in
this window, it's throwing the following exceptions and aborting that batch.
The stack trace is below. I am not quite sure what to make of it (perhaps
partly due to the late hour :-D ). Any idea what could be wrong here? As
far as I know, String and HashSet<Long> should hash quite consistently.
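
For reference, the operation in question is roughly shaped like this (a sketch
only; the reduce and inverse-reduce functions are simplified stand-ins for
ours, 'idSets' stands in for our JavaPairDStream<String, HashSet<Long>> whose
per-batch values are singleton sets, and 'jssc' is our JavaStreamingContext):

JavaPairDStream<String, HashSet<Long>> windowedSets = idSets.reduceByKeyAndWindow(
    (s1, s2) -> { HashSet<Long> m = new HashSet<>(s1); m.addAll(s2); return m; },    // reduce
    (s1, s2) -> { HashSet<Long> m = new HashSet<>(s1); m.removeAll(s2); return m; }, // inverse reduce
    Durations.minutes(60),
    Durations.minutes(1),
    jssc.sparkContext().defaultParallelism(),
    p -> !p._2().isEmpty());   // the filter that should expire keys whose set is empty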

Also, if there is no way to avoid this issue, I am thinking of rewriting
that part of the code to use a foldByKey or combineByKey operation instead
of reduceByKey.
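
Very roughly, that alternative would look like the sketch below (assumptions:
'ids' is a JavaPairDStream<String, Long> of (key, id) records; it recomputes
the whole window on every slide instead of updating it incrementally, so there
is nothing to invert and no filter function is needed):

JavaPairDStream<String, HashSet<Long>> windowedSets = ids
    .window(Durations.minutes(60), Durations.minutes(1))
    .transformToPair(rdd -> rdd.combineByKey(
        v -> { HashSet<Long> s = new HashSet<>(); s.add(v); return s; },   // createCombiner
        (s, v) -> { s.add(v); return s; },                                 // mergeValue
        (s1, s2) -> { s1.addAll(s2); return s1; }));                       // mergeCombiners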

Thanks
Nikunj


java.lang.Exception: Neither previous window has value for key, nor new
values found. Are you sure your key class hashes consistently?
at
org.apache.spark.streaming.dstream.ReducedWindowedDStream$$anonfun$4.apply(ReducedWindowedDStream.scala:147)
at
org.apache.spark.streaming.dstream.ReducedWindowedDStream$$anonfun$4.apply(ReducedWindowedDStream.scala:134)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$41$$anonfun$apply$42.apply(PairRDDFunctions.scala:700)
at
org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$41$$anonfun$apply$42.apply(PairRDDFunctions.scala:700)
at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:389)
at
org.apache.spark.storage.MemoryStore.unrollSafely(MemoryStore.scala:276)
at
org.apache.spark.CacheManager.putInBlockManager(CacheManager.scala:171)
at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:78)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:242)
at
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:139)
at
org.apache.spark.rdd.CoGroupedRDD$$anonfun$compute$2.apply(CoGroupedRDD.scala:135)
at
scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
at scala.collection.immutable.List.foreach(List.scala:318)
at
scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
at org.apache.spark.rdd.CoGroupedRDD.compute(CoGroupedRDD.scala:135)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:70)
at
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
at org.apache.spark.scheduler.Task.run(Task.scala:70)
at
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)




On Fri, Jul 17, 2015 at 12:39 AM, Tathagata Das t...@databricks.com wrote:

 Responses inline.

 On Thu, Jul 16, 2015 at 9:27 PM, N B nb.nos...@gmail.com wrote:

 Hi TD,

 Yes, we do have the invertible function provided. However, I am not sure
 I understood how to use the filterFunction. Is there an example
 somewhere showing its usage?

 The header comment on the function says :

 * @param filterFunc function to filter expired key-value pairs;
 *   only pairs that satisfy the function are retained
 *   set this to null if you do not want to filter


Re: Spark streaming Processing time keeps increasing

2015-07-16 Thread Tathagata Das
Make sure you provide the filterFunction with the invertible
reduceByKeyAndWindow. Otherwise none of the keys will get removed, and the
key space will continue to increase. This is what is leading to the lag. So
use the filtering function to filter out the keys that are not needed any
more.

On Thu, Jul 16, 2015 at 12:44 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 What is your data volume? Do you have checkpointing/WAL enabled? In
 that case, make sure you are using SSD disks, as this behavior is mainly due
 to IO wait.

 Thanks
 Best Regards

 On Thu, Jul 16, 2015 at 8:43 AM, N B nb.nos...@gmail.com wrote:

 Hello,

 We have a Spark streaming application and the problem that we are
 encountering is that the batch processing time keeps on increasing and
 eventually causes the application to start lagging. I am hoping that
 someone here can point me to any underlying cause of why this might happen.

 The batch interval is 1 minute as of now and the app does some maps,
 filters, joins and reduceByKeyAndWindow operations. All the reduces are
 invertible functions and so we do provide the inverse-reduce functions in
 all those. The largest window size we have is 1 hour right now. When the
 app is started, we see that the batch processing time is between 20 and 30
 seconds. It keeps creeping up slowly and by the time it hits the 1 hour
 mark, it is somewhere around 35-40 seconds. Somewhat expected and still not
 bad!

 I would expect that since the largest window we have is 1 hour long, the
 application should stabilize around the 1 hour mark and start processing
 subsequent batches within that 35-40 second zone. However, that is not what
 is happening. The processing time still keeps increasing and eventually in
 a few hours it exceeds the 1 minute mark and then starts lagging. Eventually
 the lag builds up to minutes, at which point we have to restart
 the system.

 Any pointers on why this could be happening and what we can do to
 troubleshoot further?

 Thanks
 Nikunj





Re: Spark streaming Processing time keeps increasing

2015-07-16 Thread N B
Thanks Akhil. For doing reduceByKeyAndWindow, one has to have checkpointing
enabled. So yes, we do have it enabled. But not the Write Ahead Log, because we
don't have a need for recovery and we do not recover the process state on
restart.
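
(For completeness, enabling it is just the usual one-liner on the streaming
context, 'jssc' being our JavaStreamingContext; the directory below is only a
placeholder for our actual location:)

// Required for window operations that use an inverse reduce function.
jssc.checkpoint("/path/to/checkpoint/dir");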

I don't know if IO Wait fully explains the increasing processing time.
Below is a full minute of 'sar' output every 2 seconds. The iowait values
don't seem too bad to me except for a brief small spike in the middle.
Also, how does one explain the continued degradation of processing time
even beyond the largest window interval?

Thanks
Nikunj


$ sar 2 30
Linux 3.13.0-48-generic (ip-X-X-X-X)    07/16/2015    _x86_64_    (16 CPU)

01:11:14 AM     CPU     %user     %nice   %system   %iowait    %steal     %idle
01:11:16 AM     all     66.70      0.03     11.10      0.03      0.00     22.13
01:11:18 AM     all     79.99      0.00     10.81      0.00      0.03      9.17
01:11:20 AM     all     62.66      0.03     10.84      0.00      0.03     26.43
01:11:22 AM     all     68.59      0.00     10.83      0.00      0.10     20.49
01:11:24 AM     all     77.74      0.00     10.83      0.00      0.03     11.40
01:11:26 AM     all     65.01      0.00     10.83      0.03      0.07     24.06
01:11:28 AM     all     66.33      0.00     10.87      0.00      0.03     22.77
01:11:30 AM     all     72.38      0.03     12.48      0.54      0.06     14.50
01:11:32 AM     all     68.35      0.00     12.98      7.46      0.03     11.18
01:11:34 AM     all     75.94      0.03     14.02      3.27      0.03      6.71
01:11:36 AM     all     68.60      0.00     14.34      2.76      0.03     14.27
01:11:38 AM     all     61.99      0.03     13.34      0.07      0.07     24.51
01:11:40 AM     all     52.21      0.03     12.79      1.04      0.13     33.79
01:11:42 AM     all     37.91      0.03     12.43      0.03      0.10     49.48
01:11:44 AM     all     26.92      0.00     11.68      0.14      0.10     61.16
01:11:46 AM     all     24.86      0.00     12.07      0.00      0.10     62.97
01:11:48 AM     all     25.49      0.00     11.96      0.00      0.10     62.45
01:11:50 AM     all     21.16      0.00     12.35      0.03      0.14     66.32
01:11:52 AM     all     29.89      0.00     12.06      0.03      0.10     57.91
01:11:54 AM     all     26.77      0.00     11.81      0.00      0.10     61.32
01:11:56 AM     all     25.34      0.03     11.81      0.03      0.14     62.65
01:11:58 AM     all     22.42      0.00     12.60      0.00      0.10     64.88
01:12:00 AM     all     30.27      0.00     12.10      0.03      0.14     57.46
01:12:02 AM     all     80.59      0.00     10.58      0.35      0.03      8.44
01:12:04 AM     all     49.05      0.00     12.89      0.66      0.07     37.32
01:12:06 AM     all     31.21      0.03     13.54      6.54      0.17     48.50
01:12:08 AM     all     31.66      0.00     13.26      6.30      0.10     48.67
01:12:10 AM     all     36.19      0.00     12.87      3.04      0.14     47.76
01:12:12 AM     all     82.63      0.03     10.60      0.00      0.03      6.70
01:12:14 AM     all     77.72      0.00     10.66      0.00      0.03     11.59
Average:        all     52.22      0.01     12.04      1.08      0.08     34.58


On Thu, Jul 16, 2015 at 12:44 AM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 What is your data volume? Do you have checkpointing/WAL enabled? In
 that case, make sure you are using SSD disks, as this behavior is mainly due
 to IO wait.

 Thanks
 Best Regards

 On Thu, Jul 16, 2015 at 8:43 AM, N B nb.nos...@gmail.com wrote:

 Hello,

 We have a Spark streaming application and the problem that we are
 encountering is that the batch processing time keeps on increasing and
 eventually causes the application to start lagging. I am hoping that
 someone here can point me to any underlying cause of why this might happen.

 The batch interval is 1 minute as of now and the app does some maps,
 filters, joins and reduceByKeyAndWindow operations. All the reduces are
 invertible functions and so we do provide the inverse-reduce functions in
 all those. The largest window size we have is 1 hour right now. When the
 app is started, we see that the batch processing time is between 20 and 30
 seconds. It keeps creeping up slowly and by the time it hits the 1 hour
 mark, it is somewhere around 35-40 seconds. Somewhat expected and still not
 bad!

 I would expect that since the largest window we have is 1 hour long, the
 application should stabilize around the 1 hour mark and start processing
 subsequent batches within that 35-40 second zone. However, that is not what
 is happening. The processing time still keeps increasing and eventually in
 a few hours it exceeds the 1 minute mark and then starts lagging. Eventually
 the lag builds up to minutes, at which point we have to restart
 the system.

 Any pointers on why this could be happening and what we can do to
 troubleshoot further?

 Thanks
 Nikunj





Re: Spark streaming Processing time keeps increasing

2015-07-16 Thread N B
Hi TD,

Yes, we do have the invertible function provided. However, I am not sure I
understood how to use the filterFunction. Is there an example somewhere
showing its usage?

The header comment on the function says :

* @param filterFunc function to filter expired key-value pairs;
*   only pairs that satisfy the function are retained
*   set this to null if you do not want to filter

These are the questions I am confused about:

1. The code comment seems to imply that the filterFunc is only used to
figure out which key-value pairs are used to form the window, but how
does it actually help expire the old data?

2. Shouldn't the values that are falling off of the window period
automatically be removed without the need for an additional filter
function?

3. Which side of the key-value pairs are passed to this function? The
ones that are coming in or the ones that are going out of window or
both?

4. The key-value pairs in use in a particular reduceByKeyAndWindow
operation may not have the requisite info (such as a timestamp or
similar, e.g. if it is aggregated data) to help determine whether to return
true or false. What is the expected semantic here?


As always, thanks for your help

Nikunj





On Thu, Jul 16, 2015 at 1:16 AM, Tathagata Das t...@databricks.com wrote:

 Make sure you provide the filterFunction with the invertible
 reduceByKeyAndWindow. Otherwise none of the keys will get removed, and the
 key space will continue to increase. This is what is leading to the lag. So
 use the filtering function to filter out the keys that are not needed any
 more.

 On Thu, Jul 16, 2015 at 12:44 AM, Akhil Das ak...@sigmoidanalytics.com
 wrote:

 What is your data volume? Do you have checkpointing/WAL enabled? In
 that case, make sure you are using SSD disks, as this behavior is mainly due
 to IO wait.

 Thanks
 Best Regards

 On Thu, Jul 16, 2015 at 8:43 AM, N B nb.nos...@gmail.com wrote:

 Hello,

 We have a Spark streaming application and the problem that we are
 encountering is that the batch processing time keeps on increasing and
 eventually causes the application to start lagging. I am hoping that
 someone here can point me to any underlying cause of why this might happen.

 The batch interval is 1 minute as of now and the app does some maps,
 filters, joins and reduceByKeyAndWindow operations. All the reduces are
 invertible functions and so we do provide the inverse-reduce functions in
 all those. The largest window size we have is 1 hour right now. When the
 app is started, we see that the batch processing time is between 20 and 30
 seconds. It keeps creeping up slowly and by the time it hits the 1 hour
 mark, it is somewhere around 35-40 seconds. Somewhat expected and still not
 bad!

 I would expect that since the largest window we have is 1 hour long, the
 application should stabilize around the 1 hour mark and start processing
 subsequent batches within that 35-40 second zone. However, that is not what
 is happening. The processing time still keeps increasing and eventually in
 a few hours it exceeds the 1 minute mark and then starts lagging. Eventually
 the lag builds up to minutes, at which point we have to restart
 the system.

 Any pointers on why this could be happening and what we can do to
 troubleshoot further?

 Thanks
 Nikunj