[ https://issues.apache.org/jira/browse/FLINK-23011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364191#comment-17364191 ]

Piotr Nowojski edited comment on FLINK-23011 at 6/16/21, 11:27 AM:
-------------------------------------------------------------------

Even after FLINK-22926 is fixed, the problem will persist, as there would still 
be a race condition between different subtasks registering splits/switching to 
ACTIVE and emitting watermarks.
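To make the race concrete, here is a minimal Java sketch of a downstream operator combining watermarks from two input channels. It is a simplified model, not Flink's actual {{StatusWatermarkValve}}: it assumes every channel starts {{IDLE}}, that receiving a watermark also marks a channel {{ACTIVE}} (in Flink the status is a separate element), that {{IDLE}} channels are ignored, and that the combined watermark is the minimum over {{ACTIVE}} channels and never moves backwards. {{CombinedWatermarkModel}} and {{runOrder}} are hypothetical names.

```java
import java.util.Arrays;

/**
 * Hypothetical, simplified model of watermark combining in a downstream
 * operator. NOT Flink's StatusWatermarkValve; it only illustrates the race.
 */
class CombinedWatermarkModel {
    private final long[] channelWatermarks;
    private final boolean[] active;
    private long lastEmitted = Long.MIN_VALUE;

    CombinedWatermarkModel(int channels) {
        channelWatermarks = new long[channels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
        // Every channel starts IDLE, like a FLIP-27 source subtask.
        active = new boolean[channels];
    }

    /** Assumption: a watermark arriving on a channel also marks it ACTIVE. */
    long onWatermark(int channel, long watermark) {
        active[channel] = true;
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < active.length; i++) {
            if (active[i]) { // IDLE channels are ignored
                anyActive = true;
                min = Math.min(min, channelWatermarks[i]);
            }
        }
        if (anyActive && min > lastEmitted) {
            lastEmitted = min; // the combined watermark only ever advances
        }
        return lastEmitted;
    }

    /** Channel 0 is the throttled subtask, channel 1 the fast one. */
    static long runOrder(boolean throttledFirst) {
        CombinedWatermarkModel model = new CombinedWatermarkModel(2);
        if (throttledFirst) {
            model.onWatermark(0, 100);
            return model.onWatermark(1, 5_000);
        }
        model.onWatermark(1, 5_000);
        return model.onWatermark(0, 100);
    }

    public static void main(String[] args) {
        System.out.println(runOrder(true));  // 100
        System.out.println(runOrder(false)); // 5000
    }
}
```

The same two watermarks give 100 when the throttled subtask reports first and 5000 when the fast one does; in the second order the later, lower watermark cannot pull the combined value back.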

A hotfix might be to force FLIP-27 sources to start ACTIVE (as the legacy 
sources do). However, this doesn't work if more than one split is assigned to 
a single {{SourceReader}} and there is a delay between assigning the first and 
the second split.

The problem is that if a source doesn't know about some splits, because they 
haven't been assigned (or even discovered) yet, it cannot know what watermarks 
would result from those unknown splits. This becomes more visible for sources 
where discovering splits takes a long time, for example a {{FileSource}}.
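A minimal sketch of why starting ACTIVE is not enough once a reader can receive splits over time. It assumes, hypothetically, that a reader's watermark is the minimum over the splits it currently knows about and that an emitted watermark can never be retracted; {{SplitAwareReaderModel}} is an illustrative class, not part of Flink's {{SourceReader}} API.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical reader model: its watermark is the minimum over the splits it
 * currently knows about, so it cannot account for splits assigned later.
 */
class SplitAwareReaderModel {
    private final Map<String, Long> splitWatermarks = new HashMap<>();
    private long emitted = Long.MIN_VALUE;

    void addSplit(String splitId) {
        splitWatermarks.put(splitId, Long.MIN_VALUE);
    }

    void onRecord(String splitId, long timestamp) {
        splitWatermarks.merge(splitId, timestamp, Long::max);
    }

    /** Emits the min over known splits; an emitted watermark is never taken back. */
    long emitWatermark() {
        long min = splitWatermarks.values().stream()
                .mapToLong(Long::longValue)
                .min()
                .orElse(Long.MIN_VALUE);
        emitted = Math.max(emitted, min);
        return emitted;
    }

    public static void main(String[] args) {
        SplitAwareReaderModel reader = new SplitAwareReaderModel();
        reader.addSplit("split-A");
        reader.onRecord("split-A", 10_000);
        long first = reader.emitWatermark();  // only split-A is known
        reader.addSplit("split-B");           // assigned after a delay
        reader.onRecord("split-B", 500);
        long second = reader.emitWatermark(); // cannot move back
        boolean late = 500 < first;           // split-B's record is already late
        System.out.println(first + " " + second + " " + late);
    }
}
```

The reader emits 10000 while it only knows split-A; when split-B arrives with a record at timestamp 500, that record is already behind the watermark the reader has published.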

I think the proper solution would be something like the {{SplitEnumerator}} 
emitting its own watermarks, which would cap, or be combined with, the 
watermarks emitted from the sources.
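One way to read "capping" is a plain minimum: each reader's effective watermark is bounded by a watermark published by the {{SplitEnumerator}}, the component that knows about splits not yet assigned. The sketch below assumes exactly that rule; {{EnumeratorCappedWatermark}} and {{effectiveWatermark}} are hypothetical and do not exist in Flink, and how the enumerator would compute its own watermark is left open.

```java
/**
 * Hypothetical illustration of an enumerator watermark capping reader
 * watermarks. None of these types exist in Flink.
 */
final class EnumeratorCappedWatermark {
    private EnumeratorCappedWatermark() {}

    /**
     * readerWatermark: min over the splits assigned to one reader.
     * enumeratorWatermark: assumed lower bound for every split the enumerator
     * still holds or has not discovered yet.
     */
    static long effectiveWatermark(long readerWatermark, long enumeratorWatermark) {
        return Math.min(readerWatermark, enumeratorWatermark);
    }

    public static void main(String[] args) {
        // The reader has only seen a fast split, but the enumerator (by assumption)
        // still holds a split whose data starts at t = 500, so it caps there.
        System.out.println(effectiveWatermark(10_000, 500));
        // Once all splits are assigned, the enumerator can advance to
        // Long.MAX_VALUE and stop constraining the readers.
        System.out.println(effectiveWatermark(10_000, Long.MAX_VALUE));
    }
}
```

A minimum keeps the combined watermark safe: it can only be as high as the least advanced of the known splits and the still-unassigned ones.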



> FLIP-27 sources are generating non-deterministic results when using event time
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-23011
>                 URL: https://issues.apache.org/jira/browse/FLINK-23011
>             Project: Flink
>          Issue Type: New Feature
>          Components: API / DataStream
>    Affects Versions: 1.14.0, 1.13.1, 1.12.4
>         Environment: 
>            Reporter: Piotr Nowojski
>            Priority: Critical
>
> FLIP-27 sources currently start in the {{StreamStatus.IDLE}} state and 
> switch to {{ACTIVE}} only after emitting their first {{Watermark}}. Until 
> this happens, downstream operators ignore {{IDLE}} inputs when calculating 
> the combined input (min) watermark.
> An extreme example of the problem this leads to is completely bogus 
> results if, for example, one FLIP-27 source subtask is slower than the 
> others for some reason:
> {code:java}
> env.getConfig().setAutoWatermarkInterval(2000);
> env.setParallelism(2);
> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 10));
> DataStream<Long> eventStream =
>         env.fromSource(
>                         new NumberSequenceSource(0, Long.MAX_VALUE),
>                         WatermarkStrategy.<Long>forMonotonousTimestamps()
>                                 .withTimestampAssigner(new LongTimestampAssigner()),
>                         "NumberSequenceSource")
>                 .map(
>                         new RichMapFunction<Long, Long>() {
>                             @Override
>                             public Long map(Long value) throws Exception {
>                                 if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
>                                     Thread.sleep(1);
>                                 }
>                                 return 1L;
>                             }
>                         });
> eventStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(1))).sum(0).print();
> (...)
> private static class LongTimestampAssigner implements SerializableTimestampAssigner<Long> {
>     private long counter = 0;
>     @Override
>     public long extractTimestamp(Long record, long recordTimeStamp) {
>         return counter++;
>     }
> }
> {code}
> In such a case, after 2 seconds ({{setAutoWatermarkInterval}}) the 
> non-throttled subtask (subTaskId == 1) generates very high watermarks, while 
> the throttled source subtask (subTaskId == 0) emits very low ones. If the 
> non-throttled watermark reaches the downstream {{WindowOperator}} first, 
> while the other input channel is still idle, the operator takes that high 
> watermark as the combined input watermark for the whole {{WindowOperator}}. 
> When the input channel from the throttled source subtask finally receives 
> its {{ACTIVE}} status and a much lower watermark, it is already too late: 
> the windows have already fired, and the throttled subtask's records for 
> them are dropped as late.
> Actual output of the example program:
> {noformat}
> 1596
> 2000
> 1000
> 1000
> 1000
> 1000
> 1000
> 1000
> (...)
> {noformat}
> while the expected output is always "2000" (2000 records fitting into 
> every 1-second global window):
> {noformat}
> 2000
> 2000
> 2000
> 2000
> (...)
> {noformat}
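The expected "2000" and the observed "1000" can be reproduced with back-of-the-envelope arithmetic. Each source subtask has its own {{LongTimestampAssigner}}, so both assign timestamps 0, 1, 2, ... (ms) and every record is mapped to 1L; a 1-second tumbling window therefore receives 1000 records from each subtask. The Java sketch assumes zero allowed lateness, so a record is dropped when its window's max timestamp ({{windowStart + 999}}) is at or below the watermark the window operator has already reached. {{WindowCountModel}} and {{windowSums}} are hypothetical names, and the single fixed watermark is a simplification of the real, continuously advancing one.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical model of the example job's window sums. Subtask 0 is the
 * throttled one; its records are processed after the combined watermark has
 * already reached watermarkWhenThrottledArrives.
 */
final class WindowCountModel {
    private WindowCountModel() {}

    static Map<Long, Long> windowSums(int recordsPerSubtask, long watermarkWhenThrottledArrives) {
        Map<Long, Long> sums = new TreeMap<>();
        for (int subtask = 0; subtask < 2; subtask++) {
            boolean throttled = subtask == 0;
            // Each subtask's assigner counts from 0, one millisecond per record.
            for (long ts = 0; ts < recordsPerSubtask; ts++) {
                long windowStart = (ts / 1000) * 1000;   // 1-second tumbling window
                long windowMaxTimestamp = windowStart + 999;
                if (throttled && windowMaxTimestamp <= watermarkWhenThrottledArrives) {
                    continue; // window already fired: record is late and dropped
                }
                sums.merge(ts / 1000, 1L, Long::sum); // every record maps to 1L
            }
        }
        return sums;
    }

    public static void main(String[] args) {
        System.out.println(windowSums(4000, Long.MIN_VALUE)); // {0=2000, 1=2000, 2=2000, 3=2000}
        System.out.println(windowSums(4000, 2000));           // {0=1000, 1=1000, 2=2000, 3=2000}
    }
}
```

Without the premature watermark every window sums to 2000; any window that has already fired before the throttled subtask's records arrive only counts the fast subtask's 1000 records.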



--
This message was sent by Atlassian Jira
(v8.3.4#803005)