hudi-agent commented on code in PR #18405:
URL: https://github.com/apache/hudi/pull/18405#discussion_r3253216037


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -874,8 +877,38 @@ private Pair<Option<String>, JavaRDD<WriteStatus>> writeToSinkAndDoMetaSync(Hood
           totalSuccessfulRecords);
       String commitActionType = CommitUtils.getCommitActionType(cfg.operation, HoodieTableType.valueOf(cfg.tableType));
 
-      boolean success = writeClient.commit(instantTime, writeStatusRDD, Option.of(checkpointCommitMetadata), commitActionType, partitionToReplacedFileIds, Option.empty(),
-          Option.of(writeStatusValidator));
+      // Cache the RDD only when pre-commit validators are configured. Validators collect the RDD
+      // before commit, so without caching the same DAG would re-evaluate inside writeClient.commit().
+      // When no validators are configured, commit consumes the RDD once and caching adds no value.
+      // shouldUnpersist is true only when we created the cache here (validators present and storage
+      // level was NONE), so the finally block knows to release it.
+      boolean validatorsConfigured = !StringUtils.isNullOrEmpty(props.getString(
+          HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(),
+          HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.defaultValue()));
+      boolean shouldUnpersist = validatorsConfigured && writeStatusRDD.getStorageLevel().equals(StorageLevel.NONE());

Review Comment:
   🤖 nit: `shouldUnpersist` reads oddly at the point where it gates 
`writeStatusRDD.cache()` ("if we should unpersist, cache"). Could you rename to 
something like `shouldCacheAndUnpersist` or `cacheForValidators` so the intent 
at both call sites matches the name?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>
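
A minimal sketch of the rename suggested above, using `cacheForValidators` as the flag name. It does not use Spark or Hudi: `FakeRdd` is a hypothetical stand-in that only tracks persistence, and the `validate`/`commit` events are placeholders for the validator run and `writeClient.commit()`. The point is only that one flag, named for why the cache exists, reads naturally at both the `cache()` and the `unpersist()` call sites.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a Spark RDD; it only tracks whether it is persisted.
class FakeRdd {
  boolean persisted;
  final List<String> events = new ArrayList<>();

  FakeRdd(boolean alreadyPersisted) {
    this.persisted = alreadyPersisted;
  }

  void cache() {
    persisted = true;
    events.add("cache");
  }

  void unpersist() {
    persisted = false;
    events.add("unpersist");
  }
}

class CacheForValidatorsSketch {
  // Returns the ordered events recorded on the stand-in RDD.
  static List<String> commit(boolean validatorsConfigured, FakeRdd rdd) {
    // True only when this method creates the cache, so it also owns the release.
    boolean cacheForValidators = validatorsConfigured && !rdd.persisted;
    if (cacheForValidators) {
      rdd.cache();
    }
    try {
      if (validatorsConfigured) {
        rdd.events.add("validate"); // placeholder: collect write statuses, run validators
      }
      rdd.events.add("commit"); // placeholder: the actual commit call
    } finally {
      if (cacheForValidators) {
        rdd.unpersist(); // never releases a cache the caller created
      }
    }
    return rdd.events;
  }
}
```

With this shape, an RDD that the caller had already persisted is left persisted after the commit, which matches the intent described in the diff's comment.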



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkStreamerValidatorUtils.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer.validator;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.client.validator.BasePreCommitValidator;
+import org.apache.hudi.client.validator.ValidationContext;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.exception.HoodieValidationException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+/**
+ * Utility for running pre-commit validators in the HoodieStreamer commit flow.
+ *
+ * <p>Instantiates and executes validators configured via
+ * {@code hoodie.precommit.validators}. Each validator must extend
+ * {@link BasePreCommitValidator} and have a constructor that accepts
+ * {@link TypedProperties}.</p>
+ *
+ * <p>Called from {@code StreamSync.writeToSinkAndDoMetaSync()} before
+ * the commit is finalized.</p>
+ *
+ * <p><b>Note on validator compatibility:</b> This utility uses a different instantiation
+ * mechanism than {@code SparkValidatorUtils} (used by the Spark table write path).
+ * {@code SparkValidatorUtils} expects validators implementing {@code SparkPreCommitValidator}
+ * with a {@code (HoodieSparkTable, HoodieEngineContext, HoodieWriteConfig)} constructor.
+ * Validators registered here (e.g. {@link SparkKafkaOffsetValidator}) extend
+ * {@link BasePreCommitValidator} with a {@code (TypedProperties)} constructor and
+ * are NOT compatible with {@code SparkValidatorUtils}. Do not mix them under the same
+ * {@code hoodie.precommit.validators} config if both paths are active.</p>
+ */
+public class SparkStreamerValidatorUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SparkStreamerValidatorUtils.class);
+
+  /**
+   * Run all configured pre-commit validators.
+   *
+   * <p>The caller is responsible for caching and unpersisting the source RDD if needed.
+   * This method accepts pre-collected write statuses to avoid a second DAG evaluation —
+   * the caller should cache the RDD, collect to this list, call this method, then pass
+   * the same RDD to {@code writeClient.commit()}, and unpersist after commit completes.</p>
+   *
+   * @param props Configuration properties containing validator class names
+   * @param instantTime Commit instant time
+   * @param writeStatuses Pre-collected write statuses from Spark write operations
+   * @param checkpointCommitMetadata Extra metadata being committed (contains checkpoint info)
+   * @param metaClient Table meta client for timeline access and previous commit lookup
+   * @throws HoodieValidationException if any validator fails with FAIL policy
+   */
+  public static void runValidators(TypedProperties props,
+                                   String instantTime,
+                                   List<WriteStatus> writeStatuses,
+                                   Map<String, String> checkpointCommitMetadata,
+                                   HoodieTableMetaClient metaClient) {
+    String validatorClassNames = props.getString(
+        HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(),
+        HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.defaultValue());
+
+    if (StringUtils.isNullOrEmpty(validatorClassNames)) {
+      return;
+    }
+
+    HoodieCommitMetadata currentMetadata = buildCommitMetadata(writeStatuses, checkpointCommitMetadata);
+    List<HoodieWriteStat> writeStats = writeStatuses.stream()
+        .map(WriteStatus::getStat)
+        .filter(Objects::nonNull)
+        .collect(Collectors.toList());
+
+    Option<HoodieCommitMetadata> previousCommitMetadata = loadPreviousCommitMetadata(metaClient);
+
+    ValidationContext context = new SparkValidationContext(
+        instantTime,
+        Option.of(currentMetadata),
+        Option.of(writeStats),
+        previousCommitMetadata,
+        metaClient);
+
+    List<String> classNames = Arrays.stream(validatorClassNames.split(","))
+        .map(String::trim)
+        .filter(s -> !s.isEmpty())
+        .collect(Collectors.toList());
+
+    for (String className : classNames) {
+      try {
+        Class<?> clazz = Class.forName(className);
+        if (!BasePreCommitValidator.class.isAssignableFrom(clazz)) {
+          LOG.warn("Skipping validator {} in HoodieStreamer path — it does not extend BasePreCommitValidator. "
+              + "If this is a SparkPreCommitValidator (e.g. SqlQueryEqualityPreCommitValidator), "
+              + "it must be invoked via SparkValidatorUtils in the standard Spark write path instead.", className);
+          continue;
+        }
+        BasePreCommitValidator validator = (BasePreCommitValidator)
+            ReflectionUtils.loadClass(className, new Class<?>[] {TypedProperties.class}, props);
+        LOG.info("Running pre-commit validator: {} for instant: {}", className, instantTime);
+        validator.validateWithMetadata(context);
+        LOG.info("Pre-commit validator {} passed for instant: {}", className, instantTime);
+      } catch (HoodieValidationException e) {
+        LOG.error("Pre-commit validator {} failed for instant: {}", className, instantTime, e);
+        throw e;
+      } catch (Exception e) {
+        LOG.error("Failed to instantiate or run validator: {}", className, e);
+        throw new HoodieValidationException(
+            "Failed to run pre-commit validator: " + className, e);
+      }
+    }
+  }
+
+  /**
+   * Build a pre-commit snapshot of {@link HoodieCommitMetadata} from write statuses and extra metadata.
+   *
+   * <p>This is intentionally a partial/preview object used only for validation — it contains
+   * write stats and checkpoint extra-metadata, but omits fields that are not available before the
+   * commit (e.g. schema, operation type). Validators should treat this as a read-only snapshot
+   * of what will be committed, not a fully-constructed commit record.</p>
+   */
+  private static HoodieCommitMetadata buildCommitMetadata(
+      List<WriteStatus> writeStatuses, Map<String, String> extraMetadata) {
+    HoodieCommitMetadata metadata = new HoodieCommitMetadata();
+
+    // Add write stats
+    for (WriteStatus status : writeStatuses) {
+      HoodieWriteStat stat = status.getStat();
+      if (stat != null) {
+        metadata.addWriteStat(stat.getPartitionPath(), stat);
+      }
+    }
+
+    // Add extra metadata (includes checkpoint info like deltastreamer.checkpoint.key)
+    if (extraMetadata != null) {
+      extraMetadata.forEach(metadata::addMetadata);
+    }
+
+    return metadata;
+  }
+
+  /**
+   * Load the previous completed commit metadata from the timeline.
+   */
+  private static Option<HoodieCommitMetadata> loadPreviousCommitMetadata(HoodieTableMetaClient metaClient) {
+    try {
+      HoodieTimeline completedTimeline = metaClient.reloadActiveTimeline()

Review Comment:
   🤖 `lastInstant()` on the write timeline can return a compaction or 
clustering commit that has no streamer checkpoint key, which then makes 
`resolveCheckpoint` return empty and the validator silently skip with "Previous 
checkpoint not found". With inline/auto compaction or any clustering, this 
would happen on most commits. HoodieStreamer itself avoids this via 
`StreamerCheckpointUtils.getLatestCommitMetadataWithValidCheckpointInfo` which 
walks back to find the most recent commit carrying V1/V2 checkpoint info (see 
`TestHoodieDeltaStreamer.java:3831` exercising exactly this clustering case). 
Could we reuse that here, or otherwise walk back past non-streamer commits? 
@yihua could you weigh in on whether this is the right shape for the 
validator's previous-commit lookup?
   
   <sub><i>- AI-generated; verify before applying. React 👍/👎 to flag 
quality.</i></sub>
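
To illustrate the walk-back idea only, here is a minimal sketch in plain Java. It does not call Hudi and is not the implementation of `StreamerCheckpointUtils.getLatestCommitMetadataWithValidCheckpointInfo`: the class name, the method name, and the representation of each commit as a map of extra metadata are all assumptions made for the example. The only checkpoint key used is `deltastreamer.checkpoint.key`, taken from the comment in the diff; the caller passes the set of keys to look for, so any other key would be an input and not something the sketch asserts.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical helper; each commit is modeled as its extra-metadata map.
class PreviousCheckpointLookupSketch {

  // Scans completed commits from newest to oldest and returns the first one
  // whose extra metadata carries a non-empty value for any checkpoint key.
  static Optional<Map<String, String>> latestWithCheckpoint(
      List<Map<String, String>> commitsOldestFirst, Set<String> checkpointKeys) {
    for (int i = commitsOldestFirst.size() - 1; i >= 0; i--) {
      Map<String, String> extraMetadata = commitsOldestFirst.get(i);
      for (String key : checkpointKeys) {
        String value = extraMetadata.get(key);
        if (value != null && !value.isEmpty()) {
          return Optional.of(extraMetadata);
        }
      }
      // No checkpoint here (e.g. a compaction or clustering commit): keep walking back.
    }
    return Optional.empty();
  }
}
```

In this model, a timeline of `[ingest commit with checkpoint, clustering commit without one]` resolves to the ingest commit, whereas taking only the last instant would resolve to the clustering commit and find no checkpoint.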



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
