[ 
https://issues.apache.org/jira/browse/FLINK-23011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364245#comment-17364245
 ] 

Piotr Nowojski commented on FLINK-23011:
----------------------------------------

Let me elaborate on {{FileSource}} example. Let's say we have a bucketed file 
source where, split equals to a single file, with buckets (directories?) 
created per each hour. New files/buckets can be appearing as you go. 

If bucket for [12:00, 13:00) is committed, {{SplitEnumerator}} could emit 
capped watermark for 13:00, and the already assigned splits will be bumping the 
current watermarks until this cap, as the splits are being read. Then next 
bucket [13:00, 14:00) is created, {{SourceReaders}} could even start reading 
files from this bucket before it’s fully committed, but the watermark cap is 
bumped only when this next bucket is committed.

It's also important in this case to automatically switch {{SourceReader}} to 
idle if they don't have assigned splits. As for whatever the reason, for some 
buckets there can be fewer splits than parallel instances of the 
{{SourceReader}}s. In this case you need idleness to make progress.

> FLIP-27 sources are generating non-deterministic results when using event time
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-23011
>                 URL: https://issues.apache.org/jira/browse/FLINK-23011
>             Project: Flink
>          Issue Type: New Feature
>          Components: API / DataStream
>    Affects Versions: 1.14.0, 1.13.1, 1.12.4
>         Environment: 
>            Reporter: Piotr Nowojski
>            Priority: Critical
>
> FLIP-27 sources currently start in the {{StreamStatus.IDLE}} state and they 
> switch to {{ACTIVE}} only after emitting first {{Watermark}}. Until this 
> happens, downstream operators are ignoring {{IDLE}} inputs from calculating 
> the input (min) watermark. 
> An extreme example to what problem this leads to, are completely bogus 
> results if for example one FLIP-27 source subtask is slower than others for 
> some reason:
> {code:java}
> env.getConfig().setAutoWatermarkInterval(2000);
> env.setParallelism(2);
> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
> 10));
> DataStream<Long> eventStream =
>         env.fromSource(
>                         new NumberSequenceSource(0, Long.MAX_VALUE),
>                         WatermarkStrategy.<Long>forMonotonousTimestamps()
>                                 .withTimestampAssigner(new 
> LongTimestampAssigner()),
>                         "NumberSequenceSource")
>                 .map(
>                         new RichMapFunction<Long, Long>() {
>                             @Override
>                             public Long map(Long value) throws Exception {
>                                 if 
> (getRuntimeContext().getIndexOfThisSubtask() == 0) {
>                                     Thread.sleep(1);
>                                 }
>                                 return 1L;
>                             }
>                         });
> eventStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(1))).sum(0).print();
> (...)
> private static class LongTimestampAssigner implements 
> SerializableTimestampAssigner<Long> {
>     private long counter = 0;
>     @Override
>     public long extractTimestamp(Long record, long recordTimeStamp) {
>         return counter++;
>     }
> }
> {code}
> In such case, after 2 seconds ({{setAutoWatermarkInterval}}) the not 
> throttled subtask (subTaskId == 1) generates very high watermarks. The other 
> source subtask (subTaskId == 0) emits very low watermarks. If the non 
> throttled watermark reaches the downstream {{WindowOperator}} first, while 
> the other input channel is still idle, it will take those high watermarks as 
> combined input watermark for the the whole {{WindowOperator}}. When the input 
> channel from the throttled source subtask finally receives it's {{ACTIVE}} 
> status and a much lower watermark, that's already too late.
> Actual output of the example program:
> {noformat}
> 1596
> 2000
> 1000
> 1000
> 1000
> 1000
> 1000
> 1000
> (...)
> {noformat}
> while the expected output should be always "2000" (2000 records fitting in 
> every 1 second global window)
> {noformat}
> 2000
> 2000
> 2000
> 2000
> (...)
> {noformat}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to