Hi Kien:

Is there a similar API for DataStream as well?

Thanks!

Le


> On Oct 26, 2017, at 7:58 AM, Kien Truong <duckientru...@gmail.com> wrote:
> 
> Hi,
> 
> For batch API, you can use GroupReduceFunction, which give you the same 
> benefit as a MapReduce combiner.
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/dataset_transformations.html#combinable-groupreducefunctions
>  
> <https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/dataset_transformations.html#combinable-groupreducefunctions>Regards,
> Kien
> 
> 
> On 10/26/2017 7:37 PM, Le Xu wrote:
>> Thanks guys! That makes more sense now. 
>> 
>> So does it mean once I start use a window operator, all operations on my 
>> WindowedStream would be global (across all partitions)? In that case, 
>> WindowedStream.aggregate (or sum) would apply to all data after shuffling 
>> instead of each partition. 
>> 
>> If I understand this correctly, once I want to perform some sort of counting 
>> within each partition for different words (such as word count), I should 
>> really avoid using keyBy but keep some sort of counting map for each word 
>> while also keep track of the current time stamp, inside each mapper.
>> 
>> Le
>> 
>> 
>> 
>> 
>>> On Oct 26, 2017, at 3:17 AM, Fabian Hueske <fhue...@gmail.com 
>>> <mailto:fhue...@gmail.com>> wrote:
>>> 
>>> Hi,
>>> 
>>> in a MapReduce context, combiners are used to reduce the amount of data 1) 
>>> to shuffle and fully sort (to group the data by key) and 2) to reduce the 
>>> impact of skewed data.
>>> 
>>> The question is, why do you need a combiner in your use case.
>>> - To reduce the data to shuffle: You should not use a window operator to 
>>> preaggregate because keyBy implies a shuffle. Instead you could implement a 
>>> ProcessFunction with operator state. In this solution you need to implement 
>>> the windowing logic yourself, i.e., group data in window based on their 
>>> timestamp. Ensure you don't run out of memory (operator state is kept on 
>>> the heap), etc. So this solution needs quite a bit of manual tuning.
>>> - To reduce the impact of skewed data: You can use a window aggregation if 
>>> you don't mind the shuffle. However, you should add an additional           
>>>         artificial key attribute to spread out the computation of the same 
>>> original key to more grouping key. Note that Flink assigns grouping keys by 
>>> hash partitioning to workers. This works well for many distinct keys, but 
>>> might cause issues in case of low key cardinality. Also note that the state 
>>> size grows and effectiveness reduces with an increasing cardinality of the 
>>> artificial key.
>>> 
>>> Hope this helps,
>>> Fabian
>>> 
>>> 2017-10-26 3:32 GMT+02:00 Kurt Young <ykt...@gmail.com 
>>> <mailto:ykt...@gmail.com>>:
>>> Do you mean you want to keep the origin window as well as doing some 
>>> combine operations inside window in the same time?
>>> What kind of data do you expect the following operator will receive?
>>> 
>>> Best,
>>> Kurt
>>> 
>>> On Thu, Oct 26, 2017 at 5:29 AM, Le Xu <sharonx...@gmail.com 
>>> <mailto:sharonx...@gmail.com>> wrote:
>>> Thank Kurt I'm trying out WindowedStream aggregate right now. Just 
>>> wondering, is there any way for me to preserve the window after 
>>> aggregation. More specifically, originally i have something like:
>>> 
>>> WindowedStream<Tuple2<String, Long>, Tuple, TimeWindow> windowStream = 
>>> dataStream
>>>                 .keyBy(0) //id 
>>>                 .timeWindow(Time.of(windowSize, TimeUnit.MILLISECONDS))
>>> 
>>> and then for the reducer I can do:
>>>  
>>> windowStream.apply(...) 
>>> 
>>> and expect the window information is preserved.
>>> 
>>> If I were to do use aggregate on window stream, I would end up with 
>>> something like:
>>> 
>>> DataStream<Tuple2<String, Long>> windowStream = dataStream
>>>                 .keyBy(0) //id 
>>>                 .timeWindow(Time.of(windowSize, 
>>> TimeUnit.MILLISECONDS)).aggregate
>>>                             (new AggregateFunction<Tuple2<String, Long>, 
>>> Accumulator, Tuple2<String, Long>>() {
>>>                     @Override
>>>                     public Accumulator createAccumulator() {
>>>                         return null;
>>>                     }
>>> 
>>>                     @Override
>>>                     public void add(Tuple2<String, Long> stringLong, 
>>> Accumulator o)                                         {
>>> 
>>>                     }
>>> 
>>>                     @Override
>>>                     public Tuple2<String, Long> getResult(Accumulator o) {
>>>                         return null;
>>>                     }
>>> 
>>>                     @Override
>>>                     public Accumulator merge(Accumulator o, Accumulator 
>>> acc1) {
>>>                         return null;
>>>                     }
>>>                 });
>>> 
>>> Because it looks like aggregate would only transfer WindowedStream to a 
>>> DataStream. But for a global aggregation phase (a reducer), should I 
>>> extract the window again?
>>> 
>>> 
>>> Thanks! I apologize if that sounds like a very intuitive questions.
>>> 
>>> 
>>> Le
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>>> On Tue, Oct 24, 2017 at 4:14 AM, Kurt Young <ykt...@gmail.com 
>>> <mailto:ykt...@gmail.com>> wrote:
>>> I think you can use WindowedStream.aggreate
>>> 
>>> Best,
>>> Kurt
>>> 
>>> On Tue, Oct 24, 2017 at 1:45 PM, Le Xu <sharonx...@gmail.com 
>>> <mailto:sharonx...@gmail.com>> wrote:
>>> Thanks Kurt. Maybe I wasn't clear before, I was wondering if Flink has 
>>> implementation of combiner in DataStream (to use after keyBy and windowing).
>>> 
>>> Thanks again!
>>> 
>>> Le
>>> 
>>> On Sun, Oct 22, 2017 at 8:52 PM, Kurt Young <ykt...@gmail.com 
>>> <mailto:ykt...@gmail.com>> wrote:
>>> Hi,
>>> 
>>> The document you are looking at is pretty old, you can check the newest 
>>> version here: 
>>> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/dataset_transformations.html
>>>  
>>> <https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/batch/dataset_transformations.html>
>>> 
>>> Regarding to your question, you can use combineGroup 
>>> 
>>> Best,
>>> Kurt
>>> 
>>> On Mon, Oct 23, 2017 at 5:22 AM, Le Xu <sharonx...@gmail.com 
>>> <mailto:sharonx...@gmail.com>> wrote:
>>> Hello!
>>> 
>>> I'm new to Flink and I'm wondering if there is a explicit local combiner to 
>>> each mapper so I can use to perform a local reduce on each mapper? I looked 
>>> up on 
>>> https://ci.apache.org/projects/flink/flink-docs-release-0.8/dataset_transformations.html
>>>  
>>> <https://ci.apache.org/projects/flink/flink-docs-release-0.8/dataset_transformations.html>
>>>  but couldn't find anything that matches.
>>> 
>>> 
>>> Thanks!
>>> 
>>> Le
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 
>> 

Reply via email to