haruki-830 commented on code in PR #4437:
URL: https://github.com/apache/flink-cdc/pull/4437#discussion_r3518604120


##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/event/DataChangeEventSerializer.java:
##########
@@ -62,10 +62,17 @@ public void serialize(DataChangeEvent event, DataOutputView target) throws IOExc
         opSerializer.serialize(event.op(), target);
         tableIdSerializer.serialize(event.tableId(), target);
 
-        if (event.before() != null) {
+        // Write explicit presence flags so deserialize can recover null before/after.
+        // Required when changelog-mode=upsert (FLINK-38647) emits UPDATE events with
+        // before == null. Adds 2 bytes per event but makes the wire format symmetric.
+        boolean beforePresent = event.before() != null;
+        boolean afterPresent = event.after() != null;
+        target.writeBoolean(beforePresent);
+        target.writeBoolean(afterPresent);
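
A minimal, self-contained sketch of the round trip the diff comment describes. It assumes a String payload in place of RecordData and omits the tableId. PresenceFlagDemo, encode and decode are hypothetical names, not the Flink CDC API. It shows that an UPDATE whose before image is null survives serialization, and that the two flag bytes are the only overhead on top of the payloads.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/**
 * Hypothetical, simplified model of the presence-flag layout in this diff.
 * A String stands in for RecordData; this is not the Flink CDC API.
 */
public class PresenceFlagDemo {

    static final class Event {
        final String op;
        final String before; // may be null, e.g. an upsert-mode UPDATE
        final String after;

        Event(String op, String before, String after) {
            this.op = op;
            this.before = before;
            this.after = after;
        }
    }

    static byte[] encode(Event e) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(e.op);
            // Both flags precede any payload, so the reader knows what follows.
            out.writeBoolean(e.before != null);
            out.writeBoolean(e.after != null);
            if (e.before != null) {
                out.writeUTF(e.before);
            }
            if (e.after != null) {
                out.writeUTF(e.after);
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return bytes.toByteArray();
    }

    static Event decode(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            String op = in.readUTF();
            boolean beforePresent = in.readBoolean();
            boolean afterPresent = in.readBoolean();
            // Read payloads in the same order they were written.
            String before = beforePresent ? in.readUTF() : null;
            String after = afterPresent ? in.readUTF() : null;
            return new Event(op, before, after);
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    public static void main(String[] args) {
        Event upsert = new Event("UPDATE", null, "id=1");
        byte[] data = encode(upsert);
        Event back = decode(data);
        System.out.println(back.before == null && "id=1".equals(back.after)); // prints true
        System.out.println(data.length); // prints 16: 8 (op) + 2 (flags) + 6 (after)
    }
}
```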

Review Comment:
   This looks like a wire-format-breaking change: old deserializers would
interpret these 2 boolean bytes as the start of the before RecordData, which
could cause stream corruption on checkpoint restore. Conversely, after an
upgrade, the new deserializer would read the first bytes of old-format state
as presence flags. Would it be possible to bump
DataChangeEventSerializerSnapshot.CURRENT_VERSION to 2 in this PR and add a
version-aware fallback?
   I think this is important to include before merging, since checkpoint
restoration is a production-critical path.
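
One possible shape for the version-aware fallback, as a hedged sketch only. The formatVersion argument is a hypothetical stand-in for the version Flink hands to TypeSerializerSnapshot#readSnapshot(int readVersion, ...); how DataChangeEventSerializerSnapshot would pass it on to the restored serializer is left open. The version-1 rule (INSERT and REPLACE carry only after, DELETE only before, UPDATE both) is an assumption about the legacy layout, not something shown in this diff. VersionedEventReader, writeV1, writeV2 and read are illustrative names, and a String again replaces RecordData.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/**
 * Hypothetical sketch of a version-aware read path. Version 1 is assumed to be the
 * legacy layout without presence flags; version 2 is the layout from this PR.
 * Names and the String payload are illustrative, not the Flink CDC API.
 */
public class VersionedEventReader {

    static final int V1_NO_FLAGS = 1;
    static final int V2_PRESENCE_FLAGS = 2;

    static final class Event {
        final String op;
        final String before;
        final String after;

        Event(String op, String before, String after) {
            this.op = op;
            this.before = before;
            this.after = after;
        }
    }

    /** Simulates state written by the old serializer: payloads only, no flags. */
    static byte[] writeV1(String op, String before, String after) {
        return write(op, before, after, false);
    }

    /** Writes the new layout: two presence flags, then the present payloads. */
    static byte[] writeV2(String op, String before, String after) {
        return write(op, before, after, true);
    }

    private static byte[] write(String op, String before, String after, boolean withFlags) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(op);
            if (withFlags) {
                out.writeBoolean(before != null);
                out.writeBoolean(after != null);
            }
            if (before != null) {
                out.writeUTF(before);
            }
            if (after != null) {
                out.writeUTF(after);
            }
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return bytes.toByteArray();
    }

    /** Reads one event; formatVersion (hypothetical) would come from the restored snapshot. */
    static Event read(byte[] data, int formatVersion) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            String op = in.readUTF();
            boolean beforePresent;
            boolean afterPresent;
            if (formatVersion >= V2_PRESENCE_FLAGS) {
                beforePresent = in.readBoolean();
                afterPresent = in.readBoolean();
            } else {
                // ASSUMPTION: the legacy layout implied presence from the op type.
                beforePresent = op.equals("UPDATE") || op.equals("DELETE");
                afterPresent = !op.equals("DELETE");
            }
            String before = beforePresent ? in.readUTF() : null;
            String after = afterPresent ? in.readUTF() : null;
            return new Event(op, before, after);
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    public static void main(String[] args) {
        Event legacy = read(writeV1("DELETE", "id=1", null), V1_NO_FLAGS);
        Event upsert = read(writeV2("UPDATE", null, "id=2"), V2_PRESENCE_FLAGS);
        System.out.println(legacy.before + " / " + legacy.after); // prints id=1 / null
        System.out.println(upsert.before + " / " + upsert.after); // prints null / id=2
    }
}
```

A fallback like this only protects the upgrade path (new code reading old state). A job rolled back to an older binary still cannot read version-2 bytes, which is worth calling out in the release notes.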


