Github user fhueske commented on the pull request:

    https://github.com/apache/flink/pull/1255#issuecomment-165453966
  
    I executed the Flink WordCount example on 4 nodes with 8 parallel tasks and roughly 17GB of input data, once with hash partitioning and once with range partitioning. No combiner was used in either run.
    First of all, both programs compute the same result, and the result of the range-partitioned WordCount is ordered, so the implementation is correct.
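
    For reference, a minimal sketch of the two job variants, assuming the `partitionByRange` API of this PR (paths, the `Tokenizer` / `Counter` class names, and the single-program structure are placeholders of mine; the actual measurements were two separate runs, both without a combiner):

    ```java
    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.functions.GroupReduceFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    public class WordCountPartitioningComparison {

      public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(8);

        // (word, 1) pairs; no combiner kicks in because the aggregation below
        // uses a plain GroupReduceFunction.
        DataSet<Tuple2<String, Integer>> words = env
            .readTextFile("hdfs:///path/to/input")      // placeholder path
            .flatMap(new Tokenizer());

        // Variant 1: hash partitioning on the word field.
        words.partitionByHash(0)
            .groupBy(0)
            .reduceGroup(new Counter())
            .writeAsCsv("hdfs:///path/to/out-hash");    // placeholder path

        // Variant 2: range partitioning on the word field; the concatenated
        // output partitions are ordered by word.
        words.partitionByRange(0)
            .groupBy(0)
            .reduceGroup(new Counter())
            .writeAsCsv("hdfs:///path/to/out-range");   // placeholder path

        env.execute("WordCount: hash vs. range partitioning");
      }

      public static class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
          for (String token : line.toLowerCase().split("\\W+")) {
            if (!token.isEmpty()) {
              out.collect(new Tuple2<>(token, 1));
            }
          }
        }
      }

      public static class Counter
          implements GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {
        @Override
        public void reduce(Iterable<Tuple2<String, Integer>> values,
                           Collector<Tuple2<String, Integer>> out) {
          String word = null;
          int count = 0;
          for (Tuple2<String, Integer> v : values) {
            word = v.f0;
            count += v.f1;
          }
          out.collect(new Tuple2<>(word, count));
        }
      }
    }
    ```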
    
    The hash-partitioned WC took 8:00 mins and the range-partitioned WC took 13:17 mins.
    The breakdown of the range-partitioned WC looks as follows (a sketch of the extra sampling and histogram stages follows the two breakdowns):
    
    1. Source+FlatMap: 3:01 mins
    2. LocalSample: 3:01 mins
    3. GlobalSample: 0:15 mins
    4. Histogram: 24 ms
    5. PreparePartition: 8:49 mins
    6. Partition: 8:48 mins
    7. GroupReduce: 10:14 mins
    8. Sink: 1:09 mins
    
    The breakdown of the hash-partitioned WC is:
    
    1. Source + FlatMap: 6:26 mins
    2. Partition: 6:25 mins
    3. GroupReduce: 7:58 mins
    4. Sink: 1:21 mins
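
    For context, the extra stages in the range-partitioned plan (LocalSample, GlobalSample, Histogram, PreparePartition) roughly implement the following mechanism. This is only an illustrative sketch of the idea, not the code of this PR; the helper names and the exact sampling scheme are made up:

    ```java
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import java.util.Random;

    public class RangePartitionSketch {

      // LocalSample: draw a small random sample (with replacement) of the keys
      // held by one parallel task. GlobalSample unions these per-task samples.
      static List<String> sample(List<String> localKeys, int sampleSize, Random rnd) {
        List<String> sample = new ArrayList<>(sampleSize);
        for (int i = 0; i < sampleSize; i++) {
          sample.add(localKeys.get(rnd.nextInt(localKeys.size())));
        }
        return sample;
      }

      // Histogram: sort the global sample and pick (parallelism - 1) boundary keys
      // that split the sample into roughly equally sized buckets.
      static String[] boundaries(List<String> globalSample, int parallelism) {
        Collections.sort(globalSample);
        String[] bounds = new String[parallelism - 1];
        for (int i = 1; i < parallelism; i++) {
          bounds[i - 1] = globalSample.get(i * globalSample.size() / parallelism);
        }
        return bounds;
      }

      // PreparePartition: look up the target partition of a record's key via binary
      // search over the boundaries; this id is the extra 4-byte integer attached to
      // every record before the Partition step ships it.
      static int partitionOf(String key, String[] bounds) {
        int pos = Arrays.binarySearch(bounds, key);
        return pos >= 0 ? pos : -(pos + 1);
      }
    }
    ```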
    
    So, the overhead of the range-partitioned WC comes from the additional IO of reading the flatMapped words from disk again and from the additional 4-byte integer (the assigned partition id) attached to each record. Also, the sorting of the group reduce does not happen concurrently with the source IO. Reducing (w/o sort) and the sink take about the same amount of time.
    
    I also checked the distribution of input and output records / bytes across the parallel GroupReduce tasks.
    
    |  | min records-in | min bytes-in | max records-in | max bytes-in | 
    | --- | --- | --- | --- | --- |
    | Hash | 197M | 1.82GB | 346M | 2.90GB |  
    | Range | 177M | 1.41GB | 352M | 2.90GB |
    
    |  | min records-out | min bytes-out | max records-out | max bytes-out | 
    | --- | --- | --- | --- | --- |
    | Hash | 2.5M | 26.5MB | 2.5M | 26.5MB |
    | Range | 2.3K | 28.2KB | 14M | 154MB |
    
    We see that the range partitioner does not perform better (in fact a bit worse) than the hash partitioner (the differences for output records are expected). Maybe increasing the sample size helps? The overhead of reading the intermediate data set from disk is so high that a more fine-grained histogram can be justified, IMO. How about increasing the sample size from `parallelism * 20` to `parallelism * 1000`?
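
    To put that suggestion in perspective (a back-of-the-envelope sketch; the constant name is hypothetical and the actual location in the code may differ): with parallelism 8, the total sample would grow from 160 to 8,000 records, which is still negligible next to the billions of flatMapped records reflected in the table above.

    ```java
    public class SampleSizeSketch {
      // Hypothetical constant; the real name/location in the PR may differ.
      // Total sample size = parallelism * SAMPLES_PER_TASK.
      static final int SAMPLES_PER_TASK = 1000; // was 20

      public static void main(String[] args) {
        int parallelism = 8;
        System.out.println("total samples: " + parallelism * SAMPLES_PER_TASK); // 8000 (vs. 160 before)
      }
    }
    ```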
    
    Other thoughts?

