This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 7a7373bf91 [core] Fix the issue where aggregate function columns without defining sequenceGroup cause the table to be unavailable (#7046)
7a7373bf91 is described below

commit 7a7373bf911f55e770c2242c4cca87fc6ce3472f
Author: zhangdove <[email protected]>
AuthorDate: Sat Jan 17 21:53:29 2026 +0800

    [core] Fix the issue where aggregate function columns without defining sequenceGroup cause the table to be unavailable (#7046)
---
 .../org/apache/paimon/schema/SchemaValidation.java | 41 +++++++++++++++++++++-
 .../apache/paimon/schema/SchemaValidationTest.java | 13 +++++++
 2 files changed, 53 insertions(+), 1 deletion(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 1378ecc586..e69ba193b7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -26,6 +26,7 @@ import org.apache.paimon.factories.FactoryUtil;
 import org.apache.paimon.format.FileFormat;
 import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
 import 
org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory;
+import 
org.apache.paimon.mergetree.compact.aggregate.factory.FieldLastNonNullValueAggFactory;
 import org.apache.paimon.options.ConfigOption;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.BucketMode;
@@ -450,11 +451,15 @@ public class SchemaValidation {
 
     private static void validateSequenceGroup(TableSchema schema, CoreOptions 
options) {
         Map<String, Set<String>> fields2Group = new HashMap<>();
+        Set<Integer> sequenceGroupFieldIndexs = new HashSet<>();
+        List<String> fieldNames = schema.fieldNames();
         for (Map.Entry<String, String> entry : options.toMap().entrySet()) {
             String k = entry.getKey();
             String v = entry.getValue();
-            List<String> fieldNames = schema.fieldNames();
             if (k.startsWith(FIELDS_PREFIX) && k.endsWith(SEQUENCE_GROUP)) {
+                Arrays.stream(v.split(FIELDS_SEPARATOR))
+                        .map(fieldName -> requireField(fieldName, fieldNames))
+                        .forEach(sequenceGroupFieldIndexs::add);
                 String[] sequenceFieldNames =
                         k.substring(
                                         FIELDS_PREFIX.length() + 1,
@@ -492,8 +497,33 @@ public class SchemaValidation {
                     Set<String> group = fields2Group.computeIfAbsent(field, p 
-> new HashSet<>());
                     group.addAll(sequenceFieldsList);
                 }
+
+                // add self
+                Arrays.stream(sequenceFieldNames)
+                        .mapToInt(fieldName -> requireField(fieldName, 
fieldNames))
+                        .forEach(sequenceGroupFieldIndexs::add);
             }
         }
+
+        if (options.mergeEngine() == MergeEngine.PARTIAL_UPDATE) {
+            for (String fieldName : fieldNames) {
+                String aggFunc = options.fieldAggFunc(fieldName);
+                String aggFuncName = aggFunc == null ? 
options.fieldsDefaultFunc() : aggFunc;
+                if (schema.primaryKeys().contains(fieldName)) {
+                    continue;
+                }
+                if (aggFuncName != null) {
+                    // last_non_null_value doesn't require sequence group
+                    checkArgument(
+                            
aggFuncName.equals(FieldLastNonNullValueAggFactory.NAME)
+                                    || sequenceGroupFieldIndexs.contains(
+                                            fieldNames.indexOf(fieldName)),
+                            "Must use sequence group for aggregation functions 
but not found for field %s.",
+                            fieldName);
+                }
+            }
+        }
+
         Set<String> illegalGroup =
                 fields2Group.values().stream()
                         .flatMap(Collection::stream)
@@ -689,6 +719,15 @@ public class SchemaValidation {
         }
     }
 
+    private static int requireField(String fieldName, List<String> fieldNames) 
{
+        int field = fieldNames.indexOf(fieldName);
+        if (field == -1) {
+            throw new IllegalArgumentException(
+                    String.format("Field %s can not be found in table 
schema.", fieldName));
+        }
+        return field;
+    }
+
     public static void validateChainTable(TableSchema schema, CoreOptions 
options) {
         if (options.isChainTable()) {
             boolean isPrimaryTbl = schema.primaryKeys() != null && 
!schema.primaryKeys().isEmpty();
diff --git a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java
index 57c659e5c7..1af8c83346 100644
--- a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java
@@ -151,4 +151,17 @@ class SchemaValidationTest {
         assertThatThrownBy(() -> validateBlobSchema(options, 
Collections.singletonList("f2")))
                 .hasMessage("The BLOB type column can not be part of partition 
keys.");
     }
+
+    @Test
+    public void testPartialUpdateTableAggregateFunctionWithoutSequenceGroup() {
+        Map<String, String> options = new HashMap<>(2);
+        options.put("merge-engine", "partial-update");
+        options.put("fields.f3.aggregate-function", "max");
+        assertThatThrownBy(() -> validateTableSchemaExec(options))
+                .hasMessageContaining(
+                        "Must use sequence group for aggregation functions but 
not found for field");
+
+        options.put("fields.f2.sequence-group", "f3");
+        assertThatCode(() -> 
validateTableSchemaExec(options)).doesNotThrowAnyException();
+    }
 }

Reply via email to