yihua commented on code in PR #18889:
URL: https://github.com/apache/hudi/pull/18889#discussion_r3337541595
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -947,10 +950,21 @@ Map<String, String> extractCheckpointMetadata(InputBatch inputBatch, TypedProper
return Collections.emptyMap();
}
- // If we have a next checkpoint batch, use its metadata
+ // If we have a next checkpoint batch, use its metadata. Some Source implementations (e.g.
+ // DFSPathSelector-backed *DFSSource) unconditionally return a StreamerCheckpointV2 regardless
+ // of the target table version. The persisted checkpoint key must match the table version
+ // contract enforced by CheckpointUtils.shouldTargetCheckpointV2 (V2 keys are only valid for
+ // writeTableVersion >= 8 and outside the not-supported set). Normalize here so that a v6
+ // write does not get a V2 key stamped into its commit metadata.
if (inputBatch.getCheckpointForNextBatch() != null) {
- return inputBatch.getCheckpointForNextBatch()
- .getCheckpointCommitMetadata(cfg.checkpoint, cfg.ignoreCheckpoint);
+ Checkpoint sourceCheckpoint = inputBatch.getCheckpointForNextBatch();
+ boolean targetV2 = CheckpointUtils.shouldTargetCheckpointV2(versionCode, cfg.sourceClassName);
+ if (targetV2 && sourceCheckpoint instanceof StreamerCheckpointV1) {
Review Comment:
We should fix the individual `Source` implementations so that each one picks
the right checkpoint version, instead of converting the checkpoint here. Each
source can have its own logic for determining which checkpoint version to use.
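
To illustrate the suggestion, here is a rough sketch of a source that chooses the checkpoint version itself when it builds the next-batch checkpoint. All class and method names below (`SketchSource`, `SketchCheckpointV1`, `SketchCheckpointV2`, `targetsV2`) are hypothetical stand-ins, not the actual Hudi types, and the version rule is only paraphrased from the code comment in the diff (V2 for `writeTableVersion >= 8` and sources outside the not-supported set).

```java
import java.util.Set;

// Hypothetical stand-ins for the V1 and V2 checkpoint classes (not the real Hudi types).
abstract class SketchCheckpoint {
  final String token;

  SketchCheckpoint(String token) {
    this.token = token;
  }
}

final class SketchCheckpointV1 extends SketchCheckpoint {
  SketchCheckpointV1(String token) {
    super(token);
  }
}

final class SketchCheckpointV2 extends SketchCheckpoint {
  SketchCheckpointV2(String token) {
    super(token);
  }
}

// Hypothetical source that owns the version decision instead of relying on
// a conversion step in the caller.
final class SketchSource {
  private final int writeTableVersion;
  private final String sourceClassName;
  private final Set<String> v2UnsupportedSources;

  SketchSource(int writeTableVersion, String sourceClassName, Set<String> v2UnsupportedSources) {
    this.writeTableVersion = writeTableVersion;
    this.sourceClassName = sourceClassName;
    this.v2UnsupportedSources = v2UnsupportedSources;
  }

  // Assumed rule, paraphrased from the diff comment: V2 only for table
  // version >= 8 and for sources outside the not-supported set.
  boolean targetsV2() {
    return writeTableVersion >= 8 && !v2UnsupportedSources.contains(sourceClassName);
  }

  // The source emits the checkpoint type that matches the target table version.
  SketchCheckpoint checkpointForNextBatch(String token) {
    return targetsV2() ? new SketchCheckpointV2(token) : new SketchCheckpointV1(token);
  }
}
```

With this shape, the caller can persist whatever the source returns without inspecting or converting the checkpoint type, and a source with special requirements can override the decision locally.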