Re: Is there a way to use stream API with this program?

2020-07-28 Thread David Anderson
MAX_WATERMARK is emitted by ContinuousFileReaderOperator and StreamSource
when they close.

I think you'll find this just works without your having to do anything to
make it happen.

David

Re: Is there a way to use stream API with this program?

2020-07-28 Thread Piotr Nowojski
MAX_WATERMARK should be emitted automatically by the
WatermarkAssignerOperator.

Piotrek

Re: Is there a way to use stream API with this program?

2020-07-27 Thread Flavio Pompermaier
Yes, it could... but where should I emit the MAX_WATERMARK, and how do I
detect that the input has reached its end?

Re: Is there a way to use stream API with this program?

2020-07-25 Thread David Anderson
In this use case, couldn't the custom trigger register an event time timer
for MAX_WATERMARK, which would be triggered when the bounded input reaches
its end?
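
For instance, a minimal sketch of such a trigger (untested; the wiring into
your job is assumed, but registerEventTimeTimer/deleteEventTimeTimer are the
standard Trigger API, and MAX_WATERMARK corresponds to Long.MAX_VALUE):

public class EndOfInputTrigger extends Trigger<Object, GlobalWindow> {

  @Override
  public TriggerResult onElement(Object element, long timestamp,
      GlobalWindow window, TriggerContext ctx) throws Exception {
    // An event-time timer at Long.MAX_VALUE fires only when MAX_WATERMARK
    // arrives, i.e. when the bounded input reaches its end.
    ctx.registerEventTimeTimer(Long.MAX_VALUE);
    return TriggerResult.CONTINUE;
  }

  @Override
  public TriggerResult onEventTime(long time, GlobalWindow window,
      TriggerContext ctx) {
    return time == Long.MAX_VALUE ? TriggerResult.FIRE : TriggerResult.CONTINUE;
  }

  @Override
  public TriggerResult onProcessingTime(long time, GlobalWindow window,
      TriggerContext ctx) {
    return TriggerResult.CONTINUE;
  }

  @Override
  public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
    ctx.deleteEventTimeTimer(Long.MAX_VALUE);
  }
}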

David

Re: Is there a way to use stream API with this program?

2020-07-20 Thread Piotr Nowojski
Hi,

I'm afraid there is no out-of-the-box way of doing this. I've created a
ticket [1] to write down and document a discussion we had about this issue
in the past.

The issue is that, currently, untriggered processing-time timers are ignored
at end of input, and there seems to be no single way of handling this that
is right for all cases; the behaviour probably needs to be customizable.

Maybe you could (a rough sketch follows below):
1. extend `WindowOperator` (`MyWindowOperator`)
2. implement
`org.apache.flink.streaming.api.operators.BoundedOneInput#endInput` in your
`MyWindowOperator`
3. inside `MyWindowOperator#endInput`, invoke
`internalTimerService.forEachProcessingTimeTimer(...)` and:
  a) manually trigger each timer via `WindowOperator#onProcessingTime`
  b) delete each manually triggered timer
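
Roughly along these lines; this is an untested sketch only, leaning on
internal APIs (the `WindowOperator` constructor, imports, and setting the
correct key per timer are omitted or hand-waved, which is part of why this
needs care):

public class MyWindowOperator<K, IN, ACC, OUT, W extends Window>
    extends WindowOperator<K, IN, ACC, OUT, W>
    implements BoundedOneInput {

  // constructor delegating to WindowOperator omitted for brevity

  @Override
  public void endInput() throws Exception {
    // Snapshot the pending timers first: firing/deleting while iterating
    // would mutate the underlying timer queue.
    final List<Tuple2<W, Long>> pending = new ArrayList<>();
    internalTimerService.forEachProcessingTimeTimer(
        (window, timestamp) -> pending.add(Tuple2.of(window, timestamp)));
    for (Tuple2<W, Long> t : pending) {
      // a) manually trigger the timer, as if its deadline had passed
      onProcessingTime(new TimerHeapInternalTimer<>(t.f1, (K) getCurrentKey(), t.f0));
      // b) delete it so it cannot fire again
      internalTimerService.deleteProcessingTimeTimer(t.f0, t.f1);
    }
  }
}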

Piotrek

[1] https://issues.apache.org/jira/browse/FLINK-18647

On Fri, Jul 17, 2020 at 10:30 AM Flavio Pompermaier 
wrote:

> Hi to all,
> I was trying to port another job of ours that uses the DataSet API to the
> DataStream API.
> The legacy program basically did a dataset.mapPartition().reduce(), so I
> tried to replicate it with the following:
>
>  final BasicTypeInfo<Double> columnType = BasicTypeInfo.DOUBLE_TYPE_INFO;
>   final DataStream<Row> input = env.fromElements(//
> Row.of(1.0), //
> Row.of(2.0), //
> Row.of(3.0), //
> Row.of(5.0), //
> Row.of(6.0)).returns(new RowTypeInfo(columnType));
>  input.map(new SubtaskIndexAssigner(columnType))
> .keyBy(t -> t.f0)
> .window(GlobalWindows.create())
> .trigger(PurgingTrigger.of(CountWithTimeoutTriggerPartition.of(Time.seconds(5), 100L)))
> .process(..)
>
> Unfortunately the program exits before reaching the process function
> (moreover, I need to add another window + trigger after it, before adding
> the reduce function).
> Is there a way to do this with the DataStream API, or should I stick with
> the DataSet API for the moment (until batch is fully supported)? I append
> all the code required to test the job below.
>
> Best,
> Flavio
>
> -
>
> package org.apache.flink.stats.sketches;
>
> import org.apache.flink.api.common.functions.ReduceFunction;
> import org.apache.flink.api.common.functions.RichMapFunction;
> import org.apache.flink.api.common.state.ReducingState;
> import org.apache.flink.api.common.state.ReducingStateDescriptor;
> import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
> import org.apache.flink.api.common.typeinfo.TypeInformation;
> import org.apache.flink.api.common.typeutils.base.LongSerializer;
> import org.apache.flink.api.java.io.PrintingOutputFormat;
> import org.apache.flink.api.java.tuple.Tuple2;
> import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
> import org.apache.flink.api.java.typeutils.RowTypeInfo;
> import org.apache.flink.api.java.typeutils.TupleTypeInfo;
> import org.apache.flink.configuration.Configuration;
> import org.apache.flink.streaming.api.TimeCharacteristic;
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
> import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
> import org.apache.flink.streaming.api.windowing.time.Time;
> import org.apache.flink.streaming.api.windowing.triggers.PurgingTrigger;
> import org.apache.flink.streaming.api.windowing.triggers.Trigger;
> import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
> import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
> import org.apache.flink.types.Row;
> import org.apache.flink.util.Collector;
>
> public class Test {
>   public static void main(String[] args) throws Exception {
> StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
> env.setParallelism(1);
>
> final BasicTypeInfo<Double> columnType = BasicTypeInfo.DOUBLE_TYPE_INFO;
> final DataStream<Row> input = env.fromElements(//
> Row.of(1.0), //
> Row.of(2.0), //
> Row.of(3.0), //
> Row.of(5.0), //
> Row.of(6.0)).returns(new RowTypeInfo(columnType));
> final DataStream<Row> out = input.map(new SubtaskIndexAssigner(columnType))//
> .keyBy(t -> t.f0)//
> .window(GlobalWindows.create())
> .trigger(PurgingTrigger.of(new CountWithTimeoutTriggerPartition(Time.seconds(5), 100L)))
> .process(new ProcessWindowFunction<Tuple2<Integer, Row>, Row, Integer, GlobalWindow>() {
>
>   @Override
>   public void process(Integer key,
>   ProcessWindowFunction<Tuple2<Integer, Row>, Row, Integer, GlobalWindow>.Context context,
>   Iterable<Tuple2<Integer, Row>> it, Collector<Row> out)
> throws Exception {
> for (Tuple2<Integer, Row> tuple : it) {
>   out.collect(Row.of(tuple.f1.getField(0).toString()));
>