This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 3dcf4c65b261 feat(client): Add pre-write validator framework (#18239)
3dcf4c65b261 is described below

commit 3dcf4c65b26179b50b70b1c59670823b74e03793
Author: Nada <[email protected]>
AuthorDate: Wed Mar 4 17:58:12 2026 -0500

    feat(client): Add pre-write validator framework (#18239)
    
    This PR introduces a pluggable pre-write validation framework that allows 
custom validators to run before write operations begin.
    
    For example, before allowing a write operation with a schema change, we may 
need to make a call to a schema service to verify if the schema update is 
permitted for the given dataset based on certain policies. This validation must 
happen before the write begins to prevent invalid schema changes from being 
committed.
    
    Related issue: #18008
    
    Currently, Hudi only supports pre-commit validators that run after data has 
been written but before commit. This PR adds an earlier validation hook at the 
pre-write stage, allowing failures to be detected before any write work begins.
    
    Summary and Changelog
    Summary: Users can now configure custom validators that run before write 
operations via the hoodie.prewrite.validators configuration property.
    
    Changelog:
    
    Added PreWriteValidator interface for implementing custom pre-write 
validators
    Added PreWriteValidatorUtils utility class to load and run configured 
validators
    Added HoodiePreWriteValidatorConfig configuration class with 
hoodie.prewrite.validators property
    Modified BaseHoodieWriteClient.preWrite() to invoke configured validators
    Added getPreWriteValidators() method to HoodieWriteConfig
    Added unit tests for PreWriteValidatorUtils
    
    Impact:
    New public API: PreWriteValidator interface that users can implement for 
custom validators
    New configuration: hoodie.prewrite.validators - comma-separated list of 
validator class names
    No breaking changes to existing functionality
    No performance impact when no validators are configured
---
 .../apache/hudi/client/BaseHoodieWriteClient.java  |  36 +-
 .../hudi/client/utils/PreWriteValidatorUtils.java  | 137 +++++++
 .../hudi/client/validator/PreWriteValidator.java   |  75 ++++
 .../hudi/config/HoodiePreWriteValidatorConfig.java |  83 ++++
 .../org/apache/hudi/config/HoodieWriteConfig.java  |   4 +
 .../client/utils/TestPreWriteValidatorUtils.java   | 434 +++++++++++++++++++++
 .../config/TestHoodiePreWriteValidatorConfig.java  | 202 ++++++++++
 .../apache/hudi/client/HoodieFlinkWriteClient.java |  15 +-
 .../apache/hudi/client/HoodieJavaWriteClient.java  |  14 +-
 .../apache/hudi/client/SparkRDDWriteClient.java    |  18 +-
 .../utils/TestSparkPreWriteValidatorUtils.java     | 194 +++++++++
 11 files changed, 1192 insertions(+), 20 deletions(-)

diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 1306e9c1b7ea..3ec602f6a8e8 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -31,14 +31,17 @@ import 
org.apache.hudi.callback.util.HoodieCommitCallbackFactory;
 import org.apache.hudi.client.embedded.EmbeddedTimelineService;
 import org.apache.hudi.client.heartbeat.HeartbeatUtils;
 import org.apache.hudi.client.transaction.TransactionManager;
+import org.apache.hudi.client.utils.PreWriteValidatorUtils;
 import org.apache.hudi.client.utils.TransactionUtils;
 import org.apache.hudi.common.HoodiePendingRollbackInfo;
 import org.apache.hudi.common.config.HoodieCommonConfig;
+import org.apache.hudi.common.data.HoodieData;
 import org.apache.hudi.common.engine.HoodieEngineContext;
 import org.apache.hudi.common.model.ActionType;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieFailedWritesCleaningPolicy;
 import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.HoodieWriteStat;
 import org.apache.hudi.common.model.TableServiceType;
@@ -548,13 +551,26 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
   public abstract O deletePrepped(I preppedRecords, final String instantTime);
 
   /**
-   * Common method containing steps to be performed before write 
(upsert/insert/...
+   * Common method containing steps to be performed before write 
(upsert/insert/...)
    * @param instantTime
    * @param writeOperationType
    * @param metaClient
    */
   public void preWrite(String instantTime, WriteOperationType 
writeOperationType,
                        HoodieTableMetaClient metaClient) {
+    preWrite(instantTime, writeOperationType, metaClient, Option.empty());
+  }
+
+  /**
+   * Common method containing steps to be performed before write 
(upsert/insert/...) with records.
+   * @param instantTime
+   * @param writeOperationType
+   * @param metaClient
+   * @param recordsOpt Option of HoodieData of records to be written, empty 
for operations
+   *                   without input records (e.g., compact, cluster, delete)
+   */
+  public void preWrite(String instantTime, WriteOperationType 
writeOperationType,
+                       HoodieTableMetaClient metaClient, 
Option<HoodieData<HoodieRecord<T>>> recordsOpt) {
     setOperationType(writeOperationType);
     this.lastCompletedTxnAndMetadata = txnManager.isLockRequired()
         ? TransactionUtils.getLastCompletedTxnInstantAndMetadata(metaClient) : 
Option.empty();
@@ -564,6 +580,24 @@ public abstract class BaseHoodieWriteClient<T, I, K, O> 
extends BaseHoodieClient
     
tableServiceClient.setPendingInflightAndRequestedInstants(this.pendingInflightAndRequestedInstants);
     tableServiceClient.startAsyncCleanerService(this);
     tableServiceClient.startAsyncArchiveService(this);
+
+    runPreWriteValidators(instantTime, writeOperationType, metaClient, 
recordsOpt);
+  }
+
+  /**
+   * Runs configured pre-write validators.
+   * Pre-write validators are invoked before the write operation to validate
+   * conditions that must be met before proceeding with the write.
+   *
+   * @param instantTime        The instant time for the write operation
+   * @param writeOperationType The type of write operation being performed
+   * @param metaClient         The HoodieTableMetaClient for accessing table 
metadata
+   * @param recordsOpt         Option of HoodieData of records to be written, 
empty for operations
+   *                           without input records (e.g., compact, cluster, 
delete)
+   */
+  protected void runPreWriteValidators(String instantTime, WriteOperationType 
writeOperationType,
+      HoodieTableMetaClient metaClient, Option<HoodieData<HoodieRecord<T>>> 
recordsOpt) {
+    PreWriteValidatorUtils.runValidators(config, instantTime, 
writeOperationType, metaClient, context, recordsOpt);
   }
 
   /**
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/PreWriteValidatorUtils.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/PreWriteValidatorUtils.java
new file mode 100644
index 000000000000..65d1e66d6891
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/utils/PreWriteValidatorUtils.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.client.validator.PreWriteValidator;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.HoodieTimer;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieValidationException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+/**
+ * Utility class for running pre-write validators.
+ */
+public class PreWriteValidatorUtils {
+  private static final Logger LOG = 
LoggerFactory.getLogger(PreWriteValidatorUtils.class);
+
+  /**
+   * Run all configured pre-write validators.
+   * Validators are run asynchronously in parallel, and if any validator fails,
+   * a HoodieValidationException is thrown.
+   *
+   * @param config             The write configuration
+   * @param instantTime        The instant time for the write operation
+   * @param writeOperationType The type of write operation
+   * @param metaClient         The HoodieTableMetaClient
+   * @param engineContext      The Hoodie engine context
+   * @param recordsOpt         Option of HoodieData of records to be written, 
empty for operations
+   *                           without input records (e.g., compact, cluster, 
delete)
+   * @param <T>                The payload type of the records
+   * @throws HoodieValidationException if any validation fails
+   */
+  public static <T> void runValidators(HoodieWriteConfig config,
+                                       String instantTime,
+                                       WriteOperationType writeOperationType,
+                                       HoodieTableMetaClient metaClient,
+                                       HoodieEngineContext engineContext,
+                                       Option<HoodieData<HoodieRecord<T>>> 
recordsOpt) {
+    String validatorClassNames = config.getPreWriteValidators();
+
+    if (StringUtils.isNullOrEmpty(validatorClassNames)) {
+      LOG.debug("No pre-write validators configured.");
+      return;
+    }
+
+    HoodieTimer timer = HoodieTimer.start();
+    Stream<PreWriteValidator> validators = 
Arrays.stream(validatorClassNames.split(","))
+        .map(String::trim)
+        .filter(className -> !className.isEmpty())
+        .map(className -> (PreWriteValidator) 
ReflectionUtils.loadClass(className));
+
+    LOG.info("Running pre-write validators for instant {}", instantTime);
+
+    // Collect all futures first to ensure parallel execution
+    List<CompletableFuture<Boolean>> futures = validators
+        .map(validator -> runValidatorAsync(validator, instantTime, 
writeOperationType, metaClient, config, engineContext, recordsOpt))
+        .collect(Collectors.toList());
+
+    // Wait for all validators to complete
+    boolean allSuccess = futures.stream()
+        .map(CompletableFuture::join)
+        .reduce(true, Boolean::logicalAnd);
+
+    long duration = timer.endTimer();
+    LOG.info("Pre-write validation completed in {} ms", duration);
+
+    if (allSuccess) {
+      LOG.info("All pre-write validations succeeded");
+    } else {
+      LOG.error("At least one pre-write validation failed");
+      throw new HoodieValidationException("At least one pre-write validation 
failed");
+    }
+  }
+
+  /**
+   * Run a single validator asynchronously in a separate thread pool for 
parallelism.
+   *
+   * @return CompletableFuture that resolves to true if validation passed, 
false if validation failed
+   */
+  private static <T> CompletableFuture<Boolean> 
runValidatorAsync(PreWriteValidator validator,
+                                                                   String 
instantTime,
+                                                                   
WriteOperationType writeOperationType,
+                                                                   
HoodieTableMetaClient metaClient,
+                                                                   
HoodieWriteConfig writeConfig,
+                                                                   
HoodieEngineContext engineContext,
+                                                                   
Option<HoodieData<HoodieRecord<T>>> recordsOpt) {
+    return CompletableFuture.supplyAsync(() -> {
+      String validatorName = validator.getName();
+      LOG.info("Running pre-write validator: {}", validatorName);
+
+      try {
+        HoodieTimer timer = HoodieTimer.start();
+        validator.validate(instantTime, writeOperationType, metaClient, 
writeConfig, engineContext, recordsOpt);
+        long duration = timer.endTimer();
+        LOG.info("Pre-write validator {} completed successfully in {} ms", 
validatorName, duration);
+        return true;
+      } catch (HoodieValidationException e) {
+        LOG.error("Pre-write validation failed for validator {}: {}", 
validatorName, e.getMessage(), e);
+        return false;
+      } catch (Exception e) {
+        LOG.error("Unexpected error running pre-write validator {}: {}", 
validatorName, e.getMessage(), e);
+        return false;
+      }
+    });
+  }
+}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/validator/PreWriteValidator.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/validator/PreWriteValidator.java
new file mode 100644
index 000000000000..41d5a2beef41
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/validator/PreWriteValidator.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.validator;
+
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieValidationException;
+
+/**
+ * Interface for pre-write validators.
+ * Pre-write validators are invoked before the write operation begins,
+ * similar to pre-commit validators but at an earlier stage.
+ *
+ * <p>Implementations can perform various validations such as:
+ * <ul>
+ *   <li>Schema compatibility checks</li>
+ *   <li>Data quality validations</li>
+ *   <li>Permission validations</li>
+ *   <li>Resource availability checks</li>
+ * </ul>
+ */
+public interface PreWriteValidator {
+
+  /**
+   * Validates the state before a write operation.
+   * This method is called in the preWrite phase of the write client.
+   *
+   * @param instantTime        The instant time for the write operation
+   * @param writeOperationType The type of write operation being performed
+   * @param metaClient         The HoodieTableMetaClient for accessing table 
metadata
+   * @param writeConfig        The write configuration
+   * @param engineContext      The Hoodie engine context
+   * @param recordsOpt         Option of HoodieData of records to be written, 
empty for operations
+   *                           without input records (e.g., compact, cluster, 
delete)
+   * @param <T>                The payload type of the records
+   * @throws HoodieValidationException if validation fails and the write 
should not proceed
+   */
+  <T> void validate(String instantTime,
+                    WriteOperationType writeOperationType,
+                    HoodieTableMetaClient metaClient,
+                    HoodieWriteConfig writeConfig,
+                    HoodieEngineContext engineContext,
+                    Option<HoodieData<HoodieRecord<T>>> recordsOpt) throws 
HoodieValidationException;
+
+  /**
+   * Returns a descriptive name for this validator.
+   * Used for logging and metrics purposes.
+   *
+   * @return The validator name
+   */
+  default String getName() {
+    return getClass().getSimpleName();
+  }
+}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePreWriteValidatorConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePreWriteValidatorConfig.java
new file mode 100644
index 000000000000..cf3e8a6cbf28
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodiePreWriteValidatorConfig.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.ConfigClassProperty;
+import org.apache.hudi.common.config.ConfigGroups;
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.config.HoodieConfig;
+
+import javax.annotation.concurrent.Immutable;
+
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Pre-write validator configurations.
+ * These validators are invoked before the write operation begins.
+ */
+@Immutable
+@ConfigClassProperty(name = "PreWrite Validator Configurations",
+    groupName = ConfigGroups.Names.WRITE_CLIENT,
+    description = "The following set of configurations help validate data 
before writes.")
+public class HoodiePreWriteValidatorConfig extends HoodieConfig {
+
+  public static final ConfigProperty<String> VALIDATOR_CLASS_NAMES = 
ConfigProperty
+      .key("hoodie.prewrite.validators")
+      .defaultValue("")
+      .markAdvanced()
+      .withDocumentation("Comma separated list of class names that can be 
invoked to validate before write operations");
+
+  private HoodiePreWriteValidatorConfig() {
+    super();
+  }
+
+  public static HoodiePreWriteValidatorConfig.Builder newBuilder() {
+    return new Builder();
+  }
+
+  public static class Builder {
+
+    private final HoodiePreWriteValidatorConfig preWriteValidatorConfig = new 
HoodiePreWriteValidatorConfig();
+
+    public Builder fromFile(File propertiesFile) throws IOException {
+      try (FileReader reader = new FileReader(propertiesFile)) {
+        this.preWriteValidatorConfig.getProps().load(reader);
+        return this;
+      }
+    }
+
+    public Builder fromProperties(Properties props) {
+      this.preWriteValidatorConfig.getProps().putAll(props);
+      return this;
+    }
+
+    public Builder withPreWriteValidator(String preWriteValidators) {
+      preWriteValidatorConfig.setValue(VALIDATOR_CLASS_NAMES, 
preWriteValidators);
+      return this;
+    }
+
+    public HoodiePreWriteValidatorConfig build() {
+      
preWriteValidatorConfig.setDefaults(HoodiePreWriteValidatorConfig.class.getName());
+      return preWriteValidatorConfig;
+    }
+  }
+}
diff --git 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 7ce9936bf80c..144d62468c19 100644
--- 
a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ 
b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -2810,6 +2810,10 @@ public class HoodieWriteConfig extends HoodieConfig {
     return getString(HoodiePreCommitValidatorConfig.INEQUALITY_SQL_QUERIES);
   }
 
+  public String getPreWriteValidators() {
+    return getString(HoodiePreWriteValidatorConfig.VALIDATOR_CLASS_NAMES);
+  }
+
   public boolean allowEmptyCommit() {
     return getBooleanOrDefault(ALLOW_EMPTY_COMMIT);
   }
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestPreWriteValidatorUtils.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestPreWriteValidatorUtils.java
new file mode 100644
index 000000000000..c9d85ebe709d
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/client/utils/TestPreWriteValidatorUtils.java
@@ -0,0 +1,434 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.client.validator.PreWriteValidator;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieValidationException;
+
+import org.junit.jupiter.api.Test;
+import org.mockito.Mockito;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link PreWriteValidatorUtils}.
+ */
+public class TestPreWriteValidatorUtils {
+
+  @Test
+  public void testRunValidatorsWithNoValidatorsConfigured() {
+    HoodieWriteConfig config = Mockito.mock(HoodieWriteConfig.class);
+    HoodieTableMetaClient metaClient = 
Mockito.mock(HoodieTableMetaClient.class);
+    HoodieEngineContext engineContext = 
Mockito.mock(HoodieEngineContext.class);
+
+    when(config.getPreWriteValidators()).thenReturn("");
+
+    assertDoesNotThrow(() ->
+        PreWriteValidatorUtils.runValidators(config, "001", 
WriteOperationType.INSERT, metaClient, engineContext, Option.empty()));
+  }
+
+  @Test
+  public void testRunValidatorsWithNullValidatorsConfigured() {
+    HoodieWriteConfig config = Mockito.mock(HoodieWriteConfig.class);
+    HoodieTableMetaClient metaClient = 
Mockito.mock(HoodieTableMetaClient.class);
+    HoodieEngineContext engineContext = 
Mockito.mock(HoodieEngineContext.class);
+
+    when(config.getPreWriteValidators()).thenReturn(null);
+
+    assertDoesNotThrow(() ->
+        PreWriteValidatorUtils.runValidators(config, "001", 
WriteOperationType.INSERT, metaClient, engineContext, Option.empty()));
+  }
+
+  @Test
+  public void testRunValidatorsWithPassingValidator() {
+    HoodieWriteConfig config = Mockito.mock(HoodieWriteConfig.class);
+    HoodieTableMetaClient metaClient = 
Mockito.mock(HoodieTableMetaClient.class);
+    HoodieEngineContext engineContext = 
Mockito.mock(HoodieEngineContext.class);
+
+    
when(config.getPreWriteValidators()).thenReturn(PassingPreWriteValidator.class.getName());
+
+    assertDoesNotThrow(() ->
+        PreWriteValidatorUtils.runValidators(config, "001", 
WriteOperationType.INSERT, metaClient, engineContext, Option.empty()));
+  }
+
+  @Test
+  public void testRunValidatorsWithFailingValidator() {
+    HoodieWriteConfig config = Mockito.mock(HoodieWriteConfig.class);
+    HoodieTableMetaClient metaClient = 
Mockito.mock(HoodieTableMetaClient.class);
+    HoodieEngineContext engineContext = 
Mockito.mock(HoodieEngineContext.class);
+
+    
when(config.getPreWriteValidators()).thenReturn(FailingPreWriteValidator.class.getName());
+
+    assertThrows(HoodieValidationException.class, () ->
+        PreWriteValidatorUtils.runValidators(config, "001", 
WriteOperationType.INSERT, metaClient, engineContext, Option.empty()));
+  }
+
+  @Test
+  public void testRunValidatorsWithMultipleValidators() {
+    HoodieWriteConfig config = Mockito.mock(HoodieWriteConfig.class);
+    HoodieTableMetaClient metaClient = 
Mockito.mock(HoodieTableMetaClient.class);
+    HoodieEngineContext engineContext = 
Mockito.mock(HoodieEngineContext.class);
+
+    String validators = PassingPreWriteValidator.class.getName() + "," + 
PassingPreWriteValidator.class.getName();
+    when(config.getPreWriteValidators()).thenReturn(validators);
+
+    assertDoesNotThrow(() ->
+        PreWriteValidatorUtils.runValidators(config, "001", 
WriteOperationType.INSERT, metaClient, engineContext, Option.empty()));
+  }
+
+  @Test
+  public void testRunValidatorsWithOnePassingOneFailing() {
+    HoodieWriteConfig config = Mockito.mock(HoodieWriteConfig.class);
+    HoodieTableMetaClient metaClient = 
Mockito.mock(HoodieTableMetaClient.class);
+    HoodieEngineContext engineContext = 
Mockito.mock(HoodieEngineContext.class);
+
+    String validators = PassingPreWriteValidator.class.getName() + "," + 
FailingPreWriteValidator.class.getName();
+    when(config.getPreWriteValidators()).thenReturn(validators);
+
+    assertThrows(HoodieValidationException.class, () ->
+        PreWriteValidatorUtils.runValidators(config, "001", 
WriteOperationType.INSERT, metaClient, engineContext, Option.empty()));
+  }
+
+  @Test
+  public void testRunValidatorsWithInvalidClassName() {
+    HoodieWriteConfig config = Mockito.mock(HoodieWriteConfig.class);
+    HoodieTableMetaClient metaClient = 
Mockito.mock(HoodieTableMetaClient.class);
+    HoodieEngineContext engineContext = 
Mockito.mock(HoodieEngineContext.class);
+
+    
when(config.getPreWriteValidators()).thenReturn("com.invalid.NonExistentValidator");
+
+    // Should throw an exception when trying to load invalid class
+    assertThrows(Exception.class, () ->
+        PreWriteValidatorUtils.runValidators(config, "001", 
WriteOperationType.INSERT, metaClient, engineContext, Option.empty()));
+  }
+
+  @Test
+  public void testRunValidatorsWithWhitespaceInClassNames() {
+    HoodieWriteConfig config = Mockito.mock(HoodieWriteConfig.class);
+    HoodieTableMetaClient metaClient = 
Mockito.mock(HoodieTableMetaClient.class);
+    HoodieEngineContext engineContext = 
Mockito.mock(HoodieEngineContext.class);
+
+    // Add whitespace around class names
+    String validators = "  " + PassingPreWriteValidator.class.getName() + " , 
" + PassingPreWriteValidator.class.getName() + "  ";
+    when(config.getPreWriteValidators()).thenReturn(validators);
+
+    assertDoesNotThrow(() ->
+        PreWriteValidatorUtils.runValidators(config, "001", 
WriteOperationType.INSERT, metaClient, engineContext, Option.empty()));
+  }
+
+  @Test
+  public void testRunValidatorsWithRecords() {
+    HoodieWriteConfig config = Mockito.mock(HoodieWriteConfig.class);
+    HoodieTableMetaClient metaClient = 
Mockito.mock(HoodieTableMetaClient.class);
+    HoodieEngineContext engineContext = 
Mockito.mock(HoodieEngineContext.class);
+    HoodieData<HoodieRecord<Object>> records = Mockito.mock(HoodieData.class);
+
+    
when(config.getPreWriteValidators()).thenReturn(RecordCheckingValidator.class.getName());
+
+    assertDoesNotThrow(() ->
+        PreWriteValidatorUtils.runValidators(config, "001", 
WriteOperationType.UPSERT, metaClient, engineContext, Option.of(records)));
+  }
+
+  @Test
+  public void testRunValidatorsWithDifferentOperationTypes() {
+    HoodieWriteConfig config = Mockito.mock(HoodieWriteConfig.class);
+    HoodieTableMetaClient metaClient = 
Mockito.mock(HoodieTableMetaClient.class);
+    HoodieEngineContext engineContext = 
Mockito.mock(HoodieEngineContext.class);
+
+    
when(config.getPreWriteValidators()).thenReturn(OperationTypeCheckingValidator.class.getName());
+
+    // Test with different operation types
+    for (WriteOperationType opType : new WriteOperationType[]{
+        WriteOperationType.INSERT, WriteOperationType.UPSERT, 
WriteOperationType.DELETE,
+        WriteOperationType.BULK_INSERT, WriteOperationType.INSERT_OVERWRITE}) {
+      assertDoesNotThrow(() ->
+          PreWriteValidatorUtils.runValidators(config, "001", opType, 
metaClient, engineContext, Option.empty()));
+    }
+  }
+
+  @Test
+  public void testRunValidatorsWithEmptyStringEntries() {
+    HoodieWriteConfig config = Mockito.mock(HoodieWriteConfig.class);
+    HoodieTableMetaClient metaClient = 
Mockito.mock(HoodieTableMetaClient.class);
+    HoodieEngineContext engineContext = 
Mockito.mock(HoodieEngineContext.class);
+
+    // Test with empty entries in the validator list (should be filtered out)
+    String validators = PassingPreWriteValidator.class.getName() + ",,," + 
PassingPreWriteValidator.class.getName();
+    when(config.getPreWriteValidators()).thenReturn(validators);
+
+    assertDoesNotThrow(() ->
+        PreWriteValidatorUtils.runValidators(config, "001", 
WriteOperationType.INSERT, metaClient, engineContext, Option.empty()));
+  }
+
+  @Test
+  public void testRunValidatorsWithNonValidationException() {
+    HoodieWriteConfig config = Mockito.mock(HoodieWriteConfig.class);
+    HoodieTableMetaClient metaClient = 
Mockito.mock(HoodieTableMetaClient.class);
+    HoodieEngineContext engineContext = 
Mockito.mock(HoodieEngineContext.class);
+
+    
when(config.getPreWriteValidators()).thenReturn(RuntimeExceptionValidator.class.getName());
+
+    // Should catch and handle non-HoodieValidationException errors
+    assertThrows(HoodieValidationException.class, () ->
+        PreWriteValidatorUtils.runValidators(config, "001", 
WriteOperationType.INSERT, metaClient, engineContext, Option.empty()));
+  }
+
+  @Test
+  public void testRunValidatorsInParallel() {
+    HoodieWriteConfig config = Mockito.mock(HoodieWriteConfig.class);
+    HoodieTableMetaClient metaClient = 
Mockito.mock(HoodieTableMetaClient.class);
+    HoodieEngineContext engineContext = 
Mockito.mock(HoodieEngineContext.class);
+
+    // Reset counters
+    SlowValidator.reset();
+
+    String validators = SlowValidator.class.getName() + "," + 
SlowValidator.class.getName() + "," + SlowValidator.class.getName();
+    when(config.getPreWriteValidators()).thenReturn(validators);
+
+    long startTime = System.currentTimeMillis();
+    assertDoesNotThrow(() ->
+        PreWriteValidatorUtils.runValidators(config, "001", 
WriteOperationType.INSERT, metaClient, engineContext, Option.empty()));
+    long duration = System.currentTimeMillis() - startTime;
+
+    // Verify all validators were called
+    assertEquals(3, SlowValidator.getCallCount());
+
+    // If running in parallel, total time should be closer to 100ms than 300ms
+    // Allow some buffer for test execution overhead
+    assertTrue(duration < 250, "Validators should run in parallel. Duration: " 
+ duration + "ms");
+  }
+
+  @Test
+  public void testRunValidatorsAllCalled() {
+    HoodieWriteConfig config = Mockito.mock(HoodieWriteConfig.class);
+    HoodieTableMetaClient metaClient = 
Mockito.mock(HoodieTableMetaClient.class);
+    HoodieEngineContext engineContext = 
Mockito.mock(HoodieEngineContext.class);
+
+    // Reset counters
+    SlowValidator.reset();
+
+    String validators = SlowValidator.class.getName() + "," + 
SlowValidator.class.getName() + "," + SlowValidator.class.getName();
+    when(config.getPreWriteValidators()).thenReturn(validators);
+
+    assertDoesNotThrow(() ->
+        PreWriteValidatorUtils.runValidators(config, "001", 
WriteOperationType.INSERT, metaClient, engineContext, Option.empty()));
+
+    // Verify all 3 validators were called
+    assertEquals(3, SlowValidator.getCallCount());
+  }
+
+  @Test
+  public void testRunValidatorsWithCustomValidatorName() {
+    HoodieWriteConfig config = Mockito.mock(HoodieWriteConfig.class);
+    HoodieTableMetaClient metaClient = 
Mockito.mock(HoodieTableMetaClient.class);
+    HoodieEngineContext engineContext = 
Mockito.mock(HoodieEngineContext.class);
+
+    
when(config.getPreWriteValidators()).thenReturn(CustomNameValidator.class.getName());
+
+    assertDoesNotThrow(() ->
+        PreWriteValidatorUtils.runValidators(config, "001", 
WriteOperationType.INSERT, metaClient, engineContext, Option.empty()));
+  }
+
+  @Test
+  public void testRunValidatorsMultipleFailures() {
+    HoodieWriteConfig config = Mockito.mock(HoodieWriteConfig.class);
+    HoodieTableMetaClient metaClient = 
Mockito.mock(HoodieTableMetaClient.class);
+    HoodieEngineContext engineContext = 
Mockito.mock(HoodieEngineContext.class);
+
+    // Multiple failing validators
+    String validators = FailingPreWriteValidator.class.getName() + "," + 
FailingPreWriteValidator.class.getName();
+    when(config.getPreWriteValidators()).thenReturn(validators);
+
+    assertThrows(HoodieValidationException.class, () ->
+        PreWriteValidatorUtils.runValidators(config, "001", 
WriteOperationType.INSERT, metaClient, engineContext, Option.empty()));
+  }
+
+  /**
+   * A test validator that always passes.
+   */
+  public static class PassingPreWriteValidator implements PreWriteValidator {
+    @Override
+    public <T> void validate(String instantTime,
+                             WriteOperationType writeOperationType,
+                             HoodieTableMetaClient metaClient,
+                             HoodieWriteConfig writeConfig,
+                             HoodieEngineContext engineContext,
+                             Option<HoodieData<HoodieRecord<T>>> recordsOpt) 
throws HoodieValidationException {
+      // Always passes - do nothing
+    }
+
+    @Override
+    public String getName() {
+      return "PassingPreWriteValidator";
+    }
+  }
+
+  /**
+   * A test validator that always fails.
+   */
+  public static class FailingPreWriteValidator implements PreWriteValidator {
+    @Override
+    public <T> void validate(String instantTime,
+                             WriteOperationType writeOperationType,
+                             HoodieTableMetaClient metaClient,
+                             HoodieWriteConfig writeConfig,
+                             HoodieEngineContext engineContext,
+                             Option<HoodieData<HoodieRecord<T>>> recordsOpt) 
throws HoodieValidationException {
+      throw new HoodieValidationException("Validation failed for testing");
+    }
+
+    @Override
+    public String getName() {
+      return "FailingPreWriteValidator";
+    }
+  }
+
+  /**
+   * A test validator that checks if records are present.
+   */
+  public static class RecordCheckingValidator implements PreWriteValidator {
+    @Override
+    public <T> void validate(String instantTime,
+                             WriteOperationType writeOperationType,
+                             HoodieTableMetaClient metaClient,
+                             HoodieWriteConfig writeConfig,
+                             HoodieEngineContext engineContext,
+                             Option<HoodieData<HoodieRecord<T>>> recordsOpt) 
throws HoodieValidationException {
+      // Just verify we can access the recordsOpt parameter
+      recordsOpt.isPresent();
+    }
+
+    @Override
+    public String getName() {
+      return "RecordCheckingValidator";
+    }
+  }
+
+  /**
+   * A test validator that checks the operation type.
+   */
+  public static class OperationTypeCheckingValidator implements 
PreWriteValidator {
+    @Override
+    public <T> void validate(String instantTime,
+                             WriteOperationType writeOperationType,
+                             HoodieTableMetaClient metaClient,
+                             HoodieWriteConfig writeConfig,
+                             HoodieEngineContext engineContext,
+                             Option<HoodieData<HoodieRecord<T>>> recordsOpt) 
throws HoodieValidationException {
+      // Just verify we can access the writeOperationType parameter
+      if (writeOperationType == null) {
+        throw new HoodieValidationException("WriteOperationType is null");
+      }
+    }
+
+    @Override
+    public String getName() {
+      return "OperationTypeCheckingValidator";
+    }
+  }
+
+  /**
+   * A test validator that throws a non-HoodieValidationException.
+   */
+  public static class RuntimeExceptionValidator implements PreWriteValidator {
+    @Override
+    public <T> void validate(String instantTime,
+                             WriteOperationType writeOperationType,
+                             HoodieTableMetaClient metaClient,
+                             HoodieWriteConfig writeConfig,
+                             HoodieEngineContext engineContext,
+                             Option<HoodieData<HoodieRecord<T>>> recordsOpt) {
+      throw new RuntimeException("Unexpected runtime exception");
+    }
+
+    @Override
+    public String getName() {
+      return "RuntimeExceptionValidator";
+    }
+  }
+
+  /**
+   * A test validator that sleeps to test parallel execution.
+   */
+  public static class SlowValidator implements PreWriteValidator {
+    private static final AtomicInteger CALL_COUNT = new AtomicInteger(0);
+
+    public static void reset() {
+      CALL_COUNT.set(0);
+    }
+
+    public static int getCallCount() {
+      return CALL_COUNT.get();
+    }
+
+    @Override
+    public <T> void validate(String instantTime,
+                             WriteOperationType writeOperationType,
+                             HoodieTableMetaClient metaClient,
+                             HoodieWriteConfig writeConfig,
+                             HoodieEngineContext engineContext,
+                             Option<HoodieData<HoodieRecord<T>>> recordsOpt) 
throws HoodieValidationException {
+      CALL_COUNT.incrementAndGet();
+      try {
+        Thread.sleep(100); // Sleep for 100ms
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+        throw new HoodieValidationException("Interrupted", e);
+      }
+    }
+
+    @Override
+    public String getName() {
+      return "SlowValidator";
+    }
+  }
+
+  /**
+   * A test validator with a custom name.
+   */
+  public static class CustomNameValidator implements PreWriteValidator {
+    @Override
+    public <T> void validate(String instantTime,
+                             WriteOperationType writeOperationType,
+                             HoodieTableMetaClient metaClient,
+                             HoodieWriteConfig writeConfig,
+                             HoodieEngineContext engineContext,
+                             Option<HoodieData<HoodieRecord<T>>> recordsOpt) 
throws HoodieValidationException {
+      // Do nothing
+    }
+
+    @Override
+    public String getName() {
+      return "MyCustomValidatorName";
+    }
+  }
+}
diff --git 
a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodiePreWriteValidatorConfig.java
 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodiePreWriteValidatorConfig.java
new file mode 100644
index 000000000000..f6a9ca16fd9a
--- /dev/null
+++ 
b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/config/TestHoodiePreWriteValidatorConfig.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.config;
+
+import org.apache.hudi.common.config.TypedProperties;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Properties;
+
+import static 
org.apache.hudi.config.HoodiePreWriteValidatorConfig.VALIDATOR_CLASS_NAMES;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+
+/**
+ * Tests for {@link HoodiePreWriteValidatorConfig}.
+ */
+class TestHoodiePreWriteValidatorConfig {
+
+  @Test
+  void testValidatorClassNamesConfigProperty() {
+    // documentation is not null
+    assertNotNull(VALIDATOR_CLASS_NAMES.doc());
+    assertNotEquals("", VALIDATOR_CLASS_NAMES.doc());
+
+    // default value is empty string
+    assertEquals("", VALIDATOR_CLASS_NAMES.defaultValue());
+
+    // config key is correct
+    assertEquals("hoodie.prewrite.validators", VALIDATOR_CLASS_NAMES.key());
+  }
+
+  @Test
+  void testNewBuilder() {
+    HoodiePreWriteValidatorConfig.Builder builder = 
HoodiePreWriteValidatorConfig.newBuilder();
+    assertNotNull(builder);
+
+    // Verify builder creates a valid config
+    HoodiePreWriteValidatorConfig config = builder.build();
+    assertNotNull(config);
+    assertNotNull(config.getProps());
+  }
+
+  @Test
+  void testBuilderWithValidatorClassNames() {
+    String validatorClassName = "org.apache.hudi.TestValidator";
+    HoodiePreWriteValidatorConfig config = 
HoodiePreWriteValidatorConfig.newBuilder()
+        .withPreWriteValidator(validatorClassName)
+        .build();
+
+    assertEquals(validatorClassName, config.getString(VALIDATOR_CLASS_NAMES));
+  }
+
+  @Test
+  void testBuilderWithMultipleValidators() {
+    String validators = 
"org.apache.hudi.Validator1,org.apache.hudi.Validator2";
+    HoodiePreWriteValidatorConfig config = 
HoodiePreWriteValidatorConfig.newBuilder()
+        .withPreWriteValidator(validators)
+        .build();
+
+    assertEquals(validators, config.getString(VALIDATOR_CLASS_NAMES));
+  }
+
+  @Test
+  void testBuilderWithDefaultValue() {
+    HoodiePreWriteValidatorConfig config = 
HoodiePreWriteValidatorConfig.newBuilder()
+        .build();
+
+    assertEquals("", config.getString(VALIDATOR_CLASS_NAMES));
+  }
+
+  @Test
+  void testBuilderFromProperties() {
+    Properties props = new Properties();
+    String validatorClassName = "org.apache.hudi.TestValidator";
+    props.setProperty(VALIDATOR_CLASS_NAMES.key(), validatorClassName);
+
+    HoodiePreWriteValidatorConfig config = 
HoodiePreWriteValidatorConfig.newBuilder()
+        .fromProperties(props)
+        .build();
+
+    assertEquals(validatorClassName, config.getString(VALIDATOR_CLASS_NAMES));
+  }
+
+  @Test
+  void testBuilderFromFile(@TempDir Path tempDir) throws IOException {
+    File configFile = tempDir.resolve("test-config.properties").toFile();
+    Properties props = new Properties();
+    String validatorClassName = "org.apache.hudi.TestValidator";
+    props.setProperty(VALIDATOR_CLASS_NAMES.key(), validatorClassName);
+
+    try (FileOutputStream fos = new FileOutputStream(configFile)) {
+      props.store(fos, "Test config");
+    }
+
+    HoodiePreWriteValidatorConfig config = 
HoodiePreWriteValidatorConfig.newBuilder()
+        .fromFile(configFile)
+        .build();
+
+    assertEquals(validatorClassName, config.getString(VALIDATOR_CLASS_NAMES));
+  }
+
+  @Test
+  void testBuilderChainingFromPropertiesAndWithPreWriteValidator() {
+    Properties props = new Properties();
+    props.setProperty(VALIDATOR_CLASS_NAMES.key(), 
"org.apache.hudi.Validator1");
+
+    String validator2 = "org.apache.hudi.Validator2";
+    HoodiePreWriteValidatorConfig config = 
HoodiePreWriteValidatorConfig.newBuilder()
+        .fromProperties(props)
+        .withPreWriteValidator(validator2)
+        .build();
+
+    // withPreWriteValidator should override the value from properties
+    assertEquals(validator2, config.getString(VALIDATOR_CLASS_NAMES));
+  }
+
+  @Test
+  void testBuilderChainingFromFileAndWithPreWriteValidator(@TempDir Path 
tempDir) throws IOException {
+    File configFile = tempDir.resolve("test-config.properties").toFile();
+    Properties props = new Properties();
+    props.setProperty(VALIDATOR_CLASS_NAMES.key(), 
"org.apache.hudi.Validator1");
+
+    try (FileOutputStream fos = new FileOutputStream(configFile)) {
+      props.store(fos, "Test config");
+    }
+
+    String validator2 = "org.apache.hudi.Validator2";
+    HoodiePreWriteValidatorConfig config = 
HoodiePreWriteValidatorConfig.newBuilder()
+        .fromFile(configFile)
+        .withPreWriteValidator(validator2)
+        .build();
+
+    // withPreWriteValidator should override the value from file
+    assertEquals(validator2, config.getString(VALIDATOR_CLASS_NAMES));
+  }
+
+  @Test
+  void testConfigInTypedProperties() {
+    TypedProperties props = new TypedProperties();
+    String validatorClassName = "org.apache.hudi.TestValidator";
+    props.setProperty(VALIDATOR_CLASS_NAMES.key(), validatorClassName);
+
+    assertEquals(validatorClassName, 
props.getString(VALIDATOR_CLASS_NAMES.key(), 
VALIDATOR_CLASS_NAMES.defaultValue()));
+  }
+
+  @Test
+  void testEmptyStringValidatorClassNames() {
+    HoodiePreWriteValidatorConfig config = 
HoodiePreWriteValidatorConfig.newBuilder()
+        .withPreWriteValidator("")
+        .build();
+
+    assertEquals("", config.getString(VALIDATOR_CLASS_NAMES));
+  }
+
+  @Test
+  void testBuilderReturnsThis() {
+    HoodiePreWriteValidatorConfig.Builder builder = 
HoodiePreWriteValidatorConfig.newBuilder();
+    Properties props = new Properties();
+
+    // Verify fromProperties returns the same builder instance (for chaining)
+    HoodiePreWriteValidatorConfig.Builder result = 
builder.fromProperties(props);
+    assertNotNull(result);
+
+    // Verify withPreWriteValidator returns the builder (for chaining)
+    result = builder.withPreWriteValidator("test.Validator");
+    assertNotNull(result);
+  }
+
+  @Test
+  void testConfigPropertiesInitialized() {
+    HoodiePreWriteValidatorConfig config = 
HoodiePreWriteValidatorConfig.newBuilder().build();
+
+    // Verify the config has properties initialized
+    assertNotNull(config.getProps());
+
+    // Verify default value is set
+    assertEquals("", config.getString(VALIDATOR_CLASS_NAMES));
+  }
+}
diff --git 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index 7ebd21593c3e..1e5a691833d0 100644
--- 
a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ 
b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -234,7 +234,7 @@ public class HoodieFlinkWriteClient<T>
     HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> 
table =
         initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime));
     table.validateUpsertSchema();
-    preWrite(instantTime, WriteOperationType.UPSERT_PREPPED, 
table.getMetaClient());
+    preWrite(instantTime, WriteOperationType.UPSERT_PREPPED, 
table.getMetaClient(), Option.of(HoodieListData.eager(preppedRecords)));
     Map<String, List<HoodieRecord<T>>> preppedRecordsByFileId = 
preppedRecords.stream().parallel()
         .collect(Collectors.groupingBy(r -> 
r.getCurrentLocation().getFileId()));
     return preppedRecordsByFileId.values().stream().parallel().map(records -> {
@@ -318,7 +318,7 @@ public class HoodieFlinkWriteClient<T>
     HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> 
table =
         initTable(WriteOperationType.BULK_INSERT_PREPPED, 
Option.ofNullable(instantTime));
     table.validateInsertSchema();
-    preWrite(instantTime, WriteOperationType.BULK_INSERT_PREPPED, 
table.getMetaClient());
+    preWrite(instantTime, WriteOperationType.BULK_INSERT_PREPPED, 
table.getMetaClient(), Option.of(HoodieListData.eager(preppedRecords)));
     Map<String, List<HoodieRecord<T>>> preppedRecordsByFileId = 
preppedRecords.stream().parallel()
         .collect(Collectors.groupingBy(r -> 
r.getCurrentLocation().getFileId()));
     return preppedRecordsByFileId.values().stream().parallel().map(records -> {
@@ -345,7 +345,7 @@ public class HoodieFlinkWriteClient<T>
   public List<WriteStatus> deletePrepped(List<HoodieRecord<T>> preppedRecords, 
final String instantTime) {
     HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> 
table =
         initTable(WriteOperationType.DELETE_PREPPED, 
Option.ofNullable(instantTime));
-    preWrite(instantTime, WriteOperationType.DELETE_PREPPED, 
table.getMetaClient());
+    preWrite(instantTime, WriteOperationType.DELETE_PREPPED, 
table.getMetaClient(), Option.of(HoodieListData.eager(preppedRecords)));
     HoodieWriteMetadata<List<WriteStatus>> result = 
table.deletePrepped(context, instantTime, preppedRecords);
     return postWrite(result, instantTime, table);
   }
@@ -360,11 +360,20 @@ public class HoodieFlinkWriteClient<T>
 
   @Override
   public void preWrite(String instantTime, WriteOperationType 
writeOperationType, HoodieTableMetaClient metaClient) {
+    preWrite(instantTime, writeOperationType, metaClient, Option.empty());
+  }
+
+  @Override
+  public void preWrite(String instantTime, WriteOperationType 
writeOperationType,
+                       HoodieTableMetaClient metaClient, 
Option<HoodieData<HoodieRecord<T>>> recordsOpt) {
     setOperationType(writeOperationType);
     // Note: the code to read the commit metadata is not thread safe for JSON 
deserialization,
     // remove the table metadata sync
 
     // remove the async cleaning
+
+    // Run pre-write validators
+    runPreWriteValidators(instantTime, writeOperationType, metaClient, 
recordsOpt);
   }
 
   /**
diff --git 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
index 53dd067ca5f8..9f3d550ba630 100644
--- 
a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
+++ 
b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
@@ -113,7 +113,7 @@ public class HoodieJavaWriteClient<T> extends
     HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> 
table =
         initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime));
     table.validateUpsertSchema();
-    preWrite(instantTime, WriteOperationType.UPSERT, table.getMetaClient());
+    preWrite(instantTime, WriteOperationType.UPSERT, table.getMetaClient(), 
Option.of(HoodieListData.eager(records)));
     HoodieWriteMetadata<List<WriteStatus>> result = table.upsert(context, 
instantTime, records);
     if (result.getIndexLookupDuration().isPresent()) {
       metrics.updateIndexMetrics(LOOKUP_STR, 
result.getIndexLookupDuration().get().toMillis());
@@ -127,7 +127,7 @@ public class HoodieJavaWriteClient<T> extends
     HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> 
table =
         initTable(WriteOperationType.UPSERT_PREPPED, 
Option.ofNullable(instantTime));
     table.validateUpsertSchema();
-    preWrite(instantTime, WriteOperationType.UPSERT_PREPPED, 
table.getMetaClient());
+    preWrite(instantTime, WriteOperationType.UPSERT_PREPPED, 
table.getMetaClient(), Option.of(HoodieListData.eager(preppedRecords)));
     HoodieWriteMetadata<List<WriteStatus>> result = 
table.upsertPrepped(context,instantTime, preppedRecords);
     return postWrite(result, instantTime, table);
   }
@@ -137,7 +137,7 @@ public class HoodieJavaWriteClient<T> extends
     HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> 
table =
         initTable(WriteOperationType.INSERT, Option.ofNullable(instantTime));
     table.validateUpsertSchema();
-    preWrite(instantTime, WriteOperationType.INSERT, table.getMetaClient());
+    preWrite(instantTime, WriteOperationType.INSERT, table.getMetaClient(), 
Option.of(HoodieListData.eager(records)));
     HoodieWriteMetadata<List<WriteStatus>> result = table.insert(context, 
instantTime, records);
     if (result.getIndexLookupDuration().isPresent()) {
       metrics.updateIndexMetrics(LOOKUP_STR, 
result.getIndexLookupDuration().get().toMillis());
@@ -151,7 +151,7 @@ public class HoodieJavaWriteClient<T> extends
     HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> 
table =
         initTable(WriteOperationType.INSERT_PREPPED, 
Option.ofNullable(instantTime));
     table.validateInsertSchema();
-    preWrite(instantTime, WriteOperationType.INSERT_PREPPED, 
table.getMetaClient());
+    preWrite(instantTime, WriteOperationType.INSERT_PREPPED, 
table.getMetaClient(), Option.of(HoodieListData.eager(preppedRecords)));
     HoodieWriteMetadata<List<WriteStatus>> result = 
table.insertPrepped(context,instantTime, preppedRecords);
     return postWrite(result, instantTime, table);
   }
@@ -169,7 +169,7 @@ public class HoodieJavaWriteClient<T> extends
     HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> 
table =
         initTable(WriteOperationType.BULK_INSERT, 
Option.ofNullable(instantTime));
     table.validateInsertSchema();
-    preWrite(instantTime, WriteOperationType.BULK_INSERT, 
table.getMetaClient());
+    preWrite(instantTime, WriteOperationType.BULK_INSERT, 
table.getMetaClient(), Option.of(HoodieListData.eager(records)));
     HoodieWriteMetadata<List<WriteStatus>> result = table.bulkInsert(context, 
instantTime, records, userDefinedBulkInsertPartitioner);
     return postWrite(result, instantTime, table);
   }
@@ -188,7 +188,7 @@ public class HoodieJavaWriteClient<T> extends
     HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> 
table =
         initTable(WriteOperationType.BULK_INSERT_PREPPED, 
Option.ofNullable(instantTime));
     table.validateInsertSchema();
-    preWrite(instantTime, WriteOperationType.BULK_INSERT_PREPPED, 
table.getMetaClient());
+    preWrite(instantTime, WriteOperationType.BULK_INSERT_PREPPED, 
table.getMetaClient(), Option.of(HoodieListData.eager(preppedRecords)));
     HoodieWriteMetadata<List<WriteStatus>> result = 
table.bulkInsertPrepped(context, instantTime, preppedRecords, 
bulkInsertPartitioner);
     return postWrite(result, instantTime, table);
   }
@@ -207,7 +207,7 @@ public class HoodieJavaWriteClient<T> extends
   public List<WriteStatus> deletePrepped(List<HoodieRecord<T>> preppedRecords, 
final String instantTime) {
     HoodieTable<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> 
table =
         initTable(WriteOperationType.DELETE_PREPPED, 
Option.ofNullable(instantTime));
-    preWrite(instantTime, WriteOperationType.DELETE_PREPPED, 
table.getMetaClient());
+    preWrite(instantTime, WriteOperationType.DELETE_PREPPED, 
table.getMetaClient(), Option.of(HoodieListData.eager(preppedRecords)));
     HoodieWriteMetadata<List<WriteStatus>> result = 
table.deletePrepped(context,instantTime, preppedRecords);
     return postWrite(result, instantTime, table);
   }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 64a1caee32b9..cf01f39a01c9 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -204,7 +204,7 @@ public class SparkRDDWriteClient<T> extends
     HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, 
HoodieData<WriteStatus>> table =
         initTable(WriteOperationType.UPSERT, Option.ofNullable(instantTime));
     table.validateUpsertSchema();
-    preWrite(instantTime, WriteOperationType.UPSERT, table.getMetaClient());
+    preWrite(instantTime, WriteOperationType.UPSERT, table.getMetaClient(), 
Option.of(HoodieJavaRDD.of(records)));
     HoodieWriteMetadata<HoodieData<WriteStatus>> result = 
table.upsert(context, instantTime, HoodieJavaRDD.of(records));
     HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = 
result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
     if (result.getSourceReadAndIndexDurationMs().isPresent()) {
@@ -218,7 +218,7 @@ public class SparkRDDWriteClient<T> extends
     HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, 
HoodieData<WriteStatus>> table =
         initTable(WriteOperationType.UPSERT_PREPPED, 
Option.ofNullable(instantTime));
     table.validateUpsertSchema();
-    preWrite(instantTime, WriteOperationType.UPSERT_PREPPED, 
table.getMetaClient());
+    preWrite(instantTime, WriteOperationType.UPSERT_PREPPED, 
table.getMetaClient(), Option.of(HoodieJavaRDD.of(preppedRecords)));
     HoodieWriteMetadata<HoodieData<WriteStatus>> result = 
table.upsertPrepped(context, instantTime, HoodieJavaRDD.of(preppedRecords));
     HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = 
result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
     return postWrite(resultRDD, instantTime, table);
@@ -229,7 +229,7 @@ public class SparkRDDWriteClient<T> extends
     HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, 
HoodieData<WriteStatus>> table =
         initTable(WriteOperationType.INSERT, Option.ofNullable(instantTime));
     table.validateInsertSchema();
-    preWrite(instantTime, WriteOperationType.INSERT, table.getMetaClient());
+    preWrite(instantTime, WriteOperationType.INSERT, table.getMetaClient(), 
Option.of(HoodieJavaRDD.of(records)));
     HoodieWriteMetadata<HoodieData<WriteStatus>> result = 
table.insert(context, instantTime, HoodieJavaRDD.of(records));
     HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = 
result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
     return postWrite(resultRDD, instantTime, table);
@@ -240,7 +240,7 @@ public class SparkRDDWriteClient<T> extends
     HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, 
HoodieData<WriteStatus>> table =
         initTable(WriteOperationType.INSERT_PREPPED, 
Option.ofNullable(instantTime));
     table.validateInsertSchema();
-    preWrite(instantTime, WriteOperationType.INSERT_PREPPED, 
table.getMetaClient());
+    preWrite(instantTime, WriteOperationType.INSERT_PREPPED, 
table.getMetaClient(), Option.of(HoodieJavaRDD.of(preppedRecords)));
     HoodieWriteMetadata<HoodieData<WriteStatus>> result = 
table.insertPrepped(context, instantTime, HoodieJavaRDD.of(preppedRecords));
     HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = 
result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
     return postWrite(resultRDD, instantTime, table);
@@ -256,7 +256,7 @@ public class SparkRDDWriteClient<T> extends
   public HoodieWriteResult insertOverwrite(JavaRDD<HoodieRecord<T>> records, 
final String instantTime) {
     HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, 
HoodieData<WriteStatus>> table = initTable(WriteOperationType.INSERT_OVERWRITE, 
Option.ofNullable(instantTime));
     table.validateInsertSchema();
-    preWrite(instantTime, WriteOperationType.INSERT_OVERWRITE, 
table.getMetaClient());
+    preWrite(instantTime, WriteOperationType.INSERT_OVERWRITE, 
table.getMetaClient(), Option.of(HoodieJavaRDD.of(records)));
     HoodieWriteMetadata<HoodieData<WriteStatus>> result = 
table.insertOverwrite(context, instantTime, HoodieJavaRDD.of(records));
     HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = 
result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
     return new HoodieWriteResult(postWrite(resultRDD, instantTime, table), 
result.getPartitionToReplaceFileIds());
@@ -272,7 +272,7 @@ public class SparkRDDWriteClient<T> extends
   public HoodieWriteResult insertOverwriteTable(JavaRDD<HoodieRecord<T>> 
records, final String instantTime) {
     HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, 
HoodieData<WriteStatus>> table = 
initTable(WriteOperationType.INSERT_OVERWRITE_TABLE, 
Option.ofNullable(instantTime));
     table.validateInsertSchema();
-    preWrite(instantTime, WriteOperationType.INSERT_OVERWRITE_TABLE, 
table.getMetaClient());
+    preWrite(instantTime, WriteOperationType.INSERT_OVERWRITE_TABLE, 
table.getMetaClient(), Option.of(HoodieJavaRDD.of(records)));
     HoodieWriteMetadata<HoodieData<WriteStatus>> result = 
table.insertOverwriteTable(context, instantTime, HoodieJavaRDD.of(records));
     HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = 
result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
     return new HoodieWriteResult(postWrite(resultRDD, instantTime, table), 
result.getPartitionToReplaceFileIds());
@@ -288,7 +288,7 @@ public class SparkRDDWriteClient<T> extends
     HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, 
HoodieData<WriteStatus>> table =
         initTable(WriteOperationType.BULK_INSERT, 
Option.ofNullable(instantTime));
     table.validateInsertSchema();
-    preWrite(instantTime, WriteOperationType.BULK_INSERT, 
table.getMetaClient());
+    preWrite(instantTime, WriteOperationType.BULK_INSERT, 
table.getMetaClient(), Option.of(HoodieJavaRDD.of(records)));
     HoodieWriteMetadata<HoodieData<WriteStatus>> result = 
table.bulkInsert(context, instantTime, HoodieJavaRDD.of(records), 
userDefinedBulkInsertPartitioner);
     HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = 
result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
     return postWrite(resultRDD, instantTime, table);
@@ -299,7 +299,7 @@ public class SparkRDDWriteClient<T> extends
     HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, 
HoodieData<WriteStatus>> table =
         initTable(WriteOperationType.BULK_INSERT_PREPPED, 
Option.ofNullable(instantTime));
     table.validateInsertSchema();
-    preWrite(instantTime, WriteOperationType.BULK_INSERT_PREPPED, 
table.getMetaClient());
+    preWrite(instantTime, WriteOperationType.BULK_INSERT_PREPPED, 
table.getMetaClient(), Option.of(HoodieJavaRDD.of(preppedRecords)));
     HoodieWriteMetadata<HoodieData<WriteStatus>> result = 
table.bulkInsertPrepped(context, instantTime, HoodieJavaRDD.of(preppedRecords), 
bulkInsertPartitioner);
     HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = 
result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
     return postWrite(resultRDD, instantTime, table);
@@ -317,7 +317,7 @@ public class SparkRDDWriteClient<T> extends
   @Override
   public JavaRDD<WriteStatus> deletePrepped(JavaRDD<HoodieRecord<T>> 
preppedRecord, String instantTime) {
     HoodieTable<T, HoodieData<HoodieRecord<T>>, HoodieData<HoodieKey>, 
HoodieData<WriteStatus>> table = initTable(WriteOperationType.DELETE_PREPPED, 
Option.ofNullable(instantTime));
-    preWrite(instantTime, WriteOperationType.DELETE_PREPPED, 
table.getMetaClient());
+    preWrite(instantTime, WriteOperationType.DELETE_PREPPED, 
table.getMetaClient(), Option.of(HoodieJavaRDD.of(preppedRecord)));
     HoodieWriteMetadata<HoodieData<WriteStatus>> result = 
table.deletePrepped(context, instantTime, HoodieJavaRDD.of(preppedRecord));
     HoodieWriteMetadata<JavaRDD<WriteStatus>> resultRDD = 
result.clone(HoodieJavaRDD.getJavaRDD(result.getWriteStatuses()));
     return postWrite(resultRDD, instantTime, table);
diff --git 
a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/utils/TestSparkPreWriteValidatorUtils.java
 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/utils/TestSparkPreWriteValidatorUtils.java
new file mode 100644
index 000000000000..a48fd0fb876a
--- /dev/null
+++ 
b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/utils/TestSparkPreWriteValidatorUtils.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.utils;
+
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteClientTestUtils;
+import org.apache.hudi.client.validator.PreWriteValidator;
+import org.apache.hudi.common.data.HoodieData;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodiePreWriteValidatorConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieValidationException;
+import org.apache.hudi.testutils.HoodieClientTestBase;
+
+import org.apache.spark.api.java.JavaRDD;
+import org.junit.jupiter.api.Test;
+
+import java.util.List;
+
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for pre-write validators with Spark write client.
+ * These tests verify that pre-write validators are invoked during write 
operations.
+ */
+public class TestSparkPreWriteValidatorUtils extends HoodieClientTestBase {
+
+  @Test
+  public void testPreWriteValidatorFailureBlocksWrite() throws Exception {
+    // Reset the static state
+    FailingValidator.reset();
+
+    HoodieWriteConfig writeConfig = getConfigBuilder()
+        .withProps(HoodiePreWriteValidatorConfig.newBuilder()
+            .withPreWriteValidator(FailingValidator.class.getName())
+            .build().getProps())
+        .build();
+
+    try (SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig)) {
+      String instantTime = "001";
+      WriteClientTestUtils.startCommitWithTime(writeClient, instantTime);
+
+      List<HoodieRecord> records = dataGen.generateInserts(instantTime, 5);
+      JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 2);
+
+      // The validator should throw an exception, blocking the write
+      assertThrows(HoodieValidationException.class, () -> {
+        writeClient.insert(writeRecords, instantTime);
+      });
+
+      assertTrue(FailingValidator.wasValidateCalled(), "Validator should have 
been called before failure");
+    }
+  }
+
+  @Test
+  public void testMultiplePreWriteValidatorsAreInvoked() throws Exception {
+    // Reset static states
+    FirstPassingValidator.reset();
+    SecondPassingValidator.reset();
+
+    String validators = FirstPassingValidator.class.getName() + "," + 
SecondPassingValidator.class.getName();
+    HoodieWriteConfig writeConfig = getConfigBuilder()
+        .withProps(HoodiePreWriteValidatorConfig.newBuilder()
+            .withPreWriteValidator(validators)
+            .build().getProps())
+        .build();
+
+    try (SparkRDDWriteClient writeClient = getHoodieWriteClient(writeConfig)) {
+      String instantTime = "001";
+      WriteClientTestUtils.startCommitWithTime(writeClient, instantTime);
+
+      List<HoodieRecord> records = dataGen.generateInserts(instantTime, 7);
+      JavaRDD<HoodieRecord> writeRecords = jsc.parallelize(records, 2);
+
+      writeClient.insert(writeRecords, instantTime);
+
+      // Verify both validators were called
+      assertTrue(FirstPassingValidator.wasValidateCalled(), 
"FirstPassingValidator should have been called");
+      assertTrue(SecondPassingValidator.wasValidateCalled(), 
"SecondPassingValidator should have been called");
+    }
+  }
+
+  /**
+   * A test validator that always fails.
+   */
+  public static class FailingValidator implements PreWriteValidator {
+    private static volatile boolean validateCalled = false;
+
+    public static void reset() {
+      validateCalled = false;
+    }
+
+    public static boolean wasValidateCalled() {
+      return validateCalled;
+    }
+
+    @Override
+    public <T> void validate(String instantTime,
+                             WriteOperationType writeOperationType,
+                             HoodieTableMetaClient metaClient,
+                             HoodieWriteConfig writeConfig,
+                             HoodieEngineContext engineContext,
+                             Option<HoodieData<HoodieRecord<T>>> recordsOpt) 
throws HoodieValidationException {
+      validateCalled = true;
+      throw new HoodieValidationException("Intentional validation failure for 
testing");
+    }
+
+    @Override
+    public String getName() {
+      return "FailingValidator";
+    }
+  }
+
+  /**
+   * A test validator that always passes.
+   */
+  public static class FirstPassingValidator implements PreWriteValidator {
+    private static volatile boolean validateCalled = false;
+
+    public static void reset() {
+      validateCalled = false;
+    }
+
+    public static boolean wasValidateCalled() {
+      return validateCalled;
+    }
+
+    @Override
+    public <T> void validate(String instantTime,
+                             WriteOperationType writeOperationType,
+                             HoodieTableMetaClient metaClient,
+                             HoodieWriteConfig writeConfig,
+                             HoodieEngineContext engineContext,
+                             Option<HoodieData<HoodieRecord<T>>> recordsOpt) 
throws HoodieValidationException {
+      validateCalled = true;
+    }
+
+    @Override
+    public String getName() {
+      return "FirstPassingValidator";
+    }
+  }
+
+  /**
+   * A second test validator that always passes.
+   */
+  public static class SecondPassingValidator implements PreWriteValidator {
+    private static volatile boolean validateCalled = false;
+
+    public static void reset() {
+      validateCalled = false;
+    }
+
+    public static boolean wasValidateCalled() {
+      return validateCalled;
+    }
+
+    @Override
+    public <T> void validate(String instantTime,
+                             WriteOperationType writeOperationType,
+                             HoodieTableMetaClient metaClient,
+                             HoodieWriteConfig writeConfig,
+                             HoodieEngineContext engineContext,
+                             Option<HoodieData<HoodieRecord<T>>> recordsOpt) 
throws HoodieValidationException {
+      validateCalled = true;
+    }
+
+    @Override
+    public String getName() {
+      return "SecondPassingValidator";
+    }
+  }
+}

Reply via email to