This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 214dcc507e [core] Validate merge function creation in SchemaValidation
214dcc507e is described below
commit 214dcc507ec9167e1e6585a2ee492bc852def8b2
Author: JingsongLi <[email protected]>
AuthorDate: Sat Jan 17 22:06:43 2026 +0800
[core] Validate merge function creation in SchemaValidation
---
.../org/apache/paimon/schema/SchemaValidation.java | 97 ++--------------------
.../paimon/table/PrimaryKeySimpleTableTest.java | 25 +++---
.../apache/paimon/flink/PartialUpdateITCase.java | 10 +--
3 files changed, 25 insertions(+), 107 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index e69ba193b7..cceae0567e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -26,7 +26,6 @@ import org.apache.paimon.factories.FactoryUtil;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
import
org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory;
-import
org.apache.paimon.mergetree.compact.aggregate.factory.FieldLastNonNullValueAggFactory;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.BucketMode;
@@ -44,16 +43,12 @@ import org.apache.paimon.types.TimestampType;
import org.apache.paimon.utils.Preconditions;
import org.apache.paimon.utils.StringUtils;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
-import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.paimon.CoreOptions.BUCKET_KEY;
@@ -77,7 +72,7 @@ import static org.apache.paimon.CoreOptions.SCAN_WATERMARK;
import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MAX;
import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MIN;
import static org.apache.paimon.CoreOptions.STREAMING_READ_OVERWRITE;
-import static
org.apache.paimon.mergetree.compact.PartialUpdateMergeFunction.SEQUENCE_GROUP;
+import static
org.apache.paimon.table.PrimaryKeyTableUtils.createMergeFunctionFactory;
import static org.apache.paimon.table.SpecialFields.KEY_FIELD_PREFIX;
import static org.apache.paimon.table.SpecialFields.SYSTEM_FIELD_NAMES;
import static org.apache.paimon.types.DataTypeRoot.ARRAY;
@@ -96,8 +91,6 @@ public class SchemaValidation {
/**
* Validate the {@link TableSchema} and {@link CoreOptions}.
*
- * <p>TODO validate all items in schema and all keys in options.
- *
* @param schema the schema to be validated
*/
public static void validateTableSchema(TableSchema schema) {
@@ -122,7 +115,7 @@ public class SchemaValidation {
validateSequenceField(schema, options);
- validateSequenceGroup(schema, options);
+ validateMergeFunction(schema);
ChangelogProducer changelogProducer = options.changelogProducer();
if (schema.primaryKeys().isEmpty() && changelogProducer !=
ChangelogProducer.NONE) {
@@ -449,90 +442,12 @@ public class SchemaValidation {
});
}
- private static void validateSequenceGroup(TableSchema schema, CoreOptions
options) {
- Map<String, Set<String>> fields2Group = new HashMap<>();
- Set<Integer> sequenceGroupFieldIndexs = new HashSet<>();
- List<String> fieldNames = schema.fieldNames();
- for (Map.Entry<String, String> entry : options.toMap().entrySet()) {
- String k = entry.getKey();
- String v = entry.getValue();
- if (k.startsWith(FIELDS_PREFIX) && k.endsWith(SEQUENCE_GROUP)) {
- Arrays.stream(v.split(FIELDS_SEPARATOR))
- .map(fieldName -> requireField(fieldName, fieldNames))
- .forEach(sequenceGroupFieldIndexs::add);
- String[] sequenceFieldNames =
- k.substring(
- FIELDS_PREFIX.length() + 1,
- k.length() - SEQUENCE_GROUP.length() -
1)
- .split(FIELDS_SEPARATOR);
-
- for (String field : v.split(FIELDS_SEPARATOR)) {
- if (!fieldNames.contains(field)) {
- throw new IllegalArgumentException(
- String.format("Field %s can not be found in
table schema.", field));
- }
-
- List<String> sequenceFieldsList = new ArrayList<>();
- for (String sequenceFieldName : sequenceFieldNames) {
- if (!fieldNames.contains(sequenceFieldName)) {
- throw new IllegalArgumentException(
- String.format(
- "The sequence field group: %s can
not be found in table schema.",
- sequenceFieldName));
- }
- sequenceFieldsList.add(sequenceFieldName);
- }
-
- if (fields2Group.containsKey(field)) {
- List<List<String>> sequenceGroups = new ArrayList<>();
- sequenceGroups.add(new
ArrayList<>(fields2Group.get(field)));
- sequenceGroups.add(sequenceFieldsList);
-
- throw new IllegalArgumentException(
- String.format(
- "Field %s is defined repeatedly by
multiple groups: %s.",
- field, sequenceGroups));
- }
-
- Set<String> group = fields2Group.computeIfAbsent(field, p
-> new HashSet<>());
- group.addAll(sequenceFieldsList);
- }
-
- // add self
- Arrays.stream(sequenceFieldNames)
- .mapToInt(fieldName -> requireField(fieldName,
fieldNames))
- .forEach(sequenceGroupFieldIndexs::add);
- }
- }
-
- if (options.mergeEngine() == MergeEngine.PARTIAL_UPDATE) {
- for (String fieldName : fieldNames) {
- String aggFunc = options.fieldAggFunc(fieldName);
- String aggFuncName = aggFunc == null ?
options.fieldsDefaultFunc() : aggFunc;
- if (schema.primaryKeys().contains(fieldName)) {
- continue;
- }
- if (aggFuncName != null) {
- // last_non_null_value doesn't require sequence group
- checkArgument(
-
aggFuncName.equals(FieldLastNonNullValueAggFactory.NAME)
- || sequenceGroupFieldIndexs.contains(
- fieldNames.indexOf(fieldName)),
- "Must use sequence group for aggregation functions
but not found for field %s.",
- fieldName);
- }
- }
+ private static void validateMergeFunction(TableSchema schema) {
+ if (schema.primaryKeys().isEmpty()) {
+ return;
}
- Set<String> illegalGroup =
- fields2Group.values().stream()
- .flatMap(Collection::stream)
- .filter(g -> options.fieldAggFunc(g) != null)
- .collect(Collectors.toSet());
- if (!illegalGroup.isEmpty()) {
- throw new IllegalArgumentException(
- "Should not defined aggregation function on sequence
group: " + illegalGroup);
- }
+ createMergeFunctionFactory(schema);
}
private static void validateForDeletionVectors(CoreOptions options) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
index ef0ab9de24..d920cd0a07 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
@@ -1711,18 +1711,19 @@ public class PrimaryKeySimpleTableTest extends SimpleTableTestBase {
options.set("partial-update.remove-record-on-sequence-group", "seq2");
},
rowType);
- FileStoreTable wrongTable =
- createFileStoreTable(
- options -> {
- options.set("merge-engine", "partial-update");
- options.set("fields.seq1.sequence-group", "b");
- options.set("fields.seq2.sequence-group", "c,d");
-
options.set("partial-update.remove-record-on-sequence-group", "b");
- },
- rowType);
- Function<InternalRow, String> rowToString = row ->
internalRowToString(row, rowType);
- assertThatThrownBy(() -> wrongTable.newWrite(""))
+ assertThatThrownBy(
+ () ->
+ createFileStoreTable(
+ options -> {
+ options.set("merge-engine",
"partial-update");
+
options.set("fields.seq1.sequence-group", "b");
+
options.set("fields.seq2.sequence-group", "c,d");
+ options.set(
+
"partial-update.remove-record-on-sequence-group",
+ "b");
+ },
+ rowType))
.hasMessageContaining(
"field 'b' defined in
'partial-update.remove-record-on-sequence-group' option must be part of
sequence groups");
@@ -1730,6 +1731,8 @@ public class PrimaryKeySimpleTableTest extends SimpleTableTestBase {
TableRead read = table.newRead();
StreamTableWrite write = table.newWrite("");
StreamTableCommit commit = table.newCommit("");
+ Function<InternalRow, String> rowToString = row ->
internalRowToString(row, rowType);
+
// 1. Inserts
write.write(GenericRow.of(1, 1, 10, 1, 20, 20, 1));
write.write(GenericRow.of(1, 1, 11, 2, 25, 25, 0));
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
index 2da837d2f4..01e0b6c11b 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
@@ -304,7 +304,7 @@ public class PartialUpdateITCase extends CatalogITCaseBase {
+
"'merge-engine'='partial-update', "
+
"'fields.g_1.sequence-group'='a1,b', "
+
"'fields.g_2.sequence-group'='c,d');"))
- .hasRootCauseMessage("Field a1 can not be found in table
schema.");
+ .hasRootCauseMessage("Field a1 can not be found in table
schema");
Assertions.assertThatThrownBy(
() ->
@@ -315,8 +315,8 @@ public class PartialUpdateITCase extends CatalogITCaseBase {
+
"'merge-engine'='partial-update', "
+
"'fields.g_1.sequence-group'='a,b', "
+
"'fields.g_2.sequence-group'='a,d');"))
- .hasRootCauseMessage(
- "Field a is defined repeatedly by multiple groups:
[[g_1], [g_2]].");
+ .rootCause()
+ .hasMessageContaining("Field a is defined repeatedly by
multiple groups");
Assertions.assertThatThrownBy(
() ->
@@ -327,8 +327,8 @@ public class PartialUpdateITCase extends CatalogITCaseBase {
+
"'merge-engine'='partial-update', "
+
"'fields.g_1.sequence-group'='a,b', "
+
"'fields.g_2,g_3.sequence-group'='a,d');"))
- .hasRootCauseMessage(
- "Field a is defined repeatedly by multiple groups:
[[g_1], [g_2, g_3]].");
+ .rootCause()
+ .hasMessageContaining("Field a is defined repeatedly by
multiple groups");
}
@Test