This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d4ff54b571c7 fix(flink): Use timestamp based partitioning in 
AutoRowDataKeyGen (#18090)
d4ff54b571c7 is described below

commit d4ff54b571c73478317801dd8201cc8023801c0c
Author: Prashant Wason <[email protected]>
AuthorDate: Thu Feb 26 19:20:26 2026 -0800

    fix(flink): Use timestamp based partitioning in AutoRowDataKeyGen (#18090)
    
    Summary:
    Problem: when FlinkOptions.RECORD_KEY_FIELD is set to empty, Flink Hudi can 
auto-generate the hoodie_record_key without a primary key (e.g. uuid) being 
specified, which is nullified in the ingestion job. However, the partition format 
is incorrectly set.
    
    Solution: use TimestampBasedAvroKeyGenerator to format partitions in the 
AutoRowDataKeyGen class, the same as is done in RowDataKeyGen.
    
    Co-authored-by: Jing Li <[email protected]>
---
 .../apache/hudi/sink/bulk/AutoRowDataKeyGen.java   | 22 ++++++++++++++++++----
 .../apache/hudi/sink/bulk/TestRowDataKeyGens.java  | 20 ++++++++++++++++++++
 2 files changed, 38 insertions(+), 4 deletions(-)

diff --git 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/AutoRowDataKeyGen.java
 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/AutoRowDataKeyGen.java
index 27979ee93c59..5bf5d944f7fa 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/AutoRowDataKeyGen.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/AutoRowDataKeyGen.java
@@ -22,11 +22,16 @@ import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.configuration.OptionsResolver;
+import org.apache.hudi.exception.HoodieKeyException;
+import org.apache.hudi.keygen.TimestampBasedAvroKeyGenerator;
+import org.apache.hudi.util.StreamerUtil;
 
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
 
+import java.io.IOException;
+
 /**
  * Key generator for {@link RowData} that use an auto key generator.
  */
@@ -42,17 +47,26 @@ public class AutoRowDataKeyGen extends RowDataKeyGen {
       RowType rowType,
       boolean hiveStylePartitioning,
       boolean encodePartitionPath,
-      boolean useCompkexKeygenNewEncoding) {
-    super(Option.empty(), partitionFields, rowType, hiveStylePartitioning, 
encodePartitionPath, false, Option.empty(),
-            useCompkexKeygenNewEncoding);
+      boolean useComplexKeygenNewEncoding,
+      Option<TimestampBasedAvroKeyGenerator> keyGenOpt) {
+    super(Option.empty(), partitionFields, rowType, hiveStylePartitioning, 
encodePartitionPath, false, keyGenOpt,
+            useComplexKeygenNewEncoding);
     this.taskId = taskId;
     this.instantTime = instantTime;
   }
 
   public static RowDataKeyGen instance(Configuration conf, RowType rowType, 
int taskId, String instantTime) {
+    Option<TimestampBasedAvroKeyGenerator> keyGeneratorOpt = Option.empty();
+    if 
(TimestampBasedAvroKeyGenerator.class.getName().equals(conf.get(FlinkOptions.KEYGEN_CLASS_NAME)))
 {
+      try {
+        keyGeneratorOpt = Option.of(new 
TimestampBasedAvroKeyGenerator(StreamerUtil.flinkConf2TypedProperties(conf)));
+      } catch (IOException e) {
+        throw new HoodieKeyException("Initialize 
TimestampBasedAvroKeyGenerator error", e);
+      }
+    }
     return new AutoRowDataKeyGen(taskId, instantTime, 
conf.get(FlinkOptions.PARTITION_PATH_FIELD),
         rowType, conf.get(FlinkOptions.HIVE_STYLE_PARTITIONING), 
conf.get(FlinkOptions.URL_ENCODE_PARTITIONING),
-        OptionsResolver.useComplexKeygenNewEncoding(conf));
+        OptionsResolver.useComplexKeygenNewEncoding(conf), keyGeneratorOpt);
   }
 
   @Override
diff --git 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGens.java
 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGens.java
index 18b85fde733d..0199aa74c14b 100644
--- 
a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGens.java
+++ 
b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGens.java
@@ -245,6 +245,26 @@ public class TestRowDataKeyGens {
     assertThat(keyGen1.getRecordKey(rowData3), is(instantTime + "_" + taskId + 
"_2"));
   }
 
+  @Test
+  void testTimestampBasedKeyGeneratorForKeylessWrite() {
+    final String partitionFormat = "yyyy/MM/dd";
+    final int taskId = 3;
+    final String instantTime = "000001";
+
+    Configuration conf = TestConfigurations.getDefaultConf("path1");
+    conf.set(FlinkOptions.RECORD_KEY_FIELD, "");
+    conf.set(FlinkOptions.PARTITION_PATH_FIELD, "ts");
+    conf.set(FlinkOptions.PARTITION_FORMAT, partitionFormat);
+    HoodieTableFactory.setupTimestampKeygenOptions(conf, 
DataTypes.TIMESTAMP(3));
+
+    final RowData rowData = insertRow(StringData.fromString("id1"), 
StringData.fromString("Danny"), 23,
+            TimestampData.fromEpochMillis(7200000), 
StringData.fromString("par1"));
+
+    final RowDataKeyGen keyGen = RowDataKeyGens.instance(conf, 
TestConfigurations.ROW_TYPE, taskId, instantTime);
+    assertThat(keyGen.getRecordKey(rowData), is(instantTime + "_" + taskId + 
"_0"));
+    assertThat(keyGen.getPartitionPath(rowData), is("1970/01/01"));
+  }
+
   @Test
   void testRecordKeyContainsTimestamp() {
     Configuration conf = TestConfigurations.getDefaultConf("path1");

Reply via email to