Hi Jing,
I also tried using `MapBundleOperator` directly (I have not tested with
LinkedHashMap yet). However, in the following code from
`AbstractMapBundleOperator.java`, `numOfElements` is always 0; it is never
incremented. I replaced `TaxiFareStream` with `MapBundleOperator` in my
earlier code. The counter should increase by 1 each time an element is
processed, but that is not happening.

    public void processElement(StreamRecord<IN> element) throws Exception {
        // get the key and value for the map bundle
        final IN input = element.getValue();
        final K bundleKey = getKey(input);
        final V bundleValue = bundle.get(bundleKey);

        // get a new value after adding this element to bundle
        final V newBundleValue = function.addInput(bundleValue, input);

        // update to map bundle
        bundle.put(bundleKey, newBundleValue);

        numOfElements++;
        bundleTrigger.onElement(input);
    }
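
For comparison, the expected counting behavior can be sketched in plain Java without any Flink classes. `MiniBundle`, `currentCount` and `flushed` are hypothetical names made up for this illustration; this is a simplified stand-in under those assumptions, not Flink's implementation. It only mirrors the shape of the `processElement` and `onElement` snippets in this thread: merge the element into the bundle, increment the counter, and flush once the counter reaches the maximum.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a count-triggered map bundle; not Flink's code.
class MiniBundle {
    private final Map<Long, Float> bundle = new LinkedHashMap<>();
    private final int maxCount;
    private int count = 0;

    // Every flushed bundle is recorded here so the result can be inspected.
    final List<Map<Long, Float>> flushed = new ArrayList<>();

    MiniBundle(int maxCount) {
        this.maxCount = maxCount;
    }

    // Merge the tip into the per-driver aggregate, count the element,
    // and flush once maxCount elements have been buffered.
    void processElement(long driverId, float tip) {
        bundle.merge(driverId, tip, Float::sum);
        count++;
        if (count >= maxCount) {
            finishBundle();
        }
    }

    // Emit a copy of the buffered aggregates, then clear buffer and counter.
    void finishBundle() {
        flushed.add(new LinkedHashMap<>(bundle));
        bundle.clear();
        count = 0;
    }

    int currentCount() {
        return count;
    }

    public static void main(String[] args) {
        MiniBundle mini = new MiniBundle(3);
        mini.processElement(1L, 1.0f);
        mini.processElement(2L, 2.0f);
        System.out.println(mini.currentCount()); // 2
        mini.processElement(1L, 0.5f);
        System.out.println(mini.currentCount()); // 0, the third element triggered a flush
        System.out.println(mini.flushed);        // [{1=1.5, 2=2.0}]
    }
}
```

In this sketch the counter is only 0 before the first element and immediately after a flush, so a value that is observed as 0 at every step would mean the increment is not being reached at all.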

One more question: you mentioned that I should test with `LinkedHashMap`
instead of `HashMap`. Where should I make this change? Do I need to
create a class that extends `MapBundleOperator` and make the change there?
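
For context, the difference between the two map types is iteration order: `LinkedHashMap` iterates in insertion order, while `HashMap` gives no ordering guarantee. A minimal plain-Java sketch of that difference follows; the class name `BundleOrderDemo`, the helper `iterationOrder` and the sample keys are made up for illustration and are not part of Flink.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical demo class; not part of Flink.
class BundleOrderDemo {

    // Inserts the keys in the given order and returns the order in which
    // the map iterates over them afterwards.
    static List<Long> iterationOrder(Map<Long, String> bundle, long[] keys) {
        for (long key : keys) {
            bundle.put(key, "fare-" + key);
        }
        return new ArrayList<>(bundle.keySet());
    }

    public static void main(String[] args) {
        long[] arrival = {2013000108L, 2013000001L, 2013000042L};

        // LinkedHashMap: iteration follows insertion (arrival) order.
        System.out.println(iterationOrder(new LinkedHashMap<>(), arrival));

        // HashMap: no ordering guarantee, so this may differ from arrival order.
        System.out.println(iterationOrder(new HashMap<>(), arrival));
    }
}
```

If the bundle is flushed by iterating over the map, only the insertion-ordered variant is guaranteed to emit keys in the order they first arrived.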

Thanks


On Thu, Aug 19, 2021 at 9:58 PM JING ZHANG <beyond1...@gmail.com> wrote:

> Hi Suman,
> Please try copying `MapBundleOperator` and changing the `HashMap` to
> `LinkedHashMap`, so that the output order stays consistent with the input order.
>
> Best,
> JING ZHANG
>
> On Fri, Aug 20, 2021 at 2:23 AM suman shil <cncf.s...@gmail.com> wrote:
>
>> Hi Jing,
>> Thanks for looking into this. Here is the code of `TaxiFareStream`. I was
>> following this link:
>> http://felipeogutierrez.blogspot.com/2019/04/implementing-my-own-stream-operator-in.html
>> Please let me know if there is any other way of aggregating elements
>> locally.
>>
>> public class TaxiFareStream extends MapBundleOperator<Long, TaxiFare, TaxiFare, TaxiFare> {
>>     private KeySelector<TaxiFare, Long> keySelector;
>>
>>     public TaxiFareStream(MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> userFunction,
>>                           BundleTrigger<TaxiFare> bundleTrigger,
>>                           KeySelector<TaxiFare, Long> keySelector) {
>>         super(userFunction, bundleTrigger, keySelector);
>>         this.keySelector = keySelector;
>>     }
>>
>>     @Override
>>     protected Long getKey(TaxiFare input) throws Exception {
>>         return keySelector.getKey(input);
>>     }
>> }
>>
>> Thanks
>>
>> On Thu, Aug 19, 2021 at 9:23 AM JING ZHANG <beyond1...@gmail.com> wrote:
>>
>>> Hi Suman,
>>> Would you please provide the code for `TaxiFareStream`? It seems we
>>> could use `MapBundleOperator` directly here.
>>> BTW, I have some concerns about using this approach for local
>>> aggregation ahead of a window aggregation, because `MapBundleOperator`
>>> buffers input data in a bundle, which is a HashMap and therefore does
>>> not preserve the input order. I'm afraid the records within a bundle
>>> (of size 10 in your case) could come out unordered. I'm not sure
>>> whether it is reasonable to assign watermarks based on unordered
>>> timestamps.
>>>
>>> Best,
>>> JING ZHANG
>>>
>>>
>>>
>>> On Thu, Aug 19, 2021 at 12:43 PM suman shil <cncf.s...@gmail.com> wrote:
>>>
>>>> I am trying to do pre-shuffle aggregation in Flink. The following is my
>>>> MapBundleFunction implementation.
>>>>
>>>> public class TaxiFareMapBundleFunction extends MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> {
>>>>     @Override
>>>>     public TaxiFare addInput(@Nullable TaxiFare value, TaxiFare input) throws Exception {
>>>>         if (value == null) {
>>>>             return input;
>>>>         }
>>>>         value.tip = value.tip + input.tip;
>>>>         return value;
>>>>     }
>>>>
>>>>     @Override
>>>>     public void finishBundle(Map<Long, TaxiFare> buffer, Collector<TaxiFare> out) throws Exception {
>>>>         for (Map.Entry<Long, TaxiFare> entry : buffer.entrySet()) {
>>>>             out.collect(entry.getValue());
>>>>         }
>>>>     }
>>>> }
>>>>
>>>> I am using `CountBundleTrigger.java`, but the pre-shuffle aggregation
>>>> is not working because the `count` variable is always 0. Please let me
>>>> know if I am missing something.
>>>>
>>>>     @Override
>>>>     public void onElement(T element) throws Exception {
>>>>         count++;
>>>>         if (count >= maxCount) {
>>>>             callback.finishBundle();
>>>>             reset();
>>>>         }
>>>>     }
>>>>
>>>> Here is the main code.
>>>>
>>>>         MapBundleFunction<Long, TaxiFare, TaxiFare, TaxiFare> mapBundleFunction = new TaxiFareMapBundleFunction();
>>>>         BundleTrigger<TaxiFare> bundleTrigger = new CountBundleTrigger<>(10);
>>>>         KeySelector<TaxiFare, Long> taxiFareLongKeySelector = new KeySelector<TaxiFare, Long>() {
>>>>             @Override
>>>>             public Long getKey(TaxiFare value) throws Exception {
>>>>                 return value.driverId;
>>>>             }
>>>>         };
>>>>
>>>>         DataStream<Tuple3<Long, Long, Float>> hourlyTips =
>>>> //                            fares.keyBy((TaxiFare fare) -> fare.driverId)
>>>> //                            .window(TumblingEventTimeWindows.of(Time.hours(1))).process(new AddTips());;
>>>>                 fares.transform("preshuffle", TypeInformation.of(TaxiFare.class),
>>>>                         new TaxiFareStream(mapBundleFunction, bundleTrigger, taxiFareLongKeySelector))
>>>>                         .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<TaxiFare>(Time.seconds(20)) {
>>>>                             @Override
>>>>                             public long extractTimestamp(TaxiFare element) {
>>>>                                 return element.startTime.getEpochSecond();
>>>>                             }
>>>>                         })
>>>>                         .keyBy((TaxiFare fare) -> fare.driverId)
>>>>                         .window(TumblingProcessingTimeWindows.of(Time.minutes(1)))
>>>>                         .process(new AddTips());
>>>>
>>>>         DataStream<Tuple3<Long, Long, Float>> hourlyMax =
>>>>                 hourlyTips.windowAll(TumblingEventTimeWindows.of(Time.hours(1))).maxBy(2);
>>>>
>>>> Thanks
>>>> Suman
>>>>
>>>
