Thanks guys! That makes more sense now. 

So does that mean that once I start using a window operator, all operations on 
my WindowedStream are global (across all partitions)? In that case, 
WindowedStream.aggregate (or sum) would apply to all data after shuffling 
instead of to each partition. 

If I understand this correctly, if I want to perform some sort of counting 
within each partition for different words (such as word count), I should 
avoid using keyBy and instead keep a counting map for each word inside each 
mapper, while also keeping track of the current timestamp.
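
To make the idea concrete, here is a minimal sketch in plain Java (no Flink 
dependency) of such a per-mapper counting map. It assumes tumbling windows and 
timestamps in milliseconds; the class and method names (LocalWindowCounter, 
add, flushUpTo) are hypothetical and not part of any Flink API. In a real job 
this logic would live inside a user function, and the flushed partial counts 
would still need a final keyed aggregation downstream.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not part of Flink: pre-aggregates word counts per
// tumbling window inside a single parallel task, before any shuffle.
class LocalWindowCounter {
    private final long windowSizeMillis;
    // window start -> (word -> partial count); sorted so old windows flush first
    private final TreeMap<Long, Map<String, Long>> windows = new TreeMap<>();

    LocalWindowCounter(long windowSizeMillis) {
        this.windowSizeMillis = windowSizeMillis;
    }

    // Assign the record to its tumbling window and bump the local count.
    void add(String word, long timestampMillis) {
        long windowStart =
            timestampMillis - Math.floorMod(timestampMillis, windowSizeMillis);
        windows.computeIfAbsent(windowStart, k -> new HashMap<>())
               .merge(word, 1L, Long::sum);
    }

    // Remove and return every window that ends at or before the given time.
    // Emitting these partial counts downstream bounds the memory in use.
    TreeMap<Long, Map<String, Long>> flushUpTo(long timeMillis) {
        TreeMap<Long, Map<String, Long>> done = new TreeMap<>();
        while (!windows.isEmpty()
                && windows.firstKey() + windowSizeMillis <= timeMillis) {
            Map.Entry<Long, Map<String, Long>> e = windows.pollFirstEntry();
            done.put(e.getKey(), e.getValue());
        }
        return done;
    }

    public static void main(String[] args) {
        LocalWindowCounter counter = new LocalWindowCounter(1000L);
        counter.add("flink", 100L);
        counter.add("storm", 250L);
        counter.add("flink", 900L);
        counter.add("flink", 1200L); // belongs to the next window
        // Prints the partial counts of the window that closed at t = 1000 ms.
        System.out.println(counter.flushUpTo(1000L));
    }
}
```

The sorted map is a deliberate choice: closed windows can be dropped from the 
front, so memory is bounded by the number of open windows times the number of 
distinct words seen in them.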

Le




> On Oct 26, 2017, at 3:17 AM, Fabian Hueske <fhue...@gmail.com> wrote:
> 
> Hi,
> 
> in a MapReduce context, combiners are used to reduce the amount of data 1) to 
> shuffle and fully sort (to group the data by key) and 2) to reduce the impact 
> of skewed data.
> 
> The question is: why do you need a combiner in your use case?
> - To reduce the data to shuffle: You should not use a window operator to 
> pre-aggregate because keyBy implies a shuffle. Instead, you could implement a 
> ProcessFunction with operator state. In this solution you need to implement 
> the windowing logic yourself, i.e., group data into windows based on their 
> timestamps, ensure you don't run out of memory (operator state is kept on 
> the heap), and so on. So this solution needs quite a bit of manual tuning.
> - To reduce the impact of skewed data: You can use a window aggregation if 
> you don't mind the shuffle. However, you should add an additional artificial 
> key attribute to spread the computation for the same original key across 
> more grouping keys. Note that Flink assigns grouping keys to workers by hash 
> partitioning. This works well for many distinct keys, but might cause issues 
> in case of low key cardinality. Also note that the state size grows and the 
> effectiveness decreases with increasing cardinality of the artificial key.
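
The artificial-key idea in the second bullet can be sketched in plain Java, 
independent of Flink. This is only an illustration under assumptions: the 
class name SaltedCount, the "#" separator (which assumes words never contain 
that character), and the round-robin salt are all made up for the example. 
Stage 1 stands in for a keyed window aggregation on (word, salt), stage 2 for 
a second aggregation on the original word.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration, not a Flink API: two-stage counting with an
// artificial "salt" appended to each key to spread out a skewed key.
class SaltedCount {
    // Stage 1: count per (word, salt). A hot word is split over saltBuckets keys.
    static Map<String, Long> countBySaltedKey(String[] words, int saltBuckets) {
        Map<String, Long> partial = new HashMap<>();
        for (int i = 0; i < words.length; i++) {
            int salt = i % saltBuckets; // round-robin salt; a random one works too
            partial.merge(words[i] + "#" + salt, 1L, Long::sum);
        }
        return partial;
    }

    // Stage 2: strip the salt and merge the partial counts per original word.
    static Map<String, Long> mergeBySalt(Map<String, Long> partial) {
        Map<String, Long> total = new HashMap<>();
        for (Map.Entry<String, Long> e : partial.entrySet()) {
            String saltedKey = e.getKey();
            String word = saltedKey.substring(0, saltedKey.lastIndexOf('#'));
            total.merge(word, e.getValue(), Long::sum);
        }
        return total;
    }

    public static void main(String[] args) {
        String[] words = {"a", "a", "a", "a", "b"};
        Map<String, Long> partial = countBySaltedKey(words, 2);
        System.out.println(partial);              // partial counts per salted key
        System.out.println(mergeBySalt(partial)); // final counts per word
    }
}
```

The trade-off from the bullet is visible here: more salt buckets spread a hot 
word over more keys, but stage 1 then holds more entries and combines fewer 
records per entry.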
> 
> Hope this helps,
> Fabian
> 
> 2017-10-26 3:32 GMT+02:00 Kurt Young <ykt...@gmail.com>:
> Do you mean you want to keep the original window while also doing some 
> combine operations inside the window at the same time?
> What kind of data do you expect the following operator to receive?
> 
> Best,
> Kurt
> 
> On Thu, Oct 26, 2017 at 5:29 AM, Le Xu <sharonx...@gmail.com> wrote:
> Thanks Kurt, I'm trying out WindowedStream aggregate right now. Just 
> wondering, is there any way for me to preserve the window after aggregation? 
> More specifically, originally I have something like:
> 
> WindowedStream<Tuple2<String, Long>, Tuple, TimeWindow> windowStream = dataStream
>                 .keyBy(0) //id 
>                 .timeWindow(Time.of(windowSize, TimeUnit.MILLISECONDS));
> 
> and then for the reducer I can do:
>  
> windowStream.apply(...) 
> 
> and expect the window information to be preserved.
> 
> If I were to use aggregate on the windowed stream, I would end up with 
> something like:
> 
> DataStream<Tuple2<String, Long>> windowStream = dataStream
>                 .keyBy(0) //id 
>                 .timeWindow(Time.of(windowSize, TimeUnit.MILLISECONDS))
>                 .aggregate(new AggregateFunction<Tuple2<String, Long>, Accumulator, Tuple2<String, Long>>() {
>                     @Override
>                     public Accumulator createAccumulator() {
>                         return null;
>                     }
> 
>                     @Override
>                     public void add(Tuple2<String, Long> stringLong, Accumulator o) {
> 
>                     }
> 
>                     @Override
>                     public Tuple2<String, Long> getResult(Accumulator o) {
>                         return null;
>                     }
> 
>                     @Override
>                     public Accumulator merge(Accumulator o, Accumulator acc1) {
>                         return null;
>                     }
>                 });
> 
> It looks like aggregate only transforms a WindowedStream into a DataStream. 
> So for a global aggregation phase (a reducer), should I extract the window 
> again?
> 
> 
> Thanks! I apologize if that sounds like a very basic question.
> 
> 
> Le
> 
> 
> 
> 
> 
> 
> On Tue, Oct 24, 2017 at 4:14 AM, Kurt Young <ykt...@gmail.com> wrote:
> I think you can use WindowedStream.aggregate.
> 
> Best,
> Kurt
> 
> On Tue, Oct 24, 2017 at 1:45 PM, Le Xu <sharonx...@gmail.com> wrote:
> Thanks Kurt. Maybe I wasn't clear before: I was wondering if Flink has an 
> implementation of a combiner for DataStream (to use after keyBy and 
> windowing).
> 
> Thanks again!
> 
> Le
> 
> On Sun, Oct 22, 2017 at 8:52 PM, Kurt Young <ykt...@gmail.com> wrote:
> Hi,
> 
> The document you are looking at is pretty old; you can check the newest 
> version here: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/dataset_transformations.html
> 
> Regarding your question, you can use combineGroup.
> 
> Best,
> Kurt
> 
> On Mon, Oct 23, 2017 at 5:22 AM, Le Xu <sharonx...@gmail.com> wrote:
> Hello!
> 
> I'm new to Flink and I'm wondering if there is an explicit local combiner 
> for each mapper that I can use to perform a local reduce on each mapper. I 
> looked at 
> https://ci.apache.org/projects/flink/flink-docs-release-0.8/dataset_transformations.html
> but couldn't find anything that matches.
> 
> 
> Thanks!
> 
> Le
> 
> 
> 
> 
> 
> 
