This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 9a5da78176 [Improve][Connector-V2] Add pre-check for table enable cdc (#8152)
9a5da78176 is described below
commit 9a5da781767358db6afcccc08c1087de8a2dd3b4
Author: Jia Fan <[email protected]>
AuthorDate: Fri Nov 29 14:13:39 2024 +0800
[Improve][Connector-V2] Add pre-check for table enable cdc (#8152)
---
.../cdc/base/dialect/JdbcDataSourceDialect.java | 3 ++
.../cdc/postgres/source/PostgresDialect.java | 30 +++++++++++++++--
.../cdc/sqlserver/source/SqlServerDialect.java | 38 ++++++++++++++++++++--
.../seatunnel/cdc/postgres/PostgresCDCIT.java | 34 +++++++++++++++++++
.../connector/cdc/sqlserver/SqlServerCDCIT.java | 35 ++++++++++++++++++++
5 files changed, 136 insertions(+), 4 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/dialect/JdbcDataSourceDialect.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/dialect/JdbcDataSourceDialect.java
index 05e9a89c04..f9193c9eb7 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/dialect/JdbcDataSourceDialect.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-base/src/main/java/org/apache/seatunnel/connectors/cdc/base/dialect/JdbcDataSourceDialect.java
@@ -49,6 +49,9 @@ public interface JdbcDataSourceDialect extends DataSourceDialect<JdbcSourceConfi
@Override
List<TableId> discoverDataCollections(JdbcSourceConfig sourceConfig);
+ default void checkAllTablesEnabledCapture(JdbcConnection jdbcConnection,
List<TableId> tableIds)
+ throws SQLException {}
+
/**
* Creates and opens a new {@link JdbcConnection} backing connection pool.
*
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresDialect.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresDialect.java
index 94781a2e54..921cb52518 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresDialect.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-postgres/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/source/PostgresDialect.java
@@ -43,6 +43,7 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseI
import io.debezium.connector.postgresql.PostgresConnectorConfig;
import io.debezium.connector.postgresql.connection.PostgresConnection;
+import io.debezium.connector.postgresql.connection.ServerInfo;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.relational.TableId;
@@ -103,13 +104,32 @@ public class PostgresDialect implements JdbcDataSourceDialect {
public List<TableId> discoverDataCollections(JdbcSourceConfig
sourceConfig) {
PostgresSourceConfig postgresSourceConfig = (PostgresSourceConfig)
sourceConfig;
try (JdbcConnection jdbcConnection = openJdbcConnection(sourceConfig))
{
- return TableDiscoveryUtils.listTables(
- jdbcConnection, postgresSourceConfig.getTableFilters());
+ List<TableId> tables =
+ TableDiscoveryUtils.listTables(
+ jdbcConnection,
postgresSourceConfig.getTableFilters());
+ this.checkAllTablesEnabledCapture(jdbcConnection, tables);
+ return tables;
} catch (SQLException e) {
throw new SeaTunnelException("Error to discover tables: " +
e.getMessage(), e);
}
}
+ @Override
+ public void checkAllTablesEnabledCapture(JdbcConnection jdbcConnection,
List<TableId> tableIds)
+ throws SQLException {
+ PostgresConnection postgresConnection = (PostgresConnection)
jdbcConnection;
+ for (TableId tableId : tableIds) {
+ ServerInfo.ReplicaIdentity replicaIdentity =
+ postgresConnection.readReplicaIdentityInfo(tableId);
+ if (!ServerInfo.ReplicaIdentity.FULL.equals(replicaIdentity)) {
+ throw new SeaTunnelException(
+ String.format(
+ "Table %s does not have a full replica
identity, please execute: ALTER TABLE %s REPLICA IDENTITY FULL;",
+ tableId, tableId));
+ }
+ }
+ }
+
@Override
public TableChanges.TableChange queryTableSchema(JdbcConnection jdbc,
TableId tableId) {
if (postgresSchema == null) {
@@ -155,6 +175,12 @@ public class PostgresDialect implements JdbcDataSourceDialect {
if (sourceSplitBase.isSnapshotSplit()) {
return new
PostgresSnapshotFetchTask(sourceSplitBase.asSnapshotSplit());
} else {
+ try (JdbcConnection jdbcConnection =
openJdbcConnection(sourceConfig)) {
+ List<TableId> tables =
sourceSplitBase.asIncrementalSplit().getTableIds();
+ this.checkAllTablesEnabledCapture(jdbcConnection, tables);
+ } catch (SQLException e) {
+ throw new SeaTunnelException("Error to check tables: " +
e.getMessage(), e);
+ }
postgresWalFetchTask = new
PostgresWalFetchTask(sourceSplitBase.asIncrementalSplit());
return postgresWalFetchTask;
}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/SqlServerDialect.java b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/SqlServerDialect.java
index 816ba8eb4b..55838d16b1 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/SqlServerDialect.java
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-sqlserver/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/sqlserver/source/SqlServerDialect.java
@@ -38,6 +38,8 @@ import org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.utils.SqlServerSc
import
org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.utils.TableDiscoveryUtils;
import
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.DatabaseIdentifier;
+import io.debezium.connector.sqlserver.SqlServerChangeTable;
+import io.debezium.connector.sqlserver.SqlServerConnection;
import io.debezium.jdbc.JdbcConnection;
import io.debezium.relational.TableId;
import io.debezium.relational.history.TableChanges;
@@ -46,6 +48,8 @@ import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
/** The {@link JdbcDataSourceDialect} implementation for MySQL datasource. */
public class SqlServerDialect implements JdbcDataSourceDialect {
@@ -88,13 +92,37 @@ public class SqlServerDialect implements JdbcDataSourceDialect {
public List<TableId> discoverDataCollections(JdbcSourceConfig
sourceConfig) {
SqlServerSourceConfig sqlServerSourceConfig = (SqlServerSourceConfig)
sourceConfig;
try (JdbcConnection jdbcConnection = openJdbcConnection(sourceConfig))
{
- return TableDiscoveryUtils.listTables(
- jdbcConnection, sqlServerSourceConfig.getTableFilters());
+ List<TableId> tables =
+ TableDiscoveryUtils.listTables(
+ jdbcConnection,
sqlServerSourceConfig.getTableFilters());
+ this.checkAllTablesEnabledCapture(jdbcConnection, tables);
+ return tables;
} catch (SQLException e) {
throw new SeaTunnelException("Error to discover tables: " +
e.getMessage(), e);
}
}
+ @Override
+ public void checkAllTablesEnabledCapture(JdbcConnection jdbcConnection,
List<TableId> tableIds)
+ throws SQLException {
+ Map<String, List<TableId>> databases =
+ tableIds.stream()
+ .collect(Collectors.groupingBy(TableId::catalog,
Collectors.toList()));
+ for (String database : databases.keySet()) {
+ Set<TableId> tables =
+ ((SqlServerConnection) jdbcConnection)
+ .getChangeTables(database).stream()
+
.map(SqlServerChangeTable::getSourceTableId)
+ .collect(Collectors.toSet());
+ for (TableId tableId : databases.get(database)) {
+ if (!tables.contains(tableId)) {
+ throw new SeaTunnelException(
+ "Table " + tableId + " is not enabled for
capture");
+ }
+ }
+ }
+ }
+
@Override
public TableChanges.TableChange queryTableSchema(JdbcConnection jdbc,
TableId tableId) {
if (sqlServerSchema == null) {
@@ -115,6 +143,12 @@ public class SqlServerDialect implements JdbcDataSourceDialect {
if (sourceSplitBase.isSnapshotSplit()) {
return new
SqlServerSnapshotFetchTask(sourceSplitBase.asSnapshotSplit());
} else {
+ try (JdbcConnection jdbcConnection =
openJdbcConnection(sourceConfig)) {
+ List<TableId> tables =
sourceSplitBase.asIncrementalSplit().getTableIds();
+ this.checkAllTablesEnabledCapture(jdbcConnection, tables);
+ } catch (SQLException e) {
+ throw new SeaTunnelException("Error to check tables: " +
e.getMessage(), e);
+ }
return new
SqlServerTransactionLogFetchTask(sourceSplitBase.asIncrementalSplit());
}
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
index acb9a2a41c..80961b61a3 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-postgres-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/postgres/PostgresCDCIT.java
@@ -17,6 +17,10 @@
package org.apache.seatunnel.connectors.seatunnel.cdc.postgres;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
+import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfigFactory;
+import
org.apache.seatunnel.connectors.seatunnel.cdc.postgres.config.PostgresSourceConfigFactory;
+import
org.apache.seatunnel.connectors.seatunnel.cdc.postgres.source.PostgresDialect;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
@@ -29,6 +33,7 @@ import org.apache.seatunnel.e2e.common.util.JobIdGenerator;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,6 +44,8 @@ import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerImageName;
import com.google.common.collect.Lists;
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.TableId;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
@@ -53,6 +60,7 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
@@ -594,6 +602,32 @@ public class PostgresCDCIT extends TestSuiteBase implements TestResource {
}
}
+ @Test
+ public void testDialectCheckDisabledCDCTable() throws SQLException {
+ JdbcSourceConfigFactory factory =
+ new PostgresSourceConfigFactory()
+ .hostname(POSTGRES_CONTAINER.getHost())
+ .port(5432)
+ .username("postgres")
+ .password("postgres")
+ .databaseList(POSTGRESQL_DATABASE);
+ PostgresDialect dialect =
+ new PostgresDialect((PostgresSourceConfigFactory) factory,
Collections.emptyList());
+ try (JdbcConnection connection =
dialect.openJdbcConnection(factory.create(0))) {
+ SeaTunnelException exception =
+ Assertions.assertThrows(
+ SeaTunnelException.class,
+ () ->
+ dialect.checkAllTablesEnabledCapture(
+ connection,
+ Collections.singletonList(
+
TableId.parse(SINK_TABLE_1))));
+ Assertions.assertEquals(
+ "Table sink_postgres_cdc_table_1 does not have a full
replica identity, please execute: ALTER TABLE sink_postgres_cdc_table_1 REPLICA
IDENTITY FULL;",
+ exception.getMessage());
+ }
+ }
+
private Connection getJdbcConnection() throws SQLException {
return DriverManager.getConnection(
POSTGRES_CONTAINER.getJdbcUrl(),
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
index 1b699d5805..94867d2014 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-sqlserver-e2e/src/test/java/org/apache/seatunnel/e2e/connector/cdc/sqlserver/SqlServerCDCIT.java
@@ -17,6 +17,10 @@
package org.apache.seatunnel.e2e.connector.cdc.sqlserver;
+import org.apache.seatunnel.common.utils.SeaTunnelException;
+import org.apache.seatunnel.connectors.cdc.base.config.JdbcSourceConfigFactory;
+import
org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.config.SqlServerSourceConfigFactory;
+import
org.apache.seatunnel.connectors.seatunnel.cdc.sqlserver.source.SqlServerDialect;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
@@ -31,6 +35,7 @@ import org.awaitility.core.ConditionTimeoutException;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.MSSQLServerContainer;
@@ -39,6 +44,8 @@ import org.testcontainers.lifecycle.Startables;
import org.testcontainers.utility.DockerLoggerFactory;
import com.google.common.collect.Lists;
+import io.debezium.jdbc.JdbcConnection;
+import io.debezium.relational.TableId;
import lombok.extern.slf4j.Slf4j;
import java.io.IOException;
@@ -52,6 +59,7 @@ import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
@@ -346,6 +354,33 @@ public class SqlServerCDCIT extends TestSuiteBase implements TestResource {
}
}
+ @Test
+ public void testDialectCheckDisabledCDCTable() throws SQLException {
+ initializeSqlServerTable("column_type_test");
+ JdbcSourceConfigFactory factory =
+ new SqlServerSourceConfigFactory()
+ .hostname(MSSQL_SERVER_CONTAINER.getHost())
+ .port(PORT)
+ .username("sa")
+ .password("Password!")
+ .databaseList("column_type_test");
+ SqlServerDialect dialect =
+ new SqlServerDialect(
+ (SqlServerSourceConfigFactory) factory,
Collections.emptyList());
+ try (JdbcConnection connection =
dialect.openJdbcConnection(factory.create(0))) {
+ SeaTunnelException exception =
+ Assertions.assertThrows(
+ SeaTunnelException.class,
+ () ->
+ dialect.checkAllTablesEnabledCapture(
+ connection,
+
Collections.singletonList(TableId.parse(SINK_TABLE))));
+ Assertions.assertEquals(
+ "Table column_type_test.dbo.full_types_sink is not enabled
for capture",
+ exception.getMessage());
+ }
+ }
+
/**
* Executes a JDBC statement using the default jdbc config without
autocommitting the
* connection.