hudi-agent commented on code in PR #18765:
URL: https://github.com/apache/hudi/pull/18765#discussion_r3260475861
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -872,38 +869,110 @@ private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSinkAndDoMetaSync(Hood
Map<String, String> checkpointCommitMetadata = extractCheckpointMetadata(inputBatch, props, writeClient.getConfig().getWriteVersion().versionCode(), cfg);
AtomicLong totalSuccessfulRecords = new AtomicLong(0);
Option<String> latestCommittedInstant = getLatestCommittedInstant();
- WriteStatusValidator writeStatusValidator = new HoodieStreamerWriteStatusValidator(cfg.commitOnErrors, instantTime,
- cfg, errorTableWriter, errorTableWriteStatusRDDOpt, errorWriteFailureStrategy, isErrorTableWriteUnificationEnabled, writeClient, latestCommittedInstant,
- totalSuccessfulRecords);
String commitActionType = CommitUtils.getCommitActionType(cfg.operation, HoodieTableType.valueOf(cfg.tableType));
- // Cache the RDD only when pre-commit validators are configured. Validators collect the RDD
- // before commit, so without caching the same DAG would re-evaluate inside writeClient.commit().
- // When no validators are configured, commit consumes the RDD once and caching adds no value.
- // shouldUnpersist is true only when we created the cache here (validators present and storage
- // level was NONE), so the finally block knows to release it.
+ // Pre-commit orchestration (issue #18750): the legacy HoodieStreamerWriteStatusValidator
+ // ran inside writeClient.commit() via the WriteStatusValidator callback and combined three
+ // concerns — count records, commit the error table, and gate on write errors. Each is now
+ // an explicit step here before writeClient.commit(), so the writer no longer receives a
+ // callback. Step order is deliberate (see comments below).
+ //
+ // We collect the write statuses only when user-configured pre-commit validators are present,
+ // because the runValidators() entry point requires a materialized List<WriteStatus>. When no
+ // validators are configured we count via distributed Spark aggregation, preserving the
+ // pre-#18750 no-overhead behavior for the default path. The cache is engaged in both paths
+ // so that the count action (or collect) and the later writeClient.commit() consume the same
+ // materialization rather than re-evaluating the upstream DAG.
boolean validatorsConfigured = !StringUtils.isNullOrEmpty(props.getString(
HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(),
HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.defaultValue()));
- boolean shouldUnpersist = validatorsConfigured && writeStatusRDD.getStorageLevel().equals(StorageLevel.NONE());
+ boolean shouldUnpersist = writeStatusRDD.getStorageLevel().equals(StorageLevel.NONE());
if (shouldUnpersist) {
writeStatusRDD.cache();
}
boolean success;
try {
- if (validatorsConfigured) {
- List<WriteStatus> writeStatuses = writeStatusRDD.collect();
+ // Collect only when validators need the materialized list. Null marker triggers the
+ // RDD-based code paths for the writeStatuses-on-demand checks below.
+ List<WriteStatus> writeStatuses = validatorsConfigured ? writeStatusRDD.collect() : null;
+
+ // Step 1: Commit the error table BEFORE running validators or the write-error gate.
+ // Error records captured here are a genuine artifact of the write attempt and should
+ // survive even when a validator later blocks the data-table commit (otherwise the
+ // operator loses the captured errors and the next run has nothing to triage against).
+ // Note: success here means the error-table commit landed; no rollback is needed on
+ // failure because the data-table writeClient.commit() has not been called yet.
+ // Latent design quirk (preserved from HSWSV): if error-table commit succeeds and any
+ // subsequent step (Step 4 gate, validator, or writeClient.commit) fails, the error
+ // table will have a committed instant for a data-table instant that never lands.
+ // Downstream consumers of the error table should tolerate this divergence.
+ if (errorTableWriter.isPresent()) {
Review Comment:
🤖 Worth confirming this ordering change: in the original flow,
`SparkStreamerValidatorUtils.runValidators(...)` ran before
`writeClient.commit()`, and the error-table commit happened inside `HSWSV`
(i.e. inside `writeClient.commit()`). So if e.g. `SparkKafkaOffsetValidator`
threw, the error table was NOT committed. With Step 1 now ahead of Step 2, an
offset-validator failure leaves the error table committed for a data-table
instant that never lands. The Step 1 comment notes a similar quirk for the
inline gate; could you call out that this now applies to the offset validator
too, and confirm downstream error-table consumers tolerate it?
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -872,38 +869,110 @@ private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSinkAndDoMetaSync(Hood
Map<String, String> checkpointCommitMetadata = extractCheckpointMetadata(inputBatch, props, writeClient.getConfig().getWriteVersion().versionCode(), cfg);
AtomicLong totalSuccessfulRecords = new AtomicLong(0);
Option<String> latestCommittedInstant = getLatestCommittedInstant();
- WriteStatusValidator writeStatusValidator = new HoodieStreamerWriteStatusValidator(cfg.commitOnErrors, instantTime,
- cfg, errorTableWriter, errorTableWriteStatusRDDOpt, errorWriteFailureStrategy, isErrorTableWriteUnificationEnabled, writeClient, latestCommittedInstant,
- totalSuccessfulRecords);
String commitActionType = CommitUtils.getCommitActionType(cfg.operation, HoodieTableType.valueOf(cfg.tableType));
- // Cache the RDD only when pre-commit validators are configured. Validators collect the RDD
- // before commit, so without caching the same DAG would re-evaluate inside writeClient.commit().
- // When no validators are configured, commit consumes the RDD once and caching adds no value.
- // shouldUnpersist is true only when we created the cache here (validators present and storage
- // level was NONE), so the finally block knows to release it.
+ // Pre-commit orchestration (issue #18750): the legacy HoodieStreamerWriteStatusValidator
+ // ran inside writeClient.commit() via the WriteStatusValidator callback and combined three
+ // concerns — count records, commit the error table, and gate on write errors. Each is now
+ // an explicit step here before writeClient.commit(), so the writer no longer receives a
+ // callback. Step order is deliberate (see comments below).
+ //
+ // We collect the write statuses only when user-configured pre-commit validators are present,
+ // because the runValidators() entry point requires a materialized List<WriteStatus>. When no
+ // validators are configured we count via distributed Spark aggregation, preserving the
+ // pre-#18750 no-overhead behavior for the default path. The cache is engaged in both paths
+ // so that the count action (or collect) and the later writeClient.commit() consume the same
+ // materialization rather than re-evaluating the upstream DAG.
boolean validatorsConfigured = !StringUtils.isNullOrEmpty(props.getString(
HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(),
HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.defaultValue()));
- boolean shouldUnpersist = validatorsConfigured && writeStatusRDD.getStorageLevel().equals(StorageLevel.NONE());
+ boolean shouldUnpersist = writeStatusRDD.getStorageLevel().equals(StorageLevel.NONE());
if (shouldUnpersist) {
writeStatusRDD.cache();
}
boolean success;
try {
- if (validatorsConfigured) {
- List<WriteStatus> writeStatuses = writeStatusRDD.collect();
+ // Collect only when validators need the materialized list. Null marker triggers the
+ // RDD-based code paths for the writeStatuses-on-demand checks below.
+ List<WriteStatus> writeStatuses = validatorsConfigured ? writeStatusRDD.collect() : null;
+
+ // Step 1: Commit the error table BEFORE running validators or the write-error gate.
+ // Error records captured here are a genuine artifact of the write attempt and should
+ // survive even when a validator later blocks the data-table commit (otherwise the
+ // operator loses the captured errors and the next run has nothing to triage against).
+ // Note: success here means the error-table commit landed; no rollback is needed on
+ // failure because the data-table writeClient.commit() has not been called yet.
+ // Latent design quirk (preserved from HSWSV): if error-table commit succeeds and any
+ // subsequent step (Step 4 gate, validator, or writeClient.commit) fails, the error
+ // table will have a committed instant for a data-table instant that never lands.
+ // Downstream consumers of the error table should tolerate this divergence.
+ if (errorTableWriter.isPresent()) {
+ boolean errorTableSuccess = ErrorTableCommitter.commit(errorTableWriter.get(),
+ errorTableWriteStatusRDDOpt, isErrorTableWriteUnificationEnabled, instantTime,
+ latestCommittedInstant);
+ if (!errorTableSuccess) {
+ switch (errorWriteFailureStrategy) {
+ case ROLLBACK_COMMIT:
+ throw new HoodieStreamerWriteException("Error table commit failed for instant " + instantTime);
Review Comment:
🤖 The legacy `HoodieStreamerWriteStatusValidator` explicitly called
`writeClient.rollback(instantTime)` before throwing here (and in the
write-error gate around line 969). The comment above says "no rollback is
needed... because writeClient.commit() has not been called yet," but
`writeToSink()` has already created an INFLIGHT instant — without an explicit
rollback the inflight is left dangling. The outer `finally` only calls
`releaseResources()` (heartbeat stop), not a rollback. Cleanup relies on
`hoodie.clean.failed.writes.policy` being `EAGER` on the next sync; users
running `LAZY` will accumulate orphan inflights. Could you either re-add
`writeClient.rollback(instantTime)` here and at the write-error throw, or
confirm the next-run-cleanup behavior is intentional? @yihua
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
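
A minimal sketch of the "roll back, then throw" shape requested above. `InstantRollback` and `StreamerWriteException` are hypothetical stand-ins (for the write client's `rollback(instantTime)` call and for `HoodieStreamerWriteException`) so the snippet runs without Hudi on the classpath; it is not the PR's code. A failure during rollback is attached as a suppressed exception so the original failure still surfaces.

```java
import java.util.ArrayList;
import java.util.List;

final class RollbackBeforeThrow {

  // Hypothetical minimal view of the write client; only rollback is modeled.
  interface InstantRollback {
    boolean rollback(String instantTime);
  }

  // Hypothetical stand-in for HoodieStreamerWriteException.
  static final class StreamerWriteException extends RuntimeException {
    StreamerWriteException(String message) {
      super(message);
    }
  }

  // Attempts to roll back the inflight instant, then always throws the failure.
  static void failWithRollback(InstantRollback client, String instantTime, String reason) {
    StreamerWriteException failure =
        new StreamerWriteException(reason + " for instant " + instantTime);
    try {
      client.rollback(instantTime);
    } catch (RuntimeException rollbackError) {
      // Keep the original failure primary; record the rollback problem alongside it.
      failure.addSuppressed(rollbackError);
    }
    throw failure;
  }

  public static void main(String[] args) {
    List<String> rolledBack = new ArrayList<>();
    try {
      failWithRollback(rolledBack::add, "20240101000000", "Error table commit failed");
    } catch (StreamerWriteException e) {
      System.out.println(e.getMessage() + "; rolled back: " + rolledBack);
    }
  }
}
```

The same helper could serve both throw sites (the error-table failure and the write-error gate), which would keep the two paths from drifting apart.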
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/SuccessfulRecordCounter.java:
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.util.Option;
+
+import org.apache.spark.api.java.JavaRDD;
+
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Computes record counts for a HoodieStreamer commit, summing across the data-table
+ * write statuses and (optionally) the error-table write statuses when error-table
+ * write unification is enabled.
+ *
+ * <p>Extracted from {@code HoodieStreamerWriteStatusValidator} (issue #18750) so the
+ * counting logic can be invoked from the explicit pre-commit orchestration in
+ * {@code StreamSync} without going through the {@code WriteStatusValidator} callback.</p>
+ *
+ * <p>Two entry points are provided so callers can avoid a driver-side {@code collect()}
+ * when no pre-commit validators are configured:</p>
+ * <ul>
+ * <li>{@link #compute(List, Option, boolean)} — driver-side sum over an already-collected
+ * list. Use when a validator path needs the list anyway.</li>
+ * <li>{@link #computeFromRdd(JavaRDD, Option, boolean)} — distributed Spark aggregation
+ * over the RDD. Use when no validators are configured to keep the no-overhead path.</li>
+ * </ul>
+ */
+public final class SuccessfulRecordCounter {
+
+ private SuccessfulRecordCounter() {
+ }
+
+ /**
+ * Compute total / errored / successful record counts from a pre-collected list of write statuses.
+ *
+ * @param dataTableWriteStatuses Pre-collected data-table write statuses. Must not be null.
+ * @param errorTableWriteStatusRDDOpt Optional error-table write status RDD; only consulted
+ * when unification is enabled. Must not be null
+ * ({@link Option#empty()} when no error table).
+ * @param isErrorTableWriteUnificationEnabled Whether error-table records contribute to the totals.
+ * @return immutable {@link Counts} snapshot.
+ */
+ public static Counts compute(List<WriteStatus> dataTableWriteStatuses,
+ Option<JavaRDD<WriteStatus>> errorTableWriteStatusRDDOpt,
+ boolean isErrorTableWriteUnificationEnabled) {
+ Objects.requireNonNull(dataTableWriteStatuses, "dataTableWriteStatuses");
+ Objects.requireNonNull(errorTableWriteStatusRDDOpt, "errorTableWriteStatusRDDOpt");
+
+ long totalRecords = 0L;
+ long totalErroredRecords = 0L;
+ for (WriteStatus ws : dataTableWriteStatuses) {
+ totalRecords += ws.getTotalRecords();
+ totalErroredRecords += ws.getTotalErrorRecords();
+ }
+ return addErrorTable(totalRecords, totalErroredRecords,
+ errorTableWriteStatusRDDOpt, isErrorTableWriteUnificationEnabled);
+ }
+
+ /**
+ * Compute counts via distributed Spark aggregation without materializing the data-table write
+ * statuses on the driver. Use this when no pre-commit validators are configured.
+ *
+ * @param dataTableRdd Data-table write status RDD. Must not be null.
+ */
+ public static Counts computeFromRdd(JavaRDD<WriteStatus> dataTableRdd,
+ Option<JavaRDD<WriteStatus>> errorTableWriteStatusRDDOpt,
+ boolean isErrorTableWriteUnificationEnabled) {
Review Comment:
🤖 `computeFromRdd` runs two Spark actions over the same RDD (one for
`TOTAL_RECORDS`, one for `TOTAL_ERROR_RECORDS`). The caller now always caches
`writeStatusRDD` so each pass is cheap, but you can collapse this into a single
aggregate over a tuple if you want a one-pass version. Not a blocker.
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
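
For illustration, the one-pass variant could take roughly the following shape. To stay runnable without Spark, this sketch folds a plain `List` with `Stream.reduce`; `Status` and `Totals` are hypothetical stand-ins for `WriteStatus` and the counts holder. On a real `JavaRDD<WriteStatus>`, the same fold/merge pair would be handed to `JavaRDD.aggregate(zeroValue, seqOp, combOp)`, and the accumulator type would need to be serializable.

```java
import java.util.Arrays;
import java.util.List;

final class OnePassCounts {

  // Hypothetical stand-in for WriteStatus; only the two counters matter here.
  static final class Status {
    final long totalRecords;
    final long totalErrorRecords;

    Status(long totalRecords, long totalErrorRecords) {
      this.totalRecords = totalRecords;
      this.totalErrorRecords = totalErrorRecords;
    }
  }

  // Immutable accumulator that carries both sums through a single pass.
  static final class Totals {
    final long total;
    final long errored;

    Totals(long total, long errored) {
      this.total = total;
      this.errored = errored;
    }

    // seqOp: fold one status into the running totals.
    Totals add(Status s) {
      return new Totals(total + s.totalRecords, errored + s.totalErrorRecords);
    }

    // combOp: merge the partial totals of two partitions.
    Totals merge(Totals other) {
      return new Totals(total + other.total, errored + other.errored);
    }
  }

  // One traversal produces both counts instead of one action per counter.
  static Totals count(List<Status> statuses) {
    return statuses.stream().reduce(new Totals(0L, 0L), Totals::add, Totals::merge);
  }

  public static void main(String[] args) {
    Totals t = count(Arrays.asList(new Status(10, 1), new Status(5, 0), new Status(7, 2)));
    System.out.println(t.total + " total, " + t.errored + " errored");
  }
}
```

With the RDD already cached, the saving is one job launch rather than a recomputation, which is why this stays optional.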
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -872,38 +869,110 @@ private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSinkAndDoMetaSync(Hood
Map<String, String> checkpointCommitMetadata = extractCheckpointMetadata(inputBatch, props, writeClient.getConfig().getWriteVersion().versionCode(), cfg);
AtomicLong totalSuccessfulRecords = new AtomicLong(0);
Option<String> latestCommittedInstant = getLatestCommittedInstant();
- WriteStatusValidator writeStatusValidator = new HoodieStreamerWriteStatusValidator(cfg.commitOnErrors, instantTime,
- cfg, errorTableWriter, errorTableWriteStatusRDDOpt, errorWriteFailureStrategy, isErrorTableWriteUnificationEnabled, writeClient, latestCommittedInstant,
- totalSuccessfulRecords);
String commitActionType = CommitUtils.getCommitActionType(cfg.operation, HoodieTableType.valueOf(cfg.tableType));
- // Cache the RDD only when pre-commit validators are configured. Validators collect the RDD
- // before commit, so without caching the same DAG would re-evaluate inside writeClient.commit().
- // When no validators are configured, commit consumes the RDD once and caching adds no value.
- // shouldUnpersist is true only when we created the cache here (validators present and storage
- // level was NONE), so the finally block knows to release it.
+ // Pre-commit orchestration (issue #18750): the legacy HoodieStreamerWriteStatusValidator
+ // ran inside writeClient.commit() via the WriteStatusValidator callback and combined three
+ // concerns — count records, commit the error table, and gate on write errors. Each is now
+ // an explicit step here before writeClient.commit(), so the writer no longer receives a
+ // callback. Step order is deliberate (see comments below).
+ //
+ // We collect the write statuses only when user-configured pre-commit validators are present,
+ // because the runValidators() entry point requires a materialized List<WriteStatus>. When no
+ // validators are configured we count via distributed Spark aggregation, preserving the
+ // pre-#18750 no-overhead behavior for the default path. The cache is engaged in both paths
+ // so that the count action (or collect) and the later writeClient.commit() consume the same
+ // materialization rather than re-evaluating the upstream DAG.
boolean validatorsConfigured = !StringUtils.isNullOrEmpty(props.getString(
HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(),
HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.defaultValue()));
- boolean shouldUnpersist = validatorsConfigured && writeStatusRDD.getStorageLevel().equals(StorageLevel.NONE());
+ boolean shouldUnpersist = writeStatusRDD.getStorageLevel().equals(StorageLevel.NONE());
if (shouldUnpersist) {
writeStatusRDD.cache();
}
boolean success;
try {
- if (validatorsConfigured) {
- List<WriteStatus> writeStatuses = writeStatusRDD.collect();
+ // Collect only when validators need the materialized list. Null marker triggers the
+ // RDD-based code paths for the writeStatuses-on-demand checks below.
+ List<WriteStatus> writeStatuses = validatorsConfigured ? writeStatusRDD.collect() : null;
Review Comment:
🤖 nit: have you considered `Option<List<WriteStatus>> writeStatusesOpt =
validatorsConfigured ? Option.of(writeStatusRDD.collect()) : Option.empty()`
here? The null sentinel works, but it feels a bit out of place in a codebase
that uses `Option` everywhere for may-or-may-not-be-present values — and it
would let you replace the `writeStatuses != null` guard on the error-logging
path with a more idiomatic `isPresent()` check.
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
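
A small sketch of the optional-based shape suggested above. It uses `java.util.Optional` as a stand-in for Hudi's `Option` so that it runs standalone, and `collectIfNeeded` and `describe` are hypothetical names for illustration only. The `Supplier` models the `collect()` call, which should fire only when validators are configured.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.function.Supplier;

final class OptionalStatuses {

  // Collects only when validators are configured; otherwise returns empty rather than null.
  static Optional<List<String>> collectIfNeeded(boolean validatorsConfigured,
                                                Supplier<List<String>> collect) {
    return validatorsConfigured ? Optional.of(collect.get()) : Optional.empty();
  }

  // Downstream branches on presence instead of comparing against null.
  static String describe(Optional<List<String>> statusesOpt) {
    return statusesOpt.isPresent()
        ? "driver-side path over " + statusesOpt.get().size() + " statuses"
        : "RDD-based path";
  }

  public static void main(String[] args) {
    System.out.println(describe(collectIfNeeded(true, () -> Arrays.asList("ws-1", "ws-2"))));
    System.out.println(describe(collectIfNeeded(false, () -> Arrays.asList("ws-1", "ws-2"))));
  }
}
```

The type then documents that the list may be absent, so a later reader cannot dereference it without first deciding what the empty case means.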
##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkWriteErrorValidator.java:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer.validator;
+
+import org.apache.hudi.client.validator.BasePreCommitValidator;
+import org.apache.hudi.client.validator.ValidationContext;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
+import org.apache.hudi.config.HoodiePreCommitValidatorConfig.ValidationFailurePolicy;
+import org.apache.hudi.exception.HoodieValidationException;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * Pre-commit validator that fails the commit when records failed to write.
+ *
+ * <p>Equivalent of the legacy {@code HoodieStreamerWriteStatusValidator}'s boolean error check
+ * ({@code hasErrorRecords = totalErroredRecords > 0}), wired through the pre-commit validator
+ * framework (issue #18750). Pure validation: no side effects (no error-table commit, no
+ * top-100 error logging, no instant rollback). Those side effects are handled separately by
+ * {@code StreamSync}'s pre-commit orchestration.</p>
+ *
+ * <p><b>Relationship with the inline write-error gate in {@code StreamSync}:</b> the default
+ * commit path in {@code StreamSync} already applies an equivalent error check via the
+ * {@code commitOnErrors} flag. This validator exists so that users running multiple validators
+ * (e.g. write-error + offset checks) can express a unified pass/fail story through a single
+ * {@code failure.policy} knob. Enabling this validator while leaving {@code commitOnErrors=false}
+ * means both checks run and either can block the commit — they are intentionally not mutually
+ * exclusive.</p>
+ *
+ * <p>Behavior mapping from the legacy HSWSV:</p>
+ * <ul>
+ * <li>{@code commitOnErrors = false} (HSWSV default) ↔ {@code failure.policy = FAIL}</li>
+ * <li>{@code commitOnErrors = true} ↔ {@code failure.policy = WARN_LOG}</li>
+ * </ul>
+ *
+ * <p>Configuration:</p>
+ * <ul>
+ * <li>{@code hoodie.precommit.validators}: Include
+ * {@code org.apache.hudi.utilities.streamer.validator.SparkWriteErrorValidator}</li>
+ * <li>{@code hoodie.precommit.validators.failure.policy}: FAIL (default) or WARN_LOG</li>
+ * </ul>
+ *
+ * <p>Like {@link SparkKafkaOffsetValidator}, this class extends {@link BasePreCommitValidator}
+ * and must be invoked via {@link SparkStreamerValidatorUtils} — not {@code SparkValidatorUtils},
+ * which expects a different constructor signature.</p>
+ */
+@Slf4j
+public class SparkWriteErrorValidator extends BasePreCommitValidator {
+
+ private final ValidationFailurePolicy failurePolicy;
+
+ public SparkWriteErrorValidator(TypedProperties config) {
+ super(config);
+ String policyStr = config.getString(
+ HoodiePreCommitValidatorConfig.VALIDATION_FAILURE_POLICY.key(),
+ HoodiePreCommitValidatorConfig.VALIDATION_FAILURE_POLICY.defaultValue());
+ try {
+ this.failurePolicy = ValidationFailurePolicy.valueOf(policyStr);
+ } catch (IllegalArgumentException e) {
+ throw new HoodieValidationException(String.format(
+ "Invalid value '%s' for %s. Allowed values: %s.",
+ policyStr,
+ HoodiePreCommitValidatorConfig.VALIDATION_FAILURE_POLICY.key(),
+ java.util.Arrays.toString(ValidationFailurePolicy.values())), e);
+ }
Review Comment:
🤖 nit: could you add `import java.util.Arrays;` at the top and write
`Arrays.toString(ValidationFailurePolicy.values())` here? Inline FQNs are a bit
jarring when a plain import would do.
<sub><i>- AI-generated; verify before applying. React 👍/👎 to flag
quality.</i></sub>
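
For reference, the imported form would read roughly as below. `FailurePolicy` is a hypothetical two-value enum standing in for `ValidationFailurePolicy`, and a plain `IllegalArgumentException` stands in for `HoodieValidationException`, so the snippet compiles on its own.

```java
import java.util.Arrays;

final class PolicyParsing {

  // Hypothetical stand-in for ValidationFailurePolicy.
  enum FailurePolicy { FAIL, WARN_LOG }

  // Parses the policy and lists the allowed values when the input is invalid.
  static FailurePolicy parse(String key, String value) {
    try {
      return FailurePolicy.valueOf(value);
    } catch (IllegalArgumentException e) {
      throw new IllegalArgumentException(String.format(
          "Invalid value '%s' for %s. Allowed values: %s.",
          value, key, Arrays.toString(FailurePolicy.values())), e);
    }
  }

  public static void main(String[] args) {
    System.out.println(parse("hoodie.precommit.validators.failure.policy", "WARN_LOG"));
  }
}
```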