This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2acb511e9 [cdc] Database sync initializes the set of existed tables
(#3621)
2acb511e9 is described below
commit 2acb511e9f888cf343ce8f2c367dd4ec8355091a
Author: yuzelin <[email protected]>
AuthorDate: Thu Jul 4 11:08:21 2024 +0800
[cdc] Database sync initializes the set of existed tables (#3621)
---
.../paimon/flink/action/cdc/SyncDatabaseActionBase.java | 16 ++++++++++++++--
.../sink/cdc/RichCdcMultiplexRecordEventParser.java | 9 ++++++---
2 files changed, 20 insertions(+), 5 deletions(-)
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
index f77bec044..bddebd229 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
@@ -19,6 +19,7 @@
package org.apache.paimon.flink.action.cdc;
import org.apache.paimon.catalog.AbstractCatalog;
+import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.flink.action.Action;
import org.apache.paimon.flink.action.MultiTablesSinkMode;
import org.apache.paimon.flink.sink.cdc.EventParser;
@@ -36,8 +37,10 @@ import javax.annotation.Nullable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.regex.Pattern;
import static org.apache.paimon.flink.action.MultiTablesSinkMode.COMBINED;
@@ -138,10 +141,19 @@ public abstract class SyncDatabaseActionBase extends
SynchronizationActionBase {
excludingTables == null ? null :
Pattern.compile(excludingTables);
TableNameConverter tableNameConverter =
new TableNameConverter(caseSensitive, mergeShards,
tablePrefix, tableSuffix);
-
+ Set<String> createdTables;
+ try {
+ createdTables = new HashSet<>(catalog.listTables(database));
+ } catch (Catalog.DatabaseNotExistException e) {
+ throw new RuntimeException(e);
+ }
return () ->
new RichCdcMultiplexRecordEventParser(
- schemaBuilder, includingPattern, excludingPattern,
tableNameConverter);
+ schemaBuilder,
+ includingPattern,
+ excludingPattern,
+ tableNameConverter,
+ createdTables);
}
@Override
diff --git
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
index d0298e2b9..939410bf4 100644
---
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
+++
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/sink/cdc/RichCdcMultiplexRecordEventParser.java
@@ -49,10 +49,11 @@ public class RichCdcMultiplexRecordEventParser implements
EventParser<RichCdcMul
@Nullable private final Pattern includingPattern;
@Nullable private final Pattern excludingPattern;
private final TableNameConverter tableNameConverter;
+ private final Set<String> createdTables;
+
private final Map<String, RichEventParser> parsers = new HashMap<>();
private final Set<String> includedTables = new HashSet<>();
private final Set<String> excludedTables = new HashSet<>();
- private final Set<String> createdTables = new HashSet<>();
private RichCdcMultiplexRecord record;
private String currentTable;
@@ -60,18 +61,20 @@ public class RichCdcMultiplexRecordEventParser implements
EventParser<RichCdcMul
private RichEventParser currentParser;
public RichCdcMultiplexRecordEventParser(boolean caseSensitive) {
- this(null, null, null, new TableNameConverter(caseSensitive));
+ this(null, null, null, new TableNameConverter(caseSensitive), new
HashSet<>());
}
public RichCdcMultiplexRecordEventParser(
@Nullable NewTableSchemaBuilder schemaBuilder,
@Nullable Pattern includingPattern,
@Nullable Pattern excludingPattern,
- TableNameConverter tableNameConverter) {
+ TableNameConverter tableNameConverter,
+ Set<String> createdTables) {
this.schemaBuilder = schemaBuilder;
this.includingPattern = includingPattern;
this.excludingPattern = excludingPattern;
this.tableNameConverter = tableNameConverter;
+ this.createdTables = createdTables;
}
@Override