I think I got it.
Glad you solved this tricky issue, and thanks for sharing your solution :-)

Best, Fabian


2018-01-06 14:33 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:

> Yep, though this is suboptimal, as you imagined. Two things:
>
> * <IN> internally has an <INLite>, an ultra-lite version of IN that is
> only required for the path analysis.
> * Since sessionization is expensive, we piggyback several other
> aggregations that do not depend on the path or order (counts etc.).
> Essentially, a Session is (ordered path + accumulated stats).
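For illustration, a minimal plain-Java sketch of such a Session, assuming a hypothetical `EventLite` that carries only a timestamp and a page name. `SessionState` and its methods are illustrative names, not the generic classes mentioned in this mail. The path part is order-sensitive and is read back in event-time order, while the counter is order-independent, so merging two sessions only concatenates paths and adds counts.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical "lite" projection of an event: only what path analysis needs.
final class EventLite {
    final long timestamp;
    final String page;

    EventLite(long timestamp, String page) {
        this.timestamp = timestamp;
        this.page = page;
    }
}

// Hypothetical session: an order-dependent path plus order-independent stats.
final class SessionState {
    final List<EventLite> path = new ArrayList<>(); // may be filled out of order
    long eventCount;                                // order does not matter

    void add(EventLite e) {
        path.add(e);
        eventCount++;
    }

    // Merging (e.g. when two session windows merge): counts add up,
    // paths are concatenated and only sorted when read.
    SessionState merge(SessionState other) {
        SessionState merged = new SessionState();
        merged.path.addAll(this.path);
        merged.path.addAll(other.path);
        merged.eventCount = this.eventCount + other.eventCount;
        return merged;
    }

    // The path in event-time order.
    List<String> pages() {
        List<EventLite> sorted = new ArrayList<>(path);
        sorted.sort(Comparator.comparingLong((EventLite e) -> e.timestamp));
        List<String> pages = new ArrayList<>();
        for (EventLite e : sorted) {
            pages.add(e.page);
        }
        return pages;
    }
}
```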
>
> The code seems pretty much all right; please tell me if you need to see it.
> It is all generics, so no secrets here.
>
> On Fri, Jan 5, 2018 at 11:58 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Hi,
>>
>> you would not need the ListStateDescriptor. With a ProcessWindowFunction,
>> the window operator stores all events that are assigned to a window (IN
>> objects in your case) in an internal ListState.
>> The Iterable<IN> parameter of the process() method iterates over the
>> internal list state.
>>
>> So you would have a Trigger that fires when a new watermark is received
>> (or at regular intervals, like every minute) and at the end of the window.
>> The process() method looks up the current watermark in the Context
>> object, traverses the Iterable<IN>, filters out all events with
>> timestamp > watermark (you would need to enrich the events with the
>> timestamp, which can be done in a ProcessFunction), inserts the remaining
>> ones into a sorted data structure (possibly leveraging the almost sorted
>> nature of the events), and creates a Session from it.
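A plain-Java sketch of the per-firing work described above, assuming each element has already been enriched with its event timestamp (the hypothetical `TimedEvent` type). Events at or below the watermark are collected and sorted; the others would stay in the window's list state for a later firing. The whole Iterable is rescanned on every call, which is the inefficiency pointed out in the next paragraph.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical element type: an event enriched with its event-time timestamp.
final class TimedEvent {
    final long timestamp;
    final String page;

    TimedEvent(long timestamp, String page) {
        this.timestamp = timestamp;
        this.page = page;
    }
}

final class WatermarkFilter {
    // Keeps only events with timestamp <= watermark and returns them sorted
    // by event time. Events above the watermark are skipped here; in the
    // window they would remain in state until a later firing.
    static List<TimedEvent> upToWatermark(Iterable<TimedEvent> elements, long watermark) {
        List<TimedEvent> ready = new ArrayList<>();
        for (TimedEvent e : elements) {
            if (e.timestamp <= watermark) {
                ready.add(e);
            }
        }
        // List.sort on an ArrayList uses an adaptive merge sort (TimSort),
        // which is cheap on almost-sorted input.
        ready.sort(Comparator.comparingLong((TimedEvent e) -> e.timestamp));
        return ready;
    }
}
```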
>>
>> This is probably less efficient than your ProcessFunction because
>> process() would go over the complete list over and over again and not be
>> able to persist the result of previous invocations.
>> However, the code should be easier to maintain.
>>
>> Does that make sense?
>>
>> Best, Fabian
>>
>> 2018-01-05 17:28 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>
>>> Hello Fabian, thank you for your response.
>>>
>>> I thought about it, and maybe I am missing something obvious here. The
>>> code below is what I think you suggest. The issue is that the window state
>>> is now a list of Sessions (or, shall I say, subsets of the Session).
>>>
>>> What is required is that, on a new watermark, we
>>>
>>> * sort these Session objects, and
>>> * take the subset that is before the new watermark and emit it without
>>> purging.
>>>
>>> I do not see how the Trigger approach helps us. It does tell us that the
>>> watermark has progressed, but to get the subset of the ListState that falls
>>> before the watermark, we would need access to *the new value of the
>>> watermark*. That was what my initial query was about.
>>>
>>>
>>>
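>>> import org.apache.flink.api.common.state.ListStateDescriptor;
>>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>>> import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
>>> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
>>> import org.apache.flink.util.Collector;
>>>
>>> // HasTime, HasKey and SessionState are the application's own types.
>>> public class SessionProcessWindow<IN extends HasTime & HasKey, OUT extends SessionState<IN, OUT>>
>>>         extends ProcessWindowFunction<IN, OUT, String, TimeWindow> {
>>>
>>>     private final OUT toCreateNew;
>>>     private Long gap;
>>>     private final ListStateDescriptor<OUT> mergingSetsStateDescriptor;
>>>
>>>     public SessionProcessWindow(TypeInformation<OUT> aggregationResultType,
>>>                                 OUT toCreateNew) {
>>>         this.toCreateNew = toCreateNew;
>>>         mergingSetsStateDescriptor =
>>>                 new ListStateDescriptor<>("sessions", aggregationResultType);
>>>     }
>>>
>>>     @Override
>>>     public void process(String s, Context context, Iterable<IN> elements,
>>>                         Collector<OUT> out) throws Exception {
>>>         // Build a fresh Session from all elements currently in the window ...
>>>         OUT session = toCreateNew.createNew();
>>>         elements.forEach(f -> session.add(f));
>>>         // ... and store it in per-window list state.
>>>         context.windowState().getListState(mergingSetsStateDescriptor).add(session);
>>>     }
>>> }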
>>>
>>>
>>> On Fri, Jan 5, 2018 at 7:35 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>>>
>>>> Hi Vishal,
>>>>
>>>> thanks for sharing your solution!
>>>>
>>>> Looking at this issue again and your mail in which you shared your
>>>> SessionProcessWindow ProcessWindowFunction, I'm wondering why you need the
>>>> ValueState that prevents the ProcessWindowFunction from being used with a
>>>> mergeable window.
>>>> You could have created a new Session object in each invocation of the
>>>> ProcessWindowFunction and simply kept the elements in the (mergeable) list
>>>> state of the window.
>>>> In that case you would simply need a custom trigger that calls the
>>>> ProcessWindowFunction when a new watermark arrives. For intermediate calls
>>>> you just FIRE, and for the final call you FIRE_AND_PURGE to remove the
>>>> elements from the window's state.
>>>> Did you try that?
>>>>
>>>> Best, Fabian
>>>>
>>>>
>>>>
>>>> 2018-01-03 15:57 GMT+01:00 Vishal Santoshi <vishal.santo...@gmail.com>:
>>>>
>>>>> Dear Fabian,
>>>>>
>>>>> I was able to create a fairly functional ProcessFunction. Here is the
>>>>> synopsis; please see if it makes sense.
>>>>>
>>>>> Sessionization is unique in that it entails windows of dynamic length.
>>>>> Flink's approach is pretty simple: it creates a TimeWindow of size "gap"
>>>>> starting at the event time, finds overlapping (intersecting) windows, and
>>>>> creates a covering window. Each such window has a "state" associated with
>>>>> it, which also has to be merged when a covering window is created from
>>>>> the intersection of two or more incident windows. To be more precise, if
>>>>> Window1 spans (t1, t2) and a new record creates a window (t3, t4) with
>>>>> t1 <= t3 <= t2, a new window (t1, max(t2, t4)) is created and the
>>>>> associated states are merged.
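The merge rule above can be modelled in plain Java. `Interval` is a hypothetical stand-in whose intersection test is modelled on Flink's `TimeWindow` (start inclusive, end exclusive, touching windows merge); it is not the Flink class. Windows are sorted by start and folded into covering windows whose end is the larger of the two ends.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for a session window: [start, end).
final class Interval {
    final long start;
    final long end;

    Interval(long start, long end) {
        this.start = start;
        this.end = end;
    }

    // The window a single event at ts opens for the given session gap.
    static Interval forEvent(long ts, long gap) {
        return new Interval(ts, ts + gap);
    }

    boolean intersects(Interval other) {
        return this.start <= other.end && other.start <= this.end;
    }

    // Covering window: earliest start, latest end.
    Interval cover(Interval other) {
        return new Interval(Math.min(start, other.start), Math.max(end, other.end));
    }

    // Sort by start and fold overlapping windows into covering windows.
    static List<Interval> mergeAll(List<Interval> windows) {
        List<Interval> sorted = new ArrayList<>(windows);
        sorted.sort(Comparator.comparingLong((Interval w) -> w.start));
        List<Interval> merged = new ArrayList<>();
        for (Interval w : sorted) {
            int last = merged.size() - 1;
            if (last >= 0 && merged.get(last).intersects(w)) {
                merged.set(last, merged.get(last).cover(w));
            } else {
                merged.add(w);
            }
        }
        return merged;
    }
}
```

With a gap of 10, events at 0, 5 and 30 give the windows [0, 15) and [30, 40); covering [0, 18) with [3, 13) keeps the end at 18.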
>>>>>
>>>>>
>>>>> In the current Window API the states are external and accumulator
>>>>> based. This approach works for pretty much all cases where the
>>>>> aggregation is accumulative/reducing and does not depend on order, i.e.
>>>>> no ordered list of incoming records needs to be kept and the reduction is
>>>>> to a single aggregated element (think counts, min, max etc.).
>>>>> In path analysis (and other use cases), however, this approach has
>>>>> drawbacks. Even though our accumulator could keep an ordered list of
>>>>> events, that becomes unreasonable if it is not bounded. An approach that
>>>>> does *attempt* to bound the state is to preemptively analyze paths, using
>>>>> the WM as the marker that defines the *subset* of the state that is safe
>>>>> to analyze. So if we have n events in the window state and m fall before
>>>>> the WM, we can safely analyze the m subset, emitting the paths seen and
>>>>> reducing the cumulative state size. There are caveats, though, that I
>>>>> will go into later.
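A sketch of the "analyze the m events before the WM" step, assuming the per-key events are buffered in a `TreeMap` keyed by event time (an assumption; the mail does not say which structure is used). `PathBuffer` is a hypothetical name. `headMap(watermark, true)` is a live view of exactly the events at or before the watermark, so clearing it removes them from the buffer and leaves only the later events in state.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical per-key buffer; not a Flink state primitive.
final class PathBuffer {
    // event time -> pages seen at that time (several events may share a timestamp)
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();

    void add(long timestamp, String page) {
        pending.computeIfAbsent(timestamp, t -> new ArrayList<>()).add(page);
    }

    // Removes every event with timestamp <= watermark and returns their pages
    // in event-time order; later events stay buffered.
    List<String> reduceToWatermark(long watermark) {
        NavigableMap<Long, List<String>> ready = pending.headMap(watermark, true);
        List<String> path = new ArrayList<>();
        for (Map.Entry<Long, List<String>> entry : ready.entrySet()) {
            path.addAll(entry.getValue());
        }
        ready.clear(); // the head map is a view, so this shrinks the buffer
        return path;
    }

    int pendingEvents() {
        int n = 0;
        for (List<String> pages : pending.values()) {
            n += pages.size();
        }
        return n;
    }
}
```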
>>>>>
>>>>>
>>>>> Unfortunately, the accumulators used by Flink's default window runtime
>>>>> do not have access to the WM.
>>>>>
>>>>>
>>>>> This led to the following generic approach (implemented and tested):
>>>>>
>>>>>
>>>>> * Use a low-level ProcessFunction, which allows access to the WM and is
>>>>> definitely nearer to the guts of Flink.
>>>>>
>>>>>
>>>>> * Still use the merge-windows-on-intersection approach, but use the WM to
>>>>> trigger (through timers) reductions in state. This is not very different
>>>>> from what Flink does, but we have more control over what to do and when
>>>>> to do it. Essentially, I have exposed a lifecycle method that reacts to
>>>>> WM progression.
>>>>>
>>>>>
>>>>> * There are essentially two timers. The first timer is at the
>>>>> maxTimestamp() of a window; if there is no further mutation (because of a
>>>>> merge etc.), it fires to signal a session end. The second one is at
>>>>> currentWatermark + 1 and essentially calls a "reduceToWM" on each keyed
>>>>> window and thus its state.
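The interplay of the two timers can be simulated without Flink. In this hypothetical model a `TreeSet` of timestamps stands in for the event-time timer service, which keeps at most one timer per timestamp, and `sessionEnd` plays the role of the window's `maxTimestamp()`; `reduceToWM` is only recorded as a label.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Hypothetical model of the two-timer scheme; not Flink's TimerService.
final class TwoTimerSessionModel {
    private final TreeSet<Long> timers = new TreeSet<>(); // one timer per timestamp
    private final long sessionEnd; // plays the role of TimeWindow.maxTimestamp()
    private long watermark = Long.MIN_VALUE;
    final List<String> actions = new ArrayList<>();

    TwoTimerSessionModel(long sessionEnd) {
        this.sessionEnd = sessionEnd;
        timers.add(sessionEnd); // timer 1: session end
    }

    // Per element: timer 2 at currentWatermark + 1, i.e. "wake me at the next WM".
    // Registering it repeatedly under the same watermark yields a single timer.
    void onElement() {
        timers.add(watermark + 1);
    }

    // A new watermark fires every timer <= watermark, in timestamp order.
    void advanceWatermark(long newWatermark) {
        watermark = newWatermark;
        while (!timers.isEmpty() && timers.first() <= newWatermark) {
            long t = timers.pollFirst();
            if (t == sessionEnd) {
                actions.add("sessionEnd@" + t);
            } else {
                actions.add("reduceToWM(" + newWatermark + ")");
            }
        }
    }
}
```

Registering the `currentWatermark + 1` timer for every element costs nothing extra: under an unchanged watermark all registrations collapse into one timer, which fires as soon as the watermark moves.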
>>>>>
>>>>>
>>>>> * There are two ways to short-circuit a session: 1. on session time span,
>>>>> 2. on session size.
>>>>>
>>>>>
>>>>> * There is a safety valve to blacklist keys when it is obvious that it
>>>>> is a bot ( again
>>>>>
>>>>>
>>>>> The solution will thus preemptively push out patterns (and correct
>>>>> patterns) while keeping the ordered state within reasonable bounds. The
>>>>> incident data of course has to be analyzed: are the paths too large, etc.?
>>>>> But one has full control over how to fashion the solution.
>>>>>
>>>>> Regards and Thanks,
>>>>>
>>>>>
>>>>> Vishal
>>>>>
>>>>> On Wed, Dec 27, 2017 at 10:41 AM, Vishal Santoshi <
>>>>> vishal.santo...@gmail.com> wrote:
>>>>>
>>>>>> This makes sense.  Thanks.
>>>>>>
>>>>>> On Sat, Dec 23, 2017 at 10:58 AM, Fabian Hueske <fhue...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>>
>>>>>>> all calls to onElement() or onTimer() are synchronized for all keys.
>>>>>>> Think of a single thread calling these methods.
>>>>>>> Event-time timers are called when a watermark passes the timer.
>>>>>>> Watermarks are received as special records, so the methods are called in
>>>>>>> the same order as records (actual records or watermarks) arrive at the
>>>>>>> function. Actual synchronization is only required for processing-time
>>>>>>> timers.
>>>>>>>
>>>>>>> The NPE might be thrown because two timers fire one after the other
>>>>>>> without a new record being processed in between the onTimer() calls. In
>>>>>>> that case the state is cleared in the first call and is null in the
>>>>>>> second.
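A plain-Java model of a guarded `onTimer`, assuming a nullable field in place of the `ValueState` (Flink's `ValueState.value()` returns null when nothing is stored and no default is set). `SessionTimeoutModel` is a hypothetical name. Besides the null check it keeps the largest timestamp seen rather than the last one to arrive, an adjustment that is not in the original code; with it, the timer registered for an out-of-order element is treated as stale instead of closing the session early.

```java
// Hypothetical model of the keyed ProcessFunction from the question. A nullable
// field plays the role of the ValueState holding the last-modified timestamp.
final class SessionTimeoutModel {
    private final long gap;
    private Long lastModified; // null means "state cleared or never set"
    int sessionsClosed;

    SessionTimeoutModel(long gap) {
        this.gap = gap;
    }

    // Mirrors processElement: update the state and return the timestamp the
    // real code would pass to registerEventTimeTimer(...).
    long processElement(long timestamp) {
        lastModified = (lastModified == null) ? timestamp : Math.max(lastModified, timestamp);
        return timestamp + gap;
    }

    // Mirrors onTimer, with the null check that avoids the NPE.
    void onTimer(long timestamp) {
        if (lastModified == null) {
            return; // an earlier timer already cleared the state
        }
        if (timestamp == lastModified + gap) {
            lastModified = null; // accumulatorState.clear()
            sessionsClosed++;
        }
        // any other timestamp belongs to a stale timer and is ignored
    }
}
```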
>>>>>>>
>>>>>>> Best, Fabian
>>>>>>>
>>>>>>> 2017-12-23 16:36 GMT+01:00 Vishal Santoshi <
>>>>>>> vishal.santo...@gmail.com>:
>>>>>>>
>>>>>>>> Thanks.
>>>>>>>>
>>>>>>>> I have a few follow-up questions regarding ProcessFunction. I think
>>>>>>>> that the core should take care of any synchronization issues between
>>>>>>>> calls to onElement and onTimer in the case of a keyed stream, but tests
>>>>>>>> do not seem to suggest that.
>>>>>>>>
>>>>>>>> I have two specific questions.
>>>>>>>>
>>>>>>>> 1. Are calls to onElement(..) single-threaded when scoped to a key?
>>>>>>>> That is, on a keyed stream, is there a way that two or more threads can
>>>>>>>> execute on more than one element of a single key at the same time?
>>>>>>>> Would I have to synchronize this construction:
>>>>>>>>
>>>>>>>> OUT accumulator = accumulatorState.value();
>>>>>>>> if (accumulator == null) {
>>>>>>>>     accumulator = acc.createNew();
>>>>>>>> }
>>>>>>>>
>>>>>>>> 2. Can concurrent calls to onTimer(..) and onElement(..) happen for the
>>>>>>>> same key? I intend to clean up state, but I see NullPointerExceptions
>>>>>>>> thrown in onTimer(..), and I presume it is because onElement and onTimer
>>>>>>>> are executed on two separate threads, with onTimer removing the state
>>>>>>>> (clear()) after another thread has registered a timer (in onElement).
>>>>>>>>
>>>>>>>> if (timestamp == accumulator.getLastModified() + gap) { // NullPointers on race conditions
>>>>>>>>     accumulatorState.clear();
>>>>>>>> }
>>>>>>>>
>>>>>>>> PS. This is the full code.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> @Override
>>>>>>>> public void processElement(IN event, Context context, Collector<OUT> out)
>>>>>>>>         throws Exception {
>>>>>>>>     TimerService timerService = context.timerService();
>>>>>>>>     if (context.timestamp() > timerService.currentWatermark()) {
>>>>>>>>         OUT accumulator = accumulatorState.value();
>>>>>>>>         if (accumulator == null) {
>>>>>>>>             accumulator = acc.createNew();
>>>>>>>>         }
>>>>>>>>         accumulator.setLastModified(context.timestamp());
>>>>>>>>         accumulatorState.update(accumulator);
>>>>>>>>         timerService.registerEventTimeTimer(context.timestamp() + gap);
>>>>>>>>     }
>>>>>>>> }
>>>>>>>>
>>>>>>>> @Override
>>>>>>>> public void onTimer(long timestamp, OnTimerContext context, Collector<OUT> out)
>>>>>>>>         throws Exception {
>>>>>>>>     OUT accumulator = accumulatorState.value();
>>>>>>>>     if (timestamp == accumulator.getLastModified() + gap) { // NullPointers on race conditions
>>>>>>>>         accumulatorState.clear();
>>>>>>>>     }
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Dec 21, 2017 at 3:49 AM, Fabian Hueske <fhue...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> That's correct. Removal of timers is not supported in
>>>>>>>>> ProcessFunction. Not sure why this is supported for Triggers.
>>>>>>>>> The common workaround for ProcessFunctions is to register multiple
>>>>>>>>> timers and have a ValueState that stores the valid timestamp on which
>>>>>>>>> the onTimer method should be executed.
>>>>>>>>> When a timer fires and calls onTimer(), the method first checks
>>>>>>>>> whether the timestamp is the correct one and leaves the method if that
>>>>>>>>> is not the case.
>>>>>>>>> If you want to fire on the next watermark, another trick is to
>>>>>>>>> register multiple timers on (currentWatermark + 1). Since there is only
>>>>>>>>> one timer per timestamp, there is only one timer, which gets
>>>>>>>>> continuously overwritten. The timer is called when the watermark is
>>>>>>>>> advanced.
>>>>>>>>>
>>>>>>>>> Regarding the performance of the timer service: AFAIK, all methods that
>>>>>>>>> work with some kind of timer use this service, so there is not much
>>>>>>>>> choice.
>>>>>>>>>
>>>>>>>>> 2017-12-20 22:36 GMT+01:00 Vishal Santoshi <
>>>>>>>>> vishal.santo...@gmail.com>:
>>>>>>>>>
>>>>>>>>>> And that further raises the question: how performant is the timer
>>>>>>>>>> service? I tried to go through the architecture behind it but could
>>>>>>>>>> not find a definite clue. Is it a scheduled service, and if so, how
>>>>>>>>>> many threads etc.?
>>>>>>>>>>
>>>>>>>>>> On Wed, Dec 20, 2017 at 4:25 PM, Vishal Santoshi <
>>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Makes sense. I took a first stab at using ProcessFunction. The
>>>>>>>>>>> TimerService exposed by the Context does not have a method to remove a
>>>>>>>>>>> timer. Is that primarily because a priority queue is the storage and
>>>>>>>>>>> removal from a PriorityQueue is expensive? The TriggerContext does
>>>>>>>>>>> expose another version that has removal abilities, so I was wondering
>>>>>>>>>>> about this dissonance...
>>>>>>>>>>>
>>>>>>>>>>> On Tue, Dec 19, 2017 at 4:53 AM, Fabian Hueske <
>>>>>>>>>>> fhue...@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi Vishal,
>>>>>>>>>>>>
>>>>>>>>>>>> it is not guaranteed that add() and onElement() receive the
>>>>>>>>>>>> same object, and even if they do, it is not guaranteed that a
>>>>>>>>>>>> mutation of the object in onElement() has an effect. The object might
>>>>>>>>>>>> have been serialized and stored in RocksDB.
>>>>>>>>>>>> Hence, elements should not be modified in onElement().
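The effect can be reproduced with plain Java serialization standing in for Flink's serializers and the RocksDB backend (an assumption for illustration; Flink uses its own serializers). Once the object has been written out, the stored bytes are a snapshot, so a later mutation of the in-memory object is not visible in what a subsequent reader gets back. `Pojo` and `SnapshotDemo` are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical POJO, like the element the trigger tried to mutate.
final class Pojo implements Serializable {
    private static final long serialVersionUID = 1L;
    long watermark = Long.MIN_VALUE;
}

final class SnapshotDemo {
    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    // Returns the watermark a later reader sees after the original object
    // was mutated once it had already been stored.
    static long watermarkSeenLater() throws IOException, ClassNotFoundException {
        Pojo element = new Pojo();
        byte[] stored = serialize(element); // the state backend keeps bytes
        element.watermark = 42L;            // mutation in onElement()
        Pojo readBack = (Pojo) deserialize(stored);
        return readBack.watermark;          // still Long.MIN_VALUE
    }
}
```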
>>>>>>>>>>>>
>>>>>>>>>>>> Have you considered implementing the operation completely in a
>>>>>>>>>>>> ProcessFunction instead of a session window?
>>>>>>>>>>>> This might be more code, but it is easier to design and reason about
>>>>>>>>>>>> because there is no interaction of window assigner, trigger, and
>>>>>>>>>>>> window function.
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> 2017-12-18 20:49 GMT+01:00 Vishal Santoshi <
>>>>>>>>>>>> vishal.santo...@gmail.com>:
>>>>>>>>>>>>
>>>>>>>>>>>>> I guess
>>>>>>>>>>>>> https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L362
>>>>>>>>>>>>> is where we could shape what is emitted. Again, for us it seems
>>>>>>>>>>>>> natural to use the WM to materialize micro-batches with "approximate"
>>>>>>>>>>>>> order (and no, I am not a fan of Spark micro-batches :)). Any pointers
>>>>>>>>>>>>> on how we could write an implementation that allows for "up till WM
>>>>>>>>>>>>> emission" through a trigger on a session window would be very
>>>>>>>>>>>>> helpful. In essence, I believe this is crucial for any "funnel"
>>>>>>>>>>>>> analysis.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Something like
>>>>>>>>>>>>> https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java#L346
>>>>>>>>>>>>>
>>>>>>>>>>>>> I know I am simplifying this and there has to be more to it...
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Mon, Dec 18, 2017 at 11:31 AM, Vishal Santoshi <
>>>>>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> The Trigger in this case would be some count-based trigger...
>>>>>>>>>>>>>> Again, the motive is to keep the state lean, as we want to search
>>>>>>>>>>>>>> for patterns, sorted on event time, in the incoming sessionized (and
>>>>>>>>>>>>>> thus of nondeterministic length) stream...
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Mon, Dec 18, 2017 at 11:26 AM, Vishal Santoshi <
>>>>>>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> For example, this would have worked perfectly if it did not
>>>>>>>>>>>>>>> complain about mergeable windows and state. The Session class in
>>>>>>>>>>>>>>> this encapsulates the trim-up-to-watermark behavior (a reduce call
>>>>>>>>>>>>>>> after telling it the current WM) that we desire:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> import org.apache.flink.api.common.state.ValueState;
>>>>>>>>>>>>>>> import org.apache.flink.api.common.state.ValueStateDescriptor;
>>>>>>>>>>>>>>> import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
>>>>>>>>>>>>>>> import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
>>>>>>>>>>>>>>> import org.apache.flink.util.Collector;
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> // Event and Session are the application's own classes.
>>>>>>>>>>>>>>> public class SessionProcessWindow
>>>>>>>>>>>>>>>         extends ProcessWindowFunction<Event, Session, String, TimeWindow> {
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     // Per-window state: this is what the runtime rejects for merging (session) windows.
>>>>>>>>>>>>>>>     private static final ValueStateDescriptor<Session> sessionState =
>>>>>>>>>>>>>>>             new ValueStateDescriptor<>("session", Session.class);
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>     public void process(String key, Context context, Iterable<Event> elements,
>>>>>>>>>>>>>>>                         Collector<Session> out) throws Exception {
>>>>>>>>>>>>>>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>>>>>>>>>>>>>>         Session s = session.value() != null ? session.value() : new Session();
>>>>>>>>>>>>>>>         for (Event e : elements) {
>>>>>>>>>>>>>>>             s.add(e);
>>>>>>>>>>>>>>>         }
>>>>>>>>>>>>>>>         // Tell the session the current WM, then trim everything up to it.
>>>>>>>>>>>>>>>         s.lastWaterMarkedEventLite.serverTime = context.currentWatermark();
>>>>>>>>>>>>>>>         s.reduce();
>>>>>>>>>>>>>>>         out.collect(s);
>>>>>>>>>>>>>>>         session.update(s);
>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>     public void clear(Context context) {
>>>>>>>>>>>>>>>         ValueState<Session> session = context.windowState().getState(sessionState);
>>>>>>>>>>>>>>>         session.clear();
>>>>>>>>>>>>>>>     }
>>>>>>>>>>>>>>> }
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> On Mon, Dec 18, 2017 at 11:08 AM, Vishal Santoshi <
>>>>>>>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Hello Fabian, thank you for the response.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> I think that does not work, as it is the WM of the window operator
>>>>>>>>>>>>>>>> that is needed to make deterministic decisions, rather than the WM
>>>>>>>>>>>>>>>> of an operator that precedes the window? This is doable using a
>>>>>>>>>>>>>>>> ProcessWindowFunction with state, but only in the case of
>>>>>>>>>>>>>>>> non-mergeable windows.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> The best API option, I think, is a time-based trigger that fires on
>>>>>>>>>>>>>>>> every configured progression of the WM, and a window implementation
>>>>>>>>>>>>>>>> that materializes *only data up till that WM* (the window might hold
>>>>>>>>>>>>>>>> more data, but that data has event time greater than the WM). I am
>>>>>>>>>>>>>>>> not sure we have such a built-in option, and thus was asking for
>>>>>>>>>>>>>>>> access to the current WM of the window operator, to allow us to
>>>>>>>>>>>>>>>> handle the "*only data up till that WM*" range retrieval using some
>>>>>>>>>>>>>>>> custom data structure.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> Regards.
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>> On Mon, Dec 18, 2017 at 5:14 AM, Fabian Hueske <
>>>>>>>>>>>>>>>> fhue...@gmail.com> wrote:
>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Hi Vishal,
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> the Trigger is not designed to augment records but just to
>>>>>>>>>>>>>>>>> control when a window is evaluated.
>>>>>>>>>>>>>>>>> I would recommend using a ProcessFunction to enrich records with
>>>>>>>>>>>>>>>>> the current watermark before passing them into the window
>>>>>>>>>>>>>>>>> operator. The context object of the processElement() method gives
>>>>>>>>>>>>>>>>> access to the current watermark (through its TimerService) and to
>>>>>>>>>>>>>>>>> the timestamp of a record.
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Please note that watermarks are not deterministic but may
>>>>>>>>>>>>>>>>> depend on the order in which parallel inputs are consumed by an
>>>>>>>>>>>>>>>>> operator.
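A plain-Java model of the suggested enrichment step, and of why its result is not deterministic. It assumes, as Flink does, that an operator with several input channels uses the minimum of the per-channel watermarks as its current watermark; `WatermarkEnricher` and its methods are hypothetical, not Flink API. In a real job the equivalent lookup inside a `ProcessFunction` would be `ctx.timerService().currentWatermark()`.

```java
import java.util.Arrays;

// Hypothetical model of an operator with several input channels that tags each
// record with the operator's current watermark (the minimum over all channels).
final class WatermarkEnricher {
    private final long[] channelWatermarks;

    WatermarkEnricher(int channels) {
        channelWatermarks = new long[channels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    // Per-channel watermarks only move forward.
    void onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
    }

    long currentWatermark() {
        long min = Long.MAX_VALUE;
        for (long wm : channelWatermarks) {
            min = Math.min(min, wm);
        }
        return min;
    }

    // Returns "value@watermark"; a real job would copy the watermark into a field.
    String enrich(String value) {
        return value + "@" + currentWatermark();
    }
}
```

With two channels, a record that arrives after watermarks 10 and 20 is tagged `@10`, while the same record arriving between those two watermarks is tagged `@` followed by `Long.MIN_VALUE`: same data, different interleaving, different attached watermark.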
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> Best, Fabian
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>> 2017-12-17 16:59 GMT+01:00 Vishal Santoshi <
>>>>>>>>>>>>>>>>> vishal.santo...@gmail.com>:
>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> An addendum:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> Is the element reference IN in onElement(IN element, ..) in
>>>>>>>>>>>>>>>>>> Trigger<IN, ..> the same as the IN provided to add(IN value) in
>>>>>>>>>>>>>>>>>> Accumulator<IN, ..>? It seems that any mutation of IN in
>>>>>>>>>>>>>>>>>> onElement() is not visible to the Accumulator that carries it as
>>>>>>>>>>>>>>>>>> a previous element reference in the next invocation of add().
>>>>>>>>>>>>>>>>>> This seems to happen only in distributed mode, which makes sense
>>>>>>>>>>>>>>>>>> only if these references point to different objects.
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> The pipeline:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> .keyBy(keySelector)
>>>>>>>>>>>>>>>>>> .window(EventTimeSessionWindows.withGap(gap))
>>>>>>>>>>>>>>>>>> .trigger(new CountBasedWMAugmentationTrigger<IN, TimeWindow>(triggerCount))
>>>>>>>>>>>>>>>>>> .aggregate(
>>>>>>>>>>>>>>>>>>         new AggregateFunction<IN, ACC, OUT>() {
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>             @Override
>>>>>>>>>>>>>>>>>>             public ACC createAccumulator() {
>>>>>>>>>>>>>>>>>>                 ACC newInstance = (ACC) accumulator.clone();
>>>>>>>>>>>>>>>>>>                 newInstance.resetLocal();
>>>>>>>>>>>>>>>>>>                 return newInstance;
>>>>>>>>>>>>>>>>>>             }
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>             @Override
>>>>>>>>>>>>>>>>>>             public void add(IN value, ACC accumulator) {
>>>>>>>>>>>>>>>>>>                 /* This method is called before onElement of the
>>>>>>>>>>>>>>>>>>                    Trigger and keeps the reference to the last IN. */
>>>>>>>>>>>>>>>>>>                 accumulator.add(value);
>>>>>>>>>>>>>>>>>>             }
>>>>>>>>>>>>>>>>>>             .....
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> The Trigger:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> public class CountBasedWMAugmentationTrigger<
>>>>>>>>>>>>>>>>>>         T extends Serializable & CountBasedWMAugmentationTrigger.HasWaterMark,
>>>>>>>>>>>>>>>>>>         W extends Window> extends Trigger<T, W> {
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>     @Override
>>>>>>>>>>>>>>>>>>     public TriggerResult onElement(T element, long timestamp, W window,
>>>>>>>>>>>>>>>>>>                                    TriggerContext ctx) throws Exception {
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>         /* The element T is mutated to carry the watermark. */
>>>>>>>>>>>>>>>>>>         element.setWaterMark(ctx.getCurrentWatermark());
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>         .
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>> On Sun, Dec 17, 2017 at 1:52 AM, Vishal Santoshi <
>>>>>>>>>>>>>>>>>> vishal.santo...@gmail.com> wrote:
>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> I want to augment a POJO in the Trigger's onElement method,
>>>>>>>>>>>>>>>>>>> specifically supply the POJO with the watermark from the
>>>>>>>>>>>>>>>>>>> TriggerContext. The sequence of execution is:
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> 1. call to add() in the accumulator for the window, which saves
>>>>>>>>>>>>>>>>>>> the POJO reference in the accumulator;
>>>>>>>>>>>>>>>>>>> 2. call to onElement on the Trigger;
>>>>>>>>>>>>>>>>>>> 3. set the watermark on the POJO.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> The next add() call should see the last reference and any
>>>>>>>>>>>>>>>>>>> mutation done in step 3.
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> That works in a local test case using LocalFlinkMiniCluster, i.e.
>>>>>>>>>>>>>>>>>>> I can see the mutation made by onElement() in the POJO in the
>>>>>>>>>>>>>>>>>>> subsequent add(), but not on a distributed cluster. The specific
>>>>>>>>>>>>>>>>>>> question I had is whether add() on a supplied accumulator of a
>>>>>>>>>>>>>>>>>>> window and the onElement() method of the trigger on that window are
>>>>>>>>>>>>>>>>>>> executed inline on the same thread, or whether there is any
>>>>>>>>>>>>>>>>>>> serialization/deserialization or IPC that causes this divergence
>>>>>>>>>>>>>>>>>>> (local versus distributed).
>>>>>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>>>>>> Regards.