Re: Spark Streaming + reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration)

2014-08-08 Thread salemi
Is it possible to keep the events in memory rather than pushing them out to
the file system?

Ali






Re: Spark Streaming + reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration)

2014-08-08 Thread salemi
Thank you. I will look into setting up a Hadoop HDFS node.






Re: Spark Streaming + reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration)

2014-08-07 Thread Tathagata Das
That is required for driver fault-tolerance, as well as for some
transformations like updateStateByKey that persist information across
batches. It must be an HDFS directory when running on a cluster.
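
For concreteness, here is a minimal sketch of setting that up (the
application name and the HDFS URI below are placeholders, not values
from this thread):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf().setAppName("WindowedCounts")
    val ssc = new StreamingContext(conf, Seconds(3))

    // A bare relative path like "checkpoint" only works when running
    // locally; on a cluster, point at a fault-tolerant store such as HDFS.
    ssc.checkpoint("hdfs://namenode:8020/user/spark/checkpoints")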

TD


On Thu, Aug 7, 2014 at 4:25 PM, salemi  wrote:

> That is correct. I do ssc.checkpoint("checkpoint"). Why is the checkpoint
> required?


Re: Spark Streaming + reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration)

2014-08-07 Thread salemi
That is correct. I do ssc.checkpoint("checkpoint"). Why is the checkpoint
required?






Re: Spark Streaming + reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration)

2014-08-07 Thread Tathagata Das
Are you running on a cluster but giving a local path in ssc.checkpoint(...)?

TD


On Thu, Aug 7, 2014 at 3:24 PM, salemi  wrote:

> Hi,
>
> Thank you for your help. With the new code I am getting the following error
> in the driver. What is going wrong here?
>
> 14/08/07 13:22:28 ERROR JobScheduler: Error running job streaming job 1407450148000 ms.0
> org.apache.spark.SparkException: Checkpoint RDD CheckpointRDD[4528] at apply at List.scala:318(0)
> has different number of partitions than original RDD MappedValuesRDD[4526] at mapValues at
> ReducedWindowedDStream.scala:169(1)
>     at org.apache.spark.rdd.RDDCheckpointData.doCheckpoint(RDDCheckpointData.scala:98)
>     at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1164)
>     at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1166)
>     at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1166)
>     at scala.collection.immutable.List.foreach(List.scala:318)
>     at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1166)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1054)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1069)
>     at org.apache.spark.SparkContext.runJob(SparkContext.scala:1083)
>     at org.apache.spark.rdd.RDD.take(RDD.scala:989)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachFunc$2$1.apply(DStream.scala:593)
>     at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachFunc$2$1.apply(DStream.scala:592)
>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>     at scala.util.Try$.apply(Try.scala:161)
>     at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
>     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
>     at java.lang.Thread.run(Thread.java:722)


Re: Spark Streaming + reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration)

2014-08-07 Thread salemi
Hi,

Thank you for your help. With the new code I am getting the following error
in the driver. What is going wrong here?

14/08/07 13:22:28 ERROR JobScheduler: Error running job streaming job 1407450148000 ms.0
org.apache.spark.SparkException: Checkpoint RDD CheckpointRDD[4528] at apply at List.scala:318(0)
has different number of partitions than original RDD MappedValuesRDD[4526] at mapValues at
ReducedWindowedDStream.scala:169(1)
    at org.apache.spark.rdd.RDDCheckpointData.doCheckpoint(RDDCheckpointData.scala:98)
    at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1164)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1166)
    at org.apache.spark.rdd.RDD$$anonfun$doCheckpoint$1.apply(RDD.scala:1166)
    at scala.collection.immutable.List.foreach(List.scala:318)
    at org.apache.spark.rdd.RDD.doCheckpoint(RDD.scala:1166)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1054)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1069)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:1083)
    at org.apache.spark.rdd.RDD.take(RDD.scala:989)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachFunc$2$1.apply(DStream.scala:593)
    at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachFunc$2$1.apply(DStream.scala:592)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1110)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:603)
    at java.lang.Thread.run(Thread.java:722)







Re: Spark Streaming + reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration)

2014-08-06 Thread Tathagata Das
Okay, going back to your original question: it wasn't clear what reduce
function you are trying to implement. Going by the 2nd example, which uses
the window() operation followed by a count+filter (using SQL), I am guessing
you are trying to maintain a count of all the active "states" in the last
15 minutes. One way to do this could be

eventData.filter(_.state == "Active").countByWindow(Minutes(15), Seconds(3))

Underneath, this will do the counting in an incremental manner, using a
reduce and an inverse reduce function.
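
For reference, a minimal sketch of the equivalent explicit form (assuming
the eventData stream defined elsewhere in the thread, and a checkpoint
directory set on the context, which the inverse reduce path requires):

    val activeCounts = eventData
      .filter(_.state == "Active")  // keep only Active events
      .map(_ => 1L)                 // each event contributes a count of 1
      .reduceByWindow(
        _ + _,                      // reduce: add counts entering the window
        _ - _,                      // inverse reduce: subtract counts leaving it
        Minutes(15), Seconds(3))
    activeCounts.print()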

A different possibility is simply that you need more resources to process
that much data.

TD

On Wed, Aug 6, 2014 at 7:58 PM, salemi  wrote:

> Hi,
>
> The reason I am looking to do it differently is that the latency and
> batch processing times are bad, about 40 seconds. I took the times from the
> Streaming UI.
>
> As you suggested I tried the window as below and still the times are bad.
>
>   val dStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap)
>   val eventData = dStream.map(_._2).map(_.split(",")).map(data =>
>     Data(data(0), data(1), data(2), data(3), data(4))).window(Minutes(15), Seconds(3))
>
>   val result = eventData.transform((rdd, time) => {
>     rdd.registerAsTable("data")
>     sql("SELECT count(state) FROM data WHERE state='Active'")
>   })
>   result.print()
>
> Any suggestions?
>
> Ali


Re: Spark Streaming + reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration)

2014-08-06 Thread salemi
Hi,

The reason I am looking to do it differently is that the latency and
batch processing times are bad, about 40 seconds. I took the times from the
Streaming UI.

As you suggested I tried the window as below and still the times are bad.

  val dStream = KafkaUtils.createStream(ssc, zkQuorum, group, topicpMap)
  val eventData = dStream.map(_._2).map(_.split(",")).map(data =>
    Data(data(0), data(1), data(2), data(3), data(4))).window(Minutes(15), Seconds(3))

  val result = eventData.transform((rdd, time) => {
    rdd.registerAsTable("data")
    sql("SELECT count(state) FROM data WHERE state='Active'")
  })
  result.print()

Any suggestions?

Ali










Re: Spark Streaming + reduceByWindow(reduceFunc, invReduceFunc, windowDuration, slideDuration)

2014-08-06 Thread Tathagata Das
Why isn't a simple window function sufficient?

eventData.window(Minutes(15), Seconds(3)) will keep generating RDDs every 3
seconds, each containing the last 15 minutes of data.
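
For instance, a minimal sketch (a plain filter and count stand in for the
SQL query used elsewhere in the thread):

    val windowed = eventData.window(Minutes(15), Seconds(3))
    // Each batch interval, count the Active events from the last 15 minutes.
    windowed.filter(_.state == "Active").count().print()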

TD


On Wed, Aug 6, 2014 at 3:43 PM, salemi  wrote:

> Hi,
> I have a DStream called eventData and it contains a set of Data objects
> defined as follows:
>
> case class Data(startDate: Long, endDate: Long, className: String, id:
> String, state: String)
>
> What would the reducer and inverse reducer functions look like if I wanted
> to add the data for the current 3 seconds and filter out the oldest 3
> seconds of data?
>
> eventData.reduceByWindow(reduceFunc, invReduceFunc, Minutes(15),
> Seconds(3))
>
> Thanks
> Ali