Is it possible to invoke updateStateByKey twice on the same RDD
I need to implement the following logic in a Spark Streaming app: for the incoming DStream, do some transformation and invoke updateStateByKey to update the state object for each key (marking data entries that were updated as dirty for the next step), then let the state objects produce event(s) based on the dirty flag. After that, I need to update the state objects again to clear the dirty flags, so they won't produce the same event(s) in the next batch. The pseudo code is as follows:

1. val dstream0 = ...
2. val state = dstream0.updateStateByKey[...](...)
3. val events = state.flatMapValues(stateobj => stateobj.produceEvents(...))
4. ?? how to update state again ??

Is it possible to do this? If so, how? Or is there an alternative way to achieve the same thing? I tried to update the state object inside the stateobj.produceEvents(...) method, but the state object at step 2 in the next batch doesn't contain the change made at step 3 (in the previous batch). Any suggestion? Thanks!
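For what it's worth, one workaround is to fold the event production and flag cleanup into the update function itself, so the state never needs a second update pass. A minimal sketch, assuming dstream0 is a DStream[(K, Long)] and a hypothetical StateObj that carries its pending events:

    case class StateObj(value: Long, dirty: Boolean, pendingEvents: Seq[String]) {
      // Hypothetical event production: only dirty state emits events.
      def produceEvents: Seq[String] =
        if (dirty) Seq(s"value changed to $value") else Seq.empty
    }

    val updateFunc = (newValues: Seq[Long], prev: Option[StateObj]) => {
      // Clear the previous batch's dirty flag and pending events first...
      val cleaned = prev.getOrElse(StateObj(0L, dirty = false, Seq.empty))
                        .copy(dirty = false, pendingEvents = Seq.empty)
      // ...then apply this batch's values, marking the state dirty if any arrived.
      val updated =
        if (newValues.isEmpty) cleaned
        else cleaned.copy(value = cleaned.value + newValues.sum, dirty = true)
      // Store the events on the state object so a plain flatMapValues can emit them.
      Some(updated.copy(pendingEvents = updated.produceEvents))
    }

    val state  = dstream0.updateStateByKey[StateObj](updateFunc)
    val events = state.flatMapValues(_.pendingEvents)

This way each batch atomically clears the previous flags, applies new data, and stages the events to emit, so step 4 disappears.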
RE: Change RDDs using map()
If you want the result as an RDD of (key, 1):

    val new_rdd = rdd.filter(x => x._2 == 1)

If you want the result as an RDD of keys (since you know the values are 1):

    val new_rdd = rdd.filter(x => x._2 == 1).map(x => x._1)

x._1 and x._2 are Scala's way of accessing the key and the value of a key/value pair.
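If it helps, here's a quick sanity check you can paste into spark-shell (assuming sc is the usual SparkContext):

    val rdd = sc.parallelize(Seq(("a", 1), ("b", 0), ("c", 1)))
    rdd.filter(x => x._2 == 1).collect()                // Array((a,1), (c,1))
    rdd.filter(x => x._2 == 1).map(x => x._1).collect() // Array(a, c)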
Re: Dealing with Time Series Data
What are you trying to do? Generate time series from your data in HDFS, or do some transformation and/or aggregation on your time series data in HDFS?
Re: Spark Streaming - batchDuration for streaming
Here's the official Spark documentation about batch size/interval: http://spark.apache.org/docs/latest/streaming-programming-guide.html#setting-the-right-batch-size

Spark is batch-oriented processing. As you mentioned, a stream is a continuous flow, and core Spark cannot handle it directly. Spark Streaming bridges the gap between the continuous flow and batch-oriented processing: it generates an RDD from the continuous data flow/stream every batch interval, and Spark can then process those as normal RDDs.
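Concretely, the batch interval is fixed when the StreamingContext is created. A minimal sketch (the 2 seconds here is just an example; the right value depends on your workload, per the doc above):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf().setAppName("BatchIntervalExample")
    // Every 2 seconds, Spark Streaming cuts the stream into one RDD (a batch).
    val ssc = new StreamingContext(conf, Seconds(2))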
Re: How to initialize StateDStream
There's no need to initialize a StateDStream. Take a look at the example StatefulNetworkWordCount.scala; it's part of the Spark source code.
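The gist of that example: updateStateByKey hands your function an Option[S] as the previous state, and it's simply None the first time a key appears, so no explicit initialization is needed. Roughly:

    // values: new counts for this key in this batch; state: previous total (None for new keys).
    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
      Some(values.sum + state.getOrElse(0))
    }
    val stateDstream = wordDstream.updateStateByKey[Int](updateFunc)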
Re: How to initialize StateDStream
I'm not sure what you mean by previous run. Is it the previous batch, or the previous run of spark-submit?

If it's the previous batch (Spark Streaming creates a batch every batch interval), then there's nothing to do.

If it's the previous run of spark-submit (assuming you are able to save the result somewhere), then I can think of two possible ways to do it:

1. Read the saved result as an RDD (just do this once), and join that RDD with each RDD of the state stream.
2. Add extra logic to your update function: when the previous state is None (one of the two Option values), fetch the saved state for the given key from the saved result somehow, then apply your original logic to create the new state object from the Seq[V] and the previous state. Note that you need to use the version of the update function that makes the key available to it (a sketch follows below):

    updateFunc: (Iterator[(K, Seq[V], Option[S])]) => Iterator[(K, S)]
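To make option 2 concrete, here's a hedged sketch with String keys and Int state, assuming a hypothetical loadSavedState(key) that looks up the previous run's saved result:

    import org.apache.spark.HashPartitioner

    // Hypothetical lookup into the saved result of the previous run.
    def loadSavedState(key: String): Option[Int] = ???

    val updateFunc = (iter: Iterator[(String, Seq[Int], Option[Int])]) => {
      iter.map { case (key, newValues, state) =>
        // Fall back to the saved state only on the key's first batch of this run.
        val prev = state.orElse(loadSavedState(key))
        (key, newValues.sum + prev.getOrElse(0))
      }
    }

    val stateDstream = keyedDstream.updateStateByKey(
      updateFunc, new HashPartitioner(4), rememberPartitioner = true)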
Re: compiling spark source code
Follow the instructions here: http://spark.apache.org/docs/latest/building-with-maven.html
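For reference, the basic build boils down to something like the following; the exact memory settings and profile flags vary by Spark/Hadoop version, so check that page for your release:

    export MAVEN_OPTS="-Xmx2g -XX:MaxPermSize=512M"
    mvn -DskipTests clean package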
RE: how to setup steady state stream partitions
Thanks for your response! I found that too, and it does the trick! Here's the refined code:

    val inputDStream = ...
    val keyedDStream = inputDStream.map(...) // use sensorId as key
    val partitionedDStream = keyedDStream.transform(rdd => rdd.partitionBy(new MyPartitioner(...)))
    val stateDStream = partitionedDStream.updateStateByKey[...](updateFunction, new MyPartitioner(...))
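In case it's useful to anyone else, a hypothetical MyPartitioner could look like this; the point is that getPartition is a pure function of the key, so a given sensorId lands on the same partition every batch:

    import org.apache.spark.Partitioner

    class MyPartitioner(numParts: Int) extends Partitioner {
      override def numPartitions: Int = numParts
      // Deterministic key-to-partition mapping keeps each sensor's state local.
      override def getPartition(key: Any): Int = {
        val raw = key.hashCode % numParts
        if (raw < 0) raw + numParts else raw
      }
    }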
Re: How to set java.library.path in a spark cluster
Add something like the following to spark-env.sh (and remove all 5 exports you listed):

    export LD_LIBRARY_PATH=<path of libmosekjava7_0.so>:$LD_LIBRARY_PATH

Then restart all worker nodes and try again. Good luck!
Re: how to choose right DStream batch interval
(Reposting, since the original message was marked "This post has NOT been accepted by the mailing list yet.")

I have some questions regarding the DStream batch interval:

1. If it only takes 0.5 seconds to process the batch 99% of the time, but 1% of batches need 5 seconds to process (due to some random factor or failures), what's the right batch interval? 5 seconds (the worst case)?
2. What will happen to DStream processing if one batch takes longer than the batch interval? Can Spark recover from that?

Thanks,
Qihong
Re: Running spark-shell (or queries) over the network (not from master)
The command should be: spark-shell --master spark://<master ip on EC2>:7077
Re: Running spark-shell (or queries) over the network (not from master)
Since you are using your home computer, it's probably not reachable by EC2 from the internet. You can try setting spark.driver.host to your WAN IP and spark.driver.port to a fixed port in SparkConf, and open that port in your home network (port-forward it to the computer you are using). See if that helps.
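A minimal sketch of the SparkConf side; the IP and port below are placeholders, so substitute your own WAN IP and whichever port you forward on your router:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .setAppName("RemoteShellExample")
      .setMaster("spark://<master ip on EC2>:7077")
      // Placeholder WAN IP: executors connect back to the driver at this address.
      .set("spark.driver.host", "203.0.113.10")
      // Fixed port so you can forward exactly one port at home.
      .set("spark.driver.port", "51000")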