This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new ff77a490b [FLINK-36517][pipeline-connector][paimon] Use 
filterAndCommit API to avoid committing the same datafile twice
ff77a490b is described below

commit ff77a490bd88d86952d47607f44a07163f5d8ced
Author: Junbo wang <beryllw...@gmail.com>
AuthorDate: Mon Nov 11 19:57:23 2024 +0800

    [FLINK-36517][pipeline-connector][paimon] Use filterAndCommit API to avoid 
committing the same datafile twice
    
     This closes  #3639
---
 .../connectors/paimon/sink/v2/PaimonCommitter.java |  26 +++---
 .../paimon/sink/v2/PaimonSinkITCase.java           | 102 +++++++++++++++++++++
 2 files changed, 114 insertions(+), 14 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonCommitter.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonCommitter.java
index f89809468..07abb03bf 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonCommitter.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/main/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonCommitter.java
@@ -27,6 +27,7 @@ import org.apache.paimon.options.Options;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
@@ -49,7 +50,8 @@ public class PaimonCommitter implements 
Committer<MultiTableCommittable> {
     }
 
     @Override
-    public void commit(Collection<CommitRequest<MultiTableCommittable>> 
commitRequests) {
+    public void commit(Collection<CommitRequest<MultiTableCommittable>> 
commitRequests)
+            throws IOException {
         if (commitRequests.isEmpty()) {
             return;
         }
@@ -60,27 +62,23 @@ public class PaimonCommitter implements 
Committer<MultiTableCommittable> {
                         .collect(Collectors.toList());
         // All CommitRequest shared the same checkpointId.
         long checkpointId = committables.get(0).checkpointId();
-        int retriedNumber = 
commitRequests.stream().findFirst().get().getNumberOfRetries();
         WrappedManifestCommittable wrappedManifestCommittable =
                 storeMultiCommitter.combine(checkpointId, 1L, committables);
         try {
-            if (retriedNumber > 0) {
-                storeMultiCommitter.filterAndCommit(
-                        Collections.singletonList(wrappedManifestCommittable));
-            } else {
-                
storeMultiCommitter.commit(Collections.singletonList(wrappedManifestCommittable));
-            }
+            storeMultiCommitter.filterAndCommit(
+                    Collections.singletonList(wrappedManifestCommittable));
             commitRequests.forEach(CommitRequest::signalAlreadyCommitted);
             LOGGER.info(
-                    String.format(
-                            "Commit succeeded for %s with %s committable",
-                            checkpointId, committables.size()));
+                    "Commit succeeded for {} with {} committable",
+                    checkpointId,
+                    committables.size());
         } catch (Exception e) {
             commitRequests.forEach(CommitRequest::retryLater);
             LOGGER.warn(
-                    String.format(
-                            "Commit failed for %s with %s committable",
-                            checkpointId, committables.size()));
+                    "Commit failed for {} with {} committable",
+                    checkpointId,
+                    committables.size(),
+                    e);
         }
     }
 
diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
index f44fd47a3..833d0ee03 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-paimon/src/test/java/org/apache/flink/cdc/connectors/paimon/sink/v2/PaimonSinkITCase.java
@@ -84,6 +84,8 @@ public class PaimonSinkITCase {
 
     private BinaryRecordDataGenerator generator;
 
+    private static int checkpointId = 1;
+
     public static final String TEST_DATABASE = "test";
     private static final String HADOOP_CONF_DIR =
             Objects.requireNonNull(
@@ -188,6 +190,7 @@ public class PaimonSinkITCase {
         writer.flush(false);
         Collection<Committer.CommitRequest<MultiTableCommittable>> 
commitRequests =
                 writer.prepareCommit().stream()
+                        .map(this::correctCheckpointId)
                         .map(MockCommitRequestImpl::new)
                         .collect(Collectors.toList());
         committer.commit(commitRequests);
@@ -214,6 +217,7 @@ public class PaimonSinkITCase {
         writer.flush(false);
         commitRequests =
                 writer.prepareCommit().stream()
+                        .map(this::correctCheckpointId)
                         .map(MockCommitRequestImpl::new)
                         .collect(Collectors.toList());
         committer.commit(commitRequests);
@@ -243,6 +247,7 @@ public class PaimonSinkITCase {
         writer.flush(false);
         commitRequests =
                 writer.prepareCommit().stream()
+                        .map(this::correctCheckpointId)
                         .map(MockCommitRequestImpl::new)
                         .collect(Collectors.toList());
         committer.commit(commitRequests);
@@ -274,6 +279,7 @@ public class PaimonSinkITCase {
         writer.flush(false);
         Collection<Committer.CommitRequest<MultiTableCommittable>> 
commitRequests =
                 writer.prepareCommit().stream()
+                        .map(this::correctCheckpointId)
                         .map(MockCommitRequestImpl::new)
                         .collect(Collectors.toList());
         committer.commit(commitRequests);
@@ -324,6 +330,7 @@ public class PaimonSinkITCase {
         writer.flush(false);
         commitRequests =
                 writer.prepareCommit().stream()
+                        .map(this::correctCheckpointId)
                         .map(MockCommitRequestImpl::new)
                         .collect(Collectors.toList());
         committer.commit(commitRequests);
@@ -371,6 +378,7 @@ public class PaimonSinkITCase {
         writer.flush(false);
         commitRequests =
                 writer.prepareCommit().stream()
+                        .map(this::correctCheckpointId)
                         .map(MockCommitRequestImpl::new)
                         .collect(Collectors.toList());
         committer.commit(commitRequests);
@@ -433,6 +441,7 @@ public class PaimonSinkITCase {
         writer.flush(false);
         Collection<Committer.CommitRequest<MultiTableCommittable>> 
commitRequests =
                 writer.prepareCommit().stream()
+                        .map(this::correctCheckpointId)
                         .map(MockCommitRequestImpl::new)
                         .collect(Collectors.toList());
         committer.commit(commitRequests);
@@ -454,6 +463,99 @@ public class PaimonSinkITCase {
                 Collections.singletonList(Row.ofKind(RowKind.INSERT, "1", 
"1")), result);
     }
 
+    @ParameterizedTest
+    @ValueSource(strings = {"filesystem", "hive"})
+    public void testDuplicateCommitAfterRestore(String metastore)
+            throws IOException, InterruptedException, 
Catalog.DatabaseNotEmptyException,
+                    Catalog.DatabaseNotExistException, SchemaEvolveException {
+        initialize(metastore);
+        PaimonSink<Event> paimonSink =
+                new PaimonSink<>(
+                        catalogOptions, new 
PaimonRecordEventSerializer(ZoneId.systemDefault()));
+        PaimonWriter<Event> writer = paimonSink.createWriter(new 
MockInitContext());
+        Committer<MultiTableCommittable> committer = 
paimonSink.createCommitter();
+
+        // insert
+        for (Event event : createTestEvents()) {
+            writer.write(event, null);
+        }
+        writer.flush(false);
+        Collection<Committer.CommitRequest<MultiTableCommittable>> 
commitRequests =
+                writer.prepareCommit().stream()
+                        .map(this::correctCheckpointId)
+                        .map(MockCommitRequestImpl::new)
+                        .collect(Collectors.toList());
+        committer.commit(commitRequests);
+
+        // Simulate restoring from a checkpoint 7 times.
+        for (int i = 2; i < 9; i++) {
+            // A checkpoint has two steps: 1. snapshotState(ckp); 2.
+            // notifyCheckpointComplete(ckp).
+            // A Flink job may restore from a checkpoint for which only step #1
+            // finished and step #2 did not.
+            // In that case the CommitterOperator will try to re-commit the
+            // recovered transactions.
+            committer.commit(commitRequests);
+            List<DataChangeEvent> events =
+                    Arrays.asList(
+                            DataChangeEvent.insertEvent(
+                                    table1,
+                                    generator.generate(
+                                            new Object[] {
+                                                
BinaryStringData.fromString(Integer.toString(i)),
+                                                
BinaryStringData.fromString(Integer.toString(i))
+                                            })));
+            Assertions.assertDoesNotThrow(
+                    () -> {
+                        for (Event event : events) {
+                            writer.write(event, null);
+                        }
+                    });
+            writer.flush(false);
+            // Checkpoint ids start from 1.
+            committer.commit(
+                    writer.prepareCommit().stream()
+                            .map(this::correctCheckpointId)
+                            .map(MockCommitRequestImpl::new)
+                            .collect(Collectors.toList()));
+        }
+
+        List<Row> result = new ArrayList<>();
+        tEnv.sqlQuery("select * from paimon_catalog.test.`table1$snapshots`")
+                .execute()
+                .collect()
+                .forEachRemaining(result::add);
+        // 8 APPEND and 1 COMPACT
+        Assertions.assertEquals(result.size(), 9);
+        result.clear();
+
+        tEnv.sqlQuery("select * from paimon_catalog.test.`table1`")
+                .execute()
+                .collect()
+                .forEachRemaining(result::add);
+        Assertions.assertEquals(
+                Arrays.asList(
+                        Row.ofKind(RowKind.INSERT, "1", "1"),
+                        Row.ofKind(RowKind.INSERT, "2", "2"),
+                        Row.ofKind(RowKind.INSERT, "3", "3"),
+                        Row.ofKind(RowKind.INSERT, "4", "4"),
+                        Row.ofKind(RowKind.INSERT, "5", "5"),
+                        Row.ofKind(RowKind.INSERT, "6", "6"),
+                        Row.ofKind(RowKind.INSERT, "7", "7"),
+                        Row.ofKind(RowKind.INSERT, "8", "8")),
+                result);
+    }
+
+    private MultiTableCommittable correctCheckpointId(MultiTableCommittable 
committable) {
+        // Rebuild the MultiTableCommittable with the next sequential checkpointId.
+        return new MultiTableCommittable(
+                committable.getDatabase(),
+                committable.getTable(),
+                checkpointId++,
+                committable.kind(),
+                committable.wrappedCommittable());
+    }
+
     private static class MockCommitRequestImpl<CommT> extends 
CommitRequestImpl<CommT> {
 
         protected MockCommitRequestImpl(CommT committable) {

Reply via email to