Re: Spark Streaming + reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration)

Is it possible to keep the events in memory rather than pushing them out to the file system?

Ali

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-reduceByWindow-reduceFunc-invReduceFunc-windowDuration-slideDuration-tp11591p11792.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org
Re: Spark Streaming + reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration)

Thank you. I will look into setting up a Hadoop HDFS node.

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-reduceByWindow-reduceFunc-invReduceFunc-windowDuration-slideDuration-tp11591p11790.html
Re: Spark Streaming + reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration)

That is required for driver fault-tolerance, as well as for some transformations like updateStateByKey that persist information across batches. It must be an HDFS directory when running on a cluster.

TD

On Thu, Aug 7, 2014 at 4:25 PM, salemi wrote:
> That is correct. I do ssc.checkpoint("checkpoint"). Why is the checkpoint
> required?
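To make the point above concrete, here is a minimal sketch of a driver that sets a cluster-safe checkpoint directory. The app name, batch interval, and HDFS NameNode URI are placeholders for illustration, not values taken from this thread:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

// Placeholder app name and batch interval.
val conf = new SparkConf().setAppName("WindowedEventCounts")
val ssc = new StreamingContext(conf, Seconds(3))

// On a cluster the checkpoint directory must live on a fault-tolerant
// file system such as HDFS; a bare local path like "checkpoint" only
// works reliably in local mode. The NameNode URI is a placeholder.
ssc.checkpoint("hdfs://namenode:8020/user/spark/streaming-checkpoints")
```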
Re: Spark Streaming + reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration)

That is correct. I do ssc.checkpoint("checkpoint"). Why is the checkpoint required?

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-reduceByWindow-reduceFunc-invReduceFunc-windowDuration-slideDuration-tp11591p11731.html
Re: Spark Streaming + reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration)

Are you running on a cluster but giving a local path in ssc.checkpoint(...)?

TD

On Thu, Aug 7, 2014 at 3:24 PM, salemi wrote:
> Hi,
>
> Thank you for your help. With the new code I am getting the following error
> in the driver. What is going wrong here?
Re: Spark Streaming + reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration)

Hi,

Thank you for your help. With the new code I am getting the following error in the driver. What is going wrong here?

14/08/07 13:22:28 ERROR JobScheduler: Error running job streaming job 1407450148000 ms.0
org.apache.spark.SparkException: Checkpoint RDD CheckpointRDD[4528] at apply at List.scala:318(0) has different number of partitions than original RDD MappedValuesRDD[4526] at mapValues at ReducedWindowedDStream.scala:169(1)
        at org.apache.spark.rdd.RDDCheckpointData.doCheckpoint(RDDCheckpointData.scala:98)
        at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1164)
        at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1166)
        at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1166)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1166)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1054)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1069)
        at org.apache.spark.SparkContext.runJob(SparkContext.scala:1083)
        at org.apache.spark.rdd.RDD.take(RDD.scala:989)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachFunc$2$1.apply(DStream.scala:593)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachFunc$2$1.apply(DStream.scala:592)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
        at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
        at java.lang.Thread.run(Thread.java:722)

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-reduceByWindow-reduceFunc-invReduceFunc-windowDuration-slideDuration-tp11591p11727.html
Re: Spark Streaming + reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration)

Okay, going back to your original question: it wasn't clear what reduce function you are trying to implement. Going by the second example, which uses a window() operation followed by a filter+count (using SQL), I am guessing you are trying to maintain a count of all the "Active" states in the last 15 minutes. One way to do this could be:

eventData.filter(_.state == "Active").countByWindow(Minutes(15), Seconds(3))

Under the hood, this does the counting incrementally, using a reduce and an inverse reduce function.

Alternatively, the problem could simply be that you need more resources to process that much data.

TD

On Wed, Aug 6, 2014 at 7:58 PM, salemi wrote:
> Hi,
>
> The reason I am looking to do it differently is because the latency and
> batch processing times are bad, about 40 sec. I took the times from the
> Streaming UI.
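As a sketch, the countByWindow suggestion above, and a hand-written equivalent using reduceByWindow with an explicit inverse reduce function, might look like the following. This assumes the eventData stream and Data case class defined elsewhere in this thread, plus a configured checkpoint directory, which these incremental window operations require:

```scala
import org.apache.spark.streaming.{Minutes, Seconds}

// Incremental windowed count of "Active" events (the suggestion above).
val activeCounts = eventData
  .filter(_.state == "Active")
  .countByWindow(Minutes(15), Seconds(3))

// Roughly the same computation spelled out with an explicit reduce
// function (add counts entering the window) and inverse reduce
// function (subtract counts leaving the window):
val activeCounts2 = eventData
  .filter(_.state == "Active")
  .map(_ => 1L)
  .reduceByWindow(_ + _, _ - _, Minutes(15), Seconds(3))

activeCounts.print()
```

The inverse reduce form avoids recomputing the whole 15-minute window every 3 seconds; only the batches sliding in and out of the window are touched.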
Re: Spark Streaming + reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration)

Hi,

The reason I am looking to do it differently is that the latency and batch processing times are bad, about 40 seconds. I took the times from the Streaming UI.

As you suggested, I tried the window as below and the times are still bad:

val dStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap)
val eventData = dStream.map(_._2).map(_.split(",")).map(data =>
  Data(data(0), data(1), data(2), data(3), data(4))).window(Minutes(15), Seconds(3))

val result = eventData.transform((rdd, time) => {
  rdd.registerAsTable("data")
  sql("SELECT count(state) FROM data WHERE state='Active'")
})
result.print()

Any suggestions?

Ali

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-Streaming-reduceByWindow-reduceFunc-invReduceFunc-windowDuration-slideDuration-tp11591p11612.html
Re: Spark Streaming + reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration)

Why isn't a simple window function sufficient?

eventData.window(Minutes(15), Seconds(3))

This will keep generating RDDs every 3 seconds, each containing the last 15 minutes of data.

TD

On Wed, Aug 6, 2014 at 3:43 PM, salemi wrote:
> Hi,
> I have a DStream called eventData and it contains a set of Data objects
> defined as follows:
>
> case class Data(startDate: Long, endDate: Long, className: String, id:
> String, state: String)
>
> What would the reduce and inverse reduce functions look like if I would
> like to add the data for the current 3 seconds and filter out the oldest
> 3 seconds of data?
>
> eventData.reduceByWindow(reduceFunc, invReduceFunc, Minutes(15),
> Seconds(3))
>
> Thanks
> Ali