This is an automated email from the ASF dual-hosted git repository.
lvyanquan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new fe79b4df7 [FLINK-38239][oceanbase] Re-enable OceanBase CDC integration tests in CI with Docker image (#4111)
fe79b4df7 is described below
commit fe79b4df75b9725e499bee41db0bc5d051b19844
Author: yuanoOo <[email protected]>
AuthorDate: Wed Apr 29 11:57:37 2026 +0800
[FLINK-38239][oceanbase] Re-enable OceanBase CDC integration tests in CI with Docker image (#4111)
---
.../docs/connectors/flink-sources/overview.md | 2 +-
.../docs/connectors/flink-sources/overview.md | 2 +-
.../oceanbase/OceanBaseCharsetITCase.java | 5 +-
.../oceanbase/OceanBaseFailoverITCase.java | 139 +++++++-----
.../oceanbase/OceanBaseSourceITCase.java | 9 +-
.../oceanbase/OceanBaseSourceTestBase.java | 45 +++-
.../flink-cdc-source-e2e-tests/pom.xml | 10 +
.../cdc/connectors/tests/OceanBaseE2eITCase.java | 249 +++++++++++++++++++++
.../src/test/resources/ddl/oceanbase_inventory.sql | 51 +++++
9 files changed, 440 insertions(+), 72 deletions(-)
diff --git a/docs/content.zh/docs/connectors/flink-sources/overview.md b/docs/content.zh/docs/connectors/flink-sources/overview.md
index b042a8a2e..97abe6af1 100644
--- a/docs/content.zh/docs/connectors/flink-sources/overview.md
+++ b/docs/content.zh/docs/connectors/flink-sources/overview.md
@@ -86,7 +86,7 @@ The following table shows the current features of the
connector:
| [oracle-cdc]({{< ref "docs/connectors/flink-sources/oracle-cdc" >}}) |
✅ | ✅ | ✅ | ✅ |
| [postgres-cdc]({{< ref "docs/connectors/flink-sources/postgres-cdc" >}}) |
✅ | ✅ | ✅ | ✅ |
| [sqlserver-cdc]({{< ref "docs/connectors/flink-sources/sqlserver-cdc" >}}) |
✅ | ✅ | ✅ | ✅ |
-| [oceanbase-cdc]({{< ref "docs/connectors/flink-sources/oceanbase-cdc" >}}) |
❌ | ❌ | ❌ | ❌ |
+| [oceanbase-cdc]({{< ref "docs/connectors/flink-sources/oceanbase-cdc" >}}) |
✅ | ✅ | ✅ | ✅ |
| [tidb-cdc]({{< ref "docs/connectors/flink-sources/tidb-cdc" >}}) |
✅ | ❌ | ❌ | ❌ |
| [db2-cdc]({{< ref "docs/connectors/flink-sources/db2-cdc" >}}) |
✅ | ✅ | ✅ | ✅ |
| [vitess-cdc]({{< ref "docs/connectors/flink-sources/vitess-cdc" >}}) |
✅ | ❌ | ❌ | ❌ |
diff --git a/docs/content/docs/connectors/flink-sources/overview.md b/docs/content/docs/connectors/flink-sources/overview.md
index 3d99e37d4..7dd38d401 100644
--- a/docs/content/docs/connectors/flink-sources/overview.md
+++ b/docs/content/docs/connectors/flink-sources/overview.md
@@ -86,7 +86,7 @@ The following table shows the current features of the
connector:
| [oracle-cdc]({{< ref "docs/connectors/flink-sources/oracle-cdc" >}}) |
✅ | ✅ | ✅ | ✅ |
| [postgres-cdc]({{< ref "docs/connectors/flink-sources/postgres-cdc" >}}) |
✅ | ✅ | ✅ | ✅ |
| [sqlserver-cdc]({{< ref "docs/connectors/flink-sources/sqlserver-cdc" >}}) |
✅ | ✅ | ✅ | ✅ |
-| [oceanbase-cdc]({{< ref "docs/connectors/flink-sources/oceanbase-cdc" >}}) |
❌ | ❌ | ❌ | ❌ |
+| [oceanbase-cdc]({{< ref "docs/connectors/flink-sources/oceanbase-cdc" >}}) |
✅ | ✅ | ✅ | ✅ |
| [tidb-cdc]({{< ref "docs/connectors/flink-sources/tidb-cdc" >}}) |
✅ | ❌ | ❌ | ❌ |
| [db2-cdc]({{< ref "docs/connectors/flink-sources/db2-cdc" >}}) |
✅ | ✅ | ✅ | ✅ |
| [vitess-cdc]({{< ref "docs/connectors/flink-sources/vitess-cdc" >}}) |
✅ | ❌ | ❌ | ❌ |
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseCharsetITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseCharsetITCase.java
index 5e1e97989..9474a225f 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseCharsetITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseCharsetITCase.java
@@ -29,7 +29,6 @@ import org.apache.flink.util.StringUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
@@ -40,8 +39,6 @@ import java.util.Arrays;
import java.util.stream.Stream;
/** Test supporting different column charsets for OceanBase. */
-@Disabled(
- "Temporarily disabled for GitHub CI due to unavailability of OceanBase
Binlog Service docker image. These tests are currently only supported for local
execution.")
public class OceanBaseCharsetITCase extends OceanBaseSourceTestBase {
private static final String DDL_FILE = "charset_test";
@@ -183,7 +180,7 @@ public class OceanBaseCharsetITCase extends
OceanBaseSourceTestBase {
+ ")",
testName,
getHost(),
- PORT,
+ getPort(),
USER_NAME,
PASSWORD,
DATABASE_NAME,
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseFailoverITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseFailoverITCase.java
index 402ff7542..1887b0a85 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseFailoverITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseFailoverITCase.java
@@ -21,7 +21,6 @@ import org.apache.flink.api.common.JobID;
import org.apache.flink.cdc.connectors.utils.ExternalResourceProxy;
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
-import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.util.RestartStrategyUtils;
import org.apache.flink.table.api.TableResult;
@@ -35,7 +34,6 @@ import io.debezium.jdbc.JdbcConnection;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
@@ -45,28 +43,29 @@ import org.junit.jupiter.params.provider.MethodSource;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.lang.String.format;
import static org.apache.flink.api.common.JobStatus.RUNNING;
/** failover IT tests for oceanbase. */
-@Disabled(
- "Temporarily disabled for GitHub CI due to unavailability of OceanBase
Binlog Service docker image. These tests are currently only supported for local
execution.")
@Timeout(value = 180, unit = TimeUnit.SECONDS)
public class OceanBaseFailoverITCase extends OceanBaseSourceTestBase {
private static final String DEFAULT_SCAN_STARTUP_MODE = "initial";
private static final String DDL_FILE = "oceanbase_ddl_test";
- private static final String DEFAULT_TEST_DATABASE = "customer_" +
getRandomSuffix();
protected static final int DEFAULT_PARALLELISM = 4;
+ private String testDatabase = "customer_" + getRandomSuffix();
private final List<String> firstPartBinlogEvents =
Arrays.asList(
@@ -87,9 +86,9 @@ public class OceanBaseFailoverITCase extends
OceanBaseSourceTestBase {
public static Stream<Arguments> parameters() {
return Stream.of(
- Arguments.of("customers", null),
- Arguments.of("customers", "id"),
- Arguments.of("customers_no_pk", "id"));
+ Arguments.of("customers", null, "false"),
+ Arguments.of("customers", "id", "true"),
+ Arguments.of("customers_no_pk", "id", "true"));
}
@RegisterExtension
@@ -105,47 +104,53 @@ public class OceanBaseFailoverITCase extends
OceanBaseSourceTestBase {
@BeforeEach
public void setup() throws InterruptedException {
+ testDatabase = "customer_" + getRandomSuffix();
initializeOceanBaseTables(
DDL_FILE,
- DEFAULT_TEST_DATABASE,
+ testDatabase,
s -> !StringUtils.isNullOrWhitespaceOnly(s) &&
(s.contains("customers")));
}
@AfterEach
public void clean() {
- dropDatabase(DEFAULT_TEST_DATABASE);
+ dropDatabase(testDatabase);
}
// Failover tests
@ParameterizedTest
@MethodSource("parameters")
- @Timeout(value = 120, unit = TimeUnit.SECONDS)
- public void testTaskManagerFailoverInSnapshotPhase(String tableName,
String chunkColumnName)
- throws Exception {
+ public void testTaskManagerFailoverInSnapshotPhase(
+ String tableName, String chunkColumnName, String
assignEndingFirst) throws Exception {
testMySqlParallelSource(
FailoverType.TM,
FailoverPhase.SNAPSHOT,
new String[] {tableName, "customers_1"},
tableName,
- chunkColumnName);
+ chunkColumnName,
+ Collections.singletonMap(
+
"scan.incremental.snapshot.unbounded-chunk-first.enabled",
+ assignEndingFirst));
}
@ParameterizedTest
@MethodSource("parameters")
- public void testTaskManagerFailoverInBinlogPhase(String tableName, String
chunkColumnName)
- throws Exception {
+ public void testTaskManagerFailoverInBinlogPhase(
+ String tableName, String chunkColumnName, String
assignEndingFirst) throws Exception {
testMySqlParallelSource(
FailoverType.TM,
FailoverPhase.BINLOG,
new String[] {tableName, "customers_1"},
tableName,
- chunkColumnName);
+ chunkColumnName,
+ Collections.singletonMap(
+
"scan.incremental.snapshot.unbounded-chunk-first.enabled",
+ assignEndingFirst));
}
@ParameterizedTest
@MethodSource("parameters")
- public void testTaskManagerFailoverFromLatestOffset(String tableName,
String chunkColumnName)
- throws Exception {
+ public void testTaskManagerFailoverFromLatestOffset(
+ String tableName, String chunkColumnName, String
assignEndingFirst) throws Exception {
testMySqlParallelSource(
DEFAULT_PARALLELISM,
"latest-offset",
@@ -155,37 +160,46 @@ public class OceanBaseFailoverITCase extends
OceanBaseSourceTestBase {
1,
0,
tableName,
- chunkColumnName);
+ chunkColumnName,
+ Collections.singletonMap(
+
"scan.incremental.snapshot.unbounded-chunk-first.enabled",
+ assignEndingFirst));
}
@ParameterizedTest
@MethodSource("parameters")
- public void testJobManagerFailoverInSnapshotPhase(String tableName, String
chunkColumnName)
- throws Exception {
+ public void testJobManagerFailoverInSnapshotPhase(
+ String tableName, String chunkColumnName, String
assignEndingFirst) throws Exception {
testMySqlParallelSource(
FailoverType.JM,
FailoverPhase.SNAPSHOT,
new String[] {tableName, "customers_1"},
tableName,
- chunkColumnName);
+ chunkColumnName,
+ Collections.singletonMap(
+
"scan.incremental.snapshot.unbounded-chunk-first.enabled",
+ assignEndingFirst));
}
@ParameterizedTest
@MethodSource("parameters")
- public void testJobManagerFailoverInBinlogPhase(String tableName, String
chunkColumnName)
- throws Exception {
+ public void testJobManagerFailoverInBinlogPhase(
+ String tableName, String chunkColumnName, String
assignEndingFirst) throws Exception {
testMySqlParallelSource(
FailoverType.JM,
FailoverPhase.BINLOG,
new String[] {tableName, "customers_1"},
tableName,
- chunkColumnName);
+ chunkColumnName,
+ Collections.singletonMap(
+
"scan.incremental.snapshot.unbounded-chunk-first.enabled",
+ assignEndingFirst));
}
@ParameterizedTest
@MethodSource("parameters")
- public void testJobManagerFailoverFromLatestOffset(String tableName,
String chunkColumnName)
- throws Exception {
+ public void testJobManagerFailoverFromLatestOffset(
+ String tableName, String chunkColumnName, String
assignEndingFirst) throws Exception {
testMySqlParallelSource(
DEFAULT_PARALLELISM,
"latest-offset",
@@ -195,33 +209,42 @@ public class OceanBaseFailoverITCase extends
OceanBaseSourceTestBase {
1,
0,
tableName,
- chunkColumnName);
+ chunkColumnName,
+ Collections.singletonMap(
+
"scan.incremental.snapshot.unbounded-chunk-first.enabled",
+ assignEndingFirst));
}
@ParameterizedTest
@MethodSource("parameters")
- public void testTaskManagerFailoverSingleParallelism(String tableName,
String chunkColumnName)
- throws Exception {
+ public void testTaskManagerFailoverSingleParallelism(
+ String tableName, String chunkColumnName, String
assignEndingFirst) throws Exception {
testMySqlParallelSource(
1,
FailoverType.TM,
FailoverPhase.SNAPSHOT,
new String[] {tableName},
tableName,
- chunkColumnName);
+ chunkColumnName,
+ Collections.singletonMap(
+
"scan.incremental.snapshot.unbounded-chunk-first.enabled",
+ assignEndingFirst));
}
@ParameterizedTest
@MethodSource("parameters")
- public void testJobManagerFailoverSingleParallelism(String tableName,
String chunkColumnName)
- throws Exception {
+ public void testJobManagerFailoverSingleParallelism(
+ String tableName, String chunkColumnName, String
assignEndingFirst) throws Exception {
testMySqlParallelSource(
1,
FailoverType.JM,
FailoverPhase.SNAPSHOT,
new String[] {tableName},
tableName,
- chunkColumnName);
+ chunkColumnName,
+ Collections.singletonMap(
+
"scan.incremental.snapshot.unbounded-chunk-first.enabled",
+ assignEndingFirst));
}
private void testMySqlParallelSource(
@@ -229,7 +252,8 @@ public class OceanBaseFailoverITCase extends
OceanBaseSourceTestBase {
FailoverPhase failoverPhase,
String[] captureCustomerTables,
String tableName,
- String chunkColumnName)
+ String chunkColumnName,
+ Map<String, String> otherOptions)
throws Exception {
testMySqlParallelSource(
DEFAULT_PARALLELISM,
@@ -237,7 +261,8 @@ public class OceanBaseFailoverITCase extends
OceanBaseSourceTestBase {
failoverPhase,
captureCustomerTables,
tableName,
- chunkColumnName);
+ chunkColumnName,
+ otherOptions);
}
private void testMySqlParallelSource(
@@ -246,7 +271,8 @@ public class OceanBaseFailoverITCase extends
OceanBaseSourceTestBase {
FailoverPhase failoverPhase,
String[] captureCustomerTables,
String tableName,
- String chunkColumnName)
+ String chunkColumnName,
+ Map<String, String> otherOptions)
throws Exception {
testMySqlParallelSource(
parallelism,
@@ -257,7 +283,8 @@ public class OceanBaseFailoverITCase extends
OceanBaseSourceTestBase {
1,
0,
tableName,
- chunkColumnName);
+ chunkColumnName,
+ otherOptions);
}
private void testMySqlParallelSource(
@@ -269,7 +296,8 @@ public class OceanBaseFailoverITCase extends
OceanBaseSourceTestBase {
int restartAttempts,
long delayBetweenAttempts,
String tableName,
- String chunkColumnName)
+ String chunkColumnName,
+ Map<String, String> otherOptions)
throws Exception {
testMySqlParallelSource(
parallelism,
@@ -281,7 +309,8 @@ public class OceanBaseFailoverITCase extends
OceanBaseSourceTestBase {
delayBetweenAttempts,
false,
tableName,
- chunkColumnName);
+ chunkColumnName,
+ otherOptions);
}
private void testMySqlParallelSource(
@@ -294,11 +323,10 @@ public class OceanBaseFailoverITCase extends
OceanBaseSourceTestBase {
long delayBetweenAttempts,
boolean skipSnapshotBackfill,
String tableName,
- String chunkColumnName)
+ String chunkColumnName,
+ Map<String, String> otherOptions)
throws Exception {
- captureCustomerTables = new String[] {tableName};
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
- env.enableCheckpointing(3000L, CheckpointingMode.EXACTLY_ONCE);
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
env.setParallelism(parallelism);
@@ -316,7 +344,7 @@ public class OceanBaseFailoverITCase extends
OceanBaseSourceTestBase {
? ""
: ", primary key (id) not enforced")
+ ") WITH ("
- + " 'connector' = 'oceanbase-cdc',"
+ + " 'connector' = 'mysql-cdc',"
+ " 'scan.incremental.snapshot.enabled' =
'true',"
+ " 'hostname' = '%s',"
+ " 'port' = '%s',"
@@ -330,12 +358,13 @@ public class OceanBaseFailoverITCase extends
OceanBaseSourceTestBase {
+ " 'server-time-zone' = 'Asia/Shanghai',"
+ " 'server-id' = '%s'"
+ " %s"
+ + " %s"
+ ")",
getHost(),
getPort(),
getUserName(),
getPassword(),
- DEFAULT_TEST_DATABASE,
+ testDatabase,
getTableNameRegex(captureCustomerTables),
scanStartupMode,
skipSnapshotBackfill,
@@ -344,7 +373,17 @@ public class OceanBaseFailoverITCase extends
OceanBaseSourceTestBase {
? ""
: String.format(
",
'scan.incremental.snapshot.chunk.key-column' = '%s'",
- chunkColumnName));
+ chunkColumnName),
+ otherOptions.isEmpty()
+ ? ""
+ : ","
+ + otherOptions.entrySet().stream()
+ .map(
+ e ->
+ String.format(
+
"'%s'='%s'",
+
e.getKey(), e.getValue()))
+
.collect(Collectors.joining(",")));
tEnv.executeSql(sourceDDL);
TableResult tableResult = tEnv.executeSql("select * from customers");
@@ -356,7 +395,7 @@ public class OceanBaseFailoverITCase extends
OceanBaseSourceTestBase {
// second step: check the binlog data
checkBinlogData(tableResult, failoverType, failoverPhase,
captureCustomerTables);
- // sleepMs(3000);
+ sleepMs(3000);
tableResult.getJobClient().get().cancel().get();
}
@@ -423,7 +462,7 @@ public class OceanBaseFailoverITCase extends
OceanBaseSourceTestBase {
JobID jobId = tableResult.getJobClient().get().getJobID();
for (String tableId : captureCustomerTables) {
- makeFirstPartBinlogEvents(getConnection(), DEFAULT_TEST_DATABASE +
'.' + tableId);
+ makeFirstPartBinlogEvents(getConnection(), testDatabase + '.' +
tableId);
}
// wait for the binlog reading
@@ -438,7 +477,7 @@ public class OceanBaseFailoverITCase extends
OceanBaseSourceTestBase {
waitUntilJobRunning(tableResult);
}
for (String tableId : captureCustomerTables) {
- makeSecondPartBinlogEvents(getConnection(), DEFAULT_TEST_DATABASE
+ '.' + tableId);
+ makeSecondPartBinlogEvents(getConnection(), testDatabase + '.' +
tableId);
}
List<String> expectedBinlogData = new ArrayList<>();
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSourceITCase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSourceITCase.java
index 8c061bb79..c5ebaf6f1 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSourceITCase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSourceITCase.java
@@ -27,7 +27,6 @@ import org.apache.flink.util.CloseableIterator;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import java.sql.Connection;
@@ -39,8 +38,6 @@ import java.util.List;
import static java.lang.String.format;
/** OceanBase CDC source connector integration test. */
-@Disabled(
- "Temporarily disabled for GitHub CI due to unavailability of OceanBase
Binlog Service docker image. These tests are currently only supported for local
execution.")
public class OceanBaseSourceITCase extends OceanBaseSourceTestBase {
private static final String DDL_FILE = "oceanbase_ddl_test";
private static final String DATABASE_NAME = "cdc_s_" + getRandomSuffix();
@@ -89,7 +86,7 @@ public class OceanBaseSourceITCase extends
OceanBaseSourceTestBase {
+ " 'server-id' = '%s'"
+ ")",
getHost(),
- PORT,
+ getPort(),
USER_NAME,
PASSWORD,
DATABASE_NAME,
@@ -241,7 +238,7 @@ public class OceanBaseSourceITCase extends
OceanBaseSourceTestBase {
+ " 'server-id' = '%s'"
+ ")",
getHost(),
- PORT,
+ getPort(),
USER_NAME,
PASSWORD,
DATABASE_NAME,
@@ -295,7 +292,7 @@ public class OceanBaseSourceITCase extends
OceanBaseSourceTestBase {
+ " 'server-id' = '%s'"
+ ")",
getHost(),
- PORT,
+ getPort(),
USER_NAME,
PASSWORD,
DATABASE_NAME,
diff --git a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSourceTestBase.java b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSourceTestBase.java
index 0ea3cb966..703e0a523 100644
--- a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSourceTestBase.java
+++ b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-oceanbase-cdc/src/test/java/org/apache/flink/cdc/connectors/oceanbase/OceanBaseSourceTestBase.java
@@ -30,6 +30,12 @@ import org.apache.commons.lang3.StringUtils;
import org.assertj.core.api.Assertions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
import java.net.URL;
import java.nio.file.Files;
@@ -38,6 +44,7 @@ import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
+import java.time.Duration;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
@@ -56,22 +63,40 @@ import java.util.stream.Collectors;
import static java.lang.String.format;
import static org.apache.flink.util.Preconditions.checkState;
-/** Basic class for testing Database OceanBase which supported the mysql
protocol. */
+/** Basic class for testing Database OceanBase. */
+@Testcontainers
public abstract class OceanBaseSourceTestBase extends AbstractTestBase {
private static final Logger LOG =
LoggerFactory.getLogger(OceanBaseSourceTestBase.class);
private static final Pattern COMMENT_PATTERN =
Pattern.compile("^(.*)--.*$");
- protected static final Integer PORT = 3306;
- protected static final String USER_NAME =
System.getenv("OCEANBASE_USERNAME");
- protected static final String PASSWORD =
System.getenv("OCEANBASE_PASSWORD");
- protected static final String HOSTNAME =
System.getenv("OCEANBASE_HOSTNAME");
+
+ protected static final Integer INNER_PORT = 2883;
+ protected static final String USER_NAME = "root@test";
+ protected static final String PASSWORD = "123456";
+ protected static final Duration WAITING_TIMEOUT = Duration.ofMinutes(5);
+
+ public static final Network NETWORK = Network.newNetwork();
+
+ @SuppressWarnings("resource")
+ @Container
+ public static final GenericContainer<?> OB_BINLOG_CONTAINER =
+ new GenericContainer<>("quay.io/oceanbase/obbinlog-ce:4.2.5-test")
+ .withNetwork(NETWORK)
+ .withStartupTimeout(WAITING_TIMEOUT)
+ .withExposedPorts(2881, 2883)
+ .withLogConsumer(new Slf4jLogConsumer(LOG))
+ .waitingFor(
+ new LogMessageWaitStrategy()
+ .withRegEx(".*OBBinlog is ready!.*")
+ .withTimes(1)
+
.withStartupTimeout(Duration.ofMinutes(6)));
protected static String getHost() {
- return HOSTNAME;
+ return OB_BINLOG_CONTAINER.getHost();
}
- protected static Integer getPort() {
- return PORT;
+ protected static int getPort() {
+ return OB_BINLOG_CONTAINER.getMappedPort(INNER_PORT);
}
protected static String getUserName() {
@@ -83,7 +108,7 @@ public abstract class OceanBaseSourceTestBase extends
AbstractTestBase {
}
protected static String getJdbcUrl() {
- return String.format("jdbc:mysql://%s:%s", HOSTNAME, PORT);
+ return String.format("jdbc:mysql://%s:%s", getHost(), getPort());
}
protected static Connection getJdbcConnection() throws SQLException {
@@ -253,7 +278,7 @@ public abstract class OceanBaseSourceTestBase extends
AbstractTestBase {
properties.put("database.port", String.valueOf(getPort()));
properties.put("database.user", getUserName());
properties.put("database.password", getPassword());
- properties.put("database.serverTimezone", ZoneId.of("UTC").toString());
+ properties.put("database.serverTimezone",
ZoneId.of("Asia/Shanghai").toString());
io.debezium.config.Configuration configuration =
io.debezium.config.Configuration.from(properties);
return DebeziumUtils.createMySqlConnection(configuration, new
Properties());
diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml
index 60892d162..188e2e36b 100644
--- a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml
+++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/pom.xml
@@ -328,6 +328,16 @@ limitations under the License.
</outputDirectory>
</artifactItem>
+ <artifactItem>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-sql-connector-oceanbase-cdc</artifactId>
+ <version>${project.version}</version>
+
<destFileName>oceanbase-cdc-connector.jar</destFileName>
+ <type>jar</type>
+
<outputDirectory>${project.build.directory}/dependencies
+ </outputDirectory>
+ </artifactItem>
+
<artifactItem>
<groupId>com.ibm.db2.jcc</groupId>
<artifactId>db2jcc</artifactId>
diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OceanBaseE2eITCase.java b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OceanBaseE2eITCase.java
new file mode 100644
index 000000000..bb0cb26f2
--- /dev/null
+++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/java/org/apache/flink/cdc/connectors/tests/OceanBaseE2eITCase.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.tests;
+
+import org.apache.flink.cdc.common.test.utils.JdbcProxy;
+import org.apache.flink.cdc.common.test.utils.TestUtils;
+import
org.apache.flink.cdc.connectors.tests.utils.FlinkContainerTestEnvironment;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.containers.wait.strategy.LogMessageWaitStrategy;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static
org.apache.flink.cdc.connectors.base.utils.EnvironmentUtils.supportCheckpointsAfterTasksFinished;
+
+/** End-to-end tests for oceanbase-cdc connector uber jar. */
+@Testcontainers
+class OceanBaseE2eITCase extends FlinkContainerTestEnvironment {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(OceanBaseE2eITCase.class);
+
+ private static final Path oceanbaseCdcJar =
+ TestUtils.getResource("oceanbase-cdc-connector.jar");
+ private static final Pattern COMMENT_PATTERN =
Pattern.compile("^(.*)--.*$");
+ private static final String[] CREATE_DATABASE_DDL =
+ new String[] {"CREATE DATABASE `$DBNAME$`;", "USE `$DBNAME$`;"};
+ private static final Path jdbcDriver =
TestUtils.getResource("mysql-driver.jar");
+ private static final String DATABASE_NAME = "oceanbase_inventory";
+
+ private static final int OB_PROXY_PORT = 2883;
+ protected static final String DEFAULT_USERNAME = "root@test";
+ protected static final String DEFAULT_PASSWORD = "123456";
+ protected static final String INTER_CONTAINER_OCEANBASE_ALIAS =
"oceanbase";
+
+ @Container
+ @SuppressWarnings("resource")
+ public static final GenericContainer<?> OB_BINLOG_CONTAINER =
+ new GenericContainer<>("quay.io/oceanbase/obbinlog-ce:4.2.5-test")
+ .withNetwork(NETWORK)
+ .withNetworkAliases(INTER_CONTAINER_OCEANBASE_ALIAS)
+ .withStartupTimeout(Duration.ofMinutes(5))
+ .withExposedPorts(2881, 2883)
+ .withLogConsumer(new Slf4jLogConsumer(LOG))
+ .waitingFor(
+ new LogMessageWaitStrategy()
+ .withRegEx(".*OBBinlog is ready!.*")
+ .withTimes(1)
+
.withStartupTimeout(Duration.ofMinutes(6)));
+
+ @BeforeEach
+ public void before() {
+ super.before();
+ createAndInitializeOBTable("oceanbase_inventory");
+ }
+
+ @AfterEach
+ public void after() {
+ super.after();
+ }
+
+ @Test
+ void testOBBinlogCDC() throws Exception {
+ List<String> sqlLines =
+ Arrays.asList(
+ "SET 'execution.checkpointing.interval' = '3s';",
+ "SET
'execution.checkpointing.checkpoints-after-tasks-finish.enabled' = 'true';",
+ "CREATE TABLE products_source (",
+ " `id` INT NOT NULL,",
+ " name STRING,",
+ " description STRING,",
+ " weight DECIMAL(10,3),",
+ " enum_c STRING,",
+ " json_c STRING,",
+ " point_c STRING,",
+ " primary key (`id`) not enforced",
+ ") WITH (",
+ " 'connector' = 'oceanbase-cdc',",
+ " 'hostname' = '" + INTER_CONTAINER_OCEANBASE_ALIAS +
"',",
+ " 'port' = '2883',",
+ " 'username' = '" + DEFAULT_USERNAME + "',",
+ " 'password' = '" + DEFAULT_PASSWORD + "',",
+ " 'database-name' = '" + DATABASE_NAME + "',",
+ " 'table-name' = 'products_source',",
+ " 'server-time-zone' = 'Asia/Shanghai',",
+ " 'server-id' = '5800-5900',",
+ " 'scan.incremental.snapshot.chunk.size' = '4',",
+ " 'scan.incremental.close-idle-reader.enabled' = '"
+ + supportCheckpointsAfterTasksFinished()
+ + "'",
+ ");",
+ "CREATE TABLE products_sink (",
+ " `id` INT NOT NULL,",
+ " name STRING,",
+ " description STRING,",
+ " weight DECIMAL(10,3),",
+ " enum_c STRING,",
+ " json_c STRING,",
+ " point_c STRING,",
+ " primary key (`id`) not enforced",
+ ") WITH (",
+ " 'connector' = 'jdbc',",
+ String.format(
+ " 'url' = 'jdbc:mysql://%s:2883/%s',",
+ INTER_CONTAINER_OCEANBASE_ALIAS,
DATABASE_NAME),
+ " 'table-name' = 'products_sink',",
+ " 'username' = '" + DEFAULT_USERNAME + "',",
+ " 'password' = '" + DEFAULT_PASSWORD + "'",
+ ");",
+ "INSERT INTO products_sink",
+ "SELECT * FROM products_source;");
+
+ submitSQLJob(sqlLines, oceanbaseCdcJar, jdbcJar, jdbcDriver);
+ waitUntilJobRunning(Duration.ofSeconds(30));
+
+ // generate binlogs
+ String jdbcUrl =
+ String.format(
+ "jdbc:mysql://%s:%s/%s",
+ OB_BINLOG_CONTAINER.getHost(),
+ OB_BINLOG_CONTAINER.getMappedPort(OB_PROXY_PORT),
+ DATABASE_NAME);
+ try (Connection conn =
+ DriverManager.getConnection(jdbcUrl, DEFAULT_USERNAME,
DEFAULT_PASSWORD);
+ Statement stat = conn.createStatement()) {
+ stat.execute(
+ "UPDATE products_source SET description='18oz carpenter
hammer' WHERE id=106;");
+ stat.execute("UPDATE products_source SET weight='5.1' WHERE
id=107;");
+ stat.execute(
+ "INSERT INTO products_source VALUES
(default,'jacket','water resistent white wind breaker',0.2, null, null,
null);"); // 110
+ stat.execute(
+ "INSERT INTO products_source VALUES
(default,'scooter','Big 2-wheel scooter ',5.18, null, null, null);");
+ stat.execute(
+ "UPDATE products_source SET description='new water
resistent white wind breaker', weight='0.5' WHERE id=110;");
+ stat.execute("UPDATE products_source SET weight='5.17' WHERE
id=111;");
+ stat.execute("DELETE FROM products_source WHERE id=111;");
+ // add schema change event in the last.
+ stat.execute("CREATE TABLE new_table (id int, age int);");
+ } catch (SQLException e) {
+ LOG.error("Update table for CDC failed.", e);
+ throw e;
+ }
+
+ // assert final results
+ JdbcProxy proxy =
+ new JdbcProxy(jdbcUrl, DEFAULT_USERNAME, DEFAULT_PASSWORD,
MYSQL_DRIVER_CLASS);
+ List<String> expectResult =
+ Arrays.asList(
+ "101,scooter,Small 2-wheel scooter,3.14,red,{\"key1\":
\"value1\"},{\"coordinates\":[1,1],\"type\":\"Point\",\"srid\":0}",
+ "102,car battery,12V car battery,8.1,white,{\"key2\":
\"value2\"},{\"coordinates\":[2,2],\"type\":\"Point\",\"srid\":0}",
+ "103,12-pack drill bits,12-pack of drill bits with
sizes ranging from #40 to #3,0.8,red,{\"key3\":
\"value3\"},{\"coordinates\":[3,3],\"type\":\"Point\",\"srid\":0}",
+ "104,hammer,12oz carpenter's
hammer,0.75,white,{\"key4\":
\"value4\"},{\"coordinates\":[4,4],\"type\":\"Point\",\"srid\":0}",
+ "105,hammer,14oz carpenter's hammer,0.875,red,{\"k1\":
\"v1\", \"k2\": \"v2\"},{\"coordinates\":[5,5],\"type\":\"Point\",\"srid\":0}",
+ "106,hammer,18oz carpenter hammer,1.0,null,null,null",
+ "107,rocks,box of assorted rocks,5.1,null,null,null",
+ "108,jacket,water resistent black wind
breaker,0.1,null,null,null",
+ "109,spare tire,24 inch spare
tire,22.2,null,null,null",
+ "110,jacket,new water resistent white wind
breaker,0.5,null,null,null");
+ proxy.checkResultWithTimeout(
+ expectResult,
+ "products_sink",
+ new String[] {"id", "name", "description", "weight", "enum_c",
"json_c", "point_c"},
+ 300000L);
+ }
+
+ /** Creates the database and populates it with initialization SQL script.
*/
+ public void createAndInitializeOBTable(String sqlFile) {
+ final String ddlFile = String.format("ddl/%s.sql", sqlFile);
+ final URL ddlTestFile =
OceanBaseE2eITCase.class.getClassLoader().getResource(ddlFile);
+ Assertions.assertThat(ddlTestFile).withFailMessage("Cannot locate " +
ddlFile).isNotNull();
+ try {
+ try (Connection connection = getJdbcConnection();
+ Statement statement = connection.createStatement()) {
+ final List<String> statements =
+ Arrays.stream(
+ Stream.concat(
+
Arrays.stream(CREATE_DATABASE_DDL),
+ Files.readAllLines(
+
Paths.get(ddlTestFile.toURI()))
+ .stream())
+ .map(String::trim)
+ .filter(x ->
!x.startsWith("--") && !x.isEmpty())
+ .map(
+ x -> {
+ final Matcher m =
+
COMMENT_PATTERN.matcher(x);
+ return m.matches()
? m.group(1) : x;
+ })
+ .map(sql ->
sql.replace("$DBNAME$", DATABASE_NAME))
+
.collect(Collectors.joining("\n"))
+ .split(";"))
+ .map(x -> x.replace("$$", ";"))
+ .collect(Collectors.toList());
+ for (String stmt : statements) {
+ statement.execute(stmt);
+ }
+ }
+ } catch (final Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ public Connection getJdbcConnection() throws SQLException {
+ String url =
+ "jdbc:mysql://"
+ + OB_BINLOG_CONTAINER.getHost()
+ + ":"
+ + OB_BINLOG_CONTAINER.getMappedPort(OB_PROXY_PORT)
+ + "?useSSL=false";
+ return DriverManager.getConnection(url, DEFAULT_USERNAME,
DEFAULT_PASSWORD);
+ }
+}
diff --git a/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/resources/ddl/oceanbase_inventory.sql b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/resources/ddl/oceanbase_inventory.sql
new file mode 100644
index 000000000..73bb75d7f
--- /dev/null
+++ b/flink-cdc-e2e-tests/flink-cdc-source-e2e-tests/src/test/resources/ddl/oceanbase_inventory.sql
@@ -0,0 +1,51 @@
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+
+-- ----------------------------------------------------------------------------------------------------------------
+-- DATABASE: inventory
+-- ----------------------------------------------------------------------------------------------------------------
+
+-- Create and populate our products using a single insert with many rows
+CREATE TABLE products_source (
+  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
+  name VARCHAR(255) NOT NULL DEFAULT 'flink',
+  description VARCHAR(512),
+  weight FLOAT,
+  enum_c enum('red', 'white') default 'red', -- test some complex types as well,
+  json_c JSON, -- because we use additional dependencies to deserialize complex types.
+  point_c POINT
+);
+ALTER TABLE products_source AUTO_INCREMENT = 101; -- ids of the rows inserted below start at 101
+
+INSERT INTO products_source
+VALUES (default,"scooter","Small 2-wheel scooter",3.14, 'red', '{"key1": "value1"}', ST_GeomFromText('POINT(1 1)')),
+       (default,"car battery","12V car battery",8.1, 'white', '{"key2": "value2"}', ST_GeomFromText('POINT(2 2)')),
+       (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3",0.8, 'red', '{"key3": "value3"}', ST_GeomFromText('POINT(3 3)')),
+       (default,"hammer","12oz carpenter's hammer",0.75, 'white', '{"key4": "value4"}', ST_GeomFromText('POINT(4 4)')),
+       (default,"hammer","14oz carpenter's hammer",0.875, 'red', '{"k1": "v1", "k2": "v2"}', ST_GeomFromText('POINT(5 5)')),
+       (default,"hammer","16oz carpenter's hammer",1.0, null, null, null),
+       (default,"rocks","box of assorted rocks",5.3, null, null, null),
+       (default,"jacket","water resistent black wind breaker",0.1, null, null, null),
+       (default,"spare tire","24 inch spare tire",22.2, null, null, null);
+
+CREATE TABLE products_sink ( -- same columns as products_source, complex types kept as VARCHAR
+  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
+  name VARCHAR(255) NOT NULL DEFAULT 'flink',
+  description VARCHAR(512),
+  weight FLOAT,
+  enum_c VARCHAR(255),
+  json_c VARCHAR(255),
+  point_c VARCHAR(255)
+);
\ No newline at end of file