This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch release-1.2.0
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit 99c21cbc219026eef3c9280b5840807a1f983967
Author: Danny Chan <[email protected]>
AuthorDate: Thu May 14 20:53:23 2026 +0800

    fix: remove the pk check for Flink append only table (#18738)
---
 .../keygen/TimestampBasedAvroKeyGenerator.java     |  2 +-
 .../apache/hudi/configuration/OptionsResolver.java | 33 +++++++++++----
 .../apache/hudi/sink/bulk/AutoRowDataKeyGen.java   |  2 +-
 .../org/apache/hudi/sink/bulk/RowDataKeyGens.java  | 11 ++---
 .../java/org/apache/hudi/sink/utils/Pipelines.java |  2 +-
 .../hudi/source/prune/PrimaryKeyPruners.java       |  2 +-
 .../org/apache/hudi/table/HoodieTableFactory.java  | 19 ++++++---
 .../apache/hudi/table/catalog/HoodieCatalog.java   |  6 ++-
 .../hudi/table/catalog/HoodieHiveCatalog.java      |  2 +-
 .../java/org/apache/hudi/util/StreamerUtil.java    |  1 -
 .../hudi/configuration/TestOptionsResolver.java    | 27 +++++++++++++
 .../apache/hudi/sink/bulk/TestRowDataKeyGens.java  | 21 +++++++---
 .../apache/hudi/table/ITTestHoodieDataSource.java  |  8 ++--
 .../apache/hudi/table/TestHoodieTableFactory.java  | 40 ++++++++++++++++++
 .../hudi/table/catalog/TestHoodieCatalog.java      | 47 ++++++++++++++++++++++
 .../org/apache/hudi/utils/TestConfigurations.java  | 23 ++++++++---
 16 files changed, 202 insertions(+), 44 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java
index b38a85cba6c9..efb75be06254 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java
@@ -77,7 +77,7 @@ public class TimestampBasedAvroKeyGenerator extends SimpleAvroKeyGenerator {
         config.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key()));
   }
 
-  TimestampBasedAvroKeyGenerator(TypedProperties config, String 
partitionPathField) throws IOException {
+  public TimestampBasedAvroKeyGenerator(TypedProperties config, String 
partitionPathField) throws IOException {
     this(config, null, partitionPathField);
   }
 
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
index 398ca329dbb4..49ccca803b38 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java
@@ -50,6 +50,8 @@ import org.apache.flink.api.common.functions.Partitioner;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.Configuration;
 
+import javax.annotation.Nullable;
+
 import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -154,16 +156,33 @@ public class OptionsResolver {
   }
 
   /**
-   * Return value of {@link FlinkOptions#RECORD_KEY_FIELD} if it was set,
-   * or throw exception otherwise.
+   * Return value of {@link FlinkOptions#RECORD_KEY_FIELD}, could be null if 
it is not set.
    */
+  @Nullable
   public static String getRecordKeyStr(Configuration conf) {
+    return conf.get(FlinkOptions.RECORD_KEY_FIELD);
+  }
+
+  /**
+   * Return the record keys as an array.
+   */
+  public static String[] getRecordKeys(Configuration conf) {
     final String recordKeyStr = conf.get(FlinkOptions.RECORD_KEY_FIELD);
-    ValidationUtils.checkArgument(
-        recordKeyStr != null,
-        "Primary key definition is required, use either PRIMARY KEY syntax or 
option '"
-            + FlinkOptions.RECORD_KEY_FIELD.key() + "' to specify.");
-    return recordKeyStr;
+    if (StringUtils.isNullOrEmpty(recordKeyStr)) {
+      return new String[]{};
+    }
+    return recordKeyStr.split(",");
+  }
+
+  /**
+   * Return the bucket index keys as an array.
+   */
+  public static String[] getBucketIndexKeys(Configuration conf) {
+    final String indexKeyStr = conf.get(FlinkOptions.INDEX_KEY_FIELD);
+    if (StringUtils.isNullOrEmpty(indexKeyStr)) {
+      return new String[]{};
+    }
+    return indexKeyStr.split(",");
   }
 
   /**
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/AutoRowDataKeyGen.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/AutoRowDataKeyGen.java
index 5bf5d944f7fa..ab3182406866 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/AutoRowDataKeyGen.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/AutoRowDataKeyGen.java
@@ -59,7 +59,7 @@ public class AutoRowDataKeyGen extends RowDataKeyGen {
     Option<TimestampBasedAvroKeyGenerator> keyGeneratorOpt = Option.empty();
     if 
(TimestampBasedAvroKeyGenerator.class.getName().equals(conf.get(FlinkOptions.KEYGEN_CLASS_NAME)))
 {
       try {
-        keyGeneratorOpt = Option.of(new 
TimestampBasedAvroKeyGenerator(StreamerUtil.flinkConf2TypedProperties(conf)));
+        keyGeneratorOpt = Option.of(new 
TimestampBasedAvroKeyGenerator(StreamerUtil.flinkConf2TypedProperties(conf), 
conf.get(FlinkOptions.PARTITION_PATH_FIELD)));
       } catch (IOException e) {
         throw new HoodieKeyException("Initialize 
TimestampBasedAvroKeyGenerator error", e);
       }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGens.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGens.java
index fc6128e6918d..f815ada7abcd 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGens.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGens.java
@@ -26,8 +26,6 @@ import org.apache.flink.table.types.logical.RowType;
 
 import javax.annotation.Nullable;
 
-import java.util.List;
-
 /**
  * Factory class for all kinds of {@link RowDataKeyGen}.
  */
@@ -37,8 +35,8 @@ public class RowDataKeyGens {
    * Creates {@link RowDataKeyGen} of corresponding type depending on table 
configuration.
    */
   public static RowDataKeyGen instance(Configuration conf, RowType rowType, 
@Nullable Integer taskId, @Nullable String instantTime) {
-    String recordKeys = OptionsResolver.getRecordKeyStr(conf);
-    if (hasRecordKey(recordKeys, rowType.getFieldNames())) {
+    String[] recordKeys = OptionsResolver.getRecordKeys(conf);
+    if (hasRecordKey(recordKeys)) {
       return RowDataKeyGen.instance(conf, rowType);
     } else {
       if (null == taskId || null == instantTime) {
@@ -59,8 +57,7 @@ public class RowDataKeyGens {
   /**
    * Checks whether user provides any record key.
    */
-  private static boolean hasRecordKey(String recordKeys, List<String> 
fieldNames) {
-    return recordKeys.split(",").length != 1
-        || fieldNames.contains(recordKeys);
+  private static boolean hasRecordKey(String[] recordKeys) {
+    return recordKeys.length > 0;
   }
 }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
index 47d1f8e9e716..caffd52f0265 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java
@@ -166,7 +166,7 @@ public class Pipelines {
       if (conf.get(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) {
         final boolean isNeededSortInput = 
conf.get(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT_BY_RECORD_KEY);
         final String[] partitionFields = 
FilePathUtils.extractPartitionKeys(conf);
-        final String[] recordKeyFields = 
OptionsResolver.getRecordKeyStr(conf).split(",");
+        final String[] recordKeyFields = OptionsResolver.getRecordKeys(conf);
 
         // if sort input by record key is needed then add record keys to 
partition keys
         String[] sortFields = isNeededSortInput
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/PrimaryKeyPruners.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/PrimaryKeyPruners.java
index 46a71c41518f..45dea1455312 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/PrimaryKeyPruners.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/PrimaryKeyPruners.java
@@ -43,7 +43,7 @@ import java.util.stream.Collectors;
 public class PrimaryKeyPruners {
 
   public static Function<Integer, Integer> 
getBucketIdFunc(List<ResolvedExpression> hashKeyFilters, Configuration conf) {
-    List<String> pkFields = 
Arrays.asList(OptionsResolver.getRecordKeyStr(conf).split(","));
+    List<String> pkFields = Arrays.asList(OptionsResolver.getRecordKeys(conf));
     // step1: resolve the hash key values
     final boolean logicalTimestamp = 
OptionsResolver.isConsistentLogicalTimestampEnabled(conf);
     List<String> values = hashKeyFilters.stream()
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
index a08f3f56aacb..85d1b72b2f22 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java
@@ -240,7 +240,11 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
   private void checkRecordKey(Configuration conf, ResolvedSchema schema) {
     List<String> fields = schema.getColumnNames();
     if (schema.getPrimaryKey().isEmpty()) {
-      String[] recordKeys = OptionsResolver.getRecordKeyStr(conf).split(",");
+      String[] recordKeys = OptionsResolver.getRecordKeys(conf);
+      if (recordKeys.length == 0) {
+        throw new HoodieValidationException("Primary key definition is 
required, use either PRIMARY KEY syntax or option '"
+            + FlinkOptions.RECORD_KEY_FIELD.key() + "' to specify.");
+      }
       Arrays.stream(recordKeys)
           .filter(field -> !fields.contains(field))
           .findAny()
@@ -293,7 +297,7 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
   private static void setupHoodieKeyOptions(Configuration conf, CatalogTable 
table) {
     List<String> pkColumns = table.getSchema().getPrimaryKey()
         .map(pk -> pk.getColumns()).orElse(Collections.emptyList());
-    if (pkColumns.size() > 0) {
+    if (!pkColumns.isEmpty()) {
       // the PRIMARY KEY syntax always has higher priority than option 
FlinkOptions#RECORD_KEY_FIELD
       String recordKey = String.join(",", pkColumns);
       conf.set(FlinkOptions.RECORD_KEY_FIELD, recordKey);
@@ -309,12 +313,15 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
         log.info("'{}' is not set, therefore '{}' value will be used as index 
key instead",
             FlinkOptions.INDEX_KEY_FIELD.key(),
             FlinkOptions.RECORD_KEY_FIELD.key());
-        conf.set(FlinkOptions.INDEX_KEY_FIELD, 
OptionsResolver.getRecordKeyStr(conf));
+        String recordKeyStr = OptionsResolver.getRecordKeyStr(conf);
+        if (StringUtils.nonEmpty(recordKeyStr)) {
+          conf.set(FlinkOptions.INDEX_KEY_FIELD, recordKeyStr);
+        }
       } else {
         Set<String> recordKeySet =
-            
Arrays.stream(OptionsResolver.getRecordKeyStr(conf).split(",")).collect(Collectors.toSet());
+            
Arrays.stream(OptionsResolver.getRecordKeys(conf)).collect(Collectors.toSet());
         Set<String> indexKeySet =
-            
Arrays.stream(conf.get(FlinkOptions.INDEX_KEY_FIELD).split(",")).collect(Collectors.toSet());
+            
Arrays.stream(OptionsResolver.getBucketIndexKeys(conf)).collect(Collectors.toSet());
         if (!recordKeySet.containsAll(indexKeySet)) {
           throw new HoodieValidationException(
               FlinkOptions.INDEX_KEY_FIELD + " should be a subset of or equal 
to the recordKey fields");
@@ -324,7 +331,7 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab
 
     // tweak the key gen class if possible
     final String[] partitions = 
conf.get(FlinkOptions.PARTITION_PATH_FIELD).split(",");
-    final String[] pks = OptionsResolver.getRecordKeyStr(conf).split(",");
+    final String[] pks = OptionsResolver.getRecordKeys(conf);
     if (partitions.length == 1) {
       final String partitionField = partitions[0];
       if (partitionField.isEmpty()) {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
index 065a8a00c276..a8bf340f64e2 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java
@@ -314,7 +314,9 @@ public class HoodieCatalog extends AbstractCatalog {
     Configuration conf = Configuration.fromMap(options);
     conf.set(FlinkOptions.PATH, tablePathStr);
     ResolvedSchema resolvedSchema = resolvedTable.getResolvedSchema();
-    if (!resolvedSchema.getPrimaryKey().isPresent() && 
!conf.containsKey(RECORD_KEY_FIELD.key())) {
+    if (!resolvedSchema.getPrimaryKey().isPresent()
+        && !conf.containsKey(RECORD_KEY_FIELD.key())
+        && !OptionsResolver.isAppendMode(conf)) {
       throw new CatalogException("Primary key definition is missing");
     }
     final String avroSchema = HoodieSchemaConverter.convertToSchema(
@@ -350,7 +352,7 @@ public class HoodieCatalog extends AbstractCatalog {
       conf.set(FlinkOptions.PARTITION_PATH_FIELD, partitions);
       options.put(TableOptionProperties.PARTITION_COLUMNS, partitions);
 
-      final String[] pks = OptionsResolver.getRecordKeyStr(conf).split(",");
+      final String[] pks = OptionsResolver.getRecordKeys(conf);
       boolean complexHoodieKey = pks.length > 1 || 
resolvedTable.getPartitionKeys().size() > 1;
       StreamerUtil.checkKeygenGenerator(complexHoodieKey, conf);
     } else {
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
index 3caeb9565afb..8abe168eb6ea 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java
@@ -514,7 +514,7 @@ public class HoodieHiveCatalog extends AbstractCatalog {
     if (catalogTable.isPartitioned() && 
!flinkConf.contains(FlinkOptions.PARTITION_PATH_FIELD)) {
       final String partitions = String.join(",", 
catalogTable.getPartitionKeys());
       flinkConf.set(FlinkOptions.PARTITION_PATH_FIELD, partitions);
-      final String[] pks = 
OptionsResolver.getRecordKeyStr(flinkConf).split(",");
+      final String[] pks = OptionsResolver.getRecordKeys(flinkConf);
       boolean complexHoodieKey = pks.length > 1 || 
catalogTable.getPartitionKeys().size() > 1;
       StreamerUtil.checkKeygenGenerator(complexHoodieKey, flinkConf);
     }
diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
index ed5cbbc64ec8..f88d1a76a325 100644
--- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
+++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java
@@ -840,7 +840,6 @@ public class StreamerUtil {
    *
    * @param conf             The Flink configuration
    * @param checkpointId     The checkpoint ID
-   * @param checkpointClient The checkpoint client (nullable)
    * @return Kafka offset checkpoint string in URL-encoded format for Hudi 
metadata,
    * e.g., 
"kafka_metadata%3Atopic-name%3A0:100;kafka_metadata%3Atopic-name%3A1:200"
    * where format is "kafka_metadata%3Atopic%3Apartition:offset" separated by 
semicolons.
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/configuration/TestOptionsResolver.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/configuration/TestOptionsResolver.java
index 39a4ee93d7ec..5772a8a1098e 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/configuration/TestOptionsResolver.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/configuration/TestOptionsResolver.java
@@ -30,8 +30,10 @@ import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
 
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
@@ -52,6 +54,31 @@ public class TestOptionsResolver {
     assertEquals(HoodieIndex.IndexType.BLOOM, 
OptionsResolver.getIndexType(conf));
   }
 
+  @Test
+  void testGetRecordKeys() {
+    Configuration conf = new Configuration();
+    assertNull(OptionsResolver.getRecordKeyStr(conf));
+    assertArrayEquals(new String[]{}, OptionsResolver.getRecordKeys(conf));
+
+    conf.set(FlinkOptions.RECORD_KEY_FIELD, "");
+    assertArrayEquals(new String[]{}, OptionsResolver.getRecordKeys(conf));
+
+    conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid, name");
+    assertArrayEquals(new String[]{"uuid", " name"}, 
OptionsResolver.getRecordKeys(conf));
+  }
+
+  @Test
+  void testGetBucketIndexKeys() {
+    Configuration conf = new Configuration();
+    assertArrayEquals(new String[]{}, 
OptionsResolver.getBucketIndexKeys(conf));
+
+    conf.set(FlinkOptions.INDEX_KEY_FIELD, "");
+    assertArrayEquals(new String[]{}, 
OptionsResolver.getBucketIndexKeys(conf));
+
+    conf.set(FlinkOptions.INDEX_KEY_FIELD, "uuid, name");
+    assertArrayEquals(new String[]{"uuid", " name"}, 
OptionsResolver.getBucketIndexKeys(conf));
+  }
+
   @Test
   void testIsLazyFailedWritesCleanPolicy() {
     Configuration conf = new Configuration();
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGens.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGens.java
index 0199aa74c14b..f82ed02d6fdf 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGens.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGens.java
@@ -226,7 +226,7 @@ public class TestRowDataKeyGens {
   @Test
   void testPrimaryKeylessWrite() {
     Configuration conf = TestConfigurations.getDefaultConf("path1");
-    conf.set(FlinkOptions.RECORD_KEY_FIELD, "");
+    conf.removeConfig(FlinkOptions.RECORD_KEY_FIELD);
     final RowData rowData1 = insertRow(StringData.fromString("id1"), 
StringData.fromString("Danny"), 23,
         TimestampData.fromEpochMillis(1), StringData.fromString("par1"));
     final int taskId = 3;
@@ -252,7 +252,7 @@ public class TestRowDataKeyGens {
     final String instantTime = "000001";
 
     Configuration conf = TestConfigurations.getDefaultConf("path1");
-    conf.set(FlinkOptions.RECORD_KEY_FIELD, "");
+    conf.removeConfig(FlinkOptions.RECORD_KEY_FIELD);
     conf.set(FlinkOptions.PARTITION_PATH_FIELD, "ts");
     conf.set(FlinkOptions.PARTITION_FORMAT, partitionFormat);
     HoodieTableFactory.setupTimestampKeygenOptions(conf, 
DataTypes.TIMESTAMP(3));
@@ -284,9 +284,8 @@ public class TestRowDataKeyGens {
   }
 
   @Test
-  void testAutoKeyGenRecordKey() {
+  void testAutoKeyGenRecordKeyWithEmptyRecordKeyField() {
     Configuration conf = TestConfigurations.getDefaultConf("path1");
-    // without record keys AutoRowDataKeyGen will be used, which expects 
taskId and instantTime parameters for instantiation
     conf.set(FlinkOptions.RECORD_KEY_FIELD, "");
 
     int taskId = 1;
@@ -296,11 +295,23 @@ public class TestRowDataKeyGens {
     assertThat(autoKeyGen.getRecordKey(TestData.DATA_SET_INSERT.get(1)), 
is(instantTime + "_" + taskId + "_1"));
   }
 
+  @Test
+  void testAutoKeyGenRecordKeyWithoutDeclaringRecordKeyField() {
+    Configuration conf = TestConfigurations.getDefaultConf("path1");
+    conf.removeConfig(FlinkOptions.RECORD_KEY_FIELD);
+
+    int taskId = 1;
+    String instantTime = "20250716145212986";
+    final AutoRowDataKeyGen autoKeyGen = (AutoRowDataKeyGen) 
RowDataKeyGens.instance(conf, TestConfigurations.ROW_TYPE, taskId, instantTime);
+    assertThat(autoKeyGen.getRecordKey(TestData.DATA_SET_INSERT.get(0)), 
is(instantTime + "_" + taskId + "_0"));
+    assertThat(autoKeyGen.getRecordKey(TestData.DATA_SET_INSERT.get(1)), 
is(instantTime + "_" + taskId + "_1"));
+  }
+
   @Test
   void testAutoKeyGenNotAllowNulls() {
     Configuration conf = TestConfigurations.getDefaultConf("path1");
     // without record keys AutoRowDataKeyGen will be used, which expects 
taskId and instantTime parameters for instantiation
-    conf.set(FlinkOptions.RECORD_KEY_FIELD, "");
+    conf.removeConfig(FlinkOptions.RECORD_KEY_FIELD);
 
     HoodieValidationException exNullInstant =
         assertThrows(HoodieValidationException.class, () -> 
RowDataKeyGens.instance(conf, TestConfigurations.ROW_TYPE, 1, null));
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
index 37d5e9169ff3..aa9fa49312ad 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java
@@ -21,6 +21,7 @@ package org.apache.hudi.table;
 import org.apache.hudi.common.config.HoodieMetadataConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.model.DefaultHoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableConfig;
@@ -31,7 +32,6 @@ import org.apache.hudi.common.table.marker.MarkerType;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.CollectionUtils;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.common.model.HoodieFileFormat;
 import org.apache.hudi.configuration.FlinkOptions;
 import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.index.bucket.partition.PartitionBucketIndexUtils;
@@ -416,7 +416,6 @@ public class ITTestHoodieDataSource {
 
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
-        .options(getDefaultKeys())
         .option(FlinkOptions.OPERATION, "insert")
         .option(FlinkOptions.READ_AS_STREAMING, true)
         .option(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true)
@@ -446,7 +445,6 @@ public class ITTestHoodieDataSource {
 
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
-        .options(getDefaultKeys())
         .option(FlinkOptions.OPERATION, "insert")
         .option(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true)
         .option(FlinkOptions.CLUSTERING_ASYNC_ENABLED, true)
@@ -1651,9 +1649,9 @@ public class ITTestHoodieDataSource {
 
     String hoodieTableDDL = sql("hoodie_sink")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
-        .options(getDefaultKeys())
         .option(FlinkOptions.OPERATION, "insert")
         .option(FlinkOptions.INSERT_CLUSTER, clustering)
+        .option(FlinkOptions.RECORD_KEY_FIELD, clustering ? "uuid" : "")
         .end();
     tableEnv.executeSql(hoodieTableDDL);
 
@@ -3212,11 +3210,11 @@ public class ITTestHoodieDataSource {
 
     String hoodieTableDDL = sql("t1")
         .option(FlinkOptions.PATH, tempFile.getAbsolutePath())
-        .options(getDefaultKeys())
         .option(FlinkOptions.TABLE_TYPE, MERGE_ON_READ)
         .option(FlinkOptions.OPERATION, "insert")
         .option(FlinkOptions.WRITE_BUFFER_MEMORY_TYPE, 
BufferMemoryType.MANAGED)
         .option(FlinkOptions.WRITE_BUFFER_TYPE, bufferType.name())
+        .option(FlinkOptions.RECORD_KEY_FIELD, "uuid")
         .end();
     streamTableEnv.executeSql(hoodieTableDDL);
 
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
index 4d29503208dc..f594e9575860 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java
@@ -192,6 +192,46 @@ public class TestHoodieTableFactory {
     assertDoesNotThrow(() -> new 
HoodieTableFactory().createDynamicTableSink(sourceContext6));
   }
 
+  @Test
+  void testAppendOnlySinkWithoutRecordKey() {
+    Configuration appendOnlyConf = new Configuration();
+    appendOnlyConf.set(FlinkOptions.PATH, new File(tempFile, 
"append_only_without_record_key").getAbsolutePath());
+    appendOnlyConf.set(FlinkOptions.TABLE_NAME, 
"append_only_without_record_key");
+    appendOnlyConf.set(FlinkOptions.OPERATION, "insert");
+
+    ResolvedSchema schema = SchemaBuilder.instance()
+        .field("f0", DataTypes.INT())
+        .field("f1", DataTypes.VARCHAR(20))
+        .field("ts", DataTypes.TIMESTAMP(3))
+        .build();
+    MockContext context = MockContext.getInstance(appendOnlyConf, schema, "");
+
+    HoodieTableSink tableSink = (HoodieTableSink) new 
HoodieTableFactory().createDynamicTableSink(context);
+    assertNull(tableSink.getConf().get(FlinkOptions.RECORD_KEY_FIELD));
+    assertThat(tableSink.getConf().get(FlinkOptions.ORDERING_FIELDS), 
is(FlinkOptions.NO_PRE_COMBINE));
+    assertThat(tableSink.getConf().get(FlinkOptions.KEYGEN_CLASS_NAME), 
is(NonpartitionedAvroKeyGenerator.class.getName()));
+  }
+
+  @Test
+  void testNonAppendSinkRequiresRecordKey() {
+    Configuration upsertConf = new Configuration();
+    upsertConf.set(FlinkOptions.PATH, new File(tempFile, 
"upsert_without_record_key").getAbsolutePath());
+    upsertConf.set(FlinkOptions.TABLE_NAME, "upsert_without_record_key");
+
+    ResolvedSchema schema = SchemaBuilder.instance()
+        .field("f0", DataTypes.INT())
+        .field("f1", DataTypes.VARCHAR(20))
+        .field("ts", DataTypes.TIMESTAMP(3))
+        .build();
+    MockContext context = MockContext.getInstance(upsertConf, schema, "");
+
+    HoodieValidationException exception = assertThrows(
+        HoodieValidationException.class,
+        () -> new HoodieTableFactory().createDynamicTableSink(context));
+    assertThat(exception.getMessage(), is("Primary key definition is required, 
use either PRIMARY KEY syntax or option '"
+        + FlinkOptions.RECORD_KEY_FIELD.key() + "' to specify."));
+  }
+
   @Test
   void testIndexTypeCheck() {
     ResolvedSchema schema = SchemaBuilder.instance()
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
index ab8ce6f58317..9934d8a4e160 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java
@@ -333,6 +333,53 @@ public class TestHoodieCatalog extends BaseTestHoodieCatalog {
     assertEquals(keyGeneratorClassName, 
NonpartitionedAvroKeyGenerator.class.getName());
   }
 
+  @Test
+  public void testCreateAppendOnlyTableWithoutRecordKey() throws Exception {
+    ObjectPath tablePath = new ObjectPath(TEST_DEFAULT_DATABASE, 
"tb_append_only_without_record_key");
+    ResolvedSchema schemaWithoutPrimaryKey = new ResolvedSchema(
+        CREATE_COLUMNS,
+        Collections.emptyList(),
+        null);
+    Map<String, String> options = new HashMap<>(EXPECTED_OPTIONS);
+    options.put(FlinkOptions.OPERATION.key(), "insert");
+    ResolvedCatalogTable catalogTable = new ResolvedCatalogTable(
+        CatalogUtils.createCatalogTable(
+            
Schema.newBuilder().fromResolvedSchema(schemaWithoutPrimaryKey).build(),
+            Arrays.asList("partition"),
+            options,
+            "test"),
+        schemaWithoutPrimaryKey
+    );
+
+    catalog.createTable(tablePath, catalogTable, false);
+
+    HoodieTableMetaClient metaClient = createMetaClient(
+        new HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(new 
Configuration())),
+        catalog.inferTablePath(catalogPathStr, tablePath));
+    assertFalse(metaClient.getTableConfig().getRecordKeyFields().isPresent());
+    assertEquals(SimpleAvroKeyGenerator.class.getName(), 
metaClient.getTableConfig().getKeyGeneratorClassName());
+  }
+
+  @Test
+  public void testCreateNonAppendTableWithoutRecordKey() {
+    ObjectPath tablePath = new ObjectPath(TEST_DEFAULT_DATABASE, 
"tb_non_append_without_record_key");
+    ResolvedSchema schemaWithoutPrimaryKey = new ResolvedSchema(
+        CREATE_COLUMNS,
+        Collections.emptyList(),
+        null);
+    ResolvedCatalogTable catalogTable = new ResolvedCatalogTable(
+        CatalogUtils.createCatalogTable(
+            
Schema.newBuilder().fromResolvedSchema(schemaWithoutPrimaryKey).build(),
+            Arrays.asList("partition"),
+            EXPECTED_OPTIONS,
+            "test"),
+        schemaWithoutPrimaryKey
+    );
+
+    CatalogException exception = assertThrows(CatalogException.class, () -> 
catalog.createTable(tablePath, catalogTable, false));
+    assertEquals("Primary key definition is missing", exception.getMessage());
+  }
+
   @Test
   void testCreateTableWithPartitionBucketIndex() throws 
TableAlreadyExistException, DatabaseNotExistException, IOException {
     String rule = "regex";
diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
index e2a4558e0ee3..60b6fad6c8cb 100644
--- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
+++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java
@@ -241,11 +241,18 @@ public class TestConfigurations {
       String partitionField) {
     StringBuilder builder = new StringBuilder();
     builder.append("create table ").append(tableName).append("(\n");
-    for (String field : fields) {
-      builder.append("  ").append(field).append(",\n");
+    for (int i = 0; i < fields.size(); i++) {
+      builder.append("  ").append(fields.get(i));
+      if (i == fields.size() - 1 && pkField == null) {
+        builder.append(")\n");
+      } else {
+        builder.append(",\n");
+      }
+    }
+    if (pkField != null) {
+      builder.append("  PRIMARY KEY(").append(pkField).append(") NOT 
ENFORCED\n")
+          .append(")\n");
     }
-    builder.append("  PRIMARY KEY(").append(pkField).append(") NOT ENFORCED\n")
-        .append(")\n");
     if (havePartition) {
       builder.append("PARTITIONED BY 
(`").append(partitionField).append("`)\n");
     }
@@ -420,7 +427,7 @@ public class TestConfigurations {
     private final String tableName;
     private List<String> fields = new ArrayList<>();
     private boolean withPartition = true;
-    private String pkField = "uuid";
+    private String pkField = null;
     private String partitionField = "partition";
 
     public Sql(String tableName) {
@@ -464,9 +471,13 @@ public class TestConfigurations {
     }
 
     public String end() {
-      if (this.fields.size() == 0) {
+      if (this.fields.isEmpty()) {
         this.fields = FIELDS;
       }
+      if 
(!"insert".equalsIgnoreCase(options.get(FlinkOptions.OPERATION.key())) && 
this.pkField == null) {
+        // assign default pk for upsert table
+        this.pkField = "uuid";
+      }
       return TestConfigurations.getCreateHoodieTableDDL(this.tableName, 
this.fields, options,
           this.withPartition, this.pkField, this.partitionField);
     }

Reply via email to