This is an automated email from the ASF dual-hosted git repository.

lvyanquan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new fe79b4df7 [FLINK-38239][oceanbase] Re-enable OceanBase CDC integration 
tests in CI with Docker image (#4111)
fe79b4df7 is described below

commit fe79b4df75b9725e499bee41db0bc5d051b19844
Author: yuanoOo <[email protected]>
AuthorDate: Wed Apr 29 11:57:37 2026 +0800

    [FLINK-38239][oceanbase] Re-enable OceanBase CDC integration tests in CI 
with Docker image (#4111)
---
 .../docs/connectors/flink-sources/overview.md      |   2 +-
 .../docs/connectors/flink-sources/overview.md      |   2 +-
 .../oceanbase/OceanBaseCharsetITCase.java          |   5 +-
 .../oceanbase/OceanBaseFailoverITCase.java         | 139 +++++++-----
 .../oceanbase/OceanBaseSourceITCase.java           |   9 +-
 .../oceanbase/OceanBaseSourceTestBase.java         |  45 +++-
 .../flink-cdc-source-e2e-tests/pom.xml             |  10 +
 .../cdc/connectors/tests/OceanBaseE2eITCase.java   | 249 +++++++++++++++++++++
 .../src/test/resources/ddl/oceanbase_inventory.sql |  51 +++++
 9 files changed, 440 insertions(+), 72 deletions(-)

diff --git a/docs/content.zh/docs/connectors/flink-sources/overview.md 
b/docs/content.zh/docs/connectors/flink-sources/overview.md
index b042a8a2e..97abe6af1 100644
--- a/docs/content.zh/docs/connectors/flink-sources/overview.md
+++ b/docs/content.zh/docs/connectors/flink-sources/overview.md
@@ -86,7 +86,7 @@ The following table shows the current features of the 
connector:
 | [oracle-cdc]({{< ref "docs/connectors/flink-sources/oracle-cdc" >}})       | 
✅            | ✅             | ✅ | ✅                         |
 | [postgres-cdc]({{< ref "docs/connectors/flink-sources/postgres-cdc" >}})   | 
✅            | ✅             | ✅ | ✅                         |
 | [sqlserver-cdc]({{< ref "docs/connectors/flink-sources/sqlserver-cdc" >}}) | 
✅            | ✅             | ✅ | ✅                         |
-| [oceanbase-cdc]({{< ref "docs/connectors/flink-sources/oceanbase-cdc" >}}) | 
❌            | ❌             | ❌ | ❌                         |
+| [oceanbase-cdc]({{< ref "docs/connectors/flink-sources/oceanbase-cdc" >}}) | 
✅            | ✅             | ✅ | ✅                         |
 | [tidb-cdc]({{< ref "docs/connectors/flink-sources/tidb-cdc" >}})           | 
✅            | ❌             | ❌ | ❌                         |
 | [db2-cdc]({{< ref "docs/connectors/flink-sources/db2-cdc" >}})             | 
✅            | ✅             | ✅ | ✅                         |
 | [vitess-cdc]({{< ref "docs/connectors/flink-sources/vitess-cdc" >}})       | 
✅            | ❌             | ❌ | ❌                         |
diff --git a/docs/content/docs/connectors/flink-sources/overview.md 
b/docs/content/docs/connectors/flink-sources/overview.md
index 3d99e37d4..7dd38d401 100644
--- a/docs/content/docs/connectors/flink-sources/overview.md
+++ b/docs/content/docs/connectors/flink-sources/overview.md
@@ -86,7 +86,7 @@ The following table shows the current features of the 
connector:
 | [oracle-cdc]({{< ref "docs/connectors/flink-sources/oracle-cdc" >}})       | 
✅            | ✅             | ✅ | ✅                         |
 | [postgres-cdc]({{< ref "docs/connectors/flink-sources/postgres-cdc" >}})   | 
✅            | ✅             | ✅ | ✅                         |
 | [sqlserver-cdc]({{< ref "docs/connectors/flink-sources/sqlserver-cdc" >}}) | 
✅            | ✅             | ✅ | ✅                         |
-| [oceanbase-cdc]({{< ref "docs/connectors/flink-sources/oceanbase-cdc" >}}) | 
❌            | ❌             | ❌ | ❌                         |
+| [oceanbase-cdc]({{< ref "docs/connectors/flink-sources/oceanbase-cdc" >}}) | 
✅            | ✅             | ✅ | ✅                         |
 | [tidb-cdc]({{< ref "docs/connectors/flink-sources/tidb-cdc" >}})           | 
✅            | ❌             | ❌ | ❌                         |
 | [db2-cdc]({{< ref "docs/connectors/flink-sources/db2-cdc" >}})             | 
✅            | ✅             | ✅ | ✅                         |
 | [vitess-cdc]({{< ref "docs/connectors/flink-sources/vitess-cdc" >}})       | 
✅            | ❌             | ❌ | ❌                         |
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseCharsetITCase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseCharsetITCase.java
index 5e1e97989..9474a225f 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseCharsetITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseCharsetITCase.java
@@ -29,7 +29,6 @@ import org.apache.flink.util.StringUtils;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
@@ -40,8 +39,6 @@ import java.util.Arrays;
 import java.util.stream.Stream;
 
 /** Test supporting different column charsets for OceanBase. */
-@Disabled(
-        "Temporarily disabled for GitHub CI due to unavailability of OceanBase 
Binlog Service docker image. These tests are currently only supported for local 
execution.")
 public class OceanBaseCharsetITCase extends OceanBaseSourceTestBase {
 
     private static final String DDL_FILE = "charset_test";
@@ -183,7 +180,7 @@ public class OceanBaseCharsetITCase extends 
OceanBaseSourceTestBase {
                                 + ")",
                         testName,
                         getHost(),
-                        PORT,
+                        getPort(),
                         USER_NAME,
                         PASSWORD,
                         DATABASE_NAME,
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseFailoverITCase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseFailoverITCase.java
index 402ff7542..1887b0a85 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseFailoverITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseFailoverITCase.java
@@ -21,7 +21,6 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.cdc.connectors.utils.ExternalResourceProxy;
 import org.apache.flink.runtime.minicluster.RpcServiceSharing;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
-import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.util.RestartStrategyUtils;
 import org.apache.flink.table.api.TableResult;
@@ -35,7 +34,6 @@ import io.debezium.jdbc.JdbcConnection;
 import org.assertj.core.api.Assertions;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.extension.RegisterExtension;
 import org.junit.jupiter.params.ParameterizedTest;
@@ -45,28 +43,29 @@ import org.junit.jupiter.params.provider.MethodSource;
 import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.FutureTask;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
 import static java.lang.String.format;
 import static org.apache.flink.api.common.JobStatus.RUNNING;
 
 /** failover IT tests for oceanbase. */
-@Disabled(
-        "Temporarily disabled for GitHub CI due to unavailability of OceanBase 
Binlog Service docker image. These tests are currently only supported for local 
execution.")
 @Timeout(value = 180, unit = TimeUnit.SECONDS)
 public class OceanBaseFailoverITCase extends OceanBaseSourceTestBase {
 
     private static final String DEFAULT_SCAN_STARTUP_MODE = "initial";
     private static final String DDL_FILE = "oceanbase_ddl_test";
-    private static final String DEFAULT_TEST_DATABASE = "customer_" + 
getRandomSuffix();
     protected static final int DEFAULT_PARALLELISM = 4;
+    private String testDatabase = "customer_" + getRandomSuffix();
 
     private final List<String> firstPartBinlogEvents =
             Arrays.asList(
@@ -87,9 +86,9 @@ public class OceanBaseFailoverITCase extends 
OceanBaseSourceTestBase {
 
     public static Stream<Arguments> parameters() {
         return Stream.of(
-                Arguments.of("customers", null),
-                Arguments.of("customers", "id"),
-                Arguments.of("customers_no_pk", "id"));
+                Arguments.of("customers", null, "false"),
+                Arguments.of("customers", "id", "true"),
+                Arguments.of("customers_no_pk", "id", "true"));
     }
 
     @RegisterExtension
@@ -105,47 +104,53 @@ public class OceanBaseFailoverITCase extends 
OceanBaseSourceTestBase {
 
     @BeforeEach
     public void setup() throws InterruptedException {
+        testDatabase = "customer_" + getRandomSuffix();
         initializeOceanBaseTables(
                 DDL_FILE,
-                DEFAULT_TEST_DATABASE,
+                testDatabase,
                 s -> !StringUtils.isNullOrWhitespaceOnly(s) && 
(s.contains("customers")));
     }
 
     @AfterEach
     public void clean() {
-        dropDatabase(DEFAULT_TEST_DATABASE);
+        dropDatabase(testDatabase);
     }
 
     // Failover tests
     @ParameterizedTest
     @MethodSource("parameters")
-    @Timeout(value = 120, unit = TimeUnit.SECONDS)
-    public void testTaskManagerFailoverInSnapshotPhase(String tableName, 
String chunkColumnName)
-            throws Exception {
+    public void testTaskManagerFailoverInSnapshotPhase(
+            String tableName, String chunkColumnName, String 
assignEndingFirst) throws Exception {
         testMySqlParallelSource(
                 FailoverType.TM,
                 FailoverPhase.SNAPSHOT,
                 new String[] {tableName, "customers_1"},
                 tableName,
-                chunkColumnName);
+                chunkColumnName,
+                Collections.singletonMap(
+                        
"scan.incremental.snapshot.unbounded-chunk-first.enabled",
+                        assignEndingFirst));
     }
 
     @ParameterizedTest
     @MethodSource("parameters")
-    public void testTaskManagerFailoverInBinlogPhase(String tableName, String 
chunkColumnName)
-            throws Exception {
+    public void testTaskManagerFailoverInBinlogPhase(
+            String tableName, String chunkColumnName, String 
assignEndingFirst) throws Exception {
         testMySqlParallelSource(
                 FailoverType.TM,
                 FailoverPhase.BINLOG,
                 new String[] {tableName, "customers_1"},
                 tableName,
-                chunkColumnName);
+                chunkColumnName,
+                Collections.singletonMap(
+                        
"scan.incremental.snapshot.unbounded-chunk-first.enabled",
+                        assignEndingFirst));
     }
 
     @ParameterizedTest
     @MethodSource("parameters")
-    public void testTaskManagerFailoverFromLatestOffset(String tableName, 
String chunkColumnName)
-            throws Exception {
+    public void testTaskManagerFailoverFromLatestOffset(
+            String tableName, String chunkColumnName, String 
assignEndingFirst) throws Exception {
         testMySqlParallelSource(
                 DEFAULT_PARALLELISM,
                 "latest-offset",
@@ -155,37 +160,46 @@ public class OceanBaseFailoverITCase extends 
OceanBaseSourceTestBase {
                 1,
                 0,
                 tableName,
-                chunkColumnName);
+                chunkColumnName,
+                Collections.singletonMap(
+                        
"scan.incremental.snapshot.unbounded-chunk-first.enabled",
+                        assignEndingFirst));
     }
 
     @ParameterizedTest
     @MethodSource("parameters")
-    public void testJobManagerFailoverInSnapshotPhase(String tableName, String 
chunkColumnName)
-            throws Exception {
+    public void testJobManagerFailoverInSnapshotPhase(
+            String tableName, String chunkColumnName, String 
assignEndingFirst) throws Exception {
         testMySqlParallelSource(
                 FailoverType.JM,
                 FailoverPhase.SNAPSHOT,
                 new String[] {tableName, "customers_1"},
                 tableName,
-                chunkColumnName);
+                chunkColumnName,
+                Collections.singletonMap(
+                        
"scan.incremental.snapshot.unbounded-chunk-first.enabled",
+                        assignEndingFirst));
     }
 
     @ParameterizedTest
     @MethodSource("parameters")
-    public void testJobManagerFailoverInBinlogPhase(String tableName, String 
chunkColumnName)
-            throws Exception {
+    public void testJobManagerFailoverInBinlogPhase(
+            String tableName, String chunkColumnName, String 
assignEndingFirst) throws Exception {
         testMySqlParallelSource(
                 FailoverType.JM,
                 FailoverPhase.BINLOG,
                 new String[] {tableName, "customers_1"},
                 tableName,
-                chunkColumnName);
+                chunkColumnName,
+                Collections.singletonMap(
+                        
"scan.incremental.snapshot.unbounded-chunk-first.enabled",
+                        assignEndingFirst));
     }
 
     @ParameterizedTest
     @MethodSource("parameters")
-    public void testJobManagerFailoverFromLatestOffset(String tableName, 
String chunkColumnName)
-            throws Exception {
+    public void testJobManagerFailoverFromLatestOffset(
+            String tableName, String chunkColumnName, String 
assignEndingFirst) throws Exception {
         testMySqlParallelSource(
                 DEFAULT_PARALLELISM,
                 "latest-offset",
@@ -195,33 +209,42 @@ public class OceanBaseFailoverITCase extends 
OceanBaseSourceTestBase {
                 1,
                 0,
                 tableName,
-                chunkColumnName);
+                chunkColumnName,
+                Collections.singletonMap(
+                        
"scan.incremental.snapshot.unbounded-chunk-first.enabled",
+                        assignEndingFirst));
     }
 
     @ParameterizedTest
     @MethodSource("parameters")
-    public void testTaskManagerFailoverSingleParallelism(String tableName, 
String chunkColumnName)
-            throws Exception {
+    public void testTaskManagerFailoverSingleParallelism(
+            String tableName, String chunkColumnName, String 
assignEndingFirst) throws Exception {
         testMySqlParallelSource(
                 1,
                 FailoverType.TM,
                 FailoverPhase.SNAPSHOT,
                 new String[] {tableName},
                 tableName,
-                chunkColumnName);
+                chunkColumnName,
+                Collections.singletonMap(
+                        
"scan.incremental.snapshot.unbounded-chunk-first.enabled",
+                        assignEndingFirst));
     }
 
     @ParameterizedTest
     @MethodSource("parameters")
-    public void testJobManagerFailoverSingleParallelism(String tableName, 
String chunkColumnName)
-            throws Exception {
+    public void testJobManagerFailoverSingleParallelism(
+            String tableName, String chunkColumnName, String 
assignEndingFirst) throws Exception {
         testMySqlParallelSource(
                 1,
                 FailoverType.JM,
                 FailoverPhase.SNAPSHOT,
                 new String[] {tableName},
                 tableName,
-                chunkColumnName);
+                chunkColumnName,
+                Collections.singletonMap(
+                        
"scan.incremental.snapshot.unbounded-chunk-first.enabled",
+                        assignEndingFirst));
     }
 
     private void testMySqlParallelSource(
@@ -229,7 +252,8 @@ public class OceanBaseFailoverITCase extends 
OceanBaseSourceTestBase {
             FailoverPhase failoverPhase,
             String[] captureCustomerTables,
             String tableName,
-            String chunkColumnName)
+            String chunkColumnName,
+            Map<String, String> otherOptions)
             throws Exception {
         testMySqlParallelSource(
                 DEFAULT_PARALLELISM,
@@ -237,7 +261,8 @@ public class OceanBaseFailoverITCase extends 
OceanBaseSourceTestBase {
                 failoverPhase,
                 captureCustomerTables,
                 tableName,
-                chunkColumnName);
+                chunkColumnName,
+                otherOptions);
     }
 
     private void testMySqlParallelSource(
@@ -246,7 +271,8 @@ public class OceanBaseFailoverITCase extends 
OceanBaseSourceTestBase {
             FailoverPhase failoverPhase,
             String[] captureCustomerTables,
             String tableName,
-            String chunkColumnName)
+            String chunkColumnName,
+            Map<String, String> otherOptions)
             throws Exception {
         testMySqlParallelSource(
                 parallelism,
@@ -257,7 +283,8 @@ public class OceanBaseFailoverITCase extends 
OceanBaseSourceTestBase {
                 1,
                 0,
                 tableName,
-                chunkColumnName);
+                chunkColumnName,
+                otherOptions);
     }
 
     private void testMySqlParallelSource(
@@ -269,7 +296,8 @@ public class OceanBaseFailoverITCase extends 
OceanBaseSourceTestBase {
             int restartAttempts,
             long delayBetweenAttempts,
             String tableName,
-            String chunkColumnName)
+            String chunkColumnName,
+            Map<String, String> otherOptions)
             throws Exception {
         testMySqlParallelSource(
                 parallelism,
@@ -281,7 +309,8 @@ public class OceanBaseFailoverITCase extends 
OceanBaseSourceTestBase {
                 delayBetweenAttempts,
                 false,
                 tableName,
-                chunkColumnName);
+                chunkColumnName,
+                otherOptions);
     }
 
     private void testMySqlParallelSource(
@@ -294,11 +323,10 @@ public class OceanBaseFailoverITCase extends 
OceanBaseSourceTestBase {
             long delayBetweenAttempts,
             boolean skipSnapshotBackfill,
             String tableName,
-            String chunkColumnName)
+            String chunkColumnName,
+            Map<String, String> otherOptions)
             throws Exception {
-        captureCustomerTables = new String[] {tableName};
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
         StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
 
         env.setParallelism(parallelism);
@@ -316,7 +344,7 @@ public class OceanBaseFailoverITCase extends 
OceanBaseSourceTestBase {
                                         ? ""
                                         : ", primary key (id) not enforced")
                                 + ") WITH ("
-                                + " 'connector' = 'oceanbase-cdc',"
+                                + " 'connector' = 'mysql-cdc',"
                                 + " 'scan.incremental.snapshot.enabled' = 
'true',"
                                 + " 'hostname' = '%s',"
                                 + " 'port' = '%s',"
@@ -330,12 +358,13 @@ public class OceanBaseFailoverITCase extends 
OceanBaseSourceTestBase {
                                 + " 'server-time-zone' = 'Asia/Shanghai',"
                                 + " 'server-id' = '%s'"
                                 + " %s"
+                                + " %s"
                                 + ")",
                         getHost(),
                         getPort(),
                         getUserName(),
                         getPassword(),
-                        DEFAULT_TEST_DATABASE,
+                        testDatabase,
                         getTableNameRegex(captureCustomerTables),
                         scanStartupMode,
                         skipSnapshotBackfill,
@@ -344,7 +373,17 @@ public class OceanBaseFailoverITCase extends 
OceanBaseSourceTestBase {
                                 ? ""
                                 : String.format(
                                         ", 
'scan.incremental.snapshot.chunk.key-column' = '%s'",
-                                        chunkColumnName));
+                                        chunkColumnName),
+                        otherOptions.isEmpty()
+                                ? ""
+                                : ","
+                                        + otherOptions.entrySet().stream()
+                                                .map(
+                                                        e ->
+                                                                String.format(
+                                                                        
"'%s'='%s'",
+                                                                        
e.getKey(), e.getValue()))
+                                                
.collect(Collectors.joining(",")));
         tEnv.executeSql(sourceDDL);
         TableResult tableResult = tEnv.executeSql("select * from customers");
 
@@ -356,7 +395,7 @@ public class OceanBaseFailoverITCase extends 
OceanBaseSourceTestBase {
         // second step: check the binlog data
         checkBinlogData(tableResult, failoverType, failoverPhase, 
captureCustomerTables);
 
-        //        sleepMs(3000);
+        sleepMs(3000);
         tableResult.getJobClient().get().cancel().get();
     }
 
@@ -423,7 +462,7 @@ public class OceanBaseFailoverITCase extends 
OceanBaseSourceTestBase {
         JobID jobId = tableResult.getJobClient().get().getJobID();
 
         for (String tableId : captureCustomerTables) {
-            makeFirstPartBinlogEvents(getConnection(), DEFAULT_TEST_DATABASE + 
'.' + tableId);
+            makeFirstPartBinlogEvents(getConnection(), testDatabase + '.' + 
tableId);
         }
 
         // wait for the binlog reading
@@ -438,7 +477,7 @@ public class OceanBaseFailoverITCase extends 
OceanBaseSourceTestBase {
             waitUntilJobRunning(tableResult);
         }
         for (String tableId : captureCustomerTables) {
-            makeSecondPartBinlogEvents(getConnection(), DEFAULT_TEST_DATABASE 
+ '.' + tableId);
+            makeSecondPartBinlogEvents(getConnection(), testDatabase + '.' + 
tableId);
         }
 
         List<String> expectedBinlogData = new ArrayList<>();
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSourceITCase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSourceITCase.java
index 8c061bb79..c5ebaf6f1 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSourceITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSourceITCase.java
@@ -27,7 +27,6 @@ import org.apache.flink.util.CloseableIterator;
 
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 import java.sql.Connection;
@@ -39,8 +38,6 @@ import java.util.List;
 import static java.lang.String.format;
 
 /** OceanBase CDC source connector integration test. */
-@Disabled(
-        "Temporarily disabled for GitHub CI due to unavailability of OceanBase 
Binlog Service docker image. These tests are currently only supported for local 
execution.")
 public class OceanBaseSourceITCase extends OceanBaseSourceTestBase {
     private static final String DDL_FILE = "oceanbase_ddl_test";
     private static final String DATABASE_NAME = "cdc_s_" + getRandomSuffix();
@@ -89,7 +86,7 @@ public class OceanBaseSourceITCase extends 
OceanBaseSourceTestBase {
                                 + " 'server-id' = '%s'"
                                 + ")",
                         getHost(),
-                        PORT,
+                        getPort(),
                         USER_NAME,
                         PASSWORD,
                         DATABASE_NAME,
@@ -241,7 +238,7 @@ public class OceanBaseSourceITCase extends 
OceanBaseSourceTestBase {
                                 + " 'server-id' = '%s'"
                                 + ")",
                         getHost(),
-                        PORT,
+                        getPort(),
                         USER_NAME,
                         PASSWORD,
                         DATABASE_NAME,
@@ -295,7 +292,7 @@ public class OceanBaseSourceITCase extends 
OceanBaseSourceTestBase {
                                 + " 'server-id' = '%s'"
                                 + ")",
                         getHost(),
-                        PORT,
+                        getPort(),
                         USER_NAME,
                         PASSWORD,
                         DATABASE_NAME,
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSourceTestBase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSourceTestBase.java
index 0ea3cb966..703e0a523 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSourceTestBase.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSourceTestBase.java
@@ -30,6 +30,12 @@ import org.apache.commons.lang3.StringUtils;
 import org.assertj.core.api.Assertions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
 
 import java.net.URL;
 import java.nio.file.Files;
@@ -38,6 +44,7 @@ import java.sql.Connection;
 import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.sql.Statement;
+import java.time.Duration;
 import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -56,22 +63,40 @@ import java.util.stream.Collectors;
 import static java.lang.String.format;
 import static org.apache.flink.util.Preconditions.checkState;
 
-/** Basic class for testing Database OceanBase which supported the mysql 
protocol. */
+/** Basic class for testing Database OceanBase. */
+@Testcontainers
 public abstract class OceanBaseSourceTestBase extends AbstractTestBase {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(OceanBaseSourceTestBase.class);
     private static final Pattern COMMENT_PATTERN = 
Pattern.compile("^(.*)--.*$");
-    protected static final Integer PORT = 3306;
-    protected static final String USER_NAME = 
System.getenv("OCEANBASE_USERNAME");
-    protected static final String PASSWORD = 
System.getenv("OCEANBASE_PASSWORD");
-    protected static final String HOSTNAME = 
System.getenv("OCEANBASE_HOSTNAME");
+
+    protected static final Integer INNER_PORT = 2883;
+    protected static final String USER_NAME = "root@test";
+    protected static final String PASSWORD = "123456";
+    protected static final Duration WAITING_TIMEOUT = Duration.ofMinutes(5);
+
+    public static final Network NETWORK = Network.newNetwork();
+
+    @SuppressWarnings("resource")
+    @Container
+    public static final GenericContainer<?> OB_BINLOG_CONTAINER =
+            new GenericContainer<>("quay.io/oceanbase/obbinlog-ce:4.2.5-test")
+                    .withNetwork(NETWORK)
+                    .withStartupTimeout(WAITING_TIMEOUT)
+                    .withExposedPorts(2881, 2883)
+                    .withLogConsumer(new Slf4jLogConsumer(LOG))
+                    .waitingFor(
+                            new LogMessageWaitStrategy()
+                                    .withRegEx(".*OBBinlog is ready!.*")
+                                    .withTimes(1)
+                                    
.withStartupTimeout(Duration.ofMinutes(6)));
 
     protected static String getHost() {
-        return HOSTNAME;
+        return OB_BINLOG_CONTAINER.getHost();
     }
 
-    protected static Integer getPort() {
-        return PORT;
+    protected static int getPort() {
+        return OB_BINLOG_CONTAINER.getMappedPort(INNER_PORT);
     }
 
     protected static String getUserName() {
@@ -83,7 +108,7 @@ public abstract class OceanBaseSourceTestBase extends 
AbstractTestBase {
     }
 
     protected static String getJdbcUrl() {
-        return String.format("jdbc:mysql://%s:%s", HOSTNAME, PORT);
+        return String.format("jdbc:mysql://%s:%s", getHost(), getPort());
     }
 
     protected static Connection getJdbcConnection() throws SQLException {
@@ -253,7 +278,7 @@ public abstract class OceanBaseSourceTestBase extends 
AbstractTestBase {
         properties.put("database.port", String.valueOf(getPort()));
         properties.put("database.user", getUserName());
         properties.put("database.password", getPassword());
-        properties.put("database.serverTimezone", ZoneId.of("UTC").toString());
+        properties.put("database.serverTimezone", 
ZoneId.of("Asia/Shanghai").toString());
         io.debezium.config.Configuration configuration =
                 io.debezium.config.Configuration.from(properties);
         return DebeziumUtils.createMySqlConnection(configuration, new 
Properties());
diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml 
b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml
index 60892d162..188e2e36b 100644
--- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml
+++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml
@@ -328,6 +328,16 @@ limitations under the License.
                             </outputDirectory>
                         </artifactItem>
 
+                        <artifactItem>
+                            <groupId>org.apache.flink</groupId>
+                            
<artifactId>flink-sql-connector-oceanbase-cdc</artifactId>
+                            <version>${project.version}</version>
+                            
<destFileName>oceanbase-cdc-connector.jar</destFileName>
+                            <type>jar</type>
+                            
<outputDirectory>${project.build.directory}/dependencies
+                            </outputDirectory>
+                        </artifactItem>
+
                         <artifactItem>
                             <groupId>com.ibm.db2.jcc</groupId>
                             <artifactId>db2jcc</artifactId>
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OceanBaseE2eITCase.java
 
b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OceanBaseE2eITCase.java
new file mode 100644
index 000000000..bb0cb26f2
--- /dev/null
+++ 
b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OceanBaseE2eITCase.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.tests;
+
+import org.apache.flink.cdc.common.test.utils.JdbcProxy;
+import org.apache.flink.cdc.common.test.utils.TestUtils;
+import 
org.apache.flink.cdc.connectors.tests.utils.FlinkContainerTestEnvironment;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static 
org.apache.flink.cdc.connectors.base.utils.EnvironmentUtils.supportCheckpointsAfterTasksFinished;
+
+/** End-to-end tests for oceanbase-cdc connector uber jar. */
+@Testcontainers
+class OceanBaseE2eITCase extends FlinkContainerTestEnvironment {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(OceanBaseE2eITCase.class);
+
+    private static final Path oceanbaseCdcJar =
+            TestUtils.getResource("oceanbase-cdc-connector.jar");
+    private static final Pattern COMMENT_PATTERN = 
Pattern.compile("^(.*)--.*$");
+    private static final String[] CREATE_DATABASE_DDL =
+            new String[] {"CREATE DATABASE `$DBNAME$`;", "USE `$DBNAME$`;"};
+    private static final Path jdbcDriver = 
TestUtils.getResource("mysql-driver.jar");
+    private static final String DATABASE_NAME = "oceanbase_inventory";
+
+    private static final int OB_PROXY_PORT = 2883;
+    protected static final String DEFAULT_USERNAME = "root@test";
+    protected static final String DEFAULT_PASSWORD = "123456";
+    protected static final String INTER_CONTAINER_OCEANBASE_ALIAS = 
"oceanbase";
+
+    @Container
+    @SuppressWarnings("resource")
+    public static final GenericContainer<?> OB_BINLOG_CONTAINER =
+            new GenericContainer<>("quay.io/oceanbase/obbinlog-ce:4.2.5-test")
+                    .withNetwork(NETWORK)
+                    .withNetworkAliases(INTER_CONTAINER_OCEANBASE_ALIAS)
+                    .withStartupTimeout(Duration.ofMinutes(5))
+                    .withExposedPorts(2881, 2883)
+                    .withLogConsumer(new Slf4jLogConsumer(LOG))
+                    .waitingFor(
+                            new LogMessageWaitStrategy()
+                                    .withRegEx(".*OBBinlog is ready!.*")
+                                    .withTimes(1)
+                                    
.withStartupTimeout(Duration.ofMinutes(6)));
+
+    @BeforeEach
+    public void before() {
+        super.before();
+        createAndInitializeOBTable("oceanbase_inventory");
+    }
+
+    @AfterEach
+    public void after() {
+        super.after();
+    }
+
+    @Test
+    void testOBBinlogCDC() throws Exception {
+        List<String> sqlLines =
+                Arrays.asList(
+                        "SET 'execution.checkpointing.interval' = '3s';",
+                        "SET 
'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true';",
+                        "CREATE TABLE products_source (",
+                        " `id` INT NOT NULL,",
+                        " name STRING,",
+                        " description STRING,",
+                        " weight DECIMAL(10,3),",
+                        " enum_c STRING,",
+                        " json_c STRING,",
+                        " point_c STRING,",
+                        " primary key (`id`) not enforced",
+                        ") WITH (",
+                        " 'connector' = 'oceanbase-cdc',",
+                        " 'hostname' = '" + INTER_CONTAINER_OCEANBASE_ALIAS + 
"',",
+                        " 'port' = '2883',",
+                        " 'username' = '" + DEFAULT_USERNAME + "',",
+                        " 'password' = '" + DEFAULT_PASSWORD + "',",
+                        " 'database-name' = '" + DATABASE_NAME + "',",
+                        " 'table-name' = 'products_source',",
+                        " 'server-time-zone' = 'Asia/Shanghai',",
+                        " 'server-id' = '5800-5900',",
+                        " 'scan.incremental.snapshot.chunk.size' = '4',",
+                        " 'scan.incremental.close-idle-reader.enabled' = '"
+                                + supportCheckpointsAfterTasksFinished()
+                                + "'",
+                        ");",
+                        "CREATE TABLE products_sink (",
+                        " `id` INT NOT NULL,",
+                        " name STRING,",
+                        " description STRING,",
+                        " weight DECIMAL(10,3),",
+                        " enum_c STRING,",
+                        " json_c STRING,",
+                        " point_c STRING,",
+                        " primary key (`id`) not enforced",
+                        ") WITH (",
+                        " 'connector' = 'jdbc',",
+                        String.format(
+                                " 'url' = 'jdbc:mysql://%s:2883/%s',",
+                                INTER_CONTAINER_OCEANBASE_ALIAS, 
DATABASE_NAME),
+                        " 'table-name' = 'products_sink',",
+                        " 'username' = '" + DEFAULT_USERNAME + "',",
+                        " 'password' = '" + DEFAULT_PASSWORD + "'",
+                        ");",
+                        "INSERT INTO products_sink",
+                        "SELECT * FROM products_source;");
+
+        submitSQLJob(sqlLines, oceanbaseCdcJar, jdbcJar, jdbcDriver);
+        waitUntilJobRunning(Duration.ofSeconds(30));
+
+        // generate binlogs
+        String jdbcUrl =
+                String.format(
+                        "jdbc:mysql://%s:%s/%s",
+                        OB_BINLOG_CONTAINER.getHost(),
+                        OB_BINLOG_CONTAINER.getMappedPort(OB_PROXY_PORT),
+                        DATABASE_NAME);
+        try (Connection conn =
+                        DriverManager.getConnection(jdbcUrl, DEFAULT_USERNAME, 
DEFAULT_PASSWORD);
+                Statement stat = conn.createStatement()) {
+            stat.execute(
+                    "UPDATE products_source SET description='18oz carpenter 
hammer' WHERE id=106;");
+            stat.execute("UPDATE products_source SET weight='5.1' WHERE 
id=107;");
+            stat.execute(
+                    "INSERT INTO products_source VALUES 
(default,'jacket','water resistent white wind breaker',0.2, null, null, 
null);"); // 110
+            stat.execute(
+                    "INSERT INTO products_source VALUES 
(default,'scooter','Big 2-wheel scooter ',5.18, null, null, null);");
+            stat.execute(
+                    "UPDATE products_source SET description='new water 
resistent white wind breaker', weight='0.5' WHERE id=110;");
+            stat.execute("UPDATE products_source SET weight='5.17' WHERE 
id=111;");
+            stat.execute("DELETE FROM products_source WHERE id=111;");
+            // add schema change event in the last.
+            stat.execute("CREATE TABLE new_table (id int, age int);");
+        } catch (SQLException e) {
+            LOG.error("Update table for CDC failed.", e);
+            throw e;
+        }
+
+        // assert final results
+        JdbcProxy proxy =
+                new JdbcProxy(jdbcUrl, DEFAULT_USERNAME, DEFAULT_PASSWORD, 
MYSQL_DRIVER_CLASS);
+        List<String> expectResult =
+                Arrays.asList(
+                        "101,scooter,Small 2-wheel scooter,3.14,red,{\"key1\": 
\"value1\"},{\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}",
+                        "102,car battery,12V car battery,8.1,white,{\"key2\": 
\"value2\"},{\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}",
+                        "103,12-pack drill bits,12-pack of drill bits with 
sizes ranging from #40 to #3,0.8,red,{\"key3\": 
\"value3\"},{\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}",
+                        "104,hammer,12oz carpenter's 
hammer,0.75,white,{\"key4\": 
\"value4\"},{\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}",
+                        "105,hammer,14oz carpenter's hammer,0.875,red,{\"k1\": 
\"v1\", \"k2\": \"v2\"},{\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}",
+                        "106,hammer,18oz carpenter hammer,1.0,null,null,null",
+                        "107,rocks,box of assorted rocks,5.1,null,null,null",
+                        "108,jacket,water resistent black wind 
breaker,0.1,null,null,null",
+                        "109,spare tire,24 inch spare 
tire,22.2,null,null,null",
+                        "110,jacket,new water resistent white wind 
breaker,0.5,null,null,null");
+        proxy.checkResultWithTimeout(
+                expectResult,
+                "products_sink",
+                new String[] {"id", "name", "description", "weight", "enum_c", 
"json_c", "point_c"},
+                300000L);
+    }
+
+    /** Creates the database and populates it with initialization SQL script. 
*/
+    public void createAndInitializeOBTable(String sqlFile) {
+        final String ddlFile = String.format("ddl/%s.sql", sqlFile);
+        final URL ddlTestFile = 
OceanBaseE2eITCase.class.getClassLoader().getResource(ddlFile);
+        Assertions.assertThat(ddlTestFile).withFailMessage("Cannot locate " + 
ddlFile).isNotNull();
+        try {
+            try (Connection connection = getJdbcConnection();
+                    Statement statement = connection.createStatement()) {
+                final List<String> statements =
+                        Arrays.stream(
+                                        Stream.concat(
+                                                        
Arrays.stream(CREATE_DATABASE_DDL),
+                                                        Files.readAllLines(
+                                                                
Paths.get(ddlTestFile.toURI()))
+                                                                .stream())
+                                                .map(String::trim)
+                                                .filter(x -> 
!x.startsWith("--") && !x.isEmpty())
+                                                .map(
+                                                        x -> {
+                                                            final Matcher m =
+                                                                    
COMMENT_PATTERN.matcher(x);
+                                                            return m.matches() 
? m.group(1) : x;
+                                                        })
+                                                .map(sql -> 
sql.replace("$DBNAME$", DATABASE_NAME))
+                                                
.collect(Collectors.joining("\n"))
+                                                .split(";"))
+                                .map(x -> x.replace("$$", ";"))
+                                .collect(Collectors.toList());
+                for (String stmt : statements) {
+                    statement.execute(stmt);
+                }
+            }
+        } catch (final Exception e) {
+            throw new IllegalStateException(e);
+        }
+    }
+
+    public Connection getJdbcConnection() throws SQLException {
+        String url =
+                "jdbc:mysql://"
+                        + OB_BINLOG_CONTAINER.getHost()
+                        + ":"
+                        + OB_BINLOG_CONTAINER.getMappedPort(OB_PROXY_PORT)
+                        + "?useSSL=false";
+        return DriverManager.getConnection(url, DEFAULT_USERNAME, 
DEFAULT_PASSWORD);
+    }
+}
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/resources/ddl/oceanbase_inventory.sql
 
b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/resources/ddl/oceanbase_inventory.sql
new file mode 100644
index 000000000..73bb75d7f
--- /dev/null
+++ 
b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/resources/ddl/oceanbase_inventory.sql
@@ -0,0 +1,51 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements.  See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License.  You may obtain a copy of the License at
+--
+--      http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+-- 
----------------------------------------------------------------------------------------------------------------
+-- DATABASE:  inventory
+-- 
----------------------------------------------------------------------------------------------------------------
+
+-- Create and populate our products using a single insert with many rows
+CREATE TABLE products_source (
+  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
+  name VARCHAR(255) NOT NULL DEFAULT 'flink',
+  description VARCHAR(512),
+  weight FLOAT,
+  enum_c enum('red', 'white') default 'red',  -- test some complex types as 
well,
+  json_c JSON,                                -- because we use additional 
dependencies to deserialize complex types.
+  point_c POINT
+);
+ALTER TABLE products_source AUTO_INCREMENT = 101;
+
+INSERT INTO products_source
+VALUES (default,"scooter","Small 2-wheel scooter",3.14, 'red', '{"key1": 
"value1"}', ST_GeomFromText('POINT(1 1)')),
+       (default,"car battery","12V car battery",8.1, 'white', '{"key2": 
"value2"}', ST_GeomFromText('POINT(2 2)')),
+       (default,"12-pack drill bits","12-pack of drill bits with sizes ranging 
from #40 to #3",0.8, 'red', '{"key3": "value3"}', ST_GeomFromText('POINT(3 
3)')),
+       (default,"hammer","12oz carpenter's hammer",0.75, 'white', '{"key4": 
"value4"}', ST_GeomFromText('POINT(4 4)')),
+       (default,"hammer","14oz carpenter's hammer",0.875, 'red', '{"k1": "v1", 
"k2": "v2"}', ST_GeomFromText('POINT(5 5)')),
+       (default,"hammer","16oz carpenter's hammer",1.0, null, null, null),
+       (default,"rocks","box of assorted rocks",5.3, null, null, null),
+       (default,"jacket","water resistent black wind breaker",0.1, null, null, 
null),
+       (default,"spare tire","24 inch spare tire",22.2, null, null, null);
+
+CREATE TABLE products_sink (
+  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
+  name VARCHAR(255) NOT NULL DEFAULT 'flink',
+  description VARCHAR(512),
+  weight FLOAT,
+  enum_c VARCHAR(255),
+  json_c VARCHAR(255),
+  point_c VARCHAR(255)
+);
\ No newline at end of file

Reply via email to