This is an automated email from the ASF dual-hosted git repository. leonard pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push: new ff77a490b [FLINK-36517][pipeline-connector][paimon] Use filterAndCommit API to avoid committing the same datafile twice ff77a490b is described below commit ff77a490bd88d86952d47607f44a07163f5d8ced Author: Junbo wang <beryllw...@gmail.com> AuthorDate: Mon Nov 11 19:57:23 2024 +0800 [FLINK-36517][pipeline-connector][paimon] Use filterAndCommit API to avoid committing the same datafile twice This closes #3639 --- .../connectors/paimon/sink/v2/PaimonCommitter.java | 26 +++--- .../paimon/sink/v2/PaimonSinkITCase.java | 102 +++++++++++++++++++++ 2 files changed, 114 insertions(+), 14 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonCommitter.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonCommitter.java index f89809468..07abb03bf 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonCommitter.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonCommitter.java @@ -27,6 +27,7 @@ import org.apache.paimon.options.Options; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -49,7 +50,8 @@ public class PaimonCommitter implements Committer<MultiTableCommittable> { } @Override - public void commit(Collection<CommitRequest<MultiTableCommittable>> commitRequests) { + public void commit(Collection<CommitRequest<MultiTableCommittable>> commitRequests) + throws IOException { if (commitRequests.isEmpty()) { return; } @@ -60,27 +62,23 @@ public class PaimonCommitter implements Committer<MultiTableCommittable> { .collect(Collectors.toList()); // All CommitRequest shared the same checkpointId. long checkpointId = committables.get(0).checkpointId(); - int retriedNumber = commitRequests.stream().findFirst().get().getNumberOfRetries(); WrappedManifestCommittable wrappedManifestCommittable = storeMultiCommitter.combine(checkpointId, 1L, committables); try { - if (retriedNumber > 0) { - storeMultiCommitter.filterAndCommit( - Collections.singletonList(wrappedManifestCommittable)); - } else { - storeMultiCommitter.commit(Collections.singletonList(wrappedManifestCommittable)); - } + storeMultiCommitter.filterAndCommit( + Collections.singletonList(wrappedManifestCommittable)); commitRequests.forEach(CommitRequest::signalAlreadyCommitted); LOGGER.info( - String.format( - "Commit succeeded for %s with %s committable", - checkpointId, committables.size())); + "Commit succeeded for {} with {} committable", + checkpointId, + committables.size()); } catch (Exception e) { commitRequests.forEach(CommitRequest::retryLater); LOGGER.warn( - String.format( - "Commit failed for %s with %s committable", - checkpointId, committables.size())); + "Commit failed for {} with {} committable", + checkpointId, + committables.size(), + e); } } diff --git a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java index f44fd47a3..833d0ee03 100644 --- a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java +++ b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java @@ -84,6 +84,8 @@ public class PaimonSinkITCase { private BinaryRecordDataGenerator generator; + private static int checkpointId = 1; + public static final String TEST_DATABASE = "test"; private static final String HADOOP_CONF_DIR = Objects.requireNonNull( @@ -188,6 +190,7 @@ public class PaimonSinkITCase { writer.flush(false); Collection<Committer.CommitRequest<MultiTableCommittable>> commitRequests = writer.prepareCommit().stream() + .map(this::correctCheckpointId) .map(MockCommitRequestImpl::new) .collect(Collectors.toList()); committer.commit(commitRequests); @@ -214,6 +217,7 @@ public class PaimonSinkITCase { writer.flush(false); commitRequests = writer.prepareCommit().stream() + .map(this::correctCheckpointId) .map(MockCommitRequestImpl::new) .collect(Collectors.toList()); committer.commit(commitRequests); @@ -243,6 +247,7 @@ public class PaimonSinkITCase { writer.flush(false); commitRequests = writer.prepareCommit().stream() + .map(this::correctCheckpointId) .map(MockCommitRequestImpl::new) .collect(Collectors.toList()); committer.commit(commitRequests); @@ -274,6 +279,7 @@ public class PaimonSinkITCase { writer.flush(false); Collection<Committer.CommitRequest<MultiTableCommittable>> commitRequests = writer.prepareCommit().stream() + .map(this::correctCheckpointId) .map(MockCommitRequestImpl::new) .collect(Collectors.toList()); committer.commit(commitRequests); @@ -324,6 +330,7 @@ public class PaimonSinkITCase { writer.flush(false); commitRequests = writer.prepareCommit().stream() + .map(this::correctCheckpointId) .map(MockCommitRequestImpl::new) .collect(Collectors.toList()); committer.commit(commitRequests); @@ -371,6 +378,7 @@ public class PaimonSinkITCase { writer.flush(false); commitRequests = writer.prepareCommit().stream() + .map(this::correctCheckpointId) .map(MockCommitRequestImpl::new) .collect(Collectors.toList()); committer.commit(commitRequests); @@ -433,6 +441,7 @@ public class PaimonSinkITCase { writer.flush(false); Collection<Committer.CommitRequest<MultiTableCommittable>> commitRequests = writer.prepareCommit().stream() + .map(this::correctCheckpointId) .map(MockCommitRequestImpl::new) .collect(Collectors.toList()); committer.commit(commitRequests); @@ -454,6 +463,99 @@ public class PaimonSinkITCase { Collections.singletonList(Row.ofKind(RowKind.INSERT, "1", "1")), result); } + @ParameterizedTest + @ValueSource(strings = {"filesystem", "hive"}) + public void testDuplicateCommitAfterRestore(String metastore) + throws IOException, InterruptedException, Catalog.DatabaseNotEmptyException, + Catalog.DatabaseNotExistException, SchemaEvolveException { + initialize(metastore); + PaimonSink<Event> paimonSink = + new PaimonSink<>( + catalogOptions, new PaimonRecordEventSerializer(ZoneId.systemDefault())); + PaimonWriter<Event> writer = paimonSink.createWriter(new MockInitContext()); + Committer<MultiTableCommittable> committer = paimonSink.createCommitter(); + + // insert + for (Event event : createTestEvents()) { + writer.write(event, null); + } + writer.flush(false); + Collection<Committer.CommitRequest<MultiTableCommittable>> commitRequests = + writer.prepareCommit().stream() + .map(this::correctCheckpointId) + .map(MockCommitRequestImpl::new) + .collect(Collectors.toList()); + committer.commit(commitRequests); + + // We add a loop for restore 7 times + for (int i = 2; i < 9; i++) { + // We've two steps in checkpoint: 1. snapshotState(ckp); 2. + // notifyCheckpointComplete(ckp). + // It's possible that flink job will restore from a checkpoint with only step#1 finished + // and + // step#2 not. + // CommitterOperator will try to re-commit recovered transactions. + committer.commit(commitRequests); + List<DataChangeEvent> events = + Arrays.asList( + DataChangeEvent.insertEvent( + table1, + generator.generate( + new Object[] { + BinaryStringData.fromString(Integer.toString(i)), + BinaryStringData.fromString(Integer.toString(i)) + }))); + Assertions.assertDoesNotThrow( + () -> { + for (Event event : events) { + writer.write(event, null); + } + }); + writer.flush(false); + // Checkpoint id start from 1 + committer.commit( + writer.prepareCommit().stream() + .map(this::correctCheckpointId) + .map(MockCommitRequestImpl::new) + .collect(Collectors.toList())); + } + + List<Row> result = new ArrayList<>(); + tEnv.sqlQuery("select * from paimon_catalog.test.`table1$snapshots`") + .execute() + .collect() + .forEachRemaining(result::add); + // 8 APPEND and 1 COMPACT + Assertions.assertEquals(result.size(), 9); + result.clear(); + + tEnv.sqlQuery("select * from paimon_catalog.test.`table1`") + .execute() + .collect() + .forEachRemaining(result::add); + Assertions.assertEquals( + Arrays.asList( + Row.ofKind(RowKind.INSERT, "1", "1"), + Row.ofKind(RowKind.INSERT, "2", "2"), + Row.ofKind(RowKind.INSERT, "3", "3"), + Row.ofKind(RowKind.INSERT, "4", "4"), + Row.ofKind(RowKind.INSERT, "5", "5"), + Row.ofKind(RowKind.INSERT, "6", "6"), + Row.ofKind(RowKind.INSERT, "7", "7"), + Row.ofKind(RowKind.INSERT, "8", "8")), + result); + } + + private MultiTableCommittable correctCheckpointId(MultiTableCommittable committable) { + // update the right checkpointId for MultiTableCommittable + return new MultiTableCommittable( + committable.getDatabase(), + committable.getTable(), + checkpointId++, + committable.kind(), + committable.wrappedCommittable()); + } + private static class MockCommitRequestImpl<CommT> extends CommitRequestImpl<CommT> { protected MockCommitRequestImpl(CommT committable) {