This is an automated email from the ASF dual-hosted git repository.

yux pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new aabf30622 [FLINK-36520] MySQL CDC Supports connecting to MySQL server 
8.4+ (#3666)
aabf30622 is described below

commit aabf3062279523b740b1a00bbb132aa1ac90983c
Author: yuxiqian <[email protected]>
AuthorDate: Fri Mar 20 19:28:23 2026 +0800

    [FLINK-36520] MySQL CDC Supports connecting to MySQL server 8.4+ (#3666)
---
 .../source/MySqlPipelineCompatibilityITCase.java   | 277 ++++++++
 .../flink/cdc/debezium/DebeziumSourceFunction.java |   9 +-
 .../debezium/connector/mysql/MySqlConnection.java  |  56 +-
 .../mysql/MySqlSnapshotChangeEventSource.java      | 717 +++++++++++++++++++++
 .../connector/mysql/util/ErrorMessageUtils.java    |   2 +-
 .../connectors/mysql/debezium/DebeziumUtils.java   |  10 +-
 .../mysql/table/MySqlCompatibilityITCase.java      | 154 ++---
 .../connectors/mysql/testutils/MySqlVersion.java   |   3 +-
 .../pipeline/tests/MySqlToIcebergE2eITCase.java    |  17 -
 pom.xml                                            |   2 +-
 10 files changed, 1137 insertions(+), 110 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineCompatibilityITCase.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineCompatibilityITCase.java
new file mode 100644
index 000000000..d0ad0d017
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlPipelineCompatibilityITCase.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.connectors.mysql.source;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.source.FlinkSourceProvider;
+import org.apache.flink.cdc.connectors.mysql.factory.MySqlDataSourceFactory;
+import 
org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfigFactory;
+import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
+import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
+import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
+import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.RestartStrategyUtils;
+import org.apache.flink.util.CloseableIterator;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedClass;
+import org.junit.jupiter.params.provider.EnumSource;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+
+import java.io.File;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+import java.sql.Connection;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Stream;
+
+import static 
org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_PASSWORD;
+import static 
org.apache.flink.cdc.connectors.mysql.testutils.MySqSourceTestUtils.TEST_USER;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Integration tests to check MySQL pipeline connector works well with 
different MySQL server
+ * versions.
+ */
+@ParameterizedClass
+@EnumSource(
+        value = MySqlVersion.class,
+        names = {"V5_7", "V8_0", "V8_4"})
+class MySqlPipelineCompatibilityITCase {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(MySqlPipelineCompatibilityITCase.class);
+
+    private static Path tempFolder;
+    private static File resourceFolder;
+
+    private final MySqlVersion version;
+    private final MySqlContainer mySqlContainer;
+    private final UniqueDatabase testDatabase;
+
+    private final StreamExecutionEnvironment env =
+            StreamExecutionEnvironment.getExecutionEnvironment();
+
+    MySqlPipelineCompatibilityITCase(MySqlVersion version) {
+        this.version = version;
+        this.mySqlContainer =
+                (MySqlContainer)
+                        new MySqlContainer(version)
+                                
.withConfigurationOverride(buildCustomMySqlConfig(version))
+                                .withSetupSQL("docker/setup.sql")
+                                .withDatabaseName("flink-test")
+                                .withUsername("flinkuser")
+                                .withPassword("flinkpw")
+                                .withLogConsumer(new Slf4jLogConsumer(LOG));
+        this.testDatabase =
+                new UniqueDatabase(mySqlContainer, "inventory", TEST_USER, 
TEST_PASSWORD);
+    }
+
+    @BeforeEach
+    void setup() throws Exception {
+        // Initialize static resources if needed
+        if (resourceFolder == null) {
+            resourceFolder =
+                    Paths.get(
+                                    Objects.requireNonNull(
+                                                    
MySqlPipelineCompatibilityITCase.class
+                                                            .getClassLoader()
+                                                            .getResource("."))
+                                            .toURI())
+                            .toFile();
+            tempFolder = Files.createTempDirectory(resourceFolder.toPath(), 
"mysql-config");
+        }
+
+        env.setParallelism(4);
+        env.enableCheckpointing(200);
+        RestartStrategyUtils.configureNoRestartStrategy(env);
+
+        LOG.info("Starting container for MySQL {}...", version.getVersion());
+        Startables.deepStart(Stream.of(mySqlContainer)).join();
+        LOG.info("Container is started.");
+
+        testDatabase.createAndInitialize();
+    }
+
+    @AfterEach
+    void tearDown() {
+        try {
+            testDatabase.dropDatabase();
+        } catch (IllegalStateException e) {
+            LOG.warn("Failed to drop test database during teardown.", e);
+        }
+        if (mySqlContainer != null) {
+            LOG.info("Stopping container for MySQL {}...", 
version.getVersion());
+            mySqlContainer.stop();
+            LOG.info("Container is stopped.");
+        }
+    }
+
+    @Test
+    void testSnapshotRead() throws Exception {
+        MySqlSourceConfigFactory configFactory =
+                new MySqlSourceConfigFactory()
+                        .hostname(mySqlContainer.getHost())
+                        .port(mySqlContainer.getDatabasePort())
+                        .username(TEST_USER)
+                        .password(TEST_PASSWORD)
+                        .databaseList(testDatabase.getDatabaseName())
+                        .tableList(testDatabase.getDatabaseName() + 
".products")
+                        .startupOptions(StartupOptions.initial())
+                        .serverId(getServerId(env.getParallelism()))
+                        .serverTimeZone("UTC");
+
+        FlinkSourceProvider sourceProvider =
+                (FlinkSourceProvider) new 
MySqlDataSource(configFactory).getEventSourceProvider();
+        CloseableIterator<Event> events =
+                env.fromSource(
+                                sourceProvider.getSource(),
+                                WatermarkStrategy.noWatermarks(),
+                                MySqlDataSourceFactory.IDENTIFIER,
+                                new EventTypeInfo())
+                        .executeAndCollect();
+
+        List<Event> snapshotEvents = fetchEvents(events, 9);
+
+        assertThat(snapshotEvents).hasSize(9);
+        assertThat(snapshotEvents.stream().filter(e -> e instanceof 
DataChangeEvent)).hasSize(9);
+
+        events.close();
+    }
+
+    @Test
+    void testBinlogRead() throws Exception {
+        MySqlSourceConfigFactory configFactory =
+                new MySqlSourceConfigFactory()
+                        .hostname(mySqlContainer.getHost())
+                        .port(mySqlContainer.getDatabasePort())
+                        .username(TEST_USER)
+                        .password(TEST_PASSWORD)
+                        .databaseList(testDatabase.getDatabaseName())
+                        .tableList(testDatabase.getDatabaseName() + 
".products")
+                        .startupOptions(StartupOptions.initial())
+                        .serverId(getServerId(env.getParallelism()))
+                        .serverTimeZone("UTC");
+
+        FlinkSourceProvider sourceProvider =
+                (FlinkSourceProvider) new 
MySqlDataSource(configFactory).getEventSourceProvider();
+        CloseableIterator<Event> events =
+                env.fromSource(
+                                sourceProvider.getSource(),
+                                WatermarkStrategy.noWatermarks(),
+                                MySqlDataSourceFactory.IDENTIFIER,
+                                new EventTypeInfo())
+                        .executeAndCollect();
+
+        fetchEvents(events, 9);
+
+        try (Connection connection = testDatabase.getJdbcConnection();
+                Statement statement = connection.createStatement()) {
+            statement.execute(
+                    String.format(
+                            "INSERT INTO `%s`.`products` VALUES 
(default,'test_product','desc',1.0);",
+                            testDatabase.getDatabaseName()));
+        }
+
+        List<Event> binlogEvents = fetchEvents(events, 1);
+        assertThat(binlogEvents).hasSize(1);
+        assertThat(binlogEvents.get(0)).isInstanceOf(DataChangeEvent.class);
+
+        events.close();
+    }
+
+    private String getServerId(int parallelism) {
+        int serverId = (int) (Math.random() * 100) + 5400;
+        return serverId + "-" + (serverId + parallelism);
+    }
+
+    private List<Event> fetchEvents(CloseableIterator<Event> iterator, int 
count) {
+        List<Event> events = new ArrayList<>();
+        while (count > 0 && iterator.hasNext()) {
+            Event event = iterator.next();
+            if (event instanceof DataChangeEvent) {
+                events.add(event);
+                count--;
+            }
+        }
+        return events;
+    }
+
+    private String buildCustomMySqlConfig(MySqlVersion version) {
+        try {
+            if (resourceFolder == null) {
+                resourceFolder =
+                        Paths.get(
+                                        Objects.requireNonNull(
+                                                        
MySqlPipelineCompatibilityITCase.class
+                                                                
.getClassLoader()
+                                                                
.getResource("."))
+                                                .toURI())
+                                .toFile();
+                tempFolder = 
Files.createTempDirectory(resourceFolder.toPath(), "mysql-config");
+            }
+            // Create version-specific directory to avoid conflicts
+            Path versionDir =
+                    Files.createDirectories(
+                            Paths.get(
+                                    tempFolder.toString(), 
version.getVersion().replace(".", "_")));
+            Path cnf = Paths.get(versionDir.toString(), "my.cnf");
+            // Check if file already exists to avoid FileAlreadyExistsException
+            if (!Files.exists(cnf)) {
+                Files.createFile(cnf);
+            }
+            StringBuilder mysqlConfBuilder = new StringBuilder();
+            mysqlConfBuilder.append(
+                    "[mysqld]\n"
+                            + "binlog_format = row\n"
+                            + "log_bin = mysql-bin\n"
+                            + "server-id = 223344\n"
+                            + "binlog_row_image = FULL\n"
+                            + "gtid-mode = OFF\n");
+
+            if (version == MySqlVersion.V8_0 || version == MySqlVersion.V8_4) {
+                mysqlConfBuilder.append("secure_file_priv=/var/lib/mysql\n");
+            }
+
+            Files.write(
+                    cnf,
+                    Collections.singleton(mysqlConfBuilder.toString()),
+                    StandardCharsets.UTF_8,
+                    StandardOpenOption.TRUNCATE_EXISTING);
+            return 
Paths.get(resourceFolder.getAbsolutePath()).relativize(cnf).toString();
+        } catch (Exception e) {
+            throw new RuntimeException("Failed to create my.cnf file.", e);
+        }
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/DebeziumSourceFunction.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/DebeziumSourceFunction.java
index 6386fb400..21d822a06 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/DebeziumSourceFunction.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/DebeziumSourceFunction.java
@@ -448,9 +448,12 @@ public class DebeziumSourceFunction<T> extends 
RichSourceFunction<T>
             debeziumChangeFetcher.runFetchLoop();
         } catch (Throwable t) {
             if (t.getMessage() != null
-                    && t.getMessage()
-                            .contains(
-                                    "A slave with the same 
server_uuid/server_id as this slave has connected to the master")) {
+                    && (t.getMessage()
+                                    .contains(
+                                            "A slave with the same 
server_uuid/server_id as this slave has connected to the master")
+                            || t.getMessage()
+                                    .contains(
+                                            "A replica with the same 
server_uuid/server_id as this replica has connected to the source"))) {
                 throw new RuntimeException(
                         "The 'server-id' in the mysql cdc connector should be 
globally unique, but conflicts happen now.\n"
                                 + "The server id conflict may happen in the 
following situations: \n"
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlConnection.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlConnection.java
index 801e27d6b..aa9699e4e 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlConnection.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlConnection.java
@@ -40,22 +40,32 @@ import java.util.Properties;
 
 /**
  * Copied from Debezium project(1.9.8.final) to add custom jdbc properties in 
the jdbc url. The new
- * parameter {@code jdbcProperties} in the constructor of {@link 
MySqlConnectionConfiguration} will
- * be used to generate the jdbc url pattern, and may overwrite the default 
value.
+ * parameter {@link 
MySqlConnectionConfiguration#MySqlConnectionConfiguration(Configuration config,
+ * Properties jdbcProperties)} in the constructor of {@link 
MySqlConnectionConfiguration} will be
+ * used to generate the jdbc url pattern, and may overwrite the default value.
  *
- * <p>Line 75: Add field {@code urlPattern} in {@link MySqlConnection} and 
remove old pattern.
+ * <p>Add field {@link MySqlConnection#urlPattern} in {@link MySqlConnection} 
and remove old
+ * pattern.
  *
- * <p>Line 92: Init {@code urlPattern} using the url pattern from {@link
+ * <p>Added MySQL 8.4+ compatible probing fields {@link
+ * MySqlConnection#MYSQL_CLASSIC_SHOW_BINARY_LOG_STATEMENT}, {@link
+ * MySqlConnection#MYSQL_NEW_SHOW_BINARY_LOG_STATEMENT}, and {@link
+ * MySqlConnection#showBinaryLogStatement}.
+ *
+ * <p>Init {@link MySqlConnection#urlPattern} using the url pattern from {@link
  * MySqlConnectionConfiguration}.
  *
- * <p>Line 544: Generate the connection string by the new field {@code 
urlPattern}.
+ * <p>Generate the connection string by the new field {@link 
MySqlConnection#urlPattern}.
  *
- * <p>Line 569 ~ 574: Add new constant and field {@code urlPattern} to {@link
+ * <p>Add new constant and field {@link MySqlConnection#urlPattern} to {@link
  * MySqlConnectionConfiguration}.
  *
- * <p>Line 625: Init new field {@code urlPattern} in {@link 
MySqlConnectionConfiguration}.
+ * <p>Init new field {@link MySqlConnection#urlPattern} in {@link 
MySqlConnectionConfiguration}.
+ *
+ * <p>Add utility methods helping to generate the url pattern and add default 
values.
  *
- * <p>Line 715 ~ 741: Add some methods helping to generate the url pattern and 
add default values.
+ * <p>Added utility method {@link MySqlConnection#getShowBinaryLogStatement} 
and {@link
+ * MySqlConnection#probeShowBinaryLogStatement} for MySQL 8.4 compatibility.
  */
 public class MySqlConnection extends JdbcConnection {
 
@@ -74,6 +84,10 @@ public class MySqlConnection extends JdbcConnection {
 
     private final String urlPattern;
 
+    private static final String MYSQL_CLASSIC_SHOW_BINARY_LOG_STATEMENT = 
"SHOW MASTER STATUS";
+    private static final String MYSQL_NEW_SHOW_BINARY_LOG_STATEMENT = "SHOW 
BINARY LOG STATUS";
+    private final String showBinaryLogStatement;
+
     /**
      * Creates a new connection using the supplied configuration.
      *
@@ -90,6 +104,7 @@ public class MySqlConnection extends JdbcConnection {
         this.connectionConfig = connectionConfig;
         this.mysqlFieldReader = fieldReader;
         this.urlPattern = connectionConfig.getUrlPattern();
+        this.showBinaryLogStatement = probeShowBinaryLogStatement();
     }
 
     /**
@@ -275,7 +290,7 @@ public class MySqlConnection extends JdbcConnection {
     public String knownGtidSet() {
         try {
             return queryAndMap(
-                    "SHOW MASTER STATUS",
+                    showBinaryLogStatement,
                     rs -> {
                         if (rs.next() && rs.getMetaData().getColumnCount() > 
4) {
                             return rs.getString(
@@ -769,4 +784,27 @@ public class MySqlConnection extends JdbcConnection {
     public String quotedTableIdString(TableId tableId) {
         return tableId.toQuotedString('`');
     }
+
+    public String getShowBinaryLogStatement() {
+        return showBinaryLogStatement;
+    }
+
+    private String probeShowBinaryLogStatement() {
+        LOGGER.info("Probing binary log statement.");
+        try {
+            // Attempt to query
+            query(MYSQL_NEW_SHOW_BINARY_LOG_STATEMENT, rs -> {});
+            LOGGER.info(
+                    "Successfully found show binary log statement with `{}`.",
+                    MYSQL_NEW_SHOW_BINARY_LOG_STATEMENT);
+            return MYSQL_NEW_SHOW_BINARY_LOG_STATEMENT;
+        } catch (SQLException e) {
+            LOGGER.info(
+                    "Probing with {} failed, fallback to classic {}. Caused 
by: {}",
+                    MYSQL_NEW_SHOW_BINARY_LOG_STATEMENT,
+                    MYSQL_CLASSIC_SHOW_BINARY_LOG_STATEMENT,
+                    e.getMessage());
+            return MYSQL_CLASSIC_SHOW_BINARY_LOG_STATEMENT;
+        }
+    }
 }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlSnapshotChangeEventSource.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlSnapshotChangeEventSource.java
new file mode 100644
index 000000000..5a4fc26b1
--- /dev/null
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/MySqlSnapshotChangeEventSource.java
@@ -0,0 +1,717 @@
+/*
+ * Copyright Debezium Authors.
+ *
+ * Licensed under the Apache Software License version 2.0, available at 
http://www.apache.org/licenses/LICENSE-2.0
+ */
+
+package io.debezium.connector.mysql;
+
+import io.debezium.DebeziumException;
+import io.debezium.connector.SnapshotRecord;
+import io.debezium.connector.mysql.legacy.MySqlJdbcContext.DatabaseLocales;
+import io.debezium.data.Envelope;
+import io.debezium.function.BlockingConsumer;
+import io.debezium.pipeline.EventDispatcher;
+import io.debezium.relational.RelationalSnapshotChangeEventSource;
+import io.debezium.relational.RelationalTableFilters;
+import io.debezium.relational.Table;
+import io.debezium.relational.TableId;
+import io.debezium.schema.SchemaChangeEvent;
+import io.debezium.util.Clock;
+import io.debezium.util.Collect;
+import io.debezium.util.Strings;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+/**
+ * Copied from Debezium project(1.9.8.Final) to fix MySQL 8.x compatibility.
+ *
+ * <p>Line 338: Use probing methods to determine the statement.
+ */
+public class MySqlSnapshotChangeEventSource
+        extends RelationalSnapshotChangeEventSource<MySqlPartition, 
MySqlOffsetContext> {
+
+    private static final Logger LOGGER =
+            LoggerFactory.getLogger(MySqlSnapshotChangeEventSource.class);
+
+    private final MySqlConnectorConfig connectorConfig;
+    private final MySqlConnection connection;
+    private long globalLockAcquiredAt = -1;
+    private long tableLockAcquiredAt = -1;
+    private final RelationalTableFilters filters;
+    private final MySqlSnapshotChangeEventSourceMetrics metrics;
+    private final MySqlDatabaseSchema databaseSchema;
+    private final List<SchemaChangeEvent> schemaEvents = new ArrayList<>();
+    private Set<TableId> delayedSchemaSnapshotTables = Collections.emptySet();
+    private final BlockingConsumer<Function<SourceRecord, SourceRecord>> 
lastEventProcessor;
+
+    public MySqlSnapshotChangeEventSource(
+            MySqlConnectorConfig connectorConfig,
+            MySqlConnection connection,
+            MySqlDatabaseSchema schema,
+            EventDispatcher<MySqlPartition, TableId> dispatcher,
+            Clock clock,
+            MySqlSnapshotChangeEventSourceMetrics metrics,
+            BlockingConsumer<Function<SourceRecord, SourceRecord>> 
lastEventProcessor) {
+        super(connectorConfig, connection, schema, dispatcher, clock, metrics);
+        this.connectorConfig = connectorConfig;
+        this.connection = connection;
+        this.filters = connectorConfig.getTableFilters();
+        this.metrics = metrics;
+        this.databaseSchema = schema;
+        this.lastEventProcessor = lastEventProcessor;
+    }
+
+    @Override
+    protected SnapshottingTask getSnapshottingTask(
+            MySqlPartition partition, MySqlOffsetContext previousOffset) {
+        boolean snapshotSchema = true;
+        boolean snapshotData = true;
+
+        // found a previous offset and the earlier snapshot has completed
+        if (previousOffset != null && !previousOffset.isSnapshotRunning()) {
+            LOGGER.info(
+                    "A previous offset indicating a completed snapshot has 
been found. Neither schema nor data will be snapshotted.");
+            snapshotSchema = databaseSchema.isStorageInitializationExecuted();
+            snapshotData = false;
+        } else {
+            LOGGER.info("No previous offset has been found");
+            if (connectorConfig.getSnapshotMode().includeData()) {
+                LOGGER.info(
+                        "According to the connector configuration both schema 
and data will be snapshotted");
+            } else {
+                LOGGER.info(
+                        "According to the connector configuration only schema 
will be snapshotted");
+            }
+            snapshotData = connectorConfig.getSnapshotMode().includeData();
+            snapshotSchema = connectorConfig.getSnapshotMode().includeSchema();
+        }
+
+        return new SnapshottingTask(snapshotSchema, snapshotData);
+    }
+
+    @Override
+    protected SnapshotContext<MySqlPartition, MySqlOffsetContext> 
prepare(MySqlPartition partition)
+            throws Exception {
+        return new MySqlSnapshotContext(partition);
+    }
+
+    @Override
+    protected void connectionCreated(
+            RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> 
snapshotContext)
+            throws Exception {}
+
+    @Override
+    protected Set<TableId> getAllTableIds(
+            RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> ctx) 
throws Exception {
+        // -------------------
+        // READ DATABASE NAMES
+        // -------------------
+        // Get the list of databases ...
+        LOGGER.info("Read list of available databases");
+        final List<String> databaseNames = new ArrayList<>();
+        connection.query(
+                "SHOW DATABASES",
+                rs -> {
+                    while (rs.next()) {
+                        databaseNames.add(rs.getString(1));
+                    }
+                });
+        LOGGER.info("\t list of available databases is: {}", databaseNames);
+
+        // ----------------
+        // READ TABLE NAMES
+        // ----------------
+        // Get the list of table IDs for each database. We can't use a 
prepared statement with
+        // MySQL, so we have to
+        // build the SQL statement each time. Although in other cases this 
might lead to SQL
+        // injection, in our case
+        // we are reading the database names from the database and not taking 
them from the user ...
+        LOGGER.info("Read list of available tables in each database");
+        final Set<TableId> tableIds = new HashSet<>();
+        final Set<String> readableDatabaseNames = new HashSet<>();
+        for (String dbName : databaseNames) {
+            try {
+                // MySQL sometimes considers some local files as databases 
(see DBZ-164),
+                // so we will simply try each one and ignore the problematic 
ones ...
+                connection.query(
+                        "SHOW FULL TABLES IN " + quote(dbName) + " where 
Table_Type = 'BASE TABLE'",
+                        rs -> {
+                            while (rs.next()) {
+                                TableId id = new TableId(dbName, null, 
rs.getString(1));
+                                tableIds.add(id);
+                            }
+                        });
+                readableDatabaseNames.add(dbName);
+            } catch (SQLException e) {
+                // We were unable to execute the query or process the results, 
so skip this ...
+                LOGGER.warn(
+                        "\t skipping database '{}' due to error reading 
tables: {}",
+                        dbName,
+                        e.getMessage());
+            }
+        }
+        final Set<String> includedDatabaseNames =
+                readableDatabaseNames.stream()
+                        .filter(filters.databaseFilter())
+                        .collect(Collectors.toSet());
+        LOGGER.info("\tsnapshot continuing with database(s): {}", 
includedDatabaseNames);
+        return tableIds;
+    }
+
+    @Override
+    protected void lockTablesForSchemaSnapshot(
+            ChangeEventSourceContext sourceContext,
+            RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> 
snapshotContext)
+            throws SQLException, InterruptedException {
+        // Set the transaction isolation level to REPEATABLE READ. This is the 
default, but the
+        // default can be changed
+        // which is why we explicitly set it here.
+        //
+        // With REPEATABLE READ, all SELECT queries within the scope of a 
transaction (which we
+        // don't yet have) will read
+        // from the same MVCC snapshot. Thus each plain (non-locking) SELECT 
statements within the
+        // same transaction are
+        // consistent also with respect to each other.
+        //
+        // See: https://dev.mysql.com/doc/refman/5.7/en/set-transaction.html
+        // See: 
https://dev.mysql.com/doc/refman/5.7/en/innodb-transaction-isolation-levels.html
+        // See: 
https://dev.mysql.com/doc/refman/5.7/en/innodb-consistent-read.html
+        
connection.connection().setTransactionIsolation(Connection.TRANSACTION_REPEATABLE_READ);
+        connection.executeWithoutCommitting(
+                "SET SESSION lock_wait_timeout="
+                        + connectorConfig.snapshotLockTimeout().getSeconds());
+        try {
+            connection.executeWithoutCommitting(
+                    "SET SESSION innodb_lock_wait_timeout="
+                            + 
connectorConfig.snapshotLockTimeout().getSeconds());
+        } catch (SQLException e) {
+            LOGGER.warn("Unable to set innodb_lock_wait_timeout", e);
+        }
+
+        // ------------------------------------
+        // LOCK TABLES
+        // ------------------------------------
+        // Obtain read lock on all tables. This statement closes all open 
tables and locks all
+        // tables
+        // for all databases with a global read lock, and it prevents ALL 
updates while we have this
+        // lock.
+        // It also ensures that everything we do while we have this lock will 
be consistent.
+        if (connectorConfig.getSnapshotLockingMode().usesLocking()
+                && connectorConfig.useGlobalLock()) {
+            try {
+                globalLock();
+                metrics.globalLockAcquired();
+            } catch (SQLException e) {
+                LOGGER.info(
+                        "Unable to flush and acquire global read lock, will 
use table read locks after reading table names");
+                // Continue anyway, since RDS (among others) don't allow 
setting a global lock
+                assert !isGloballyLocked();
+            }
+            if 
(connectorConfig.getSnapshotLockingMode().flushResetsIsolationLevel()) {
+                // FLUSH TABLES resets TX and isolation level
+                connection.executeWithoutCommitting(
+                        "SET TRANSACTION ISOLATION LEVEL REPEATABLE READ");
+            }
+        }
+    }
+
+    @Override
+    protected void releaseSchemaSnapshotLocks(
+            RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> 
snapshotContext)
+            throws SQLException {
+        if (connectorConfig.getSnapshotLockingMode().usesMinimalLocking()) {
+            if (isGloballyLocked()) {
+                globalUnlock();
+            }
+            if (isTablesLocked()) {
+                // We could not acquire a global read lock and instead had to 
obtain individual
+                // table-level read locks
+                // using 'FLUSH TABLE <tableName> WITH READ LOCK'. However, if 
we were to do this,
+                // the 'UNLOCK TABLES'
+                // would implicitly commit our active transaction, and this 
would break our
+                // consistent snapshot logic.
+                // Therefore, we cannot unlock the tables here!
+                // https://dev.mysql.com/doc/refman/5.7/en/flush.html
+                LOGGER.warn(
+                        "Tables were locked explicitly, but to get a 
consistent snapshot we cannot release the locks until we've read all tables.");
+            }
+        }
+    }
+
+    @Override
+    protected void releaseDataSnapshotLocks(
+            RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> 
snapshotContext)
+            throws Exception {
+        if (isGloballyLocked()) {
+            globalUnlock();
+        }
+        if (isTablesLocked()) {
+            tableUnlock();
+            if (!delayedSchemaSnapshotTables.isEmpty()) {
+                schemaEvents.clear();
+                createSchemaEventsForTables(snapshotContext, 
delayedSchemaSnapshotTables, false);
+
+                for (Iterator<SchemaChangeEvent> i = schemaEvents.iterator(); 
i.hasNext(); ) {
+                    final SchemaChangeEvent event = i.next();
+
+                    if (databaseSchema.storeOnlyCapturedTables()
+                            && event.getDatabase() != null
+                            && event.getDatabase().length() != 0
+                            && !connectorConfig
+                                    .getTableFilters()
+                                    .databaseFilter()
+                                    .test(event.getDatabase())) {
+                        LOGGER.debug(
+                                "Skipping schema event as it belongs to a 
non-captured database: '{}'",
+                                event);
+                        continue;
+                    }
+
+                    LOGGER.debug("Processing schema event {}", event);
+
+                    final TableId tableId =
+                            event.getTables().isEmpty()
+                                    ? null
+                                    : event.getTables().iterator().next().id();
+                    snapshotContext.offset.event(tableId, 
getClock().currentTime());
+
+                    if (!i.hasNext()) {
+                        super.lastSnapshotRecord(snapshotContext);
+                    }
+
+                    dispatcher.dispatchSchemaChangeEvent(
+                            snapshotContext.partition,
+                            tableId,
+                            (receiver) -> receiver.schemaChangeEvent(event));
+                }
+
+                // Make schema available for snapshot source
+                databaseSchema
+                        .tableIds()
+                        .forEach(
+                                x ->
+                                        snapshotContext.tables.overwriteTable(
+                                                databaseSchema.tableFor(x)));
+            }
+        }
+    }
+
+    @Override
+    protected void determineSnapshotOffset(
+            RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> ctx,
+            MySqlOffsetContext previousOffset)
+            throws Exception {
+        if (!isGloballyLocked()
+                && !isTablesLocked()
+                && connectorConfig.getSnapshotLockingMode().usesLocking()) {
+            return;
+        }
+        if (previousOffset != null) {
+            ctx.offset = previousOffset;
+            tryStartingSnapshot(ctx);
+            return;
+        }
+        final MySqlOffsetContext offsetContext = 
MySqlOffsetContext.initial(connectorConfig);
+        ctx.offset = offsetContext;
+        LOGGER.info("Read binlog position of MySQL primary server");
+        final String showMasterStmt = connection.getShowBinaryLogStatement();
+        connection.query(
+                showMasterStmt,
+                rs -> {
+                    if (rs.next()) {
+                        final String binlogFilename = rs.getString(1);
+                        final long binlogPosition = rs.getLong(2);
+                        offsetContext.setBinlogStartPoint(binlogFilename, 
binlogPosition);
+                        if (rs.getMetaData().getColumnCount() > 4) {
+                            // This column exists only in MySQL 5.6.5 or later 
...
+                            final String gtidSet =
+                                    rs.getString(
+                                            5); // GTID set, may be null, 
blank, or contain a GTID
+                            // set
+                            offsetContext.setCompletedGtidSet(gtidSet);
+                            LOGGER.info(
+                                    "\t using binlog '{}' at position '{}' and 
gtid '{}'",
+                                    binlogFilename,
+                                    binlogPosition,
+                                    gtidSet);
+                        } else {
+                            LOGGER.info(
+                                    "\t using binlog '{}' at position '{}'",
+                                    binlogFilename,
+                                    binlogPosition);
+                        }
+                    } else {
+                        throw new DebeziumException(
+                                "Cannot read the binlog filename and position 
via '"
+                                        + showMasterStmt
+                                        + "'. Make sure your server is 
correctly configured");
+                    }
+                });
+        tryStartingSnapshot(ctx);
+    }
+
+    private void addSchemaEvent(
+            RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> 
snapshotContext,
+            String database,
+            String ddl) {
+        schemaEvents.addAll(
+                databaseSchema.parseSnapshotDdl(
+                        snapshotContext.partition,
+                        ddl,
+                        database,
+                        snapshotContext.offset,
+                        clock.currentTimeAsInstant()));
+    }
+
+    @Override
+    protected void readTableStructure(
+            ChangeEventSourceContext sourceContext,
+            RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> 
snapshotContext,
+            MySqlOffsetContext offsetContext)
+            throws Exception {
+        Set<TableId> capturedSchemaTables;
+        if (twoPhaseSchemaSnapshot()) {
+            // Capture schema of captured tables after they are locked
+            tableLock(snapshotContext);
+            determineSnapshotOffset(snapshotContext, offsetContext);
+            capturedSchemaTables = snapshotContext.capturedTables;
+            LOGGER.info(
+                    "Table level locking is in place, the schema will be 
capture in two phases, now capturing: {}",
+                    capturedSchemaTables);
+            delayedSchemaSnapshotTables =
+                    Collect.minus(
+                            snapshotContext.capturedSchemaTables, 
snapshotContext.capturedTables);
+            LOGGER.info("Tables for delayed schema capture: {}", 
delayedSchemaSnapshotTables);
+        }
+        if (databaseSchema.storeOnlyCapturedTables()) {
+            capturedSchemaTables = snapshotContext.capturedTables;
+            LOGGER.info(
+                    "Only captured tables schema should be captured, 
capturing: {}",
+                    capturedSchemaTables);
+        } else {
+            capturedSchemaTables = snapshotContext.capturedSchemaTables;
+            LOGGER.info(
+                    "All eligible tables schema should be captured, capturing: 
{}",
+                    capturedSchemaTables);
+        }
+        final Map<String, List<TableId>> tablesToRead =
+                capturedSchemaTables.stream()
+                        .collect(
+                                Collectors.groupingBy(
+                                        TableId::catalog, LinkedHashMap::new, 
Collectors.toList()));
+        final Set<String> databases = tablesToRead.keySet();
+
+        // Record default charset
+        addSchemaEvent(
+                snapshotContext,
+                "",
+                
connection.setStatementFor(connection.readMySqlCharsetSystemVariables()));
+
+        for (TableId tableId : capturedSchemaTables) {
+            if (!sourceContext.isRunning()) {
+                throw new InterruptedException(
+                        "Interrupted while emitting initial DROP TABLE 
events");
+            }
+            addSchemaEvent(
+                    snapshotContext, tableId.catalog(), "DROP TABLE IF EXISTS 
" + quote(tableId));
+        }
+
+        final Map<String, DatabaseLocales> databaseCharsets = 
connection.readDatabaseCollations();
+        for (String database : databases) {
+            if (!sourceContext.isRunning()) {
+                throw new InterruptedException(
+                        "Interrupted while reading structure of schema " + 
databases);
+            }
+
+            LOGGER.info("Reading structure of database '{}'", database);
+            addSchemaEvent(snapshotContext, database, "DROP DATABASE IF EXISTS 
" + quote(database));
+            final StringBuilder createDatabaseDddl =
+                    new StringBuilder("CREATE DATABASE " + quote(database));
+            final DatabaseLocales defaultDatabaseLocales = 
databaseCharsets.get(database);
+            if (defaultDatabaseLocales != null) {
+                defaultDatabaseLocales.appendToDdlStatement(database, 
createDatabaseDddl);
+            }
+            addSchemaEvent(snapshotContext, database, 
createDatabaseDddl.toString());
+            addSchemaEvent(snapshotContext, database, "USE " + 
quote(database));
+
+            createSchemaEventsForTables(snapshotContext, 
tablesToRead.get(database), true);
+        }
+    }
+
+    void createSchemaEventsForTables(
+            RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> 
snapshotContext,
+            final Collection<TableId> tablesToRead,
+            final boolean firstPhase)
+            throws SQLException {
+        for (TableId tableId : tablesToRead) {
+            if (firstPhase && delayedSchemaSnapshotTables.contains(tableId)) {
+                continue;
+            }
+            connection.query(
+                    "SHOW CREATE TABLE " + quote(tableId),
+                    rs -> {
+                        if (rs.next()) {
+                            addSchemaEvent(snapshotContext, tableId.catalog(), 
rs.getString(2));
+                        }
+                    });
+        }
+    }
+
+    private boolean twoPhaseSchemaSnapshot() {
+        return connectorConfig.getSnapshotLockingMode().usesLocking() && 
!isGloballyLocked();
+    }
+
+    @Override
+    protected SchemaChangeEvent getCreateTableEvent(
+            RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> 
snapshotContext,
+            Table table)
+            throws SQLException {
+        return SchemaChangeEvent.ofSnapshotCreate(
+                snapshotContext.partition,
+                snapshotContext.offset,
+                snapshotContext.catalogName,
+                table);
+    }
+
+    @Override
+    protected void complete(SnapshotContext<MySqlPartition, 
MySqlOffsetContext> snapshotContext) {}
+
+    /**
+     * Generate a valid MySQL query string for the specified table and columns
+     *
+     * @param tableId the table to generate a query for
+     * @return a valid query string
+     */
+    @Override
+    protected Optional<String> getSnapshotSelect(
+            RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> 
snapshotContext,
+            TableId tableId,
+            List<String> columns) {
+        String snapshotSelectColumns = 
columns.stream().collect(Collectors.joining(", "));
+
+        return Optional.of(
+                String.format(
+                        "SELECT %s FROM `%s`.`%s`",
+                        snapshotSelectColumns, tableId.catalog(), 
tableId.table()));
+    }
+
+    private boolean isGloballyLocked() {
+        return globalLockAcquiredAt != -1;
+    }
+
+    private boolean isTablesLocked() {
+        return tableLockAcquiredAt != -1;
+    }
+
+    private void globalLock() throws SQLException {
+        LOGGER.info("Flush and obtain global read lock to prevent writes to 
database");
+        connection.executeWithoutCommitting(
+                connectorConfig.getSnapshotLockingMode().getLockStatement());
+        globalLockAcquiredAt = clock.currentTimeInMillis();
+    }
+
+    private void globalUnlock() throws SQLException {
+        LOGGER.info("Releasing global read lock to enable MySQL writes");
+        connection.executeWithoutCommitting("UNLOCK TABLES");
+        long lockReleased = clock.currentTimeInMillis();
+        metrics.globalLockReleased();
+        LOGGER.info(
+                "Writes to MySQL tables prevented for a total of {}",
+                Strings.duration(lockReleased - globalLockAcquiredAt));
+        globalLockAcquiredAt = -1;
+    }
+
+    private void tableLock(
+            RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> 
snapshotContext)
+            throws SQLException {
+        // ------------------------------------
+        // LOCK TABLES and READ BINLOG POSITION
+        // ------------------------------------
+        // We were not able to acquire the global read lock, so instead we 
have to obtain a read
+        // lock on each table.
+        // This requires different privileges than normal, and also means we 
can't unlock the tables
+        // without
+        // implicitly committing our transaction ...
+        if (!connection.userHasPrivileges("LOCK TABLES")) {
+            // We don't have the right privileges
+            throw new DebeziumException(
+                    "User does not have the 'LOCK TABLES' privilege required 
to obtain a "
+                            + "consistent snapshot by preventing concurrent 
writes to tables.");
+        }
+        // We have the required privileges, so try to lock all of the tables 
we're interested in ...
+        LOGGER.info(
+                "Flush and obtain read lock for {} tables (preventing writes)",
+                snapshotContext.capturedTables);
+        if (!snapshotContext.capturedTables.isEmpty()) {
+            final String tableList =
+                    snapshotContext.capturedTables.stream()
+                            .map(tid -> quote(tid))
+                            .collect(Collectors.joining(","));
+            connection.executeWithoutCommitting("FLUSH TABLES " + tableList + 
" WITH READ LOCK");
+        }
+        tableLockAcquiredAt = clock.currentTimeInMillis();
+        metrics.globalLockAcquired();
+    }
+
+    private void tableUnlock() throws SQLException {
+        LOGGER.info("Releasing table read lock to enable MySQL writes");
+        connection.executeWithoutCommitting("UNLOCK TABLES");
+        long lockReleased = clock.currentTimeInMillis();
+        metrics.globalLockReleased();
+        LOGGER.info(
+                "Writes to MySQL tables prevented for a total of {}",
+                Strings.duration(lockReleased - tableLockAcquiredAt));
+        tableLockAcquiredAt = -1;
+    }
+
+    private String quote(String dbOrTableName) {
+        return "`" + dbOrTableName + "`";
+    }
+
+    private String quote(TableId id) {
+        return quote(id.catalog()) + "." + quote(id.table());
+    }
+
+    @Override
+    protected OptionalLong rowCountForTable(TableId tableId) {
+        return connection.getEstimatedTableSize(tableId);
+    }
+
+    @Override
+    protected Statement readTableStatement(OptionalLong rowCount) throws 
SQLException {
+        final long largeTableRowCount = 
connectorConfig.rowCountForLargeTable();
+        if (!rowCount.isPresent()
+                || largeTableRowCount == 0
+                || rowCount.getAsLong() <= largeTableRowCount) {
+            return super.readTableStatement(rowCount);
+        }
+        return createStatementWithLargeResultSet();
+    }
+
+    /**
+     * Create a JDBC statement that can be used for large result sets.
+     *
+     * <p>By default, the MySQL Connector/J driver retrieves all rows for 
ResultSets and stores them
+     * in memory. In most cases this is the most efficient way to operate and, 
due to the design of
+     * the MySQL network protocol, is easier to implement. However, when 
ResultSets that have a
+     * large number of rows or large values, the driver may not be able to 
allocate heap space in
+     * the JVM and may result in an {@link OutOfMemoryError}. See <a
+     * href="https://issues.jboss.org/browse/DBZ-94";>DBZ-94</a> for details.
+     *
+     * <p>This method handles such cases using the <a
+     * 
href="https://dev.mysql.com/doc/connector-j/5.1/en/connector-j-reference-implementation-notes.html";>recommended
+     * technique</a> for MySQL by creating the JDBC {@link Statement} with 
{@link
+     * ResultSet#TYPE_FORWARD_ONLY forward-only} cursor and {@link 
ResultSet#CONCUR_READ_ONLY
+     * read-only concurrency} flags, and with a {@link Integer#MIN_VALUE 
minimum value} {@link
+     * Statement#setFetchSize(int) fetch size hint}.
+     *
+     * @return the statement; never null
+     * @throws SQLException if there is a problem creating the statement
+     */
+    private Statement createStatementWithLargeResultSet() throws SQLException {
+        int fetchSize = connectorConfig.getSnapshotFetchSize();
+        Statement stmt =
+                connection
+                        .connection()
+                        .createStatement(ResultSet.TYPE_FORWARD_ONLY, 
ResultSet.CONCUR_READ_ONLY);
+        stmt.setFetchSize(fetchSize);
+        return stmt;
+    }
+
+    /** Mutable context which is populated in the course of snapshotting. */
+    private static class MySqlSnapshotContext
+            extends RelationalSnapshotContext<MySqlPartition, 
MySqlOffsetContext> {
+
+        public MySqlSnapshotContext(MySqlPartition partition) throws 
SQLException {
+            super(partition, "");
+        }
+    }
+
+    @Override
+    protected void createSchemaChangeEventsForTables(
+            ChangeEventSourceContext sourceContext,
+            RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> 
snapshotContext,
+            SnapshottingTask snapshottingTask)
+            throws Exception {
+        tryStartingSnapshot(snapshotContext);
+
+        for (Iterator<SchemaChangeEvent> i = schemaEvents.iterator(); 
i.hasNext(); ) {
+            final SchemaChangeEvent event = i.next();
+            if (!sourceContext.isRunning()) {
+                throw new InterruptedException("Interrupted while processing 
event " + event);
+            }
+
+            if (databaseSchema.skipSchemaChangeEvent(event)) {
+                continue;
+            }
+
+            LOGGER.debug("Processing schema event {}", event);
+
+            final TableId tableId =
+                    event.getTables().isEmpty() ? null : 
event.getTables().iterator().next().id();
+            snapshotContext.offset.event(tableId, getClock().currentTime());
+
+            // If data are not snapshotted then the last schema change must 
set last snapshot flag
+            if (!snapshottingTask.snapshotData() && !i.hasNext()) {
+                lastSnapshotRecord(snapshotContext);
+            }
+            dispatcher.dispatchSchemaChangeEvent(
+                    snapshotContext.partition,
+                    tableId,
+                    (receiver) -> receiver.schemaChangeEvent(event));
+        }
+
+        // Make schema available for snapshot source
+        databaseSchema
+                .tableIds()
+                .forEach(x -> 
snapshotContext.tables.overwriteTable(databaseSchema.tableFor(x)));
+    }
+
+    @Override
+    protected void lastSnapshotRecord(
+            RelationalSnapshotContext<MySqlPartition, MySqlOffsetContext> 
snapshotContext) {
+        if (delayedSchemaSnapshotTables.isEmpty()) {
+            super.lastSnapshotRecord(snapshotContext);
+        }
+    }
+
+    @Override
+    protected void postSnapshot() throws InterruptedException {
+        // We cannot be sure that the last event as the last one
+        // - last table could be empty
+        // - data snapshot was not executed
+        // - the last table schema snaphsotted is not monitored and storing of 
monitored is disabled
+        lastEventProcessor.accept(
+                record -> {
+                    record.sourceOffset().remove(SourceInfo.SNAPSHOT_KEY);
+                    ((Struct) record.value())
+                            .getStruct(Envelope.FieldName.SOURCE)
+                            .put(
+                                    SourceInfo.SNAPSHOT_KEY,
+                                    
SnapshotRecord.LAST.toString().toLowerCase());
+                    return record;
+                });
+        super.postSnapshot();
+    }
+}
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/util/ErrorMessageUtils.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/util/ErrorMessageUtils.java
index 2f7b4feca..47efa694f 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/util/ErrorMessageUtils.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/io/debezium/connector/mysql/util/ErrorMessageUtils.java
@@ -23,7 +23,7 @@ import java.util.regex.Pattern;
 public class ErrorMessageUtils {
     private static final Pattern SERVER_ID_CONFLICT =
             Pattern.compile(
-                    ".*A slave with the same server_uuid/server_id as this 
slave has connected to the master.*");
+                    ".*A (slave|replica) with the same server_uuid/server_id 
as this (slave|replica) has connected to the (master|source).*");
     private static final Pattern MISSING_BINLOG_POSITION_WHEN_BINLOG_EXPIRE =
             Pattern.compile(
                     ".*The connector is trying to read binlog.*but this is no 
longer available on the server.*");
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java
index 4e512a81c..f567dde73 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java
@@ -117,11 +117,11 @@ public class DebeziumUtils {
     }
 
     /** Fetch current binlog offsets in MySql Server. */
-    public static BinlogOffset currentBinlogOffset(JdbcConnection jdbc) {
-        final String showMasterStmt = "SHOW MASTER STATUS";
+    public static BinlogOffset currentBinlogOffset(MySqlConnection jdbc) {
+        String showBinaryLogStatement = jdbc.getShowBinaryLogStatement();
         try {
             return jdbc.queryAndMap(
-                    showMasterStmt,
+                    showBinaryLogStatement,
                     rs -> {
                         if (rs.next()) {
                             final String binlogFilename = rs.getString(1);
@@ -135,14 +135,14 @@ public class DebeziumUtils {
                         } else {
                             throw new FlinkRuntimeException(
                                     "Cannot read the binlog filename and 
position via '"
-                                            + showMasterStmt
+                                            + showBinaryLogStatement
                                             + "'. Make sure your server is 
correctly configured");
                         }
                     });
         } catch (SQLException e) {
             throw new FlinkRuntimeException(
                     "Cannot read the binlog filename and position via '"
-                            + showMasterStmt
+                            + showBinaryLogStatement
                             + "'. Make sure your server is correctly 
configured",
                     e);
         }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlCompatibilityITCase.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlCompatibilityITCase.java
index 210913708..72aaa0a2f 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlCompatibilityITCase.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/table/MySqlCompatibilityITCase.java
@@ -28,8 +28,11 @@ import 
org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.types.Row;
 import org.apache.flink.util.CloseableIterator;
 
+import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedClass;
+import org.junit.jupiter.params.provider.EnumSource;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.output.Slf4jLogConsumer;
@@ -56,85 +59,82 @@ import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlSourceTestBase.a
 import static 
org.apache.flink.cdc.connectors.mysql.source.MySqlSourceTestBase.assertEqualsInOrder;
 
 /** Integration tests to check mysql-cdc works well with different MySQL 
server version. */
+@ParameterizedClass
+@EnumSource(
+        value = MySqlVersion.class,
+        names = {"V5_7", "V8_0", "V8_4"})
 class MySqlCompatibilityITCase {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(MySqlCompatibilityITCase.class);
 
-    private Path tempFolder;
+    private static Path tempFolder;
     private static File resourceFolder;
 
+    private final MySqlVersion version;
+    private final MySqlContainer mySqlContainer;
+    private final UniqueDatabase testDatabase;
+
     private final StreamExecutionEnvironment env =
             StreamExecutionEnvironment.getExecutionEnvironment();
     private final StreamTableEnvironment tEnv =
             StreamTableEnvironment.create(
                     env, 
EnvironmentSettings.newInstance().inStreamingMode().build());
 
-    @BeforeEach
-    public void setup() throws Exception {
-        resourceFolder =
-                Paths.get(
-                                Objects.requireNonNull(
-                                                MySqlValidatorTest.class
-                                                        .getClassLoader()
-                                                        .getResource("."))
-                                        .toURI())
-                        .toFile();
-        env.setParallelism(4);
-        env.enableCheckpointing(200);
-        tempFolder = Files.createTempDirectory(resourceFolder.toPath(), 
"mysql-config");
-    }
-
-    @Test
-    void testMySqlV56() throws Exception {
-        testDifferentMySqlVersion(MySqlVersion.V5_6, false);
-    }
-
-    @Test
-    void testMySqlV56WithGtidModeOn() throws Exception {
-        testDifferentMySqlVersion(MySqlVersion.V5_6, true);
-    }
-
-    @Test
-    void testMySqlV57() throws Exception {
-        testDifferentMySqlVersion(MySqlVersion.V5_7, false);
-    }
-
-    @Test
-    void testMySqlV57WithGtidModeOn() throws Exception {
-        testDifferentMySqlVersion(MySqlVersion.V5_7, true);
-    }
-
-    @Test
-    void testMySqlV8() throws Exception {
-        testDifferentMySqlVersion(MySqlVersion.V8_0, false);
-    }
-
-    @Test
-    void testMySqlV8WithGtidModeOn() throws Exception {
-        testDifferentMySqlVersion(MySqlVersion.V8_0, true);
-    }
-
-    private void testDifferentMySqlVersion(MySqlVersion version, boolean 
enableGtid)
-            throws Exception {
-        final MySqlContainer mySqlContainer =
+    MySqlCompatibilityITCase(MySqlVersion version) {
+        this.version = version;
+        this.mySqlContainer =
                 (MySqlContainer)
                         new MySqlContainer(version)
-                                .withConfigurationOverride(
-                                        buildCustomMySqlConfig(version, 
enableGtid))
+                                
.withConfigurationOverride(buildCustomMySqlConfig(version))
                                 .withSetupSQL("docker/setup.sql")
                                 .withDatabaseName("flink-test")
                                 .withUsername("flinkuser")
                                 .withPassword("flinkpw")
                                 .withLogConsumer(new Slf4jLogConsumer(LOG));
+        this.testDatabase = new UniqueDatabase(mySqlContainer, "inventory", 
"mysqluser", "mysqlpw");
+    }
+
+    @BeforeEach
+    void setup() throws Exception {
+        // Initialize static resources if needed
+        if (resourceFolder == null) {
+            resourceFolder =
+                    Paths.get(
+                                    Objects.requireNonNull(
+                                                    MySqlValidatorTest.class
+                                                            .getClassLoader()
+                                                            .getResource("."))
+                                            .toURI())
+                            .toFile();
+            tempFolder = Files.createTempDirectory(resourceFolder.toPath(), 
"mysql-config");
+        }
+
+        env.setParallelism(4);
+        env.enableCheckpointing(200);
 
-        LOG.info("Starting containers...");
+        LOG.info("Starting container for MySQL {}...", version.getVersion());
         Startables.deepStart(Stream.of(mySqlContainer)).join();
-        LOG.info("Containers are started.");
+        LOG.info("Container is started.");
 
-        UniqueDatabase testDatabase =
-                new UniqueDatabase(mySqlContainer, "inventory", "mysqluser", 
"mysqlpw");
         testDatabase.createAndInitialize();
+    }
+
+    @AfterEach
+    void tearDown() {
+        try {
+            testDatabase.dropDatabase();
+        } catch (IllegalStateException e) {
+            LOG.warn("Failed to drop test database during teardown.", e);
+        }
+        if (mySqlContainer != null) {
+            LOG.info("Stopping container for MySQL {}...", 
version.getVersion());
+            mySqlContainer.stop();
+            LOG.info("Container is stopped.");
+        }
+    }
 
+    @Test
+    void testMySqlVersionCompatibility() throws Exception {
         String sourceDDL =
                 String.format(
                         "CREATE TABLE products ("
@@ -163,7 +163,6 @@ class MySqlCompatibilityITCase {
                         getServerId());
         tEnv.executeSql(sourceDDL);
 
-        // async submit job
         TableResult result =
                 tEnv.executeSql("SELECT `id`, name, description, weight FROM 
products");
 
@@ -217,7 +216,6 @@ class MySqlCompatibilityITCase {
         assertEqualsInOrder(
                 Arrays.asList(expectedBinlog), fetchRows(iterator, 
expectedBinlog.length));
         result.getJobClient().get().cancel().get();
-        mySqlContainer.stop();
     }
 
     private String getServerId() {
@@ -236,29 +234,39 @@ class MySqlCompatibilityITCase {
         return rows;
     }
 
-    private String buildCustomMySqlConfig(MySqlVersion version, boolean 
enableGtid) {
+    private String buildCustomMySqlConfig(MySqlVersion version) {
         try {
-            Path cnf = Files.createFile(Paths.get(tempFolder.toString(), 
"my.cnf"));
+            if (resourceFolder == null) {
+                resourceFolder =
+                        Paths.get(
+                                        Objects.requireNonNull(
+                                                        
MySqlValidatorTest.class
+                                                                
.getClassLoader()
+                                                                
.getResource("."))
+                                                .toURI())
+                                .toFile();
+                tempFolder = 
Files.createTempDirectory(resourceFolder.toPath(), "mysql-config");
+            }
+            // Create version-specific directory to avoid conflicts
+            Path versionDir =
+                    Files.createDirectories(
+                            Paths.get(
+                                    tempFolder.toString(), 
version.getVersion().replace(".", "_")));
+            Path cnf = Paths.get(versionDir.toString(), "my.cnf");
+            // Check if file already exists to avoid FileAlreadyExistsException
+            if (!Files.exists(cnf)) {
+                Files.createFile(cnf);
+            }
             StringBuilder mysqlConfBuilder = new StringBuilder();
             mysqlConfBuilder.append(
                     "[mysqld]\n"
                             + "binlog_format = row\n"
                             + "log_bin = mysql-bin\n"
                             + "server-id = 223344\n"
-                            + "binlog_row_image = FULL\n");
-            if (!enableGtid) {
-                mysqlConfBuilder.append("gtid-mode = OFF\n");
-            } else {
-                mysqlConfBuilder.append("gtid-mode = ON\n");
-                mysqlConfBuilder.append("enforce-gtid-consistency = 1\n");
-                // see
-                // 
https://dev.mysql.com/doc/refman/5.7/en/replication-options-gtids.html#sysvar_gtid_mode
-                if (version == MySqlVersion.V5_6 || version == 
MySqlVersion.V5_7) {
-                    mysqlConfBuilder.append("log-slave-updates = ON\n");
-                }
-            }
+                            + "binlog_row_image = FULL\n"
+                            + "gtid-mode = OFF\n");
 
-            if (version == MySqlVersion.V8_0) {
+            if (version == MySqlVersion.V8_0 || version == MySqlVersion.V8_4) {
                 mysqlConfBuilder.append("secure_file_priv=/var/lib/mysql\n");
             }
 
@@ -266,7 +274,7 @@ class MySqlCompatibilityITCase {
                     cnf,
                     Collections.singleton(mysqlConfBuilder.toString()),
                     StandardCharsets.UTF_8,
-                    StandardOpenOption.APPEND);
+                    StandardOpenOption.TRUNCATE_EXISTING);
             return 
Paths.get(resourceFolder.getAbsolutePath()).relativize(cnf).toString();
         } catch (Exception e) {
             throw new RuntimeException("Failed to create my.cnf file.", e);
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MySqlVersion.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MySqlVersion.java
index f972b3c0f..9e2e916fe 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MySqlVersion.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/testutils/MySqlVersion.java
@@ -25,7 +25,8 @@ public enum MySqlVersion {
     V8_0_17("8.0.17"),
     V8_0_18("8.0.18"),
     V8_0_19("8.0.19"),
-    V8_0("8.0");
+    V8_0("8.0"),
+    V8_4("8.4");
 
     private String version;
 
diff --git 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToIcebergE2eITCase.java
 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToIcebergE2eITCase.java
index c63c54ad6..8b4d7768d 100644
--- 
a/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToIcebergE2eITCase.java
+++ 
b/flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToIcebergE2eITCase.java
@@ -18,8 +18,6 @@
 package org.apache.flink.cdc.pipeline.tests;
 
 import org.apache.flink.cdc.common.test.utils.TestUtils;
-import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
-import org.apache.flink.cdc.connectors.mysql.testutils.MySqlVersion;
 import org.apache.flink.cdc.connectors.mysql.testutils.UniqueDatabase;
 import org.apache.flink.cdc.pipeline.tests.utils.PipelineTestEnvironment;
 import org.apache.flink.cdc.pipeline.tests.utils.TarballFetcher;
@@ -43,7 +41,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testcontainers.containers.BindMode;
 import org.testcontainers.containers.GenericContainer;
-import org.testcontainers.containers.output.Slf4jLogConsumer;
 import org.testcontainers.containers.output.ToStringConsumer;
 import org.testcontainers.lifecycle.Startables;
 
@@ -76,20 +73,6 @@ public class MySqlToIcebergE2eITCase extends 
PipelineTestEnvironment {
 
     @TempDir public Path temporaryFolder;
 
-    @org.testcontainers.junit.jupiter.Container
-    public static final MySqlContainer MYSQL =
-            (MySqlContainer)
-                    new MySqlContainer(
-                                    MySqlVersion.V8_0) // v8 support both ARM 
and AMD architectures
-                            .withConfigurationOverride("docker/mysql/my.cnf")
-                            .withSetupSQL("docker/mysql/setup.sql")
-                            .withDatabaseName("flink-test")
-                            .withUsername("flinkuser")
-                            .withPassword("flinkpw")
-                            .withNetwork(NETWORK)
-                            .withNetworkAliases("mysql")
-                            .withLogConsumer(new Slf4jLogConsumer(LOG));
-
     protected final UniqueDatabase inventoryDatabase =
             new UniqueDatabase(MYSQL, "iceberg_inventory", MYSQL_TEST_USER, 
MYSQL_TEST_PASSWORD);
 
diff --git a/pom.xml b/pom.xml
index 18224068b..d25309bd3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -89,7 +89,7 @@ limitations under the License.
         <commons-lang3.version>3.12.0</commons-lang3.version>
         <json-path.version>2.7.0</json-path.version>
         <jackson.version>2.13.2</jackson.version>
-        <junit.version>5.10.1</junit.version>
+        <junit.version>5.13.1</junit.version>
         <assertj.version>3.24.2</assertj.version>
         <markBundledAsOptional>true</markBundledAsOptional>
         <flatten-maven-plugin.version>1.5.0</flatten-maven-plugin.version>

Reply via email to