This post has a bit more information:
http://blog.cloudera.com/blog/2015/05/working-with-apache-spark-or-how-i-learned-to-stop-worrying-and-love-the-shuffle/
Thanks
Best Regards

On Wed, Sep 9, 2015 at 6:44 AM, mark <manwoodv...@googlemail.com> wrote:

> As I understand things (maybe naively), my input data are stored in
> equal-sized blocks in HDFS, and each block represents a partition within
> Spark when read from HDFS, therefore each block should hold roughly the
> same number of records.
>
> So something is missing in my understanding - what can cause some
> partitions to have zero records and others to have roughly equal-sized
> chunks (~50k records in this case)?
>
> Before writing a custom partitioner, I would like to understand why the
> default partitioner has failed in my case.
>
> On 8 Sep 2015 3:00 pm, "Akhil Das" <ak...@sigmoidanalytics.com> wrote:
>
>> Try using a custom partitioner for the keys so that they get evenly
>> distributed across tasks.
>>
>> Thanks
>> Best Regards
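A minimal sketch of what such a custom partitioner could look like (illustrative only: the class name is made up, and the hash-mod scheme below is what Spark's default HashPartitioner already does, so the getPartition logic is the part to adapt to the job's actual key space):

    import org.apache.spark.Partitioner

    // Sketch: spread keys over a fixed number of partitions.
    // Replace the hash-mod below with whatever evens out the real key space.
    class EvenSpreadPartitioner(override val numPartitions: Int) extends Partitioner {
      def getPartition(key: Any): Int = {
        val h = key.hashCode % numPartitions
        if (h < 0) h + numPartitions else h  // keep the index non-negative
      }
      override def equals(other: Any): Boolean = other match {
        case p: EvenSpreadPartitioner => p.numPartitions == numPartitions
        case _                        => false
      }
      override def hashCode: Int = numPartitions
    }

It can then be passed straight to the reduce, e.g. reduceByKey(new EvenSpreadPartitioner(32), merge), where merge stands in for the job's own reduce function.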
>> On Fri, Sep 4, 2015 at 7:19 PM, mark <manwoodv...@googlemail.com> wrote:
>>
>>> I am trying to tune a Spark job and have noticed some strange behavior -
>>> tasks in a stage vary in execution time, ranging from 2 seconds to 20
>>> seconds. I assume that in a well-tuned job all tasks should run in
>>> roughly the same amount of time.
>>>
>>> So I did some investigation - the fast tasks appear to have no records,
>>> whilst the slow tasks do. I need help understanding why this is
>>> happening.
>>>
>>> The code in the stage is pretty simple. All it does is:
>>>
>>> - filter records
>>> - map records to a (key, record) tuple
>>> - reduce by key
>>>
>>> The data are Avro objects stored in Parquet files in 16MB blocks in HDFS.
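In outline, that stage amounts to something like this (a sketch only - MyRecord, passesFilter, keyOf and merge are placeholders for the job's real record type and functions, not the actual code):

    import org.apache.spark.rdd.RDD

    // Sketch of the stage described above, not the actual job code.
    def stage(records: RDD[MyRecord]): RDD[(String, MyRecord)] =
      records
        .filter(r => passesFilter(r))        // 1. filter records
        .map(r => (keyOf(r), r))             // 2. map to a (key, record) tuple
        .reduceByKey((a, b) => merge(a, b))  // 3. reduce by key - the step that
                                             //    makes this a shuffle boundary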
>>>
>>> To establish how many records are in each partition, I added this snippet:
>>>
>>>     import org.apache.spark.TaskContext
>>>
>>>     // Emit one row per partition: (stage, partition, task attempt, record count).
>>>     // iter.size consumes the iterator, which is fine in this throwaway
>>>     // diagnostic because the records are not needed afterwards.
>>>     val counts = rdd.mapPartitions(iter => {
>>>       val ctx = TaskContext.get
>>>       val stageId = ctx.stageId
>>>       val partId = ctx.partitionId
>>>       val attemptId = ctx.taskAttemptId()
>>>       Iterator(Array[Long](stageId, partId, attemptId, iter.size))
>>>     }, preservesPartitioning = true).collect()
>>>
>>> Which produces the following (stage, partition, attempt, records):
>>>
>>>     1   1   0       0
>>>     1   2   1   50489
>>>     1   3   2       0
>>>     1   4   3       0
>>>     1   5   4       0
>>>     1   6   5   53200
>>>     1   7   6       0
>>>     1   8   7       0
>>>     1   9   8       0
>>>     1  10   9   56946
>>>     1  11  10       0
>>>     1  12  11       0
>>>     1  13  12       0
>>>     1  14  13   59209
>>>     1  15  14       0
>>>     1  16  15       0
>>>     1  17  16       0
>>>     1  18  17   50202
>>>     1  19  18       0
>>>     1  20  19       0
>>>     1  21  20       0
>>>     1  22  21   54613
>>>     1  23  22       0
>>>     1  24  23       0
>>>     1  25  24   54157
>>>     1  26  25       0
>>>     1  27  26       0
>>>     1  28  27       0
>>>     1  29  28   53595
>>>     1  30  29       0
>>>     1  31  30       0
>>>     1  32  31   10750
>>>
>>> Looking at the logs, you can see that the tasks that contain records have
>>> the longest run times (starred below):
>>>
>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 25.0 in stage 1.0 (TID 26) in 2782 ms on DG1322 (6/32)
>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 7.0 in stage 1.0 (TID 8) in 2815 ms on DG1322 (7/32)
>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 19.0 in stage 1.0 (TID 20) in 2815 ms on DG1322 (8/32)
>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 23.0 in stage 1.0 (TID 24) in 2840 ms on DG1321 (9/32)
>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 29.0 in stage 1.0 (TID 30) in 2839 ms on DG1321 (10/32)
>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 11.0 in stage 1.0 (TID 12) in 2878 ms on DG1321 (11/32)
>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 30.0 in stage 1.0 (TID 31) in 2870 ms on DG1321 (12/32)
>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 18.0 in stage 1.0 (TID 19) in 2892 ms on DG1321 (13/32)
>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 2930 ms on DG1321 (14/32)
>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 6.0 in stage 1.0 (TID 7) in 2934 ms on DG1321 (15/32)
>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 12.0 in stage 1.0 (TID 13) in 2931 ms on DG1321 (16/32)
>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 3.0 in stage 1.0 (TID 4) in 3246 ms on DG1323 (17/32)
>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 27.0 in stage 1.0 (TID 28) in 3226 ms on DG1323 (18/32)
>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 15.0 in stage 1.0 (TID 16) in 3249 ms on DG1323 (19/32)
>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 10.0 in stage 1.0 (TID 11) in 3669 ms on DG1323 (20/32)
>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 16.0 in stage 1.0 (TID 17) in 3666 ms on DG1323 (21/32)
>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 22.0 in stage 1.0 (TID 23) in 3664 ms on DG1323 (22/32)
>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 4.0 in stage 1.0 (TID 5) in 3692 ms on DG1323 (23/32)
>>> *15/09/03 16:26:39 INFO TaskSetManager: Finished task 31.0 in stage 1.0 (TID 32) in 6668 ms on DG1322 (24/32)*
>>> *15/09/03 16:26:48 INFO TaskSetManager: Finished task 17.0 in stage 1.0 (TID 18) in 15690 ms on DG1321 (25/32)*
>>> *15/09/03 16:26:49 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 2) in 16194 ms on DG1322 (26/32)*
>>> *15/09/03 16:26:49 INFO TaskSetManager: Finished task 5.0 in stage 1.0 (TID 6) in 16384 ms on DG1321 (27/32)*
>>> *15/09/03 16:26:50 INFO TaskSetManager: Finished task 28.0 in stage 1.0 (TID 29) in 17194 ms on DG1323 (28/32)*
>>> *15/09/03 16:26:50 INFO TaskSetManager: Finished task 21.0 in stage 1.0 (TID 22) in 17408 ms on DG1323 (29/32)*
>>> *15/09/03 16:26:50 INFO TaskSetManager: Finished task 13.0 in stage 1.0 (TID 14) in 17711 ms on DG1322 (30/32)*
>>> *15/09/03 16:26:51 INFO TaskSetManager: Finished task 24.0 in stage 1.0 (TID 25) in 17995 ms on DG1321 (31/32)*
>>> *15/09/03 16:26:51 INFO TaskSetManager: Finished task 9.0 in stage 1.0 (TID 10) in 18183 ms on DG1323 (32/32)*
>>>
>>> The Spark UI for the stage shows this:
>>>
>>> Summary Metrics for 32 Completed Tasks
>>>
>>> Metric                        Min        25th percentile  Median     75th percentile  Max
>>> Duration                      0.9 s      2 s              2 s        16 s             18 s
>>> GC Time                       0 ms       0 ms             0 ms       0.5 s            1 s
>>> Input Size / Records          0.0 B / 0  0.0 B / 0        0.0 B / 0  0.0 B / 53437    122.0 MB / 59209
>>> Shuffle Write Size / Records  0.0 B / 0  0.0 B / 0        0.0 B / 0  3.7 KB / 1       127.8 KB / 35
>>>
>>> Aggregated Metrics by Executor
>>>
>>> Executor ID  Address       Task Time  Total Tasks  Failed Tasks  Succeeded Tasks  Input Size / Records  Shuffle Write Size / Records
>>> 0            DG1322:41308  35 s       5            0             5                243.9 MB / 106948     224.3 KB / 63
>>> 1            DG1321:33576  48 s       5            0             5                0.0 B / 111750        7.4 KB / 2
>>> 2            DG1323:49348  30 s       5            0             5                0.0 B / 53595         3.7 KB / 1
>>> 3            DG1321:43093  12 s       5            0             5                0.0 B / 0             0.0 B / 0
>>> 4            DG1323:51009  34 s       6            0             6                0.0 B / 54767         98.7 KB / 28
>>> 5            DG1322:36787  50 s       6            0             6                0.0 B / 123977        138.9 KB / 38
>>>
>>> *** Edited for brevity - included an example of a short and a long task ***
>>>
>>> Tasks
>>>
>>> Index  ID  Attempt  Status   Locality Level  Executor ID / Host  Duration  GC Time  Accumulators        Input Size / Records    Shuffle Write Size / Records
>>> 0      1   0        SUCCESS  NODE_LOCAL      4 / DG1323          2 s       -        -                   0.0 B (hadoop) / 0      0.0 B / 0
>>> 1      2   0        SUCCESS  NODE_LOCAL      5 / DG1322          16 s      0.5 s    RecordCount: 54018  0.0 B (hadoop) / 54018  127.8 KB / 35
>>>
>>> Why are the tasks in the stage not processing an equal number of
>>> records? What are the empty tasks doing? How can I even out the tasks in
>>> this stage?
>>>
>>> Any insights much appreciated...