Would you please check your driver log or the streaming web UI to see each job's 
latency, including the processing latency and the total latency?
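
If you also want those numbers in the driver log, you can register a
streaming listener. A minimal sketch, assuming your StreamingContext is
named ssc:

    import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

    // Log each completed batch's processing delay and total delay (in ms).
    ssc.addStreamingListener(new StreamingListener {
      override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
        val info = batch.batchInfo
        println(s"batch ${info.batchTime}: " +
          s"processing ${info.processingDelay.getOrElse(-1L)} ms, " +
          s"total ${info.totalDelay.getOrElse(-1L)} ms")
      }
    })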

From your code, the slide interval is just 3 seconds, so every 3 seconds you 
have to reprocess a full 60 seconds' worth of data (at 1500 messages per 
second, that is roughly 90,000 records per slide). If the processing latency 
is larger than the slide interval, batches will queue up, and your computation 
power may not be able to keep up with the qps you want.

I think you need to identify the bottleneck first, and then try to tune your 
code, rebalance the data, or add more computation resources.
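
For instance, if re-reducing the full 60-second window every 3 seconds turns
out to be the bottleneck, the incremental form of the windowed reduce may
help. A minimal sketch on plain per-url counts (assuming a pairs DStream of
(url, 1L); note the distinct-visitor sets in your code are not invertible
this way, so this only shows the pattern):

    // Requires checkpointing. Each slide then only adds the newest batch
    // and subtracts the oldest one, instead of re-reducing all 60 seconds.
    ssc.checkpoint("/tmp/spark-checkpoint")  // path is an assumption

    val pv = pairs.reduceByKeyAndWindow(
      (a: Long, b: Long) => a + b,   // reduce: counts entering the window
      (a: Long, b: Long) => a - b,   // inverse: counts leaving the window
      Seconds(60), Seconds(3))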

Thanks
Jerry

From: Darren Hoo [mailto:darren....@gmail.com]
Sent: Wednesday, March 18, 2015 1:39 PM
To: user@spark.apache.org
Subject: [spark-streaming] can shuffle write to disk be disabled?

I use Spark Streaming to read messages from Kafka; the producer creates about 
1500 messages per second.


    import java.util.Date

    import scala.collection.immutable.HashSet
    import scala.util.hashing.MurmurHash3

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.Seconds
    import org.apache.spark.streaming.kafka.KafkaUtils

    def hash(x: String): Int = MurmurHash3.stringHash(x)

    val stream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap,
      StorageLevel.MEMORY_ONLY_SER).map(_._2)

    // Map each log line to (hashed url, singleton set of the hashed user id),
    // then keep a 60-second window sliding every 3 seconds.
    val clickstream = stream.map { log =>
      // parse log
      ...
      (hash(log.url), HashSet(hash(log.userid)))
    }.window(Seconds(60), Seconds(3))

    // Union the visitor sets per url and emit (unique visitor count, url).
    val upv = clickstream.transform(rdd => rdd.reduceByKey(_ ++ _).map {
      case (url, visits) => (visits.size, url)
    })

    // Print the top 20 urls by unique visitors for each window.
    upv.foreachRDD(rdd => println(new Date() +
      "\n---------------------------------------\n" +
      rdd.top(20).mkString("\n") + "\n"))



It is quite quick at startup, but after running for a few minutes it gets 
slower and slower, and the latency can reach minutes.



I found several gigabytes of shuffle writes under /tmp/spark-xxxx.



With 1500 messages per second and a window size of 60 seconds, I think this 
should fit in memory without writing to disk at all.
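
As a rough sanity check of that claim (the ~64 serialized bytes per hashed
(url, userid) pair below is an assumption):

    // Back-of-envelope size of one 60-second window of hashed pairs.
    val msgsPerSec  = 1500L
    val windowSecs  = 60L
    val bytesPerMsg = 64L  // assumption: one small serialized pair
    val windowMb    = msgsPerSec * windowSecs * bytesPerMsg / (1024 * 1024)
    println(s"~$windowMb MB per window")  // roughly 5 MB, far below 8G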



I've set executor-memory to 8G, so there should be plenty of memory.



$SPARK_HOME/bin/spark-submit \
  --class "SimpleApp" \
  --master spark://localhost:7077 \
  --driver-memory 16G \
  --executor-memory 8G \
  target/scala-2.10/simple-assembly-1.0.jar



I also tried these settings, but it still spills to disk.



spark.master                     spark://localhost:7077
#spark.driver.memory              4g
#spark.shuffle.file.buffer.kb     4096
#spark.shuffle.memoryFraction     0.8
#spark.storage.unrollFraction     0.8
#spark.storage.unrollMemoryThreshold 1073741824
spark.io.compression.codec       lz4
spark.shuffle.spill              false
spark.serializer                 org.apache.spark.serializer.KryoSerializer



Where am I wrong?
