Tejansh Rana created FLINK-38691:
------------------------------------

             Summary: Support for Transaction Boundary Events in Flink CDC Connector
                 Key: FLINK-38691
                 URL: https://issues.apache.org/jira/browse/FLINK-38691
             Project: Flink
          Issue Type: New Feature
          Components: Flink CDC
            Reporter: Tejansh Rana


Following my discussion with [~leonard] at Flink Forward, I am writing to 
propose a feature enhancement for the Debezium-based Flink CDC connectors, 
related to how they handle transaction metadata.

*Problem Statement:*

In data streaming pipelines that require transactional guarantees or need to 
group atomic changes together, it is essential to identify the boundaries of 
the original database transaction (i.e., its BEGIN and its COMMIT, which 
Debezium reports as BEGIN and END events). Currently, the Flink CDC connectors 
skip these transaction lifecycle events: the record emitters log them as 
unknown elements and discard them.

MySQL connector: 
[https://github.com/apache/flink-cdc/blob/23a1c2efb6fa9ce1c9f17b3836f6aaa995bb0660/flink-cdc-connect/flink-cdc-source-connectors/flink-connector-mysql-cdc/src/main/java/org/apache/flink/cdc/connectors/mysql/source/reader/MySqlRecordEmitter.java#L77]

Base incremental source: 
[https://github.com/apache/flink-cdc/blob/23a1c2efb6fa9ce1c9f17b3836f6aaa995bb0660/flink-cdc-connect/flink-cdc-source-connectors/flink-cdc-base/src/main/java/org/apache/flink/cdc/connectors/base/source/reader/IncrementalSourceRecordEmitter.java#L92]

This omission makes it challenging to reconstruct the original transaction 
scope. Without explicit transaction markers, downstream Flink jobs cannot 
easily guarantee atomicity across sinks.

*Proposed Solution:*

The underlying CDC engine, Debezium, can emit transaction boundary events 
(BEGIN and END) when its {{provide.transaction.metadata}} option is enabled.
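For reference, a minimal sketch of the Debezium-side switch, assuming the connector forwards Debezium properties unchanged. {{provide.transaction.metadata}} is Debezium's documented property; the class and method names below are illustrative only. In Flink CDC, a {{Properties}} object like this is typically handed to the source builder's {{debeziumProperties(...)}} method. Note that enabling it alone does not help today, because the emitters drop the resulting records.

```java
import java.util.Properties;

/** Illustrative holder for Debezium properties; not a Flink CDC class. */
public class TxMetadataConfig {

    /** Returns Debezium properties that ask the connector to emit BEGIN/END events. */
    public static Properties debeziumProperties() {
        Properties props = new Properties();
        // Debezium's documented switch for transaction boundary events,
        // published as records on the "<server name>.transaction" topic.
        props.setProperty("provide.transaction.metadata", "true");
        return props;
    }
}
```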

We propose enhancing the Flink CDC connectors to expose this transaction 
metadata to the Flink pipeline. Instead of discarding these events, the 
connectors should emit dedicated records, or metadata fields on change records, 
that mark the start and end of each transaction as reported by Debezium.
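To illustrate how downstream code could use such markers, here is a minimal, hypothetical sketch in plain Java: it buffers change events between a BEGIN and its matching END and releases them as one batch. {{TransactionBuffer}} and its {{Event}} record are illustrative names, not Flink CDC or Debezium classes; the {{BEGIN}}/{{END}} strings mirror the {{status}} field of Debezium's transaction metadata value seen in the logs below.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical sketch: groups change events into per-transaction batches. */
public class TransactionBuffer {

    /** Hypothetical event: kind is "BEGIN", "END", or a change type such as "c"/"u"/"d". */
    public record Event(String kind, String txId, String payload) {}

    private final Consumer<List<Event>> onCommit;
    private final List<Event> pending = new ArrayList<>();
    private String currentTx; // id of the open transaction, or null if none

    public TransactionBuffer(Consumer<List<Event>> onCommit) {
        this.onCommit = onCommit;
    }

    public void accept(Event e) {
        switch (e.kind()) {
            case "BEGIN" -> {
                // Source transactions do not nest, so a second BEGIN is an error.
                if (currentTx != null) {
                    throw new IllegalStateException("BEGIN while " + currentTx + " is open");
                }
                currentTx = e.txId();
            }
            case "END" -> {
                if (currentTx == null || !currentTx.equals(e.txId())) {
                    throw new IllegalStateException("END without matching BEGIN: " + e.txId());
                }
                // Release everything collected since BEGIN as one atomic batch.
                onCommit.accept(List.copyOf(pending));
                pending.clear();
                currentTx = null;
            }
            default -> {
                if (currentTx == null) {
                    onCommit.accept(List.of(e)); // no open transaction: pass through
                } else {
                    pending.add(e); // change inside the open transaction
                }
            }
        }
    }
}
```

In a real Flink job the pending buffer would have to live in checkpointed operator state rather than a plain field, otherwise a partially buffered transaction would be lost on failover.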

*Logs of current behaviour:*

MySQL CDC connector

{noformat}
2025-11-04 14:34:14,033 INFO  org.apache.flink.cdc.connectors.mysql.source.reader.MySqlRecordEmitter [] -
Meet unknown element SourceRecord{sourcePartition={server=mysql_binlog_source},
sourceOffset={transaction_id=4541146d-b988-11f0-87f6-0242ac140006:18, file=mysql-bin.000003, pos=5850,
gtids=4541146d-b988-11f0-87f6-0242ac140006:1-17, server_id=1}}
ConnectRecord{topic='mysql_binlog_source.transaction', kafkaPartition=null,
key=Struct{id=4541146d-b988-11f0-87f6-0242ac140006:18},
keySchema=Schema{io.debezium.connector.common.TransactionMetadataKey:STRUCT},
value=Struct{status=BEGIN,id=4541146d-b988-11f0-87f6-0242ac140006:18},
valueSchema=Schema{io.debezium.connector.common.TransactionMetadataValue:STRUCT},
timestamp=null, headers=ConnectHeaders(headers=)}, just skip.
{noformat}

PostgreSQL CDC connector

{noformat}
2025-11-18 09:02:25,544 INFO  org.apache.flink.cdc.connectors.base.source.reader.IncrementalSourceRecordEmitter [] -
Meet unknown element SourceRecord{sourcePartition={server=postgres_cdc_source},
sourceOffset={transaction_id=843, lsn_proc=31198112, lsn=31198568, txId=843, ts_usec=1763456544336451}}
ConnectRecord{topic='postgres_cdc_source.transaction', kafkaPartition=null,
key=Struct{id=843},
keySchema=Schema{io.debezium.connector.common.TransactionMetadataKey:STRUCT},
value=Struct{status=BEGIN,id=843},
valueSchema=Schema{io.debezium.connector.common.TransactionMetadataValue:STRUCT},
timestamp=null, headers=ConnectHeaders(headers=)} for splitState =
StreamSplitState{startingOffset=Offset{lsn=LSN{0/1DC0BA0}, txId=840, lastCommitTs=-9223372036854775808},
endingOffset=Offset{lsn=LSN{FFFFFFFF/FFFFFFFF}, txId=null, lastCommitTs=-9223372036853775810},
split=StreamSplit{splitId='stream-split',
offset=Offset{lsn=LSN{0/1DC0BA0}, txId=840, lastCommitTs=-9223372036854775808},
endOffset=Offset{lsn=LSN{FFFFFFFF/FFFFFFFF}, txId=null, lastCommitTs=-9223372036853775810},
isSuspended=false, isSnapshotCompleted=true}}, just skip.
{noformat}

Draft PR with the proposed changes for the MySQL connector: 
[https://github.com/apache/flink-cdc/pull/4170]

If this proposal is accepted, I would be happy to open a corresponding PR for 
the base {{IncrementalSourceRecordEmitter}} as well.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
