This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 56bc28398a47 feat(flink): add pre-commit validation framework for Flink - Phase 2 (#18362)
56bc28398a47 is described below

commit 56bc28398a47b9083c883ef9fdc438fd44164823
Author: Xinli Shang <[email protected]>
AuthorDate: Thu Mar 26 19:21:09 2026 -0700

    feat(flink): add pre-commit validation framework for Flink - Phase 2 (#18362)
    
    * feat(flink): add pre-commit validation framework for Flink - Phase 2
    
    Wire the pluggable pre-commit validation framework into Flink's
    StreamWriteOperatorCoordinator.doCommit() flow. This enables
    configurable validators to run before each Flink streaming commit.
    
    Key changes:
    - FlinkValidationContext: Flink implementation of ValidationContext
    - FlinkKafkaOffsetValidator: validates Kafka offset diff vs records written
    - FlinkValidatorUtils: orchestrator that instantiates and runs validators
    - CheckpointUtils: add Flink Kafka checkpoint format parsing
    - StreamWriteOperatorCoordinator: wire validators into doCommit()
    - BasePreCommitValidator: make validateWithMetadata() public for
      cross-package invocation
    
    Tests: 52 test cases covering FlinkValidatorUtils, FlinkValidationContext,
    FlinkKafkaOffsetValidator, and Flink Kafka checkpoint parsing.
    
    Closes #18067
    
    * fix(test): update TestCheckpointUtils for Phase 2 Flink Kafka implementation
    
    - Remove FLINK_KAFKA from testUnsupportedFormats since it's now implemented
    - Remove FLINK_KAFKA from testIsValidCheckpointFormatUnsupported
    - Add testFlinkKafkaCheckpointParsing to verify the new parser
    - Add testIsValidCheckpointFormatFlinkKafka for validation
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
    
    * fix: address review comments from danny0405
    
    - Use TypedProperties.fromMap() instead of manual iteration (FlinkValidatorUtils)
    - Lazy-evaluate previousCommitMetadata via Supplier to skip fetch when no validators configured
    - Move getPreviousCommitMetadata() from StreamWriteOperatorCoordinator to StreamerUtil
    - Use getWriteTimeline() to filter to write commits only (instead of all completed instants)
    - Update tests for new Supplier-based API
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
    
    ---------
    
    Co-authored-by: Xinli Shang <[email protected]>
    Co-authored-by: Claude Opus 4.6 <[email protected]>
---
 .../client/validator/StreamingOffsetValidator.java |   2 +-
 .../config/HoodiePreCommitValidatorConfig.java     |   3 +-
 .../client/validator/BasePreCommitValidator.java   |   2 +-
 .../apache/hudi/common/util/CheckpointUtils.java   |  86 +++++-
 .../hudi/common/util/TestCheckpointUtils.java      |  28 +-
 .../hudi/sink/StreamWriteOperatorCoordinator.java  |   6 +
 .../sink/validator/FlinkKafkaOffsetValidator.java  |  58 ++++
 .../sink/validator/FlinkValidationContext.java     | 117 +++++++
 .../hudi/sink/validator/FlinkValidatorUtils.java   | 150 +++++++++
 .../java/org/apache/hudi/util/StreamerUtil.java    |  24 ++
 .../validator/TestFlinkKafkaCheckpointParsing.java | 171 ++++++++++
 .../validator/TestFlinkKafkaOffsetValidator.java   | 343 +++++++++++++++++++++
 .../sink/validator/TestFlinkValidationContext.java | 200 ++++++++++++
 .../sink/validator/TestFlinkValidatorUtils.java    | 290 +++++++++++++++++
 14 files changed, 1463 insertions(+), 17 deletions(-)
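
For reviewers who want to try the feature, a minimal sketch (not part of this
patch) of enabling the new validator on a Flink ingestion job. The option keys
are the ones documented in FlinkKafkaOffsetValidator's javadoc below; the
tolerance value and failure policy are illustrative:

    // Sketch: enable Flink pre-commit validation; conf is the job's
    // org.apache.flink.configuration.Configuration. Values are examples.
    conf.setString("hoodie.precommit.validators",
        "org.apache.hudi.sink.validator.FlinkKafkaOffsetValidator");
    conf.setString("hoodie.precommit.validators.streaming.offset.tolerance.percentage", "5.0");
    conf.setString("hoodie.precommit.validators.failure.policy", "WARN_LOG");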

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/validator/StreamingOffsetValidator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/validator/StreamingOffsetValidator.java
index 71844f3c1fad..ce577d84ca01 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/validator/StreamingOffsetValidator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/validator/StreamingOffsetValidator.java
@@ -88,7 +88,7 @@ public abstract class StreamingOffsetValidator extends BasePreCommitValidator {
   }
 
   @Override
-  protected void validateWithMetadata(ValidationContext context) throws HoodieValidationException {
+  public void validateWithMetadata(ValidationContext context) throws HoodieValidationException {
     // Skip validation for first commit (no previous checkpoint)
     if (context.isFirstCommit()) {
       log.info("Skipping offset validation for first commit");
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePreCommitValidatorConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePreCommitValidatorConfig.java
index b1faa5153d22..f85cc44120d4 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePreCommitValidatorConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePreCommitValidatorConfig.java
@@ -70,7 +70,8 @@ public class HoodiePreCommitValidatorConfig extends HoodieConfig {
       .sinceVersion("1.2.0")
       .markAdvanced()
      .withDocumentation("Tolerance percentage for streaming offset validation "
-          + "(used by org.apache.hudi.client.validator.StreamingOffsetValidator). "
+          + "(used by org.apache.hudi.client.validator.StreamingOffsetValidator "
+          + "and org.apache.hudi.sink.validator.FlinkKafkaOffsetValidator). "
           + "The validator compares the offset difference (expected records from source) "
           + "with actual records written. If the deviation exceeds this percentage, "
           + "the commit is rejected or warned depending on the validation failure policy. "
diff --git a/hudi-common/src/main/java/org/apache/hudi/client/validator/BasePreCommitValidator.java b/hudi-common/src/main/java/org/apache/hudi/client/validator/BasePreCommitValidator.java
index b1c718246608..a35412797d90 100644
--- a/hudi-common/src/main/java/org/apache/hudi/client/validator/BasePreCommitValidator.java
+++ b/hudi-common/src/main/java/org/apache/hudi/client/validator/BasePreCommitValidator.java
@@ -72,7 +72,7 @@ public abstract class BasePreCommitValidator {
    * @throws HoodieValidationException if validation fails
    */
   @PublicAPIMethod(maturity = ApiMaturityLevel.EVOLVING)
-  protected void validateWithMetadata(ValidationContext context) throws HoodieValidationException {
+  public void validateWithMetadata(ValidationContext context) throws HoodieValidationException {
     // Default no-op implementation
     // Concrete validators override this to implement validation logic
   }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/util/CheckpointUtils.java b/hudi-common/src/main/java/org/apache/hudi/common/util/CheckpointUtils.java
index 5e0fc862b605..e3faabe10db3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/util/CheckpointUtils.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/util/CheckpointUtils.java
@@ -37,10 +37,11 @@ import java.util.Map;
 *   {@code consumer.endOffsets()} or {@code consumer.offsetsForTimes()}, but the
  *   checkpoint string format is identical.
  *
- * - FLINK_KAFKA: Base64-encoded serialized Map (TopicPartition → Long)
- *   Example: "eyJ0b3BpY..." (base64)
- *   Used by: Flink streaming connector
- *   Note: Actual implementation requires Flink checkpoint deserialization (Phase 2)
+ * - FLINK_KAFKA: URL-encoded format with topic and partition offsets
+ *   Example: "kafka_metadata%3Aevents%3A0:100;kafka_metadata%3Aevents%3A1:200"
+ *   Format: "kafka_metadata%3A{topic}%3A{partition}:{offset}" separated by ";"
+ *   Used by: Flink streaming connector (stored in HoodieMetadataKey extraMetadata)
+ *   Note: Cluster metadata entries (kafka_metadata%3Akafka_cluster%3A...) are skipped
  *
  * - PULSAR: "partition:ledgerId:entryId,partition:ledgerId:entryId,..."
  *   Example: "0:123:45,1:234:56"
@@ -66,7 +67,10 @@ public class CheckpointUtils {
     /** HoodieStreamer (Spark) Kafka format: "topic,0:1000,1:2000" */
     SPARK_KAFKA,
 
-    /** Flink Kafka format: base64-encoded Map&lt;TopicPartition, Long&gt; */
+    /**
+     * Flink Kafka format: URL-encoded "kafka_metadata%3Atopic%3Apartition:offset" separated by ";".
+     * Cluster metadata entries (containing "kafka_cluster") are skipped during parsing.
+     */
     FLINK_KAFKA,
 
    /** Pulsar format: "0:123:45,1:234:56" (ledgerId:entryId). Engine-agnostic. */
@@ -92,9 +96,7 @@ public class CheckpointUtils {
       case SPARK_KAFKA:
         return parseSparkKafkaCheckpoint(checkpointStr);
       case FLINK_KAFKA:
-        throw new UnsupportedOperationException(
-            "Flink Kafka checkpoint parsing not yet implemented. "
-                + "This will be added in Phase 2 with Flink checkpoint deserialization support.");
+        return parseFlinkKafkaCheckpoint(checkpointStr);
       case PULSAR:
        throw new UnsupportedOperationException(
            "Pulsar checkpoint parsing not yet implemented. Planned for Phase 4.");
@@ -254,4 +256,72 @@ public class CheckpointUtils {
 
     return offsetMap;
   }
+
+  /**
+   * Parse Flink Kafka checkpoint.
+   * Format: "kafka_metadata%3Atopic%3Apartition:offset" entries separated by ";".
+   * Cluster metadata entries (containing "kafka_cluster") are skipped.
+   *
+   * <p>Example: "kafka_metadata%3Aevents%3A0:100;kafka_metadata%3Aevents%3A1:200
+   * ;kafka_metadata%3Akafka_cluster%3Aevents%3A:my-cluster"</p>
+   *
+   * <p>The URL-encoded colons (%3A) separate the prefix, topic, and partition:offset.
+   * The format is produced by {@code StreamerUtil.stringFy()} in hudi-flink.</p>
+   *
+   * <p>Parsing uses {@code lastIndexOf} to locate the partition and offset from the end
+   * of each entry, consistent with the production parser in
+   * {@code StreamerUtil.parseKafkaOffsets()}.</p>
+   *
+   * @param checkpointStr Checkpoint string
+   * @return Map of partition → offset (cluster metadata entries excluded)
+   * @throws IllegalArgumentException if format is invalid
+   */
+  private static Map<Integer, Long> parseFlinkKafkaCheckpoint(String checkpointStr) {
+    if (checkpointStr == null || checkpointStr.trim().isEmpty()) {
+      throw new IllegalArgumentException("Flink Kafka checkpoint string cannot be null or empty");
+    }
+
+    Map<Integer, Long> offsetMap = new HashMap<>();
+    String[] entries = checkpointStr.split(";");
+
+    for (String entry : entries) {
+      entry = entry.trim();
+      if (entry.isEmpty()) {
+        continue;
+      }
+
+      // Skip cluster metadata entries (e.g., kafka_metadata%3Akafka_cluster%3Atopic%3A:cluster)
+      if (!entry.contains(":") || entry.contains("kafka_cluster")) {
+        continue;
+      }
+
+      // Entry format: kafka_metadata%3Atopic%3Apartition:offset
+      // Find the last colon which separates partition:offset
+      int lastColonIndex = entry.lastIndexOf(':');
+      if (lastColonIndex == -1) {
+        continue;
+      }
+
+      String offsetStr = entry.substring(lastColonIndex + 1);
+      String beforeOffset = entry.substring(0, lastColonIndex);
+
+      // Find the partition number (everything after the last %3A)
+      int lastEncodedColonIndex = beforeOffset.lastIndexOf("%3A");
+      if (lastEncodedColonIndex == -1) {
+        continue;
+      }
+
+      String partitionStr = beforeOffset.substring(lastEncodedColonIndex + "%3A".length());
+
+      try {
+        int partition = Integer.parseInt(partitionStr);
+        long offset = Long.parseLong(offsetStr);
+        offsetMap.put(partition, offset);
+      } catch (NumberFormatException e) {
+        log.warn("Failed to parse partition ID or offset from entry: {}", entry, e);
+      }
+    }
+
+    return offsetMap;
+  }
 }
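
To illustrate the parser added above, a quick sketch (it mirrors the new tests
below; the checkpoint string is the documented URL-encoded format):

    // Sketch: partition -> offset map from a Flink Kafka checkpoint string.
    Map<Integer, Long> offsets = CheckpointUtils.parseCheckpoint(
        CheckpointUtils.CheckpointFormat.FLINK_KAFKA,
        "kafka_metadata%3Aevents%3A0:100;kafka_metadata%3Aevents%3A1:200");
    // offsets: {0=100, 1=200}; any kafka_cluster metadata entry is skipped.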
diff --git a/hudi-common/src/test/java/org/apache/hudi/common/util/TestCheckpointUtils.java b/hudi-common/src/test/java/org/apache/hudi/common/util/TestCheckpointUtils.java
index 4f43926a7a48..3b20b76f632d 100644
--- a/hudi-common/src/test/java/org/apache/hudi/common/util/TestCheckpointUtils.java
+++ b/hudi-common/src/test/java/org/apache/hudi/common/util/TestCheckpointUtils.java
@@ -199,10 +199,6 @@ public class TestCheckpointUtils {
 
   @Test
   public void testUnsupportedFormats() {
-    // Flink format not yet implemented (Phase 2)
-    assertThrows(UnsupportedOperationException.class, () ->
-        CheckpointUtils.parseCheckpoint(CheckpointFormat.FLINK_KAFKA, "anystring"));
-
     // Pulsar format not yet implemented (Phase 4)
     assertThrows(UnsupportedOperationException.class, () ->
         CheckpointUtils.parseCheckpoint(CheckpointFormat.PULSAR, "anystring"));
@@ -212,6 +208,17 @@ public class TestCheckpointUtils {
        CheckpointUtils.parseCheckpoint(CheckpointFormat.KINESIS, "anystring"));
   }
 
+  @Test
+  public void testFlinkKafkaCheckpointParsing() {
+    // Flink Kafka format is now implemented (Phase 2)
+    Map<Integer, Long> result = CheckpointUtils.parseCheckpoint(
+        CheckpointFormat.FLINK_KAFKA,
+        "kafka_metadata%3Aevents%3A0:100;kafka_metadata%3Aevents%3A1:200");
+    assertEquals(2, result.size());
+    assertEquals(100L, result.get(0));
+    assertEquals(200L, result.get(1));
+  }
+
   @Test
   public void testCustomFormatThrows() {
     assertThrows(IllegalArgumentException.class, () ->
@@ -221,9 +228,18 @@ public class TestCheckpointUtils {
   @Test
   public void testIsValidCheckpointFormatUnsupported() {
     // Unsupported formats should return false (caught internally)
-    assertFalse(CheckpointUtils.isValidCheckpointFormat(
-        CheckpointFormat.FLINK_KAFKA, "anystring"));
     assertFalse(CheckpointUtils.isValidCheckpointFormat(
         CheckpointFormat.CUSTOM, "anystring"));
   }
+
+  @Test
+  public void testIsValidCheckpointFormatFlinkKafka() {
+    // Flink Kafka format is now supported (Phase 2)
+    assertTrue(CheckpointUtils.isValidCheckpointFormat(
+        CheckpointFormat.FLINK_KAFKA,
+        "kafka_metadata%3Aevents%3A0:100;kafka_metadata%3Aevents%3A1:200"));
+    // Invalid Flink checkpoint should return false
+    assertFalse(CheckpointUtils.isValidCheckpointFormat(
+        CheckpointFormat.FLINK_KAFKA, ""));
+  }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
index 88f48c1863ec..1c1add8b721b 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/StreamWriteOperatorCoordinator.java
@@ -38,6 +38,7 @@ import org.apache.hudi.hive.HiveSyncTool;
 import org.apache.hudi.sink.common.AbstractStreamWriteFunction;
 import org.apache.hudi.sink.event.Correspondent;
 import org.apache.hudi.sink.event.WriteMetadataEvent;
+import org.apache.hudi.sink.validator.FlinkValidatorUtils;
 import org.apache.hudi.sink.utils.CoordinationResponseSerDe;
 import org.apache.hudi.sink.utils.EventBuffers;
 import org.apache.hudi.sink.utils.EventBuffers.EventBuffer;
@@ -640,6 +641,11 @@ public class StreamWriteOperatorCoordinator
        ? writeClient.getPartitionToReplacedFileIds(tableState.operationType, dataWriteResults)
        : Collections.emptyMap();
    List<WriteStatus> allWriteStatus = Stream.concat(dataWriteResults.stream(), indexWriteResults.stream()).collect(Collectors.toList());
+
+    // Run pre-commit validators (if configured) before finalizing the commit
+    FlinkValidatorUtils.runValidators(conf, instant, allWriteStatus,
+        checkpointCommitMetadata, () -> StreamerUtil.getPreviousCommitMetadata(this.metaClient));
+
    boolean success = writeClient.commit(instant, allWriteStatus, Option.of(checkpointCommitMetadata),
         tableState.commitAction, partitionToReplacedFileIds);
     if (success) {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/validator/FlinkKafkaOffsetValidator.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/validator/FlinkKafkaOffsetValidator.java
new file mode 100644
index 000000000000..88c9a4a6f877
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/validator/FlinkKafkaOffsetValidator.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.sink.validator;
+
+import org.apache.hudi.client.validator.StreamingOffsetValidator;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.CheckpointUtils.CheckpointFormat;
+import org.apache.hudi.util.StreamerUtil;
+
+/**
+ * Flink-specific Kafka offset validator.
+ *
+ * <p>Validates that the number of records written matches the Kafka offset difference
+ * between the current and previous Flink checkpoints. Uses the Flink Kafka checkpoint
+ * format stored in {@code HoodieMetadataKey} extraMetadata.</p>
+ *
+ * <p>Configuration:
+ * <ul>
+ *   <li>{@code hoodie.precommit.validators}: Include
+ *       {@code org.apache.hudi.sink.validator.FlinkKafkaOffsetValidator}</li>
+ *   <li>{@code hoodie.precommit.validators.streaming.offset.tolerance.percentage}:
+ *       Acceptable deviation (default: 0.0 = strict)</li>
+ *   <li>{@code hoodie.precommit.validators.failure.policy}:
+ *       FAIL (default) or WARN_LOG</li>
+ * </ul></p>
+ *
+ * <p>This validator is primarily intended for append-only ingestion from Kafka.
+ * For upsert workloads with deduplication, configure a higher tolerance or use WARN_LOG.</p>
+ */
+public class FlinkKafkaOffsetValidator extends StreamingOffsetValidator {
+
+  /**
+   * Create a Flink Kafka offset validator.
+   *
+   * @param config Validator configuration
+   */
+  public FlinkKafkaOffsetValidator(TypedProperties config) {
+    super(config, StreamerUtil.HOODIE_METADATA_KEY, CheckpointFormat.FLINK_KAFKA);
+  }
+
+}
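
For illustration, the validator can also be exercised directly (in production
FlinkValidatorUtils loads it by class name); a sketch assuming a prepared
FlinkValidationContext named context, with illustrative property values:

    // Sketch: direct use; the property keys are from the javadoc above.
    TypedProperties props = new TypedProperties();
    props.setProperty("hoodie.precommit.validators.streaming.offset.tolerance.percentage", "5.0");
    props.setProperty("hoodie.precommit.validators.failure.policy", "WARN_LOG");
    FlinkKafkaOffsetValidator validator = new FlinkKafkaOffsetValidator(props);
    validator.validateWithMetadata(context);  // throws HoodieValidationException under FAIL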
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/validator/FlinkValidationContext.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/validator/FlinkValidationContext.java
new file mode 100644
index 000000000000..c8b3a2fe8b5b
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/validator/FlinkValidationContext.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.sink.validator;
+
+import org.apache.hudi.client.validator.ValidationContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+
+import java.util.List;
+
+/**
+ * Flink implementation of {@link ValidationContext}.
+ *
+ * <p>Constructed from data available in {@code StreamWriteOperatorCoordinator.doCommit()}
+ * before the commit is finalized. Provides validators with access to commit metadata,
+ * write statistics, and previous commit information.</p>
+ *
+ * <p>Unlike Spark's ValidationContext (Phase 3), Flink's context does not have access to
+ * the active timeline at validation time because the commit has not yet been written.
+ * The previous commit metadata is provided directly from the coordinator's state.</p>
+ */
+public class FlinkValidationContext implements ValidationContext {
+
+  private final String instantTime;
+  private final Option<HoodieCommitMetadata> commitMetadata;
+  private final Option<List<HoodieWriteStat>> writeStats;
+  private final Option<HoodieCommitMetadata> previousCommitMetadata;
+
+  /**
+   * Create a Flink validation context.
+   *
+   * @param instantTime Current commit instant time
+   * @param commitMetadata Current commit metadata (with extraMetadata including checkpoints)
+   * @param writeStats Write statistics from all write operators
+   * @param previousCommitMetadata Metadata from the previous completed commit
+   */
+  public FlinkValidationContext(String instantTime,
+                                Option<HoodieCommitMetadata> commitMetadata,
+                                Option<List<HoodieWriteStat>> writeStats,
+                                Option<HoodieCommitMetadata> previousCommitMetadata) {
+    this.instantTime = instantTime;
+    this.commitMetadata = commitMetadata;
+    this.writeStats = writeStats;
+    this.previousCommitMetadata = previousCommitMetadata;
+  }
+
+  @Override
+  public String getInstantTime() {
+    return instantTime;
+  }
+
+  @Override
+  public Option<HoodieCommitMetadata> getCommitMetadata() {
+    return commitMetadata;
+  }
+
+  @Override
+  public Option<List<HoodieWriteStat>> getWriteStats() {
+    return writeStats;
+  }
+
+  /**
+   * Not supported in Flink context. The active timeline is not available
+   * at pre-commit validation time because the commit has not been written yet.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public HoodieActiveTimeline getActiveTimeline() {
+    throw new UnsupportedOperationException(
+        "Active timeline is not available in Flink pre-commit validation context. "
+            + "Use getPreviousCommitMetadata() to access previous commit information.");
+  }
+
+  /**
+   * Not supported in Flink context. Use {@link #isFirstCommit()} or
+   * {@link #getPreviousCommitMetadata()} instead.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public Option<HoodieInstant> getPreviousCommitInstant() {
+    throw new UnsupportedOperationException(
+        "getPreviousCommitInstant() is not available in Flink pre-commit validation context. "
+            + "Use isFirstCommit() or getPreviousCommitMetadata() instead.");
+  }
+
+  @Override
+  public boolean isFirstCommit() {
+    return !previousCommitMetadata.isPresent();
+  }
+
+  @Override
+  public Option<HoodieCommitMetadata> getPreviousCommitMetadata() {
+    return previousCommitMetadata;
+  }
+}
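
A sketch of how coordinator-side data maps onto this context (the local names
are hypothetical):

    // Sketch: context for the commit under validation. An empty previous
    // metadata Option makes isFirstCommit() return true, so offset checks skip.
    FlinkValidationContext ctx = new FlinkValidationContext(
        "20260320120000000",               // instant being committed
        Option.of(currentCommitMetadata),  // includes checkpoint extraMetadata
        Option.of(writeStats),             // List<HoodieWriteStat> from operators
        Option.empty());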
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/validator/FlinkValidatorUtils.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/validator/FlinkValidatorUtils.java
new file mode 100644
index 000000000000..a2b1120f8d87
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/validator/FlinkValidatorUtils.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.sink.validator;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.validator.BasePreCommitValidator;
+import org.apache.hudi.client.validator.ValidationContext;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
+import org.apache.hudi.exception.HoodieValidationException;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.flink.configuration.Configuration;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/**
+ * Utility for running pre-commit validators in the Flink commit flow.
+ *
+ * <p>Instantiates and executes validators configured via
+ * {@code hoodie.precommit.validators}. Each validator must extend
+ * {@link BasePreCommitValidator} and have a constructor that accepts
+ * {@link TypedProperties}.</p>
+ *
+ * <p>Called from {@code StreamWriteOperatorCoordinator.doCommit()} before
+ * the commit is finalized.</p>
+ */
+@Slf4j
+public class FlinkValidatorUtils {
+
+  /**
+   * Run all configured pre-commit validators.
+   *
+   * @param conf Flink configuration containing validator class names
+   * @param instant Commit instant time
+   * @param allWriteStatus Write statuses from all operators
+   * @param checkpointCommitMetadata Extra metadata being committed (contains checkpoint info)
+   * @param previousCommitMetadataSupplier Supplier for metadata from the previous completed commit (lazily evaluated)
+   * @throws HoodieValidationException if any validator fails with FAIL policy
+   */
+  public static void runValidators(Configuration conf,
+                                   String instant,
+                                   List<WriteStatus> allWriteStatus,
+                                   Map<String, String> checkpointCommitMetadata,
+                                   Supplier<Option<HoodieCommitMetadata>> previousCommitMetadataSupplier) {
+    String validatorClassNames = conf.getString(
+        HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(),
+        HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.defaultValue());
+
+    if (StringUtils.isNullOrEmpty(validatorClassNames)) {
+      return;
+    }
+
+    // Fetch previous commit metadata only when validators are configured
+    Option<HoodieCommitMetadata> previousCommitMetadata = previousCommitMetadataSupplier.get();
+
+    // Build ValidationContext from available data
+    HoodieCommitMetadata currentMetadata = buildCommitMetadata(allWriteStatus, checkpointCommitMetadata);
+    List<HoodieWriteStat> writeStats = allWriteStatus.stream()
+        .map(WriteStatus::getStat)
+        .collect(Collectors.toList());
+
+    ValidationContext context = new FlinkValidationContext(
+        instant,
+        Option.of(currentMetadata),
+        Option.of(writeStats),
+        previousCommitMetadata);
+
+    // Build config properties for validators
+    TypedProperties props = TypedProperties.fromMap(conf.toMap());
+
+    // Instantiate and run each validator
+    List<String> classNames = Arrays.stream(validatorClassNames.split(","))
+        .map(String::trim)
+        .filter(s -> !s.isEmpty())
+        .collect(Collectors.toList());
+
+    for (String className : classNames) {
+      try {
+        BasePreCommitValidator validator = (BasePreCommitValidator)
+            ReflectionUtils.loadClass(className, new Class<?>[] {TypedProperties.class}, props);
+        log.info("Running pre-commit validator: {} for instant: {}", className, instant);
+        validator.validateWithMetadata(context);
+        log.info("Pre-commit validator {} passed for instant: {}", className, instant);
+      } catch (HoodieValidationException e) {
+        log.error("Pre-commit validator {} failed for instant: {}", className, instant, e);
+        throw e;
+      } catch (Exception e) {
+        log.error("Failed to instantiate or run validator: {}", className, e);
+        throw new HoodieValidationException(
+            "Failed to run pre-commit validator: " + className, e);
+      }
+    }
+  }
+
+  /**
+   * Build HoodieCommitMetadata from write statuses and extra metadata.
+   * This constructs the metadata object that would be committed, giving
+   * validators access to the same data.
+   */
+  private static HoodieCommitMetadata buildCommitMetadata(
+      List<WriteStatus> writeStatuses, Map<String, String> extraMetadata) {
+    HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+
+    // Add write stats
+    for (WriteStatus status : writeStatuses) {
+      HoodieWriteStat stat = status.getStat();
+      if (stat != null) {
+        metadata.addWriteStat(stat.getPartitionPath(), stat);
+      }
+    }
+
+    // Add extra metadata (includes checkpoint info like HoodieMetadataKey)
+    if (extraMetadata != null) {
+      extraMetadata.forEach(metadata::addMetadata);
+    }
+
+    return metadata;
+  }
+
+  private FlinkValidatorUtils() {
+    // Utility class
+  }
+}
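
The coordinator wiring shown earlier reduces to a single call; the Supplier
defers the timeline read until at least one validator class is configured:

    // Sketch: as invoked from StreamWriteOperatorCoordinator.doCommit().
    FlinkValidatorUtils.runValidators(conf, instant, allWriteStatus,
        checkpointCommitMetadata, () -> StreamerUtil.getPreviousCommitMetadata(metaClient));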
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index 503910d970c8..ed5cbbc64ec8 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -31,6 +31,7 @@ import org.apache.hudi.common.config.RecordMergeMode;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.engine.EngineType;
 import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieTableType;
@@ -982,4 +983,27 @@ public class StreamerUtil {
 
     return offsets;
   }
+
+  /**
+   * Get commit metadata from the last completed write commit on the timeline.
+   * Used for pre-commit validation to compare current commit against previous.
+   *
+   * @param metaClient the table meta client
+   * @return the previous write commit metadata, or empty if none exists
+   */
+  public static Option<HoodieCommitMetadata> getPreviousCommitMetadata(HoodieTableMetaClient metaClient) {
+    try {
+      HoodieTimeline completedWriteTimeline = metaClient.reloadActiveTimeline()
+          .getWriteTimeline()
+          .filterCompletedInstants();
+      Option<HoodieInstant> lastInstant = completedWriteTimeline.lastInstant();
+      if (lastInstant.isPresent()) {
+        return Option.of(completedWriteTimeline.readCommitMetadata(lastInstant.get()));
+      }
+    } catch (Exception e) {
+      log.warn("Failed to read previous commit metadata for pre-commit validation. "
+          + "Validation will skip previous-commit checks.", e);
+    }
+    return Option.empty();
+  }
 }
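
A sketch of consuming the helper directly (metaClient is assumed to point at
the target table); an empty Option means a fresh table or an unreadable
previous commit, and validators then skip previous-commit checks:

    // Sketch: read the checkpoint recorded by the last completed write commit.
    Option<HoodieCommitMetadata> prev = StreamerUtil.getPreviousCommitMetadata(metaClient);
    String lastCheckpoint = prev.isPresent()
        ? prev.get().getMetadata(StreamerUtil.HOODIE_METADATA_KEY)
        : null;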
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/validator/TestFlinkKafkaCheckpointParsing.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/validator/TestFlinkKafkaCheckpointParsing.java
new file mode 100644
index 000000000000..24140289114a
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/validator/TestFlinkKafkaCheckpointParsing.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.sink.validator;
+
+import org.apache.hudi.common.util.CheckpointUtils;
+import org.apache.hudi.common.util.CheckpointUtils.CheckpointFormat;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for Flink Kafka checkpoint parsing in {@link CheckpointUtils}.
+ */
+public class TestFlinkKafkaCheckpointParsing {
+
+  @Test
+  public void testParseSinglePartition() {
+    String checkpoint = "kafka_metadata%3Aevents%3A0:1000";
+    Map<Integer, Long> offsets = CheckpointUtils.parseCheckpoint(CheckpointFormat.FLINK_KAFKA, checkpoint);
+    assertEquals(1, offsets.size());
+    assertEquals(1000L, offsets.get(0));
+  }
+
+  @Test
+  public void testParseMultiplePartitions() {
+    String checkpoint = "kafka_metadata%3Aevents%3A0:100;kafka_metadata%3Aevents%3A1:200;kafka_metadata%3Aevents%3A2:300";
+    Map<Integer, Long> offsets = CheckpointUtils.parseCheckpoint(CheckpointFormat.FLINK_KAFKA, checkpoint);
+    assertEquals(3, offsets.size());
+    assertEquals(100L, offsets.get(0));
+    assertEquals(200L, offsets.get(1));
+    assertEquals(300L, offsets.get(2));
+  }
+
+  @Test
+  public void testParseWithClusterMetadata() {
+    // Cluster metadata entry should be skipped (not a partition offset)
+    String checkpoint = "kafka_metadata%3Aevents%3A0:100;kafka_metadata%3Aevents%3A1:200"
+        + ";kafka_metadata%3Akafka_cluster%3Aevents%3A:my-cluster";
+    Map<Integer, Long> offsets = CheckpointUtils.parseCheckpoint(CheckpointFormat.FLINK_KAFKA, checkpoint);
+    assertEquals(2, offsets.size());
+    assertEquals(100L, offsets.get(0));
+    assertEquals(200L, offsets.get(1));
+  }
+
+  @Test
+  public void testCalculateOffsetDifference() {
+    String prev = "kafka_metadata%3Aevents%3A0:100;kafka_metadata%3Aevents%3A1:200";
+    String curr = "kafka_metadata%3Aevents%3A0:300;kafka_metadata%3Aevents%3A1:500";
+    // Diff = (300-100) + (500-200) = 200 + 300 = 500
+    long diff = CheckpointUtils.calculateOffsetDifference(CheckpointFormat.FLINK_KAFKA, prev, curr);
+    assertEquals(500L, diff);
+  }
+
+  @Test
+  public void testCalculateOffsetDifferenceWithClusterMetadata() {
+    String prev = "kafka_metadata%3Aevents%3A0:100;kafka_metadata%3Akafka_cluster%3Aevents%3A:cluster1";
+    String curr = "kafka_metadata%3Aevents%3A0:200;kafka_metadata%3Akafka_cluster%3Aevents%3A:cluster1";
+    long diff = CheckpointUtils.calculateOffsetDifference(CheckpointFormat.FLINK_KAFKA, prev, curr);
+    assertEquals(100L, diff);
+  }
+
+  @Test
+  public void testIsValidFormat() {
+    assertTrue(CheckpointUtils.isValidCheckpointFormat(
+        CheckpointFormat.FLINK_KAFKA, "kafka_metadata%3Aevents%3A0:100"));
+    assertFalse(CheckpointUtils.isValidCheckpointFormat(
+        CheckpointFormat.FLINK_KAFKA, ""));
+    assertFalse(CheckpointUtils.isValidCheckpointFormat(
+        CheckpointFormat.FLINK_KAFKA, null));
+    // "invalid-format" is parsed without error (entries silently skipped), so it's "valid" format
+    assertTrue(CheckpointUtils.isValidCheckpointFormat(
+        CheckpointFormat.FLINK_KAFKA, "invalid-format"));
+  }
+
+  @Test
+  public void testParseWithNoOffsets() {
+    // Only cluster metadata, no partition offsets
+    String checkpoint = "kafka_metadata%3Akafka_cluster%3Aevents%3A:my-cluster";
+    Map<Integer, Long> offsets = CheckpointUtils.parseCheckpoint(CheckpointFormat.FLINK_KAFKA, checkpoint);
+    assertEquals(0, offsets.size());
+  }
+
+  @Test
+  public void testOffsetResetSkippedInDiff() {
+    // Partition 0 offset goes backward (reset). Should be skipped, only partition 1 counted.
+    String prev = "kafka_metadata%3Aevents%3A0:500;kafka_metadata%3Aevents%3A1:100";
+    String curr = "kafka_metadata%3Aevents%3A0:200;kafka_metadata%3Aevents%3A1:300";
+    // Partition 0: 200-500 = -300 (skipped). Partition 1: 300-100 = 200.
+    long diff = CheckpointUtils.calculateOffsetDifference(CheckpointFormat.FLINK_KAFKA, prev, curr);
+    assertEquals(200L, diff);
+  }
+
+  @Test
+  public void testParseInvalidFormatReturnsEmpty() {
+    // Missing %3A separators — entry is silently skipped (consistent with production parser)
+    Map<Integer, Long> offsets = CheckpointUtils.parseCheckpoint(
+        CheckpointFormat.FLINK_KAFKA, "invalid-no-separators");
+    assertEquals(0, offsets.size());
+  }
+
+  @Test
+  public void testParseNullThrows() {
+    assertThrows(IllegalArgumentException.class,
+        () -> CheckpointUtils.parseCheckpoint(CheckpointFormat.FLINK_KAFKA, null));
+  }
+
+  @Test
+  public void testParseEmptyStringThrows() {
+    assertThrows(IllegalArgumentException.class,
+        () -> CheckpointUtils.parseCheckpoint(CheckpointFormat.FLINK_KAFKA, ""));
+  }
+
+  @Test
+  public void testParseWithEmptySemicolonEntries() {
+    // Extra semicolons should be handled gracefully
+    String checkpoint = ";kafka_metadata%3Aevents%3A0:100;;kafka_metadata%3Aevents%3A1:200;";
+    Map<Integer, Long> offsets = CheckpointUtils.parseCheckpoint(CheckpointFormat.FLINK_KAFKA, checkpoint);
+    assertEquals(2, offsets.size());
+    assertEquals(100L, offsets.get(0));
+    assertEquals(200L, offsets.get(1));
+  }
+
+  @Test
+  public void testZeroOffsetDifference() {
+    String checkpoint = "kafka_metadata%3Aevents%3A0:100;kafka_metadata%3Aevents%3A1:200";
+    long diff = CheckpointUtils.calculateOffsetDifference(CheckpointFormat.FLINK_KAFKA, checkpoint, checkpoint);
+    assertEquals(0L, diff);
+  }
+
+  @Test
+  public void testLargeOffsets() {
+    // Test with large offset values to ensure no overflow
+    String prev = "kafka_metadata%3Aevents%3A0:" + Long.MAX_VALUE / 2;
+    String curr = "kafka_metadata%3Aevents%3A0:" + (Long.MAX_VALUE / 2 + 1000);
+    long diff = CheckpointUtils.calculateOffsetDifference(CheckpointFormat.FLINK_KAFKA, prev, curr);
+    assertEquals(1000L, diff);
+  }
+
+  @Test
+  public void testNewPartitionSkippedInDiff() {
+    // Previous has partition 0, current has 0 and 1 (new partition)
+    String prev = "kafka_metadata%3Aevents%3A0:100";
+    String curr = "kafka_metadata%3Aevents%3A0:200;kafka_metadata%3Aevents%3A1:50";
+    // Only partition 0 diff counted: 200-100 = 100. Partition 1 skipped (new).
+    long diff = CheckpointUtils.calculateOffsetDifference(CheckpointFormat.FLINK_KAFKA, prev, curr);
+    assertEquals(100L, diff);
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/validator/TestFlinkKafkaOffsetValidator.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/validator/TestFlinkKafkaOffsetValidator.java
new file mode 100644
index 000000000000..421e8ec79dd2
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/validator/TestFlinkKafkaOffsetValidator.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.sink.validator;
+
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
+import org.apache.hudi.exception.HoodieValidationException;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+/**
+ * Tests for {@link FlinkKafkaOffsetValidator}.
+ */
+public class TestFlinkKafkaOffsetValidator {
+
+  // ========== Helper methods ==========
+
+  private static TypedProperties defaultConfig() {
+    TypedProperties props = new TypedProperties();
+    props.setProperty(HoodiePreCommitValidatorConfig.STREAMING_OFFSET_TOLERANCE_PERCENTAGE.key(), "0.0");
+    props.setProperty(HoodiePreCommitValidatorConfig.VALIDATION_FAILURE_POLICY.key(), "FAIL");
+    return props;
+  }
+
+  private static TypedProperties configWithTolerance(double tolerance) {
+    TypedProperties props = defaultConfig();
+    props.setProperty(HoodiePreCommitValidatorConfig.STREAMING_OFFSET_TOLERANCE_PERCENTAGE.key(),
+        String.valueOf(tolerance));
+    return props;
+  }
+
+  private static TypedProperties configWithWarnPolicy() {
+    TypedProperties props = defaultConfig();
+    props.setProperty(HoodiePreCommitValidatorConfig.VALIDATION_FAILURE_POLICY.key(), "WARN_LOG");
+    return props;
+  }
+
+  /**
+   * Build a Flink Kafka checkpoint string.
+   * Format: kafka_metadata%3Atopic%3Apartition:offset;...
+   */
+  private static String buildFlinkKafkaCheckpoint(String topic, int[] partitions, long[] offsets) {
+    StringBuilder sb = new StringBuilder();
+    for (int i = 0; i < partitions.length; i++) {
+      if (i > 0) {
+        sb.append(";");
+      }
+      sb.append("kafka_metadata%3A").append(topic).append("%3A")
+          .append(partitions[i]).append(":").append(offsets[i]);
+    }
+    return sb.toString();
+  }
+
+  private static HoodieCommitMetadata buildMetadata(String checkpointValue) {
+    HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+    if (checkpointValue != null) {
+      metadata.addMetadata(StreamerUtil.HOODIE_METADATA_KEY, checkpointValue);
+    }
+    return metadata;
+  }
+
+  private static List<HoodieWriteStat> buildWriteStats(long numInserts, long numUpdates) {
+    HoodieWriteStat stat = new HoodieWriteStat();
+    stat.setNumInserts(numInserts);
+    stat.setNumUpdateWrites(numUpdates);
+    stat.setPartitionPath("partition1");
+    return Collections.singletonList(stat);
+  }
+
+  private static FlinkValidationContext buildContext(
+      String instantTime,
+      HoodieCommitMetadata currentMetadata,
+      List<HoodieWriteStat> writeStats,
+      HoodieCommitMetadata previousMetadata) {
+    return new FlinkValidationContext(
+        instantTime,
+        Option.of(currentMetadata),
+        Option.of(writeStats),
+        previousMetadata != null ? Option.of(previousMetadata) : Option.empty());
+  }
+
+  // ========== Tests ==========
+
+  @Test
+  public void testExactMatchPasses() {
+    // Previous: partition 0 at offset 100, partition 1 at offset 200
+    // Current: partition 0 at offset 200, partition 1 at offset 300
+    // Diff = (200-100) + (300-200) = 200. Records written = 200.
+    String prevCheckpoint = buildFlinkKafkaCheckpoint("events", new int[]{0, 1}, new long[]{100, 200});
+    String currCheckpoint = buildFlinkKafkaCheckpoint("events", new int[]{0, 1}, new long[]{200, 300});
+
+    FlinkValidationContext ctx = buildContext("20260320120000000",
+        buildMetadata(currCheckpoint),
+        buildWriteStats(200, 0),
+        buildMetadata(prevCheckpoint));
+
+    FlinkKafkaOffsetValidator validator = new FlinkKafkaOffsetValidator(defaultConfig());
+    assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+  }
+
+  @Test
+  public void testDataLossDetected() {
+    // Diff = 1000 but only 500 records written -> 50% deviation
+    String prevCheckpoint = buildFlinkKafkaCheckpoint("events", new int[]{0}, new long[]{0});
+    String currCheckpoint = buildFlinkKafkaCheckpoint("events", new int[]{0}, new long[]{1000});
+
+    FlinkValidationContext ctx = buildContext("20260320120000000",
+        buildMetadata(currCheckpoint),
+        buildWriteStats(500, 0),
+        buildMetadata(prevCheckpoint));
+
+    FlinkKafkaOffsetValidator validator = new FlinkKafkaOffsetValidator(defaultConfig());
+    assertThrows(HoodieValidationException.class, () -> validator.validateWithMetadata(ctx));
+  }
+
+  @Test
+  public void testWithinTolerancePasses() {
+    // Diff = 1000, records = 950 -> 5% deviation, tolerance = 10%
+    String prevCheckpoint = buildFlinkKafkaCheckpoint("events", new int[]{0}, new long[]{0});
+    String currCheckpoint = buildFlinkKafkaCheckpoint("events", new int[]{0}, new long[]{1000});
+
+    FlinkValidationContext ctx = buildContext("20260320120000000",
+        buildMetadata(currCheckpoint),
+        buildWriteStats(950, 0),
+        buildMetadata(prevCheckpoint));
+
+    FlinkKafkaOffsetValidator validator = new FlinkKafkaOffsetValidator(configWithTolerance(10.0));
+    assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+  }
+
+  @Test
+  public void testWarnPolicyDoesNotThrow() {
+    // Data loss but WARN_LOG policy
+    String prevCheckpoint = buildFlinkKafkaCheckpoint("events", new int[]{0}, new long[]{0});
+    String currCheckpoint = buildFlinkKafkaCheckpoint("events", new int[]{0}, new long[]{1000});
+
+    FlinkValidationContext ctx = buildContext("20260320120000000",
+        buildMetadata(currCheckpoint),
+        buildWriteStats(0, 0),
+        buildMetadata(prevCheckpoint));
+
+    FlinkKafkaOffsetValidator validator = new FlinkKafkaOffsetValidator(configWithWarnPolicy());
+    // Should warn but not throw
+    assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+  }
+
+  @Test
+  public void testSkipsFirstCommit() {
+    String currCheckpoint = buildFlinkKafkaCheckpoint("events", new int[]{0}, new long[]{1000});
+
+    // No previous commit
+    FlinkValidationContext ctx = new FlinkValidationContext(
+        "20260320120000000",
+        Option.of(buildMetadata(currCheckpoint)),
+        Option.of(buildWriteStats(500, 0)),
+        Option.empty());
+
+    FlinkKafkaOffsetValidator validator = new FlinkKafkaOffsetValidator(defaultConfig());
+    assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+  }
+
+  @Test
+  public void testSkipsWhenNoCheckpointKey() {
+    // Current metadata has no HoodieMetadataKey
+    HoodieCommitMetadata currentMeta = new HoodieCommitMetadata();
+    String prevCheckpoint = buildFlinkKafkaCheckpoint("events", new int[]{0}, new long[]{100});
+
+    FlinkValidationContext ctx = buildContext("20260320120000000",
+        currentMeta,
+        buildWriteStats(500, 0),
+        buildMetadata(prevCheckpoint));
+
+    FlinkKafkaOffsetValidator validator = new FlinkKafkaOffsetValidator(defaultConfig());
+    assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+  }
+
+  @Test
+  public void testMultiPartitionValidation() {
+    // 4 partitions, each advancing by 250 = total diff 1000
+    String prevCheckpoint = buildFlinkKafkaCheckpoint("events",
+        new int[]{0, 1, 2, 3}, new long[]{0, 0, 0, 0});
+    String currCheckpoint = buildFlinkKafkaCheckpoint("events",
+        new int[]{0, 1, 2, 3}, new long[]{250, 250, 250, 250});
+
+    FlinkValidationContext ctx = buildContext("20260320120000000",
+        buildMetadata(currCheckpoint),
+        buildWriteStats(800, 200),  // 800 inserts + 200 updates = 1000
+        buildMetadata(prevCheckpoint));
+
+    FlinkKafkaOffsetValidator validator = new FlinkKafkaOffsetValidator(defaultConfig());
+    assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+  }
+
+  @Test
+  public void testEmptyCommitSkipsValidation() {
+    // Both offsets same and no records written
+    String checkpoint = buildFlinkKafkaCheckpoint("events", new int[]{0}, new long[]{100});
+
+    FlinkValidationContext ctx = buildContext("20260320120000000",
+        buildMetadata(checkpoint),
+        buildWriteStats(0, 0),
+        buildMetadata(checkpoint));
+
+    FlinkKafkaOffsetValidator validator = new FlinkKafkaOffsetValidator(defaultConfig());
+    assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+  }
+
+  @Test
+  public void testCheckpointWithClusterMetadataIgnored() {
+    // Checkpoint with cluster metadata appended should still parse correctly
+    String prevCheckpoint = "kafka_metadata%3Aevents%3A0:100;kafka_metadata%3Akafka_cluster%3Aevents%3A:my-cluster";
+    String currCheckpoint = "kafka_metadata%3Aevents%3A0:200;kafka_metadata%3Akafka_cluster%3Aevents%3A:my-cluster";
+
+    FlinkValidationContext ctx = buildContext("20260320120000000",
+        buildMetadata(currCheckpoint),
+        buildWriteStats(100, 0),
+        buildMetadata(prevCheckpoint));
+
+    FlinkKafkaOffsetValidator validator = new FlinkKafkaOffsetValidator(defaultConfig());
+    assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+  }
+
+  @Test
+  public void testPreviousCheckpointMissingSkipsValidation() {
+    // Previous metadata exists but has no HoodieMetadataKey
+    HoodieCommitMetadata prevMeta = new HoodieCommitMetadata();
+    // No checkpoint key set on previous
+
+    String currCheckpoint = buildFlinkKafkaCheckpoint("events", new int[]{0}, new long[]{1000});
+
+    FlinkValidationContext ctx = buildContext("20260320120000000",
+        buildMetadata(currCheckpoint),
+        buildWriteStats(500, 0),
+        prevMeta);
+
+    FlinkKafkaOffsetValidator validator = new FlinkKafkaOffsetValidator(defaultConfig());
+    // Should skip — no previous checkpoint to compare
+    assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+  }
+
+  @Test
+  public void testOvercountingDetected() {
+    // More records written than offset diff should also trigger deviation
+    // Diff = 100, records = 200 -> |100-200|/100 = 100% deviation
+    String prevCheckpoint = buildFlinkKafkaCheckpoint("events", new int[]{0}, new long[]{0});
+    String currCheckpoint = buildFlinkKafkaCheckpoint("events", new int[]{0}, new long[]{100});
+
+    FlinkValidationContext ctx = buildContext("20260320120000000",
+        buildMetadata(currCheckpoint),
+        buildWriteStats(200, 0),
+        buildMetadata(prevCheckpoint));
+
+    FlinkKafkaOffsetValidator validator = new FlinkKafkaOffsetValidator(defaultConfig());
+    assertThrows(HoodieValidationException.class, () -> validator.validateWithMetadata(ctx));
+  }
+
+  @Test
+  public void testExactToleranceBoundaryPasses() {
+    // Diff = 1000, records = 900 -> 10% deviation, tolerance = 10% (exactly at boundary)
+    String prevCheckpoint = buildFlinkKafkaCheckpoint("events", new int[]{0}, new long[]{0});
+    String currCheckpoint = buildFlinkKafkaCheckpoint("events", new int[]{0}, new long[]{1000});
+
+    FlinkValidationContext ctx = buildContext("20260320120000000",
+        buildMetadata(currCheckpoint),
+        buildWriteStats(900, 0),
+        buildMetadata(prevCheckpoint));
+
+    FlinkKafkaOffsetValidator validator = new FlinkKafkaOffsetValidator(configWithTolerance(10.0));
+    assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+  }
+
+  @Test
+  public void testJustOverToleranceFails() {
+    // Diff = 1000, records = 899 -> 10.1% deviation, tolerance = 10%
+    String prevCheckpoint = buildFlinkKafkaCheckpoint("events", new int[]{0}, new long[]{0});
+    String currCheckpoint = buildFlinkKafkaCheckpoint("events", new int[]{0}, new long[]{1000});
+
+    FlinkValidationContext ctx = buildContext("20260320120000000",
+        buildMetadata(currCheckpoint),
+        buildWriteStats(899, 0),
+        buildMetadata(prevCheckpoint));
+
+    FlinkKafkaOffsetValidator validator = new FlinkKafkaOffsetValidator(configWithTolerance(10.0));
+    assertThrows(HoodieValidationException.class, () -> validator.validateWithMetadata(ctx));
+  }
+
+  @Test
+  public void testOnlyInsertsNoUpdates() {
+    // Pure insert workload
+    String prevCheckpoint = buildFlinkKafkaCheckpoint("events", new int[]{0, 1}, new long[]{0, 0});
+    String currCheckpoint = buildFlinkKafkaCheckpoint("events", new int[]{0, 1}, new long[]{500, 500});
+
+    FlinkValidationContext ctx = buildContext("20260320120000000",
+        buildMetadata(currCheckpoint),
+        buildWriteStats(1000, 0),
+        buildMetadata(prevCheckpoint));
+
+    FlinkKafkaOffsetValidator validator = new FlinkKafkaOffsetValidator(defaultConfig());
+    assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+  }
+
+  @Test
+  public void testUpdatesCountedInRecordTotal() {
+    // Diff = 1000. 600 inserts + 400 updates = 1000 total
+    String prevCheckpoint = buildFlinkKafkaCheckpoint("events", new int[]{0}, new long[]{0});
+    String currCheckpoint = buildFlinkKafkaCheckpoint("events", new int[]{0}, new long[]{1000});
+
+    FlinkValidationContext ctx = buildContext("20260320120000000",
+        buildMetadata(currCheckpoint),
+        buildWriteStats(600, 400),
+        buildMetadata(prevCheckpoint));
+
+    FlinkKafkaOffsetValidator validator = new FlinkKafkaOffsetValidator(defaultConfig());
+    assertDoesNotThrow(() -> validator.validateWithMetadata(ctx));
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/validator/TestFlinkValidationContext.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/validator/TestFlinkValidationContext.java
new file mode 100644
index 000000000000..b6fb31f0c873
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/validator/TestFlinkValidationContext.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.sink.validator;
+
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.util.Option;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link FlinkValidationContext}.
+ */
+public class TestFlinkValidationContext {
+
+  @Test
+  public void testBasicProperties() {
+    HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+    metadata.addMetadata("key1", "value1");
+
+    HoodieWriteStat stat = new HoodieWriteStat();
+    stat.setNumInserts(100);
+    stat.setNumUpdateWrites(50);
+    List<HoodieWriteStat> stats = Collections.singletonList(stat);
+
+    FlinkValidationContext ctx = new FlinkValidationContext(
+        "20260320120000000",
+        Option.of(metadata),
+        Option.of(stats),
+        Option.empty());
+
+    assertEquals("20260320120000000", ctx.getInstantTime());
+    assertTrue(ctx.getCommitMetadata().isPresent());
+    assertTrue(ctx.getWriteStats().isPresent());
+    assertEquals(1, ctx.getWriteStats().get().size());
+  }
+
+  @Test
+  public void testTotalRecordsWritten() {
+    HoodieWriteStat stat1 = new HoodieWriteStat();
+    stat1.setNumInserts(100);
+    stat1.setNumUpdateWrites(50);
+
+    HoodieWriteStat stat2 = new HoodieWriteStat();
+    stat2.setNumInserts(200);
+    stat2.setNumUpdateWrites(30);
+
+    List<HoodieWriteStat> stats = Arrays.asList(stat1, stat2);
+
+    FlinkValidationContext ctx = new FlinkValidationContext(
+        "20260320120000000",
+        Option.of(new HoodieCommitMetadata()),
+        Option.of(stats),
+        Option.empty());
+
+    assertEquals(380, ctx.getTotalRecordsWritten());
+    assertEquals(300, ctx.getTotalInsertRecordsWritten());
+    assertEquals(80, ctx.getTotalUpdateRecordsWritten());
+  }
+
+  @Test
+  public void testIsFirstCommit() {
+    FlinkValidationContext ctxFirst = new FlinkValidationContext(
+        "20260320120000000",
+        Option.of(new HoodieCommitMetadata()),
+        Option.of(Collections.emptyList()),
+        Option.empty());
+
+    assertTrue(ctxFirst.isFirstCommit());
+
+    FlinkValidationContext ctxNotFirst = new FlinkValidationContext(
+        "20260320120000000",
+        Option.of(new HoodieCommitMetadata()),
+        Option.of(Collections.emptyList()),
+        Option.of(new HoodieCommitMetadata()));
+
+    assertFalse(ctxNotFirst.isFirstCommit());
+  }
+
+  @Test
+  public void testExtraMetadata() {
+    HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+    metadata.addMetadata("checkpoint", "topic,0:100");
+    metadata.addMetadata("other_key", "other_value");
+
+    FlinkValidationContext ctx = new FlinkValidationContext(
+        "20260320120000000",
+        Option.of(metadata),
+        Option.of(Collections.emptyList()),
+        Option.empty());
+
+    assertEquals(2, ctx.getExtraMetadata().size());
+    assertTrue(ctx.getExtraMetadata("checkpoint").isPresent());
+    assertEquals("topic,0:100", ctx.getExtraMetadata("checkpoint").get());
+    assertFalse(ctx.getExtraMetadata("nonexistent").isPresent());
+  }
+
+  @Test
+  public void testPreviousCommitMetadata() {
+    HoodieCommitMetadata prevMeta = new HoodieCommitMetadata();
+    prevMeta.addMetadata("checkpoint", "topic,0:50");
+
+    FlinkValidationContext ctx = new FlinkValidationContext(
+        "20260320120000000",
+        Option.of(new HoodieCommitMetadata()),
+        Option.of(Collections.emptyList()),
+        Option.of(prevMeta));
+
+    assertTrue(ctx.getPreviousCommitMetadata().isPresent());
+    assertEquals("topic,0:50", 
ctx.getPreviousCommitMetadata().get().getMetadata("checkpoint"));
+  }
+
+  @Test
+  public void testGetActiveTimelineThrows() {
+    FlinkValidationContext ctx = new FlinkValidationContext(
+        "20260320120000000",
+        Option.of(new HoodieCommitMetadata()),
+        Option.of(Collections.emptyList()),
+        Option.empty());
+
+    assertThrows(UnsupportedOperationException.class, ctx::getActiveTimeline);
+  }
+
+  @Test
+  public void testGetPreviousCommitInstantThrows() {
+    FlinkValidationContext ctx = new FlinkValidationContext(
+        "20260320120000000",
+        Option.of(new HoodieCommitMetadata()),
+        Option.of(Collections.emptyList()),
+        Option.of(new HoodieCommitMetadata()));
+
+    assertThrows(UnsupportedOperationException.class, ctx::getPreviousCommitInstant);
+  }
+
+  @Test
+  public void testMultipleWriteStatsAggregation() {
+    HoodieWriteStat stat1 = new HoodieWriteStat();
+    stat1.setNumInserts(10);
+    stat1.setNumUpdateWrites(5);
+
+    HoodieWriteStat stat2 = new HoodieWriteStat();
+    stat2.setNumInserts(20);
+    stat2.setNumUpdateWrites(15);
+
+    HoodieWriteStat stat3 = new HoodieWriteStat();
+    stat3.setNumInserts(30);
+    stat3.setNumUpdateWrites(25);
+
+    List<HoodieWriteStat> stats = Arrays.asList(stat1, stat2, stat3);
+
+    FlinkValidationContext ctx = new FlinkValidationContext(
+        "20260320120000000",
+        Option.of(new HoodieCommitMetadata()),
+        Option.of(stats),
+        Option.empty());
+
+    assertEquals(60, ctx.getTotalInsertRecordsWritten());
+    assertEquals(45, ctx.getTotalUpdateRecordsWritten());
+    assertEquals(105, ctx.getTotalRecordsWritten());
+  }
+
+  @Test
+  public void testEmptyWriteStats() {
+    FlinkValidationContext ctx = new FlinkValidationContext(
+        "20260320120000000",
+        Option.of(new HoodieCommitMetadata()),
+        Option.empty(),
+        Option.empty());
+
+    assertEquals(0, ctx.getTotalRecordsWritten());
+    assertEquals(0, ctx.getTotalInsertRecordsWritten());
+    assertEquals(0, ctx.getTotalUpdateRecordsWritten());
+  }
+}
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/validator/TestFlinkValidatorUtils.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/validator/TestFlinkValidatorUtils.java
new file mode 100644
index 000000000000..dea583597183
--- /dev/null
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/validator/TestFlinkValidatorUtils.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.sink.validator;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
+import org.apache.hudi.exception.HoodieValidationException;
+import org.apache.hudi.util.StreamerUtil;
+
+import org.apache.flink.configuration.Configuration;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link FlinkValidatorUtils}.
+ */
+public class TestFlinkValidatorUtils {
+
+  private static Configuration confWithValidator(String validatorClassName) {
+    Configuration conf = new Configuration();
+    conf.setString(HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(), validatorClassName);
+    return conf;
+  }
+
+  private static WriteStatus buildWriteStatus(String partitionPath, long numInserts, long numUpdates) {
+    HoodieWriteStat stat = new HoodieWriteStat();
+    stat.setPartitionPath(partitionPath);
+    stat.setNumInserts(numInserts);
+    stat.setNumUpdateWrites(numUpdates);
+
+    WriteStatus ws = new WriteStatus(false, 0.0);
+    ws.setStat(stat);
+    return ws;
+  }
+
+  // ========== Tests ==========
+
+  @Test
+  public void testNoValidatorsConfigured() {
+    Configuration conf = new Configuration();
+    // No validator class names set
+    List<WriteStatus> writeStatuses = 
Collections.singletonList(buildWriteStatus("p1", 100, 0));
+
+    assertDoesNotThrow(() -> FlinkValidatorUtils.runValidators(
+        conf, "20260320120000000", writeStatuses,
+        new HashMap<>(), () -> Option.empty()));
+  }
+
+  @Test
+  public void testEmptyValidatorString() {
+    Configuration conf = new Configuration();
+    conf.setString(HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(), "");
+    List<WriteStatus> writeStatuses = Collections.singletonList(buildWriteStatus("p1", 100, 0));
+
+    assertDoesNotThrow(() -> FlinkValidatorUtils.runValidators(
+        conf, "20260320120000000", writeStatuses,
+        new HashMap<>(), () -> Option.empty()));
+  }
+
+  @Test
+  public void testValidValidatorRunsSuccessfully() {
+    // FlinkKafkaOffsetValidator with first commit (no previous) should pass
+    Configuration conf = confWithValidator(
+        "org.apache.hudi.sink.validator.FlinkKafkaOffsetValidator");
+    conf.setString(HoodiePreCommitValidatorConfig.STREAMING_OFFSET_TOLERANCE_PERCENTAGE.key(), "0.0");
+    conf.setString(HoodiePreCommitValidatorConfig.VALIDATION_FAILURE_POLICY.key(), "FAIL");
+
+    List<WriteStatus> writeStatuses = 
Collections.singletonList(buildWriteStatus("p1", 100, 0));
+    Map<String, String> extraMeta = new HashMap<>();
+    extraMeta.put(StreamerUtil.HOODIE_METADATA_KEY,
+        "kafka_metadata%3Aevents%3A0:100");
+
+    // First commit (no previous metadata) — validator should skip and pass
+    assertDoesNotThrow(() -> FlinkValidatorUtils.runValidators(
+        conf, "20260320120000000", writeStatuses, extraMeta, () -> 
Option.empty()));
+  }
+
+  @Test
+  public void testValidatorPassesWithMatchingCounts() {
+    Configuration conf = confWithValidator(
+        "org.apache.hudi.sink.validator.FlinkKafkaOffsetValidator");
+    conf.setString(HoodiePreCommitValidatorConfig.STREAMING_OFFSET_TOLERANCE_PERCENTAGE.key(), "0.0");
+    conf.setString(HoodiePreCommitValidatorConfig.VALIDATION_FAILURE_POLICY.key(), "FAIL");
+
+    // 100 records written, offset diff = 100
+    List<WriteStatus> writeStatuses = 
Collections.singletonList(buildWriteStatus("p1", 100, 0));
+    Map<String, String> extraMeta = new HashMap<>();
+    extraMeta.put(StreamerUtil.HOODIE_METADATA_KEY,
+        "kafka_metadata%3Aevents%3A0:200");
+
+    HoodieCommitMetadata prevMeta = new HoodieCommitMetadata();
+    prevMeta.addMetadata(StreamerUtil.HOODIE_METADATA_KEY,
+        "kafka_metadata%3Aevents%3A0:100");
+
+    assertDoesNotThrow(() -> FlinkValidatorUtils.runValidators(
+        conf, "20260320120000000", writeStatuses, extraMeta, () -> Option.of(prevMeta)));
+  }
+
+  @Test
+  public void testValidatorFailsWithMismatchedCounts() {
+    Configuration conf = confWithValidator(
+        "org.apache.hudi.sink.validator.FlinkKafkaOffsetValidator");
+    conf.setString(HoodiePreCommitValidatorConfig.STREAMING_OFFSET_TOLERANCE_PERCENTAGE.key(), "0.0");
+    conf.setString(HoodiePreCommitValidatorConfig.VALIDATION_FAILURE_POLICY.key(), "FAIL");
+
+    // Only 10 records written but offset diff = 1000
+    List<WriteStatus> writeStatuses = 
Collections.singletonList(buildWriteStatus("p1", 10, 0));
+    Map<String, String> extraMeta = new HashMap<>();
+    extraMeta.put(StreamerUtil.HOODIE_METADATA_KEY,
+        "kafka_metadata%3Aevents%3A0:1000");
+
+    HoodieCommitMetadata prevMeta = new HoodieCommitMetadata();
+    prevMeta.addMetadata(StreamerUtil.HOODIE_METADATA_KEY,
+        "kafka_metadata%3Aevents%3A0:0");
+
+    assertThrows(HoodieValidationException.class,
+        () -> FlinkValidatorUtils.runValidators(
+            conf, "20260320120000000", writeStatuses, extraMeta, () -> Option.of(prevMeta)));
+  }
+
+  @Test
+  public void testInvalidValidatorClassThrows() {
+    Configuration conf = confWithValidator("com.nonexistent.FakeValidator");
+    List<WriteStatus> writeStatuses = 
Collections.singletonList(buildWriteStatus("p1", 100, 0));
+
+    assertThrows(HoodieValidationException.class,
+        () -> FlinkValidatorUtils.runValidators(
+            conf, "20260320120000000", writeStatuses, new HashMap<>(), () -> Option.empty()));
+  }
+
+  @Test
+  public void testMultipleValidators() {
+    // Two validators comma-separated, both FlinkKafkaOffsetValidator (will both skip on first commit)
+    Configuration conf = confWithValidator(
+        "org.apache.hudi.sink.validator.FlinkKafkaOffsetValidator,"
+            + "org.apache.hudi.sink.validator.FlinkKafkaOffsetValidator");
+    conf.setString(HoodiePreCommitValidatorConfig.STREAMING_OFFSET_TOLERANCE_PERCENTAGE.key(), "0.0");
+    conf.setString(HoodiePreCommitValidatorConfig.VALIDATION_FAILURE_POLICY.key(), "FAIL");
+
+    List<WriteStatus> writeStatuses = 
Collections.singletonList(buildWriteStatus("p1", 100, 0));
+    Map<String, String> extraMeta = new HashMap<>();
+    extraMeta.put(StreamerUtil.HOODIE_METADATA_KEY,
+        "kafka_metadata%3Aevents%3A0:100");
+
+    assertDoesNotThrow(() -> FlinkValidatorUtils.runValidators(
+        conf, "20260320120000000", writeStatuses, extraMeta, () -> Option.empty()));
+  }
+
+  @Test
+  public void testValidatorWithWhitespaceInClassNames() {
+    // Class names with extra whitespace and trailing comma
+    Configuration conf = confWithValidator(
+        "  org.apache.hudi.sink.validator.FlinkKafkaOffsetValidator  , ");
+    conf.setString(HoodiePreCommitValidatorConfig.STREAMING_OFFSET_TOLERANCE_PERCENTAGE.key(), "0.0");
+    conf.setString(HoodiePreCommitValidatorConfig.VALIDATION_FAILURE_POLICY.key(), "FAIL");
+
+    List<WriteStatus> writeStatuses = 
Collections.singletonList(buildWriteStatus("p1", 100, 0));
+
+    assertDoesNotThrow(() -> FlinkValidatorUtils.runValidators(
+        conf, "20260320120000000", writeStatuses, new HashMap<>(), () -> Option.empty()));
+  }
+
+  @Test
+  public void testBuildCommitMetadataWithNullExtraMetadata() {
+    Configuration conf = confWithValidator(
+        "org.apache.hudi.sink.validator.FlinkKafkaOffsetValidator");
+    conf.setString(HoodiePreCommitValidatorConfig.STREAMING_OFFSET_TOLERANCE_PERCENTAGE.key(), "0.0");
+    conf.setString(HoodiePreCommitValidatorConfig.VALIDATION_FAILURE_POLICY.key(), "FAIL");
+
+    List<WriteStatus> writeStatuses = 
Collections.singletonList(buildWriteStatus("p1", 100, 0));
+
+    // null extraMetadata should be handled gracefully
+    assertDoesNotThrow(() -> FlinkValidatorUtils.runValidators(
+        conf, "20260320120000000", writeStatuses, null, () -> Option.empty()));
+  }
+
+  @Test
+  public void testMultipleWriteStatusesAggregated() {
+    Configuration conf = confWithValidator(
+        "org.apache.hudi.sink.validator.FlinkKafkaOffsetValidator");
+    conf.setString(HoodiePreCommitValidatorConfig.STREAMING_OFFSET_TOLERANCE_PERCENTAGE.key(), "0.0");
+    conf.setString(HoodiePreCommitValidatorConfig.VALIDATION_FAILURE_POLICY.key(), "FAIL");
+
+    // Two write statuses: 60 inserts + 40 inserts = 100 total
+    List<WriteStatus> writeStatuses = new ArrayList<>();
+    writeStatuses.add(buildWriteStatus("p1", 60, 0));
+    writeStatuses.add(buildWriteStatus("p2", 40, 0));
+
+    Map<String, String> extraMeta = new HashMap<>();
+    extraMeta.put(StreamerUtil.HOODIE_METADATA_KEY,
+        "kafka_metadata%3Aevents%3A0:200");
+
+    HoodieCommitMetadata prevMeta = new HoodieCommitMetadata();
+    prevMeta.addMetadata(StreamerUtil.HOODIE_METADATA_KEY,
+        "kafka_metadata%3Aevents%3A0:100");
+
+    // Offset diff = 100, records = 100 — should pass
+    assertDoesNotThrow(() -> FlinkValidatorUtils.runValidators(
+        conf, "20260320120000000", writeStatuses, extraMeta, () -> Option.of(prevMeta)));
+  }
+
+  @Test
+  public void testEmptyWriteStatuses() {
+    Configuration conf = confWithValidator(
+        "org.apache.hudi.sink.validator.FlinkKafkaOffsetValidator");
+    conf.setString(HoodiePreCommitValidatorConfig.STREAMING_OFFSET_TOLERANCE_PERCENTAGE.key(), "0.0");
+    conf.setString(HoodiePreCommitValidatorConfig.VALIDATION_FAILURE_POLICY.key(), "FAIL");
+
+    // Empty write statuses with matching empty offsets
+    List<WriteStatus> writeStatuses = Collections.emptyList();
+    Map<String, String> extraMeta = new HashMap<>();
+    String checkpoint = "kafka_metadata%3Aevents%3A0:100";
+    extraMeta.put(StreamerUtil.HOODIE_METADATA_KEY, checkpoint);
+
+    HoodieCommitMetadata prevMeta = new HoodieCommitMetadata();
+    prevMeta.addMetadata(StreamerUtil.HOODIE_METADATA_KEY, checkpoint);
+
+    // Both offset diff and records are 0 — should skip validation
+    assertDoesNotThrow(() -> FlinkValidatorUtils.runValidators(
+        conf, "20260320120000000", writeStatuses, extraMeta, () -> Option.of(prevMeta)));
+  }
+
+  @Test
+  public void testValidationExceptionPreservedAcrossValidators() {
+    // First validator: valid, second: invalid class
+    Configuration conf = confWithValidator(
+        "org.apache.hudi.sink.validator.FlinkKafkaOffsetValidator,"
+            + "com.nonexistent.FakeValidator");
+    conf.setString(HoodiePreCommitValidatorConfig.STREAMING_OFFSET_TOLERANCE_PERCENTAGE.key(), "0.0");
+    conf.setString(HoodiePreCommitValidatorConfig.VALIDATION_FAILURE_POLICY.key(), "FAIL");
+
+    List<WriteStatus> writeStatuses = 
Collections.singletonList(buildWriteStatus("p1", 100, 0));
+
+    HoodieValidationException ex = assertThrows(HoodieValidationException.class,
+        () -> FlinkValidatorUtils.runValidators(
+            conf, "20260320120000000", writeStatuses, new HashMap<>(), () -> Option.empty()));
+    assertTrue(ex.getMessage().contains("FakeValidator"));
+  }
+
+  @Test
+  public void testConfigPropertiesPassedToValidator() {
+    // Set a high tolerance so even with mismatch, it passes
+    Configuration conf = confWithValidator(
+        "org.apache.hudi.sink.validator.FlinkKafkaOffsetValidator");
+    conf.setString(HoodiePreCommitValidatorConfig.STREAMING_OFFSET_TOLERANCE_PERCENTAGE.key(), "100.0");
+    conf.setString(HoodiePreCommitValidatorConfig.VALIDATION_FAILURE_POLICY.key(), "FAIL");
+
+    // 0 records written but 1000 offset diff — 100% deviation, but tolerance is 100%
+    List<WriteStatus> writeStatuses = Collections.singletonList(buildWriteStatus("p1", 0, 0));
+    Map<String, String> extraMeta = new HashMap<>();
+    extraMeta.put(StreamerUtil.HOODIE_METADATA_KEY,
+        "kafka_metadata%3Aevents%3A0:1000");
+
+    HoodieCommitMetadata prevMeta = new HoodieCommitMetadata();
+    prevMeta.addMetadata(StreamerUtil.HOODIE_METADATA_KEY,
+        "kafka_metadata%3Aevents%3A0:0");
+
+    assertDoesNotThrow(() -> FlinkValidatorUtils.runValidators(
+        conf, "20260320120000000", writeStatuses, extraMeta, () -> Option.of(prevMeta)));
+  }
+}
