This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-0.14.1-keygen
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit c9e5b696bde6a772a75c52286b1f11a6596d49a5
Author: Y Ethan Guo <[email protected]>
AuthorDate: Tue Jul 22 12:23:54 2025 -0700

    WIP
---
 .../org/apache/hudi/keygen/SimpleKeyGenerator.java | 86 ----------------------
 ...enerator.java => UserProvidedKeyGenerator.java} | 41 ++++-------
 .../apache/hudi/functional/TestCOWDataSource.scala | 32 ++++++++
 3 files changed, 46 insertions(+), 113 deletions(-)

diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
index c897d6b657e9..722a13f5677b 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
@@ -36,91 +36,5 @@ import static 
org.apache.hudi.common.util.ValidationUtils.checkArgument;
  */
 public class SimpleKeyGenerator extends BuiltinKeyGenerator {
 
-  private final SimpleAvroKeyGenerator simpleAvroKeyGenerator;
 
-  public SimpleKeyGenerator(TypedProperties props) {
-    this(props, 
Option.ofNullable(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(),
 null)),
-        props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()));
-  }
-
-  SimpleKeyGenerator(TypedProperties props, String partitionPathField) {
-    this(props, Option.empty(), partitionPathField);
-  }
-
-  SimpleKeyGenerator(TypedProperties props, Option<String> recordKeyField, 
String partitionPathField) {
-    super(props);
-    // Make sure key-generator is configured properly
-    validateRecordKey(recordKeyField);
-    validatePartitionPath(partitionPathField);
-
-    this.recordKeyFields = !recordKeyField.isPresent() ? 
Collections.emptyList() : Collections.singletonList(recordKeyField.get());
-    this.partitionPathFields = partitionPathField == null ? 
Collections.emptyList() : Collections.singletonList(partitionPathField);
-    this.simpleAvroKeyGenerator = new SimpleAvroKeyGenerator(props, 
recordKeyField, partitionPathField);
-  }
-
-  @Override
-  public String getRecordKey(GenericRecord record) {
-    return simpleAvroKeyGenerator.getRecordKey(record);
-  }
-
-  @Override
-  public String getPartitionPath(GenericRecord record) {
-    return simpleAvroKeyGenerator.getPartitionPath(record);
-  }
-
-  @Override
-  public String getRecordKey(Row row) {
-    tryInitRowAccessor(row.schema());
-
-    Object[] recordKeys = rowAccessor.getRecordKeyParts(row);
-    // NOTE: [[SimpleKeyGenerator]] is restricted to allow only primitive 
(non-composite)
-    //       record-key field
-    if (recordKeys[0] == null) {
-      return handleNullRecordKey(null);
-    } else {
-      return requireNonNullNonEmptyKey(recordKeys[0].toString());
-    }
-  }
-
-  @Override
-  public UTF8String getRecordKey(InternalRow internalRow, StructType schema) {
-    tryInitRowAccessor(schema);
-
-    Object[] recordKeyValues = rowAccessor.getRecordKeyParts(internalRow);
-    // NOTE: [[SimpleKeyGenerator]] is restricted to allow only primitive 
(non-composite)
-    //       record-key field
-    if (recordKeyValues[0] == null) {
-      return handleNullRecordKey(null);
-    } else if (recordKeyValues[0] instanceof UTF8String) {
-      return requireNonNullNonEmptyKey((UTF8String) recordKeyValues[0]);
-    } else {
-      return 
requireNonNullNonEmptyKey(UTF8String.fromString(recordKeyValues[0].toString()));
-    }
-  }
-
-  @Override
-  public String getPartitionPath(Row row) {
-    tryInitRowAccessor(row.schema());
-    return combinePartitionPath(rowAccessor.getRecordPartitionPathValues(row));
-  }
-
-  @Override
-  public UTF8String getPartitionPath(InternalRow row, StructType schema) {
-    tryInitRowAccessor(schema);
-    return 
combinePartitionPathUnsafe(rowAccessor.getRecordPartitionPathValues(row));
-  }
-
-  private static void validatePartitionPath(String partitionPathField) {
-    checkArgument(partitionPathField == null || !partitionPathField.isEmpty(),
-        "Partition-path field has to be non-empty!");
-    checkArgument(partitionPathField == null || 
!partitionPathField.contains(FIELDS_SEP),
-        String.format("Single partition-path field is expected; provided 
(%s)", partitionPathField));
-  }
-
-  private void validateRecordKey(Option<String> recordKeyField) {
-    checkArgument(!recordKeyField.isPresent() || 
!recordKeyField.get().isEmpty(),
-        "Record key field has to be non-empty!");
-    checkArgument(!recordKeyField.isPresent() || 
!recordKeyField.get().contains(FIELDS_SEP),
-        String.format("Single record-key field is expected; provided (%s)", 
recordKeyField));
-  }
 }
diff --git 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/UserProvidedKeyGenerator.java
similarity index 76%
copy from 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
copy to 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/UserProvidedKeyGenerator.java
index c897d6b657e9..ca05501642bb 100644
--- 
a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/SimpleKeyGenerator.java
+++ 
b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/keygen/UserProvidedKeyGenerator.java
@@ -1,53 +1,40 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
 package org.apache.hudi.keygen;
 
-import org.apache.avro.generic.GenericRecord;
 import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.hudi.exception.HoodieKeyGeneratorException;
 import org.apache.hudi.keygen.constant.KeyGeneratorOptions;
+
+import org.apache.avro.generic.GenericRecord;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.unsafe.types.UTF8String;
 
+import java.io.IOException;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
 
 import static org.apache.hudi.common.util.ValidationUtils.checkArgument;
 
-/**
- * Simple key generator, which takes names of fields to be used for recordKey 
and partitionPath as configs.
- */
-public class SimpleKeyGenerator extends BuiltinKeyGenerator {
+public class UserProvidedKeyGenerator extends BuiltinKeyGenerator {
 
   private final SimpleAvroKeyGenerator simpleAvroKeyGenerator;
 
-  public SimpleKeyGenerator(TypedProperties props) {
+  public UserProvidedKeyGenerator(TypedProperties props) {
     this(props, 
Option.ofNullable(props.getString(KeyGeneratorOptions.RECORDKEY_FIELD_NAME.key(),
 null)),
         props.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()));
   }
 
-  SimpleKeyGenerator(TypedProperties props, String partitionPathField) {
+  UserProvidedKeyGenerator(TypedProperties props, String partitionPathField) {
     this(props, Option.empty(), partitionPathField);
   }
 
-  SimpleKeyGenerator(TypedProperties props, Option<String> recordKeyField, 
String partitionPathField) {
+  UserProvidedKeyGenerator(TypedProperties props, Option<String> 
recordKeyField, String partitionPathField) {
     super(props);
     // Make sure key-generator is configured properly
     validateRecordKey(recordKeyField);
@@ -123,4 +110,4 @@ public class SimpleKeyGenerator extends BuiltinKeyGenerator 
{
     checkArgument(!recordKeyField.isPresent() || 
!recordKeyField.get().contains(FIELDS_SEP),
         String.format("Single record-key field is expected; provided (%s)", 
recordKeyField));
   }
-}
+}
\ No newline at end of file
diff --git 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
index f500ea83120d..70d36db737c2 100644
--- 
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
+++ 
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestCOWDataSource.scala
@@ -348,6 +348,38 @@ class TestCOWDataSource extends HoodieSparkClientTestBase 
with ScalaAssertionSup
       .save(basePath)
   }
 
+  @Test
+  def testUserProvidedKeyGeneratorClass(): Unit = {
+    val recordType = HoodieRecordType.AVRO
+    val (writeOpts, readOpts) = getWriterReaderOpts(recordType, Map(
+      "hoodie.insert.shuffle.parallelism" -> "4",
+      "hoodie.upsert.shuffle.parallelism" -> "4",
+      "hoodie.bulkinsert.shuffle.parallelism" -> "2",
+      "hoodie.delete.shuffle.parallelism" -> "1",
+      "hoodie.datasource.write.precombine.field" -> "timestamp",
+      "hoodie.datasource.write.keygenerator.class" -> 
"org.apache.hudi.keygen.UserProvidedKeyGenerator",
+      DataSourceWriteOptions.PARTITIONPATH_FIELD.key -> "partition"
+    ))
+
+    // Insert Operation
+    val records = recordsToStrings(dataGen.generateInserts("000", 
100)).asScala.toList
+    val inputDF = spark.read.json(spark.sparkContext.parallelize(records, 2))
+
+    val commonOptsNoPreCombine = Map(
+      "hoodie.insert.shuffle.parallelism" -> "4",
+      "hoodie.upsert.shuffle.parallelism" -> "4",
+      DataSourceWriteOptions.RECORDKEY_FIELD.key -> "_row_key",
+      HoodieWriteConfig.TBL_NAME.key -> "hoodie_test"
+    ) ++ writeOpts
+    writeToHudi(commonOptsNoPreCombine, inputDF)
+
+    spark.read.format("hudi").options(readOpts).load(basePath).count()
+
+    writeToHudi(commonOptsNoPreCombine, inputDF)
+
+    spark.read.format("hudi").options(readOpts).load(basePath).count()
+  }
+
   @ParameterizedTest
   @CsvSource(Array("hoodie.datasource.write.recordkey.field,begin_lat", 
"hoodie.datasource.write.partitionpath.field,end_lon",
     
"hoodie.datasource.write.keygenerator.class,org.apache.hudi.keygen.NonpartitionedKeyGenerator",
 "hoodie.datasource.write.precombine.field,fare"))

Reply via email to