Thank you, that was helpful. I didn't appreciate that a Trigger is fully in control of when to fire / purge regardless of the watermark.
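To make that control concrete, here is a minimal trigger sketch against the Flink 1.x `Trigger` API (exact signatures vary slightly across versions; the class name and interval parameter are hypothetical). Every callback returns a `TriggerResult`, so the trigger alone decides between CONTINUE, FIRE, PURGE, and FIRE_AND_PURGE, watermark or not:

```scala
import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
import org.apache.flink.streaming.api.windowing.windows.TimeWindow

// Sketch: fire partial results periodically in processing time,
// then fire-and-purge once the watermark passes the window's end.
class PeriodicThenPurgeTrigger(intervalMs: Long) extends Trigger[Any, TimeWindow] {

  override def onElement(e: Any, ts: Long, w: TimeWindow,
                         ctx: Trigger.TriggerContext): TriggerResult = {
    // Periodic early firing, independent of the watermark.
    ctx.registerProcessingTimeTimer(ctx.getCurrentProcessingTime + intervalMs)
    // Callback at the window's event-time end.
    ctx.registerEventTimeTimer(w.maxTimestamp())
    TriggerResult.CONTINUE
  }

  override def onProcessingTime(time: Long, w: TimeWindow,
                                ctx: Trigger.TriggerContext): TriggerResult = {
    ctx.registerProcessingTimeTimer(time + intervalMs)
    TriggerResult.FIRE // emit the partial aggregate, keep the window contents
  }

  override def onEventTime(time: Long, w: TimeWindow,
                           ctx: Trigger.TriggerContext): TriggerResult =
    if (time >= w.maxTimestamp()) TriggerResult.FIRE_AND_PURGE
    else TriggerResult.CONTINUE

  override def clear(w: TimeWindow, ctx: Trigger.TriggerContext): Unit = {}
}
```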
Now I am wondering about the best way to distinguish different instances of the same time window with completely different data from repeated fires that include data used in previous fires. More specifically:

- If the data is not late, I will hold onto an aggregate maintained by a FoldFunction and fire the window periodically without purging. Each of these fires should carry some key that is the same for every fire, so that a database update downstream can overwrite the previous partial aggregates.
- If the data is late, I will only hold onto the aggregate for some period, and then fire and purge. But this late fire should carry a different key, so that the aggregates from the non-late data are not overwritten.

That way, I can deal with arbitrarily late data statelessly, without knowing which aggregates for the time window have already been written.

It looks like I could do this by using a RichWindowFunction in the Java API and saving the key via the RuntimeContext state API. However, I can't seem to pass a RichWindowFunction to the Scala WindowedStream's apply method. Is there an easy way around this? I was also hoping that the FoldFunction passed to WindowedStream.apply could be a RichFoldFunction, but that is specifically prohibited for some reason.

Any hints on how to make a stateful WindowedStream.apply in Scala would be much appreciated.
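One partial, stateless workaround worth noting: the Scala `WindowedStream.apply` variants that take a full `(key, window, elements, collector)` function expose the window object itself, so the window's start timestamp can serve as the stable key for upserts on repeated fires. A hedged sketch (the `Event`/`Aggregate` types, field names, and the trigger variable are hypothetical, not from this thread):

```scala
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

case class Event(user: String, eventTime: Long, value: Long)
case class Aggregate(upsertKey: String, windowStart: Long, sum: Long)

val aggregated: DataStream[Aggregate] = events
  .keyBy(_.user)
  .timeWindow(Time.minutes(10))
  .trigger(earlyAndLateTrigger) // e.g. the trigger from Kostas's example below
  .apply { (user: String, window: TimeWindow, elems: Iterable[Event],
            out: Collector[Aggregate]) =>
    // (user, window.getStart) is identical on every repeated fire of this
    // window, so a downstream upsert overwrites earlier partial aggregates.
    out.collect(Aggregate(s"$user-${window.getStart}", window.getStart,
                          elems.map(_.value).sum))
  }
```

This does not by itself give per-window state to tag late fires with a different key; that still seems to need the rich-function state the question asks about (later Flink versions added ProcessWindowFunction with per-window state for exactly this kind of case).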
Thanks,
Mike

On Wed, Mar 2, 2016 at 2:11 AM, Kostas Kloudas <k.klou...@data-artisans.com> wrote:
> Hello Mike,
>
> The code that Aljoscha mentioned is here:
>
> https://github.com/kl0u/flink-examples/blob/master/src/main/java/com/dataartisans/flinksolo/customTriggers/EventTimeTriggerWithEarlyAndLateFiring.java
>
> This allows you to specify a trigger like:
>
> EventTimeTriggerWithEarlyAndLateFiring trigger =
>     EventTimeTriggerWithEarlyAndLateFiring.create()
>         .withEarlyFiringEvery(Time.minutes(10))
>         .withLateFiringEvery(Time.minutes(5))
>         .withAllowedLateness(Time.minutes(20))
>         .accumulating();
>
> This means that it will fire every 10 minutes (in processing time) until the
> end of the window (event time), and then every 5 minutes (processing time)
> for late elements up to 20 minutes late. In addition, previous elements are
> not discarded.
>
> Hope this helps,
> Kostas
>
> On Mar 2, 2016, at 11:02 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>
> Hi,
> I did some initial work on extending the EventTimeTrigger a bit to allow
> more complex behavior. Specifically, this allows setting an "allowed
> lateness" after which elements should no longer lead to windows being
> emitted. Also, it allows specifying that an emitted window be kept in
> memory and, when a late element arrives, that the whole window be emitted
> again.
>
> The code I have is here:
> https://github.com/aljoscha/flink/blob/window-late/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
>
> Kostas Kloudas worked on extending it, so maybe he could share his version
> of the trigger as well.
>
> Cheers,
> Aljoscha
>
> On 01 Mar 2016, at 18:35, Michael Radford <mub...@gmail.com> wrote:
>
> I'm evaluating Flink for a reporting application that will keep
> various aggregates updated in a database.
> It will be consuming from Kafka queues that are replicated from remote
> data centers, so in case there is a long outage in replication, I need to
> decide what to do about windowing and late data.
>
> If I use Flink's built-in windows and watermarks, any late data will
> come in 1-element windows, which could overwhelm the database if a
> large batch of late data comes in and each element is mapped to an
> individual database update.
>
> As far as I can tell, I have two options:
>
> 1. Ignore late data, by marking it as late in an
> AssignerWithPunctuatedWatermarks function and then discarding it in a
> flatMap operator. In this scenario, I would rely on a batch process to
> fill in the missing data later, in the lambda-architecture style.
>
> 2. Implement my own watermark logic to allow full windows of late
> data. It seems like I could, for example, emit a "tick" message that
> is replicated to all partitions every n messages, and then a custom
> Trigger could decide when to purge each window based on the ticks and
> a timeout duration. The system would never emit a real Watermark.
>
> My questions are:
> - Am I mistaken about either of these, or are there any other options
>   I'm not seeing for avoiding 1-element windows?
> - For option 2, are there any problems with not emitting actual
>   watermarks, as long as the windows are eventually purged by a trigger?
>
> Thanks,
> Mike
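For reference, option 1 in the quoted message (mark late data, then discard it before windowing) could be sketched as a flatMap that tracks the maximum event time seen by its subtask; the `Event` type and its `eventTime` field are hypothetical, not from the thread:

```scala
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.util.Collector

case class Event(user: String, eventTime: Long, value: Long)

// Sketch: drop any element more than allowedLatenessMs behind the newest
// event time this subtask has seen, leaving the batch layer to backfill.
class DropLateEvents(allowedLatenessMs: Long)
    extends RichFlatMapFunction[Event, Event] {
  private var maxTs = Long.MinValue

  override def flatMap(e: Event, out: Collector[Event]): Unit = {
    maxTs = math.max(maxTs, e.eventTime)
    if (e.eventTime >= maxTs - allowedLatenessMs) out.collect(e)
  }
}
```

Note this tracks lateness per parallel subtask, not globally, so it is an approximation of the watermark-based notion of lateness.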