Re: Firing windows multiple times

2016-09-11 Thread aj.h
In the way that FLIP-2 would solve this problem, secondAggregate would ignore
the early firing updates from firstAggregate to prevent double-counting,
correct? If that's the case, I am trying to understand why we'd want to
trigger early-fires every 30 seconds for the secondAggregate if it's only
accepting new results at a daily rate, after firstAggregate's primary firing
at the end of the window. If we filter out results from early-fires,
wouldn't every 30-second result from secondAggregate remain unchanged within
the same 1-day window?

Similarly (compounded) for a 365-day window aggregating over a 30 day
window: if it filters out early fires, wouldn't it only produce new/unique
results every 30 days?

I very well may have misunderstood this solution.
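
To make the question concrete, here is a minimal plain-Java sketch (no Flink dependency). The FiringReason tag is a hypothetical stand-in for the metadata FLIP-2 proposes, and all class and method names are made up for illustration. It only models the concern raised above, not how FLIP-2 would actually be implemented.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model; nothing here is Flink API.
class EarlyFiringFilterSketch {

    // Hypothetical stand-in for the "firing reason" metadata discussed for FLIP-2.
    enum FiringReason { EARLY, ON_TIME }

    // One result emitted by firstAggregate: the running sum of its 1-day window.
    static final class Firing {
        final long windowSum;
        final FiringReason reason;

        Firing(long windowSum, FiringReason reason) {
            this.windowSum = windowSum;
            this.reason = reason;
        }
    }

    // Naive secondAggregate: adds every firing, so speculative updates are counted repeatedly.
    static long sumAllFirings(List<Firing> firings) {
        long total = 0;
        for (Firing f : firings) {
            total += f.windowSum;
        }
        return total;
    }

    // Filtering secondAggregate: returns its value after each incoming firing
    // when only ON_TIME firings are accepted.
    static List<Long> valuesWhenFilteringEarly(List<Firing> firings) {
        List<Long> values = new ArrayList<>();
        long total = 0;
        for (Firing f : firings) {
            if (f.reason == FiringReason.ON_TIME) {
                total += f.windowSum;
            }
            values.add(total);
        }
        return values;
    }
}
```

With two early firings followed by one on-time firing, the naive sum over-counts, while the filtered value stays unchanged until the on-time firing arrives, which is exactly the staleness asked about here.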



--
View this message in context: 
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Firing-windows-multiple-times-tp8424p8994.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at 
Nabble.com.


Re: Firing windows multiple times

2016-09-09 Thread Aljoscha Krettek
Hi,
I'd be very happy to give you pointers for FLIP-2 and FLIP-4. Why don't you
start a separate thread on the dev list so that we don't hijack this thread?

For FLIP-4 we also have to coordinate with Vishnu. He was driving FLIP-4,
but lately everyone has been a bit inactive on that. Let's see if he has
anything to say; I'll loop him in directly.

Cheers,
Aljoscha

On Thu, 8 Sep 2016 at 21:48 aj.h <drfl...@gmail.com> wrote:

> Hi, I'm interested in helping out on this project. I also want to implement a
> continuous time-boxed sliding window. My current use case is a 60-second
> sliding window that moves whenever a newer event arrives, discarding any
> late events that arrive outside the current window, but *also* re-triggering
> window processing for any late events within the current window. I
> considered using sliding windows with a 1-second granularity, but I'd be
> discarding a lot of windows on sparse data, and rebuilding potentially very
> large windows for relatively small 1-second updates.
>
> I'm a fellow in the Insight Data Engineering program. We just got underway,
> and I have 3 weeks in which to complete a project. I'd love to tackle this
> one, and I'm trying to assess the practicality and feasibility of it.
>
> I noticed that FLIP-2 and FLIP-4 are still under discussion; is it premature
> to try to implement these enhancements? And would you be at all
> willing/available to help me get up to speed?
>
> Thank you much!
>
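
The use case quoted above (a 60-second window that follows the newest event, drops events that fall outside it, and recomputes for late events inside it) can be sketched in plain Java as follows. This is only an illustration under assumptions: the class name, the use of a simple count as the window result, and the -1 return value for dropped events are all invented here and are not Flink API.

```java
import java.util.TreeMap;

// Plain-Java model of a window that always ends at the newest event seen so far.
class LatestEventWindowSketch {
    private final long sizeMillis;
    // Number of retained events per timestamp, ordered so old entries are cheap to evict.
    private final TreeMap<Long, Integer> countsByTimestamp = new TreeMap<>();
    private long newestTimestamp = Long.MIN_VALUE;

    LatestEventWindowSketch(long sizeMillis) {
        this.sizeMillis = sizeMillis;
    }

    // Processes one event and returns the number of events in the current window,
    // or -1 if the event is older than the window and was discarded.
    long onEvent(long timestamp) {
        if (timestamp > newestTimestamp) {
            newestTimestamp = timestamp; // a newer event moves the window forward
        }
        long windowStart = newestTimestamp - sizeMillis; // exclusive lower bound
        if (timestamp <= windowStart) {
            return -1; // late event outside the current window
        }
        countsByTimestamp.merge(timestamp, 1, Integer::sum);
        countsByTimestamp.headMap(windowStart, true).clear(); // evict what slid out
        long count = 0;
        for (int c : countsByTimestamp.values()) {
            count += c;
        }
        return count; // "re-triggered" result, also for late events inside the window
    }
}
```

Only one window's worth of state is kept, which avoids rebuilding many overlapping 1-second sliding windows on sparse data.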


Re: Firing windows multiple times

2016-09-05 Thread Aljoscha Krettek
I forgot to mention the FLIP that would basically provide the functionality
that we need (without handling of late elements):
https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata
I just need to find some time to implement this, or find someone who would
be willing to implement it.

You're right, the "allowed lateness" feature was newly introduced in Flink
1.1. You're also mostly right about the possibilities it opens up.
With this addition there are basically two knobs that can be used to
tune the behavior of Flink when it comes to event time, watermarks and
lateness. Having a bit of allowed lateness lets the watermark be a bit
more aggressive in advancing the time. If you don't allow any
lateness, the watermark had better be pretty close to correct; otherwise you
might lose data. I agree that this is not really intuitive for everyone, and
I myself don't really know what good production settings would be for
all cases.
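
As a rough illustration of how the two knobs interact, here is a plain-Java sketch. It assumes the rule that a window's state is kept until the watermark reaches the window's maximum timestamp (end minus one) plus the allowed lateness; that rule, the class name and the Outcome labels are assumptions made for this example, not a copy of Flink code.

```java
// Plain-Java model of what happens to an element arriving for a given window.
class LatenessKnobsSketch {

    enum Outcome { BEFORE_WATERMARK_PASSED, LATE_BUT_ACCEPTED, DROPPED }

    // windowEnd is exclusive; all values share one time unit (e.g. milliseconds).
    static Outcome classify(long windowEnd, long allowedLateness, long currentWatermark) {
        long maxTimestamp = windowEnd - 1;
        if (currentWatermark < maxTimestamp) {
            return Outcome.BEFORE_WATERMARK_PASSED; // window has not had its primary firing yet
        }
        if (currentWatermark < maxTimestamp + allowedLateness) {
            return Outcome.LATE_BUT_ACCEPTED; // window state is still around, late firing possible
        }
        return Outcome.DROPPED; // window state already cleaned up
    }
}
```

With zero allowed lateness, an element is dropped as soon as the watermark has passed its window, so the watermark itself has to be conservative; with some allowed lateness, a more aggressive watermark only turns such elements into late firings.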

How are you dealing with (or planning to deal with) elements that arrive
behind the watermark? Is it ok for you to completely drop them? I'm trying
to learn what the requirements of different folks are.

Best,
Aljoscha


Re: Firing windows multiple times

2016-09-02 Thread Shannon Carey
Of course! I really appreciate your interest & attention. I hope we will figure 
out solutions that other people can use.

I agree with your analysis. Your triggering syntax is particularly nice. I 
wrote a custom trigger which does exactly that but without the nice fluent API. 
As I considered the approach you mentioned, it was clear that I would not be 
able to easily solve the problem of multiple windows with early-firing events 
causing over-counting. Modifying the windowing system as you describe would be 
helpful. Events could either be filtered out, as you describe, or perhaps the 
windows themselves could be muted/un-muted depending on whether they are the 
closest window (by end time) to the current watermark.

I'm not clear on the purpose of the late firing you describe. I believe that 
was added in Flink 1.1 and it's a new concept to me. I thought late events were 
completely handled by decisions made in the watermark & timestamp assigner. 
Does this feature allow events after the watermark to still be incorporated 
into windows that have already been closed by a watermark? Perhaps it's 
intended to allow window-specific lateness allowance, rather than the 
stream-global watermarker? That does sound problematic. I assume there's a 
reason for closing the window before the allowed lateness has elapsed? 
Otherwise, the window (trigger, really) could just add the lateness to the 
watermark and pretend that the watermark hadn't been reached until the lateness 
had already passed.

I agree that your idea is potentially a lot better than the approach I 
described, if it can be implemented! You are right that the approach I 
described requires that all the events be retained in the window state so that 
aggregation can be done repeatedly from the raw events as new events come in 
and old events are evicted. In practice, we are currently writing the first 
aggregations (day-level) to an external database and then querying that 
time-series from the second-level (year) aggregation so that we don't actually 
need to keep all that data around in Flink state. Obviously, that approach can 
have an impact on the processing guarantees when a failure/recovery occurs if 
we don't do it carefully. Also, we're not particularly sophisticated yet with 
regard to avoiding unnecessary queries to the time series data.

-Shannon



Re: Firing windows multiple times

2016-09-02 Thread Aljoscha Krettek
I see, I didn't forget about this, it's just that I'm thinking hard.

I think in your case (which I imagine some other people to also have) we
would need an addition to the windowing system that the original Google
Dataflow paper called retractions. The problem is best explained with an
example. Say you have this program:

DataStream input = ...

DataStream firstAggregate = input
  .keyBy(...)
  .window(TumblingTimeWindow(1 Day))
  .trigger(EventTime.afterEndOfWindow()
    .withEarlyTrigger(Repeatedly.forever(
      ProcessingTime.afterFirstElement(Time.seconds(30)))))
  .reduce(new SomeAggregate())

DataStream secondAggregate = firstAggregate
  .keyBy(...)
  .window(TumblingTimeWindow(5 Days))
  .trigger(EventTime.afterEndOfWindow()
    .withEarlyTrigger(Repeatedly.forever(
      ProcessingTime.afterFirstElement(Time.seconds(30)))))
  .reduce(new SomeAggregate())

The problem here is that the second windowing operation sees all the
incremental early-firing updates from the first window operation, so it would
over-count. This problem could be overcome by introducing metadata in
the windowing system and filtering out those results that indicate that
they come from an early (speculative) firing. A second problem is that of
late firings, i.e. if you have a window specification like this:

DataStream firstAggregate = input
  .keyBy(...)
  .window(TumblingTimeWindow(1 Day))
  .allowedLateness(1 Hour)
  .trigger(EventTime.afterEndOfWindow()
    .withEarlyTrigger(Repeatedly.forever(
      ProcessingTime.afterFirstElement(Time.seconds(30))))
    .withLateTrigger(Repeatedly.forever(
      ProcessingTime.afterFirstElement(Time.seconds(30)))))
  .reduce(new SomeAggregate())

where you also have late firings after the primary firing, which happens when
the watermark passes the end of the window. That's where retractions come
into play: before sending data downstream from a late firing, the window
operator has to send the inverse of the previous firing so that the
downstream operation can "subtract" that from the current aggregate and
replace it with the newly updated aggregate. This is a somewhat thorny
problem, though, and to the best of my knowledge Google never implemented
this in the publicly available Dataflow SDK or what is now Beam.
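
The retraction idea can be sketched in plain Java as follows. This is a toy model, assuming a sum aggregate (which has an inverse) and upstream windows identified by their start timestamp; the class and method names are invented for illustration and do not correspond to anything in Flink or Beam.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of retractions for a sum aggregate.
class RetractionSketch {
    // Upstream side: the last value fired for each window, keyed by window start.
    private final Map<Long, Long> lastFired = new HashMap<>();

    // Returns the records the upstream operator would send for one firing:
    // first the inverse of the previous firing (if any), then the updated aggregate.
    List<Long> fire(long windowStart, long updatedAggregate) {
        List<Long> records = new ArrayList<>();
        Long previous = lastFired.put(windowStart, updatedAggregate);
        if (previous != null) {
            records.add(-previous); // the retraction
        }
        records.add(updatedAggregate);
        return records;
    }

    // Downstream side: simply adds whatever arrives to its current aggregate.
    static long applyDownstream(long currentAggregate, List<Long> records) {
        for (long r : records) {
            currentAggregate += r;
        }
        return currentAggregate;
    }
}
```

Because each repeated firing is preceded by the inverse of the previous one, the downstream sum reflects only the latest value per upstream window. The sketch sidesteps the thorny parts, such as aggregates without an inverse and the extra state the upstream operator has to keep.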

The reason why I'm thinking in this direction and not in the direction of
keeping track of the watermark and manually evicting elements as you go is
that I think that this approach would be more memory efficient and easier
to understand. I don't understand yet how a single window computation could
keep track of aggregates for differently sized time windows and evict the
correct elements without keeping all the elements in some store. Maybe you
could shed some light on this? I'd be happy if there was a simple solution
for this. :-)

Cheers,
Aljoscha




Re: Firing windows multiple times

2016-08-30 Thread Shannon Carey
I appreciate your suggestion!

However, the main problem with your approach is the amount of time that goes by 
without an updated value from minuteAggregate and hourlyAggregate (lack of a 
continuously updated aggregate).

For example, if we use a tumbling window of 1 month duration, then we only get 
an update for that value once a month! The values from that stream will be on 
average 0.5 months stale. A year-long window is even worse.

-Shannon

From: Aljoscha Krettek <aljos...@apache.org>
Date: Tuesday, August 30, 2016 at 9:08 AM
To: Shannon Carey <sca...@expedia.com>, "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Firing windows multiple times

Hi,
I think this can be neatly expressed by using something like a tree of windowed
aggregations, i.e. you specify your smallest window computation first and then
specify larger window computations based on the smaller windows. I've written an
example that showcases this approach:
https://gist.github.com/aljoscha/728ac69361f75c3ca87053b1a6f91fcd

The basic idea in pseudo code is this:

DataStream input = ...
dailyAggregate = input.keyBy(...).window(Time.days(1)).reduce(new Sum())
weeklyAggregate = dailyAggregate.keyBy(...).window(Time.days(7)).reduce(new Sum())
monthlyAggregate = weeklyAggregate.keyBy(...).window(Time.days(30)).reduce(new Sum())

The benefit of this approach is that you don't duplicate computation and that
you can have incremental aggregation using a reduce function. When manually
keeping elements and evicting them based on time, the amount of state that would
have to be kept would be much larger.
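
The tree idea can be illustrated without Flink. The plain-Java sketch below uses days as the time unit and stamps each window result with its window start (a simplification chosen for this example; the helper name is hypothetical). Feeding daily sums into the weekly level gives the same result as aggregating the raw events directly, while the weekly level only ever sees one record per day.

```java
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of stacked tumbling-window sums.
class WindowTreeSketch {

    // Input and output rows are {timestamp, value}; each output row carries the
    // start of its tumbling window as timestamp and the window's sum as value.
    static long[][] tumblingSum(long[][] timestampedValues, long windowSize) {
        TreeMap<Long, Long> sums = new TreeMap<>();
        for (long[] tv : timestampedValues) {
            long windowStart = tv[0] - Math.floorMod(tv[0], windowSize);
            sums.merge(windowStart, tv[1], Long::sum);
        }
        long[][] out = new long[sums.size()][];
        int i = 0;
        for (Map.Entry<Long, Long> e : sums.entrySet()) {
            out[i++] = new long[] {e.getKey(), e.getValue()};
        }
        return out;
    }
}
```

Calling tumblingSum(events, 1) and passing its output to tumblingSum(daily, 7) mirrors the dailyAggregate and weeklyAggregate steps of the pseudo code.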

Does that make sense and would it help your use case?

Cheers,
Aljoscha

On Mon, 29 Aug 2016 at 23:18 Shannon Carey <sca...@expedia.com> wrote:
Yes, let me describe an example use-case that I'm trying to implement 
efficiently within Flink.

We've been asked to aggregate per-user data on a daily level, and from there 
produce aggregates on a variety of time frames. For example, 7 days, 30 days, 
180 days, and 365 days.

We can talk about the hardest one, the 365 day window, with the knowledge that 
adding the other time windows magnifies the problem.

I can easily use tumbling time windows of 1-day size for the first aggregation. 
However, for the longer aggregation, if I take the naive approach and use a 
sliding window, the window size would be 365 days and the slide would be one 
day. If a user comes back every day, I run the risk of magnifying the size of
the data by a factor of up to 365, because each day of data will be included in
up to 365 year-long window panes. Also, if I want to fire the aggregate information more
rapidly than once a day, then I have to worry about getting 365 different 
windows fired at the same time & trying to figure out which one to pay 
attention to, or coming up with a hare-brained custom firing trigger. We tried 
emitting each day-aggregate into a time series database and doing the final 365 
day aggregation as a query, but that was more complicated than we wanted: in 
particular we'd like to have all the logic in the Flink job not split across 
different technology & infrastructure.
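
The factor of 365 follows from how sliding windows overlap. The plain-Java sketch below counts the windows that contain a given timestamp, assuming windows aligned to multiples of the slide with no offset (as far as I understand, this matches how Flink's sliding window assigner enumerates windows, but treat that as an assumption). The class and method names are hypothetical.

```java
// Counts how many sliding windows of the given size and slide contain a timestamp.
class SlidingPaneCountSketch {

    static int windowsContaining(long timestamp, long size, long slide) {
        int count = 0;
        // Start of the latest window that still contains the timestamp.
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        // Walk backwards over window starts while [start, start + size) covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            count++;
        }
        return count;
    }
}
```

With time measured in days, a size of 365 and a slide of 1 put every element into 365 windows, which is the magnification described above.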

The work-around I'm thinking of is to use a single window that contains 365 
days of data (relative to the current watermark) on an ongoing basis. The 
windowing function would be responsible for evicting old data based on the 
current watermark.
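
A minimal plain-Java sketch of this work-around is shown below. It assumes time is measured in whole days, that one pre-aggregated sum per day is kept (matching the day-level first aggregation), and that the result is a simple sum; the class and method names are invented for illustration and are not Flink API.

```java
import java.util.TreeMap;

// Plain-Java model of one long-lived window that evicts relative to the watermark.
class RollingYearSketch {
    private final long retentionDays;
    private final TreeMap<Long, Long> sumsByDay = new TreeMap<>();

    RollingYearSketch(long retentionDays) {
        this.retentionDays = retentionDays;
    }

    // Adds one day-level value to the retained state.
    void add(long day, long value) {
        sumsByDay.merge(day, value, Long::sum);
    }

    // Evicts every day at or before (watermarkDay - retentionDays) and
    // returns the sum over everything that is still retained.
    long onWatermark(long watermarkDay) {
        sumsByDay.headMap(watermarkDay - retentionDays, true).clear();
        long total = 0;
        for (long v : sumsByDay.values()) {
            total += v;
        }
        return total;
    }
}
```

State is bounded by the retention period instead of growing with the number of overlapping windows. Note that this version still includes days that are ahead of the watermark in its sum.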

Does that make sense? Does it seem logical, or am I misunderstanding something 
about how Flink works?

-Shannon


From: Aljoscha Krettek <aljos...@apache.org>
Date: Monday, August 29, 2016 at 3:56 AM
To: "user@flink.apache.org" <user@flink.apache.org>
Subject: Re: Firing windows multiple times

Hi,
that would certainly be possible. What do you think can be gained by having
knowledge of the current watermark in the WindowFunction? Do you have a
specific case in mind?

Cheers,
Aljoscha


Re: Firing windows multiple times

2016-08-24 Thread Shannon Carey
What do you think about adding the current watermark to the window function 
metadata in FLIP-2?


Re: Firing windows multiple times

2016-08-12 Thread Shannon Carey
Thanks Aljoscha, I didn't know about those. Yes, they look like handy changes, 
especially to enable flexible approaches for eviction. In particular, having 
the current watermark available to the evictor via EvictorContext is helpful: 
it will be able to evict the old data more easily without needing to rely on 
Window#maxTimestamp().

However, I think you might still be missing a piece. Specifically, it would 
still not be possible for the window function to choose which items to 
aggregate based on the current watermark. In particular, it is desirable to be 
able to aggregate only the items below the watermark, omitting items which have 
come in with timestamps larger than the watermark. Does that make sense?
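
As a sketch of what that could look like, the plain-Java example below passes a hypothetical WindowContext with a currentWatermark() method to the aggregation. No such interface exists in the thread; it only stands in for the kind of metadata being asked for, and the sum is an arbitrary example aggregate.

```java
// Plain-Java model of a window function that can see the current watermark.
class BelowWatermarkSketch {

    // Hypothetical context object; an assumption for this example, not an existing API.
    interface WindowContext {
        long currentWatermark();
    }

    // Sums only the {timestamp, value} rows whose timestamp is not ahead of the watermark.
    static long sumBelowWatermark(long[][] timestampedValues, WindowContext context) {
        long watermark = context.currentWatermark();
        long total = 0;
        for (long[] tv : timestampedValues) {
            if (tv[0] <= watermark) {
                total += tv[1];
            }
        }
        return total;
    }
}
```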

-Shannon


On Thu, 11 Aug 2016 at 19:19 Shannon Carey <sca...@expedia.com> wrote:
"If Window B is a Folding Window and does not have an evictor then it should 
not keep the list of all received elements."

Agreed! Upon closer inspection, the behavior I'm describing is only present 
when using EvictingWindowOperator, not when using WindowOperator. I misread 
line 382 of WindowOperator which calls windowState.add(): in actuality, the 
windowState is a FoldingState which incorporates the user-provided fold 
function in order to eagerly fold the data. In contrast, if you use an evictor, 
EvictingWindowOperator has the behavior I describe.

I am already using a custom Trigger which uses a processing timer to FIRE a 
short time after a new event comes in, and an event timer to FIRE_AND_PURGE.

It seems that I can achieve the desired effect by avoiding use of an evictor so 
that the intermediate events are not retained in an EvictingWindowOperator's 
state, and perform any necessary eviction within my fold function. This has the 
aforementioned drawbacks of the windowed fold function not knowing about 
watermarks, and therefore it is difficult to be precise about choosing which 
items to evict. However, this seems to be the best choice within the current 
framework.

Interestingly, it appears that TimeEvictor doesn't really know about watermarks 
either. When a window emits an event, regardless of how it was fired, it is 
assigned the timestamp given by its window's maxTimestamp(), which might be 
much greater than the processing time that actually fired the event. Then, 
TimeEvictor compares the max timestamp of all items in the window against the 
other ones in order to determine which ones to evict. Basically, it assumes 
that the events were emitted due to the window terminating with FIRE_AND_PURGE. 
What if we gave more information (specifically, the current watermark) to the 
evictor in order to allow it to deal with a mix of intermediate events (fired 
by processing time) and final events (fired by event time when the watermark 
reaches the window)? That value is already available in the WindowOperator & 
could be passed to the Evictor very easily. It would be an API change, of 
course.
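
The difference between the two reference points can be shown with a small plain-Java sketch. It models a time-based evictor as "keep everything newer than referenceTime minus keepSize" and compares using the largest element timestamp with using the current watermark. The names are hypothetical and the model is a simplification of the behavior described above, not Flink's TimeEvictor code.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of time-based eviction with a configurable reference time.
class WatermarkEvictorSketch {

    // Keeps the timestamps that are newer than (referenceTime - keepSize).
    static List<Long> keep(List<Long> timestamps, long keepSize, long referenceTime) {
        List<Long> kept = new ArrayList<>();
        for (long ts : timestamps) {
            if (ts > referenceTime - keepSize) {
                kept.add(ts);
            }
        }
        return kept;
    }

    // Reference time when the evictor only knows the elements themselves.
    static long maxTimestamp(List<Long> timestamps) {
        long max = Long.MIN_VALUE;
        for (long ts : timestamps) {
            max = Math.max(max, ts);
        }
        return max;
    }
}
```

If one element is an early-fired result stamped with its window's maxTimestamp(), it pulls the element-based cutoff forward and older elements are evicted too soon; with the watermark as reference they are retained.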

Other than that, is it worth considering a change to EvictingWindowOperator to 
allow user-supplied functions to reduce the size of its state when people fire 
upstream windows repeatedly? From what I see when I monitor the state with 
debugger print statements, the EvictingWindowOperator is definitely holding on 
to all the elements ever received, not just the aggregated result. You can see 
this clearly because EvictingWindowOperator holds a ListState instead of a 
FoldingState. The user-provided fold function is only applied upon fire().
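
The state-size difference between the two operators can be modeled in plain Java. The two classes below are simplified stand-ins written for this example (they are not Flink's FoldingState or ListState), with a sum as the fold function.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of eager folding versus buffering all elements until fire().
class WindowStateSketch {

    // Folds each element into a single accumulator as it arrives.
    static final class EagerFoldState {
        private long accumulator = 0;

        void add(long value) { accumulator += value; }

        long fire() { return accumulator; }

        int storedValues() { return 1; }
    }

    // Keeps every element and applies the fold only when the window fires.
    static final class BufferingState {
        private final List<Long> elements = new ArrayList<>();

        void add(long value) { elements.add(value); }

        long fire() {
            long result = 0;
            for (long e : elements) {
                result += e;
            }
            return result;
        }

        int storedValues() { return elements.size(); }
    }
}
```

Both variants fire the same result, but the buffering variant's state grows with every element, which is what makes repeated upstream firings expensive when an evictor is configured.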

-Shannon




Re: Firing windows multiple times

2016-08-12 Thread Aljoscha Krettek
Hi,
there is already this FLIP:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-4+%3A+Enhance+Window+Evictor
which also links to a mailing list discussion. And this FLIP:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-2+Extending+Window+Function+Metadata
The former proposes to enhance the Evictor API a bit; among other things, we
propose to give the evictor access to the current watermark. The other FLIP
proposes to extend the amount of metadata we give to the window function.
The first thing we propose to add is a "firing reason" that would tell
you whether this was an early firing, an on-time firing or a late firing.
The second thing is a firing counter that would tell you how many times the
trigger has fired so far for the current window.
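
To make the two proposed pieces of metadata concrete, here is a rough model in plain Python (not the Flink API, and not what FLIP-2 actually specifies). The class `Firing`, the reason labels "EARLY"/"ON_TIME"/"LATE" and the field `firing_id` are all hypothetical names chosen for illustration. It shows how a downstream consumer could use such metadata to avoid double-counting repeated firings of the same upstream window.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Firing:
    """One result emitted by an upstream window (hypothetical shape)."""
    window_end: int   # identifies the upstream window
    value: int        # aggregate carried by this firing
    reason: str       # "EARLY", "ON_TIME" or "LATE" (hypothetical labels)
    firing_id: int    # how many times the trigger had fired before this one


def latest_per_window(firings):
    """Keep only the firing with the highest counter for each upstream window."""
    latest = {}
    for firing in firings:
        current = latest.get(firing.window_end)
        if current is None or firing.firing_id > current.firing_id:
            latest[firing.window_end] = firing
    return latest


firings = [
    Firing(window_end=100, value=1, reason="EARLY", firing_id=0),
    Firing(window_end=100, value=2, reason="EARLY", firing_id=1),
    Firing(window_end=100, value=2, reason="ON_TIME", firing_id=2),
    Firing(window_end=200, value=4, reason="EARLY", firing_id=0),
]

# Using the counter: the newest firing of each window wins, early ones included.
total_with_early = sum(f.value for f in latest_per_window(firings).values())

# Using the reason: ignore everything except the final, on-time result.
total_on_time_only = sum(f.value for f in firings if f.reason == "ON_TIME")
```

In this model the counter-based total reflects the still-open second window, while the reason-based total only changes when a window completes.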

Would a combination of these help with your use case?

Cheers,
Aljoscha



Re: Firing windows multiple times

2016-08-11 Thread Shannon Carey
"If Window B is a Folding Window and does not have an evictor then it should 
not keep the list of all received elements."

Agreed! Upon closer inspection, the behavior I'm describing is only present 
when using EvictingWindowOperator, not when using WindowOperator. I misread 
line 382 of WindowOperator which calls windowState.add(): in actuality, the 
windowState is a FoldingState which incorporates the user-provided fold 
function in order to eagerly fold the data. In contrast, if you use an evictor, 
EvictingWindowOperator has the behavior I describe.

I am already using a custom Trigger which uses a processing timer to FIRE a 
short time after a new event comes in, and an event timer to FIRE_AND_PURGE.

It seems that I can achieve the desired effect by avoiding use of an evictor so 
that the intermediate events are not retained in an EvictingWindowOperator's 
state, and perform any necessary eviction within my fold function. This has the 
aforementioned drawbacks of the windowed fold function not knowing about 
watermarks, and therefore it is difficult to be precise about choosing which 
items to evict. However, this seems to be the best choice within the current 
framework.

Interestingly, it appears that TimeEvictor doesn't really know about watermarks 
either. When a window emits an event, regardless of how it was fired, it is 
assigned the timestamp given by its window's maxTimestamp(), which might be 
much greater than the processing time that actually fired the event. Then, 
TimeEvictor compares the max timestamp of all items in the window against the 
other ones in order to determine which ones to evict. Basically, it assumes 
that the events were emitted due to the window terminating with FIRE_AND_PURGE. 
What if we gave more information (specifically, the current watermark) to the 
evictor in order to allow it to deal with a mix of intermediate events (fired 
by processing time) and final events (fired by event time when the watermark 
reaches the window)? That value is already available in the WindowOperator & 
could be passed to the Evictor very easily. It would be an API change, of 
course.
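
The difference can be sketched with a small model in plain Python (not the Flink API). The first function follows the behavior as described above, where the cutoff is derived from the newest element timestamp; the second is a hypothetical watermark-aware variant. The function names and the toy numbers are assumptions made for illustration only.

```python
def evict_by_max_timestamp(timestamps, keep_ms):
    """Keep elements within keep_ms of the newest element timestamp."""
    cutoff = max(timestamps) - keep_ms
    return [t for t in timestamps if t > cutoff]


def evict_by_watermark(timestamps, keep_ms, watermark):
    """Hypothetical variant: keep elements within keep_ms of the watermark."""
    cutoff = watermark - keep_ms
    return [t for t in timestamps if t > cutoff]


# Upstream windows are 100 units wide, so their results carry the
# timestamps 99 and 199 (each window's maxTimestamp()).
# 99  = final result of the first window
# 199 = early (processing-time) firing of the second window
timestamps = [99, 199]
watermark = 120   # event time has only progressed to 120 so far

kept_by_max = evict_by_max_timestamp(timestamps, keep_ms=100)
kept_by_watermark = evict_by_watermark(timestamps, keep_ms=100, watermark=watermark)
```

Here the early firing stamped 199 pushes the cutoff forward and the result stamped 99 is dropped, even though the watermark is only at 120. With the watermark as the reference point both results are kept.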

Other than that, is it worth considering a change to EvictingWindowOperator to 
allow user-supplied functions to reduce the size of its state when people fire 
upstream windows repeatedly? From what I see when I monitor the state with 
debugger print statements, the EvictingWindowOperator is definitely holding on 
to all the elements ever received, not just the aggregated result. You can see 
this clearly because EvictingWindowOperator holds a ListState instead of a 
FoldingState. The user-provided fold function is only applied upon fire().

-Shannon




Re: Firing windows multiple times

2016-08-11 Thread Aljoscha Krettek
, but internal Window state looks like *[x(time=1,
>count=1), y(time=1, count=2)]*
>7. Watermark z
>8. Window A receives watermark, trigger's event timer is reached,
>fires and purges and emits current state as event z(time=1, count=2)
>9. Window B receives event, trigger waits for processing time delay,
>then executes fold() and emits event(time=1 => count=2), but internal
>Window state looks like *[x(time=1, count=1), y(time=1, count=2),
>    z(time=1, count=2)]*
>
> As you can see, the internal window state continues to grow despite what
> fold() is doing.
>
> Does that explanation help interpret my original email?
>
> -Shannon
>
>
> From: Aljoscha Krettek <aljos...@apache.org>
> Date: Wednesday, August 10, 2016 at 12:18 PM
> To: "user@flink.apache.org" <user@flink.apache.org>
> Subject: Re: Firing windows multiple times
>
> Hi,
> from your mail I'm gathering that you are in fact using an Evictor, is
> that correct? If not, then the window operator should not keep all the
> elements ever received for a window but only the aggregated result.
>
> Side note, there seems to be a bug in EvictingWindowOperator that causes
> evicted elements to not actually be removed from the state. They are only
> filtered from the Iterable that is given to the WindowFunction. I opened a
> Jira issue for that: https://issues.apache.org/jira/browse/FLINK-4369
>
> Cheers,
> Aljoscha
>
> On Wed, 10 Aug 2016 at 18:19 Shannon Carey <sca...@expedia.com> wrote:
>
>> One unfortunate aspect of using a fold() instead of a window is that the
>> fold function has no knowledge of the watermarks. As a result, it is
>> difficult to ensure that only items before the current watermark are
>> included in the aggregation, and that old items are evicted correctly. This
>> fact lends more support to the idea of using a custom operator (though that
>> is more complex) or adding support for this use case within Flink.
>>
>> -Shannon
>>
>
>
>


Re: Firing windows multiple times

2016-08-11 Thread Kostas Kloudas
Just to add a drawback of solution 2): you may run into issues because window 
boundaries may not be aligned. For example, the elements of a day window may be 
split between the last day of a month and the first day of the next month.

Kostas


Re: Firing windows multiple times

2016-08-11 Thread Kostas Kloudas
Hi Shannon,

From what I understand, you want to have your results windowed by different 
durations, e.g. by minute, by day, by month, and you use the evictor to decide 
which elements should go into each window. If I am correct, then I do not think 
that you need the evictor, which forces you to keep all the elements that the 
operator has seen (because it uses a ListState).

In this case you can do one of the following:

1) If you just want to have the big window (by month) and all the smaller ones 
to appear as early firings of the big one, then I would suggest going with a 
custom trigger. The trigger has access to watermarks, can register both event 
and processing time timers (so you can have firings whenever you want: per 
minute, per day, etc.), can have state (e.g. an element counter), and can 
decide to FIRE or FIRE_AND_PURGE.

The only downside is that all intermediate firings will appear to belong to the 
big window. This means that the beginning and the end of the by-minute and daily 
firings will be those of the month that they belong to. If this is not a 
problem, I would go for that.
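
The decision logic of such a trigger can be sketched as a toy model in plain Python. This is not Flink's Trigger interface: the class `EarlyFiringTrigger` and its method names are made up for illustration, and only the result names CONTINUE, FIRE and FIRE_AND_PURGE are borrowed from Flink. Timer registration is reduced to a single remembered deadline.

```python
from enum import Enum


class TriggerResult(Enum):
    CONTINUE = "CONTINUE"
    FIRE = "FIRE"
    FIRE_AND_PURGE = "FIRE_AND_PURGE"


class EarlyFiringTrigger:
    """Fires early a fixed processing-time delay after new data arrives,
    and fires and purges once the watermark passes the end of the window."""

    def __init__(self, window_max_timestamp, delay_ms):
        self.window_max_timestamp = window_max_timestamp
        self.delay_ms = delay_ms
        self.pending_timer = None   # processing-time deadline, if one is set

    def on_element(self, processing_time_now):
        # Only arm a timer if none is pending, so a burst of elements
        # leads to a single early firing.
        if self.pending_timer is None:
            self.pending_timer = processing_time_now + self.delay_ms
        return TriggerResult.CONTINUE

    def on_processing_time(self, now):
        if self.pending_timer is not None and now >= self.pending_timer:
            self.pending_timer = None
            return TriggerResult.FIRE            # emit, keep window contents
        return TriggerResult.CONTINUE

    def on_watermark(self, watermark):
        if watermark >= self.window_max_timestamp:
            return TriggerResult.FIRE_AND_PURGE  # final result, drop contents
        return TriggerResult.CONTINUE


trigger = EarlyFiringTrigger(window_max_timestamp=999, delay_ms=60)
trigger.on_element(processing_time_now=0)
early = trigger.on_processing_time(now=60)
final = trigger.on_watermark(watermark=999)
```

Every result emitted this way still belongs to the one big window, which is the downside described above.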

2) If the above is a problem, then what you can do is key your input stream 
and then have 3 different windowing strategies, e.g. by minute, by day and by 
month. This way you will also have the desired window boundaries. This would 
look like:

keyedStream.timeWindow(byMonth).addSink …
keyedStream.timeWindow(byDay).addSink …
keyedStream.timeWindow(byMinute).addSink …
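
To see which boundaries each of these strategies would produce, here is a small model in plain Python of fixed-size tumbling windows aligned to the epoch. It is only a sketch: the helper `tumbling_window` is a made-up name, and treating "by month" as a fixed 30-day size is an assumption, since a fixed-size window cannot follow calendar months.

```python
MINUTE_MS = 60_000
DAY_MS = 24 * 60 * MINUTE_MS
THIRTY_DAYS_MS = 30 * DAY_MS   # assumption: "month" approximated as 30 days


def tumbling_window(timestamp_ms, size_ms):
    """Return (start, end) of the epoch-aligned tumbling window holding the timestamp."""
    start = timestamp_ms - (timestamp_ms % size_ms)
    return (start, start + size_ms)


# One day, one hour, one minute and one second after the epoch.
ts = DAY_MS + 3_661_000

by_minute = tumbling_window(ts, MINUTE_MS)
by_day = tumbling_window(ts, DAY_MS)
by_month = tumbling_window(ts, THIRTY_DAYS_MS)
```

Each strategy assigns the same element to its own window independently, so the by-minute and by-day results carry their own start and end rather than those of the big window.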

Please let us know if this answers your question and if you need any more help.

Kostas
 

Re: Firing windows multiple times

2016-08-10 Thread Shannon Carey
Hi Aljoscha,

Yes, I am using an Evictor, and I think I have seen the problem you are 
referring to. However, that's not what I'm talking about.

If you re-read my first email, the main point is the following: if users desire 
updates more frequently than window watermarks are reached, then window state 
behaves suboptimally. It doesn't matter if there's an evictor or not. 
Specifically:

If I have a window "A" that I fire multiple times in order to provide 
incremental results as data comes in instead of waiting for the watermark to 
purge the window
And that window's events are gathered into another, bigger window "B"
And I want to keep only the latest event from each upstream window "A" (by 
timestamp, where each window pane has its own timestamp)
Even if I have a fold/reduce method on the bigger window "B" to make sure that 
each updated event from "A" overwrites the previous event (by timestamp)
Window "B" will hold in state all events from windows "A", including all the 
incremental events that were fired by processing-time triggers, even though I 
don't actually need those events because the reducer gets rid of them

An example description of execution flow:

  1.  Event x
  2.  Window A receives event, trigger waits for processing time delay, then 
emits event x(time=1, count=1)
  3.  Window B receives event, trigger waits for processing time delay, then 
executes fold() and emits event(time=1 => count=1), but internal Window state 
looks like [x(time=1, count=1)]
  4.  Event y
  5.  Window A receives event, trigger waits for processing time delay, then 
emits event y(time=1, count=2)
  6.  Window B receives event, trigger waits for processing time delay, then 
executes fold() and emits event(time=1 => count=2), but internal Window state 
looks like [x(time=1, count=1), y(time=1, count=2)]
  7.  Watermark z
  8.  Window A receives watermark, trigger's event timer is reached, fires and 
purges and emits current state as event z(time=1, count=2)
  9.  Window B receives event, trigger waits for processing time delay, then 
executes fold() and emits event(time=1 => count=2), but internal Window state 
looks like [x(time=1, count=1), y(time=1, count=2), z(time=1, count=2)]

As you can see, the internal window state continues to grow despite what fold() 
is doing.
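
The flow above can be replayed with a toy model in plain Python (not Flink code). `BufferingWindowB` is a made-up stand-in for a window that stores every incoming element and only applies the fold when it fires; `fold_latest` is an assumed fold that lets a newer update for the same pane timestamp overwrite the older one.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Update:
    """One result emitted by upstream Window A."""
    name: str    # label used in the flow above (x, y, z)
    time: int    # timestamp of the Window A pane
    count: int   # count reported by that pane


def fold_latest(acc, update):
    """Fold step: a later update for the same pane timestamp replaces the earlier one."""
    new_acc = dict(acc)
    new_acc[update.time] = update.count
    return new_acc


class BufferingWindowB:
    """Buffers every element and folds the whole buffer each time it fires."""

    def __init__(self):
        self.state = []   # plays the role of the list-style window state

    def add(self, update):
        self.state.append(update)

    def fire(self):
        acc = {}
        for update in self.state:
            acc = fold_latest(acc, update)
        return acc


window_b = BufferingWindowB()
for update in [Update("x", 1, 1), Update("y", 1, 2), Update("z", 1, 2)]:
    window_b.add(update)
    print(update.name, "-> emitted", window_b.fire(),
          "| buffered elements:", len(window_b.state))

# x -> emitted {1: 1} | buffered elements: 1
# y -> emitted {1: 2} | buffered elements: 2
# z -> emitted {1: 2} | buffered elements: 3
```

The emitted result stays at a single entry while the buffer gains one element per upstream firing.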

Does that explanation help interpret my original email?

-Shannon




Re: Firing windows multiple times

2016-08-10 Thread Vishnu Viswanath
Hi Aljoscha,

This looks like the bug that we discussed as part of the "Enhance Window
Evictor" JIRA.

Thanks,
Vishnu



Re: Firing windows multiple times

2016-08-10 Thread Aljoscha Krettek
Hi,
from your mail I'm gathering that you are in fact using an Evictor, is that
correct? If not, then the window operator should not keep all the elements
ever received for a window but only the aggregated result.

Side note, there seems to be a bug in EvictingWindowOperator that causes
evicted elements to not actually be removed from the state. They are only
filtered from the Iterable that is given to the WindowFunction. I opened a
Jira issue for that: https://issues.apache.org/jira/browse/FLINK-4369

Cheers,
Aljoscha



Re: Firing windows multiple times

2016-08-10 Thread Shannon Carey
One unfortunate aspect of using a fold() instead of a window is that the fold 
function has no knowledge of the watermarks. As a result, it is difficult to 
ensure that only items before the current watermark are included in the 
aggregation, and that old items are evicted correctly. This fact lends more 
support to the idea of using a custom operator (though that is more complex) or 
adding support for this use case within Flink.

-Shannon


Firing windows multiple times

2016-08-10 Thread Shannon Carey
I recently noticed something about windows: they retain (in state) every 
element that they receive regardless of whether the user provides a fold/reduce 
function. I can tell that such an approach is necessary in order for evictors 
to work, but I'm not sure if there are other reasons.

I'll describe a use case where this approach is not optimal, and then maybe we 
can discuss ways to get around it or possible modifications to Flink. My jobs 
include windows that are wider than the frequency at which we want updates. For 
example, I might have a window that is one day long, but I might want an 
updated value to be emitted from that window within (say) one processing-time 
minute of a new event being assigned to it. I can accomplish that with a 
trigger that has processing-time delay FIRE as well as event-time 
FIRE_AND_PURGE. Next, I want to gather those items into a bigger window: 
perhaps a month or a year wide. My fold function can ensure that multiple 
events from an upstream window overwrite each other so that they are not 
counted multiple times. However, as I mentioned, the wide window's state will 
hold all the events: all the processing-time fires as well as the final event 
from the upstream FIRE_AND_PURGE. That will make the state bigger than it needs 
to be.

With regard to solutions within the bounds of the existing framework, I am 
considering using a regular fold() operation instead of a long window. The fold 
function would be responsible for performing the eviction that the window was 
previously responsible for. I could implement that as a RichFoldFunction with a 
ReducingState. The main difference is that there would be no triggering 
involved (incoming items would immediately result in reduce() emitting a new 
aggregate). I could also possibly implement my own operator. Are there 
other/better options I have not considered?
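
As a rough illustration of the fold-based alternative, here is a model in plain Python rather than an actual RichFoldFunction. The class `RunningAggregate` and its fields are hypothetical. It keeps the latest count per upstream pane, evicts panes that fall behind a horizon, and emits a new total on every input. Because it cannot see watermarks, it uses the largest timestamp seen so far as a stand-in, which is an assumption of this sketch.

```python
class RunningAggregate:
    """Keeps the latest count per upstream pane and evicts old panes itself."""

    def __init__(self, horizon):
        self.horizon = horizon   # how far back (in event time) panes are kept
        self.latest = {}         # pane timestamp -> latest count for that pane
        self.max_seen = None     # largest pane timestamp seen so far

    def update(self, pane_time, count):
        # A newer update for the same pane overwrites the previous one.
        self.latest[pane_time] = count
        if self.max_seen is None or pane_time > self.max_seen:
            self.max_seen = pane_time
        # Evict panes at or before the cutoff derived from max_seen.
        cutoff = self.max_seen - self.horizon
        for old in [t for t in self.latest if t <= cutoff]:
            del self.latest[old]
        # Every input immediately yields a new aggregate; no trigger involved.
        return sum(self.latest.values())


agg = RunningAggregate(horizon=3)
totals = [
    agg.update(1, 1),   # first firing of pane 1
    agg.update(1, 2),   # pane 1 fires again and overwrites its old value
    agg.update(2, 5),   # pane 2 arrives
    agg.update(5, 1),   # pane 5 moves the cutoff to 2; panes 1 and 2 are evicted
    agg.update(3, 4),   # late pane 3 is still inside the horizon
]
```

The state holds one entry per live pane, but the eviction point depends on whichever timestamp happened to arrive, not on a watermark.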

Is it desirable to improve support for this use case within Flink? I can 
imagine that other people may want to get incremental/ongoing results from 
their windows as data comes in instead of waiting for the watermark to purge 
the window. In general, they might want better control over the window state. 
If so, what would the solution look like? Perhaps we could allow users to 
specify an additional method to the window operator which extracts the identity 
of any new event, and then Flink would ensure that new events overwrite 
existing events within the window state, preventing it from growing 
unnecessarily. Or, perhaps there is a way to do it based on the identity of the 
window that produces the event? Or, more generally, perhaps we could allow user 
provided fold/reduce functions to eagerly reduce the state of the window, 
although that might impact the evictor feature?
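
The first idea, an identity extractor that lets new events overwrite existing ones in window state, could look roughly like this. It is a sketch in plain Python, not an existing or proposed Flink interface; `OverwritingWindowState` and its `identity` parameter are hypothetical names.

```python
from typing import Callable, Dict, Generic, Hashable, List, TypeVar

T = TypeVar("T")


class OverwritingWindowState(Generic[T]):
    """Window state that keeps at most one element per user-defined identity."""

    def __init__(self, identity: Callable[[T], Hashable]):
        self._identity = identity
        self._elements: Dict[Hashable, T] = {}

    def add(self, element: T) -> None:
        # An element with an identity already present replaces the old one.
        self._elements[self._identity(element)] = element

    def elements(self) -> List[T]:
        return list(self._elements.values())

    def __len__(self) -> int:
        return len(self._elements)


# Elements are (upstream_window_start, count) pairs; the upstream window
# start serves as the identity.
state = OverwritingWindowState(identity=lambda update: update[0])
for update in [(1, 1), (1, 2), (2, 5), (1, 2)]:
    state.add(update)
```

After four updates from two upstream windows the state holds two elements, and an evictor could still iterate over `elements()` since individual elements are retained.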

Thanks for your thoughts,
Shannon