Could you check your driver log or the Streaming web UI to see each batch's latency, including the processing delay and the total delay?
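If it is easier to grab from the logs than from the UI, here is a minimal sketch (assuming your StreamingContext is the ssc from your code below) that prints the same two numbers for every completed batch via a StreamingListener:

import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Print each batch's processing delay and total delay as it completes.
ssc.addStreamingListener(new StreamingListener {
  override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
    val info = batch.batchInfo
    println(s"batch ${info.batchTime}: " +
      s"processing=${info.processingDelay.getOrElse(-1L)} ms, " +
      s"total=${info.totalDelay.getOrElse(-1L)} ms")
  }
})

If the total delay keeps growing from batch to batch, the job is falling behind its slide interval.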
From your code, the slide interval is only 3 seconds, so every 3 seconds you reprocess a full 60 seconds of data. If the processing latency is larger than the slide interval, batches queue up and your computation cannot keep pace with the qps you want. I would identify the bottleneck first, and then tune your code, balance the data, or add more computation resources.

Thanks
Jerry

From: Darren Hoo [mailto:darren....@gmail.com]
Sent: Wednesday, March 18, 2015 1:39 PM
To: user@spark.apache.org
Subject: [spark-streaming] can shuffle write to disk be disabled?

I use Spark Streaming to read messages from Kafka; the producer creates about 1500 messages per second.

def hash(x: String): Int = {
  MurmurHash3.stringHash(x)
}

val stream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap,
  StorageLevel.MEMORY_ONLY_SER).map(_._2)

val clickstream = stream.map { log =>
  // parse log ...
  (hash(log.url), HashSet(hash(log.userid)))
}.window(Seconds(60), Seconds(3))

val upv = clickstream.transform(rdd =>
  rdd.reduceByKey(_ ++ _).map { case (url, visits) =>
    val uv = visits.size
    (uv, url)
  })

upv.foreach(rdd => println(new Date() + "\n---------------------------------------\n" +
  rdd.top(20).mkString("\n") + "\n"))

It is quite fast at startup, but after running for a few minutes it gets slower and slower, and the latency can reach minutes. I found several gigabytes of shuffle writes under /tmp/spark-xxxx.

With 1500 messages per second and a window size of 60 seconds, I think this should be done in memory without writing to disk at all. I've set executor-memory to 8G, so there is plenty of memory.

$SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master spark://localhost:7077 \
  --driver-memory 16G \
  --executor-memory 8G \
  target/scala-2.10/simple-assembly-1.0.jar

I also tried these settings, but it still spills to disk:

spark.master                          spark://localhost:7077
#spark.driver.memory                  4g
#spark.shuffle.file.buffer.kb         4096
#spark.shuffle.memoryFraction         0.8
#spark.storage.unrollFraction         0.8
#spark.storage.unrollMemoryThreshold  1073741824
spark.io.compression.codec            lz4
spark.shuffle.spill                   false
spark.serializer                      org.apache.spark.serializer.KryoSerializer

Where am I wrong?
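One change that would shrink the shuffle regardless of the spill settings: aggregate inside the window rather than re-reducing all 60 seconds of raw pairs on every 3-second slide. Here is a minimal sketch of the same unique-visitor computation using reduceByKeyAndWindow, reusing the hash, stream, and HashSet pieces from the code above:

import java.util.Date
import scala.collection.immutable.HashSet

val clickstream = stream.map { log =>
  // parse log ...
  (hash(log.url), HashSet(hash(log.userid)))
}

// reduceByKeyAndWindow reduces each 3-second batch by key first and then
// merges the pre-reduced batches into the 60-second window, so far fewer
// raw (url, user) pairs go through the shuffle.
val upv = clickstream
  .reduceByKeyAndWindow((a: HashSet[Int], b: HashSet[Int]) => a ++ b,
    Seconds(60), Seconds(3))
  .map { case (url, visits) => (visits.size, url) }

upv.foreachRDD(rdd => println(new Date() + "\n" + rdd.top(20).mkString("\n")))

The invertible variant of reduceByKeyAndWindow would be cheaper still, but set union has no safe inverse (a user id can appear in several batches), so the plain variant is used here.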