danny0405 commented on code in PR #18405:
URL: https://github.com/apache/hudi/pull/18405#discussion_r3214248837


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -874,8 +876,32 @@ private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSinkAndDoMetaSync(Hood
           totalSuccessfulRecords);
       String commitActionType = CommitUtils.getCommitActionType(cfg.operation, HoodieTableType.valueOf(cfg.tableType));
 
-      boolean success = writeClient.commit(instantTime, writeStatusRDD, Option.of(checkpointCommitMetadata), commitActionType, partitionToReplacedFileIds, Option.empty(),
-          Option.of(writeStatusValidator));
+      // Cache the RDD if not already persisted, so both validators (collect) and
+      // writeClient.commit() share the same materialized result without re-evaluation.
+      // shouldUnpersist is true when we created the cache here (storage level was NONE),
+      // so the finally block knows to release it.
+      boolean shouldUnpersist = writeStatusRDD.getStorageLevel().equals(StorageLevel.NONE());
+      if (shouldUnpersist) {
+        writeStatusRDD.cache();

Review Comment:
   1. Can we trigger the cache only when the validators are not empty?
   2. We already have a `writeStatusValidator` for validating the write
errors; it should be feasible to migrate it to be one of the pre-commit
validators.


