danny0405 commented on code in PR #18405:
URL: https://github.com/apache/hudi/pull/18405#discussion_r3214248837
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -874,8 +876,32 @@ private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSinkAndDoMetaSync(Hood
totalSuccessfulRecords);
String commitActionType = CommitUtils.getCommitActionType(cfg.operation, HoodieTableType.valueOf(cfg.tableType));
- boolean success = writeClient.commit(instantTime, writeStatusRDD, Option.of(checkpointCommitMetadata), commitActionType, partitionToReplacedFileIds, Option.empty(),
- Option.of(writeStatusValidator));
+ // Cache the RDD if not already persisted, so both validators (collect) and
+ // writeClient.commit() share the same materialized result without re-evaluation.
+ // shouldUnpersist is true when we created the cache here (storage level was NONE),
+ // so the finally block knows to release it.
+ boolean shouldUnpersist = writeStatusRDD.getStorageLevel().equals(StorageLevel.NONE());
+ if (shouldUnpersist) {
+ writeStatusRDD.cache();
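The hunk is cut off after `writeStatusRDD.cache();`, so the `try`/`finally` that the added comments refer to is not visible here. The ownership rule those comments describe (cache only when the storage level was `NONE`, and unpersist in `finally` only if this method did the caching) can be sketched without Spark. This is a minimal sketch, assuming a hypothetical `LazyResult` class standing in for `JavaRDD<WriteStatus>`: `isPersisted()` plays the role of `getStorageLevel() != StorageLevel.NONE()`, and a counter records how often the underlying computation actually runs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class CacheOwnershipSketch {

  // Hypothetical stand-in for JavaRDD<WriteStatus>: every action re-runs the
  // computation unless the result has been cached.
  public static final class LazyResult<T> {
    private final Supplier<List<T>> computation;
    private boolean persisted;     // analogous to storage level != NONE
    private List<T> materialized;  // filled by the first action after cache()
    private int evaluations;       // how many times the computation actually ran

    public LazyResult(Supplier<List<T>> computation) {
      this.computation = computation;
    }

    public boolean isPersisted() { return persisted; }
    public void cache() { persisted = true; }
    public void unpersist() { persisted = false; materialized = null; }
    public int evaluations() { return evaluations; }

    // An "action": reuse the materialized result if cached, otherwise recompute.
    public List<T> collect() {
      if (materialized != null) {
        return materialized;
      }
      evaluations++;
      List<T> result = computation.get();
      if (persisted) {
        materialized = result;
      }
      return result;
    }
  }

  // Validators and the commit each trigger one action on the same result.
  public static <T> boolean validateAndCommit(LazyResult<T> rdd, int validatorCount) {
    // True only if this method creates the cache, so only then may it release it.
    boolean shouldUnpersist = !rdd.isPersisted();
    if (shouldUnpersist) {
      rdd.cache();
    }
    try {
      for (int i = 0; i < validatorCount; i++) {
        rdd.collect();                  // a validator's action
      }
      return !rdd.collect().isEmpty();  // the commit's action (non-empty stands in for success)
    } finally {
      if (shouldUnpersist) {
        rdd.unpersist();                // leave a caller-owned cache in place
      }
    }
  }

  public static void main(String[] args) {
    LazyResult<String> rdd = new LazyResult<>(() -> new ArrayList<>(List.of("ws-1", "ws-2")));
    boolean committed = validateAndCommit(rdd, 2);
    // Two validator actions plus one commit action, but a single evaluation.
    System.out.println(committed + " " + rdd.evaluations() + " " + rdd.isPersisted()); // prints: true 1 false
  }
}
```

If the caller had already called `cache()`, `shouldUnpersist` is false and the cache survives the call, which is why the sketch tracks ownership instead of always unpersisting.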
Review Comment:
1. Can we trigger the cache only when the validators are not empty?
2. I see we already have a `writeStatusValidator` for validating the write
errors; it should be feasible to migrate it to be one of the pre-commit
validators.
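The two suggestions can be sketched together: the write-error check becomes just another entry in the pre-commit validator list, and the cache is created only when that list is non-empty. Everything here is hypothetical and for illustration only: `PreCommitValidator`, `WriteErrorValidator`, `FakeWriteStatus`, and `shouldCacheBeforeCommit` are not Hudi APIs, and the write statuses are a plain `List` rather than a `JavaRDD`.

```java
import java.util.ArrayList;
import java.util.List;

public class PreCommitValidatorSketch {

  // Hypothetical, simplified write status: only the error count matters here.
  public static final class FakeWriteStatus {
    private final String fileId;
    private final long totalErrorRecords;

    public FakeWriteStatus(String fileId, long totalErrorRecords) {
      this.fileId = fileId;
      this.totalErrorRecords = totalErrorRecords;
    }

    public String fileId() { return fileId; }
    public long totalErrorRecords() { return totalErrorRecords; }
  }

  // Hypothetical validator contract: throw to abort the commit.
  public interface PreCommitValidator {
    void validate(List<FakeWriteStatus> statuses);
  }

  // The write-error check expressed as one more pre-commit validator.
  public static final class WriteErrorValidator implements PreCommitValidator {
    @Override
    public void validate(List<FakeWriteStatus> statuses) {
      long errors = statuses.stream().mapToLong(FakeWriteStatus::totalErrorRecords).sum();
      if (errors > 0) {
        throw new IllegalStateException("Commit aborted: " + errors + " error records");
      }
    }
  }

  // Cache only when some validator will run an action before the commit,
  // and never re-cache a result the caller already persisted.
  public static boolean shouldCacheBeforeCommit(List<PreCommitValidator> validators,
                                                boolean alreadyPersisted) {
    return !validators.isEmpty() && !alreadyPersisted;
  }

  // Run every validator in order; the first failure propagates and aborts.
  public static void runValidators(List<PreCommitValidator> validators,
                                   List<FakeWriteStatus> statuses) {
    for (PreCommitValidator validator : validators) {
      validator.validate(statuses);
    }
  }

  public static void main(String[] args) {
    List<PreCommitValidator> validators = new ArrayList<>();
    System.out.println(shouldCacheBeforeCommit(validators, false)); // prints false
    validators.add(new WriteErrorValidator());
    System.out.println(shouldCacheBeforeCommit(validators, false)); // prints true
    runValidators(validators, List.of(new FakeWriteStatus("file-1", 0)));
    System.out.println("validated");
  }
}
```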