codope commented on code in PR #18405:
URL: https://github.com/apache/hudi/pull/18405#discussion_r3252148203


##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/validator/TestSparkStreamerValidatorUtils.java:
##########
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer.validator;
+
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
+import org.apache.hudi.common.testutils.HoodieTestTable;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.config.HoodiePreCommitValidatorConfig;
+import org.apache.hudi.exception.HoodieValidationException;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for {@link SparkStreamerValidatorUtils}.
+ *
+ * <p>Uses a lightweight Spark context for JavaRDD creation. Tests validate the orchestration
+ * logic (class loading, config passing, error handling) using first-commit scenarios
+ * (no previous commit on timeline) to avoid needing a full HoodieTable setup.</p>
+ */
+public class TestSparkStreamerValidatorUtils {
+
+  private static JavaSparkContext jsc;
+
+  @TempDir
+  Path tempDir;
+
+  @BeforeAll
+  public static void setUp() {
+    SparkConf conf = new SparkConf()
+        .setAppName("TestSparkStreamerValidatorUtils")
+        .setMaster("local[2]")
+        .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
+        .set("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension");
+    jsc = new JavaSparkContext(conf);
+  }
+
+  @AfterAll
+  public static void tearDown() {
+    if (jsc != null) {
+      jsc.stop();
+      jsc = null;
+    }
+  }
+
+  private static TypedProperties propsWithValidator(String validatorClassName) {
+    TypedProperties props = new TypedProperties();
+    props.setProperty(HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(), validatorClassName);
+    props.setProperty(HoodiePreCommitValidatorConfig.STREAMING_OFFSET_TOLERANCE_PERCENTAGE.key(), "0.0");
+    props.setProperty(HoodiePreCommitValidatorConfig.VALIDATION_FAILURE_POLICY.key(), "FAIL");
+    return props;
+  }
+
+  private static WriteStatus buildWriteStatus(String partitionPath, long numInserts, long numUpdates) {
+    HoodieWriteStat stat = new HoodieWriteStat();
+    stat.setPartitionPath(partitionPath);
+    stat.setNumInserts(numInserts);
+    stat.setNumUpdateWrites(numUpdates);
+
+    WriteStatus ws = new WriteStatus(false, 0.0);
+    ws.setStat(stat);
+    return ws;
+  }
+
+  private JavaRDD<WriteStatus> toRDD(List<WriteStatus> writeStatuses) {
+    return jsc.parallelize(writeStatuses);
+  }
+
+  private org.apache.hudi.common.table.HoodieTableMetaClient createMetaClient() throws IOException {
+    return org.apache.hudi.common.testutils.HoodieTestUtils.init(
+        tempDir.toAbsolutePath().toString());
+  }
+
+  // ========== Tests ==========

Review Comment:
   Thanks for adding these tests. Both tests seed the checkpoint via `STREAMER_CHECKPOINT_KEY_V1`, so they never exercise the V2-key path, which the validator currently can't read. In the context of [this comment](https://github.com/apache/hudi/pull/18405/changes#r3252136615), shall we parameterize these two tests over both keys (and ideally add a new test on a `writeTableVersion=8` table that confirms the validator actually fires)?
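   A framework-free sketch of the parameterization idea, assuming the key strings quoted in this thread (`deltastreamer.checkpoint.key` for V1, `streamer.checkpoint.key.v2` for V2). `CheckpointKeyCoverageSketch` and its methods are hypothetical; a real test would use JUnit 5's `@ParameterizedTest` against `SparkStreamerValidatorUtils`. Each case seeds extraMetadata under only one key and checks whether a V1-only lookup, mirroring the hardcoded key in the validator constructor, can see it:

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical harness (not Hudi API): seeds commit extraMetadata under a single
// checkpoint key and asks a V1-only reader whether it can see the checkpoint.
final class CheckpointKeyCoverageSketch {
  // Key strings as quoted in this PR thread; assumed to match the Hudi constants.
  static final String V1_KEY = "deltastreamer.checkpoint.key";
  static final String V2_KEY = "streamer.checkpoint.key.v2";

  // Mirrors a validator hardcoded to the V1 key: returns null when only V2 is present.
  static String readV1Only(Map<String, String> extraMetadata) {
    return extraMetadata.get(V1_KEY);
  }

  // Table-driven stand-in for a parameterized test: one case per checkpoint key.
  static Map<String, Boolean> coverageByKey(String checkpoint) {
    Map<String, Boolean> seenByKey = new LinkedHashMap<>();
    for (String key : new String[] {V1_KEY, V2_KEY}) {
      Map<String, String> extraMetadata = Collections.singletonMap(key, checkpoint);
      seenByKey.put(key, checkpoint.equals(readV1Only(extraMetadata)));
    }
    return seenByKey;
  }

  public static void main(String[] args) {
    // Placeholder checkpoint value; its format does not matter for the key lookup.
    System.out.println(coverageByKey("placeholder-checkpoint"));
  }
}
```

   The V2 case coming back `false` is exactly the gap described above; once the validator resolves V2 first, both cases should pass.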



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkKafkaOffsetValidator.java:
##########
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.utilities.streamer.validator;
+
+import org.apache.hudi.client.validator.StreamingOffsetValidator;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1;
+import org.apache.hudi.common.util.CheckpointUtils.CheckpointFormat;
+
+/**
+ * Spark/HoodieStreamer-specific Kafka offset validator.
+ *
+ * <p>Validates that the number of records written matches the Kafka offset difference
+ * between the current and previous HoodieStreamer checkpoints. Uses the Spark Kafka
+ * checkpoint format stored with key {@code deltastreamer.checkpoint.key} in extraMetadata.</p>
+ *
+ * <p>Configuration:
+ * <ul>
+ *   <li>{@code hoodie.precommit.validators}: Include
+ *       {@code org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator}</li>
+ *   <li>{@code hoodie.precommit.validators.streaming.offset.tolerance.percentage}:
+ *       Acceptable deviation (default: 0.0 = strict)</li>
+ *   <li>{@code hoodie.precommit.validators.failure.policy}:
+ *       FAIL (default) or WARN_LOG</li>
+ * </ul></p>
+ *
+ * <p>This validator is primarily intended for append-only ingestion from Kafka via HoodieStreamer.
+ * For upsert workloads with deduplication, configure a higher tolerance or use WARN_LOG.</p>
+ *
+ * <p><b>Important:</b> This class extends {@link org.apache.hudi.client.validator.BasePreCommitValidator}
+ * and is invoked by {@link SparkStreamerValidatorUtils}, NOT by {@code SparkValidatorUtils}
+ * (which expects {@code SparkPreCommitValidator} with a different constructor signature).
+ * Listing this class in {@code hoodie.precommit.validators} while also using the standard
+ * Spark table write-path validators will cause an instantiation failure in {@code SparkValidatorUtils}.
+ * Use this validator exclusively with HoodieStreamer pipelines.</p>
+ */
+public class SparkKafkaOffsetValidator extends StreamingOffsetValidator {
+
+  public SparkKafkaOffsetValidator(TypedProperties config) {
+    super(config, StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1, CheckpointFormat.SPARK_KAFKA);

Review Comment:
   @shangxinli, this sounds like a legitimate concern.
   > steady-state V2 streamers only write streamer.checkpoint.key.v2 (no V1 fallback unless during a V1→V2 transition via addV1Props)
   
   I checked the code: [StreamerCheckpointV2.getCheckpointCommitMetadata()](https://github.com/apache/hudi/blob/4035f707f8a132c58d221a339ff0b44f990a4537/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/StreamerCheckpointV2.java#L57) puts only `streamer.checkpoint.key.v2`. The V1 key is added to extraProps exclusively via `addV1Props()`.
   
   To fix, I see two options:
       - Option 1 (cleanest): have the base `StreamingOffsetValidator` use [CheckpointUtils.getCheckpoint(commitMetadata).getCheckpointKey()](https://github.com/apache/hudi/blob/4035f707f8a132c58d221a339ff0b44f990a4537/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java#L62-L72), since it already does the V2-then-V1 fallback and is the single source of truth for which key is active. The subclass would then no longer need to pass a hardcoded key at all.
       - Option 2 (smaller blast radius): have `SparkKafkaOffsetValidator` look up V2 first and fall back to V1. This doesn't touch the base class or the Flink validator.
   
   Whichever option you prefer is fine for this PR.
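   A minimal, framework-free sketch of Option 2, assuming commit extraMetadata can be treated as a plain `Map<String, String>` and using the key strings quoted above. `CheckpointKeyFallbackSketch` and its methods are hypothetical, and treating an empty value as absent is an assumption rather than confirmed `CheckpointUtils` behavior. Under Option 1, the existing `CheckpointUtils.getCheckpoint(commitMetadata).getCheckpointKey()` call would replace a helper like this entirely:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of Option 2 (not Hudi API): resolve the active checkpoint key
// from commit extraMetadata, preferring V2 and falling back to V1.
final class CheckpointKeyFallbackSketch {
  // Key strings as quoted in this PR thread; assumed to match the Hudi constants.
  static final String V2_KEY = "streamer.checkpoint.key.v2";
  static final String V1_KEY = "deltastreamer.checkpoint.key";

  // Lookup order matters: V2 first, so steady-state V2 commits are found.
  private static final String[] LOOKUP_ORDER = {V2_KEY, V1_KEY};

  // Returns the first key with a non-empty value, or empty if neither key is set.
  static Optional<String> resolveActiveKey(Map<String, String> extraMetadata) {
    for (String key : LOOKUP_ORDER) {
      String value = extraMetadata.get(key);
      if (value != null && !value.isEmpty()) {
        return Optional.of(key);
      }
    }
    return Optional.empty();
  }

  // The checkpoint value stored under the resolved key, if any.
  static Optional<String> resolveCheckpoint(Map<String, String> extraMetadata) {
    return resolveActiveKey(extraMetadata).map(extraMetadata::get);
  }

  public static void main(String[] args) {
    Map<String, String> steadyStateV2 = new HashMap<>();
    steadyStateV2.put(V2_KEY, "cp-v2");

    // During a V1->V2 transition both keys may be present; V2 takes precedence.
    Map<String, String> transition = new HashMap<>(steadyStateV2);
    transition.put(V1_KEY, "cp-v1");

    Map<String, String> legacyV1 = new HashMap<>();
    legacyV1.put(V1_KEY, "cp-v1");

    System.out.println(resolveActiveKey(steadyStateV2).orElse("none"));
    System.out.println(resolveActiveKey(transition).orElse("none"));
    System.out.println(resolveActiveKey(legacyV1).orElse("none"));
  }
}
```

   Keeping the precedence in a single lookup-order array makes it explicit; Option 1 would keep that precedence in `CheckpointUtils` instead of duplicating it in the validator.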


