I am trying to do pre shuffle aggregation in flink. Following is the
MapBundle implementation.



















*public class TaxiFareMapBundleFunction extends MapBundleFunction<Long,
TaxiFare, TaxiFare, TaxiFare> {    @Override    public TaxiFare
addInput(@Nullable TaxiFare value, TaxiFare input) throws Exception {
  if (value == null) {            return input;        }        value.tip =
value.tip + input.tip;        return value;    }    @Override    public
void finishBundle(Map<Long, TaxiFare> buffer, Collector<TaxiFare> out)
throws Exception {        for (Map.Entry<Long, TaxiFare> entry :
buffer.entrySet()) {            out.collect(entry.getValue());        }
}}*

I am using "CountBundleTrigger.java" . But the pre-shuffle aggregation is
not working as the "*count*" variable is always 0. Please let me know If I
am missing something.








*    @Override    public void onElement(T element) throws Exception {
  count++;        if (count >= maxCount) {
callback.finishBundle();            reset();        }    }*

Here is the main code.


























*        MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare>
mapBundleFunction = new TaxiFareMapBundleFunction();
BundleTrigger<TaxiFare> bundleTrigger = new CountBundleTrigger<>(10);
  KeySelector<TaxiFare, Long> taxiFareLongKeySelector = new
KeySelector<TaxiFare, Long>() {            @Override            public Long
getKey(TaxiFare value) throws Exception {                return
value.driverId;            }        };        DataStream<Tuple3<Long, Long,
Float>> hourlyTips =//                            fares.keyBy((TaxiFare
fare) -> fare.driverId)//
.window(TumblingEventTimeWindows.of(Time.hours(1))).process(new
AddTips());;                fares.transform("preshuffle",
TypeInformation.of(TaxiFare.class),                        new
TaxiFareStream(mapBundleFunction, bundleTrigger, taxiFareLongKeySelector
            ))                        .assignTimestampsAndWatermarks(new
BoundedOutOfOrdernessTimestampExtractor<TaxiFare>(Time.seconds(20)) {
                      @Override                            public long
extractTimestamp(TaxiFare element) {                                return
element.startTime.getEpochSecond();                            }
            })                        .keyBy((TaxiFare fare) ->
fare.driverId)
.window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
      .process(new AddTips());        DataStream<Tuple3<Long, Long, Float>>
hourlyMax =
hourlyTips.windowAll(TumblingEventTimeWindows.of(Time.hours(1))).maxBy(2);*

Thanks
Suman

Reply via email to