This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new b934633f066a feat(utilities): add Spark/HoodieStreamer validators for
pre-commit validation - Phase 3 (#18405)
b934633f066a is described below
commit b934633f066a6efa57e04837b2536baa66b4f2f4
Author: Xinli Shang <[email protected]>
AuthorDate: Sat May 16 20:37:56 2026 -0700
feat(utilities): add Spark/HoodieStreamer validators for pre-commit
validation - Phase 3 (#18405)
* feat: Add Spark streamer validators for phase 3 precommit validation
Implements phase 3 of the precommit validation framework by adding:
- SparkKafkaOffsetValidator: Validates Kafka offset consistency
- SparkValidationContext: Provides Spark-specific validation context
- SparkStreamerValidatorUtils: Utility functions for Spark streamer
validation
- Comprehensive test coverage for all validator components
- Integration with StreamSync and HoodiePreCommitValidatorConfig
Co-Authored-By: Claude Opus 4.6 <[email protected]>
* fix: address code review and fix checkstyle violations
- Remove unused imports (java.io.IOException, HoodieCommitMetadata,
HoodieTestTable, Option) that caused checkstyle build failures
- Remove accidentally committed bootstrap_register_only_issue.md
- Cache writeStatusRDD before collect() to prevent second DAG evaluation
and potential driver OOM
- Add comment explaining why validator runs before writeClient.commit():
offset validation is a stronger guard than commitOnErrors and must
prevent the commit when data loss is detected
- Clarify buildCommitMetadata() produces a pre-commit preview object,
not a fully-constructed commit record
- Add Javadoc to SparkKafkaOffsetValidator and SparkStreamerValidatorUtils
explaining incompatibility with SparkValidatorUtils (different interface
and constructor signature) to prevent misconfiguration
- Add two-commit integration tests (testSecondCommitMatchingOffsetsPasses,
testSecondCommitDataLossDetected) using HoodieTestTable to exercise the
real offset comparison path, not just the first-commit skip path
* fix: skip non-SparkPreCommitValidator classes in SparkValidatorUtils
SparkKafkaOffsetValidator (and similar streaming validators) extend
BasePreCommitValidator with a (TypedProperties) constructor, not the
(HoodieSparkTable, HoodieEngineContext, HoodieWriteConfig) constructor
that SparkValidatorUtils expects. Listing such a validator in
hoodie.precommit.validators previously caused a reflection error in the
Spark table write path.
Add a Class.isAssignableFrom check to filter out classes that don't
implement SparkPreCommitValidator before attempting instantiation, with
a clear warning pointing users to SparkStreamerValidatorUtils for
streaming validators.
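
For reviewers, the filtering step described above can be sketched as follows. This is a minimal, self-contained illustration, not the code in SparkValidatorUtils: `TableValidator`, `TableSqlValidator`, `OffsetValidator` and `compatibleValidators` are hypothetical names standing in for SparkPreCommitValidator and its neighbours, and the sketch returns the accepted class names instead of instantiating anything.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-ins for the two validator families; these are not Hudi types.
interface TableValidator {}

class TableSqlValidator implements TableValidator {}

class OffsetValidator {}

class ValidatorFilterSketch {

  /** Returns the configured class names that implement the expected type. */
  static List<String> compatibleValidators(String configValue, Class<?> expectedType) {
    return Arrays.stream(configValue.split(","))
        .map(String::trim)
        // Drop blanks so a trailing comma in the config is harmless.
        .filter(name -> !name.isEmpty())
        .filter(name -> {
          try {
            boolean compatible = expectedType.isAssignableFrom(Class.forName(name));
            if (!compatible) {
              System.err.println("Skipping validator " + name
                  + ": it does not implement " + expectedType.getSimpleName());
            }
            return compatible;
          } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Cannot find validator class: " + name, e);
          }
        })
        .collect(Collectors.toList());
  }

  public static void main(String[] args) {
    String config = TableSqlValidator.class.getName() + ", "
        + OffsetValidator.class.getName() + ",";
    System.out.println(compatibleValidators(config, TableValidator.class));
  }
}
```

The check runs on the loaded `Class` object before any constructor lookup, which is why an incompatible validator can be skipped with a warning rather than failing in reflection.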
* ci: trigger CI re-run for flaky trino test
* fix: address reviewer comments on pre-commit streaming offset validator
- Unpersist cached RDD in finally block to prevent executor memory leak
- Let IOException propagate from loadPreviousCommitMetadata instead of
silently swallowing it
- Filter empty validator class names before Class.forName to handle
trailing comma in config
- Add write error count to validation message to distinguish write failures
from silent data loss
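
The write-error hint can be illustrated with a small sketch. The deviation formula and the two hint strings are taken from the StreamingOffsetValidator diff below; the class name `OffsetDeviationSketch`, the method names, and the handling of a zero offset difference are assumptions made only for this example.

```java
class OffsetDeviationSketch {

  /** Percentage by which records written deviate from the offset difference. */
  static double deviationPercent(long offsetDiff, long recordsWritten) {
    if (offsetDiff == 0) {
      // Assumption for this sketch: nothing consumed and nothing written is 0%,
      // anything written without consumed offsets counts as a full deviation.
      return recordsWritten == 0 ? 0.0 : 100.0;
    }
    long difference = Math.abs(offsetDiff - recordsWritten);
    return (100.0 * difference) / offsetDiff;
  }

  /** Picks the hint appended to the failure message. */
  static String hint(long writeErrors) {
    return writeErrors > 0
        ? "Non-zero write errors suggest records failed to write rather than silent data loss. "
        : "This may indicate data loss or filtering. ";
  }

  /** True when the deviation exceeds the configured tolerance. */
  static boolean exceedsTolerance(long offsetDiff, long recordsWritten, double tolerancePercent) {
    return deviationPercent(offsetDiff, recordsWritten) > tolerancePercent;
  }

  public static void main(String[] args) {
    long offsetDiff = 1000;
    long recordsWritten = 950;
    long writeErrors = 50;
    if (exceedsTolerance(offsetDiff, recordsWritten, 0.0)) {
      System.out.printf("Deviation: %.2f%%. %s%n",
          deviationPercent(offsetDiff, recordsWritten), hint(writeErrors));
    }
  }
}
```

With 1000 offsets consumed and 950 records written the deviation is 5%, and the non-zero error count steers the reader toward failed writes instead of silent loss.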
* fix: address reviewer follow-up comments on pre-commit streaming validator
- Change runValidators to accept List<WriteStatus> instead of JavaRDD
to fix RDD unpersist-before-commit bug; StreamSync now caches the RDD,
collects to list for validators, passes RDD to commit, then unpersists
- Remove generic catch(Exception) in loadPreviousCommitMetadata so
non-IOException failures propagate instead of silently skipping validation
- Implement getPreviousCommitInstant() in SparkValidationContext via
timeline lookup instead of throwing UnsupportedOperationException
- Add Objects::nonNull filter when building writeStats list
- Add BasePreCommitValidator assignability guard in SparkStreamerValidatorUtils
to warn and skip SparkPreCommitValidator classes (reverse-direction guard)
- Eliminate double class loading in SparkValidatorUtils by combining
filter+map into a single flatMap; remove unused ReflectionUtils import
- Remove trivial constructor Javadoc from SparkKafkaOffsetValidator
- Add HoodieTestUtils import in test; remove Spark context boilerplate
now that runValidators accepts List<WriteStatus> directly
* fix: guard cache() call when writeStatusRDD is already persisted
Calling cache() on an RDD that already has a storage level assigned throws
SparkUnsupportedOperationException. The write path may cache the RDD
internally before returning it. Track whether we own the cache and only
call cache()/unpersist() when the RDD was not already persisted.
* Ensure writeStatusRDD is always unpersisted via try/finally
* fix: use proper import for StorageLevel instead of fully-qualified class
reference
🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude Sonnet 4.6 <[email protected]>
* fix: address final reviewer feedback on pre-commit streaming validator
- Move runValidators() inside the try/finally so writeStatusRDD.unpersist()
always runs, including on validator exceptions (FAIL policy or
HoodieIOException from loadPreviousCommitMetadata).
- Use ReflectionUtils.loadClass in SparkValidatorUtils for instantiation,
matching SparkStreamerValidatorUtils and the rest of the codebase.
- Rename weOwnCache to shouldUnpersist to read in the direction of its
actual use (gating unpersist in finally).
* fix: only cache writeStatusRDD when pre-commit validators are configured
Address danny0405's review comment on PR #18405: skip the .cache()/.unpersist()
cycle when no pre-commit validators are configured, since without validators the
RDD is consumed exactly once by writeClient.commit() and caching adds no value.
Guards both the cache call and the validator collect+run on a single
validatorsConfigured boolean derived from hoodie.precommit.validators.
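
The cache-ownership pattern these commits converge on can be sketched without Spark. `FakeDataset` is a hypothetical stand-in for the write-status RDD (its `cache()` rejects a second persist, mirroring the behaviour described above), and `validateThenCommit` and `commit` are illustrative names, not StreamSync methods.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

class CacheOwnershipSketch {

  /** Hypothetical stand-in for an RDD that tracks whether it is persisted. */
  static class FakeDataset {
    final List<String> rows;
    boolean persisted;
    int cacheCalls;
    int unpersistCalls;

    FakeDataset(List<String> rows, boolean persisted) {
      this.rows = rows;
      this.persisted = persisted;
    }

    void cache() {
      if (persisted) {
        throw new UnsupportedOperationException("storage level already assigned");
      }
      persisted = true;
      cacheCalls++;
    }

    void unpersist() {
      persisted = false;
      unpersistCalls++;
    }

    List<String> collect() {
      return rows;
    }
  }

  /** Runs the validator on collected rows, then commits; releases only a cache it created. */
  static boolean validateThenCommit(FakeDataset data, boolean validatorsConfigured,
                                    Consumer<List<String>> validator) {
    // Own the cache only if validators will read the data and nobody persisted it already.
    boolean shouldUnpersist = validatorsConfigured && !data.persisted;
    if (shouldUnpersist) {
      data.cache();
    }
    try {
      if (validatorsConfigured) {
        validator.accept(data.collect());
      }
      return commit(data);
    } finally {
      if (shouldUnpersist) {
        data.unpersist();
      }
    }
  }

  /** Placeholder commit step for the sketch; always succeeds. */
  static boolean commit(FakeDataset data) {
    return true;
  }

  public static void main(String[] args) {
    FakeDataset data = new FakeDataset(Arrays.asList("status-1", "status-2"), false);
    boolean ok = validateThenCommit(data, true, rows -> System.out.println("validating " + rows));
    System.out.println("committed=" + ok + ", stillPersisted=" + data.persisted);
  }
}
```

Because the validator call sits inside the `try`, a validator that throws still reaches the `finally` block, and a dataset persisted by someone else is never unpersisted here.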
* address codope review: V2-then-V1 checkpoint key resolution + V2 test
coverage
Comment 1 (SparkKafkaOffsetValidator hardcoded V1 key):
- StreamingOffsetValidator base class now exposes a no-key constructor that
auto-resolves the checkpoint via CheckpointUtils.getCheckpoint(metadata),
which prefers V2 and falls back to V1. The explicit-key constructor stays
for subclasses that read a custom non-streamer key (e.g. Flink's
HOODIE_METADATA_KEY).
- SparkKafkaOffsetValidator switches to the no-key constructor.
Comment 2 (tests only cover V1 path):
- testSecondCommitMatchingOffsetsPasses and testSecondCommitDataLossDetected
are now parameterized over both V1 and V2 checkpoint keys.
- Added testV2CheckpointKeyOnTableVersionEightFires on a HoodieTableVersion.EIGHT
table with V2 keys, asserting the validator fires on data loss.
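
A simplified view of the V2-then-V1 resolution, using a plain map in place of commit extraMetadata. The two key strings are the ones named in the SparkKafkaOffsetValidator Javadoc in this commit; everything else (`CheckpointKeySketch`, the method signature, the placeholder checkpoint values) is an assumption for illustration, since the real lookup is delegated to CheckpointUtils.getCheckpoint.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

class CheckpointKeySketch {

  static final String V1_KEY = "deltastreamer.checkpoint.key";
  static final String V2_KEY = "streamer.checkpoint.key.v2";

  /**
   * Resolves the checkpoint string. An explicit key wins; otherwise V2 is preferred
   * and V1 is the fallback.
   */
  static Optional<String> resolveCheckpoint(Map<String, String> extraMetadata, String explicitKey) {
    if (explicitKey != null) {
      return Optional.ofNullable(extraMetadata.get(explicitKey));
    }
    String v2 = extraMetadata.get(V2_KEY);
    if (v2 != null) {
      return Optional.of(v2);
    }
    return Optional.ofNullable(extraMetadata.get(V1_KEY));
  }

  public static void main(String[] args) {
    Map<String, String> metadata = new HashMap<>();
    metadata.put(V1_KEY, "placeholder-v1-checkpoint");
    System.out.println(resolveCheckpoint(metadata, null));

    metadata.put(V2_KEY, "placeholder-v2-checkpoint");
    System.out.println(resolveCheckpoint(metadata, null));
  }
}
```

Passing `null` as the explicit key corresponds to the new no-key constructor; a non-null key corresponds to subclasses that store their checkpoint under a custom entry.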
---------
Co-authored-by: Xinli Shang <[email protected]>
Co-authored-by: Claude Opus 4.6 <[email protected]>
---
.../client/validator/StreamingOffsetValidator.java | 93 +++++-
.../config/HoodiePreCommitValidatorConfig.java | 8 +-
.../validator/TestStreamingOffsetValidator.java | 2 +-
.../hudi/client/utils/SparkValidatorUtils.java | 25 +-
.../hudi/client/validator/ValidationContext.java | 14 +
.../apache/hudi/utilities/streamer/StreamSync.java | 37 ++-
.../validator/SparkKafkaOffsetValidator.java | 60 ++++
.../validator/SparkStreamerValidatorUtils.java | 194 +++++++++++++
.../streamer/validator/SparkValidationContext.java | 139 +++++++++
.../validator/TestSparkKafkaOffsetValidator.java | 322 +++++++++++++++++++++
.../validator/TestSparkStreamerValidatorUtils.java | 290 +++++++++++++++++++
.../validator/TestSparkValidationContext.java | 156 ++++++++++
12 files changed, 1317 insertions(+), 23 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/validator/StreamingOffsetValidator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/validator/StreamingOffsetValidator.java
index ce577d84ca01..40e7a4f1635f 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/validator/StreamingOffsetValidator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/validator/StreamingOffsetValidator.java
@@ -20,11 +20,13 @@
package org.apache.hudi.client.validator;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.util.CheckpointUtils;
import org.apache.hudi.common.util.CheckpointUtils.CheckpointFormat;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
import
org.apache.hudi.config.HoodiePreCommitValidatorConfig.ValidationFailurePolicy;
+import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieValidationException;
import lombok.extern.slf4j.Slf4j;
@@ -50,7 +52,11 @@ import lombok.extern.slf4j.Slf4j;
*
* Subclasses specify:
* - Checkpoint format (SPARK_KAFKA, FLINK_KAFKA, etc.)
- * - Checkpoint metadata key
+ * - Checkpoint metadata key (optional — when omitted, the validator
auto-resolves the
+ * active streamer key from commit metadata using
+ * {@link
org.apache.hudi.common.table.checkpoint.CheckpointUtils#getCheckpoint(HoodieCommitMetadata)},
+ * which prefers V2 and falls back to V1. Subclasses that read a custom
non-streamer key
+ * (e.g. Flink's HOODIE_METADATA_KEY) must pass it explicitly.)
* - Source-specific parsing logic (if needed)
*
* Configuration:
@@ -66,7 +72,26 @@ public abstract class StreamingOffsetValidator extends
BasePreCommitValidator {
protected final CheckpointFormat checkpointFormat;
/**
- * Create a streaming offset validator.
+ * Create a streaming offset validator that auto-resolves the checkpoint key
from commit
+ * metadata using {@link
org.apache.hudi.common.table.checkpoint.CheckpointUtils#getCheckpoint(HoodieCommitMetadata)}.
+ *
+ * <p>Use this constructor for streamer pipelines (V1 or V2 checkpoint
keys). The validator
+ * will prefer V2 (table version 8+) and fall back to V1 transparently, so
subclasses don't
+ * need to know which key the writer used.</p>
+ *
+ * @param config Validator configuration
+ * @param checkpointFormat Format of the checkpoint string
+ */
+ protected StreamingOffsetValidator(TypedProperties config,
+ CheckpointFormat checkpointFormat) {
+ this(config, null, checkpointFormat);
+ }
+
+ /**
+ * Create a streaming offset validator with an explicit checkpoint metadata
key.
+ *
+ * <p>Use this constructor when the writer stores its checkpoint under a
custom key that
+ * is not the standard streamer V1/V2 key (e.g. Flink's
HOODIE_METADATA_KEY).</p>
*
* @param config Validator configuration
* @param checkpointKey Key to extract checkpoint from extraMetadata
@@ -95,10 +120,12 @@ public abstract class StreamingOffsetValidator extends
BasePreCommitValidator {
return;
}
- // Extract current checkpoint
- Option<String> currentCheckpointOpt =
context.getExtraMetadata(checkpointKey);
+ // Extract current checkpoint — either from the explicit key (custom
writers like Flink) or
+ // by auto-resolving from commit metadata (streamer pipelines, V2-then-V1
fallback).
+ Option<String> currentCheckpointOpt =
resolveCheckpoint(context.getCommitMetadata());
if (!currentCheckpointOpt.isPresent()) {
- log.warn("Current checkpoint not found with key: {}. Skipping
validation.", checkpointKey);
+ log.warn("Current checkpoint not found (key: {}). Skipping validation.",
+ checkpointKey == null ? "<auto-resolved>" : checkpointKey);
return;
}
String currentCheckpoint = currentCheckpointOpt.get();
@@ -110,8 +137,7 @@ public abstract class StreamingOffsetValidator extends
BasePreCommitValidator {
}
// Extract previous checkpoint
- Option<String> previousCheckpointOpt = context.getPreviousCommitMetadata()
- .flatMap(metadata -> Option.ofNullable(metadata.getMetadata(checkpointKey)));
+ Option<String> previousCheckpointOpt = resolveCheckpoint(context.getPreviousCommitMetadata());
if (!previousCheckpointOpt.isPresent()) {
log.info("Previous checkpoint not found. May be first streaming commit.
Skipping validation.");
@@ -139,6 +165,10 @@ public abstract class StreamingOffsetValidator extends
BasePreCommitValidator {
long recordsWritten = context.getTotalInsertRecordsWritten()
+ context.getTotalUpdateRecordsWritten();
+ // Track write errors so callers can distinguish write-failure deviation
(write errors > 0)
+ // from silent data loss (write errors == 0) when the validator fires.
+ long writeErrors = context.getTotalWriteErrors();
+
// For empty commits (e.g., no new data from source), both offsetDiff and
recordsWritten
// can be zero. This is a valid scenario — skip validation to avoid false
positives.
if (offsetDifference == 0 && recordsWritten == 0) {
@@ -147,7 +177,7 @@ public abstract class StreamingOffsetValidator extends
BasePreCommitValidator {
}
// Validate offset vs record consistency
- validateOffsetConsistency(offsetDifference, recordsWritten,
+ validateOffsetConsistency(offsetDifference, recordsWritten, writeErrors,
currentCheckpoint, previousCheckpoint);
}
@@ -155,12 +185,13 @@ public abstract class StreamingOffsetValidator extends
BasePreCommitValidator {
* Validate that offset difference matches record count within tolerance.
*
* @param offsetDiff Expected records based on offset difference
- * @param recordsWritten Actual records written
+ * @param recordsWritten Actual records written (inserts + updates)
+ * @param writeErrors Records that failed to write (tracked in write status
errors)
* @param currentCheckpoint Current checkpoint string (for error messages)
* @param previousCheckpoint Previous checkpoint string (for error messages)
* @throws HoodieValidationException if validation fails and policy is FAIL
*/
- protected void validateOffsetConsistency(long offsetDiff, long recordsWritten,
+ protected void validateOffsetConsistency(long offsetDiff, long recordsWritten, long writeErrors,
String currentCheckpoint, String previousCheckpoint)
throws HoodieValidationException {
@@ -169,10 +200,13 @@ public abstract class StreamingOffsetValidator extends
BasePreCommitValidator {
if (deviation > tolerancePercentage) {
String errorMsg = String.format(
"Streaming offset validation failed. "
- + "Offset difference: %d, Records written: %d, Deviation: %.2f%%, Tolerance: %.2f%%. "
- + "This may indicate data loss or filtering. "
+ + "Offset difference: %d, Records written: %d, Write errors: %d, Deviation: %.2f%%, Tolerance: %.2f%%. "
+ + "%s"
+ "Previous checkpoint: %s, Current checkpoint: %s",
- offsetDiff, recordsWritten, deviation, tolerancePercentage,
+ offsetDiff, recordsWritten, writeErrors, deviation, tolerancePercentage,
+ writeErrors > 0
+ ? "Non-zero write errors suggest records failed to write rather than silent data loss. "
+ : "This may indicate data loss or filtering. ",
previousCheckpoint, currentCheckpoint);
if (failurePolicy == ValidationFailurePolicy.WARN_LOG) {
@@ -181,8 +215,8 @@ public abstract class StreamingOffsetValidator extends
BasePreCommitValidator {
throw new HoodieValidationException(errorMsg);
}
} else {
- log.info("Offset validation passed. Offset diff: {}, Records: {}, Deviation: {}% (within {}%)",
- offsetDiff, recordsWritten, String.format("%.2f", deviation), tolerancePercentage);
+ log.info("Offset validation passed. Offset diff: {}, Records: {}, Write errors: {}, Deviation: {}% (within {}%)",
+ offsetDiff, recordsWritten, writeErrors, String.format("%.2f", deviation), tolerancePercentage);
}
}
@@ -210,4 +244,33 @@ public abstract class StreamingOffsetValidator extends
BasePreCommitValidator {
long difference = Math.abs(offsetDiff - recordsWritten);
return (100.0 * difference) / offsetDiff;
}
+
+ /**
+ * Resolve the checkpoint string from commit metadata.
+ *
+ * <p>When the validator was constructed with an explicit {@code checkpointKey}, that key
+ * is read directly. Otherwise, {@link org.apache.hudi.common.table.checkpoint.CheckpointUtils#getCheckpoint(HoodieCommitMetadata)}
+ * is used to locate the active streamer checkpoint (V2 first, V1 fallback), so callers
+ * don't need to know which key the writer used.</p>
+ *
+ * @param commitMetadataOpt Optional commit metadata containing extraMetadata
+ * @return Optional checkpoint string (empty if metadata is absent or no checkpoint key matches)
+ */
+ private Option<String> resolveCheckpoint(Option<HoodieCommitMetadata> commitMetadataOpt) {
+ if (!commitMetadataOpt.isPresent()) {
+ return Option.empty();
+ }
+ HoodieCommitMetadata metadata = commitMetadataOpt.get();
+ if (checkpointKey != null) {
+ return Option.ofNullable(metadata.getMetadata(checkpointKey));
+ }
+ try {
+ return Option.ofNullable(
+ org.apache.hudi.common.table.checkpoint.CheckpointUtils.getCheckpoint(metadata)
+ .getCheckpointKey());
+ } catch (HoodieException e) {
+ // No V1 or V2 streamer checkpoint key present in extraMetadata.
+ return Option.empty();
+ }
+ }
}
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePreCommitValidatorConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePreCommitValidatorConfig.java
index f85cc44120d4..169494b7244a 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePreCommitValidatorConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePreCommitValidatorConfig.java
@@ -43,7 +43,10 @@ public class HoodiePreCommitValidatorConfig extends
HoodieConfig {
.key("hoodie.precommit.validators")
.defaultValue("")
.markAdvanced()
- .withDocumentation("Comma separated list of class names that can be
invoked to validate commit");
+ .withDocumentation("Comma separated list of class names that can be
invoked to validate commit. "
+ + "Available streaming offset validators: "
+ + "org.apache.hudi.sink.validator.FlinkKafkaOffsetValidator (Flink
Kafka), "
+ +
"org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator
(Spark/HoodieStreamer Kafka)");
public static final String VALIDATOR_TABLE_VARIABLE = "<TABLE_NAME>";
public static final ConfigProperty<String> EQUALITY_SQL_QUERIES =
ConfigProperty
@@ -71,7 +74,8 @@ public class HoodiePreCommitValidatorConfig extends
HoodieConfig {
.markAdvanced()
.withDocumentation("Tolerance percentage for streaming offset validation
"
+ "(used by
org.apache.hudi.client.validator.StreamingOffsetValidator "
- + "and org.apache.hudi.sink.validator.FlinkKafkaOffsetValidator). "
+ + "and org.apache.hudi.sink.validator.FlinkKafkaOffsetValidator "
+ + "and
org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator). "
+ "The validator compares the offset difference (expected records
from source) "
+ "with actual records written. If the deviation exceeds this
percentage, "
+ "the commit is rejected or warned depending on the validation
failure policy. "
diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/validator/TestStreamingOffsetValidator.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/validator/TestStreamingOffsetValidator.java
index 818bc2087a3c..bc402e5ee5a0 100644
--- a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/validator/TestStreamingOffsetValidator.java
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/validator/TestStreamingOffsetValidator.java
@@ -62,7 +62,7 @@ public class TestStreamingOffsetValidator {
// Expose protected method for testing
public void testValidateOffsetConsistency(long offsetDiff, long
recordsWritten,
String current, String
previous) {
- validateOffsetConsistency(offsetDiff, recordsWritten, current, previous);
+ validateOffsetConsistency(offsetDiff, recordsWritten, 0L, current,
previous);
}
// Expose validateWithMetadata for testing
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java
index c7804631d3f6..40ee7d8cefff 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/utils/SparkValidatorUtils.java
@@ -84,9 +84,28 @@ public class SparkValidatorUtils {
Dataset<Row> beforeState = getRecordsFromCommittedFiles(sqlContext,
partitionsModified, table, afterState.schema());
Stream<SparkPreCommitValidator> validators =
Arrays.stream(config.getPreCommitValidators().split(","))
- .map(validatorClass -> ((SparkPreCommitValidator)
ReflectionUtils.loadClass(validatorClass,
- new Class<?>[] {HoodieSparkTable.class,
HoodieEngineContext.class, HoodieWriteConfig.class},
- table, context, config)));
+ .map(String::trim)
+ .filter(s -> !s.isEmpty())
+ .flatMap(validatorClass -> {
+ try {
+ Class<?> clazz = Class.forName(validatorClass);
+ if (!SparkPreCommitValidator.class.isAssignableFrom(clazz)) {
+ LOG.warn("Skipping validator {} — it does not implement
SparkPreCommitValidator. "
+ + "If this is a streaming offset validator (e.g.
SparkKafkaOffsetValidator), "
+ + "it will be invoked by SparkStreamerValidatorUtils
instead.", validatorClass);
+ return Stream.empty();
+ }
+ SparkPreCommitValidator validator = (SparkPreCommitValidator) ReflectionUtils.loadClass(
+ validatorClass,
+ new Class<?>[] {HoodieSparkTable.class, HoodieEngineContext.class, HoodieWriteConfig.class},
+ table, context, config);
+ return Stream.of(validator);
+ } catch (ClassNotFoundException e) {
+ throw new HoodieValidationException("Cannot find validator class: " + validatorClass, e);
+ } catch (ReflectiveOperationException e) {
+ throw new HoodieValidationException("Failed to instantiate validator: " + validatorClass, e);
+ }
+ });
boolean allSuccess = validators.map(v -> runValidatorAsync(v,
writeMetadata, beforeState, afterState,
instantTime)).map(CompletableFuture::join)
.reduce(true, Boolean::logicalAnd);
diff --git a/hudi-common/src/main/java/org/apache/hudi/client/validator/ValidationContext.java b/hudi-common/src/main/java/org/apache/hudi/client/validator/ValidationContext.java
index 30fdbb3ba3b4..b85218e587e8 100644
--- a/hudi-common/src/main/java/org/apache/hudi/client/validator/ValidationContext.java
+++ b/hudi-common/src/main/java/org/apache/hudi/client/validator/ValidationContext.java
@@ -169,6 +169,20 @@ public interface ValidationContext {
.orElse(0L);
}
+ /**
+ * Calculate total write errors in the current commit.
+ * Records that failed to write are tracked in {@link
org.apache.hudi.common.model.HoodieWriteStat#getTotalWriteErrors()}.
+ * A non-zero error count alongside a deviation in offset validation
indicates write failures
+ * rather than silent data loss — useful context for distinguishing the two
failure modes.
+ *
+ * @return Total count of records that failed to write
+ */
+ default long getTotalWriteErrors() {
+ return getWriteStats()
+ .map(stats -> stats.stream().mapToLong(HoodieWriteStat::getTotalWriteErrors).sum())
+ .orElse(0L);
+ }
+
/**
* Check if this is the first commit (no previous commits exist).
* Derived from {@link #getPreviousCommitInstant()}.
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
index 600891c85dff..2b1406c7deab 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java
@@ -74,6 +74,7 @@ import org.apache.hudi.config.HoodieCompactionConfig;
import org.apache.hudi.config.HoodieErrorTableConfig;
import org.apache.hudi.config.HoodieIndexConfig;
import org.apache.hudi.config.HoodiePayloadConfig;
+import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.config.metrics.HoodieMetricsConfig;
import org.apache.hudi.data.HoodieJavaRDD;
@@ -115,6 +116,7 @@ import
org.apache.hudi.utilities.schema.SimpleSchemaProvider;
import org.apache.hudi.utilities.sources.InputBatch;
import org.apache.hudi.utilities.sources.Source;
import org.apache.hudi.utilities.streamer.HoodieStreamer.Config;
+import
org.apache.hudi.utilities.streamer.validator.SparkStreamerValidatorUtils;
import org.apache.hudi.utilities.transform.Transformer;
import com.codahale.metrics.Timer;
@@ -128,6 +130,7 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.StructType;
+import org.apache.spark.storage.StorageLevel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -874,8 +877,38 @@ public class StreamSync implements Serializable, Closeable
{
totalSuccessfulRecords);
String commitActionType = CommitUtils.getCommitActionType(cfg.operation,
HoodieTableType.valueOf(cfg.tableType));
- boolean success = writeClient.commit(instantTime, writeStatusRDD, Option.of(checkpointCommitMetadata), commitActionType, partitionToReplacedFileIds, Option.empty(),
- Option.of(writeStatusValidator));
+ // Cache the RDD only when pre-commit validators are configured. Validators collect the RDD
+ // before commit, so without caching the same DAG would re-evaluate inside writeClient.commit().
+ // When no validators are configured, commit consumes the RDD once and caching adds no value.
+ // shouldUnpersist is true only when we created the cache here (validators present and storage
+ // level was NONE), so the finally block knows to release it.
+ boolean validatorsConfigured = !StringUtils.isNullOrEmpty(props.getString(
+ HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(),
+ HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.defaultValue()));
+ boolean shouldUnpersist = validatorsConfigured && writeStatusRDD.getStorageLevel().equals(StorageLevel.NONE());
+ if (shouldUnpersist) {
+ writeStatusRDD.cache();
+ }
+ boolean success;
+ try {
+ if (validatorsConfigured) {
+ List<WriteStatus> writeStatuses = writeStatusRDD.collect();
+
+ // Run pre-commit streaming offset validators (if configured).
+ // Placement before writeClient.commit() is intentional: offset
validation is a stronger
+ // guard than commitOnErrors — if offset deviation indicates
potential data loss, the commit
+ // must be prevented regardless of the commitOnErrors policy.
+ SparkStreamerValidatorUtils.runValidators(props, instantTime,
writeStatuses,
+ checkpointCommitMetadata, metaClient);
+ }
+
+ success = writeClient.commit(instantTime, writeStatusRDD, Option.of(checkpointCommitMetadata), commitActionType, partitionToReplacedFileIds, Option.empty(),
+ Option.of(writeStatusValidator));
+ } finally {
+ if (shouldUnpersist) {
+ writeStatusRDD.unpersist();
+ }
+ }
releaseResourcesInvoked = true;
if (success) {
LOG.info("Commit " + instantTime + " successful!");
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkKafkaOffsetValidator.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkKafkaOffsetValidator.java
new file mode 100644
index 000000000000..589e09e96de8
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkKafkaOffsetValidator.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer.validator;
+
+import org.apache.hudi.client.validator.StreamingOffsetValidator;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.CheckpointUtils.CheckpointFormat;
+
+/**
+ * Spark/HoodieStreamer-specific Kafka offset validator.
+ *
+ * <p>Validates that the number of records written matches the Kafka offset difference
+ * between the current and previous HoodieStreamer checkpoints. The active checkpoint key
+ * (V1 {@code deltastreamer.checkpoint.key} or V2 {@code streamer.checkpoint.key.v2}) is
+ * resolved at validation time via {@link org.apache.hudi.common.table.checkpoint.CheckpointUtils#getCheckpoint},
+ * so this validator works against tables written with either checkpoint key version.</p>
+ *
+ * <p>Configuration:
+ * <ul>
+ * <li>{@code hoodie.precommit.validators}: Include
+ * {@code org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator}</li>
+ * <li>{@code hoodie.precommit.validators.streaming.offset.tolerance.percentage}:
+ * Acceptable deviation (default: 0.0 = strict)</li>
+ * <li>{@code hoodie.precommit.validators.failure.policy}:
+ * FAIL (default) or WARN_LOG</li>
+ * </ul></p>
+ *
+ * <p>This validator is primarily intended for append-only ingestion from Kafka via HoodieStreamer.
+ * For upsert workloads with deduplication, configure a higher tolerance or use WARN_LOG.</p>
+ *
+ * <p><b>Important:</b> This class extends {@link org.apache.hudi.client.validator.BasePreCommitValidator}
+ * and is invoked by {@link SparkStreamerValidatorUtils}, NOT by {@code SparkValidatorUtils}
+ * (which expects {@code SparkPreCommitValidator} with a different constructor signature).
+ * Listing this class in {@code hoodie.precommit.validators} while also using the standard
+ * Spark table write-path validators will cause an instantiation failure in {@code SparkValidatorUtils}.
+ * Use this validator exclusively with HoodieStreamer pipelines.</p>
+ */
+public class SparkKafkaOffsetValidator extends StreamingOffsetValidator {
+
+ public SparkKafkaOffsetValidator(TypedProperties config) {
+ super(config, CheckpointFormat.SPARK_KAFKA);
+ }
+}
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkStreamerValidatorUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkStreamerValidatorUtils.java
new file mode 100644
index 000000000000..474215ec0f3c
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkStreamerValidatorUtils.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer.validator;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.validator.BasePreCommitValidator;
+import org.apache.hudi.client.validator.ValidationContext;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieValidationException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Utility for running pre-commit validators in the HoodieStreamer commit flow.
+ *
+ * <p>Instantiates and executes validators configured via
+ * {@code hoodie.precommit.validators}. Each validator must extend
+ * {@link BasePreCommitValidator} and have a constructor that accepts
+ * {@link TypedProperties}.</p>
+ *
+ * <p>Called from {@code StreamSync.writeToSinkAndDoMetaSync()} before
+ * the commit is finalized.</p>
+ *
+ * <p><b>Note on validator compatibility:</b> This utility uses a different instantiation
+ * mechanism than {@code SparkValidatorUtils} (used by the Spark table write path).
+ * {@code SparkValidatorUtils} expects validators implementing {@code SparkPreCommitValidator}
+ * with a {@code (HoodieSparkTable, HoodieEngineContext, HoodieWriteConfig)} constructor.
+ * Validators registered here (e.g. {@link SparkKafkaOffsetValidator}) extend
+ * {@link BasePreCommitValidator} with a {@code (TypedProperties)} constructor and
+ * are NOT compatible with {@code SparkValidatorUtils}. Do not mix them under the same
+ * {@code hoodie.precommit.validators} config if both paths are active.</p>
+ */
+public class SparkStreamerValidatorUtils {
+
+ private static final Logger LOG = LoggerFactory.getLogger(SparkStreamerValidatorUtils.class);
+
+ /**
+ * Run all configured pre-commit validators.
+ *
+ * <p>The caller is responsible for caching and unpersisting the source RDD if needed.
+ * This method accepts pre-collected write statuses to avoid a second DAG evaluation:
+ * the caller should cache the RDD, collect to this list, call this method, then pass
+ * the same RDD to {@code writeClient.commit()}, and unpersist after commit completes.</p>
+ *
+ * @param props Configuration properties containing validator class names
+ * @param instantTime Commit instant time
+ * @param writeStatuses Pre-collected write statuses from Spark write operations
+ * @param checkpointCommitMetadata Extra metadata being committed (contains checkpoint info)
+ * @param metaClient Table meta client for timeline access and previous commit lookup
+ * @throws HoodieValidationException if any validator fails with FAIL policy, or if a
+ * configured validator class cannot be loaded, instantiated, or run
+ */
+ public static void runValidators(TypedProperties props,
+ String instantTime,
+ List<WriteStatus> writeStatuses,
+ Map<String, String> checkpointCommitMetadata,
+ HoodieTableMetaClient metaClient) {
+ String validatorClassNames = props.getString(
+ HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(),
+ HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.defaultValue());
+
+ if (StringUtils.isNullOrEmpty(validatorClassNames)) {
+ return;
+ }
+
+ HoodieCommitMetadata currentMetadata = buildCommitMetadata(writeStatuses, checkpointCommitMetadata);
+ List<HoodieWriteStat> writeStats = writeStatuses.stream()
+ .map(WriteStatus::getStat)
+ .filter(Objects::nonNull)
+ .collect(Collectors.toList());
+
+ Option<HoodieCommitMetadata> previousCommitMetadata = loadPreviousCommitMetadata(metaClient);
+
+ ValidationContext context = new SparkValidationContext(
+ instantTime,
+ Option.of(currentMetadata),
+ Option.of(writeStats),
+ previousCommitMetadata,
+ metaClient);
+
+ List<String> classNames = Arrays.stream(validatorClassNames.split(","))
+ .map(String::trim)
+ .filter(s -> !s.isEmpty())
+ .collect(Collectors.toList());
+
+ for (String className : classNames) {
+ try {
+ Class<?> clazz = Class.forName(className);
+ if (!BasePreCommitValidator.class.isAssignableFrom(clazz)) {
+ LOG.warn("Skipping validator {} in HoodieStreamer path: it does not extend BasePreCommitValidator. "
+ + "If this is a SparkPreCommitValidator (e.g. SqlQueryEqualityPreCommitValidator), "
+ + "it must be invoked via SparkValidatorUtils in the standard Spark write path instead.", className);
+ continue;
+ }
+ BasePreCommitValidator validator = (BasePreCommitValidator)
+ ReflectionUtils.loadClass(className, new Class<?>[] {TypedProperties.class}, props);
+ LOG.info("Running pre-commit validator: {} for instant: {}", className, instantTime);
+ validator.validateWithMetadata(context);
+ LOG.info("Pre-commit validator {} passed for instant: {}", className, instantTime);
+ } catch (HoodieValidationException e) {
+ LOG.error("Pre-commit validator {} failed for instant: {}", className, instantTime, e);
+ throw e;
+ } catch (Exception e) {
+ LOG.error("Failed to instantiate or run validator: {}", className, e);
+ throw new HoodieValidationException(
+ "Failed to run pre-commit validator: " + className, e);
+ }
+ }
+ }
+
+ /**
+ * Build a pre-commit snapshot of {@link HoodieCommitMetadata} from write statuses and extra metadata.
+ *
+ * <p>This is intentionally a partial/preview object used only for validation. It contains
+ * write stats and checkpoint extra-metadata, but omits fields that are not available before the
+ * commit (e.g. schema, operation type). Validators should treat this as a read-only snapshot
+ * of what will be committed, not a fully-constructed commit record.</p>
+ */
+ private static HoodieCommitMetadata buildCommitMetadata(
+ List<WriteStatus> writeStatuses, Map<String, String> extraMetadata) {
+ HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+
+ // Add write stats
+ for (WriteStatus status : writeStatuses) {
+ HoodieWriteStat stat = status.getStat();
+ if (stat != null) {
+ metadata.addWriteStat(stat.getPartitionPath(), stat);
+ }
+ }
+
+ // Add extra metadata (includes checkpoint info like deltastreamer.checkpoint.key)
+ if (extraMetadata != null) {
+ extraMetadata.forEach(metadata::addMetadata);
+ }
+
+ return metadata;
+ }
+
+ /**
+ * Load the previous completed commit metadata from the timeline.
+ */
+ private static Option<HoodieCommitMetadata> loadPreviousCommitMetadata(HoodieTableMetaClient metaClient) {
+ try {
+ HoodieTimeline completedTimeline = metaClient.reloadActiveTimeline()
+ .getWriteTimeline()
+ .filterCompletedInstants();
+ Option<HoodieInstant> lastInstant = completedTimeline.lastInstant();
+ if (lastInstant.isPresent()) {
+ return Option.of(completedTimeline.readCommitMetadata(lastInstant.get()));
+ }
+ } catch (IOException e) {
+ throw new HoodieIOException("Failed to load previous commit metadata", e);
+ }
+ return Option.empty();
+ }
+
+ private SparkStreamerValidatorUtils() {
+ // Utility class
+ }
+}
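
The class-name handling in runValidators() above (split on commas, trim, drop empty entries, skip classes of the wrong type, then construct through a single-argument constructor) can be sketched with the plain JDK. This is an illustrative stand-in, not the Hudi implementation: BaseValidator, AlwaysPassValidator, and ValidatorLoadingSketch are hypothetical names, java.util.Properties stands in for TypedProperties, and java.lang.reflect replaces ReflectionUtils.loadClass.

```java
import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

public class ValidatorLoadingSketch {

  /** Hypothetical stand-in for BasePreCommitValidator. */
  public abstract static class BaseValidator {
    protected final Properties config;

    protected BaseValidator(Properties config) {
      this.config = config;
    }

    public abstract void validate();
  }

  /** Hypothetical validator that never fails; used only to exercise the loader. */
  public static class AlwaysPassValidator extends BaseValidator {
    public AlwaysPassValidator(Properties config) {
      super(config);
    }

    @Override
    public void validate() {
      // Nothing to check in this sketch.
    }
  }

  /** Split a comma-separated list, trimming whitespace and dropping empty entries. */
  public static List<String> parseClassNames(String csv) {
    return Arrays.stream(csv.split(","))
        .map(String::trim)
        .filter(s -> !s.isEmpty())
        .collect(Collectors.toList());
  }

  /** Instantiate every listed class that extends BaseValidator; skip the rest. */
  public static List<BaseValidator> load(String csv, Properties config)
      throws ReflectiveOperationException {
    List<BaseValidator> validators = new ArrayList<>();
    for (String name : parseClassNames(csv)) {
      Class<?> clazz = Class.forName(name);
      if (!BaseValidator.class.isAssignableFrom(clazz)) {
        continue; // wrong type: skipped rather than treated as an error
      }
      Constructor<?> ctor = clazz.getConstructor(Properties.class);
      validators.add((BaseValidator) ctor.newInstance(config));
    }
    return validators;
  }

  public static void main(String[] args) throws Exception {
    String csv = " " + AlwaysPassValidator.class.getName() + " , java.lang.String, ";
    for (BaseValidator v : load(csv, new Properties())) {
      v.validate();
      System.out.println("ran " + v.getClass().getName());
    }
  }
}
```

Checking the type before construction mirrors the isAssignableFrom guard in the diff: a class meant for the other validator path is skipped with a warning instead of failing the commit, while a class that cannot be found still surfaces as an exception.
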
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkValidationContext.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkValidationContext.java
new file mode 100644
index 000000000000..1d46e13aeedb
--- /dev/null
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkValidationContext.java
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer.validator;
+
+import org.apache.hudi.client.validator.ValidationContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+
+import java.util.List;
+
+/**
+ * Spark/HoodieStreamer implementation of {@link ValidationContext}.
+ *
+ * <p>Constructed from data available in {@code StreamSync.writeToSinkAndDoMetaSync()}
+ * before the commit is finalized. Provides validators with access to commit metadata,
+ * write statistics, and previous commit information for streaming offset validation.</p>
+ *
+ * <p>Unlike Flink's implementation, Spark can optionally provide active timeline access
+ * via {@link HoodieTableMetaClient} for richer validation patterns.</p>
+ */
+public class SparkValidationContext implements ValidationContext {
+
+ private final String instantTime;
+ private final Option<HoodieCommitMetadata> commitMetadata;
+ private final Option<List<HoodieWriteStat>> writeStats;
+ private final Option<HoodieCommitMetadata> previousCommitMetadata;
+ private final HoodieTableMetaClient metaClient;
+
+ /**
+ * Create a Spark validation context with full timeline access.
+ *
+ * @param instantTime Current commit instant time
+ * @param commitMetadata Current commit metadata (with extraMetadata including checkpoints)
+ * @param writeStats Write statistics from write operations
+ * @param previousCommitMetadata Metadata from the previous completed commit
+ * @param metaClient Table meta client for timeline access (may be null for testing)
+ */
+ public SparkValidationContext(String instantTime,
+ Option<HoodieCommitMetadata> commitMetadata,
+ Option<List<HoodieWriteStat>> writeStats,
+ Option<HoodieCommitMetadata> previousCommitMetadata,
+ HoodieTableMetaClient metaClient) {
+ this.instantTime = instantTime;
+ this.commitMetadata = commitMetadata;
+ this.writeStats = writeStats;
+ this.previousCommitMetadata = previousCommitMetadata;
+ this.metaClient = metaClient;
+ }
+
+ /**
+ * Create a Spark validation context without timeline access (for testing).
+ *
+ * @param instantTime Current commit instant time
+ * @param commitMetadata Current commit metadata (with extraMetadata including checkpoints)
+ * @param writeStats Write statistics from write operations
+ * @param previousCommitMetadata Metadata from the previous completed commit
+ */
+ public SparkValidationContext(String instantTime,
+ Option<HoodieCommitMetadata> commitMetadata,
+ Option<List<HoodieWriteStat>> writeStats,
+ Option<HoodieCommitMetadata> previousCommitMetadata) {
+ this(instantTime, commitMetadata, writeStats, previousCommitMetadata, null);
+ }
+
+ @Override
+ public String getInstantTime() {
+ return instantTime;
+ }
+
+ @Override
+ public Option<HoodieCommitMetadata> getCommitMetadata() {
+ return commitMetadata;
+ }
+
+ @Override
+ public Option<List<HoodieWriteStat>> getWriteStats() {
+ return writeStats;
+ }
+
+ /**
+ * Get the active timeline. Available when metaClient is provided.
+ *
+ * @throws UnsupportedOperationException if metaClient was not provided
+ */
+ @Override
+ public HoodieActiveTimeline getActiveTimeline() {
+ if (metaClient == null) {
+ throw new UnsupportedOperationException(
+ "Active timeline is not available without HoodieTableMetaClient.");
+ }
+ return metaClient.getActiveTimeline();
+ }
+
+ /**
+ * Get the previous completed commit instant by querying the timeline.
+ * Returns {@link Option#empty()} if this is the first commit or metaClient is unavailable.
+ */
+ @Override
+ public Option<HoodieInstant> getPreviousCommitInstant() {
+ if (metaClient == null) {
+ return Option.empty();
+ }
+ return metaClient.getActiveTimeline()
+ .getWriteTimeline()
+ .filterCompletedInstants()
+ .lastInstant();
+ }
+
+ @Override
+ public boolean isFirstCommit() {
+ return !previousCommitMetadata.isPresent();
+ }
+
+ @Override
+ public Option<HoodieCommitMetadata> getPreviousCommitMetadata() {
+ return previousCommitMetadata;
+ }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/validator/TestSparkKafkaOffsetValidator.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/validator/TestSparkKafkaOffsetValidator.java
new file mode 100644
index 000000000000..d109aa3246f6
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/validator/TestSparkKafkaOffsetValidator.java
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer.validator;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
+import org.apache.hudi.exception.HoodieValidationException;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Tests for {@link SparkKafkaOffsetValidator}.
+ */
+public class TestSparkKafkaOffsetValidator {
+
+ // ========== Helper methods ==========
+
+ private static TypedProperties defaultConfig() {
+ TypedProperties props = new TypedProperties();
+ props.setProperty(HoodiePreCommitValidatorConfig.STREAMING_OFFSET_TOLERANCE_PERCENTAGE.key(), "0.0");
+ props.setProperty(HoodiePreCommitValidatorConfig.VALIDATION_FAILURE_POLICY.key(), "FAIL");
+ return props;
+ }
+
+ private static TypedProperties configWithTolerance(double tolerance) {
+ TypedProperties props = defaultConfig();
+ props.setProperty(HoodiePreCommitValidatorConfig.STREAMING_OFFSET_TOLERANCE_PERCENTAGE.key(),
+ String.valueOf(tolerance));
+ return props;
+ }
+
+ private static TypedProperties configWithWarnPolicy() {
+ TypedProperties props = defaultConfig();
+ props.setProperty(HoodiePreCommitValidatorConfig.VALIDATION_FAILURE_POLICY.key(), "WARN_LOG");
+ return props;
+ }
+
+ /**
+ * Build a Spark Kafka checkpoint string.
+ * Format: topic,partition:offset,partition:offset,...
+ */
+ private static String buildSparkKafkaCheckpoint(String topic, int[] partitions, long[] offsets) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(topic);
+ for (int i = 0; i < partitions.length; i++) {
+ sb.append(",").append(partitions[i]).append(":").append(offsets[i]);
+ }
+ return sb.toString();
+ }
+
+ private static HoodieCommitMetadata buildMetadata(String checkpointValue) {
+ HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+ if (checkpointValue != null) {
+ metadata.addMetadata(StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1, checkpointValue);
+ }
+ return metadata;
+ }
+
+ private static List<HoodieWriteStat> buildWriteStats(long numInserts, long numUpdates) {
+ HoodieWriteStat stat = new HoodieWriteStat();
+ stat.setNumInserts(numInserts);
+ stat.setNumUpdateWrites(numUpdates);
+ stat.setPartitionPath("partition1");
+ return Collections.singletonList(stat);
+ }
+
+ private static SparkValidationContext buildContext(
+ String instantTime,
+ HoodieCommitMetadata currentMetadata,
+ List<HoodieWriteStat> writeStats,
+ HoodieCommitMetadata previousMetadata) {
+ return new SparkValidationContext(
+ instantTime,
+ Option.of(currentMetadata),
+ Option.of(writeStats),
+ previousMetadata != null ? Option.of(previousMetadata) : Option.empty());
+ }
+
+ // ========== Tests ==========
+
+ @Test
+ public void testExactMatchPasses() {
+ // Previous: partition 0 at offset 100, partition 1 at offset 200
+ // Current: partition 0 at offset 200, partition 1 at offset 300
+ // Diff = (200-100) + (300-200) = 200. Records written = 200.
+ String prevCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0,
1}, new long[]{100, 200});
+ String currCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0,
1}, new long[]{200, 300});
+
+ SparkValidationContext ctx = buildContext("20260320120000000",
+ buildMetadata(currCheckpoint),
+ buildWriteStats(200, 0),
+ buildMetadata(prevCheckpoint));
+
+ SparkKafkaOffsetValidator validator = new
SparkKafkaOffsetValidator(defaultConfig());
+ assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+ }
+
+ @Test
+ public void testDataLossDetected() {
+ // Diff = 1000 but only 500 records written -> 50% deviation
+ String prevCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0},
new long[]{0});
+ String currCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0},
new long[]{1000});
+
+ SparkValidationContext ctx = buildContext("20260320120000000",
+ buildMetadata(currCheckpoint),
+ buildWriteStats(500, 0),
+ buildMetadata(prevCheckpoint));
+
+ SparkKafkaOffsetValidator validator = new
SparkKafkaOffsetValidator(defaultConfig());
+ assertThrows(HoodieValidationException.class, () ->
validator.validateWithMetadata(ctx));
+ }
+
+ @Test
+ public void testWithinTolerancePasses() {
+ // Diff = 1000, records = 950 -> 5% deviation, tolerance = 10%
+ String prevCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0},
new long[]{0});
+ String currCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0},
new long[]{1000});
+
+ SparkValidationContext ctx = buildContext("20260320120000000",
+ buildMetadata(currCheckpoint),
+ buildWriteStats(950, 0),
+ buildMetadata(prevCheckpoint));
+
+ SparkKafkaOffsetValidator validator = new
SparkKafkaOffsetValidator(configWithTolerance(10.0));
+ assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+ }
+
+ @Test
+ public void testWarnPolicyDoesNotThrow() {
+ // Data loss but WARN_LOG policy
+ String prevCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0},
new long[]{0});
+ String currCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0},
new long[]{1000});
+
+ SparkValidationContext ctx = buildContext("20260320120000000",
+ buildMetadata(currCheckpoint),
+ buildWriteStats(0, 0),
+ buildMetadata(prevCheckpoint));
+
+ SparkKafkaOffsetValidator validator = new
SparkKafkaOffsetValidator(configWithWarnPolicy());
+ assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+ }
+
+ @Test
+ public void testSkipsFirstCommit() {
+ String currCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0},
new long[]{1000});
+
+ // No previous commit
+ SparkValidationContext ctx = new SparkValidationContext(
+ "20260320120000000",
+ Option.of(buildMetadata(currCheckpoint)),
+ Option.of(buildWriteStats(500, 0)),
+ Option.empty());
+
+ SparkKafkaOffsetValidator validator = new
SparkKafkaOffsetValidator(defaultConfig());
+ assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+ }
+
+ @Test
+ public void testSkipsWhenNoCheckpointKey() {
+ // Current metadata has no checkpoint key
+ HoodieCommitMetadata currentMeta = new HoodieCommitMetadata();
+ String prevCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0},
new long[]{100});
+
+ SparkValidationContext ctx = buildContext("20260320120000000",
+ currentMeta,
+ buildWriteStats(500, 0),
+ buildMetadata(prevCheckpoint));
+
+ SparkKafkaOffsetValidator validator = new
SparkKafkaOffsetValidator(defaultConfig());
+ assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+ }
+
+ @Test
+ public void testMultiPartitionValidation() {
+ // 4 partitions, each advancing by 250 = total diff 1000
+ String prevCheckpoint = buildSparkKafkaCheckpoint("events",
+ new int[]{0, 1, 2, 3}, new long[]{0, 0, 0, 0});
+ String currCheckpoint = buildSparkKafkaCheckpoint("events",
+ new int[]{0, 1, 2, 3}, new long[]{250, 250, 250, 250});
+
+ SparkValidationContext ctx = buildContext("20260320120000000",
+ buildMetadata(currCheckpoint),
+ buildWriteStats(800, 200), // 800 inserts + 200 updates = 1000
+ buildMetadata(prevCheckpoint));
+
+ SparkKafkaOffsetValidator validator = new
SparkKafkaOffsetValidator(defaultConfig());
+ assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+ }
+
+ @Test
+ public void testEmptyCommitSkipsValidation() {
+ // Both offsets same and no records written
+ String checkpoint = buildSparkKafkaCheckpoint("events", new int[]{0}, new
long[]{100});
+
+ SparkValidationContext ctx = buildContext("20260320120000000",
+ buildMetadata(checkpoint),
+ buildWriteStats(0, 0),
+ buildMetadata(checkpoint));
+
+ SparkKafkaOffsetValidator validator = new
SparkKafkaOffsetValidator(defaultConfig());
+ assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+ }
+
+ @Test
+ public void testPreviousCheckpointMissingSkipsValidation() {
+ // Previous metadata exists but has no checkpoint key
+ HoodieCommitMetadata prevMeta = new HoodieCommitMetadata();
+
+ String currCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0},
new long[]{1000});
+
+ SparkValidationContext ctx = buildContext("20260320120000000",
+ buildMetadata(currCheckpoint),
+ buildWriteStats(500, 0),
+ prevMeta);
+
+ SparkKafkaOffsetValidator validator = new
SparkKafkaOffsetValidator(defaultConfig());
+ assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+ }
+
+ @Test
+ public void testOvercountingDetected() {
+ // More records written than offset diff
+ // Diff = 100, records = 200 -> |100-200|/100 = 100% deviation
+ String prevCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0},
new long[]{0});
+ String currCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0},
new long[]{100});
+
+ SparkValidationContext ctx = buildContext("20260320120000000",
+ buildMetadata(currCheckpoint),
+ buildWriteStats(200, 0),
+ buildMetadata(prevCheckpoint));
+
+ SparkKafkaOffsetValidator validator = new
SparkKafkaOffsetValidator(defaultConfig());
+ assertThrows(HoodieValidationException.class, () ->
validator.validateWithMetadata(ctx));
+ }
+
+ @Test
+ public void testExactToleranceBoundaryPasses() {
+ // Diff = 1000, records = 900 -> 10% deviation, tolerance = 10%
+ String prevCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0},
new long[]{0});
+ String currCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0},
new long[]{1000});
+
+ SparkValidationContext ctx = buildContext("20260320120000000",
+ buildMetadata(currCheckpoint),
+ buildWriteStats(900, 0),
+ buildMetadata(prevCheckpoint));
+
+ SparkKafkaOffsetValidator validator = new
SparkKafkaOffsetValidator(configWithTolerance(10.0));
+ assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+ }
+
+ @Test
+ public void testJustOverToleranceFails() {
+ // Diff = 1000, records = 899 -> 10.1% deviation, tolerance = 10%
+ String prevCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0},
new long[]{0});
+ String currCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0},
new long[]{1000});
+
+ SparkValidationContext ctx = buildContext("20260320120000000",
+ buildMetadata(currCheckpoint),
+ buildWriteStats(899, 0),
+ buildMetadata(prevCheckpoint));
+
+ SparkKafkaOffsetValidator validator = new
SparkKafkaOffsetValidator(configWithTolerance(10.0));
+ assertThrows(HoodieValidationException.class, () ->
validator.validateWithMetadata(ctx));
+ }
+
+ @Test
+ public void testOnlyInsertsNoUpdates() {
+ // Pure insert workload
+ String prevCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0,
1}, new long[]{0, 0});
+ String currCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0,
1}, new long[]{500, 500});
+
+ SparkValidationContext ctx = buildContext("20260320120000000",
+ buildMetadata(currCheckpoint),
+ buildWriteStats(1000, 0),
+ buildMetadata(prevCheckpoint));
+
+ SparkKafkaOffsetValidator validator = new
SparkKafkaOffsetValidator(defaultConfig());
+ assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+ }
+
+ @Test
+ public void testUpdatesCountedInRecordTotal() {
+ // Diff = 1000. 600 inserts + 400 updates = 1000 total
+ String prevCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0},
new long[]{0});
+ String currCheckpoint = buildSparkKafkaCheckpoint("events", new int[]{0},
new long[]{1000});
+
+ SparkValidationContext ctx = buildContext("20260320120000000",
+ buildMetadata(currCheckpoint),
+ buildWriteStats(600, 400),
+ buildMetadata(prevCheckpoint));
+
+ SparkKafkaOffsetValidator validator = new
SparkKafkaOffsetValidator(defaultConfig());
+ assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+ }
+}
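
The arithmetic in the test comments above (offset diff summed over partitions, then percentage deviation against records written) can be sketched in plain Java. This is a hedged reconstruction from the test comments only: StreamingOffsetValidator itself is not shown in this part of the diff, so OffsetDeviationSketch and its method names are hypothetical, treating a partition missing from the previous checkpoint as starting at offset 0 is an assumption, and the zero-diff (empty commit) case is deliberately left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class OffsetDeviationSketch {

  /** Parse "topic,partition:offset,partition:offset,..." into partition -> offset. */
  public static Map<Integer, Long> parseOffsets(String checkpoint) {
    String[] parts = checkpoint.split(",");
    Map<Integer, Long> offsets = new LinkedHashMap<>();
    // parts[0] is the topic name; the remaining entries are partition:offset pairs.
    for (int i = 1; i < parts.length; i++) {
      String[] pair = parts[i].split(":");
      offsets.put(Integer.parseInt(pair[0].trim()), Long.parseLong(pair[1].trim()));
    }
    return offsets;
  }

  /** Sum of per-partition offset advances between two checkpoints. */
  public static long offsetDiff(String previous, String current) {
    Map<Integer, Long> prev = parseOffsets(previous);
    long diff = 0;
    for (Map.Entry<Integer, Long> e : parseOffsets(current).entrySet()) {
      // Assumption: a partition absent from the previous checkpoint starts at 0.
      diff += e.getValue() - prev.getOrDefault(e.getKey(), 0L);
    }
    return diff;
  }

  /** |diff - records| / diff as a percentage; requires a positive diff. */
  public static double deviationPercent(long diff, long records) {
    if (diff <= 0) {
      throw new IllegalArgumentException("diff must be positive in this sketch");
    }
    return 100.0 * Math.abs(diff - records) / diff;
  }

  /** A deviation exactly equal to the tolerance passes, as in the boundary test. */
  public static boolean withinTolerance(long diff, long records, double tolerancePercent) {
    return deviationPercent(diff, records) <= tolerancePercent;
  }

  public static void main(String[] args) {
    long diff = offsetDiff("events,0:100,1:200", "events,0:200,1:300");
    System.out.println("offset diff = " + diff);
    System.out.println("deviation for 1000 vs 950 = " + deviationPercent(1000, 950) + "%");
    System.out.println("900 of 1000 within 10% = " + withinTolerance(1000, 900, 10.0));
    System.out.println("899 of 1000 within 10% = " + withinTolerance(1000, 899, 10.0));
  }
}
```

Multiplying by 100 before dividing keeps the boundary case (900 of 1000 at 10% tolerance) exact in floating point, which matters when the comparison is inclusive.
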
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/validator/TestSparkStreamerValidatorUtils.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/validator/TestSparkStreamerValidatorUtils.java
new file mode 100644
index 000000000000..69d5d228dab6
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/validator/TestSparkStreamerValidatorUtils.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer.validator;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
+import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV2;
+import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.testutils.HoodieTestUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
+import org.apache.hudi.exception.HoodieValidationException;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link SparkStreamerValidatorUtils}.
+ *
+ * <p>Tests cover orchestration logic (class loading, config passing, error handling)
+ * as well as end-to-end offset validation using a two-commit timeline to verify
+ * the real comparison path is exercised.</p>
+ */
+public class TestSparkStreamerValidatorUtils {
+
+ @TempDir
+ Path tempDir;
+
+ private static TypedProperties propsWithValidator(String validatorClassName) {
+ TypedProperties props = new TypedProperties();
+ props.setProperty(HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(), validatorClassName);
+ props.setProperty(HoodiePreCommitValidatorConfig.STREAMING_OFFSET_TOLERANCE_PERCENTAGE.key(), "0.0");
+ props.setProperty(HoodiePreCommitValidatorConfig.VALIDATION_FAILURE_POLICY.key(), "FAIL");
+ return props;
+ }
+
+ private static WriteStatus buildWriteStatus(String partitionPath, long numInserts, long numUpdates) {
+ HoodieWriteStat stat = new HoodieWriteStat();
+ stat.setPartitionPath(partitionPath);
+ stat.setNumInserts(numInserts);
+ stat.setNumUpdateWrites(numUpdates);
+
+ WriteStatus ws = new WriteStatus(false, 0.0);
+ ws.setStat(stat);
+ return ws;
+ }
+
+ private HoodieTableMetaClient createMetaClient() throws IOException {
+ return HoodieTestUtils.init(tempDir.toAbsolutePath().toString());
+ }
+
+ private HoodieTableMetaClient createMetaClient(HoodieTableVersion version) throws IOException {
+ return HoodieTestUtils.init(tempDir.toAbsolutePath().toString(), HoodieTableType.COPY_ON_WRITE, version);
+ }
+
+ // ========== Tests ==========
+
+ @Test
+ public void testNoValidatorsConfigured() throws IOException {
+ TypedProperties props = new TypedProperties();
+ List<WriteStatus> writeStatuses =
Collections.singletonList(buildWriteStatus("p1", 100, 0));
+
+ assertDoesNotThrow(() -> SparkStreamerValidatorUtils.runValidators(
+ props, "20260320120000000", writeStatuses,
+ new HashMap<>(), createMetaClient()));
+ }
+
+ @Test
+ public void testEmptyValidatorString() throws IOException {
+ TypedProperties props = new TypedProperties();
+
props.setProperty(HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(),
"");
+ List<WriteStatus> writeStatuses =
Collections.singletonList(buildWriteStatus("p1", 100, 0));
+
+ assertDoesNotThrow(() -> SparkStreamerValidatorUtils.runValidators(
+ props, "20260320120000000", writeStatuses,
+ new HashMap<>(), createMetaClient()));
+ }
+
+ @Test
+ public void testValidValidatorFirstCommitPasses() throws IOException {
+ TypedProperties props = propsWithValidator(
+
"org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator");
+
+ List<WriteStatus> writeStatuses =
Collections.singletonList(buildWriteStatus("p1", 100, 0));
+ Map<String, String> extraMeta = new HashMap<>();
+ extraMeta.put(StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1,
"events,0:100");
+
+ // First commit (no previous metadata on timeline) — validator should skip
and pass
+ assertDoesNotThrow(() -> SparkStreamerValidatorUtils.runValidators(
+ props, "20260320120000000", writeStatuses, extraMeta,
createMetaClient()));
+ }
+
+ @Test
+ public void testInvalidValidatorClassThrows() throws IOException {
+ TypedProperties props =
propsWithValidator("com.nonexistent.FakeValidator");
+ List<WriteStatus> writeStatuses =
Collections.singletonList(buildWriteStatus("p1", 100, 0));
+
+ assertThrows(HoodieValidationException.class,
+ () -> SparkStreamerValidatorUtils.runValidators(
+ props, "20260320120000000", writeStatuses, new HashMap<>(),
createMetaClient()));
+ }
+
+ @Test
+ public void testMultipleValidators() throws IOException {
+ TypedProperties props = propsWithValidator(
+ "org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator,"
+ + "org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator");
+
+ List<WriteStatus> writeStatuses = Collections.singletonList(buildWriteStatus("p1", 100, 0));
+ Map<String, String> extraMeta = new HashMap<>();
+ extraMeta.put(StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1, "events,0:100");
+
+ assertDoesNotThrow(() -> SparkStreamerValidatorUtils.runValidators(
+ props, "20260320120000000", writeStatuses, extraMeta, createMetaClient()));
+ }
+
+ @Test
+ public void testValidatorWithWhitespaceInClassNames() throws IOException {
+ TypedProperties props = propsWithValidator(
+ " org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator , ");
+
+ List<WriteStatus> writeStatuses = Collections.singletonList(buildWriteStatus("p1", 100, 0));
+
+ assertDoesNotThrow(() -> SparkStreamerValidatorUtils.runValidators(
+ props, "20260320120000000", writeStatuses, new HashMap<>(), createMetaClient()));
+ }
+
+ @Test
+ public void testNullExtraMetadataHandled() throws IOException {
+ TypedProperties props = propsWithValidator(
+ "org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator");
+
+ List<WriteStatus> writeStatuses = Collections.singletonList(buildWriteStatus("p1", 100, 0));
+
+ assertDoesNotThrow(() -> SparkStreamerValidatorUtils.runValidators(
+ props, "20260320120000000", writeStatuses, null, createMetaClient()));
+ }
+
+ @Test
+ public void testMultipleWriteStatusesAggregated() throws IOException {
+ TypedProperties props = propsWithValidator(
+ "org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator");
+
+ List<WriteStatus> writeStatuses = new ArrayList<>();
+ writeStatuses.add(buildWriteStatus("p1", 60, 0));
+ writeStatuses.add(buildWriteStatus("p2", 40, 0));
+
+ Map<String, String> extraMeta = new HashMap<>();
+ extraMeta.put(StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1, "events,0:100");
+
+ assertDoesNotThrow(() -> SparkStreamerValidatorUtils.runValidators(
+ props, "20260320120000000", writeStatuses, extraMeta, createMetaClient()));
+ }
+
+ @Test
+ public void testEmptyWriteStatuses() throws IOException {
+ TypedProperties props = propsWithValidator(
+ "org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator");
+
+ List<WriteStatus> writeStatuses = Collections.emptyList();
+ Map<String, String> extraMeta = new HashMap<>();
+ extraMeta.put(StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1, "events,0:100");
+
+ assertDoesNotThrow(() -> SparkStreamerValidatorUtils.runValidators(
+ props, "20260320120000000", writeStatuses, extraMeta, createMetaClient()));
+ }
+
+ @Test
+ public void testValidationExceptionPreservedAcrossValidators() throws IOException {
+ TypedProperties props = propsWithValidator(
+ "org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator,"
+ + "com.nonexistent.FakeValidator");
+
+ List<WriteStatus> writeStatuses = Collections.singletonList(buildWriteStatus("p1", 100, 0));
+
+ HoodieValidationException ex = assertThrows(HoodieValidationException.class,
+ () -> SparkStreamerValidatorUtils.runValidators(
+ props, "20260320120000000", writeStatuses, new HashMap<>(), createMetaClient()));
+ assertTrue(ex.getMessage().contains("FakeValidator"));
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {
+ StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1,
+ StreamerCheckpointV2.STREAMER_CHECKPOINT_KEY_V2
+ })
+ public void testSecondCommitMatchingOffsetsPasses(String checkpointKey) throws Exception {
+ TypedProperties props = propsWithValidator(
+ "org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator");
+
+ // Create table with a previous committed instant: offset 0 -> 500
+ HoodieTableMetaClient metaClient = createMetaClient();
+ HoodieCommitMetadata prevMeta = new HoodieCommitMetadata();
+ prevMeta.addMetadata(checkpointKey, "events,0:500");
+ HoodieTestTable.of(metaClient).addCommit("20260320110000000", Option.of(prevMeta));
+
+ // Second commit: offset 500 -> 600, 100 records written — matches diff exactly
+ Map<String, String> extraMeta = new HashMap<>();
+ extraMeta.put(checkpointKey, "events,0:600");
+ List<WriteStatus> writeStatuses = Collections.singletonList(buildWriteStatus("p1", 100, 0));
+
+ assertDoesNotThrow(() -> SparkStreamerValidatorUtils.runValidators(
+ props, "20260320120000000", writeStatuses, extraMeta, metaClient));
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {
+ StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1,
+ StreamerCheckpointV2.STREAMER_CHECKPOINT_KEY_V2
+ })
+ public void testSecondCommitDataLossDetected(String checkpointKey) throws Exception {
+ TypedProperties props = propsWithValidator(
+ "org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator");
+
+ // Create table with a previous committed instant: offset 0 -> 1000
+ HoodieTableMetaClient metaClient = createMetaClient();
+ HoodieCommitMetadata prevMeta = new HoodieCommitMetadata();
+ prevMeta.addMetadata(checkpointKey, "events,0:1000");
+ HoodieTestTable.of(metaClient).addCommit("20260320110000000", Option.of(prevMeta));
+
+ // Second commit: offset 1000 -> 2000 (diff=1000) but only 500 records written — data loss
+ Map<String, String> extraMeta = new HashMap<>();
+ extraMeta.put(checkpointKey, "events,0:2000");
+ List<WriteStatus> writeStatuses = Collections.singletonList(buildWriteStatus("p1", 500, 0));
+
+ assertThrows(HoodieValidationException.class,
+ () -> SparkStreamerValidatorUtils.runValidators(
+ props, "20260320120000000", writeStatuses, extraMeta, metaClient));
+ }
+
+ @Test
+ public void testV2CheckpointKeyOnTableVersionEightFires() throws Exception {
+ // Verifies the validator actually fires on a writeTableVersion=8 table that uses the
+ // V2 checkpoint key — i.e. the auto-resolution in StreamingOffsetValidator picks up V2
+ // and runs the comparison instead of silently skipping.
+ TypedProperties props = propsWithValidator(
+ "org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator");
+
+ HoodieTableMetaClient metaClient = createMetaClient(HoodieTableVersion.EIGHT);
+ HoodieCommitMetadata prevMeta = new HoodieCommitMetadata();
+ prevMeta.addMetadata(StreamerCheckpointV2.STREAMER_CHECKPOINT_KEY_V2, "events,0:1000");
+ HoodieTestTable.of(metaClient).addCommit("20260320110000000", Option.of(prevMeta));
+
+ // Offset diff = 1000 but only 200 records written — must fail
+ Map<String, String> extraMeta = new HashMap<>();
+ extraMeta.put(StreamerCheckpointV2.STREAMER_CHECKPOINT_KEY_V2, "events,0:2000");
+ List<WriteStatus> writeStatuses = Collections.singletonList(buildWriteStatus("p1", 200, 0));
+
+ assertThrows(HoodieValidationException.class,
+ () -> SparkStreamerValidatorUtils.runValidators(
+ props, "20260320120000000", writeStatuses, extraMeta, metaClient));
+ }
+}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/validator/TestSparkValidationContext.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/validator/TestSparkValidationContext.java
new file mode 100644
index 000000000000..7f94262e98c4
--- /dev/null
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/validator/TestSparkValidationContext.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer.validator;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
+import org.apache.hudi.common.util.Option;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link SparkValidationContext}.
+ */
+public class TestSparkValidationContext {
+
+ private static HoodieWriteStat buildStat(long inserts, long updates) {
+ HoodieWriteStat stat = new HoodieWriteStat();
+ stat.setNumInserts(inserts);
+ stat.setNumUpdateWrites(updates);
+ stat.setPartitionPath("partition1");
+ return stat;
+ }
+
+ @Test
+ public void testBasicProperties() {
+ HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+ metadata.addMetadata("key1", "value1");
+ List<HoodieWriteStat> writeStats = Collections.singletonList(buildStat(100, 50));
+
+ SparkValidationContext ctx = new SparkValidationContext(
+ "20260320120000000",
+ Option.of(metadata),
+ Option.of(writeStats),
+ Option.empty());
+
+ assertEquals("20260320120000000", ctx.getInstantTime());
+ assertTrue(ctx.getCommitMetadata().isPresent());
+ assertTrue(ctx.getWriteStats().isPresent());
+ assertEquals(1, ctx.getWriteStats().get().size());
+ }
+
+ @Test
+ public void testRecordCounting() {
+ List<HoodieWriteStat> writeStats = Arrays.asList(
+ buildStat(100, 50), // partition1: 100 inserts, 50 updates
+ buildStat(200, 30)); // partition2: 200 inserts, 30 updates
+
+ SparkValidationContext ctx = new SparkValidationContext(
+ "20260320120000000",
+ Option.of(new HoodieCommitMetadata()),
+ Option.of(writeStats),
+ Option.empty());
+
+ assertEquals(300, ctx.getTotalInsertRecordsWritten());
+ assertEquals(80, ctx.getTotalUpdateRecordsWritten());
+ assertEquals(380, ctx.getTotalRecordsWritten());
+ }
+
+ @Test
+ public void testFirstCommitDetection() {
+ // No previous commit metadata -> first commit
+ SparkValidationContext ctx = new SparkValidationContext(
+ "20260320120000000",
+ Option.of(new HoodieCommitMetadata()),
+ Option.of(Collections.emptyList()),
+ Option.empty());
+
+ assertTrue(ctx.isFirstCommit());
+ }
+
+ @Test
+ public void testNotFirstCommitWhenPreviousExists() {
+ HoodieCommitMetadata prevMeta = new HoodieCommitMetadata();
+
+ SparkValidationContext ctx = new SparkValidationContext(
+ "20260320120000000",
+ Option.of(new HoodieCommitMetadata()),
+ Option.of(Collections.emptyList()),
+ Option.of(prevMeta));
+
+ assertFalse(ctx.isFirstCommit());
+ }
+
+ @Test
+ public void testExtraMetadataAccess() {
+ HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+ metadata.addMetadata(StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1, "events,0:1000");
+ metadata.addMetadata("custom.key", "custom_value");
+
+ SparkValidationContext ctx = new SparkValidationContext(
+ "20260320120000000",
+ Option.of(metadata),
+ Option.of(Collections.emptyList()),
+ Option.empty());
+
+ assertEquals("events,0:1000",
+ ctx.getExtraMetadata(StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1).get());
+ assertEquals("custom_value", ctx.getExtraMetadata("custom.key").get());
+ assertFalse(ctx.getExtraMetadata("nonexistent.key").isPresent());
+ }
+
+ @Test
+ public void testPreviousCommitMetadataAccess() {
+ HoodieCommitMetadata prevMeta = new HoodieCommitMetadata();
+ prevMeta.addMetadata(StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1, "events,0:500");
+
+ SparkValidationContext ctx = new SparkValidationContext(
+ "20260320120000000",
+ Option.of(new HoodieCommitMetadata()),
+ Option.of(Collections.emptyList()),
+ Option.of(prevMeta));
+
+ assertTrue(ctx.getPreviousCommitMetadata().isPresent());
+ assertEquals("events,0:500",
+ ctx.getPreviousCommitMetadata().get().getMetadata(StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1));
+ }
+
+ @Test
+ public void testEmptyWriteStats() {
+ SparkValidationContext ctx = new SparkValidationContext(
+ "20260320120000000",
+ Option.of(new HoodieCommitMetadata()),
+ Option.empty(),
+ Option.empty());
+
+ assertEquals(0, ctx.getTotalRecordsWritten());
+ assertEquals(0, ctx.getTotalInsertRecordsWritten());
+ assertEquals(0, ctx.getTotalUpdateRecordsWritten());
+ }
+}