This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new a8c6cc6e0c [Improve] Improve read table schema in cdc connector (#6702)
a8c6cc6e0c is described below

commit a8c6cc6e0cde5c813b697b695c7070700ff8591a
Author: Jia Fan <[email protected]>
AuthorDate: Mon Apr 15 20:18:10 2024 +0800

    [Improve] Improve read table schema in cdc connector (#6702)
---
 .../seatunnel/cdc/oracle/utils/OracleSchema.java   | 23 ++++++++-------
 .../cdc/postgres/utils/PostgresSchema.java         | 33 +++++++++++-----------
 .../sqlserver/source/utils/SqlServerSchema.java    | 24 ++++++++--------
 3 files changed, 43 insertions(+), 37 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleSchema.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleSchema.java
index 6524192845..f2713e3481 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleSchema.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-oracle/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/oracle/utils/OracleSchema.java
@@ -57,14 +57,12 @@ public class OracleSchema {
         TableChange schema = schemasByTableId.get(tableId);
         if (schema == null) {
             schema = readTableSchema(jdbc, tableId);
-            schemasByTableId.put(tableId, schema);
         }
         return schema;
     }
 
     private TableChange readTableSchema(JdbcConnection jdbc, TableId tableId) {
         OracleConnection oracleConnection = (OracleConnection) jdbc;
-        final Map<TableId, TableChange> tableChangeMap = new HashMap<>();
         Tables tables = new Tables();
 
         try {
@@ -75,22 +73,27 @@ public class OracleSchema {
                     connectorConfig.getTableFilters().dataCollectionFilter(),
                     null,
                     false);
-
-            Table table =
-                    CatalogTableUtils.mergeCatalogTableConfig(
-                            tables.forTable(tableId), tableMap.get(tableId));
-            TableChange tableChange = new 
TableChange(TableChanges.TableChangeType.CREATE, table);
-            tableChangeMap.put(tableId, tableChange);
+            for (TableId id : tables.tableIds()) {
+                if (tableMap.containsKey(id)) {
+                    Table table =
+                            CatalogTableUtils.mergeCatalogTableConfig(
+                                    tables.forTable(id), tableMap.get(id));
+                    TableChanges.TableChange tableChange =
+                            new TableChanges.TableChange(
+                                    TableChanges.TableChangeType.CREATE, 
table);
+                    schemasByTableId.put(id, tableChange);
+                }
+            }
         } catch (SQLException e) {
             throw new SeaTunnelException(
                     String.format("Failed to read schema for table %s ", 
tableId), e);
         }
 
-        if (!tableChangeMap.containsKey(tableId)) {
+        if (!schemasByTableId.containsKey(tableId)) {
             throw new SeaTunnelException(
                     String.format("Can't obtain schema for table %s ", 
tableId));
         }
 
-        return tableChangeMap.get(tableId);
+        return schemasByTableId.get(tableId);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresSchema.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresSchema.java
index 7a9048b405..8470f7d95a 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresSchema.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/utils/PostgresSchema.java
@@ -30,7 +30,6 @@ import io.debezium.relational.Tables;
 import io.debezium.relational.history.TableChanges;
 
 import java.sql.SQLException;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -52,44 +51,46 @@ public class PostgresSchema {
         TableChanges.TableChange schema = schemasByTableId.get(tableId);
         if (schema == null) {
             schema = readTableSchema(jdbc, tableId);
-            schemasByTableId.put(tableId, schema);
         }
         return schema;
     }
 
     private TableChanges.TableChange readTableSchema(JdbcConnection jdbc, 
TableId tableId) {
-
-        CatalogTable catalogTable = tableMap.get(tableId);
         // Because the catalog is null in the postgresConnection.readSchema 
method
-        tableId = new TableId(null, tableId.schema(), tableId.table());
+        TableId tableIdWithoutCatalog = new TableId(null, tableId.schema(), 
tableId.table());
 
         PostgresConnection postgresConnection = (PostgresConnection) jdbc;
-        final Map<TableId, TableChanges.TableChange> tableChangeMap = new 
HashMap<>();
         Tables tables = new Tables();
         try {
             postgresConnection.readSchema(
                     tables,
-                    tableId.catalog(),
-                    tableId.schema(),
+                    tableIdWithoutCatalog.catalog(),
+                    tableIdWithoutCatalog.schema(),
                     connectorConfig.getTableFilters().dataCollectionFilter(),
                     null,
                     false);
-            Table table =
-                    CatalogTableUtils.mergeCatalogTableConfig(
-                            tables.forTable(tableId), catalogTable);
-            TableChanges.TableChange tableChange =
-                    new 
TableChanges.TableChange(TableChanges.TableChangeType.CREATE, table);
-            tableChangeMap.put(tableId, tableChange);
+            for (TableId id : tables.tableIds()) {
+                TableId idWithCatalog = new TableId(tableId.catalog(), 
id.schema(), id.table());
+                if (tableMap.containsKey(idWithCatalog)) {
+                    Table table =
+                            CatalogTableUtils.mergeCatalogTableConfig(
+                                    tables.forTable(id), 
tableMap.get(idWithCatalog));
+                    TableChanges.TableChange tableChange =
+                            new TableChanges.TableChange(
+                                    TableChanges.TableChangeType.CREATE, 
table);
+                    schemasByTableId.put(idWithCatalog, tableChange);
+                }
+            }
         } catch (SQLException e) {
             throw new SeaTunnelException(
                     String.format("Failed to read schema for table %s ", 
tableId), e);
         }
 
-        if (!tableChangeMap.containsKey(tableId)) {
+        if (!schemasByTableId.containsKey(tableId)) {
             throw new SeaTunnelException(
                     String.format("Can't obtain schema for table %s ", 
tableId));
         }
 
-        return tableChangeMap.get(tableId);
+        return schemasByTableId.get(tableId);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerSchema.java
 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerSchema.java
index 79f58e3e2d..77b2594f05 100644
--- 
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerSchema.java
+++ 
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/utils/SqlServerSchema.java
@@ -31,7 +31,6 @@ import io.debezium.relational.history.TableChanges;
 import io.debezium.relational.history.TableChanges.TableChange;
 
 import java.sql.SQLException;
-import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -54,15 +53,12 @@ public class SqlServerSchema {
         TableChange schema = schemasByTableId.get(tableId);
         if (schema == null) {
             schema = readTableSchema(jdbc, tableId);
-            schemasByTableId.put(tableId, schema);
         }
         return schema;
     }
 
     private TableChange readTableSchema(JdbcConnection jdbc, TableId tableId) {
         SqlServerConnection sqlServerConnection = (SqlServerConnection) jdbc;
-
-        final Map<TableId, TableChange> tableChangeMap = new HashMap<>();
         Tables tables = new Tables();
         try {
             sqlServerConnection.readSchema(
@@ -72,21 +68,27 @@ public class SqlServerSchema {
                     connectorConfig.getTableFilters().dataCollectionFilter(),
                     null,
                     false);
-            Table table =
-                    CatalogTableUtils.mergeCatalogTableConfig(
-                            tables.forTable(tableId), tableMap.get(tableId));
-            TableChange tableChange = new 
TableChange(TableChanges.TableChangeType.CREATE, table);
-            tableChangeMap.put(tableId, tableChange);
+            for (TableId id : tables.tableIds()) {
+                if (tableMap.containsKey(id)) {
+                    Table table =
+                            CatalogTableUtils.mergeCatalogTableConfig(
+                                    tables.forTable(id), tableMap.get(id));
+                    TableChanges.TableChange tableChange =
+                            new TableChanges.TableChange(
+                                    TableChanges.TableChangeType.CREATE, 
table);
+                    schemasByTableId.put(id, tableChange);
+                }
+            }
         } catch (SQLException e) {
             throw new SeaTunnelException(
                     String.format("Failed to read schema for table %s ", 
tableId), e);
         }
 
-        if (!tableChangeMap.containsKey(tableId)) {
+        if (!schemasByTableId.containsKey(tableId)) {
             throw new SeaTunnelException(
                     String.format("Can't obtain schema for table %s ", 
tableId));
         }
 
-        return tableChangeMap.get(tableId);
+        return schemasByTableId.get(tableId);
     }
 }

Reply via email to