This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 214dcc507e [core] Validate merge function creation in SchemaValidation
214dcc507e is described below

commit 214dcc507ec9167e1e6585a2ee492bc852def8b2
Author: JingsongLi <[email protected]>
AuthorDate: Sat Jan 17 22:06:43 2026 +0800

    [core] Validate merge function creation in SchemaValidation
---
 .../org/apache/paimon/schema/SchemaValidation.java | 97 ++--------------------
 .../paimon/table/PrimaryKeySimpleTableTest.java    | 25 +++---
 .../apache/paimon/flink/PartialUpdateITCase.java   | 10 +--
 3 files changed, 25 insertions(+), 107 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index e69ba193b7..cceae0567e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -26,7 +26,6 @@ import org.apache.paimon.factories.FactoryUtil;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
 import org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory;
-import org.apache.paimon.mergetree.compact.aggregate.factory.FieldLastNonNullValueAggFactory;
 import org.apache.paimon.options.ConfigOption;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.BucketMode;
@@ -44,16 +43,12 @@ import org.apache.paimon.types.TimestampType;
 import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.StringUtils;
 
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
-import java.util.Set;
 import java.util.stream.Collectors;
 
 import static org.apache.paimon.CoreOptions.BUCKET_KEY;
@@ -77,7 +72,7 @@ import static org.apache.paimon.CoreOptions.SCAN_WATERMARK;
 import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MAX;
 import static org.apache.paimon.CoreOptions.SNAPSHOT_NUM_RETAINED_MIN;
 import static org.apache.paimon.CoreOptions.STREAMING_READ_OVERWRITE;
-import static org.apache.paimon.mergetree.compact.PartialUpdateMergeFunction.SEQUENCE_GROUP;
+import static org.apache.paimon.table.PrimaryKeyTableUtils.createMergeFunctionFactory;
 import static org.apache.paimon.table.SpecialFields.KEY_FIELD_PREFIX;
 import static org.apache.paimon.table.SpecialFields.SYSTEM_FIELD_NAMES;
 import static org.apache.paimon.types.DataTypeRoot.ARRAY;
@@ -96,8 +91,6 @@ public class SchemaValidation {
     /**
      * Validate the {@link TableSchema} and {@link CoreOptions}.
      *
-     * <p>TODO validate all items in schema and all keys in options.
-     *
      * @param schema the schema to be validated
      */
     public static void validateTableSchema(TableSchema schema) {
@@ -122,7 +115,7 @@ public class SchemaValidation {
 
         validateSequenceField(schema, options);
 
-        validateSequenceGroup(schema, options);
+        validateMergeFunction(schema);
 
         ChangelogProducer changelogProducer = options.changelogProducer();
         if (schema.primaryKeys().isEmpty() && changelogProducer != ChangelogProducer.NONE) {
@@ -449,90 +442,12 @@ public class SchemaValidation {
                         });
     }
 
-    private static void validateSequenceGroup(TableSchema schema, CoreOptions 
options) {
-        Map<String, Set<String>> fields2Group = new HashMap<>();
-        Set<Integer> sequenceGroupFieldIndexs = new HashSet<>();
-        List<String> fieldNames = schema.fieldNames();
-        for (Map.Entry<String, String> entry : options.toMap().entrySet()) {
-            String k = entry.getKey();
-            String v = entry.getValue();
-            if (k.startsWith(FIELDS_PREFIX) && k.endsWith(SEQUENCE_GROUP)) {
-                Arrays.stream(v.split(FIELDS_SEPARATOR))
-                        .map(fieldName -> requireField(fieldName, fieldNames))
-                        .forEach(sequenceGroupFieldIndexs::add);
-                String[] sequenceFieldNames =
-                        k.substring(
-                                        FIELDS_PREFIX.length() + 1,
-                                        k.length() - SEQUENCE_GROUP.length() - 
1)
-                                .split(FIELDS_SEPARATOR);
-
-                for (String field : v.split(FIELDS_SEPARATOR)) {
-                    if (!fieldNames.contains(field)) {
-                        throw new IllegalArgumentException(
-                                String.format("Field %s can not be found in 
table schema.", field));
-                    }
-
-                    List<String> sequenceFieldsList = new ArrayList<>();
-                    for (String sequenceFieldName : sequenceFieldNames) {
-                        if (!fieldNames.contains(sequenceFieldName)) {
-                            throw new IllegalArgumentException(
-                                    String.format(
-                                            "The sequence field group: %s can 
not be found in table schema.",
-                                            sequenceFieldName));
-                        }
-                        sequenceFieldsList.add(sequenceFieldName);
-                    }
-
-                    if (fields2Group.containsKey(field)) {
-                        List<List<String>> sequenceGroups = new ArrayList<>();
-                        sequenceGroups.add(new 
ArrayList<>(fields2Group.get(field)));
-                        sequenceGroups.add(sequenceFieldsList);
-
-                        throw new IllegalArgumentException(
-                                String.format(
-                                        "Field %s is defined repeatedly by 
multiple groups: %s.",
-                                        field, sequenceGroups));
-                    }
-
-                    Set<String> group = fields2Group.computeIfAbsent(field, p 
-> new HashSet<>());
-                    group.addAll(sequenceFieldsList);
-                }
-
-                // add self
-                Arrays.stream(sequenceFieldNames)
-                        .mapToInt(fieldName -> requireField(fieldName, 
fieldNames))
-                        .forEach(sequenceGroupFieldIndexs::add);
-            }
-        }
-
-        if (options.mergeEngine() == MergeEngine.PARTIAL_UPDATE) {
-            for (String fieldName : fieldNames) {
-                String aggFunc = options.fieldAggFunc(fieldName);
-                String aggFuncName = aggFunc == null ? 
options.fieldsDefaultFunc() : aggFunc;
-                if (schema.primaryKeys().contains(fieldName)) {
-                    continue;
-                }
-                if (aggFuncName != null) {
-                    // last_non_null_value doesn't require sequence group
-                    checkArgument(
-                            
aggFuncName.equals(FieldLastNonNullValueAggFactory.NAME)
-                                    || sequenceGroupFieldIndexs.contains(
-                                            fieldNames.indexOf(fieldName)),
-                            "Must use sequence group for aggregation functions 
but not found for field %s.",
-                            fieldName);
-                }
-            }
+    private static void validateMergeFunction(TableSchema schema) {
+        if (schema.primaryKeys().isEmpty()) {
+            return;
         }
 
-        Set<String> illegalGroup =
-                fields2Group.values().stream()
-                        .flatMap(Collection::stream)
-                        .filter(g -> options.fieldAggFunc(g) != null)
-                        .collect(Collectors.toSet());
-        if (!illegalGroup.isEmpty()) {
-            throw new IllegalArgumentException(
-                    "Should not defined aggregation function on sequence 
group: " + illegalGroup);
-        }
+        createMergeFunctionFactory(schema);
     }
 
     private static void validateForDeletionVectors(CoreOptions options) {
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
index ef0ab9de24..d920cd0a07 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeySimpleTableTest.java
@@ -1711,18 +1711,19 @@ public class PrimaryKeySimpleTableTest extends SimpleTableTestBase {
                             
options.set("partial-update.remove-record-on-sequence-group", "seq2");
                         },
                         rowType);
-        FileStoreTable wrongTable =
-                createFileStoreTable(
-                        options -> {
-                            options.set("merge-engine", "partial-update");
-                            options.set("fields.seq1.sequence-group", "b");
-                            options.set("fields.seq2.sequence-group", "c,d");
-                            
options.set("partial-update.remove-record-on-sequence-group", "b");
-                        },
-                        rowType);
-        Function<InternalRow, String> rowToString = row -> internalRowToString(row, rowType);
 
-        assertThatThrownBy(() -> wrongTable.newWrite(""))
+        assertThatThrownBy(
+                        () ->
+                                createFileStoreTable(
+                                        options -> {
+                                            options.set("merge-engine", 
"partial-update");
+                                            
options.set("fields.seq1.sequence-group", "b");
+                                            
options.set("fields.seq2.sequence-group", "c,d");
+                                            options.set(
+                                                    
"partial-update.remove-record-on-sequence-group",
+                                                    "b");
+                                        },
+                                        rowType))
                 .hasMessageContaining(
                         "field 'b' defined in 
'partial-update.remove-record-on-sequence-group' option must be part of 
sequence groups");
 
@@ -1730,6 +1731,8 @@ public class PrimaryKeySimpleTableTest extends SimpleTableTestBase {
         TableRead read = table.newRead();
         StreamTableWrite write = table.newWrite("");
         StreamTableCommit commit = table.newCommit("");
+        Function<InternalRow, String> rowToString = row -> internalRowToString(row, rowType);
+
         // 1. Inserts
         write.write(GenericRow.of(1, 1, 10, 1, 20, 20, 1));
         write.write(GenericRow.of(1, 1, 11, 2, 25, 25, 0));
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
index 2da837d2f4..01e0b6c11b 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PartialUpdateITCase.java
@@ -304,7 +304,7 @@ public class PartialUpdateITCase extends CatalogITCaseBase {
                                                 + 
"'merge-engine'='partial-update', "
                                                 + 
"'fields.g_1.sequence-group'='a1,b', "
                                                 + 
"'fields.g_2.sequence-group'='c,d');"))
-                .hasRootCauseMessage("Field a1 can not be found in table schema.");
+                .hasRootCauseMessage("Field a1 can not be found in table schema");
 
         Assertions.assertThatThrownBy(
                         () ->
@@ -315,8 +315,8 @@ public class PartialUpdateITCase extends CatalogITCaseBase {
                                                 + 
"'merge-engine'='partial-update', "
                                                 + 
"'fields.g_1.sequence-group'='a,b', "
                                                 + 
"'fields.g_2.sequence-group'='a,d');"))
-                .hasRootCauseMessage(
-                        "Field a is defined repeatedly by multiple groups: 
[[g_1], [g_2]].");
+                .rootCause()
+                .hasMessageContaining("Field a is defined repeatedly by 
multiple groups");
 
         Assertions.assertThatThrownBy(
                         () ->
@@ -327,8 +327,8 @@ public class PartialUpdateITCase extends CatalogITCaseBase {
                                                 + 
"'merge-engine'='partial-update', "
                                                 + 
"'fields.g_1.sequence-group'='a,b', "
                                                 + 
"'fields.g_2,g_3.sequence-group'='a,d');"))
-                .hasRootCauseMessage(
-                        "Field a is defined repeatedly by multiple groups: 
[[g_1], [g_2, g_3]].");
+                .rootCause()
+                .hasMessageContaining("Field a is defined repeatedly by 
multiple groups");
     }
 
     @Test

Reply via email to