Is it possible to invoke updateStateByKey twice on the same RDD

2014-10-09 Thread qihong
I need to implement the following logic in a Spark Streaming app:

For the incoming DStream, do some transformation and invoke updateStateByKey to
update the state object for each key (marking data entries that are updated as
dirty for the next step), then let the state objects produce event(s) based on
the dirty flag. After that, I need to update the state objects again to clear the
dirty flags so they won't produce the same event(s) in the next batch. The
pseudocode is as follows:

1.  val dstream0 = ...
2.  val state = dstream0.updateStateByKey[...](...)
3.  val events = state.flatMapValues(stateobj => stateobj.produceEvents(...))
4.  ?? how to update state again ??

Is it possible to do this? If so, how? Or is there an alternative way to
achieve the same thing?

I tried to update the state object inside the stateobj.produceEvents(...) method,
but the state object at line #2 in the next batch doesn't contain the change made
at line #3 (in the previous batch).
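
For illustration, the kind of state object described above might look roughly
like this (the class, field, and method names are simplified/hypothetical):

  // hypothetical sketch of the state object described above
  case class Entry(value: Double, dirty: Boolean)

  class StateObj(var entries: Map[String, Entry]) extends Serializable {
    // produce events only for entries currently marked dirty; afterwards the
    // dirty flags need to be cleared somehow so the same events are not
    // emitted again in the next batch -- that's the part I don't know how to do
    def produceEvents(): Seq[String] =
      entries.collect { case (id, e) if e.dirty => s"event for $id" }.toSeq
  }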

Any suggestion?

Thanks!







RE: Change RDDs using map()

2014-09-17 Thread qihong
If you want the result as an RDD of (key, 1):

  val new_rdd = rdd.filter(x => x._2 == 1)

If you want the result as an RDD of keys (since you know the values are 1), then:

  val new_rdd = rdd.filter(x => x._2 == 1).map(x => x._1)

x._1 and x._2 are the Scala way to access the key and the value of a key/value
pair (a tuple).
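
For instance, assuming an RDD of (String, Int) pairs in a spark-shell session:

  val rdd   = sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 1)))
  val pairs = rdd.filter(x => x._2 == 1)                   // ("a", 1), ("c", 1)
  val keys  = rdd.filter(x => x._2 == 1).map(x => x._1)    // "a", "c"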






Re: Dealing with Time Series Data

2014-09-17 Thread qihong
What are you trying to do: generate time series from your data in HDFS, or do
some transformation and/or aggregation on your time series data in HDFS?






Re: Spark Streaming - batchDuration for streaming

2014-09-17 Thread qihong
Here's the official Spark documentation about batch size/interval:
http://spark.apache.org/docs/latest/streaming-programming-guide.html#setting-the-right-batch-size

Spark is batch-oriented processing. As you mentioned, a stream is a continuous
flow, and core Spark cannot handle it directly.

Spark Streaming bridges the gap between the continuous flow and batch-oriented
processing. It generates an RDD from the continuous data flow/stream every batch
interval, and Spark can then process them as normal RDDs.
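
As a minimal sketch (the app name, socket source, and 5-second interval are just
example values):

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  val conf = new SparkConf().setAppName("BatchIntervalExample")
  // one RDD is generated from the stream every 5 seconds (the batch interval)
  val ssc = new StreamingContext(conf, Seconds(5))

  val lines = ssc.socketTextStream("localhost", 9999)  // example source
  lines.count().print()  // each 5-second RDD is processed like a normal RDD

  ssc.start()
  ssc.awaitTermination()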
 






Re: How to initialize StateDStream

2014-09-13 Thread qihong
There's no need to initialize a StateDStream. Take a look at the example
StatefulNetworkWordCount.scala; it's part of the Spark source code.
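
The core of that example is roughly the following pattern (paraphrased from
memory, so check the actual file). The state simply starts from None the first
time a key appears, which is why no explicit initialization is needed:

  // the running count per word is the state
  val updateFunc = (values: Seq[Int], state: Option[Int]) => {
    Some(values.sum + state.getOrElse(0))
  }
  val wordDstream  = words.map(x => (x, 1))
  val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)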






Re: How to initialize StateDStream

2014-09-13 Thread qihong
I'm not sure what you mean by "previous run". Is it the previous batch, or a
previous run of spark-submit?

If it's the previous batch (Spark Streaming creates a batch every batch
interval), then there's nothing to do.

If it's a previous run of spark-submit (assuming you are able to save the
result somewhere), then I can think of two possible ways to do it:

1. Read the saved result as an RDD (just do this once), and join that RDD with
each RDD of the state stream.

2. Add extra logic to the update function: when the previous state is None (one
of the two Option values), fetch the saved state for the given key from the saved
result somehow, then apply your original logic to create the new state object
based on Seq[V] and the previous state. Note that you need to use this version of
the update function: updateFunc: Iterator[(K, Seq[V], Option[S])] =>
Iterator[(K, S)], which makes the key available to the update function. A rough
sketch is below.
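
A minimal sketch of option 2, assuming String keys, Int values/state, a keyed
DStream named keyedDStream, and a savedState: Map[String, Int] loaded once from
the saved result (all of these names and types are illustrative):

  val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
    iter.map { case (key, values, state) =>
      // fall back to the saved state the first time a key shows up in this run
      val prev = state.orElse(savedState.get(key)).getOrElse(0)
      (key, prev + values.sum)
    }
  }

  val stateDStream = keyedDStream.updateStateByKey[Int](
    updateFunc,
    new org.apache.spark.HashPartitioner(8),  // any partitioner; 8 is arbitrary
    true)                                     // remember the partitioner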








Re: compiling spark source code

2014-09-12 Thread qihong
Follow the instructions here:
http://spark.apache.org/docs/latest/building-with-maven.html
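
If it helps, the basic command from that page is along these lines (add the -P
profiles you need, e.g. for your Hadoop version):

  mvn -DskipTests clean package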






RE: how to setup steady state stream partitions

2014-09-10 Thread qihong
Thanks for your response! I found that too, and it does the trick! Here's the
refined code:

val inputDStream = ...
val keyedDStream = inputDStream.map(...)  // use sensorId as key
val partitionedDStream = keyedDStream.transform(rdd => rdd.partitionBy(new MyPartitioner(...)))
val stateDStream = partitionedDStream.updateStateByKey[...](updateFunction, new MyPartitioner(...))
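
For completeness, a MyPartitioner along these lines could be as simple as the
following (purely illustrative; it just hashes the sensorId key into a fixed
number of partitions):

  import org.apache.spark.Partitioner

  class MyPartitioner(numParts: Int) extends Partitioner {
    override def numPartitions: Int = numParts
    override def getPartition(key: Any): Int = {
      // non-negative modulo of the key's hash code
      val h = key.hashCode % numParts
      if (h < 0) h + numParts else h
    }
  }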






Re: How to set java.library.path in a spark cluster

2014-09-09 Thread qihong
Add something like the following to spark-env.sh:

export LD_LIBRARY_PATH=<path containing libmosekjava7_0.so>:$LD_LIBRARY_PATH

(and remove all 5 exports you listed). Then restart all worker nodes and try
again.

Good luck!






Re: how to choose right DStream batch interval

2014-09-05 Thread qihong
Repost, since the original msg was marked with "This post has NOT been accepted
by the mailing list yet."

I have some questions regarding the DStream batch interval:

1. If it only takes 0.5 second to process a batch 99% of the time, but 1% of
batches need 5 seconds to process (due to some random factor or failures), then
what's the right batch interval? 5 seconds (the worst case)?

2. What will happen to DStream processing if one batch takes longer than the
batch interval? Can Spark recover from that?

Thanks,
Qihong






Re: Running spark-shell (or queries) over the network (not from master)

2014-09-05 Thread qihong
The command should be: spark-shell --master spark://<master ip on EC2>:7077






Re: Running spark-shell (or queries) over the network (not from master)

2014-09-05 Thread qihong
Since you are using your home computer, it's probably not reachable by EC2 from
the internet.

You can try setting spark.driver.host to your WAN IP and spark.driver.port to a
fixed port in SparkConf, and open that port in your home network (port
forwarding to the computer you are using). See if that helps.
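
A minimal sketch of what that could look like (the IP, port, and app name are
placeholders you would replace):

  import org.apache.spark.SparkConf

  val conf = new SparkConf()
    .setMaster("spark://<master ip on EC2>:7077")
    .setAppName("ShellFromHome")                // placeholder app name
    .set("spark.driver.host", "<your WAN ip>")  // so executors on EC2 can reach the driver
    .set("spark.driver.port", "51000")          // any fixed port; forward it on your home router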


