Hi Suman,
Please try copying `MapBundleOperator` and changing its `HashMap` to a
`LinkedHashMap`, so that the output order stays consistent with the input order.
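
A minimal sketch of why the map type matters, in plain Java with no Flink dependency. `BundleOrderDemo`, `Fare` and `runBundle` are hypothetical names, and `Fare` is a simplified stand-in for `TaxiFare`. `runBundle` mimics `addInput` (sum tips per driver) followed by `finishBundle` (emit every buffered value). Only the `Map` passed in changes between the two runs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BundleOrderDemo {

    // Hypothetical, simplified stand-in for TaxiFare: only the fields this demo needs.
    public static final class Fare {
        public final long driverId;
        public final long startMillis;
        public float tip;

        public Fare(long driverId, long startMillis, float tip) {
            this.driverId = driverId;
            this.startMillis = startMillis;
            this.tip = tip;
        }
    }

    // Mimics addInput() + finishBundle(): sums tips per driver in `buffer`,
    // then emits the merged fares in the buffer's iteration order and clears it.
    public static List<Fare> runBundle(List<Fare> input, Map<Long, Fare> buffer) {
        for (Fare f : input) {
            Fare acc = buffer.get(f.driverId);
            if (acc == null) {
                // Copy so the caller's input objects are not mutated.
                buffer.put(f.driverId, new Fare(f.driverId, f.startMillis, f.tip));
            } else {
                acc.tip += f.tip;
            }
        }
        List<Fare> out = new ArrayList<>(buffer.values());
        buffer.clear();
        return out;
    }

    public static void main(String[] args) {
        List<Fare> input = List.of(
                new Fare(100L, 1_000L, 1.0f),
                new Fare(3L, 2_000L, 2.0f),
                new Fare(42L, 3_000L, 3.0f),
                new Fare(3L, 4_000L, 4.0f));

        // HashMap iteration order is unspecified, so driverIds may come out in any order.
        print("HashMap", runBundle(input, new HashMap<>()));
        // LinkedHashMap iterates in first-insertion order: drivers 100, 3, 42.
        print("LinkedHashMap", runBundle(input, new LinkedHashMap<>()));
    }

    private static void print(String label, List<Fare> fares) {
        StringBuilder sb = new StringBuilder(label).append(':');
        for (Fare f : fares) {
            sb.append(" driver=").append(f.driverId)
              .append(" start=").append(f.startMillis)
              .append(" tip=").append(f.tip).append(';');
        }
        System.out.println(sb);
    }
}
```

With a `LinkedHashMap`, re-inserting an existing key does not move it, so each merged record is emitted at the position where its driver first appeared in the bundle, and it keeps that first fare's start time (as `TaxiFareMapBundleFunction.addInput` keeps `value`). The output therefore follows first-arrival order, which is not necessarily strict timestamp order.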

Best,
JING ZHANG

On Fri, Aug 20, 2021 at 2:23 AM suman shil <cncf.s...@gmail.com> wrote:

> Hi Jing,
> Thanks for looking into this. Here is the code for `TaxiFareStream`. I was
> following this post:
> http://felipeogutierrez.blogspot.com/2019/04/implementing-my-own-stream-operator-in.html
> Please let me know if there is another way to aggregate elements locally.
>
> public class TaxiFareStream extends MapBundleOperator<Long, TaxiFare, TaxiFare, TaxiFare> {
>     private KeySelector<TaxiFare, Long> keySelector;
>
>     public TaxiFareStream(MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> userFunction,
>                           BundleTrigger<TaxiFare> bundleTrigger,
>                           KeySelector<TaxiFare, Long> keySelector) {
>         super(userFunction, bundleTrigger, keySelector);
>         this.keySelector = keySelector;
>     }
>
>     @Override
>     protected Long getKey(TaxiFare input) throws Exception {
>         return keySelector.getKey(input);
>     }
> }
>
> Thanks
>
> On Thu, Aug 19, 2021 at 9:23 AM JING ZHANG <beyond1...@gmail.com> wrote:
>
>> Hi Suman,
>> Would you please share the code for `TaxiFareStream`? It seems we could
>> use `MapBundleOperator` directly here.
>> BTW, I have some concerns about using this approach for local
>> aggregation in front of a window aggregation: `MapBundleOperator` saves
>> input data in a bundle that is a HashMap, which does not preserve the
>> input order. I'm afraid records within a bundle (10 records in your case)
>> may be emitted out of order. I'm not sure whether it is reasonable to
>> assign watermarks based on out-of-order timestamps.
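
To illustrate the watermark concern, here is a simplified model of a bounded-out-of-orderness watermark (hypothetical `WatermarkOrderDemo`, not Flink code). It tracks the maximum timestamp seen and sets the watermark to that maximum minus the bound, updating after every element, whereas Flink emits watermarks periodically. Events whose timestamp is at or below the current watermark are reported as late. The 20-second bound matches the extractor in Suman's main code.

```java
import java.util.ArrayList;
import java.util.List;

public class WatermarkOrderDemo {

    // Returns the timestamps that arrive at or below the watermark already reached,
    // i.e. events that would count as late. Simplification: the watermark is
    // recomputed after every element instead of on a periodic timer.
    public static List<Long> lateEvents(long[] timestamps, long maxOutOfOrderness) {
        List<Long> late = new ArrayList<>();
        long maxSeen = Long.MIN_VALUE;
        long watermark = Long.MIN_VALUE;
        for (long ts : timestamps) {
            if (ts <= watermark) {
                late.add(ts);
            }
            maxSeen = Math.max(maxSeen, ts);
            watermark = maxSeen - maxOutOfOrderness;
        }
        return late;
    }

    public static void main(String[] args) {
        long bound = 20_000L; // 20 s in milliseconds
        long[] inOrder = {0L, 10_000L, 50_000L, 60_000L};
        // The same events, as a bundle might emit them after reordering.
        long[] reordered = {50_000L, 60_000L, 0L, 10_000L};
        System.out.println("in order:  late = " + lateEvents(inOrder, bound));   // []
        System.out.println("reordered: late = " + lateEvents(reordered, bound)); // [0, 10000]
    }
}
```

Whether this matters in practice depends on how far apart the timestamps inside a single bundle can be compared with the 20-second bound.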
>>
>> Best,
>> JING ZHANG
>>
>>
>>
>> On Thu, Aug 19, 2021 at 12:43 PM suman shil <cncf.s...@gmail.com> wrote:
>>
>>> I am trying to do pre-shuffle aggregation in Flink. Following is my
>>> `MapBundleFunction` implementation.
>>>
>>> public class TaxiFareMapBundleFunction extends MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> {
>>>     @Override
>>>     public TaxiFare addInput(@Nullable TaxiFare value, TaxiFare input) throws Exception {
>>>         if (value == null) {
>>>             return input;
>>>         }
>>>         value.tip = value.tip + input.tip;
>>>         return value;
>>>     }
>>>
>>>     @Override
>>>     public void finishBundle(Map<Long, TaxiFare> buffer, Collector<TaxiFare> out) throws Exception {
>>>         for (Map.Entry<Long, TaxiFare> entry : buffer.entrySet()) {
>>>             out.collect(entry.getValue());
>>>         }
>>>     }
>>> }
>>>
>>> I am using `CountBundleTrigger`, but the pre-shuffle aggregation is not
>>> working because its `count` variable is always 0. Please let me know if
>>> I am missing something.
>>>
>>>     @Override
>>>     public void onElement(T element) throws Exception {
>>>         count++;
>>>         if (count >= maxCount) {
>>>             callback.finishBundle();
>>>             reset();
>>>         }
>>>     }
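
For reference, a plain-Java model of the trigger logic in `onElement` (hypothetical `CountTriggerDemo`, `CountTrigger` and `BundleCallback`, not Flink's classes). It assumes `reset()` simply sets `count` back to 0. Under that assumption `count` legitimately returns to 0 after every `maxCount` elements, so a 0 seen right after a flush is expected; a value that never leaves 0 with `maxCount = 10` would suggest that `onElement` is not reaching the trigger instance being inspected, which may be worth checking.

```java
import java.util.ArrayList;
import java.util.List;

public class CountTriggerDemo {

    // Hypothetical stand-in for the callback the operator registers with the trigger.
    public interface BundleCallback {
        void finishBundle();
    }

    public static final class CountTrigger<T> {
        private final long maxCount;
        private BundleCallback callback;
        private long count = 0;

        public CountTrigger(long maxCount) {
            if (maxCount <= 0) throw new IllegalArgumentException("maxCount must be > 0");
            this.maxCount = maxCount;
        }

        public void registerCallback(BundleCallback callback) {
            this.callback = callback;
        }

        // Same logic as the onElement() snippet: count, and flush once maxCount is reached.
        public void onElement(T element) {
            count++;
            if (count >= maxCount) {
                callback.finishBundle();
                reset();
            }
        }

        // Assumption: reset() only zeroes the counter.
        public void reset() {
            count = 0;
        }

        public long getCount() {
            return count;
        }
    }

    public static void main(String[] args) {
        List<Integer> flushedAt = new ArrayList<>();
        int[] seen = {0};
        CountTrigger<String> trigger = new CountTrigger<>(10);
        trigger.registerCallback(() -> flushedAt.add(seen[0]));
        for (int i = 1; i <= 25; i++) {
            seen[0] = i;
            trigger.onElement("fare-" + i);
        }
        // Flushes after elements 10 and 20; 5 elements are still pending.
        System.out.println("flushed at " + flushedAt + ", pending count = " + trigger.getCount());
    }
}
```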
>>>
>>> Here is the main code.
>>>
>>> MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> mapBundleFunction = new TaxiFareMapBundleFunction();
>>> BundleTrigger<TaxiFare> bundleTrigger = new CountBundleTrigger<>(10);
>>> KeySelector<TaxiFare, Long> taxiFareLongKeySelector = new KeySelector<TaxiFare, Long>() {
>>>     @Override
>>>     public Long getKey(TaxiFare value) throws Exception {
>>>         return value.driverId;
>>>     }
>>> };
>>>
>>> DataStream<Tuple3<Long, Long, Float>> hourlyTips =
>>> //        fares.keyBy((TaxiFare fare) -> fare.driverId)
>>> //             .window(TumblingEventTimeWindows.of(Time.hours(1))).process(new AddTips());
>>>         fares.transform("preshuffle", TypeInformation.of(TaxiFare.class),
>>>                         new TaxiFareStream(mapBundleFunction, bundleTrigger, taxiFareLongKeySelector))
>>>              .assignTimestampsAndWatermarks(
>>>                      new BoundedOutOfOrdernessTimestampExtractor<TaxiFare>(Time.seconds(20)) {
>>>                          @Override
>>>                          public long extractTimestamp(TaxiFare element) {
>>>                              return element.startTime.getEpochSecond();
>>>                          }
>>>                      })
>>>              .keyBy((TaxiFare fare) -> fare.driverId)
>>>              .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
>>>              .process(new AddTips());
>>>
>>> DataStream<Tuple3<Long, Long, Float>> hourlyMax =
>>>         hourlyTips.windowAll(TumblingEventTimeWindows.of(Time.hours(1))).maxBy(2);
>>>
>>> Thanks
>>> Suman
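
One more detail in the main code, independent of the bundling question: the extractor returns `element.startTime.getEpochSecond()`, but Flink treats the value returned by `extractTimestamp` as milliseconds since the epoch. The sketch below (hypothetical `TimestampUnitDemo`; `windowStart` is a simplified tumbling-window start computation for offset 0, not Flink's code) shows two fares two hours apart landing in the same one-hour event-time window when seconds are used. Assuming `startTime` is a `java.time.Instant`, `element.startTime.toEpochMilli()` returns the expected unit.

```java
import java.time.Instant;

public class TimestampUnitDemo {

    public static final long ONE_HOUR_MS = 60L * 60L * 1000L;

    // Start of the tumbling window containing `timestamp` (offset 0),
    // computed as timestamp - (timestamp mod windowSize).
    public static long windowStart(long timestamp, long windowSize) {
        return timestamp - Math.floorMod(timestamp, windowSize);
    }

    public static void main(String[] args) {
        Instant first = Instant.parse("2013-01-01T00:00:00Z");
        Instant second = first.plusSeconds(2 * 3600); // two hours later

        // Epoch milliseconds: the unit Flink expects for event timestamps.
        long msA = windowStart(first.toEpochMilli(), ONE_HOUR_MS);
        long msB = windowStart(second.toEpochMilli(), ONE_HOUR_MS);
        // Epoch seconds are 1000x smaller, so a "1 hour" window covers 1000 real hours.
        long sA = windowStart(first.getEpochSecond(), ONE_HOUR_MS);
        long sB = windowStart(second.getEpochSecond(), ONE_HOUR_MS);

        System.out.println("epoch millis:  same window? " + (msA == msB)); // false
        System.out.println("epoch seconds: same window? " + (sA == sB));   // true
    }
}
```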
>>>
>>
