This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch release-0.14.1-keygen in repository https://gitbox.apache.org/repos/asf/hudi.git
commit c9e5b696bde6a772a75c52286b1f11a6596d49a5 Author: Y Ethan Guo <[email protected]> AuthorDate: Tue Jul 22 12:23:54 2025 -0700 WIP --- .../org/apache/hudi/keygen/SimpleKeyGenerator.java | 86 ---------------------- ...enerator.java => UserProvidedKeyGenerator.java} | 41 ++++------- .../apache/hudi/functional/TestCOWDataSource.scala | 32 ++++++++ 3 files changed, 46 insertions(+), 113 deletions(-) diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java index c897d6b657e9..722a13f5677b 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java @@ -36,91 +36,5 @@ import static org.apache.hudi.common.util.ValidationUtils.checkArgument; */ public class SimpleKeyGenerator extends BuiltinKeyGenerator { - private final SimpleAvroKeyGenerator simpleAvroKeyGenerator; - public SimpleKeyGenerator(TypedProperties props) { - this(props, Option.ofNullable(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), null)), - props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())); - } - - SimpleKeyGenerator(TypedProperties props, String partitionPathField) { - this(props, Option.empty(), partitionPathField); - } - - SimpleKeyGenerator(TypedProperties props, Option<String> recordKeyField, String partitionPathField) { - super(props); - // Make sure key-generator is configured properly - validateRecordKey(recordKeyField); - validatePartitionPath(partitionPathField); - - this.recordKeyFields = !recordKeyField.isPresent() ? Collections.emptyList() : Collections.singletonList(recordKeyField.get()); - this.partitionPathFields = partitionPathField == null ? Collections.emptyList() : Collections.singletonList(partitionPathField); - this.simpleAvroKeyGenerator = new SimpleAvroKeyGenerator(props, recordKeyField, partitionPathField); - } - - @Override - public String getRecordKey(GenericRecord record) { - return simpleAvroKeyGenerator.getRecordKey(record); - } - - @Override - public String getPartitionPath(GenericRecord record) { - return simpleAvroKeyGenerator.getPartitionPath(record); - } - - @Override - public String getRecordKey(Row row) { - tryInitRowAccessor(row.schema()); - - Object[] recordKeys = rowAccessor.getRecordKeyParts(row); - // NOTE: [[SimpleKeyGenerator]] is restricted to allow only primitive (non-composite) - // record-key field - if (recordKeys[0] == null) { - return handleNullRecordKey(null); - } else { - return requireNonNullNonEmptyKey(recordKeys[0].toString()); - } - } - - @Override - public UTF8String getRecordKey(InternalRow internalRow, StructType schema) { - tryInitRowAccessor(schema); - - Object[] recordKeyValues = rowAccessor.getRecordKeyParts(internalRow); - // NOTE: [[SimpleKeyGenerator]] is restricted to allow only primitive (non-composite) - // record-key field - if (recordKeyValues[0] == null) { - return handleNullRecordKey(null); - } else if (recordKeyValues[0] instanceof UTF8String) { - return requireNonNullNonEmptyKey((UTF8String) recordKeyValues[0]); - } else { - return requireNonNullNonEmptyKey(UTF8String.fromString(recordKeyValues[0].toString())); - } - } - - @Override - public String getPartitionPath(Row row) { - tryInitRowAccessor(row.schema()); - return combinePartitionPath(rowAccessor.getRecordPartitionPathValues(row)); - } - - @Override - public UTF8String getPartitionPath(InternalRow row, StructType schema) { - tryInitRowAccessor(schema); - return combinePartitionPathUnsafe(rowAccessor.getRecordPartitionPathValues(row)); - } - - private static void validatePartitionPath(String partitionPathField) { - checkArgument(partitionPathField == null || !partitionPathField.isEmpty(), - "Partition-path field has to be non-empty!"); - checkArgument(partitionPathField == null || !partitionPathField.contains(FIELDS_SEP), - String.format("Single partition-path field is expected; provided (%s)", partitionPathField)); - } - - private void validateRecordKey(Option<String> recordKeyField) { - checkArgument(!recordKeyField.isPresent() || !recordKeyField.get().isEmpty(), - "Record key field has to be non-empty!"); - checkArgument(!recordKeyField.isPresent() || !recordKeyField.get().contains(FIELDS_SEP), - String.format("Single record-key field is expected; provided (%s)", recordKeyField)); - } } diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/UserProvidedKeyGenerator.java similarity index 76% copy from hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java copy to hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/UserProvidedKeyGenerator.java index c897d6b657e9..ca05501642bb 100644 --- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java +++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/UserProvidedKeyGenerator.java @@ -1,53 +1,40 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - package org.apache.hudi.keygen; -import org.apache.avro.generic.GenericRecord; import org.apache.hudi.common.config.TypedProperties; import org.apache.hudi.common.util.Option; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.exception.HoodieKeyException; +import org.apache.hudi.exception.HoodieKeyGeneratorException; import org.apache.hudi.keygen.constant.KeyGeneratorOptions; + +import org.apache.avro.generic.GenericRecord; import org.apache.spark.sql.Row; import org.apache.spark.sql.catalyst.InternalRow; import org.apache.spark.sql.types.StructType; import org.apache.spark.unsafe.types.UTF8String; +import java.io.IOException; +import java.util.Arrays; import java.util.Collections; +import java.util.List; +import java.util.stream.Collectors; import static org.apache.hudi.common.util.ValidationUtils.checkArgument; -/** - * Simple key generator, which takes names of fields to be used for recordKey and partitionPath as configs. - */ -public class SimpleKeyGenerator extends BuiltinKeyGenerator { +public class UserProvidedKeyGenerator extends BuiltinKeyGenerator { private final SimpleAvroKeyGenerator simpleAvroKeyGenerator; - public SimpleKeyGenerator(TypedProperties props) { + public UserProvidedKeyGenerator(TypedProperties props) { this(props, Option.ofNullable(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(), null)), props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())); } - SimpleKeyGenerator(TypedProperties props, String partitionPathField) { + UserProvidedKeyGenerator(TypedProperties props, String partitionPathField) { this(props, Option.empty(), partitionPathField); } - SimpleKeyGenerator(TypedProperties props, Option<String> recordKeyField, String partitionPathField) { + UserProvidedKeyGenerator(TypedProperties props, Option<String> recordKeyField, String partitionPathField) { super(props); // Make sure key-generator is configured properly validateRecordKey(recordKeyField); @@ -123,4 +110,4 @@ public class SimpleKeyGenerator extends BuiltinKeyGenerator { checkArgument(!recordKeyField.isPresent() || !recordKeyField.get().contains(FIELDS_SEP), String.format("Single record-key field is expected; provided (%s)", recordKeyField)); } -} +} \ No newline at end of file diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala index f500ea83120d..70d36db737c2 100644 --- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala +++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala @@ -348,6 +348,38 @@ class TestCOWDataSource extends HoodieSparkClientTestBase with ScalaAssertionSup .save(basePath) } + @Test + def testUserProvidedKeyGeneratorClass(): Unit = { + val recordType = HoodieRecordType.AVRO + val (writeOpts, readOpts) = getWriterReaderOpts(recordType, Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + "hoodie.bulkinsert.shuffle.parallelism" -> "2", + "hoodie.delete.shuffle.parallelism" -> "1", + "hoodie.datasource.write.precombine.field" -> "timestamp", + "hoodie.datasource.write.keygenerator.class" -> "org.apache.hudi.keygen.UserProvidedKeyGenerator", + DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition" + )) + + // Insert Operation + val records = recordsToStrings(dataGen.generateInserts("000", 100)).asScala.toList + val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2)) + + val commonOptsNoPreCombine = Map( + "hoodie.insert.shuffle.parallelism" -> "4", + "hoodie.upsert.shuffle.parallelism" -> "4", + DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key", + HoodieWriteConfig.TBL_NAME.key -> "hoodie_test" + ) ++ writeOpts + writeToHudi(commonOptsNoPreCombine, inputDF) + + spark.read.format("hudi").options(readOpts).load(basePath).count() + + writeToHudi(commonOptsNoPreCombine, inputDF) + + spark.read.format("hudi").options(readOpts).load(basePath).count() + } + @ParameterizedTest @CsvSource(Array("hoodie.datasource.write.recordkey.field,begin_lat", "hoodie.datasource.write.partitionpath.field,end_lon", "hoodie.datasource.write.keygenerator.class,org.apache.hudi.keygen.NonpartitionedKeyGenerator", "hoodie.datasource.write.precombine.field,fare"))
