This is an automated email from the ASF dual-hosted git repository.
danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 56bc28398a47 feat(flink): add pre-commit validation framework for Flink - Phase 2 (#18362)
56bc28398a47 is described below
commit 56bc28398a47b9083c883ef9fdc438fd44164823
Author: Xinli Shang <[email protected]>
AuthorDate: Thu Mar 26 19:21:09 2026 -0700
feat(flink): add pre-commit validation framework for Flink - Phase 2 (#18362)
* feat(flink): add pre-commit validation framework for Flink - Phase 2
Wire the pluggable pre-commit validation framework into Flink's
StreamWriteOperatorCoordinator.doCommit() flow. This enables
configurable validators to run before each Flink streaming commit.
Key changes:
- FlinkValidationContext: Flink implementation of ValidationContext
- FlinkKafkaOffsetValidator: validates Kafka offset diff vs records written
- FlinkValidatorUtils: orchestrator that instantiates and runs validators
- CheckpointUtils: add Flink Kafka checkpoint format parsing
- StreamWriteOperatorCoordinator: wire validators into doCommit()
- BasePreCommitValidator: make validateWithMetadata() public for
cross-package invocation
Tests: 52 test cases covering FlinkValidatorUtils, FlinkValidationContext,
FlinkKafkaOffsetValidator, and Flink Kafka checkpoint parsing.
Closes #18067
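For reference, a minimal sketch of how a Flink job could enable this validator through the write configuration. The property keys are the ones documented on FlinkKafkaOffsetValidator below; the example class name, the tolerance value, and the way the Configuration is handed to the Hudi sink builder are illustrative assumptions, not part of this change:

    import org.apache.flink.configuration.Configuration;

    public class EnableFlinkOffsetValidatorExample {
      public static void main(String[] args) {
        // Hudi Flink sink options travel in the Flink Configuration.
        Configuration conf = new Configuration();
        // Comma-separated list of pre-commit validator class names.
        conf.setString("hoodie.precommit.validators",
            "org.apache.hudi.sink.validator.FlinkKafkaOffsetValidator");
        // Allowed deviation (percent) between the Kafka offset delta and records written.
        conf.setString("hoodie.precommit.validators.streaming.offset.tolerance.percentage", "5.0");
        // FAIL rejects the commit on violation; WARN_LOG only logs it.
        conf.setString("hoodie.precommit.validators.failure.policy", "FAIL");
        // conf is then passed to the Hudi Flink sink / pipeline builder as usual (omitted here).
      }
    }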
* fix(test): update TestCheckpointUtils for Phase 2 Flink Kafka
implementation
- Remove FLINK_KAFKA from testUnsupportedFormats since it's now implemented
- Remove FLINK_KAFKA from testIsValidCheckpointFormatUnsupported
- Add testFlinkKafkaCheckpointParsing to verify the new parser
- Add testIsValidCheckpointFormatFlinkKafka for validation
Co-Authored-By: Claude Opus 4.6 <[email protected]>
* fix: address review comments from danny0405
- Use TypedProperties.fromMap() instead of manual iteration
(FlinkValidatorUtils)
- Lazy-evaluate previousCommitMetadata via Supplier to skip the fetch
when no validators are configured (see the sketch after this list)
- Move getPreviousCommitMetadata() from StreamWriteOperatorCoordinator to
StreamerUtil
- Use getWriteTimeline() to filter to write commits only (instead of all
completed instants)
- Update tests for new Supplier-based API
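For context, the Supplier-based lazy evaluation mentioned in the list above boils down to deferring the timeline read until a validator actually needs it. A simplified sketch (java.util.Optional stands in for Hudi's Option type here; this is illustrative, not the production code):

    import java.util.Optional;
    import java.util.function.Supplier;

    class LazyPreviousMetadataSketch {
      static void runValidators(String validatorClassNames,
                                Supplier<Optional<String>> previousCommitMetadataSupplier) {
        if (validatorClassNames == null || validatorClassNames.trim().isEmpty()) {
          // No validators configured: the supplier is never invoked,
          // so no timeline fetch happens on this (common) path.
          return;
        }
        // Only now is the previous commit metadata actually read.
        Optional<String> previousCommitMetadata = previousCommitMetadataSupplier.get();
        // ... build the validation context and run each configured validator ...
      }
    }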
Co-Authored-By: Claude Opus 4.6 <[email protected]>
---------
Co-authored-by: Xinli Shang <[email protected]>
Co-authored-by: Claude Opus 4.6 <[email protected]>
---
.../client/validator/StreamingOffsetValidator.java | 2 +-
.../config/HoodiePreCommitValidatorConfig.java | 3 +-
.../client/validator/BasePreCommitValidator.java | 2 +-
.../apache/hudi/common/util/CheckpointUtils.java | 86 +++++-
.../hudi/common/util/TestCheckpointUtils.java | 28 +-
.../hudi/sink/StreamWriteOperatorCoordinator.java | 6 +
.../sink/validator/FlinkKafkaOffsetValidator.java | 58 ++++
.../sink/validator/FlinkValidationContext.java | 117 +++++++
.../hudi/sink/validator/FlinkValidatorUtils.java | 150 +++++++++
.../java/org/apache/hudi/util/StreamerUtil.java | 24 ++
.../validator/TestFlinkKafkaCheckpointParsing.java | 171 ++++++++++
.../validator/TestFlinkKafkaOffsetValidator.java | 343 +++++++++++++++++++++
.../sink/validator/TestFlinkValidationContext.java | 200 ++++++++++++
.../sink/validator/TestFlinkValidatorUtils.java | 290 +++++++++++++++++
14 files changed, 1463 insertions(+), 17 deletions(-)
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/validator/StreamingOffsetValidator.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/validator/StreamingOffsetValidator.java
index 71844f3c1fad..ce577d84ca01 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/validator/StreamingOffsetValidator.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/validator/StreamingOffsetValidator.java
@@ -88,7 +88,7 @@ public abstract class StreamingOffsetValidator extends BasePreCommitValidator {
}
@Override
- protected void validateWithMetadata(ValidationContext context) throws HoodieValidationException {
+ public void validateWithMetadata(ValidationContext context) throws HoodieValidationException {
// Skip validation for first commit (no previous checkpoint)
if (context.isFirstCommit()) {
log.info("Skipping offset validation for first commit");
diff --git
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePreCommitValidatorConfig.java
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePreCommitValidatorConfig.java
index b1faa5153d22..f85cc44120d4 100644
---
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePreCommitValidatorConfig.java
+++
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePreCommitValidatorConfig.java
@@ -70,7 +70,8 @@ public class HoodiePreCommitValidatorConfig extends HoodieConfig {
.sinceVersion("1.2.0")
.markAdvanced()
.withDocumentation("Tolerance percentage for streaming offset validation "
- + "(used by org.apache.hudi.client.validator.StreamingOffsetValidator). "
+ + "(used by org.apache.hudi.client.validator.StreamingOffsetValidator "
+ + "and org.apache.hudi.sink.validator.FlinkKafkaOffsetValidator). "
+ "The validator compares the offset difference (expected records from source) "
+ "with actual records written. If the deviation exceeds this percentage, "
+ "the commit is rejected or warned depending on the validation failure policy. "
diff --git
a/hudi-common/src/main/java/org/apache/hudi/client/validator/BasePreCommitValidator.java
b/hudi-common/src/main/java/org/apache/hudi/client/validator/BasePreCommitValidator.java
index b1c718246608..a35412797d90 100644
---
a/hudi-common/src/main/java/org/apache/hudi/client/validator/BasePreCommitValidator.java
+++
b/hudi-common/src/main/java/org/apache/hudi/client/validator/BasePreCommitValidator.java
@@ -72,7 +72,7 @@ public abstract class BasePreCommitValidator {
* @throws HoodieValidationException if validation fails
*/
@PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
- protected void validateWithMetadata(ValidationContext context) throws
HoodieValidationException {
+ public void validateWithMetadata(ValidationContext context) throws
HoodieValidationException {
// Default no-op implementation
// Concrete validators override this to implement validation logic
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/util/CheckpointUtils.java
b/hudi-common/src/main/java/org/apache/hudi/common/util/CheckpointUtils.java
index 5e0fc862b605..e3faabe10db3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CheckpointUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CheckpointUtils.java
@@ -37,10 +37,11 @@ import java.util.Map;
* {@code consumer.endOffsets()} or {@code consumer.offsetsForTimes()}, but the
* checkpoint string format is identical.
*
- * - FLINK_KAFKA: Base64-encoded serialized Map (TopicPartition → Long)
- * Example: "eyJ0b3BpY..." (base64)
- * Used by: Flink streaming connector
- * Note: Actual implementation requires Flink checkpoint deserialization (Phase 2)
+ * - FLINK_KAFKA: URL-encoded format with topic and partition offsets
+ * Example: "kafka_metadata%3Aevents%3A0:100;kafka_metadata%3Aevents%3A1:200"
+ * Format: "kafka_metadata%3A{topic}%3A{partition}:{offset}" separated by ";"
+ * Used by: Flink streaming connector (stored in HoodieMetadataKey extraMetadata)
+ * Note: Cluster metadata entries (kafka_metadata%3Akafka_cluster%3A...) are skipped
*
* - PULSAR: "partition:ledgerId:entryId,partition:ledgerId:entryId,..."
* Example: "0:123:45,1:234:56"
@@ -66,7 +67,10 @@ public class CheckpointUtils {
/** HoodieStreamer (Spark) Kafka format: "topic,0:1000,1:2000" */
SPARK_KAFKA,
- /** Flink Kafka format: base64-encoded Map<TopicPartition, Long> */
+ /**
+ * Flink Kafka format: URL-encoded "kafka_metadata%3Atopic%3Apartition:offset" separated by ";".
+ * Cluster metadata entries (containing "kafka_cluster") are skipped during parsing.
+ */
FLINK_KAFKA,
/** Pulsar format: "0:123:45,1:234:56" (ledgerId:entryId). Engine-agnostic. */
@@ -92,9 +96,7 @@ public class CheckpointUtils {
case SPARK_KAFKA:
return parseSparkKafkaCheckpoint(checkpointStr);
case FLINK_KAFKA:
- throw new UnsupportedOperationException(
- "Flink Kafka checkpoint parsing not yet implemented. "
- + "This will be added in Phase 2 with Flink checkpoint
deserialization support.");
+ return parseFlinkKafkaCheckpoint(checkpointStr);
case PULSAR:
throw new UnsupportedOperationException(
"Pulsar checkpoint parsing not yet implemented. Planned for Phase
4.");
@@ -254,4 +256,72 @@ public class CheckpointUtils {
return offsetMap;
}
+
+ /**
+ * Parse Flink Kafka checkpoint.
+ * Format: "kafka_metadata%3Atopic%3Apartition:offset" entries separated by ";".
+ * Cluster metadata entries (containing "kafka_cluster") are skipped.
+ *
+ * <p>Example: "kafka_metadata%3Aevents%3A0:100;kafka_metadata%3Aevents%3A1:200
+ * ;kafka_metadata%3Akafka_cluster%3Aevents%3A:my-cluster"</p>
+ *
+ * <p>The URL-encoded colons (%3A) separate the prefix, topic, and partition:offset.
+ * The format is produced by {@code StreamerUtil.stringFy()} in hudi-flink.</p>
+ *
+ * <p>Parsing uses {@code lastIndexOf} to locate the partition and offset from the end
+ * of each entry, consistent with the production parser in
+ * {@code StreamerUtil.parseKafkaOffsets()}.</p>
+ *
+ * @param checkpointStr Checkpoint string
+ * @return Map of partition → offset (cluster metadata entries excluded)
+ * @throws IllegalArgumentException if format is invalid
+ */
+ private static Map<Integer, Long> parseFlinkKafkaCheckpoint(String checkpointStr) {
+ if (checkpointStr == null || checkpointStr.trim().isEmpty()) {
+ throw new IllegalArgumentException("Flink Kafka checkpoint string cannot be null or empty");
+ }
+
+ Map<Integer, Long> offsetMap = new HashMap<>();
+ String[] entries = checkpointStr.split(";");
+
+ for (String entry : entries) {
+ entry = entry.trim();
+ if (entry.isEmpty()) {
+ continue;
+ }
+
+ // Skip cluster metadata entries (e.g., kafka_metadata%3Akafka_cluster%3Atopic%3A:cluster)
+ if (!entry.contains(":") || entry.contains("kafka_cluster")) {
+ continue;
+ }
+
+ // Entry format: kafka_metadata%3Atopic%3Apartition:offset
+ // Find the last colon which separates partition:offset
+ int lastColonIndex = entry.lastIndexOf(':');
+ if (lastColonIndex == -1) {
+ continue;
+ }
+
+ String offsetStr = entry.substring(lastColonIndex + 1);
+ String beforeOffset = entry.substring(0, lastColonIndex);
+
+ // Find the partition number (everything after the last %3A)
+ int lastEncodedColonIndex = beforeOffset.lastIndexOf("%3A");
+ if (lastEncodedColonIndex == -1) {
+ continue;
+ }
+
+ String partitionStr = beforeOffset.substring(lastEncodedColonIndex + "%3A".length());
+
+ try {
+ int partition = Integer.parseInt(partitionStr);
+ long offset = Long.parseLong(offsetStr);
+ offsetMap.put(partition, offset);
+ } catch (NumberFormatException e) {
+ log.warn("Failed to parse partition ID or offset from entry: {}",
entry, e);
+ }
+ }
+
+ return offsetMap;
+ }
}
diff --git
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestCheckpointUtils.java
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestCheckpointUtils.java
index 4f43926a7a48..3b20b76f632d 100644
---
a/hudi-common/src/test/java/org/apache/hudi/common/util/TestCheckpointUtils.java
+++
b/hudi-common/src/test/java/org/apache/hudi/common/util/TestCheckpointUtils.java
@@ -199,10 +199,6 @@ public class TestCheckpointUtils {
@Test
public void testUnsupportedFormats() {
- // Flink format not yet implemented (Phase 2)
- assertThrows(UnsupportedOperationException.class, () ->
- CheckpointUtils.parseCheckpoint(CheckpointFormat.FLINK_KAFKA, "anystring"));
-
// Pulsar format not yet implemented (Phase 4)
assertThrows(UnsupportedOperationException.class, () ->
CheckpointUtils.parseCheckpoint(CheckpointFormat.PULSAR, "anystring"));
@@ -212,6 +208,17 @@ public class TestCheckpointUtils {
CheckpointUtils.parseCheckpoint(CheckpointFormat.KINESIS, "anystring"));
}
+ @Test
+ public void testFlinkKafkaCheckpointParsing() {
+ // Flink Kafka format is now implemented (Phase 2)
+ Map<Integer, Long> result = CheckpointUtils.parseCheckpoint(
+ CheckpointFormat.FLINK_KAFKA,
+ "kafka_metadata%3Aevents%3A0:100;kafka_metadata%3Aevents%3A1:200");
+ assertEquals(2, result.size());
+ assertEquals(100L, result.get(0));
+ assertEquals(200L, result.get(1));
+ }
+
@Test
public void testCustomFormatThrows() {
assertThrows(IllegalArgumentException.class, () ->
@@ -221,9 +228,18 @@ public class TestCheckpointUtils {
@Test
public void testIsValidCheckpointFormatUnsupported() {
// Unsupported formats should return false (caught internally)
- assertFalse(CheckpointUtils.isValidCheckpointFormat(
- CheckpointFormat.FLINK_KAFKA, "anystring"));
assertFalse(CheckpointUtils.isValidCheckpointFormat(
CheckpointFormat.CUSTOM, "anystring"));
}
+
+ @Test
+ public void testIsValidCheckpointFormatFlinkKafka() {
+ // Flink Kafka format is now supported (Phase 2)
+ assertTrue(CheckpointUtils.isValidCheckpointFormat(
+ CheckpointFormat.FLINK_KAFKA,
+ "kafka_metadata%3Aevents%3A0:100;kafka_metadata%3Aevents%3A1:200"));
+ // Invalid Flink checkpoint should return false
+ assertFalse(CheckpointUtils.isValidCheckpointFormat(
+ CheckpointFormat.FLINK_KAFKA, ""));
+ }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 88f48c1863ec..1c1add8b721b 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -38,6 +38,7 @@ import org.apache.hudi.hive.HiveSyncTool;
import org.apache.hudi.sink.common.AbstractStreamWriteFunction;
import org.apache.hudi.sink.event.Correspondent;
import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.hudi.sink.validator.FlinkValidatorUtils;
import org.apache.hudi.sink.utils.CoordinationResponseSerDe;
import org.apache.hudi.sink.utils.EventBuffers;
import org.apache.hudi.sink.utils.EventBuffers.EventBuffer;
@@ -640,6 +641,11 @@ public class StreamWriteOperatorCoordinator
? writeClient.getPartitionToReplacedFileIds(tableState.operationType, dataWriteResults)
: Collections.emptyMap();
List<WriteStatus> allWriteStatus = Stream.concat(dataWriteResults.stream(), indexWriteResults.stream()).collect(Collectors.toList());
+
+ // Run pre-commit validators (if configured) before finalizing the commit
+ FlinkValidatorUtils.runValidators(conf, instant, allWriteStatus,
+ checkpointCommitMetadata, () -> StreamerUtil.getPreviousCommitMetadata(this.metaClient));
+
boolean success = writeClient.commit(instant, allWriteStatus, Option.of(checkpointCommitMetadata),
tableState.commitAction, partitionToReplacedFileIds);
if (success) {
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/validator/FlinkKafkaOffsetValidator.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/validator/FlinkKafkaOffsetValidator.java
new file mode 100644
index 000000000000..88c9a4a6f877
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/validator/FlinkKafkaOffsetValidator.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.sink.validator;
+
+import org.apache.hudi.client.validator.StreamingOffsetValidator;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.CheckpointUtils.CheckpointFormat;
+import org.apache.hudi.util.StreamerUtil;
+
+/**
+ * Flink-specific Kafka offset validator.
+ *
+ * <p>Validates that the number of records written matches the Kafka offset difference
+ * between the current and previous Flink checkpoints. Uses the Flink Kafka checkpoint
+ * format stored in {@code HoodieMetadataKey} extraMetadata.</p>
+ *
+ * <p>Configuration:
+ * <ul>
+ * <li>{@code hoodie.precommit.validators}: Include
+ * {@code org.apache.hudi.sink.validator.FlinkKafkaOffsetValidator}</li>
+ * <li>{@code hoodie.precommit.validators.streaming.offset.tolerance.percentage}:
+ * Acceptable deviation (default: 0.0 = strict)</li>
+ * <li>{@code hoodie.precommit.validators.failure.policy}:
+ * FAIL (default) or WARN_LOG</li>
+ * </ul></p>
+ *
+ * <p>This validator is primarily intended for append-only ingestion from Kafka.
+ * For upsert workloads with deduplication, configure a higher tolerance or use WARN_LOG.</p>
+ */
+public class FlinkKafkaOffsetValidator extends StreamingOffsetValidator {
+
+ /**
+ * Create a Flink Kafka offset validator.
+ *
+ * @param config Validator configuration
+ */
+ public FlinkKafkaOffsetValidator(TypedProperties config) {
+ super(config, StreamerUtil.HOODIE_METADATA_KEY, CheckpointFormat.FLINK_KAFKA);
+ }
+
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/validator/FlinkValidationContext.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/validator/FlinkValidationContext.java
new file mode 100644
index 000000000000..c8b3a2fe8b5b
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/validator/FlinkValidationContext.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.sink.validator;
+
+import org.apache.hudi.client.validator.ValidationContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+
+import java.util.List;
+
+/**
+ * Flink implementation of {@link ValidationContext}.
+ *
+ * <p>Constructed from data available in {@code StreamWriteOperatorCoordinator.doCommit()}
+ * before the commit is finalized. Provides validators with access to commit metadata,
+ * write statistics, and previous commit information.</p>
+ *
+ * <p>Unlike Spark's ValidationContext (Phase 3), Flink's context does not have access to
+ * the active timeline at validation time because the commit has not yet been written.
+ * The previous commit metadata is provided directly from the coordinator's state.</p>
+ */
+public class FlinkValidationContext implements ValidationContext {
+
+ private final String instantTime;
+ private final Option<HoodieCommitMetadata> commitMetadata;
+ private final Option<List<HoodieWriteStat>> writeStats;
+ private final Option<HoodieCommitMetadata> previousCommitMetadata;
+
+ /**
+ * Create a Flink validation context.
+ *
+ * @param instantTime Current commit instant time
+ * @param commitMetadata Current commit metadata (with extraMetadata including checkpoints)
+ * @param writeStats Write statistics from all write operators
+ * @param previousCommitMetadata Metadata from the previous completed commit
+ */
+ public FlinkValidationContext(String instantTime,
+ Option<HoodieCommitMetadata> commitMetadata,
+ Option<List<HoodieWriteStat>> writeStats,
+ Option<HoodieCommitMetadata> previousCommitMetadata) {
+ this.instantTime = instantTime;
+ this.commitMetadata = commitMetadata;
+ this.writeStats = writeStats;
+ this.previousCommitMetadata = previousCommitMetadata;
+ }
+
+ @Override
+ public String getInstantTime() {
+ return instantTime;
+ }
+
+ @Override
+ public Option<HoodieCommitMetadata> getCommitMetadata() {
+ return commitMetadata;
+ }
+
+ @Override
+ public Option<List<HoodieWriteStat>> getWriteStats() {
+ return writeStats;
+ }
+
+ /**
+ * Not supported in Flink context. The active timeline is not available
+ * at pre-commit validation time because the commit has not been written yet.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public HoodieActiveTimeline getActiveTimeline() {
+ throw new UnsupportedOperationException(
+ "Active timeline is not available in Flink pre-commit validation
context. "
+ + "Use getPreviousCommitMetadata() to access previous commit
information.");
+ }
+
+ /**
+ * Not supported in Flink context. Use {@link #isFirstCommit()} or
+ * {@link #getPreviousCommitMetadata()} instead.
+ *
+ * @throws UnsupportedOperationException always
+ */
+ @Override
+ public Option<HoodieInstant> getPreviousCommitInstant() {
+ throw new UnsupportedOperationException(
+ "getPreviousCommitInstant() is not available in Flink pre-commit
validation context. "
+ + "Use isFirstCommit() or getPreviousCommitMetadata() instead.");
+ }
+
+ @Override
+ public boolean isFirstCommit() {
+ return !previousCommitMetadata.isPresent();
+ }
+
+ @Override
+ public Option<HoodieCommitMetadata> getPreviousCommitMetadata() {
+ return previousCommitMetadata;
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/validator/FlinkValidatorUtils.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/validator/FlinkValidatorUtils.java
new file mode 100644
index 000000000000..a2b1120f8d87
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/validator/FlinkValidatorUtils.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.sink.validator;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.validator.BasePreCommitValidator;
+import org.apache.hudi.client.validator.ValidationContext;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
+import org.apache.hudi.exception.HoodieValidationException;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.configuration.Configuration;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/**
+ * Utility for running pre-commit validators in the Flink commit flow.
+ *
+ * <p>Instantiates and executes validators configured via
+ * {@code hoodie.precommit.validators}. Each validator must extend
+ * {@link BasePreCommitValidator} and have a constructor that accepts
+ * {@link TypedProperties}.</p>
+ *
+ * <p>Called from {@code StreamWriteOperatorCoordinator.doCommit()} before
+ * the commit is finalized.</p>
+ */
+@Slf4j
+public class FlinkValidatorUtils {
+
+ /**
+ * Run all configured pre-commit validators.
+ *
+ * @param conf Flink configuration containing validator class names
+ * @param instant Commit instant time
+ * @param allWriteStatus Write statuses from all operators
+ * @param checkpointCommitMetadata Extra metadata being committed (contains checkpoint info)
+ * @param previousCommitMetadataSupplier Supplier for metadata from the previous completed commit (lazily evaluated)
+ * @throws HoodieValidationException if any validator fails with FAIL policy
+ */
+ public static void runValidators(Configuration conf,
+ String instant,
+ List<WriteStatus> allWriteStatus,
+ Map<String, String> checkpointCommitMetadata,
+ Supplier<Option<HoodieCommitMetadata>> previousCommitMetadataSupplier) {
+ String validatorClassNames = conf.getString(
+ HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(),
+ HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.defaultValue());
+
+ if (StringUtils.isNullOrEmpty(validatorClassNames)) {
+ return;
+ }
+
+ // Fetch previous commit metadata only when validators are configured
+ Option<HoodieCommitMetadata> previousCommitMetadata = previousCommitMetadataSupplier.get();
+
+ // Build ValidationContext from available data
+ HoodieCommitMetadata currentMetadata = buildCommitMetadata(allWriteStatus, checkpointCommitMetadata);
+ List<HoodieWriteStat> writeStats = allWriteStatus.stream()
+ .map(WriteStatus::getStat)
+ .collect(Collectors.toList());
+
+ ValidationContext context = new FlinkValidationContext(
+ instant,
+ Option.of(currentMetadata),
+ Option.of(writeStats),
+ previousCommitMetadata);
+
+ // Build config properties for validators
+ TypedProperties props = TypedProperties.fromMap(conf.toMap());
+
+ // Instantiate and run each validator
+ List<String> classNames = Arrays.stream(validatorClassNames.split(","))
+ .map(String::trim)
+ .filter(s -> !s.isEmpty())
+ .collect(Collectors.toList());
+
+ for (String className : classNames) {
+ try {
+ BasePreCommitValidator validator = (BasePreCommitValidator)
+ ReflectionUtils.loadClass(className, new Class<?>[] {TypedProperties.class}, props);
+ log.info("Running pre-commit validator: {} for instant: {}", className, instant);
+ validator.validateWithMetadata(context);
+ log.info("Pre-commit validator {} passed for instant: {}", className, instant);
+ } catch (HoodieValidationException e) {
+ log.error("Pre-commit validator {} failed for instant: {}", className, instant, e);
+ throw e;
+ } catch (Exception e) {
+ log.error("Failed to instantiate or run validator: {}", className, e);
+ throw new HoodieValidationException(
+ "Failed to run pre-commit validator: " + className, e);
+ }
+ }
+ }
+
+ /**
+ * Build HoodieCommitMetadata from write statuses and extra metadata.
+ * This constructs the metadata object that would be committed, giving
+ * validators access to the same data.
+ */
+ private static HoodieCommitMetadata buildCommitMetadata(
+ List<WriteStatus> writeStatuses, Map<String, String> extraMetadata) {
+ HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+
+ // Add write stats
+ for (WriteStatus status : writeStatuses) {
+ HoodieWriteStat stat = status.getStat();
+ if (stat != null) {
+ metadata.addWriteStat(stat.getPartitionPath(), stat);
+ }
+ }
+
+ // Add extra metadata (includes checkpoint info like HoodieMetadataKey)
+ if (extraMetadata != null) {
+ extraMetadata.forEach(metadata::addMetadata);
+ }
+
+ return metadata;
+ }
+
+ private FlinkValidatorUtils() {
+ // Utility class
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 503910d970c8..ed5cbbc64ec8 100644
---
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -31,6 +31,7 @@ import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.engine.EngineType;
import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieTableType;
@@ -982,4 +983,27 @@ public class StreamerUtil {
return offsets;
}
+
+ /**
+ * Get commit metadata from the last completed write commit on the timeline.
+ * Used for pre-commit validation to compare current commit against previous.
+ *
+ * @param metaClient the table meta client
+ * @return the previous write commit metadata, or empty if none exists
+ */
+ public static Option<HoodieCommitMetadata> getPreviousCommitMetadata(HoodieTableMetaClient metaClient) {
+ try {
+ HoodieTimeline completedWriteTimeline = metaClient.reloadActiveTimeline()
+ .getWriteTimeline()
+ .filterCompletedInstants();
+ Option<HoodieInstant> lastInstant = completedWriteTimeline.lastInstant();
+ if (lastInstant.isPresent()) {
+ return Option.of(completedWriteTimeline.readCommitMetadata(lastInstant.get()));
+ }
+ } catch (Exception e) {
+ log.warn("Failed to read previous commit metadata for pre-commit
validation. "
+ + "Validation will skip previous-commit checks.", e);
+ }
+ return Option.empty();
+ }
}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/validator/TestFlinkKafkaCheckpointParsing.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/validator/TestFlinkKafkaCheckpointParsing.java
new file mode 100644
index 000000000000..24140289114a
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/validator/TestFlinkKafkaCheckpointParsing.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.sink.validator;
+
+import org.apache.hudi.common.util.CheckpointUtils;
+import org.apache.hudi.common.util.CheckpointUtils.CheckpointFormat;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for Flink Kafka checkpoint parsing in {@link CheckpointUtils}.
+ */
+public class TestFlinkKafkaCheckpointParsing {
+
+ @Test
+ public void testParseSinglePartition() {
+ String checkpoint = "kafka_metadata%3Aevents%3A0:1000";
+ Map<Integer, Long> offsets =
CheckpointUtils.parseCheckpoint(CheckpointFormat.FLINK_KAFKA, checkpoint);
+ assertEquals(1, offsets.size());
+ assertEquals(1000L, offsets.get(0));
+ }
+
+ @Test
+ public void testParseMultiplePartitions() {
+ String checkpoint =
"kafka_metadata%3Aevents%3A0:100;kafka_metadata%3Aevents%3A1:200;kafka_metadata%3Aevents%3A2:300";
+ Map<Integer, Long> offsets =
CheckpointUtils.parseCheckpoint(CheckpointFormat.FLINK_KAFKA, checkpoint);
+ assertEquals(3, offsets.size());
+ assertEquals(100L, offsets.get(0));
+ assertEquals(200L, offsets.get(1));
+ assertEquals(300L, offsets.get(2));
+ }
+
+ @Test
+ public void testParseWithClusterMetadata() {
+ // Cluster metadata entry should be skipped (not a partition offset)
+ String checkpoint =
"kafka_metadata%3Aevents%3A0:100;kafka_metadata%3Aevents%3A1:200"
+ + ";kafka_metadata%3Akafka_cluster%3Aevents%3A:my-cluster";
+ Map<Integer, Long> offsets =
CheckpointUtils.parseCheckpoint(CheckpointFormat.FLINK_KAFKA, checkpoint);
+ assertEquals(2, offsets.size());
+ assertEquals(100L, offsets.get(0));
+ assertEquals(200L, offsets.get(1));
+ }
+
+ @Test
+ public void testCalculateOffsetDifference() {
+ String prev =
"kafka_metadata%3Aevents%3A0:100;kafka_metadata%3Aevents%3A1:200";
+ String curr =
"kafka_metadata%3Aevents%3A0:300;kafka_metadata%3Aevents%3A1:500";
+ // Diff = (300-100) + (500-200) = 200 + 300 = 500
+ long diff =
CheckpointUtils.calculateOffsetDifference(CheckpointFormat.FLINK_KAFKA, prev,
curr);
+ assertEquals(500L, diff);
+ }
+
+ @Test
+ public void testCalculateOffsetDifferenceWithClusterMetadata() {
+ String prev =
"kafka_metadata%3Aevents%3A0:100;kafka_metadata%3Akafka_cluster%3Aevents%3A:cluster1";
+ String curr =
"kafka_metadata%3Aevents%3A0:200;kafka_metadata%3Akafka_cluster%3Aevents%3A:cluster1";
+ long diff =
CheckpointUtils.calculateOffsetDifference(CheckpointFormat.FLINK_KAFKA, prev,
curr);
+ assertEquals(100L, diff);
+ }
+
+ @Test
+ public void testIsValidFormat() {
+ assertTrue(CheckpointUtils.isValidCheckpointFormat(
+ CheckpointFormat.FLINK_KAFKA, "kafka_metadata%3Aevents%3A0:100"));
+ assertFalse(CheckpointUtils.isValidCheckpointFormat(
+ CheckpointFormat.FLINK_KAFKA, ""));
+ assertFalse(CheckpointUtils.isValidCheckpointFormat(
+ CheckpointFormat.FLINK_KAFKA, null));
+ // "invalid-format" is parsed without error (entries silently skipped), so
it's "valid" format
+ assertTrue(CheckpointUtils.isValidCheckpointFormat(
+ CheckpointFormat.FLINK_KAFKA, "invalid-format"));
+ }
+
+ @Test
+ public void testParseWithNoOffsets() {
+ // Only cluster metadata, no partition offsets
+ String checkpoint =
"kafka_metadata%3Akafka_cluster%3Aevents%3A:my-cluster";
+ Map<Integer, Long> offsets =
CheckpointUtils.parseCheckpoint(CheckpointFormat.FLINK_KAFKA, checkpoint);
+ assertEquals(0, offsets.size());
+ }
+
+ @Test
+ public void testOffsetResetSkippedInDiff() {
+ // Partition 0 offset goes backward (reset). Should be skipped, only
partition 1 counted.
+ String prev =
"kafka_metadata%3Aevents%3A0:500;kafka_metadata%3Aevents%3A1:100";
+ String curr =
"kafka_metadata%3Aevents%3A0:200;kafka_metadata%3Aevents%3A1:300";
+ // Partition 0: 200-500 = -300 (skipped). Partition 1: 300-100 = 200.
+ long diff =
CheckpointUtils.calculateOffsetDifference(CheckpointFormat.FLINK_KAFKA, prev,
curr);
+ assertEquals(200L, diff);
+ }
+
+ @Test
+ public void testParseInvalidFormatReturnsEmpty() {
+ // Missing %3A separators — entry is silently skipped (consistent with
production parser)
+ Map<Integer, Long> offsets = CheckpointUtils.parseCheckpoint(
+ CheckpointFormat.FLINK_KAFKA, "invalid-no-separators");
+ assertEquals(0, offsets.size());
+ }
+
+ @Test
+ public void testParseNullThrows() {
+ assertThrows(IllegalArgumentException.class,
+ () -> CheckpointUtils.parseCheckpoint(CheckpointFormat.FLINK_KAFKA,
null));
+ }
+
+ @Test
+ public void testParseEmptyStringThrows() {
+ assertThrows(IllegalArgumentException.class,
+ () -> CheckpointUtils.parseCheckpoint(CheckpointFormat.FLINK_KAFKA,
""));
+ }
+
+ @Test
+ public void testParseWithEmptySemicolonEntries() {
+ // Extra semicolons should be handled gracefully
+ String checkpoint =
";kafka_metadata%3Aevents%3A0:100;;kafka_metadata%3Aevents%3A1:200;";
+ Map<Integer, Long> offsets =
CheckpointUtils.parseCheckpoint(CheckpointFormat.FLINK_KAFKA, checkpoint);
+ assertEquals(2, offsets.size());
+ assertEquals(100L, offsets.get(0));
+ assertEquals(200L, offsets.get(1));
+ }
+
+ @Test
+ public void testZeroOffsetDifference() {
+ String checkpoint =
"kafka_metadata%3Aevents%3A0:100;kafka_metadata%3Aevents%3A1:200";
+ long diff =
CheckpointUtils.calculateOffsetDifference(CheckpointFormat.FLINK_KAFKA,
checkpoint, checkpoint);
+ assertEquals(0L, diff);
+ }
+
+ @Test
+ public void testLargeOffsets() {
+ // Test with large offset values to ensure no overflow
+ String prev = "kafka_metadata%3Aevents%3A0:" + Long.MAX_VALUE / 2;
+ String curr = "kafka_metadata%3Aevents%3A0:" + (Long.MAX_VALUE / 2 + 1000);
+ long diff =
CheckpointUtils.calculateOffsetDifference(CheckpointFormat.FLINK_KAFKA, prev,
curr);
+ assertEquals(1000L, diff);
+ }
+
+ @Test
+ public void testNewPartitionSkippedInDiff() {
+ // Previous has partition 0, current has 0 and 1 (new partition)
+ String prev = "kafka_metadata%3Aevents%3A0:100";
+ String curr =
"kafka_metadata%3Aevents%3A0:200;kafka_metadata%3Aevents%3A1:50";
+ // Only partition 0 diff counted: 200-100 = 100. Partition 1 skipped (new).
+ long diff =
CheckpointUtils.calculateOffsetDifference(CheckpointFormat.FLINK_KAFKA, prev,
curr);
+ assertEquals(100L, diff);
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/validator/TestFlinkKafkaOffsetValidator.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/validator/TestFlinkKafkaOffsetValidator.java
new file mode 100644
index 000000000000..421e8ec79dd2
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/validator/TestFlinkKafkaOffsetValidator.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.sink.validator;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
+import org.apache.hudi.exception.HoodieValidationException;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Tests for {@link FlinkKafkaOffsetValidator}.
+ */
+public class TestFlinkKafkaOffsetValidator {
+
+ // ========== Helper methods ==========
+
+ private static TypedProperties defaultConfig() {
+ TypedProperties props = new TypedProperties();
+
props.setProperty(HoodiePreCommitValidatorConfig.STREAMING_OFFSET_TOLERANCE_PERCENTAGE.key(),
"0.0");
+
props.setProperty(HoodiePreCommitValidatorConfig.VALIDATION_FAILURE_POLICY.key(),
"FAIL");
+ return props;
+ }
+
+ private static TypedProperties configWithTolerance(double tolerance) {
+ TypedProperties props = defaultConfig();
+
props.setProperty(HoodiePreCommitValidatorConfig.STREAMING_OFFSET_TOLERANCE_PERCENTAGE.key(),
+ String.valueOf(tolerance));
+ return props;
+ }
+
+ private static TypedProperties configWithWarnPolicy() {
+ TypedProperties props = defaultConfig();
+
props.setProperty(HoodiePreCommitValidatorConfig.VALIDATION_FAILURE_POLICY.key(),
"WARN_LOG");
+ return props;
+ }
+
+ /**
+ * Build a Flink Kafka checkpoint string.
+ * Format: kafka_metadata%3Atopic%3Apartition:offset;...
+ */
+ private static String buildFlinkKafkaCheckpoint(String topic, int[]
partitions, long[] offsets) {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < partitions.length; i++) {
+ if (i > 0) {
+ sb.append(";");
+ }
+ sb.append("kafka_metadata%3A").append(topic).append("%3A")
+ .append(partitions[i]).append(":").append(offsets[i]);
+ }
+ return sb.toString();
+ }
+
+ private static HoodieCommitMetadata buildMetadata(String checkpointValue) {
+ HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+ if (checkpointValue != null) {
+ metadata.addMetadata(StreamerUtil.HOODIE_METADATA_KEY, checkpointValue);
+ }
+ return metadata;
+ }
+
+ private static List<HoodieWriteStat> buildWriteStats(long numInserts, long
numUpdates) {
+ HoodieWriteStat stat = new HoodieWriteStat();
+ stat.setNumInserts(numInserts);
+ stat.setNumUpdateWrites(numUpdates);
+ stat.setPartitionPath("partition1");
+ return Collections.singletonList(stat);
+ }
+
+ private static FlinkValidationContext buildContext(
+ String instantTime,
+ HoodieCommitMetadata currentMetadata,
+ List<HoodieWriteStat> writeStats,
+ HoodieCommitMetadata previousMetadata) {
+ return new FlinkValidationContext(
+ instantTime,
+ Option.of(currentMetadata),
+ Option.of(writeStats),
+ previousMetadata != null ? Option.of(previousMetadata) :
Option.empty());
+ }
+
+ // ========== Tests ==========
+
+ @Test
+ public void testExactMatchPasses() {
+ // Previous: partition 0 at offset 100, partition 1 at offset 200
+ // Current: partition 0 at offset 200, partition 1 at offset 300
+ // Diff = (200-100) + (300-200) = 200. Records written = 200.
+ String prevCheckpoint = buildFlinkKafkaCheckpoint("events", new int[]{0,
1}, new long[]{100, 200});
+ String currCheckpoint = buildFlinkKafkaCheckpoint("events", new int[]{0,
1}, new long[]{200, 300});
+
+ FlinkValidationContext ctx = buildContext("20260320120000000",
+ buildMetadata(currCheckpoint),
+ buildWriteStats(200, 0),
+ buildMetadata(prevCheckpoint));
+
+ FlinkKafkaOffsetValidator validator = new
FlinkKafkaOffsetValidator(defaultConfig());
+ assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+ }
+
+ @Test
+ public void testDataLossDetected() {
+ // Diff = 1000 but only 500 records written -> 50% deviation
+ String prevCheckpoint = buildFlinkKafkaCheckpoint("events", new int[]{0},
new long[]{0});
+ String currCheckpoint = buildFlinkKafkaCheckpoint("events", new int[]{0},
new long[]{1000});
+
+ FlinkValidationContext ctx = buildContext("20260320120000000",
+ buildMetadata(currCheckpoint),
+ buildWriteStats(500, 0),
+ buildMetadata(prevCheckpoint));
+
+ FlinkKafkaOffsetValidator validator = new
FlinkKafkaOffsetValidator(defaultConfig());
+ assertThrows(HoodieValidationException.class, () ->
validator.validateWithMetadata(ctx));
+ }
+
+ @Test
+ public void testWithinTolerancePasses() {
+ // Diff = 1000, records = 950 -> 5% deviation, tolerance = 10%
+ String prevCheckpoint = buildFlinkKafkaCheckpoint("events", new int[]{0},
new long[]{0});
+ String currCheckpoint = buildFlinkKafkaCheckpoint("events", new int[]{0},
new long[]{1000});
+
+ FlinkValidationContext ctx = buildContext("20260320120000000",
+ buildMetadata(currCheckpoint),
+ buildWriteStats(950, 0),
+ buildMetadata(prevCheckpoint));
+
+ FlinkKafkaOffsetValidator validator = new
FlinkKafkaOffsetValidator(configWithTolerance(10.0));
+ assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+ }
+
+ @Test
+ public void testWarnPolicyDoesNotThrow() {
+ // Data loss but WARN_LOG policy
+ String prevCheckpoint = buildFlinkKafkaCheckpoint("events", new int[]{0},
new long[]{0});
+ String currCheckpoint = buildFlinkKafkaCheckpoint("events", new int[]{0},
new long[]{1000});
+
+ FlinkValidationContext ctx = buildContext("20260320120000000",
+ buildMetadata(currCheckpoint),
+ buildWriteStats(0, 0),
+ buildMetadata(prevCheckpoint));
+
+ FlinkKafkaOffsetValidator validator = new
FlinkKafkaOffsetValidator(configWithWarnPolicy());
+ // Should warn but not throw
+ assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+ }
+
+ @Test
+ public void testSkipsFirstCommit() {
+ String currCheckpoint = buildFlinkKafkaCheckpoint("events", new int[]{0},
new long[]{1000});
+
+ // No previous commit
+ FlinkValidationContext ctx = new FlinkValidationContext(
+ "20260320120000000",
+ Option.of(buildMetadata(currCheckpoint)),
+ Option.of(buildWriteStats(500, 0)),
+ Option.empty());
+
+ FlinkKafkaOffsetValidator validator = new
FlinkKafkaOffsetValidator(defaultConfig());
+ assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+ }
+
+ @Test
+ public void testSkipsWhenNoCheckpointKey() {
+ // Current metadata has no HoodieMetadataKey
+ HoodieCommitMetadata currentMeta = new HoodieCommitMetadata();
+ String prevCheckpoint = buildFlinkKafkaCheckpoint("events", new int[]{0},
new long[]{100});
+
+ FlinkValidationContext ctx = buildContext("20260320120000000",
+ currentMeta,
+ buildWriteStats(500, 0),
+ buildMetadata(prevCheckpoint));
+
+ FlinkKafkaOffsetValidator validator = new
FlinkKafkaOffsetValidator(defaultConfig());
+ assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+ }
+
+ @Test
+ public void testMultiPartitionValidation() {
+ // 4 partitions, each advancing by 250 = total diff 1000
+ String prevCheckpoint = buildFlinkKafkaCheckpoint("events",
+ new int[]{0, 1, 2, 3}, new long[]{0, 0, 0, 0});
+ String currCheckpoint = buildFlinkKafkaCheckpoint("events",
+ new int[]{0, 1, 2, 3}, new long[]{250, 250, 250, 250});
+
+ FlinkValidationContext ctx = buildContext("20260320120000000",
+ buildMetadata(currCheckpoint),
+ buildWriteStats(800, 200), // 800 inserts + 200 updates = 1000
+ buildMetadata(prevCheckpoint));
+
+ FlinkKafkaOffsetValidator validator = new
FlinkKafkaOffsetValidator(defaultConfig());
+ assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+ }
+
+ @Test
+ public void testEmptyCommitSkipsValidation() {
+ // Both offsets same and no records written
+ String checkpoint = buildFlinkKafkaCheckpoint("events", new int[]{0}, new
long[]{100});
+
+ FlinkValidationContext ctx = buildContext("20260320120000000",
+ buildMetadata(checkpoint),
+ buildWriteStats(0, 0),
+ buildMetadata(checkpoint));
+
+ FlinkKafkaOffsetValidator validator = new
FlinkKafkaOffsetValidator(defaultConfig());
+ assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+ }
+
+ @Test
+ public void testCheckpointWithClusterMetadataIgnored() {
+ // Checkpoint with cluster metadata appended should still parse correctly
+ String prevCheckpoint =
"kafka_metadata%3Aevents%3A0:100;kafka_metadata%3Akafka_cluster%3Aevents%3A:my-cluster";
+ String currCheckpoint =
"kafka_metadata%3Aevents%3A0:200;kafka_metadata%3Akafka_cluster%3Aevents%3A:my-cluster";
+
+ FlinkValidationContext ctx = buildContext("20260320120000000",
+ buildMetadata(currCheckpoint),
+ buildWriteStats(100, 0),
+ buildMetadata(prevCheckpoint));
+
+ FlinkKafkaOffsetValidator validator = new
FlinkKafkaOffsetValidator(defaultConfig());
+ assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+ }
+
+ @Test
+ public void testPreviousCheckpointMissingSkipsValidation() {
+ // Previous metadata exists but has no HoodieMetadataKey
+ HoodieCommitMetadata prevMeta = new HoodieCommitMetadata();
+ // No checkpoint key set on previous
+
+ String currCheckpoint = buildFlinkKafkaCheckpoint("events", new int[]{0},
new long[]{1000});
+
+ FlinkValidationContext ctx = buildContext("20260320120000000",
+ buildMetadata(currCheckpoint),
+ buildWriteStats(500, 0),
+ prevMeta);
+
+ FlinkKafkaOffsetValidator validator = new
FlinkKafkaOffsetValidator(defaultConfig());
+ // Should skip — no previous checkpoint to compare
+ assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+ }
+
+ @Test
+ public void testOvercountingDetected() {
+ // More records written than offset diff should also trigger deviation
+ // Diff = 100, records = 200 -> |100-200|/100 = 100% deviation
+ String prevCheckpoint = buildFlinkKafkaCheckpoint("events", new int[]{0},
new long[]{0});
+ String currCheckpoint = buildFlinkKafkaCheckpoint("events", new int[]{0},
new long[]{100});
+
+ FlinkValidationContext ctx = buildContext("20260320120000000",
+ buildMetadata(currCheckpoint),
+ buildWriteStats(200, 0),
+ buildMetadata(prevCheckpoint));
+
+ FlinkKafkaOffsetValidator validator = new
FlinkKafkaOffsetValidator(defaultConfig());
+ assertThrows(HoodieValidationException.class, () ->
validator.validateWithMetadata(ctx));
+ }
+
+ @Test
+ public void testExactToleranceBoundaryPasses() {
+ // Diff = 1000, records = 900 -> 10% deviation, tolerance = 10% (exactly
at boundary)
+ String prevCheckpoint = buildFlinkKafkaCheckpoint("events", new int[]{0},
new long[]{0});
+ String currCheckpoint = buildFlinkKafkaCheckpoint("events", new int[]{0},
new long[]{1000});
+
+ FlinkValidationContext ctx = buildContext("20260320120000000",
+ buildMetadata(currCheckpoint),
+ buildWriteStats(900, 0),
+ buildMetadata(prevCheckpoint));
+
+ FlinkKafkaOffsetValidator validator = new
FlinkKafkaOffsetValidator(configWithTolerance(10.0));
+ assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+ }
+
+ @Test
+ public void testJustOverToleranceFails() {
+ // Diff = 1000, records = 899 -> 10.1% deviation, tolerance = 10%
+ String prevCheckpoint = buildFlinkKafkaCheckpoint("events", new int[]{0},
new long[]{0});
+ String currCheckpoint = buildFlinkKafkaCheckpoint("events", new int[]{0},
new long[]{1000});
+
+ FlinkValidationContext ctx = buildContext("20260320120000000",
+ buildMetadata(currCheckpoint),
+ buildWriteStats(899, 0),
+ buildMetadata(prevCheckpoint));
+
+ FlinkKafkaOffsetValidator validator = new
FlinkKafkaOffsetValidator(configWithTolerance(10.0));
+ assertThrows(HoodieValidationException.class, () ->
validator.validateWithMetadata(ctx));
+ }
+
+ @Test
+ public void testOnlyInsertsNoUpdates() {
+ // Pure insert workload
+ String prevCheckpoint = buildFlinkKafkaCheckpoint("events", new int[]{0,
1}, new long[]{0, 0});
+ String currCheckpoint = buildFlinkKafkaCheckpoint("events", new int[]{0,
1}, new long[]{500, 500});
+
+ FlinkValidationContext ctx = buildContext("20260320120000000",
+ buildMetadata(currCheckpoint),
+ buildWriteStats(1000, 0),
+ buildMetadata(prevCheckpoint));
+
+ FlinkKafkaOffsetValidator validator = new
FlinkKafkaOffsetValidator(defaultConfig());
+ assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+ }
+
+ @Test
+ public void testUpdatesCountedInRecordTotal() {
+ // Diff = 1000. 600 inserts + 400 updates = 1000 total
+ String prevCheckpoint = buildFlinkKafkaCheckpoint("events", new int[]{0},
new long[]{0});
+ String currCheckpoint = buildFlinkKafkaCheckpoint("events", new int[]{0},
new long[]{1000});
+
+ FlinkValidationContext ctx = buildContext("20260320120000000",
+ buildMetadata(currCheckpoint),
+ buildWriteStats(600, 400),
+ buildMetadata(prevCheckpoint));
+
+ FlinkKafkaOffsetValidator validator = new
FlinkKafkaOffsetValidator(defaultConfig());
+ assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+ }
+}
diff --git
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/validator/TestFlinkValidationContext.java
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/validator/TestFlinkValidationContext.java
new file mode 100644
index 000000000000..b6fb31f0c873
--- /dev/null
+++
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/validator/TestFlinkValidationContext.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.sink.validator;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.util.Option;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link FlinkValidationContext}.
+ */
+public class TestFlinkValidationContext {
+
+ @Test
+ public void testBasicProperties() {
+ HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+ metadata.addMetadata("key1", "value1");
+
+ HoodieWriteStat stat = new HoodieWriteStat();
+ stat.setNumInserts(100);
+ stat.setNumUpdateWrites(50);
+ List<HoodieWriteStat> stats = Collections.singletonList(stat);
+
+ FlinkValidationContext ctx = new FlinkValidationContext(
+ "20260320120000000",
+ Option.of(metadata),
+ Option.of(stats),
+ Option.empty());
+
+ assertEquals("20260320120000000", ctx.getInstantTime());
+ assertTrue(ctx.getCommitMetadata().isPresent());
+ assertTrue(ctx.getWriteStats().isPresent());
+ assertEquals(1, ctx.getWriteStats().get().size());
+ }
+
+ @Test
+ public void testTotalRecordsWritten() {
+ HoodieWriteStat stat1 = new HoodieWriteStat();
+ stat1.setNumInserts(100);
+ stat1.setNumUpdateWrites(50);
+
+ HoodieWriteStat stat2 = new HoodieWriteStat();
+ stat2.setNumInserts(200);
+ stat2.setNumUpdateWrites(30);
+
+ List<HoodieWriteStat> stats = Arrays.asList(stat1, stat2);
+
+ FlinkValidationContext ctx = new FlinkValidationContext(
+ "20260320120000000",
+ Option.of(new HoodieCommitMetadata()),
+ Option.of(stats),
+ Option.empty());
+
+ assertEquals(380, ctx.getTotalRecordsWritten());
+ assertEquals(300, ctx.getTotalInsertRecordsWritten());
+ assertEquals(80, ctx.getTotalUpdateRecordsWritten());
+ }
+
+ @Test
+ public void testIsFirstCommit() {
+ FlinkValidationContext ctxFirst = new FlinkValidationContext(
+ "20260320120000000",
+ Option.of(new HoodieCommitMetadata()),
+ Option.of(Collections.emptyList()),
+ Option.empty());
+
+ assertTrue(ctxFirst.isFirstCommit());
+
+ FlinkValidationContext ctxNotFirst = new FlinkValidationContext(
+ "20260320120000000",
+ Option.of(new HoodieCommitMetadata()),
+ Option.of(Collections.emptyList()),
+ Option.of(new HoodieCommitMetadata()));
+
+ assertFalse(ctxNotFirst.isFirstCommit());
+ }
+
+ @Test
+ public void testExtraMetadata() {
+ HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+ metadata.addMetadata("checkpoint", "topic,0:100");
+ metadata.addMetadata("other_key", "other_value");
+
+ FlinkValidationContext ctx = new FlinkValidationContext(
+ "20260320120000000",
+ Option.of(metadata),
+ Option.of(Collections.emptyList()),
+ Option.empty());
+
+ assertEquals(2, ctx.getExtraMetadata().size());
+ assertTrue(ctx.getExtraMetadata("checkpoint").isPresent());
+ assertEquals("topic,0:100", ctx.getExtraMetadata("checkpoint").get());
+ assertFalse(ctx.getExtraMetadata("nonexistent").isPresent());
+ }
+
+ @Test
+ public void testPreviousCommitMetadata() {
+ HoodieCommitMetadata prevMeta = new HoodieCommitMetadata();
+ prevMeta.addMetadata("checkpoint", "topic,0:50");
+
+ FlinkValidationContext ctx = new FlinkValidationContext(
+ "20260320120000000",
+ Option.of(new HoodieCommitMetadata()),
+ Option.of(Collections.emptyList()),
+ Option.of(prevMeta));
+
+ assertTrue(ctx.getPreviousCommitMetadata().isPresent());
+ assertEquals("topic,0:50",
ctx.getPreviousCommitMetadata().get().getMetadata("checkpoint"));
+ }
+
+ @Test
+ public void testGetActiveTimelineThrows() {
+ FlinkValidationContext ctx = new FlinkValidationContext(
+ "20260320120000000",
+ Option.of(new HoodieCommitMetadata()),
+ Option.of(Collections.emptyList()),
+ Option.empty());
+
+ assertThrows(UnsupportedOperationException.class, ctx::getActiveTimeline);
+ }
+
+ @Test
+ public void testGetPreviousCommitInstantThrows() {
+ FlinkValidationContext ctx = new FlinkValidationContext(
+ "20260320120000000",
+ Option.of(new HoodieCommitMetadata()),
+ Option.of(Collections.emptyList()),
+ Option.of(new HoodieCommitMetadata()));
+
+    assertThrows(UnsupportedOperationException.class, ctx::getPreviousCommitInstant);
+ }
+
+ @Test
+ public void testMultipleWriteStatsAggregation() {
+ HoodieWriteStat stat1 = new HoodieWriteStat();
+ stat1.setNumInserts(10);
+ stat1.setNumUpdateWrites(5);
+
+ HoodieWriteStat stat2 = new HoodieWriteStat();
+ stat2.setNumInserts(20);
+ stat2.setNumUpdateWrites(15);
+
+ HoodieWriteStat stat3 = new HoodieWriteStat();
+ stat3.setNumInserts(30);
+ stat3.setNumUpdateWrites(25);
+
+ List<HoodieWriteStat> stats = Arrays.asList(stat1, stat2, stat3);
+
+ FlinkValidationContext ctx = new FlinkValidationContext(
+ "20260320120000000",
+ Option.of(new HoodieCommitMetadata()),
+ Option.of(stats),
+ Option.empty());
+
+ assertEquals(60, ctx.getTotalInsertRecordsWritten());
+ assertEquals(45, ctx.getTotalUpdateRecordsWritten());
+ assertEquals(105, ctx.getTotalRecordsWritten());
+ }
+
+ @Test
+ public void testEmptyWriteStats() {
+ FlinkValidationContext ctx = new FlinkValidationContext(
+ "20260320120000000",
+ Option.of(new HoodieCommitMetadata()),
+ Option.empty(),
+ Option.empty());
+
+ assertEquals(0, ctx.getTotalRecordsWritten());
+ assertEquals(0, ctx.getTotalInsertRecordsWritten());
+ assertEquals(0, ctx.getTotalUpdateRecordsWritten());
+ }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/validator/TestFlinkValidatorUtils.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/validator/TestFlinkValidatorUtils.java
new file mode 100644
index 000000000000..dea583597183
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/validator/TestFlinkValidatorUtils.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.sink.validator;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
+import org.apache.hudi.exception.HoodieValidationException;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.configuration.Configuration;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link FlinkValidatorUtils}.
+ */
+public class TestFlinkValidatorUtils {
+
+ private static Configuration confWithValidator(String validatorClassName) {
+ Configuration conf = new Configuration();
+    conf.setString(HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(), validatorClassName);
+ return conf;
+ }
+
+  private static WriteStatus buildWriteStatus(String partitionPath, long numInserts, long numUpdates) {
+ HoodieWriteStat stat = new HoodieWriteStat();
+ stat.setPartitionPath(partitionPath);
+ stat.setNumInserts(numInserts);
+ stat.setNumUpdateWrites(numUpdates);
+
+ WriteStatus ws = new WriteStatus(false, 0.0);
+ ws.setStat(stat);
+ return ws;
+ }
+
+ // ========== Tests ==========
+
+ @Test
+ public void testNoValidatorsConfigured() {
+ Configuration conf = new Configuration();
+ // No validator class names set
+    List<WriteStatus> writeStatuses = Collections.singletonList(buildWriteStatus("p1", 100, 0));
+
+ assertDoesNotThrow(() -> FlinkValidatorUtils.runValidators(
+ conf, "20260320120000000", writeStatuses,
+ new HashMap<>(), () -> Option.empty()));
+ }
+
+ @Test
+ public void testEmptyValidatorString() {
+ Configuration conf = new Configuration();
+    conf.setString(HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(), "");
+    List<WriteStatus> writeStatuses = Collections.singletonList(buildWriteStatus("p1", 100, 0));
+
+ assertDoesNotThrow(() -> FlinkValidatorUtils.runValidators(
+ conf, "20260320120000000", writeStatuses,
+ new HashMap<>(), () -> Option.empty()));
+ }
+
+ @Test
+ public void testValidValidatorRunsSuccessfully() {
+ // FlinkKafkaOffsetValidator with first commit (no previous) should pass
+ Configuration conf = confWithValidator(
+ "org.apache.hudi.sink.validator.FlinkKafkaOffsetValidator");
+    conf.setString(HoodiePreCommitValidatorConfig.STREAMING_OFFSET_TOLERANCE_PERCENTAGE.key(), "0.0");
+    conf.setString(HoodiePreCommitValidatorConfig.VALIDATION_FAILURE_POLICY.key(), "FAIL");
+
+    List<WriteStatus> writeStatuses = Collections.singletonList(buildWriteStatus("p1", 100, 0));
+ Map<String, String> extraMeta = new HashMap<>();
+ extraMeta.put(StreamerUtil.HOODIE_METADATA_KEY,
+ "kafka_metadata%3Aevents%3A0:100");
+
+ // First commit (no previous metadata) — validator should skip and pass
+ assertDoesNotThrow(() -> FlinkValidatorUtils.runValidators(
+        conf, "20260320120000000", writeStatuses, extraMeta, () -> Option.empty()));
+ }
+
+ @Test
+ public void testValidatorPassesWithMatchingCounts() {
+ Configuration conf = confWithValidator(
+ "org.apache.hudi.sink.validator.FlinkKafkaOffsetValidator");
+    conf.setString(HoodiePreCommitValidatorConfig.STREAMING_OFFSET_TOLERANCE_PERCENTAGE.key(), "0.0");
+    conf.setString(HoodiePreCommitValidatorConfig.VALIDATION_FAILURE_POLICY.key(), "FAIL");
+
+ // 100 records written, offset diff = 100
+    List<WriteStatus> writeStatuses = Collections.singletonList(buildWriteStatus("p1", 100, 0));
+ Map<String, String> extraMeta = new HashMap<>();
+ extraMeta.put(StreamerUtil.HOODIE_METADATA_KEY,
+ "kafka_metadata%3Aevents%3A0:200");
+
+ HoodieCommitMetadata prevMeta = new HoodieCommitMetadata();
+ prevMeta.addMetadata(StreamerUtil.HOODIE_METADATA_KEY,
+ "kafka_metadata%3Aevents%3A0:100");
+
+ assertDoesNotThrow(() -> FlinkValidatorUtils.runValidators(
+        conf, "20260320120000000", writeStatuses, extraMeta, () -> Option.of(prevMeta)));
+ }
+
+ @Test
+ public void testValidatorFailsWithMismatchedCounts() {
+ Configuration conf = confWithValidator(
+ "org.apache.hudi.sink.validator.FlinkKafkaOffsetValidator");
+    conf.setString(HoodiePreCommitValidatorConfig.STREAMING_OFFSET_TOLERANCE_PERCENTAGE.key(), "0.0");
+    conf.setString(HoodiePreCommitValidatorConfig.VALIDATION_FAILURE_POLICY.key(), "FAIL");
+
+ // Only 10 records written but offset diff = 1000
+    List<WriteStatus> writeStatuses = Collections.singletonList(buildWriteStatus("p1", 10, 0));
+ Map<String, String> extraMeta = new HashMap<>();
+ extraMeta.put(StreamerUtil.HOODIE_METADATA_KEY,
+ "kafka_metadata%3Aevents%3A0:1000");
+
+ HoodieCommitMetadata prevMeta = new HoodieCommitMetadata();
+ prevMeta.addMetadata(StreamerUtil.HOODIE_METADATA_KEY,
+ "kafka_metadata%3Aevents%3A0:0");
+
+ assertThrows(HoodieValidationException.class,
+ () -> FlinkValidatorUtils.runValidators(
+            conf, "20260320120000000", writeStatuses, extraMeta, () -> Option.of(prevMeta)));
+ }
+
+ @Test
+ public void testInvalidValidatorClassThrows() {
+ Configuration conf = confWithValidator("com.nonexistent.FakeValidator");
+    List<WriteStatus> writeStatuses = Collections.singletonList(buildWriteStatus("p1", 100, 0));
+
+ assertThrows(HoodieValidationException.class,
+ () -> FlinkValidatorUtils.runValidators(
+            conf, "20260320120000000", writeStatuses, new HashMap<>(), () -> Option.empty()));
+ }
+
+ @Test
+ public void testMultipleValidators() {
+    // Two validators comma-separated, both FlinkKafkaOffsetValidator (will both skip on first commit)
+ Configuration conf = confWithValidator(
+ "org.apache.hudi.sink.validator.FlinkKafkaOffsetValidator,"
+ + "org.apache.hudi.sink.validator.FlinkKafkaOffsetValidator");
+    conf.setString(HoodiePreCommitValidatorConfig.STREAMING_OFFSET_TOLERANCE_PERCENTAGE.key(), "0.0");
+    conf.setString(HoodiePreCommitValidatorConfig.VALIDATION_FAILURE_POLICY.key(), "FAIL");
+
+    List<WriteStatus> writeStatuses = Collections.singletonList(buildWriteStatus("p1", 100, 0));
+ Map<String, String> extraMeta = new HashMap<>();
+ extraMeta.put(StreamerUtil.HOODIE_METADATA_KEY,
+ "kafka_metadata%3Aevents%3A0:100");
+
+ assertDoesNotThrow(() -> FlinkValidatorUtils.runValidators(
+        conf, "20260320120000000", writeStatuses, extraMeta, () -> Option.empty()));
+ }
+
+ @Test
+ public void testValidatorWithWhitespaceInClassNames() {
+ // Class names with extra whitespace and trailing comma
+ Configuration conf = confWithValidator(
+ " org.apache.hudi.sink.validator.FlinkKafkaOffsetValidator , ");
+    conf.setString(HoodiePreCommitValidatorConfig.STREAMING_OFFSET_TOLERANCE_PERCENTAGE.key(), "0.0");
+    conf.setString(HoodiePreCommitValidatorConfig.VALIDATION_FAILURE_POLICY.key(), "FAIL");
+
+    List<WriteStatus> writeStatuses = Collections.singletonList(buildWriteStatus("p1", 100, 0));
+
+ assertDoesNotThrow(() -> FlinkValidatorUtils.runValidators(
+        conf, "20260320120000000", writeStatuses, new HashMap<>(), () -> Option.empty()));
+ }
+
+ @Test
+ public void testBuildCommitMetadataWithNullExtraMetadata() {
+ Configuration conf = confWithValidator(
+ "org.apache.hudi.sink.validator.FlinkKafkaOffsetValidator");
+    conf.setString(HoodiePreCommitValidatorConfig.STREAMING_OFFSET_TOLERANCE_PERCENTAGE.key(), "0.0");
+    conf.setString(HoodiePreCommitValidatorConfig.VALIDATION_FAILURE_POLICY.key(), "FAIL");
+
+    List<WriteStatus> writeStatuses = Collections.singletonList(buildWriteStatus("p1", 100, 0));
+
+ // null extraMetadata should be handled gracefully
+ assertDoesNotThrow(() -> FlinkValidatorUtils.runValidators(
+ conf, "20260320120000000", writeStatuses, null, () -> Option.empty()));
+ }
+
+ @Test
+ public void testMultipleWriteStatusesAggregated() {
+ Configuration conf = confWithValidator(
+ "org.apache.hudi.sink.validator.FlinkKafkaOffsetValidator");
+    conf.setString(HoodiePreCommitValidatorConfig.STREAMING_OFFSET_TOLERANCE_PERCENTAGE.key(), "0.0");
+    conf.setString(HoodiePreCommitValidatorConfig.VALIDATION_FAILURE_POLICY.key(), "FAIL");
+
+ // Two write statuses: 60 inserts + 40 inserts = 100 total
+ List<WriteStatus> writeStatuses = new ArrayList<>();
+ writeStatuses.add(buildWriteStatus("p1", 60, 0));
+ writeStatuses.add(buildWriteStatus("p2", 40, 0));
+
+ Map<String, String> extraMeta = new HashMap<>();
+ extraMeta.put(StreamerUtil.HOODIE_METADATA_KEY,
+ "kafka_metadata%3Aevents%3A0:200");
+
+ HoodieCommitMetadata prevMeta = new HoodieCommitMetadata();
+ prevMeta.addMetadata(StreamerUtil.HOODIE_METADATA_KEY,
+ "kafka_metadata%3Aevents%3A0:100");
+
+ // Offset diff = 100, records = 100 — should pass
+ assertDoesNotThrow(() -> FlinkValidatorUtils.runValidators(
+        conf, "20260320120000000", writeStatuses, extraMeta, () -> Option.of(prevMeta)));
+ }
+
+ @Test
+ public void testEmptyWriteStatuses() {
+ Configuration conf = confWithValidator(
+ "org.apache.hudi.sink.validator.FlinkKafkaOffsetValidator");
+    conf.setString(HoodiePreCommitValidatorConfig.STREAMING_OFFSET_TOLERANCE_PERCENTAGE.key(), "0.0");
+    conf.setString(HoodiePreCommitValidatorConfig.VALIDATION_FAILURE_POLICY.key(), "FAIL");
+
+ // Empty write statuses with matching empty offsets
+ List<WriteStatus> writeStatuses = Collections.emptyList();
+ Map<String, String> extraMeta = new HashMap<>();
+ String checkpoint = "kafka_metadata%3Aevents%3A0:100";
+ extraMeta.put(StreamerUtil.HOODIE_METADATA_KEY, checkpoint);
+
+ HoodieCommitMetadata prevMeta = new HoodieCommitMetadata();
+ prevMeta.addMetadata(StreamerUtil.HOODIE_METADATA_KEY, checkpoint);
+
+ // Both offset diff and records are 0 — should skip validation
+ assertDoesNotThrow(() -> FlinkValidatorUtils.runValidators(
+        conf, "20260320120000000", writeStatuses, extraMeta, () -> Option.of(prevMeta)));
+ }
+
+ @Test
+ public void testValidationExceptionPreservedAcrossValidators() {
+ // First validator: valid, second: invalid class
+ Configuration conf = confWithValidator(
+ "org.apache.hudi.sink.validator.FlinkKafkaOffsetValidator,"
+ + "com.nonexistent.FakeValidator");
+    conf.setString(HoodiePreCommitValidatorConfig.STREAMING_OFFSET_TOLERANCE_PERCENTAGE.key(), "0.0");
+    conf.setString(HoodiePreCommitValidatorConfig.VALIDATION_FAILURE_POLICY.key(), "FAIL");
+
+    List<WriteStatus> writeStatuses = Collections.singletonList(buildWriteStatus("p1", 100, 0));
+
+    HoodieValidationException ex = assertThrows(HoodieValidationException.class,
+        () -> FlinkValidatorUtils.runValidators(
+            conf, "20260320120000000", writeStatuses, new HashMap<>(), () -> Option.empty()));
+ assertTrue(ex.getMessage().contains("FakeValidator"));
+ }
+
+ @Test
+ public void testConfigPropertiesPassedToValidator() {
+ // Set a high tolerance so even with mismatch, it passes
+ Configuration conf = confWithValidator(
+ "org.apache.hudi.sink.validator.FlinkKafkaOffsetValidator");
+    conf.setString(HoodiePreCommitValidatorConfig.STREAMING_OFFSET_TOLERANCE_PERCENTAGE.key(), "100.0");
+    conf.setString(HoodiePreCommitValidatorConfig.VALIDATION_FAILURE_POLICY.key(), "FAIL");
+
+    // 0 records written but 1000 offset diff — 100% deviation, but tolerance is 100%
+    List<WriteStatus> writeStatuses = Collections.singletonList(buildWriteStatus("p1", 0, 0));
+ Map<String, String> extraMeta = new HashMap<>();
+ extraMeta.put(StreamerUtil.HOODIE_METADATA_KEY,
+ "kafka_metadata%3Aevents%3A0:1000");
+
+ HoodieCommitMetadata prevMeta = new HoodieCommitMetadata();
+ prevMeta.addMetadata(StreamerUtil.HOODIE_METADATA_KEY,
+ "kafka_metadata%3Aevents%3A0:0");
+
+ assertDoesNotThrow(() -> FlinkValidatorUtils.runValidators(
+        conf, "20260320120000000", writeStatuses, extraMeta, () -> Option.of(prevMeta)));
+ }
+}
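
For reference, a minimal, illustrative sketch (not part of the committed patch) of how a Flink job could enable the new validator, assuming only the configuration keys and the FlinkKafkaOffsetValidator class name exercised by the tests above; the class name OffsetValidatorConfigSketch and the 5.0 tolerance value are arbitrary examples, not recommendations from this commit.

import org.apache.flink.configuration.Configuration;
import org.apache.hudi.config.HoodiePreCommitValidatorConfig;

public class OffsetValidatorConfigSketch {
  // Illustrative only: builds a Flink Configuration that registers the
  // FlinkKafkaOffsetValidator so it runs before each streaming commit.
  public static Configuration withOffsetValidator() {
    Configuration conf = new Configuration();
    // Comma-separated list of validator class names; whitespace is tolerated per the tests.
    conf.setString(HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(),
        "org.apache.hudi.sink.validator.FlinkKafkaOffsetValidator");
    // Assumed example: fail the commit if written records deviate from the Kafka offset diff by more than 5%.
    conf.setString(HoodiePreCommitValidatorConfig.STREAMING_OFFSET_TOLERANCE_PERCENTAGE.key(), "5.0");
    conf.setString(HoodiePreCommitValidatorConfig.VALIDATION_FAILURE_POLICY.key(), "FAIL");
    return conf;
  }
}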