shangxinli commented on code in PR #18405:
URL: https://github.com/apache/hudi/pull/18405#discussion_r3215083965


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -874,8 +876,32 @@ private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSinkAndDoMetaSync(Hood
           totalSuccessfulRecords);
       String commitActionType = CommitUtils.getCommitActionType(cfg.operation, HoodieTableType.valueOf(cfg.tableType));
 
-      boolean success = writeClient.commit(instantTime, writeStatusRDD, Option.of(checkpointCommitMetadata), commitActionType, partitionToReplacedFileIds, Option.empty(),
-          Option.of(writeStatusValidator));
+      // Cache the RDD if not already persisted, so both validators (collect) and
+      // writeClient.commit() share the same materialized result without re-evaluation.
+      // shouldUnpersist is true when we created the cache here (storage level was NONE),
+      // so the finally block knows to release it.
+      boolean shouldUnpersist = writeStatusRDD.getStorageLevel().equals(StorageLevel.NONE());
+      if (shouldUnpersist) {
+        writeStatusRDD.cache();

Review Comment:
   Thanks @danny0405!
   
   **1. Conditional cache:** done in 5f4bca6. The cache/unpersist cycle is now guarded by a `validatorsConfigured` check derived from `hoodie.precommit.validators`, so when no validators are configured the path stays a single-pass DAG with no extra caching overhead.
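
   To make the conditional cache concrete, here is a minimal, Spark-free sketch of the pattern. `LazyDataset` is a hypothetical stand-in for `JavaRDD<WriteStatus>` that only models recomputation (every action re-runs the computation unless the dataset is persisted), and `validateAndCommit` is a hypothetical simplification of the commit flow, assuming the guard combines the `validatorsConfigured` check with the storage-level check from the diff. Only the `hoodie.precommit.validators` key and the `validatorsConfigured`/`shouldUnpersist` names come from this PR.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.function.Supplier;

final class ConditionalCacheSketch {

  // Hypothetical stand-in for a lazily evaluated RDD: each action re-runs the
  // computation unless the dataset has been marked persisted and materialized.
  static final class LazyDataset<T> {
    private final Supplier<List<T>> compute;
    private boolean persisted = false;
    private List<T> materialized = null;
    int evaluations = 0;

    LazyDataset(Supplier<List<T>> compute) {
      this.compute = compute;
    }

    // Analogous to "storage level != NONE".
    boolean isPersisted() {
      return persisted;
    }

    void cache() {
      persisted = true;
    }

    void unpersist() {
      persisted = false;
      materialized = null;
    }

    // An "action": reuses the materialized result only when persisted.
    List<T> collect() {
      if (persisted && materialized != null) {
        return materialized;
      }
      evaluations++;
      List<T> result = new ArrayList<>(compute.get());
      if (persisted) {
        materialized = result;
      }
      return result;
    }
  }

  // Cache only when validators are configured and the caller has not already
  // persisted the dataset; release the cache only if this method created it.
  static <T> void validateAndCommit(LazyDataset<T> statuses, Properties props) {
    boolean validatorsConfigured =
        !props.getProperty("hoodie.precommit.validators", "").trim().isEmpty();
    boolean shouldUnpersist = validatorsConfigured && !statuses.isPersisted();
    if (shouldUnpersist) {
      statuses.cache();
    }
    try {
      if (validatorsConfigured) {
        statuses.collect(); // validators consume the result
      }
      statuses.collect(); // commit consumes the result
    } finally {
      if (shouldUnpersist) {
        statuses.unpersist();
      }
    }
  }
}
```

   In this sketch, the no-validator path evaluates once and never caches; the validator path also evaluates only once, and the `finally` block releases the cache only when this method created it, so a dataset the caller already persisted stays persisted.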
   
   **2. Migrating `writeStatusValidator` to a pre-commit validator:** I agree with the direction, but I'd like to do this in a follow-up PR rather than expand the scope here. The reason is that `HoodieStreamerWriteStatusValidator` (StreamSync.java:1403) does substantially more than pure validation:
   
   - Counts records/errors and writes to a shared `AtomicLong 
totalSuccessfulRecords` consumed *after* commit (drives the `runMetaSync()` 
decision)
   - **Commits the error table** as a side effect, with 
`ROLLBACK_COMMIT`/`LOG_ERROR` strategies
   - Rolls back the instant on validation failure and logs the top-100 errors
   - Runs *inside* `writeClient.commit()` via the `WriteStatusValidator` callback hook (not before commit), so the difference in where each framework hooks into the commit lifecycle matters
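
   A simplified sketch of why the callback is more than a validator. `Status`, `CommitCallback`, `commit`, and `commitThenDecideMetaSync` are hypothetical and do not match Hudi's real `WriteStatusValidator` or `writeClient.commit()` signatures; the meta-sync rule (`committed && count > 0`) is likewise an assumption for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

final class CoupledCallbackSketch {

  // Hypothetical, simplified write status: a record count plus error messages.
  static final class Status {
    final long records;
    final List<String> errors;

    Status(long records, List<String> errors) {
      this.records = records;
      this.errors = errors;
    }
  }

  // Hypothetical hook invoked from inside commit(); not Hudi's real signature.
  interface CommitCallback {
    boolean validate(List<Status> statuses);
  }

  // Hypothetical commit: the callback runs inside commit and can veto it.
  static boolean commit(String instant, List<Status> statuses, CommitCallback callback, List<String> log) {
    if (!callback.validate(statuses)) {
      log.add("rollback " + instant);
      return false;
    }
    log.add("commit " + instant);
    return true;
  }

  // A callback that validates AND has side effects: it updates a shared counter
  // and writes to a stand-in error table before returning its verdict.
  static CommitCallback countingCallback(AtomicLong totalSuccessfulRecords, List<String> errorTable) {
    return statuses -> {
      long records = 0;
      List<String> errors = new ArrayList<>();
      for (Status s : statuses) {
        records += s.records;
        errors.addAll(s.errors);
      }
      totalSuccessfulRecords.set(records - errors.size()); // read by the driver after commit
      errorTable.addAll(errors);                           // error-table write as a side effect
      return errors.isEmpty();
    };
  }

  // Hypothetical driver: the meta-sync decision depends on state the callback
  // wrote while commit() was running.
  static boolean commitThenDecideMetaSync(String instant, List<Status> statuses,
                                          List<String> errorTable, List<String> log) {
    AtomicLong totalSuccessfulRecords = new AtomicLong();
    boolean committed = commit(instant, statuses, countingCallback(totalSuccessfulRecords, errorTable), log);
    return committed && totalSuccessfulRecords.get() > 0;
  }
}
```

   The point is the data flow: the counter `commitThenDecideMetaSync` reads after commit only exists because the callback wrote it during commit, and in this sketch the error-table write happens whether or not the commit goes through. Replacing the callback with a verdict-only validator would drop both unless they move somewhere else.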
   
   A `BasePreCommitValidator` only takes `TypedProperties` and exposes state 
via `ValidationContext` — there's no path today to surface the raw 
`JavaRDD<WriteStatus>` (needed for error-table commit) or the post-commit 
counter through that interface. A clean migration would need to either (a) 
extend `ValidationContext` to expose the RDD and inject the error-table writer, 
or (b) split the side-effects out of the callback into a separate 
post-validation step.
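
   For comparison, a minimal sketch of what option (b) could look like, assuming the split is done by hand in the driver: a side-effect-free `validate` that only returns a verdict, plus an explicit `applySideEffects` step that owns the counter and the error-table write. All names here are hypothetical; this is one possible shape, not a planned design.

```java
import java.util.List;

final class SplitValidationSketch {

  // Hypothetical, simplified write status: a record count plus error messages.
  static final class Status {
    final long records;
    final List<String> errors;

    Status(long records, List<String> errors) {
      this.records = records;
      this.errors = errors;
    }
  }

  // Step 1: a side-effect-free check that only inspects statuses and returns a verdict.
  static boolean validate(List<Status> statuses) {
    for (Status s : statuses) {
      if (!s.errors.isEmpty()) {
        return false;
      }
    }
    return true;
  }

  // Step 2: side effects moved into an explicit driver-side step. Appends errors
  // to a stand-in error table and returns the successful-record count.
  static long applySideEffects(List<Status> statuses, List<String> errorTable) {
    long records = 0;
    long errors = 0;
    for (Status s : statuses) {
      records += s.records;
      errors += s.errors.size();
      errorTable.addAll(s.errors);
    }
    return records - errors;
  }
}
```

   Because `validate` has no side effects, it could in principle sit behind a validator interface, while the driver calls `applySideEffects` at whatever point preserves today's ordering relative to commit.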
   
   Would you be OK if I file a follow-up issue + PR for that refactor so this 
PR can stay focused on the streaming-offset framework? Happy to take it on 
right after this lands.



-- 
This is an automated message from the Apache Git Service.