This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git

commit ddf557c8e1d693a31a96a2650872c55da2c7aa7b
Author: Leonard Xu <@users.noreply.github.com>
AuthorDate: Wed Apr 24 19:52:56 2024 +0800

    [tests][cdc-connector][db2] Improve the DB2 tests
    
    This closes #2870
---
 .../base/relational/JdbcSourceEventDispatcher.java |  3 +
 .../base/source/reader/external/FetchTask.java     |  1 +
 .../source/fetch/Db2SourceFetchTaskContext.java    |  5 +-
 .../flink/cdc/connectors/db2/Db2TestBase.java      | 67 ++++++++++++++++------
 .../connectors/db2/table/Db2ConnectorITCase.java   |  1 +
 .../mysql/source/MySqlSourceTestBase.java          |  4 +-
 6 files changed, 61 insertions(+), 20 deletions(-)

diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/relational/JdbcSourceEventDispatcher.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/relational/JdbcSourceEventDispatcher.java
index 7886bf8c8..deb6794ac 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/relational/JdbcSourceEventDispatcher.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/relational/JdbcSourceEventDispatcher.java
@@ -209,6 +209,9 @@ public class JdbcSourceEventDispatcher<P extends Partition> extends EventDispatc
 
         @Override
         public void schemaChangeEvent(SchemaChangeEvent event) throws InterruptedException {
+            if (SchemaChangeEvent.SchemaChangeEventType.DROP.equals(event.getType())) {
+                LOG.info("Received drop table event " + event + " at offset: " + event.getOffset());
+            }
             historizedSchema.applySchemaChange(event);
             if (connectorConfig.isSchemaChangesHistoryEnabled()) {
                 try {
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/FetchTask.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/FetchTask.java
index beab207e4..fbf86b598 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/FetchTask.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/external/FetchTask.java
@@ -52,6 +52,7 @@ public interface FetchTask<Split> {
 
     /** Base context used in the execution of fetch task. */
     interface Context {
+
         void configure(SourceSplitBase sourceSplitBase);
 
         ChangeEventQueue<DataChangeEvent> getQueue();
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/fetch/Db2SourceFetchTaskContext.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/fetch/Db2SourceFetchTaskContext.java
index 89b8681c3..45bf132ea 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/fetch/Db2SourceFetchTaskContext.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/main/java/org/apache/flink/cdc/connectors/db2/source/fetch/Db2SourceFetchTaskContext.java
@@ -118,7 +118,10 @@ public class Db2SourceFetchTaskContext extends JdbcSourceFetchTaskContext {
 
         this.taskContext = new Db2TaskContext(connectorConfig, databaseSchema);
 
-        final int queueSize = getSourceConfig().getDbzConnectorConfig().getMaxQueueSize();
+        final int queueSize =
+                sourceSplitBase.isSnapshotSplit()
+                        ? getSourceConfig().getSplitSize()
+                        : getSourceConfig().getDbzConnectorConfig().getMaxQueueSize();
         this.queue =
                 new Builder<DataChangeEvent>()
                         .pollInterval(connectorConfig.getPollInterval())
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/test/java/org/apache/flink/cdc/connectors/db2/Db2TestBase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/test/java/org/apache/flink/cdc/connectors/db2/Db2TestBase.java
index 3abd1ec5a..76b389c04 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/test/java/org/apache/flink/cdc/connectors/db2/Db2TestBase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/test/java/org/apache/flink/cdc/connectors/db2/Db2TestBase.java
@@ -17,9 +17,10 @@
 
 package org.apache.flink.cdc.connectors.db2;
 
+import org.apache.flink.util.FlinkRuntimeException;
+
 import org.apache.commons.lang3.StringUtils;
 import org.awaitility.Awaitility;
-import org.awaitility.core.ConditionTimeoutException;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.slf4j.Logger;
@@ -44,6 +45,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Locale;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
@@ -137,7 +139,7 @@ public class Db2TestBase {
 
         try {
             Awaitility.await(String.format("cdc remove table %s", tableName))
-                    .atMost(60, TimeUnit.SECONDS)
+                    .atMost(30, TimeUnit.SECONDS)
                     .until(
                             () -> {
                                 try {
@@ -161,13 +163,13 @@ public class Db2TestBase {
                                     return false;
                                 }
                             });
-        } catch (ConditionTimeoutException e) {
-            throw new IllegalStateException("Failed to cdc remove test table", e);
+        } catch (Exception e) {
+            throw new FlinkRuntimeException("Failed to remove cdc table " + tableName, e);
         }
 
         try {
             Awaitility.await(String.format("Dropping table %s", tableName))
-                    .atMost(60, TimeUnit.SECONDS)
+                    .atMost(30, TimeUnit.SECONDS)
                     .until(
                             () -> {
                                 try {
@@ -184,9 +186,46 @@ public class Db2TestBase {
                                     return false;
                                 }
                             });
-        } catch (ConditionTimeoutException e) {
-            throw new IllegalStateException("Failed to drop test database", e);
+        } catch (Exception e) {
+            throw new FlinkRuntimeException("Failed to drop table", e);
+        }
+    }
+
+    private static boolean checkTableExists(Connection connection, String tableName) {
+        AtomicBoolean tableExists = new AtomicBoolean(false);
+        try {
+            Awaitility.await(String.format("check table %s exists or not", tableName))
+                    .atMost(30, TimeUnit.SECONDS)
+                    .until(
+                            () -> {
+                                try {
+                                    String tableExistSql =
+                                            String.format(
+                                                    "SELECT COUNT(*) FROM SYSCAT.TABLES WHERE TABNAME = '%s' AND "
+                                                            + "TABSCHEMA = 'DB2INST1';",
+                                                    tableName);
+                                    ResultSet resultSet =
+                                            connection
+                                                    .createStatement()
+                                                    .executeQuery(tableExistSql);
+                                    if (resultSet.next()) {
+                                        if (resultSet.getInt(1) == 1) {
+                                            tableExists.set(true);
+                                        }
+                                    }
+                                    return true;
+                                } catch (SQLException e) {
+                                    LOG.warn(
+                                            String.format(
+                                                    "check table %s exists failed", tableName),
+                                            e.getMessage());
+                                    return false;
+                                }
+                            });
+        } catch (Exception e) {
+            throw new FlinkRuntimeException("Failed to check table " + tableName + " exists", e);
         }
+        return tableExists.get();
     }
 
     /**
@@ -199,19 +238,11 @@ public class Db2TestBase {
         assertNotNull("Cannot locate " + ddlFile, ddlTestFile);
         try (Connection connection = getJdbcConnection();
                 Statement statement = connection.createStatement()) {
-            String tableExistSql =
-                    String.format(
-                            "SELECT COUNT(*) FROM SYSCAT.TABLES WHERE TABNAME = '%s' AND "
-                                    + "TABSCHEMA = 'DB2INST1';",
-                            tableName);
-            ResultSet resultSet = statement.executeQuery(tableExistSql);
-            int count = 0;
-            if (resultSet.next()) {
-                count = resultSet.getInt(1);
-            }
-            if (count == 1) {
+            if (checkTableExists(connection, tableName)) {
                 LOG.info("{} table exist", tableName);
                 dropTestTable(connection, tableName.toUpperCase(Locale.ROOT));
+                // sleep 10 seconds to make sure ASN replication agent has been notified
+                Thread.sleep(10_000);
             }
             final List<String> statements =
                     Arrays.stream(
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/test/java/org/apache/flink/cdc/connectors/db2/table/Db2ConnectorITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/test/java/org/apache/flink/cdc/connectors/db2/table/Db2ConnectorITCase.java
index 3305bdf9e..7bf504c1d 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/test/java/org/apache/flink/cdc/connectors/db2/table/Db2ConnectorITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-db2-cdc/src/test/java/org/apache/flink/cdc/connectors/db2/table/Db2ConnectorITCase.java
@@ -55,6 +55,7 @@ import static org.testcontainers.containers.Db2Container.DB2_PORT;
 /** Integration tests for DB2 CDC source. */
 @RunWith(Parameterized.class)
 public class Db2ConnectorITCase extends Db2TestBase {
+
     private static final Logger LOG = LoggerFactory.getLogger(Db2ConnectorITCase.class);
 
     protected static final int DEFAULT_PARALLELISM = 2;
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceTestBase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceTestBase.java
index 8682e871a..0ca776beb 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceTestBase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceTestBase.java
@@ -73,7 +73,9 @@ public abstract class MySqlSourceTestBase extends TestLogger {
     @AfterClass
     public static void stopContainers() {
         LOG.info("Stopping containers...");
-        MYSQL_CONTAINER.stop();
+        if (MYSQL_CONTAINER != null) {
+            MYSQL_CONTAINER.stop();
+        }
         LOG.info("Containers are stopped.");
     }
 

Reply via email to