hudi-agent commented on code in PR #18889:
URL: https://github.com/apache/hudi/pull/18889#discussion_r3336087177


##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java:
##########
@@ -472,8 +472,15 @@ private static void assertUseV2Checkpoint(HoodieTableMetaClient metaClient) {
         metaClient, metaClient.getActiveTimeline().lastInstant().get());
     assertFalse(metadata.isEmpty());
     Map<String, String> extraMetadata = metadata.get().getExtraMetadata();
-    assertTrue(extraMetadata.containsKey(STREAMER_CHECKPOINT_KEY_V2));
-    assertFalse(extraMetadata.containsKey(STREAMER_CHECKPOINT_KEY_V1));
+    // V1 vs V2 is driven by CheckpointUtils.shouldTargetCheckpointV2 — for v6 tables the contract
+    // is V1, so the assertion is conditional on the on-disk table version.
+    if (metaClient.getTableConfig().getTableVersion().versionCode() >= HoodieTableVersion.EIGHT.versionCode()) {
+      assertTrue(extraMetadata.containsKey(STREAMER_CHECKPOINT_KEY_V2));

Review Comment:
   🤖 nit: the method is still called `assertUseV2Checkpoint`, but the body now conditionally checks either V1 or V2 depending on the table version. Could you rename it to something like `assertCheckpointMatchesTableVersion` so future callers aren't surprised?
   
   - AI-generated; verify before applying.
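A minimal sketch of what the renamed helper could look like, assuming the truncated `else` branch asserts the V1 key the same way the V2 branch asserts the V2 key. To stay self-contained, it takes the table version code and the commit's extra metadata directly instead of a `HoodieTableMetaClient`, and it throws `AssertionError` in place of JUnit's `assertTrue`/`assertFalse`. The string values for `STREAMER_CHECKPOINT_KEY_V1`/`STREAMER_CHECKPOINT_KEY_V2` and the version-8 threshold constant are placeholders, not values copied from the Hudi sources.

```java
import java.util.HashMap;
import java.util.Map;

final class CheckpointAssertionSketch {
  // Placeholder key values (assumption); the real constants live on Hudi's checkpoint classes.
  static final String STREAMER_CHECKPOINT_KEY_V1 = "deltastreamer.checkpoint.key";
  static final String STREAMER_CHECKPOINT_KEY_V2 = "streamer.checkpoint.key.v2";
  // Assumed to mirror HoodieTableVersion.EIGHT.versionCode().
  static final int TABLE_VERSION_EIGHT = 8;

  /** Hypothetical renamed helper: the expected key depends on the on-disk table version. */
  static void assertCheckpointMatchesTableVersion(int tableVersionCode, Map<String, String> extraMetadata) {
    boolean expectV2 = tableVersionCode >= TABLE_VERSION_EIGHT;
    String expectedKey = expectV2 ? STREAMER_CHECKPOINT_KEY_V2 : STREAMER_CHECKPOINT_KEY_V1;
    String unexpectedKey = expectV2 ? STREAMER_CHECKPOINT_KEY_V1 : STREAMER_CHECKPOINT_KEY_V2;
    if (!extraMetadata.containsKey(expectedKey)) {
      throw new AssertionError("missing " + expectedKey + " for table version " + tableVersionCode);
    }
    if (extraMetadata.containsKey(unexpectedKey)) {
      throw new AssertionError("unexpected " + unexpectedKey + " for table version " + tableVersionCode);
    }
  }

  public static void main(String[] args) {
    // A v8 commit carrying only the V2 key satisfies the check.
    Map<String, String> v8Commit = new HashMap<>();
    v8Commit.put(STREAMER_CHECKPOINT_KEY_V2, "1700000000000");
    assertCheckpointMatchesTableVersion(8, v8Commit);

    // A v6 commit carrying only the V1 key satisfies the check.
    Map<String, String> v6Commit = new HashMap<>();
    v6Commit.put(STREAMER_CHECKPOINT_KEY_V1, "1700000000000");
    assertCheckpointMatchesTableVersion(6, v6Commit);

    System.out.println("ok");
  }
}
```

Folding both branches into one expected/unexpected pair keeps the V1 and V2 paths symmetric, which is roughly what the new name would promise to callers.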



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -947,10 +950,21 @@ Map<String, String> extractCheckpointMetadata(InputBatch inputBatch, TypedProper
       return Collections.emptyMap();
     }
 
-    // If we have a next checkpoint batch, use its metadata
+    // If we have a next checkpoint batch, use its metadata. Some Source implementations (e.g.
+    // DFSPathSelector-backed *DFSSource) unconditionally return a StreamerCheckpointV2 regardless
+    // of the target table version. The persisted checkpoint key must match the table version
+    // contract enforced by CheckpointUtils.shouldTargetCheckpointV2 (V2 keys are only valid for
+    // writeTableVersion >= 8 and outside the not-supported set). Normalize here so that a v6
+    // write does not get a V2 key stamped into its commit metadata.
     if (inputBatch.getCheckpointForNextBatch() != null) {
-      return inputBatch.getCheckpointForNextBatch()
-          .getCheckpointCommitMetadata(cfg.checkpoint, cfg.ignoreCheckpoint);
+      Checkpoint sourceCheckpoint = inputBatch.getCheckpointForNextBatch();

Review Comment:
   🤖 nit: since you're already introducing a local `sourceCheckpoint`, could you assign it before the `if` and null-check the variable instead? That avoids calling `getCheckpointForNextBatch()` twice and reads more cleanly as `Checkpoint sourceCheckpoint = inputBatch.getCheckpointForNextBatch(); if (sourceCheckpoint != null) { … }`.
   
   - AI-generated; verify before applying.
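A rough sketch of the normalization combined with the single-read, null-check-the-local pattern suggested in the nit. Everything here is a hypothetical stand-in: `Checkpoint`, `CheckpointV1`, `CheckpointV2`, and `InputBatch` are simplified models, not Hudi's `StreamerCheckpointV1`/`StreamerCheckpointV2` classes. The `shouldTargetCheckpointV2` predicate only encodes the rule stated in the diff comment (table version >= 8 and source not in a not-supported set). The metadata key strings are placeholders, and downgrading by reusing the V2 checkpoint's key string is an assumption about how the PR converts. The real method's earlier guard and its `cfg.checkpoint`/`cfg.ignoreCheckpoint` arguments are omitted.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;

final class CheckpointNormalizationSketch {
  // Placeholder metadata keys (assumption), standing in for the V1/V2 key constants.
  static final String KEY_V1 = "deltastreamer.checkpoint.key";
  static final String KEY_V2 = "streamer.checkpoint.key.v2";

  // Hypothetical stand-in for Hudi's Checkpoint abstraction.
  interface Checkpoint {
    String getCheckpointKey();
    Map<String, String> getCheckpointCommitMetadata();
  }

  static final class CheckpointV1 implements Checkpoint {
    private final String key;
    CheckpointV1(String key) { this.key = key; }
    public String getCheckpointKey() { return key; }
    public Map<String, String> getCheckpointCommitMetadata() {
      return Collections.singletonMap(KEY_V1, key);
    }
  }

  static final class CheckpointV2 implements Checkpoint {
    private final String key;
    CheckpointV2(String key) { this.key = key; }
    public String getCheckpointKey() { return key; }
    public Map<String, String> getCheckpointCommitMetadata() {
      return Collections.singletonMap(KEY_V2, key);
    }
  }

  // Hypothetical stand-in for InputBatch; only the accessor used below is modeled.
  static final class InputBatch {
    private final Checkpoint checkpointForNextBatch;
    InputBatch(Checkpoint checkpointForNextBatch) { this.checkpointForNextBatch = checkpointForNextBatch; }
    Checkpoint getCheckpointForNextBatch() { return checkpointForNextBatch; }
  }

  // Assumed shape of the contract described in the diff comment.
  static boolean shouldTargetCheckpointV2(int writeTableVersion, String sourceClassName,
                                          Set<String> notSupportedSources) {
    return writeTableVersion >= 8 && !notSupportedSources.contains(sourceClassName);
  }

  static Map<String, String> extractCheckpointMetadata(InputBatch inputBatch, int writeTableVersion,
                                                       String sourceClassName, Set<String> notSupportedSources) {
    // Read the checkpoint once and null-check the local variable.
    Checkpoint sourceCheckpoint = inputBatch.getCheckpointForNextBatch();
    if (sourceCheckpoint == null) {
      return Collections.emptyMap();
    }
    // Downgrade a V2 checkpoint when the target table does not accept V2 keys.
    if (sourceCheckpoint instanceof CheckpointV2
        && !shouldTargetCheckpointV2(writeTableVersion, sourceClassName, notSupportedSources)) {
      sourceCheckpoint = new CheckpointV1(sourceCheckpoint.getCheckpointKey());
    }
    return sourceCheckpoint.getCheckpointCommitMetadata();
  }

  public static void main(String[] args) {
    Set<String> notSupported = Collections.emptySet();
    InputBatch batch = new InputBatch(new CheckpointV2("1700000000000"));
    // v6 table: the V2 checkpoint is persisted under the V1 key.
    System.out.println(extractCheckpointMetadata(batch, 6, "SomeDFSSource", notSupported));
    // v8 table: the V2 key is kept.
    System.out.println(extractCheckpointMetadata(batch, 8, "SomeDFSSource", notSupported));
  }
}
```

Only the V2-to-V1 direction is handled, matching the v6 concern in the diff comment; whether a V1 checkpoint should ever be upgraded for a v8 table is left open here.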


