This is an automated email from the ASF dual-hosted git repository.
ruanhang1993 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new 986f37b30 [FLINK-35674][cdc-connector][mysql]Fix blocking caused by
searching for timestamp in binlog file (#3432)
986f37b30 is described below
commit 986f37b30750da0a84e2da8d0ff9554702dacc01
Author: Thorne <[email protected]>
AuthorDate: Tue Aug 6 15:49:50 2024 +0800
[FLINK-35674][cdc-connector][mysql]Fix blocking caused by searching for
timestamp in binlog file (#3432)
---
.../connectors/mysql/debezium/DebeziumUtils.java | 7 ++--
.../debezium/task/context/StatefulTaskContext.java | 4 ++-
.../mysql/source/offset/BinlogOffsetUtils.java | 6 ++--
.../debezium/reader/BinlogSplitReaderTest.java | 3 +-
.../mysql/source/SpecificStartingOffsetITCase.java | 38 ++++++++++++++++++----
5 files changed, 45 insertions(+), 13 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java
index 33dac6771..ac3b20c4f 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/DebeziumUtils.java
@@ -240,12 +240,15 @@ public class DebeziumUtils {
return variables;
}
- public static BinlogOffset findBinlogOffset(long targetMs, MySqlConnection connection) {
+ public static BinlogOffset findBinlogOffset(
+ long targetMs, MySqlConnection connection, MySqlSourceConfig mySqlSourceConfig) {
MySqlConnection.MySqlConnectionConfiguration config =
connection.connectionConfig();
BinaryLogClient client =
new BinaryLogClient(
config.hostname(), config.port(), config.username(),
config.password());
-
+ if (mySqlSourceConfig.getServerIdRange() != null) {
+ client.setServerId(mySqlSourceConfig.getServerIdRange().getStartServerId());
+ }
List<String> binlogFiles = new ArrayList<>();
JdbcConnection.ResultSetConsumer rsc =
rs -> {
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java
index 5fc342a4c..fbce9cdd7 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/debezium/task/context/StatefulTaskContext.java
@@ -193,7 +193,9 @@ public class StatefulTaskContext {
mySqlSplit.isSnapshotSplit()
? BinlogOffset.ofEarliest()
: initializeEffectiveOffset(
- mySqlSplit.asBinlogSplit().getStartingOffset(), connection);
+ mySqlSplit.asBinlogSplit().getStartingOffset(),
+ connection,
+ sourceConfig);
LOG.info("Starting offset is initialized to {}", offset);
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffsetUtils.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffsetUtils.java
index ce82d6a1b..3c192eaa7 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffsetUtils.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/offset/BinlogOffsetUtils.java
@@ -19,6 +19,7 @@ package org.apache.flink.cdc.connectors.mysql.source.offset;
import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils;
import
org.apache.flink.cdc.connectors.mysql.debezium.task.context.StatefulTaskContext;
+import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import io.debezium.connector.mysql.MySqlConnection;
@@ -45,13 +46,14 @@ public class BinlogOffsetUtils {
* </ul>
*/
public static BinlogOffset initializeEffectiveOffset(
- BinlogOffset offset, MySqlConnection connection) {
+ BinlogOffset offset, MySqlConnection connection, MySqlSourceConfig mySqlSourceConfig) {
BinlogOffsetKind offsetKind = offset.getOffsetKind();
switch (offsetKind) {
case EARLIEST:
return BinlogOffset.ofBinlogFilePosition("", 0);
case TIMESTAMP:
- return DebeziumUtils.findBinlogOffset(offset.getTimestampSec() * 1000, connection);
+ return DebeziumUtils.findBinlogOffset(
+ offset.getTimestampSec() * 1000, connection, mySqlSourceConfig);
case LATEST:
return DebeziumUtils.currentBinlogOffset(connection);
default:
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java
index 05e603c00..f9c070d96 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/debezium/reader/BinlogSplitReaderTest.java
@@ -1262,7 +1262,8 @@ public class BinlogSplitReaderTest extends
MySqlSourceTestBase {
? BinlogOffset.ofEarliest()
: initializeEffectiveOffset(
mySqlSplit.asBinlogSplit().getStartingOffset(),
- getConnection());
+ getConnection(),
+ getSourceConfig());
LOG.info("Starting offset is initialized to {}", offset);
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/SpecificStartingOffsetITCase.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/SpecificStartingOffsetITCase.java
index 9761a6f38..44d72a64b 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/SpecificStartingOffsetITCase.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/SpecificStartingOffsetITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.cdc.connectors.mysql.source;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.cdc.connectors.mysql.debezium.DebeziumUtils;
+import org.apache.flink.cdc.connectors.mysql.source.config.MySqlSourceConfig;
import org.apache.flink.cdc.connectors.mysql.source.offset.BinlogOffset;
import org.apache.flink.cdc.connectors.mysql.table.StartupOptions;
import org.apache.flink.cdc.connectors.mysql.testutils.MySqlContainer;
@@ -263,9 +264,12 @@ public class SpecificStartingOffsetITCase {
// Purge binary log at first
purgeBinaryLogs();
+ long t0 = System.currentTimeMillis();
+ String servedId0 = "5400";
Assert.equals(
BinlogOffset.ofBinlogFilePosition("mysql-bin.000004", 0),
- DebeziumUtils.findBinlogOffset(System.currentTimeMillis(), connection));
+ DebeziumUtils.findBinlogOffset(
+ t0, connection, getMySqlSourceConfig(t0, servedId0)));
executeStatements(
String.format(
@@ -273,6 +277,7 @@ public class SpecificStartingOffsetITCase {
customers.getTableId()));
Thread.sleep(1000);
long t1 = System.currentTimeMillis();
+ String servedId1 = "5401";
flushLogs();
executeStatements(
@@ -281,6 +286,7 @@ public class SpecificStartingOffsetITCase {
customers.getTableId()));
Thread.sleep(1000);
long t2 = System.currentTimeMillis();
+ String servedId2 = "5402";
flushLogs();
executeStatements(
@@ -289,6 +295,7 @@ public class SpecificStartingOffsetITCase {
customers.getTableId()));
Thread.sleep(1000);
long t3 = System.currentTimeMillis();
+ String servedId3 = "5403";
flushLogs();
executeStatements(
@@ -297,6 +304,7 @@ public class SpecificStartingOffsetITCase {
customers.getTableId()));
Thread.sleep(1000);
long t4 = System.currentTimeMillis();
+ String servedId4 = "5404";
flushLogs();
executeStatements(
@@ -305,28 +313,35 @@ public class SpecificStartingOffsetITCase {
customers.getTableId()));
Thread.sleep(1000);
long t5 = System.currentTimeMillis();
+ String servedId5 = "5405";
flushLogs();
Assert.equals(
BinlogOffset.ofBinlogFilePosition("mysql-bin.000005", 0),
- DebeziumUtils.findBinlogOffset(t1, connection));
+ DebeziumUtils.findBinlogOffset(
+ t1, connection, getMySqlSourceConfig(t1, servedId1)));
Assert.equals(
BinlogOffset.ofBinlogFilePosition("mysql-bin.000006", 0),
- DebeziumUtils.findBinlogOffset(t2, connection));
+ DebeziumUtils.findBinlogOffset(
+ t2, connection, getMySqlSourceConfig(t1, servedId2)));
Assert.equals(
BinlogOffset.ofBinlogFilePosition("mysql-bin.000007", 0),
- DebeziumUtils.findBinlogOffset(t3, connection));
+ DebeziumUtils.findBinlogOffset(
+ t3, connection, getMySqlSourceConfig(t1, servedId3)));
Assert.equals(
BinlogOffset.ofBinlogFilePosition("mysql-bin.000008", 0),
- DebeziumUtils.findBinlogOffset(t4, connection));
+ DebeziumUtils.findBinlogOffset(
+ t4, connection, getMySqlSourceConfig(t1, servedId4)));
Assert.equals(
BinlogOffset.ofBinlogFilePosition("mysql-bin.000009", 0),
- DebeziumUtils.findBinlogOffset(t5, connection));
+ DebeziumUtils.findBinlogOffset(
+ t5, connection, getMySqlSourceConfig(t1, servedId5)));
purgeBinaryLogs();
Assert.equals(
BinlogOffset.ofBinlogFilePosition("mysql-bin.000009", 0),
- DebeziumUtils.findBinlogOffset(t3, connection));
+ DebeziumUtils.findBinlogOffset(
+ t5, connection, getMySqlSourceConfig(t1, servedId5)));
}
@Test
@@ -440,6 +455,15 @@ public class SpecificStartingOffsetITCase {
return DebeziumUtils.createMySqlConnection(configuration, new
Properties());
}
+ private MySqlSourceConfig getMySqlSourceConfig(Long timestamp, String serverId) {
+ return getSourceBuilder()
+ .startupOptions(StartupOptions.timestamp(timestamp))
+ .serverId(serverId)
+ .build()
+ .getConfigFactory()
+ .createConfig(0);
+ }
+
private void executeStatements(String... statements) throws Exception {
connection.execute(statements);
connection.commit();