This is an automated email from the ASF dual-hosted git repository.

kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new e2b8076ec [FLINK-38813][mysql] Support for emitting heartbeat events 
(#4193)
e2b8076ec is described below

commit e2b8076ec8fa5043a0f691944a6d1246482ef527
Author: Tejansh <[email protected]>
AuthorDate: Tue Dec 23 10:57:18 2025 +0000

    [FLINK-38813][mysql] Support for emitting heartbeat events (#4193)
---
 .../mysql/source/reader/MySqlPipelineRecordEmitter.java     |  3 ++-
 .../flink/cdc/connectors/mysql/source/MySqlSource.java      |  3 ++-
 .../cdc/connectors/mysql/source/MySqlSourceBuilder.java     | 13 +++++++++++++
 .../connectors/mysql/source/config/MySqlSourceConfig.java   |  7 +++++++
 .../mysql/source/config/MySqlSourceConfigFactory.java       |  8 ++++++++
 .../connectors/mysql/source/reader/MySqlRecordEmitter.java  |  8 +++++++-
 .../mysql/source/reader/MySqlRecordEmitterTest.java         |  1 +
 .../mysql/source/reader/MySqlSourceReaderTest.java          |  5 +++--
 8 files changed, 43 insertions(+), 5 deletions(-)

diff --git 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
index c946b9e29..bf4f15cee 100644
--- 
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
+++ 
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
@@ -92,7 +92,8 @@ public class MySqlPipelineRecordEmitter extends 
MySqlRecordEmitter<Event> {
         super(
                 debeziumDeserializationSchema,
                 sourceReaderMetrics,
-                sourceConfig.isIncludeSchemaChanges());
+                sourceConfig.isIncludeSchemaChanges(),
+                false); // Explicitly disable heartbeat events
         this.debeziumDeserializationSchema = debeziumDeserializationSchema;
         this.sourceConfig = sourceConfig;
         this.alreadySendCreateTableTables = new HashSet<>();
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java
index 9cd40f015..d7f030574 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java
@@ -132,7 +132,8 @@ public class MySqlSource<T>
                         new MySqlRecordEmitter<>(
                                 deserializationSchema,
                                 sourceReaderMetrics,
-                                sourceConfig.isIncludeSchemaChanges()));
+                                sourceConfig.isIncludeSchemaChanges(),
+                                sourceConfig.isIncludeHeartbeatEvents()));
     }
 
     MySqlSource(
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java
index caf316d1b..b37ed19e6 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java
@@ -237,6 +237,19 @@ public class MySqlSourceBuilder<T> {
         return this;
     }
 
+    /**
+     * The interval of heartbeat event and whether to emit heartbeat events.
+     *
+     * @param heartbeatInterval the interval of heartbeat event
+     * @param includeHeartbeatEvents whether to emit heartbeat events
+     */
+    public MySqlSourceBuilder<T> heartbeatInterval(
+            Duration heartbeatInterval, boolean includeHeartbeatEvents) {
+        this.configFactory.heartbeatInterval(heartbeatInterval);
+        this.configFactory.includeHeartbeatEvents(includeHeartbeatEvents);
+        return this;
+    }
+
     /**
      * Whether to skip backfill in snapshot reading phase.
      *
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
index 260a7cd2b..49570463b 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
@@ -62,6 +62,7 @@ public class MySqlSourceConfig implements Serializable {
     private final double distributionFactorUpper;
     private final double distributionFactorLower;
     private final boolean includeSchemaChanges;
+    private final boolean includeHeartbeatEvents;
     private final boolean scanNewlyAddedTableEnabled;
     private final boolean closeIdleReaders;
     private final Properties jdbcProperties;
@@ -99,6 +100,7 @@ public class MySqlSourceConfig implements Serializable {
             double distributionFactorUpper,
             double distributionFactorLower,
             boolean includeSchemaChanges,
+            boolean includeHeartbeatEvents,
             boolean scanNewlyAddedTableEnabled,
             boolean closeIdleReaders,
             Properties dbzProperties,
@@ -128,6 +130,7 @@ public class MySqlSourceConfig implements Serializable {
         this.distributionFactorUpper = distributionFactorUpper;
         this.distributionFactorLower = distributionFactorLower;
         this.includeSchemaChanges = includeSchemaChanges;
+        this.includeHeartbeatEvents = includeHeartbeatEvents;
         this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled;
         this.closeIdleReaders = closeIdleReaders;
         this.dbzProperties = checkNotNull(dbzProperties);
@@ -227,6 +230,10 @@ public class MySqlSourceConfig implements Serializable {
         return includeSchemaChanges;
     }
 
+    public boolean isIncludeHeartbeatEvents() {
+        return includeHeartbeatEvents;
+    }
+
     public boolean isScanNewlyAddedTableEnabled() {
         return scanNewlyAddedTableEnabled;
     }
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
index 427115ede..7010d732d 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
@@ -63,6 +63,7 @@ public class MySqlSourceConfigFactory implements Serializable 
{
     private double distributionFactorLower =
             
MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue();
     private boolean includeSchemaChanges = false;
+    private boolean includeHeartbeatEvents = false;
     private boolean scanNewlyAddedTableEnabled = false;
     private boolean closeIdleReaders = false;
     private Properties jdbcProperties;
@@ -235,6 +236,12 @@ public class MySqlSourceConfigFactory implements 
Serializable {
         return this;
     }
 
+    /** Whether the {@link MySqlSource} should output the heartbeat events or 
not. */
+    public MySqlSourceConfigFactory includeHeartbeatEvents(boolean 
includeHeartbeatEvents) {
+        this.includeHeartbeatEvents = includeHeartbeatEvents;
+        return this;
+    }
+
     /** Whether the {@link MySqlSource} should scan the newly added tables or 
not. */
     public MySqlSourceConfigFactory scanNewlyAddedTableEnabled(boolean 
scanNewlyAddedTableEnabled) {
         this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled;
@@ -412,6 +419,7 @@ public class MySqlSourceConfigFactory implements 
Serializable {
                 distributionFactorUpper,
                 distributionFactorLower,
                 includeSchemaChanges,
+                includeHeartbeatEvents,
                 scanNewlyAddedTableEnabled,
                 closeIdleReaders,
                 props,
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java
index 449e7f608..486e637ce 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java
@@ -53,15 +53,18 @@ public class MySqlRecordEmitter<T> implements 
RecordEmitter<SourceRecords, T, My
     private final DebeziumDeserializationSchema<T> 
debeziumDeserializationSchema;
     private final MySqlSourceReaderMetrics sourceReaderMetrics;
     private final boolean includeSchemaChanges;
+    private final boolean includeHeartbeatEvents;
     private final OutputCollector<T> outputCollector;
 
     public MySqlRecordEmitter(
             DebeziumDeserializationSchema<T> debeziumDeserializationSchema,
             MySqlSourceReaderMetrics sourceReaderMetrics,
-            boolean includeSchemaChanges) {
+            boolean includeSchemaChanges,
+            boolean includeHeartbeatEvents) {
         this.debeziumDeserializationSchema = debeziumDeserializationSchema;
         this.sourceReaderMetrics = sourceReaderMetrics;
         this.includeSchemaChanges = includeSchemaChanges;
+        this.includeHeartbeatEvents = includeHeartbeatEvents;
         this.outputCollector = new OutputCollector<>();
     }
 
@@ -102,6 +105,9 @@ public class MySqlRecordEmitter<T> implements 
RecordEmitter<SourceRecords, T, My
             emitElement(element, output);
         } else if (RecordUtils.isHeartbeatEvent(element)) {
             updateStartingOffsetForSplit(splitState, element);
+            if (includeHeartbeatEvents) {
+                emitElement(element, output);
+            }
         } else {
             // unknown element
             LOG.info("Meet unknown element {}, just skip.", element);
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java
index 5553b1ba8..7a3764461 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java
@@ -102,6 +102,7 @@ class MySqlRecordEmitterTest {
                 },
                 new MySqlSourceReaderMetrics(
                         
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup()),
+                false,
                 false);
     }
 
diff --git 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java
 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java
index 7b8dfdcdb..08c337967 100644
--- 
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java
+++ 
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java
@@ -573,7 +573,8 @@ class MySqlSourceReaderTest extends MySqlSourceTestBase {
                         : new MySqlRecordEmitter<>(
                                 new ForwardDeserializeSchema(),
                                 new 
MySqlSourceReaderMetrics(readerContext.metricGroup()),
-                                configuration.isIncludeSchemaChanges());
+                                configuration.isIncludeSchemaChanges(),
+                                configuration.isIncludeHeartbeatEvents());
         final MySqlSourceReaderContext mySqlSourceReaderContext =
                 new MySqlSourceReaderContext(readerContext);
         return new MySqlSourceReader<>(
@@ -740,7 +741,7 @@ class MySqlSourceReaderTest extends MySqlSourceTestBase {
                 MySqlSourceReaderMetrics sourceReaderMetrics,
                 boolean includeSchemaChanges,
                 int limit) {
-            super(debeziumDeserializationSchema, sourceReaderMetrics, 
includeSchemaChanges);
+            super(debeziumDeserializationSchema, sourceReaderMetrics, 
includeSchemaChanges, false);
             this.debeziumDeserializationSchema = debeziumDeserializationSchema;
             this.sourceReaderMetrics = sourceReaderMetrics;
             this.includeSchemaChanges = includeSchemaChanges;

Reply via email to