Yes, this should work.

On Tue, 10 May 2016 at 19:01 Srikanth <srikanth...@gmail.com> wrote:
> Yes, will work.
> I was trying another route of having a "finalize & purge trigger" that will
>   i) onElement - Register for event time watermark but not alter nested
>      trigger's TriggerResult
>   ii) OnEventTime - Always purge after fire
>
> That will work with CountTrigger and other custom triggers too, right?
>
> public class FinalizePurgingTrigger<T, W extends Window> extends Trigger<T, W> {
>
>   // The wrapped trigger, e.g. CountTrigger.of(3)
>   private final Trigger<T, W> nestedTrigger;
>
>   public FinalizePurgingTrigger(Trigger<T, W> nestedTrigger) {
>     this.nestedTrigger = nestedTrigger;
>   }
>
>   @Override
>   public TriggerResult onElement(T element, long timestamp, W window,
>       TriggerContext ctx) throws Exception {
>     // Register a timer for the end of the window; a generic Window only
>     // exposes maxTimestamp(), getEnd() exists on TimeWindow only.
>     ctx.registerEventTimeTimer(window.maxTimestamp());
>     return nestedTrigger.onElement(element, timestamp, window, ctx);
>   }
>
>   @Override
>   public TriggerResult onEventTime(long time, W window, TriggerContext ctx)
>       throws Exception {
>     TriggerResult triggerResult = nestedTrigger.onEventTime(time, window, ctx);
>     switch (triggerResult) {
>       case FIRE:
>         return TriggerResult.FIRE_AND_PURGE;
>       case FIRE_AND_PURGE:
>         return TriggerResult.FIRE_AND_PURGE;
>       default:
>         return TriggerResult.CONTINUE;
>     }
>   }
> }
>
> Srikanth
>
> On Tue, May 10, 2016 at 11:36 AM, Fabian Hueske <fhue...@gmail.com> wrote:
>
>> Maybe the last example of this blog post is helpful [1].
>>
>> Best, Fabian
>>
>> [1]
>> https://www.mapr.com/blog/essential-guide-streaming-first-processing-apache-flink
>>
>> 2016-05-10 17:24 GMT+02:00 Srikanth <srikanth...@gmail.com>:
>>
>>> Hi,
>>>
>>> I read the following in Flink doc "We can explicitly specify a Trigger
>>> to overwrite the default Trigger provided by the WindowAssigner. Note that
>>> specifying a triggers does not add an additional trigger condition but
>>> replaces the current trigger."
>>> So, I tested out the below code with count trigger. As per my
>>> understanding this will override the default watermark based trigger.
>>>
>>> val testStream = env.fromCollection(List(
>>>     ("2016-04-07 13:11:59", 157428, 4),
>>>     ("2016-04-07 13:11:59", 157428, 4),
>>>     ("2016-04-07 13:11:59", 111283, 23),
>>>     ("2016-04-07 13:11:57", 108042, 23),
>>>     ("2016-04-07 13:12:00", 161374, 9),
>>>     ("2016-04-07 13:12:00", 161374, 9),
>>>     ("2016-04-07 13:11:59", 136505, 4)
>>>   )
>>> )
>>>   .assignAscendingTimestamps(b => f.parse(b._1).getTime())
>>>   .map(b => (b._3, b._2))
>>>
>>> testStream.print
>>>
>>> val countStream = testStream
>>>   .keyBy(_._1)
>>>   .timeWindow(Time.seconds(20))
>>>   .trigger(CountTrigger.of(3))
>>>   .fold((0, List[Int]())) { case((k,r),i) => (i._1, r ++ List(i._2)) }
>>>
>>> countStream.print
>>>
>>> Output I saw confirms the documented behavior. Processing is triggered
>>> only when we have 3 elements for a key.
>>> How do I force trigger the left over records when watermark is past the
>>> window? I.e, I want to use triggers to start early processing but finalize
>>> the window based on watermark.
>>>
>>> Output shows that records for keys 23 & 9 weren't processed.
>>> (4,157428)
>>> (4,157428)
>>> (23,111283)
>>> (23,108042)
>>> (9,161374)
>>> (9,161374)
>>> (4,136505)
>>>
>>> (4,List(157428, 157428, 136505))
>>>
>>> Thanks,
>>> Srikanth
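
For reference, the "fire early on count, finalize on watermark" idea from this thread can be modelled without Flink on the classpath. The sketch below is plain Java with stand-in types: the Result enum, the onEventTime helper, the run method and the 19_999 window timestamp are all hypothetical and only mimic the idea of Flink's TriggerResult; none of this is Flink's Trigger API. It also assumes the wrapping trigger fires and purges whenever the timer equals the window's max timestamp, even if the nested trigger returns CONTINUE at that point.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FinalizePurgeSketch {

    // Stand-in for a trigger result; NOT Flink's TriggerResult class.
    enum Result { CONTINUE, FIRE, FIRE_AND_PURGE }

    /** Decision of the wrapping trigger when an event-time timer fires. */
    static Result onEventTime(long time, long windowMaxTimestamp, Result nested) {
        if (time == windowMaxTimestamp) {
            // End of window: always flush whatever is left, then purge.
            return Result.FIRE_AND_PURGE;
        }
        // Earlier timers: turn any firing of the nested trigger into a purging one.
        return nested == Result.CONTINUE ? Result.CONTINUE : Result.FIRE_AND_PURGE;
    }

    /**
     * Replays (key, value) pairs through a single window. A per-key count
     * trigger fires (without purging) every n elements; afterwards the
     * end-of-window timer is simulated once for all keys.
     */
    static List<String> run(int[][] elements, int n) {
        Map<Integer, List<Integer>> contents = new LinkedHashMap<>();
        Map<Integer, Integer> counts = new LinkedHashMap<>();
        List<String> out = new ArrayList<>();

        for (int[] e : elements) {
            contents.computeIfAbsent(e[0], k -> new ArrayList<>()).add(e[1]);
            int seen = counts.merge(e[0], 1, Integer::sum);
            if (seen >= n) {
                // Nested count trigger fires: emit the window contents so far.
                counts.put(e[0], 0);
                out.add(e[0] + "=" + contents.get(e[0]));
            }
        }

        long windowMaxTimestamp = 19_999L; // hypothetical end of a 20 s window
        // A count trigger ignores timers, so the nested result is CONTINUE here.
        Result atEnd = onEventTime(windowMaxTimestamp, windowMaxTimestamp, Result.CONTINUE);
        if (atEnd == Result.FIRE_AND_PURGE) {
            for (Map.Entry<Integer, List<Integer>> w : contents.entrySet()) {
                out.add(w.getKey() + "=" + w.getValue());
            }
            contents.clear(); // purge
        }
        return out;
    }

    public static void main(String[] args) {
        // (key, value) pairs from the original question, after the map step.
        int[][] elements = {
            {4, 157428}, {4, 157428}, {23, 111283}, {23, 108042},
            {9, 161374}, {9, 161374}, {4, 136505}
        };
        run(elements, 3).forEach(System.out::println);
    }
}
```

In this model, key 4 is emitted when its third element arrives, and the end-of-window timer then flushes keys 4, 23 and 9. Key 4 appears twice because the count firing does not purge; whether that duplicate is acceptable depends on the downstream logic.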