Hi,
there is already this FLIP:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-4+%3A+Enhance+Window+Evictor
which also links to a mailing list discussion. And this FLIP:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata.
The former proposes to enhance the Evictor API a bit; among other things, we
propose to give the evictor access to the current watermark. The other FLIP
proposes to extend the amount of metadata we give to the window function.
The first thing we propose to add is a "firing reason" that would tell
you whether this was an early firing, an on-time firing, or a late firing.
The second thing is a firing counter that would tell you how many times the
trigger has fired so far for the current window.
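
To make the two pieces of metadata concrete, here is a rough sketch in plain
Java. It does not use any Flink API: FiringReason, WindowFirings and
recordFiring are hypothetical names made up for illustration, and the rule
for classifying a firing (early while the watermark is below the window's
max timestamp, on time for the first firing at or after it, late afterwards)
is an assumption, not necessarily what the FLIP will specify.

```java
public class FiringMetadataSketch {

    // Hypothetical enum: the "firing reason" handed to the window function.
    enum FiringReason { EARLY, ON_TIME, LATE }

    // Hypothetical per-window bookkeeping; not an existing Flink class.
    static final class WindowFirings {
        private final long windowMaxTimestamp;
        private int firingCount = 0;
        private boolean onTimeFiringSeen = false;

        WindowFirings(long windowMaxTimestamp) {
            this.windowMaxTimestamp = windowMaxTimestamp;
        }

        // Records one firing and classifies it against the current watermark.
        // Assumption: the window counts as complete once the watermark has
        // reached its max timestamp.
        FiringReason recordFiring(long currentWatermark) {
            firingCount++;
            if (currentWatermark < windowMaxTimestamp) {
                return FiringReason.EARLY;
            }
            if (!onTimeFiringSeen) {
                onTimeFiringSeen = true;
                return FiringReason.ON_TIME;
            }
            return FiringReason.LATE;
        }

        // The "firing counter": how often this window has fired so far.
        int firingCount() {
            return firingCount;
        }
    }

    public static void main(String[] args) {
        WindowFirings firings = new WindowFirings(999L);
        // Fired by a processing-time timer before the watermark arrives.
        System.out.println(firings.recordFiring(500L) + " #" + firings.firingCount());  // EARLY #1
        // Fired when the watermark reaches the end of the window.
        System.out.println(firings.recordFiring(999L) + " #" + firings.firingCount());  // ON_TIME #2
        // Fired again for late data after the window was complete.
        System.out.println(firings.recordFiring(1500L) + " #" + firings.firingCount()); // LATE #3
    }
}
```

With something like this, a downstream fold could tell intermediate results
(EARLY) apart from the final one (ON_TIME) instead of guessing from
timestamps.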

Would a combination of these help with your use case?

Cheers,
Aljoscha

On Thu, 11 Aug 2016 at 19:19 Shannon Carey <sca...@expedia.com> wrote:

> "If Window B is a Folding Window and does not have an evictor then it
> should not keep the list of all received elements."
>
> Agreed! Upon closer inspection, the behavior I'm describing is only
> present when using EvictingWindowOperator, not when using WindowOperator. I
> misread line 382 of WindowOperator which calls windowState.add(): in
> actuality, the windowState is a FoldingState which incorporates the
> user-provided fold function in order to eagerly fold the data. In contrast,
> if you use an evictor, EvictingWindowOperator has the behavior I describe.
>
> I am already using a custom Trigger which uses a processing timer to FIRE
> a short time after a new event comes in, and an event timer to
> FIRE_AND_PURGE.
>
> It seems that I can achieve the desired effect by avoiding use of an
> evictor so that the intermediate events are not retained in an
> EvictingWindowOperator's state, and perform any necessary eviction within
> my fold function. This has the aforementioned drawbacks of the windowed
> fold function not knowing about watermarks, and therefore it is difficult
> to be precise about choosing which items to evict. However, this seems to
> be the best choice within the current framework.
>
> Interestingly, it appears that TimeEvictor doesn't really know about
> watermarks either. When a window emits an event, regardless of how it was
> fired, it is assigned the timestamp given by its window's maxTimestamp(),
> which might be much greater than the processing time that actually fired
> the event. Then, TimeEvictor compares the maximum timestamp among all items
> in the window against each item's timestamp in order to determine which ones
> to evict.
> Basically, it assumes that the events were emitted due to the window
> terminating with FIRE_AND_PURGE. What if we gave more information
> (specifically, the current watermark) to the evictor in order to allow it
> to deal with a mix of intermediate events (fired by processing time) and
> final events (fired by event time when the watermark reaches the window)?
> That value is already available in the WindowOperator & could be passed to
> the Evictor very easily. It would be an API change, of course.
>
> Other than that, is it worth considering a change to
> EvictingWindowOperator to allow user-supplied functions to reduce the size
> of its state when people fire upstream windows repeatedly? From what I see
> when I monitor the state with debugger print statements, the
> EvictingWindowOperator is definitely holding on to all the elements ever
> received, not just the aggregated result. You can see this clearly because
> EvictingWindowOperator holds a ListState instead of a FoldingState. The
> user-provided fold function is only applied upon fire().
>
> -Shannon
>
>
>
