[
https://issues.apache.org/jira/browse/FLINK-37449?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Leonard Xu reassigned FLINK-37449:
----------------------------------
Assignee: Xin Gong
> Postgres don't commit lsn when taskmanager failover
> ---------------------------------------------------
>
> Key: FLINK-37449
> URL: https://issues.apache.org/jira/browse/FLINK-37449
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Affects Versions: cdc-3.1.0, cdc-3.2.0, cdc-3.1.1, cdc-3.3.0, cdc-3.2.1
> Reporter: Xin Gong
> Assignee: Xin Gong
> Priority: Major
> Labels: pull-request-available
> Fix For: cdc-3.3.1
>
>
> Postgres CDC does not commit the LSN after a failover that happens once the
> task has entered the incremental phase. As a result, WAL data cannot be
> cleaned up.
> The cause is that PostgresSourceEnumerator#receiveOffsetCommitAck is already
> true when the task fails over, so PostgresSourceReader#isCommitOffset always
> stays false afterwards. The task therefore never calls
> super.notifyCheckpointComplete in the method below.
> {code:java}
> @Override
> public void notifyCheckpointComplete(long checkpointId) throws Exception {
>     this.minHeap.add(checkpointId);
>     if (this.minHeap.size() <= this.lsnCommitCheckpointsDelay) {
>         LOG.info("Pending checkpoints '{}'.", this.minHeap);
>         return;
>     }
>     final long checkpointIdToCommit = this.minHeap.poll();
>     LOG.info(
>             "Pending checkpoints '{}', to be committed checkpoint id '{}'.",
>             this.minHeap,
>             checkpointIdToCommit);
>     // After all snapshot splits are finished, update stream split's metadata
>     // and reset start offset, which maybe smaller than before.
>     // In case that new start-offset of stream split has been recycled, don't
>     // commit offset during new table added phase.
>     if (isCommitOffset()) {
>         super.notifyCheckpointComplete(checkpointIdToCommit);
>     }
> }
> {code}
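The failure mode described above can be reproduced in isolation with a minimal sketch. It is not the Flink CDC implementation: the Enumerator and Reader classes and the lastCommitted helper are hypothetical stand-ins, and only the names receiveOffsetCommitAck, isCommitOffset, minHeap and lsnCommitCheckpointsDelay are taken from the issue. It assumes that the enumerator keeps its ack flag across the failover while the restarted reader begins with isCommitOffset set to false.

```java
import java.util.PriorityQueue;

public class CommitFlagFailoverSketch {

    /** Hypothetical enumerator stand-in: it acknowledges once and remembers that it did. */
    static class Enumerator {
        boolean receiveOffsetCommitAck = false;

        void notifyReaderIfNeeded(Reader reader) {
            if (receiveOffsetCommitAck) {
                return; // already acknowledged, so nothing is sent again
            }
            reader.isCommitOffset = true;
            receiveOffsetCommitAck = true;
        }
    }

    /** Hypothetical reader stand-in that mirrors the guard in notifyCheckpointComplete. */
    static class Reader {
        boolean isCommitOffset = false; // assumption: a restarted reader starts from false
        long lastCommittedCheckpoint = -1L; // -1 means nothing was committed
        private final PriorityQueue<Long> minHeap = new PriorityQueue<>();
        private final int lsnCommitCheckpointsDelay;

        Reader(int lsnCommitCheckpointsDelay) {
            this.lsnCommitCheckpointsDelay = lsnCommitCheckpointsDelay;
        }

        void notifyCheckpointComplete(long checkpointId) {
            minHeap.add(checkpointId);
            if (minHeap.size() <= lsnCommitCheckpointsDelay) {
                return;
            }
            long checkpointIdToCommit = minHeap.poll();
            if (isCommitOffset) {
                // Stands in for super.notifyCheckpointComplete, i.e. the LSN commit.
                lastCommittedCheckpoint = checkpointIdToCommit;
            }
        }
    }

    /** Runs checkpoints 1..3, optionally rebuilding the reader after the ack was sent. */
    static long lastCommitted(boolean failoverAfterAck) {
        Enumerator enumerator = new Enumerator();
        Reader reader = new Reader(0);
        enumerator.notifyReaderIfNeeded(reader);
        if (failoverAfterAck) {
            reader = new Reader(0); // task manager failover: reader state is rebuilt
            enumerator.notifyReaderIfNeeded(reader); // no-op, the ack flag is already true
        }
        for (long id = 1; id <= 3; id++) {
            reader.notifyCheckpointComplete(id);
        }
        return reader.lastCommittedCheckpoint;
    }

    public static void main(String[] args) {
        System.out.println("without failover: " + lastCommitted(false));
        System.out.println("with failover:    " + lastCommitted(true));
    }
}
```

In this sketch the run without a failover commits up to checkpoint 3, while the run with a failover never commits anything. That matches the symptom in the report, where the replication slot's LSN stops advancing and WAL cannot be recycled.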
--
This message was sent by Atlassian Jira
(v8.20.10#820010)