An addendum

Is the element reference IN  in onElement(IN element.. ) in Trigger<IN,..>,
the same as IN the one provided to add(IN value) in Accumulator<IN,..>. It
seems that any mutations to IN in the onElement() is not visible to the
Accumulator that is carrying it as a previous element  reference albeit in
the next invocation of add(). This seems to be only in distributed mode,
which makes sense only if theses reference point to different objects.

The pipeline

.keyBy(keySelector)
.window(EventTimeSessionWindows.<IN>withGap(gap))
.trigger(new CountBasedWMAugmentationTrigger<IN,TimeWindow>(triggerCount))
.aggregate(
        new AggregateFunction<IN, ACC, OUT>() {

            @Override
            public ACC createAccumulator() {
                ACC newInstance = (ACC) accumulator.clone();
                newInstance.resetLocal();
                return newInstance;
            }

            @Override
            public void add(IN value, ACC accumulator) {

                /** This method is called before onElement of the
Trigger and keeps the reference to the last IN **/


                accumulator.add(value);

            }
            .....

   The Trigger


public class CountBasedWMAugmentationTrigger<T extends
        Serializable & CountBasedWMAugmentationTrigger.HasWaterMark, W
extends Window> extends Trigger<T, W> {


    @Override

    public TriggerResult onElement(T element, long timestamp, W
window, TriggerContext ctx) throws Exception {

        /** The element T is mutated to carry the watermark **/
        *element.setWaterMark(ctx.getCurrentWatermark());*

        .





On Sun, Dec 17, 2017 at 1:52 AM, Vishal Santoshi <vishal.santo...@gmail.com>
wrote:

> I want to augment a POJO in  Trigger's onElement method, specifically
> supply the POJO with the watermark from the TriggerContext. The sequence of
> execution is this sequence
>
> 1. call to add() in the accumulator for the window  and save the POJO
> reference in the Accumulator.
> 2. call to onElement on Tigger
> 3. set watermark to the POJO
>
> The next add() method should have the last reference and any mutation done
> in step 3.
>
> That works in a local test case, using LocalFlinkMiniCluster, as in I have
> access to the mutation by the onElement() in the POJO in the subsequent
> add(),  but not on a distributed cluster. The specific question I had is
> whether  add() on a supplied accumulator on a window and onElement() method
> of the trigger on that window are inline executions, on the same thread or
> is there any serialization/deserialization IPC that causes these divergence
> ( local versus distributed )
>
> Regards.
>

Reply via email to