yihua commented on code in PR #18889:
URL: https://github.com/apache/hudi/pull/18889#discussion_r3337541595


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -947,10 +950,21 @@ Map<String, String> extractCheckpointMetadata(InputBatch inputBatch, TypedProper
       return Collections.emptyMap();
     }
 
-    // If we have a next checkpoint batch, use its metadata
+    // If we have a next checkpoint batch, use its metadata. Some Source implementations (e.g.
+    // DFSPathSelector-backed *DFSSource) unconditionally return a StreamerCheckpointV2 regardless
+    // of the target table version. The persisted checkpoint key must match the table version
+    // contract enforced by CheckpointUtils.shouldTargetCheckpointV2 (V2 keys are only valid for
+    // writeTableVersion >= 8 and outside the not-supported set). Normalize here so that a v6
+    // write does not get a V2 key stamped into its commit metadata.
     if (inputBatch.getCheckpointForNextBatch() != null) {
-      return inputBatch.getCheckpointForNextBatch()
-          .getCheckpointCommitMetadata(cfg.checkpoint, cfg.ignoreCheckpoint);
+      Checkpoint sourceCheckpoint = inputBatch.getCheckpointForNextBatch();
+      boolean targetV2 = CheckpointUtils.shouldTargetCheckpointV2(versionCode, cfg.sourceClassName);
+      if (targetV2 && sourceCheckpoint instanceof StreamerCheckpointV1) {

Review Comment:
   We should fix the individual `Source` implementations to pick the right checkpoint version instead of converting the checkpoint here, since each source can have its own logic for determining which checkpoint version to use.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.