codope commented on code in PR #18405: URL: https://github.com/apache/hudi/pull/18405#discussion_r3252148203
########## hudi-utilities/src/test/java/org/apache/hudi/utilities/streamer/validator/TestSparkStreamerValidatorUtils.java: ########## @@ -0,0 +1,237 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.hudi.utilities.streamer.validator; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.model.HoodieCommitMetadata; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1; +import org.apache.hudi.common.testutils.HoodieTestTable; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.config.HoodiePreCommitValidatorConfig; +import org.apache.hudi.exception.HoodieValidationException; + +import org.apache.spark.SparkConf; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.IOException; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import 
java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +/** + * Tests for {@link SparkStreamerValidatorUtils}. + * + * <p>Uses a lightweight Spark context for JavaRDD creation. Tests validate the orchestration + * logic (class loading, config passing, error handling) using first-commit scenarios + * (no previous commit on timeline) to avoid needing a full HoodieTable setup.</p> + */ +public class TestSparkStreamerValidatorUtils { + + private static JavaSparkContext jsc; + + @TempDir + Path tempDir; + + @BeforeAll + public static void setUp() { + SparkConf conf = new SparkConf() + .setAppName("TestSparkStreamerValidatorUtils") + .setMaster("local[2]") + .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") + .set("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension"); + jsc = new JavaSparkContext(conf); + } + + @AfterAll + public static void tearDown() { + if (jsc != null) { + jsc.stop(); + jsc = null; + } + } + + private static TypedProperties propsWithValidator(String validatorClassName) { + TypedProperties props = new TypedProperties(); + props.setProperty(HoodiePreCommitValidatorConfig.VALIDATOR_CLASS_NAMES.key(), validatorClassName); + props.setProperty(HoodiePreCommitValidatorConfig.STREAMING_OFFSET_TOLERANCE_PERCENTAGE.key(), "0.0"); + props.setProperty(HoodiePreCommitValidatorConfig.VALIDATION_FAILURE_POLICY.key(), "FAIL"); + return props; + } + + private static WriteStatus buildWriteStatus(String partitionPath, long numInserts, long numUpdates) { + HoodieWriteStat stat = new HoodieWriteStat(); + stat.setPartitionPath(partitionPath); + stat.setNumInserts(numInserts); + stat.setNumUpdateWrites(numUpdates); + + WriteStatus ws = new WriteStatus(false, 0.0); + ws.setStat(stat); + return ws; + } + + private JavaRDD<WriteStatus> 
toRDD(List<WriteStatus> writeStatuses) { + return jsc.parallelize(writeStatuses); + } + + private org.apache.hudi.common.table.HoodieTableMetaClient createMetaClient() throws IOException { + return org.apache.hudi.common.testutils.HoodieTestUtils.init( + tempDir.toAbsolutePath().toString()); + } + + // ========== Tests ========== Review Comment: Thanks for adding these tests. Both tests seed via STREAMER_CHECKPOINT_KEY_V1. They never exercise the V2-key path because the validator can't read the V2 key. In the context of [this comment](https://github.com/apache/hudi/pull/18405/changes#r3252136615), shall we make these two tests parameterized over both keys (and ideally add a new test on a writeTableVersion=8 table that confirms the validator actually fires)? ########## hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/validator/SparkKafkaOffsetValidator.java: ########## @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.hudi.utilities.streamer.validator; + +import org.apache.hudi.client.validator.StreamingOffsetValidator; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.table.checkpoint.StreamerCheckpointV1; +import org.apache.hudi.common.util.CheckpointUtils.CheckpointFormat; + +/** + * Spark/HoodieStreamer-specific Kafka offset validator. + * + * <p>Validates that the number of records written matches the Kafka offset difference + * between the current and previous HoodieStreamer checkpoints. Uses the Spark Kafka + * checkpoint format stored with key {@code deltastreamer.checkpoint.key} in extraMetadata.</p> + * + * <p>Configuration: + * <ul> + * <li>{@code hoodie.precommit.validators}: Include + * {@code org.apache.hudi.utilities.streamer.validator.SparkKafkaOffsetValidator}</li> + * <li>{@code hoodie.precommit.validators.streaming.offset.tolerance.percentage}: + * Acceptable deviation (default: 0.0 = strict)</li> + * <li>{@code hoodie.precommit.validators.failure.policy}: + * FAIL (default) or WARN_LOG</li> + * </ul></p> + * + * <p>This validator is primarily intended for append-only ingestion from Kafka via HoodieStreamer. + * For upsert workloads with deduplication, configure a higher tolerance or use WARN_LOG.</p> + * + * <p><b>Important:</b> This class extends {@link org.apache.hudi.client.validator.BasePreCommitValidator} + * and is invoked by {@link SparkStreamerValidatorUtils}, NOT by {@code SparkValidatorUtils} + * (which expects {@code SparkPreCommitValidator} with a different constructor signature). + * Listing this class in {@code hoodie.precommit.validators} while also using the standard + * Spark table write-path validators will cause an instantiation failure in {@code SparkValidatorUtils}. 
+ * Use this validator exclusively with HoodieStreamer pipelines.</p> + */ +public class SparkKafkaOffsetValidator extends StreamingOffsetValidator { + + public SparkKafkaOffsetValidator(TypedProperties config) { + super(config, StreamerCheckpointV1.STREAMER_CHECKPOINT_KEY_V1, CheckpointFormat.SPARK_KAFKA); Review Comment: @shangxinli this sounds like a legitimate concern. > steady-state V2 streamers only write streamer.checkpoint.key.v2 (no V1 fallback unless during a V1→V2 transition via addV1Props) I checked the code: [StreamerCheckpointV2.getCheckpointCommitMetadata()](https://github.com/apache/hudi/blob/4035f707f8a132c58d221a339ff0b44f990a4537/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/StreamerCheckpointV2.java#L57) puts only streamer.checkpoint.key.v2. The V1 key is added to extraProps exclusively via `addV1Props()`. I see two options to fix this: - Option 1 (cleanest): have the base `StreamingOffsetValidator` use [CheckpointUtils.getCheckpoint(commitMetadata).getCheckpointKey()](https://github.com/apache/hudi/blob/4035f707f8a132c58d221a339ff0b44f990a4537/hudi-common/src/main/java/org/apache/hudi/common/table/checkpoint/CheckpointUtils.java#L62-L72), since it already does the V2-then-V1 fallback and is the single source of truth for which key is active. The subclass would then no longer need to pass a hardcoded key at all. - Option 2 (smaller blast radius): have `SparkKafkaOffsetValidator` look up the V2 key first and fall back to V1. This doesn't touch the base class or the Flink validator. Whichever option you prefer is fine for this PR. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
