shangxinli commented on code in PR #18765:
URL: https://github.com/apache/hudi/pull/18765#discussion_r3266615702


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -872,38 +869,110 @@ private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSinkAndDoMetaSync(Hood
       Map<String, String> checkpointCommitMetadata = extractCheckpointMetadata(inputBatch, props, writeClient.getConfig().getWriteVersion().versionCode(), cfg);
       AtomicLong totalSuccessfulRecords = new AtomicLong(0);
       Option<String> latestCommittedInstant = getLatestCommittedInstant();
-      WriteStatusValidator writeStatusValidator = new HoodieStreamerWriteStatusValidator(cfg.commitOnErrors, instantTime,
-          cfg, errorTableWriter, errorTableWriteStatusRDDOpt, errorWriteFailureStrategy, isErrorTableWriteUnificationEnabled, writeClient, latestCommittedInstant,
-          totalSuccessfulRecords);
       String commitActionType = CommitUtils.getCommitActionType(cfg.operation, HoodieTableType.valueOf(cfg.tableType));
 
-      // Cache the RDD only when pre-commit validators are configured. Validators collect the RDD
-      // before commit, so without caching the same DAG would re-evaluate inside writeClient.commit().
-      // When no validators are configured, commit consumes the RDD once and caching adds no value.
-      // shouldUnpersist is true only when we created the cache here (validators present and storage
-      // level was NONE), so the finally block knows to release it.
+      // Pre-commit orchestration (issue #18750): the legacy HoodieStreamerWriteStatusValidator
+      // ran inside writeClient.commit() via the WriteStatusValidator callback and combined three
+      // concerns — count records, commit the error table, and gate on write errors. Each is now
+      // an explicit step here before writeClient.commit(), so the writer no longer receives a
+      // callback. Step order is deliberate (see comments below).
+      //
+      // We collect the write statuses only when user-configured pre-commit validators are present,
+      // because the runValidators() entry point requires a materialized List<WriteStatus>. When no
+      // validators are configured we count via distributed Spark aggregation, preserving the
+      // pre-#18750 no-overhead behavior for the default path. The cache is engaged in both paths
+      // so that the count action (or collect) and the later writeClient.commit() consume the same
+      // materialization rather than re-evaluating the upstream DAG.
       boolean validatorsConfigured = !StringUtils.isNullOrEmpty(props.getString(
           HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(),
           HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.defaultValue()));
-      boolean shouldUnpersist = validatorsConfigured && writeStatusRDD.getStorageLevel().equals(StorageLevel.NONE());
+      boolean shouldUnpersist = writeStatusRDD.getStorageLevel().equals(StorageLevel.NONE());
       if (shouldUnpersist) {
         writeStatusRDD.cache();
       }
       boolean success;
       try {
-        if (validatorsConfigured) {
-          List<WriteStatus> writeStatuses = writeStatusRDD.collect();
+        // Collect only when validators need the materialized list. Null marker triggers the
+        // RDD-based code paths for the writeStatuses-on-demand checks below.
+        List<WriteStatus> writeStatuses = validatorsConfigured ? writeStatusRDD.collect() : null;
+
+        // Step 1: Commit the error table BEFORE running validators or the write-error gate.
+        // Error records captured here are a genuine artifact of the write attempt and should
+        // survive even when a validator later blocks the data-table commit (otherwise the
+        // operator loses the captured errors and the next run has nothing to triage against).
+        // Note: success here means the error-table commit landed; no rollback is needed on
+        // failure because the data-table writeClient.commit() has not been called yet.
+        // Latent design quirk (preserved from HSWSV): if error-table commit succeeds and any
+        // subsequent step (Step 4 gate, validator, or writeClient.commit) fails, the error
+        // table will have a committed instant for a data-table instant that never lands.
+        // Downstream consumers of the error table should tolerate this divergence.
+        if (errorTableWriter.isPresent()) {
+          boolean errorTableSuccess = ErrorTableCommitter.commit(errorTableWriter.get(),
+              errorTableWriteStatusRDDOpt, isErrorTableWriteUnificationEnabled, instantTime,
+              latestCommittedInstant);
+          if (!errorTableSuccess) {
+            switch (errorWriteFailureStrategy) {
+              case ROLLBACK_COMMIT:
+                throw new HoodieStreamerWriteException("Error table commit failed for instant " + instantTime);

Review Comment:
   Good catch, fixed in 08a3af1. Re-added `writeClient.rollback(instantTime)`
before the throw at both this site (error-table ROLLBACK_COMMIT) and the Step 4
write-error gate. Relying on next-run cleanup was only safe under `EAGER`; the
explicit rollback preserves HSWSV's behavior under `LAZY` too.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -872,38 +869,110 @@ private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSinkAndDoMetaSync(Hood
       Map<String, String> checkpointCommitMetadata = extractCheckpointMetadata(inputBatch, props, writeClient.getConfig().getWriteVersion().versionCode(), cfg);
       AtomicLong totalSuccessfulRecords = new AtomicLong(0);
       Option<String> latestCommittedInstant = getLatestCommittedInstant();
-      WriteStatusValidator writeStatusValidator = new HoodieStreamerWriteStatusValidator(cfg.commitOnErrors, instantTime,
-          cfg, errorTableWriter, errorTableWriteStatusRDDOpt, errorWriteFailureStrategy, isErrorTableWriteUnificationEnabled, writeClient, latestCommittedInstant,
-          totalSuccessfulRecords);
       String commitActionType = CommitUtils.getCommitActionType(cfg.operation, HoodieTableType.valueOf(cfg.tableType));
 
-      // Cache the RDD only when pre-commit validators are configured. Validators collect the RDD
-      // before commit, so without caching the same DAG would re-evaluate inside writeClient.commit().
-      // When no validators are configured, commit consumes the RDD once and caching adds no value.
-      // shouldUnpersist is true only when we created the cache here (validators present and storage
-      // level was NONE), so the finally block knows to release it.
+      // Pre-commit orchestration (issue #18750): the legacy HoodieStreamerWriteStatusValidator
+      // ran inside writeClient.commit() via the WriteStatusValidator callback and combined three
+      // concerns — count records, commit the error table, and gate on write errors. Each is now
+      // an explicit step here before writeClient.commit(), so the writer no longer receives a
+      // callback. Step order is deliberate (see comments below).
+      //
+      // We collect the write statuses only when user-configured pre-commit validators are present,
+      // because the runValidators() entry point requires a materialized List<WriteStatus>. When no
+      // validators are configured we count via distributed Spark aggregation, preserving the
+      // pre-#18750 no-overhead behavior for the default path. The cache is engaged in both paths
+      // so that the count action (or collect) and the later writeClient.commit() consume the same
+      // materialization rather than re-evaluating the upstream DAG.
       boolean validatorsConfigured = !StringUtils.isNullOrEmpty(props.getString(
           HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(),
           HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.defaultValue()));
-      boolean shouldUnpersist = validatorsConfigured && writeStatusRDD.getStorageLevel().equals(StorageLevel.NONE());
+      boolean shouldUnpersist = writeStatusRDD.getStorageLevel().equals(StorageLevel.NONE());
       if (shouldUnpersist) {
         writeStatusRDD.cache();
       }
       boolean success;
       try {
-        if (validatorsConfigured) {
-          List<WriteStatus> writeStatuses = writeStatusRDD.collect();
+        // Collect only when validators need the materialized list. Null marker triggers the
+        // RDD-based code paths for the writeStatuses-on-demand checks below.
+        List<WriteStatus> writeStatuses = validatorsConfigured ? writeStatusRDD.collect() : null;
+
+        // Step 1: Commit the error table BEFORE running validators or the write-error gate.
+        // Error records captured here are a genuine artifact of the write attempt and should
+        // survive even when a validator later blocks the data-table commit (otherwise the
+        // operator loses the captured errors and the next run has nothing to triage against).
+        // Note: success here means the error-table commit landed; no rollback is needed on
+        // failure because the data-table writeClient.commit() has not been called yet.
+        // Latent design quirk (preserved from HSWSV): if error-table commit succeeds and any
+        // subsequent step (Step 4 gate, validator, or writeClient.commit) fails, the error
+        // table will have a committed instant for a data-table instant that never lands.
+        // Downstream consumers of the error table should tolerate this divergence.
+        if (errorTableWriter.isPresent()) {

Review Comment:
   Right, widened the latent-quirk comment in 08a3af1 to call out that a Step
2 validator failure (including the offset validator) causes the same
error-table-vs-data-table divergence as a failure at Step 4 or in
writeClient.commit. Downstream error-table consumers were already expected to
tolerate this under HSWSV.
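
One way a downstream consumer might tolerate this divergence is to ignore error-table instants that have no matching committed data-table instant. The sketch below assumes instants are plain strings and that the consumer can list both timelines; `ErrorTableDivergenceSketch` and `orphanedErrorInstants` are hypothetical names, not part of Hudi.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class ErrorTableDivergenceSketch {

  /**
   * Returns error-table instants whose data-table instant never landed,
   * preserving the order of the error-table timeline.
   */
  static List<String> orphanedErrorInstants(List<String> errorTableInstants,
                                            Set<String> dataTableInstants) {
    return errorTableInstants.stream()
        .filter(instant -> !dataTableInstants.contains(instant))
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    // Assumed scenario: the error table committed instant 003, then a later
    // step (validator, write-error gate, or data commit) failed for 003.
    List<String> errorTable = Arrays.asList("001", "002", "003");
    Set<String> dataTable = new HashSet<>(Arrays.asList("001", "002"));
    System.out.println("Orphaned error-table instants: "
        + orphanedErrorInstants(errorTable, dataTable));
  }
}
```

Whether orphaned instants should be skipped, flagged, or triaged separately is a consumer-side policy decision that the PR discussion leaves open.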


