Hi Suman,
Could you please share the code for `TaxiFareStream`? It seems we could use
`MapBundleOperator` directly here.
By the way, I have some concerns about using this approach for local
aggregation ahead of a window aggregation. `MapBundleOperator` buffers its
input in a bundle backed by a HashMap, which does not preserve the order in
which records arrived. I'm afraid the records within a bundle (10 in your
case) may be emitted out of order, and I'm not sure it is reasonable to
assign watermarks based on unordered timestamps.
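
To illustrate the ordering concern, here is a minimal plain-Java sketch. It
does not use Flink at all: `BundleOrderDemo`, `flushTimestamps` and the sample
driver IDs and timestamps are hypothetical names and values chosen for the
example, and it only assumes that the bundle is a HashMap keyed by driver ID
that is flushed by iterating over it. The HashMap Javadoc makes no guarantee
about iteration order, so the flushed timestamps need not follow arrival
order, whereas a LinkedHashMap keeps insertion order.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BundleOrderDemo {

    /**
     * Buffers (driverId, timestamp) pairs per key and then flushes the buffer
     * by iterating over it, roughly what a finishBundle-style callback does.
     */
    public static List<Long> flushTimestamps(Map<Long, Long> buffer, long[][] arrivals) {
        for (long[] arrival : arrivals) {
            // Keep the timestamp of the first record seen for each key.
            buffer.putIfAbsent(arrival[0], arrival[1]);
        }
        // The flush order is whatever iteration order the map provides.
        return new ArrayList<>(buffer.values());
    }

    /** Returns true if the timestamps never decrease. */
    public static boolean isAscending(List<Long> timestamps) {
        for (int i = 1; i < timestamps.size(); i++) {
            if (timestamps.get(i) < timestamps.get(i - 1)) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Hypothetical records, arriving in ascending timestamp order.
        long[][] arrivals = {{20, 1000}, {3, 2000}, {17, 3000}, {1, 4000}};

        List<Long> hashed = flushTimestamps(new HashMap<>(), arrivals);
        List<Long> linked = flushTimestamps(new LinkedHashMap<>(), arrivals);

        System.out.println("HashMap flush order:       " + hashed
                + " ascending=" + isAscending(hashed));
        System.out.println("LinkedHashMap flush order: " + linked
                + " ascending=" + isAscending(linked));
    }
}
```

If the HashMap line prints a non-ascending sequence, a watermark assigned
after the bundle operator would be computed from timestamps that no longer
follow arrival order, which is the situation I am worried about.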

Best,
JING ZHANG



suman shil <cncf.s...@gmail.com> wrote on Thu, Aug 19, 2021 at 12:43 PM:

> I am trying to do pre-shuffle aggregation in Flink. The following is my
> MapBundleFunction implementation.
>
> public class TaxiFareMapBundleFunction
>         extends MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> {
>
>     @Override
>     public TaxiFare addInput(@Nullable TaxiFare value, TaxiFare input) throws Exception {
>         if (value == null) {
>             return input;
>         }
>         value.tip = value.tip + input.tip;
>         return value;
>     }
>
>     @Override
>     public void finishBundle(Map<Long, TaxiFare> buffer, Collector<TaxiFare> out)
>             throws Exception {
>         for (Map.Entry<Long, TaxiFare> entry : buffer.entrySet()) {
>             out.collect(entry.getValue());
>         }
>     }
> }
>
> I am using "CountBundleTrigger.java", but the pre-shuffle aggregation is
> not working because the "count" variable is always 0. Please let me know if
> I am missing something.
>
>     @Override
>     public void onElement(T element) throws Exception {
>         count++;
>         if (count >= maxCount) {
>             callback.finishBundle();
>             reset();
>         }
>     }
>
> Here is the main code.
>
> MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> mapBundleFunction =
>         new TaxiFareMapBundleFunction();
> BundleTrigger<TaxiFare> bundleTrigger = new CountBundleTrigger<>(10);
> KeySelector<TaxiFare, Long> taxiFareLongKeySelector =
>         new KeySelector<TaxiFare, Long>() {
>             @Override
>             public Long getKey(TaxiFare value) throws Exception {
>                 return value.driverId;
>             }
>         };
>
> DataStream<Tuple3<Long, Long, Float>> hourlyTips =
> //      fares.keyBy((TaxiFare fare) -> fare.driverId)
> //          .window(TumblingEventTimeWindows.of(Time.hours(1))).process(new AddTips());;
>         fares.transform("preshuffle", TypeInformation.of(TaxiFare.class),
>                         new TaxiFareStream(mapBundleFunction, bundleTrigger,
>                                 taxiFareLongKeySelector))
>                 .assignTimestampsAndWatermarks(
>                         new BoundedOutOfOrdernessTimestampExtractor<TaxiFare>(
>                                 Time.seconds(20)) {
>                             @Override
>                             public long extractTimestamp(TaxiFare element) {
>                                 return element.startTime.getEpochSecond();
>                             }
>                         })
>                 .keyBy((TaxiFare fare) -> fare.driverId)
>                 .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
>                 .process(new AddTips());
>
> DataStream<Tuple3<Long, Long, Float>> hourlyMax =
>         hourlyTips.windowAll(TumblingEventTimeWindows.of(Time.hours(1))).maxBy(2);
>
> Thanks
> Suman
>
