This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2acb511e9 [cdc] Database sync initializes the set of existed tables 
(#3621)
2acb511e9 is described below

commit 2acb511e9f888cf343ce8f2c367dd4ec8355091a
Author: yuzelin <[email protected]>
AuthorDate: Thu Jul 4 11:08:21 2024 +0800

    [cdc] Database sync initializes the set of existed tables (#3621)
---
 .../paimon/flink/action/cdc/SyncDatabaseActionBase.java  | 16 ++++++++++++++--
 .../sink/cdc/RichCdcMultiplexRecordEventParser.java      |  9 ++++++---
 2 files changed, 20 insertions(+), 5 deletions(-)

diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
index f77bec044..bddebd229 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
@@ -19,6 +19,7 @@
 package org.apache.paimon.flink.action.cdc;
 
 import org.apache.paimon.catalog.AbstractCatalog;
+import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.flink.action.Action;
 import org.apache.paimon.flink.action.MultiTablesSinkMode;
 import org.apache.paimon.flink.sink.cdc.EventParser;
@@ -36,8 +37,10 @@ import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.regex.Pattern;
 
 import static org.apache.paimon.flink.action.MultiTablesSinkMode.COMBINED;
@@ -138,10 +141,19 @@ public abstract class SyncDatabaseActionBase extends 
SynchronizationActionBase {
                 excludingTables == null ? null : 
Pattern.compile(excludingTables);
         TableNameConverter tableNameConverter =
                 new TableNameConverter(caseSensitive, mergeShards, 
tablePrefix, tableSuffix);
-
+        Set<String> createdTables;
+        try {
+            createdTables = new HashSet<>(catalog.listTables(database));
+        } catch (Catalog.DatabaseNotExistException e) {
+            throw new RuntimeException(e);
+        }
         return () ->
                 new RichCdcMultiplexRecordEventParser(
-                        schemaBuilder, includingPattern, excludingPattern, 
tableNameConverter);
+                        schemaBuilder,
+                        includingPattern,
+                        excludingPattern,
+                        tableNameConverter,
+                        createdTables);
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
index d0298e2b9..939410bf4 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
@@ -49,10 +49,11 @@ public class RichCdcMultiplexRecordEventParser implements 
EventParser<RichCdcMul
     @Nullable private final Pattern includingPattern;
     @Nullable private final Pattern excludingPattern;
     private final TableNameConverter tableNameConverter;
+    private final Set<String> createdTables;
+
     private final Map<String, RichEventParser> parsers = new HashMap<>();
     private final Set<String> includedTables = new HashSet<>();
     private final Set<String> excludedTables = new HashSet<>();
-    private final Set<String> createdTables = new HashSet<>();
 
     private RichCdcMultiplexRecord record;
     private String currentTable;
@@ -60,18 +61,20 @@ public class RichCdcMultiplexRecordEventParser implements 
EventParser<RichCdcMul
     private RichEventParser currentParser;
 
     public RichCdcMultiplexRecordEventParser(boolean caseSensitive) {
-        this(null, null, null, new TableNameConverter(caseSensitive));
+        this(null, null, null, new TableNameConverter(caseSensitive), new 
HashSet<>());
     }
 
     public RichCdcMultiplexRecordEventParser(
             @Nullable NewTableSchemaBuilder schemaBuilder,
             @Nullable Pattern includingPattern,
             @Nullable Pattern excludingPattern,
-            TableNameConverter tableNameConverter) {
+            TableNameConverter tableNameConverter,
+            Set<String> createdTables) {
         this.schemaBuilder = schemaBuilder;
         this.includingPattern = includingPattern;
         this.excludingPattern = excludingPattern;
         this.tableNameConverter = tableNameConverter;
+        this.createdTables = createdTables;
     }
 
     @Override

Reply via email to