MAX_WATERMARK should be emitted automatically by the WatermarkAssignerOperator.

Piotrek
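(For illustration — a minimal sketch of the trigger David suggests further down the thread: register an event-time timer for Long.MAX_VALUE, which is the timestamp of MAX_WATERMARK, so the final watermark of a bounded input fires it exactly once. It assumes the job runs with event or ingestion time, so that the final MAX_WATERMARK is actually emitted; the class name and the GlobalWindow specialization are assumptions, not code from this thread.)

import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;

// Sketch: fire (and purge) a global window when the bounded input ends.
// Long.MAX_VALUE equals Watermark.MAX_WATERMARK.getTimestamp(), so this
// event-time timer can only fire on the final watermark.
public class FireAtEndOfInputTrigger extends Trigger<Object, GlobalWindow> {
    private static final long serialVersionUID = 1L;

    @Override
    public TriggerResult onElement(Object element, long timestamp,
            GlobalWindow window, TriggerContext ctx) {
        // Timers are deduplicated per key/window, so registering on every
        // element is cheap.
        ctx.registerEventTimeTimer(Long.MAX_VALUE);
        return TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
        return time == Long.MAX_VALUE ? TriggerResult.FIRE_AND_PURGE : TriggerResult.CONTINUE;
    }

    @Override
    public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
        return TriggerResult.CONTINUE;
    }

    @Override
    public void clear(GlobalWindow window, TriggerContext ctx) {
        ctx.deleteEventTimeTimer(Long.MAX_VALUE);
    }
}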
On Mon, 27 Jul 2020 at 09:16, Flavio Pompermaier <pomperma...@okkam.it> wrote:

> Yes, it could. Where should I emit the MAX_WATERMARK, and how do I detect
> that the input has reached its end?
>
> On Sat, Jul 25, 2020 at 8:08 PM David Anderson <da...@alpinegizmo.com> wrote:
>
>> In this use case, couldn't the custom trigger register an event-time
>> timer for MAX_WATERMARK, which would be triggered when the bounded input
>> reaches its end?
>>
>> David
>>
>> On Mon, Jul 20, 2020 at 5:47 PM Piotr Nowojski <pnowoj...@apache.org> wrote:
>>
>>> Hi,
>>>
>>> I'm afraid there is no out-of-the-box way of doing this. I've created a
>>> ticket [1] to write down and document a discussion that we had about
>>> this issue in the past.
>>>
>>> The issue is that currently, untriggered processing-time timers are
>>> ignored at end of input, and there seems to be no single way of handling
>>> this that is right for all cases, so it probably needs to be customized.
>>>
>>> Maybe you could:
>>> 1. extend `WindowOperator` (`MyWindowOperator`)
>>> 2. implement
>>> `org.apache.flink.streaming.api.operators.BoundedOneInput#endInput` in
>>> your `MyWindowOperator`
>>> 3. inside `MyWindowOperator#endInput`, invoke
>>> `internalTimerService.forEachProcessingTimeTimer(...)` and:
>>>    a) manually trigger each timer via `WindowOperator#onProcessingTime`
>>>    b) delete the manually triggered timer
>>>
>>> Piotrek
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-18647
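(A minimal, self-contained sketch of the `BoundedOneInput#endInput` hook from steps 1-3 above. Extending the real `WindowOperator` requires Flink-internal constructors and timer plumbing, so this illustrates only the mechanism, on a simple buffering operator instead; the class name, the batch size, and the flush logic are assumptions for illustration.)

import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import java.util.ArrayList;
import java.util.List;

// Buffers elements and emits them in batches of 100; endInput() flushes
// whatever is left when the bounded input is exhausted -- exactly the
// moment at which untriggered work would otherwise be lost.
// (The buffer lives on the heap only; a production version would also
// checkpoint it.)
public class FlushOnEndOperator<T> extends AbstractStreamOperator<T>
        implements OneInputStreamOperator<T, T>, BoundedOneInput {

    private static final long serialVersionUID = 1L;
    private final List<T> buffer = new ArrayList<>();

    @Override
    public void processElement(StreamRecord<T> element) {
        buffer.add(element.getValue());
        if (buffer.size() >= 100) {
            flush();
        }
    }

    @Override
    public void endInput() {
        // Called exactly once, after the last element of a bounded input.
        flush();
    }

    private void flush() {
        for (T value : buffer) {
            output.collect(new StreamRecord<>(value));
        }
        buffer.clear();
    }
}

// Usage: attached with DataStream#transform, e.g.
//   stream.transform("flush-on-end", stream.getType(), new FlushOnEndOperator<>());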
>>> On Fri, 17 Jul 2020 at 10:30, Flavio Pompermaier <pomperma...@okkam.it> wrote:
>>>
>>>> Hi to all,
>>>> I was trying to port another job of ours from the DataSet API to the
>>>> DataStream API.
>>>> The legacy program was basically doing a dataset.mapPartition().reduce(),
>>>> so I tried to replicate it with:
>>>>
>>>> final BasicTypeInfo<Double> columnType = BasicTypeInfo.DOUBLE_TYPE_INFO;
>>>> final DataStream<Row> input = env.fromElements(
>>>>     Row.of(1.0),
>>>>     Row.of(2.0),
>>>>     Row.of(3.0),
>>>>     Row.of(5.0),
>>>>     Row.of(6.0)).returns(new RowTypeInfo(columnType));
>>>> input.map(new SubtaskIndexAssigner(columnType))
>>>>     .keyBy(t -> t.f0)
>>>>     .window(GlobalWindows.create())
>>>>     .trigger(PurgingTrigger.of(
>>>>         new CountWithTimeoutTriggerPartition(Time.seconds(5), 100L)))
>>>>     .process(..)
>>>>
>>>> Unfortunately the program exits before reaching the process function
>>>> (moreover, I need to add another window + trigger after it, before
>>>> adding the reduce function).
>>>> Is there a way to do this with the DataStream API, or should I keep
>>>> using the DataSet API for the moment (until batch is fully supported)?
>>>> I append all the code required to test the job in the footer.
>>>>
>>>> Best,
>>>> Flavio
>>>>
>>>> -----------------------------------------------------------------
>>>>
>>>> package org.apache.flink.stats.sketches;
>>>>
>>>> import org.apache.flink.api.common.functions.ReduceFunction;
>>>> import org.apache.flink.api.common.functions.RichMapFunction;
>>>> import org.apache.flink.api.common.state.ReducingState;
>>>> import org.apache.flink.api.common.state.ReducingStateDescriptor;
>>>> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
>>>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>>>> import org.apache.flink.api.common.typeutils.base.LongSerializer;
>>>> import org.apache.flink.api.java.io.PrintingOutputFormat;
>>>> import org.apache.flink.api.java.tuple.Tuple2;
>>>> import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
>>>> import org.apache.flink.api.java.typeutils.RowTypeInfo;
>>>> import org.apache.flink.api.java.typeutils.TupleTypeInfo;
>>>> import org.apache.flink.configuration.Configuration;
>>>> import org.apache.flink.streaming.api.TimeCharacteristic;
>>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>> import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
>>>> import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
>>>> import org.apache.flink.streaming.api.windowing.time.Time;
>>>> import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
>>>> import org.apache.flink.streaming.api.windowing.triggers.Trigger;
>>>> import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
>>>> import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
>>>> import org.apache.flink.types.Row;
>>>> import org.apache.flink.util.Collector;
>>>>
>>>> public class Test {
>>>>     public static void main(String[] args) throws Exception {
>>>>         StreamExecutionEnvironment env =
>>>>             StreamExecutionEnvironment.getExecutionEnvironment();
>>>>         env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>>>>         env.setParallelism(1);
>>>>
>>>>         final BasicTypeInfo<Double> columnType = BasicTypeInfo.DOUBLE_TYPE_INFO;
>>>>         final DataStream<Row> input = env.fromElements(
>>>>             Row.of(1.0),
>>>>             Row.of(2.0),
>>>>             Row.of(3.0),
>>>>             Row.of(5.0),
>>>>             Row.of(6.0)).returns(new RowTypeInfo(columnType));
>>>>         final DataStream<Row> out = input
>>>>             .map(new SubtaskIndexAssigner(columnType))
>>>>             .keyBy(t -> t.f0)
>>>>             .window(GlobalWindows.create())
>>>>             .trigger(PurgingTrigger.of(
>>>>                 new CountWithTimeoutTriggerPartition(Time.seconds(5), 100L)))
>>>>             .process(new ProcessWindowFunction<Tuple2<Integer, Row>, Row, Integer, GlobalWindow>() {
>>>>                 @Override
>>>>                 public void process(Integer key, Context context,
>>>>                         Iterable<Tuple2<Integer, Row>> it, Collector<Row> out)
>>>>                         throws Exception {
>>>>                     for (Tuple2<Integer, Row> tuple : it) {
>>>>                         out.collect(Row.of(tuple.f1.getField(0).toString()));
>>>>                     }
>>>>                 }
>>>>             }).returns(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO));
>>>>         out.writeUsingOutputFormat(new PrintingOutputFormat<Row>());
>>>>         env.execute();
>>>>     }
>>>>
>>>>     private static final class SubtaskIndexAssigner
>>>>             extends RichMapFunction<Row, Tuple2<Integer, Row>>
>>>>             implements ResultTypeQueryable<Tuple2<Integer, Row>> {
>>>>         private static final long serialVersionUID = 1L;
>>>>
>>>>         private final TypeInformation<?> columnType;
>>>>         private transient int myTaskId;
>>>>
>>>>         public SubtaskIndexAssigner(TypeInformation<?> columnType) {
>>>>             this.columnType = columnType;
>>>>         }
>>>>
>>>>         @Override
>>>>         public void open(Configuration parameters) throws Exception {
>>>>             this.myTaskId = getRuntimeContext().getIndexOfThisSubtask();
>>>>         }
>>>>
>>>>         @Override
>>>>         public Tuple2<Integer, Row> map(Row row) throws Exception {
>>>>             return Tuple2.of(myTaskId, row);
>>>>         }
>>>>
>>>>         @Override
>>>>         public TypeInformation<Tuple2<Integer, Row>> getProducedType() {
>>>>             return new TupleTypeInfo<Tuple2<Integer, Row>>(
>>>>                 BasicTypeInfo.INT_TYPE_INFO, new RowTypeInfo(columnType));
>>>>         }
>>>>     }
>>>>
>>>>     private static class CountWithTimeoutTriggerPartition
>>>>             extends Trigger<Tuple2<Integer, Row>, GlobalWindow> {
>>>>
>>>>         private static final long serialVersionUID = 1L;
>>>>         private final long maxCount;
>>>>         private final long maxTime;
>>>>
>>>>         private final ReducingStateDescriptor<Long> countStateDesc =
>>>>             new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);
>>>>         private final ReducingStateDescriptor<Long> timeStateDesc =
>>>>             new ReducingStateDescriptor<>("fire-time", new Min(), LongSerializer.INSTANCE);
>>>>
>>>>         public CountWithTimeoutTriggerPartition(long maxTime, long maxCount) {
>>>>             this.maxCount = maxCount;
>>>>             this.maxTime = maxTime;
>>>>         }
>>>>
>>>>         public CountWithTimeoutTriggerPartition(Time maxTime, long maxCount) {
>>>>             this(maxTime.toMilliseconds(), maxCount);
>>>>         }
>>>>
>>>>         @Override
>>>>         public TriggerResult onElement(Tuple2<Integer, Row> element, long timestamp,
>>>>                 GlobalWindow window, TriggerContext ctx) throws Exception {
>>>>             ReducingState<Long> fireTimestamp = ctx.getPartitionedState(timeStateDesc);
>>>>             ReducingState<Long> count = ctx.getPartitionedState(countStateDesc);
>>>>
>>>>             long now = ctx.getCurrentProcessingTime();
>>>>             if (fireTimestamp.get() == null) {
>>>>                 // Register the timeout timer, aligned to the next maxTime boundary.
>>>>                 // Note: if the bounded input ends before this processing-time timer
>>>>                 // fires, it is silently dropped -- the problem discussed above.
>>>>                 long nextFireTimestamp = now - (now % maxTime) + maxTime;
>>>>                 ctx.registerProcessingTimeTimer(nextFireTimestamp);
>>>>                 fireTimestamp.add(nextFireTimestamp);
>>>>             }
>>>>
>>>>             // Count every element (the original version skipped the element
>>>>             // that registered the timer, making the count off by one).
>>>>             count.add(1L);
>>>>             if (count.get() >= maxCount) {
>>>>                 count.clear();
>>>>                 // Also remove the pending timer, so it cannot fire later
>>>>                 // against a window that was already purged.
>>>>                 long pending = fireTimestamp.get();
>>>>                 ctx.deleteProcessingTimeTimer(pending);
>>>>                 fireTimestamp.clear();
>>>>                 return TriggerResult.FIRE_AND_PURGE;
>>>>             }
>>>>             return TriggerResult.CONTINUE;
>>>>         }
>>>>
>>>>         @Override
>>>>         public TriggerResult onProcessingTime(long time, GlobalWindow window,
>>>>                 TriggerContext ctx) throws Exception {
>>>>             ReducingState<Long> fireTimestamp = ctx.getPartitionedState(timeStateDesc);
>>>>             ReducingState<Long> count = ctx.getPartitionedState(countStateDesc);
>>>>             Long expected = fireTimestamp.get();
>>>>             if (expected != null && expected == time) {
>>>>                 count.clear();
>>>>                 fireTimestamp.clear();
>>>>                 // Schedule the next timeout so the trigger keeps firing periodically.
>>>>                 fireTimestamp.add(time + maxTime);
>>>>                 ctx.registerProcessingTimeTimer(time + maxTime);
>>>>                 return TriggerResult.FIRE_AND_PURGE;
>>>>             }
>>>>             return TriggerResult.CONTINUE;
>>>>         }
>>>>
>>>>         @Override
>>>>         public TriggerResult onEventTime(long time, GlobalWindow window,
>>>>                 TriggerContext ctx) throws Exception {
>>>>             return TriggerResult.CONTINUE;
>>>>         }
>>>>
>>>>         @Override
>>>>         public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
>>>>             ReducingState<Long> fireTimestamp = ctx.getPartitionedState(timeStateDesc);
>>>>             // Guard against empty state: fireTimestamp.get() is null after a
>>>>             // FIRE_AND_PURGE, and unboxing it into a long would throw an NPE.
>>>>             Long pending = fireTimestamp.get();
>>>>             if (pending != null) {
>>>>                 ctx.deleteProcessingTimeTimer(pending);
>>>>                 fireTimestamp.clear();
>>>>             }
>>>>             ctx.getPartitionedState(countStateDesc).clear();
>>>>         }
>>>>
>>>>         @Override
>>>>         public boolean canMerge() {
>>>>             return true;
>>>>         }
>>>>
>>>>         @Override
>>>>         public void onMerge(GlobalWindow window, OnMergeContext ctx) {
>>>>             ctx.mergePartitionedState(countStateDesc);
>>>>             ctx.mergePartitionedState(timeStateDesc);
>>>>         }
>>>>
>>>>         private static class Sum implements ReduceFunction<Long> {
>>>>             private static final long serialVersionUID = 1L;
>>>>
>>>>             @Override
>>>>             public Long reduce(Long value1, Long value2) throws Exception {
>>>>                 return value1 + value2;
>>>>             }
>>>>         }
>>>>
>>>>         private static class Min implements ReduceFunction<Long> {
>>>>             private static final long serialVersionUID = 1L;
>>>>
>>>>             @Override
>>>>             public Long reduce(Long value1, Long value2) throws Exception {
>>>>                 return Math.min(value1, value2);
>>>>             }
>>>>         }
>>>>     }
>>>> }
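(A hypothetical sketch of the follow-up step Flavio mentions — a second window + trigger feeding the final reduce, funneling all per-subtask results into one global window. `CountWithTimeoutTriggerAll` stands in for a `Trigger<Row, GlobalWindow>` variant of the trigger above and is not defined here; the reduce function is illustrative only.)

// Continues from the `out` stream produced in the Test program above.
DataStream<Row> result = out
    .windowAll(GlobalWindows.create())
    .trigger(PurgingTrigger.of(new CountWithTimeoutTriggerAll(Time.seconds(5), 100L)))
    // Illustrative reduce: concatenate the string fields of partial results.
    .reduce((r1, r2) -> Row.of(r1.getField(0) + "," + r2.getField(0)));

// The same end-of-input caveat applies here: with a bounded source, pending
// processing-time timers of this second trigger are dropped as well.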