[jira] [Comment Edited] (SPARK-13349) adding a split and union to a streaming application cause big performance hit

2016-02-19 Thread krishna ramachandran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15154558#comment-15154558
 ] 

krishna ramachandran edited comment on SPARK-13349 at 2/19/16 6:01 PM:
---

enabling "cache" for a DStream causes the app to run out of memory. I believe 
this is a bug



was (Author: ramach1776):
enabling "cache" for a DStream causes the app to run out of memory


> adding a split and union to a streaming application cause big performance hit
> -
>
> Key: SPARK-13349
> URL: https://issues.apache.org/jira/browse/SPARK-13349
> Project: Spark
>  Issue Type: Improvement
>Affects Versions: 1.4.1
>Reporter: krishna ramachandran
>Priority: Critical
>
> We have a streaming application containing approximately 12 jobs every batch, 
> running in streaming mode (4 sec batches). Each job writes output to Cassandra, 
> and each job can contain several stages.
> job 1
> ---> receive Stream A --> map --> filter -> (union with another stream B) --> 
> map --> groupByKey --> transform --> reduceByKey --> map
> We go through a few more jobs of transforms and save to the database.
> Around stage 5, we union the output DStream from job 1 (in red) with 
> another stream (generated by a split during job 2) and save that state.
> It appears the whole execution thus far is repeated, which is redundant (I can 
> see this in the execution graph and also in performance -> processing time). 
> Processing time per batch nearly doubles or triples.
> This additional, redundant processing causes each batch to run as much as 2.5 
> times slower than runs without the union; for most batches the union does not 
> alter the original DStream (it is a union with an empty set). If I cache the 
> DStream from job 1 (red block output), performance improves substantially, but 
> we hit out-of-memory errors within a few hours.
> What is the recommended way to cache/unpersist in such a scenario? There is no 
> DStream-level "unpersist".
> Setting "spark.streaming.unpersist" to true and 
> streamingContext.remember(duration) did not help; we are still seeing 
> out-of-memory errors.
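
For illustration, a minimal Scala sketch of the pipeline shape described above, 
with the job 1 output DStream cached so the later union can reuse it. This is 
not the reporter's code; the socket sources, the key extraction, and the 
foreachRDD stand-in for the Cassandra write are all placeholders.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf().setAppName("split-union-sketch")
    val ssc  = new StreamingContext(conf, Seconds(4))   // 4 sec batches

    // Placeholder sources standing in for Stream A and Stream B
    val streamA = ssc.socketTextStream("hostA", 9999)
    val streamB = ssc.socketTextStream("hostB", 9999)

    // job 1: receive --> map --> filter --> (union with B) --> map
    //        --> groupByKey --> transform --> reduceByKey --> map
    val job1Out = streamA
      .map(_.trim)
      .filter(_.nonEmpty)
      .union(streamB)
      .map(line => (line.split(",")(0), 1))
      .groupByKey()
      .transform(rdd => rdd.mapValues(_.sum))
      .reduceByKey(_ + _)
      .map { case (key, count) => s"$key,$count" }

    job1Out.cache()                         // reused later without recompute
    job1Out.foreachRDD(rdd => rdd.count())  // stand-in for the Cassandra write

    ssc.start()
    ssc.awaitTermination()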






[jira] [Reopened] (SPARK-13349) adding a split and union to a streaming application cause big performance hit

2016-02-19 Thread krishna ramachandran (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-13349?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

krishna ramachandran reopened SPARK-13349:
--

enabling "cache" for a DStream causes the app to run out of memory


> adding a split and union to a streaming application cause big performance hit
> -
>
> Key: SPARK-13349
> URL: https://issues.apache.org/jira/browse/SPARK-13349
> Project: Spark
>  Issue Type: Improvement
>Affects Versions: 1.4.1
>Reporter: krishna ramachandran
>Priority: Critical
>
> We have a streaming application containing approximately 12 jobs every batch, 
> running in streaming mode (4 sec batches). Each job writes output to Cassandra, 
> and each job can contain several stages.
> job 1
> ---> receive Stream A --> map --> filter -> (union with another stream B) --> 
> map --> groupByKey --> transform --> reduceByKey --> map
> We go through a few more jobs of transforms and save to the database.
> Around stage 5, we union the output DStream from job 1 (in red) with 
> another stream (generated by a split during job 2) and save that state.
> It appears the whole execution thus far is repeated, which is redundant (I can 
> see this in the execution graph and also in performance -> processing time). 
> Processing time per batch nearly doubles or triples.
> This additional, redundant processing causes each batch to run as much as 2.5 
> times slower than runs without the union; for most batches the union does not 
> alter the original DStream (it is a union with an empty set). If I cache the 
> DStream from job 1 (red block output), performance improves substantially, but 
> we hit out-of-memory errors within a few hours.
> What is the recommended way to cache/unpersist in such a scenario? There is no 
> DStream-level "unpersist".
> Setting "spark.streaming.unpersist" to true and 
> streamingContext.remember(duration) did not help; we are still seeing 
> out-of-memory errors.






[jira] [Commented] (SPARK-13349) adding a split and union to a streaming application cause big performance hit

2016-02-19 Thread krishna ramachandran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15154555#comment-15154555
 ] 

krishna ramachandran commented on SPARK-13349:
--

Hi Sean,
I posted to user@, but there were two problems:

1) Not much traction.
2) Though I registered multiple times, I keep getting this message at Nabble:

   "This post has NOT been accepted by the mailing list yet"

The message I posted is pasted below.

This is not just a question; it is a bug.

We have a streaming application containing approximately 12 jobs every batch, 
running in streaming mode (4 sec batches). Each job has several 
transformations and one action (output to Cassandra), which triggers the 
execution of the job (DAG).

For example, the first job:

job 1 
---> receive Stream A --> map --> filter -> (union with another stream B) --> 
map --> groupByKey --> transform --> reduceByKey --> map 

Likewise, we go through a few more transforms and save to the database (job 2, 
job 3, ...).

Recently we added a new transformation further downstream, wherein we union the 
output DStream from job 1 (in italics) with the output from a new 
transformation (job 5). It appears the whole execution thus far is repeated, 
which is redundant (I can see this in the execution graph and also in 
performance -> processing time).

That is, with this additional transformation (a union with a stream processed 
upstream), each batch runs as much as 2.5 times slower than runs without the 
union. If I cache the DStream from job 1 (italics), performance improves 
substantially, but we hit out-of-memory errors within a few hours.

What is the recommended way to cache/unpersist in such a scenario? There is no 
DStream-level "unpersist". Setting "spark.streaming.unpersist" to true and 
streamingContext.remember(duration) did not help.
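
For reference, this is how the two mitigations mentioned above would be wired 
up in code; the values are illustrative only.

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Minutes, Seconds, StreamingContext}

    val conf = new SparkConf()
      .setAppName("unpersist-settings-sketch")
      .set("spark.streaming.unpersist", "true")  // let Spark drop generated RDDs automatically

    val ssc = new StreamingContext(conf, Seconds(4))
    ssc.remember(Minutes(1))  // keep batch RDDs around for at least a minute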

> adding a split and union to a streaming application cause big performance hit
> -
>
> Key: SPARK-13349
> URL: https://issues.apache.org/jira/browse/SPARK-13349
> Project: Spark
>  Issue Type: Improvement
>Affects Versions: 1.4.1
>Reporter: krishna ramachandran
>Priority: Critical
>
> We have a streaming application containing approximately 12 jobs every batch, 
> running in streaming mode (4 sec batches). Each job writes output to Cassandra, 
> and each job can contain several stages.
> job 1
> ---> receive Stream A --> map --> filter -> (union with another stream B) --> 
> map --> groupByKey --> transform --> reduceByKey --> map
> We go through a few more jobs of transforms and save to the database.
> Around stage 5, we union the output DStream from job 1 (in red) with 
> another stream (generated by a split during job 2) and save that state.
> It appears the whole execution thus far is repeated, which is redundant (I can 
> see this in the execution graph and also in performance -> processing time). 
> Processing time per batch nearly doubles or triples.
> This additional, redundant processing causes each batch to run as much as 2.5 
> times slower than runs without the union; for most batches the union does not 
> alter the original DStream (it is a union with an empty set). If I cache the 
> DStream from job 1 (red block output), performance improves substantially, but 
> we hit out-of-memory errors within a few hours.
> What is the recommended way to cache/unpersist in such a scenario? There is no 
> DStream-level "unpersist".
> Setting "spark.streaming.unpersist" to true and 
> streamingContext.remember(duration) did not help; we are still seeing 
> out-of-memory errors.






[jira] [Commented] (SPARK-13349) adding a split and union to a streaming application cause big performance hit

2016-02-17 Thread krishna ramachandran (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-13349?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15150893#comment-15150893
 ] 

krishna ramachandran commented on SPARK-13349:
--

I have a simple synthetic example below. I created two "raw streams", and job 1 
is materialized when stream1 is output (some action such as print/save).

In job 1:

    val stream1 = ssc.union(rawStreams).filter(_.contains("Stream:first"))
    stream1.foreachRDD(rdd => save(rdd))   // output action (save is pseudocode)
    ...

Job 2 creates another split from rawStreams and unions it with stream1:

    val stream2 = ssc.union(rawStreams).filter(_.contains("Batch:second"))
    val stream3 = stream1.union(stream2)
    ...
    stream3.foreachRDD(rdd => save(rdd))   // output action (save is pseudocode)

Job 2 is materialized and executed. This pattern is executed for every batch. 
Looking at the visual DAG, I see that job 1 executes the first graph and job 2 
recomputes both "stream1" and "stream2".

Caching the DStream stream1 (the result from job 1) makes job 2 go almost twice 
as fast.

In our real app, we have 7 such jobs per batch, and typically we union the 
output of job 5 with job 1. That is, we union the output of job 1 with a stream 
generated during job 5. Caching and reusing the output of job 1 (stream1) is 
very efficient (per-batch execution is 2.5 times faster), but we start seeing 
out-of-memory errors.

I would like to be able to "unpersist" stream1 after the union (for that batch).
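
One possible workaround sketch, not an existing DStream-level API: cache 
stream1 and register an extra output operation, after stream3 is saved, that 
unpersists the batch's underlying RDD. This assumes the default 
spark.streaming.concurrentJobs=1, where output operations run sequentially in 
registration order within each batch; save() is the same pseudocode 
placeholder as in the example above.

    val stream1 = ssc.union(rawStreams).filter(_.contains("Stream:first")).cache()
    stream1.foreachRDD(rdd => save(rdd))   // job 1 output

    val stream2 = ssc.union(rawStreams).filter(_.contains("Batch:second"))
    val stream3 = stream1.union(stream2)
    stream3.foreachRDD(rdd => save(rdd))   // job 2 output (reuses cached stream1)

    // Registered last, so it runs last within each batch: drop the cached
    // block once both jobs have used it
    stream1.foreachRDD(rdd => rdd.unpersist(blocking = false))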


> adding a split and union to a streaming application cause big performance hit
> -
>
> Key: SPARK-13349
> URL: https://issues.apache.org/jira/browse/SPARK-13349
> Project: Spark
>  Issue Type: Improvement
>Affects Versions: 1.4.1
>Reporter: krishna ramachandran
>Priority: Critical
> Fix For: 1.4.2
>
>
> We have a streaming application containing approximately 12 jobs every batch, 
> running in streaming mode (4 sec batches). Each job writes output to Cassandra, 
> and each job can contain several stages.
> job 1
> ---> receive Stream A --> map --> filter -> (union with another stream B) --> 
> map --> groupByKey --> transform --> reduceByKey --> map
> We go through a few more jobs of transforms and save to the database.
> Around stage 5, we union the output DStream from job 1 (in red) with 
> another stream (generated by a split during job 2) and save that state.
> It appears the whole execution thus far is repeated, which is redundant (I can 
> see this in the execution graph and also in performance -> processing time). 
> Processing time per batch nearly doubles or triples.
> This additional, redundant processing causes each batch to run as much as 2.5 
> times slower than runs without the union; for most batches the union does not 
> alter the original DStream (it is a union with an empty set). If I cache the 
> DStream from job 1 (red block output), performance improves substantially, but 
> we hit out-of-memory errors within a few hours.
> What is the recommended way to cache/unpersist in such a scenario? There is no 
> DStream-level "unpersist".
> Setting "spark.streaming.unpersist" to true and 
> streamingContext.remember(duration) did not help; we are still seeing 
> out-of-memory errors.






[jira] [Updated] (SPARK-13349) adding a split and union to a streaming application cause big performance hit

2016-02-16 Thread krishna ramachandran (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-13349?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

krishna ramachandran updated SPARK-13349:
-
Description: 
We have a streaming application containing approximately 12 jobs every batch, 
running in streaming mode (4 sec batches). Each job writes output to Cassandra, 
and each job can contain several stages.

job 1

---> receive Stream A --> map --> filter -> (union with another stream B) --> 
map --> groupByKey --> transform --> reduceByKey --> map

We go through a few more jobs of transforms and save to the database.

Around stage 5, we union the output DStream from job 1 (in red) with another 
stream (generated by a split during job 2) and save that state.

It appears the whole execution thus far is repeated, which is redundant (I can 
see this in the execution graph and also in performance -> processing time). 
Processing time per batch nearly doubles or triples.

This additional, redundant processing causes each batch to run as much as 2.5 
times slower than runs without the union; for most batches the union does not 
alter the original DStream (it is a union with an empty set). If I cache the 
DStream from job 1 (red block output), performance improves substantially, but 
we hit out-of-memory errors within a few hours.

What is the recommended way to cache/unpersist in such a scenario? There is no 
DStream-level "unpersist".

Setting "spark.streaming.unpersist" to true and 
streamingContext.remember(duration) did not help; we are still seeing 
out-of-memory errors.

  was:
We have a streaming application containing approximately 12 stages every batch, 
running in streaming mode (4 sec batches). Each stage persists output to 
Cassandra.

The pipeline stages:

stage 1

---> receive Stream A --> map --> filter -> (union with another stream B) --> 
map --> groupByKey --> transform --> reduceByKey --> map

We go through a few more stages of transforms and save to the database.

Around stage 5, we union the output DStream from stage 1 (in red) with 
another stream (generated by a split during stage 2) and save that state.

It appears the whole execution thus far is repeated, which is redundant (I can 
see this in the execution graph and also in performance -> processing time). 
Processing time per batch nearly doubles or triples.

This additional, redundant processing causes each batch to run as much as 2.5 
times slower than runs without the union; for most batches the union does not 
alter the original DStream (it is a union with an empty set). If I cache the 
DStream (red block output), performance improves substantially, but we hit 
out-of-memory errors within a few hours.

What is the recommended way to cache/unpersist in such a scenario? There is no 
DStream-level "unpersist".

Setting "spark.streaming.unpersist" to true and 
streamingContext.remember(duration) did not help; we are still seeing 
out-of-memory errors.


> adding a split and union to a streaming application cause big performance hit
> -
>
> Key: SPARK-13349
> URL: https://issues.apache.org/jira/browse/SPARK-13349
> Project: Spark
>  Issue Type: Improvement
>Affects Versions: 1.4.1
>Reporter: krishna ramachandran
>Priority: Critical
> Fix For: 1.4.2
>
>
> We have a streaming application containing approximately 12 jobs every batch, 
> running in streaming mode (4 sec batches). Each job writes output to Cassandra, 
> and each job can contain several stages.
> job 1
> ---> receive Stream A --> map --> filter -> (union with another stream B) --> 
> map --> groupByKey --> transform --> reduceByKey --> map
> We go through a few more jobs of transforms and save to the database.
> Around stage 5, we union the output DStream from job 1 (in red) with 
> another stream (generated by a split during job 2) and save that state.
> It appears the whole execution thus far is repeated, which is redundant (I can 
> see this in the execution graph and also in performance -> processing time). 
> Processing time per batch nearly doubles or triples.
> This additional, redundant processing causes each batch to run as much as 2.5 
> times slower than runs without the union; for most batches the union does not 
> alter the original DStream (it is a union with an empty set). If I cache the 
> DStream from job 1 (red block output), performance improves substantially, but 
> we hit out-of-memory errors within a few hours.
> What is the recommended way to cache/unpersist in such a scenario? There is no 
> DStream-level "unpersist".
> Setting "spark.streaming.unpersist" to true and 
> streamingContext.remember(duration) did not help; we are still seeing 
> out-of-memory errors.




[jira] [Updated] (SPARK-13349) adding a split and union to a streaming application cause big performance hit

2016-02-16 Thread krishna ramachandran (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-13349?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

krishna ramachandran updated SPARK-13349:
-
Summary: adding a split and union to a streaming application cause big 
performance hit  (was: adding a split and union to a streaming application 
causes big performance hit)

> adding a split and union to a streaming application cause big performance hit
> -
>
> Key: SPARK-13349
> URL: https://issues.apache.org/jira/browse/SPARK-13349
> Project: Spark
>  Issue Type: Improvement
>Affects Versions: 1.4.1
>Reporter: krishna ramachandran
>Priority: Critical
> Fix For: 1.4.2
>
>
> We have a streaming application containing approximately 12 stages every 
> batch, running in streaming mode (4 sec batches). Each stage persists output 
> to Cassandra.
> The pipeline stages:
> stage 1
> ---> receive Stream A --> map --> filter -> (union with another stream B) --> 
> map --> groupByKey --> transform --> reduceByKey --> map
> We go through a few more stages of transforms and save to the database.
> Around stage 5, we union the output DStream from stage 1 (in red) with 
> another stream (generated by a split during stage 2) and save that state.
> It appears the whole execution thus far is repeated, which is redundant (I can 
> see this in the execution graph and also in performance -> processing time). 
> Processing time per batch nearly doubles or triples.
> This additional, redundant processing causes each batch to run as much as 2.5 
> times slower than runs without the union; for most batches the union does not 
> alter the original DStream (it is a union with an empty set). If I cache the 
> DStream (red block output), performance improves substantially, but we hit 
> out-of-memory errors within a few hours.
> What is the recommended way to cache/unpersist in such a scenario? There is no 
> DStream-level "unpersist".
> Setting "spark.streaming.unpersist" to true and 
> streamingContext.remember(duration) did not help; we are still seeing 
> out-of-memory errors.






[jira] [Updated] (SPARK-13349) adding a split and union to a streaming application causes big performance hit

2016-02-16 Thread krishna ramachandran (JIRA)

 [ 
https://issues.apache.org/jira/browse/SPARK-13349?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

krishna ramachandran updated SPARK-13349:
-
Summary: adding a split and union to a streaming application causes big 
performance hit  (was: enabling cache causes out of memory error. Caching 
DStream helps reduce processing time in a streaming application but get out of 
memory errors)

> adding a split and union to a streaming application causes big performance hit
> --
>
> Key: SPARK-13349
> URL: https://issues.apache.org/jira/browse/SPARK-13349
> Project: Spark
>  Issue Type: Improvement
>Affects Versions: 1.4.1
>Reporter: krishna ramachandran
>Priority: Critical
> Fix For: 1.4.2
>
>
> We have a streaming application containing approximately 12 stages every 
> batch, running in streaming mode (4 sec batches). Each stage persists output 
> to Cassandra.
> The pipeline stages:
> stage 1
> ---> receive Stream A --> map --> filter -> (union with another stream B) --> 
> map --> groupByKey --> transform --> reduceByKey --> map
> We go through a few more stages of transforms and save to the database.
> Around stage 5, we union the output DStream from stage 1 (in red) with 
> another stream (generated by a split during stage 2) and save that state.
> It appears the whole execution thus far is repeated, which is redundant (I can 
> see this in the execution graph and also in performance -> processing time). 
> Processing time per batch nearly doubles or triples.
> This additional, redundant processing causes each batch to run as much as 2.5 
> times slower than runs without the union; for most batches the union does not 
> alter the original DStream (it is a union with an empty set). If I cache the 
> DStream (red block output), performance improves substantially, but we hit 
> out-of-memory errors within a few hours.
> What is the recommended way to cache/unpersist in such a scenario? There is no 
> DStream-level "unpersist".
> Setting "spark.streaming.unpersist" to true and 
> streamingContext.remember(duration) did not help; we are still seeing 
> out-of-memory errors.






[jira] [Created] (SPARK-13349) enabling cache causes out of memory error. Caching DStream helps reduce processing time in a streaming application but get out of memory errors

2016-02-16 Thread krishna ramachandran (JIRA)
krishna ramachandran created SPARK-13349:


 Summary: enabling cache causes out of memory error. Caching 
DStream helps reduce processing time in a streaming application but get out of 
memory errors
 Key: SPARK-13349
 URL: https://issues.apache.org/jira/browse/SPARK-13349
 Project: Spark
  Issue Type: Improvement
Affects Versions: 1.4.1
Reporter: krishna ramachandran
Priority: Critical
 Fix For: 1.4.2


We have a streaming application containing approximately 12 stages every batch, 
running in streaming mode (4 sec batches). Each stage persists output to 
Cassandra.

The pipeline stages:

stage 1

---> receive Stream A --> map --> filter -> (union with another stream B) --> 
map --> groupByKey --> transform --> reduceByKey --> map

We go through a few more stages of transforms and save to the database.

Around stage 5, we union the output DStream from stage 1 (in red) with 
another stream (generated by a split during stage 2) and save that state.

It appears the whole execution thus far is repeated, which is redundant (I can 
see this in the execution graph and also in performance -> processing time). 
Processing time per batch nearly doubles or triples.

This additional, redundant processing causes each batch to run as much as 2.5 
times slower than runs without the union; for most batches the union does not 
alter the original DStream (it is a union with an empty set). If I cache the 
DStream (red block output), performance improves substantially, but we hit 
out-of-memory errors within a few hours.

What is the recommended way to cache/unpersist in such a scenario? There is no 
DStream-level "unpersist".

Setting "spark.streaming.unpersist" to true and 
streamingContext.remember(duration) did not help; we are still seeing 
out-of-memory errors.
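
An aside, not something tried in this report: DStream.persist also accepts an 
explicit storage level, so a serialized, disk-spilling level can cap the memory 
held by the cached stream at some CPU cost. stage1Output below is a 
hypothetical name for the stage 1 result.

    import org.apache.spark.storage.StorageLevel

    // Hypothetical DStream for the stage 1 output described above
    val cachedStage1 = stage1Output.persist(StorageLevel.MEMORY_AND_DISK_SER)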


