In this use case, couldn't the custom trigger register an event time timer
for MAX_WATERMARK, which would be triggered when the bounded input reaches
its end?
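
As a rough illustration of the idea, here is a toy model in plain Java. It is
not Flink code: `EndOfInputTimerModel`, `TimerService` and `run` are
hypothetical stand-ins that only mimic the shape of Flink's timer API. The
model assumes that at the end of a bounded input the watermark advances to
Long.MAX_VALUE (the timestamp of MAX_WATERMARK), while pending processing-time
timers are dropped without firing:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

/** Toy model, NOT Flink code: shows which timers fire when a bounded input ends. */
final class EndOfInputTimerModel {

    /** Assumption: stands in for the timestamp carried by Flink's MAX_WATERMARK. */
    static final long MAX_WATERMARK = Long.MAX_VALUE;

    /** Hypothetical, simplified timer service for a single key and window. */
    static final class TimerService {
        final TreeSet<Long> processingTimeTimers = new TreeSet<>();
        final TreeSet<Long> eventTimeTimers = new TreeSet<>();

        // Registering the same timestamp twice keeps a single timer (set semantics).
        void registerProcessingTimeTimer(long time) { processingTimeTimers.add(time); }
        void registerEventTimeTimer(long time) { eventTimeTimers.add(time); }
    }

    /**
     * Buffers the elements, then ends the input before the 5s timeout elapses.
     * Returns the batches that were emitted downstream.
     */
    static List<List<Double>> run(boolean registerMaxWatermarkTimer, double... elements) {
        TimerService timers = new TimerService();
        List<Double> windowContents = new ArrayList<>();
        List<List<Double>> emitted = new ArrayList<>();

        for (double element : elements) {
            windowContents.add(element);
            // Timeout timer, analogous to the processing-time timer in the trigger.
            timers.registerProcessingTimeTimer(5_000L);
            if (registerMaxWatermarkTimer) {
                // The proposed addition: an event-time timer at the end of time.
                timers.registerEventTimeTimer(MAX_WATERMARK);
            }
        }

        // End of input: the watermark jumps to MAX_WATERMARK, which fires every
        // event-time timer whose timestamp is <= the watermark.
        long finalWatermark = MAX_WATERMARK;
        while (!timers.eventTimeTimers.isEmpty()
                && timers.eventTimeTimers.first() <= finalWatermark) {
            timers.eventTimeTimers.pollFirst();
            if (!windowContents.isEmpty()) {
                // Equivalent of returning FIRE_AND_PURGE from onEventTime.
                emitted.add(new ArrayList<>(windowContents));
                windowContents.clear();
            }
        }

        // Pending processing-time timers are discarded without ever firing.
        timers.processingTimeTimers.clear();
        return emitted;
    }

    public static void main(String[] args) {
        System.out.println(run(false, 1.0, 2.0, 3.0, 5.0, 6.0)); // prints []
        System.out.println(run(true, 1.0, 2.0, 3.0, 5.0, 6.0));  // prints [[1.0, 2.0, 3.0, 5.0, 6.0]]
    }
}
```

In the actual trigger this would presumably amount to calling
`ctx.registerEventTimeTimer(Long.MAX_VALUE)` in `onElement` and returning
`TriggerResult.FIRE_AND_PURGE` from `onEventTime` when that timer fires,
instead of the current `CONTINUE`. This is untested against the job in this
thread.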

David

On Mon, Jul 20, 2020 at 5:47 PM Piotr Nowojski <pnowoj...@apache.org> wrote:

> Hi,
>
> I'm afraid there is no out-of-the-box way of doing this. I've
> created a ticket [1] to write down and document a discussion that we had
> about this issue in the past.
>
> The issue is that, currently, untriggered processing-time timers are
> ignored at end of input. There is probably no single way of handling this
> that is right for all cases, so the behavior likely needs to be
> customizable.
>
> Maybe you could:
> 1. extend `WindowOperator` (e.g. as `MyWindowOperator`)
> 2. implement
> `org.apache.flink.streaming.api.operators.BoundedOneInput#endInput` in your
> `MyWindowOperator`
> 3. inside `MyWindowOperator#endInput`, invoke
> `internalTimerService.forEachProcessingTimeTimer(...)` and:
>   a) manually trigger each timer via `WindowOperator#onProcessingTime`
>   b) delete each manually triggered timer
>
> Piotrek
>
> [1] https://issues.apache.org/jira/browse/FLINK-18647
>
> On Fri, Jul 17, 2020 at 10:30 AM Flavio Pompermaier <pomperma...@okkam.it>
> wrote:
>
>> Hi all,
>> I was trying to port another job of ours that uses the DataSet API to
>> DataStream.
>> The legacy program basically did a dataset.mapPartition().reduce(),
>> so I tried to replicate this with:
>>
>>  final BasicTypeInfo<Double> columnType = BasicTypeInfo.DOUBLE_TYPE_INFO;
>>   final DataStream<Row> input = env.fromElements(//
>>         Row.of(1.0), //
>>         Row.of(2.0), //
>>         Row.of(3.0), //
>>         Row.of(5.0), //
>>         Row.of(6.0)).returns(new RowTypeInfo(columnType));
>>   input.map(new SubtaskIndexAssigner(columnType))
>>         .keyBy(t -> t.f0)
>>         .window(GlobalWindows.create())
>>         .trigger(PurgingTrigger.of(
>>             new CountWithTimeoutTriggerPartition(Time.seconds(5), 100L)))
>>         .process(..)
>>
>> Unfortunately the program exits before reaching the process function
>> (moreover, I need to add another window + trigger after it before adding
>> the reduce function).
>> Is there a way to do this with the DataStream API, or should I keep using
>> the DataSet API for the moment (until batch is fully supported)? I have
>> appended all the code required to test the job below.
>>
>> Best,
>> Flavio
>>
>> -----------------------------------------------------------------
>>
>> package org.apache.flink.stats.sketches;
>>
>> import org.apache.flink.api.common.functions.ReduceFunction;
>> import org.apache.flink.api.common.functions.RichMapFunction;
>> import org.apache.flink.api.common.state.ReducingState;
>> import org.apache.flink.api.common.state.ReducingStateDescriptor;
>> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>> import org.apache.flink.api.common.typeutils.base.LongSerializer;
>> import org.apache.flink.api.java.io.PrintingOutputFormat;
>> import org.apache.flink.api.java.tuple.Tuple2;
>> import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
>> import org.apache.flink.api.java.typeutils.RowTypeInfo;
>> import org.apache.flink.api.java.typeutils.TupleTypeInfo;
>> import org.apache.flink.configuration.Configuration;
>> import org.apache.flink.streaming.api.TimeCharacteristic;
>> import org.apache.flink.streaming.api.datastream.DataStream;
>> import
>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import
>> org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
>> import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
>> import org.apache.flink.streaming.api.windowing.time.Time;
>> import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
>> import org.apache.flink.streaming.api.windowing.triggers.Trigger;
>> import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
>> import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
>> import org.apache.flink.types.Row;
>> import org.apache.flink.util.Collector;
>>
>> public class Test {
>>   public static void main(String[] args) throws Exception {
>>     StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>>     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>>     env.setParallelism(1);
>>
>>     final BasicTypeInfo<Double> columnType =
>> BasicTypeInfo.DOUBLE_TYPE_INFO;
>>     final DataStream<Row> input = env.fromElements(//
>>         Row.of(1.0), //
>>         Row.of(2.0), //
>>         Row.of(3.0), //
>>         Row.of(5.0), //
>>         Row.of(6.0)).returns(new RowTypeInfo(columnType));
>>     final DataStream<Row> out = input.map(new
>> SubtaskIndexAssigner(columnType))//
>>         .keyBy(t -> t.f0)//
>>         .window(GlobalWindows.create())
>>         .trigger(PurgingTrigger.of(new
>> CountWithTimeoutTriggerPartition(Time.seconds(5), 100L)))
>>         .process(new ProcessWindowFunction<Tuple2<Integer, Row>, Row,
>> Integer, GlobalWindow>() {
>>
>>           @Override
>>           public void process(Integer key,
>>               ProcessWindowFunction<Tuple2<Integer, Row>, Row, Integer,
>> GlobalWindow>.Context context,
>>               Iterable<Tuple2<Integer, Row>> it, Collector<Row> out)
>> throws Exception {
>>             for (Tuple2<Integer, Row> tuple : it) {
>>               out.collect(Row.of(tuple.f1.getField(0).toString()));
>>             }
>>
>>           }
>>         }).returns(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO));
>>     out.writeUsingOutputFormat(new PrintingOutputFormat<Row>());
>>     env.execute();
>>   }
>>
>>   private static final class SubtaskIndexAssigner extends
>> RichMapFunction<Row, Tuple2<Integer, Row>>
>>       implements ResultTypeQueryable<Tuple2<Integer, Row>> {
>>     private static final long serialVersionUID = 1L;
>>
>>     private int myTaskId;
>>     private TypeInformation<?> columnType;
>>
>>     public SubtaskIndexAssigner(TypeInformation<?> columnType) {
>>       this.columnType = columnType;
>>     }
>>
>>     @Override
>>     public void open(Configuration parameters) throws Exception {
>>       this.myTaskId = getRuntimeContext().getIndexOfThisSubtask();
>>     }
>>
>>     @Override
>>     public Tuple2<Integer, Row> map(Row row) throws Exception {
>>       return Tuple2.of(myTaskId, row);
>>     }
>>
>>     @Override
>>     public TypeInformation<Tuple2<Integer, Row>> getProducedType() {
>>       return new TupleTypeInfo<Tuple2<Integer,
>> Row>>(BasicTypeInfo.INT_TYPE_INFO,
>>           new RowTypeInfo(columnType));
>>     }
>>   }
>>
>>   private static class CountWithTimeoutTriggerPartition
>>       extends Trigger<Tuple2<Integer, Row>, GlobalWindow> {
>>
>>     private static final long serialVersionUID = 1L;
>>     private final long maxCount;
>>     private final long maxTime;
>>
>>     private final ReducingStateDescriptor<Long> countstateDesc =
>>         new ReducingStateDescriptor<>("count", new Sum(),
>> LongSerializer.INSTANCE);
>>     private final ReducingStateDescriptor<Long> timestateDesc =
>>         new ReducingStateDescriptor<>("fire-time", new Min(),
>> LongSerializer.INSTANCE);
>>
>>     public CountWithTimeoutTriggerPartition(long maxTime, long maxCount) {
>>       this.maxCount = maxCount;
>>       this.maxTime = maxTime;
>>     }
>>
>>     public CountWithTimeoutTriggerPartition(Time maxTime, long maxCount) {
>>       this(maxTime.toMilliseconds(), maxCount);
>>     }
>>
>>     @Override
>>     public TriggerResult onElement(Tuple2<Integer, Row> element, long
>> timestamp,
>>         GlobalWindow window,
>>
>> org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
>> ctx)
>>         throws Exception {
>>
>>       ReducingState<Long> fireTimestamp =
>> ctx.getPartitionedState(timestateDesc);
>>
>>       timestamp = ctx.getCurrentProcessingTime();
>>
>>       if (fireTimestamp.get() == null) {
>>         long start = timestamp - (timestamp % maxTime);
>>         long nextFireTimestamp = start + maxTime;
>>
>>         ctx.registerProcessingTimeTimer(nextFireTimestamp);
>>
>>         fireTimestamp.add(nextFireTimestamp);
>>         // Fall through so the first element of a batch is counted too.
>>       }
>>       ReducingState<Long> count = ctx.getPartitionedState(countstateDesc);
>>       count.add(1L);
>>       if (count.get() >= maxCount) {
>>         count.clear();
>>         fireTimestamp.clear();
>>         return TriggerResult.FIRE_AND_PURGE;
>>       }
>>       return TriggerResult.CONTINUE;
>>     }
>>
>>     @Override
>>     public TriggerResult onProcessingTime(long time, GlobalWindow window,
>> TriggerContext ctx)
>>         throws Exception {
>>       ReducingState<Long> fireTimestamp =
>> ctx.getPartitionedState(timestateDesc);
>>       ReducingState<Long> count = ctx.getPartitionedState(countstateDesc);
>>       // Null-safe: a stale timer may fire after the state was cleared.
>>       if (Long.valueOf(time).equals(fireTimestamp.get())) {
>>         count.clear();
>>         fireTimestamp.clear();
>>         fireTimestamp.add(time + maxTime);
>>         ctx.registerProcessingTimeTimer(time + maxTime);
>>         return TriggerResult.FIRE_AND_PURGE;
>>       }
>>       return TriggerResult.CONTINUE;
>>     }
>>
>>     @Override
>>     public TriggerResult onEventTime(@SuppressWarnings("unused") long
>> time,
>>         @SuppressWarnings("unused") GlobalWindow window,
>>         @SuppressWarnings("unused") TriggerContext ctx) throws Exception {
>>       return TriggerResult.CONTINUE;
>>     }
>>
>>     @Override
>>     public void clear(GlobalWindow window, TriggerContext ctx) throws
>> Exception {
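>>       ReducingState<Long> fireTimestamp =
>> ctx.getPartitionedState(timestateDesc);
>>       // The state may already be empty (e.g. after a count-based fire),
>>       // so guard against unboxing a null timestamp.
>>       Long timestamp = fireTimestamp.get();
>>       if (timestamp != null) {
>>         ctx.deleteProcessingTimeTimer(timestamp);
>>       }
>>       fireTimestamp.clear();
>>       ctx.getPartitionedState(countstateDesc).clear();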
>>     }
>>
>>     @Override
>>     public boolean canMerge() {
>>       return true;
>>     }
>>
>>     @Override
>>     public void onMerge(GlobalWindow window, OnMergeContext ctx) {
>>       ctx.mergePartitionedState(countstateDesc);
>>       ctx.mergePartitionedState(timestateDesc);
>>     }
>>
>>     static class Sum implements ReduceFunction<Long> {
>>       private static final long serialVersionUID = 1L;
>>
>>       @Override
>>       public Long reduce(Long value1, Long value2) throws Exception {
>>         return value1 + value2;
>>       }
>>     }
>>
>>     static class Min implements ReduceFunction<Long> {
>>       private static final long serialVersionUID = 1L;
>>
>>       @Override
>>       public Long reduce(Long value1, Long value2) throws Exception {
>>         return Math.min(value1, value2);
>>       }
>>     }
>>   }
>>
>> }
>>
>
