[ https://issues.apache.org/jira/browse/FLINK-23011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17368003#comment-17368003 ]
Stephan Ewen commented on FLINK-23011:
--------------------------------------

Some thoughts on the whole idleness business and the cases described here:

(1) Idleness inherently circumvents the core event-time mechanism. The temporary unavailability of events (data) in a partition means we cannot make any statement about event-time progress (which is data-driven). However, real-world setups need to handle such situations (absence of data), and the right behavior depends heavily on the specific data characteristics of the stream, so it can really only ever be defined by the user. Therefore, I think we should have these two core rules for idleness:

(a) Idleness should always be a user-defined thing, coming from the configured watermark strategies. The system should never trigger idleness by itself.

(b) Idleness should always refer to the situation where a source has work (splits assigned) but no data (an empty partition). We should not use it in a situation where a source has no work, because then there is no way to define idleness relative to stream activity (i.e., to ensure that idleness is only triggered when we are sure to have fallen behind by at least X time). This means that the FLIP-27 readers should start in state "active" and never switch to "idle" by themselves, but only when the overall watermark status (merged from all partition-specific statuses) is idle.

(2) There is currently a lack of expressiveness in the watermark system. We cannot handle well the cases where splits are in the backlog, not assigned to any reader. That is a problem most prominently for the file source at the moment, but it is foreseeable for other sources as well. The problem is that readers would signal watermark progress based on their local view, lacking the knowledge that more splits are in the backlog. I think that idleness is not the right mechanism to deal with this. What we need here is a global "watermark holdback" that is handled by the coordinator.
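A minimal plain-Java sketch of what such a coordinator-driven holdback could look like (hypothetical class and method names, not part of Flink's actual API; assumes the coordinator can track the smallest event timestamp still sitting in unassigned splits):

```java
/**
 * Sketch of the proposed "global watermark holdback" (hypothetical, not
 * Flink API): the coordinator knows the split backlog and caps how far
 * any reader may advance its local watermark.
 */
public class WatermarkHoldback {

    /** Smallest event timestamp that may still appear in unassigned splits. */
    private long backlogMinTimestamp;

    public WatermarkHoldback(long backlogMinTimestamp) {
        this.backlogMinTimestamp = backlogMinTimestamp;
    }

    /** Called by the coordinator whenever the split backlog shrinks. */
    public void updateBacklog(long newBacklogMinTimestamp) {
        this.backlogMinTimestamp = newBacklogMinTimestamp;
    }

    /**
     * A reader's emitted watermark is capped below the backlog: no
     * statement about event time beyond what unassigned splits may hold.
     */
    public long effectiveWatermark(long localWatermark) {
        return Math.min(localWatermark, backlogMinTimestamp - 1);
    }
}
```

For example, with a backlog whose earliest timestamp is 1000, a reader that locally reached watermark 5000 would only emit 999; once the backlog drains (cap raised to Long.MAX_VALUE), the local watermark passes through unchanged.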
Meaning: the coordinator signals the readers how far they may advance their watermarks at most (based on the split backlog). For long backlogs of splits, this means that watermark alignment is inherently off and state is going to be large. But that is inherent in the way such a source generates data. We can fix that in two ways, though:
- Either change the source to assign all splits immediately, read them concurrently, and throttle progress on individual splits based on watermark alignment (I think this will not be super efficient),
- Or advance the functionality in mixed batch/streaming execution: as long as there is a backlog of splits, execute in batch mode, then switch to streaming once all initial splits have been processed and the source is in discovery mode for new splits. Batch execution is not sensitive to state size, which is why we would not need to worry about the drawbacks of the global watermark holdback.

> FLIP-27 sources are generating non-deterministic results when using event time
> ------------------------------------------------------------------------------
>
> Key: FLINK-23011
> URL: https://issues.apache.org/jira/browse/FLINK-23011
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.14.0
> Environment:
> Reporter: Piotr Nowojski
> Assignee: Dawid Wysakowicz
> Priority: Critical
> Fix For: 1.14.0
>
> FLIP-27 sources currently start in the {{StreamStatus.IDLE}} state and they switch to {{ACTIVE}} only after emitting the first {{Watermark}}. Until this happens, downstream operators ignore {{IDLE}} inputs when calculating the input (min) watermark.
> An extreme example of the problem this leads to: completely bogus results if, for example, one FLIP-27 source subtask is slower than the others for some reason:
>
> {code:java}
> env.getConfig().setAutoWatermarkInterval(2000);
> env.setParallelism(2);
> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 10));
>
> DataStream<Long> eventStream =
>         env.fromSource(
>                         new NumberSequenceSource(0, Long.MAX_VALUE),
>                         WatermarkStrategy.<Long>forMonotonousTimestamps()
>                                 .withTimestampAssigner(new LongTimestampAssigner()),
>                         "NumberSequenceSource")
>                 .map(
>                         new RichMapFunction<Long, Long>() {
>                             @Override
>                             public Long map(Long value) throws Exception {
>                                 if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
>                                     Thread.sleep(1);
>                                 }
>                                 return 1L;
>                             }
>                         });
>
> eventStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(1))).sum(0).print();
>
> (...)
>
> private static class LongTimestampAssigner
>         implements SerializableTimestampAssigner<Long> {
>
>     private long counter = 0;
>
>     @Override
>     public long extractTimestamp(Long record, long recordTimeStamp) {
>         return counter++;
>     }
> }
> {code}
>
> In such a case, after 2 seconds ({{setAutoWatermarkInterval}}) the non-throttled subtask (subTaskId == 1) generates very high watermarks, while the other source subtask (subTaskId == 0) emits very low watermarks. If the non-throttled watermark reaches the downstream {{WindowOperator}} first, while the other input channel is still idle, it will take those high watermarks as the combined input watermark for the whole {{WindowOperator}}. When the input channel from the throttled source subtask finally receives its {{ACTIVE}} status and a much lower watermark, it is already too late.
>
> Actual output of the example program:
>
> {noformat}
> 1596
> 2000
> 1000
> 1000
> 1000
> 1000
> 1000
> 1000
> (...)
> {noformat}
>
> while the expected output should always be "2000" (2000 records fitting in every 1-second global window):
>
> {noformat}
> 2000
> 2000
> 2000
> 2000
> (...)
> {noformat}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
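The non-determinism in the ticket comes down to how a multi-input operator combines per-channel watermarks: channels marked {{IDLE}} are excluded from the min, so a subtask that starts {{IDLE}} lets another subtask's high watermark through prematurely. A Flink-free sketch of that combination logic (hypothetical class, not Flink's actual implementation):

```java
/**
 * Sketch (not Flink's actual code) of combined input watermark
 * computation: the min is taken over ACTIVE channels only, and IDLE
 * channels are ignored entirely.
 */
public class CombinedWatermark {

    public static final long NO_WATERMARK = Long.MIN_VALUE;

    /** Returns the min over non-idle channels, or NO_WATERMARK if all are idle. */
    public static long combine(long[] watermarks, boolean[] idle) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < watermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, watermarks[i]);
            }
        }
        return anyActive ? min : NO_WATERMARK;
    }
}
```

With channel 0 (the throttled subtask) still idle and channel 1 already at watermark 2000, the combined watermark is 2000 and event-time windows fire early; when channel 0 later turns active with a much lower watermark, the already-emitted results cannot be retracted.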