The article is interesting but doesn't really help. It has only one
sentence about data distribution in partitions.

How can I diagnose skewed data distribution?

How could evenly sized blocks in HDFS lead to skewed data anyway?
On 9 Sep 2015 2:29 pm, "Akhil Das" <ak...@sigmoidanalytics.com> wrote:

> This post here has a bit information
> http://blog.cloudera.com/blog/2015/05/working-with-apache-spark-or-how-i-learned-to-stop-worrying-and-love-the-shuffle/
>
> Thanks
> Best Regards
>
> On Wed, Sep 9, 2015 at 6:44 AM, mark <manwoodv...@googlemail.com> wrote:
>
>> As I understand things (maybe naively), my input data are stored in equal
>> sized blocks in HDFS, and each block  represents a partition within Spark
>> when read from HDFS, therefore each block should hold roughly the same
>> number of records.
>>
>> So something is missing in my understanding - what can cause some
>> partitions to have zero records and others to have roughly equal sized
>> chunks (~50k in this case)?
>>
>> Before writing a custom partitioner, I would like to understand why has
>> the default partitioner failed in my case?
>> On 8 Sep 2015 3:00 pm, "Akhil Das" <ak...@sigmoidanalytics.com> wrote:
>>
>>> Try using a custom partitioner for the keys so that they will get evenly
>>> distributed across tasks
>>>
>>> Thanks
>>> Best Regards
>>>
>>> On Fri, Sep 4, 2015 at 7:19 PM, mark <manwoodv...@googlemail.com> wrote:
>>>
>>>> I am trying to tune a Spark job and have noticed some strange behavior
>>>> - tasks in a stage vary in execution time, ranging from 2 seconds to 20
>>>> seconds. I assume tasks should all run in roughly the same amount of time
>>>> in a well tuned job.
>>>>
>>>> So I did some investigation - the fast tasks appear to have no records,
>>>> whilst the slow tasks do. I need help understanding why this is happening.
>>>>
>>>> The code in the stage is pretty simple. All it does is:
>>>>
>>>> - filters records
>>>> - maps records to a (key, record) tuple
>>>> - reduces by key
>>>>
>>>> The data are Avro objects stored in Parquet files in 16MB blocks in
>>>> HDFS.
>>>>
>>>> To establish how many records in each partition I added this snippet:
>>>>
>>>> val counts = rdd.mapPartitions(iter => {
>>>>       val ctx = TaskContext.get
>>>>       val stageId = ctx.stageId
>>>>       val partId = ctx.partitionId
>>>>       val attemptid = ctx.taskAttemptId()
>>>>         Array(Array(stageId, partId, attemptid, iter.size)).iterator }
>>>>       , true).collect()
>>>>
>>>> Which produces the following:
>>>>
>>>> 1              1              0              0
>>>> 1              2              1              50489
>>>> 1              3              2              0
>>>> 1              4              3              0
>>>> 1              5              4              0
>>>> 1              6              5              53200
>>>> 1              7              6              0
>>>> 1              8              7              0
>>>> 1              9              8              0
>>>> 1              10           9              56946
>>>> 1              11           10           0
>>>> 1              12           11           0
>>>> 1              13           12           0
>>>> 1              14           13           59209
>>>> 1              15           14           0
>>>> 1              16           15           0
>>>> 1              17           16           0
>>>> 1              18           17           50202
>>>> 1              19           18           0
>>>> 1              20           19           0
>>>> 1              21           20           0
>>>> 1              22           21           54613
>>>> 1              23           22           0
>>>> 1              24           23           0
>>>> 1              25           24           54157
>>>> 1              26           25           0
>>>> 1              27           26           0
>>>> 1              28           27           0
>>>> 1              29           28           53595
>>>> 1              30           29           0
>>>> 1              31           30           0
>>>> 1              32           31           10750
>>>>
>>>>
>>>> Looking at the logs, you can see the tasks that contain records have
>>>> the longest run time:
>>>>
>>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 25.0 in stage 1.0
>>>> (TID 26) in 2782 ms on DG1322 (6/32)
>>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 7.0 in stage 1.0
>>>> (TID 8) in 2815 ms on DG1322 (7/32)
>>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 19.0 in stage 1.0
>>>> (TID 20) in 2815 ms on DG1322 (8/32)
>>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 23.0 in stage 1.0
>>>> (TID 24) in 2840 ms on DG1321 (9/32)
>>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 29.0 in stage 1.0
>>>> (TID 30) in 2839 ms on DG1321 (10/32)
>>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 11.0 in stage 1.0
>>>> (TID 12) in 2878 ms on DG1321 (11/32)
>>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 30.0 in stage 1.0
>>>> (TID 31) in 2870 ms on DG1321 (12/32)
>>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 18.0 in stage 1.0
>>>> (TID 19) in 2892 ms on DG1321 (13/32)
>>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 0.0 in stage 1.0
>>>> (TID 1) in 2930 ms on DG1321 (14/32)
>>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 6.0 in stage 1.0
>>>> (TID 7) in 2934 ms on DG1321 (15/32)
>>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 12.0 in stage 1.0
>>>> (TID 13) in 2931 ms on DG1321 (16/32)
>>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 3.0 in stage 1.0
>>>> (TID 4) in 3246 ms on DG1323 (17/32)
>>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 27.0 in stage 1.0
>>>> (TID 28) in 3226 ms on DG1323 (18/32)
>>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 15.0 in stage 1.0
>>>> (TID 16) in 3249 ms on DG1323 (19/32)
>>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 10.0 in stage 1.0
>>>> (TID 11) in 3669 ms on DG1323 (20/32)
>>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 16.0 in stage 1.0
>>>> (TID 17) in 3666 ms on DG1323 (21/32)
>>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 22.0 in stage 1.0
>>>> (TID 23) in 3664 ms on DG1323 (22/32)
>>>> 15/09/03 16:26:36 INFO TaskSetManager: Finished task 4.0 in stage 1.0
>>>> (TID 5) in 3692 ms on DG1323 (23/32)
>>>> *15/09/03 16:26:39 INFO TaskSetManager: Finished task 31.0 in stage 1.0
>>>> (TID 32) in 6668 ms on DG1322 (24/32)*
>>>> *15/09/03 16:26:48 INFO TaskSetManager: Finished task 17.0 in stage 1.0
>>>> (TID 18) in 15690 ms on DG1321 (25/32)*
>>>> *15/09/03 16:26:49 INFO TaskSetManager: Finished task 1.0 in stage 1.0
>>>> (TID 2) in 16194 ms on DG1322 (26/32)*
>>>> *15/09/03 16:26:49 INFO TaskSetManager: Finished task 5.0 in stage 1.0
>>>> (TID 6) in 16384 ms on DG1321 (27/32)*
>>>> *15/09/03 16:26:50 INFO TaskSetManager: Finished task 28.0 in stage 1.0
>>>> (TID 29) in 17194 ms on DG1323 (28/32)*
>>>> *15/09/03 16:26:50 INFO TaskSetManager: Finished task 21.0 in stage 1.0
>>>> (TID 22) in 17408 ms on DG1323 (29/32)*
>>>> *15/09/03 16:26:50 INFO TaskSetManager: Finished task 13.0 in stage 1.0
>>>> (TID 14) in 17711 ms on DG1322 (30/32)*
>>>> *15/09/03 16:26:51 INFO TaskSetManager: Finished task 24.0 in stage 1.0
>>>> (TID 25) in 17995 ms on DG1321 (31/32)*
>>>> *15/09/03 16:26:51 INFO TaskSetManager: Finished task 9.0 in stage 1.0
>>>> (TID 10) in 18183 ms on DG1323 (32/32)*
>>>>
>>>>
>>>> The Spark UI for the stage shows this:
>>>>
>>>> Summary Metrics for 32 Completed Tasks
>>>>
>>>> Metric Min 25th percentile Median 75th percentile Max
>>>>
>>>> Duration
>>>> 0.9s 2s 2s 16s 18 s
>>>>
>>>> GC Time
>>>> 0ms 0ms 0ms 0.5s 1s
>>>>
>>>> Input Size / Records
>>>> 0.0 B/0     0.0 B/0     0.0 B / 0     0.0 B / 53437      122.0 MB /
>>>> 59209
>>>>
>>>> Shuffle Write Size / Records
>>>> 0.0 B / 0      0.0 B / 0      0.0 B / 0      3.7 KB / 1     127.8 KB /
>>>> 35
>>>>
>>>> Aggregated Metrics by Executor
>>>> Executor ID      Address      Task Time      Total Tasks      Failed
>>>> Tasks      Succeeded Tasks      Input Size / Records      Shuffle Write
>>>> Size / Records
>>>>
>>>> 0      DG1322:41308      35 s      5      0      5      243.9 MB /
>>>> 106948      224.3 KB / 63
>>>> 1      DG1321:33576      48 s      5      0      5      0.0 B / 111750
>>>>             7.4 KB / 2
>>>> 2      DG1323:49348      30 s      5      0      5      0.0 B / 53595
>>>>             3.7 KB / 1
>>>> 3      DG1321:43093      12 s      5      0      5      0.0 B / 0
>>>>                0.0 B / 0
>>>> 4      DG1323:51009      34 s      6      0      6      0.0 B / 54767
>>>>             98.7 KB / 28
>>>> 5      DG1322:36787      50 s      6      0      6      0.0 B / 123977
>>>>            138.9 KB / 38
>>>>
>>>>
>>>> *** Edited for brevity - included an example of a short and long task
>>>> ***
>>>> Tasks
>>>> Index   ID   Attempt   Status   Locality Level   Executor ID / Host
>>>> Duration   GC Time   Accumulators            Input Size / Records
>>>> Shuffle Write Size / Records
>>>> 0          1          0    SUCCESS NODE_LOCAL 4 / DG1323            2 s
>>>>                                                           0.0 B (hadoop) /
>>>> 0         0.0 B / 0
>>>> 1          2          0    SUCCESS NODE_LOCAL 5 / DG1322            16
>>>> s         0.5 s         RecordCount: 54018   0.0 B (hadoop) / 54018 127.8
>>>> KB / 35
>>>>
>>>>
>>>> Why are the tasks in the stage not processing an equal number of
>>>> records? What are the empty tasks doing? How can I even the tasks in this
>>>> stage out?
>>>>
>>>> Any insights much appreciated...
>>>>
>>>>
>>>
>

Reply via email to