updateStateByKey performance / API
Hi all,

We are having a few issues with the performance of the updateStateByKey operation in Spark Streaming (1.2.1 at the moment) and any advice would be greatly appreciated. Specifically, on each tick of the system (which is set at 10 secs) we need to update a state tuple where the key is the user_id and the value is an object with some state about the user. The problem is that with Kryo serialization for 5M users, this gets so slow that we have to increase the period to more than 10 seconds so as not to fall behind.

The input for the streaming job is a Kafka stream which consists of key-value pairs of user_ids with some sort of action codes; we join this to our checkpointed state by key and update the state. I understand that the reason for iterating over the whole state set is to evict items or to update state for every key in time-dependent computations, but this does not apply to our situation and it hurts performance badly. Is there a possibility of implementing an extra call in the API in the future for updating only a specific subset of keys?

P.S. I will try setting the DStream to non-serialized storage as soon as possible, but then I am worried about GC and checkpointing performance.
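For readers following along, the cost described above comes from the shape of the update function that updateStateByKey takes: Spark invokes it once per key on every batch, including keys with no new events, so the per-tick work scales with total state size (5M users), not with batch size. A minimal, Spark-free sketch of such an update function (UserState and its fields are hypothetical names, not from the original post):

```scala
// Hedged sketch of the kind of per-key update function passed to
// updateStateByKey. Spark calls this once per key on EVERY batch,
// even when newActions is empty -- which is why iterating the full
// 5M-user state each tick dominates the cost.
case class UserState(actionCount: Long, lastAction: Option[String])

def updateUser(newActions: Seq[String],
               state: Option[UserState]): Option[UserState] = {
  val prev = state.getOrElse(UserState(0L, None))
  if (newActions.isEmpty)
    Some(prev) // no new events: still invoked, state merely carried over
  else
    Some(UserState(prev.actionCount + newActions.size,
                   Some(newActions.last)))
}
```

Returning None instead of Some(prev) is how a key is evicted from the state, which is the reason the full scan exists in the first place.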
Re: Idempotent count
Hi,

Yes, Spark Streaming is capable of stateful stream processing. Stateful vs. stateless is one way of classifying streaming computations. Checkpoints hold both metadata and data.

Thanks

On Wed, Mar 18, 2015 at 4:00 AM, Binh Nguyen Van binhn...@gmail.com wrote:

Hi all,

I am new to Spark so please forgive me if my question is naive. I am trying to use Spark Streaming in an application that reads data from a queue (Kafka), does some aggregation (sum, count, ...), and then persists the result to an external storage system (MySQL, VoltDB, ...). From my understanding of Spark Streaming, I can do the aggregation in two ways:

- Stateless: I don't keep state and just apply new delta values to the external system. My understanding is that this way I may end up over-counting when there is a failure and replay.
- Stateful: Use checkpointing to keep state and blindly save the new state to the external system. This way I get correct aggregation results, but I have to keep data in two places (the state and the external system).

My questions are:

- Is my understanding of stateless and stateful aggregation correct? If not, please correct me!
- For stateful aggregation, what does Spark Streaming keep when it saves a checkpoint?

Please kindly help!

Thanks
-Binh

--
*Arush Kharbanda* || Technical Teamlead
ar...@sigmoidanalytics.com || www.sigmoidanalytics.com
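The over-counting risk Binh describes can be made concrete with a small, Spark-free sketch (function and store names are hypothetical, for illustration only): a stateless delta-increment double-counts when a batch is replayed after a failure, while an idempotent upsert keyed by (key, batchId) gives the same result no matter how many times the batch is delivered:

```scala
// Hedged sketch (not the Spark API) contrasting the two write styles
// discussed above, with a mutable Map standing in for the external store.
import scala.collection.mutable

// Stateless style: apply the delta on every (re)delivery.
// A replayed batch increments the counter twice -> over-counting.
def applyDelta(store: mutable.Map[String, Long],
               key: String, delta: Long): Unit =
  store(key) = store.getOrElse(key, 0L) + delta

// Idempotent style: write the full value for (key, batchId).
// A replayed batch just overwrites the same cell with the same value.
def upsert(store: mutable.Map[(String, Long), Long],
           key: String, batchId: Long, total: Long): Unit =
  store((key, batchId)) = total
```

This is essentially why the stateful approach gives correct results after replay: the checkpointed state lets you compute the full value to upsert, at the cost of keeping data in two places.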
RE: [spark-streaming] can shuffle write to disk be disabled?
Would you please check your driver log or the streaming web UI to see each job's latency, including processing latency and total delay. From your code, the slide interval is just 3 seconds, so you have to process each 60-second window of data within 3 seconds; if the processing latency is larger than the slide interval, your computation power may not be able to keep up with the QPS you want. I think you need to identify the bottleneck first, and then try to tune your code, balance the data, or add more computation resources.

Thanks
Jerry

From: Darren Hoo [mailto:darren@gmail.com]
Sent: Wednesday, March 18, 2015 1:39 PM
To: user@spark.apache.org
Subject: [spark-streaming] can shuffle write to disk be disabled?

I use Spark Streaming to read messages from Kafka; the producer creates about 1500 messages per second.

    def hash(x: String): Int = MurmurHash3.stringHash(x)

    val stream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap,
      StorageLevel.MEMORY_ONLY_SER).map(_._2)

    val clickstream = stream.map(log => {
      // parse log ...
      (hash(log.url), HashSet(hash(log.userid)))
    }).window(Seconds(60), Seconds(3))

    val upv = clickstream.transform(rdd => rdd.reduceByKey(_ ++ _).map {
      case (url, visits) =>
        val uv = visits.size
        (uv, url)
    })

    upv.foreach(rdd =>
      println(new Date() + "\n---\n" + rdd.top(20).mkString("\n") + "\n"))

It is quite quick upon startup, but after running for a few minutes it goes slower and slower, and the latency can reach minutes. I found a lot of shuffle writes at /tmp/spark- amounting to several gigabytes.

With 1500 qps of messages and a window size of 60 seconds, I think it should be done within memory without writing to disk at all. I've set executor-memory to 8G, so there is plenty of memory.

    $SPARK_HOME/bin/spark-submit \
      --class SimpleApp \
      --master spark://localhost:7077 \
      --driver-memory 16G \
      --executor-memory 8G \
      target/scala-2.10/simple-assembly-1.0.jar

I also tried these settings, but it still spills to disk.
    spark.master                          spark://localhost:7077
    #spark.driver.memory                  4g
    #spark.shuffle.file.buffer.kb         4096
    #spark.shuffle.memoryFraction         0.8
    #spark.storage.unrollFraction         0.8
    #spark.storage.unrollMemoryThreshold  1073741824
    spark.io.compression.codec            lz4
    spark.shuffle.spill                   false
    spark.serializer                      org.apache.spark.serializer.KryoSerializer

where am I wrong?
Re: HIVE SparkSQL
Hello,

Depending on your needs, a search technology such as SolrCloud or ElasticSearch may make more sense. If you go for the Cassandra solution you can use the Lucene text indexer... I am not sure that Hive or SparkSQL are very suitable for text. However, if you do not need text search then feel free to go for them. What kind of statistics / aggregates do you want to get out of your logs?

Best regards

On Mar 18, 2015 04:29, 宫勐 shadowinl...@gmail.com wrote:

Hi:

I need to migrate a log analysis system from MySQL plus some C++ real-time computing framework to the Hadoop ecosystem. When I want to build a data warehouse, I don't know which one is the right choice. Cassandra? Hive? Or just SparkSQL? There are few benchmarks for these systems.

My scenario is as below:

- Every 5 seconds, Flume will transfer a log file from an IDC. The log file is pre-formatted to fit a MySQL LOAD event.
- There are many IDCs, and they will close down or reconnect to Flume at random.
- Every online IDC must receive analysis of its logs every 5 minutes.

Any suggestions? Thanks

Yours,
Meng