This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 9339ee633 [core] Refactor the Field Aggregator factory to validate data types when creating function (#4446)
9339ee633 is described below

commit 9339ee6339e3ff5e1c336c983776415db21b42ce
Author: Kerwin <[email protected]>
AuthorDate: Tue Nov 5 16:23:19 2024 +0800

    [core] Refactor the Field Aggregator factory to validate data types when creating function (#4446)
---
 .../compact/PartialUpdateMergeFunction.java        |   6 +-
 .../compact/aggregate/AggregateMergeFunction.java  |   3 +-
 .../compact/aggregate/FieldAggregator.java         |  51 +----
 .../compact/aggregate/FieldBoolAndAgg.java         |  31 +--
 .../compact/aggregate/FieldBoolOrAgg.java          |  31 +--
 .../compact/aggregate/FieldCollectAgg.java         |  11 +-
 .../aggregate/FieldFirstNonNullValueAgg.java       |  12 +-
 .../compact/aggregate/FieldFirstValueAgg.java      |  11 +-
 .../compact/aggregate/FieldHllSketchAgg.java       |  15 +-
 .../compact/aggregate/FieldIgnoreRetractAgg.java   |  11 +-
 .../aggregate/FieldLastNonNullValueAgg.java        |  11 +-
 .../compact/aggregate/FieldLastValueAgg.java       |  11 +-
 .../compact/aggregate/FieldListaggAgg.java         |  45 ++---
 .../mergetree/compact/aggregate/FieldMaxAgg.java   |  27 +--
 .../compact/aggregate/FieldMergeMapAgg.java        |  11 +-
 .../mergetree/compact/aggregate/FieldMinAgg.java   |  28 +--
 .../compact/aggregate/FieldNestedUpdateAgg.java    |  18 +-
 .../compact/aggregate/FieldPrimaryKeyAgg.java      |  11 +-
 .../compact/aggregate/FieldProductAgg.java         |  95 +++++----
 .../compact/aggregate/FieldRoaringBitmap32Agg.java |  15 +-
 .../compact/aggregate/FieldRoaringBitmap64Agg.java |  15 +-
 .../mergetree/compact/aggregate/FieldSumAgg.java   | 183 ++++++++---------
 .../compact/aggregate/FieldThetaSketchAgg.java     |  15 +-
 .../aggregate/factory/FieldAggregatorFactory.java  |  39 ++++
 .../aggregate/factory/FieldBoolAndAggFactory.java  |  17 +-
 .../aggregate/factory/FieldBoolOrAggFactory.java   |  17 +-
 .../aggregate/factory/FieldCollectAggFactory.java  |  11 +-
 .../factory/FieldFirstNonNullValueAggFactory.java  |  10 +-
 .../FieldFirstNonNullValueAggLegacyFactory.java    |   7 +-
 .../factory/FieldFirstValueAggFactory.java         |  10 +-
 .../factory/FieldHllSketchAggFactory.java          |  10 +-
 .../factory/FieldLastNonNullValueAggFactory.java   |  10 +-
 .../factory/FieldLastValueAggFactory.java          |  10 +-
 .../aggregate/factory/FieldListaggAggFactory.java  |  17 +-
 .../aggregate/factory/FieldMaxAggFactory.java      |  10 +-
 .../aggregate/factory/FieldMergeMapAggFactory.java |  12 +-
 .../aggregate/factory/FieldMinAggFactory.java      |  10 +-
 .../factory/FieldNestedUpdateAggFactory.java       |  13 +-
 .../factory/FieldPrimaryKeyAggFactory.java         |   7 +-
 .../aggregate/factory/FieldProductAggFactory.java  |  17 +-
 .../factory/FieldRoaringBitmap32AggFactory.java    |  10 +-
 .../factory/FieldRoaringBitmap64AggFactory.java    |  10 +-
 .../aggregate/factory/FieldSumAggFactory.java      |  17 +-
 .../factory/FieldThetaSketchAggFactory.java        |  10 +-
 .../LookupChangelogMergeFunctionWrapperTest.java   |  10 +-
 .../compact/aggregate/FieldAggregatorTest.java     | 220 ++++++++++++++++-----
 .../{TestCostomAgg.java => TestCustomAgg.java}     |  10 +-
 ...omAggFactory.java => TestCustomAggFactory.java} |  11 +-
 .../services/org.apache.paimon.factories.Factory   |   2 +-
 49 files changed, 591 insertions(+), 603 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
index b15d9388a..4d720cb3f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
@@ -23,6 +23,7 @@ import org.apache.paimon.KeyValue;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
+import org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
@@ -51,7 +52,6 @@ import java.util.stream.Stream;
 import static org.apache.paimon.CoreOptions.FIELDS_PREFIX;
 import static org.apache.paimon.CoreOptions.FIELDS_SEPARATOR;
 import static org.apache.paimon.CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE;
-import static org.apache.paimon.mergetree.compact.aggregate.FieldAggregator.createFieldAggregator;
 import static org.apache.paimon.utils.InternalRowUtils.createFieldGetters;
 
 /**
@@ -495,7 +495,7 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
                     fieldAggregators.put(
                             i,
                             () ->
-                                    createFieldAggregator(
+                                    FieldAggregatorFactory.create(
                                             fieldType,
                                             strAggFunc,
                                             ignoreRetract,
@@ -506,7 +506,7 @@ public class PartialUpdateMergeFunction implements MergeFunction<KeyValue> {
                     fieldAggregators.put(
                             i,
                             () ->
-                                    createFieldAggregator(
+                                    FieldAggregatorFactory.create(
                                             fieldType,
                                             defaultAggFunc,
                                             ignoreRetract,
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java
index e73bfe8e9..bad77ba91 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java
@@ -24,6 +24,7 @@ import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.mergetree.compact.MergeFunction;
 import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
+import org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowKind;
@@ -142,7 +143,7 @@ public class AggregateMergeFunction implements MergeFunction<KeyValue> {
 
                 boolean ignoreRetract = options.fieldAggIgnoreRetract(fieldName);
                 fieldAggregators[i] =
-                        FieldAggregator.createFieldAggregator(
+                        FieldAggregatorFactory.create(
                                 fieldType,
                                 strAggFunc,
                                 ignoreRetract,
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregator.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregator.java
index b776f0a2e..cd368a818 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregator.java
@@ -18,62 +18,23 @@
 
 package org.apache.paimon.mergetree.compact.aggregate;
 
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.factories.FactoryUtil;
-import org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory;
 import org.apache.paimon.types.DataType;
 
-import javax.annotation.Nullable;
-
 import java.io.Serializable;
 
 /** abstract class of aggregating a field of a row. */
 public abstract class FieldAggregator implements Serializable {
-    protected DataType fieldType;
 
     private static final long serialVersionUID = 1L;
 
-    public FieldAggregator(DataType dataType) {
-        this.fieldType = dataType;
-    }
-
-    public static FieldAggregator createFieldAggregator(
-            DataType fieldType,
-            @Nullable String strAgg,
-            boolean ignoreRetract,
-            boolean isPrimaryKey,
-            CoreOptions options,
-            String field) {
-        FieldAggregator fieldAggregator;
-        if (isPrimaryKey) {
-            strAgg = FieldPrimaryKeyAgg.NAME;
-        } else if (strAgg == null) {
-            strAgg = FieldLastNonNullValueAgg.NAME;
-        }
-
-        FieldAggregatorFactory fieldAggregatorFactory =
-                FactoryUtil.discoverFactory(
-                        FieldAggregator.class.getClassLoader(),
-                        FieldAggregatorFactory.class,
-                        strAgg);
-        if (fieldAggregatorFactory == null) {
-            throw new RuntimeException(
-                    String.format(
-                            "Use unsupported aggregation: %s or spell aggregate function incorrectly!",
-                            strAgg));
-        }
+    protected final DataType fieldType;
+    protected final String name;
 
-        fieldAggregator = fieldAggregatorFactory.create(fieldType, options, field);
-
-        if (ignoreRetract) {
-            fieldAggregator = new FieldIgnoreRetractAgg(fieldAggregator);
-        }
-
-        return fieldAggregator;
+    public FieldAggregator(String name, DataType dataType) {
+        this.name = name;
+        this.fieldType = dataType;
     }
 
-    public abstract String name();
-
     public abstract Object agg(Object accumulator, Object inputField);
 
     public Object aggReversed(Object accumulator, Object inputField) {
@@ -89,6 +50,6 @@ public abstract class FieldAggregator implements Serializable {
                         "Aggregate function '%s' does not support retraction,"
                                 + " If you allow this function to ignore retraction messages,"
                                 + " you can configure 'fields.${field_name}.ignore-retract'='true'.",
-                        name()));
+                        name));
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldBoolAndAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldBoolAndAgg.java
index 5acf2595a..cc44ce5ed 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldBoolAndAgg.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldBoolAndAgg.java
@@ -18,43 +18,22 @@
 
 package org.apache.paimon.mergetree.compact.aggregate;
 
-import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.BooleanType;
 
 /** bool_and aggregate a field of a row. */
 public class FieldBoolAndAgg extends FieldAggregator {
 
-    public static final String NAME = "bool_and";
-
     private static final long serialVersionUID = 1L;
 
-    public FieldBoolAndAgg(DataType dataType) {
-        super(dataType);
-    }
-
-    @Override
-    public String name() {
-        return NAME;
+    public FieldBoolAndAgg(String name, BooleanType dataType) {
+        super(name, dataType);
     }
 
     @Override
     public Object agg(Object accumulator, Object inputField) {
-        Object boolAnd;
-
         if (accumulator == null || inputField == null) {
-            boolAnd = (inputField == null) ? accumulator : inputField;
-        } else {
-            switch (fieldType.getTypeRoot()) {
-                case BOOLEAN:
-                    boolAnd = (boolean) accumulator && (boolean) inputField;
-                    break;
-                default:
-                    String msg =
-                            String.format(
-                                    "type %s not support in %s",
-                                    fieldType.getTypeRoot().toString(), this.getClass().getName());
-                    throw new IllegalArgumentException(msg);
-            }
+            return accumulator == null ? inputField : accumulator;
         }
-        return boolAnd;
+        return (boolean) accumulator && (boolean) inputField;
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldBoolOrAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldBoolOrAgg.java
index 03a0c1c3c..105f42191 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldBoolOrAgg.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldBoolOrAgg.java
@@ -18,43 +18,22 @@
 
 package org.apache.paimon.mergetree.compact.aggregate;
 
-import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.BooleanType;
 
 /** bool_or aggregate a field of a row. */
 public class FieldBoolOrAgg extends FieldAggregator {
 
-    public static final String NAME = "bool_or";
-
     private static final long serialVersionUID = 1L;
 
-    public FieldBoolOrAgg(DataType dataType) {
-        super(dataType);
-    }
-
-    @Override
-    public String name() {
-        return NAME;
+    public FieldBoolOrAgg(String name, BooleanType dataType) {
+        super(name, dataType);
     }
 
     @Override
     public Object agg(Object accumulator, Object inputField) {
-        Object boolOr;
-
         if (accumulator == null || inputField == null) {
-            boolOr = (inputField == null) ? accumulator : inputField;
-        } else {
-            switch (fieldType.getTypeRoot()) {
-                case BOOLEAN:
-                    boolOr = (boolean) accumulator || (boolean) inputField;
-                    break;
-                default:
-                    String msg =
-                            String.format(
-                                    "type %s not support in %s",
-                                    fieldType.getTypeRoot().toString(), this.getClass().getName());
-                    throw new IllegalArgumentException(msg);
-            }
+            return accumulator == null ? inputField : accumulator;
         }
-        return boolOr;
+        return (boolean) accumulator || (boolean) inputField;
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldCollectAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldCollectAgg.java
index 64ef223fa..afe5e05e7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldCollectAgg.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldCollectAgg.java
@@ -43,16 +43,14 @@ import static org.apache.paimon.codegen.CodeGenUtils.newRecordEqualiser;
 /** Collect elements into an ARRAY. */
 public class FieldCollectAgg extends FieldAggregator {
 
-    public static final String NAME = "collect";
-
     private static final long serialVersionUID = 1L;
 
     private final boolean distinct;
     private final InternalArray.ElementGetter elementGetter;
     @Nullable private final BiFunction<Object, Object, Boolean> equaliser;
 
-    public FieldCollectAgg(ArrayType dataType, boolean distinct) {
-        super(dataType);
+    public FieldCollectAgg(String name, ArrayType dataType, boolean distinct) {
+        super(name, dataType);
         this.distinct = distinct;
         this.elementGetter = InternalArray.createElementGetter(dataType.getElementType());
 
@@ -84,11 +82,6 @@ public class FieldCollectAgg extends FieldAggregator {
         }
     }
 
-    @Override
-    public String name() {
-        return NAME;
-    }
-
     @Override
     public Object aggReversed(Object accumulator, Object inputField) {
         // we don't need to actually do the reverse here for this agg
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldFirstNonNullValueAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldFirstNonNullValueAgg.java
index 0bd950bbf..273af1a95 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldFirstNonNullValueAgg.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldFirstNonNullValueAgg.java
@@ -23,20 +23,12 @@ import org.apache.paimon.types.DataType;
 /** first non-null value aggregate a field of a row. */
 public class FieldFirstNonNullValueAgg extends FieldAggregator {
 
-    public static final String NAME = "first_non_null_value";
-    public static final String LEGACY_NAME = "first_not_null_value";
-
     private static final long serialVersionUID = 1L;
 
     private boolean initialized;
 
-    public FieldFirstNonNullValueAgg(DataType dataType) {
-        super(dataType);
-    }
-
-    @Override
-    public String name() {
-        return NAME;
+    public FieldFirstNonNullValueAgg(String name, DataType dataType) {
+        super(name, dataType);
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldFirstValueAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldFirstValueAgg.java
index d31a6e0ae..436f841d9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldFirstValueAgg.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldFirstValueAgg.java
@@ -23,19 +23,12 @@ import org.apache.paimon.types.DataType;
 /** first value aggregate a field of a row. */
 public class FieldFirstValueAgg extends FieldAggregator {
 
-    public static final String NAME = "first_value";
-
     private static final long serialVersionUID = 1L;
 
     private boolean initialized;
 
-    public FieldFirstValueAgg(DataType dataType) {
-        super(dataType);
-    }
-
-    @Override
-    public String name() {
-        return NAME;
+    public FieldFirstValueAgg(String name, DataType dataType) {
+        super(name, dataType);
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldHllSketchAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldHllSketchAgg.java
index 0ccf4af64..aa399ac37 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldHllSketchAgg.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldHllSketchAgg.java
@@ -24,25 +24,14 @@ import org.apache.paimon.utils.HllSketchUtil;
 /** HllSketch aggregate a field of a row. */
 public class FieldHllSketchAgg extends FieldAggregator {
 
-    public static final String NAME = "hll_sketch";
-
     private static final long serialVersionUID = 1L;
 
-    public FieldHllSketchAgg(VarBinaryType dataType) {
-        super(dataType);
-    }
-
-    @Override
-    public String name() {
-        return NAME;
+    public FieldHllSketchAgg(String name, VarBinaryType dataType) {
+        super(name, dataType);
     }
 
     @Override
     public Object agg(Object accumulator, Object inputField) {
-        if (accumulator == null && inputField == null) {
-            return null;
-        }
-
         if (accumulator == null || inputField == null) {
             return accumulator == null ? inputField : accumulator;
         }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldIgnoreRetractAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldIgnoreRetractAgg.java
index 40772c6d1..e98e64852 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldIgnoreRetractAgg.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldIgnoreRetractAgg.java
@@ -21,20 +21,15 @@ package org.apache.paimon.mergetree.compact.aggregate;
 /** An aggregator which ignores retraction messages. */
 public class FieldIgnoreRetractAgg extends FieldAggregator {
 
-    private final FieldAggregator aggregator;
-
     private static final long serialVersionUID = 1L;
 
+    private final FieldAggregator aggregator;
+
     public FieldIgnoreRetractAgg(FieldAggregator aggregator) {
-        super(aggregator.fieldType);
+        super(aggregator.name, aggregator.fieldType);
         this.aggregator = aggregator;
     }
 
-    @Override
-    public String name() {
-        return aggregator.name();
-    }
-
     @Override
     public Object agg(Object accumulator, Object inputField) {
         return aggregator.agg(accumulator, inputField);
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldLastNonNullValueAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldLastNonNullValueAgg.java
index e189c20b2..cc5383739 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldLastNonNullValueAgg.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldLastNonNullValueAgg.java
@@ -23,17 +23,10 @@ import org.apache.paimon.types.DataType;
 /** last non-null value aggregate a field of a row. */
 public class FieldLastNonNullValueAgg extends FieldAggregator {
 
-    public static final String NAME = "last_non_null_value";
-
     private static final long serialVersionUID = 1L;
 
-    public FieldLastNonNullValueAgg(DataType dataType) {
-        super(dataType);
-    }
-
-    @Override
-    public String name() {
-        return NAME;
+    public FieldLastNonNullValueAgg(String name, DataType dataType) {
+        super(name, dataType);
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldLastValueAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldLastValueAgg.java
index 22c2b3da1..592f080fb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldLastValueAgg.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldLastValueAgg.java
@@ -23,17 +23,10 @@ import org.apache.paimon.types.DataType;
 /** last value aggregate a field of a row. */
 public class FieldLastValueAgg extends FieldAggregator {
 
-    public static final String NAME = "last_value";
-
     private static final long serialVersionUID = 1L;
 
-    public FieldLastValueAgg(DataType dataType) {
-        super(dataType);
-    }
-
-    @Override
-    public String name() {
-        return NAME;
+    public FieldLastValueAgg(String name, DataType dataType) {
+        super(name, dataType);
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldListaggAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldListaggAgg.java
index 3bde7e7cc..a01891501 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldListaggAgg.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldListaggAgg.java
@@ -20,53 +20,32 @@ package org.apache.paimon.mergetree.compact.aggregate;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.VarCharType;
 import org.apache.paimon.utils.StringUtils;
 
 /** listagg aggregate a field of a row. */
 public class FieldListaggAgg extends FieldAggregator {
 
-    public static final String NAME = "listagg";
-
     private static final long serialVersionUID = 1L;
 
     private final String delimiter;
 
-    public FieldListaggAgg(DataType dataType, CoreOptions options, String field) {
-        super(dataType);
+    public FieldListaggAgg(String name, VarCharType dataType, CoreOptions options, String field) {
+        super(name, dataType);
         this.delimiter = options.fieldListAggDelimiter(field);
     }
 
-    @Override
-    public String name() {
-        return NAME;
-    }
-
     @Override
     public Object agg(Object accumulator, Object inputField) {
-        Object concatenate;
-
-        if (inputField == null || accumulator == null) {
-            concatenate = (inputField == null) ? accumulator : inputField;
-        } else {
-            // ordered by type root definition
-            switch (fieldType.getTypeRoot()) {
-                case VARCHAR:
-                    // TODO: ensure not VARCHAR(n)
-                    BinaryString mergeFieldSD = (BinaryString) accumulator;
-                    BinaryString inFieldSD = (BinaryString) inputField;
-                    concatenate =
-                            StringUtils.concat(
-                                    mergeFieldSD, BinaryString.fromString(delimiter), inFieldSD);
-                    break;
-                default:
-                    String msg =
-                            String.format(
-                                    "type %s not support in %s",
-                                    fieldType.getTypeRoot().toString(), this.getClass().getName());
-                    throw new IllegalArgumentException(msg);
-            }
+        if (accumulator == null || inputField == null) {
+            return accumulator == null ? inputField : accumulator;
         }
-        return concatenate;
+        // ordered by type root definition
+
+        // TODO: ensure not VARCHAR(n)
+        BinaryString mergeFieldSD = (BinaryString) accumulator;
+        BinaryString inFieldSD = (BinaryString) inputField;
+
+        return StringUtils.concat(mergeFieldSD, BinaryString.fromString(delimiter), inFieldSD);
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMaxAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMaxAgg.java
index 34e7dd139..06628244e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMaxAgg.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMaxAgg.java
@@ -25,33 +25,20 @@ import org.apache.paimon.utils.InternalRowUtils;
 /** max aggregate a field of a row. */
 public class FieldMaxAgg extends FieldAggregator {
 
-    public static final String NAME = "max";
-
     private static final long serialVersionUID = 1L;
 
-    public FieldMaxAgg(DataType dataType) {
-        super(dataType);
-    }
-
-    @Override
-    public String name() {
-        return NAME;
+    public FieldMaxAgg(String name, DataType dataType) {
+        super(name, dataType);
     }
 
     @Override
     public Object agg(Object accumulator, Object inputField) {
-        Object max;
-
         if (accumulator == null || inputField == null) {
-            max = (accumulator == null ? inputField : accumulator);
-        } else {
-            DataTypeRoot type = fieldType.getTypeRoot();
-            if (InternalRowUtils.compare(accumulator, inputField, type) < 0) {
-                max = inputField;
-            } else {
-                max = accumulator;
-            }
+            return accumulator == null ? inputField : accumulator;
         }
-        return max;
+        DataTypeRoot type = fieldType.getTypeRoot();
+        return InternalRowUtils.compare(accumulator, inputField, type) < 0
+                ? inputField
+                : accumulator;
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMergeMapAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMergeMapAgg.java
index f597e8de5..9965339af 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMergeMapAgg.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMergeMapAgg.java
@@ -31,25 +31,18 @@ import java.util.Set;
 /** Merge two maps. */
 public class FieldMergeMapAgg extends FieldAggregator {
 
-    public static final String NAME = "merge_map";
-
     private static final long serialVersionUID = 1L;
 
     private final InternalArray.ElementGetter keyGetter;
     private final InternalArray.ElementGetter valueGetter;
 
-    public FieldMergeMapAgg(MapType dataType) {
-        super(dataType);
+    public FieldMergeMapAgg(String name, MapType dataType) {
+        super(name, dataType);
 
         this.keyGetter = InternalArray.createElementGetter(dataType.getKeyType());
         this.valueGetter = InternalArray.createElementGetter(dataType.getValueType());
     }
 
-    @Override
-    public String name() {
-        return NAME;
-    }
-
     @Override
     public Object agg(Object accumulator, Object inputField) {
         if (accumulator == null || inputField == null) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMinAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMinAgg.java
index 4002966db..01b0403ba 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMinAgg.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMinAgg.java
@@ -25,33 +25,21 @@ import org.apache.paimon.utils.InternalRowUtils;
 /** min aggregate a field of a row. */
 public class FieldMinAgg extends FieldAggregator {
 
-    public static final String NAME = "min";
-
     private static final long serialVersionUID = 1L;
 
-    public FieldMinAgg(DataType dataType) {
-        super(dataType);
-    }
-
-    @Override
-    public String name() {
-        return NAME;
+    public FieldMinAgg(String name, DataType dataType) {
+        super(name, dataType);
     }
 
     @Override
     public Object agg(Object accumulator, Object inputField) {
-        Object min;
-
         if (accumulator == null || inputField == null) {
-            min = (accumulator == null ? inputField : accumulator);
-        } else {
-            DataTypeRoot type = fieldType.getTypeRoot();
-            if (InternalRowUtils.compare(accumulator, inputField, type) < 0) {
-                min = accumulator;
-            } else {
-                min = inputField;
-            }
+            return accumulator == null ? inputField : accumulator;
         }
-        return min;
+
+        DataTypeRoot type = fieldType.getTypeRoot();
+        return InternalRowUtils.compare(accumulator, inputField, type) < 0
+                ? accumulator
+                : inputField;
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java
index 3cd29f127..005bf7b17 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java
@@ -44,8 +44,6 @@ import static org.apache.paimon.utils.Preconditions.checkNotNull;
  */
 public class FieldNestedUpdateAgg extends FieldAggregator {
 
-    public static final String NAME = "nested_update";
-
     private static final long serialVersionUID = 1L;
 
     private final int nestedFields;
@@ -53,8 +51,8 @@ public class FieldNestedUpdateAgg extends FieldAggregator {
     @Nullable private final Projection keyProjection;
     @Nullable private final RecordEqualiser elementEqualiser;
 
-    public FieldNestedUpdateAgg(ArrayType dataType, List<String> nestedKey) {
-        super(dataType);
+    public FieldNestedUpdateAgg(String name, ArrayType dataType, List<String> nestedKey) {
+        super(name, dataType);
         RowType nestedType = (RowType) dataType.getElementType();
         this.nestedFields = nestedType.getFieldCount();
         if (nestedKey.isEmpty()) {
@@ -66,18 +64,10 @@ public class FieldNestedUpdateAgg extends FieldAggregator {
         }
     }
 
-    @Override
-    public String name() {
-        return NAME;
-    }
-
     @Override
     public Object agg(Object accumulator, Object inputField) {
-        if (accumulator == null) {
-            return inputField;
-        }
-        if (inputField == null) {
-            return accumulator;
+        if (accumulator == null || inputField == null) {
+            return accumulator == null ? inputField : accumulator;
         }
 
         InternalArray acc = (InternalArray) accumulator;
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldPrimaryKeyAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldPrimaryKeyAgg.java
index e8053be1e..3db4e9b32 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldPrimaryKeyAgg.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldPrimaryKeyAgg.java
@@ -23,17 +23,10 @@ import org.apache.paimon.types.DataType;
 /** primary key aggregate a field of a row. */
 public class FieldPrimaryKeyAgg extends FieldAggregator {
 
-    public static final String NAME = "primary-key";
-
     private static final long serialVersionUID = 1L;
 
-    public FieldPrimaryKeyAgg(DataType dataType) {
-        super(dataType);
-    }
-
-    @Override
-    public String name() {
-        return NAME;
+    public FieldPrimaryKeyAgg(String name, DataType dataType) {
+        super(name, dataType);
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldProductAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldProductAgg.java
index c3fb18232..26a0c0c52 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldProductAgg.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldProductAgg.java
@@ -28,65 +28,58 @@ import static org.apache.paimon.data.Decimal.fromBigDecimal;
 /** product value aggregate a field of a row. */
 public class FieldProductAgg extends FieldAggregator {
 
-    public static final String NAME = "product";
-
     private static final long serialVersionUID = 1L;
 
-    public FieldProductAgg(DataType dataType) {
-        super(dataType);
-    }
-
-    @Override
-    public String name() {
-        return NAME;
+    public FieldProductAgg(String name, DataType dataType) {
+        super(name, dataType);
     }
 
     @Override
     public Object agg(Object accumulator, Object inputField) {
+        if (accumulator == null || inputField == null) {
+            return accumulator == null ? inputField : accumulator;
+        }
+
         Object product;
 
-        if (accumulator == null || inputField == null) {
-            product = (accumulator == null ? inputField : accumulator);
-        } else {
-            // ordered by type root definition
-            switch (fieldType.getTypeRoot()) {
-                case DECIMAL:
-                    Decimal mergeFieldDD = (Decimal) accumulator;
-                    Decimal inFieldDD = (Decimal) inputField;
-                    assert mergeFieldDD.scale() == inFieldDD.scale()
-                            : "Inconsistent scale of aggregate Decimal!";
-                    assert mergeFieldDD.precision() == inFieldDD.precision()
-                            : "Inconsistent precision of aggregate Decimal!";
-                    BigDecimal bigDecimal = mergeFieldDD.toBigDecimal();
-                    BigDecimal bigDecimal1 = inFieldDD.toBigDecimal();
-                    BigDecimal mul = bigDecimal.multiply(bigDecimal1);
-                    product = fromBigDecimal(mul, mergeFieldDD.precision(), mergeFieldDD.scale());
-                    break;
-                case TINYINT:
-                    product = (byte) ((byte) accumulator * (byte) inputField);
-                    break;
-                case SMALLINT:
-                    product = (short) ((short) accumulator * (short) inputField);
-                    break;
-                case INTEGER:
-                    product = (int) accumulator * (int) inputField;
-                    break;
-                case BIGINT:
-                    product = (long) accumulator * (long) inputField;
-                    break;
-                case FLOAT:
-                    product = (float) accumulator * (float) inputField;
-                    break;
-                case DOUBLE:
-                    product = (double) accumulator * (double) inputField;
-                    break;
-                default:
-                    String msg =
-                            String.format(
-                                    "type %s not support in %s",
-                                    fieldType.getTypeRoot().toString(), this.getClass().getName());
-                    throw new IllegalArgumentException(msg);
-            }
+        // ordered by type root definition
+        switch (fieldType.getTypeRoot()) {
+            case DECIMAL:
+                Decimal mergeFieldDD = (Decimal) accumulator;
+                Decimal inFieldDD = (Decimal) inputField;
+                assert mergeFieldDD.scale() == inFieldDD.scale()
+                        : "Inconsistent scale of aggregate Decimal!";
+                assert mergeFieldDD.precision() == inFieldDD.precision()
+                        : "Inconsistent precision of aggregate Decimal!";
+                BigDecimal bigDecimal = mergeFieldDD.toBigDecimal();
+                BigDecimal bigDecimal1 = inFieldDD.toBigDecimal();
+                BigDecimal mul = bigDecimal.multiply(bigDecimal1);
+                product = fromBigDecimal(mul, mergeFieldDD.precision(), mergeFieldDD.scale());
+                break;
+            case TINYINT:
+                product = (byte) ((byte) accumulator * (byte) inputField);
+                break;
+            case SMALLINT:
+                product = (short) ((short) accumulator * (short) inputField);
+                break;
+            case INTEGER:
+                product = (int) accumulator * (int) inputField;
+                break;
+            case BIGINT:
+                product = (long) accumulator * (long) inputField;
+                break;
+            case FLOAT:
+                product = (float) accumulator * (float) inputField;
+                break;
+            case DOUBLE:
+                product = (double) accumulator * (double) inputField;
+                break;
+            default:
+                String msg =
+                        String.format(
+                                "type %s not support in %s",
+                                fieldType.getTypeRoot().toString(), this.getClass().getName());
+                throw new IllegalArgumentException(msg);
         }
         return product;
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldRoaringBitmap32Agg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldRoaringBitmap32Agg.java
index 15cbc2b96..ef7ac20e8 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldRoaringBitmap32Agg.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldRoaringBitmap32Agg.java
@@ -26,29 +26,18 @@ import java.io.IOException;
 /** roaring bitmap aggregate a field of a row. */
 public class FieldRoaringBitmap32Agg extends FieldAggregator {
 
-    public static final String NAME = "rbm32";
-
     private static final long serialVersionUID = 1L;
     private final RoaringBitmap32 roaringBitmapAcc;
     private final RoaringBitmap32 roaringBitmapInput;
 
-    public FieldRoaringBitmap32Agg(VarBinaryType dataType) {
-        super(dataType);
+    public FieldRoaringBitmap32Agg(String name, VarBinaryType dataType) {
+        super(name, dataType);
         this.roaringBitmapAcc = new RoaringBitmap32();
         this.roaringBitmapInput = new RoaringBitmap32();
     }
 
-    @Override
-    public String name() {
-        return NAME;
-    }
-
     @Override
     public Object agg(Object accumulator, Object inputField) {
-        if (accumulator == null && inputField == null) {
-            return null;
-        }
-
         if (accumulator == null || inputField == null) {
             return accumulator == null ? inputField : accumulator;
         }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldRoaringBitmap64Agg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldRoaringBitmap64Agg.java
index aa9cff1fe..b1d096497 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldRoaringBitmap64Agg.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldRoaringBitmap64Agg.java
@@ -26,29 +26,18 @@ import java.io.IOException;
 /** roaring bitmap aggregate a field of a row. */
 public class FieldRoaringBitmap64Agg extends FieldAggregator {
 
-    public static final String NAME = "rbm64";
-
     private static final long serialVersionUID = 1L;
     private final RoaringBitmap64 roaringBitmapAcc;
     private final RoaringBitmap64 roaringBitmapInput;
 
-    public FieldRoaringBitmap64Agg(VarBinaryType dataType) {
-        super(dataType);
+    public FieldRoaringBitmap64Agg(String name, VarBinaryType dataType) {
+        super(name, dataType);
         this.roaringBitmapAcc = new RoaringBitmap64();
         this.roaringBitmapInput = new RoaringBitmap64();
     }
 
-    @Override
-    public String name() {
-        return NAME;
-    }
-
     @Override
     public Object agg(Object accumulator, Object inputField) {
-        if (accumulator == null && inputField == null) {
-            return null;
-        }
-
         if (accumulator == null || inputField == null) {
             return accumulator == null ? inputField : accumulator;
         }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldSumAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldSumAgg.java
index 18e4bbeaf..4b3ad12ae 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldSumAgg.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldSumAgg.java
@@ -25,118 +25,109 @@ import org.apache.paimon.utils.DecimalUtils;
 /** sum aggregate a field of a row. */
 public class FieldSumAgg extends FieldAggregator {
 
-    public static final String NAME = "sum";
-
     private static final long serialVersionUID = 1L;
 
-    public FieldSumAgg(DataType dataType) {
-        super(dataType);
-    }
-
-    @Override
-    public String name() {
-        return NAME;
+    public FieldSumAgg(String name, DataType dataType) {
+        super(name, dataType);
     }
 
     @Override
     public Object agg(Object accumulator, Object inputField) {
+        if (accumulator == null || inputField == null) {
+            return accumulator == null ? inputField : accumulator;
+        }
         Object sum;
 
-        if (accumulator == null || inputField == null) {
-            sum = (accumulator == null ? inputField : accumulator);
-        } else {
-            // ordered by type root definition
-            switch (fieldType.getTypeRoot()) {
-                case DECIMAL:
-                    Decimal mergeFieldDD = (Decimal) accumulator;
-                    Decimal inFieldDD = (Decimal) inputField;
-                    assert mergeFieldDD.scale() == inFieldDD.scale()
-                            : "Inconsistent scale of aggregate Decimal!";
-                    assert mergeFieldDD.precision() == inFieldDD.precision()
-                            : "Inconsistent precision of aggregate Decimal!";
-                    sum =
-                            DecimalUtils.add(
-                                    mergeFieldDD,
-                                    inFieldDD,
-                                    mergeFieldDD.precision(),
-                                    mergeFieldDD.scale());
-                    break;
-                case TINYINT:
-                    sum = (byte) ((byte) accumulator + (byte) inputField);
-                    break;
-                case SMALLINT:
-                    sum = (short) ((short) accumulator + (short) inputField);
-                    break;
-                case INTEGER:
-                    sum = (int) accumulator + (int) inputField;
-                    break;
-                case BIGINT:
-                    sum = (long) accumulator + (long) inputField;
-                    break;
-                case FLOAT:
-                    sum = (float) accumulator + (float) inputField;
-                    break;
-                case DOUBLE:
-                    sum = (double) accumulator + (double) inputField;
-                    break;
-                default:
-                    String msg =
-                            String.format(
-                                    "type %s not support in %s",
-                                    fieldType.getTypeRoot().toString(), this.getClass().getName());
-                    throw new IllegalArgumentException(msg);
-            }
+        // ordered by type root definition
+        switch (fieldType.getTypeRoot()) {
+            case DECIMAL:
+                Decimal mergeFieldDD = (Decimal) accumulator;
+                Decimal inFieldDD = (Decimal) inputField;
+                assert mergeFieldDD.scale() == inFieldDD.scale()
+                        : "Inconsistent scale of aggregate Decimal!";
+                assert mergeFieldDD.precision() == inFieldDD.precision()
+                        : "Inconsistent precision of aggregate Decimal!";
+                sum =
+                        DecimalUtils.add(
+                                mergeFieldDD,
+                                inFieldDD,
+                                mergeFieldDD.precision(),
+                                mergeFieldDD.scale());
+                break;
+            case TINYINT:
+                sum = (byte) ((byte) accumulator + (byte) inputField);
+                break;
+            case SMALLINT:
+                sum = (short) ((short) accumulator + (short) inputField);
+                break;
+            case INTEGER:
+                sum = (int) accumulator + (int) inputField;
+                break;
+            case BIGINT:
+                sum = (long) accumulator + (long) inputField;
+                break;
+            case FLOAT:
+                sum = (float) accumulator + (float) inputField;
+                break;
+            case DOUBLE:
+                sum = (double) accumulator + (double) inputField;
+                break;
+            default:
+                String msg =
+                        String.format(
+                                "type %s not support in %s",
+                                fieldType.getTypeRoot().toString(), this.getClass().getName());
+                throw new IllegalArgumentException(msg);
         }
         return sum;
     }
 
     @Override
     public Object retract(Object accumulator, Object inputField) {
-        Object sum;
 
         if (accumulator == null || inputField == null) {
-            sum = (accumulator == null ? negative(inputField) : accumulator);
-        } else {
-            switch (fieldType.getTypeRoot()) {
-                case DECIMAL:
-                    Decimal mergeFieldDD = (Decimal) accumulator;
-                    Decimal inFieldDD = (Decimal) inputField;
-                    assert mergeFieldDD.scale() == inFieldDD.scale()
-                            : "Inconsistent scale of aggregate Decimal!";
-                    assert mergeFieldDD.precision() == inFieldDD.precision()
-                            : "Inconsistent precision of aggregate Decimal!";
-                    sum =
-                            DecimalUtils.subtract(
-                                    mergeFieldDD,
-                                    inFieldDD,
-                                    mergeFieldDD.precision(),
-                                    mergeFieldDD.scale());
-                    break;
-                case TINYINT:
-                    sum = (byte) ((byte) accumulator - (byte) inputField);
-                    break;
-                case SMALLINT:
-                    sum = (short) ((short) accumulator - (short) inputField);
-                    break;
-                case INTEGER:
-                    sum = (int) accumulator - (int) inputField;
-                    break;
-                case BIGINT:
-                    sum = (long) accumulator - (long) inputField;
-                    break;
-                case FLOAT:
-                    sum = (float) accumulator - (float) inputField;
-                    break;
-                case DOUBLE:
-                    sum = (double) accumulator - (double) inputField;
-                    break;
-                default:
-                    String msg =
-                            String.format(
-                                    "type %s not support in %s",
-                                    fieldType.getTypeRoot().toString(), this.getClass().getName());
-                    throw new IllegalArgumentException(msg);
-            }
+            return (accumulator == null ? negative(inputField) : accumulator);
+        }
+        Object sum;
+        switch (fieldType.getTypeRoot()) {
+            case DECIMAL:
+                Decimal mergeFieldDD = (Decimal) accumulator;
+                Decimal inFieldDD = (Decimal) inputField;
+                assert mergeFieldDD.scale() == inFieldDD.scale()
+                        : "Inconsistent scale of aggregate Decimal!";
+                assert mergeFieldDD.precision() == inFieldDD.precision()
+                        : "Inconsistent precision of aggregate Decimal!";
+                sum =
+                        DecimalUtils.subtract(
+                                mergeFieldDD,
+                                inFieldDD,
+                                mergeFieldDD.precision(),
+                                mergeFieldDD.scale());
+                break;
+            case TINYINT:
+                sum = (byte) ((byte) accumulator - (byte) inputField);
+                break;
+            case SMALLINT:
+                sum = (short) ((short) accumulator - (short) inputField);
+                break;
+            case INTEGER:
+                sum = (int) accumulator - (int) inputField;
+                break;
+            case BIGINT:
+                sum = (long) accumulator - (long) inputField;
+                break;
+            case FLOAT:
+                sum = (float) accumulator - (float) inputField;
+                break;
+            case DOUBLE:
+                sum = (double) accumulator - (double) inputField;
+                break;
+            default:
+                String msg =
+                        String.format(
+                                "type %s not support in %s",
+                                fieldType.getTypeRoot().toString(), this.getClass().getName());
+                throw new IllegalArgumentException(msg);
         }
         return sum;
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldThetaSketchAgg.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldThetaSketchAgg.java
index 7182a6744..9622b4aff 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldThetaSketchAgg.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldThetaSketchAgg.java
@@ -24,25 +24,14 @@ import org.apache.paimon.utils.ThetaSketch;
 /** ThetaSketch aggregate a field of a row. */
 public class FieldThetaSketchAgg extends FieldAggregator {
 
-    public static final String NAME = "theta_sketch";
-
     private static final long serialVersionUID = 1L;
 
-    public FieldThetaSketchAgg(VarBinaryType dataType) {
-        super(dataType);
-    }
-
-    @Override
-    public String name() {
-        return NAME;
+    public FieldThetaSketchAgg(String name, VarBinaryType dataType) {
+        super(name, dataType);
     }
 
     @Override
     public Object agg(Object accumulator, Object inputField) {
-        if (accumulator == null && inputField == null) {
-            return null;
-        }
-
         if (accumulator == null || inputField == null) {
             return accumulator == null ? inputField : accumulator;
         }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldAggregatorFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldAggregatorFactory.java
index 44f2439fe..d2ce0e476 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldAggregatorFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldAggregatorFactory.java
@@ -20,13 +20,52 @@ package org.apache.paimon.mergetree.compact.aggregate.factory;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.factories.Factory;
+import org.apache.paimon.factories.FactoryUtil;
 import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
+import org.apache.paimon.mergetree.compact.aggregate.FieldIgnoreRetractAgg;
 import org.apache.paimon.types.DataType;
 
+import javax.annotation.Nullable;
+
 /** Factory for {@link FieldAggregator}. */
 public interface FieldAggregatorFactory extends Factory {
 
     FieldAggregator create(DataType fieldType, CoreOptions options, String field);
 
     String identifier();
+
+    static FieldAggregator create(
+            DataType fieldType,
+            @Nullable String strAgg,
+            boolean ignoreRetract,
+            boolean isPrimaryKey,
+            CoreOptions options,
+            String field) {
+        FieldAggregator fieldAggregator;
+        if (isPrimaryKey) {
+            strAgg = FieldPrimaryKeyAggFactory.NAME;
+        } else if (strAgg == null) {
+            strAgg = FieldLastNonNullValueAggFactory.NAME;
+        }
+
+        FieldAggregatorFactory fieldAggregatorFactory =
+                FactoryUtil.discoverFactory(
+                        FieldAggregator.class.getClassLoader(),
+                        FieldAggregatorFactory.class,
+                        strAgg);
+        if (fieldAggregatorFactory == null) {
+            throw new RuntimeException(
+                    String.format(
+                            "Use unsupported aggregation: %s or spell aggregate function incorrectly!",
+                            strAgg));
+        }
+
+        fieldAggregator = fieldAggregatorFactory.create(fieldType, options, field);
+
+        if (ignoreRetract) {
+            fieldAggregator = new FieldIgnoreRetractAgg(fieldAggregator);
+        }
+
+        return fieldAggregator;
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldBoolAndAggFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldBoolAndAggFactory.java
index 4a3a2dc88..45bb8708d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldBoolAndAggFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldBoolAndAggFactory.java
@@ -19,19 +19,28 @@
 package org.apache.paimon.mergetree.compact.aggregate.factory;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
 import org.apache.paimon.mergetree.compact.aggregate.FieldBoolAndAgg;
+import org.apache.paimon.types.BooleanType;
 import org.apache.paimon.types.DataType;
 
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
 /** Factory for #{@link FieldBoolAndAgg}. */
 public class FieldBoolAndAggFactory implements FieldAggregatorFactory {
+
+    public static final String NAME = "bool_and";
+
     @Override
-    public FieldAggregator create(DataType fieldType, CoreOptions options, String field) {
-        return new FieldBoolAndAgg(fieldType);
+    public FieldBoolAndAgg create(DataType fieldType, CoreOptions options, String field) {
+        checkArgument(
+                fieldType instanceof BooleanType,
+                "Data type for bool and column must be 'BooleanType' but was '%s'.",
+                fieldType);
+        return new FieldBoolAndAgg(identifier(), (BooleanType) fieldType);
     }
 
     @Override
     public String identifier() {
-        return FieldBoolAndAgg.NAME;
+        return NAME;
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldBoolOrAggFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldBoolOrAggFactory.java
index 488325eae..266ccad6a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldBoolOrAggFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldBoolOrAggFactory.java
@@ -19,19 +19,28 @@
 package org.apache.paimon.mergetree.compact.aggregate.factory;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
 import org.apache.paimon.mergetree.compact.aggregate.FieldBoolOrAgg;
+import org.apache.paimon.types.BooleanType;
 import org.apache.paimon.types.DataType;
 
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
 /** Factory for #{@link FieldBoolOrAgg}. */
 public class FieldBoolOrAggFactory implements FieldAggregatorFactory {
+
+    public static final String NAME = "bool_or";
+
     @Override
-    public FieldAggregator create(DataType fieldType, CoreOptions options, String field) {
-        return new FieldBoolOrAgg(fieldType);
+    public FieldBoolOrAgg create(DataType fieldType, CoreOptions options, String field) {
+        checkArgument(
+                fieldType instanceof BooleanType,
+                "Data type for bool or column must be 'BooleanType' but was '%s'.",
+                fieldType);
+        return new FieldBoolOrAgg(identifier(), (BooleanType) fieldType);
     }
 
     @Override
     public String identifier() {
-        return FieldBoolOrAgg.NAME;
+        return NAME;
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldCollectAggFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldCollectAggFactory.java
index b20453e07..a4325d165 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldCollectAggFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldCollectAggFactory.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.mergetree.compact.aggregate.factory;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
 import org.apache.paimon.mergetree.compact.aggregate.FieldCollectAgg;
 import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.DataType;
@@ -28,17 +27,21 @@ import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Factory for #{@link FieldCollectAgg}. */
 public class FieldCollectAggFactory implements FieldAggregatorFactory {
+
+    public static final String NAME = "collect";
+
     @Override
-    public FieldAggregator create(DataType fieldType, CoreOptions options, String field) {
+    public FieldCollectAgg create(DataType fieldType, CoreOptions options, String field) {
         checkArgument(
                 fieldType instanceof ArrayType,
                 "Data type for collect column must be 'Array' but was '%s'.",
                 fieldType);
-        return new FieldCollectAgg((ArrayType) fieldType, options.fieldCollectAggDistinct(field));
+        return new FieldCollectAgg(
+                identifier(), (ArrayType) fieldType, options.fieldCollectAggDistinct(field));
     }
 
     @Override
     public String identifier() {
-        return FieldCollectAgg.NAME;
+        return NAME;
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldFirstNonNullValueAggFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldFirstNonNullValueAggFactory.java
index 141da6342..51e3a6d62 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldFirstNonNullValueAggFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldFirstNonNullValueAggFactory.java
@@ -19,19 +19,21 @@
 package org.apache.paimon.mergetree.compact.aggregate.factory;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
 import org.apache.paimon.mergetree.compact.aggregate.FieldFirstNonNullValueAgg;
 import org.apache.paimon.types.DataType;
 
 /** Factory for #{@link FieldFirstNonNullValueAgg}. */
 public class FieldFirstNonNullValueAggFactory implements FieldAggregatorFactory {
+
+    public static final String NAME = "first_non_null_value";
+
     @Override
-    public FieldAggregator create(DataType fieldType, CoreOptions options, String field) {
-        return new FieldFirstNonNullValueAgg(fieldType);
+    public FieldFirstNonNullValueAgg create(DataType fieldType, CoreOptions options, String field) {
+        return new FieldFirstNonNullValueAgg(identifier(), fieldType);
     }
 
     @Override
     public String identifier() {
-        return FieldFirstNonNullValueAgg.NAME;
+        return NAME;
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldFirstNonNullValueAggLegacyFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldFirstNonNullValueAggLegacyFactory.java
index 1d92dd3ec..507ecbb5c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldFirstNonNullValueAggLegacyFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldFirstNonNullValueAggLegacyFactory.java
@@ -25,13 +25,16 @@ import org.apache.paimon.types.DataType;
 
 /** Factory for legacy name of #{@link FieldFirstNonNullValueAgg}. */
 public class FieldFirstNonNullValueAggLegacyFactory implements FieldAggregatorFactory {
+
+    public static final String LEGACY_NAME = "first_not_null_value";
+
     @Override
     public FieldAggregator create(DataType fieldType, CoreOptions options, String field) {
-        return new FieldFirstNonNullValueAgg(fieldType);
+        return new FieldFirstNonNullValueAgg(identifier(), fieldType);
     }
 
     @Override
     public String identifier() {
-        return FieldFirstNonNullValueAgg.LEGACY_NAME;
+        return LEGACY_NAME;
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldFirstValueAggFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldFirstValueAggFactory.java
index cc36928c6..84db12ffc 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldFirstValueAggFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldFirstValueAggFactory.java
@@ -19,19 +19,21 @@
 package org.apache.paimon.mergetree.compact.aggregate.factory;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
 import org.apache.paimon.mergetree.compact.aggregate.FieldFirstValueAgg;
 import org.apache.paimon.types.DataType;
 
 /** Factory for #{@link FieldFirstValueAgg}. */
 public class FieldFirstValueAggFactory implements FieldAggregatorFactory {
+
+    public static final String NAME = "first_value";
+
     @Override
-    public FieldAggregator create(DataType fieldType, CoreOptions options, String field) {
-        return new FieldFirstValueAgg(fieldType);
+    public FieldFirstValueAgg create(DataType fieldType, CoreOptions options, String field) {
+        return new FieldFirstValueAgg(identifier(), fieldType);
     }
 
     @Override
     public String identifier() {
-        return FieldFirstValueAgg.NAME;
+        return NAME;
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldHllSketchAggFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldHllSketchAggFactory.java
index 9f57abaee..5777c6a41 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldHllSketchAggFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldHllSketchAggFactory.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.mergetree.compact.aggregate.factory;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
 import org.apache.paimon.mergetree.compact.aggregate.FieldHllSketchAgg;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.VarBinaryType;
@@ -28,17 +27,20 @@ import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Factory for #{@link FieldHllSketchAgg}. */
 public class FieldHllSketchAggFactory implements FieldAggregatorFactory {
+
+    public static final String NAME = "hll_sketch";
+
     @Override
-    public FieldAggregator create(DataType fieldType, CoreOptions options, String field) {
+    public FieldHllSketchAgg create(DataType fieldType, CoreOptions options, String field) {
         checkArgument(
                 fieldType instanceof VarBinaryType,
                 "Data type for hll sketch column must be 'VarBinaryType' but was '%s'.",
                 fieldType);
-        return new FieldHllSketchAgg((VarBinaryType) fieldType);
+        return new FieldHllSketchAgg(identifier(), (VarBinaryType) fieldType);
     }
 
     @Override
     public String identifier() {
-        return FieldHllSketchAgg.NAME;
+        return NAME;
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldLastNonNullValueAggFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldLastNonNullValueAggFactory.java
index e3e2ff079..bbc6402bb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldLastNonNullValueAggFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldLastNonNullValueAggFactory.java
@@ -19,19 +19,21 @@
 package org.apache.paimon.mergetree.compact.aggregate.factory;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
 import org.apache.paimon.mergetree.compact.aggregate.FieldLastNonNullValueAgg;
 import org.apache.paimon.types.DataType;
 
 /** Factory for #{@link FieldLastNonNullValueAgg}. */
 public class FieldLastNonNullValueAggFactory implements FieldAggregatorFactory {
+
+    public static final String NAME = "last_non_null_value";
+
     @Override
-    public FieldAggregator create(DataType fieldType, CoreOptions options, String field) {
-        return new FieldLastNonNullValueAgg(fieldType);
+    public FieldLastNonNullValueAgg create(DataType fieldType, CoreOptions options, String field) {
+        return new FieldLastNonNullValueAgg(identifier(), fieldType);
     }
 
     @Override
     public String identifier() {
-        return FieldLastNonNullValueAgg.NAME;
+        return NAME;
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldLastValueAggFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldLastValueAggFactory.java
index b3423a39e..c825825a1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldLastValueAggFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldLastValueAggFactory.java
@@ -19,19 +19,21 @@
 package org.apache.paimon.mergetree.compact.aggregate.factory;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
 import org.apache.paimon.mergetree.compact.aggregate.FieldLastValueAgg;
 import org.apache.paimon.types.DataType;
 
 /** Factory for #{@link FieldLastValueAgg}. */
 public class FieldLastValueAggFactory implements FieldAggregatorFactory {
+
+    public static final String NAME = "last_value";
+
     @Override
-    public FieldAggregator create(DataType fieldType, CoreOptions options, String field) {
-        return new FieldLastValueAgg(fieldType);
+    public FieldLastValueAgg create(DataType fieldType, CoreOptions options, String field) {
+        return new FieldLastValueAgg(identifier(), fieldType);
     }
 
     @Override
     public String identifier() {
-        return FieldLastValueAgg.NAME;
+        return NAME;
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldListaggAggFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldListaggAggFactory.java
index e5e85dbab..cdb9c128a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldListaggAggFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldListaggAggFactory.java
@@ -19,19 +19,28 @@
 package org.apache.paimon.mergetree.compact.aggregate.factory;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
 import org.apache.paimon.mergetree.compact.aggregate.FieldListaggAgg;
 import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.VarCharType;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Factory for #{@link FieldListaggAgg}. */
 public class FieldListaggAggFactory implements FieldAggregatorFactory {
+
+    public static final String NAME = "listagg";
+
     @Override
-    public FieldAggregator create(DataType fieldType, CoreOptions options, String field) {
-        return new FieldListaggAgg(fieldType, options, field);
+    public FieldListaggAgg create(DataType fieldType, CoreOptions options, String field) {
+        checkArgument(
+                fieldType instanceof VarCharType,
+                "Data type for list agg column must be 'VarCharType' but was '%s'.",
+                fieldType);
+        return new FieldListaggAgg(identifier(), (VarCharType) fieldType, options, field);
     }
 
     @Override
     public String identifier() {
-        return FieldListaggAgg.NAME;
+        return NAME;
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldMaxAggFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldMaxAggFactory.java
index 2fda49a76..4e3c33171 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldMaxAggFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldMaxAggFactory.java
@@ -19,19 +19,21 @@
 package org.apache.paimon.mergetree.compact.aggregate.factory;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
 import org.apache.paimon.mergetree.compact.aggregate.FieldMaxAgg;
 import org.apache.paimon.types.DataType;
 
 /** Factory for #{@link FieldMaxAgg}. */
 public class FieldMaxAggFactory implements FieldAggregatorFactory {
+
+    public static final String NAME = "max";
+
     @Override
-    public FieldAggregator create(DataType fieldType, CoreOptions options, String field) {
-        return new FieldMaxAgg(fieldType);
+    public FieldMaxAgg create(DataType fieldType, CoreOptions options, String field) {
+        return new FieldMaxAgg(identifier(), fieldType);
     }
 
     @Override
     public String identifier() {
-        return FieldMaxAgg.NAME;
+        return NAME;
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldMergeMapAggFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldMergeMapAggFactory.java
index ac2409b25..e10602f61 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldMergeMapAggFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldMergeMapAggFactory.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.mergetree.compact.aggregate.factory;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
 import org.apache.paimon.mergetree.compact.aggregate.FieldMergeMapAgg;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.MapType;
@@ -28,17 +27,20 @@ import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Factory for #{@link FieldMergeMapAgg}. */
 public class FieldMergeMapAggFactory implements FieldAggregatorFactory {
+
+    public static final String NAME = "merge_map";
+
     @Override
-    public FieldAggregator create(DataType fieldType, CoreOptions options, String field) {
+    public FieldMergeMapAgg create(DataType fieldType, CoreOptions options, String field) {
         checkArgument(
                 fieldType instanceof MapType,
-                "Data type of merge map column must be 'MAP' but was '%s'",
+                "Data type for merge map column must be 'MAP' but was '%s'",
                 fieldType);
-        return new FieldMergeMapAgg((MapType) fieldType);
+        return new FieldMergeMapAgg(identifier(), (MapType) fieldType);
     }
 
     @Override
     public String identifier() {
-        return FieldMergeMapAgg.NAME;
+        return NAME;
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldMinAggFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldMinAggFactory.java
index e939f8a73..4ac7c08b1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldMinAggFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldMinAggFactory.java
@@ -19,19 +19,21 @@
 package org.apache.paimon.mergetree.compact.aggregate.factory;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
 import org.apache.paimon.mergetree.compact.aggregate.FieldMinAgg;
 import org.apache.paimon.types.DataType;
 
 /** Factory for #{@link FieldMinAgg}. */
 public class FieldMinAggFactory implements FieldAggregatorFactory {
+
+    public static final String NAME = "min";
+
     @Override
-    public FieldAggregator create(DataType fieldType, CoreOptions options, String field) {
-        return new FieldMinAgg(fieldType);
+    public FieldMinAgg create(DataType fieldType, CoreOptions options, String field) {
+        return new FieldMinAgg(identifier(), fieldType);
     }
 
     @Override
     public String identifier() {
-        return FieldMinAgg.NAME;
+        return NAME;
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedUpdateAggFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedUpdateAggFactory.java
index 41b9f3e41..b92df6414 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedUpdateAggFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedUpdateAggFactory.java
@@ -32,6 +32,9 @@ import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Factory for #{@link FieldNestedUpdateAgg}. */
 public class FieldNestedUpdateAggFactory implements FieldAggregatorFactory {
+
+    public static final String NAME = "nested_update";
+
     @Override
     public FieldAggregator create(DataType fieldType, CoreOptions options, String field) {
         return createFieldNestedUpdateAgg(fieldType, options.fieldNestedUpdateAggNestedKey(field));
@@ -39,20 +42,20 @@ public class FieldNestedUpdateAggFactory implements FieldAggregatorFactory {
 
     @Override
     public String identifier() {
-        return FieldNestedUpdateAgg.NAME;
+        return NAME;
     }
 
-    private static FieldAggregator createFieldNestedUpdateAgg(
-            DataType fieldType, List<String> nestedKey) {
+    private FieldAggregator createFieldNestedUpdateAgg(DataType fieldType, List<String> nestedKey) {
         if (nestedKey == null) {
             nestedKey = Collections.emptyList();
         }
 
-        String typeErrorMsg = "Data type of nested table column must be 'Array<Row>' but was '%s'.";
+        String typeErrorMsg =
+                "Data type for nested table column must be 'Array<Row>' but was '%s'.";
         checkArgument(fieldType instanceof ArrayType, typeErrorMsg, fieldType);
         ArrayType arrayType = (ArrayType) fieldType;
         checkArgument(arrayType.getElementType() instanceof RowType, typeErrorMsg, fieldType);
 
-        return new FieldNestedUpdateAgg(arrayType, nestedKey);
+        return new FieldNestedUpdateAgg(identifier(), arrayType, nestedKey);
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldPrimaryKeyAggFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldPrimaryKeyAggFactory.java
index 312d29753..0e293bcf7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldPrimaryKeyAggFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldPrimaryKeyAggFactory.java
@@ -25,13 +25,16 @@ import org.apache.paimon.types.DataType;
 
 /** Factory for #{@link FieldPrimaryKeyAgg}. */
 public class FieldPrimaryKeyAggFactory implements FieldAggregatorFactory {
+
+    public static final String NAME = "primary-key";
+
     @Override
     public FieldAggregator create(DataType fieldType, CoreOptions options, String field) {
-        return new FieldPrimaryKeyAgg(fieldType);
+        return new FieldPrimaryKeyAgg(identifier(), fieldType);
     }
 
     @Override
     public String identifier() {
-        return FieldPrimaryKeyAgg.NAME;
+        return NAME;
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldProductAggFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldProductAggFactory.java
index 88be9c42a..7dbdd9f5a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldProductAggFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldProductAggFactory.java
@@ -19,19 +19,28 @@
 package org.apache.paimon.mergetree.compact.aggregate.factory;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
 import org.apache.paimon.mergetree.compact.aggregate.FieldProductAgg;
 import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeFamily;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Factory for #{@link FieldProductAgg}. */
 public class FieldProductAggFactory implements FieldAggregatorFactory {
+
+    public static final String NAME = "product";
+
     @Override
-    public FieldAggregator create(DataType fieldType, CoreOptions options, String field) {
-        return new FieldProductAgg(fieldType);
+    public FieldProductAgg create(DataType fieldType, CoreOptions options, String field) {
+        checkArgument(
+                fieldType.getTypeRoot().getFamilies().contains(DataTypeFamily.NUMERIC),
+                "Data type for product column must be 'NumericType' but was '%s'.",
+                fieldType);
+        return new FieldProductAgg(identifier(), fieldType);
     }
 
     @Override
     public String identifier() {
-        return FieldProductAgg.NAME;
+        return NAME;
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldRoaringBitmap32AggFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldRoaringBitmap32AggFactory.java
index 5b2a80b30..91103791f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldRoaringBitmap32AggFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldRoaringBitmap32AggFactory.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.mergetree.compact.aggregate.factory;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
 import org.apache.paimon.mergetree.compact.aggregate.FieldRoaringBitmap32Agg;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.VarBinaryType;
@@ -28,17 +27,20 @@ import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Factory for #{@link FieldRoaringBitmap32Agg}. */
 public class FieldRoaringBitmap32AggFactory implements FieldAggregatorFactory {
+
+    public static final String NAME = "rbm32";
+
     @Override
-    public FieldAggregator create(DataType fieldType, CoreOptions options, String field) {
+    public FieldRoaringBitmap32Agg create(DataType fieldType, CoreOptions options, String field) {
         checkArgument(
                 fieldType instanceof VarBinaryType,
                 "Data type for roaring bitmap column must be 'VarBinaryType' but was '%s'.",
                 fieldType);
-        return new FieldRoaringBitmap32Agg((VarBinaryType) fieldType);
+        return new FieldRoaringBitmap32Agg(identifier(), (VarBinaryType) fieldType);
     }
 
     @Override
     public String identifier() {
-        return FieldRoaringBitmap32Agg.NAME;
+        return NAME;
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldRoaringBitmap64AggFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldRoaringBitmap64AggFactory.java
index a0e9fe652..56f5554af 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldRoaringBitmap64AggFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldRoaringBitmap64AggFactory.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.mergetree.compact.aggregate.factory;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
 import org.apache.paimon.mergetree.compact.aggregate.FieldRoaringBitmap64Agg;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.VarBinaryType;
@@ -28,17 +27,20 @@ import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Factory for #{@link FieldRoaringBitmap64Agg}. */
 public class FieldRoaringBitmap64AggFactory implements FieldAggregatorFactory {
+
+    public static final String NAME = "rbm64";
+
     @Override
-    public FieldAggregator create(DataType fieldType, CoreOptions options, String field) {
+    public FieldRoaringBitmap64Agg create(DataType fieldType, CoreOptions options, String field) {
         checkArgument(
                 fieldType instanceof VarBinaryType,
                 "Data type for roaring bitmap column must be 'VarBinaryType' but was '%s'.",
                 fieldType);
-        return new FieldRoaringBitmap64Agg((VarBinaryType) fieldType);
+        return new FieldRoaringBitmap64Agg(identifier(), (VarBinaryType) fieldType);
     }
 
     @Override
     public String identifier() {
-        return FieldRoaringBitmap64Agg.NAME;
+        return NAME;
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldSumAggFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldSumAggFactory.java
index 8470e6d17..5343f67b6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldSumAggFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldSumAggFactory.java
@@ -19,19 +19,28 @@
 package org.apache.paimon.mergetree.compact.aggregate.factory;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
 import org.apache.paimon.mergetree.compact.aggregate.FieldSumAgg;
 import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeFamily;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Factory for #{@link FieldSumAgg}. */
 public class FieldSumAggFactory implements FieldAggregatorFactory {
+
+    public static final String NAME = "sum";
+
     @Override
-    public FieldAggregator create(DataType fieldType, CoreOptions options, String field) {
-        return new FieldSumAgg(fieldType);
+    public FieldSumAgg create(DataType fieldType, CoreOptions options, String field) {
+        checkArgument(
+                fieldType.getTypeRoot().getFamilies().contains(DataTypeFamily.NUMERIC),
+                "Data type for sum column must be 'NumericType' but was '%s'.",
+                fieldType);
+        return new FieldSumAgg(identifier(), fieldType);
     }
 
     @Override
     public String identifier() {
-        return FieldSumAgg.NAME;
+        return NAME;
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldThetaSketchAggFactory.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldThetaSketchAggFactory.java
index 6aa5d7bc4..c30fb7df7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldThetaSketchAggFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldThetaSketchAggFactory.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.mergetree.compact.aggregate.factory;
 
 import org.apache.paimon.CoreOptions;
-import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
 import org.apache.paimon.mergetree.compact.aggregate.FieldThetaSketchAgg;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.VarBinaryType;
@@ -28,17 +27,20 @@ import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Factory for #{@link FieldThetaSketchAgg}. */
 public class FieldThetaSketchAggFactory implements FieldAggregatorFactory {
+
+    public static final String NAME = "theta_sketch";
+
     @Override
-    public FieldAggregator create(DataType fieldType, CoreOptions options, String field) {
+    public FieldThetaSketchAgg create(DataType fieldType, CoreOptions options, String field) {
         checkArgument(
                 fieldType instanceof VarBinaryType,
                 "Data type for theta sketch column must be 'VarBinaryType' but was '%s'.",
                 fieldType);
-        return new FieldThetaSketchAgg((VarBinaryType) fieldType);
+        return new FieldThetaSketchAgg(identifier(), (VarBinaryType) fieldType);
     }
 
     @Override
     public String identifier() {
-        return FieldThetaSketchAgg.NAME;
+        return NAME;
     }
 }
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
index c8344c44d..28cb4c099 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
@@ -26,8 +26,8 @@ import org.apache.paimon.data.InternalRow.FieldGetter;
 import org.apache.paimon.lookup.LookupStrategy;
 import org.apache.paimon.mergetree.compact.aggregate.AggregateMergeFunction;
 import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
-import org.apache.paimon.mergetree.compact.aggregate.FieldLastValueAgg;
-import org.apache.paimon.mergetree.compact.aggregate.FieldSumAgg;
+import org.apache.paimon.mergetree.compact.aggregate.factory.FieldLastValueAggFactory;
+import org.apache.paimon.mergetree.compact.aggregate.factory.FieldSumAggFactory;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.DataTypes;
@@ -293,7 +293,8 @@ public class LookupChangelogMergeFunctionWrapperTest {
                                                     row -> row.isNullAt(0) ? null : row.getInt(0)
                                                 },
                                                 new FieldAggregator[] {
-                                                    new FieldSumAgg(DataTypes.INT())
+                                                    new FieldSumAggFactory()
+                                                            .create(DataTypes.INT(), null, null)
                                                 }),
                                 RowType.of(DataTypes.INT()),
                                 RowType.of(DataTypes.INT())),
@@ -381,7 +382,8 @@ public class LookupChangelogMergeFunctionWrapperTest {
                                                     row -> row.isNullAt(0) ? null : row.getInt(0)
                                                 },
                                                 new FieldAggregator[] {
-                                                    new FieldLastValueAgg(DataTypes.INT())
+                                                    new FieldLastValueAggFactory()
+                                                            .create(DataTypes.INT(), null, null)
                                                 }),
                                 RowType.of(DataTypes.INT()),
                                 RowType.of(DataTypes.INT())),
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
index 0fa1433e7..d32098b80 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
@@ -27,6 +27,25 @@ import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalArray;
 import org.apache.paimon.data.InternalMap;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory;
+import org.apache.paimon.mergetree.compact.aggregate.factory.FieldBoolAndAggFactory;
+import org.apache.paimon.mergetree.compact.aggregate.factory.FieldBoolOrAggFactory;
+import org.apache.paimon.mergetree.compact.aggregate.factory.FieldCollectAggFactory;
+import org.apache.paimon.mergetree.compact.aggregate.factory.FieldFirstNonNullValueAggFactory;
+import org.apache.paimon.mergetree.compact.aggregate.factory.FieldFirstValueAggFactory;
+import org.apache.paimon.mergetree.compact.aggregate.factory.FieldHllSketchAggFactory;
+import org.apache.paimon.mergetree.compact.aggregate.factory.FieldLastNonNullValueAggFactory;
+import org.apache.paimon.mergetree.compact.aggregate.factory.FieldLastValueAggFactory;
+import org.apache.paimon.mergetree.compact.aggregate.factory.FieldListaggAggFactory;
+import org.apache.paimon.mergetree.compact.aggregate.factory.FieldMaxAggFactory;
+import org.apache.paimon.mergetree.compact.aggregate.factory.FieldMergeMapAggFactory;
+import org.apache.paimon.mergetree.compact.aggregate.factory.FieldMinAggFactory;
+import org.apache.paimon.mergetree.compact.aggregate.factory.FieldNestedUpdateAggFactory;
+import org.apache.paimon.mergetree.compact.aggregate.factory.FieldProductAggFactory;
+import org.apache.paimon.mergetree.compact.aggregate.factory.FieldRoaringBitmap32AggFactory;
+import org.apache.paimon.mergetree.compact.aggregate.factory.FieldRoaringBitmap64AggFactory;
+import org.apache.paimon.mergetree.compact.aggregate.factory.FieldSumAggFactory;
+import org.apache.paimon.mergetree.compact.aggregate.factory.FieldThetaSketchAggFactory;
 import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.BooleanType;
@@ -67,14 +86,16 @@ public class FieldAggregatorTest {
 
     @Test
     public void testFieldBoolAndAgg() {
-        FieldBoolAndAgg fieldBoolAndAgg = new FieldBoolAndAgg(new BooleanType());
+        FieldBoolAndAgg fieldBoolAndAgg =
+                new FieldBoolAndAggFactory().create(new BooleanType(), null, null);
         assertThat(fieldBoolAndAgg.agg(false, true)).isEqualTo(false);
         assertThat(fieldBoolAndAgg.agg(true, true)).isEqualTo(true);
     }
 
     @Test
     public void testFieldBoolOrAgg() {
-        FieldBoolOrAgg fieldBoolOrAgg = new FieldBoolOrAgg(new BooleanType());
+        FieldBoolOrAgg fieldBoolOrAgg =
+                new FieldBoolOrAggFactory().create(new BooleanType(), null, null);
         assertThat(fieldBoolOrAgg.agg(false, true)).isEqualTo(true);
         assertThat(fieldBoolOrAgg.agg(false, false)).isEqualTo(false);
     }
@@ -82,7 +103,7 @@ public class FieldAggregatorTest {
     @Test
     public void testFieldLastNonNullValueAgg() {
         FieldLastNonNullValueAgg fieldLastNonNullValueAgg =
-                new FieldLastNonNullValueAgg(new IntType());
+                new FieldLastNonNullValueAggFactory().create(new IntType(), null, null);
         Integer accumulator = null;
         Integer inputField = 1;
         assertThat(fieldLastNonNullValueAgg.agg(accumulator, inputField)).isEqualTo(1);
@@ -94,7 +115,8 @@ public class FieldAggregatorTest {
 
     @Test
     public void testFieldLastValueAgg() {
-        FieldLastValueAgg fieldLastValueAgg = new FieldLastValueAgg(new IntType());
+        FieldLastValueAgg fieldLastValueAgg =
+                new FieldLastValueAggFactory().create(new IntType(), null, null);
         Integer accumulator = null;
         Integer inputField = 1;
         assertThat(fieldLastValueAgg.agg(accumulator, inputField)).isEqualTo(1);
@@ -106,7 +128,8 @@ public class FieldAggregatorTest {
 
     @Test
     public void testFieldFirstValueAgg() {
-        FieldFirstValueAgg fieldFirstValueAgg = new FieldFirstValueAgg(new IntType());
+        FieldFirstValueAgg fieldFirstValueAgg =
+                new FieldFirstValueAggFactory().create(new IntType(), null, null);
         assertThat(fieldFirstValueAgg.agg(null, 1)).isEqualTo(1);
         assertThat(fieldFirstValueAgg.agg(1, 2)).isEqualTo(1);
 
@@ -117,7 +140,7 @@ public class FieldAggregatorTest {
     @Test
     public void testFieldFirstNonNullValueAgg() {
         FieldFirstNonNullValueAgg fieldFirstNonNullValueAgg =
-                new FieldFirstNonNullValueAgg(new IntType());
+                new FieldFirstNonNullValueAggFactory().create(new IntType(), 
null, null);
         assertThat(fieldFirstNonNullValueAgg.agg(null, null)).isNull();
         assertThat(fieldFirstNonNullValueAgg.agg(null, 1)).isEqualTo(1);
         assertThat(fieldFirstNonNullValueAgg.agg(1, 2)).isEqualTo(1);
@@ -129,8 +152,8 @@ public class FieldAggregatorTest {
     @Test
     public void testFieldListAggWithDefaultDelimiter() {
         FieldListaggAgg fieldListaggAgg =
-                new FieldListaggAgg(
-                        new VarCharType(), new CoreOptions(new HashMap<>()), 
"fieldName");
+                new FieldListaggAggFactory()
+                        .create(new VarCharType(), new CoreOptions(new 
HashMap<>()), "fieldName");
         BinaryString accumulator = BinaryString.fromString("user1");
         BinaryString inputField = BinaryString.fromString("user2");
         assertThat(fieldListaggAgg.agg(accumulator, inputField).toString())
@@ -140,11 +163,13 @@ public class FieldAggregatorTest {
     @Test
     public void testFieldListAggWithCustomDelimiter() {
         FieldListaggAgg fieldListaggAgg =
-                new FieldListaggAgg(
-                        new VarCharType(),
-                        CoreOptions.fromMap(
-                                
ImmutableMap.of("fields.fieldName.list-agg-delimiter", "-")),
-                        "fieldName");
+                new FieldListaggAggFactory()
+                        .create(
+                                new VarCharType(),
+                                CoreOptions.fromMap(
+                                        ImmutableMap.of(
+                                                
"fields.fieldName.list-agg-delimiter", "-")),
+                                "fieldName");
         BinaryString accumulator = BinaryString.fromString("user1");
         BinaryString inputField = BinaryString.fromString("user2");
         assertThat(fieldListaggAgg.agg(accumulator, inputField).toString())
@@ -153,7 +178,7 @@ public class FieldAggregatorTest {
 
     @Test
     public void testFieldMaxAgg() {
-        FieldMaxAgg fieldMaxAgg = new FieldMaxAgg(new IntType());
+        FieldMaxAgg fieldMaxAgg = new FieldMaxAggFactory().create(new 
IntType(), null, null);
         Integer accumulator = 1;
         Integer inputField = 10;
         assertThat(fieldMaxAgg.agg(accumulator, inputField)).isEqualTo(10);
@@ -161,7 +186,7 @@ public class FieldAggregatorTest {
 
     @Test
     public void testFieldMinAgg() {
-        FieldMinAgg fieldMinAgg = new FieldMinAgg(new IntType());
+        FieldMinAgg fieldMinAgg = new FieldMinAggFactory().create(new 
IntType(), null, null);
         Integer accumulator = 1;
         Integer inputField = 10;
         assertThat(fieldMinAgg.agg(accumulator, inputField)).isEqualTo(1);
@@ -169,7 +194,7 @@ public class FieldAggregatorTest {
 
     @Test
     public void testFieldSumIntAgg() {
-        FieldSumAgg fieldSumAgg = new FieldSumAgg(new IntType());
+        FieldSumAgg fieldSumAgg = new FieldSumAggFactory().create(new 
IntType(), null, null);
         assertThat(fieldSumAgg.agg(null, 10)).isEqualTo(10);
         assertThat(fieldSumAgg.agg(1, 10)).isEqualTo(11);
         assertThat(fieldSumAgg.retract(10, 5)).isEqualTo(5);
@@ -178,7 +203,8 @@ public class FieldAggregatorTest {
 
     @Test
     public void testFieldProductIntAgg() {
-        FieldProductAgg fieldProductAgg = new FieldProductAgg(new IntType());
+        FieldProductAgg fieldProductAgg =
+                new FieldProductAggFactory().create(new IntType(), null, null);
         assertThat(fieldProductAgg.agg(null, 10)).isEqualTo(10);
         assertThat(fieldProductAgg.agg(1, 10)).isEqualTo(10);
         assertThat(fieldProductAgg.retract(10, 5)).isEqualTo(2);
@@ -187,7 +213,7 @@ public class FieldAggregatorTest {
 
     @Test
     public void testFieldSumByteAgg() {
-        FieldSumAgg fieldSumAgg = new FieldSumAgg(new TinyIntType());
+        FieldSumAgg fieldSumAgg = new FieldSumAggFactory().create(new 
TinyIntType(), null, null);
         assertThat(fieldSumAgg.agg(null, (byte) 10)).isEqualTo((byte) 10);
         assertThat(fieldSumAgg.agg((byte) 1, (byte) 10)).isEqualTo((byte) 11);
         assertThat(fieldSumAgg.retract((byte) 10, (byte) 5)).isEqualTo((byte) 
5);
@@ -196,7 +222,8 @@ public class FieldAggregatorTest {
 
     @Test
     public void testFieldProductByteAgg() {
-        FieldProductAgg fieldProductAgg = new FieldProductAgg(new 
TinyIntType());
+        FieldProductAgg fieldProductAgg =
+                new FieldProductAggFactory().create(new TinyIntType(), null, 
null);
         assertThat(fieldProductAgg.agg(null, (byte) 10)).isEqualTo((byte) 10);
         assertThat(fieldProductAgg.agg((byte) 1, (byte) 10)).isEqualTo((byte) 
10);
         assertThat(fieldProductAgg.retract((byte) 10, (byte) 
5)).isEqualTo((byte) 2);
@@ -205,7 +232,8 @@ public class FieldAggregatorTest {
 
     @Test
     public void testFieldProductShortAgg() {
-        FieldProductAgg fieldProductAgg = new FieldProductAgg(new 
SmallIntType());
+        FieldProductAgg fieldProductAgg =
+                new FieldProductAggFactory().create(new SmallIntType(), null, 
null);
         assertThat(fieldProductAgg.agg(null, (short) 10)).isEqualTo((short) 
10);
         assertThat(fieldProductAgg.agg((short) 1, (short) 
10)).isEqualTo((short) 10);
         assertThat(fieldProductAgg.retract((short) 10, (short) 
5)).isEqualTo((short) 2);
@@ -214,7 +242,7 @@ public class FieldAggregatorTest {
 
     @Test
     public void testFieldSumShortAgg() {
-        FieldSumAgg fieldSumAgg = new FieldSumAgg(new SmallIntType());
+        FieldSumAgg fieldSumAgg = new FieldSumAggFactory().create(new 
SmallIntType(), null, null);
         assertThat(fieldSumAgg.agg(null, (short) 10)).isEqualTo((short) 10);
         assertThat(fieldSumAgg.agg((short) 1, (short) 10)).isEqualTo((short) 
11);
         assertThat(fieldSumAgg.retract((short) 10, (short) 
5)).isEqualTo((short) 5);
@@ -223,7 +251,7 @@ public class FieldAggregatorTest {
 
     @Test
     public void testFieldSumLongAgg() {
-        FieldSumAgg fieldSumAgg = new FieldSumAgg(new BigIntType());
+        FieldSumAgg fieldSumAgg = new FieldSumAggFactory().create(new 
BigIntType(), null, null);
         assertThat(fieldSumAgg.agg(null, 10L)).isEqualTo(10L);
         assertThat(fieldSumAgg.agg(1L, 10L)).isEqualTo(11L);
         assertThat(fieldSumAgg.retract(10L, 5L)).isEqualTo(5L);
@@ -232,7 +260,8 @@ public class FieldAggregatorTest {
 
     @Test
     public void testFieldProductLongAgg() {
-        FieldProductAgg fieldProductAgg = new FieldProductAgg(new 
BigIntType());
+        FieldProductAgg fieldProductAgg =
+                new FieldProductAggFactory().create(new BigIntType(), null, 
null);
         assertThat(fieldProductAgg.agg(null, 10L)).isEqualTo(10L);
         assertThat(fieldProductAgg.agg(1L, 10L)).isEqualTo(10L);
         assertThat(fieldProductAgg.retract(10L, 5L)).isEqualTo(2L);
@@ -241,7 +270,8 @@ public class FieldAggregatorTest {
 
     @Test
     public void testFieldProductFloatAgg() {
-        FieldProductAgg fieldProductAgg = new FieldProductAgg(new FloatType());
+        FieldProductAgg fieldProductAgg =
+                new FieldProductAggFactory().create(new FloatType(), null, 
null);
         assertThat(fieldProductAgg.agg(null, (float) 10)).isEqualTo((float) 
10);
         assertThat(fieldProductAgg.agg((float) 1, (float) 
10)).isEqualTo((float) 10);
         assertThat(fieldProductAgg.retract((float) 10, (float) 
5)).isEqualTo((float) 2);
@@ -250,7 +280,7 @@ public class FieldAggregatorTest {
 
     @Test
     public void testFieldSumFloatAgg() {
-        FieldSumAgg fieldSumAgg = new FieldSumAgg(new FloatType());
+        FieldSumAgg fieldSumAgg = new FieldSumAggFactory().create(new 
FloatType(), null, null);
         assertThat(fieldSumAgg.agg(null, (float) 10)).isEqualTo((float) 10);
         assertThat(fieldSumAgg.agg((float) 1, (float) 10)).isEqualTo((float) 
11);
         assertThat(fieldSumAgg.retract((float) 10, (float) 
5)).isEqualTo((float) 5);
@@ -259,7 +289,8 @@ public class FieldAggregatorTest {
 
     @Test
     public void testFieldProductDoubleAgg() {
-        FieldProductAgg fieldProductAgg = new FieldProductAgg(new 
DoubleType());
+        FieldProductAgg fieldProductAgg =
+                new FieldProductAggFactory().create(new DoubleType(), null, 
null);
         assertThat(fieldProductAgg.agg(null, (double) 10)).isEqualTo((double) 
10);
         assertThat(fieldProductAgg.agg((double) 1, (double) 
10)).isEqualTo((double) 10);
         assertThat(fieldProductAgg.retract((double) 10, (double) 
5)).isEqualTo((double) 2);
@@ -268,7 +299,7 @@ public class FieldAggregatorTest {
 
     @Test
     public void testFieldSumDoubleAgg() {
-        FieldSumAgg fieldSumAgg = new FieldSumAgg(new DoubleType());
+        FieldSumAgg fieldSumAgg = new FieldSumAggFactory().create(new 
DoubleType(), null, null);
         assertThat(fieldSumAgg.agg(null, (double) 10)).isEqualTo((double) 10);
         assertThat(fieldSumAgg.agg((double) 1, (double) 
10)).isEqualTo((double) 11);
         assertThat(fieldSumAgg.retract((double) 10, (double) 
5)).isEqualTo((double) 5);
@@ -277,7 +308,8 @@ public class FieldAggregatorTest {
 
     @Test
     public void testFieldProductDecimalAgg() {
-        FieldProductAgg fieldProductAgg = new FieldProductAgg(new 
DecimalType());
+        FieldProductAgg fieldProductAgg =
+                new FieldProductAggFactory().create(new DecimalType(), null, 
null);
         assertThat(fieldProductAgg.agg(null, 
toDecimal(10))).isEqualTo(toDecimal(10));
         assertThat(fieldProductAgg.agg(toDecimal(1), 
toDecimal(10))).isEqualTo(toDecimal(10));
         assertThat(fieldProductAgg.retract(toDecimal(10), 
toDecimal(5))).isEqualTo(toDecimal(2));
@@ -286,7 +318,7 @@ public class FieldAggregatorTest {
 
     @Test
     public void testFieldSumDecimalAgg() {
-        FieldSumAgg fieldSumAgg = new FieldSumAgg(new DecimalType());
+        FieldSumAgg fieldSumAgg = new FieldSumAggFactory().create(new 
DecimalType(), null, null);
         assertThat(fieldSumAgg.agg(null, 
toDecimal(10))).isEqualTo(toDecimal(10));
         assertThat(fieldSumAgg.agg(toDecimal(1), 
toDecimal(10))).isEqualTo(toDecimal(11));
         assertThat(fieldSumAgg.retract(toDecimal(10), 
toDecimal(5))).isEqualTo(toDecimal(5));
@@ -307,6 +339,7 @@ public class FieldAggregatorTest {
                         DataTypes.FIELD(2, "v", DataTypes.STRING()));
         FieldNestedUpdateAgg agg =
                 new FieldNestedUpdateAgg(
+                        FieldNestedUpdateAggFactory.NAME,
                         DataTypes.ARRAY(
                                 DataTypes.ROW(
                                         DataTypes.FIELD(0, "k0", 
DataTypes.INT()),
@@ -347,7 +380,10 @@ public class FieldAggregatorTest {
                         DataTypes.FIELD(1, "k1", DataTypes.INT()),
                         DataTypes.FIELD(2, "v", DataTypes.STRING()));
         FieldNestedUpdateAgg agg =
-                new FieldNestedUpdateAgg(DataTypes.ARRAY(elementRowType), 
Collections.emptyList());
+                new FieldNestedUpdateAgg(
+                        FieldNestedUpdateAggFactory.NAME,
+                        DataTypes.ARRAY(elementRowType),
+                        Collections.emptyList());
 
         InternalArray accumulator = null;
         InternalArray.ElementGetter elementGetter =
@@ -385,7 +421,13 @@ public class FieldAggregatorTest {
 
     @Test
     public void testFieldCollectAggWithDistinct() {
-        FieldCollectAgg agg = new 
FieldCollectAgg(DataTypes.ARRAY(DataTypes.INT()), true);
+        FieldCollectAgg agg =
+                new FieldCollectAggFactory()
+                        .create(
+                                DataTypes.ARRAY(DataTypes.INT()),
+                                CoreOptions.fromMap(
+                                        
ImmutableMap.of("fields.fieldName.distinct", "true")),
+                                "fieldName");
 
         InternalArray result;
         InternalArray.ElementGetter elementGetter =
@@ -407,7 +449,13 @@ public class FieldAggregatorTest {
     @Test
     public void testFiledCollectAggWithRowType() {
         RowType rowType = RowType.of(DataTypes.INT(), DataTypes.STRING());
-        FieldCollectAgg agg = new FieldCollectAgg(DataTypes.ARRAY(rowType), 
true);
+        FieldCollectAgg agg =
+                new FieldCollectAggFactory()
+                        .create(
+                                DataTypes.ARRAY(rowType),
+                                CoreOptions.fromMap(
+                                        
ImmutableMap.of("fields.fieldName.distinct", "true")),
+                                "fieldName");
 
         InternalArray result;
         InternalArray.ElementGetter elementGetter = 
InternalArray.createElementGetter(rowType);
@@ -438,7 +486,13 @@ public class FieldAggregatorTest {
     @Test
     public void testFiledCollectAggWithArrayType() {
         ArrayType arrayType = new ArrayType(DataTypes.INT());
-        FieldCollectAgg agg = new FieldCollectAgg(DataTypes.ARRAY(arrayType), 
true);
+        FieldCollectAgg agg =
+                new FieldCollectAggFactory()
+                        .create(
+                                DataTypes.ARRAY(arrayType),
+                                CoreOptions.fromMap(
+                                        
ImmutableMap.of("fields.fieldName.distinct", "true")),
+                                "fieldName");
 
         InternalArray result;
         InternalArray.ElementGetter elementGetter = 
InternalArray.createElementGetter(arrayType);
@@ -469,7 +523,13 @@ public class FieldAggregatorTest {
     @Test
     public void testFiledCollectAggWithMapType() {
         MapType mapType = new MapType(DataTypes.INT(), DataTypes.STRING());
-        FieldCollectAgg agg = new FieldCollectAgg(DataTypes.ARRAY(mapType), 
true);
+        FieldCollectAgg agg =
+                new FieldCollectAggFactory()
+                        .create(
+                                DataTypes.ARRAY(mapType),
+                                CoreOptions.fromMap(
+                                        
ImmutableMap.of("fields.fieldName.distinct", "true")),
+                                "fieldName");
 
         InternalArray result;
         InternalArray.ElementGetter elementGetter = 
InternalArray.createElementGetter(mapType);
@@ -497,7 +557,13 @@ public class FieldAggregatorTest {
 
     @Test
     public void testFieldCollectAggWithoutDistinct() {
-        FieldCollectAgg agg = new 
FieldCollectAgg(DataTypes.ARRAY(DataTypes.INT()), false);
+        FieldCollectAgg agg =
+                new FieldCollectAggFactory()
+                        .create(
+                                DataTypes.ARRAY(DataTypes.INT()),
+                                CoreOptions.fromMap(
+                                        
ImmutableMap.of("fields.fieldName.distinct", "false")),
+                                "fieldName");
 
         InternalArray result;
         InternalArray.ElementGetter elementGetter =
@@ -522,7 +588,13 @@ public class FieldAggregatorTest {
         InternalArray.ElementGetter elementGetter;
 
         // primitive type
-        agg = new FieldCollectAgg(DataTypes.ARRAY(DataTypes.INT()), true);
+        agg =
+                new FieldCollectAggFactory()
+                        .create(
+                                DataTypes.ARRAY(DataTypes.INT()),
+                                CoreOptions.fromMap(
+                                        
ImmutableMap.of("fields.fieldName.distinct", "true")),
+                                "fieldName");
         elementGetter = InternalArray.createElementGetter(DataTypes.INT());
         InternalArray result =
                 (InternalArray)
@@ -533,7 +605,13 @@ public class FieldAggregatorTest {
 
         // row type
         RowType rowType = RowType.of(DataTypes.INT(), DataTypes.STRING());
-        agg = new FieldCollectAgg(DataTypes.ARRAY(rowType), true);
+        agg =
+                new FieldCollectAggFactory()
+                        .create(
+                                DataTypes.ARRAY(rowType),
+                                CoreOptions.fromMap(
+                                        
ImmutableMap.of("fields.fieldName.distinct", "true")),
+                                "fieldName");
         elementGetter = InternalArray.createElementGetter(rowType);
 
         Object[] accElements =
@@ -556,7 +634,13 @@ public class FieldAggregatorTest {
 
         // array type
         ArrayType arrayType = new ArrayType(DataTypes.INT());
-        agg = new FieldCollectAgg(DataTypes.ARRAY(arrayType), true);
+        agg =
+                new FieldCollectAggFactory()
+                        .create(
+                                DataTypes.ARRAY(arrayType),
+                                CoreOptions.fromMap(
+                                        
ImmutableMap.of("fields.fieldName.distinct", "true")),
+                                "fieldName");
         elementGetter = InternalArray.createElementGetter(arrayType);
 
         accElements =
@@ -578,7 +662,13 @@ public class FieldAggregatorTest {
 
         // map type
         MapType mapType = new MapType(DataTypes.INT(), DataTypes.STRING());
-        agg = new FieldCollectAgg(DataTypes.ARRAY(mapType), true);
+        agg =
+                new FieldCollectAggFactory()
+                        .create(
+                                DataTypes.ARRAY(mapType),
+                                CoreOptions.fromMap(
+                                        
ImmutableMap.of("fields.fieldName.distinct", "true")),
+                                "fieldName");
         elementGetter = InternalArray.createElementGetter(mapType);
 
         accElements =
@@ -604,7 +694,13 @@ public class FieldAggregatorTest {
         InternalArray.ElementGetter elementGetter;
 
         // primitive type
-        agg = new FieldCollectAgg(DataTypes.ARRAY(DataTypes.INT()), true);
+        agg =
+                new FieldCollectAggFactory()
+                        .create(
+                                DataTypes.ARRAY(DataTypes.INT()),
+                                CoreOptions.fromMap(
+                                        
ImmutableMap.of("fields.fieldName.distinct", "true")),
+                                "fieldName");
         elementGetter = InternalArray.createElementGetter(DataTypes.INT());
         InternalArray result =
                 (InternalArray)
@@ -615,7 +711,13 @@ public class FieldAggregatorTest {
 
         // row type
         RowType rowType = RowType.of(DataTypes.INT(), DataTypes.STRING());
-        agg = new FieldCollectAgg(DataTypes.ARRAY(rowType), true);
+        agg =
+                new FieldCollectAggFactory()
+                        .create(
+                                DataTypes.ARRAY(rowType),
+                                CoreOptions.fromMap(
+                                        
ImmutableMap.of("fields.fieldName.distinct", "true")),
+                                "fieldName");
         elementGetter = InternalArray.createElementGetter(rowType);
 
         Object[] accElements =
@@ -641,7 +743,13 @@ public class FieldAggregatorTest {
 
         // array type
         ArrayType arrayType = new ArrayType(DataTypes.INT());
-        agg = new FieldCollectAgg(DataTypes.ARRAY(arrayType), true);
+        agg =
+                new FieldCollectAggFactory()
+                        .create(
+                                DataTypes.ARRAY(arrayType),
+                                CoreOptions.fromMap(
+                                        
ImmutableMap.of("fields.fieldName.distinct", "true")),
+                                "fieldName");
         elementGetter = InternalArray.createElementGetter(arrayType);
 
         accElements =
@@ -666,7 +774,13 @@ public class FieldAggregatorTest {
 
         // map type
         MapType mapType = new MapType(DataTypes.INT(), DataTypes.STRING());
-        agg = new FieldCollectAgg(DataTypes.ARRAY(mapType), true);
+        agg =
+                new FieldCollectAggFactory()
+                        .create(
+                                DataTypes.ARRAY(mapType),
+                                CoreOptions.fromMap(
+                                        
ImmutableMap.of("fields.fieldName.distinct", "true")),
+                                "fieldName");
         elementGetter = InternalArray.createElementGetter(mapType);
 
         accElements =
@@ -691,7 +805,8 @@ public class FieldAggregatorTest {
     @Test
     public void testFieldMergeMapAgg() {
         FieldMergeMapAgg agg =
-                new FieldMergeMapAgg(DataTypes.MAP(DataTypes.INT(), 
DataTypes.STRING()));
+                new FieldMergeMapAggFactory()
+                        .create(DataTypes.MAP(DataTypes.INT(), 
DataTypes.STRING()), null, null);
 
         assertThat(agg.agg(null, null)).isNull();
 
@@ -710,7 +825,8 @@ public class FieldAggregatorTest {
     @Test
     public void testFieldMergeMapAggRetract() {
         FieldMergeMapAgg agg =
-                new FieldMergeMapAgg(DataTypes.MAP(DataTypes.INT(), 
DataTypes.STRING()));
+                new FieldMergeMapAggFactory()
+                        .create(DataTypes.MAP(DataTypes.INT(), 
DataTypes.STRING()), null, null);
         Object result =
                 agg.retract(
                         new GenericMap(toMap(1, "A", 2, "B", 3, "C")),
@@ -720,7 +836,8 @@ public class FieldAggregatorTest {
 
     @Test
     public void testFieldThetaSketchAgg() {
-        FieldThetaSketchAgg agg = new 
FieldThetaSketchAgg(DataTypes.VARBINARY(20));
+        FieldThetaSketchAgg agg =
+                new 
FieldThetaSketchAggFactory().create(DataTypes.VARBINARY(20), null, null);
 
         byte[] inputVal = sketchOf(1);
         byte[] acc1 = sketchOf(2, 3);
@@ -743,7 +860,8 @@ public class FieldAggregatorTest {
 
     @Test
     public void testFieldHllSketchAgg() {
-        FieldHllSketchAgg agg = new FieldHllSketchAgg(DataTypes.VARBINARY(20));
+        FieldHllSketchAgg agg =
+                new FieldHllSketchAggFactory().create(DataTypes.VARBINARY(20), 
null, null);
 
         byte[] inputVal = HllSketchUtil.sketchOf(1);
         byte[] acc1 = HllSketchUtil.sketchOf(2, 3);
@@ -766,7 +884,8 @@ public class FieldAggregatorTest {
 
     @Test
     public void testFieldRoaringBitmap32Agg() {
-        FieldRoaringBitmap32Agg agg = new 
FieldRoaringBitmap32Agg(DataTypes.VARBINARY(20));
+        FieldRoaringBitmap32Agg agg =
+                new 
FieldRoaringBitmap32AggFactory().create(DataTypes.VARBINARY(20), null, null);
 
         byte[] inputVal = RoaringBitmap32.bitmapOf(1).serialize();
         byte[] acc1 = RoaringBitmap32.bitmapOf(2, 3).serialize();
@@ -789,7 +908,8 @@ public class FieldAggregatorTest {
 
     @Test
     public void testFieldRoaringBitmap64Agg() throws IOException {
-        FieldRoaringBitmap64Agg agg = new 
FieldRoaringBitmap64Agg(DataTypes.VARBINARY(20));
+        FieldRoaringBitmap64Agg agg =
+                new 
FieldRoaringBitmap64AggFactory().create(DataTypes.VARBINARY(20), null, null);
 
         byte[] inputVal = RoaringBitmap64.bitmapOf(1L).serialize();
         byte[] acc1 = RoaringBitmap64.bitmapOf(2L, 3L).serialize();
@@ -813,7 +933,7 @@ public class FieldAggregatorTest {
     @Test
     public void testCustomAgg() throws IOException {
         FieldAggregator fieldAggregator =
-                FieldAggregator.createFieldAggregator(
+                FieldAggregatorFactory.create(
                         DataTypes.STRING(),
                         "custom",
                         false,
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/TestCostomAgg.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/TestCustomAgg.java
similarity index 84%
rename from 
paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/TestCostomAgg.java
rename to 
paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/TestCustomAgg.java
index aedf6a373..3550ebe27 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/TestCostomAgg.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/TestCustomAgg.java
@@ -21,14 +21,10 @@ package org.apache.paimon.mergetree.compact.aggregate;
 import org.apache.paimon.types.DataType;
 
 /** Custom FieldAggregator for Test. */
-public class TestCostomAgg extends FieldAggregator {
-    public TestCostomAgg(DataType dataType) {
-        super(dataType);
-    }
+public class TestCustomAgg extends FieldAggregator {
 
-    @Override
-    public String name() {
-        return "custom";
+    public TestCustomAgg(String name, DataType dataType) {
+        super(name, dataType);
     }
 
     @Override
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/TestCostomAggFactory.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/TestCustomAggFactory.java
similarity index 82%
rename from 
paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/TestCostomAggFactory.java
rename to 
paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/TestCustomAggFactory.java
index e8884bfb5..7e7715f6d 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/TestCostomAggFactory.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/TestCustomAggFactory.java
@@ -22,15 +22,18 @@ import org.apache.paimon.CoreOptions;
 import 
org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory;
 import org.apache.paimon.types.DataType;
 
-/** FieldAggregatorFactory for test. */
-public class TestCostomAggFactory implements FieldAggregatorFactory {
+/** FieldAggregatorFactory for {@link TestCustomAgg} test. */
+public class TestCustomAggFactory implements FieldAggregatorFactory {
+
+    public static final String NAME = "custom";
+
     @Override
     public FieldAggregator create(DataType fieldType, CoreOptions options, 
String field) {
-        return new TestCostomAgg(fieldType);
+        return new TestCustomAgg(identifier(), fieldType);
     }
 
     @Override
     public String identifier() {
-        return "custom";
+        return NAME;
     }
 }
diff --git 
a/paimon-core/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
 
b/paimon-core/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
index f3e74bb89..7eb517ab9 100644
--- 
a/paimon-core/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ 
b/paimon-core/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.paimon.mergetree.compact.aggregate.TestCostomAggFactory
\ No newline at end of file
+org.apache.paimon.mergetree.compact.aggregate.TestCustomAggFactory
\ No newline at end of file

Reply via email to