[ 
https://issues.apache.org/jira/browse/FLINK-23011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17367037#comment-17367037
 ] 

Dawid Wysakowicz commented on FLINK-23011:
------------------------------------------

After some investigation of FLINK-22926 I realized I accidentally changed the 
behaviour for the situation where no splits are assigned. In 1.13 and older, if 
there were no {{PartialWatermarks}} in {{WatermarkOutputMultiplexer}}, we did 
not emit {{StreamStatus.IDLE}}.

I tried to restore the old behaviour in: 
https://github.com/apache/flink/pull/16221/commits/9841894efcb68d3c5d72695a19ca9b26dba08ef4
I now emit {{StreamStatus.IDLE}} only if there are no splits and the 
{{Watermark}} has already progressed. If it is still at its initial value and no 
splits have been assigned yet, we do not emit any {{StreamStatus}}. Effectively, 
that implements the workaround suggested by [~AHeise].
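
The rule described above can be sketched roughly as follows. This is only an illustration of the decision, not the code from the linked commit: the class {{IdleStatusRule}}, the method {{shouldEmitIdle}} and the use of {{Long.MIN_VALUE}} as the "initial" watermark are assumptions made for this example, and it covers only the no-splits case.

```java
/** Hypothetical helper illustrating the rule; not Flink's actual implementation. */
final class IdleStatusRule {

    /** Assumed marker for "no watermark has been produced yet". */
    static final long INITIAL_WATERMARK = Long.MIN_VALUE;

    /**
     * Returns true if IDLE should be emitted because no splits are assigned.
     * With splits assigned, this rule does not apply and the method returns false.
     */
    static boolean shouldEmitIdle(int assignedSplits, long currentWatermark) {
        boolean noSplits = assignedSplits == 0;
        boolean watermarkProgressed = currentWatermark > INITIAL_WATERMARK;
        // No splits and no progress yet: stay silent, as in 1.13 and older.
        return noSplits && watermarkProgressed;
    }

    public static void main(String[] args) {
        System.out.println(shouldEmitIdle(0, INITIAL_WATERMARK)); // startup, no splits yet
        System.out.println(shouldEmitIdle(0, 42L));               // splits finished after progress
        System.out.println(shouldEmitIdle(2, 42L));               // splits still assigned
    }
}
```

With these assumptions, a freshly started reader that has not received any split emits no status at all, so it cannot be skipped by downstream operators before it has produced its first watermark.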

> FLIP-27 sources are generating non-deterministic results when using event time
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-23011
>                 URL: https://issues.apache.org/jira/browse/FLINK-23011
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.14.0, 1.13.1, 1.12.4
>         Environment: 
>            Reporter: Piotr Nowojski
>            Priority: Critical
>
> FLIP-27 sources currently start in the {{StreamStatus.IDLE}} state and 
> switch to {{ACTIVE}} only after emitting the first {{Watermark}}. Until this 
> happens, downstream operators exclude {{IDLE}} inputs when calculating 
> the input (min) watermark. 
> An extreme example of the problem this leads to is completely bogus 
> results when, for example, one FLIP-27 source subtask is slower than the 
> others for some reason:
> {code:java}
> env.getConfig().setAutoWatermarkInterval(2000);
> env.setParallelism(2);
> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 10));
> DataStream<Long> eventStream =
>         env.fromSource(
>                         new NumberSequenceSource(0, Long.MAX_VALUE),
>                         WatermarkStrategy.<Long>forMonotonousTimestamps()
>                                 .withTimestampAssigner(new LongTimestampAssigner()),
>                         "NumberSequenceSource")
>                 .map(
>                         new RichMapFunction<Long, Long>() {
>                             @Override
>                             public Long map(Long value) throws Exception {
>                                 if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
>                                     Thread.sleep(1);
>                                 }
>                                 return 1L;
>                             }
>                         });
> eventStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(1))).sum(0).print();
> (...)
> private static class LongTimestampAssigner implements SerializableTimestampAssigner<Long> {
>     private long counter = 0;
>     @Override
>     public long extractTimestamp(Long record, long recordTimeStamp) {
>         return counter++;
>     }
> }
> {code}
> In this case, after 2 seconds ({{setAutoWatermarkInterval}}) the 
> unthrottled subtask (subTaskId == 1) generates very high watermarks, while 
> the throttled source subtask (subTaskId == 0) emits very low watermarks. If 
> the unthrottled watermark reaches the downstream {{WindowOperator}} first, 
> while the other input channel is still idle, the operator takes those high 
> watermarks as the combined input watermark for the whole {{WindowOperator}}. 
> When the input channel from the throttled source subtask finally receives its 
> {{ACTIVE}} status and a much lower watermark, it is already too late.
> Actual output of the example program:
> {noformat}
> 1596
> 2000
> 1000
> 1000
> 1000
> 1000
> 1000
> 1000
> (...)
> {noformat}
> while the expected output is always "2000" (2000 records fit into 
> every 1-second global window)
> {noformat}
> 2000
> 2000
> 2000
> 2000
> (...)
> {noformat}.
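
To make the failure mode in the quoted description concrete, here is a minimal sketch of a combined (min) watermark that skips idle inputs. It is a simplified model, not Flink's actual input-channel handling: the class {{CombinedWatermarkSketch}} and its methods are hypothetical, and it assumes that receiving a watermark is what switches a channel from idle to active.

```java
import java.util.Arrays;

/** Hypothetical, simplified model of a min-watermark over non-idle input channels. */
final class CombinedWatermarkSketch {
    private final long[] channelWatermarks;
    private final boolean[] channelIdle;
    private long combined = Long.MIN_VALUE;

    CombinedWatermarkSketch(int channels) {
        channelWatermarks = new long[channels];
        channelIdle = new boolean[channels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
        Arrays.fill(channelIdle, true); // every input starts IDLE, as in the issue
    }

    /** Records a watermark for one channel and returns the combined watermark. */
    long onWatermark(int channel, long watermark) {
        channelIdle[channel] = false; // assumption: first watermark activates the channel
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);

        long min = Long.MAX_VALUE;
        for (int i = 0; i < channelWatermarks.length; i++) {
            if (!channelIdle[i]) { // idle channels are left out of the minimum
                min = Math.min(min, channelWatermarks[i]);
            }
        }
        // The combined watermark only moves forward, never back.
        combined = Math.max(combined, min);
        return combined;
    }

    public static void main(String[] args) {
        CombinedWatermarkSketch op = new CombinedWatermarkSketch(2);
        System.out.println(op.onWatermark(1, 5000L)); // fast subtask arrives first
        System.out.println(op.onWatermark(0, 10L));   // slow subtask arrives too late
    }
}
```

In this model, the first call advances the combined watermark to 5000 because channel 0 is still idle, and the later watermark of 10 from channel 0 cannot pull it back, so records behind that watermark are treated as late.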



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
