In this use case, couldn't the custom trigger register an event time timer for MAX_WATERMARK, which would be triggered when the bounded input reaches its end?
David On Mon, Jul 20, 2020 at 5:47 PM Piotr Nowojski <pnowoj...@apache.org> wrote: > Hi, > > I'm afraid that there is no out-of-the-box way of doing this. I've > created a ticket [1] to write down and document a discussion that we had > about this issue in the past. > > The issue is that currently, untriggered processing time timers are > ignored on end of input and it seems like there might be no one single > perfect way to handle it for all of the cases, but it probably needs to be > customized. > > Maybe you could: > 1. extend `WindowOperator` (`MyWindowOperator`) > 2. implement > `org.apache.flink.streaming.api.operators.BoundedOneInput#endInput` in your > `MyWindowOperator` > 3. Inside `MyWindowOperator#endInput` invoke > `internalTimerService.forEachProcessingTimeTimer(...)` and: > a) manually trigger timers `WindowOperator#onProcessingTime` > b) delete manually triggered timer > > Piotrek > > [1] https://issues.apache.org/jira/browse/FLINK-18647 > > pt., 17 lip 2020 o 10:30 Flavio Pompermaier <pomperma...@okkam.it> > napisał(a): > >> Hi to all, >> I was trying to port another job we have that uses the DataSet API to >> DataStream. >> The legacy program was doing basically a dataset.mapPartition().reduce() >> so I tried to replicate this thing with a >> >> final BasicTypeInfo<Double> columnType = BasicTypeInfo.DOUBLE_TYPE_INFO; >> final DataStream<Row> input = env.fromElements(// >> Row.of(1.0), // >> Row.of(2.0), // >> Row.of(3.0), // >> Row.of(5.0), // >> Row.of(6.0)).returns(new RowTypeInfo(columnType)); >> inputStream.map(new SubtaskIndexAssigner(columnType)) >> .keyBy(t -> t.f0) >> .window(GlobalWindows.create()) >> >> .trigger(PurgingTrigger.of(CountWithTimeoutTriggerPartition.of(Time.seconds(5), >> 100L))). >> .process(..) >> >> Unfortunately the program exits before reaching the Process function >> (moreover I need to add another window + trigger after it before adding the >> reduce function). 
>> Is there a way to do this with the DataStream API or should I still use >> the DataSet API for the moment (until batch is fully supported)? I >> have appended below all the code required to test the job. >> >> Best, >> Flavio >> >> ----------------------------------------------------------------- >> >> package org.apache.flink.stats.sketches; >> >> import org.apache.flink.api.common.functions.ReduceFunction; >> import org.apache.flink.api.common.functions.RichMapFunction; >> import org.apache.flink.api.common.state.ReducingState; >> import org.apache.flink.api.common.state.ReducingStateDescriptor; >> import org.apache.flink.api.common.typeinfo.BasicTypeInfo; >> import org.apache.flink.api.common.typeinfo.TypeInformation; >> import org.apache.flink.api.common.typeutils.base.LongSerializer; >> import org.apache.flink.api.java.io.PrintingOutputFormat; >> import org.apache.flink.api.java.tuple.Tuple2; >> import org.apache.flink.api.java.typeutils.ResultTypeQueryable; >> import org.apache.flink.api.java.typeutils.RowTypeInfo; >> import org.apache.flink.api.java.typeutils.TupleTypeInfo; >> import org.apache.flink.configuration.Configuration; >> import org.apache.flink.streaming.api.TimeCharacteristic; >> import org.apache.flink.streaming.api.datastream.DataStream; >> import >> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; >> import >> org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction; >> import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows; >> import org.apache.flink.streaming.api.windowing.time.Time; >> import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger; >> import org.apache.flink.streaming.api.windowing.triggers.Trigger; >> import org.apache.flink.streaming.api.windowing.triggers.TriggerResult; >> import org.apache.flink.streaming.api.windowing.windows.GlobalWindow; >> import org.apache.flink.types.Row; >> import org.apache.flink.util.Collector; >> >> public class Test { >> 
public static void main(String[] args) throws Exception { >> StreamExecutionEnvironment env = >> StreamExecutionEnvironment.getExecutionEnvironment(); >> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); >> env.setParallelism(1); >> >> final BasicTypeInfo<Double> columnType = >> BasicTypeInfo.DOUBLE_TYPE_INFO; >> final DataStream<Row> input = env.fromElements(// >> Row.of(1.0), // >> Row.of(2.0), // >> Row.of(3.0), // >> Row.of(5.0), // >> Row.of(6.0)).returns(new RowTypeInfo(columnType)); >> final DataStream<Row> out = input.map(new >> SubtaskIndexAssigner(columnType))// >> .keyBy(t -> t.f0)// >> .window(GlobalWindows.create()) >> .trigger(PurgingTrigger.of(new >> CountWithTimeoutTriggerPartition(Time.seconds(5), 100L))) >> .process(new ProcessWindowFunction<Tuple2<Integer, Row>, Row, >> Integer, GlobalWindow>() { >> >> @Override >> public void process(Integer key, >> ProcessWindowFunction<Tuple2<Integer, Row>, Row, Integer, >> GlobalWindow>.Context context, >> Iterable<Tuple2<Integer, Row>> it, Collector<Row> out) >> throws Exception { >> for (Tuple2<Integer, Row> tuple : it) { >> out.collect(Row.of(tuple.f1.getField(0).toString())); >> } >> >> } >> }).returns(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO)); >> out.writeUsingOutputFormat(new PrintingOutputFormat<Row>()); >> env.execute(); >> } >> >> private static final class SubtaskIndexAssigner extends >> RichMapFunction<Row, Tuple2<Integer, Row>> >> implements ResultTypeQueryable<Tuple2<Integer, Row>> { >> private static final long serialVersionUID = 1L; >> >> private int myTaskId; >> private TypeInformation<?> columnType; >> >> public SubtaskIndexAssigner(TypeInformation<?> columnType) { >> this.columnType = columnType; >> } >> >> @Override >> public void open(Configuration parameters) throws Exception { >> this.myTaskId = getRuntimeContext().getIndexOfThisSubtask(); >> } >> >> @Override >> public Tuple2<Integer, Row> map(Row row) throws Exception { >> return Tuple2.of(myTaskId, row); >> } 
>> >> @Override >> public TypeInformation<Tuple2<Integer, Row>> getProducedType() { >> return new TupleTypeInfo<Tuple2<Integer, >> Row>>(BasicTypeInfo.INT_TYPE_INFO, >> new RowTypeInfo(columnType)); >> } >> } >> >> private static class CountWithTimeoutTriggerPartition >> extends Trigger<Tuple2<Integer, Row>, GlobalWindow> { >> >> private static final long serialVersionUID = 1L; >> private final long maxCount; >> private final long maxTime; >> >> private final ReducingStateDescriptor<Long> countstateDesc = >> new ReducingStateDescriptor<>("count", new Sum(), >> LongSerializer.INSTANCE); >> private final ReducingStateDescriptor<Long> timestateDesc = >> new ReducingStateDescriptor<>("fire-time", new Min(), >> LongSerializer.INSTANCE); >> >> public CountWithTimeoutTriggerPartition(long maxTime, long maxCount) { >> this.maxCount = maxCount; >> this.maxTime = maxTime; >> } >> >> public CountWithTimeoutTriggerPartition(Time maxTime, long maxCount) { >> this(maxTime.toMilliseconds(), maxCount); >> } >> >> @Override >> public TriggerResult onElement(Tuple2<Integer, Row> element, long >> timestamp, >> GlobalWindow window, >> >> org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext >> ctx) >> throws Exception { >> >> ReducingState<Long> fireTimestamp = >> ctx.getPartitionedState(timestateDesc); >> >> timestamp = ctx.getCurrentProcessingTime(); >> >> if (fireTimestamp.get() == null) { >> long start = timestamp - (timestamp % maxTime); >> long nextFireTimestamp = start + maxTime; >> >> ctx.registerProcessingTimeTimer(nextFireTimestamp); >> >> fireTimestamp.add(nextFireTimestamp); >> return TriggerResult.CONTINUE; >> } >> ReducingState<Long> count = ctx.getPartitionedState(countstateDesc); >> count.add(1L); >> if (count.get() >= maxCount) { >> count.clear(); >> fireTimestamp.clear(); >> return TriggerResult.FIRE_AND_PURGE; >> } >> return TriggerResult.CONTINUE; >> } >> >> @Override >> public TriggerResult onProcessingTime(long time, GlobalWindow window, >> 
TriggerContext ctx) >> throws Exception { >> ReducingState<Long> fireTimestamp = >> ctx.getPartitionedState(timestateDesc); >> ReducingState<Long> count = ctx.getPartitionedState(countstateDesc); >> if (fireTimestamp.get().equals(time)) { >> count.clear(); >> fireTimestamp.clear(); >> fireTimestamp.add(time + maxTime); >> ctx.registerProcessingTimeTimer(time + maxTime); >> return TriggerResult.FIRE_AND_PURGE; >> } >> return TriggerResult.CONTINUE; >> } >> >> @Override >> public TriggerResult onEventTime(@SuppressWarnings("unused") long >> time, >> @SuppressWarnings("unused") GlobalWindow window, >> @SuppressWarnings("unused") TriggerContext ctx) throws Exception { >> return TriggerResult.CONTINUE; >> } >> >> @Override >> public void clear(GlobalWindow window, TriggerContext ctx) throws >> Exception { >> ReducingState<Long> fireTimestamp = >> ctx.getPartitionedState(timestateDesc); >> long timestamp = fireTimestamp.get(); >> ctx.deleteProcessingTimeTimer(timestamp); >> fireTimestamp.clear(); >> ctx.getPartitionedState(countstateDesc).clear(); >> } >> >> @Override >> public boolean canMerge() { >> return true; >> } >> >> @Override >> public void onMerge(GlobalWindow window, OnMergeContext ctx) { >> ctx.mergePartitionedState(countstateDesc); >> ctx.mergePartitionedState(timestateDesc); >> } >> >> class Sum implements ReduceFunction<Long> { >> private static final long serialVersionUID = 1L; >> >> @Override >> public Long reduce(Long value1, Long value2) throws Exception { >> return value1 + value2; >> } >> } >> >> class Min implements ReduceFunction<Long> { >> private static final long serialVersionUID = 1L; >> >> @Override >> public Long reduce(Long value1, Long value2) throws Exception { >> return Math.min(value1, value2); >> } >> } >> } >> >> } >> >