Yes, will work.
I was trying another route of having a "finalize & purge trigger" that will
   i) onElement - Register for event time watermark but not alter nested
trigger's TriggerResult
  ii) OnEventTime - Always purge after fire

That will work with CountTrigger and other custom trigger too rt?

public class FinalizePurgingTrigger <T, W extends Window> extends
Trigger<T, W> {

@Override
public TriggerResult onElement(T element, long timestamp, W window,
TriggerContext ctx) throws Exception {
                ctx.registerEventTimeTimer(window.getEnd)
return nestedTrigger.onElement(element, timestamp, window, ctx);
}

@Override
public TriggerResult onEventTime(long time, W window, TriggerContext ctx)
throws Exception {
TriggerResult triggerResult = nestedTrigger.onEventTime(time, window, ctx);
switch (triggerResult) {
case FIRE:
return TriggerResult.FIRE_AND_PURGE;
case FIRE_AND_PURGE:
return TriggerResult.FIRE_AND_PURGE;
default:
return TriggerResult.CONTINUE;
}
}
}

Srikanth

On Tue, May 10, 2016 at 11:36 AM, Fabian Hueske <fhue...@gmail.com> wrote:

> Maybe the last example of this blog post is helpful [1].
>
> Best, Fabian
>
> [1]
> https://www.mapr.com/blog/essential-guide-streaming-first-processing-apache-flink
>
> 2016-05-10 17:24 GMT+02:00 Srikanth <srikanth...@gmail.com>:
>
>> Hi,
>>
>> I read the following in Flink doc "We can explicitly specify a Trigger to
>> overwrite the default Trigger provided by the WindowAssigner. Note that
>> specifying a triggers does not add an additional trigger condition but
>> replaces the current trigger."
>> So, I tested out the below code with count trigger. As per my
>> understanding this will override the default watermark based trigger.
>>
>> val testStream = env.fromCollection(List( ("2016-04-07 13:11:59", 157428,
>> 4),
>>  ("2016-04-07 13:11:59", 157428, 4),
>>  ("2016-04-07 13:11:59", 111283, 23),
>>  ("2016-04-07 13:11:57", 108042, 23),
>>  ("2016-04-07 13:12:00", 161374, 9),
>>  ("2016-04-07 13:12:00", 161374, 9),
>>  ("2016-04-07 13:11:59", 136505, 4)
>> )
>> )
>>    .assignAscendingTimestamps(b => f.parse(b._1).getTime())
>>            .map(b => (b._3, b._2))
>>
>> testStream.print
>>
>> val countStream = testStream
>> .keyBy(_._1)
>> .timeWindow(Time.seconds(20))
>> .trigger(CountTrigger.of(3))
>> .fold((0, List[Int]())) { case((k,r),i) => (i._1, r ++ List(i._2)) }
>>
>> countStream.print
>>
>> Output I saw confirms the documented behavior. Processing is triggered
>> only when we have 3 elements for a key.
>> How do I force trigger the left over records when watermark is past the
>> window? I.e, I want to use triggers to start early processing but finalize
>> the window based on watermark.
>>
>> Output shows that records for keys 23 & 9 weren't processed.
>>   (4,157428)
>>   (4,157428)
>>   (23,111283)
>>   (23,108042)
>>   (9,161374)
>>   (9,161374)
>>   (4,136505)
>>
>>   (4,List(157428, 157428, 136505))
>>
>> Thanks,
>> Srikanth
>>
>
>

Reply via email to