Hi Scott,

Yes, the window trigger fires for every single late element.
If you only want the window to be triggered once, you can:

- Remove the allowedLateness()
- Use a BoundedOutOfOrdernessTimestampExtractor to emit watermarks that lag behind the elements.

The code (Scala) looks like:

> class TimestampExtractor[T1, T2]
>     extends BoundedOutOfOrdernessTimestampExtractor[(T1, T2, Timestamp)](
>       Time.hours(3)) {
>   override def extractTimestamp(element: (T1, T2, Timestamp)): Long = {
>     element._3.getTime
>   }
> }

Note that this will increase latency, since the window now fires only once, when the watermark finally passes the end of the window.

Best,
Hequn

On Sat, Oct 20, 2018 at 1:15 AM Scott Kidder <kidder.sc...@gmail.com> wrote:

> I'm using event-time windows of 1 hour that have an allowed lateness of
> several hours. This supports the processing of access logs that can be
> delayed by several hours. The windows aggregate data over the 1 hour period
> and write to a database sink. Pretty straightforward.
>
> Will the event-time trigger lead to the window trigger firing for every
> single late element? Suppose thousands of late elements arrive
> simultaneously; I'd like to avoid having that lead to thousands of database
> updates in a short period of time. Ideally, I could batch up the late
> window changes and have it trigger when the window is finally closed or
> some processing-time duration passes (e.g. once per minute).
>
> For reference, here's what the aggregate window definition looks like with
> Flink 1.5.3:
>
> chunkSource.keyBy(record -> record.getRecord().getEnvironmentId())
>     .timeWindow(Time.hours(1))
>     .allowedLateness(Time.hours(3))
>     .aggregate(new EnvironmentAggregateWatchTimeFunction())
>     .uid("env-watchtime-stats")
>     .name("Env Watch-Time Stats")
>     .addSink(new EnvironmentWatchTimeDBSink());
>
> Thank you,
>
> --
> Scott Kidder
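For anyone following along, the single-firing behaviour of the suggestion above can be sketched without Flink at all. This is a minimal, self-contained illustration (the class and method names below are made up for the example, not part of the Flink API): a bounded-out-of-orderness watermark always lags a fixed bound behind the largest timestamp seen so far, and an event-time window fires exactly once, when that watermark reaches the end of the window.

```java
import java.util.Arrays;

public class WatermarkSketch {
    // Same 3-hour out-of-orderness bound as in the extractor above.
    static final long MAX_OUT_OF_ORDERNESS_MS = 3L * 60 * 60 * 1000;

    // Watermark after observing the given element timestamps:
    // the largest timestamp seen so far, minus the lag bound.
    static long watermark(long[] seenTimestamps) {
        return Arrays.stream(seenTimestamps).max().getAsLong()
                - MAX_OUT_OF_ORDERNESS_MS;
    }

    // An event-time window [start, end) fires once the watermark
    // reaches its end; late elements before that point are simply
    // folded into the window state without triggering it.
    static boolean windowFires(long windowEndMs, long[] seenTimestamps) {
        return watermark(seenTimestamps) >= windowEndMs;
    }

    public static void main(String[] args) {
        long hour = 60L * 60 * 1000;
        long windowEnd = hour; // the 1-hour window [0h, 1h)

        // Largest timestamp seen is 4h -> watermark = 4h - 3h = 1h,
        // so the [0h, 1h) window fires now, exactly once.
        System.out.println(windowFires(windowEnd, new long[]{0, hour, 4 * hour})); // true
        // Largest timestamp seen is 3h -> watermark = 0h,
        // so the window stays open and nothing is emitted yet.
        System.out.println(windowFires(windowEnd, new long[]{0, hour, 3 * hour})); // false
    }
}
```

This also shows where the extra latency comes from: results for the [0h, 1h) window are only emitted once an element with a timestamp of at least 4h has been seen.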