This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 7a7373bf91 [core] Fix the issue where aggregate function columns
without defining sequenceGroup cause the table to be unavailable (#7046)
7a7373bf91 is described below
commit 7a7373bf911f55e770c2242c4cca87fc6ce3472f
Author: zhangdove <[email protected]>
AuthorDate: Sat Jan 17 21:53:29 2026 +0800
[core] Fix the issue where aggregate function columns without defining
sequenceGroup cause the table to be unavailable (#7046)
---
.../org/apache/paimon/schema/SchemaValidation.java | 41 +++++++++++++++++++++-
.../apache/paimon/schema/SchemaValidationTest.java | 13 +++++++
2 files changed, 53 insertions(+), 1 deletion(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
index 1378ecc586..e69ba193b7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaValidation.java
@@ -26,6 +26,7 @@ import org.apache.paimon.factories.FactoryUtil;
import org.apache.paimon.format.FileFormat;
import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
import
org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory;
+import
org.apache.paimon.mergetree.compact.aggregate.factory.FieldLastNonNullValueAggFactory;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.BucketMode;
@@ -450,11 +451,15 @@ public class SchemaValidation {
private static void validateSequenceGroup(TableSchema schema, CoreOptions
options) {
Map<String, Set<String>> fields2Group = new HashMap<>();
+ Set<Integer> sequenceGroupFieldIndexs = new HashSet<>();
+ List<String> fieldNames = schema.fieldNames();
for (Map.Entry<String, String> entry : options.toMap().entrySet()) {
String k = entry.getKey();
String v = entry.getValue();
- List<String> fieldNames = schema.fieldNames();
if (k.startsWith(FIELDS_PREFIX) && k.endsWith(SEQUENCE_GROUP)) {
+ Arrays.stream(v.split(FIELDS_SEPARATOR))
+ .map(fieldName -> requireField(fieldName, fieldNames))
+ .forEach(sequenceGroupFieldIndexs::add);
String[] sequenceFieldNames =
k.substring(
FIELDS_PREFIX.length() + 1,
@@ -492,8 +497,33 @@ public class SchemaValidation {
Set<String> group = fields2Group.computeIfAbsent(field, p
-> new HashSet<>());
group.addAll(sequenceFieldsList);
}
+
+ // add self
+ Arrays.stream(sequenceFieldNames)
+ .mapToInt(fieldName -> requireField(fieldName,
fieldNames))
+ .forEach(sequenceGroupFieldIndexs::add);
}
}
+
+ if (options.mergeEngine() == MergeEngine.PARTIAL_UPDATE) {
+ for (String fieldName : fieldNames) {
+ String aggFunc = options.fieldAggFunc(fieldName);
+ String aggFuncName = aggFunc == null ?
options.fieldsDefaultFunc() : aggFunc;
+ if (schema.primaryKeys().contains(fieldName)) {
+ continue;
+ }
+ if (aggFuncName != null) {
+ // last_non_null_value doesn't require sequence group
+ checkArgument(
+
aggFuncName.equals(FieldLastNonNullValueAggFactory.NAME)
+ || sequenceGroupFieldIndexs.contains(
+ fieldNames.indexOf(fieldName)),
+ "Must use sequence group for aggregation functions
but not found for field %s.",
+ fieldName);
+ }
+ }
+ }
+
Set<String> illegalGroup =
fields2Group.values().stream()
.flatMap(Collection::stream)
@@ -689,6 +719,15 @@ public class SchemaValidation {
}
}
+    /**
+     * Resolves a field name to its position in the table schema.
+     *
+     * @param fieldName the field name to look up
+     * @param fieldNames the schema's field names, in schema order
+     * @return the zero-based index of {@code fieldName} within {@code fieldNames}
+     * @throws IllegalArgumentException if {@code fieldName} is not one of {@code fieldNames}
+     */
+    private static int requireField(String fieldName, List<String> fieldNames) {
+        int index = fieldNames.indexOf(fieldName);
+        if (index >= 0) {
+            return index;
+        }
+        throw new IllegalArgumentException(
+                String.format("Field %s can not be found in table schema.", fieldName));
+    }
+
public static void validateChainTable(TableSchema schema, CoreOptions
options) {
if (options.isChainTable()) {
boolean isPrimaryTbl = schema.primaryKeys() != null &&
!schema.primaryKeys().isEmpty();
diff --git
a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java
b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java
index 57c659e5c7..1af8c83346 100644
---
a/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/schema/SchemaValidationTest.java
@@ -151,4 +151,17 @@ class SchemaValidationTest {
assertThatThrownBy(() -> validateBlobSchema(options,
Collections.singletonList("f2")))
.hasMessage("The BLOB type column can not be part of partition
keys.");
}
+
+    @Test
+    public void testPartialUpdateTableAggregateFunctionWithoutSequenceGroup() {
+        Map<String, String> options = new HashMap<>();
+        options.put("merge-engine", "partial-update");
+        // An aggregate function on f3 with no sequence group covering f3 must be rejected,
+        // and the error must name the offending column rather than just any column.
+        options.put("fields.f3.aggregate-function", "max");
+        assertThatThrownBy(() -> validateTableSchemaExec(options))
+                .hasMessageContaining(
+                        "Must use sequence group for aggregation functions but not found for field f3.");
+
+        // Once f3 is covered by the sequence group declared on f2, validation must pass.
+        options.put("fields.f2.sequence-group", "f3");
+        assertThatCode(() -> validateTableSchemaExec(options)).doesNotThrowAnyException();
+    }
}