Hi Gábor, hi Ufuk, hi Greg,

Thank you for your very helpful responses!


> You can try to make your `RichGroupReduceFunction` implement the
> `GroupCombineFunction` interface, so that Flink can do combining
> before the shuffle, which might significantly reduce the network load.
> (How much the combiner helps the performance can greatly depend on how
> large are your groups on average.)

While implementing my reducers I didn’t think combining was applicable, since each mapper produces each key only once. I hadn’t considered that some mappers run on the same machine and therefore benefit from pre-combining before the shuffle.
After I implemented the combiner described here
https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/dataset_transformations.html#combinable-groupreducefunctions
the runtime gap between these algorithms decreased. Thank you for the hint!
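
For reference, the pattern now looks roughly like this (a simplified sketch following the linked docs, with word-count types instead of my actual classes):

    import org.apache.flink.api.common.functions.GroupCombineFunction;
    import org.apache.flink.api.common.functions.RichGroupReduceFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    // Sketch: the GroupReduceFunction also implements GroupCombineFunction,
    // so Flink can run combine() locally before the shuffle and reduce()
    // after it. combine() must emit the same type that reduce() consumes.
    public class CombinableSumReducer
            extends RichGroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>
            implements GroupCombineFunction<Tuple2<String, Integer>, Tuple2<String, Integer>> {

        @Override
        public void reduce(Iterable<Tuple2<String, Integer>> values,
                           Collector<Tuple2<String, Integer>> out) {
            String key = null;
            int sum = 0;
            for (Tuple2<String, Integer> t : values) {
                key = t.f0;
                sum += t.f1;
            }
            out.collect(new Tuple2<>(key, sum));
        }

        @Override
        public void combine(Iterable<Tuple2<String, Integer>> values,
                            Collector<Tuple2<String, Integer>> out) {
            // Local pre-aggregation; for a plain sum it is identical to reduce().
            reduce(values, out);
        }
    }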

One curious thing: my reducer receives broadcast variables and accesses them in the open() method.
When the combine() method is called, the broadcast variable does not seem to be set yet: I get an explicit error message inside the open() method. Is this a potential bug? I am using Apache Flink 1.0.3.
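
In case someone else hits this: as a workaround I am considering fetching the broadcast variable lazily inside reduce() instead of in open(), so that open() also succeeds when it is invoked for the combine phase. A sketch with placeholder types, not my actual reducer:

    import java.util.List;

    import org.apache.flink.api.common.functions.RichGroupReduceFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    // Workaround sketch: only reduce() touches the broadcast variable and
    // fetches it on first use, so nothing fails during the combine phase,
    // where the variable is apparently not yet available.
    public class LazyBroadcastReducer
            extends RichGroupReduceFunction<Tuple2<Integer, Long>, Tuple2<Integer, Long>> {

        private transient List<Long> metaData; // placeholder element type

        @Override
        public void reduce(Iterable<Tuple2<Integer, Long>> values,
                           Collector<Tuple2<Integer, Long>> out) {
            if (metaData == null) {
                metaData = getRuntimeContext().getBroadcastVariable("MetaData");
            }
            Integer key = null;
            long sum = 0;
            for (Tuple2<Integer, Long> t : values) {
                key = t.f0;
                sum += t.f1; // placeholder logic; metaData would be used here
            }
            out.collect(new Tuple2<>(key, sum));
        }
    }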

To avoid changing my reducers, I am wondering whether I should instead implement a GroupCombineFunction that is separate from the reducer:
https://ci.apache.org/projects/flink/flink-docs-master/apis/batch/dataset_transformations.html#groupcombine-on-a-grouped-dataset

As far as I understand, this would work similarly?

- The following transformation, especially the groupBy(), runs first, locally on each machine:

     combinedWords = input
         .groupBy(0)
         .combineGroup(new GroupCombineFunction<String, Tuple2<String, Integer>>() { /* ... */ });


- And then the following transformation shuffles the data over the network at the second groupBy():

     output = combinedWords
         .groupBy(0)
         .reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() { /* ... */ });

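A minimal combiner for that sketch might look like this (word-count types as in the linked docs; WordCombiner is an illustrative name):

    import org.apache.flink.api.common.functions.GroupCombineFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.util.Collector;

    // Illustrative standalone combiner: pre-counts identical words locally
    // before the data is shuffled; the reducer then sums the partial counts.
    public class WordCombiner implements GroupCombineFunction<String, Tuple2<String, Integer>> {

        @Override
        public void combine(Iterable<String> words, Collector<Tuple2<String, Integer>> out) {
            String key = null;
            int count = 0;
            for (String word : words) {
                key = word;
                count++;
            }
            out.collect(new Tuple2<>(key, count));
        }
    }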


> Alternatively, if you can reformulate your algorithm to use a `reduce`
> instead of a `reduceGroup` that might also improve the performance.
> Also, if you are using a `reduce`, then you can try calling
> `.setCombineHint(CombineHint.HASH)` after the reduce. (The combine
> hint is a relatively new feature, so you need the current master for
> this.)
I have to iterate over the tuples of a group multiple times, and the final result can only be emitted after the last tuple has been processed, so I think I can’t use a reduce.
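
For completeness, if the algorithm could be expressed as a reduce, applying the suggested hint would look roughly like this (an illustrative word-count job; setCombineHint requires the current master / upcoming 1.1):

    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    // Illustrative only: how the combine hint from the quoted suggestion
    // would be applied to a reduce-based job.
    public class CombineHintSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<Tuple2<String, Integer>> words = env.fromElements(
                    new Tuple2<>("a", 1), new Tuple2<>("b", 1), new Tuple2<>("a", 1));

            words.groupBy(0)
                 .reduce(new ReduceFunction<Tuple2<String, Integer>>() {
                     @Override
                     public Tuple2<String, Integer> reduce(Tuple2<String, Integer> t1,
                                                           Tuple2<String, Integer> t2) {
                         return new Tuple2<>(t1.f0, t1.f1 + t2.f1);
                     }
                 })
                 .setCombineHint(CombineHint.HASH) // hash-based local combine
                 .print();
        }
    }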


> This could be further amplified by the blocking intermediate results, which 
> have a very simplistic implementation writing out many different files, which 
> can lead to a lot of random I/O.

Thank you for this technical explanation. I will mention it in my evaluation!

> Are you able to simplify your function input / output types? Flink 
> aggressively serializes the data stream and complex types such as ArrayList 
> and BitSet will be much slower to process. Are you able to reconstruct the 
> lists to be groupings on elements?
Since my intention is to simulate Apache Hadoop MapReduce-like behaviour, I would say that my current implementation fits.
After the first benchmarks I will think about rewriting the code, to potentially reveal advantages of Apache Flink over Hadoop MapReduce for the algorithms I implemented.
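
If I do rewrite it, I understand the suggestion as: emit one flat record per element instead of building the ArrayLists in the mapper, and let groupBy reconstruct the grouping. A toy illustration with made-up data, not my actual schema:

    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    // Toy illustration: flat (key, element) records replace an explicit
    // ArrayList per key; the grouping happens via groupBy instead.
    public class FlatRecordsSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<Tuple2<Integer, Long>> flat = env.fromElements(
                    new Tuple2<>(1, 10L), new Tuple2<>(1, 11L), new Tuple2<>(2, 20L));

            flat.groupBy(0) // grouping on elements instead of shipping lists
                .sum(1)
                .print();
        }
    }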

Thanks again!
Robert


From: Greg Hogan [mailto:c...@greghogan.com]
Sent: Tuesday, July 26, 2016 18:57
To: user@flink.apache.org
Subject: Re: Performance issues with GroupBy?

Hi Robert,
Are you able to simplify your function input / output types? Flink 
aggressively serializes the data stream and complex types such as ArrayList and 
BitSet will be much slower to process. Are you able to reconstruct the lists to 
be groupings on elements?
Greg


-----Original Message-----
From: Ufuk Celebi [mailto:u...@apache.org]
Sent: Tuesday, July 26, 2016 11:53
To: user@flink.apache.org
Subject: Re: Performance issues with GroupBy?



+1 to what Gábor said. The hash combine will be part of the upcoming
1.1 release, too.

This could be further amplified by the blocking intermediate results, which 
have a very simplistic implementation writing out many different files, which 
can lead to a lot of random I/O.

– Ufuk



On Tue, Jul 26, 2016 at 11:41 AM, Gábor Gévay <gga...@gmail.com> wrote:

> Hello Robert,
>
>> Is there something I might could do to optimize the grouping?
>
> You can try to make your `RichGroupReduceFunction` implement the
> `GroupCombineFunction` interface, so that Flink can do combining
> before the shuffle, which might significantly reduce the network load.
> (How much the combiner helps the performance can greatly depend on how
> large are your groups on average.)
>
> Alternatively, if you can reformulate your algorithm to use a `reduce`
> instead of a `reduceGroup` that might also improve the performance.
> Also, if you are using a `reduce`, then you can try calling
> `.setCombineHint(CombineHint.HASH)` after the reduce. (The combine
> hint is a relatively new feature, so you need the current master for
> this.)
>
> Best,
> Gábor

> 2016-07-25 14:06 GMT+02:00 Paschek, Robert <robert.pasc...@tu-berlin.de>:
>> Hi Mailing List,
>>
>> i actually do some benchmarks with different algorithms. The System
>> has 8 nodes and a configured parallelism of 48 - The IBM-Power-1
>> cluster, if somebody from the TU Berlin read this : - ) – and to
>> “simulate” Hadoop MapReduce, the execution mode is set to “BATCH_FORCED”
>>
>> It is suspicious, that three of the six algorithms had a big gap in
>> runtime (5000ms vs. 20000ms) for easy (low dim) tuple. Additionally
>> the algorithms in the “upper” group using a groupBy transformation
>> and the algorithms in the “lower” group don’t use groupBy.
>>
>> I attached the plot for better visualization.
>>
>> I also checked the logs, especially the time, when the mappers
>> finishing and the reducers start _iterating_ - they hardened my speculation.
>>
>> So my question is, if it is “normal”, that grouping is so
>> cost-intensive that – in my case – the runtime increases by 4 times?
>>
>> I have data from the same experiments running on a 13 nodes cluster
>> with 26 cores with Apache Hadoop MapReduce, where the gap is still
>> present, but smaller (50s vs 57s or 55s vs 65s).
>>
>> Is there something I might could do to optimize the grouping? Some
>> codesnipplets:
>>
>> The Job:
>> DataSet<?> output = input
>>     .mapPartition(new MR_GPMRS_Mapper())
>>         .withBroadcastSet(metaData, "MetaData")
>>         .withParameters(parameters)
>>         .name(MR_GPMRS_OPTIMIZED.class.getSimpleName() + "_MAPPER")
>>     .groupBy(0)
>>     .reduceGroup(new MR_GPMRS_Reducer())
>>         .returns(input.getType())
>>         .withBroadcastSet(metaData, "MetaData")
>>         .withParameters(parameters)
>>         .name(MR_GPMRS_OPTIMIZED.class.getSimpleName() + "_REDUCER");
>>
>> MR_GPMRS_Mapper():
>> public class MR_GPMRS_Mapper<T extends Tuple> extends
>>     RichMapPartitionFunction<T, Tuple2<Integer, Tuple3<ArrayList<ArrayList<T>>, BitSet, BitSet>>>
>>
>> MR_GPMRS_Reducer():
>> public class MR_GPMRS_Reducer<T extends Tuple> extends
>>     RichGroupReduceFunction<Tuple2<Integer, Tuple3<ArrayList<ArrayList<T>>, BitSet, BitSet>>, T>
>>
>> The Tuple2 has as Payload on position f1 the Tuple3 and on position
>> f0 the Integer Key for grouping.
>>
>> Any suggestions (or comments, that it is a “normal” behaviour) are welcome :
>> - )
>>
>> Thank you in advance!
>>
>> Robert

