The article is interesting but doesn't really help. It has only one sentence about data distribution in partitions.
How can I diagnose skewed data distribution? How could evenly sized
blocks in HDFS lead to skewed data anyway?

On 9 Sep 2015 2:29 pm, "Akhil Das" <ak...@sigmoidanalytics.com> wrote:

> This post here has a bit of information:
> http://blog.cloudera.com/blog/2015/05/working-with-apache-spark-or-how-i-learned-to-stop-worrying-and-love-the-shuffle/
>
> Thanks
> Best Regards
>
> On Wed, Sep 9, 2015 at 6:44 AM, mark <manwoodv...@googlemail.com> wrote:
>
>> As I understand things (maybe naively), my input data are stored in
>> equal-sized blocks in HDFS, and each block becomes a partition in
>> Spark when read from HDFS, so each partition should hold roughly the
>> same number of records.
>>
>> So something is missing from my understanding - what can cause some
>> partitions to have zero records and others to have roughly
>> equal-sized chunks (~50k records in this case)?
>>
>> Before writing a custom partitioner, I would like to understand why
>> the default partitioner has failed in my case.
>>
>> On 8 Sep 2015 3:00 pm, "Akhil Das" <ak...@sigmoidanalytics.com> wrote:
>>
>>> Try using a custom partitioner for the keys so that they get evenly
>>> distributed across tasks.
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Fri, Sep 4, 2015 at 7:19 PM, mark <manwoodv...@googlemail.com> wrote:
>>>
>>>> I am trying to tune a Spark job and have noticed some strange
>>>> behavior - tasks in a stage vary in execution time, ranging from 2
>>>> seconds to 20 seconds. I assume that in a well-tuned job all tasks
>>>> should run in roughly the same amount of time.
>>>>
>>>> So I did some investigation - the fast tasks appear to have no
>>>> records, whilst the slow tasks do. I need help understanding why
>>>> this is happening.
>>>>
>>>> The code in the stage is pretty simple (sketched below). All it
>>>> does is:
>>>>
>>>> - filter records
>>>> - map records to a (key, record) tuple
>>>> - reduce by key
>>>>
>>>> The data are Avro objects stored in Parquet files in 16MB blocks
>>>> in HDFS.
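>>>>
>>>> As a sketch - MyRecord, isValid, key and merge below are stand-ins
>>>> for the real Avro class, filter predicate, key field and reduce
>>>> function - the stage is roughly:
>>>>
>>>> val reduced = rdd                      // rdd: RDD[MyRecord] read from Parquet
>>>>   .filter(rec => rec.isValid)          // filter records
>>>>   .map(rec => (rec.key, rec))          // map to a (key, record) tuple
>>>>   .reduceByKey((a, b) => merge(a, b))  // reduce by key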
>>>>
>>>> To establish how many records are in each partition I added this
>>>> snippet (it needs the TaskContext import; note that iter.size
>>>> consumes the iterator, which is fine here since the count is all
>>>> we want):
>>>>
>>>> import org.apache.spark.TaskContext
>>>>
>>>> val counts = rdd.mapPartitions(iter => {
>>>>   val ctx = TaskContext.get
>>>>   val stageId = ctx.stageId
>>>>   val partId = ctx.partitionId
>>>>   val attemptId = ctx.taskAttemptId()
>>>>   // emit one row per partition: (stageId, partId, attemptId, record count)
>>>>   Iterator(Array(stageId, partId, attemptId, iter.size))
>>>> }, preservesPartitioning = true).collect()
>>>>
>>>> Which produces the following (stageId, partId, attemptId, records):
>>>>
>>>> 1   1   0       0
>>>> 1   2   1   50489
>>>> 1   3   2       0
>>>> 1   4   3       0
>>>> 1   5   4       0
>>>> 1   6   5   53200
>>>> 1   7   6       0
>>>> 1   8   7       0
>>>> 1   9   8       0
>>>> 1  10   9   56946
>>>> 1  11  10       0
>>>> 1  12  11       0
>>>> 1  13  12       0
>>>> 1  14  13   59209
>>>> 1  15  14       0
>>>> 1  16  15       0
>>>> 1  17  16       0
>>>> 1  18  17   50202
>>>> 1  19  18       0
>>>> 1  20  19       0
>>>> 1  21  20       0
>>>> 1  22  21   54613
>>>> 1  23  22       0
>>>> 1  24  23       0
>>>> 1  25  24   54157
>>>> 1  26  25       0
>>>> 1  27  26       0
>>>> 1  28  27       0
>>>> 1  29  28   53595
>>>> 1  30  29       0
>>>> 1  31  30       0
>>>> 1  32  31   10750
>>>>
>>>> Looking at the logs, you can see that the tasks with records have
>>>> the longest run times:
>>>>
>>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 25.0 in stage 1.0 (TID 26) in 2782 ms on DG1322 (6/32)
>>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 7.0 in stage 1.0 (TID 8) in 2815 ms on DG1322 (7/32)
>>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 19.0 in stage 1.0 (TID 20) in 2815 ms on DG1322 (8/32)
>>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 23.0 in stage 1.0 (TID 24) in 2840 ms on DG1321 (9/32)
>>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 29.0 in stage 1.0 (TID 30) in 2839 ms on DG1321 (10/32)
>>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 11.0 in stage 1.0 (TID 12) in 2878 ms on DG1321 (11/32)
>>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 30.0 in stage 1.0 (TID 31) in 2870 ms on DG1321 (12/32)
>>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 18.0 in stage 1.0 (TID 19) in 2892 ms on DG1321 (13/32)
>>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 2930 ms on DG1321 (14/32)
>>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 6.0 in stage 1.0 (TID 7) in 2934 ms on DG1321 (15/32)
>>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 12.0 in stage 1.0 (TID 13) in 2931 ms on DG1321 (16/32)
>>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 3.0 in stage 1.0 (TID 4) in 3246 ms on DG1323 (17/32)
>>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 27.0 in stage 1.0 (TID 28) in 3226 ms on DG1323 (18/32)
>>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 15.0 in stage 1.0 (TID 16) in 3249 ms on DG1323 (19/32)
>>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 10.0 in stage 1.0 (TID 11) in 3669 ms on DG1323 (20/32)
>>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 16.0 in stage 1.0 (TID 17) in 3666 ms on DG1323 (21/32)
>>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 22.0 in stage 1.0 (TID 23) in 3664 ms on DG1323 (22/32)
>>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 4.0 in stage 1.0 (TID 5) in 3692 ms on DG1323 (23/32)
>>>> *15/09/03 16:26:39 INFO TaskSetManager: Finished task 31.0 in stage 1.0 (TID 32) in 6668 ms on DG1322 (24/32)*
>>>> *15/09/03 16:26:48 INFO TaskSetManager: Finished task 17.0 in stage 1.0 (TID 18) in 15690 ms on DG1321 (25/32)*
>>>> *15/09/03 16:26:49 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 2) in 16194 ms on DG1322 (26/32)*
>>>> *15/09/03 16:26:49 INFO TaskSetManager: Finished task 5.0 in stage 1.0 (TID 6) in 16384 ms on DG1321 (27/32)*
>>>> *15/09/03 16:26:50 INFO TaskSetManager: Finished task 28.0 in stage 1.0 (TID 29) in 17194 ms on DG1323 (28/32)*
>>>> *15/09/03 16:26:50 INFO TaskSetManager: Finished task 21.0 in stage 1.0 (TID 22) in 17408 ms on DG1323 (29/32)*
>>>> *15/09/03 16:26:50 INFO TaskSetManager: Finished task 13.0 in stage 1.0 (TID 14) in 17711 ms on DG1322 (30/32)*
>>>> *15/09/03 16:26:51 INFO TaskSetManager: Finished task 24.0 in stage 1.0 (TID 25) in 17995 ms on DG1321 (31/32)*
>>>> *15/09/03 16:26:51 INFO TaskSetManager: Finished task 9.0 in stage 1.0 (TID 10) in 18183 ms on DG1323 (32/32)*
>>>>
>>>> The Spark UI for the stage shows this:
>>>>
>>>> Summary Metrics for 32 Completed Tasks
>>>>
>>>> Metric                        Min        25th pctl  Median     75th pctl      Max
>>>> Duration                      0.9 s      2 s        2 s        16 s           18 s
>>>> GC Time                       0 ms       0 ms       0 ms       0.5 s          1 s
>>>> Input Size / Records          0.0 B / 0  0.0 B / 0  0.0 B / 0  0.0 B / 53437  122.0 MB / 59209
>>>> Shuffle Write Size / Records  0.0 B / 0  0.0 B / 0  0.0 B / 0  3.7 KB / 1     127.8 KB / 35
>>>>
>>>> Aggregated Metrics by Executor
>>>>
>>>> Executor ID  Address       Task Time  Total Tasks  Failed Tasks  Succeeded Tasks  Input Size / Records  Shuffle Write Size / Records
>>>> 0            DG1322:41308  35 s       5            0             5                243.9 MB / 106948     224.3 KB / 63
>>>> 1            DG1321:33576  48 s       5            0             5                0.0 B / 111750        7.4 KB / 2
>>>> 2            DG1323:49348  30 s       5            0             5                0.0 B / 53595         3.7 KB / 1
>>>> 3            DG1321:43093  12 s       5            0             5                0.0 B / 0             0.0 B / 0
>>>> 4            DG1323:51009  34 s       6            0             6                0.0 B / 54767         98.7 KB / 28
>>>> 5            DG1322:36787  50 s       6            0             6                0.0 B / 123977        138.9 KB / 38
>>>>
>>>> *** Edited for brevity - included an example of a short and a long task ***
>>>>
>>>> Tasks
>>>>
>>>> Index  ID  Attempt  Status   Locality Level  Executor ID / Host  Duration  GC Time  Accumulators        Input Size / Records    Shuffle Write Size / Records
>>>> 0      1   0        SUCCESS  NODE_LOCAL      4 / DG1323          2 s                                    0.0 B (hadoop) / 0      0.0 B / 0
>>>> 1      2   0        SUCCESS  NODE_LOCAL      5 / DG1322          16 s      0.5 s    RecordCount: 54018  0.0 B (hadoop) / 54018  127.8 KB / 35
>>>>
>>>> Why are the tasks in the stage not processing an equal number of
>>>> records? What are the empty tasks doing? How can I even out the
>>>> tasks in this stage?
>>>>
>>>> Any insights much appreciated...
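
If I do end up writing a custom partitioner as Akhil suggests, I assume
the skeleton would look something like this (an untested sketch - the
partition count, the reliance on key.hashCode and the merge function
are all placeholders, not my actual code):

import org.apache.spark.Partitioner

// Spread keys across a fixed number of partitions by hash code.
class EvenKeyPartitioner(parts: Int) extends Partitioner {
  override def numPartitions: Int = parts
  override def getPartition(key: Any): Int = {
    val h = key.hashCode % parts
    if (h < 0) h + parts else h  // keep the partition index non-negative
  }
}

// Plugged into the reduce so the shuffle uses it:
// val reduced = keyed.reduceByKey(new EvenKeyPartitioner(32), (a, b) => merge(a, b))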