haruki-830 commented on code in PR #4437:
URL: https://github.com/apache/flink-cdc/pull/4437#discussion_r3518604120
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/DataChangeEventSerializer.java:
##########
@@ -62,10 +62,17 @@ public void serialize(DataChangeEvent event, DataOutputView target) throws IOExc
opSerializer.serialize(event.op(), target);
tableIdSerializer.serialize(event.tableId(), target);
- if (event.before() != null) {
+ // Write explicit presence flags so deserialize can recover null before/after.
+ // Required when changelog-mode=upsert (FLINK-38647) emits UPDATE events with
+ // before == null. Adds 2 bytes per event but makes the wire format symmetric.
+ boolean beforePresent = event.before() != null;
+ boolean afterPresent = event.after() != null;
+ target.writeBoolean(beforePresent);
+ target.writeBoolean(afterPresent);
Review Comment:
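This looks like a wire-format-breaking change. Old deserializers would
interpret these 2 boolean bytes as the start of the before RecordData, and
conversely a new deserializer restoring a checkpoint written in the old format
would consume the first bytes of the first serialized record as presence flags.
Either way, this could corrupt the stream on checkpoint restore. Would it be
possible to bump DataChangeEventSerializerSnapshot.CURRENT_VERSION to 2 in this
PR and add a version-aware fallback?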
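Here is a minimal sketch of what a version-aware fallback could look like. It is written in plain Java rather than against Flink's TypeSerializer API. Everything in it is hypothetical:

- VersionedEventCodec, writeLegacy and LEGACY_VERSION are illustrative names, and CURRENT_VERSION only mirrors the proposed snapshot version.
- Records and table IDs are plain strings, and event metadata is omitted.
- The version-1 path assumes two things about the pre-PR code. First, it wrote before/after only when non-null, as the removed `if (event.before() != null)` suggests. Second, the old reader inferred presence from the operation type.

In the real serializer, the version would presumably come from the restored DataChangeEventSerializerSnapshot instead of a method argument.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical, simplified model of a version-aware DataChangeEvent codec (not Flink's
// TypeSerializer API): records and table IDs are plain strings, metadata is omitted.
public class VersionedEventCodec {

    static final int LEGACY_VERSION = 1;  // assumed pre-PR layout, no presence flags
    static final int CURRENT_VERSION = 2; // layout with presence flags, as in this PR

    enum Op { INSERT, UPDATE, DELETE }

    static final class Event {
        final Op op;
        final String tableId;
        final String before; // null when there is no before image
        final String after;  // null when there is no after image

        Event(Op op, String tableId, String before, String after) {
            this.op = op;
            this.tableId = tableId;
            this.before = before;
            this.after = after;
        }
    }

    // The serializer only ever writes the current layout.
    static byte[] write(Event e) {
        return encode(e, true);
    }

    // Illustration only: mimics the assumed pre-PR layout to stand in for old checkpoint bytes.
    static byte[] writeLegacy(Event e) {
        return encode(e, false);
    }

    private static byte[] encode(Event e, boolean withFlags) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeByte(e.op.ordinal());
            out.writeUTF(e.tableId);
            if (withFlags) {
                out.writeBoolean(e.before != null);
                out.writeBoolean(e.after != null);
            }
            if (e.before != null) out.writeUTF(e.before);
            if (e.after != null) out.writeUTF(e.after);
            out.flush();
            return bytes.toByteArray();
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    // Decodes bytes according to the format version recorded alongside them.
    static Event read(int version, byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            Op op = Op.values()[in.readUnsignedByte()];
            String tableId = in.readUTF();
            boolean hasBefore;
            boolean hasAfter;
            switch (version) {
                case LEGACY_VERSION:
                    // Assumed legacy rule: presence is implied by the operation type.
                    hasBefore = op != Op.INSERT;
                    hasAfter = op != Op.DELETE;
                    break;
                case CURRENT_VERSION:
                    hasBefore = in.readBoolean();
                    hasAfter = in.readBoolean();
                    break;
                default:
                    throw new IllegalArgumentException("Unknown format version: " + version);
            }
            String before = hasBefore ? in.readUTF() : null;
            String after = hasAfter ? in.readUTF() : null;
            return new Event(op, tableId, before, after);
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    public static void main(String[] args) {
        // Bytes from a checkpoint taken before the bump go through the version-1 path.
        Event restored = read(LEGACY_VERSION,
                writeLegacy(new Event(Op.UPDATE, "db.orders", "row-1", "row-2")));
        System.out.println(restored.before + " -> " + restored.after); // row-1 -> row-2

        // An upsert-mode UPDATE with no before image round-trips under version 2.
        Event upsert = read(CURRENT_VERSION, write(new Event(Op.UPDATE, "db.orders", null, "row-2")));
        System.out.println(upsert.before + " -> " + upsert.after); // null -> row-2
    }
}
```

Writes always use the newest layout, while each historical layout keeps its own read path. Checkpoints taken before the bump therefore stay restorable.

Feeding the legacy UPDATE bytes from main to the version-2 path fails with an EOFException (wrapped in UncheckedIOException). The failure happens because the length prefix of the first record is consumed as the two presence flags. With other payloads, the same mismatch could silently misread data instead of failing.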
I think this would be important to include before merging, since checkpoint
restoration is a production-critical path.
--
This is an automated message from the Apache Git Service.