Yes, it could. Where should I emit the MAX_WATERMARK, and how do I detect
that the input has reached its end?
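
As a rough illustration of the behaviour described in the replies quoted
below, here is a toy model in plain Java. It is not Flink code: the class
`EndOfInputTimers` and all of its methods are hypothetical names invented for
this sketch. It assumes that the watermark jumps to its maximum value when a
bounded input ends (as David describes) and that processing time timers still
pending at that point are dropped (as Piotrek describes).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Toy model (not Flink code) of timer handling when a bounded input ends. */
class EndOfInputTimers {

    /** Hypothetical stand-in for the largest possible watermark. */
    static final long MAX_WATERMARK = Long.MAX_VALUE;

    private final PriorityQueue<Long> processingTimeTimers = new PriorityQueue<>();
    private final PriorityQueue<Long> eventTimeTimers = new PriorityQueue<>();
    private final List<String> fired = new ArrayList<>();

    void registerProcessingTimeTimer(long time) {
        processingTimeTimers.add(time);
    }

    void registerEventTimeTimer(long time) {
        eventTimeTimers.add(time);
    }

    /** The wall clock advances: fire every processing time timer that is due. */
    void advanceProcessingTime(long now) {
        while (!processingTimeTimers.isEmpty() && processingTimeTimers.peek() <= now) {
            fired.add("processing-time@" + processingTimeTimers.poll());
        }
    }

    /** The watermark advances: fire every event time timer at or below it. */
    void advanceWatermark(long watermark) {
        while (!eventTimeTimers.isEmpty() && eventTimeTimers.peek() <= watermark) {
            fired.add("event-time@" + eventTimeTimers.poll());
        }
    }

    /**
     * End of input (assumed behaviour): the final watermark is delivered,
     * and processing time timers that have not fired yet are discarded.
     */
    List<String> endInput() {
        advanceWatermark(MAX_WATERMARK);
        processingTimeTimers.clear();
        return fired;
    }

    public static void main(String[] args) {
        EndOfInputTimers op = new EndOfInputTimers();
        op.registerProcessingTimeTimer(5_000L);   // like the 5-second timeout in the trigger
        op.registerEventTimeTimer(MAX_WATERMARK); // the suggested end-of-input timer
        op.advanceProcessingTime(10L);            // the input ends after about 10 ms
        System.out.println(op.endInput());
    }
}
```

Running `main` prints `[event-time@9223372036854775807]`: only the event time
timer fires, and the 5-second processing time timer is lost because the input
ended first. In this model nothing in the trigger emits the final watermark;
whether the real runtime behaves the same way is the assumption stated above.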

On Sat, Jul 25, 2020 at 8:08 PM David Anderson <da...@alpinegizmo.com>
wrote:

> In this use case, couldn't the custom trigger register an event time timer
> for MAX_WATERMARK, which would be triggered when the bounded input reaches
> its end?
>
> David
>
> On Mon, Jul 20, 2020 at 5:47 PM Piotr Nowojski <pnowoj...@apache.org>
> wrote:
>
>> Hi,
>>
>> I'm afraid that there is no out-of-the-box way of doing this. I've
>> created a ticket [1] to write down and document a discussion that we had
>> about this issue in the past.
>>
>> The issue is that, currently, processing time timers that have not yet
>> fired are ignored at the end of input. There is probably no single
>> perfect way to handle this for all cases, so the behavior likely needs to
>> be customized.
>>
>> Maybe you could:
>> 1. extend `WindowOperator` (as `MyWindowOperator`)
>> 2. implement
>> `org.apache.flink.streaming.api.operators.BoundedOneInput#endInput` in your
>> `MyWindowOperator`
>> 3. inside `MyWindowOperator#endInput`, invoke
>> `internalTimerService.forEachProcessingTimeTimer(...)` and:
>>   a) manually trigger each timer via `WindowOperator#onProcessingTime`
>>   b) delete each manually triggered timer
>>
>> Piotrek
>>
>> [1] https://issues.apache.org/jira/browse/FLINK-18647
>>
>> On Fri, Jul 17, 2020 at 10:30 AM Flavio Pompermaier <pomperma...@okkam.it>
>> wrote:
>>
>>> Hi all,
>>> I was trying to port another job of ours that uses the DataSet API to the
>>> DataStream API.
>>> The legacy program basically did a dataset.mapPartition().reduce(),
>>> so I tried to replicate it with the following:
>>>
>>>  final BasicTypeInfo<Double> columnType = BasicTypeInfo.DOUBLE_TYPE_INFO;
>>>   final DataStream<Row> input = env.fromElements(//
>>>         Row.of(1.0), //
>>>         Row.of(2.0), //
>>>         Row.of(3.0), //
>>>         Row.of(5.0), //
>>>         Row.of(6.0)).returns(new RowTypeInfo(columnType));
>>>  input.map(new SubtaskIndexAssigner(columnType))
>>>         .keyBy(t -> t.f0)
>>>         .window(GlobalWindows.create())
>>>         .trigger(PurgingTrigger.of(
>>>             new CountWithTimeoutTriggerPartition(Time.seconds(5), 100L)))
>>>         .process(..)
>>>
>>> Unfortunately, the program exits before reaching the process function
>>> (moreover, I need to add another window + trigger after it before adding
>>> the reduce function).
>>> Is there a way to do this with the DataStream API, or should I keep using
>>> the DataSet API for the moment (until batch is fully supported)? I have
>>> appended all the code required to test the job below.
>>>
>>> Best,
>>> Flavio
>>>
>>> -----------------------------------------------------------------
>>>
>>> package org.apache.flink.stats.sketches;
>>>
>>> import org.apache.flink.api.common.functions.ReduceFunction;
>>> import org.apache.flink.api.common.functions.RichMapFunction;
>>> import org.apache.flink.api.common.state.ReducingState;
>>> import org.apache.flink.api.common.state.ReducingStateDescriptor;
>>> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
>>> import org.apache.flink.api.common.typeinfo.TypeInformation;
>>> import org.apache.flink.api.common.typeutils.base.LongSerializer;
>>> import org.apache.flink.api.java.io.PrintingOutputFormat;
>>> import org.apache.flink.api.java.tuple.Tuple2;
>>> import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
>>> import org.apache.flink.api.java.typeutils.RowTypeInfo;
>>> import org.apache.flink.api.java.typeutils.TupleTypeInfo;
>>> import org.apache.flink.configuration.Configuration;
>>> import org.apache.flink.streaming.api.TimeCharacteristic;
>>> import org.apache.flink.streaming.api.datastream.DataStream;
>>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
>>> import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
>>> import org.apache.flink.streaming.api.windowing.time.Time;
>>> import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
>>> import org.apache.flink.streaming.api.windowing.triggers.Trigger;
>>> import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
>>> import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
>>> import org.apache.flink.types.Row;
>>> import org.apache.flink.util.Collector;
>>>
>>> public class Test {
>>>   public static void main(String[] args) throws Exception {
>>>     StreamExecutionEnvironment env =
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>     env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
>>>     env.setParallelism(1);
>>>
>>>     final BasicTypeInfo<Double> columnType =
>>> BasicTypeInfo.DOUBLE_TYPE_INFO;
>>>     final DataStream<Row> input = env.fromElements(//
>>>         Row.of(1.0), //
>>>         Row.of(2.0), //
>>>         Row.of(3.0), //
>>>         Row.of(5.0), //
>>>         Row.of(6.0)).returns(new RowTypeInfo(columnType));
>>>     final DataStream<Row> out = input.map(new
>>> SubtaskIndexAssigner(columnType))//
>>>         .keyBy(t -> t.f0)//
>>>         .window(GlobalWindows.create())
>>>         .trigger(PurgingTrigger.of(new
>>> CountWithTimeoutTriggerPartition(Time.seconds(5), 100L)))
>>>         .process(new ProcessWindowFunction<Tuple2<Integer, Row>, Row,
>>> Integer, GlobalWindow>() {
>>>
>>>           @Override
>>>           public void process(Integer key,
>>>               ProcessWindowFunction<Tuple2<Integer, Row>, Row, Integer,
>>> GlobalWindow>.Context context,
>>>               Iterable<Tuple2<Integer, Row>> it, Collector<Row> out)
>>> throws Exception {
>>>             for (Tuple2<Integer, Row> tuple : it) {
>>>               out.collect(Row.of(tuple.f1.getField(0).toString()));
>>>             }
>>>
>>>           }
>>>         }).returns(new RowTypeInfo(BasicTypeInfo.STRING_TYPE_INFO));
>>>     out.writeUsingOutputFormat(new PrintingOutputFormat<Row>());
>>>     env.execute();
>>>   }
>>>
>>>   private static final class SubtaskIndexAssigner extends
>>> RichMapFunction<Row, Tuple2<Integer, Row>>
>>>       implements ResultTypeQueryable<Tuple2<Integer, Row>> {
>>>     private static final long serialVersionUID = 1L;
>>>
>>>     private int myTaskId;
>>>     private TypeInformation<?> columnType;
>>>
>>>     public SubtaskIndexAssigner(TypeInformation<?> columnType) {
>>>       this.columnType = columnType;
>>>     }
>>>
>>>     @Override
>>>     public void open(Configuration parameters) throws Exception {
>>>       this.myTaskId = getRuntimeContext().getIndexOfThisSubtask();
>>>     }
>>>
>>>     @Override
>>>     public Tuple2<Integer, Row> map(Row row) throws Exception {
>>>       return Tuple2.of(myTaskId, row);
>>>     }
>>>
>>>     @Override
>>>     public TypeInformation<Tuple2<Integer, Row>> getProducedType() {
>>>       return new TupleTypeInfo<Tuple2<Integer,
>>> Row>>(BasicTypeInfo.INT_TYPE_INFO,
>>>           new RowTypeInfo(columnType));
>>>     }
>>>   }
>>>
>>>   private static class CountWithTimeoutTriggerPartition
>>>       extends Trigger<Tuple2<Integer, Row>, GlobalWindow> {
>>>
>>>     private static final long serialVersionUID = 1L;
>>>     private final long maxCount;
>>>     private final long maxTime;
>>>
>>>     private final ReducingStateDescriptor<Long> countstateDesc =
>>>         new ReducingStateDescriptor<>("count", new Sum(),
>>> LongSerializer.INSTANCE);
>>>     private final ReducingStateDescriptor<Long> timestateDesc =
>>>         new ReducingStateDescriptor<>("fire-time", new Min(),
>>> LongSerializer.INSTANCE);
>>>
>>>     public CountWithTimeoutTriggerPartition(long maxTime, long maxCount)
>>> {
>>>       this.maxCount = maxCount;
>>>       this.maxTime = maxTime;
>>>     }
>>>
>>>     public CountWithTimeoutTriggerPartition(Time maxTime, long maxCount)
>>> {
>>>       this(maxTime.toMilliseconds(), maxCount);
>>>     }
>>>
>>>     @Override
>>>     public TriggerResult onElement(Tuple2<Integer, Row> element, long
>>> timestamp,
>>>         GlobalWindow window,
>>>
>>> org.apache.flink.streaming.api.windowing.triggers.Trigger.TriggerContext
>>> ctx)
>>>         throws Exception {
>>>
>>>       ReducingState<Long> fireTimestamp =
>>> ctx.getPartitionedState(timestateDesc);
>>>
>>>       timestamp = ctx.getCurrentProcessingTime();
>>>
>>>       if (fireTimestamp.get() == null) {
>>>         long start = timestamp - (timestamp % maxTime);
>>>         long nextFireTimestamp = start + maxTime;
>>>
>>>         ctx.registerProcessingTimeTimer(nextFireTimestamp);
>>>
>>>         fireTimestamp.add(nextFireTimestamp);
>>>         // No early return here, so the first element is counted as well.
>>>       }
>>>       ReducingState<Long> count =
>>> ctx.getPartitionedState(countstateDesc);
>>>       count.add(1L);
>>>       if (count.get() >= maxCount) {
>>>         count.clear();
>>>         fireTimestamp.clear();
>>>         return TriggerResult.FIRE_AND_PURGE;
>>>       }
>>>       return TriggerResult.CONTINUE;
>>>     }
>>>
>>>     @Override
>>>     public TriggerResult onProcessingTime(long time, GlobalWindow
>>> window, TriggerContext ctx)
>>>         throws Exception {
>>>       ReducingState<Long> fireTimestamp =
>>> ctx.getPartitionedState(timestateDesc);
>>>       ReducingState<Long> count =
>>> ctx.getPartitionedState(countstateDesc);
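>>>       // The state is empty if a count-based firing cleared it before this
>>>       // timer ran, so check for null before comparing.
>>>       Long expected = fireTimestamp.get();
>>>       if (expected != null && expected.equals(time)) {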
>>>         count.clear();
>>>         fireTimestamp.clear();
>>>         fireTimestamp.add(time + maxTime);
>>>         ctx.registerProcessingTimeTimer(time + maxTime);
>>>         return TriggerResult.FIRE_AND_PURGE;
>>>       }
>>>       return TriggerResult.CONTINUE;
>>>     }
>>>
>>>     @Override
>>>     public TriggerResult onEventTime(@SuppressWarnings("unused") long
>>> time,
>>>         @SuppressWarnings("unused") GlobalWindow window,
>>>         @SuppressWarnings("unused") TriggerContext ctx) throws Exception
>>> {
>>>       return TriggerResult.CONTINUE;
>>>     }
>>>
>>>     @Override
>>>     public void clear(GlobalWindow window, TriggerContext ctx) throws
>>> Exception {
>>>       ReducingState<Long> fireTimestamp =
>>> ctx.getPartitionedState(timestateDesc);
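>>>       Long timestamp = fireTimestamp.get();
>>>       // Guard against empty state; unboxing null would throw an NPE.
>>>       if (timestamp != null) {
>>>         ctx.deleteProcessingTimeTimer(timestamp);
>>>       }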
>>>       fireTimestamp.clear();
>>>       ctx.getPartitionedState(countstateDesc).clear();
>>>     }
>>>
>>>     @Override
>>>     public boolean canMerge() {
>>>       return true;
>>>     }
>>>
>>>     @Override
>>>     public void onMerge(GlobalWindow window, OnMergeContext ctx) {
>>>       ctx.mergePartitionedState(countstateDesc);
>>>       ctx.mergePartitionedState(timestateDesc);
>>>     }
>>>
>>>     class Sum implements ReduceFunction<Long> {
>>>       private static final long serialVersionUID = 1L;
>>>
>>>       @Override
>>>       public Long reduce(Long value1, Long value2) throws Exception {
>>>         return value1 + value2;
>>>       }
>>>     }
>>>
>>>     class Min implements ReduceFunction<Long> {
>>>       private static final long serialVersionUID = 1L;
>>>
>>>       @Override
>>>       public Long reduce(Long value1, Long value2) throws Exception {
>>>         return Math.min(value1, value2);
>>>       }
>>>     }
>>>   }
>>>
>>> }
>>>
>>
