This is an automated email from the ASF dual-hosted git repository. yihua pushed a commit to branch release-1.2.0 in repository https://gitbox.apache.org/repos/asf/hudi.git
commit 99c21cbc219026eef3c9280b5840807a1f983967 Author: Danny Chan <[email protected]> AuthorDate: Thu May 14 20:53:23 2026 +0800 fix: remove the pk check for Flink append only table (#18738) --- .../keygen/TimestampBasedAvroKeyGenerator.java | 2 +- .../apache/hudi/configuration/OptionsResolver.java | 33 +++++++++++---- .../apache/hudi/sink/bulk/AutoRowDataKeyGen.java | 2 +- .../org/apache/hudi/sink/bulk/RowDataKeyGens.java | 11 ++--- .../java/org/apache/hudi/sink/utils/Pipelines.java | 2 +- .../hudi/source/prune/PrimaryKeyPruners.java | 2 +- .../org/apache/hudi/table/HoodieTableFactory.java | 19 ++++++--- .../apache/hudi/table/catalog/HoodieCatalog.java | 6 ++- .../hudi/table/catalog/HoodieHiveCatalog.java | 2 +- .../java/org/apache/hudi/util/StreamerUtil.java | 1 - .../hudi/configuration/TestOptionsResolver.java | 27 +++++++++++++ .../apache/hudi/sink/bulk/TestRowDataKeyGens.java | 21 +++++++--- .../apache/hudi/table/ITTestHoodieDataSource.java | 8 ++-- .../apache/hudi/table/TestHoodieTableFactory.java | 40 ++++++++++++++++++ .../hudi/table/catalog/TestHoodieCatalog.java | 47 ++++++++++++++++++++++ .../org/apache/hudi/utils/TestConfigurations.java | 23 ++++++++--- 16 files changed, 202 insertions(+), 44 deletions(-) diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java index b38a85cba6c9..efb75be06254 100644 --- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java +++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/keygen/TimestampBasedAvroKeyGenerator.java @@ -77,7 +77,7 @@ public class TimestampBasedAvroKeyGenerator extends SimpleAvroKeyGenerator { config.getString(KeyGeneratorOptions.PARTITIONPATH_FIELD_NAME.key())); } - TimestampBasedAvroKeyGenerator(TypedProperties config, String partitionPathField) throws IOException { + public TimestampBasedAvroKeyGenerator(TypedProperties config, String partitionPathField) throws IOException { this(config, null, partitionPathField); } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java index 398ca329dbb4..49ccca803b38 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/OptionsResolver.java @@ -50,6 +50,8 @@ import org.apache.flink.api.common.functions.Partitioner; import org.apache.flink.configuration.ConfigOption; import org.apache.flink.configuration.Configuration; +import javax.annotation.Nullable; + import java.lang.reflect.Field; import java.util.ArrayList; import java.util.HashMap; @@ -154,16 +156,33 @@ public class OptionsResolver { } /** - * Return value of {@link FlinkOptions#RECORD_KEY_FIELD} if it was set, - * or throw exception otherwise. + * Return value of {@link FlinkOptions#RECORD_KEY_FIELD}, could be null if it is not set. */ + @Nullable public static String getRecordKeyStr(Configuration conf) { + return conf.get(FlinkOptions.RECORD_KEY_FIELD); + } + + /** + * Return the record keys as an array. + */ + public static String[] getRecordKeys(Configuration conf) { final String recordKeyStr = conf.get(FlinkOptions.RECORD_KEY_FIELD); - ValidationUtils.checkArgument( - recordKeyStr != null, - "Primary key definition is required, use either PRIMARY KEY syntax or option '" - + FlinkOptions.RECORD_KEY_FIELD.key() + "' to specify."); - return recordKeyStr; + if (StringUtils.isNullOrEmpty(recordKeyStr)) { + return new String[]{}; + } + return recordKeyStr.split(","); + } + + /** + * Return the bucket index keys as an array. + */ + public static String[] getBucketIndexKeys(Configuration conf) { + final String indexKeyStr = conf.get(FlinkOptions.INDEX_KEY_FIELD); + if (StringUtils.isNullOrEmpty(indexKeyStr)) { + return new String[]{}; + } + return indexKeyStr.split(","); } /** diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/AutoRowDataKeyGen.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/AutoRowDataKeyGen.java index 5bf5d944f7fa..ab3182406866 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/AutoRowDataKeyGen.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/AutoRowDataKeyGen.java @@ -59,7 +59,7 @@ public class AutoRowDataKeyGen extends RowDataKeyGen { Option<TimestampBasedAvroKeyGenerator> keyGeneratorOpt = Option.empty(); if (TimestampBasedAvroKeyGenerator.class.getName().equals(conf.get(FlinkOptions.KEYGEN_CLASS_NAME))) { try { - keyGeneratorOpt = Option.of(new TimestampBasedAvroKeyGenerator(StreamerUtil.flinkConf2TypedProperties(conf))); + keyGeneratorOpt = Option.of(new TimestampBasedAvroKeyGenerator(StreamerUtil.flinkConf2TypedProperties(conf), conf.get(FlinkOptions.PARTITION_PATH_FIELD))); } catch (IOException e) { throw new HoodieKeyException("Initialize TimestampBasedAvroKeyGenerator error", e); } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGens.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGens.java index fc6128e6918d..f815ada7abcd 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGens.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/bulk/RowDataKeyGens.java @@ -26,8 +26,6 @@ import org.apache.flink.table.types.logical.RowType; import javax.annotation.Nullable; -import java.util.List; - /** * Factory class for all kinds of {@link RowDataKeyGen}. */ @@ -37,8 +35,8 @@ public class RowDataKeyGens { * Creates {@link RowDataKeyGen} of corresponding type depending on table configuration. */ public static RowDataKeyGen instance(Configuration conf, RowType rowType, @Nullable Integer taskId, @Nullable String instantTime) { - String recordKeys = OptionsResolver.getRecordKeyStr(conf); - if (hasRecordKey(recordKeys, rowType.getFieldNames())) { + String[] recordKeys = OptionsResolver.getRecordKeys(conf); + if (hasRecordKey(recordKeys)) { return RowDataKeyGen.instance(conf, rowType); } else { if (null == taskId || null == instantTime) { @@ -59,8 +57,7 @@ public class RowDataKeyGens { /** * Checks whether user provides any record key. */ - private static boolean hasRecordKey(String recordKeys, List<String> fieldNames) { - return recordKeys.split(",").length != 1 - || fieldNames.contains(recordKeys); + private static boolean hasRecordKey(String[] recordKeys) { + return recordKeys.length > 0; } } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java index 47d1f8e9e716..caffd52f0265 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/utils/Pipelines.java @@ -166,7 +166,7 @@ public class Pipelines { if (conf.get(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT)) { final boolean isNeededSortInput = conf.get(FlinkOptions.WRITE_BULK_INSERT_SORT_INPUT_BY_RECORD_KEY); final String[] partitionFields = FilePathUtils.extractPartitionKeys(conf); - final String[] recordKeyFields = OptionsResolver.getRecordKeyStr(conf).split(","); + final String[] recordKeyFields = OptionsResolver.getRecordKeys(conf); // if sort input by record key is needed then add record keys to partition keys String[] sortFields = isNeededSortInput diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/PrimaryKeyPruners.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/PrimaryKeyPruners.java index 46a71c41518f..45dea1455312 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/PrimaryKeyPruners.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/source/prune/PrimaryKeyPruners.java @@ -43,7 +43,7 @@ import java.util.stream.Collectors; public class PrimaryKeyPruners { public static Function<Integer, Integer> getBucketIdFunc(List<ResolvedExpression> hashKeyFilters, Configuration conf) { - List<String> pkFields = Arrays.asList(OptionsResolver.getRecordKeyStr(conf).split(",")); + List<String> pkFields = Arrays.asList(OptionsResolver.getRecordKeys(conf)); // step1: resolve the hash key values final boolean logicalTimestamp = OptionsResolver.isConsistentLogicalTimestampEnabled(conf); List<String> values = hashKeyFilters.stream() diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java index a08f3f56aacb..85d1b72b2f22 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/HoodieTableFactory.java @@ -240,7 +240,11 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab private void checkRecordKey(Configuration conf, ResolvedSchema schema) { List<String> fields = schema.getColumnNames(); if (schema.getPrimaryKey().isEmpty()) { - String[] recordKeys = OptionsResolver.getRecordKeyStr(conf).split(","); + String[] recordKeys = OptionsResolver.getRecordKeys(conf); + if (recordKeys.length == 0) { + throw new HoodieValidationException("Primary key definition is required, use either PRIMARY KEY syntax or option '" + + FlinkOptions.RECORD_KEY_FIELD.key() + "' to specify."); + } Arrays.stream(recordKeys) .filter(field -> !fields.contains(field)) .findAny() @@ -293,7 +297,7 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab private static void setupHoodieKeyOptions(Configuration conf, CatalogTable table) { List<String> pkColumns = table.getSchema().getPrimaryKey() .map(pk -> pk.getColumns()).orElse(Collections.emptyList()); - if (pkColumns.size() > 0) { + if (!pkColumns.isEmpty()) { // the PRIMARY KEY syntax always has higher priority than option FlinkOptions#RECORD_KEY_FIELD String recordKey = String.join(",", pkColumns); conf.set(FlinkOptions.RECORD_KEY_FIELD, recordKey); @@ -309,12 +313,15 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab log.info("'{}' is not set, therefore '{}' value will be used as index key instead", FlinkOptions.INDEX_KEY_FIELD.key(), FlinkOptions.RECORD_KEY_FIELD.key()); - conf.set(FlinkOptions.INDEX_KEY_FIELD, OptionsResolver.getRecordKeyStr(conf)); + String recordKeyStr = OptionsResolver.getRecordKeyStr(conf); + if (StringUtils.nonEmpty(recordKeyStr)) { + conf.set(FlinkOptions.INDEX_KEY_FIELD, recordKeyStr); + } } else { Set<String> recordKeySet = - Arrays.stream(OptionsResolver.getRecordKeyStr(conf).split(",")).collect(Collectors.toSet()); + Arrays.stream(OptionsResolver.getRecordKeys(conf)).collect(Collectors.toSet()); Set<String> indexKeySet = - Arrays.stream(conf.get(FlinkOptions.INDEX_KEY_FIELD).split(",")).collect(Collectors.toSet()); + Arrays.stream(OptionsResolver.getBucketIndexKeys(conf)).collect(Collectors.toSet()); if (!recordKeySet.containsAll(indexKeySet)) { throw new HoodieValidationException( FlinkOptions.INDEX_KEY_FIELD + " should be a subset of or equal to the recordKey fields"); @@ -324,7 +331,7 @@ public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTab // tweak the key gen class if possible final String[] partitions = conf.get(FlinkOptions.PARTITION_PATH_FIELD).split(","); - final String[] pks = OptionsResolver.getRecordKeyStr(conf).split(","); + final String[] pks = OptionsResolver.getRecordKeys(conf); if (partitions.length == 1) { final String partitionField = partitions[0]; if (partitionField.isEmpty()) { diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java index 065a8a00c276..a8bf340f64e2 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieCatalog.java @@ -314,7 +314,9 @@ public class HoodieCatalog extends AbstractCatalog { Configuration conf = Configuration.fromMap(options); conf.set(FlinkOptions.PATH, tablePathStr); ResolvedSchema resolvedSchema = resolvedTable.getResolvedSchema(); - if (!resolvedSchema.getPrimaryKey().isPresent() && !conf.containsKey(RECORD_KEY_FIELD.key())) { + if (!resolvedSchema.getPrimaryKey().isPresent() + && !conf.containsKey(RECORD_KEY_FIELD.key()) + && !OptionsResolver.isAppendMode(conf)) { throw new CatalogException("Primary key definition is missing"); } final String avroSchema = HoodieSchemaConverter.convertToSchema( @@ -350,7 +352,7 @@ public class HoodieCatalog extends AbstractCatalog { conf.set(FlinkOptions.PARTITION_PATH_FIELD, partitions); options.put(TableOptionProperties.PARTITION_COLUMNS, partitions); - final String[] pks = OptionsResolver.getRecordKeyStr(conf).split(","); + final String[] pks = OptionsResolver.getRecordKeys(conf); boolean complexHoodieKey = pks.length > 1 || resolvedTable.getPartitionKeys().size() > 1; StreamerUtil.checkKeygenGenerator(complexHoodieKey, conf); } else { diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java index 3caeb9565afb..8abe168eb6ea 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/table/catalog/HoodieHiveCatalog.java @@ -514,7 +514,7 @@ public class HoodieHiveCatalog extends AbstractCatalog { if (catalogTable.isPartitioned() && !flinkConf.contains(FlinkOptions.PARTITION_PATH_FIELD)) { final String partitions = String.join(",", catalogTable.getPartitionKeys()); flinkConf.set(FlinkOptions.PARTITION_PATH_FIELD, partitions); - final String[] pks = OptionsResolver.getRecordKeyStr(flinkConf).split(","); + final String[] pks = OptionsResolver.getRecordKeys(flinkConf); boolean complexHoodieKey = pks.length > 1 || catalogTable.getPartitionKeys().size() > 1; StreamerUtil.checkKeygenGenerator(complexHoodieKey, flinkConf); } diff --git a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java index ed5cbbc64ec8..f88d1a76a325 100644 --- a/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java +++ b/hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/util/StreamerUtil.java @@ -840,7 +840,6 @@ public class StreamerUtil { * * @param conf The Flink configuration * @param checkpointId The checkpoint ID - * @param checkpointClient The checkpoint client (nullable) * @return Kafka offset checkpoint string in URL-encoded format for Hudi metadata, * e.g., "kafka_metadata%3Atopic-name%3A0:100;kafka_metadata%3Atopic-name%3A1:200" * where format is "kafka_metadata%3Atopic%3Apartition:offset" separated by semicolons. diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/configuration/TestOptionsResolver.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/configuration/TestOptionsResolver.java index 39a4ee93d7ec..5772a8a1098e 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/configuration/TestOptionsResolver.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/configuration/TestOptionsResolver.java @@ -30,8 +30,10 @@ import org.junit.jupiter.api.io.TempDir; import java.io.File; +import static org.junit.jupiter.api.Assertions.assertArrayEquals; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; /** @@ -52,6 +54,31 @@ public class TestOptionsResolver { assertEquals(HoodieIndex.IndexType.BLOOM, OptionsResolver.getIndexType(conf)); } + @Test + void testGetRecordKeys() { + Configuration conf = new Configuration(); + assertNull(OptionsResolver.getRecordKeyStr(conf)); + assertArrayEquals(new String[]{}, OptionsResolver.getRecordKeys(conf)); + + conf.set(FlinkOptions.RECORD_KEY_FIELD, ""); + assertArrayEquals(new String[]{}, OptionsResolver.getRecordKeys(conf)); + + conf.set(FlinkOptions.RECORD_KEY_FIELD, "uuid, name"); + assertArrayEquals(new String[]{"uuid", " name"}, OptionsResolver.getRecordKeys(conf)); + } + + @Test + void testGetBucketIndexKeys() { + Configuration conf = new Configuration(); + assertArrayEquals(new String[]{}, OptionsResolver.getBucketIndexKeys(conf)); + + conf.set(FlinkOptions.INDEX_KEY_FIELD, ""); + assertArrayEquals(new String[]{}, OptionsResolver.getBucketIndexKeys(conf)); + + conf.set(FlinkOptions.INDEX_KEY_FIELD, "uuid, name"); + assertArrayEquals(new String[]{"uuid", " name"}, OptionsResolver.getBucketIndexKeys(conf)); + } + @Test void testIsLazyFailedWritesCleanPolicy() { Configuration conf = new Configuration(); diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGens.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGens.java index 0199aa74c14b..f82ed02d6fdf 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGens.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/sink/bulk/TestRowDataKeyGens.java @@ -226,7 +226,7 @@ public class TestRowDataKeyGens { @Test void testPrimaryKeylessWrite() { Configuration conf = TestConfigurations.getDefaultConf("path1"); - conf.set(FlinkOptions.RECORD_KEY_FIELD, ""); + conf.removeConfig(FlinkOptions.RECORD_KEY_FIELD); final RowData rowData1 = insertRow(StringData.fromString("id1"), StringData.fromString("Danny"), 23, TimestampData.fromEpochMillis(1), StringData.fromString("par1")); final int taskId = 3; @@ -252,7 +252,7 @@ public class TestRowDataKeyGens { final String instantTime = "000001"; Configuration conf = TestConfigurations.getDefaultConf("path1"); - conf.set(FlinkOptions.RECORD_KEY_FIELD, ""); + conf.removeConfig(FlinkOptions.RECORD_KEY_FIELD); conf.set(FlinkOptions.PARTITION_PATH_FIELD, "ts"); conf.set(FlinkOptions.PARTITION_FORMAT, partitionFormat); HoodieTableFactory.setupTimestampKeygenOptions(conf, DataTypes.TIMESTAMP(3)); @@ -284,9 +284,8 @@ public class TestRowDataKeyGens { } @Test - void testAutoKeyGenRecordKey() { + void testAutoKeyGenRecordKeyWithEmptyRecordKeyField() { Configuration conf = TestConfigurations.getDefaultConf("path1"); - // without record keys AutoRowDataKeyGen will be used, which expects taskId and instantTime parameters for instantiation conf.set(FlinkOptions.RECORD_KEY_FIELD, ""); int taskId = 1; @@ -296,11 +295,23 @@ public class TestRowDataKeyGens { assertThat(autoKeyGen.getRecordKey(TestData.DATA_SET_INSERT.get(1)), is(instantTime + "_" + taskId + "_1")); } + @Test + void testAutoKeyGenRecordKeyWithoutDeclaringRecordKeyField() { + Configuration conf = TestConfigurations.getDefaultConf("path1"); + conf.removeConfig(FlinkOptions.RECORD_KEY_FIELD); + + int taskId = 1; + String instantTime = "20250716145212986"; + final AutoRowDataKeyGen autoKeyGen = (AutoRowDataKeyGen) RowDataKeyGens.instance(conf, TestConfigurations.ROW_TYPE, taskId, instantTime); + assertThat(autoKeyGen.getRecordKey(TestData.DATA_SET_INSERT.get(0)), is(instantTime + "_" + taskId + "_0")); + assertThat(autoKeyGen.getRecordKey(TestData.DATA_SET_INSERT.get(1)), is(instantTime + "_" + taskId + "_1")); + } + @Test void testAutoKeyGenNotAllowNulls() { Configuration conf = TestConfigurations.getDefaultConf("path1"); // without record keys AutoRowDataKeyGen will be used, which expects taskId and instantTime parameters for instantiation - conf.set(FlinkOptions.RECORD_KEY_FIELD, ""); + conf.removeConfig(FlinkOptions.RECORD_KEY_FIELD); HoodieValidationException exNullInstant = assertThrows(HoodieValidationException.class, () -> RowDataKeyGens.instance(conf, TestConfigurations.ROW_TYPE, 1, null)); diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java index 37d5e9169ff3..aa9fa49312ad 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/ITTestHoodieDataSource.java @@ -21,6 +21,7 @@ package org.apache.hudi.table; import org.apache.hudi.common.config.HoodieMetadataConfig; import org.apache.hudi.common.config.HoodieStorageConfig; import org.apache.hudi.common.model.DefaultHoodieRecordPayload; +import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.common.model.HoodieTableType; import org.apache.hudi.common.model.WriteOperationType; import org.apache.hudi.common.table.HoodieTableConfig; @@ -31,7 +32,6 @@ import org.apache.hudi.common.table.marker.MarkerType; import org.apache.hudi.common.table.timeline.HoodieTimeline; import org.apache.hudi.common.util.CollectionUtils; import org.apache.hudi.config.HoodieWriteConfig; -import org.apache.hudi.common.model.HoodieFileFormat; import org.apache.hudi.configuration.FlinkOptions; import org.apache.hudi.index.HoodieIndex; import org.apache.hudi.index.bucket.partition.PartitionBucketIndexUtils; @@ -416,7 +416,6 @@ public class ITTestHoodieDataSource { String hoodieTableDDL = sql("t1") .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) - .options(getDefaultKeys()) .option(FlinkOptions.OPERATION, "insert") .option(FlinkOptions.READ_AS_STREAMING, true) .option(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true) @@ -446,7 +445,6 @@ public class ITTestHoodieDataSource { String hoodieTableDDL = sql("t1") .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) - .options(getDefaultKeys()) .option(FlinkOptions.OPERATION, "insert") .option(FlinkOptions.CLUSTERING_SCHEDULE_ENABLED, true) .option(FlinkOptions.CLUSTERING_ASYNC_ENABLED, true) @@ -1651,9 +1649,9 @@ public class ITTestHoodieDataSource { String hoodieTableDDL = sql("hoodie_sink") .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) - .options(getDefaultKeys()) .option(FlinkOptions.OPERATION, "insert") .option(FlinkOptions.INSERT_CLUSTER, clustering) + .option(FlinkOptions.RECORD_KEY_FIELD, clustering ? "uuid" : "") .end(); tableEnv.executeSql(hoodieTableDDL); @@ -3212,11 +3210,11 @@ public class ITTestHoodieDataSource { String hoodieTableDDL = sql("t1") .option(FlinkOptions.PATH, tempFile.getAbsolutePath()) - .options(getDefaultKeys()) .option(FlinkOptions.TABLE_TYPE, MERGE_ON_READ) .option(FlinkOptions.OPERATION, "insert") .option(FlinkOptions.WRITE_BUFFER_MEMORY_TYPE, BufferMemoryType.MANAGED) .option(FlinkOptions.WRITE_BUFFER_TYPE, bufferType.name()) + .option(FlinkOptions.RECORD_KEY_FIELD, "uuid") .end(); streamTableEnv.executeSql(hoodieTableDDL); diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java index 4d29503208dc..f594e9575860 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/TestHoodieTableFactory.java @@ -192,6 +192,46 @@ public class TestHoodieTableFactory { assertDoesNotThrow(() -> new HoodieTableFactory().createDynamicTableSink(sourceContext6)); } + @Test + void testAppendOnlySinkWithoutRecordKey() { + Configuration appendOnlyConf = new Configuration(); + appendOnlyConf.set(FlinkOptions.PATH, new File(tempFile, "append_only_without_record_key").getAbsolutePath()); + appendOnlyConf.set(FlinkOptions.TABLE_NAME, "append_only_without_record_key"); + appendOnlyConf.set(FlinkOptions.OPERATION, "insert"); + + ResolvedSchema schema = SchemaBuilder.instance() + .field("f0", DataTypes.INT()) + .field("f1", DataTypes.VARCHAR(20)) + .field("ts", DataTypes.TIMESTAMP(3)) + .build(); + MockContext context = MockContext.getInstance(appendOnlyConf, schema, ""); + + HoodieTableSink tableSink = (HoodieTableSink) new HoodieTableFactory().createDynamicTableSink(context); + assertNull(tableSink.getConf().get(FlinkOptions.RECORD_KEY_FIELD)); + assertThat(tableSink.getConf().get(FlinkOptions.ORDERING_FIELDS), is(FlinkOptions.NO_PRE_COMBINE)); + assertThat(tableSink.getConf().get(FlinkOptions.KEYGEN_CLASS_NAME), is(NonpartitionedAvroKeyGenerator.class.getName())); + } + + @Test + void testNonAppendSinkRequiresRecordKey() { + Configuration upsertConf = new Configuration(); + upsertConf.set(FlinkOptions.PATH, new File(tempFile, "upsert_without_record_key").getAbsolutePath()); + upsertConf.set(FlinkOptions.TABLE_NAME, "upsert_without_record_key"); + + ResolvedSchema schema = SchemaBuilder.instance() + .field("f0", DataTypes.INT()) + .field("f1", DataTypes.VARCHAR(20)) + .field("ts", DataTypes.TIMESTAMP(3)) + .build(); + MockContext context = MockContext.getInstance(upsertConf, schema, ""); + + HoodieValidationException exception = assertThrows( + HoodieValidationException.class, + () -> new HoodieTableFactory().createDynamicTableSink(context)); + assertThat(exception.getMessage(), is("Primary key definition is required, use either PRIMARY KEY syntax or option '" + + FlinkOptions.RECORD_KEY_FIELD.key() + "' to specify.")); + } + @Test void testIndexTypeCheck() { ResolvedSchema schema = SchemaBuilder.instance() diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java index ab8ce6f58317..9934d8a4e160 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/table/catalog/TestHoodieCatalog.java @@ -333,6 +333,53 @@ public class TestHoodieCatalog extends BaseTestHoodieCatalog { assertEquals(keyGeneratorClassName, NonpartitionedAvroKeyGenerator.class.getName()); } + @Test + public void testCreateAppendOnlyTableWithoutRecordKey() throws Exception { + ObjectPath tablePath = new ObjectPath(TEST_DEFAULT_DATABASE, "tb_append_only_without_record_key"); + ResolvedSchema schemaWithoutPrimaryKey = new ResolvedSchema( + CREATE_COLUMNS, + Collections.emptyList(), + null); + Map<String, String> options = new HashMap<>(EXPECTED_OPTIONS); + options.put(FlinkOptions.OPERATION.key(), "insert"); + ResolvedCatalogTable catalogTable = new ResolvedCatalogTable( + CatalogUtils.createCatalogTable( + Schema.newBuilder().fromResolvedSchema(schemaWithoutPrimaryKey).build(), + Arrays.asList("partition"), + options, + "test"), + schemaWithoutPrimaryKey + ); + + catalog.createTable(tablePath, catalogTable, false); + + HoodieTableMetaClient metaClient = createMetaClient( + new HadoopStorageConfiguration(HadoopConfigurations.getHadoopConf(new Configuration())), + catalog.inferTablePath(catalogPathStr, tablePath)); + assertFalse(metaClient.getTableConfig().getRecordKeyFields().isPresent()); + assertEquals(SimpleAvroKeyGenerator.class.getName(), metaClient.getTableConfig().getKeyGeneratorClassName()); + } + + @Test + public void testCreateNonAppendTableWithoutRecordKey() { + ObjectPath tablePath = new ObjectPath(TEST_DEFAULT_DATABASE, "tb_non_append_without_record_key"); + ResolvedSchema schemaWithoutPrimaryKey = new ResolvedSchema( + CREATE_COLUMNS, + Collections.emptyList(), + null); + ResolvedCatalogTable catalogTable = new ResolvedCatalogTable( + CatalogUtils.createCatalogTable( + Schema.newBuilder().fromResolvedSchema(schemaWithoutPrimaryKey).build(), + Arrays.asList("partition"), + EXPECTED_OPTIONS, + "test"), + schemaWithoutPrimaryKey + ); + + CatalogException exception = assertThrows(CatalogException.class, () -> catalog.createTable(tablePath, catalogTable, false)); + assertEquals("Primary key definition is missing", exception.getMessage()); + } + @Test void testCreateTableWithPartitionBucketIndex() throws TableAlreadyExistException, DatabaseNotExistException, IOException { String rule = "regex"; diff --git a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java index e2a4558e0ee3..60b6fad6c8cb 100644 --- a/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java +++ b/hudi-flink-datasource/hudi-flink/src/test/java/org/apache/hudi/utils/TestConfigurations.java @@ -241,11 +241,18 @@ public class TestConfigurations { String partitionField) { StringBuilder builder = new StringBuilder(); builder.append("create table ").append(tableName).append("(\n"); - for (String field : fields) { - builder.append(" ").append(field).append(",\n"); + for (int i = 0; i < fields.size(); i++) { + builder.append(" ").append(fields.get(i)); + if (i == fields.size() - 1 && pkField == null) { + builder.append(")\n"); + } else { + builder.append(",\n"); + } + } + if (pkField != null) { + builder.append(" PRIMARY KEY(").append(pkField).append(") NOT ENFORCED\n") + .append(")\n"); } - builder.append(" PRIMARY KEY(").append(pkField).append(") NOT ENFORCED\n") - .append(")\n"); if (havePartition) { builder.append("PARTITIONED BY (`").append(partitionField).append("`)\n"); } @@ -420,7 +427,7 @@ public class TestConfigurations { private final String tableName; private List<String> fields = new ArrayList<>(); private boolean withPartition = true; - private String pkField = "uuid"; + private String pkField = null; private String partitionField = "partition"; public Sql(String tableName) { @@ -464,9 +471,13 @@ public class TestConfigurations { } public String end() { - if (this.fields.size() == 0) { + if (this.fields.isEmpty()) { this.fields = FIELDS; } + if (!"insert".equalsIgnoreCase(options.get(FlinkOptions.OPERATION.key())) && this.pkField == null) { + // assign default pk for upsert table + this.pkField = "uuid"; + } return TestConfigurations.getCreateHoodieTableDDL(this.tableName, this.fields, options, this.withPartition, this.pkField, this.partitionField); }
