MAX_WATERMARK should be emitted automatically by the
WatermarkAssignerOperator.
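
To illustrate why that is sufficient for a timer registered at MAX_WATERMARK, here is a minimal toy model in plain Java. It is only a sketch: `MaxWatermarkSketch` and its methods are hypothetical names, not Flink classes. It assumes the usual rule that an event-time timer fires once the watermark reaches its timestamp, and that MAX_WATERMARK carries the timestamp `Long.MAX_VALUE`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

// Toy model only: none of these names are Flink classes.
final class MaxWatermarkSketch {
    // Assumption: same timestamp as Flink's Watermark.MAX_WATERMARK.
    static final long MAX_WATERMARK = Long.MAX_VALUE;

    private final TreeSet<Long> eventTimeTimers = new TreeSet<>();
    private final List<Long> fired = new ArrayList<>();

    // What a trigger would do once per window: ask to be called at the given event time.
    void registerEventTimeTimer(long timestamp) {
        eventTimeTimers.add(timestamp);
    }

    // Fire (and remove) every timer whose timestamp is <= the new watermark.
    void advanceWatermark(long watermark) {
        while (!eventTimeTimers.isEmpty() && eventTimeTimers.first() <= watermark) {
            fired.add(eventTimeTimers.pollFirst());
        }
    }

    List<Long> firedTimers() {
        return fired;
    }

    public static void main(String[] args) {
        MaxWatermarkSketch op = new MaxWatermarkSketch();
        op.registerEventTimeTimer(MAX_WATERMARK);

        op.advanceWatermark(1_000L);        // an ordinary watermark: nothing fires yet
        System.out.println("fired after normal watermark: " + op.firedTimers().size());

        op.advanceWatermark(MAX_WATERMARK); // the end-of-input watermark: the timer fires
        System.out.println("fired after MAX_WATERMARK: " + op.firedTimers().size());
    }
}
```

In the real job the trigger would only have to register the timer and return a firing result from `onEventTime`; it does not need to detect the end of the input itself.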

Piotrek

On Mon, 27 Jul 2020 at 09:16, Flavio Pompermaier <pomperma...@okkam.it>
wrote:

> Yes, it could. Where should I emit the MAX_WATERMARK, and how do I detect
> that the input has reached its end?
>
> On Sat, Jul 25, 2020 at 8:08 PM David Anderson <da...@alpinegizmo.com>
> wrote:
>
>> In this use case, couldn't the custom trigger register an event time
>> timer for MAX_WATERMARK, which would be triggered when the bounded input
>> reaches its end?
>>
>> David
>>
>> On Mon, Jul 20, 2020 at 5:47 PM Piotr Nowojski <pnowoj...@apache.org>
>> wrote:
>>
>>> Hi,
>>>
>>> I'm afraid there is no out-of-the-box way of doing this. I've
>>> created a ticket [1] to write down and document a discussion that we had
>>> about this issue in the past.
>>>
>>> The issue is that, currently, untriggered processing-time timers are
>>> ignored at end of input. There may be no single way of handling them
>>> that is right for all cases, so the behaviour probably needs to be
>>> customized per use case.
>>>
>>> Maybe you could:
>>> 1. extend `WindowOperator` (as `MyWindowOperator`)
>>> 2. implement
>>> `org.apache.flink.streaming.api.operators.BoundedOneInput#endInput` in your
>>> `MyWindowOperator`
>>> 3. inside `MyWindowOperator#endInput`, invoke
>>> `internalTimerService.forEachProcessingTimeTimer(...)` and, for each timer:
>>>   a) manually trigger it via `WindowOperator#onProcessingTime`
>>>   b) delete the manually triggered timer
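
The drain-on-end-of-input idea in step 3 can be sketched with a toy model in plain Java. Everything here is an assumption for illustration: `EndOfInputTimerDrainSketch`, its `endInput()` method and its callback are hypothetical stand-ins, not the Flink `WindowOperator` or `InternalTimerService` API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;
import java.util.function.LongConsumer;

// Toy model only: none of these names are Flink classes.
final class EndOfInputTimerDrainSketch {
    private final TreeSet<Long> processingTimeTimers = new TreeSet<>();
    private final LongConsumer onProcessingTime; // stand-in for the operator's timer callback

    EndOfInputTimerDrainSketch(LongConsumer onProcessingTime) {
        this.onProcessingTime = onProcessingTime;
    }

    void registerProcessingTimeTimer(long timestamp) {
        processingTimeTimers.add(timestamp);
    }

    // Stand-in for an end-of-input hook: fire every timer that is still pending.
    void endInput() {
        // Snapshot first, so that timers registered by the callback itself are not
        // drained again (a trigger that re-registers would otherwise loop forever).
        List<Long> pending = new ArrayList<>(processingTimeTimers);
        for (long timestamp : pending) {
            processingTimeTimers.remove(timestamp); // (b) delete the timer
            onProcessingTime.accept(timestamp);     // (a) trigger it manually
        }
    }

    int pendingTimers() {
        return processingTimeTimers.size();
    }

    public static void main(String[] args) {
        List<Long> fired = new ArrayList<>();
        EndOfInputTimerDrainSketch op = new EndOfInputTimerDrainSketch(fired::add);
        op.registerProcessingTimeTimer(5_000L);
        op.registerProcessingTimeTimer(10_000L);

        op.endInput(); // the bounded input is exhausted before either timer was due
        System.out.println("fired=" + fired + ", pending=" + op.pendingTimers());
    }
}
```

Taking a snapshot before firing matters for triggers like the one in this thread, whose `onProcessingTime` registers a follow-up timer each time it fires.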
>>>
>>> Piotrek
>>>
>>> [1] https://issues.apache.org/jira/browse/FLINK-18647
>>>
>>> On Fri, 17 Jul 2020 at 10:30, Flavio Pompermaier <pomperma...@okkam.it>
>>> wrote:
>>>
>>>> Hi to all,
>>>> I was trying to port another job of ours that uses the DataSet API to
>>>> the DataStream API.
>>>> The legacy program basically did a
>>>> dataset.mapPartition().reduce(), so I tried to replicate it with:
>>>>
>>>>  final BasicTypeInfo<Double> columnType =
>>>> BasicTypeInfo.DOUBLE_TYPE_INFO;
>>>>   final DataStream<Row> input = env.fromElements(//
>>>>         Row.of(1.0), //
>>>>         Row.of(2.0), //
>>>>         Row.of(3.0), //
>>>>         Row.of(5.0), //
>>>>         Row.of(6.0)).returns(new RowTypeInfo(columnType));
>>>>  input.map(new SubtaskIndexAssigner(columnType))
>>>>         .keyBy(t -> t.f0)
>>>>         .window(GlobalWindows.create())
>>>>         .trigger(PurgingTrigger.of(
>>>>             new CountWithTimeoutTriggerPartition(Time.seconds(5), 100L)))
>>>>         .process(..)
>>>>
>>>> Unfortunately, the program exits before reaching the process function
>>>> (moreover, I need to add another window + trigger after it before adding
>>>> the reduce function).
>>>> Is there a way to do this with the DataStream API, or should I keep using
>>>> the DataSet API for the moment (until batch is fully supported)? I have
>>>> appended all the code required to test the job below.
>>>>
>>>> Best,
>>>> Flavio
>>>>
>>>> -----------------------------------------------------------------
>>>>
>>>> package org.apache.flink.stats.sketches;
>>>>
>>>> import org.apache.flink.api.common.functions.ReduceFunction;
>>>> import org.apache.flink.api.common.functions.RichMapFunction;
>>>> import org.apache.flink.api.common.state.ReducingState;
>>>> import org.apache.flink.api.common.state.ReducingStateDescriptor;
>>>> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
>>>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>>>> import org.apache.flink.api.common.typeutils.base.LongSerializer;
>>>> import org.apache.flink.api.java.io.PrintingOutputFormat;
>>>> import org.apache.flink.api.java.tuple.Tuple2;
>>>> import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
>>>> import org.apache.flink.api.java.typeutils.RowTypeInfo;
>>>> import org.apache.flink.api.java.typeutils.TupleTypeInfo;
>>>> import org.apache.flink.configuration.Configuration;
>>>> import org.apache.flink.streaming.api.TimeCharacteristic;
>>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>>> import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
>>>> import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
>>>> import org.apache.flink.streaming.api.windowing.time.Time;
>>>> import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
>>>> import org.apache.flink.streaming.api.windowing.triggers.Trigger;
>>>> import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
>>>> import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
>>>> import org.apache.flink.types.Row;
>>>> import org.apache.flink.util.Collector;
>>>>
>>>> public class Test {
>>>>   public static void main(String[] args) throws Exception {
>>>>     StreamExecutionEnvironment env =
>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>>>>     env.setParallelism(1);
>>>>
>>>>     final BasicTypeInfo<Double> columnType =
>>>> BasicTypeInfo.DOUBLE_TYPE_INFO;
>>>>     final DataStream<Row> input = env.fromElements(//
>>>>         Row.of(1.0), //
>>>>         Row.of(2.0), //
>>>>         Row.of(3.0), //
>>>>         Row.of(5.0), //
>>>>         Row.of(6.0)).returns(new RowTypeInfo(columnType));
>>>>     final DataStream<Row> out = input.map(new
>>>> SubtaskIndexAssigner(columnType))//
>>>>         .keyBy(t -> t.f0)//
>>>>         .window(GlobalWindows.create())
>>>>         .trigger(PurgingTrigger.of(new
>>>> CountWithTimeoutTriggerPartition(Time.seconds(5), 100L)))
>>>>         .process(new ProcessWindowFunction<Tuple2<Integer, Row>, Row,
>>>> Integer, GlobalWindow>() {
>>>>
>>>>           @Override
>>>>           public void process(Integer key,
>>>>               ProcessWindowFunction<Tuple2<Integer, Row>, Row, Integer,
>>>> GlobalWindow>.Context context,
>>>>               Iterable<Tuple2<Integer, Row>> it, Collector<Row> out)
>>>> throws Exception {
>>>>             for (Tuple2<Integer, Row> tuple : it) {
>>>>               out.collect(Row.of(tuple.f1.getField(0).toString()));
>>>>             }
>>>>
>>>>           }
>>>>         }).returns(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO));
>>>>     out.writeUsingOutputFormat(new PrintingOutputFormat<Row>());
>>>>     env.execute();
>>>>   }
>>>>
>>>>   private static final class SubtaskIndexAssigner extends
>>>> RichMapFunction<Row, Tuple2<Integer, Row>>
>>>>       implements ResultTypeQueryable<Tuple2<Integer, Row>> {
>>>>     private static final long serialVersionUID = 1L;
>>>>
>>>>     private int myTaskId;
>>>>     private TypeInformation<?> columnType;
>>>>
>>>>     public SubtaskIndexAssigner(TypeInformation<?> columnType) {
>>>>       this.columnType = columnType;
>>>>     }
>>>>
>>>>     @Override
>>>>     public void open(Configuration parameters) throws Exception {
>>>>       this.myTaskId = getRuntimeContext().getIndexOfThisSubtask();
>>>>     }
>>>>
>>>>     @Override
>>>>     public Tuple2<Integer, Row> map(Row row) throws Exception {
>>>>       return Tuple2.of(myTaskId, row);
>>>>     }
>>>>
>>>>     @Override
>>>>     public TypeInformation<Tuple2<Integer, Row>> getProducedType() {
>>>>       return new TupleTypeInfo<Tuple2<Integer,
>>>> Row>>(BasicTypeInfo.INT_TYPE_INFO,
>>>>           new RowTypeInfo(columnType));
>>>>     }
>>>>   }
>>>>
>>>>   private static class CountWithTimeoutTriggerPartition
>>>>       extends Trigger<Tuple2<Integer, Row>, GlobalWindow> {
>>>>
>>>>     private static final long serialVersionUID = 1L;
>>>>     private final long maxCount;
>>>>     private final long maxTime;
>>>>
>>>>     private final ReducingStateDescriptor<Long> countstateDesc =
>>>>         new ReducingStateDescriptor<>("count", new Sum(),
>>>> LongSerializer.INSTANCE);
>>>>     private final ReducingStateDescriptor<Long> timestateDesc =
>>>>         new ReducingStateDescriptor<>("fire-time", new Min(),
>>>> LongSerializer.INSTANCE);
>>>>
>>>>     public CountWithTimeoutTriggerPartition(long maxTime, long
>>>> maxCount) {
>>>>       this.maxCount = maxCount;
>>>>       this.maxTime = maxTime;
>>>>     }
>>>>
>>>>     public CountWithTimeoutTriggerPartition(Time maxTime, long
>>>> maxCount) {
>>>>       this(maxTime.toMilliseconds(), maxCount);
>>>>     }
>>>>
>>>>     @Override
>>>>     public TriggerResult onElement(Tuple2<Integer, Row> element, long timestamp,
>>>>         GlobalWindow window, TriggerContext ctx) throws Exception {
>>>>
>>>>       ReducingState<Long> fireTimestamp =
>>>> ctx.getPartitionedState(timestateDesc);
>>>>
>>>>       timestamp = ctx.getCurrentProcessingTime();
>>>>
>>>>       if (fireTimestamp.get() == null) {
>>>>         // First element of a batch: schedule the timeout at the next
>>>>         // multiple of maxTime.
>>>>         long start = timestamp - (timestamp % maxTime);
>>>>         long nextFireTimestamp = start + maxTime;
>>>>
>>>>         ctx.registerProcessingTimeTimer(nextFireTimestamp);
>>>>
>>>>         fireTimestamp.add(nextFireTimestamp);
>>>>         // No early return here, so that the first element is counted too.
>>>>       }
>>>>       ReducingState<Long> count =
>>>> ctx.getPartitionedState(countstateDesc);
>>>>       count.add(1L);
>>>>       if (count.get() >= maxCount) {
>>>>         count.clear();
>>>>         fireTimestamp.clear();
>>>>         return TriggerResult.FIRE_AND_PURGE;
>>>>       }
>>>>       return TriggerResult.CONTINUE;
>>>>     }
>>>>
>>>>     @Override
>>>>     public TriggerResult onProcessingTime(long time, GlobalWindow
>>>> window, TriggerContext ctx)
>>>>         throws Exception {
>>>>       ReducingState<Long> fireTimestamp =
>>>> ctx.getPartitionedState(timestateDesc);
>>>>       ReducingState<Long> count =
>>>> ctx.getPartitionedState(countstateDesc);
>>>>       // Null-safe: the state may already have been cleared by a count-based fire.
>>>>       if (Long.valueOf(time).equals(fireTimestamp.get())) {
>>>>         count.clear();
>>>>         fireTimestamp.clear();
>>>>         fireTimestamp.add(time + maxTime);
>>>>         ctx.registerProcessingTimeTimer(time + maxTime);
>>>>         return TriggerResult.FIRE_AND_PURGE;
>>>>       }
>>>>       return TriggerResult.CONTINUE;
>>>>     }
>>>>
>>>>     @Override
>>>>     public TriggerResult onEventTime(@SuppressWarnings("unused") long
>>>> time,
>>>>         @SuppressWarnings("unused") GlobalWindow window,
>>>>         @SuppressWarnings("unused") TriggerContext ctx) throws
>>>> Exception {
>>>>       return TriggerResult.CONTINUE;
>>>>     }
>>>>
>>>>     @Override
>>>>     public void clear(GlobalWindow window, TriggerContext ctx) throws
>>>> Exception {
>>>>       ReducingState<Long> fireTimestamp =
>>>> ctx.getPartitionedState(timestateDesc);
>>>>       Long timestamp = fireTimestamp.get();
>>>>       if (timestamp != null) {
>>>>         // Avoid an NPE when no timer is currently registered.
>>>>         ctx.deleteProcessingTimeTimer(timestamp);
>>>>       }
>>>>       fireTimestamp.clear();
>>>>       ctx.getPartitionedState(countstateDesc).clear();
>>>>     }
>>>>
>>>>     @Override
>>>>     public boolean canMerge() {
>>>>       return true;
>>>>     }
>>>>
>>>>     @Override
>>>>     public void onMerge(GlobalWindow window, OnMergeContext ctx) {
>>>>       ctx.mergePartitionedState(countstateDesc);
>>>>       ctx.mergePartitionedState(timestateDesc);
>>>>     }
>>>>
>>>>     private static class Sum implements ReduceFunction<Long> {
>>>>       private static final long serialVersionUID = 1L;
>>>>
>>>>       @Override
>>>>       public Long reduce(Long value1, Long value2) throws Exception {
>>>>         return value1 + value2;
>>>>       }
>>>>     }
>>>>
>>>>     private static class Min implements ReduceFunction<Long> {
>>>>       private static final long serialVersionUID = 1L;
>>>>
>>>>       @Override
>>>>       public Long reduce(Long value1, Long value2) throws Exception {
>>>>         return Math.min(value1, value2);
>>>>       }
>>>>     }
>>>>   }
>>>>
>>>> }
>>>>
>>>
>
