[ https://issues.apache.org/jira/browse/FLINK-23011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17368244#comment-17368244 ]
Arvid Heise commented on FLINK-23011: -------------------------------------

Re 1b) I agree that we already have it for static assignments. My main concern is that most recent streaming storages seem to move towards dynamic partitions (Kinesis, Pulsar, Pravega). Similarly, work stealing is a long-term goal of FLIP-27, so having a solution that for now only works for Kafka with static assignment feels unsatisfying to me. Furthermore, with HybridSources the issue is more pronounced. Think of a job that reads from Iceberg and switches to Kafka; some Iceberg splits may be larger and take longer to ingest. At that time, the idle readers have no Iceberg splits but cannot close either, so we would have no watermarks until the switch to Kafka actually happens.

Re 2) I like the idea of thinking in splits, which would certainly also help with the (unrelated) event-time alignment. As stated above, I don't agree with applying it only to long-living splits. What happens when a storage system simply closes unused partitions after some time and reopens them when they are finally used?

Watermark assignment would not happen through the enumerator, only watermark generation: a reader goes idle, which tells the enumerator that it needs to generate suitable lower bounds. The enumerator periodically queries the source system for a low watermark and then sends it to ALL idle readers, which in turn emit it. We could do the same on the reader level, but with a high degree of parallelism you could easily DDoS the source system.
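The protocol proposed in the comment can be sketched as a minimal, self-contained simulation (this is not Flink code; the `Reader`/`Enumerator` classes and method names here are invented for illustration of the idea: idle readers report to the enumerator, which polls the source system once and broadcasts the resulting low watermark to all of them):

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Sketch of the enumerator-driven idle-watermark idea from the comment. */
public class IdleWatermarkSketch {

    /** Stands in for a reader subtask; records the last watermark it emitted. */
    static class Reader {
        boolean idle;
        long lastEmittedWatermark = Long.MIN_VALUE;

        Reader(boolean idle) { this.idle = idle; }

        void emitWatermark(long wm) {
            lastEmittedWatermark = Math.max(lastEmittedWatermark, wm);
        }
    }

    /** Stands in for the enumerator; tracks readers that reported idleness. */
    static class Enumerator {
        final Map<Integer, Reader> readers = new HashMap<>();
        final Set<Integer> idleReaders = new HashSet<>();

        void register(int subtask, Reader r) {
            readers.put(subtask, r);
            if (r.idle) idleReaders.add(subtask); // reader reports "I am idle"
        }

        /** Periodic callback: one query to the source system for a global low
         *  watermark, broadcast to ALL idle readers (instead of each reader
         *  querying the source system itself). */
        void onLowWatermarkPoll(long lowWatermarkFromSourceSystem) {
            for (int subtask : idleReaders) {
                readers.get(subtask).emitWatermark(lowWatermarkFromSourceSystem);
            }
        }
    }

    public static void main(String[] args) {
        Enumerator enumerator = new Enumerator();
        Reader busy = new Reader(false);
        Reader idle = new Reader(true);
        enumerator.register(0, busy);
        enumerator.register(1, idle);

        busy.emitWatermark(5_000);            // derived from records as usual
        enumerator.onLowWatermarkPoll(4_200); // one poll serves all idle readers

        System.out.println("busy reader watermark: " + busy.lastEmittedWatermark);
        System.out.println("idle reader watermark: " + idle.lastEmittedWatermark);
    }
}
```

The point of routing the poll through the enumerator is that the source system sees one query per interval regardless of parallelism, which is the DDoS concern raised above.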
> FLIP-27 sources are generating non-deterministic results when using event time
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-23011
>                 URL: https://issues.apache.org/jira/browse/FLINK-23011
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.14.0
>            Reporter: Piotr Nowojski
>            Assignee: Dawid Wysakowicz
>            Priority: Critical
>             Fix For: 1.14.0
>
> FLIP-27 sources currently start in the {{StreamStatus.IDLE}} state and switch to {{ACTIVE}} only after emitting the first {{Watermark}}. Until this happens, downstream operators ignore {{IDLE}} inputs when calculating the (min) input watermark.
>
> An extreme example of the problem this leads to: completely bogus results if, for example, one FLIP-27 source subtask is slower than the others for some reason:
>
> {code:java}
> env.getConfig().setAutoWatermarkInterval(2000);
> env.setParallelism(2);
> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 10));
> DataStream<Long> eventStream =
>         env.fromSource(
>                         new NumberSequenceSource(0, Long.MAX_VALUE),
>                         WatermarkStrategy.<Long>forMonotonousTimestamps()
>                                 .withTimestampAssigner(new LongTimestampAssigner()),
>                         "NumberSequenceSource")
>                 .map(
>                         new RichMapFunction<Long, Long>() {
>                             @Override
>                             public Long map(Long value) throws Exception {
>                                 if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
>                                     Thread.sleep(1);
>                                 }
>                                 return 1L;
>                             }
>                         });
> eventStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(1))).sum(0).print();
>
> (...)
>
> private static class LongTimestampAssigner
>         implements SerializableTimestampAssigner<Long> {
>     private long counter = 0;
>
>     @Override
>     public long extractTimestamp(Long record, long recordTimeStamp) {
>         return counter++;
>     }
> }
> {code}
>
> In such a case, after 2 seconds ({{setAutoWatermarkInterval}}) the non-throttled subtask (subTaskId == 1) generates very high watermarks, while the other source subtask (subTaskId == 0) emits very low watermarks. If the non-throttled watermark reaches the downstream {{WindowOperator}} first, while the other input channel is still idle, the operator takes those high watermarks as the combined input watermark for the whole {{WindowOperator}}. When the input channel from the throttled source subtask finally receives its {{ACTIVE}} status and a much lower watermark, it is already too late.
>
> Actual output of the example program:
>
> {noformat}
> 1596
> 2000
> 1000
> 1000
> 1000
> 1000
> 1000
> 1000
> (...)
> {noformat}
>
> while the expected output is always "2000" (2000 records fit into every 1-second global window):
>
> {noformat}
> 2000
> 2000
> 2000
> 2000
> (...)
> {noformat}

-- This message was sent by Atlassian Jira (v8.3.4#803005)
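The combined-watermark calculation at the heart of this bug can be sketched as a minimal simulation (not Flink internals; the function and variable names are made up for illustration): downstream operators take the minimum watermark over non-idle input channels only, so while the slow channel is still IDLE, the fast channel's high watermark passes through unopposed.

```java
/** Minimal simulation of how a downstream operator combines input watermarks:
 *  IDLE channels are excluded from the min, so one fast ACTIVE channel can
 *  drive the combined watermark far beyond a still-idle slow channel. */
public class CombinedWatermarkSketch {

    static long combinedWatermark(long[] watermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) { // idle channels are ignored in the combination
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        long[] wm = {Long.MIN_VALUE, 100_000}; // slow channel emitted nothing yet
        boolean[] idle = {true, false};        // FLIP-27 sources start IDLE

        // The fast channel alone determines the combined watermark.
        System.out.println(combinedWatermark(wm, idle)); // prints 100000

        // The slow channel becomes ACTIVE with a much lower watermark -- too
        // late: windows up to 100000 may already have fired downstream.
        wm[0] = 1_500;
        idle[0] = false;
        System.out.println(combinedWatermark(wm, idle)); // prints 1500
    }
}
```

This is why the premature firing is irreversible: event-time windows that fired on the inflated combined watermark cannot be un-fired once the throttled channel reports its true, lower watermark.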