"If Window B is a Folding Window and does not have an evictor then it should 
not keep the list of all received elements."

Agreed! Upon closer inspection, the behavior I described is only present 
when using EvictingWindowOperator, not when using WindowOperator. I misread 
line 382 of WindowOperator, which calls windowState.add(). In fact, that 
windowState is a FoldingState, which incorporates the user-provided fold 
function in order to fold the data eagerly. In contrast, if you use an evictor, 
EvictingWindowOperator does have the behavior I described.
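
To make the difference concrete, here is a minimal sketch in plain Java. It is 
not Flink code: WindowStateSketch, EagerFoldState and BufferingState are 
hypothetical stand-ins for a FoldingState-backed window and a ListState-backed 
window, and retainedElements() is a made-up helper that just reports how many 
raw elements are being held.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

// Hypothetical stand-ins for the two kinds of window state; these are not Flink classes.
final class WindowStateSketch {

    /** Like a FoldingState: add() folds right away, so only the accumulator is stored. */
    static final class EagerFoldState<T, ACC> {
        private final BiFunction<ACC, T, ACC> fold;
        private ACC acc;

        EagerFoldState(ACC initial, BiFunction<ACC, T, ACC> fold) {
            this.acc = initial;
            this.fold = fold;
        }

        void add(T element) { acc = fold.apply(acc, element); }

        ACC fire() { return acc; }

        int retainedElements() { return 0; } // no raw elements are kept
    }

    /** Like a ListState: add() buffers every element; the fold only runs on fire(). */
    static final class BufferingState<T, ACC> {
        private final BiFunction<ACC, T, ACC> fold;
        private final ACC initial; // assumed immutable (e.g. Long) in this sketch
        private final List<T> buffer = new ArrayList<>();

        BufferingState(ACC initial, BiFunction<ACC, T, ACC> fold) {
            this.initial = initial;
            this.fold = fold;
        }

        void add(T element) { buffer.add(element); }

        ACC fire() {
            ACC acc = initial;
            for (T element : buffer) {
                acc = fold.apply(acc, element);
            }
            return acc;
        }

        int retainedElements() { return buffer.size(); } // grows with every add()
    }

    public static void main(String[] args) {
        EagerFoldState<Integer, Long> eager = new EagerFoldState<>(0L, (a, x) -> a + x);
        BufferingState<Integer, Long> buffering = new BufferingState<>(0L, (a, x) -> a + x);
        for (int i = 1; i <= 100; i++) {
            eager.add(i);
            buffering.add(i);
        }
        System.out.println("eager retains " + eager.retainedElements());
        System.out.println("buffering retains " + buffering.retainedElements());
    }
}
```

Both variants produce the same result on fire(); the only difference in this 
sketch is how much state sits around between firings.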

I am already using a custom Trigger which uses a processing timer to FIRE a 
short time after a new event comes in, and an event timer to FIRE_AND_PURGE.

It seems that I can achieve the desired effect by avoiding the use of an 
evictor, so that the intermediate events are not retained in an 
EvictingWindowOperator's state, and performing any necessary eviction within my 
fold function. This has the aforementioned drawback that the windowed fold 
function does not know about watermarks, so it is difficult to be precise about 
choosing which items to evict. However, this seems to be the best choice within 
the current framework.
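
Roughly, the eviction-inside-the-fold idea could look like the sketch below. 
This is plain Java rather than a real Flink FoldFunction, and every name in it 
(EvictingFoldSketch, Event, Acc, keepMillis) is hypothetical. Since the fold 
function cannot see the watermark, the sketch falls back on the largest element 
timestamp seen so far as a stand-in for "now", which is exactly the imprecision 
mentioned above.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

// Hypothetical sketch: a fold step that also evicts old items from its accumulator.
final class EvictingFoldSketch {

    static final class Event {
        final long timestamp;
        final long value;

        Event(long timestamp, long value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    /** Accumulator: the retained events plus a running sum over them. */
    static final class Acc {
        final Deque<Event> retained = new ArrayDeque<>();
        long maxTimestampSeen = Long.MIN_VALUE;
        long sum = 0;
    }

    /** Folds one event in, then drops everything older than keepMillis. */
    static Acc fold(Acc acc, Event event, long keepMillis) {
        acc.retained.addLast(event);
        acc.sum += event.value;
        acc.maxTimestampSeen = Math.max(acc.maxTimestampSeen, event.timestamp);

        // No watermark is available here, so the newest timestamp seen so far
        // is used as an approximation of the current event time.
        long cutoff = acc.maxTimestampSeen - keepMillis;

        // Events may arrive out of order, so scan the whole deque.
        Iterator<Event> it = acc.retained.iterator();
        while (it.hasNext()) {
            Event old = it.next();
            if (old.timestamp <= cutoff) {
                acc.sum -= old.value;
                it.remove();
            }
        }
        return acc;
    }
}
```

The state is then bounded by how many events fall within keepMillis, rather 
than by everything the window has ever received.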

Interestingly, it appears that TimeEvictor doesn't really know about watermarks 
either. When a window emits an event, regardless of how it was fired, the event 
is assigned the timestamp given by its window's maxTimestamp(), which might be 
much greater than the processing time that actually fired the event. Then, 
TimeEvictor takes the max timestamp of all items in the window and compares the 
other items' timestamps against it in order to determine which ones to evict. 
Basically, it assumes that the events were emitted due to the window 
terminating with FIRE_AND_PURGE. 
What if we gave more information (specifically, the current watermark) to the 
evictor in order to allow it to deal with a mix of intermediate events (fired 
by processing time) and final events (fired by event time when the watermark 
reaches the window)? That value is already available in the WindowOperator and 
could be passed to the Evictor very easily. It would be an API change, of 
course.
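
As a rough illustration of what I mean, the sketch below contrasts the two 
cutoffs in plain Java. It is only loosely modeled on how I understand 
TimeEvictor to work, not copied from it; EvictorSketch and its methods are 
hypothetical, and the watermark parameter is the proposed addition, not 
something the current Evictor interface provides. Timestamps are assumed to be 
sorted in ascending order.

```java
// Hypothetical sketch comparing a max-timestamp cutoff with a watermark cutoff.
final class EvictorSketch {

    /** Counts leading elements whose timestamp is at or before the cutoff. */
    static int countAtOrBefore(long[] sortedTimestamps, long cutoff) {
        int toEvict = 0;
        for (long ts : sortedTimestamps) {
            if (ts > cutoff) {
                break;
            }
            toEvict++;
        }
        return toEvict;
    }

    /** Cutoff derived from the newest element, which is all the evictor can see today. */
    static int evictUsingMaxTimestamp(long[] sortedTimestamps, long keepMillis) {
        if (sortedTimestamps.length == 0) {
            return 0;
        }
        long newest = sortedTimestamps[sortedTimestamps.length - 1];
        return countAtOrBefore(sortedTimestamps, newest - keepMillis);
    }

    /** Cutoff derived from the current watermark (the proposed extra argument). */
    static int evictUsingWatermark(long[] sortedTimestamps, long keepMillis, long watermark) {
        return countAtOrBefore(sortedTimestamps, watermark - keepMillis);
    }
}
```

For example, take three intermediate results stamped 59999 from one upstream 
window plus one early result stamped 119999 from the next window, with a keep 
time of 60000. The max-timestamp cutoff is 59999, so all three older results 
are evicted even if the watermark has only reached 70000. With the watermark 
as the reference point the cutoff is 10000 and nothing is evicted yet.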

Other than that, is it worth considering a change to EvictingWindowOperator to 
allow user-supplied functions to reduce the size of its state when people fire 
upstream windows repeatedly? From what I see when I monitor the state with 
debugger print statements, the EvictingWindowOperator is definitely holding on 
to all the elements ever received, not just the aggregated result. You can see 
this clearly because EvictingWindowOperator holds a ListState instead of a 
FoldingState. The user-provided fold function is only applied upon fire().

-Shannon

