Re: A question about Triggers

2018-01-08 Thread Fabian Hueske
I think I got it
Glad you solved this tricky issue and thanks for sharing your solution :-)

Best, Fabian



Re: A question about Triggers

2018-01-06 Thread Vishal Santoshi
Yep, though this is suboptimal, as you imagined. Two things:

*  internally has a  that is an ultra-lite version of IN,
only required for the path analysis.
* Sessionization being expensive, we piggyback multiple other aggregations
that do not depend on the path or order (count etc.). Essentially, a Session
is (ordered path + accumulated stats).

The code seems pretty much all right; please tell me if you need to see it.
It is all generics, so no secrets here.

Re: A question about Triggers

2018-01-05 Thread Fabian Hueske
Hi,

you would not need the ListStateDescriptor. A ProcessWindowFunction stores
all events that are assigned to a window (IN objects in your case) in an
internal ListState.
The Iterable parameter of the process() method iterates over this
internal list state.

So you would have a Trigger that fires when a new watermark is received (or
at regular intervals, like every minute) and at the end of the window.
The process() method looks up the current watermark in the Context object,
traverses the Iterable, filters out all events with timestamp >
watermark (you would need to enrich the events with the timestamp, which can
be done in a ProcessFunction), inserts the remaining ones into a sorted
data structure (possibly leveraging the almost-sorted nature of the events),
and creates a Session from it.

This is probably less efficient than your ProcessFunction because process()
would go over the complete list again on every invocation and would not be
able to persist the result of previous invocations.
However, the code should be easier to maintain.
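
As a rough illustration of the filtering and sorting step described above, here is a minimal sketch in plain Java with no Flink dependency. The Event class, its fields, and the upToWatermark helper are hypothetical names chosen for this example; in an actual ProcessWindowFunction the watermark would be read from the Context object and the events would come from the Iterable passed to process().

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class WatermarkFilter {
    // Hypothetical stand-in for an IN object enriched with its event timestamp.
    static final class Event {
        final String page;
        final long timestamp;

        Event(String page, long timestamp) {
            this.page = page;
            this.timestamp = timestamp;
        }
    }

    // Keeps only the events at or before the watermark, ordered by event time.
    static List<Event> upToWatermark(Iterable<Event> elements, long watermark) {
        List<Event> safe = new ArrayList<>();
        for (Event e : elements) {
            if (e.timestamp <= watermark) {
                safe.add(e);
            }
        }
        // List.sort is a stable merge sort that does well on nearly sorted input.
        safe.sort(Comparator.comparingLong((Event e) -> e.timestamp));
        return safe;
    }
}
```

The Session would then be built from the returned list. Because the whole Iterable is traversed on every firing, the cost grows with the window size, which is the inefficiency mentioned above.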

Does that make sense?

Best, Fabian


Re: A question about Triggers

2018-01-05 Thread Vishal Santoshi
Hello Fabian, Thank you for your response.

I thought about it and maybe I am missing something
obvious here. The code below is what I think you suggest. The issue is that
the window now is a list of Sessions (or, shall we say, subsets of a Session).

What is required is that on a new watermark

* We sort these Session objects
* Get the subset that falls before the new watermark and emit it without
a purge.

I do not see how the Trigger approach helps us. It does tell us that the
watermark has progressed, but to get the subset of the ListState that falls
before the watermark, we would need access to *the new value of the
watermark*. That was what my initial query was.



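import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// The generic parameters were stripped by the mail archive. The bounds below
// are a reconstruction; SessionState<IN, OUT> in particular is an assumed shape.
public class SessionProcessWindow<IN, OUT extends SessionState<IN, OUT>>
        extends ProcessWindowFunction<IN, OUT, String, TimeWindow> {

    OUT toCreateNew;
    Long gap;
    private final ListStateDescriptor<OUT> mergingSetsStateDescriptor;

    public SessionProcessWindow(TypeInformation<OUT> aggregationResultType,
                                OUT toCreateNew) {
        this.toCreateNew = toCreateNew;
        mergingSetsStateDescriptor =
                new ListStateDescriptor<>("sessions", aggregationResultType);
    }

    @Override
    public void process(String s, Context context, Iterable<IN> elements,
                        Collector<OUT> out) throws Exception {
        // Build a Session from the elements currently in the window ...
        OUT session = toCreateNew.createNew();
        elements.forEach(f -> session.add(f));
        // ... and append it to the per-window list state.
        context.windowState().getListState(mergingSetsStateDescriptor).add(session);
    }
}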



Re: A question about Triggers

2018-01-05 Thread Fabian Hueske
Hi Vishal,

thanks for sharing your solution!

Looking at this issue again and at your mail in which you shared your
SessionProcessWindow ProcessWindowFunction, I'm wondering why you need the
ValueState that prevents the ProcessWindowFunction from being used in a
mergeable window.
You could have created a new Session object in each invocation of the
ProcessWindowFunction and simply kept the elements in the (mergeable) list
state of the window.
In that case you would simply need a custom trigger that calls the
ProcessWindowFunction when a new watermark arrives. For intermediate calls,
you just FIRE, and for the final call you FIRE_AND_PURGE to remove the
elements from the window's state.
Did you try that?
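
To make the FIRE versus FIRE_AND_PURGE distinction concrete, here is a minimal plain-Java sketch of the decision such a trigger would take. It does not use the Flink Trigger API: the Result enum is a local stand-in that only borrows some of the TriggerResult names, and onWatermark with its parameters is a hypothetical helper.

```java
class WatermarkTriggerSketch {
    // Local stand-in that borrows a subset of Flink's TriggerResult names.
    enum Result { CONTINUE, FIRE, FIRE_AND_PURGE }

    // Decides what to do when the watermark advances for a window whose
    // largest valid timestamp is windowMaxTimestamp.
    static Result onWatermark(long watermark, long windowMaxTimestamp) {
        if (watermark >= windowMaxTimestamp) {
            // Final call: the window is complete, so emit and drop its state.
            return Result.FIRE_AND_PURGE;
        }
        // Intermediate call: emit what is known so far but keep the elements.
        return Result.FIRE;
    }
}
```

In this sketch every watermark advance before the end of the window yields FIRE, so the window function sees the full element list each time, and only the last call releases the state.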

Best, Fabian




Re: A question about Triggers

2018-01-03 Thread Vishal Santoshi
Dear Fabian,

I was able to create a pretty functional ProcessFunction. Here is the
synopsis; please see if it makes sense.

Sessionization is unique in that it entails windows of dynamic length. The
way Flink approaches it is pretty simple. It will create a TimeWindow of size
"gap" relative to the event time, find an overlapping window (intersection),
and create a covering window. Each such window has a "state" associated
with it, which also has to be merged when a cover window is created on the
intersection of 2 or more incident windows. To be more precise, if Window1
spans (t1, t2) and a new record creates a window (t3, t4) and t1<=t3<=t2,
a new Window (t1, t4) is created and the associated states are merged.
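
The merge rule just described can be sketched in plain Java as follows. The Window class and mergeIfOverlapping are hypothetical names for this example and not Flink's TimeWindow API. One assumption beyond the text: the end of the cover is taken as the larger of t2 and t4, which equals t4 in the case described but also handles a record that lands in the middle of an already long session.

```java
import java.util.Optional;

class SessionWindowMerge {
    // Hypothetical window covering the span (start, end).
    static final class Window {
        final long start;
        final long end;

        Window(long start, long end) {
            this.start = start;
            this.end = end;
        }
    }

    // Returns the covering window when the incoming window starts inside the
    // existing one (t1 <= t3 <= t2), or empty when the two do not intersect.
    static Optional<Window> mergeIfOverlapping(Window existing, Window incoming) {
        if (existing.start <= incoming.start && incoming.start <= existing.end) {
            long coverEnd = Math.max(existing.end, incoming.end);
            return Optional.of(new Window(existing.start, coverEnd));
        }
        return Optional.empty();
    }
}
```

Whenever a cover is produced, the states attached to the two source windows would have to be merged as well, which is the part that becomes costly for ordered state.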


In the current Window API the states are external and are
Accumulator based. This approach pretty much works for all cases where the
aggregation is accumulative/reduced and does not depend on order, as in no
ordered list of incoming records needs to be kept and the reduction is to a
single aggregated element (think counts, min, max etc.). In path analysis
(and other use cases), however, this approach has drawbacks. Even though we
could keep an ordered list of events in our accumulator, it becomes
unreasonable if not kept within bounds. An approach that does *attempt* to
bound the state is to preemptively analyze paths using the WM (watermark) as
the marker that defines the *subset* of the state that is safe to analyze. So
if we have n events in the window state and m fall before the WM, we can
safely analyze the m subset, emitting the paths seen and reducing the
cumulative state size. There are caveats, though, that I will go into later.
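
A minimal plain-Java sketch of the "m of n events fall before the WM" reduction follows. The class and method are hypothetical, the state is simplified to a list of timestamps already sorted in ascending order, and "before the WM" is read here as "at or before", on the assumption that a watermark of t promises no further events with timestamp <= t.

```java
import java.util.ArrayList;
import java.util.List;

class ReduceToWatermark {
    // Removes and returns the prefix of timestamps at or before the watermark.
    // Assumes state is sorted ascending; the later timestamps stay in state.
    static List<Long> reduceToWM(List<Long> state, long watermark) {
        int m = 0;
        while (m < state.size() && state.get(m) <= watermark) {
            m++;
        }
        // Copy the safe prefix first, then drop it from the retained state.
        List<Long> safe = new ArrayList<>(state.subList(0, m));
        state.subList(0, m).clear();
        return safe;
    }
}
```

With state [1, 4, 7, 9] and a watermark of 5, the call returns [1, 4] for analysis and leaves [7, 9] behind, so the retained state shrinks each time the watermark advances.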


Unfortunately the Accumulators in the Flink Window runtime defaults do not
have access to the WM.


This led to this generic approach (implemented and tested):


* Use a low-level ProcessFunction, which allows access to the WM and is
definitely nearer to the guts of Flink.


* Still use the merge-windows-on-intersection approach but use the WM to
trigger (through Timers) reductions in state. This is not very dissimilar
to what Flink does, but we have more control over what to do and when to do
it. Essentially, we have exposed a lifecycle method that reacts to WM
progression.


* There are essentially 2 Timers. The first timer is on the maxTimeStamp() of
a Window, which, if there is no further mutation because of a merge etc., will
fire to reflect a Session End. The second one is on currentWaterMark+1
and essentially calls a "reduceToWM" on each keyed Window and thus State.


* There are 2 ways to short-circuit a Session: 1. on Session time span; 2. on
Session size.


* There is a safety valve to blacklist keys when it is obvious that it is a
bot ( again


The solution will thus preemptively push out Patterns (and correct
patterns) while keeping the ordered state within reasonable bounds. The
incident data of course has to be analyzed (are the paths too large, etc.),
but one has full control over how to fashion the solution.




Regards and Thanks,


Vishal









On Wed, Dec 27, 2017 at 10:41 AM, Vishal Santoshi  wrote:

> This makes sense.  Thanks.
>

Re: A question about Triggers

2017-12-23 Thread Fabian Hueske
Hi,

all calls to onElement() or onTimer() are synchronized for any key. Think
of a single thread calling these methods.
Event-time timers are called when a watermark passes the timer. Watermarks
are received as special records, so the methods are called in the same
order as records (actual records or watermarks) arrive at the function.
Actual synchronization is required only for processing-time timers.

The NPE might be thrown because two timers fire one after the other
without a new record being processed in between the onTimer() calls. In
that case the state is cleared in the first call and is null in the second.
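
The sequence described above can be reproduced without Flink. In the following plain-Java sketch the field state stands in for the keyed ValueState (null after a clear), and all class and method names are hypothetical. The null check at the top of onTimer is the guard that avoids the NPE when a second timer fires after the state has been cleared.

```java
class TimerNullGuard {
    // Hypothetical accumulator that only tracks its last modification time.
    static final class Accumulator {
        long lastModified;
    }

    // Stand-in for keyed ValueState: null means no state, e.g. after clear().
    private Accumulator state;
    private final long gap;

    TimerNullGuard(long gap) {
        this.gap = gap;
    }

    // Mirrors the element path: create the state lazily and stamp it.
    void onElement(long timestamp) {
        if (state == null) {
            state = new Accumulator();
        }
        state.lastModified = timestamp;
    }

    // Returns true when this timer is the one that clears the state.
    boolean onTimer(long timestamp) {
        if (state == null) {
            // An earlier timer already cleared the state; nothing left to do.
            return false;
        }
        if (timestamp == state.lastModified + gap) {
            state = null;
            return true;
        }
        return false;
    }
}
```

For example, with a gap of 10, elements at 100, 105 and then 95 leave timers at 105, 110 and 115. The timer at 105 clears the state, and the timers at 110 and 115 then find no state, which is where an unguarded dereference would throw.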

Best, Fabian

2017-12-23 16:36 GMT+01:00 Vishal Santoshi :

> Thanks.
>
>
>
>
> I have a few follow up questions regarding ProcessFunction. I think
> that the core should take care of any synchronization issues between calls
> to onElement and onTimer in case of a keyed stream but tests do not seem to
> suggest that.
>
>
>
> I have  specifically 2 questions.
>
>
> 1. Are calls to onElement(..) single-threaded if scoped to a key? As
> in, on a keyed stream, is there a way that 2 or more threads can execute
> on more than one element of a single key at one time? Would I have to
> synchronize this construction?
>
>
>
>
> OUT accumulator = accumulatorState.value();
> if (accumulator == null) {
>     accumulator = acc.createNew();
> }
>
>
>
> 2. Can concurrent calls to onTimer(..) and onElement(..) happen for the
> same key? I intend to clean up state, but I see NullPointers thrown in
> onTimer(..) and I presume it is because onElement and onTimer are
> executed on 2 separate threads, with onTimer removing the state (
> clear() ) but after another thread has registered a Timer (in onElement).
>
>
> if (timestamp == accumulator.getLastModified() + gap) { // NullPointers on race conditions
>     accumulatorState.clear();
> }
>
>
>
>
>
>
> PS. This is the full code.
>
>
>
> @Override
> public void processElement(IN event, Context context, Collector<OUT> out)
>         throws Exception {
>     TimerService timerService = context.timerService();
>     if (context.timestamp() > timerService.currentWatermark()) {
>         OUT accumulator = accumulatorState.value();
>         if (accumulator == null) {
>             accumulator = acc.createNew();
>         }
>         accumulator.setLastModified(context.timestamp());
>         accumulatorState.update(accumulator);
>         timerService.registerEventTimeTimer(context.timestamp() + gap);
>     }
> }
>
> @Override
> public void onTimer(long timestamp, OnTimerContext context, Collector<OUT> out)
>         throws Exception {
>     OUT accumulator = accumulatorState.value();
>     if (timestamp == accumulator.getLastModified() + gap) { // NullPointers on race conditions
>         accumulatorState.clear();
>     }
> }
>
>
>
> On Thu, Dec 21, 2017 at 3:49 AM, Fabian Hueske  wrote:
>
>> That's correct. Removal of timers is not supported in ProcessFunction.
>> Not sure why this is supported for Triggers.
>> The common workaround for ProcessFunctions is to register multiple timers
>> and have a ValueState that stores the valid timestamp on which the onTimer
>> method should be executed.
>> When a timer fires and calls onTimer(), the method first checks whether
>> the timestamp is the correct one and leaves the method if that is not the
>> case.
>> If you want to fire on the next watermark, another trick is to register
>> multiple timers on (currentWatermark + 1). Since there is only one timer
>> per timestamp, there is only one timer which gets continuously overwritten.
>> The timer is called when the watermark is advanced.
>>
>> On the performance of the timer service. AFAIK, all methods that work
>> with some kind of timer use this service. So there is not much choice.
>>
>> 2017-12-20 22:36 GMT+01:00 Vishal Santoshi :
>>
>>> And that further begs the question.. how performant is Timer Service. I
>>> tried to peruse through the architecture behind it but cold not find a
>>> definite clue. Is it a Scheduled Service and if yes how many threads etc...
>>>
>>> On Wed, Dec 20, 2017 at 4:25 PM, Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
>>>> Makes sense. Did a first stab at Using ProcessFunction. The TimeService
>>>> exposed by the Context does not have remove timer. Is it primarily b'coz A
>>>> Priority Queue is the storage ad remove from a PriorityQueue is expensive
>>>> ?  Trigger Context does expose another version that has removal abilities
>>>> so was wondering why this dissonance...
>>>>
>>>> On Tue, Dec 19, 2017 at 4:53 AM, Fabian Hueske
>>>> wrote:
>>>>
>>>>> Hi Vishal,
>>>>>
>>>>> it is not guaranteed that add() and onElement() receive the same
>>>>> object, and even if they do it is not guaranteed that a mutation of the
>>>>> object in onElement() has an effect. The object might have been serialized
>>>>> and

Re: A question about Triggers

2017-12-23 Thread Vishal Santoshi
Thanks.




I have a few follow-up questions regarding ProcessFunction. I think the
core should take care of any synchronization issues between calls to
onElement and onTimer in the case of a keyed stream, but my tests do not
seem to bear that out.



Specifically, I have 2 questions.


1. Are calls to onElement(..) single-threaded when scoped to a key? That is,
on a keyed stream, can 2 or more threads process more than one element of
a single key at the same time? Would I have to synchronize this construction:

    OUT accumulator = accumulatorState.value();
    if (accumulator == null) {
        accumulator = acc.createNew();
    }



2. Can calls to onTimer(..) and onElement(..) happen concurrently for the same
key? I intend to clean up state, but I see NullPointerExceptions thrown in
onTimer(..). I presume this is because onElement and onTimer are executed on 2
separate threads, with onTimer removing the state ( clear() ) after another
thread has registered a timer ( in onElement ).

    if (timestamp == accumulator.getLastModified() + gap) { // NullPointers on race conditions
        accumulatorState.clear();
    }






PS. This is the full code.

    @Override
    public void processElement(IN event, Context context, Collector out) throws Exception {
        TimerService timerService = context.timerService();
        if (context.timestamp() > timerService.currentWatermark()) {
            OUT accumulator = accumulatorState.value();
            if (accumulator == null) {
                accumulator = acc.createNew();
            }
            accumulator.setLastModified(context.timestamp());
            accumulatorState.update(accumulator);
            timerService.registerEventTimeTimer(context.timestamp() + gap);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext context, Collector out) throws Exception {
        OUT accumulator = accumulatorState.value();
        if (timestamp == accumulator.getLastModified() + gap) { // NullPointers on race conditions
            accumulatorState.clear();
        }
    }
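
A note on the NullPointerException above: as far as the Flink documentation goes, calls to processElement() and onTimer() are synchronized for an operator instance, so two threads are probably not the cause. Out-of-order events alone can produce it: an older event overwrites lastModified, its (earlier) timer matches and clears the state, and the timer registered by the newer event then finds null. The following is only a plain-Java simulation of that sequence, not Flink code. SessionSim, GAP, the TreeSet of timers and the guardNull flag are all made up for illustration, and the watermark check from processElement is left out for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Plain-Java stand-in for the state and timers of ONE key; hypothetical, not Flink API. */
class SessionSim {
    static final long GAP = 10;

    Long lastModified = null;                     // plays the role of accumulatorState; null means cleared
    final TreeSet<Long> timers = new TreeSet<>(); // at most one timer per timestamp, fired in time order
    final List<String> log = new ArrayList<>();

    void processElement(long eventTime) {
        lastModified = eventTime;                 // an older event overwrites a newer timestamp
        timers.add(eventTime + GAP);
    }

    /** Fires every timer up to the watermark, one after another on the calling thread. */
    void advanceWatermark(long watermark, boolean guardNull) {
        while (!timers.isEmpty() && timers.first() <= watermark) {
            onTimer(timers.pollFirst(), guardNull);
        }
    }

    void onTimer(long timestamp, boolean guardNull) {
        if (guardNull && lastModified == null) {
            log.add("skip " + timestamp);         // state was already cleared by an earlier timer
            return;
        }
        if (timestamp == lastModified + GAP) {    // unboxing a null Long throws NullPointerException
            lastModified = null;                  // corresponds to accumulatorState.clear()
            log.add("clear " + timestamp);
        }
    }

    /** Two events for the same key, the second one older than the first. */
    static SessionSim outOfOrderInput() {
        SessionSim sim = new SessionSim();
        sim.processElement(100);
        sim.processElement(95);
        return sim;
    }

    public static void main(String[] args) {
        SessionSim guarded = outOfOrderInput();
        guarded.advanceWatermark(200, true);
        System.out.println(guarded.log);

        try {
            outOfOrderInput().advanceWatermark(200, false);
        } catch (NullPointerException e) {
            System.out.println("unguarded onTimer threw NullPointerException");
        }
    }
}
```

If this is what happens in the real job, a null check at the top of onTimer (or only ever moving lastModified forward) would avoid the exception without any synchronization.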



On Thu, Dec 21, 2017 at 3:49 AM, Fabian Hueske  wrote:

> That's correct. Removal of timers is not supported in ProcessFunction. Not
> sure why this is supported for Triggers.
> The common workaround for ProcessFunctions is to register multiple timers
> and have a ValueState that stores the valid timestamp on which the onTimer
> method should be executed.
> When a timer fires and calls onTimer(), the method first checks whether
> the timestamp is the correct one and leaves the method if that is not the
> case.
> If you want to fire on the next watermark, another trick is to register
> multiple timers on (currentWatermark + 1). Since there is only one timer
> per timestamp, there is only one timer which gets continuously overwritten.
> The timer is called when the watermark is advanced.
>
> On the performance of the timer service. AFAIK, all methods that work with
> some kind of timer use this service. So there is not much choice.
>
> 2017-12-20 22:36 GMT+01:00 Vishal Santoshi :
>
>> And that further begs the question.. how performant is Timer Service. I
>> tried to peruse through the architecture behind it but cold not find a
>> definite clue. Is it a Scheduled Service and if yes how many threads etc...
>>
>> On Wed, Dec 20, 2017 at 4:25 PM, Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> Makes sense. Did a first stab at Using ProcessFunction. The TimeService
>>> exposed by the Context does not have remove timer. Is it primarily b'coz A
>>> Priority Queue is the storage ad remove from a PriorityQueue is expensive
>>> ?  Trigger Context does expose another version that has removal abilities
>>> so was wondering why this dissonance...
>>>
>>> On Tue, Dec 19, 2017 at 4:53 AM, Fabian Hueske 
>>> wrote:
>>>
>>>> Hi Vishal,
>>>>
>>>> it is not guaranteed that add() and onElement() receive the same
>>>> object, and even if they do it is not guaranteed that a mutation of the
>>>> object in onElement() has an effect. The object might have been serialized
>>>> and stored in RocksDB.
>>>> Hence, elements should not be modified in onElement().
>>>>
>>>> Have you considered to implement the operation completely in a
>>>> ProcessFunction instead of a session window?
>>>> This might be more code but easier to design and reason about because
>>>> there is no interaction of window assigner, trigger, and window function.
>>>>
>>>>
>>>> 2017-12-18 20:49 GMT+01:00 Vishal Santoshi :
>>>>
>>>>> I guess
>>>>> https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L362
>>>>>
>>>>> is where We could fashion as to what is emitted. Again for us it seems
>>>>> natural to use WM to materialize a micro batches with "approximate" order (
>>>>> and no I am not a fan of spark micro

Re: A question about Triggers

2017-12-21 Thread Fabian Hueske
That's correct. Removal of timers is not supported in ProcessFunction. Not
sure why this is supported for Triggers.
The common workaround for ProcessFunctions is to register multiple timers
and have a ValueState that stores the valid timestamp on which the onTimer
method should be executed.
When a timer fires and calls onTimer(), the method first checks whether the
timestamp is the correct one and leaves the method if that is not the case.
If you want to fire on the next watermark, another trick is to register
multiple timers on (currentWatermark + 1). Since there is only one timer
per timestamp, there is only one timer which gets continuously overwritten.
The timer is called when the watermark is advanced.
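
To make the workaround concrete, here is a rough plain-Java model of it (no Flink classes involved). ValidTimerSketch and all of its members are hypothetical names: the TreeSet stands in for the timer service with its one-timer-per-timestamp behavior, and the validTimestamp field stands in for the ValueState that holds the only timestamp onTimer should act on.

```java
import java.util.TreeSet;

/** Plain-Java model of "many timers, one valid timestamp"; names are illustrative, not Flink API. */
class ValidTimerSketch {
    private final TreeSet<Long> timers = new TreeSet<>(); // set semantics: one timer per timestamp
    private Long validTimestamp = null;                   // plays the role of the ValueState
    private long currentWatermark = 0;
    int fired = 0;    // timers that passed the validity check
    int ignored = 0;  // stale timers that were skipped

    void register(long timestamp) {
        timers.add(timestamp);        // old timers cannot be removed, so they stay registered
        validTimestamp = timestamp;   // only the most recent registration is considered valid
    }

    /** The "fire on the next watermark" trick: repeated calls collapse into one timer. */
    void registerOnNextWatermark() {
        register(currentWatermark + 1);
    }

    int pendingTimers() {
        return timers.size();
    }

    void advanceWatermark(long watermark) {
        currentWatermark = watermark;
        while (!timers.isEmpty() && timers.first() <= watermark) {
            onTimer(timers.pollFirst());
        }
    }

    private void onTimer(long timestamp) {
        if (validTimestamp == null || timestamp != validTimestamp) {
            ignored++;                // not the timestamp we are waiting for: leave the method
            return;
        }
        validTimestamp = null;
        fired++;                      // the real work would happen here
    }

    public static void main(String[] args) {
        ValidTimerSketch sketch = new ValidTimerSketch();
        sketch.register(50);
        sketch.register(70);
        sketch.register(60);          // 60 is now the only valid timestamp
        sketch.advanceWatermark(100);
        System.out.println(sketch.fired + " fired, " + sketch.ignored + " ignored");

        sketch.registerOnNextWatermark();
        sketch.registerOnNextWatermark();
        sketch.registerOnNextWatermark();
        System.out.println(sketch.pendingTimers() + " pending timer(s)");
    }
}
```

The stale timers still fire, they just do nothing, which is the price of not being able to delete them.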

On the performance of the timer service: AFAIK, all methods that work with
some kind of timer use this service, so there is not much choice.

2017-12-20 22:36 GMT+01:00 Vishal Santoshi :

> And that further begs the question.. how performant is Timer Service. I
> tried to peruse through the architecture behind it but cold not find a
> definite clue. Is it a Scheduled Service and if yes how many threads etc...
>
> On Wed, Dec 20, 2017 at 4:25 PM, Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> Makes sense. Did a first stab at Using ProcessFunction. The TimeService
>> exposed by the Context does not have remove timer. Is it primarily b'coz A
>> Priority Queue is the storage ad remove from a PriorityQueue is expensive
>> ?  Trigger Context does expose another version that has removal abilities
>> so was wondering why this dissonance...
>>
>> On Tue, Dec 19, 2017 at 4:53 AM, Fabian Hueske  wrote:
>>
>>> Hi Vishal,
>>>
>>> it is not guaranteed that add() and onElement() receive the same object,
>>> and even if they do it is not guaranteed that a mutation of the object in
>>> onElement() has an effect. The object might have been serialized and stored
>>> in RocksDB.
>>> Hence, elements should not be modified in onElement().
>>>
>>> Have you considered to implement the operation completely in a
>>> ProcessFunction instead of a session window?
>>> This might be more code but easier to design and reason about because
>>> there is no interaction of window assigner, trigger, and window function.
>>>
>>>
>>> 2017-12-18 20:49 GMT+01:00 Vishal Santoshi :
>>>
>>>> I guess
>>>> https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L362
>>>>
>>>> is where We could fashion as to what is emitted. Again for us it seems
>>>> natural to use WM to materialize a micro batches with "approximate" order (
>>>> and no I am not a fan of spark micro batches :)). Any pointers as to how we
>>>> could write an implementation that allows for "up till WM emission" through
>>>> a trigger on a Session Window would be very helpful. In essence I believe
>>>> that for any "funnel" analysis it is crucial.
>>>>
>>>> Something like
>>>> https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java#L346
>>>>
>>>> I know I am simplifying this and there has to be more to it...
>>>>
>>>> On Mon, Dec 18, 2017 at 11:31 AM, Vishal Santoshi <
>>>> vishal.santo...@gmail.com> wrote:
>>>>
>>>>> The Trigger in this case would be some CountBased Trigger Again
>>>>> the motive is the keep the state lean as we desire to search for patterns,
>>>>> sorted on even time,  in the incoming sessionized ( and thus of un
>>>>> deterministic length ) stream
>>>>>
>>>>> On Mon, Dec 18, 2017 at 11:26 AM, Vishal Santoshi <
>>>>> vishal.santo...@gmail.com> wrote:
>>>>>
>>>>>> For example, this would have worked perfect if it did not complain
>>>>>> about MergeableWindow and state. The Session class in this encapsulates
>>>>>> the  trim up to watermark behavior ( reduce call after telling it the
>>>>>> current WM )  we desire
>>>>>>
>>>>>> public class SessionProcessWindow extends ProcessWindowFunction<Event, Session, String, TimeWindow> {
>>>>>>
>>>>>> private static final ValueStateDescriptor sessionState = new ValueStateDescriptor<>("session", Session.class);
>>>>>>
>>>>>> @Override
>>>>>> public void process(String key, Context context, Iterable elements, Collector out) throws Exception {
>>>>>>
>>>>>> ValueState session = context.windowState().getState(sessionState);
>>>>>> Session s = session.value() != null ? session.value() : new Session();
>>>>>> for (Event e : elements) {
>>>>>> s.add(e);
>>>>>> }
>>>>>> s.lastWaterMarkedEventLite.serverTime = context.currentWatermark();
>>>>>> s.reduce();
>>>>>> out.collect(s);
>>>>>>

Re: A question about Triggers

2017-12-20 Thread Vishal Santoshi
And that further raises the question: how performant is the timer service? I
tried to look through the architecture behind it but could not find a
definite clue. Is it a scheduled service, and if so, how many threads does it use?

On Wed, Dec 20, 2017 at 4:25 PM, Vishal Santoshi 
wrote:

> Makes sense. Did a first stab at Using ProcessFunction. The TimeService
> exposed by the Context does not have remove timer. Is it primarily b'coz A
> Priority Queue is the storage ad remove from a PriorityQueue is expensive
> ?  Trigger Context does expose another version that has removal abilities
> so was wondering why this dissonance...
>
> On Tue, Dec 19, 2017 at 4:53 AM, Fabian Hueske  wrote:
>
>> Hi Vishal,
>>
>> it is not guaranteed that add() and onElement() receive the same object,
>> and even if they do it is not guaranteed that a mutation of the object in
>> onElement() has an effect. The object might have been serialized and stored
>> in RocksDB.
>> Hence, elements should not be modified in onElement().
>>
>> Have you considered to implement the operation completely in a
>> ProcessFunction instead of a session window?
>> This might be more code but easier to design and reason about because
>> there is no interaction of window assigner, trigger, and window function.
>>
>>
>> 2017-12-18 20:49 GMT+01:00 Vishal Santoshi :
>>
>>> I guess
>>> https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L362
>>>
>>> is where We could fashion as to what is emitted. Again for us it seems
>>> natural to use WM to materialize a micro batches with "approximate" order (
>>> and no I am not a fan of spark micro batches :)). Any pointers as to how we
>>> could write an implementation that allows for "up till WM emission" through
>>> a trigger on a Session Window would be very helpful. In essence I believe
>>> that for any "funnel" analysis it is crucial.
>>>
>>> Something like
>>> https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java#L346
>>>
>>> I know I am simplifying this and there has to be more to it...
>>>
>>>
>>>
>>>
>>> On Mon, Dec 18, 2017 at 11:31 AM, Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
>>>> The Trigger in this case would be some CountBased Trigger Again the
>>>> motive is the keep the state lean as we desire to search for  patterns,
>>>> sorted on even time,  in the incoming sessionized ( and thus of un
>>>> deterministic length ) stream
>>>>
>>>> On Mon, Dec 18, 2017 at 11:26 AM, Vishal Santoshi <
>>>> vishal.santo...@gmail.com> wrote:
>>>>
>>>>> For example, this would have worked perfect if it did not complain
>>>>> about MergeableWindow and state. The Session class in this encapsulates
>>>>> the  trim up to watermark behavior ( reduce call after telling it the
>>>>> current WM )  we desire
>>>>>
>>>>> public class SessionProcessWindow extends ProcessWindowFunction<Event, Session, String, TimeWindow> {
>>>>>
>>>>> private static final ValueStateDescriptor sessionState = new ValueStateDescriptor<>("session", Session.class);
>>>>>
>>>>> @Override
>>>>> public void process(String key, Context context, Iterable elements, Collector out) throws Exception {
>>>>>
>>>>> ValueState session = context.windowState().getState(sessionState);
>>>>> Session s = session.value() != null ? session.value() : new Session();
>>>>> for (Event e : elements) {
>>>>> s.add(e);
>>>>> }
>>>>> s.lastWaterMarkedEventLite.serverTime = context.currentWatermark();
>>>>> s.reduce();
>>>>> out.collect(s);
>>>>> session.update(s);
>>>>> }
>>>>>
>>>>> @Override
>>>>> public void clear(Context context){
>>>>> ValueState session = context.windowState().getState(sessionState);
>>>>> session.clear();
>>>>> }
>>>>> }
>>>>>
>>>>> On Mon, Dec 18, 2017 at 11:08 AM, Vishal Santoshi <
>>>>> vishal.santo...@gmail.com> wrote:
>>>>>
>>>>>> Hello Fabian, Thank you for the response.
>>>>>>
>>>>>>  I think that does not work, as it is the WM of the Window Operator
>>>>>> is what is desired to make deterministic decisions rather than off an
>>>>>> operator the precedes the Window ? This is doable using
>>>>>> ProcessWindowFunction using state but only in the case of non mergeable
>>>>>> windows.
>>>>>>
>>>>>>The best API  option I think is a TimeBaseTrigger that fires every
>>>>>> configured time progression of WM  and a Window implementation that
>>>>>> materializes *only data up till that WM* ( it might have more data
>>>>>> but that data has event time grater than the WM ). I am not sure we

Re: A question about Triggers

2017-12-20 Thread Vishal Santoshi
Makes sense. I took a first stab at using ProcessFunction. The TimerService
exposed by the Context has no way to remove a timer. Is that primarily because
a priority queue is the storage, and removal from a PriorityQueue is expensive?
The Trigger context does expose another version that can delete timers, so I
was wondering why there is this dissonance...

On Tue, Dec 19, 2017 at 4:53 AM, Fabian Hueske  wrote:

> Hi Vishal,
>
> it is not guaranteed that add() and onElement() receive the same object,
> and even if they do it is not guaranteed that a mutation of the object in
> onElement() has an effect. The object might have been serialized and stored
> in RocksDB.
> Hence, elements should not be modified in onElement().
>
> Have you considered to implement the operation completely in a
> ProcessFunction instead of a session window?
> This might be more code but easier to design and reason about because
> there is no interaction of window assigner, trigger, and window function.
>
>
> 2017-12-18 20:49 GMT+01:00 Vishal Santoshi :
>
>> I guess
>> https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L362
>>
>> is where We could fashion as to what is emitted. Again for us it seems
>> natural to use WM to materialize a micro batches with "approximate" order (
>> and no I am not a fan of spark micro batches :)). Any pointers as to how we
>> could write an implementation that allows for "up till WM emission" through
>> a trigger on a Session Window would be very helpful. In essence I believe
>> that for any "funnel" analysis it is crucial.
>>
>> Something like
>> https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java#L346
>>
>> I know I am simplifying this and there has to be more to it...
>>
>>
>>
>>
>> On Mon, Dec 18, 2017 at 11:31 AM, Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> The Trigger in this case would be some CountBased Trigger Again the
>>> motive is the keep the state lean as we desire to search for  patterns,
>>> sorted on even time,  in the incoming sessionized ( and thus of un
>>> deterministic length ) stream
>>>
>>> On Mon, Dec 18, 2017 at 11:26 AM, Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
>>>> For example, this would have worked perfect if it did not complain
>>>> about MergeableWindow and state. The Session class in this encapsulates
>>>> the  trim up to watermark behavior ( reduce call after telling it the
>>>> current WM )  we desire
>>>>
>>>> public class SessionProcessWindow extends ProcessWindowFunction<Event, Session, String, TimeWindow> {
>>>>
>>>> private static final ValueStateDescriptor sessionState = new ValueStateDescriptor<>("session", Session.class);
>>>>
>>>> @Override
>>>> public void process(String key, Context context, Iterable elements, Collector out) throws Exception {
>>>>
>>>> ValueState session = context.windowState().getState(sessionState);
>>>> Session s = session.value() != null ? session.value() : new Session();
>>>> for (Event e : elements) {
>>>> s.add(e);
>>>> }
>>>> s.lastWaterMarkedEventLite.serverTime = context.currentWatermark();
>>>> s.reduce();
>>>> out.collect(s);
>>>> session.update(s);
>>>> }
>>>>
>>>> @Override
>>>> public void clear(Context context){
>>>> ValueState session = context.windowState().getState(sessionState);
>>>> session.clear();
>>>> }
>>>> }
>>>>
>>>> On Mon, Dec 18, 2017 at 11:08 AM, Vishal Santoshi <
>>>> vishal.santo...@gmail.com> wrote:
>>>>
>>>>> Hello Fabian, Thank you for the response.
>>>>>
>>>>>  I think that does not work, as it is the WM of the Window Operator is
>>>>> what is desired to make deterministic decisions rather than off an operator
>>>>> the precedes the Window ? This is doable using ProcessWindowFunction using
>>>>> state but only in the case of non mergeable windows.
>>>>>
>>>>>The best API  option I think is a TimeBaseTrigger that fires every
>>>>> configured time progression of WM  and a Window implementation that
>>>>> materializes *only data up till that WM* ( it might have more data
>>>>> but that data has event time grater than the WM ). I am not sure we have
>>>>> that built in option and thus was asking for an access the current WM for
>>>>> the window operator to allow  us handle the "*only data up till that
>>>>> WM" *range retrieval using some  custom data structure.
>>>>>
>>>>> Regards.
>>>>>
>>>>> On Mon, Dec 18, 2017 at 5:14 AM, Fabian Hueske
>>>>> wrote:
>>>>>
>>>>>> Hi Vishal,
>>>>>>
>>>>>> the Trigger is not designed to augment

Re: A question about Triggers

2017-12-19 Thread Fabian Hueske
Hi Vishal,

it is not guaranteed that add() and onElement() receive the same object,
and even if they do it is not guaranteed that a mutation of the object in
onElement() has an effect. The object might have been serialized and stored
in RocksDB.
Hence, elements should not be modified in onElement().
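
To illustrate why the mutation can get lost: once an element has gone through a serializer, whoever reads it back gets a copy, and changing that copy does not touch the original (or the other way round). The sketch below uses plain java.io serialization purely as a stand-in. Flink uses its own serializers, and CopyOnStoreSketch, Event and storeAndLoad are hypothetical names, so treat this only as a demonstration of the principle.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class CopyOnStoreSketch {
    /** Hypothetical event type; the watermark field mimics what the trigger tried to set. */
    static class Event implements Serializable {
        private static final long serialVersionUID = 1L;
        long watermark = -1;
    }

    /** Round-trips the event through bytes, as a serializing state backend would. */
    static Event storeAndLoad(Event event) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(event);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Event) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Event heldByAccumulator = new Event();
        Event seenByTrigger = storeAndLoad(heldByAccumulator); // a different object

        seenByTrigger.watermark = 42;                          // mutation in "onElement()"

        System.out.println(heldByAccumulator == seenByTrigger); // false
        System.out.println(heldByAccumulator.watermark);        // -1, the mutation is not visible
    }
}
```

In local mode the two references may happen to be the same object, which would explain why the behavior only shows up in distributed mode.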

Have you considered implementing the operation completely in a
ProcessFunction instead of a session window?
This might be more code, but it should be easier to design and reason about
because there is no interaction of window assigner, trigger, and window function.


2017-12-18 20:49 GMT+01:00 Vishal Santoshi :

> I guess
> https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L362
>
> is where We could fashion as to what is emitted. Again for us it seems
> natural to use WM to materialize a micro batches with "approximate" order (
> and no I am not a fan of spark micro batches :)). Any pointers as to how we
> could write an implementation that allows for "up till WM emission" through
> a trigger on a Session Window would be very helpful. In essence I believe
> that for any "funnel" analysis it is crucial.
>
> Something like
> https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java#L346
>
> I know I am simplifying this and there has to be more to it...
>
>
>
>
> On Mon, Dec 18, 2017 at 11:31 AM, Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> The Trigger in this case would be some CountBased Trigger Again the
>> motive is the keep the state lean as we desire to search for  patterns,
>> sorted on even time,  in the incoming sessionized ( and thus of un
>> deterministic length ) stream
>>
>> On Mon, Dec 18, 2017 at 11:26 AM, Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> For example, this would have worked perfect if it did not complain about
>>> MergeableWindow and state. The Session class in this encapsulates the  trim
>>> up to watermark behavior ( reduce call after telling it the current WM )
>>> we desire
>>>
>>> public class SessionProcessWindow extends ProcessWindowFunction<Event, Session, String, TimeWindow> {
>>>
>>> private static final ValueStateDescriptor sessionState = new 
>>> ValueStateDescriptor<>("session", Session.class);
>>>
>>> @Override
>>> public void process(String key, Context context, Iterable 
>>> elements, Collector out) throws Exception {
>>>
>>> ValueState session = 
>>> context.windowState().getState(sessionState);
>>> Session s = session.value() != null ? session.value() : new 
>>> Session();
>>> for (Event e : elements) {
>>> s.add(e);
>>> }
>>> s.lastWaterMarkedEventLite.serverTime = context.currentWatermark();
>>> s.reduce();
>>> out.collect(s);
>>> session.update(s);
>>> }
>>>
>>> @Override
>>> public void clear(Context context){
>>> ValueState session = 
>>> context.windowState().getState(sessionState);
>>> session.clear();
>>> }
>>> }
>>>
>>>
>>>
>>>
>>> On Mon, Dec 18, 2017 at 11:08 AM, Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
>>>> Hello Fabian, Thank you for the response.
>>>>
>>>>  I think that does not work, as it is the WM of the Window Operator is
>>>> what is desired to make deterministic decisions rather than off an operator
>>>> the precedes the Window ? This is doable using ProcessWindowFunction using
>>>> state but only in the case of non mergeable windows.
>>>>
>>>>The best API  option I think is a TimeBaseTrigger that fires every
>>>> configured time progression of WM  and a Window implementation that
>>>> materializes *only data up till that WM* ( it might have more data but
>>>> that data has event time grater than the WM ). I am not sure we have that
>>>> built in option and thus was asking for an access the current WM for the
>>>> window operator to allow  us handle the "*only data up till that WM" *range
>>>> retrieval using some  custom data structure.
>>>>
>>>> Regards.
>>>>
>>>> On Mon, Dec 18, 2017 at 5:14 AM, Fabian Hueske
>>>> wrote:
>>>>
>>>>> Hi Vishal,
>>>>>
>>>>> the Trigger is not designed to augment records but just to control
>>>>> when a window is evaluated.
>>>>> I would recommend to use a ProcessFunction to enrich records with the
>>>>> current watermark before passing them into the window operator.
>>>>> The context object of the processElement() method gives access to the
>>>>> current watermark and timestamp of a record.
>>>>>
>>>>> Please note that watermarks are not deterministic but may depend on
>>>>> the order in which parallel inputs are consumed by an operator.
>>>>>
>>>>> Best, Fabian
>>>>>
>>>>> 2017-12-17 16:59

Re: A question about Triggers

2017-12-18 Thread Vishal Santoshi
I guess
https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L362

is where we could control what is emitted. Again, for us it seems
natural to use the WM to materialize micro-batches with "approximate" order
(and no, I am not a fan of Spark micro-batches :)). Any pointers as to how we
could write an implementation that allows for "up till WM" emission through
a trigger on a session window would be very helpful. In essence, I believe
it is crucial for any "funnel" analysis.

Something like
https://github.com/apache/flink/blob/7f99a0df669dc73c983913c505c7f72dab3c0a4d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java#L346

I know I am simplifying this and there has to be more to it...
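
To spell out what "up till WM" emission means here, independent of how the window operator would do it: from everything buffered for a session, take only the events whose timestamp is at or below the current watermark, and order them by event time. The sketch below is plain Java with made-up names (UpToWatermarkSketch, Event, upToWatermark). It is not taken from WindowOperator and assumes events carry their own timestamp.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

class UpToWatermarkSketch {
    /** Hypothetical event carrying its own event-time timestamp. */
    static class Event {
        final String name;
        final long timestamp;

        Event(String name, long timestamp) {
            this.name = name;
            this.timestamp = timestamp;
        }
    }

    /** Returns the buffered events with timestamp <= watermark, sorted by event time. */
    static List<Event> upToWatermark(Iterable<Event> buffered, long watermark) {
        List<Event> ready = new ArrayList<>();
        for (Event e : buffered) {
            if (e.timestamp <= watermark) {
                ready.add(e);          // events ahead of the watermark stay out of the result
            }
        }
        ready.sort(Comparator.comparingLong((Event e) -> e.timestamp));
        return ready;
    }

    public static void main(String[] args) {
        List<Event> buffered = Arrays.asList(
            new Event("c", 30), new Event("a", 10), new Event("d", 40), new Event("b", 20));

        for (Event e : upToWatermark(buffered, 25)) {
            System.out.println(e.name + " @ " + e.timestamp);
        }
    }
}
```

Events "c" and "d" are held back until a later watermark passes them, so the emitted prefix of the session is stable in order.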




On Mon, Dec 18, 2017 at 11:31 AM, Vishal Santoshi  wrote:

> The Trigger in this case would be some CountBased Trigger Again the
> motive is the keep the state lean as we desire to search for  patterns,
> sorted on even time,  in the incoming sessionized ( and thus of un
> deterministic length ) stream
>
> On Mon, Dec 18, 2017 at 11:26 AM, Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> For example, this would have worked perfect if it did not complain about
>> MergeableWindow and state. The Session class in this encapsulates the  trim
>> up to watermark behavior ( reduce call after telling it the current WM )
>> we desire
>>
>> public class SessionProcessWindow extends ProcessWindowFunction<Event, Session, String, TimeWindow> {
>>
>> private static final ValueStateDescriptor sessionState = new 
>> ValueStateDescriptor<>("session", Session.class);
>>
>> @Override
>> public void process(String key, Context context, Iterable 
>> elements, Collector out) throws Exception {
>>
>> ValueState session = 
>> context.windowState().getState(sessionState);
>> Session s = session.value() != null ? session.value() : new 
>> Session();
>> for (Event e : elements) {
>> s.add(e);
>> }
>> s.lastWaterMarkedEventLite.serverTime = context.currentWatermark();
>> s.reduce();
>> out.collect(s);
>> session.update(s);
>> }
>>
>> @Override
>> public void clear(Context context){
>> ValueState session = 
>> context.windowState().getState(sessionState);
>> session.clear();
>> }
>> }
>>
>>
>>
>>
>> On Mon, Dec 18, 2017 at 11:08 AM, Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> Hello Fabian, Thank you for the response.
>>>
>>>  I think that does not work, as it is the WM of the Window Operator is
>>> what is desired to make deterministic decisions rather than off an operator
>>> the precedes the Window ? This is doable using ProcessWindowFunction using
>>> state but only in the case of non mergeable windows.
>>>
>>>The best API  option I think is a TimeBaseTrigger that fires every
>>> configured time progression of WM  and a Window implementation that
>>> materializes *only data up till that WM* ( it might have more data but
>>> that data has event time grater than the WM ). I am not sure we have that
>>> built in option and thus was asking for an access the current WM for the
>>> window operator to allow  us handle the "*only data up till that WM" *range
>>> retrieval using some  custom data structure.
>>>
>>> Regards.
>>>
>>>
>>>
>>>
>>> On Mon, Dec 18, 2017 at 5:14 AM, Fabian Hueske 
>>> wrote:
>>>
>>>> Hi Vishal,
>>>>
>>>> the Trigger is not designed to augment records but just to control when
>>>> a window is evaluated.
>>>> I would recommend to use a ProcessFunction to enrich records with the
>>>> current watermark before passing them into the window operator.
>>>> The context object of the processElement() method gives access to the
>>>> current watermark and timestamp of a record.
>>>>
>>>> Please note that watermarks are not deterministic but may depend on the
>>>> order in which parallel inputs are consumed by an operator.
>>>>
>>>> Best, Fabian
>>>>
>>>> 2017-12-17 16:59 GMT+01:00 Vishal Santoshi :
>>>>
>>>>> An addendum
>>>>>
>>>>> Is the element reference IN  in onElement(IN element.. ) in
>>>>> Trigger, the same as IN the one provided to add(IN value) in
>>>>> Accumulator. It seems that any mutations to IN in the onElement() is
>>>>> not visible to the Accumulator that is carrying it as a previous element
>>>>> reference albeit in the next invocation of add(). This seems to be only in
>>>>> distributed mode, which makes sense only if theses reference point to
>>>>> different objects.
>>>>>
>>>>> The pipeline
>>>>>
>>>>> .keyBy(keySelector)
>>>>> .window(EventTimeSessionWindows.withGap(gap))
>>>>> .trigger(new CountBasedWMAugmentationTrigger(triggerCount))
>>>>>

Re: A question about Triggers

2017-12-18 Thread Vishal Santoshi
The Trigger in this case would be some count-based Trigger. Again, the
motive is to keep the state lean, as we want to search for patterns,
sorted on event time, in the incoming sessionized (and thus of
non-deterministic length) stream.

On Mon, Dec 18, 2017 at 11:26 AM, Vishal Santoshi  wrote:

> For example, this would have worked perfect if it did not complain about
> MergeableWindow and state. The Session class in this encapsulates the  trim
> up to watermark behavior ( reduce call after telling it the current WM )
> we desire
>
> public class SessionProcessWindow extends ProcessWindowFunction<Event, Session, String, TimeWindow> {
>
> private static final ValueStateDescriptor sessionState = new 
> ValueStateDescriptor<>("session", Session.class);
>
> @Override
> public void process(String key, Context context, Iterable 
> elements, Collector out) throws Exception {
>
> ValueState session = 
> context.windowState().getState(sessionState);
> Session s = session.value() != null ? session.value() : new Session();
> for (Event e : elements) {
> s.add(e);
> }
> s.lastWaterMarkedEventLite.serverTime = context.currentWatermark();
> s.reduce();
> out.collect(s);
> session.update(s);
> }
>
> @Override
> public void clear(Context context){
> ValueState session = 
> context.windowState().getState(sessionState);
> session.clear();
> }
> }
>
>
>
>
> On Mon, Dec 18, 2017 at 11:08 AM, Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> Hello Fabian, Thank you for the response.
>>
>>  I think that does not work, as it is the WM of the Window Operator is
>> what is desired to make deterministic decisions rather than off an operator
>> the precedes the Window ? This is doable using ProcessWindowFunction using
>> state but only in the case of non mergeable windows.
>>
>>The best API  option I think is a TimeBaseTrigger that fires every
>> configured time progression of WM  and a Window implementation that
>> materializes *only data up till that WM* ( it might have more data but
>> that data has event time grater than the WM ). I am not sure we have that
>> built in option and thus was asking for an access the current WM for the
>> window operator to allow  us handle the "*only data up till that WM" *range
>> retrieval using some  custom data structure.
>>
>> Regards.
>>
>>
>>
>>
>> On Mon, Dec 18, 2017 at 5:14 AM, Fabian Hueske  wrote:
>>
>>> Hi Vishal,
>>>
>>> the Trigger is not designed to augment records but just to control when
>>> a window is evaluated.
>>> I would recommend to use a ProcessFunction to enrich records with the
>>> current watermark before passing them into the window operator.
>>> The context object of the processElement() method gives access to the
>>> current watermark and timestamp of a record.
>>>
>>> Please note that watermarks are not deterministic but may depend on the
>>> order in which parallel inputs are consumed by an operator.
>>>
>>> Best, Fabian
>>>
>>> 2017-12-17 16:59 GMT+01:00 Vishal Santoshi :
>>>
>>>> An addendum
>>>>
>>>> Is the element reference IN  in onElement(IN element.. ) in
>>>> Trigger, the same as IN the one provided to add(IN value) in
>>>> Accumulator. It seems that any mutations to IN in the onElement() is
>>>> not visible to the Accumulator that is carrying it as a previous element
>>>> reference albeit in the next invocation of add(). This seems to be only in
>>>> distributed mode, which makes sense only if theses reference point to
>>>> different objects.
>>>>
>>>> The pipeline
>>>>
>>>> .keyBy(keySelector)
>>>> .window(EventTimeSessionWindows.withGap(gap))
>>>> .trigger(new CountBasedWMAugmentationTrigger(triggerCount))
>>>> .aggregate(
>>>> new AggregateFunction() {
>>>>
>>>> @Override
>>>> public ACC createAccumulator() {
>>>> ACC newInstance = (ACC) accumulator.clone();
>>>> newInstance.resetLocal();
>>>> return newInstance;
>>>> }
>>>>
>>>> @Override
>>>> public void add(IN value, ACC accumulator) {
>>>>
>>>> /** This method is called before onElement of the Trigger
>>>> and keeps the reference to the last IN **/
>>>>
>>>>
>>>> accumulator.add(value);
>>>>
>>>> }
>>>> .
>>>>
>>>> The Trigger
>>>>
>>>>
>>>> public class CountBasedWMAugmentationTrigger<T extends
>>>> Serializable & CountBasedWMAugmentationTrigger.HasWaterMark, W
>>>> extends Window> extends Trigger<T, W> {
>>>>
>>>>
>>>> @Override
>>>>
>>>> public TriggerResult onElement(T element, long timestamp, W window,
>>>> TriggerContext ctx) throws Exception {
>>>>
>>>> /** The element T is mutated to carry the watermark **/
>>>>

Re: A question about Triggers

2017-12-18 Thread Vishal Santoshi
For example, this would have worked perfectly if it did not complain about
MergeableWindow and state. The Session class here encapsulates the
trim-up-to-watermark behavior we desire (a reduce() call after telling it
the current WM).

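import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Event and Session are application classes (not shown in this thread).
public class SessionProcessWindow extends ProcessWindowFunction<Event, Session, String, TimeWindow> {

    private static final ValueStateDescriptor<Session> sessionState =
            new ValueStateDescriptor<>("session", Session.class);

    @Override
    public void process(String key, Context context, Iterable<Event> elements,
            Collector<Session> out) throws Exception {
        // Per-window state; per the note above, this is what fails for mergeable windows.
        ValueState<Session> session = context.windowState().getState(sessionState);
        Session s = session.value() != null ? session.value() : new Session();
        for (Event e : elements) {
            s.add(e);
        }
        // Tell the session the current watermark, then trim up to it.
        s.lastWaterMarkedEventLite.serverTime = context.currentWatermark();
        s.reduce();
        out.collect(s);
        session.update(s);
    }

    @Override
    public void clear(Context context) {
        ValueState<Session> session = context.windowState().getState(sessionState);
        session.clear();
    }
}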




On Mon, Dec 18, 2017 at 11:08 AM, Vishal Santoshi  wrote:

> Hello Fabian, Thank you for the response.
>
>  I think that does not work, as it is the WM of the Window Operator is
> what is desired to make deterministic decisions rather than off an operator
> the precedes the Window ? This is doable using ProcessWindowFunction using
> state but only in the case of non mergeable windows.
>
>The best API  option I think is a TimeBaseTrigger that fires every
> configured time progression of WM  and a Window implementation that
> materializes *only data up till that WM* ( it might have more data but
> that data has event time grater than the WM ). I am not sure we have that
> built in option and thus was asking for an access the current WM for the
> window operator to allow  us handle the "*only data up till that WM" *range
> retrieval using some  custom data structure.
>
> Regards.
>
>
>
>
> On Mon, Dec 18, 2017 at 5:14 AM, Fabian Hueske  wrote:
>
>> Hi Vishal,
>>
>> the Trigger is not designed to augment records but just to control when a
>> window is evaluated.
>> I would recommend to use a ProcessFunction to enrich records with the
>> current watermark before passing them into the window operator.
>> The context object of the processElement() method gives access to the
>> current watermark and timestamp of a record.
>>
>> Please note that watermarks are not deterministic but may depend on the
>> order in which parallel inputs are consumed by an operator.
>>
>> Best, Fabian
>>
>> 2017-12-17 16:59 GMT+01:00 Vishal Santoshi :
>>
>>> An addendum
>>>
>>> Is the element reference IN  in onElement(IN element.. ) in
>>> Trigger, the same as IN the one provided to add(IN value) in
>>> Accumulator. It seems that any mutations to IN in the onElement() is
>>> not visible to the Accumulator that is carrying it as a previous element
>>> reference albeit in the next invocation of add(). This seems to be only in
>>> distributed mode, which makes sense only if theses reference point to
>>> different objects.
>>>
>>> The pipeline
>>>
>>> .keyBy(keySelector)
>>> .window(EventTimeSessionWindows.withGap(gap))
>>> .trigger(new CountBasedWMAugmentationTrigger(triggerCount))
>>> .aggregate(
>>> new AggregateFunction() {
>>>
>>> @Override
>>> public ACC createAccumulator() {
>>> ACC newInstance = (ACC) accumulator.clone();
>>> newInstance.resetLocal();
>>> return newInstance;
>>> }
>>>
>>> @Override
>>> public void add(IN value, ACC accumulator) {
>>>
>>> /** This method is called before onElement of the Trigger 
>>> and keeps the reference to the last IN **/
>>>
>>>
>>> accumulator.add(value);
>>>
>>> }
>>> .
>>>
>>>The Trigger
>>>
>>>
>>> public class CountBasedWMAugmentationTrigger<T extends
>>> Serializable & CountBasedWMAugmentationTrigger.HasWaterMark, W
>>> extends Window> extends Trigger<T, W> {
>>>
>>>
>>> @Override
>>>
>>> public TriggerResult onElement(T element, long timestamp, W window, 
>>> TriggerContext ctx) throws Exception {
>>>
>>> /** The element T is mutated to carry the watermark **/
>>> *element.setWaterMark(ctx.getCurrentWatermark());*
>>>
>>> .
>>>
>>>
>>>
>>>
>>>
>>> On Sun, Dec 17, 2017 at 1:52 AM, Vishal Santoshi <
>>> vishal.santo...@gmail.com> wrote:
>>>
>>>> I want to augment a POJO in  Trigger's onElement method, specifically
>>>> supply the POJO with the watermark from the TriggerContext. The sequence of
>>>> execution is this sequence
>>>>
>>>> 1. call to add() in the accumulator for the window  and save the POJO
>>>> reference in the Accumulator.
>>>> 2. call to onElement on Tigger

Re: A question about Triggers

2017-12-18 Thread Vishal Santoshi
Hello Fabian, Thank you for the response.

I don't think that works: it is the WM of the window operator that is needed
to make deterministic decisions, rather than the WM of an operator that
precedes the window. This is doable using a ProcessWindowFunction with
state, but only in the case of non-mergeable windows.

The best API option, I think, is a TimeBaseTrigger that fires on every
configured time progression of the WM, and a Window implementation that
materializes *only data up till that WM* (it might hold more data, but that
data has event time greater than the WM). I am not sure we have that
built-in option, and thus was asking for access to the current WM of the
window operator, to allow us to handle the "*only data up till that WM*"
range retrieval using some custom data structure.

Regards.
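
The "only data up till that WM" range retrieval could be sketched roughly as below. This is plain Java rather than Flink code, and the class name UpToWatermarkBuffer, its methods, and the String payload are all hypothetical names chosen for illustration; the thread does not show the actual data structure. The idea is simply to keep events sorted by event time so that everything at or below the watermark can be read as a prefix.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical buffer that keeps payloads ordered by event time. */
public class UpToWatermarkBuffer {

    // Event time -> payloads with that event time (several events may share a timestamp).
    private final TreeMap<Long, List<String>> byEventTime = new TreeMap<>();

    /** Buffers a payload under its event time; arrival order does not matter. */
    public void add(long eventTime, String payload) {
        byEventTime.computeIfAbsent(eventTime, t -> new ArrayList<>()).add(payload);
    }

    /** Returns all payloads with event time <= watermark, in event-time order. */
    public List<String> upTo(long watermark) {
        List<String> result = new ArrayList<>();
        // headMap(..., true) is the inclusive prefix of the sorted map.
        for (List<String> bucket : byEventTime.headMap(watermark, true).values()) {
            result.addAll(bucket);
        }
        return result;
    }

    public static void main(String[] args) {
        UpToWatermarkBuffer buffer = new UpToWatermarkBuffer();
        buffer.add(30L, "c");
        buffer.add(10L, "a");
        buffer.add(20L, "b");
        // With a watermark of 20, only "a" and "b" are materialized; "c" stays buffered.
        System.out.println(buffer.upTo(20L));
    }
}
```

A sorted map keeps the prefix lookup cheap on each firing; whether that suits almost-sorted input better than, say, an insertion-sorted list depends on the workload.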




On Mon, Dec 18, 2017 at 5:14 AM, Fabian Hueske  wrote:

> Hi Vishal,
>
> the Trigger is not designed to augment records but just to control when a
> window is evaluated.
> I would recommend to use a ProcessFunction to enrich records with the
> current watermark before passing them into the window operator.
> The context object of the processElement() method gives access to the
> current watermark and timestamp of a record.
>
> Please note that watermarks are not deterministic but may depend on the
> order in which parallel inputs are consumed by an operator.
>
> Best, Fabian
>
> 2017-12-17 16:59 GMT+01:00 Vishal Santoshi :
>
>> An addendum
>>
>> Is the element reference IN  in onElement(IN element.. ) in
>> Trigger, the same as IN the one provided to add(IN value) in
>> Accumulator. It seems that any mutations to IN in the onElement() is
>> not visible to the Accumulator that is carrying it as a previous element
>> reference albeit in the next invocation of add(). This seems to be only in
>> distributed mode, which makes sense only if theses reference point to
>> different objects.
>>
>> The pipeline
>>
>> .keyBy(keySelector)
>> .window(EventTimeSessionWindows.withGap(gap))
>> .trigger(new CountBasedWMAugmentationTrigger(triggerCount))
>> .aggregate(
>> new AggregateFunction() {
>>
>> @Override
>> public ACC createAccumulator() {
>> ACC newInstance = (ACC) accumulator.clone();
>> newInstance.resetLocal();
>> return newInstance;
>> }
>>
>> @Override
>> public void add(IN value, ACC accumulator) {
>>
>> /** This method is called before onElement of the Trigger 
>> and keeps the reference to the last IN **/
>>
>>
>> accumulator.add(value);
>>
>> }
>> .
>>
>>The Trigger
>>
>>
>> public class CountBasedWMAugmentationTrigger<T extends
>> Serializable & CountBasedWMAugmentationTrigger.HasWaterMark, W
>> extends Window> extends Trigger<T, W> {
>>
>>
>> @Override
>>
>> public TriggerResult onElement(T element, long timestamp, W window, 
>> TriggerContext ctx) throws Exception {
>>
>> /** The element T is mutated to carry the watermark **/
>> *element.setWaterMark(ctx.getCurrentWatermark());*
>>
>> .
>>
>>
>>
>>
>>
>> On Sun, Dec 17, 2017 at 1:52 AM, Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> I want to augment a POJO in  Trigger's onElement method, specifically
>>> supply the POJO with the watermark from the TriggerContext. The sequence of
>>> execution is this sequence
>>>
>>> 1. call to add() in the accumulator for the window  and save the POJO
>>> reference in the Accumulator.
>>> 2. call to onElement on Tigger
>>> 3. set watermark to the POJO
>>>
>>> The next add() method should have the last reference and any mutation
>>> done in step 3.
>>>
>>> That works in a local test case, using LocalFlinkMiniCluster, as in I
>>> have access to the mutation by the onElement() in the POJO in the
>>> subsequent add(),  but not on a distributed cluster. The specific question
>>> I had is whether  add() on a supplied accumulator on a window and
>>> onElement() method of the trigger on that window are inline executions, on
>>> the same thread or is there any serialization/deserialization IPC that
>>> causes these divergence ( local versus distributed )
>>>
>>> Regards.
>>>
>>
>>
>


Re: A question about Triggers

2017-12-18 Thread Fabian Hueske
Hi Vishal,

the Trigger is not designed to augment records but just to control when a
window is evaluated.
I would recommend using a ProcessFunction to enrich records with the
current watermark before passing them into the window operator.
The context object of the processElement() method gives access to the
current watermark and timestamp of a record.

Please note that watermarks are not deterministic but may depend on the
order in which parallel inputs are consumed by an operator.

Best, Fabian
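
The point about non-determinism can be illustrated with a small plain-Java model. It is not Flink code: TwoInputOperator and the method names are hypothetical, and the only assumption carried over from Flink is that an operator with several inputs takes the minimum of the watermarks it has received per input. The same record gets stamped with a different watermark depending on whether the second input's watermark was consumed before or after it.

```java
/** Plain-Java model of why a stamped watermark depends on consumption order. */
public class WatermarkOrderSketch {

    /** Hypothetical stand-in for an operator reading from two parallel inputs. */
    static final class TwoInputOperator {
        private final long[] inputWatermarks = {Long.MIN_VALUE, Long.MIN_VALUE};

        /** Records the latest watermark seen on one input (watermarks never go backwards). */
        void onWatermark(int input, long watermark) {
            inputWatermarks[input] = Math.max(inputWatermarks[input], watermark);
        }

        /** The operator's watermark is the minimum across its inputs. */
        long currentWatermark() {
            return Math.min(inputWatermarks[0], inputWatermarks[1]);
        }
    }

    /** Both watermarks are consumed before the record arrives. */
    public static long stampAfterBothWatermarks() {
        TwoInputOperator op = new TwoInputOperator();
        op.onWatermark(0, 100L);
        op.onWatermark(1, 90L);
        return op.currentWatermark(); // record is stamped with 90
    }

    /** The record arrives before the second input's watermark is consumed. */
    public static long stampBeforeSecondWatermark() {
        TwoInputOperator op = new TwoInputOperator();
        op.onWatermark(0, 100L);
        long stamped = op.currentWatermark(); // still Long.MIN_VALUE
        op.onWatermark(1, 90L);
        return stamped;
    }

    public static void main(String[] args) {
        System.out.println(stampAfterBothWatermarks());
        System.out.println(stampBeforeSecondWatermark());
    }
}
```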

2017-12-17 16:59 GMT+01:00 Vishal Santoshi :

> An addendum
>
> Is the element reference IN  in onElement(IN element.. ) in
> Trigger, the same as IN the one provided to add(IN value) in
> Accumulator. It seems that any mutations to IN in the onElement() is
> not visible to the Accumulator that is carrying it as a previous element
> reference albeit in the next invocation of add(). This seems to be only in
> distributed mode, which makes sense only if theses reference point to
> different objects.
>
> The pipeline
>
> .keyBy(keySelector)
> .window(EventTimeSessionWindows.withGap(gap))
> .trigger(new CountBasedWMAugmentationTrigger(triggerCount))
> .aggregate(
> new AggregateFunction() {
>
> @Override
> public ACC createAccumulator() {
> ACC newInstance = (ACC) accumulator.clone();
> newInstance.resetLocal();
> return newInstance;
> }
>
> @Override
> public void add(IN value, ACC accumulator) {
>
> /** This method is called before onElement of the Trigger and 
> keeps the reference to the last IN **/
>
>
> accumulator.add(value);
>
> }
> .
>
>The Trigger
>
>
> public class CountBasedWMAugmentationTrigger<T extends
> Serializable & CountBasedWMAugmentationTrigger.HasWaterMark, W
> extends Window> extends Trigger<T, W> {
>
>
> @Override
>
> public TriggerResult onElement(T element, long timestamp, W window, 
> TriggerContext ctx) throws Exception {
>
> /** The element T is mutated to carry the watermark **/
> *element.setWaterMark(ctx.getCurrentWatermark());*
>
> .
>
>
>
>
>
> On Sun, Dec 17, 2017 at 1:52 AM, Vishal Santoshi <
> vishal.santo...@gmail.com> wrote:
>
>> I want to augment a POJO in  Trigger's onElement method, specifically
>> supply the POJO with the watermark from the TriggerContext. The sequence of
>> execution is this sequence
>>
>> 1. call to add() in the accumulator for the window  and save the POJO
>> reference in the Accumulator.
>> 2. call to onElement on Tigger
>> 3. set watermark to the POJO
>>
>> The next add() method should have the last reference and any mutation
>> done in step 3.
>>
>> That works in a local test case, using LocalFlinkMiniCluster, as in I
>> have access to the mutation by the onElement() in the POJO in the
>> subsequent add(),  but not on a distributed cluster. The specific question
>> I had is whether  add() on a supplied accumulator on a window and
>> onElement() method of the trigger on that window are inline executions, on
>> the same thread or is there any serialization/deserialization IPC that
>> causes these divergence ( local versus distributed )
>>
>> Regards.
>>
>
>


Re: A question about Triggers

2017-12-17 Thread Vishal Santoshi
An addendum

Is the element reference IN in onElement(IN element, ...) in the Trigger the
same as the IN provided to add(IN value) in the Accumulator? It seems that
any mutation to IN in onElement() is not visible to the Accumulator, which
is carrying it as a previous-element reference, even in the next invocation
of add(). This seems to happen only in distributed mode, which makes sense
only if these references point to different objects.

The pipeline

.keyBy(keySelector)
.window(EventTimeSessionWindows.withGap(gap))
.trigger(new CountBasedWMAugmentationTrigger(triggerCount))
.aggregate(
    new AggregateFunction() {

        @Override
        public ACC createAccumulator() {
            ACC newInstance = (ACC) accumulator.clone();
            newInstance.resetLocal();
            return newInstance;
        }

        @Override
        public void add(IN value, ACC accumulator) {
            /* This method is called before onElement of the Trigger
               and keeps the reference to the last IN. */
            accumulator.add(value);
        }
.

   The Trigger


public class CountBasedWMAugmentationTrigger<T extends Serializable & CountBasedWMAugmentationTrigger.HasWaterMark, W extends Window>
        extends Trigger<T, W> {

    @Override
    public TriggerResult onElement(T element, long timestamp, W window,
            TriggerContext ctx) throws Exception {

        /* The element T is mutated to carry the watermark. */
        element.setWaterMark(ctx.getCurrentWatermark());

.





On Sun, Dec 17, 2017 at 1:52 AM, Vishal Santoshi 
wrote:

> I want to augment a POJO in  Trigger's onElement method, specifically
> supply the POJO with the watermark from the TriggerContext. The sequence of
> execution is this sequence
>
> 1. call to add() in the accumulator for the window  and save the POJO
> reference in the Accumulator.
> 2. call to onElement on Tigger
> 3. set watermark to the POJO
>
> The next add() method should have the last reference and any mutation done
> in step 3.
>
> That works in a local test case, using LocalFlinkMiniCluster, as in I have
> access to the mutation by the onElement() in the POJO in the subsequent
> add(),  but not on a distributed cluster. The specific question I had is
> whether  add() on a supplied accumulator on a window and onElement() method
> of the trigger on that window are inline executions, on the same thread or
> is there any serialization/deserialization IPC that causes these divergence
> ( local versus distributed )
>
> Regards.
>


A question about Triggers

2017-12-16 Thread Vishal Santoshi
I want to augment a POJO in the Trigger's onElement method, specifically to
supply the POJO with the watermark from the TriggerContext. The sequence of
execution is:

1. call to add() in the accumulator for the window, which saves the POJO
reference in the Accumulator.
2. call to onElement on the Trigger
3. set the watermark on the POJO

The next add() call should see the last reference and any mutation done in
step 3.

That works in a local test case using LocalFlinkMiniCluster, in that I have
access to the mutation made by onElement() on the POJO in the subsequent
add(), but not on a distributed cluster. The specific question I have is
whether add() on a supplied accumulator on a window and the onElement()
method of the trigger on that window are inline executions on the same
thread, or whether there is any serialization/deserialization or IPC that
causes this divergence (local versus distributed).

Regards.
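
The local-versus-distributed difference described above is what one would see if the object held by the accumulator is a copy rather than the same reference. The sketch below is plain Java, not Flink, and the thread does not confirm where (or whether) Flink makes such a copy; the Event class and method names are hypothetical. It only shows that a mutation through one reference is visible through a shared reference but not through a serialized copy.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Shared reference versus serialized copy: only the former sees later mutations. */
public class CopySemanticsSketch {

    /** Hypothetical POJO standing in for the IN type of the thread. */
    static final class Event implements Serializable {
        private static final long serialVersionUID = 1L;
        long waterMark = Long.MIN_VALUE;
    }

    /** Round-trips an event through Java serialization, producing a distinct object. */
    static Event roundTrip(Event event) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(event);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Event) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    /** Accumulator and trigger hold the same object: the mutation is visible. */
    public static long sharedReference() {
        Event heldByAccumulator = new Event();
        Event seenByTrigger = heldByAccumulator;
        seenByTrigger.waterMark = 42L;
        return heldByAccumulator.waterMark; // 42
    }

    /** Accumulator holds a serialized copy: the mutation is not visible. */
    public static long serializedCopy() {
        Event seenByTrigger = new Event();
        Event heldByAccumulator = roundTrip(seenByTrigger);
        seenByTrigger.waterMark = 42L;
        return heldByAccumulator.waterMark; // still Long.MIN_VALUE
    }

    public static void main(String[] args) {
        System.out.println(sharedReference());
        System.out.println(serializedCopy());
    }
}
```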