Yes, will work. I was trying another route of having a "finalize & purge trigger" that will i) onElement - Register for event time watermark but not alter nested trigger's TriggerResult ii) OnEventTime - Always purge after fire
That will work with CountTrigger and other custom trigger too rt? public class FinalizePurgingTrigger <T, W extends Window> extends Trigger<T, W> { @Override public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception { ctx.registerEventTimeTimer(window.getEnd) return nestedTrigger.onElement(element, timestamp, window, ctx); } @Override public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception { TriggerResult triggerResult = nestedTrigger.onEventTime(time, window, ctx); switch (triggerResult) { case FIRE: return TriggerResult.FIRE_AND_PURGE; case FIRE_AND_PURGE: return TriggerResult.FIRE_AND_PURGE; default: return TriggerResult.CONTINUE; } } } Srikanth On Tue, May 10, 2016 at 11:36 AM, Fabian Hueske <fhue...@gmail.com> wrote: > Maybe the last example of this blog post is helpful [1]. > > Best, Fabian > > [1] > https://www.mapr.com/blog/essential-guide-streaming-first-processing-apache-flink > > 2016-05-10 17:24 GMT+02:00 Srikanth <srikanth...@gmail.com>: > >> Hi, >> >> I read the following in Flink doc "We can explicitly specify a Trigger to >> overwrite the default Trigger provided by the WindowAssigner. Note that >> specifying a triggers does not add an additional trigger condition but >> replaces the current trigger." >> So, I tested out the below code with count trigger. As per my >> understanding this will override the default watermark based trigger. >> >> val testStream = env.fromCollection(List( ("2016-04-07 13:11:59", 157428, >> 4), >> ("2016-04-07 13:11:59", 157428, 4), >> ("2016-04-07 13:11:59", 111283, 23), >> ("2016-04-07 13:11:57", 108042, 23), >> ("2016-04-07 13:12:00", 161374, 9), >> ("2016-04-07 13:12:00", 161374, 9), >> ("2016-04-07 13:11:59", 136505, 4) >> ) >> ) >> .assignAscendingTimestamps(b => f.parse(b._1).getTime()) >> .map(b => (b._3, b._2)) >> >> testStream.print >> >> val countStream = testStream >> .keyBy(_._1) >> .timeWindow(Time.seconds(20)) >> .trigger(CountTrigger.of(3)) >> .fold((0, List[Int]())) { case((k,r),i) => (i._1, r ++ List(i._2)) } >> >> countStream.print >> >> Output I saw confirms the documented behavior. Processing is triggered >> only when we have 3 elements for a key. >> How do I force trigger the left over records when watermark is past the >> window? I.e, I want to use triggers to start early processing but finalize >> the window based on watermark. >> >> Output shows that records for keys 23 & 9 weren't processed. >> (4,157428) >> (4,157428) >> (23,111283) >> (23,108042) >> (9,161374) >> (9,161374) >> (4,136505) >> >> (4,List(157428, 157428, 136505)) >> >> Thanks, >> Srikanth >> > >