Yes, let me describe an example use-case that I'm trying to implement 
efficiently within Flink.

We've been asked to aggregate per-user data on a daily level, and from there 
produce aggregates on a variety of time frames. For example, 7 days, 30 days, 
180 days, and 365 days.

We can talk about the hardest one, the 365 day window, with the knowledge that 
adding the other time windows magnifies the problem.

I can easily use tumbling time windows of 1-day size for the first aggregation. 
However, for the longer aggregation, if I take the naive approach and use a 
sliding window, the window size would be 365 days and the slide would be one 
day. If a user comes back every day, I run the risk of magnifying the size of 
the data by a factor of up to 365, because each day of data will be included 
in up to 365 year-long windows. Also, if I want to fire the aggregate 
information more often than once a day, then I have to worry about 365 
different windows firing at the same time and figuring out which one to pay 
attention to, or come up with a hare-brained custom trigger. We tried 
emitting each day-aggregate into a time series database and doing the final 
365-day aggregation as a query, but that was more complicated than we wanted: 
in particular, we'd like to have all the logic in the Flink job, not split 
across different technologies and infrastructure.
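
To make the fan-out concrete, here is a minimal sketch in plain Java (it does 
not use Flink) that counts how many sliding windows a single event falls into. 
The class name, method name, and the example timestamp are made up for 
illustration, and the assignment rule is a simplified model of slide-aligned 
windows with no offset, not a copy of Flink's assigner.

```java
import java.util.ArrayList;
import java.util.List;

class SlidingWindowFanOut {
    static final long DAY_MS = 24L * 60 * 60 * 1000;

    /** Start timestamps of every slide-aligned window [start, start + size) containing the timestamp. */
    static List<Long> windowStartsFor(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Latest window start at or before the timestamp, aligned to the slide.
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long eventTime = 400 * DAY_MS + 12345; // hypothetical event during day 400
        // Tumbling 1-day window: the event belongs to exactly one window.
        System.out.println(windowStartsFor(eventTime, DAY_MS, DAY_MS).size());
        // Sliding 365-day window with a 1-day slide: the event belongs to 365 windows.
        System.out.println(windowStartsFor(eventTime, 365 * DAY_MS, DAY_MS).size());
    }
}
```

Under this model every event is copied into size/slide windows, which is where 
the factor of up to 365 comes from.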

The work-around I'm thinking of is to use a single window that contains 365 
days of data (relative to the current watermark) on an ongoing basis. The 
windowing function would be responsible for evicting old data based on the 
current watermark.
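
The idea can be sketched in plain Java, outside of Flink, as a single rolling 
structure that keeps one pre-aggregated bucket per day and evicts buckets 
relative to the watermark. Everything here (the class name 
RollingDailyAggregate, the use of a sum as the aggregate, the day-level 
granularity) is an assumption for illustration, not an existing Flink API.

```java
import java.util.TreeMap;

class RollingDailyAggregate {
    static final long DAY_MS = 24L * 60 * 60 * 1000;

    private final long horizonMs;
    // Day start timestamp -> pre-aggregated value (here: a sum) for that day.
    private final TreeMap<Long, Long> dailyTotals = new TreeMap<>();

    RollingDailyAggregate(int horizonDays) {
        this.horizonMs = horizonDays * DAY_MS;
    }

    /** Adds a value to the bucket of the day that contains the timestamp. */
    void add(long timestamp, long value) {
        long dayStart = timestamp - Math.floorMod(timestamp, DAY_MS);
        dailyTotals.merge(dayStart, value, Long::sum);
    }

    /** Evicts buckets that lie entirely outside the horizon, then sums buckets that started by the watermark. */
    long resultAt(long watermark) {
        // A bucket [dayStart, dayStart + DAY_MS) is stale once dayStart + DAY_MS <= watermark - horizonMs.
        dailyTotals.headMap(watermark - horizonMs - DAY_MS, true).clear();
        long total = 0;
        // Buckets that start after the watermark are skipped, but kept for later results.
        for (long v : dailyTotals.headMap(watermark, true).values()) {
            total += v;
        }
        return total;
    }

    int bucketCount() {
        return dailyTotals.size();
    }
}
```

With this shape the retained state is bounded by roughly one bucket per day in 
the horizon, instead of one copy of each day per overlapping window. Note that 
at day granularity the bucket containing the watermark may still include items 
slightly newer than the watermark.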

Does that make sense? Does it seem logical, or am I misunderstanding something 
about how Flink works?

-Shannon


From: Aljoscha Krettek <aljos...@apache.org>
Date: Monday, August 29, 2016 at 3:56 AM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Firing windows multiple times

Hi,
that would certainly be possible. What do you think could be gained by having 
knowledge of the current watermark in the WindowFunction, perhaps in a 
specific case?

Cheers,
Aljoscha

On Wed, 24 Aug 2016 at 23:21 Shannon Carey <sca...@expedia.com> wrote:
What do you think about adding the current watermark to the window function 
metadata in FLIP-2?

From: Shannon Carey <sca...@expedia.com>
Date: Friday, August 12, 2016 at 6:24 PM
To: Aljoscha Krettek <aljos...@apache.org>, 
"user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Firing windows multiple times

Thanks Aljoscha, I didn't know about those. Yes, they look like handy changes, 
especially to enable flexible approaches for eviction. In particular, having 
the current watermark available to the evictor via EvictorContext is helpful: 
it will be able to evict the old data more easily without needing to rely on 
Window#maxTimestamp().

However, I think you might still be missing a piece. Specifically, it would 
still not be possible for the window function to choose which items to 
aggregate based on the current watermark. In particular, it is desirable to be 
able to aggregate only the items below the watermark, omitting items which have 
come in with timestamps larger than the watermark. Does that make sense?

-Shannon

From: Aljoscha Krettek <aljos...@apache.org>
Date: Friday, August 12, 2016 at 4:25 AM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Firing windows multiple times

Hi,
there is already this FLIP: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-4+%3A+Enhance+Window+Evictor
 which also links to a mailing list discussion. And this FLIP: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata.
 The former proposes to enhance the Evictor API a bit; among other things, we 
propose to give the evictor access to the current watermark. The other FLIP 
proposes to extend the amount of metadata we give to the window function. The 
first thing we propose to add is a "firing reason" that would tell you 
whether this was an early firing, an on-time firing, or a late firing. The 
second thing is a firing counter that would tell you how many times the trigger 
has fired so far for the current window.

Would a combination of these help with your use case?

Cheers,
Aljoscha

On Thu, 11 Aug 2016 at 19:19 Shannon Carey <sca...@expedia.com> wrote:
"If Window B is a Folding Window and does not have an evictor then it should 
not keep the list of all received elements."

Agreed! Upon closer inspection, the behavior I'm describing is only present 
when using EvictingWindowOperator, not when using WindowOperator. I misread 
line 382 of WindowOperator which calls windowState.add(): in actuality, the 
windowState is a FoldingState which incorporates the user-provided fold 
function in order to eagerly fold the data. In contrast, if you use an evictor, 
EvictingWindowOperator has the behavior I describe.
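
The difference in retained state can be illustrated with a small plain-Java 
model. The two classes below are simplified stand-ins written for this 
illustration; they are not Flink's FoldingState, ListState, or operator 
classes. One folds each element on arrival, the other buffers every element 
and only folds when fired.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiFunction;

class WindowStateShapes {
    /** Eager folding: each element is folded on arrival, so only the accumulator is stored. */
    static final class EagerFold<T, A> {
        private final BiFunction<A, T, A> fold;
        private A acc;

        EagerFold(A initial, BiFunction<A, T, A> fold) {
            this.acc = initial;
            this.fold = fold;
        }

        void add(T element) { acc = fold.apply(acc, element); }
        A fire() { return acc; }
        int storedValues() { return 1; }
    }

    /** Buffered folding: every element is kept, and the fold runs only when the window fires. */
    static final class BufferedFold<T, A> {
        private final A initial;
        private final BiFunction<A, T, A> fold;
        private final List<T> buffer = new ArrayList<>();

        BufferedFold(A initial, BiFunction<A, T, A> fold) {
            this.initial = initial;
            this.fold = fold;
        }

        void add(T element) { buffer.add(element); }

        A fire() {
            A acc = initial;
            for (T element : buffer) {
                acc = fold.apply(acc, element);
            }
            return acc;
        }

        int storedValues() { return buffer.size(); }
    }
}
```

Both variants produce the same result on fire, but the buffered variant's 
state grows with the number of elements received, which matters when upstream 
windows fire repeatedly.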

I am already using a custom Trigger which uses a processing timer to FIRE a 
short time after a new event comes in, and an event timer to FIRE_AND_PURGE.

It seems that I can achieve the desired effect by avoiding use of an evictor so 
that the intermediate events are not retained in an EvictingWindowOperator's 
state, and perform any necessary eviction within my fold function. This has the 
aforementioned drawbacks of the windowed fold function not knowing about 
watermarks, and therefore it is difficult to be precise about choosing which 
items to evict. However, this seems to be the best choice within the current 
framework.

Interestingly, it appears that TimeEvictor doesn't really know about watermarks 
either. When a window emits an event, regardless of how it was fired, it is 
assigned the timestamp given by its window's maxTimestamp(), which might be 
much greater than the processing time that actually fired the event. Then, 
TimeEvictor compares the max timestamp of all items in the window against the 
other ones in order to determine which ones to evict. Basically, it assumes 
that the events were emitted due to the window terminating with FIRE_AND_PURGE. 
What if we gave more information (specifically, the current watermark) to the 
evictor in order to allow it to deal with a mix of intermediate events (fired 
by processing time) and final events (fired by event time when the watermark 
reaches the window)? That value is already available in the WindowOperator & 
could be passed to the Evictor very easily. It would be an API change, of 
course.
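
As a rough illustration of why the cutoff matters, the plain-Java sketch below 
contrasts the two choices: a cutoff derived from the largest element timestamp 
(the behavior described above) and a cutoff derived from the current 
watermark. The class and method names are hypothetical, and the code is a 
model of the described behavior, not Flink's TimeEvictor.

```java
import java.util.ArrayList;
import java.util.List;

class EvictionCutoffs {
    static final long DAY_MS = 24L * 60 * 60 * 1000;

    /** Keeps elements newer than (largest element timestamp - keepMs). */
    static List<Long> retainByMaxTimestamp(List<Long> timestamps, long keepMs) {
        long max = Long.MIN_VALUE;
        for (long ts : timestamps) {
            max = Math.max(max, ts);
        }
        return retainNewerThan(timestamps, max - keepMs);
    }

    /** Keeps elements newer than (watermark - keepMs). */
    static List<Long> retainByWatermark(List<Long> timestamps, long watermark, long keepMs) {
        return retainNewerThan(timestamps, watermark - keepMs);
    }

    private static List<Long> retainNewerThan(List<Long> timestamps, long cutoff) {
        List<Long> kept = new ArrayList<>();
        for (long ts : timestamps) {
            if (ts > cutoff) {
                kept.add(ts);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        // Daily results for days 7..10, each stamped with the last millisecond of its day.
        // The day-10 result is an early (processing-time) firing: the watermark is still early in day 10.
        List<Long> results = List.of(8 * DAY_MS - 1, 9 * DAY_MS - 1, 10 * DAY_MS - 1, 11 * DAY_MS - 1);
        long watermark = 10 * DAY_MS + 100;
        System.out.println(retainByMaxTimestamp(results, 3 * DAY_MS).size());
        System.out.println(retainByWatermark(results, watermark, 3 * DAY_MS).size());
    }
}
```

In this example the early day-10 result, stamped with the end of its window, 
pushes the max-timestamp cutoff forward and drops the day-7 result, while the 
watermark-based cutoff still keeps it.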

Other than that, is it worth considering a change to EvictingWindowOperator to 
allow user-supplied functions to reduce the size of its state when people fire 
upstream windows repeatedly? From what I see when I monitor the state with 
debugger print statements, the EvictingWindowOperator is definitely holding on 
to all the elements ever received, not just the aggregated result. You can see 
this clearly because EvictingWindowOperator holds a ListState instead of a 
FoldingState. The user-provided fold function is only applied upon fire().

-Shannon

