I think I got it Glad you solved this tricky issue and thanks for sharing your solution :-)
Best, Fabian

2018-01-06 14:33 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:

> Yep, this though is suboptimal, as you imagined. Two things:
>
> * <IN> internally has an <INLite>, an ultra-lite version of IN that is
> only required for the path analysis.
> * Sessionization being expensive, we piggyback multiple other
> aggregations that do not depend on the path or order (count etc.).
> Essentially a Session is (ordered path + accumulated stats).
>
> The code seems pretty all right; please tell me if you need to see it.
> All generics, so no secrets here.
>
> On Fri, Jan 5, 2018 at 11:58 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi,
>>
>> you would not need the ListStateDescriptor. A ProcessWindowFunction
>> stores all events that are assigned to a window (IN objects in your case)
>> in an internal ListState.
>> The Iterable<IN> parameter of the process() method iterates over the
>> internal list state.
>>
>> So you would have a Trigger that fires when a new watermark is received
>> (or in regular intervals like every minute) and at the end of the window.
>> The process() method looks up the current watermark in the Context
>> object, traverses the Iterable<IN> filtering out all events with
>> timestamp > watermark (you would need to enrich the events with the
>> timestamp, which can be done in a ProcessFunction), inserts the remaining
>> ones into a sorted data structure (possibly leveraging the almost sorted
>> nature of the events) and creates a Session from it.
>>
>> This is probably less efficient than your ProcessFunction because
>> process() would go over the complete list over and over again and not be
>> able to persist the result of previous invocations.
>> However, the code should be easier to maintain.
>>
>> Does that make sense?
>>
>> Best, Fabian
>>
>> 2018-01-05 17:28 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>
>>> Hello Fabian, Thank you for your response.
>>>
>>> I thought about it and maybe I am missing something obvious here. The
>>> code below is what I think you suggest. The issue is that the window
>>> now is a list of Sessions (or, shall we say, subsets of the Session).
>>>
>>> What is required is that on a new watermark
>>>
>>> * We sort these Session objects
>>> * Get the subset that is before the new watermark and emit it without
>>> purging.
>>>
>>> I do not see how the Trigger approach helps us. It does tell us that the
>>> watermark has progressed, but to get the subset of the ListState that
>>> falls before the watermark, we would need access to *the new value of
>>> the watermark*. That was what my initial query was.
>>>
>>> public class SessionProcessWindow<IN extends HasTime & HasKey,
>>>         OUT extends SessionState<IN, OUT>>
>>>         extends ProcessWindowFunction<IN, OUT, String, TimeWindow> {
>>>
>>>     OUT toCreateNew;
>>>     Long gap;
>>>     private final ListStateDescriptor<OUT> mergingSetsStateDescriptor;
>>>
>>>     public SessionProcessWindow(TypeInformation<OUT> aggregationResultType,
>>>                                 OUT toCreateNew) {
>>>         this.toCreateNew = toCreateNew;
>>>         mergingSetsStateDescriptor =
>>>                 new ListStateDescriptor<>("sessions", aggregationResultType);
>>>     }
>>>
>>>     @Override
>>>     public void process(String s, Context context, Iterable<IN> elements,
>>>                         Collector<OUT> out) throws Exception {
>>>         OUT session = toCreateNew.createNew();
>>>         elements.forEach(f -> session.add(f));
>>>         context.windowState().getListState(mergingSetsStateDescriptor).add(session);
>>>     }
>>> }
>>>
>>> On Fri, Jan 5, 2018 at 7:35 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Hi Vishal,
>>>>
>>>> thanks for sharing your solution!
>>>>
>>>> Looking at this issue again and your mail in which you shared your
>>>> SessionProcessWindow ProcessWindowFunction, I'm wondering why you need the
>>>> ValueState that prevents the ProcessWindowFunction from being used in a
>>>> mergeable window.
>>>> You could have created a new Session object in each invocation of the
>>>> ProcessWindowFunction and simply kept the elements in the (mergeable)
>>>> list state of the window.
>>>> In that case you would simply need a custom trigger that calls the
>>>> ProcessWindowFunction when a new watermark arrives. For intermediate calls,
>>>> you just FIRE and for the final call you FIRE_AND_PURGE to remove the
>>>> elements from the window's state.
>>>> Did you try that?
>>>>
>>>> Best, Fabian
>>>>
>>>> 2018-01-03 15:57 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>>
>>>>> Dear Fabian,
>>>>>
>>>>> I was able to create a pretty functional ProcessFunction. Here is
>>>>> the synopsis; please see if it makes sense.
>>>>>
>>>>> Sessionization is unique in that it entails windows of dynamic length.
>>>>> The way Flink approaches it is pretty simple. It will create a TimeWindow
>>>>> of size "gap" relative to the event time, find an overlapping window
>>>>> (intersection) and create a covering window. Each such window has a
>>>>> "state" associated with it, which too has to be merged when a cover
>>>>> window is created on the intersection of 2 or more incident windows. To
>>>>> be more precise, if Window1 spans (t1, t2) and a new record creates a
>>>>> window (t3, t4) and t1<=t3<=t2, a new Window (t1, t4) is created and the
>>>>> associated states are merged.
>>>>>
>>>>> In the current Window API the states are external and are
>>>>> Accumulator based. This approach pretty much works for all cases
>>>>> where the aggregation is accumulative/reduced and does not depend on
>>>>> order, as in no ordered list of incoming records needs to be kept and
>>>>> the reduction is to a single aggregated element (think counts, min, max
>>>>> etc.). In path analysis (and other use cases), however, this approach
>>>>> has drawbacks. Even though in our accumulator we could keep an ordered
>>>>> list of events, it becomes unreasonable if not within bounds.
>>>>> An approach that does *attempt* to bound state is to preemptively
>>>>> analyze paths, using the WM as the marker that defines the *subset* of
>>>>> the state that is safe to analyze. So if we have n events in the window
>>>>> state and m fall before the WM, we can safely analyze the m subset,
>>>>> emitting the paths seen and reducing the cumulative state size. There
>>>>> are caveats, though, that I will go into later.
>>>>>
>>>>> Unfortunately the Accumulators in Flink's default Window runtime do not
>>>>> have access to the WM.
>>>>>
>>>>> This led to this generic approach (implemented and tested):
>>>>>
>>>>> * Use a low-level ProcessFunction, which allows access to the WM and is
>>>>> definitely nearer to the guts of Flink.
>>>>>
>>>>> * Still use the merge-windows-on-intersection approach, but use the WM to
>>>>> trigger (through Timers) reductions in state. This is not very
>>>>> dissimilar to what Flink does, but we have more control over what to do
>>>>> and when to do it. Essentially we have exposed a lifecycle method that
>>>>> reacts to WM progression.
>>>>>
>>>>> * There are essentially 2 Timers. The first timer is on the
>>>>> maxTimeStamp() of a Window, which, if there is no further mutation
>>>>> because of a merge etc., will fire to reflect a Session End. The second
>>>>> one is on currentWaterMark+1 and essentially calls a "reduceToWM" on
>>>>> each keyed Window and thus State.
>>>>>
>>>>> * There are 2 ways to short circuit a Session: 1. on Session time span,
>>>>> 2. on Session size.
>>>>>
>>>>> * There is a safety valve to blacklist keys when it is obvious that it
>>>>> is a bot ( again
>>>>>
>>>>> The solution will thus preemptively push out Patterns (and correct
>>>>> patterns) while keeping the ordered state within reasonable bounds. The
>>>>> incident data of course has to be analyzed: are the paths too large, etc.
>>>>> But one has full control over how to fashion the solution.
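
The "reduceToWM" idea in the synopsis above (of n buffered events, emit the m that fall at or before the watermark and keep the rest) can be sketched in plain Java, without any Flink APIs. The class name, the `TreeMap` buffer and the string events are all assumptions made for illustration; the thread does not show the actual Session implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/** Plain-Java sketch (no Flink APIs) of trimming ordered session state up to a watermark. */
public class ReduceToWatermarkSketch {

    /** Hypothetical ordered buffer: event time -> events seen at that time. */
    private final NavigableMap<Long, List<String>> buffer = new TreeMap<>();

    /** Events may arrive out of order; the TreeMap keeps them sorted by event time. */
    public void add(long eventTime, String event) {
        buffer.computeIfAbsent(eventTime, t -> new ArrayList<>()).add(event);
    }

    /** Removes and returns, in event-time order, every event with timestamp <= watermark. */
    public List<String> reduceToWatermark(long watermark) {
        NavigableMap<Long, List<String>> safe = buffer.headMap(watermark, true);
        List<String> emitted = new ArrayList<>();
        for (List<String> events : safe.values()) {
            emitted.addAll(events);
        }
        safe.clear(); // headMap is a view, so clearing it shrinks the underlying buffer
        return emitted;
    }

    /** Number of events still held back because they are later than every watermark seen. */
    public int size() {
        int n = 0;
        for (List<String> events : buffer.values()) {
            n += events.size();
        }
        return n;
    }

    public static void main(String[] args) {
        ReduceToWatermarkSketch session = new ReduceToWatermarkSketch();
        session.add(30L, "checkout");
        session.add(10L, "home");
        session.add(20L, "search");
        session.add(40L, "confirm");
        System.out.println(session.reduceToWatermark(25L)); // [home, search]
        System.out.println(session.size());                 // 2
    }
}
```

In a real job the buffer would live in keyed state and `reduceToWatermark` would be driven by the timer on currentWaterMark+1 described above; the sketch only shows the ordering and trimming step.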
>>>>>
>>>>> Regards and Thanks,
>>>>>
>>>>> Vishal
>>>>>
>>>>> On Wed, Dec 27, 2017 at 10:41 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>
>>>>>> This makes sense. Thanks.
>>>>>>
>>>>>> On Sat, Dec 23, 2017 at 10:58 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> all calls to onElement() or onTimer() are synchronized for any key.
>>>>>>> Think of a single thread calling these methods.
>>>>>>> Event-time timers are called when a watermark passes the timer.
>>>>>>> Watermarks are received as special records, so the methods are called in
>>>>>>> the same order as records (actual records or watermarks) arrive at the
>>>>>>> function. Only for processing-time timers, actual synchronization is
>>>>>>> required.
>>>>>>>
>>>>>>> The NPE might be thrown because of two timers that fire one after
>>>>>>> the other without a new record being processed in between the onTimer()
>>>>>>> calls. In that case the state is cleared in the first call and null in
>>>>>>> the second.
>>>>>>>
>>>>>>> Best, Fabian
>>>>>>>
>>>>>>> 2017-12-23 16:36 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>>>>>
>>>>>>>> Thanks.
>>>>>>>>
>>>>>>>> I have a few follow-up questions regarding ProcessFunction. I
>>>>>>>> think that the core should take care of any synchronization issues
>>>>>>>> between calls to onElement and onTimer in the case of a keyed stream,
>>>>>>>> but tests do not seem to suggest that.
>>>>>>>>
>>>>>>>> I have specifically 2 questions.
>>>>>>>>
>>>>>>>> 1. Are calls to onElement(..) single threaded if scoped to a key?
>>>>>>>> As in, on a keyed stream, is there a way that 2 or more threads can
>>>>>>>> execute on more than one element of a single key at one time?
>>>>>>>> Would I have to synchronize this construction?
>>>>>>>>
>>>>>>>> OUT accumulator = accumulatorState.value();
>>>>>>>> if (accumulator == null) {
>>>>>>>>     accumulator = acc.createNew();
>>>>>>>> }
>>>>>>>>
>>>>>>>> 2. Can concurrent calls to onTimer(..) and onElement(..) happen for
>>>>>>>> the same key? I intend to clean up state, but I see NullPointers
>>>>>>>> thrown in onTimer(..) and I presume it is because onElement and
>>>>>>>> onTimer are executed on 2 separate threads, with onTimer removing the
>>>>>>>> state ( clear() ) but after another thread has registered a Timer (in
>>>>>>>> onElement).
>>>>>>>>
>>>>>>>> if (timestamp == accumulator.getLastModified() + gap) { // NullPointers on race conditions
>>>>>>>>     accumulatorState.clear();
>>>>>>>> }
>>>>>>>>
>>>>>>>> PS. This is the full code.
>>>>>>>>
>>>>>>>> @Override
>>>>>>>> public void processElement(IN event, Context context, Collector<OUT> out)
>>>>>>>>         throws Exception {
>>>>>>>>     TimerService timerService = context.timerService();
>>>>>>>>     if (context.timestamp() > timerService.currentWatermark()) {
>>>>>>>>         OUT accumulator = accumulatorState.value();
>>>>>>>>         if (accumulator == null) {
>>>>>>>>             accumulator = acc.createNew();
>>>>>>>>         }
>>>>>>>>         accumulator.setLastModified(context.timestamp());
>>>>>>>>         accumulatorState.update(accumulator);
>>>>>>>>         timerService.registerEventTimeTimer(context.timestamp() + gap);
>>>>>>>>     }
>>>>>>>> }
>>>>>>>>
>>>>>>>> @Override
>>>>>>>> public void onTimer(long timestamp, OnTimerContext context, Collector<OUT> out)
>>>>>>>>         throws Exception {
>>>>>>>>     OUT accumulator = accumulatorState.value();
>>>>>>>>     if (timestamp == accumulator.getLastModified() + gap) { // NullPointers on race conditions
>>>>>>>>         accumulatorState.clear();
>>>>>>>>     }
>>>>>>>> }
>>>>>>>>
>>>>>>>> On Thu, Dec 21, 2017 at 3:49 AM, Fabian Hueske
>>>>>>>> <fhue...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> That's correct. Removal of timers is not supported in
>>>>>>>>> ProcessFunction. Not sure why this is supported for Triggers.
>>>>>>>>> The common workaround for ProcessFunctions is to register multiple
>>>>>>>>> timers and have a ValueState that stores the valid timestamp on which
>>>>>>>>> the onTimer method should be executed.
>>>>>>>>> When a timer fires and calls onTimer(), the method first checks
>>>>>>>>> whether the timestamp is the correct one and leaves the method if
>>>>>>>>> that is not the case.
>>>>>>>>> If you want to fire on the next watermark, another trick is to
>>>>>>>>> register multiple timers on (currentWatermark + 1). Since there is
>>>>>>>>> only one timer per timestamp, there is only one timer which gets
>>>>>>>>> continuously overwritten. The timer is called when the watermark is
>>>>>>>>> advanced.
>>>>>>>>>
>>>>>>>>> On the performance of the timer service: AFAIK, all methods that
>>>>>>>>> work with some kind of timer use this service. So there is not much
>>>>>>>>> choice.
>>>>>>>>>
>>>>>>>>> 2017-12-20 22:36 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>>>>>>>
>>>>>>>>>> And that further begs the question: how performant is the Timer
>>>>>>>>>> Service? I tried to peruse the architecture behind it but could not
>>>>>>>>>> find a definite clue. Is it a Scheduled Service, and if yes, how many
>>>>>>>>>> threads etc.?
>>>>>>>>>>
>>>>>>>>>> On Wed, Dec 20, 2017 at 4:25 PM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Makes sense. Did a first stab at using ProcessFunction. The
>>>>>>>>>>> TimerService exposed by the Context does not have a remove-timer
>>>>>>>>>>> method. Is it primarily because a PriorityQueue is the storage and
>>>>>>>>>>> removal from a PriorityQueue is expensive?
>>>>>>>>>>> The TriggerContext does expose another version that has removal
>>>>>>>>>>> abilities, so I was wondering why this dissonance...
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Dec 19, 2017 at 4:53 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Vishal,
>>>>>>>>>>>>
>>>>>>>>>>>> it is not guaranteed that add() and onElement() receive the
>>>>>>>>>>>> same object, and even if they do it is not guaranteed that a
>>>>>>>>>>>> mutation of the object in onElement() has an effect. The object
>>>>>>>>>>>> might have been serialized and stored in RocksDB.
>>>>>>>>>>>> Hence, elements should not be modified in onElement().
>>>>>>>>>>>>
>>>>>>>>>>>> Have you considered implementing the operation completely in a
>>>>>>>>>>>> ProcessFunction instead of a session window?
>>>>>>>>>>>> This might be more code but easier to design and reason about
>>>>>>>>>>>> because there is no interaction of window assigner, trigger, and
>>>>>>>>>>>> window function.
>>>>>>>>>>>>
>>>>>>>>>>>> 2017-12-18 20:49 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>>>>>>>>>>
>>>>>>>>>>>>> I guess
>>>>>>>>>>>>> https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L362
>>>>>>>>>>>>>
>>>>>>>>>>>>> is where we could fashion what is emitted. Again, for us it
>>>>>>>>>>>>> seems natural to use the WM to materialize micro batches with
>>>>>>>>>>>>> "approximate" order (and no, I am not a fan of Spark micro
>>>>>>>>>>>>> batches :)). Any pointers as to how we could write an
>>>>>>>>>>>>> implementation that allows for "up till WM emission" through a
>>>>>>>>>>>>> trigger on a Session Window would be very helpful. In essence I
>>>>>>>>>>>>> believe that for any "funnel" analysis it is crucial.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Something like
>>>>>>>>>>>>> https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java#L346
>>>>>>>>>>>>>
>>>>>>>>>>>>> I know I am simplifying this and there has to be more to it...
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Dec 18, 2017 at 11:31 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> The Trigger in this case would be some count-based Trigger...
>>>>>>>>>>>>>> Again, the motive is to keep the state lean, as we desire to
>>>>>>>>>>>>>> search for patterns, sorted on event time, in the incoming
>>>>>>>>>>>>>> sessionized (and thus of nondeterministic length) stream...
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Dec 18, 2017 at 11:26 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> For example, this would have worked perfectly if it did not
>>>>>>>>>>>>>>> complain about MergeableWindow and state.
>>>>>>>>>>>>>>> The Session class in this example encapsulates the
>>>>>>>>>>>>>>> trim-up-to-watermark behavior we desire (a reduce call after
>>>>>>>>>>>>>>> telling it the current WM).
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> public class SessionProcessWindow
>>>>>>>>>>>>>>>         extends ProcessWindowFunction<Event, Session, String, TimeWindow> {
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     private static final ValueStateDescriptor<Session> sessionState =
>>>>>>>>>>>>>>>             new ValueStateDescriptor<>("session", Session.class);
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>     public void process(String key, Context context,
>>>>>>>>>>>>>>>                         Iterable<Event> elements, Collector<Session> out)
>>>>>>>>>>>>>>>             throws Exception {
>>>>>>>>>>>>>>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>>>>>>>>>>>>>>         Session s = session.value() != null ? session.value() : new Session();
>>>>>>>>>>>>>>>         for (Event e : elements) {
>>>>>>>>>>>>>>>             s.add(e);
>>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>>         s.lastWaterMarkedEventLite.serverTime = context.currentWatermark();
>>>>>>>>>>>>>>>         s.reduce();
>>>>>>>>>>>>>>>         out.collect(s);
>>>>>>>>>>>>>>>         session.update(s);
>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>     public void clear(Context context) {
>>>>>>>>>>>>>>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>>>>>>>>>>>>>>         session.clear();
>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Mon, Dec 18, 2017 at 11:08 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hello Fabian, Thank you for the response.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I think that does not work, as it is the WM of the Window
>>>>>>>>>>>>>>>> Operator that is desired to make deterministic decisions,
>>>>>>>>>>>>>>>> rather than that of an operator that precedes the Window?
>>>>>>>>>>>>>>>> This is doable using a ProcessWindowFunction with state, but
>>>>>>>>>>>>>>>> only in the case of non-mergeable windows.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The best API option, I think, is a time-based Trigger that
>>>>>>>>>>>>>>>> fires on every configured time progression of the WM and a
>>>>>>>>>>>>>>>> Window implementation that materializes *only data up till
>>>>>>>>>>>>>>>> that WM* (it might have more data, but that data has event
>>>>>>>>>>>>>>>> time greater than the WM). I am not sure we have that as a
>>>>>>>>>>>>>>>> built-in option and thus was asking for access to the current
>>>>>>>>>>>>>>>> WM of the window operator, to allow us to handle the *only
>>>>>>>>>>>>>>>> data up till that WM* range retrieval using some custom data
>>>>>>>>>>>>>>>> structure.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Regards.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Mon, Dec 18, 2017 at 5:14 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi Vishal,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> the Trigger is not designed to augment records but just to
>>>>>>>>>>>>>>>>> control when a window is evaluated.
>>>>>>>>>>>>>>>>> I would recommend using a ProcessFunction to enrich
>>>>>>>>>>>>>>>>> records with the current watermark before passing them into
>>>>>>>>>>>>>>>>> the window operator.
>>>>>>>>>>>>>>>>> The context object of the processElement() method gives
>>>>>>>>>>>>>>>>> access to the current watermark and timestamp of a record.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Please note that watermarks are not deterministic but may
>>>>>>>>>>>>>>>>> depend on the order in which parallel inputs are consumed by
>>>>>>>>>>>>>>>>> an operator.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 2017-12-17 16:59 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> An addendum
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Is the element reference IN in onElement(IN element.. )
>>>>>>>>>>>>>>>>>> in Trigger<IN,..> the same as the one provided to add(IN
>>>>>>>>>>>>>>>>>> value) in Accumulator<IN,..>? It seems that any mutation
>>>>>>>>>>>>>>>>>> to IN in onElement() is not visible to the Accumulator that
>>>>>>>>>>>>>>>>>> is carrying it as a previous element reference, albeit in
>>>>>>>>>>>>>>>>>> the next invocation of add(). This seems to be the case only
>>>>>>>>>>>>>>>>>> in distributed mode, which makes sense only if these
>>>>>>>>>>>>>>>>>> references point to different objects.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> The pipeline
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> .keyBy(keySelector)
>>>>>>>>>>>>>>>>>> .window(EventTimeSessionWindows.<IN>withGap(gap))
>>>>>>>>>>>>>>>>>> .trigger(new CountBasedWMAugmentationTrigger<IN,TimeWindow>(triggerCount))
>>>>>>>>>>>>>>>>>> .aggregate(
>>>>>>>>>>>>>>>>>>     new AggregateFunction<IN, ACC, OUT>() {
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>         @Override
>>>>>>>>>>>>>>>>>>         public ACC createAccumulator() {
>>>>>>>>>>>>>>>>>>             ACC newInstance = (ACC) accumulator.clone();
>>>>>>>>>>>>>>>>>>             newInstance.resetLocal();
>>>>>>>>>>>>>>>>>>             return newInstance;
>>>>>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>         @Override
>>>>>>>>>>>>>>>>>>         public void add(IN value, ACC accumulator) {
>>>>>>>>>>>>>>>>>>             /** This method is called before onElement of the
>>>>>>>>>>>>>>>>>>                 Trigger and keeps the reference to the last IN **/
>>>>>>>>>>>>>>>>>>             accumulator.add(value);
>>>>>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>>>>>         .....
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> The Trigger
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> public class CountBasedWMAugmentationTrigger<
>>>>>>>>>>>>>>>>>>         T extends Serializable & CountBasedWMAugmentationTrigger.HasWaterMark,
>>>>>>>>>>>>>>>>>>         W extends Window> extends Trigger<T, W> {
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>>>>     public TriggerResult onElement(T element, long timestamp, W window,
>>>>>>>>>>>>>>>>>>                                    TriggerContext ctx) throws Exception {
>>>>>>>>>>>>>>>>>>         /** The element T is mutated to carry the watermark **/
>>>>>>>>>>>>>>>>>>         element.setWaterMark(ctx.getCurrentWatermark());
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>         .
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Sun, Dec 17, 2017 at 1:52 AM, Vishal Santoshi <vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I want to augment a POJO in the Trigger's onElement method,
>>>>>>>>>>>>>>>>>>> specifically supply the POJO with the watermark from the
>>>>>>>>>>>>>>>>>>> TriggerContext. The sequence of execution is this:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 1. call to add() in the accumulator for the window and
>>>>>>>>>>>>>>>>>>> save the POJO reference in the Accumulator.
>>>>>>>>>>>>>>>>>>> 2. call to onElement on the Trigger
>>>>>>>>>>>>>>>>>>> 3. set the watermark on the POJO
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> The next add() call should have the last reference and
>>>>>>>>>>>>>>>>>>> any mutation done in step 3.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> That works in a local test case, using
>>>>>>>>>>>>>>>>>>> LocalFlinkMiniCluster, as in I have access to the mutation
>>>>>>>>>>>>>>>>>>> made by onElement() to the POJO in the subsequent add(), but
>>>>>>>>>>>>>>>>>>> not on a distributed cluster. The specific question I had is
>>>>>>>>>>>>>>>>>>> whether add() on a supplied accumulator on a window and the
>>>>>>>>>>>>>>>>>>> onElement() method of the trigger on that window are inline
>>>>>>>>>>>>>>>>>>> executions on the same thread, or is there any
>>>>>>>>>>>>>>>>>>> serialization/deserialization IPC that causes this
>>>>>>>>>>>>>>>>>>> divergence (local versus distributed)?
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Regards.
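
The local-versus-distributed divergence asked about in this first message, and Fabian's later answer that the object "might have been serialized", can be illustrated with a small plain-Java sketch. It uses standard Java serialization as a stand-in for whatever serializer and state backend a real job uses (that substitution is an assumption, as are the `Event` class and the `roundTrip` helper). It only shows why a mutation made after an object was stored by value cannot be seen by the stored copy.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Plain-Java sketch: a reference sees later mutations, a serialized copy does not. */
public class SerializedCopySketch {

    /** Hypothetical event POJO carrying a mutable watermark field. */
    static class Event implements Serializable {
        private static final long serialVersionUID = 1L;
        long waterMark = Long.MIN_VALUE;
    }

    /** Round-trips a value through Java serialization, standing in for a state write and read. */
    static <T extends Serializable> T roundTrip(T value) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        try (ObjectInputStream in =
                 new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    public static void main(String[] args) throws Exception {
        Event received = new Event();
        Event heldByReference = received;        // kept as a plain object reference
        Event heldAsBytes = roundTrip(received); // kept by value, as serialized state would be

        received.waterMark = 42L;                // the later mutation, e.g. made in a trigger

        System.out.println(heldByReference.waterMark); // 42
        System.out.println(heldAsBytes.waterMark);     // -9223372036854775808
    }
}
```

This matches the advice given in the thread: do not rely on mutating elements in onElement(); carry the watermark into the record before it reaches the window instead.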