This is an automated email from the ASF dual-hosted git repository.
yux pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new aabf30622 [FLINK-36520] MySQL CDC Supports connecting to MySQL server
8.4+ (#3666)
aabf30622 is described below
commit aabf3062279523b740b1a00bbb132aa1ac90983c
Author: yuxiqian <[email protected]>
AuthorDate: Fri Mar 20 19:28:23 2026 +0800
[FLINK-36520] MySQL CDC Supports connecting to MySQL server 8.4+ (#3666)
---
.../source/MySqlPipelineCompatibilityITCase.java | 277 ++++++++
.../flink/cdc/debezium/DebeziumSourceFunction.java | 9 +-
.../debezium/connector/mysql/MySqlConnection.java | 56 +-
.../mysql/MySqlSnapshotChangeEventSource.java | 717 +++++++++++++++++++++
.../connector/mysql/util/ErrorMessageUtils.java | 2 +-
.../connectors/mysql/debezium/DebeziumUtils.java | 10 +-
.../mysql/table/MySqlCompatibilityITCase.java | 154 ++---
.../connectors/mysql/testutils/MySqlVersion.java | 3 +-
.../pipeline/tests/MySqlToIcebergE2eITCase.java | 17 -
pom.xml | 2 +-
10 files changed, 1137 insertions(+), 110 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineCompatibilityITCase.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineCompatibilityITCase.java
new file mode 100644
index 000000000..d0ad0d017
--- /dev/null
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineCompatibilityITCase.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.source;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.source.FlinkSourceProvider;
+import org.apache.flink.cdc.connectors.mysql.factory.MySqlDataSourceFactory;
+import
org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
+import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
+import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
+import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.RestartStrategyUtils;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedClass;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Stream;
+
+import static
org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_PASSWORD;
+import static
org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_USER;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration tests to check MySQL pipeline connector works well with
different MySQL server
+ * versions.
+ */
+@ParameterizedClass
+@EnumSource(
+ value = MySqlVersion.class,
+ names = {"V5_7", "V8_0", "V8_4"})
+class MySqlPipelineCompatibilityITCase {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(MySqlPipelineCompatibilityITCase.class);
+
+ private static Path tempFolder;
+ private static File resourceFolder;
+
+ private final MySqlVersion version;
+ private final MySqlContainer mySqlContainer;
+ private final UniqueDatabase testDatabase;
+
+ private final StreamExecutionEnvironment env =
+ StreamExecutionEnvironment.getExecutionEnvironment();
+
+    /**
+     * Builds (without starting) the MySQL container for the given server version and the
+     * {@code inventory} test database bound to it. The container is started later in
+     * {@code setup()}.
+     *
+     * @param version MySQL server version under test
+     */
+    MySqlPipelineCompatibilityITCase(MySqlVersion version) {
+        this.version = version;
+        final MySqlContainer container =
+                (MySqlContainer)
+                        new MySqlContainer(version)
+                                .withConfigurationOverride(buildCustomMySqlConfig(version))
+                                .withSetupSQL("docker/setup.sql")
+                                .withDatabaseName("flink-test")
+                                .withUsername("flinkuser")
+                                .withPassword("flinkpw")
+                                .withLogConsumer(new Slf4jLogConsumer(LOG));
+        this.mySqlContainer = container;
+        this.testDatabase = new UniqueDatabase(container, "inventory", TEST_USER, TEST_PASSWORD);
+    }
+
+    /**
+     * Configures the Flink environment, starts the MySQL container and creates the test
+     * database.
+     *
+     * <p>The static {@code resourceFolder} / {@code tempFolder} fields are not touched here: the
+     * constructor calls {@code buildCustomMySqlConfig}, which either initializes both or throws,
+     * so they are always set before this callback runs.
+     */
+    @BeforeEach
+    void setup() throws Exception {
+        env.setParallelism(4);
+        env.enableCheckpointing(200);
+        RestartStrategyUtils.configureNoRestartStrategy(env);
+
+        LOG.info("Starting container for MySQL {}...", version.getVersion());
+        Startables.deepStart(Stream.of(mySqlContainer)).join();
+        LOG.info("Container is started.");
+
+        testDatabase.createAndInitialize();
+    }
+
+    /**
+     * Drops the test database on a best-effort basis and then stops the container.
+     *
+     * <p>The container is stopped in a {@code finally} block so that it is released even when
+     * dropping the database fails with something other than {@link IllegalStateException}.
+     */
+    @AfterEach
+    void tearDown() {
+        try {
+            testDatabase.dropDatabase();
+        } catch (IllegalStateException e) {
+            LOG.warn("Failed to drop test database during teardown.", e);
+        } finally {
+            // mySqlContainer is a final field assigned in the constructor, so it is never null.
+            LOG.info("Stopping container for MySQL {}...", version.getVersion());
+            mySqlContainer.stop();
+            LOG.info("Container is stopped.");
+        }
+    }
+
+    /** Verifies that the nine rows of the {@code products} table are read during the snapshot. */
+    @Test
+    void testSnapshotRead() throws Exception {
+        MySqlSourceConfigFactory configFactory =
+                new MySqlSourceConfigFactory()
+                        .hostname(mySqlContainer.getHost())
+                        .port(mySqlContainer.getDatabasePort())
+                        .username(TEST_USER)
+                        .password(TEST_PASSWORD)
+                        .databaseList(testDatabase.getDatabaseName())
+                        .tableList(testDatabase.getDatabaseName() + ".products")
+                        .startupOptions(StartupOptions.initial())
+                        .serverId(getServerId(env.getParallelism()))
+                        .serverTimeZone("UTC");
+
+        FlinkSourceProvider sourceProvider =
+                (FlinkSourceProvider) new MySqlDataSource(configFactory).getEventSourceProvider();
+
+        // try-with-resources: the iterator (and the job behind it) is closed even when an
+        // assertion below fails.
+        try (CloseableIterator<Event> events =
+                env.fromSource(
+                                sourceProvider.getSource(),
+                                WatermarkStrategy.noWatermarks(),
+                                MySqlDataSourceFactory.IDENTIFIER,
+                                new EventTypeInfo())
+                        .executeAndCollect()) {
+            List<Event> snapshotEvents = fetchEvents(events, 9);
+
+            assertThat(snapshotEvents).hasSize(9);
+            assertThat(snapshotEvents.stream().filter(e -> e instanceof DataChangeEvent))
+                    .hasSize(9);
+        }
+    }
+
+    /**
+     * Verifies that a row inserted after the snapshot has been consumed arrives as a
+     * {@link DataChangeEvent}.
+     */
+    @Test
+    void testBinlogRead() throws Exception {
+        MySqlSourceConfigFactory configFactory =
+                new MySqlSourceConfigFactory()
+                        .hostname(mySqlContainer.getHost())
+                        .port(mySqlContainer.getDatabasePort())
+                        .username(TEST_USER)
+                        .password(TEST_PASSWORD)
+                        .databaseList(testDatabase.getDatabaseName())
+                        .tableList(testDatabase.getDatabaseName() + ".products")
+                        .startupOptions(StartupOptions.initial())
+                        .serverId(getServerId(env.getParallelism()))
+                        .serverTimeZone("UTC");
+
+        FlinkSourceProvider sourceProvider =
+                (FlinkSourceProvider) new MySqlDataSource(configFactory).getEventSourceProvider();
+
+        // try-with-resources: the iterator (and the job behind it) is closed even when the
+        // JDBC insert or an assertion below fails.
+        try (CloseableIterator<Event> events =
+                env.fromSource(
+                                sourceProvider.getSource(),
+                                WatermarkStrategy.noWatermarks(),
+                                MySqlDataSourceFactory.IDENTIFIER,
+                                new EventTypeInfo())
+                        .executeAndCollect()) {
+            // Drain the nine snapshot rows first so the next data change is the insert below.
+            fetchEvents(events, 9);
+
+            try (Connection connection = testDatabase.getJdbcConnection();
+                    Statement statement = connection.createStatement()) {
+                statement.execute(
+                        String.format(
+                                "INSERT INTO `%s`.`products` VALUES (default,'test_product','desc',1.0);",
+                                testDatabase.getDatabaseName()));
+            }
+
+            List<Event> binlogEvents = fetchEvents(events, 1);
+            assertThat(binlogEvents).hasSize(1);
+            assertThat(binlogEvents.get(0)).isInstanceOf(DataChangeEvent.class);
+        }
+    }
+
+    /**
+     * Produces a server-id range of the form {@code "<start>-<start + parallelism>"}, where
+     * {@code start} is picked at random from 5400 to 5499.
+     */
+    private String getServerId(int parallelism) {
+        final int rangeStart = 5400 + (int) (Math.random() * 100);
+        final int rangeEnd = rangeStart + parallelism;
+        return rangeStart + "-" + rangeEnd;
+    }
+
+    /**
+     * Pulls events from {@code iterator} until {@code count} {@link DataChangeEvent}s have been
+     * collected or the iterator is exhausted; events of any other type are consumed and dropped.
+     */
+    private List<Event> fetchEvents(CloseableIterator<Event> iterator, int count) {
+        final List<Event> collected = new ArrayList<>();
+        while (collected.size() < count && iterator.hasNext()) {
+            final Event next = iterator.next();
+            if (!(next instanceof DataChangeEvent)) {
+                continue;
+            }
+            collected.add(next);
+        }
+        return collected;
+    }
+
+    /**
+     * Writes a {@code my.cnf} for the given server version into a version-specific directory
+     * under the shared temp folder and returns its path relative to the test resource folder.
+     *
+     * @param version MySQL server version the configuration is written for
+     * @return path of the generated file, relative to {@code resourceFolder}
+     * @throws RuntimeException if the folders or the file cannot be created or written
+     */
+    private String buildCustomMySqlConfig(MySqlVersion version) {
+        try {
+            if (resourceFolder == null) {
+                resourceFolder =
+                        Paths.get(
+                                        Objects.requireNonNull(
+                                                        MySqlPipelineCompatibilityITCase.class
+                                                                .getClassLoader()
+                                                                .getResource("."))
+                                                .toURI())
+                                .toFile();
+                tempFolder = Files.createTempDirectory(resourceFolder.toPath(), "mysql-config");
+            }
+            // Create version-specific directory to avoid conflicts
+            Path versionDir =
+                    Files.createDirectories(
+                            tempFolder.resolve(version.getVersion().replace(".", "_")));
+            Path cnf = versionDir.resolve("my.cnf");
+
+            StringBuilder mysqlConfBuilder = new StringBuilder();
+            mysqlConfBuilder.append(
+                    "[mysqld]\n"
+                            + "binlog_format = row\n"
+                            + "log_bin = mysql-bin\n"
+                            + "server-id = 223344\n"
+                            + "binlog_row_image = FULL\n"
+                            + "gtid-mode = OFF\n");
+
+            if (version == MySqlVersion.V8_0 || version == MySqlVersion.V8_4) {
+                mysqlConfBuilder.append("secure_file_priv=/var/lib/mysql\n");
+            }
+
+            // CREATE + TRUNCATE_EXISTING creates the file when it is missing and overwrites it
+            // otherwise, so no separate (racy) exists-then-create step is needed.
+            Files.write(
+                    cnf,
+                    Collections.singleton(mysqlConfBuilder.toString()),
+                    StandardCharsets.UTF_8,
+                    StandardOpenOption.CREATE,
+                    StandardOpenOption.TRUNCATE_EXISTING,
+                    StandardOpenOption.WRITE);
+            return Paths.get(resourceFolder.getAbsolutePath()).relativize(cnf).toString();
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to create my.cnf file.", e);
+        }
+    }
+}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/DebeziumSourceFunction.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/DebeziumSourceFunction.java
index 6386fb400..21d822a06 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/DebeziumSourceFunction.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/DebeziumSourceFunction.java
@@ -448,9 +448,12 @@ public class DebeziumSourceFunction<T> extends
RichSourceFunction<T>
debeziumChangeFetcher.runFetchLoop();
} catch (Throwable t) {
if (t.getMessage() != null
- && t.getMessage()
- .contains(
- "A slave with the same
server_uuid/server_id as this slave has connected to the master")) {
+ && (t.getMessage()
+ .contains(
+ "A slave with the same
server_uuid/server_id as this slave has connected to the master")
+ || t.getMessage()
+ .contains(
+ "A replica with the same
server_uuid/server_id as this replica has connected to the source"))) {
throw new RuntimeException(
"The 'server-id' in the mysql cdc connector should be
globally unique, but conflicts happen now.\n"
+ "The server id conflict may happen in the
following situations: \n"
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlConnection.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlConnection.java
index 801e27d6b..aa9699e4e 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlConnection.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlConnection.java
@@ -40,22 +40,32 @@ import java.util.Properties;
/**
* Copied from Debezium project(1.9.8.final) to add custom jdbc properties in
the jdbc url. The new
- * parameter {@code jdbcProperties} in the constructor of {@link
MySqlConnectionConfiguration} will
- * be used to generate the jdbc url pattern, and may overwrite the default
value.
+ * parameter {@link
MySqlConnectionConfiguration#MySqlConnectionConfiguration(Configuration config,
+ * Properties jdbcProperties)} in the constructor of {@link
MySqlConnectionConfiguration} will be
+ * used to generate the jdbc url pattern, and may overwrite the default value.
*
- * <p>Line 75: Add field {@code urlPattern} in {@link MySqlConnection} and
remove old pattern.
+ * <p>Add field {@link MySqlConnection#urlPattern} in {@link MySqlConnection}
and remove old
+ * pattern.
*
- * <p>Line 92: Init {@code urlPattern} using the url pattern from {@link
+ * <p>Added MySQL 8.4+ compatible probing fields {@link
+ * MySqlConnection#MYSQL_CLASSIC_SHOW_BINARY_LOG_STATEMENT}, {@link
+ * MySqlConnection#MYSQL_NEW_SHOW_BINARY_LOG_STATEMENT}, and {@link
+ * MySqlConnection#showBinaryLogStatement}.
+ *
+ * <p>Init {@link MySqlConnection#urlPattern} using the url pattern from {@link
* MySqlConnectionConfiguration}.
*
- * <p>Line 544: Generate the connection string by the new field {@code
urlPattern}.
+ * <p>Generate the connection string by the new field {@link
MySqlConnection#urlPattern}.
*
- * <p>Line 569 ~ 574: Add new constant and field {@code urlPattern} to {@link
+ * <p>Add new constant and field {@link MySqlConnection#urlPattern} to {@link
* MySqlConnectionConfiguration}.
*
- * <p>Line 625: Init new field {@code urlPattern} in {@link
MySqlConnectionConfiguration}.
+ * <p>Init new field {@link MySqlConnection#urlPattern} in {@link
MySqlConnectionConfiguration}.
+ *
+ * <p>Add utility methods helping to generate the url pattern and add default
values.
*
- * <p>Line 715 ~ 741: Add some methods helping to generate the url pattern and
add default values.
+ * <p>Added utility method {@link MySqlConnection#getShowBinaryLogStatement}
and {@link
+ * MySqlConnection#probeShowBinaryLogStatement} for MySQL 8.4 compatibility.
*/
public class MySqlConnection extends JdbcConnection {
@@ -74,6 +84,10 @@ public class MySqlConnection extends JdbcConnection {
private final String urlPattern;
+ private static final String MYSQL_CLASSIC_SHOW_BINARY_LOG_STATEMENT =
"SHOW MASTER STATUS";
+ private static final String MYSQL_NEW_SHOW_BINARY_LOG_STATEMENT = "SHOW
BINARY LOG STATUS";
+ private final String showBinaryLogStatement;
+
/**
* Creates a new connection using the supplied configuration.
*
@@ -90,6 +104,7 @@ public class MySqlConnection extends JdbcConnection {
this.connectionConfig = connectionConfig;
this.mysqlFieldReader = fieldReader;
this.urlPattern = connectionConfig.getUrlPattern();
+ this.showBinaryLogStatement = probeShowBinaryLogStatement();
}
/**
@@ -275,7 +290,7 @@ public class MySqlConnection extends JdbcConnection {
public String knownGtidSet() {
try {
return queryAndMap(
- "SHOW MASTER STATUS",
+ showBinaryLogStatement,
rs -> {
if (rs.next() && rs.getMetaData().getColumnCount() >
4) {
return rs.getString(
@@ -769,4 +784,27 @@ public class MySqlConnection extends JdbcConnection {
public String quotedTableIdString(TableId tableId) {
return tableId.toQuotedString('`');
}
+
+ /**
+ * Returns the statement selected by {@code probeShowBinaryLogStatement()} when this
+ * connection was constructed: either {@code SHOW BINARY LOG STATUS} or the classic
+ * {@code SHOW MASTER STATUS}.
+ */
+ public String getShowBinaryLogStatement() {
+ return showBinaryLogStatement;
+ }
+
+    /**
+     * Chooses the statement used to read the binary log status by executing the new
+     * {@code SHOW BINARY LOG STATUS} form once. Any {@link SQLException} raised by that attempt
+     * selects the classic {@code SHOW MASTER STATUS} form instead.
+     */
+    private String probeShowBinaryLogStatement() {
+        LOGGER.info("Probing binary log statement.");
+        try {
+            // The result is irrelevant; only whether the server accepts the statement matters.
+            query(MYSQL_NEW_SHOW_BINARY_LOG_STATEMENT, rs -> {});
+        } catch (SQLException e) {
+            LOGGER.info(
+                    "Probing with {} failed, fallback to classic {}. Caused by: {}",
+                    MYSQL_NEW_SHOW_BINARY_LOG_STATEMENT,
+                    MYSQL_CLASSIC_SHOW_BINARY_LOG_STATEMENT,
+                    e.getMessage());
+            return MYSQL_CLASSIC_SHOW_BINARY_LOG_STATEMENT;
+        }
+        LOGGER.info(
+                "Successfully found show binary log statement with `{}`.",
+                MYSQL_NEW_SHOW_BINARY_LOG_STATEMENT);
+        return MYSQL_NEW_SHOW_BINARY_LOG_STATEMENT;
+    }
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlSnapshotChangeEventSource.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlSnapshotChangeEventSource.java
new file mode 100644
index 000000000..5a4fc26b1
--- /dev/null
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlSnapshotChangeEventSource.java
@@ -0,0 +1,717 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at
http://www.apache.org/licenses/LICENSE-2.0
+ */
+
+package io.debezium.connector.mysql;
+
+import io.debezium.DebeziumException;
+import io.debezium.connector.SnapshotRecord;
+import io.debezium.connector.mysql.legacy.MySqlJdbcContext.DatabaseLocales;
+import io.debezium.data.Envelope;
+import io.debezium.function.BlockingConsumer;
+import io.debezium.pipeline.EventDispatcher;
+import io.debezium.relational.RelationalSnapshotChangeEventSource;
+import io.debezium.relational.RelationalTableFilters;
+import io.debezium.relational.Table;
+import io.debezium.relational.TableId;
+import io.debezium.schema.SchemaChangeEvent;
+import io.debezium.util.Clock;
+import io.debezium.util.Collect;
+import io.debezium.util.Strings;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Copied from Debezium project(1.9.8.Final) to fix MySQL 8.x compatibility.
+ *
+ * <p>Line 338: Use probing methods to determine the statement.
+ */
+public class MySqlSnapshotChangeEventSource
+ extends RelationalSnapshotChangeEventSource<MySqlPartition,
MySqlOffsetContext> {
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(MySqlSnapshotChangeEventSource.class);
+
+ private final MySqlConnectorConfig connectorConfig;
+ private final MySqlConnection connection;
+ private long globalLockAcquiredAt = -1;
+ private long tableLockAcquiredAt = -1;
+ private final RelationalTableFilters filters;
+ private final MySqlSnapshotChangeEventSourceMetrics metrics;
+ private final MySqlDatabaseSchema databaseSchema;
+ private final List<SchemaChangeEvent> schemaEvents = new ArrayList<>();
+ private Set<TableId> delayedSchemaSnapshotTables = Collections.emptySet();
+ private final BlockingConsumer<Function<SourceRecord, SourceRecord>>
lastEventProcessor;
+
+    /**
+     * Creates the snapshot source, passing the shared collaborators to the superclass and
+     * keeping MySQL-specific references (connection, schema, metrics, table filters) locally.
+     */
+    public MySqlSnapshotChangeEventSource(
+            MySqlConnectorConfig connectorConfig,
+            MySqlConnection connection,
+            MySqlDatabaseSchema schema,
+            EventDispatcher<MySqlPartition, TableId> dispatcher,
+            Clock clock,
+            MySqlSnapshotChangeEventSourceMetrics metrics,
+            BlockingConsumer<Function<SourceRecord, SourceRecord>> lastEventProcessor) {
+        super(connectorConfig, connection, schema, dispatcher, clock, metrics);
+        this.connection = connection;
+        this.databaseSchema = schema;
+        this.metrics = metrics;
+        this.lastEventProcessor = lastEventProcessor;
+        this.connectorConfig = connectorConfig;
+        this.filters = connectorConfig.getTableFilters();
+    }
+
+    /**
+     * Decides whether schema and/or data have to be snapshotted.
+     *
+     * <p>When {@code previousOffset} exists and its snapshot is no longer running, data is never
+     * snapshotted and schema only if {@code databaseSchema.isStorageInitializationExecuted()}
+     * says so. Otherwise both flags come from the configured snapshot mode.
+     */
+    @Override
+    protected SnapshottingTask getSnapshottingTask(
+            MySqlPartition partition, MySqlOffsetContext previousOffset) {
+        final boolean completedSnapshotFound =
+                previousOffset != null && !previousOffset.isSnapshotRunning();
+        if (completedSnapshotFound) {
+            LOGGER.info(
+                    "A previous offset indicating a completed snapshot has been found. Neither schema nor data will be snapshotted.");
+            return new SnapshottingTask(databaseSchema.isStorageInitializationExecuted(), false);
+        }
+
+        LOGGER.info("No previous offset has been found");
+        final boolean includeData = connectorConfig.getSnapshotMode().includeData();
+        LOGGER.info(
+                includeData
+                        ? "According to the connector configuration both schema and data will be snapshotted"
+                        : "According to the connector configuration only schema will be snapshotted");
+        final boolean includeSchema = connectorConfig.getSnapshotMode().includeSchema();
+        return new SnapshottingTask(includeSchema, includeData);
+    }
+
+ /**
+ * Creates the snapshot context for the given partition.
+ *
+ * @param partition partition passed to the new {@code MySqlSnapshotContext}
+ * @return a newly constructed {@code MySqlSnapshotContext}
+ */
+ @Override
+ protected SnapshotContext<MySqlPartition, MySqlOffsetContext>
prepare(MySqlPartition partition)
+ throws Exception {
+ return new MySqlSnapshotContext(partition);
+ }
+
+ /** No-op: the body is empty, so nothing is done in this callback. */
+ @Override
+ protected void connectionCreated(
+ RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext>
snapshotContext)
+ throws Exception {}
+
+    /**
+     * Lists every base table of every database whose tables can be read.
+     *
+     * <p>Databases whose table listing fails with a {@link SQLException} are skipped with a
+     * warning. The database filter is applied only to the names that are logged; the returned
+     * set itself is not filtered here.
+     */
+    @Override
+    protected Set<TableId> getAllTableIds(
+            RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> ctx) throws Exception {
+        // Step 1: collect the names of all databases known to the server.
+        LOGGER.info("Read list of available databases");
+        final List<String> allDatabases = new ArrayList<>();
+        connection.query(
+                "SHOW DATABASES",
+                resultSet -> {
+                    while (resultSet.next()) {
+                        allDatabases.add(resultSet.getString(1));
+                    }
+                });
+        LOGGER.info("\t list of available databases is: {}", allDatabases);
+
+        // Step 2: collect the table ids of each database. A prepared statement cannot be used
+        // for this with MySQL, so the SQL text is built per database. The names come from the
+        // server itself rather than from user input.
+        LOGGER.info("Read list of available tables in each database");
+        final Set<TableId> discoveredTables = new HashSet<>();
+        final Set<String> readableDatabases = new HashSet<>();
+        for (String database : allDatabases) {
+            try {
+                // MySQL sometimes reports local files as databases (see DBZ-164); try each one
+                // and ignore those that cannot be read.
+                connection.query(
+                        "SHOW FULL TABLES IN " + quote(database) + " where Table_Type = 'BASE TABLE'",
+                        resultSet -> {
+                            while (resultSet.next()) {
+                                discoveredTables.add(
+                                        new TableId(database, null, resultSet.getString(1)));
+                            }
+                        });
+                readableDatabases.add(database);
+            } catch (SQLException e) {
+                // Query or result processing failed for this database, so skip it.
+                LOGGER.warn(
+                        "\t skipping database '{}' due to error reading tables: {}",
+                        database,
+                        e.getMessage());
+            }
+        }
+
+        final Set<String> includedDatabases =
+                readableDatabases.stream()
+                        .filter(filters.databaseFilter())
+                        .collect(Collectors.toSet());
+        LOGGER.info("\tsnapshot continuing with database(s): {}", includedDatabases);
+        return discoveredTables;
+    }
+
+    /**
+     * Prepares the session for a consistent snapshot and, when the configuration asks for
+     * locking with a global lock, tries to acquire the global read lock.
+     *
+     * <p>A failure to acquire the global lock is logged and tolerated. A failure to set
+     * {@code innodb_lock_wait_timeout} is logged as a warning and tolerated as well.
+     */
+    @Override
+    protected void lockTablesForSchemaSnapshot(
+            ChangeEventSourceContext sourceContext,
+            RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> snapshotContext)
+            throws SQLException, InterruptedException {
+        // REPEATABLE READ is the default isolation level, but the default can be changed, so it
+        // is set explicitly. With it, all plain (non-locking) SELECTs inside one transaction
+        // read from the same MVCC snapshot and are therefore consistent with each other.
+        //
+        // See: https://dev.mysql.com/doc/refman/5.7/en/set-transaction.html
+        // See: https://dev.mysql.com/doc/refman/5.7/en/innodb-transaction-isolation-levels.html
+        // See: https://dev.mysql.com/doc/refman/5.7/en/innodb-consistent-read.html
+        connection.connection().setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
+
+        final long lockTimeoutSeconds = connectorConfig.snapshotLockTimeout().getSeconds();
+        connection.executeWithoutCommitting(
+                "SET SESSION lock_wait_timeout=" + lockTimeoutSeconds);
+        try {
+            connection.executeWithoutCommitting(
+                    "SET SESSION innodb_lock_wait_timeout=" + lockTimeoutSeconds);
+        } catch (SQLException e) {
+            LOGGER.warn("Unable to set innodb_lock_wait_timeout", e);
+        }
+
+        final boolean globalLockRequested =
+                connectorConfig.getSnapshotLockingMode().usesLocking()
+                        && connectorConfig.useGlobalLock();
+        if (!globalLockRequested) {
+            return;
+        }
+
+        // The global read lock closes all open tables and blocks ALL updates in all databases
+        // while it is held, so everything read under it is consistent.
+        try {
+            globalLock();
+            metrics.globalLockAcquired();
+        } catch (SQLException e) {
+            LOGGER.info(
+                    "Unable to flush and acquire global read lock, will use table read locks after reading table names");
+            // Continue anyway, since RDS (among others) doesn't allow setting a global lock
+            assert !isGloballyLocked();
+        }
+        if (connectorConfig.getSnapshotLockingMode().flushResetsIsolationLevel()) {
+            // FLUSH TABLES resets TX and isolation level
+            connection.executeWithoutCommitting("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ");
+        }
+    }
+
+    /**
+     * Releases the global lock after the schema snapshot when minimal locking is configured.
+     * Table-level locks are deliberately kept and only a warning is logged for them.
+     */
+    @Override
+    protected void releaseSchemaSnapshotLocks(
+            RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> snapshotContext)
+            throws SQLException {
+        if (!connectorConfig.getSnapshotLockingMode().usesMinimalLocking()) {
+            return;
+        }
+        if (isGloballyLocked()) {
+            globalUnlock();
+        }
+        if (isTablesLocked()) {
+            // The global read lock was not available, so individual table-level read locks
+            // were taken with 'FLUSH TABLE <tableName> WITH READ LOCK'. Releasing them now via
+            // 'UNLOCK TABLES' would implicitly commit the active transaction and break the
+            // consistent snapshot logic, so the tables stay locked here.
+            // https://dev.mysql.com/doc/refman/5.7/en/flush.html
+            LOGGER.warn(
+                    "Tables were locked explicitly, but to get a consistent snapshot we cannot release the locks until we've read all tables.");
+        }
+    }
+
+    /**
+     * Releases whatever locks are still held after the data snapshot. If table locks were held
+     * and some tables had their schema capture delayed, the schema events for those tables are
+     * created and dispatched now, and the snapshot context's tables are refreshed from the
+     * database schema.
+     */
+    @Override
+    protected void releaseDataSnapshotLocks(
+            RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> snapshotContext)
+            throws Exception {
+        if (isGloballyLocked()) {
+            globalUnlock();
+        }
+        if (!isTablesLocked()) {
+            return;
+        }
+        tableUnlock();
+        if (delayedSchemaSnapshotTables.isEmpty()) {
+            return;
+        }
+
+        schemaEvents.clear();
+        createSchemaEventsForTables(snapshotContext, delayedSchemaSnapshotTables, false);
+
+        final Iterator<SchemaChangeEvent> pending = schemaEvents.iterator();
+        while (pending.hasNext()) {
+            final SchemaChangeEvent event = pending.next();
+
+            final boolean fromNonCapturedDatabase =
+                    databaseSchema.storeOnlyCapturedTables()
+                            && event.getDatabase() != null
+                            && event.getDatabase().length() != 0
+                            && !connectorConfig
+                                    .getTableFilters()
+                                    .databaseFilter()
+                                    .test(event.getDatabase());
+            if (fromNonCapturedDatabase) {
+                LOGGER.debug(
+                        "Skipping schema event as it belongs to a non-captured database: '{}'",
+                        event);
+                continue;
+            }
+
+            LOGGER.debug("Processing schema event {}", event);
+
+            // The first table of the event, if any, identifies the event in the offset.
+            TableId tableId = null;
+            if (!event.getTables().isEmpty()) {
+                tableId = event.getTables().iterator().next().id();
+            }
+            snapshotContext.offset.event(tableId, getClock().currentTime());
+
+            // Mark the final event as the last snapshot record before dispatching it.
+            if (!pending.hasNext()) {
+                super.lastSnapshotRecord(snapshotContext);
+            }
+
+            dispatcher.dispatchSchemaChangeEvent(
+                    snapshotContext.partition,
+                    tableId,
+                    (receiver) -> receiver.schemaChangeEvent(event));
+        }
+
+        // Make schema available for snapshot source
+        databaseSchema
+                .tableIds()
+                .forEach(
+                        id -> snapshotContext.tables.overwriteTable(databaseSchema.tableFor(id)));
+    }
+
+    /**
+     * Determines the offset the snapshot starts from.
+     *
+     * <p>Nothing is done while locking is configured but no lock is held. A non-null
+     * {@code previousOffset} is reused as is. Otherwise the binlog file, position and (when the
+     * result has more than four columns) GTID set are read with the statement returned by
+     * {@code connection.getShowBinaryLogStatement()}.
+     *
+     * @throws DebeziumException if the status statement returns no row
+     */
+    @Override
+    protected void determineSnapshotOffset(
+            RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> ctx,
+            MySqlOffsetContext previousOffset)
+            throws Exception {
+        final boolean lockingConfiguredButNoLockHeld =
+                !isGloballyLocked()
+                        && !isTablesLocked()
+                        && connectorConfig.getSnapshotLockingMode().usesLocking();
+        if (lockingConfiguredButNoLockHeld) {
+            return;
+        }
+        if (previousOffset != null) {
+            ctx.offset = previousOffset;
+            tryStartingSnapshot(ctx);
+            return;
+        }
+
+        final MySqlOffsetContext initialOffset = MySqlOffsetContext.initial(connectorConfig);
+        ctx.offset = initialOffset;
+        LOGGER.info("Read binlog position of MySQL primary server");
+        final String statusStatement = connection.getShowBinaryLogStatement();
+        connection.query(
+                statusStatement,
+                resultSet -> {
+                    if (!resultSet.next()) {
+                        throw new DebeziumException(
+                                "Cannot read the binlog filename and position via '"
+                                        + statusStatement
+                                        + "'. Make sure your server is correctly configured");
+                    }
+                    final String binlogFile = resultSet.getString(1);
+                    final long binlogOffset = resultSet.getLong(2);
+                    initialOffset.setBinlogStartPoint(binlogFile, binlogOffset);
+                    if (resultSet.getMetaData().getColumnCount() <= 4) {
+                        LOGGER.info(
+                                "\t using binlog '{}' at position '{}'",
+                                binlogFile,
+                                binlogOffset);
+                        return;
+                    }
+                    // This column exists only in MySQL 5.6.5 or later. The GTID set may be
+                    // null, blank, or contain a GTID set.
+                    final String gtidSet = resultSet.getString(5);
+                    initialOffset.setCompletedGtidSet(gtidSet);
+                    LOGGER.info(
+                            "\t using binlog '{}' at position '{}' and gtid '{}'",
+                            binlogFile,
+                            binlogOffset,
+                            gtidSet);
+                });
+        tryStartingSnapshot(ctx);
+    }
+
+    /**
+     * Parses {@code ddl} in the context of {@code database} and appends the resulting schema
+     * change events to {@code schemaEvents}.
+     */
+    private void addSchemaEvent(
+            RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> snapshotContext,
+            String database,
+            String ddl) {
+        final Collection<? extends SchemaChangeEvent> parsedEvents =
+                databaseSchema.parseSnapshotDdl(
+                        snapshotContext.partition,
+                        ddl,
+                        database,
+                        snapshotContext.offset,
+                        clock.currentTimeAsInstant());
+        schemaEvents.addAll(parsedEvents);
+    }
+
+    /**
+     * Builds the schema events describing the tables whose schema is captured: the charset
+     * system variables, a {@code DROP TABLE} per table, then per database a
+     * {@code DROP DATABASE} / {@code CREATE DATABASE} / {@code USE} sequence followed by the
+     * events for that database's tables.
+     *
+     * <p>With a two-phase schema snapshot the tables are locked and the snapshot offset is
+     * determined first, and the tables left for delayed schema capture are recorded.
+     *
+     * @throws InterruptedException if the source context stops running while events are built
+     */
+    @Override
+    protected void readTableStructure(
+            ChangeEventSourceContext sourceContext,
+            RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> snapshotContext,
+            MySqlOffsetContext offsetContext)
+            throws Exception {
+        if (twoPhaseSchemaSnapshot()) {
+            // Capture schema of captured tables after they are locked
+            tableLock(snapshotContext);
+            determineSnapshotOffset(snapshotContext, offsetContext);
+            LOGGER.info(
+                    "Table level locking is in place, the schema will be capture in two phases, now capturing: {}",
+                    snapshotContext.capturedTables);
+            delayedSchemaSnapshotTables =
+                    Collect.minus(
+                            snapshotContext.capturedSchemaTables, snapshotContext.capturedTables);
+            LOGGER.info("Tables for delayed schema capture: {}", delayedSchemaSnapshotTables);
+        }
+
+        final boolean capturedTablesOnly = databaseSchema.storeOnlyCapturedTables();
+        final Set<TableId> tablesForSchema =
+                capturedTablesOnly
+                        ? snapshotContext.capturedTables
+                        : snapshotContext.capturedSchemaTables;
+        LOGGER.info(
+                capturedTablesOnly
+                        ? "Only captured tables schema should be captured, capturing: {}"
+                        : "All eligible tables schema should be captured, capturing: {}",
+                tablesForSchema);
+
+        // Group by database, keeping encounter order.
+        final Map<String, List<TableId>> tablesByDatabase =
+                tablesForSchema.stream()
+                        .collect(
+                                Collectors.groupingBy(
+                                        TableId::catalog, LinkedHashMap::new, Collectors.toList()));
+        final Set<String> databases = tablesByDatabase.keySet();
+
+        // Record default charset
+        addSchemaEvent(
+                snapshotContext,
+                "",
+                connection.setStatementFor(connection.readMySqlCharsetSystemVariables()));
+
+        for (TableId table : tablesForSchema) {
+            if (!sourceContext.isRunning()) {
+                throw new InterruptedException(
+                        "Interrupted while emitting initial DROP TABLE events");
+            }
+            addSchemaEvent(
+                    snapshotContext, table.catalog(), "DROP TABLE IF EXISTS " + quote(table));
+        }
+
+        final Map<String, DatabaseLocales> collationsByDatabase =
+                connection.readDatabaseCollations();
+        for (String database : databases) {
+            if (!sourceContext.isRunning()) {
+                throw new InterruptedException(
+                        "Interrupted while reading structure of schema " + databases);
+            }
+
+            LOGGER.info("Reading structure of database '{}'", database);
+            addSchemaEvent(snapshotContext, database, "DROP DATABASE IF EXISTS " + quote(database));
+
+            final StringBuilder createDatabaseDdl =
+                    new StringBuilder("CREATE DATABASE " + quote(database));
+            final DatabaseLocales locales = collationsByDatabase.get(database);
+            if (locales != null) {
+                locales.appendToDdlStatement(database, createDatabaseDdl);
+            }
+            addSchemaEvent(snapshotContext, database, createDatabaseDdl.toString());
+            addSchemaEvent(snapshotContext, database, "USE " + quote(database));
+
+            createSchemaEventsForTables(snapshotContext, tablesByDatabase.get(database), true);
+        }
+    }
+
+ void createSchemaEventsForTables(
+ RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext>
snapshotContext,
+ final Collection<TableId> tablesToRead,
+ final boolean firstPhase)
+ throws SQLException {
+ for (TableId tableId : tablesToRead) {
+ if (firstPhase && delayedSchemaSnapshotTables.contains(tableId)) {
+ continue;
+ }
+ connection.query(
+ "SHOW CREATE TABLE " + quote(tableId),
+ rs -> {
+ if (rs.next()) {
+ addSchemaEvent(snapshotContext, tableId.catalog(),
rs.getString(2));
+ }
+ });
+ }
+ }
+
+ private boolean twoPhaseSchemaSnapshot() {
+ return connectorConfig.getSnapshotLockingMode().usesLocking() &&
!isGloballyLocked();
+ }
+
+ @Override
+ protected SchemaChangeEvent getCreateTableEvent(
+ RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext>
snapshotContext,
+ Table table)
+ throws SQLException {
+ return SchemaChangeEvent.ofSnapshotCreate(
+ snapshotContext.partition,
+ snapshotContext.offset,
+ snapshotContext.catalogName,
+ table);
+ }
+
+ @Override
+ protected void complete(SnapshotContext<MySqlPartition,
MySqlOffsetContext> snapshotContext) {}
+
+ /**
+ * Generate a valid MySQL query string for the specified table and columns
+ *
+ * @param tableId the table to generate a query for
+ * @return a valid query string
+ */
+ @Override
+ protected Optional<String> getSnapshotSelect(
+ RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext>
snapshotContext,
+ TableId tableId,
+ List<String> columns) {
+ String snapshotSelectColumns =
columns.stream().collect(Collectors.joining(", "));
+
+ return Optional.of(
+ String.format(
+ "SELECT %s FROM `%s`.`%s`",
+ snapshotSelectColumns, tableId.catalog(),
tableId.table()));
+ }
+
+ private boolean isGloballyLocked() {
+ return globalLockAcquiredAt != -1;
+ }
+
+ private boolean isTablesLocked() {
+ return tableLockAcquiredAt != -1;
+ }
+
+ private void globalLock() throws SQLException {
+ LOGGER.info("Flush and obtain global read lock to prevent writes to
database");
+ connection.executeWithoutCommitting(
+ connectorConfig.getSnapshotLockingMode().getLockStatement());
+ globalLockAcquiredAt = clock.currentTimeInMillis();
+ }
+
+ private void globalUnlock() throws SQLException {
+ LOGGER.info("Releasing global read lock to enable MySQL writes");
+ connection.executeWithoutCommitting("UNLOCK TABLES");
+ long lockReleased = clock.currentTimeInMillis();
+ metrics.globalLockReleased();
+ LOGGER.info(
+ "Writes to MySQL tables prevented for a total of {}",
+ Strings.duration(lockReleased - globalLockAcquiredAt));
+ globalLockAcquiredAt = -1;
+ }
+
+ private void tableLock(
+ RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext>
snapshotContext)
+ throws SQLException {
+ // ------------------------------------
+ // LOCK TABLES and READ BINLOG POSITION
+ // ------------------------------------
+ // We were not able to acquire the global read lock, so instead we
have to obtain a read
+ // lock on each table.
+ // This requires different privileges than normal, and also means we
can't unlock the tables
+ // without
+ // implicitly committing our transaction ...
+ if (!connection.userHasPrivileges("LOCK TABLES")) {
+ // We don't have the right privileges
+ throw new DebeziumException(
+ "User does not have the 'LOCK TABLES' privilege required
to obtain a "
+ + "consistent snapshot by preventing concurrent
writes to tables.");
+ }
+ // We have the required privileges, so try to lock all of the tables
we're interested in ...
+ LOGGER.info(
+ "Flush and obtain read lock for {} tables (preventing writes)",
+ snapshotContext.capturedTables);
+ if (!snapshotContext.capturedTables.isEmpty()) {
+ final String tableList =
+ snapshotContext.capturedTables.stream()
+ .map(tid -> quote(tid))
+ .collect(Collectors.joining(","));
+ connection.executeWithoutCommitting("FLUSH TABLES " + tableList +
" WITH READ LOCK");
+ }
+ tableLockAcquiredAt = clock.currentTimeInMillis();
+ metrics.globalLockAcquired();
+ }
+
+ private void tableUnlock() throws SQLException {
+ LOGGER.info("Releasing table read lock to enable MySQL writes");
+ connection.executeWithoutCommitting("UNLOCK TABLES");
+ long lockReleased = clock.currentTimeInMillis();
+ metrics.globalLockReleased();
+ LOGGER.info(
+ "Writes to MySQL tables prevented for a total of {}",
+ Strings.duration(lockReleased - tableLockAcquiredAt));
+ tableLockAcquiredAt = -1;
+ }
+
+ private String quote(String dbOrTableName) {
+ return "`" + dbOrTableName + "`";
+ }
+
+ private String quote(TableId id) {
+ return quote(id.catalog()) + "." + quote(id.table());
+ }
+
+ @Override
+ protected OptionalLong rowCountForTable(TableId tableId) {
+ return connection.getEstimatedTableSize(tableId);
+ }
+
+ @Override
+ protected Statement readTableStatement(OptionalLong rowCount) throws
SQLException {
+ final long largeTableRowCount =
connectorConfig.rowCountForLargeTable();
+ if (!rowCount.isPresent()
+ || largeTableRowCount == 0
+ || rowCount.getAsLong() <= largeTableRowCount) {
+ return super.readTableStatement(rowCount);
+ }
+ return createStatementWithLargeResultSet();
+ }
+
+ /**
+ * Create a JDBC statement that can be used for large result sets.
+ *
+ * <p>By default, the MySQL Connector/J driver retrieves all rows for
ResultSets and stores them
+ * in memory. In most cases this is the most efficient way to operate and,
due to the design of
+ * the MySQL network protocol, is easier to implement. However, for
ResultSets that have a
+ * large number of rows or large values, the driver may not be able to
allocate heap space in
+ * the JVM and may result in an {@link OutOfMemoryError}. See <a
+ * href="https://issues.jboss.org/browse/DBZ-94">DBZ-94</a> for details.
+ *
+ * <p>This method handles such cases using the <a
+ *
href="https://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-implementation-notes.html">recommended
+ * technique</a> for MySQL by creating the JDBC {@link Statement} with
{@link
+ * ResultSet#TYPE_FORWARD_ONLY forward-only} cursor and {@link
ResultSet#CONCUR_READ_ONLY
+ * read-only concurrency} flags, and with a {@link Integer#MIN_VALUE
minimum value} {@link
+ * Statement#setFetchSize(int) fetch size hint}.
+ *
+ * @return the statement; never null
+ * @throws SQLException if there is a problem creating the statement
+ */
+ private Statement createStatementWithLargeResultSet() throws SQLException {
+ int fetchSize = connectorConfig.getSnapshotFetchSize();
+ Statement stmt =
+ connection
+ .connection()
+ .createStatement(ResultSet.TYPE_FORWARD_ONLY,
ResultSet.CONCUR_READ_ONLY);
+ stmt.setFetchSize(fetchSize);
+ return stmt;
+ }
+
+ /** Mutable context which is populated in the course of snapshotting. */
+ private static class MySqlSnapshotContext
+ extends RelationalSnapshotContext<MySqlPartition,
MySqlOffsetContext> {
+
+ public MySqlSnapshotContext(MySqlPartition partition) throws
SQLException {
+ super(partition, "");
+ }
+ }
+
+ @Override
+ protected void createSchemaChangeEventsForTables(
+ ChangeEventSourceContext sourceContext,
+ RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext>
snapshotContext,
+ SnapshottingTask snapshottingTask)
+ throws Exception {
+ tryStartingSnapshot(snapshotContext);
+
+ for (Iterator<SchemaChangeEvent> i = schemaEvents.iterator();
i.hasNext(); ) {
+ final SchemaChangeEvent event = i.next();
+ if (!sourceContext.isRunning()) {
+ throw new InterruptedException("Interrupted while processing
event " + event);
+ }
+
+ if (databaseSchema.skipSchemaChangeEvent(event)) {
+ continue;
+ }
+
+ LOGGER.debug("Processing schema event {}", event);
+
+ final TableId tableId =
+ event.getTables().isEmpty() ? null :
event.getTables().iterator().next().id();
+ snapshotContext.offset.event(tableId, getClock().currentTime());
+
+ // If data are not snapshotted then the last schema change must
set last snapshot flag
+ if (!snapshottingTask.snapshotData() && !i.hasNext()) {
+ lastSnapshotRecord(snapshotContext);
+ }
+ dispatcher.dispatchSchemaChangeEvent(
+ snapshotContext.partition,
+ tableId,
+ (receiver) -> receiver.schemaChangeEvent(event));
+ }
+
+ // Make schema available for snapshot source
+ databaseSchema
+ .tableIds()
+ .forEach(x ->
snapshotContext.tables.overwriteTable(databaseSchema.tableFor(x)));
+ }
+
+ @Override
+ protected void lastSnapshotRecord(
+ RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext>
snapshotContext) {
+ if (delayedSchemaSnapshotTables.isEmpty()) {
+ super.lastSnapshotRecord(snapshotContext);
+ }
+ }
+
+ @Override
+ protected void postSnapshot() throws InterruptedException {
+ // We cannot be sure that the last event is the last one
+ // - last table could be empty
+ // - data snapshot was not executed
+ // - the last table schema snapshotted is not monitored and storing of
monitored is disabled
+ lastEventProcessor.accept(
+ record -> {
+ record.sourceOffset().remove(SourceInfo.SNAPSHOT_KEY);
+ ((Struct) record.value())
+ .getStruct(Envelope.FieldName.SOURCE)
+ .put(
+ SourceInfo.SNAPSHOT_KEY,
+
SnapshotRecord.LAST.toString().toLowerCase());
+ return record;
+ });
+ super.postSnapshot();
+ }
+}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/util/ErrorMessageUtils.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/util/ErrorMessageUtils.java
index 2f7b4feca..47efa694f 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/util/ErrorMessageUtils.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/util/ErrorMessageUtils.java
@@ -23,7 +23,7 @@ import java.util.regex.Pattern;
public class ErrorMessageUtils {
private static final Pattern SERVER_ID_CONFLICT =
Pattern.compile(
- ".*A slave with the same server_uuid/server_id as this
slave has connected to the master.*");
+ ".*A (slave|replica) with the same server_uuid/server_id
as this (slave|replica) has connected to the (master|source).*");
private static final Pattern MISSING_BINLOG_POSITION_WHEN_BINLOG_EXPIRE =
Pattern.compile(
".*The connector is trying to read binlog.*but this is no
longer available on the server.*");
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java
index 4e512a81c..f567dde73 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java
@@ -117,11 +117,11 @@ public class DebeziumUtils {
}
/** Fetch current binlog offsets in MySql Server. */
- public static BinlogOffset currentBinlogOffset(JdbcConnection jdbc) {
- final String showMasterStmt = "SHOW MASTER STATUS";
+ public static BinlogOffset currentBinlogOffset(MySqlConnection jdbc) {
+ String showBinaryLogStatement = jdbc.getShowBinaryLogStatement();
try {
return jdbc.queryAndMap(
- showMasterStmt,
+ showBinaryLogStatement,
rs -> {
if (rs.next()) {
final String binlogFilename = rs.getString(1);
@@ -135,14 +135,14 @@ public class DebeziumUtils {
} else {
throw new FlinkRuntimeException(
"Cannot read the binlog filename and
position via '"
- + showMasterStmt
+ + showBinaryLogStatement
+ "'. Make sure your server is
correctly configured");
}
});
} catch (SQLException e) {
throw new FlinkRuntimeException(
"Cannot read the binlog filename and position via '"
- + showMasterStmt
+ + showBinaryLogStatement
+ "'. Make sure your server is correctly
configured",
e);
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlCompatibilityITCase.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlCompatibilityITCase.java
index 210913708..72aaa0a2f 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlCompatibilityITCase.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlCompatibilityITCase.java
@@ -28,8 +28,11 @@ import
org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedClass;
+import org.junit.jupiter.params.provider.EnumSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.output.Slf4jLogConsumer;
@@ -56,85 +59,82 @@ import static
org.apache.flink.cdc.connectors.mysql.source.MySqlSourceTestBase.a
import static
org.apache.flink.cdc.connectors.mysql.source.MySqlSourceTestBase.assertEqualsInOrder;
/** Integration tests to check mysql-cdc works well with different MySQL
server version. */
+@ParameterizedClass
+@EnumSource(
+ value = MySqlVersion.class,
+ names = {"V5_7", "V8_0", "V8_4"})
class MySqlCompatibilityITCase {
private static final Logger LOG =
LoggerFactory.getLogger(MySqlCompatibilityITCase.class);
- private Path tempFolder;
+ private static Path tempFolder;
private static File resourceFolder;
+ private final MySqlVersion version;
+ private final MySqlContainer mySqlContainer;
+ private final UniqueDatabase testDatabase;
+
private final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
private final StreamTableEnvironment tEnv =
StreamTableEnvironment.create(
env,
EnvironmentSettings.newInstance().inStreamingMode().build());
- @BeforeEach
- public void setup() throws Exception {
- resourceFolder =
- Paths.get(
- Objects.requireNonNull(
- MySqlValidatorTest.class
- .getClassLoader()
- .getResource("."))
- .toURI())
- .toFile();
- env.setParallelism(4);
- env.enableCheckpointing(200);
- tempFolder = Files.createTempDirectory(resourceFolder.toPath(),
"mysql-config");
- }
-
- @Test
- void testMySqlV56() throws Exception {
- testDifferentMySqlVersion(MySqlVersion.V5_6, false);
- }
-
- @Test
- void testMySqlV56WithGtidModeOn() throws Exception {
- testDifferentMySqlVersion(MySqlVersion.V5_6, true);
- }
-
- @Test
- void testMySqlV57() throws Exception {
- testDifferentMySqlVersion(MySqlVersion.V5_7, false);
- }
-
- @Test
- void testMySqlV57WithGtidModeOn() throws Exception {
- testDifferentMySqlVersion(MySqlVersion.V5_7, true);
- }
-
- @Test
- void testMySqlV8() throws Exception {
- testDifferentMySqlVersion(MySqlVersion.V8_0, false);
- }
-
- @Test
- void testMySqlV8WithGtidModeOn() throws Exception {
- testDifferentMySqlVersion(MySqlVersion.V8_0, true);
- }
-
- private void testDifferentMySqlVersion(MySqlVersion version, boolean
enableGtid)
- throws Exception {
- final MySqlContainer mySqlContainer =
+ MySqlCompatibilityITCase(MySqlVersion version) {
+ this.version = version;
+ this.mySqlContainer =
(MySqlContainer)
new MySqlContainer(version)
- .withConfigurationOverride(
- buildCustomMySqlConfig(version,
enableGtid))
+
.withConfigurationOverride(buildCustomMySqlConfig(version))
.withSetupSQL("docker/setup.sql")
.withDatabaseName("flink-test")
.withUsername("flinkuser")
.withPassword("flinkpw")
.withLogConsumer(new Slf4jLogConsumer(LOG));
+ this.testDatabase = new UniqueDatabase(mySqlContainer, "inventory",
"mysqluser", "mysqlpw");
+ }
+
+ @BeforeEach
+ void setup() throws Exception {
+ // Initialize static resources if needed
+ if (resourceFolder == null) {
+ resourceFolder =
+ Paths.get(
+ Objects.requireNonNull(
+ MySqlValidatorTest.class
+ .getClassLoader()
+ .getResource("."))
+ .toURI())
+ .toFile();
+ tempFolder = Files.createTempDirectory(resourceFolder.toPath(),
"mysql-config");
+ }
+
+ env.setParallelism(4);
+ env.enableCheckpointing(200);
- LOG.info("Starting containers...");
+ LOG.info("Starting container for MySQL {}...", version.getVersion());
Startables.deepStart(Stream.of(mySqlContainer)).join();
- LOG.info("Containers are started.");
+ LOG.info("Container is started.");
- UniqueDatabase testDatabase =
- new UniqueDatabase(mySqlContainer, "inventory", "mysqluser",
"mysqlpw");
testDatabase.createAndInitialize();
+ }
+
+ @AfterEach
+ void tearDown() {
+ try {
+ testDatabase.dropDatabase();
+ } catch (IllegalStateException e) {
+ LOG.warn("Failed to drop test database during teardown.", e);
+ }
+ if (mySqlContainer != null) {
+ LOG.info("Stopping container for MySQL {}...",
version.getVersion());
+ mySqlContainer.stop();
+ LOG.info("Container is stopped.");
+ }
+ }
+ @Test
+ void testMySqlVersionCompatibility() throws Exception {
String sourceDDL =
String.format(
"CREATE TABLE products ("
@@ -163,7 +163,6 @@ class MySqlCompatibilityITCase {
getServerId());
tEnv.executeSql(sourceDDL);
- // async submit job
TableResult result =
tEnv.executeSql("SELECT `id`, name, description, weight FROM
products");
@@ -217,7 +216,6 @@ class MySqlCompatibilityITCase {
assertEqualsInOrder(
Arrays.asList(expectedBinlog), fetchRows(iterator,
expectedBinlog.length));
result.getJobClient().get().cancel().get();
- mySqlContainer.stop();
}
private String getServerId() {
@@ -236,29 +234,39 @@ class MySqlCompatibilityITCase {
return rows;
}
- private String buildCustomMySqlConfig(MySqlVersion version, boolean
enableGtid) {
+ private String buildCustomMySqlConfig(MySqlVersion version) {
try {
- Path cnf = Files.createFile(Paths.get(tempFolder.toString(),
"my.cnf"));
+ if (resourceFolder == null) {
+ resourceFolder =
+ Paths.get(
+ Objects.requireNonNull(
+
MySqlValidatorTest.class
+
.getClassLoader()
+
.getResource("."))
+ .toURI())
+ .toFile();
+ tempFolder =
Files.createTempDirectory(resourceFolder.toPath(), "mysql-config");
+ }
+ // Create version-specific directory to avoid conflicts
+ Path versionDir =
+ Files.createDirectories(
+ Paths.get(
+ tempFolder.toString(),
version.getVersion().replace(".", "_")));
+ Path cnf = Paths.get(versionDir.toString(), "my.cnf");
+ // Check if file already exists to avoid FileAlreadyExistsException
+ if (!Files.exists(cnf)) {
+ Files.createFile(cnf);
+ }
StringBuilder mysqlConfBuilder = new StringBuilder();
mysqlConfBuilder.append(
"[mysqld]\n"
+ "binlog_format = row\n"
+ "log_bin = mysql-bin\n"
+ "server-id = 223344\n"
- + "binlog_row_image = FULL\n");
- if (!enableGtid) {
- mysqlConfBuilder.append("gtid-mode = OFF\n");
- } else {
- mysqlConfBuilder.append("gtid-mode = ON\n");
- mysqlConfBuilder.append("enforce-gtid-consistency = 1\n");
- // see
- //
https://dev.mysql.com/doc/refman/5.7/en/replication-options-gtids.html#sysvar_gtid_mode
- if (version == MySqlVersion.V5_6 || version ==
MySqlVersion.V5_7) {
- mysqlConfBuilder.append("log-slave-updates = ON\n");
- }
- }
+ + "binlog_row_image = FULL\n"
+ + "gtid-mode = OFF\n");
- if (version == MySqlVersion.V8_0) {
+ if (version == MySqlVersion.V8_0 || version == MySqlVersion.V8_4) {
mysqlConfBuilder.append("secure_file_priv=/var/lib/mysql\n");
}
@@ -266,7 +274,7 @@ class MySqlCompatibilityITCase {
cnf,
Collections.singleton(mysqlConfBuilder.toString()),
StandardCharsets.UTF_8,
- StandardOpenOption.APPEND);
+ StandardOpenOption.TRUNCATE_EXISTING);
return
Paths.get(resourceFolder.getAbsolutePath()).relativize(cnf).toString();
} catch (Exception e) {
throw new RuntimeException("Failed to create my.cnf file.", e);
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MySqlVersion.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MySqlVersion.java
index f972b3c0f..9e2e916fe 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MySqlVersion.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MySqlVersion.java
@@ -25,7 +25,8 @@ public enum MySqlVersion {
V8_0_17("8.0.17"),
V8_0_18("8.0.18"),
V8_0_19("8.0.19"),
- V8_0("8.0");
+ V8_0("8.0"),
+ V8_4("8.4");
private String version;
diff --git
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToIcebergE2eITCase.java
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToIcebergE2eITCase.java
index c63c54ad6..8b4d7768d 100644
---
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToIcebergE2eITCase.java
+++
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToIcebergE2eITCase.java
@@ -18,8 +18,6 @@
package org.apache.flink.cdc.pipeline.tests;
import org.apache.flink.cdc.common.test.utils.TestUtils;
-import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
-import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
import org.apache.flink.cdc.pipeline.tests.utils.TarballFetcher;
@@ -43,7 +41,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
import org.testcontainers.containers.output.ToStringConsumer;
import org.testcontainers.lifecycle.Startables;
@@ -76,20 +73,6 @@ public class MySqlToIcebergE2eITCase extends
PipelineTestEnvironment {
@TempDir public Path temporaryFolder;
- @org.testcontainers.junit.jupiter.Container
- public static final MySqlContainer MYSQL =
- (MySqlContainer)
- new MySqlContainer(
- MySqlVersion.V8_0) // v8 support both ARM
and AMD architectures
- .withConfigurationOverride("docker/mysql/my.cnf")
- .withSetupSQL("docker/mysql/setup.sql")
- .withDatabaseName("flink-test")
- .withUsername("flinkuser")
- .withPassword("flinkpw")
- .withNetwork(NETWORK)
- .withNetworkAliases("mysql")
- .withLogConsumer(new Slf4jLogConsumer(LOG));
-
protected final UniqueDatabase inventoryDatabase =
new UniqueDatabase(MYSQL, "iceberg_inventory", MYSQL_TEST_USER,
MYSQL_TEST_PASSWORD);
diff --git a/pom.xml b/pom.xml
index 18224068b..d25309bd3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -89,7 +89,7 @@ limitations under the License.
<commons-lang3.version>3.12.0</commons-lang3.version>
<json-path.version>2.7.0</json-path.version>
<jackson.version>2.13.2</jackson.version>
- <junit.version>5.10.1</junit.version>
+ <junit.version>5.13.1</junit.version>
<assertj.version>3.24.2</assertj.version>
<markBundledAsOptional>true</markBundledAsOptional>
<flatten-maven-plugin.version>1.5.0</flatten-maven-plugin.version>