This is an automated email from the ASF dual-hosted git repository. leonard pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
commit ddf557c8e1d693a31a96a2650872c55da2c7aa7b Author: Leonard Xu <@users.noreply.github.com> AuthorDate: Wed Apr 24 19:52:56 2024 +0800 [tests][cdc-connector][db2] Improve the DB2 tests This closes #2870 --- .../base/relational/JdbcSourceEventDispatcher.java | 3 + .../base/source/reader/external/FetchTask.java | 1 + .../source/fetch/Db2SourceFetchTaskContext.java | 5 +- .../flink/cdc/connectors/db2/Db2TestBase.java | 67 ++++++++++++++++------ .../connectors/db2/table/Db2ConnectorITCase.java | 1 + .../mysql/source/MySqlSourceTestBase.java | 4 +- 6 files changed, 61 insertions(+), 20 deletions(-) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/relational/JdbcSourceEventDispatcher.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/relational/JdbcSourceEventDispatcher.java index 7886bf8c8..deb6794ac 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/relational/JdbcSourceEventDispatcher.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/relational/JdbcSourceEventDispatcher.java @@ -209,6 +209,9 @@ public class JdbcSourceEventDispatcher<P extends Partition> extends EventDispatc @Override public void schemaChangeEvent(SchemaChangeEvent event) throws InterruptedException { + if (SchemaChangeEvent.SchemaChangeEventType.DROP.equals(event.getType())) { + LOG.info("Received drop table event " + event + " at offset: " + event.getOffset()); + } historizedSchema.applySchemaChange(event); if (connectorConfig.isSchemaChangesHistoryEnabled()) { try { diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/FetchTask.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/FetchTask.java index beab207e4..fbf86b598 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/FetchTask.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/FetchTask.java @@ -52,6 +52,7 @@ public interface FetchTask<Split> { /** Base context used in the execution of fetch task. */ interface Context { + void configure(SourceSplitBase sourceSplitBase); ChangeEventQueue<DataChangeEvent> getQueue(); diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/fetch/Db2SourceFetchTaskContext.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/fetch/Db2SourceFetchTaskContext.java index 89b8681c3..45bf132ea 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/fetch/Db2SourceFetchTaskContext.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/fetch/Db2SourceFetchTaskContext.java @@ -118,7 +118,10 @@ public class Db2SourceFetchTaskContext extends JdbcSourceFetchTaskContext { this.taskContext = new Db2TaskContext(connectorConfig, databaseSchema); - final int queueSize = getSourceConfig().getDbzConnectorConfig().getMaxQueueSize(); + final int queueSize = + sourceSplitBase.isSnapshotSplit() + ? getSourceConfig().getSplitSize() + : getSourceConfig().getDbzConnectorConfig().getMaxQueueSize(); this.queue = new Builder<DataChangeEvent>() .pollInterval(connectorConfig.getPollInterval()) diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/test/java/org/apache/flink/cdc/connectors/db2/Db2TestBase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/test/java/org/apache/flink/cdc/connectors/db2/Db2TestBase.java index 3abd1ec5a..76b389c04 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/test/java/org/apache/flink/cdc/connectors/db2/Db2TestBase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/test/java/org/apache/flink/cdc/connectors/db2/Db2TestBase.java @@ -17,9 +17,10 @@ package org.apache.flink.cdc.connectors.db2; +import org.apache.flink.util.FlinkRuntimeException; + import org.apache.commons.lang3.StringUtils; import org.awaitility.Awaitility; -import org.awaitility.core.ConditionTimeoutException; import org.junit.AfterClass; import org.junit.BeforeClass; import org.slf4j.Logger; @@ -44,6 +45,7 @@ import java.util.Arrays; import java.util.List; import java.util.Locale; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -137,7 +139,7 @@ public class Db2TestBase { try { Awaitility.await(String.format("cdc remove table %s", tableName)) - .atMost(60, TimeUnit.SECONDS) + .atMost(30, TimeUnit.SECONDS) .until( () -> { try { @@ -161,13 +163,13 @@ public class Db2TestBase { return false; } }); - } catch (ConditionTimeoutException e) { - throw new IllegalStateException("Failed to cdc remove test table", e); + } catch (Exception e) { + throw new FlinkRuntimeException("Failed to remove cdc table " + tableName, e); } try { Awaitility.await(String.format("Dropping table %s", tableName)) - .atMost(60, TimeUnit.SECONDS) + .atMost(30, TimeUnit.SECONDS) .until( () -> { try { @@ -184,9 +186,46 @@ public class Db2TestBase { return false; } }); - } catch (ConditionTimeoutException e) { - throw new IllegalStateException("Failed to drop test database", e); + } catch (Exception e) { + throw new FlinkRuntimeException("Failed to drop table", e); + } + } + + private static boolean checkTableExists(Connection connection, String tableName) { + AtomicBoolean tableExists = new AtomicBoolean(false); + try { + Awaitility.await(String.format("check table %s exists or not", tableName)) + .atMost(30, TimeUnit.SECONDS) + .until( + () -> { + try { + String tableExistSql = + String.format( + "SELECT COUNT(*) FROM SYSCAT.TABLES WHERE TABNAME = '%s' AND " + + "TABSCHEMA = 'DB2INST1';", + tableName); + ResultSet resultSet = + connection + .createStatement() + .executeQuery(tableExistSql); + if (resultSet.next()) { + if (resultSet.getInt(1) == 1) { + tableExists.set(true); + } + } + return true; + } catch (SQLException e) { + LOG.warn( + String.format( + "check table %s exists failed", tableName), + e.getMessage()); + return false; + } + }); + } catch (Exception e) { + throw new FlinkRuntimeException("Failed to check table " + tableName + " exists", e); } + return tableExists.get(); } /** @@ -199,19 +238,11 @@ public class Db2TestBase { assertNotNull("Cannot locate " + ddlFile, ddlTestFile); try (Connection connection = getJdbcConnection(); Statement statement = connection.createStatement()) { - String tableExistSql = - String.format( - "SELECT COUNT(*) FROM SYSCAT.TABLES WHERE TABNAME = '%s' AND " - + "TABSCHEMA = 'DB2INST1';", - tableName); - ResultSet resultSet = statement.executeQuery(tableExistSql); - int count = 0; - if (resultSet.next()) { - count = resultSet.getInt(1); - } - if (count == 1) { + if (checkTableExists(connection, tableName)) { LOG.info("{} table exist", tableName); dropTestTable(connection, tableName.toUpperCase(Locale.ROOT)); + // sleep 10 seconds to make sure ASN replication agent has been notified + Thread.sleep(10_000); } final List<String> statements = Arrays.stream( diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/test/java/org/apache/flink/cdc/connectors/db2/table/Db2ConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/test/java/org/apache/flink/cdc/connectors/db2/table/Db2ConnectorITCase.java index 3305bdf9e..7bf504c1d 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/test/java/org/apache/flink/cdc/connectors/db2/table/Db2ConnectorITCase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/test/java/org/apache/flink/cdc/connectors/db2/table/Db2ConnectorITCase.java @@ -55,6 +55,7 @@ import static org.testcontainers.containers.Db2Container.DB2_PORT; /** Integration tests for DB2 CDC source. */ @RunWith(Parameterized.class) public class Db2ConnectorITCase extends Db2TestBase { + private static final Logger LOG = LoggerFactory.getLogger(Db2ConnectorITCase.class); protected static final int DEFAULT_PARALLELISM = 2; diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceTestBase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceTestBase.java index 8682e871a..0ca776beb 100644 --- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceTestBase.java +++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceTestBase.java @@ -73,7 +73,9 @@ public abstract class MySqlSourceTestBase extends TestLogger { @AfterClass public static void stopContainers() { LOG.info("Stopping containers..."); - MYSQL_CONTAINER.stop(); + if (MYSQL_CONTAINER != null) { + MYSQL_CONTAINER.stop(); + } LOG.info("Containers are stopped."); }