[ https://issues.apache.org/jira/browse/FLINK-29459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17623149#comment-17623149 ]
Yun Gao commented on FLINK-29459:
---------------------------------

Hi [~fpaul] [~KristoffSC], many thanks for fixing the issues, and sorry for missing the previous notifications during the holiday.

Regarding the current Sink V2 mechanism, I have some more thoughts. Currently we rely on the CommittableSummary and CommittableWithLineage messages to coordinate between Writers and Committers. For each checkpoint, each Writer subtask first emits a CommittableSummary to the Committers, which contains the number of Committables it is going to send. The Writer subtask then emits that number of CommittableWithLineage messages to the Committers. The Committers rely on the number in the summary to detect whether they have received all the Committables from each Writer subtask. However, this mechanism has some issues:
# It only supports partitioners with a single target per source between Writer and Committer, such as forward or rescale. If in the long run we want to support Committers with arbitrary parallelism, it may cause issues when the Writer and Committer have different parallelism. It also complicates things for authors of connectors that use WithPreCommitTopology.
# With unaligned checkpoints and rescaling after recovery, some CommittableSummary messages may already have been processed and stored in the snapshot while the corresponding CommittableWithLineage messages have been assigned to other tasks. In that case the number of Committables will not be correct.

One possible alternative is to stop relying on counts: first emit the Committables, then follow them with a broadcast message that confirms the end of a checkpoint. A Committer would know that it has received all the Committables once it has received the confirmation messages from all upstream tasks. The mechanism is similar to how watermarks work. Then, for the two issues above:
# It would support all partitioners.
# For the unaligned checkpoint and rescaling case, we could simply commit all the Committables with the startup checkpoint id and, on startup, ignore all confirmation messages carrying that same checkpoint id. We could then wait for the confirmation messages of the next checkpoint id to mark all the previous checkpoints as finished.

What do you think about this? Sorry if I have overlooked something.

> Sink v2 has bugs in supporting legacy v1 implementations with global committer
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-29459
>                 URL: https://issues.apache.org/jira/browse/FLINK-29459
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.16.0, 1.17.0, 1.15.2
>            Reporter: Yun Gao
>            Assignee: Yun Gao
>            Priority: Major
>             Fix For: 1.17.0, 1.15.3, 1.16.1
>
>
> Currently, when supporting Sink implementations that use the version 1 interfaces, there are issues when restoring from a checkpoint after failover:
> # In the global committer operator, when restoring SubtaskCommittableManager, the subtask id is replaced with the id of the current operator. Originally the id is that of the sender task (0 ~ N - 1), but after restoring it becomes 0. This causes a duplicate key exception during restoring.
> # For the Committer operator, the subtaskId of CheckpointCommittableManagerImpl is always restored to 0 after failover for all subtasks. As a result, the summary sent to the Global Committer carries the wrong subtask id.
> # For the Committer operator, the checkpoint id of SubtaskCommittableManager is always restored to 1 after failover. As a result, the subsequent committables sent to the global committer carry the wrong checkpoint id.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
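Here is a minimal Java sketch of the count-based completion check described in the comment, where a summary announces how many committables a Writer subtask will send and the Committer counts arrivals against it. `CountBasedTracker` and its methods are hypothetical names for illustration, not Flink's actual operator code. The sketch also assumes each Committer knows how many Writer subtasks it should expect a summary from.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical model of the count-based protocol (not Flink code).
 * A summary announces N committables per writer subtask; the committer
 * treats a checkpoint as complete only when exactly N have arrived.
 */
final class CountBasedTracker {
    // checkpointId -> (writer subtask -> count announced in its summary)
    private final Map<Long, Map<Integer, Integer>> announced = new HashMap<>();
    // checkpointId -> (writer subtask -> count actually received)
    private final Map<Long, Map<Integer, Integer>> received = new HashMap<>();
    // Number of writer subtasks this committer expects a summary from (assumption).
    private final int expectedWriters;

    CountBasedTracker(int expectedWriters) {
        this.expectedWriters = expectedWriters;
    }

    void onSummary(long checkpointId, int writerSubtask, int numCommittables) {
        announced.computeIfAbsent(checkpointId, k -> new HashMap<>())
                 .put(writerSubtask, numCommittables);
    }

    void onCommittable(long checkpointId, int writerSubtask) {
        received.computeIfAbsent(checkpointId, k -> new HashMap<>())
                .merge(writerSubtask, 1, Integer::sum);
    }

    // Complete only if every expected summary arrived and every count matches exactly.
    boolean isComplete(long checkpointId) {
        Map<Integer, Integer> ann = announced.getOrDefault(checkpointId, Map.of());
        Map<Integer, Integer> rec = received.getOrDefault(checkpointId, Map.of());
        if (ann.size() != expectedWriters) {
            return false;
        }
        for (Map.Entry<Integer, Integer> e : ann.entrySet()) {
            if (!e.getValue().equals(rec.getOrDefault(e.getKey(), 0))) {
                return false;
            }
        }
        return true;
    }
}
```

Under forward partitioning, all of a writer's committables reach the committer that received its summary, so the counts match. If some of them are routed to a different committer, the counts never match. That happens with an arbitrary partitioner, or when in-flight records are reassigned after rescaling from an unaligned checkpoint. These are the two issues raised in the comment.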
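The proposed watermark-like alternative can be sketched as another hypothetical Java model, not Flink code. The sketch rests on three assumptions:
- Each Writer subtask broadcasts one confirmation per checkpoint to every Committer after that checkpoint's committables.
- Each upstream channel delivers messages in FIFO order.
- The startup rule means: ignore confirmations whose id is at or below the restored checkpoint id, and release the restored committables once every channel has confirmed a later checkpoint. A fresh start passes 0, on the assumption that checkpoint ids start at 1.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

/**
 * Hypothetical model of the proposed confirmation protocol (not a Flink API).
 * Assumes one upstream channel per writer subtask, FIFO delivery per channel,
 * and one broadcast confirmation per checkpoint sent after its committables.
 */
final class ConfirmationTracker<C> {
    private static final long NONE = Long.MIN_VALUE;

    // Highest checkpoint id confirmed on each upstream channel.
    private final long[] confirmed;
    // Confirmations at or below this id are ignored (restored checkpoint id, or 0).
    private final long ignoreUpTo;
    // Committables buffered by checkpoint id until every channel has confirmed it.
    private final TreeMap<Long, List<C>> pending = new TreeMap<>();

    ConfirmationTracker(int numUpstreamChannels, long restoredCheckpointId) {
        this.confirmed = new long[numUpstreamChannels];
        Arrays.fill(this.confirmed, NONE);
        this.ignoreUpTo = restoredCheckpointId;
    }

    // A committable may arrive on any channel; no per-writer count is tracked.
    void onCommittable(long checkpointId, C committable) {
        pending.computeIfAbsent(checkpointId, k -> new ArrayList<>()).add(committable);
    }

    // With FIFO channels, a confirmation implies everything the writer sent
    // earlier on this channel has already arrived.
    void onConfirmation(int channel, long checkpointId) {
        if (checkpointId <= ignoreUpTo) {
            return; // e.g. in-flight confirmations of the restored checkpoint
        }
        confirmed[channel] = Math.max(confirmed[channel], checkpointId);
    }

    // Like a combined watermark: the minimum confirmed id over all channels.
    long completedCheckpoint() {
        long min = Long.MAX_VALUE;
        for (long id : confirmed) {
            min = Math.min(min, id);
        }
        return min;
    }

    // Removes and returns the committables of every checkpoint up to the completed one.
    List<C> pollCompleted() {
        List<C> ready = new ArrayList<>();
        long done = completedCheckpoint();
        if (done == NONE) {
            return ready; // some channel has not confirmed anything yet
        }
        NavigableMap<Long, List<C>> finished = pending.headMap(done, true);
        finished.values().forEach(ready::addAll);
        finished.clear(); // clearing the view also removes the entries from pending
        return ready;
    }
}
```

Completion here depends only on per-channel confirmations, so the committer never needs to know how many committables each writer produced. As a result, the partitioner between writer and committer no longer matters. Taking the minimum confirmed id across channels mirrors how an operator combines watermarks from several inputs.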