This is an automated email from the ASF dual-hosted git repository.
kunni pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
The following commit(s) were added to refs/heads/master by this push:
new e2b8076ec [FLINK-38813][mysql] Support for emitting heartbeat events
(#4193)
e2b8076ec is described below
commit e2b8076ec8fa5043a0f691944a6d1246482ef527
Author: Tejansh <[email protected]>
AuthorDate: Tue Dec 23 10:57:18 2025 +0000
[FLINK-38813][mysql] Support for emitting heartbeat events (#4193)
---
.../mysql/source/reader/MySqlPipelineRecordEmitter.java | 3 ++-
.../flink/cdc/connectors/mysql/source/MySqlSource.java | 3 ++-
.../cdc/connectors/mysql/source/MySqlSourceBuilder.java | 13 +++++++++++++
.../connectors/mysql/source/config/MySqlSourceConfig.java | 7 +++++++
.../mysql/source/config/MySqlSourceConfigFactory.java | 8 ++++++++
.../connectors/mysql/source/reader/MySqlRecordEmitter.java | 8 +++++++-
.../mysql/source/reader/MySqlRecordEmitterTest.java | 1 +
.../mysql/source/reader/MySqlSourceReaderTest.java | 5 +++--
8 files changed, 43 insertions(+), 5 deletions(-)
diff --git
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
index c946b9e29..bf4f15cee 100644
---
a/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
+++
b/flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlPipelineRecordEmitter.java
@@ -92,7 +92,8 @@ public class MySqlPipelineRecordEmitter extends
MySqlRecordEmitter<Event> {
super(
debeziumDeserializationSchema,
sourceReaderMetrics,
- sourceConfig.isIncludeSchemaChanges());
+ sourceConfig.isIncludeSchemaChanges(),
+ false); // Explicitly disable heartbeat events
this.debeziumDeserializationSchema = debeziumDeserializationSchema;
this.sourceConfig = sourceConfig;
this.alreadySendCreateTableTables = new HashSet<>();
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java
index 9cd40f015..d7f030574 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSource.java
@@ -132,7 +132,8 @@ public class MySqlSource<T>
new MySqlRecordEmitter<>(
deserializationSchema,
sourceReaderMetrics,
- sourceConfig.isIncludeSchemaChanges()));
+ sourceConfig.isIncludeSchemaChanges(),
+ sourceConfig.isIncludeHeartbeatEvents()));
}
MySqlSource(
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java
index caf316d1b..b37ed19e6 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlSourceBuilder.java
@@ -237,6 +237,19 @@ public class MySqlSourceBuilder<T> {
return this;
}
+ /**
+ * The interval of heartbeat event and whether to emit heartbeat events.
+ *
+ * @param heartbeatInterval the interval of heartbeat event
+ * @param includeHeartbeatEvents whether to emit heartbeat events
+ */
+ public MySqlSourceBuilder<T> heartbeatInterval(
+ Duration heartbeatInterval, boolean includeHeartbeatEvents) {
+ this.configFactory.heartbeatInterval(heartbeatInterval);
+ this.configFactory.includeHeartbeatEvents(includeHeartbeatEvents);
+ return this;
+ }
+
/**
* Whether to skip backfill in snapshot reading phase.
*
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
index 260a7cd2b..49570463b 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfig.java
@@ -62,6 +62,7 @@ public class MySqlSourceConfig implements Serializable {
private final double distributionFactorUpper;
private final double distributionFactorLower;
private final boolean includeSchemaChanges;
+ private final boolean includeHeartbeatEvents;
private final boolean scanNewlyAddedTableEnabled;
private final boolean closeIdleReaders;
private final Properties jdbcProperties;
@@ -99,6 +100,7 @@ public class MySqlSourceConfig implements Serializable {
double distributionFactorUpper,
double distributionFactorLower,
boolean includeSchemaChanges,
+ boolean includeHeartbeatEvents,
boolean scanNewlyAddedTableEnabled,
boolean closeIdleReaders,
Properties dbzProperties,
@@ -128,6 +130,7 @@ public class MySqlSourceConfig implements Serializable {
this.distributionFactorUpper = distributionFactorUpper;
this.distributionFactorLower = distributionFactorLower;
this.includeSchemaChanges = includeSchemaChanges;
+ this.includeHeartbeatEvents = includeHeartbeatEvents;
this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled;
this.closeIdleReaders = closeIdleReaders;
this.dbzProperties = checkNotNull(dbzProperties);
@@ -227,6 +230,10 @@ public class MySqlSourceConfig implements Serializable {
return includeSchemaChanges;
}
+ public boolean isIncludeHeartbeatEvents() {
+ return includeHeartbeatEvents;
+ }
+
public boolean isScanNewlyAddedTableEnabled() {
return scanNewlyAddedTableEnabled;
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
index 427115ede..7010d732d 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/config/MySqlSourceConfigFactory.java
@@ -63,6 +63,7 @@ public class MySqlSourceConfigFactory implements Serializable
{
private double distributionFactorLower =
MySqlSourceOptions.CHUNK_KEY_EVEN_DISTRIBUTION_FACTOR_LOWER_BOUND.defaultValue();
private boolean includeSchemaChanges = false;
+ private boolean includeHeartbeatEvents = false;
private boolean scanNewlyAddedTableEnabled = false;
private boolean closeIdleReaders = false;
private Properties jdbcProperties;
@@ -235,6 +236,12 @@ public class MySqlSourceConfigFactory implements
Serializable {
return this;
}
+ /** Whether the {@link MySqlSource} should output the heartbeat events or
not. */
+ public MySqlSourceConfigFactory includeHeartbeatEvents(boolean
includeHeartbeatEvents) {
+ this.includeHeartbeatEvents = includeHeartbeatEvents;
+ return this;
+ }
+
/** Whether the {@link MySqlSource} should scan the newly added tables or
not. */
public MySqlSourceConfigFactory scanNewlyAddedTableEnabled(boolean
scanNewlyAddedTableEnabled) {
this.scanNewlyAddedTableEnabled = scanNewlyAddedTableEnabled;
@@ -412,6 +419,7 @@ public class MySqlSourceConfigFactory implements
Serializable {
distributionFactorUpper,
distributionFactorLower,
includeSchemaChanges,
+ includeHeartbeatEvents,
scanNewlyAddedTableEnabled,
closeIdleReaders,
props,
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java
index 449e7f608..486e637ce 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java
@@ -53,15 +53,18 @@ public class MySqlRecordEmitter<T> implements
RecordEmitter<SourceRecords, T, My
private final DebeziumDeserializationSchema<T>
debeziumDeserializationSchema;
private final MySqlSourceReaderMetrics sourceReaderMetrics;
private final boolean includeSchemaChanges;
+ private final boolean includeHeartbeatEvents;
private final OutputCollector<T> outputCollector;
public MySqlRecordEmitter(
DebeziumDeserializationSchema<T> debeziumDeserializationSchema,
MySqlSourceReaderMetrics sourceReaderMetrics,
- boolean includeSchemaChanges) {
+ boolean includeSchemaChanges,
+ boolean includeHeartbeatEvents) {
this.debeziumDeserializationSchema = debeziumDeserializationSchema;
this.sourceReaderMetrics = sourceReaderMetrics;
this.includeSchemaChanges = includeSchemaChanges;
+ this.includeHeartbeatEvents = includeHeartbeatEvents;
this.outputCollector = new OutputCollector<>();
}
@@ -102,6 +105,9 @@ public class MySqlRecordEmitter<T> implements
RecordEmitter<SourceRecords, T, My
emitElement(element, output);
} else if (RecordUtils.isHeartbeatEvent(element)) {
updateStartingOffsetForSplit(splitState, element);
+ if (includeHeartbeatEvents) {
+ emitElement(element, output);
+ }
} else {
// unknown element
LOG.info("Meet unknown element {}, just skip.", element);
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java
index 5553b1ba8..7a3764461 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitterTest.java
@@ -102,6 +102,7 @@ class MySqlRecordEmitterTest {
},
new MySqlSourceReaderMetrics(
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup()),
+ false,
false);
}
diff --git
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java
index 7b8dfdcdb..08c337967 100644
---
a/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java
+++
b/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/test/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlSourceReaderTest.java
@@ -573,7 +573,8 @@ class MySqlSourceReaderTest extends MySqlSourceTestBase {
: new MySqlRecordEmitter<>(
new ForwardDeserializeSchema(),
new
MySqlSourceReaderMetrics(readerContext.metricGroup()),
- configuration.isIncludeSchemaChanges());
+ configuration.isIncludeSchemaChanges(),
+ configuration.isIncludeHeartbeatEvents());
final MySqlSourceReaderContext mySqlSourceReaderContext =
new MySqlSourceReaderContext(readerContext);
return new MySqlSourceReader<>(
@@ -740,7 +741,7 @@ class MySqlSourceReaderTest extends MySqlSourceTestBase {
MySqlSourceReaderMetrics sourceReaderMetrics,
boolean includeSchemaChanges,
int limit) {
- super(debeziumDeserializationSchema, sourceReaderMetrics,
includeSchemaChanges);
+ super(debeziumDeserializationSchema, sourceReaderMetrics,
includeSchemaChanges, false);
this.debeziumDeserializationSchema = debeziumDeserializationSchema;
this.sourceReaderMetrics = sourceReaderMetrics;
this.includeSchemaChanges = includeSchemaChanges;