[ 
https://issues.apache.org/jira/browse/FLINK-23011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17364252#comment-17364252
 ] 

Piotr Nowojski edited comment on FLINK-23011 at 6/16/21, 11:36 AM:
-------------------------------------------------------------------

[~AHeise] suggested that as a hotfix we could block emitting 
watermarks/switching to {{IDLE}} until all splits are assigned 
({{SourceReader#notifyNoMoreSplits}}). This would be a very special case of the 
more general solution that I was proposing with {{SplitEnumerator}} capping the 
watermarks. This hotfix should work well in the cases where number of splits is 
determined/known from the beginning.


was (Author: pnowojski):
[~AHeise] suggested that as a hotfix we could block emitting 
watermarks/switching to {{IDLE}} until all splits are assigned 
({{SourceReader#notifyNoMoreSplits}}). This would be a very special case of the 
more general solution that I was proposing with {{SplitEnumerator}} capping the 
watermarks, that should work well in the cases where number of splits is 
determined/known from the beginning.

> FLIP-27 sources are generating non-deterministic results when using event time
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-23011
>                 URL: https://issues.apache.org/jira/browse/FLINK-23011
>             Project: Flink
>          Issue Type: New Feature
>          Components: API / DataStream
>    Affects Versions: 1.14.0, 1.13.1, 1.12.4
>         Environment: 
>            Reporter: Piotr Nowojski
>            Priority: Critical
>
> FLIP-27 sources currently start in the {{StreamStatus.IDLE}} state and they 
> switch to {{ACTIVE}} only after emitting first {{Watermark}}. Until this 
> happens, downstream operators are ignoring {{IDLE}} inputs from calculating 
> the input (min) watermark. 
> An extreme example to what problem this leads to, are completely bogus 
> results if for example one FLIP-27 source subtask is slower than others for 
> some reason:
> {code:java}
> env.getConfig().setAutoWatermarkInterval(2000);
> env.setParallelism(2);
> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
> 10));
> DataStream<Long> eventStream =
>         env.fromSource(
>                         new NumberSequenceSource(0, Long.MAX_VALUE),
>                         WatermarkStrategy.<Long>forMonotonousTimestamps()
>                                 .withTimestampAssigner(new 
> LongTimestampAssigner()),
>                         "NumberSequenceSource")
>                 .map(
>                         new RichMapFunction<Long, Long>() {
>                             @Override
>                             public Long map(Long value) throws Exception {
>                                 if 
> (getRuntimeContext().getIndexOfThisSubtask() == 0) {
>                                     Thread.sleep(1);
>                                 }
>                                 return 1L;
>                             }
>                         });
> eventStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(1))).sum(0).print();
> (...)
> private static class LongTimestampAssigner implements 
> SerializableTimestampAssigner<Long> {
>     private long counter = 0;
>     @Override
>     public long extractTimestamp(Long record, long recordTimeStamp) {
>         return counter++;
>     }
> }
> {code}
> In such case, after 2 seconds ({{setAutoWatermarkInterval}}) the not 
> throttled subtask (subTaskId == 1) generates very high watermarks. The other 
> source subtask (subTaskId == 0) emits very low watermarks. If the non 
> throttled watermark reaches the downstream {{WindowOperator}} first, while 
> the other input channel is still idle, it will take those high watermarks as 
> combined input watermark for the the whole {{WindowOperator}}. When the input 
> channel from the throttled source subtask finally receives it's {{ACTIVE}} 
> status and a much lower watermark, that's already too late.
> Actual output of the example program:
> {noformat}
> 1596
> 2000
> 1000
> 1000
> 1000
> 1000
> 1000
> 1000
> (...)
> {noformat}
> while the expected output should be always "2000" (2000 records fitting in 
> every 1 second global window)
> {noformat}
> 2000
> 2000
> 2000
> 2000
> (...)
> {noformat}.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to