Thank you, that was helpful. I hadn't appreciated that a Trigger is
fully in control of when to fire / purge, regardless of the watermark.

Now I am wondering about the best way to distinguish different instances
of the same time window with completely different data from repeated
fires that include data already used in previous fires. More specifically:

- If the data is not late, I will hold onto an aggregate maintained by
a FoldFunction, and fire the window periodically without purging. Each
of these fires should carry some key that is the same for every fire,
so that a database update downstream can overwrite the previous
partial aggregates.

- If the data is late, I will only hold onto the aggregate for some
period, and then fire and purge. But this late fire should carry a
different key, so that the aggregates from the non-late data are not
overwritten. That way, I can deal with arbitrarily late data
statelessly, without knowing which aggregates for the time window have
already been written. (A rough sketch of this keying scheme is below.)
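
For illustration, this is the kind of key scheme I have in mind (a sketch
only; WindowKey and the field names are mine, not anything from Flink):

    // Sketch: composing the database key carried with each window fire.
    case class WindowKey(group: String, windowStart: Long, suffix: String)

    // Every on-time (early, accumulating) fire reuses the same key, so the
    // downstream database update overwrites the previous partial aggregate:
    def onTimeKey(group: String, windowStart: Long): WindowKey =
      WindowKey(group, windowStart, "on-time")

    // A late fire gets a distinct key (here a per-fire id), so it never
    // overwrites the aggregate already written by the on-time fires:
    def lateKey(group: String, windowStart: Long, fireId: String): WindowKey =
      WindowKey(group, windowStart, s"late-$fireId")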

It looks like I could do this by using RichWindowFunction in the Java
API, and saving the key using the RuntimeContext state API. However, I
can't seem to pass a RichWindowFunction to the Scala WindowedStream's
apply method. Is there any easy way around this?
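
For concreteness, this is roughly what I would like to pass (a sketch only;
Agg is my aggregate type, WindowKey is the case class sketched above, and
the exact ValueStateDescriptor constructor may differ between Flink versions):

    import org.apache.flink.api.common.state.ValueStateDescriptor
    import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector

    class KeyedAggregateEmitter
        extends RichWindowFunction[Agg, (WindowKey, Agg), String, TimeWindow] {

      override def apply(key: String, window: TimeWindow,
                         input: java.lang.Iterable[Agg],
                         out: Collector[(WindowKey, Agg)]): Unit = {
        // Remember the database key for this window in keyed state, so that
        // every fire of this window instance reuses it.
        val desc = new ValueStateDescriptor[WindowKey]("dbKey", classOf[WindowKey], null)
        val keyState = getRuntimeContext.getState(desc)
        val dbKey = Option(keyState.value()).getOrElse {
          val k = WindowKey(key, window.getStart, "on-time")
          keyState.update(k)
          k
        }
        // (A late fire would store and emit a different key here instead.)
        // With a FoldFunction upstream, the iterable holds the single folded aggregate.
        out.collect((dbKey, input.iterator().next()))
      }
    }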

I was also hoping that the FoldFunction passed to WindowedStream.apply
could be a RichFoldFunction, but that is specifically prohibited for
some reason.

Any hints on how to make a stateful WindowedStream.apply in Scala
would be much appreciated.

Thanks,
Mike

On Wed, Mar 2, 2016 at 2:11 AM, Kostas Kloudas
<k.klou...@data-artisans.com> wrote:
> Hello Mike,
>
> The code that Aljoscha mentioned is here:
>
> https://github.com/kl0u/flink-examples/blob/master/src/main/java/com/dataartisans/flinksolo/customTriggers/EventTimeTriggerWithEarlyAndLateFiring.java
>
> This allows you to specify a trigger like:
>
> EventTimeTriggerWithEarlyAndLateFiring trigger =
>     EventTimeTriggerWithEarlyAndLateFiring.create()
>         .withEarlyFiringEvery(Time.minutes(10))
>         .withLateFiringEvery(Time.minutes(5))
>         .withAllowedLateness(Time.minutes(20))
>         .accumulating();
>
> This means that it will fire every 10 minutes (in processing time) until the
> end of the window (event time), and then every 5 minutes (processing time)
> for late elements, up to an allowed lateness of 20 minutes. In addition,
> because the trigger is accumulating, previously received elements are not
> discarded on each fire.
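>
> For example, wiring it into a keyed window might look roughly like this
> (a sketch in the Scala API, imports omitted; the stream, window size, and
> aggregate type are placeholders):
>
>     val result = keyedStream
>       .timeWindow(Time.hours(1))
>       .trigger(trigger)
>       .fold(MyAggregate.empty) { (acc, event) => acc.add(event) }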
>
> Hope this helps,
> Kostas
>
> On Mar 2, 2016, at 11:02 AM, Aljoscha Krettek <aljos...@apache.org> wrote:
>
> Hi,
> I did some initial work on extending the EventTimeTrigger a bit to allow
> more complex behavior. Specifically, this allows setting an “allowed
> lateness” after which elements should no longer lead to windows being
> emitted. It also allows specifying that an emitted window be kept in memory,
> so that when a late element arrives the whole window is emitted again.
>
> The code I have is here:
> https://github.com/aljoscha/flink/blob/window-late/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
>
> Kostas Kloudas worked on extending it, so maybe he could share his version
> of the trigger as well.
>
> Cheers,
> Aljoscha
>
> On 01 Mar 2016, at 18:35, Michael Radford <mub...@gmail.com> wrote:
>
> I'm evaluating Flink for a reporting application that will keep
> various aggregates updated in a database. It will be consuming from
> Kafka queues that are replicated from remote data centers, so in case
> there is a long outage in replication, I need to decide what to do
> about windowing and late data.
>
> If I use Flink's built-in windows and watermarks, any late data will
> come in 1-element windows, which could overwhelm the database if a
> large batch of late data arrives and each element is mapped to an
> individual database update.
>
> As far as I can tell, I have two options:
>
> 1. Ignore late data, by marking it as late in an
> AssignerWithPunctuatedWatermarks function, and then discarding it in a
> flatMap operator. In this scenario, I would rely on a batch process to
> fill in the missing data later, in the lambda architecture style.
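>
> Roughly, I picture the lateness check itself living in the flatMap, since
> the assigner only emits timestamps and watermarks. A sketch, with events,
> Event, extractTs, and maxLateness as placeholders:
>
>     import org.apache.flink.api.common.functions.RichFlatMapFunction
>     import org.apache.flink.util.Collector
>
>     val onTimeOnly = events.flatMap(new RichFlatMapFunction[Event, Event] {
>       private var maxTimestampSeen = Long.MinValue
>       override def flatMap(e: Event, out: Collector[Event]): Unit = {
>         maxTimestampSeen = math.max(maxTimestampSeen, extractTs(e))
>         // Keep the element only if it is within maxLateness of the newest
>         // timestamp this subtask has seen; otherwise drop it and let the
>         // batch layer fill it in later.
>         if (extractTs(e) >= maxTimestampSeen - maxLateness) out.collect(e)
>       }
>     })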
>
> 2. Implement my own watermark logic to allow full windows of late
> data. It seems like I could, for example, emit a "tick" message that
> is replicated to all partitions every n messages, and then a custom
> Trigger could decide when to purge each window based on the ticks and
> a timeout duration. The system would never emit a real Watermark.
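>
> A very rough skeleton of what I mean (method signatures and import paths
> are from the 1.0 Trigger class as I read it and may differ in other
> versions; Tick, Data, and the exact fire/purge policy are placeholders):
>
>     import org.apache.flink.streaming.api.windowing.triggers.{Trigger, TriggerResult}
>     import org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
>     import org.apache.flink.streaming.api.windowing.windows.TimeWindow
>
>     class TickAndTimeoutTrigger(timeoutMillis: Long)
>         extends Trigger[Either[Tick, Data], TimeWindow] {
>
>       override def onElement(element: Either[Tick, Data], timestamp: Long,
>                              window: TimeWindow, ctx: TriggerContext): TriggerResult =
>         element match {
>           case Left(_) =>
>             // Each tick emits the window so far and arms a processing-time
>             // purge timer; the earliest registered timer ends up purging it.
>             ctx.registerProcessingTimeTimer(System.currentTimeMillis() + timeoutMillis)
>             TriggerResult.FIRE
>           case Right(_) =>
>             TriggerResult.CONTINUE
>         }
>
>       override def onProcessingTime(time: Long, window: TimeWindow,
>                                     ctx: TriggerContext): TriggerResult =
>         TriggerResult.FIRE_AND_PURGE
>
>       override def onEventTime(time: Long, window: TimeWindow,
>                                ctx: TriggerContext): TriggerResult =
>         TriggerResult.CONTINUE  // no real watermarks, so event-time timers never fire
>     }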
>
> My questions are:
> - Am I mistaken about either of these, or are there any other options
> I'm not seeing for avoiding 1-element windows?
> - For option 2, are there any problems with not emitting actual
> watermarks, as long as the windows are eventually purged by a trigger?
>
> Thanks,
> Mike
>
>
>
