[ 
https://issues.apache.org/jira/browse/FLINK-23011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17368003#comment-17368003
 ] 

Stephan Ewen edited comment on FLINK-23011 at 6/23/21, 10:01 AM:
-----------------------------------------------------------------

Some thoughts on the whole Idleness business and the cases described here:

*(1) About Idleness*

Idleness inherently circumvents the core event-time mechanism. The temporary 
unavailability of events (data) in a partition means we cannot make any 
statement about event time progress (which is data-driven).
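
To make this concrete, here is a minimal, self-contained sketch in plain Java.
It does not use Flink's actual classes; InputChannelSketch and
CombinedWatermarkSketch are made-up names. It assumes the combined watermark is
the minimum over all inputs that are not idle, as the issue description says.
An idle input is simply left out of that minimum, so nothing holds the
watermark back on its behalf.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for one input of a downstream operator. */
final class InputChannelSketch {
    long watermark = Long.MIN_VALUE; // last watermark seen on this input
    boolean idle;                    // true while the input is marked idle

    InputChannelSketch(boolean idle) {
        this.idle = idle;
    }
}

final class CombinedWatermarkSketch {
    /**
     * Minimum watermark over all non-idle inputs.
     * Returns Long.MIN_VALUE if every input is idle.
     */
    static long combine(List<InputChannelSketch> inputs) {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (InputChannelSketch in : inputs) {
            if (!in.idle) {
                anyActive = true;
                min = Math.min(min, in.watermark);
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }

    public static void main(String[] args) {
        InputChannelSketch fast = new InputChannelSketch(false);
        InputChannelSketch slow = new InputChannelSketch(true); // no data yet
        fast.watermark = 10_000L;
        List<InputChannelSketch> inputs = Arrays.asList(fast, slow);

        // The idle input is ignored, so the fast input alone decides.
        System.out.println(combine(inputs)); // prints 10000

        // The slow input wakes up with a much lower watermark.
        slow.idle = false;
        slow.watermark = 500L;
        System.out.println(combine(inputs)); // prints 500
    }
}
```

In this model the second value is lower than the one already reported. A real
operator cannot take back the progress it announced, so the records of the slow
input arrive late.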

However, real-world setups have the need to handle such situations (absence of 
data). But the behavior for that situation is very dependent on the specific 
data characteristics of the stream, and can really only ever be defined by a 
user.

Therefore, I think we should have these two core rules for Idleness:

    (a) Idleness should always be a user-defined thing, coming from the 
configured watermark strategies. The system should never trigger idleness by 
itself.

    (b) Idleness should always refer to the situation where a source has work 
(splits assigned) but no data (empty partition). We should not use it in a 
situation where a source has no work, because in such situations there is no 
way to define idleness relative to stream activity (ensure that idleness is 
only triggered when we are sure to have fallen behind by at least X time).

This means that the FLIP-27 readers should start in state "active" and switch 
to "idle" only when the overall watermark status (merged from all 
partition-specific statuses) is idle, never on their own initiative.
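
A rough sketch of how such a merged status could behave, in plain Java and
under my own assumptions: ReaderStatusSketch and its methods are hypothetical
names, not the FLIP-27 API. markIdle() stands for the user-configured watermark
strategy declaring a partition idle (rule a), and a reader without any splits
reports "active" (rule b).

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of a reader that merges per-partition idleness. */
final class ReaderStatusSketch {
    // Idle flag per assigned split, keyed by an illustrative split id.
    private final Map<String, Boolean> partitionIdle = new HashMap<>();

    /** A newly assigned split starts out active. */
    void addSplit(String splitId) {
        partitionIdle.put(splitId, false);
    }

    /** Called when the user-defined strategy declares the partition idle. */
    void markIdle(String splitId) {
        partitionIdle.replace(splitId, true);
    }

    /** Called when data shows up on the partition again. */
    void markActive(String splitId) {
        partitionIdle.replace(splitId, false);
    }

    /** Idle only if there is work and every assigned partition is idle. */
    boolean isIdle() {
        if (partitionIdle.isEmpty()) {
            return false; // no work assigned: never idle by itself
        }
        for (boolean idle : partitionIdle.values()) {
            if (!idle) {
                return false;
            }
        }
        return true;
    }
}
```

With this shape, a reader that has just started, or that is waiting for its
next split, keeps holding back the downstream watermark instead of being
skipped.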

*(2) About Global Watermarks*

There is currently a lack of expressiveness in the watermark system. We cannot 
properly handle cases where splits sit in the backlog, not yet assigned to any 
reader. That is most prominently a problem for the file source at the moment, 
but it is foreseeable for other sources as well.

The problem is that readers would signal watermark progress based on their 
local view, lacking the knowledge that more splits are in the backlog.

I think that Idleness is not the right mechanism to deal with this. What we 
need here is a global "watermark holdback" that is handled by the coordinator. 
Meaning the coordinator signals the readers how far they may advance their 
watermarks at most (based on the split backlog).
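
As a minimal sketch of that idea (plain Java, hypothetical names, no such API
exists in Flink as far as this comment is concerned): it assumes the
coordinator knows a lower bound on the event timestamps of every split still in
the backlog, for example from file metadata. That assumption is mine and may
not hold for every source.

```java
import java.util.Collection;

/** Hypothetical model of a coordinator-driven watermark holdback. */
final class WatermarkHoldbackSketch {

    /**
     * Coordinator side: the highest watermark any reader may emit, given
     * the minimum event timestamp of each unassigned split. Returns
     * Long.MAX_VALUE (no restriction) when the backlog is empty.
     */
    static long holdback(Collection<Long> backlogMinTimestamps) {
        long min = Long.MAX_VALUE;
        for (long ts : backlogMinTimestamps) {
            min = Math.min(min, ts);
        }
        if (min == Long.MAX_VALUE || min == Long.MIN_VALUE) {
            return min; // empty backlog, or already at the lowest value
        }
        // A watermark of t claims that no record with timestamp <= t
        // follows, so it has to stay strictly below the earliest one.
        return min - 1;
    }

    /** Reader side: cap the locally computed watermark by the holdback. */
    static long cappedWatermark(long localWatermark, long holdback) {
        return Math.min(localWatermark, holdback);
    }
}
```

For example, with backlog splits starting at timestamps 5000 and 2000, a reader
whose local watermark is 10000 would emit 1999 until the split starting at 2000
has been assigned.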

For long backlogs of splits, this means that watermark alignment is inherently 
off and state is going to be large. But that is inherent in the nature of the 
way such a source generates data. We can fix that in two ways, though:

  - Either change the source to assign all splits immediately and read them 
concurrently and throttle progress on individual splits based on watermark 
alignment (I think this will not be very efficient)

  - Or advance the functionality in mixed batch/streaming execution: As long as 
there is a backlog of splits, execute in batch, then switch to streaming once 
all initial splits were processed and the source is in discovery mode for new 
splits. The batch execution is not sensitive to state size, which is why we 
don't need to worry about the drawbacks of the global watermark holdback.


> FLIP-27 sources are generating non-deterministic results when using event time
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-23011
>                 URL: https://issues.apache.org/jira/browse/FLINK-23011
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.14.0
>         Environment: 
>            Reporter: Piotr Nowojski
>            Assignee: Dawid Wysakowicz
>            Priority: Critical
>             Fix For: 1.14.0
>
>
> FLIP-27 sources currently start in the {{StreamStatus.IDLE}} state and 
> switch to {{ACTIVE}} only after emitting the first {{Watermark}}. Until this 
> happens, downstream operators exclude the {{IDLE}} inputs when calculating 
> the input (min) watermark. 
> An extreme example of the problem this leads to is completely bogus results 
> when, for example, one FLIP-27 source subtask is slower than the others for 
> some reason:
> {code:java}
> env.getConfig().setAutoWatermarkInterval(2000);
> env.setParallelism(2);
> env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 10));
> DataStream<Long> eventStream =
>         env.fromSource(
>                         new NumberSequenceSource(0, Long.MAX_VALUE),
>                         WatermarkStrategy.<Long>forMonotonousTimestamps()
>                                 .withTimestampAssigner(new LongTimestampAssigner()),
>                         "NumberSequenceSource")
>                 .map(
>                         new RichMapFunction<Long, Long>() {
>                             @Override
>                             public Long map(Long value) throws Exception {
>                                 if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
>                                     Thread.sleep(1);
>                                 }
>                                 return 1L;
>                             }
>                         });
> eventStream.windowAll(TumblingEventTimeWindows.of(Time.seconds(1))).sum(0).print();
> (...)
> private static class LongTimestampAssigner implements SerializableTimestampAssigner<Long> {
>     private long counter = 0;
>     @Override
>     public long extractTimestamp(Long record, long recordTimeStamp) {
>         return counter++;
>     }
> }
> {code}
> In such a case, after 2 seconds ({{setAutoWatermarkInterval}}) the 
> unthrottled subtask (subTaskId == 1) generates very high watermarks. The 
> other source subtask (subTaskId == 0) emits very low watermarks. If the 
> unthrottled subtask's watermark reaches the downstream {{WindowOperator}} 
> first, while the other input channel is still idle, the operator will take 
> those high watermarks as the combined input watermark for the whole 
> {{WindowOperator}}. When the input channel from the throttled source subtask 
> finally receives its {{ACTIVE}} status and a much lower watermark, it is 
> already too late.
> Actual output of the example program:
> {noformat}
> 1596
> 2000
> 1000
> 1000
> 1000
> 1000
> 1000
> 1000
> (...)
> {noformat}
> while the expected output should always be "2000" (2000 records fitting in 
> every 1-second global window)
> {noformat}
> 2000
> 2000
> 2000
> 2000
> (...)
> {noformat}


