Re: Sessionization using updateStateByKey
I would just like to add that we do the very same/similar thing at Webtrends; updateStateByKey has been a life-saver for our sessionization use cases. Cheers, Sean

On Jul 15, 2015, at 11:20 AM, Silvio Fiorito <silvio.fior...@granturing.com> wrote: Hi Cody, I've had success using updateStateByKey for real-time sessionization by aging off timed-out sessions (returning None in the update function). This was on a large commercial website with millions of hits per day. This was over a year ago, so unfortunately I no longer have access to the stats on session length, but I seem to remember sessions were around 10-30 minutes long. Even with peaks in volume, Spark managed to keep up very well. Thanks, Silvio

From: Cody Koeninger Date: Wednesday, July 15, 2015 at 5:38 PM To: algermissen1971 Cc: Tathagata Das, swetha, user Subject: Re: Sessionization using updateStateByKey

An in-memory hash-keyed data structure of some kind, so that you're close to linear on the number of items in a batch, not the number of outstanding keys. That's more complex, because you have to deal with expiration for keys that never get hit, and for unusually long sessions you have to either drop them or hit durable storage. Maybe someone has a better idea; I'd like to hear it.

On Wed, Jul 15, 2015 at 8:54 AM, algermissen1971 <algermissen1...@icloud.com> wrote: Hi Cody, oh ... I thought that was one of *the* use cases for it. Do you have a suggestion / best practice for how to achieve the same thing with better scaling characteristics? Jan

On 15 Jul 2015, at 15:33, Cody Koeninger <c...@koeninger.org> wrote: I personally would try to avoid updateStateByKey for sessionization when you have long sessions / a lot of keys, because it's linear on the number of keys.

On Tue, Jul 14, 2015 at 6:25 PM, Tathagata Das <t...@databricks.com> wrote: [Apologies for the repost, for those who have already seen this response on the dev mailing list] 1. When you set ssc.checkpoint(checkpointDir), Spark Streaming periodically saves the state RDD (which is a snapshot of all the state data) to HDFS using RDD checkpointing. In fact, a streaming app with updateStateByKey will not start until you set a checkpoint directory. 2. The updateStateByKey performance is more or less independent of which source is being used - receiver-based or direct Kafka. The absolute performance obviously depends on a LOT of variables: size of the cluster, parallelization, and so on. The key thing is that you must ensure sufficient parallelization at every stage - receiving, shuffles (updateStateByKey included), and output. Some more discussion in my talk: https://www.youtube.com/watch?v=d5UJonrruHk

On Tue, Jul 14, 2015 at 4:13 PM, swetha <swethakasire...@gmail.com> wrote: Hi, I have a question regarding sessionization using updateStateByKey. If near-real-time state needs to be maintained in a streaming application, what happens when the number of RDDs to maintain the state becomes very large? Does it automatically get saved to HDFS and reloaded when needed, or do I have to use code like ssc.checkpoint(checkpointDir)? Also, how is the performance if I use both DStream checkpointing for maintaining the state and the Kafka direct approach for exactly-once semantics?
Thanks, Swetha
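To make the age-off pattern Silvio describes concrete, here is a minimal sketch. It assumes a hypothetical Session case class, a DStream of (userId, event) string pairs called eventsByUser, and a 30-minute timeout; treat it as an illustration, not production code:

  case class Session(events: Seq[String], lastSeen: Long)

  val sessionTimeoutMs = 30 * 60 * 1000L  // assumed 30-minute session timeout

  def updateSession(newEvents: Seq[String], state: Option[Session]): Option[Session] = {
    val now = System.currentTimeMillis()
    state match {
      // No new events this batch: return None to age off a timed-out session,
      // which removes the key from the state RDD entirely
      case Some(s) if newEvents.isEmpty =>
        if (now - s.lastSeen > sessionTimeoutMs) None else Some(s)
      case Some(s) => Some(Session(s.events ++ newEvents, now))
      case None    => Some(Session(newEvents, now))
    }
  }

  ssc.checkpoint(checkpointDir)  // required before updateStateByKey, per TD's point 1
  val sessions = eventsByUser.updateStateByKey(updateSession)

Note that this still visits every outstanding key each batch, which is exactly the linear cost Cody cautions about.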
Re: Counters in Spark
.map is just a transformation, so no work will actually be performed until an action is run against it. Try adding a .count(), like so:

  inputRDD.map { x => counter += 1 }.count()

In case it is helpful, here are the docs on exactly what the transformations and actions are: http://spark.apache.org/docs/1.2.0/programming-guide.html#transformations http://spark.apache.org/docs/1.2.0/programming-guide.html#actions Cheers, Sean

On Feb 13, 2015, at 9:50 AM, nitinkak001 <nitinkak...@gmail.com> wrote: I am trying to implement counters in Spark and I guess Accumulators are the way to do it. My motive is to update a counter in a map function and access/reset it in the driver code. However, the println statement at the end still yields the value 0 (it should be 9). Am I doing something wrong?

  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SortedNeighbourhoodMatching")
    val sc = new SparkContext(conf)
    var counter = sc.accumulable(0, "Counter")
    var inputFilePath = args(0)
    val inputRDD = sc.textFile(inputFilePath)
    inputRDD.map { x => counter += 1 }
    println(counter.value)
  }
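For completeness, a minimal corrected sketch of the program above; it uses foreach (an action) instead of map, so the side effect actually runs and the accumulator ends up holding the line count of the input file:

  import org.apache.spark.{SparkConf, SparkContext}

  object CounterExample {
    def main(args: Array[String]) {
      val conf = new SparkConf().setAppName("SortedNeighbourhoodMatching")
      val sc = new SparkContext(conf)
      val counter = sc.accumulator(0, "Counter")
      val inputRDD = sc.textFile(args(0))
      inputRDD.foreach { _ => counter += 1 }  // foreach is an action, so this executes immediately
      println(counter.value)                  // now prints the actual count
    }
  }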
Re: Spark (SQL) as OLAP engine
We have gone down a similar path at Webtrends; Spark has worked amazingly well for us in this use case. Our solution goes from REST, directly into Spark, and back out to the UI instantly. Here is the resulting product in case you are curious (and please pardon the self-promotion): https://www.webtrends.com/support-training/training/explore-onboarding/

> How can I automatically cache the data once a day...

If you are not memory-bound, you could easily cache the daily results for some span of time and re-union them together each time you add new data. You would service queries off the unioned RDD.

> ... and make them available on a web service

From the unioned RDD you could always step into Spark SQL at that point. Or you could use a simple scatter/gather pattern for this. As with all things Spark, this is super easy to do: just use aggregate()()! Cheers, Sean

On Feb 3, 2015, at 9:59 AM, Adamantios Corais <adamantios.cor...@gmail.com> wrote: Hi, after some research I have decided that Spark (SQL) would be ideal for building an OLAP engine. My goal is to push aggregated data (to Cassandra or other low-latency data storage) and then be able to project the results on a web page (web service). New data will be added (aggregated) once a day, only. On the other hand, the web service must be able to run some fixed(?) queries (either on Spark or Spark SQL) at any time and plot the results with D3.js. Note that I can already achieve similar speeds while in REPL mode by caching the data. Therefore, I believe that my problem must be re-phrased as follows: How can I automatically cache the data once a day and make it available to a web service that is capable of running any Spark or Spark SQL statement in order to plot the results with D3.js? Note that I already have some experience in Spark (+ Spark SQL) as well as D3.js, but not at all with OLAP engines (at least in their traditional form). Any ideas or suggestions? // Adamantios
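A minimal sketch of that cache-and-union pattern, assuming an existing SparkContext sc and a hypothetical loadDailyAggregates(path) that builds an RDD of aggregated records:

  import org.apache.spark.rdd.RDD

  case class Agg(key: String, value: Long)  // hypothetical aggregated record

  var servingRdd: RDD[Agg] = sc.emptyRDD[Agg]

  // Run once a day when new aggregates land
  def addDay(path: String): Unit = {
    val daily = loadDailyAggregates(path).cache()
    daily.count()  // materialize the cache
    servingRdd = servingRdd.union(daily).cache()
  }

  // The web service then answers queries off servingRdd, e.g. via aggregate()():
  val total = servingRdd.aggregate(0L)((acc, a) => acc + a.value, _ + _)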
Re: Large dataset, reduceByKey - java heap space error
Hi Kane- http://spark.apache.org/docs/latest/tuning.html has excellent information that may be helpful. In particular, increasing the number of tasks may help, as well as confirming that you don't have more data than you're expecting landing on a key. Also, setting spark.shuffle.manager=sort was a huge help for many of our shuffle-heavy workloads (this is the default as of 1.2.0). Cheers, Sean

On Jan 22, 2015, at 3:15 PM, Kane Kim <kane.ist...@gmail.com> wrote: I'm trying to process a large dataset. Mapping/filtering works ok, but as soon as I try to reduceByKey, I get out-of-memory errors: http://pastebin.com/70M5d0Bn Any ideas how I can fix that? Thanks.
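A minimal sketch of the two knobs mentioned above; the partition count of 2000 is purely illustrative, and pairs stands in for whatever keyed RDD is being reduced:

  import org.apache.spark.{SparkConf, SparkContext}

  val conf = new SparkConf().set("spark.shuffle.manager", "sort")  // the default from 1.2.0 on
  val sc = new SparkContext(conf)

  // An explicit partition count spreads the shuffle across more, smaller tasks
  val counts = pairs.reduceByKey(_ + _, 2000)

  // Spot-check for hot keys by counting records per key on a small sample
  val topKeys = pairs.sample(false, 0.001).countByKey().toSeq.sortBy(-_._2).take(10)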
Re: Kafka version dependency in Spark 1.2
> Can the 0.8.1.1 client still talk to 0.8.0 versions of Kafka

Yes it can; 0.8.1 is fully compatible with 0.8. It is buried on this page: http://kafka.apache.org/documentation.html

In addition to the pom version bump, SPARK-2492 would bring the Kafka streaming receiver (which was originally based on Kafka 0.7) in line with Kafka 0.8: https://issues.apache.org/jira/browse/SPARK-2492 https://github.com/apache/spark/pull/1420 I will soon test that PR on a Spark-on-YARN cluster. Cheers, Sean

On Nov 10, 2014, at 11:58 AM, Matei Zaharia <matei.zaha...@gmail.com> wrote: Just curious, what are the pros and cons of this? Can the 0.8.1.1 client still talk to 0.8.0 versions of Kafka, or do you need it to match your Kafka version exactly? Matei

On Nov 10, 2014, at 9:48 AM, Bhaskar Dutta <bhas...@gmail.com> wrote: Hi, is there any plan to bump the Kafka version dependency in Spark 1.2 from 0.8.0 to 0.8.1.1? The current dependency is still on Kafka 0.8.0: https://github.com/apache/spark/blob/branch-1.2/external/kafka/pom.xml Thanks, Bhaskie
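In the meantime, if an application needs the newer client on its own classpath, an sbt dependency along these lines (assuming the Scala 2.10 Kafka artifact) would pull in 0.8.1.1; it would still have to coexist with whatever spark-streaming-kafka transitively brings in:

  // build.sbt (hypothetical; match the artifact to your Scala version)
  libraryDependencies += "org.apache.kafka" % "kafka_2.10" % "0.8.1.1"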
Re: foreachPartition and task status
Are you using Spark Streaming?

On Oct 14, 2014, at 10:35 AM, Salman Haq <sal...@revmetrix.com> wrote: Hi, in my application I am successfully using foreachPartition to write large amounts of data into a Cassandra database. What is the recommended practice if the application wants to know that the tasks have completed for all partitions? Thanks, Salman
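One relevant fact, with a minimal sketch (writeToCassandra is a hypothetical batch writer standing in for Salman's code): foreachPartition is an action, so the call itself blocks until the tasks for all partitions have finished, and anything after it runs only once every partition has been written (or the job has failed with an exception):

  rdd.foreachPartition { rows =>
    writeToCassandra(rows)  // hypothetical; writes one partition's rows
  }
  // Reached only after every partition's task has completed
  println("all partitions written")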
Re: Spark Streaming KafkaUtils Issue
Would you mind sharing the code leading up to your createStream? Are you also setting group.id? Thanks, Sean

On Oct 10, 2014, at 4:31 PM, Abraham Jacob <abe.jac...@gmail.com> wrote: Hi folks, I am seeing some strange behavior when using the Spark Kafka connector in Spark Streaming. I have a Kafka topic which has 8 partitions. I have a Kafka producer that pumps some messages into this topic. On the consumer side I have a Spark Streaming application that has 8 executors on 8 worker nodes and 8 ReceiverInputDStreams with the same Kafka group id connected to the 8 partitions I have for the topic. Also, the Kafka consumer property auto.offset.reset is set to smallest. Now here is the sequence of steps: (1) I start the Spark Streaming app. (2) I start the producer. At this point I see the messages that are being pumped from the producer in Spark Streaming. Then I (1) stopped the producer, (2) waited for all the messages to be consumed, and (3) stopped the Spark Streaming app. Now when I restart the Spark Streaming app (note: the producer is still down and no messages are being pumped into the topic), I observe the following: Spark Streaming starts reading from each partition right from the beginning. This is not what I was expecting. I was expecting the consumers started by Spark Streaming to start from where they left off. Is my assumption incorrect that the consumers (the Kafka/Spark connector) start reading from the topic where they last left off? Has anyone else seen this behavior? Is there a way to make it start from where it left off? Regards, - Abraham
Re: Spark Streaming KafkaUtils Issue
How long do you let the consumers run for? Is it less than 60 seconds by chance? auto.commit.interval.ms defaults to 60000 (60 seconds). If so, that may explain why you are seeing that behavior. Cheers, Sean

On Oct 10, 2014, at 4:47 PM, Abraham Jacob <abe.jac...@gmail.com> wrote: Sure... I do set the group.id for all the consumers to be the same. Here is the code:

  SparkConf sparkConf = new SparkConf().setMaster("yarn-cluster").setAppName("Streaming WordCount");
  sparkConf.set("spark.shuffle.manager", "SORT");
  sparkConf.set("spark.streaming.unpersist", "true");
  JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(1000));

  Map<String, String> kafkaConf = new HashMap<String, String>();
  kafkaConf.put("zookeeper.connect", zookeeper);
  kafkaConf.put("group.id", consumerGrp);
  kafkaConf.put("auto.offset.reset", "smallest");
  kafkaConf.put("zookeeper.connection.timeout.ms", "1000");
  kafkaConf.put("rebalance.max.retries", "4");
  kafkaConf.put("rebalance.backoff.ms", "3000");

  Map<String, Integer> topicMap = new HashMap<String, Integer>();
  topicMap.put(topic, 1);

  List<JavaPairDStream<byte[], String>> kafkaStreams = new ArrayList<JavaPairDStream<byte[], String>>();
  for (int i = 0; i < numPartitions; i++) {
    kafkaStreams.add(
      KafkaUtils.createStream(jssc, byte[].class, String.class,
          DefaultDecoder.class, PayloadDeSerializer.class,
          kafkaConf, topicMap, StorageLevel.MEMORY_ONLY_SER())
        .mapToPair(new PairFunction<Tuple2<byte[], String>, byte[], String>() {
          private static final long serialVersionUID = -1936810126415608167L;
          public Tuple2<byte[], String> call(Tuple2<byte[], String> tuple2) throws Exception {
            return tuple2;
          }
        }));
  }

  // Union the per-partition streams into a single DStream
  JavaPairDStream<byte[], String> unifiedStream;
  if (kafkaStreams.size() > 1) {
    unifiedStream = jssc.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
  } else {
    unifiedStream = kafkaStreams.get(0);
  }

  unifiedStream.print();
  jssc.start();
  jssc.awaitTermination();

-abe
Re: Spark Streaming KafkaUtils Issue
This JIRA and comment sum up the issue: https://issues.apache.org/jira/browse/SPARK-2492?focusedCommentId=14069708&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-14069708 Basically, the offset param was renamed and had slightly different semantics between Kafka 0.7 and 0.8. The behavior was also useful because earlier versions of the Spark Streaming receiver could be overwhelmed after a streaming job had been down for a period of time. I think this PR quite nicely addresses the issue: https://github.com/apache/spark/pull/1420 Best, Sean

On Oct 10, 2014, at 6:48 PM, Abraham Jacob <abe.jac...@gmail.com> wrote: Thanks Jerry. So, from what I can understand from the code, if I leave out auto.offset.reset, it should theoretically read from the last commit point... correct? -abe

On Fri, Oct 10, 2014 at 5:40 PM, Shao, Saisai <saisai.s...@intel.com> wrote: Hi Abraham, you are correct. The "auto.offset.reset" behavior in KafkaReceiver is different from the original Kafka semantics: if you set this configuration, KafkaReceiver will clear the related offsets immediately, whereas for Kafka this configuration is just a hint that takes effect only when the offset is out of range. So if you set it to "smallest" you will always read data from the beginning, and if you set it to "largest" you will always get data from the end immediately. There's a JIRA and PR to follow this, but they are still not merged to master; you can check them out (https://issues.apache.org/jira/browse/SPARK-2492). Thanks, Jerry

From: Abraham Jacob [mailto:abe.jac...@gmail.com] Sent: Saturday, October 11, 2014 6:57 AM To: Sean McNamara Cc: user@spark.apache.org Subject: Re: Spark Streaming KafkaUtils Issue

Probably this is the issue: http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/

> Spark's usage of the Kafka consumer parameter auto.offset.reset (http://kafka.apache.org/documentation.html#consumerconfigs) is different from Kafka's semantics. In Kafka, the behavior of setting auto.offset.reset to "smallest" is that the consumer will automatically reset the offset to the smallest offset when a) there is no existing offset stored in ZooKeeper or b) there is an existing offset but it is out of range. Spark however will always remove existing offsets and then start all the way from zero again. This means whenever you restart your application with auto.offset.reset = "smallest", your application will completely re-process all available Kafka data. Doh! See this discussion (http://apache-spark-user-list.1001560.n3.nabble.com/spark-streaming-and-the-spark-shell-tp3347p3387.html) and that discussion (http://markmail.org/message/257a5l3oqyftsjxj).

Hmm, interesting... wondering what happens if I set it to largest...?

On Fri, Oct 10, 2014 at 3:47 PM, Abraham Jacob <abe.jac...@gmail.com> wrote: Sure... I do set the group.id for all the consumers to be the same.
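A minimal sketch (in Scala, with ssc, zookeeper, consumerGrp, and topic assumed to be in scope) of the takeaway from Jerry's explanation: with the receiver-based connector, leaving auto.offset.reset out of the Kafka params means an offset already committed to ZooKeeper under the same group.id is honored on restart:

  import kafka.serializer.{DefaultDecoder, StringDecoder}
  import org.apache.spark.storage.StorageLevel
  import org.apache.spark.streaming.kafka.KafkaUtils

  val kafkaConf = Map(
    "zookeeper.connect" -> zookeeper,   // assumed ZK connect string
    "group.id"          -> consumerGrp  // same group id across restarts
    // no "auto.offset.reset": the consumer resumes from the last committed
    // offset, falling back to its default only when no offset exists
  )
  val stream = KafkaUtils.createStream[Array[Byte], String, DefaultDecoder, StringDecoder](
    ssc, kafkaConf, Map(topic -> 1), StorageLevel.MEMORY_ONLY_SER)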
Re: balancing RDDs
Yep, exactly! I'm not sure how complicated it would be to pull off. If someone wouldn't mind helping to get me pointed in the right direction, I would be happy to look into and contribute this functionality. I imagine this would be implemented in the scheduler codebase, possibly with some sort of rebalance configuration property to enable it. Does anyone else have any thoughts on this? Cheers, Sean

On Jun 24, 2014, at 4:41 PM, Mayur Rustagi <mayur.rust...@gmail.com> wrote: This would be really useful, especially for Shark, where a shift in partitioning affects all subsequent queries unless task scheduling time beats spark.locality.wait. It can cause overall low performance for all subsequent tasks. Mayur Rustagi Ph: +1 (760) 203 3257 http://www.sigmoidanalytics.com @mayur_rustagi https://twitter.com/mayur_rustagi
balancing RDDs
We have a use case where we'd like something to execute once on each node, and I thought it would be good to ask here. Currently we achieve this by setting the parallelism to the number of nodes and using a mod partitioner:

  val balancedRdd = sc.parallelize(
      (0 until Settings.parallelism)
        .map(id => id -> Settings.settings)
    ).partitionBy(new ModPartitioner(Settings.parallelism))
    .cache()

This works great except in two instances where it can become unbalanced:

1. If a worker is restarted or dies, its partition will move to a different node (one of the nodes will run two tasks). When the worker rejoins, is there a way to have a partition move back over to the newly restarted worker so that it's balanced again?

2. Drivers need to be started in a staggered fashion, otherwise one driver can launch two tasks on one set of workers, and the other driver will do the same with the other set. Are there any scheduler/config semantics so that each driver will take one (and only one) core from *each* node?

Thanks, Sean
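For reference, a minimal sketch of what a ModPartitioner like the one above might look like; the actual implementation isn't shown in the thread, so this is an assumption based on the integer keys produced by (0 until Settings.parallelism):

  import org.apache.spark.Partitioner

  class ModPartitioner(partitions: Int) extends Partitioner {
    def numPartitions: Int = partitions
    // Assumes non-negative integer keys, one per node
    def getPartition(key: Any): Int = key.asInstanceOf[Int] % partitions
  }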