Yes, it could... but where should I emit the MAX_WATERMARK, and how do I detect that the input has reached its end?
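Just to check that I'm interpreting your suggestion correctly, do you mean something along the lines of the sketch below? It is only a rough, untested draft (the class name EndOfInputTrigger is just a placeholder): the trigger registers an event time timer for Long.MAX_VALUE and, if I understand correctly, I would not need to emit the MAX_WATERMARK myself, because a final Long.MAX_VALUE watermark should be sent automatically when the bounded source finishes, which would then fire this timer.

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

// Rough, untested sketch: fire the global window once the bounded input ends,
// relying on the final Long.MAX_VALUE watermark emitted at end of input.
public class EndOfInputTrigger extends Trigger<Object, GlobalWindow> {
    private static final long serialVersionUID = 1L;

    @Override
    public TriggerResult onElement(Object element, long timestamp, GlobalWindow window,
            TriggerContext ctx) throws Exception {
        // Register an event time timer for MAX_WATERMARK (Long.MAX_VALUE).
        ctx.registerEventTimeTimer(Long.MAX_VALUE);
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
        // Should only be reached once the watermark has advanced to Long.MAX_VALUE,
        // i.e. when the bounded input has been fully consumed.
        return time == Long.MAX_VALUE ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) {
        ctx.deleteEventTimeTimer(Long.MAX_VALUE);
    }
}

Or would I still need the endInput approach that Piotrek described to flush the pending processing time timers of my CountWithTimeoutTriggerPartition?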
On Sat, Jul 25, 2020 at 8:08 PM David Anderson <da...@alpinegizmo.com> wrote:

> In this use case, couldn't the custom trigger register an event time timer
> for MAX_WATERMARK, which would be triggered when the bounded input reaches
> its end?
>
> David
>
> On Mon, Jul 20, 2020 at 5:47 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
>
>> Hi,
>>
>> I'm afraid that there is no out-of-the-box way of doing this. I've
>> created a ticket [1] to write down and document a discussion that we had
>> about this issue in the past.
>>
>> The issue is that currently, untriggered processing time timers are
>> ignored at end of input, and it seems there is no single way of handling
>> this that is right for all cases, so it probably needs to be customized.
>>
>> Maybe you could:
>> 1. extend `WindowOperator` (`MyWindowOperator`)
>> 2. implement `org.apache.flink.streaming.api.operators.BoundedOneInput#endInput`
>>    in your `MyWindowOperator`
>> 3. inside `MyWindowOperator#endInput` invoke
>>    `internalTimerService.forEachProcessingTimeTimer(...)` and:
>>    a) manually trigger the timers via `WindowOperator#onProcessingTime`
>>    b) delete the manually triggered timers
>>
>> Piotrek
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-18647
>>
>> On Fri, Jul 17, 2020 at 10:30 AM Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>
>>> Hi to all,
>>> I was trying to port another job of ours that uses the DataSet API to
>>> the DataStream API.
>>> The legacy program basically does dataset.mapPartition().reduce(), so I
>>> tried to replicate it with:
>>>
>>> final BasicTypeInfo<Double> columnType = BasicTypeInfo.DOUBLE_TYPE_INFO;
>>> final DataStream<Row> input = env.fromElements(//
>>>     Row.of(1.0), //
>>>     Row.of(2.0), //
>>>     Row.of(3.0), //
>>>     Row.of(5.0), //
>>>     Row.of(6.0)).returns(new RowTypeInfo(columnType));
>>> input.map(new SubtaskIndexAssigner(columnType))
>>>     .keyBy(t -> t.f0)
>>>     .window(GlobalWindows.create())
>>>     .trigger(PurgingTrigger.of(new CountWithTimeoutTriggerPartition(Time.seconds(5), 100L)))
>>>     .process(..)
>>>
>>> Unfortunately the program exits before reaching the process function
>>> (moreover, I need to add another window + trigger after it before adding
>>> the reduce function).
>>> Is there a way to do this with the DataStream API, or should I still use
>>> the DataSet API for the moment (until batch is fully supported)? I append
>>> to the footer all the code required to test the job.
>>>
>>> Best,
>>> Flavio
>>>
>>> -----------------------------------------------------------------
>>>
>>> package org.apache.flink.stats.sketches;
>>>
>>> import org.apache.flink.api.common.functions.ReduceFunction;
>>> import org.apache.flink.api.common.functions.RichMapFunction;
>>> import org.apache.flink.api.common.state.ReducingState;
>>> import org.apache.flink.api.common.state.ReducingStateDescriptor;
>>> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
>>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>>> import org.apache.flink.api.common.typeutils.base.LongSerializer;
>>> import org.apache.flink.api.java.io.PrintingOutputFormat;
>>> import org.apache.flink.api.java.tuple.Tuple2;
>>> import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
>>> import org.apache.flink.api.java.typeutils.RowTypeInfo;
>>> import org.apache.flink.api.java.typeutils.TupleTypeInfo;
>>> import org.apache.flink.configuration.Configuration;
>>> import org.apache.flink.streaming.api.TimeCharacteristic;
>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
>>> import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
>>> import org.apache.flink.streaming.api.windowing.time.Time;
>>> import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
>>> import org.apache.flink.streaming.api.windowing.triggers.Trigger;
>>> import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
>>> import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
>>> import org.apache.flink.types.Row;
>>> import org.apache.flink.util.Collector;
>>>
>>> public class Test {
>>>   public static void main(String[] args) throws Exception {
>>>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>>>     env.setParallelism(1);
>>>
>>>     final BasicTypeInfo<Double> columnType = BasicTypeInfo.DOUBLE_TYPE_INFO;
>>>     final DataStream<Row> input = env.fromElements(//
>>>         Row.of(1.0), //
>>>         Row.of(2.0), //
>>>         Row.of(3.0), //
>>>         Row.of(5.0), //
>>>         Row.of(6.0)).returns(new RowTypeInfo(columnType));
>>>     final DataStream<Row> out = input.map(new SubtaskIndexAssigner(columnType))//
>>>         .keyBy(t -> t.f0)//
>>>         .window(GlobalWindows.create())
>>>         .trigger(PurgingTrigger.of(new CountWithTimeoutTriggerPartition(Time.seconds(5), 100L)))
>>>         .process(new ProcessWindowFunction<Tuple2<Integer, Row>, Row, Integer, GlobalWindow>() {
>>>
>>>           @Override
>>>           public void process(Integer key,
>>>               ProcessWindowFunction<Tuple2<Integer, Row>, Row, Integer, GlobalWindow>.Context context,
>>>               Iterable<Tuple2<Integer, Row>> it, Collector<Row> out) throws Exception {
>>>             for (Tuple2<Integer, Row> tuple : it) {
>>>               out.collect(Row.of(tuple.f1.getField(0).toString()));
>>>             }
>>>           }
>>>         }).returns(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO));
>>>     out.writeUsingOutputFormat(new PrintingOutputFormat<Row>());
>>>     env.execute();
>>>   }
>>>
>>>   private static final class SubtaskIndexAssigner extends RichMapFunction<Row, Tuple2<Integer, Row>>
>>>       implements ResultTypeQueryable<Tuple2<Integer, Row>> {
>>>     private static final long serialVersionUID = 1L;
>>>
>>>     private int myTaskId;
>>>     private TypeInformation<?> columnType;
>>>
>>>     public SubtaskIndexAssigner(TypeInformation<?> columnType) {
>>>       this.columnType = columnType;
>>>     }
>>>
>>>     @Override
>>>     public void open(Configuration parameters) throws Exception {
>>>       this.myTaskId = getRuntimeContext().getIndexOfThisSubtask();
>>>     }
>>>
>>>     @Override
>>>     public Tuple2<Integer, Row> map(Row row) throws Exception {
>>>       return Tuple2.of(myTaskId, row);
>>>     }
>>>
>>>     @Override
>>>     public TypeInformation<Tuple2<Integer, Row>> getProducedType() {
>>>       return new TupleTypeInfo<Tuple2<Integer, Row>>(BasicTypeInfo.INT_TYPE_INFO,
>>>           new RowTypeInfo(columnType));
>>>     }
>>>   }
>>>
>>>   private static class CountWithTimeoutTriggerPartition
>>>       extends Trigger<Tuple2<Integer, Row>, GlobalWindow> {
>>>
>>>     private static final long serialVersionUID = 1L;
>>>     private final long maxCount;
>>>     private final long maxTime;
>>>
>>>     private final ReducingStateDescriptor<Long> countstateDesc =
>>>         new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);
>>>     private final ReducingStateDescriptor<Long> timestateDesc =
>>>         new ReducingStateDescriptor<>("fire-time", new Min(), LongSerializer.INSTANCE);
>>>
>>>     public CountWithTimeoutTriggerPartition(long maxTime, long maxCount) {
>>>       this.maxCount = maxCount;
>>>       this.maxTime = maxTime;
>>>     }
>>>
>>>     public CountWithTimeoutTriggerPartition(Time maxTime, long maxCount) {
>>>       this(maxTime.toMilliseconds(), maxCount);
>>>     }
>>>
>>>     @Override
>>>     public TriggerResult onElement(Tuple2<Integer, Row> element, long timestamp,
>>>         GlobalWindow window,
>>>         org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext ctx)
>>>         throws Exception {
>>>
>>>       ReducingState<Long> fireTimestamp = ctx.getPartitionedState(timestateDesc);
>>>
>>>       timestamp = ctx.getCurrentProcessingTime();
>>>
>>>       if (fireTimestamp.get() == null) {
>>>         long start = timestamp - (timestamp % maxTime);
>>>         long nextFireTimestamp = start + maxTime;
>>>
>>>         ctx.registerProcessingTimeTimer(nextFireTimestamp);
>>>
>>>         fireTimestamp.add(nextFireTimestamp);
>>>         return TriggerResult.CONTINUE;
>>>       }
>>>       ReducingState<Long> count = ctx.getPartitionedState(countstateDesc);
>>>       count.add(1L);
>>>       if (count.get() >= maxCount) {
>>>         count.clear();
>>>         fireTimestamp.clear();
>>>         return TriggerResult.FIRE_AND_PURGE;
>>>       }
>>>       return TriggerResult.CONTINUE;
>>>     }
>>>
>>>     @Override
>>>     public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx)
>>>         throws Exception {
>>>       ReducingState<Long> fireTimestamp = ctx.getPartitionedState(timestateDesc);
>>>       ReducingState<Long> count = ctx.getPartitionedState(countstateDesc);
>>>       if (fireTimestamp.get().equals(time)) {
>>>         count.clear();
>>>         fireTimestamp.clear();
>>>         fireTimestamp.add(time + maxTime);
>>>         ctx.registerProcessingTimeTimer(time + maxTime);
>>>         return TriggerResult.FIRE_AND_PURGE;
>>>       }
>>>       return TriggerResult.CONTINUE;
>>>     }
>>>
>>>     @Override
>>>     public TriggerResult onEventTime(@SuppressWarnings("unused") long time,
>>>         @SuppressWarnings("unused") GlobalWindow window,
>>>         @SuppressWarnings("unused") TriggerContext ctx) throws Exception {
>>>       return TriggerResult.CONTINUE;
>>>     }
>>>
>>>     @Override
>>>     public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
>>>       ReducingState<Long> fireTimestamp = ctx.getPartitionedState(timestateDesc);
>>>       long timestamp = fireTimestamp.get();
>>>       ctx.deleteProcessingTimeTimer(timestamp);
>>>       fireTimestamp.clear();
>>>       ctx.getPartitionedState(countstateDesc).clear();
>>>     }
>>>
>>>     @Override
>>>     public boolean canMerge() {
>>>       return true;
>>>     }
>>>
>>>     @Override
>>>     public void onMerge(GlobalWindow window, OnMergeContext ctx) {
>>>       ctx.mergePartitionedState(countstateDesc);
>>>       ctx.mergePartitionedState(timestateDesc);
>>>     }
>>>
>>>     class Sum implements ReduceFunction<Long> {
>>>       private static final long serialVersionUID = 1L;
>>>
>>>       @Override
>>>       public Long reduce(Long value1, Long value2) throws Exception {
>>>         return value1 + value2;
>>>       }
>>>     }
>>>
>>>     class Min implements ReduceFunction<Long> {
>>>       private static final long serialVersionUID = 1L;
>>>
>>>       @Override
>>>       public Long reduce(Long value1, Long value2) throws Exception {
>>>         return Math.min(value1, value2);
>>>       }
>>>     }
>>>   }
>>> }