[ 
https://issues.apache.org/jira/browse/FLINK-23011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17368244#comment-17368244
 ] 

Arvid Heise commented on FLINK-23011:
-------------------------------------

Re 1b) I agree that we already have it for static assignments. My main concern 
is that the more recent streaming storage systems all seem to be moving toward 
dynamic partitions (Kinesis, Pulsar, Pravega). Similarly, work-stealing is a 
long-term goal of FLIP-27. So a solution that, for now, only works for Kafka 
with static assignment feels unsatisfying to me.

Furthermore, the issue is more pronounced with HybridSources. Think of a job 
that reads from Iceberg and then switches to Kafka. Some Iceberg splits may be 
larger and take longer to ingest. During that time, the idle readers have no 
Iceberg splits but can't close either. So we would have no watermarks until the 
switch to Kafka actually happens.

Re 2) I like the idea of thinking in splits; that would certainly also help 
with the unrelated event time alignment. As stated above, I don't agree with 
applying it only to long-lived splits. What happens when a storage system simply 
closes unused partitions after some time and reopens them when they are finally 
used?

Watermark assignment would not happen through the enumerator; just watermark 
generation. A reader goes idle, and that tells the enumerator that it needs to 
generate suitable lower bounds. The enumerator periodically queries the source 
system for a low watermark and then sends it to ALL idle readers, which in turn 
emit it. We could do the same at the reader level, but with a high degree of 
parallelism you could easily DDoS the source system.
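
To make the proposal concrete, here is a minimal sketch of the coordination 
idea in plain Java. It is not Flink code: the class name, the method names, and 
the LongSupplier standing in for the query against the source system are all 
hypothetical. It only shows that one query per tick can serve every idle 
reader, independent of the parallelism.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.LongConsumer;
import java.util.function.LongSupplier;

/** Hypothetical sketch, not a Flink API: one coordinator serves all idle readers. */
public class IdleWatermarkCoordinator {
    // Stands in for a (possibly expensive) low-watermark query against the source system.
    private final LongSupplier sourceLowWatermark;
    // Idle readers keyed by subtask id; the value is the callback that emits a watermark.
    private final Map<Integer, LongConsumer> idleReaders = new LinkedHashMap<>();
    private int queriesIssued = 0;

    public IdleWatermarkCoordinator(LongSupplier sourceLowWatermark) {
        this.sourceLowWatermark = sourceLowWatermark;
    }

    /** A reader reports that it has no splits to read from. */
    public void readerWentIdle(int subtaskId, LongConsumer watermarkEmitter) {
        idleReaders.put(subtaskId, watermarkEmitter);
    }

    /** A reader received a split again and generates its own watermarks. */
    public void readerBecameActive(int subtaskId) {
        idleReaders.remove(subtaskId);
    }

    /** Called periodically: a single query, fanned out to ALL idle readers. */
    public void onPeriodicTick() {
        if (idleReaders.isEmpty()) {
            return; // nobody is idle, so do not touch the source system at all
        }
        long lowWatermark = sourceLowWatermark.getAsLong();
        queriesIssued++;
        for (LongConsumer emitter : idleReaders.values()) {
            emitter.accept(lowWatermark);
        }
    }

    public int getQueriesIssued() {
        return queriesIssued;
    }
}
```

With N idle readers, each tick costs the source system one query here, whereas 
the reader-level variant would cost N queries per tick.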

> FLIP-27 sources are generating non-deterministic results when using event time
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-23011
>                 URL: https://issues.apache.org/jira/browse/FLINK-23011
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.14.0
>         Environment: 
>            Reporter: Piotr Nowojski
>            Assignee: Dawid Wysakowicz
>            Priority: Critical
>             Fix For: 1.14.0
>
>
> FLIP-27 sources currently start in the {{StreamStatus.IDLE}} state and they 
> switch to {{ACTIVE}} only after emitting the first {{Watermark}}. Until this 
> happens, downstream operators ignore {{IDLE}} inputs when calculating 
> the input (min) watermark. 
> An extreme example of the problem this leads to is completely bogus 
> results if, for example, one FLIP-27 source subtask is slower than the others 
> for some reason:
> {code:java}
> env.getConfig().setAutoWatermarkInterval(2000);
> env.setParallelism(2);
> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 10));
> DataStream<Long> eventStream =
>         env.fromSource(
>                         new NumberSequenceSource(0, Long.MAX_VALUE),
>                         WatermarkStrategy.<Long>forMonotonousTimestamps()
>                                 .withTimestampAssigner(new LongTimestampAssigner()),
>                         "NumberSequenceSource")
>                 .map(
>                         new RichMapFunction<Long, Long>() {
>                             @Override
>                             public Long map(Long value) throws Exception {
>                                 if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
>                                     Thread.sleep(1);
>                                 }
>                                 return 1L;
>                             }
>                         });
> eventStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(1))).sum(0).print();
> (...)
> private static class LongTimestampAssigner implements SerializableTimestampAssigner<Long> {
>     private long counter = 0;
>     @Override
>     public long extractTimestamp(Long record, long recordTimeStamp) {
>         return counter++;
>     }
> }
> {code}
> In such a case, after 2 seconds ({{setAutoWatermarkInterval}}), the 
> non-throttled subtask (subTaskId == 1) generates very high watermarks. The 
> other source subtask (subTaskId == 0) emits very low watermarks. If the 
> non-throttled watermark reaches the downstream {{WindowOperator}} first, while 
> the other input channel is still idle, it will take those high watermarks as 
> the combined input watermark for the whole {{WindowOperator}}. When the input 
> channel from the throttled source subtask finally receives its {{ACTIVE}} 
> status and a much lower watermark, it is already too late.
> Actual output of the example program:
> {noformat}
> 1596
> 2000
> 1000
> 1000
> 1000
> 1000
> 1000
> 1000
> (...)
> {noformat}
> while the expected output should always be "2000" (2000 records fitting in 
> every 1-second global window)
> {noformat}
> 2000
> 2000
> 2000
> 2000
> (...)
> {noformat}.
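
The race described in the quoted issue can be reproduced with a simplified 
model of how a downstream operator combines per-channel watermarks. This is a 
sketch in plain Java, not Flink's actual implementation; the class and method 
names are hypothetical. It assumes only what the issue states (idle channels 
are excluded from the minimum) plus the fact that the combined watermark never 
moves backward.

```java
import java.util.Arrays;

/** Simplified, hypothetical model of min-watermark combination with idle inputs. */
public class CombinedWatermarkSketch {
    private final long[] channelWatermarks;
    private final boolean[] channelActive; // all channels start idle, as FLIP-27 sources do
    private long combinedWatermark = Long.MIN_VALUE;

    public CombinedWatermarkSketch(int numChannels) {
        channelWatermarks = new long[numChannels];
        channelActive = new boolean[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
    }

    /** A watermark arrives on a channel, which also marks that channel active. */
    public long onWatermark(int channel, long watermark) {
        channelActive[channel] = true;
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);

        // Minimum over ACTIVE channels only; idle channels are ignored.
        // At least one channel is active here, because this one was just activated.
        long min = Long.MAX_VALUE;
        for (int i = 0; i < channelWatermarks.length; i++) {
            if (channelActive[i]) {
                min = Math.min(min, channelWatermarks[i]);
            }
        }
        // The combined watermark is monotonic: it only ever advances.
        if (min > combinedWatermark) {
            combinedWatermark = min;
        }
        return combinedWatermark;
    }
}
```

If the fast channel reports a watermark of 4000000 while the slow channel is 
still idle, the combined watermark jumps to 4000000. A later watermark of 1500 
from the slow channel cannot pull it back, so that channel's records arrive 
behind the watermark and are treated as late.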



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
