This is an automated email from the ASF dual-hosted git repository.
ic4y pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 7696ba155 [feature][e2e][cdc] add mysql cdc container (#3667)
7696ba155 is described below
commit 7696ba1551a3cf26f7038fdf108debaec7e5ebef
Author: Zongwen Li <[email protected]>
AuthorDate: Mon Dec 12 21:12:56 2022 +0800
[feature][e2e][cdc] add mysql cdc container (#3667)
* [feature][e2e][cdc] add mysql cdc container
* [build] execution test-jar step
---
.../connector-cdc/connector-cdc-mysql/pom.xml | 37 +++++
.../cdc/mysql/testutils/MySqlContainer.java | 177 +++++++++++++++++++++
.../cdc/mysql/testutils/MySqlVersion.java | 41 +++++
.../cdc/mysql/testutils/UniqueDatabase.java | 160 +++++++++++++++++++
.../connector-cdc-mysql-e2e}/pom.xml | 61 ++++---
.../cdc/mysql/MySqlIncrementalSourceIT.java | 77 +++++++++
.../src/test/resources/ddl/inventory.sql | 95 +++++++++++
.../src/test/resources/docker/server-gtids/my.cnf | 65 ++++++++
.../src/test/resources/docker/setup.sql | 32 ++++
.../src/test/resources/log4j2-test.properties | 29 ++++
seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml | 1 +
11 files changed, 742 insertions(+), 33 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/pom.xml
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/pom.xml
index 6f83f8107..4d20afb10 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/pom.xml
+++ b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/pom.xml
@@ -44,6 +44,17 @@
<artifactId>connector-jdbc</artifactId>
<version>${project.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>mysql</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.13.2</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<dependencyManagement>
@@ -62,7 +73,33 @@
<scope>compile</scope>
</dependency>
+ <!-- test dependencies on TestContainers -->
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>mysql</artifactId>
+ <version>${testcontainer.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</dependencyManagement>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <version>2.4</version>
+ <configuration>
+ <skip>false</skip>
+ </configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
</project>
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/testutils/MySqlContainer.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/testutils/MySqlContainer.java
new file mode 100644
index 000000000..3a2f11684
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/testutils/MySqlContainer.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils;
+
+import org.testcontainers.containers.ContainerLaunchException;
+import org.testcontainers.containers.JdbcDatabaseContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Docker container for MySQL. The difference between this class and {@link
+ * org.testcontainers.containers.MySQLContainer} is that TC MySQLContainer has problems when
+ * overriding mysql conf file, i.e. my.cnf.
+ *
+ * <p>The database name, user name and password all default to {@code "test"}; use the
+ * {@code with*} methods to change them before the container is started.
+ */
+@SuppressWarnings("MagicNumber")
+public class MySqlContainer extends JdbcDatabaseContainer<MySqlContainer> {
+
+ /** Name of the Docker image; the tag is taken from the {@link MySqlVersion}. */
+ public static final String IMAGE = "mysql";
+ /** Port the MySQL server listens on inside the container. */
+ public static final Integer MYSQL_PORT = 3306;
+
+ private static final String MY_CNF_CONFIG_OVERRIDE_PARAM_NAME = "MY_CNF";
+ private static final String SETUP_SQL_PARAM_NAME = "SETUP_SQL";
+ private static final String MYSQL_ROOT_USER = "root";
+
+ private String databaseName = "test";
+ private String username = "test";
+ private String password = "test";
+
+ /** Creates a container running MySQL 5.7. */
+ public MySqlContainer() {
+ this(MySqlVersion.V5_7);
+ }
+
+ /**
+ * Creates a container for the given MySQL version and exposes the MySQL port.
+ *
+ * @param version MySQL version whose version string is used as the Docker image tag
+ */
+ public MySqlContainer(MySqlVersion version) {
+ super(DockerImageName.parse(IMAGE + ":" + version.getVersion()));
+ addExposedPort(MYSQL_PORT);
+ }
+
+ /**
+ * Returns the host port that is mapped to the MySQL port of the container.
+ *
+ * @return a set containing exactly the mapped MySQL port
+ */
+ @Override
+ protected Set<Integer> getLivenessCheckPorts() {
+ // new HashSet<>(int) would interpret the port as an initial capacity and produce an
+ // empty set, so the port has to be added as an element explicitly.
+ final Set<Integer> ports = new HashSet<>();
+ ports.add(getMappedPort(MYSQL_PORT));
+ return ports;
+ }
+
+ /**
+ * Prepares volumes and environment variables before the container is launched.
+ *
+ * @throws ContainerLaunchException if the password is empty and the user is not root
+ */
+ @Override
+ protected void configure() {
+ // Map the my.cnf override (or the default configuration resource) into /etc/mysql/.
+ optionallyMapResourceParameterAsVolume(
+ MY_CNF_CONFIG_OVERRIDE_PARAM_NAME, "/etc/mysql/", "mysql-default-conf");
+
+ // The setup script is optional; it is only mapped when withSetupSQL(...) was called.
+ if (parameters.containsKey(SETUP_SQL_PARAM_NAME)) {
+ optionallyMapResourceParameterAsVolume(
+ SETUP_SQL_PARAM_NAME, "/docker-entrypoint-initdb.d/", "N/A");
+ }
+
+ addEnv("MYSQL_DATABASE", databaseName);
+ addEnv("MYSQL_USER", username);
+ if (password != null && !password.isEmpty()) {
+ // The same password is used for the configured user and for root.
+ addEnv("MYSQL_PASSWORD", password);
+ addEnv("MYSQL_ROOT_PASSWORD", password);
+ } else if (MYSQL_ROOT_USER.equalsIgnoreCase(username)) {
+ addEnv("MYSQL_ALLOW_EMPTY_PASSWORD", "yes");
+ } else {
+ throw new ContainerLaunchException(
+ "Empty password can be used only with the root user");
+ }
+ setStartupAttempts(3);
+ }
+
+ /**
+ * Returns the JDBC driver class name, preferring {@code com.mysql.cj.jdbc.Driver} when that
+ * class can be loaded and falling back to {@code com.mysql.jdbc.Driver} otherwise.
+ */
+ @Override
+ public String getDriverClassName() {
+ try {
+ Class.forName("com.mysql.cj.jdbc.Driver");
+ return "com.mysql.cj.jdbc.Driver";
+ } catch (ClassNotFoundException e) {
+ return "com.mysql.jdbc.Driver";
+ }
+ }
+
+ /**
+ * Builds the JDBC URL for an arbitrary database hosted by this container.
+ *
+ * @param databaseName name of the database to connect to
+ * @return JDBC URL made of host, mapped port, database name and any URL parameters
+ */
+ public String getJdbcUrl(String databaseName) {
+ String additionalUrlParams = constructUrlParameters("?", "&");
+ return "jdbc:mysql://"
+ + getHost()
+ + ":"
+ + getDatabasePort()
+ + "/"
+ + databaseName
+ + additionalUrlParams;
+ }
+
+ /** Returns the JDBC URL of the database configured for this container. */
+ @Override
+ public String getJdbcUrl() {
+ return getJdbcUrl(databaseName);
+ }
+
+ /** Returns the host port that is mapped to the MySQL port of the container. */
+ public int getDatabasePort() {
+ return getMappedPort(MYSQL_PORT);
+ }
+
+ /**
+ * Extends the connection URL built by the base class with {@code useSSL=false} and
+ * {@code allowPublicKeyRetrieval=true} unless those parameters are already present.
+ */
+ @Override
+ protected String constructUrlForConnection(String queryString) {
+ String url = super.constructUrlForConnection(queryString);
+
+ if (!url.contains("useSSL=")) {
+ String separator = url.contains("?") ? "&" : "?";
+ url = url + separator + "useSSL=false";
+ }
+
+ if (!url.contains("allowPublicKeyRetrieval=")) {
+ url = url + "&allowPublicKeyRetrieval=true";
+ }
+
+ return url;
+ }
+
+ @Override
+ public String getDatabaseName() {
+ return databaseName;
+ }
+
+ @Override
+ public String getUsername() {
+ return username;
+ }
+
+ @Override
+ public String getPassword() {
+ return password;
+ }
+
+ @Override
+ protected String getTestQueryString() {
+ return "SELECT 1";
+ }
+
+ /**
+ * Registers the resource that is used as the MySQL configuration override.
+ *
+ * @param s resource path of the configuration override, e.g. a directory holding my.cnf
+ * @return this container, for chaining
+ */
+ public MySqlContainer withConfigurationOverride(String s) {
+ parameters.put(MY_CNF_CONFIG_OVERRIDE_PARAM_NAME, s);
+ return this;
+ }
+
+ /**
+ * Registers a SQL script resource that is mapped into {@code /docker-entrypoint-initdb.d/}.
+ *
+ * @param sqlPath resource path of the setup script
+ * @return this container, for chaining
+ */
+ public MySqlContainer withSetupSQL(String sqlPath) {
+ parameters.put(SETUP_SQL_PARAM_NAME, sqlPath);
+ return this;
+ }
+
+ @Override
+ public MySqlContainer withDatabaseName(final String databaseName) {
+ this.databaseName = databaseName;
+ return this;
+ }
+
+ @Override
+ public MySqlContainer withUsername(final String username) {
+ this.username = username;
+ return this;
+ }
+
+ @Override
+ public MySqlContainer withPassword(final String password) {
+ this.password = password;
+ return this;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/testutils/MySqlVersion.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/testutils/MySqlVersion.java
new file mode 100644
index 000000000..5763e2dc8
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/testutils/MySqlVersion.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils;
+
+/** MySQL server versions that tests can choose from, each carrying its version string. */
+public enum MySqlVersion {
+ V5_5("5.5"),
+ V5_6("5.6"),
+ V5_7("5.7"),
+ V8_0("8.0");
+
+ /** Version string of this constant, e.g. {@code "8.0"}. */
+ private final String version;
+
+ MySqlVersion(String versionString) {
+ version = versionString;
+ }
+
+ /** Returns the version string of this constant. */
+ public String getVersion() {
+ return version;
+ }
+
+ /** Renders the constant as {@code MySqlVersion{version='<version>'}}. */
+ @Override
+ public String toString() {
+ return String.format("MySqlVersion{version='%s'}", version);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/testutils/UniqueDatabase.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/testutils/UniqueDatabase.java
new file mode 100644
index 000000000..d5f10dd23
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/testutils/UniqueDatabase.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils;
+
+import lombok.extern.slf4j.Slf4j;
+import org.junit.jupiter.api.Assertions;
+
+import java.net.URL;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Creates and populates a uniquely named MySQL database for each run of a JUnit test. A user
+ * of this class provides a template database name, and an init script is expected as the
+ * classpath resource {@code ddl/<database_name>.sql}. The name of the created database is the
+ * template name followed by a random suffix, {@code <database_name>_<suffix>}, so that
+ * separate runs do not share a database.
+ *
+ * <p>This class is inspired from Debezium project.
+ */
+@SuppressWarnings("MagicNumber")
+@Slf4j
+public class UniqueDatabase {
+
+ /** Statements executed before the init script; {@code $DBNAME$} is substituted. */
+ private static final String[] CREATE_DATABASE_DDL =
+ new String[]{"CREATE DATABASE $DBNAME$;", "USE $DBNAME$;"};
+ /** Captures everything in front of a trailing {@code --} comment. */
+ private static final Pattern COMMENT_PATTERN = Pattern.compile("^(.*)--.*$");
+
+ private final MySqlContainer container;
+ private final String databaseName;
+ private final String templateName;
+ private final String username;
+ private final String password;
+
+ /**
+ * Prepares a database whose name is {@code databaseName} plus a random base-36 suffix.
+ *
+ * @param container container that hosts the database
+ * @param databaseName template name; also selects the {@code ddl/<name>.sql} script
+ * @param username user used for all connections opened by this object
+ * @param password password of that user
+ */
+ public UniqueDatabase(
+ MySqlContainer container, String databaseName, String username, String password) {
+ this(container, databaseName, randomSuffix(), username, password);
+ }
+
+ private UniqueDatabase(
+ MySqlContainer container,
+ String databaseName,
+ final String identifier,
+ String username,
+ String password) {
+ this.templateName = databaseName;
+ this.databaseName = databaseName + "_" + identifier;
+ this.container = container;
+ this.username = username;
+ this.password = password;
+ }
+
+ /** Produces the random suffix appended to the template name. */
+ private static String randomSuffix() {
+ return Integer.toUnsignedString(new Random().nextInt(), 36);
+ }
+
+ public String getHost() {
+ return container.getHost();
+ }
+
+ public int getDatabasePort() {
+ return container.getDatabasePort();
+ }
+
+ public String getDatabaseName() {
+ return databaseName;
+ }
+
+ public String getUsername() {
+ return username;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ /**
+ * @return Fully qualified table name {@code <databaseName>.<tableName>}
+ */
+ public String qualifiedTableName(final String tableName) {
+ return String.format("%s.%s", databaseName, tableName);
+ }
+
+ /**
+ * Creates the database and runs the initialization script against it.
+ *
+ * <p>Blank lines and {@code --} comments are removed, {@code $DBNAME$} is replaced by the
+ * unique database name, the text is split into statements at {@code ;} and every
+ * {@code $$} inside a statement is turned into {@code ;}.
+ *
+ * @throws IllegalStateException wrapping any failure while connecting, reading the script
+ * or executing a statement
+ */
+ public void createAndInitialize() {
+ final String scriptPath = String.format("ddl/%s.sql", templateName);
+ final URL scriptUrl = UniqueDatabase.class.getClassLoader().getResource(scriptPath);
+ Assertions.assertNotNull(scriptUrl, "Cannot locate " + scriptPath);
+ try (Connection connection =
+ DriverManager.getConnection(container.getJdbcUrl(), username, password);
+ Statement statement = connection.createStatement()) {
+ // The database DDL comes first, followed by the lines of the init script.
+ final Stream<String> rawLines =
+ Stream.concat(
+ Arrays.stream(CREATE_DATABASE_DDL),
+ Files.readAllLines(Paths.get(scriptUrl.toURI())).stream());
+ final String script =
+ rawLines.map(String::trim)
+ .filter(line -> !line.isEmpty() && !line.startsWith("--"))
+ .map(UniqueDatabase::stripTrailingComment)
+ .map(this::convertSQL)
+ .collect(Collectors.joining("\n"));
+ final List<String> statements =
+ Arrays.stream(script.split(";"))
+ .map(part -> part.replace("$$", ";"))
+ .collect(Collectors.toList());
+ for (String sql : statements) {
+ statement.execute(sql);
+ log.info(sql);
+ }
+ } catch (final Exception e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ /** Opens a connection to the unique database with the configured credentials. */
+ public Connection getJdbcConnection() throws SQLException {
+ final String url = container.getJdbcUrl(databaseName);
+ return DriverManager.getConnection(url, username, password);
+ }
+
+ /** Drops a trailing {@code --} comment from a line, if there is one. */
+ private static String stripTrailingComment(final String line) {
+ final Matcher matcher = COMMENT_PATTERN.matcher(line);
+ if (matcher.matches()) {
+ return matcher.group(1);
+ }
+ return line;
+ }
+
+ /** Replaces the {@code $DBNAME$} placeholder with the unique database name. */
+ private String convertSQL(final String sql) {
+ return sql.replace("$DBNAME$", databaseName);
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/pom.xml
similarity index 72%
copy from seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/pom.xml
copy to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/pom.xml
index 6f83f8107..a773bb488 100644
--- a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mysql/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/pom.xml
@@ -1,68 +1,63 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
-
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
-
http://www.apache.org/licenses/LICENSE-2.0
-
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
- <artifactId>connector-cdc</artifactId>
+ <artifactId>seatunnel-connector-v2-e2e</artifactId>
<groupId>org.apache.seatunnel</groupId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>connector-cdc-mysql</artifactId>
- <dependencies>
-
- <dependency>
- <groupId>org.apache.seatunnel</groupId>
- <artifactId>connector-cdc-base</artifactId>
- </dependency>
+ <artifactId>connector-cdc-mysql-e2e</artifactId>
- <dependency>
- <groupId>io.debezium</groupId>
- <artifactId>debezium-connector-mysql</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.seatunnel</groupId>
- <artifactId>connector-jdbc</artifactId>
- <version>${project.version}</version>
- </dependency>
- </dependencies>
+ <properties>
+ </properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>connector-cdc-base</artifactId>
+ <artifactId>connector-cdc-mysql</artifactId>
<version>${project.version}</version>
- <scope>compile</scope>
+ <type>pom</type>
+ <scope>import</scope>
</dependency>
-
- <dependency>
- <groupId>io.debezium</groupId>
- <artifactId>debezium-connector-mysql</artifactId>
- <version>${debezium.version}</version>
- <scope>compile</scope>
- </dependency>
-
</dependencies>
</dependencyManagement>
-</project>
+ <dependencies>
+ <!-- SeaTunnel connectors -->
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-cdc-mysql</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-cdc-mysql</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>mysql</artifactId>
+ </dependency>
+ </dependencies>
+</project>
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MySqlIncrementalSourceIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MySqlIncrementalSourceIT.java
new file mode 100644
index 000000000..4657d0638
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/java/org/apache/seatunnel/connectors/seatunnel/cdc/mysql/MySqlIncrementalSourceIT.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.cdc.mysql;
+
+import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlContainer;
+import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.MySqlVersion;
+import
org.apache.seatunnel.connectors.seatunnel.cdc.mysql.testutils.UniqueDatabase;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.util.stream.Stream;
+
+/**
+ * End-to-end test scaffold for the MySQL CDC source: starts a MySQL 8.0 container with the
+ * GTID configuration and setup script from the test resources. The only test method is
+ * currently disabled.
+ */
+@DisabledOnContainer(value = {}, type = {EngineType.SPARK, EngineType.FLINK}, disabledReason = "")
+public class MySqlIncrementalSourceIT extends TestSuiteBase implements TestResource {
+ private static final Logger LOG = LoggerFactory.getLogger(MySqlIncrementalSourceIT.class);
+
+ // Must be declared after LOG because the factory attaches LOG as the log consumer.
+ private static final MySqlContainer MYSQL_CONTAINER = createMySqlContainer(MySqlVersion.V8_0);
+
+ private final UniqueDatabase inventoryDatabase =
+ new UniqueDatabase(MYSQL_CONTAINER, "inventory", "mysqluser", "mysqlpw");
+
+ /** Builds (without starting) the MySQL container used by this test class. */
+ private static MySqlContainer createMySqlContainer(MySqlVersion version) {
+ final MySqlContainer mysql = new MySqlContainer(version);
+ mysql.withConfigurationOverride("docker/server-gtids/my.cnf");
+ mysql.withSetupSQL("docker/setup.sql");
+ mysql.withDatabaseName("seatunnel-test");
+ mysql.withUsername("st_user");
+ mysql.withPassword("seatunnel");
+ mysql.withLogConsumer(new Slf4jLogConsumer(LOG));
+ return mysql;
+ }
+
+ /** Starts the MySQL container and blocks until it is up. */
+ @Override
+ @BeforeAll
+ public void startUp() throws Exception {
+ LOG.info("Starting containers...");
+ Startables.deepStart(Stream.of(MYSQL_CONTAINER)).join();
+ LOG.info("Containers are started.");
+ }
+
+ /** Creates the unique inventory database and logs the mapped MySQL port. */
+ @Test
+ @Disabled("Offline jobs are not currently supported")
+ public void testMysql() {
+ inventoryDatabase.createAndInitialize();
+ LOG.info("-------mysql port:{}", MYSQL_CONTAINER.getDatabasePort());
+ }
+
+ /** Closes the MySQL container. */
+ @Override
+ @AfterAll
+ public void tearDown() throws Exception {
+ MYSQL_CONTAINER.close();
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/inventory.sql
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/inventory.sql
new file mode 100644
index 000000000..9e9fff3f8
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/ddl/inventory.sql
@@ -0,0 +1,95 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+--
----------------------------------------------------------------------------------------------------------------
+-- DATABASE: inventory
+--
----------------------------------------------------------------------------------------------------------------
+
+-- Create and populate our products using a single insert with many rows
+CREATE TABLE products (
+ id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
+ name VARCHAR(255) NOT NULL DEFAULT 'SeaTunnel',
+ description VARCHAR(512),
+ weight FLOAT
+);
+ALTER TABLE products AUTO_INCREMENT = 101;
+
+INSERT INTO products
+VALUES (default,"scooter","Small 2-wheel scooter",3.14),
+ (default,"car battery","12V car battery",8.1),
+ (default,"12-pack drill bits","12-pack of drill bits with sizes ranging
from #40 to #3",0.8),
+ (default,"hammer","12oz carpenter's hammer",0.75),
+ (default,"hammer","14oz carpenter's hammer",0.875),
+ (default,"hammer","16oz carpenter's hammer",1.0),
+ (default,"rocks","box of assorted rocks",5.3),
+ (default,"jacket","water resistent black wind breaker",0.1),
+ (default,"spare tire","24 inch spare tire",22.2);
+
+-- Create and populate the products on hand using multiple inserts
+CREATE TABLE products_on_hand (
+ product_id INTEGER NOT NULL PRIMARY KEY,
+ quantity INTEGER NOT NULL,
+ FOREIGN KEY (product_id) REFERENCES products(id)
+);
+
+INSERT INTO products_on_hand VALUES (101,3);
+INSERT INTO products_on_hand VALUES (102,8);
+INSERT INTO products_on_hand VALUES (103,18);
+INSERT INTO products_on_hand VALUES (104,4);
+INSERT INTO products_on_hand VALUES (105,5);
+INSERT INTO products_on_hand VALUES (106,0);
+INSERT INTO products_on_hand VALUES (107,44);
+INSERT INTO products_on_hand VALUES (108,2);
+INSERT INTO products_on_hand VALUES (109,5);
+
+-- Create some customers ...
+CREATE TABLE customers (
+ id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
+ first_name VARCHAR(255) NOT NULL,
+ last_name VARCHAR(255) NOT NULL,
+ email VARCHAR(255) NOT NULL UNIQUE KEY
+) AUTO_INCREMENT=1001;
+
+
+INSERT INTO customers
+VALUES (default,"Sally","Thomas","[email protected]"),
+ (default,"George","Bailey","[email protected]"),
+ (default,"Edward","Walker","[email protected]"),
+ (default,"Anne","Kretchmar","[email protected]");
+
+-- Create some very simple orders
+CREATE TABLE orders (
+ order_number INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
+ order_date DATE NOT NULL,
+ purchaser INTEGER NOT NULL,
+ quantity INTEGER NOT NULL,
+ product_id INTEGER NOT NULL,
+ FOREIGN KEY order_customer (purchaser) REFERENCES customers(id),
+ FOREIGN KEY ordered_product (product_id) REFERENCES products(id)
+) AUTO_INCREMENT = 10001;
+
+INSERT INTO orders
+VALUES (default, '2016-01-16', 1001, 1, 102),
+ (default, '2016-01-17', 1002, 2, 105),
+ (default, '2016-02-18', 1004, 3, 109),
+ (default, '2016-02-19', 1002, 2, 106),
+ (default, '16-02-21', 1003, 1, 107);
+
+CREATE TABLE category (
+ id INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
+ category_name VARCHAR(255)
+);
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/docker/server-gtids/my.cnf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/docker/server-gtids/my.cnf
new file mode 100644
index 000000000..a39089788
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/docker/server-gtids/my.cnf
@@ -0,0 +1,65 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# For advice on how to change settings please see
+# http://dev.mysql.com/doc/refman/5.7/en/server-configuration-defaults.html
+
+[mysqld]
+#
+# Remove leading # and set to the amount of RAM for the most important data
+# cache in MySQL. Start at 70% of total RAM for dedicated server, else 10%.
+# innodb_buffer_pool_size = 128M
+#
+# Remove leading # to turn on a very important data integrity option: logging
+# changes to the binary log between backups.
+# log_bin
+#
+# Remove leading # to set options mainly useful for reporting servers.
+# The server defaults are faster for transactions and fast SELECTs.
+# Adjust sizes as needed, experiment to find the optimal values.
+# join_buffer_size = 128M
+# sort_buffer_size = 2M
+# read_rnd_buffer_size = 2M
+skip-host-cache
+skip-name-resolve
+#datadir=/var/lib/mysql
+#socket=/var/lib/mysql/mysql.sock
+secure-file-priv=/var/lib/mysql
+user=mysql
+
+# Disabling symbolic-links is recommended to prevent assorted security risks
+symbolic-links=0
+
+#log-error=/var/log/mysqld.log
+#pid-file=/var/run/mysqld/mysqld.pid
+
+# ----------------------------------------------
+# Enable the binlog for replication & CDC
+# ----------------------------------------------
+
+# Enable binary replication log and set the prefix, expiration, and log format.
+# The prefix is arbitrary, expiration can be short for integration tests but
would
+# be longer on a production system. Row-level info is required for ingest to
work.
+# Server ID is required, but this will vary on production systems
+server-id = 223344
+log_bin = mysql-bin
+expire_logs_days = 1
+binlog_format = row
+
+# enable gtid mode
+gtid_mode = on
+enforce_gtid_consistency = on
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/docker/setup.sql
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/docker/setup.sql
new file mode 100644
index 000000000..2edd6c917
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/docker/setup.sql
@@ -0,0 +1,32 @@
+--
+-- Licensed to the Apache Software Foundation (ASF) under one or more
+-- contributor license agreements. See the NOTICE file distributed with
+-- this work for additional information regarding copyright ownership.
+-- The ASF licenses this file to You under the Apache License, Version 2.0
+-- (the "License"); you may not use this file except in compliance with
+-- the License. You may obtain a copy of the License at
+--
+-- http://www.apache.org/licenses/LICENSE-2.0
+--
+-- Unless required by applicable law or agreed to in writing, software
+-- distributed under the License is distributed on an "AS IS" BASIS,
+-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+-- See the License for the specific language governing permissions and
+-- limitations under the License.
+--
+
+-- In production you would almost certainly restrict the replication user to
the follower (slave) machine,
+-- to prevent other clients accessing the log from other machines. For
example, 'replicator'@'follower.acme.com'.
+-- However, in this database we'll grant 2 users different privileges:
+--
+-- 1) 'st_user' - all privileges required by the snapshot reader AND binlog
reader (used for testing)
+-- 2) 'mysqluser' - all privileges
+--
+GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT,
LOCK TABLES ON *.* TO 'st_user'@'%';
+CREATE USER 'mysqluser' IDENTIFIED BY 'mysqlpw';
+GRANT ALL PRIVILEGES ON *.* TO 'mysqluser'@'%';
+
+--
----------------------------------------------------------------------------------------------------------------
+-- DATABASE: emptydb
+--
----------------------------------------------------------------------------------------------------------------
+CREATE DATABASE emptydb;
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/log4j2-test.properties
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/log4j2-test.properties
new file mode 100644
index 000000000..d1ca535f2
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-cdc-mysql-e2e/src/test/resources/log4j2-test.properties
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+################################################################################
+
+# Root logger level is INFO so that container and test output is visible;
+# set it to OFF manually to avoid flooding build logs
+rootLogger.level=INFO
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%t] %-5p %c - %m%n
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index 7c4c06422..eec52b79f 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -40,6 +40,7 @@
<module>connector-fake-e2e</module>
<module>connector-elasticsearch-e2e</module>
<module>connector-iotdb-e2e</module>
+ <module>connector-cdc-mysql-e2e</module>
</modules>
<artifactId>seatunnel-connector-v2-e2e</artifactId>