[ https://issues.apache.org/jira/browse/FLINK-23011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Piotr Nowojski updated FLINK-23011:
-----------------------------------
    Issue Type: Bug  (was: New Feature)

> FLIP-27 sources are generating non-deterministic results when using event time
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-23011
>                 URL: https://issues.apache.org/jira/browse/FLINK-23011
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>   Affects Versions: 1.14.0, 1.13.1, 1.12.4
>         Environment:
>            Reporter: Piotr Nowojski
>            Priority: Critical
>
> FLIP-27 sources currently start in the {{StreamStatus.IDLE}} state and
> switch to {{ACTIVE}} only after emitting the first {{Watermark}}. Until this
> happens, downstream operators exclude the {{IDLE}} inputs when calculating
> the input (min) watermark.
> In an extreme case this leads to completely bogus results, for example when
> one FLIP-27 source subtask is slower than the others for some reason:
> {code:java}
> env.getConfig().setAutoWatermarkInterval(2000);
> env.setParallelism(2);
> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 10));
> DataStream<Long> eventStream =
>         env.fromSource(
>                         new NumberSequenceSource(0, Long.MAX_VALUE),
>                         WatermarkStrategy.<Long>forMonotonousTimestamps()
>                                 .withTimestampAssigner(new LongTimestampAssigner()),
>                         "NumberSequenceSource")
>                 .map(
>                         new RichMapFunction<Long, Long>() {
>                             @Override
>                             public Long map(Long value) throws Exception {
>                                 // Throttle only subtask 0.
>                                 if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
>                                     Thread.sleep(1);
>                                 }
>                                 // Emit 1 per record so that sum(0) counts records per window.
>                                 return 1L;
>                             }
>                         });
> eventStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(1))).sum(0).print();
> (...)
> private static class LongTimestampAssigner implements SerializableTimestampAssigner<Long> {
>     // Per-instance counter used as the timestamp; the record value is ignored.
>     private long counter = 0;
>     @Override
>     public long extractTimestamp(Long record, long recordTimeStamp) {
>         return counter++;
>     }
> }
> {code}
> In this case, after 2 seconds ({{setAutoWatermarkInterval}}) the
> non-throttled subtask (subTaskId == 1) generates very high watermarks. The
> other source subtask (subTaskId == 0) emits very low watermarks. If the
> watermark from the non-throttled subtask reaches the downstream
> {{WindowOperator}} first, while the other input channel is still idle, the
> operator takes that high watermark as the combined input watermark for the
> whole {{WindowOperator}}. When the input channel from the throttled source
> subtask finally receives its {{ACTIVE}} status and a much lower watermark,
> it is already too late.
> Actual output of the example program:
> {noformat}
> 1596
> 2000
> 1000
> 1000
> 1000
> 1000
> 1000
> 1000
> (...)
> {noformat}
> while the expected output is always "2000" (2000 records fitting in every
> 1 second global window):
> {noformat}
> 2000
> 2000
> 2000
> 2000
> (...)
> {noformat}
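
The watermark race described in the issue can be reproduced without Flink. The following is a minimal sketch, not Flink's actual implementation: the class {{IdleInputWatermarkSketch}}, its methods, and the watermark values 1_000_000 and 1_500 are hypothetical and chosen only for illustration. It assumes that every input channel starts idle, that a channel becomes active on its first watermark, that the combined watermark is the minimum over active channels, and that the combined watermark never moves backwards.

```java
import java.util.Arrays;

/** Toy model of combining per-channel watermarks when all channels start IDLE. */
public class IdleInputWatermarkSketch {
    private final boolean[] active;          // false = IDLE (initial state), true = ACTIVE
    private final long[] channelWatermark;   // last watermark seen on each channel
    private long combined = Long.MIN_VALUE;  // combined watermark, never decreases

    public IdleInputWatermarkSketch(int numChannels) {
        active = new boolean[numChannels];
        channelWatermark = new long[numChannels];
        Arrays.fill(channelWatermark, Long.MIN_VALUE);
    }

    /** A channel emits a watermark, which also marks it ACTIVE. Returns the combined watermark. */
    public long onWatermark(int channel, long watermark) {
        active[channel] = true;
        channelWatermark[channel] = Math.max(channelWatermark[channel], watermark);

        // Minimum over ACTIVE channels only; IDLE channels are ignored.
        // The channel that just reported is always ACTIVE, so min is well defined.
        long min = Long.MAX_VALUE;
        for (int i = 0; i < active.length; i++) {
            if (active[i]) {
                min = Math.min(min, channelWatermark[i]);
            }
        }
        // The combined watermark only ever advances.
        if (min > combined) {
            combined = min;
        }
        return combined;
    }

    /** True if a record with this timestamp is at or behind the combined watermark. */
    public boolean isBehindWatermark(long timestamp) {
        return timestamp <= combined;
    }

    public static void main(String[] args) {
        IdleInputWatermarkSketch valve = new IdleInputWatermarkSketch(2);
        // The non-throttled subtask (channel 1) reports first, while channel 0 is still IDLE.
        long afterFast = valve.onWatermark(1, 1_000_000L);
        // The throttled subtask (channel 0) reports a much lower watermark afterwards.
        long afterSlow = valve.onWatermark(0, 1_500L);
        System.out.println("after fast channel: " + afterFast);
        System.out.println("after slow channel: " + afterSlow);
        System.out.println("record at t=1600 behind watermark: " + valve.isBehindWatermark(1_600L));
    }
}
```

In this model the first watermark from channel 1 is accepted as the combined watermark because channel 0 is excluded from the minimum. The later, lower watermark from channel 0 cannot pull the combined watermark back, so records from the throttled subtask arrive behind it, which matches the "already too late" situation in the description.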