can distinct transform be applied on a DStream?

2015-03-20 Thread Darren Hoo
val aDstream = ...

val distinctStream = aDstream.transform(_.distinct())

but the elements in distinctStream  are not distinct.

Did I use it wrong?
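One likely explanation (a sketch, not a confirmed diagnosis): `transform` applies its function to each micro-batch RDD separately, so `distinct()` only de-duplicates *within* a batch; the same element can still appear in several batches of the stream. A minimal plain-Scala illustration of that semantics, with no Spark and made-up data:

```scala
// Simulate a DStream as a sequence of micro-batches (plain Scala collections,
// no Spark; the data is hypothetical). transform(_.distinct()) runs distinct()
// on each batch RDD independently, so duplicates are removed only within a batch.
val batches = Seq(Seq("a", "a", "b"), Seq("b", "c", "c"))

val perBatchDistinct = batches.map(_.distinct)
// => Seq(Seq("a", "b"), Seq("b", "c"))

// "b" still appears in two different batches of the stream:
val bCount = perBatchDistinct.flatten.count(_ == "b")
// => 2
```

If the elements must be unique across batches, some cross-batch state (e.g. a stateful operation) would be needed rather than a per-batch `distinct()`.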


Re: [spark-streaming] can shuffle write to disk be disabled?

2015-03-18 Thread Darren Hoo
Thanks, Shao

On Wed, Mar 18, 2015 at 3:34 PM, Shao, Saisai saisai.s...@intel.com wrote:

  Yeah, as I said, your job processing time is much larger than the sliding
 window, and streaming jobs are executed one by one in sequence, so the next
 job will wait until the first one is finished, and the total latency
 accumulates.



 I think you need to identify the bottleneck of your job first. If the
 shuffle is slow, you could enlarge the shuffle fraction of memory to
 reduce the spill, but the shuffle data will ultimately be written to disk;
 this cannot be disabled unless you mount your spark.tmp.dir on a ramdisk.



I have increased spark.shuffle.memoryFraction to 0.8, which I can see
in the Spark UI's environment variables.

But spilling happens right from the start, even when the latency is less than
the slide interval (I changed it to 10 seconds). The shuffle data written to
disk is a real snowball effect; it slows everything down eventually.

I noticed that the files spilled to disk are all very small in size but
huge in number:

total 344K

drwxr-xr-x  2 root root 4.0K Mar 18 16:55 .

drwxr-xr-x 66 root root 4.0K Mar 18 16:39 ..

-rw-r--r--  1 root root  80K Mar 18 16:54 shuffle_47_519_0.data

-rw-r--r--  1 root root  75K Mar 18 16:54 shuffle_48_419_0.data

-rw-r--r--  1 root root  36K Mar 18 16:54 shuffle_48_518_0.data

-rw-r--r--  1 root root  69K Mar 18 16:55 shuffle_49_319_0.data

-rw-r--r--  1 root root  330 Mar 18 16:55 shuffle_49_418_0.data

-rw-r--r--  1 root root  65K Mar 18 16:55 shuffle_49_517_0.data

MemoryStore says:

15/03/18 17:59:43 WARN MemoryStore: Failed to reserve initial memory
threshold of 1024.0 KB for computing block rdd_1338_2 in memory.
15/03/18 17:59:43 WARN MemoryStore: Not enough space to cache
rdd_1338_2 in memory! (computed 512.0 B so far)
15/03/18 17:59:43 INFO MemoryStore: Memory use = 529.0 MB (blocks) +
0.0 B (scratch space shared across 0 thread(s)) = 529.0 MB. Storage
limit = 529.9 MB.

Not enough space even for 512 bytes?


The executors still have plenty of free memory:

0       slave1:40778  0    0.0 B / 529.9 MB    0.0 B  16 0 15047 15063  2.17 h  0.0 B     402.3 MB  768.0 B
1       slave2:50452  0    0.0 B / 529.9 MB    0.0 B  16 0 14447 14463  2.17 h  0.0 B     388.8 MB  1248.0 B
1       lvs02:47325   116  27.6 MB / 529.9 MB  0.0 B  8  0 58169 58177  3.16 h  893.5 MB  624.0 B   1189.9 MB
driver  lvs02:47041   0    0.0 B / 529.9 MB    0.0 B  0  0 0     0      0 ms    0.0 B     0.0 B     0.0 B


 Besides, if CPU or network is the bottleneck, you might need to add more
 resources to your cluster.



3 dedicated servers, each with a 16-core CPU + 16GB memory and a Gigabit
network.
CPU load is quite low, about 1~3 from top, and network usage is far from
saturated.

I don't even do any useful complex calculations in this small Simple App
yet.


Re: [spark-streaming] can shuffle write to disk be disabled?

2015-03-18 Thread Darren Hoo
I've already done that:

From the Spark UI's Environment tab, Spark properties has:

spark.shuffle.spill    false

On Wed, Mar 18, 2015 at 6:34 PM, Akhil Das ak...@sigmoidanalytics.com
wrote:

 I think you can disable it with spark.shuffle.spill=false

 Thanks
 Best Regards



Re: [spark-streaming] can shuffle write to disk be disabled?

2015-03-18 Thread Darren Hoo
On Wed, Mar 18, 2015 at 8:31 PM, Shao, Saisai saisai.s...@intel.com wrote:

  From the log you pasted I think this (-rw-r--r--  1 root root  80K Mar
 18 16:54 shuffle_47_519_0.data) is not shuffle spilled data, but the
 final shuffle result.


Why is the shuffle result written to disk?


 As I said, did you think shuffle is the bottleneck which makes your job
 running slowly?


I am quite new to Spark, so I am just making wild guesses. What further
information should I provide to help find the real bottleneck?

 Maybe you should identify the cause first. Besides, from the log it looks
 like your memory is not enough to cache the data; maybe you should increase
 the memory size of the executor.




Running two executors, the memory usage is quite low:

executor 0  8.6 MB / 4.1 GB
executor 1  23.9 MB / 4.1 GB
driver      0.0 B / 529.9 MB

Submitted with args: --executor-memory 8G --num-executors 2
--driver-memory 1G


Re: [spark-streaming] can shuffle write to disk be disabled?

2015-03-18 Thread Darren Hoo
: Finished task 179.0 in stage 392.0
(TID 100613) in 292 ms on lvs02 (181/291)

15/03/18 15:16:41 INFO TaskSetManager: Starting task 188.0 in stage 392.0
(TID 100622, lvs02, PROCESS_LOCAL, 1122 bytes)

15/03/18 15:16:41 INFO TaskSetManager: Finished task 182.0 in stage 392.0
(TID 100616) in 213 ms on lvs02 (182/291)

15/03/18 15:16:41 INFO BlockManagerInfo: Added input-0-1426663001400 in
memory on lvs02:38954 (size: 24.4 KB, free: 1068.1 MB)

15/03/18 15:16:41 INFO TaskSetManager: Starting task 189.0 in stage 392.0
(TID 100623, lvs02, PROCESS_LOCAL, 1122 bytes)

15/03/18 15:16:41 INFO TaskSetManager: Finished task 181.0 in stage 392.0
(TID 100615) in 286 ms on lvs02 (183/291)

15/03/18 15:16:41 INFO TaskSetManager: Starting task 190.0 in stage 392.0
(TID 100624, lvs02, PROCESS_LOCAL, 1122 bytes)

15/03/18 15:16:41 INFO TaskSetManager: Finished task 183.0 in stage 392.0
(TID 100617) in 261 ms on lvs02 (184/291)


Any hints?


Thanks!

On Wed, Mar 18, 2015 at 2:19 PM, Shao, Saisai saisai.s...@intel.com wrote:

  Would you please check your driver log or streaming web UI to see each
 job's latency, including processing latency and total latency.



 Seems from your code, the sliding interval is just 3 seconds, so you will
 process each 60 seconds' worth of data every 3 seconds. If the processing
 latency is larger than the sliding interval, maybe your computation power
 cannot reach the qps you wanted.



 I think you need to identify the bottleneck first, and then try to tune
 your code, balance the data, or add more computation resources.



 Thanks

 Jerry



 *From:* Darren Hoo [mailto:darren@gmail.com]
 *Sent:* Wednesday, March 18, 2015 1:39 PM
 *To:* user@spark.apache.org
 *Subject:* [spark-streaming] can shuffle write to disk be disabled?



 I use Spark Streaming to read messages from Kafka; the producer creates
 about 1500 messages per second.



  def hash(x: String): Int = {

 MurmurHash3.stringHash(x)

  }



  val stream = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap,
 StorageLevel.MEMORY_ONLY_SER).map(_._2)



 val clickstream = stream.map(log => {

   //parse log

   ...

   (hash(log.url), HashSet(hash(log.userid)))

 }).window(Seconds(60), Seconds(3))



 val upv = clickstream.transform(rdd => rdd.reduceByKey(_ ++ _).map {
  case (url, visits) =>

   val uv = visits.size

   (uv, url)

 })



 upv.foreach(rdd => println(new Date() +
  "\n---\n" + rdd.top(20).mkString("\n")
  + "\n"))



 it is quite quick upon startup, but after running for a few minutes it
 gets slower and slower, and the latency can reach minutes.



 I found a lot of shuffle writes under /tmp/spark-, several gigabytes in total.



 With 1500 messages per second and a window size of 60 seconds, I think it
 should be done within memory without writing to disk at all.
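A back-of-envelope sketch supports that estimate; the per-message size below is an assumption, not a measurement:

```scala
// Rough sizing of one window's worth of data (assumed figures, not measured).
val qps = 1500                                // messages per second from the producer
val windowSeconds = 60
val messagesPerWindow = qps * windowSeconds   // 90,000 messages per window

val assumedBytesPerMessage = 500L             // hypothetical average log-line size
val windowMB = messagesPerWindow * assumedBytesPerMessage / (1024.0 * 1024.0)
// roughly 43 MB per window -- far below an 8 GB executor heap
```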



 I've set executor-memory to 8G, So there is plenty of memory.



 $SPARK_HOME/bin/spark-submit \

   --class SimpleApp \

   --master spark://localhost:7077 \

   --driver-memory 16G  \

   --executor-memory 8G  \

   target/scala-2.10/simple-assembly-1.0.jar



 I also tried these settings, but it still spills to disk.



 spark.master spark://localhost:7077

 #spark.driver.memory  4g

 #spark.shuffle.file.buffer.kb 4096

 #spark.shuffle.memoryFraction 0.8

 #spark.storage.unrollFraction 0.8

 #spark.storage.unrollMemoryThreshold 1073741824

 spark.io.compression.codec   lz4

 spark.shuffle.spill  false

 spark.serializer org.apache.spark.serializer.KryoSerializer



 Where am I wrong?