Would you please check your driver log or the streaming web UI to see each 
job's latency, including processing latency and total latency?

From your code, the sliding interval is just 3 seconds, so you have to process 
60 seconds of data every 3 seconds. If the processing latency is larger than 
the sliding interval, jobs will queue up, so maybe your computation power 
cannot keep up with your qps.
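
To put rough numbers on this, here is a small sketch in plain Scala. The input 
rate and window settings are taken from the original message; the object and 
value names are only illustrative, and the estimate ignores batching details.

```scala
// Back-of-the-envelope load estimate for a sliding window.
// 1500 msg/s, a 60 s window and a 3 s slide come from the thread;
// all names here are illustrative, not part of any Spark API.
object WindowLoad {
  val messagesPerSecond = 1500
  val windowSeconds     = 60
  val slideSeconds      = 3

  // Records contained in one full window.
  val recordsPerWindow: Int = messagesPerSecond * windowSeconds

  // Number of successive windows in which each record appears.
  val windowsPerRecord: Int = windowSeconds / slideSeconds

  // Average number of records reduced per second of wall-clock time.
  val recordsReducedPerSecond: Int = recordsPerWindow / slideSeconds

  // The job keeps up only if one window is processed within one slide interval.
  def keepsUp(processingSeconds: Double): Boolean =
    processingSeconds <= slideSeconds

  def main(args: Array[String]): Unit = {
    println(s"records per window:         $recordsPerWindow")
    println(s"windows per record:         $windowsPerRecord")
    println(s"records reduced per second: $recordsReducedPerSecond")
  }
}
```

So although only 1500 messages arrive per second, the windowed job effectively 
reduces about 20 times that, because every record is reprocessed in each window 
it falls into.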

I think you need to identify the bottleneck first, and then try to tune your 
code, balance the data, or add more computation resources.
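
Besides the web UI, the same latencies can be logged from the driver with a 
StreamingListener. The following is only a sketch: the class name DelayLogger, 
the helper DelayCheck and the slideMs parameter are made up for illustration, 
and it assumes the delays reported in BatchInfo are in milliseconds.

```scala
import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

// Hypothetical helper: a batch is falling behind when processing it
// takes longer than the slide interval.
object DelayCheck {
  def fallingBehind(processingMs: Long, slideMs: Long): Boolean =
    processingMs > slideMs
}

// Hypothetical listener that prints the delays of every completed batch.
// slideMs is the job's slide interval in milliseconds (3000 in this thread).
class DelayLogger(slideMs: Long) extends StreamingListener {
  override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted): Unit = {
    val info = batchCompleted.batchInfo
    // The delays are Options because they are unknown until the batch finishes.
    val processing = info.processingDelay.getOrElse(-1L)
    val total      = info.totalDelay.getOrElse(-1L)
    val warning =
      if (DelayCheck.fallingBehind(processing, slideMs)) " (slower than the slide interval)"
      else ""
    println(s"batch ${info.batchTime}: processing=${processing} ms, total=${total} ms$warning")
  }
}

// Register it on the StreamingContext before calling ssc.start():
//   ssc.addStreamingListener(new DelayLogger(3000L))
```

If the processing delay stays above the slide interval, the total delay will 
keep growing, which would match the symptom of latency climbing to minutes.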


From: Darren Hoo [mailto:darren....@gmail.com]
Sent: Wednesday, March 18, 2015 1:39 PM
To: user@spark.apache.org
Subject: [spark-streaming] can shuffle write to disk be disabled?

I use spark-streaming to read messages from Kafka; the producer creates about 
1500 messages per second.

     def hash(x: String): Int = {

     val stream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap, 

     val clickstream = stream.map(log => {
       //parse log

       (hash(log.url), HashSet(hash(log.userid)))
     }).window(Seconds(60), Seconds(3))

     val upv = clickstream.transform( rdd => rdd.reduceByKey(_ ++ _ ).map{ case(url, visits) => {
         val uv = visits.size
         (uv, url)

     upv.foreach(rdd => println(new Date() + "\n---------------------------------------\n" + rdd.top(20).mkString("\n") + 

It is quite quick upon startup, but after running for a few minutes it gets 
slower and slower, and the latency can reach minutes.
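
For reference, the per-window computation in the code above (union the visitor 
sets per URL, count them, take the top entries) can be sketched with plain 
Scala collections, without Spark. This is only an illustration: it uses 
strings instead of the hashed Int keys, and UniqueVisitors and topUrls are 
hypothetical names.

```scala
import scala.collection.immutable.HashSet

object UniqueVisitors {
  // clicks: (url, userid) pairs, standing in for the parsed log lines.
  // Returns up to n (uniqueVisitors, url) pairs, most visited first.
  def topUrls(clicks: Seq[(String, String)], n: Int): Seq[(Int, String)] =
    clicks
      .map { case (url, user) => (url, HashSet(user)) } // one single-element set per click
      .groupBy(_._1)                                    // local stand-in for reduceByKey
      .toSeq                                            // avoid rebuilding a Map keyed by count
      .map { case (url, pairs) =>
        val visitors = pairs.map(_._2).reduce(_ ++ _)   // union of the visitor sets
        (visitors.size, url)
      }
      .sortBy { case (uv, url) => (-uv, url) }          // highest count first, ties by url
      .take(n)

  def main(args: Array[String]): Unit = {
    val clicks = Seq(("a", "u1"), ("a", "u2"), ("a", "u1"), ("b", "u1"))
    topUrls(clicks, 20).foreach(println)
  }
}
```

Note that every click creates its own single-element set before the union, so 
the amount of data shuffled grows with the number of clicks in the window, not 
with the number of distinct URLs.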

I found several gigabytes of shuffle writes under /tmp/spark-xxxx.

With about 1500 messages per second and a window size of 60 seconds, I think 
this should be done in memory without writing to disk at all.

I've set executor-memory to 8G, so there is plenty of memory.

$SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master spark://localhost:7077 \
  --driver-memory 16G  \
  --executor-memory 8G  \


I also tried these settings, but it still spills to disk.

spark.master                     spark://localhost:7077
#spark.driver.memory              4g
#spark.shuffle.file.buffer.kb     4096
#spark.shuffle.memoryFraction     0.8
#spark.storage.unrollFraction     0.8
#spark.storage.unrollMemoryThreshold 1073741824
spark.io.compression.codec       lz4
spark.shuffle.spill              false
spark.serializer                 org.apache.spark.serializer.KryoSerializer
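
The uncommented settings above could also be set in code through SparkConf. 
This is a minimal sketch: ConfSketch is a hypothetical name, using 
"SimpleApp" as the application name is an assumption, and whether a given 
Spark version honours each property is not checked here.

```scala
import org.apache.spark.SparkConf

object ConfSketch {
  // Mirrors the active (uncommented) properties listed above.
  def build(): SparkConf =
    new SparkConf()
      .setAppName("SimpleApp") // assumed name, taken from the --class argument
      .setMaster("spark://localhost:7077")
      .set("spark.io.compression.codec", "lz4")
      .set("spark.shuffle.spill", "false")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

  def main(args: Array[String]): Unit =
    // Print the effective settings to confirm what the application will see.
    println(build().toDebugString)
}
```

Printing toDebugString at startup is a quick way to confirm which values the 
driver actually picked up.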

Where am I going wrong?
