Fabian Paul created FLINK-29512:
-----------------------------------
Summary: Align SubtaskCommittableManager checkpointId with
CheckpointCommittableManagerImpl checkpointId during recovery
Key: FLINK-29512
URL: https://issues.apache.org/jira/browse/FLINK-29512
Project: Flink
Issue Type: Bug
Components: Connectors / Common
Affects Versions: 1.15.2, 1.17.0, 1.16.1
Reporter: Fabian Paul
Similar to the issue described in
https://issues.apache.org/jira/browse/FLINK-29509, during the recovery of
committables the checkpointId of the SubtaskCommittableManager is always set to 1
[https://github.com/apache/flink/blob/9152af41c2d401e5eacddee1bb10d1b6bea6c61a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java#L193]
while the enclosing CheckpointCommittableManager is initialized with the
checkpointId that was written into state
[https://github.com/apache/flink/blob/9152af41c2d401e5eacddee1bb10d1b6bea6c61a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/sink/committables/CommittableCollectorSerializer.java#L155].
As a result, during recovery the post-commit topology receives a
`CommittableSummary` carrying the recovered checkpointId and multiple
`CommittableWithLineage`s carrying the reset checkpointId. These
`CommittableWithLineage`s are orphaned because no `CommittableSummary` matches
their checkpointId, which fails the job.
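A minimal, hypothetical Java sketch of the mismatch and of the alignment the summary asks for. `SubtaskManager`, `restoreSubtask`, `Lineage`, `Summary`, and `findOrphans` are illustrative stand-ins invented here, not Flink classes or APIs; the real logic lives in `CommittableCollectorSerializer` and the committable managers linked above. The sketch assumes that the post-commit side pairs committables with a summary purely by checkpoint id.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical, heavily simplified model of committable recovery.
 * None of these types are Flink's real classes; they only illustrate why the
 * subtask-level checkpoint id must match the checkpoint-level id after restore.
 */
public class CommittableRecoverySketch {

    /** Hypothetical stand-in for a committable tagged with its checkpoint id. */
    record Lineage(long checkpointId, String committable) {}

    /** Hypothetical stand-in for the per-checkpoint summary sent downstream. */
    record Summary(long checkpointId, int numberOfCommittables) {}

    /** Hypothetical stand-in for a subtask-level committable manager. */
    static final class SubtaskManager {
        final long checkpointId;
        final List<String> committables;

        SubtaskManager(long checkpointId, List<String> committables) {
            this.checkpointId = checkpointId;
            this.committables = committables;
        }

        /** Emits every committable tagged with this manager's checkpoint id. */
        List<Lineage> lineages() {
            List<Lineage> out = new ArrayList<>();
            for (String c : committables) {
                out.add(new Lineage(checkpointId, c));
            }
            return out;
        }
    }

    /**
     * Restores a subtask manager. With {@code buggy = true} the checkpoint id is
     * hard-coded to 1 (the reported behaviour); otherwise the id read from state
     * is propagated, i.e. aligned with the checkpoint-level manager.
     */
    static SubtaskManager restoreSubtask(
            long checkpointIdFromState, List<String> committables, boolean buggy) {
        long id = buggy ? 1L : checkpointIdFromState;
        return new SubtaskManager(id, committables);
    }

    /** Returns the lineages whose checkpoint id does not match the summary. */
    static List<Lineage> findOrphans(Summary summary, List<Lineage> lineages) {
        List<Lineage> orphans = new ArrayList<>();
        for (Lineage l : lineages) {
            if (l.checkpointId() != summary.checkpointId()) {
                orphans.add(l);
            }
        }
        return orphans;
    }

    public static void main(String[] args) {
        long restoredId = 42L; // checkpoint id as written into state
        List<String> committables = List.of("a", "b");
        // The checkpoint-level summary always uses the id from state.
        Summary summary = new Summary(restoredId, committables.size());

        SubtaskManager buggy = restoreSubtask(restoredId, committables, true);
        SubtaskManager fixed = restoreSubtask(restoredId, committables, false);

        System.out.println("orphans (buggy): " + findOrphans(summary, buggy.lineages()).size());
        System.out.println("orphans (fixed): " + findOrphans(summary, fixed.lineages()).size());
    }
}
```

Running `main` prints `orphans (buggy): 2` and then `orphans (fixed): 0`. With the hard-coded id, every restored committable misses its summary; propagating the checkpoint id from state keeps them paired.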