Gyula Fora created FLINK-35157:
----------------------------------

             Summary: Sources with watermark alignment get stuck once some subtasks finish
                 Key: FLINK-35157
                 URL: https://issues.apache.org/jira/browse/FLINK-35157
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Coordination
    Affects Versions: 1.18.1, 1.19.0, 1.17.2
            Reporter: Gyula Fora


The current watermark alignment logic can easily get stuck if some subtasks 
finish while others are still running.

The reason is that a finished source subtask is not excluded from alignment. Its last reported watermark keeps holding back the alignment group. This blocks the rest of the job from progressing beyond that watermark plus the maximum allowed watermark drift.
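The failure mode can be made concrete with a minimal sketch. It uses a simplified model, which is an assumption and not Flink's actual `SourceCoordinator` code. Each subtask reports its latest watermark. Every subtask in the group may only advance up to the minimum reported watermark plus the maximum allowed drift. The class `StuckAlignmentSketch` and its methods are hypothetical. A subtask that finishes never reports again, so its last watermark pins the minimum and the cap stops moving.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical, simplified model of watermark alignment (not Flink's actual
 * SourceCoordinator implementation): a subtask may only advance up to
 * min(last reported watermark of every subtask) + maxDrift.
 */
final class StuckAlignmentSketch {
    private final long maxDriftMillis;
    // last watermark reported by each subtask, keyed by subtask index
    private final Map<Integer, Long> reported = new HashMap<>();

    StuckAlignmentSketch(long maxDriftMillis) {
        this.maxDriftMillis = maxDriftMillis;
    }

    void report(int subtask, long watermark) {
        reported.put(subtask, watermark);
    }

    /** Upper bound for all subtasks: minimum reported watermark plus drift (saturating). */
    long maxAllowedWatermark() {
        long min = Long.MAX_VALUE;
        for (long wm : reported.values()) {
            min = Math.min(min, wm);
        }
        // saturating add: avoids overflow when nothing holds the group back
        return min > Long.MAX_VALUE - maxDriftMillis ? Long.MAX_VALUE : min + maxDriftMillis;
    }

    public static void main(String[] args) {
        StuckAlignmentSketch alignment = new StuckAlignmentSketch(10);
        alignment.report(0, 50); // subtask 0 finishes at watermark 50 and never reports again
        alignment.report(1, 55); // subtask 1 is still running
        System.out.println(alignment.maxAllowedWatermark()); // 60
        alignment.report(1, 60); // subtask 1 reaches the cap...
        System.out.println(alignment.maxAllowedWatermark()); // ...and it is still 60
    }
}
```

However much data subtask 1 still has, this model never lets it get past 60.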

This can easily be reproduced with the following simple pipeline:
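{noformat}
import java.time.Duration;

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class WatermarkAlignmentRepro {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        DataStream<Long> s = env.fromSource(
                        new NumberSequenceSource(0, 100),
                        WatermarkStrategy.<Long>forMonotonousTimestamps()
                                .withTimestampAssigner((SerializableTimestampAssigner<Long>) (aLong, l) -> aLong)
                                // alignment group "g1", max allowed drift 10 ms, update interval 2 s
                                .withWatermarkAlignment("g1", Duration.ofMillis(10), Duration.ofSeconds(2)),
                        "Sequence Source")
                .filter((FilterFunction<Long>) aLong -> {
                    // artificially slow down processing (200 ms per record)
                    Thread.sleep(200);
                    return true;
                });

        s.print();
        env.execute();
    }
}
{noformat}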
A possible solution would be to send out a max watermark event once a source subtask finishes, or to exclude finished subtasks from alignment in the source coordinator.
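The two proposed fixes can be compared in the same simplified model. This is again an assumption, not Flink's coordinator code. The names `FinishedSubtaskAlignmentSketch`, `finishWithMaxWatermark` and `excludeFinished` are hypothetical.

- Option 1: the finishing subtask reports `Long.MAX_VALUE`, which is the timestamp Flink uses for `Watermark.MAX_WATERMARK`.
- Option 2: the coordinator drops the subtask from the set it takes the minimum over.

Either way, the finished subtask no longer pins the bound.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified model of the two proposed fixes (not Flink code). */
final class FinishedSubtaskAlignmentSketch {
    private final long maxDriftMillis;
    // last watermark reported by each tracked subtask, keyed by subtask index
    private final Map<Integer, Long> reported = new HashMap<>();

    FinishedSubtaskAlignmentSketch(long maxDriftMillis) {
        this.maxDriftMillis = maxDriftMillis;
    }

    void report(int subtask, long watermark) {
        reported.put(subtask, watermark);
    }

    /** Option 1: a finishing subtask emits a max watermark, so it never holds back the minimum. */
    void finishWithMaxWatermark(int subtask) {
        reported.put(subtask, Long.MAX_VALUE);
    }

    /** Option 2: the coordinator stops tracking the finished subtask altogether. */
    void excludeFinished(int subtask) {
        reported.remove(subtask);
    }

    /** Minimum watermark over tracked subtasks plus drift, saturating at Long.MAX_VALUE. */
    long maxAllowedWatermark() {
        long min = Long.MAX_VALUE;
        for (long wm : reported.values()) {
            min = Math.min(min, wm);
        }
        return min > Long.MAX_VALUE - maxDriftMillis ? Long.MAX_VALUE : min + maxDriftMillis;
    }

    public static void main(String[] args) {
        FinishedSubtaskAlignmentSketch viaMaxWatermark = new FinishedSubtaskAlignmentSketch(10);
        viaMaxWatermark.report(0, 50);
        viaMaxWatermark.report(1, 55);
        viaMaxWatermark.finishWithMaxWatermark(0);
        viaMaxWatermark.report(1, 100);
        System.out.println(viaMaxWatermark.maxAllowedWatermark()); // 110: the bound follows subtask 1

        FinishedSubtaskAlignmentSketch viaExclusion = new FinishedSubtaskAlignmentSketch(10);
        viaExclusion.report(0, 50);
        viaExclusion.report(1, 55);
        viaExclusion.excludeFinished(0);
        viaExclusion.report(1, 100);
        System.out.println(viaExclusion.maxAllowedWatermark()); // 110 as well
    }
}
```

In this model both variants produce the same bound. They differ only in where the change would live: the finishing source subtask versus the coordinator.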



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
