This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 9339ee633 [core] Refactor the Field Aggregator factory to validate
data types when creating function (#4446)
9339ee633 is described below
commit 9339ee6339e3ff5e1c336c983776415db21b42ce
Author: Kerwin <[email protected]>
AuthorDate: Tue Nov 5 16:23:19 2024 +0800
[core] Refactor the Field Aggregator factory to validate data types when
creating function (#4446)
---
.../compact/PartialUpdateMergeFunction.java | 6 +-
.../compact/aggregate/AggregateMergeFunction.java | 3 +-
.../compact/aggregate/FieldAggregator.java | 51 +----
.../compact/aggregate/FieldBoolAndAgg.java | 31 +--
.../compact/aggregate/FieldBoolOrAgg.java | 31 +--
.../compact/aggregate/FieldCollectAgg.java | 11 +-
.../aggregate/FieldFirstNonNullValueAgg.java | 12 +-
.../compact/aggregate/FieldFirstValueAgg.java | 11 +-
.../compact/aggregate/FieldHllSketchAgg.java | 15 +-
.../compact/aggregate/FieldIgnoreRetractAgg.java | 11 +-
.../aggregate/FieldLastNonNullValueAgg.java | 11 +-
.../compact/aggregate/FieldLastValueAgg.java | 11 +-
.../compact/aggregate/FieldListaggAgg.java | 45 ++---
.../mergetree/compact/aggregate/FieldMaxAgg.java | 27 +--
.../compact/aggregate/FieldMergeMapAgg.java | 11 +-
.../mergetree/compact/aggregate/FieldMinAgg.java | 28 +--
.../compact/aggregate/FieldNestedUpdateAgg.java | 18 +-
.../compact/aggregate/FieldPrimaryKeyAgg.java | 11 +-
.../compact/aggregate/FieldProductAgg.java | 95 +++++----
.../compact/aggregate/FieldRoaringBitmap32Agg.java | 15 +-
.../compact/aggregate/FieldRoaringBitmap64Agg.java | 15 +-
.../mergetree/compact/aggregate/FieldSumAgg.java | 183 ++++++++---------
.../compact/aggregate/FieldThetaSketchAgg.java | 15 +-
.../aggregate/factory/FieldAggregatorFactory.java | 39 ++++
.../aggregate/factory/FieldBoolAndAggFactory.java | 17 +-
.../aggregate/factory/FieldBoolOrAggFactory.java | 17 +-
.../aggregate/factory/FieldCollectAggFactory.java | 11 +-
.../factory/FieldFirstNonNullValueAggFactory.java | 10 +-
.../FieldFirstNonNullValueAggLegacyFactory.java | 7 +-
.../factory/FieldFirstValueAggFactory.java | 10 +-
.../factory/FieldHllSketchAggFactory.java | 10 +-
.../factory/FieldLastNonNullValueAggFactory.java | 10 +-
.../factory/FieldLastValueAggFactory.java | 10 +-
.../aggregate/factory/FieldListaggAggFactory.java | 17 +-
.../aggregate/factory/FieldMaxAggFactory.java | 10 +-
.../aggregate/factory/FieldMergeMapAggFactory.java | 12 +-
.../aggregate/factory/FieldMinAggFactory.java | 10 +-
.../factory/FieldNestedUpdateAggFactory.java | 13 +-
.../factory/FieldPrimaryKeyAggFactory.java | 7 +-
.../aggregate/factory/FieldProductAggFactory.java | 17 +-
.../factory/FieldRoaringBitmap32AggFactory.java | 10 +-
.../factory/FieldRoaringBitmap64AggFactory.java | 10 +-
.../aggregate/factory/FieldSumAggFactory.java | 17 +-
.../factory/FieldThetaSketchAggFactory.java | 10 +-
.../LookupChangelogMergeFunctionWrapperTest.java | 10 +-
.../compact/aggregate/FieldAggregatorTest.java | 220 ++++++++++++++++-----
.../{TestCostomAgg.java => TestCustomAgg.java} | 10 +-
...omAggFactory.java => TestCustomAggFactory.java} | 11 +-
.../services/org.apache.paimon.factories.Factory | 2 +-
49 files changed, 591 insertions(+), 603 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
index b15d9388a..4d720cb3f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/PartialUpdateMergeFunction.java
@@ -23,6 +23,7 @@ import org.apache.paimon.KeyValue;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
+import
org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory;
import org.apache.paimon.options.Options;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
@@ -51,7 +52,6 @@ import java.util.stream.Stream;
import static org.apache.paimon.CoreOptions.FIELDS_PREFIX;
import static org.apache.paimon.CoreOptions.FIELDS_SEPARATOR;
import static
org.apache.paimon.CoreOptions.PARTIAL_UPDATE_REMOVE_RECORD_ON_DELETE;
-import static
org.apache.paimon.mergetree.compact.aggregate.FieldAggregator.createFieldAggregator;
import static org.apache.paimon.utils.InternalRowUtils.createFieldGetters;
/**
@@ -495,7 +495,7 @@ public class PartialUpdateMergeFunction implements
MergeFunction<KeyValue> {
fieldAggregators.put(
i,
() ->
- createFieldAggregator(
+ FieldAggregatorFactory.create(
fieldType,
strAggFunc,
ignoreRetract,
@@ -506,7 +506,7 @@ public class PartialUpdateMergeFunction implements
MergeFunction<KeyValue> {
fieldAggregators.put(
i,
() ->
- createFieldAggregator(
+ FieldAggregatorFactory.create(
fieldType,
defaultAggFunc,
ignoreRetract,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java
index e73bfe8e9..bad77ba91 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/AggregateMergeFunction.java
@@ -24,6 +24,7 @@ import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.mergetree.compact.MergeFunction;
import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
+import
org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory;
import org.apache.paimon.options.Options;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
@@ -142,7 +143,7 @@ public class AggregateMergeFunction implements
MergeFunction<KeyValue> {
boolean ignoreRetract =
options.fieldAggIgnoreRetract(fieldName);
fieldAggregators[i] =
- FieldAggregator.createFieldAggregator(
+ FieldAggregatorFactory.create(
fieldType,
strAggFunc,
ignoreRetract,
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregator.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregator.java
index b776f0a2e..cd368a818 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregator.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregator.java
@@ -18,62 +18,23 @@
package org.apache.paimon.mergetree.compact.aggregate;
-import org.apache.paimon.CoreOptions;
-import org.apache.paimon.factories.FactoryUtil;
-import
org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory;
import org.apache.paimon.types.DataType;
-import javax.annotation.Nullable;
-
import java.io.Serializable;
/** abstract class of aggregating a field of a row. */
public abstract class FieldAggregator implements Serializable {
- protected DataType fieldType;
private static final long serialVersionUID = 1L;
- public FieldAggregator(DataType dataType) {
- this.fieldType = dataType;
- }
-
- public static FieldAggregator createFieldAggregator(
- DataType fieldType,
- @Nullable String strAgg,
- boolean ignoreRetract,
- boolean isPrimaryKey,
- CoreOptions options,
- String field) {
- FieldAggregator fieldAggregator;
- if (isPrimaryKey) {
- strAgg = FieldPrimaryKeyAgg.NAME;
- } else if (strAgg == null) {
- strAgg = FieldLastNonNullValueAgg.NAME;
- }
-
- FieldAggregatorFactory fieldAggregatorFactory =
- FactoryUtil.discoverFactory(
- FieldAggregator.class.getClassLoader(),
- FieldAggregatorFactory.class,
- strAgg);
- if (fieldAggregatorFactory == null) {
- throw new RuntimeException(
- String.format(
- "Use unsupported aggregation: %s or spell
aggregate function incorrectly!",
- strAgg));
- }
+ protected final DataType fieldType;
+ protected final String name;
- fieldAggregator = fieldAggregatorFactory.create(fieldType, options,
field);
-
- if (ignoreRetract) {
- fieldAggregator = new FieldIgnoreRetractAgg(fieldAggregator);
- }
-
- return fieldAggregator;
+ public FieldAggregator(String name, DataType dataType) {
+ this.name = name;
+ this.fieldType = dataType;
}
- public abstract String name();
-
public abstract Object agg(Object accumulator, Object inputField);
public Object aggReversed(Object accumulator, Object inputField) {
@@ -89,6 +50,6 @@ public abstract class FieldAggregator implements Serializable
{
"Aggregate function '%s' does not support retraction,"
+ " If you allow this function to ignore
retraction messages,"
+ " you can configure
'fields.${field_name}.ignore-retract'='true'.",
- name()));
+ name));
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldBoolAndAgg.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldBoolAndAgg.java
index 5acf2595a..cc44ce5ed 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldBoolAndAgg.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldBoolAndAgg.java
@@ -18,43 +18,22 @@
package org.apache.paimon.mergetree.compact.aggregate;
-import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.BooleanType;
/** bool_and aggregate a field of a row. */
public class FieldBoolAndAgg extends FieldAggregator {
- public static final String NAME = "bool_and";
-
private static final long serialVersionUID = 1L;
- public FieldBoolAndAgg(DataType dataType) {
- super(dataType);
- }
-
- @Override
- public String name() {
- return NAME;
+ public FieldBoolAndAgg(String name, BooleanType dataType) {
+ super(name, dataType);
}
@Override
public Object agg(Object accumulator, Object inputField) {
- Object boolAnd;
-
if (accumulator == null || inputField == null) {
- boolAnd = (inputField == null) ? accumulator : inputField;
- } else {
- switch (fieldType.getTypeRoot()) {
- case BOOLEAN:
- boolAnd = (boolean) accumulator && (boolean) inputField;
- break;
- default:
- String msg =
- String.format(
- "type %s not support in %s",
- fieldType.getTypeRoot().toString(),
this.getClass().getName());
- throw new IllegalArgumentException(msg);
- }
+ return accumulator == null ? inputField : accumulator;
}
- return boolAnd;
+ return (boolean) accumulator && (boolean) inputField;
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldBoolOrAgg.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldBoolOrAgg.java
index 03a0c1c3c..105f42191 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldBoolOrAgg.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldBoolOrAgg.java
@@ -18,43 +18,22 @@
package org.apache.paimon.mergetree.compact.aggregate;
-import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.BooleanType;
/** bool_or aggregate a field of a row. */
public class FieldBoolOrAgg extends FieldAggregator {
- public static final String NAME = "bool_or";
-
private static final long serialVersionUID = 1L;
- public FieldBoolOrAgg(DataType dataType) {
- super(dataType);
- }
-
- @Override
- public String name() {
- return NAME;
+ public FieldBoolOrAgg(String name, BooleanType dataType) {
+ super(name, dataType);
}
@Override
public Object agg(Object accumulator, Object inputField) {
- Object boolOr;
-
if (accumulator == null || inputField == null) {
- boolOr = (inputField == null) ? accumulator : inputField;
- } else {
- switch (fieldType.getTypeRoot()) {
- case BOOLEAN:
- boolOr = (boolean) accumulator || (boolean) inputField;
- break;
- default:
- String msg =
- String.format(
- "type %s not support in %s",
- fieldType.getTypeRoot().toString(),
this.getClass().getName());
- throw new IllegalArgumentException(msg);
- }
+ return accumulator == null ? inputField : accumulator;
}
- return boolOr;
+ return (boolean) accumulator || (boolean) inputField;
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldCollectAgg.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldCollectAgg.java
index 64ef223fa..afe5e05e7 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldCollectAgg.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldCollectAgg.java
@@ -43,16 +43,14 @@ import static
org.apache.paimon.codegen.CodeGenUtils.newRecordEqualiser;
/** Collect elements into an ARRAY. */
public class FieldCollectAgg extends FieldAggregator {
- public static final String NAME = "collect";
-
private static final long serialVersionUID = 1L;
private final boolean distinct;
private final InternalArray.ElementGetter elementGetter;
@Nullable private final BiFunction<Object, Object, Boolean> equaliser;
- public FieldCollectAgg(ArrayType dataType, boolean distinct) {
- super(dataType);
+ public FieldCollectAgg(String name, ArrayType dataType, boolean distinct) {
+ super(name, dataType);
this.distinct = distinct;
this.elementGetter =
InternalArray.createElementGetter(dataType.getElementType());
@@ -84,11 +82,6 @@ public class FieldCollectAgg extends FieldAggregator {
}
}
- @Override
- public String name() {
- return NAME;
- }
-
@Override
public Object aggReversed(Object accumulator, Object inputField) {
// we don't need to actually do the reverse here for this agg
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldFirstNonNullValueAgg.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldFirstNonNullValueAgg.java
index 0bd950bbf..273af1a95 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldFirstNonNullValueAgg.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldFirstNonNullValueAgg.java
@@ -23,20 +23,12 @@ import org.apache.paimon.types.DataType;
/** first non-null value aggregate a field of a row. */
public class FieldFirstNonNullValueAgg extends FieldAggregator {
- public static final String NAME = "first_non_null_value";
- public static final String LEGACY_NAME = "first_not_null_value";
-
private static final long serialVersionUID = 1L;
private boolean initialized;
- public FieldFirstNonNullValueAgg(DataType dataType) {
- super(dataType);
- }
-
- @Override
- public String name() {
- return NAME;
+ public FieldFirstNonNullValueAgg(String name, DataType dataType) {
+ super(name, dataType);
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldFirstValueAgg.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldFirstValueAgg.java
index d31a6e0ae..436f841d9 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldFirstValueAgg.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldFirstValueAgg.java
@@ -23,19 +23,12 @@ import org.apache.paimon.types.DataType;
/** first value aggregate a field of a row. */
public class FieldFirstValueAgg extends FieldAggregator {
- public static final String NAME = "first_value";
-
private static final long serialVersionUID = 1L;
private boolean initialized;
- public FieldFirstValueAgg(DataType dataType) {
- super(dataType);
- }
-
- @Override
- public String name() {
- return NAME;
+ public FieldFirstValueAgg(String name, DataType dataType) {
+ super(name, dataType);
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldHllSketchAgg.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldHllSketchAgg.java
index 0ccf4af64..aa399ac37 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldHllSketchAgg.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldHllSketchAgg.java
@@ -24,25 +24,14 @@ import org.apache.paimon.utils.HllSketchUtil;
/** HllSketch aggregate a field of a row. */
public class FieldHllSketchAgg extends FieldAggregator {
- public static final String NAME = "hll_sketch";
-
private static final long serialVersionUID = 1L;
- public FieldHllSketchAgg(VarBinaryType dataType) {
- super(dataType);
- }
-
- @Override
- public String name() {
- return NAME;
+ public FieldHllSketchAgg(String name, VarBinaryType dataType) {
+ super(name, dataType);
}
@Override
public Object agg(Object accumulator, Object inputField) {
- if (accumulator == null && inputField == null) {
- return null;
- }
-
if (accumulator == null || inputField == null) {
return accumulator == null ? inputField : accumulator;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldIgnoreRetractAgg.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldIgnoreRetractAgg.java
index 40772c6d1..e98e64852 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldIgnoreRetractAgg.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldIgnoreRetractAgg.java
@@ -21,20 +21,15 @@ package org.apache.paimon.mergetree.compact.aggregate;
/** An aggregator which ignores retraction messages. */
public class FieldIgnoreRetractAgg extends FieldAggregator {
- private final FieldAggregator aggregator;
-
private static final long serialVersionUID = 1L;
+ private final FieldAggregator aggregator;
+
public FieldIgnoreRetractAgg(FieldAggregator aggregator) {
- super(aggregator.fieldType);
+ super(aggregator.name, aggregator.fieldType);
this.aggregator = aggregator;
}
- @Override
- public String name() {
- return aggregator.name();
- }
-
@Override
public Object agg(Object accumulator, Object inputField) {
return aggregator.agg(accumulator, inputField);
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldLastNonNullValueAgg.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldLastNonNullValueAgg.java
index e189c20b2..cc5383739 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldLastNonNullValueAgg.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldLastNonNullValueAgg.java
@@ -23,17 +23,10 @@ import org.apache.paimon.types.DataType;
/** last non-null value aggregate a field of a row. */
public class FieldLastNonNullValueAgg extends FieldAggregator {
- public static final String NAME = "last_non_null_value";
-
private static final long serialVersionUID = 1L;
- public FieldLastNonNullValueAgg(DataType dataType) {
- super(dataType);
- }
-
- @Override
- public String name() {
- return NAME;
+ public FieldLastNonNullValueAgg(String name, DataType dataType) {
+ super(name, dataType);
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldLastValueAgg.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldLastValueAgg.java
index 22c2b3da1..592f080fb 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldLastValueAgg.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldLastValueAgg.java
@@ -23,17 +23,10 @@ import org.apache.paimon.types.DataType;
/** last value aggregate a field of a row. */
public class FieldLastValueAgg extends FieldAggregator {
- public static final String NAME = "last_value";
-
private static final long serialVersionUID = 1L;
- public FieldLastValueAgg(DataType dataType) {
- super(dataType);
- }
-
- @Override
- public String name() {
- return NAME;
+ public FieldLastValueAgg(String name, DataType dataType) {
+ super(name, dataType);
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldListaggAgg.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldListaggAgg.java
index 3bde7e7cc..a01891501 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldListaggAgg.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldListaggAgg.java
@@ -20,53 +20,32 @@ package org.apache.paimon.mergetree.compact.aggregate;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.BinaryString;
-import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.VarCharType;
import org.apache.paimon.utils.StringUtils;
/** listagg aggregate a field of a row. */
public class FieldListaggAgg extends FieldAggregator {
- public static final String NAME = "listagg";
-
private static final long serialVersionUID = 1L;
private final String delimiter;
- public FieldListaggAgg(DataType dataType, CoreOptions options, String
field) {
- super(dataType);
+ public FieldListaggAgg(String name, VarCharType dataType, CoreOptions
options, String field) {
+ super(name, dataType);
this.delimiter = options.fieldListAggDelimiter(field);
}
- @Override
- public String name() {
- return NAME;
- }
-
@Override
public Object agg(Object accumulator, Object inputField) {
- Object concatenate;
-
- if (inputField == null || accumulator == null) {
- concatenate = (inputField == null) ? accumulator : inputField;
- } else {
- // ordered by type root definition
- switch (fieldType.getTypeRoot()) {
- case VARCHAR:
- // TODO: ensure not VARCHAR(n)
- BinaryString mergeFieldSD = (BinaryString) accumulator;
- BinaryString inFieldSD = (BinaryString) inputField;
- concatenate =
- StringUtils.concat(
- mergeFieldSD,
BinaryString.fromString(delimiter), inFieldSD);
- break;
- default:
- String msg =
- String.format(
- "type %s not support in %s",
- fieldType.getTypeRoot().toString(),
this.getClass().getName());
- throw new IllegalArgumentException(msg);
- }
+ if (accumulator == null || inputField == null) {
+ return accumulator == null ? inputField : accumulator;
}
- return concatenate;
+ // ordered by type root definition
+
+ // TODO: ensure not VARCHAR(n)
+ BinaryString mergeFieldSD = (BinaryString) accumulator;
+ BinaryString inFieldSD = (BinaryString) inputField;
+
+ return StringUtils.concat(mergeFieldSD,
BinaryString.fromString(delimiter), inFieldSD);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMaxAgg.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMaxAgg.java
index 34e7dd139..06628244e 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMaxAgg.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMaxAgg.java
@@ -25,33 +25,20 @@ import org.apache.paimon.utils.InternalRowUtils;
/** max aggregate a field of a row. */
public class FieldMaxAgg extends FieldAggregator {
- public static final String NAME = "max";
-
private static final long serialVersionUID = 1L;
- public FieldMaxAgg(DataType dataType) {
- super(dataType);
- }
-
- @Override
- public String name() {
- return NAME;
+ public FieldMaxAgg(String name, DataType dataType) {
+ super(name, dataType);
}
@Override
public Object agg(Object accumulator, Object inputField) {
- Object max;
-
if (accumulator == null || inputField == null) {
- max = (accumulator == null ? inputField : accumulator);
- } else {
- DataTypeRoot type = fieldType.getTypeRoot();
- if (InternalRowUtils.compare(accumulator, inputField, type) < 0) {
- max = inputField;
- } else {
- max = accumulator;
- }
+ return accumulator == null ? inputField : accumulator;
}
- return max;
+ DataTypeRoot type = fieldType.getTypeRoot();
+ return InternalRowUtils.compare(accumulator, inputField, type) < 0
+ ? inputField
+ : accumulator;
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMergeMapAgg.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMergeMapAgg.java
index f597e8de5..9965339af 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMergeMapAgg.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMergeMapAgg.java
@@ -31,25 +31,18 @@ import java.util.Set;
/** Merge two maps. */
public class FieldMergeMapAgg extends FieldAggregator {
- public static final String NAME = "merge_map";
-
private static final long serialVersionUID = 1L;
private final InternalArray.ElementGetter keyGetter;
private final InternalArray.ElementGetter valueGetter;
- public FieldMergeMapAgg(MapType dataType) {
- super(dataType);
+ public FieldMergeMapAgg(String name, MapType dataType) {
+ super(name, dataType);
this.keyGetter =
InternalArray.createElementGetter(dataType.getKeyType());
this.valueGetter =
InternalArray.createElementGetter(dataType.getValueType());
}
- @Override
- public String name() {
- return NAME;
- }
-
@Override
public Object agg(Object accumulator, Object inputField) {
if (accumulator == null || inputField == null) {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMinAgg.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMinAgg.java
index 4002966db..01b0403ba 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMinAgg.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldMinAgg.java
@@ -25,33 +25,21 @@ import org.apache.paimon.utils.InternalRowUtils;
/** min aggregate a field of a row. */
public class FieldMinAgg extends FieldAggregator {
- public static final String NAME = "min";
-
private static final long serialVersionUID = 1L;
- public FieldMinAgg(DataType dataType) {
- super(dataType);
- }
-
- @Override
- public String name() {
- return NAME;
+ public FieldMinAgg(String name, DataType dataType) {
+ super(name, dataType);
}
@Override
public Object agg(Object accumulator, Object inputField) {
- Object min;
-
if (accumulator == null || inputField == null) {
- min = (accumulator == null ? inputField : accumulator);
- } else {
- DataTypeRoot type = fieldType.getTypeRoot();
- if (InternalRowUtils.compare(accumulator, inputField, type) < 0) {
- min = accumulator;
- } else {
- min = inputField;
- }
+ return accumulator == null ? inputField : accumulator;
}
- return min;
+
+ DataTypeRoot type = fieldType.getTypeRoot();
+ return InternalRowUtils.compare(accumulator, inputField, type) < 0
+ ? accumulator
+ : inputField;
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java
index 3cd29f127..005bf7b17 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldNestedUpdateAgg.java
@@ -44,8 +44,6 @@ import static
org.apache.paimon.utils.Preconditions.checkNotNull;
*/
public class FieldNestedUpdateAgg extends FieldAggregator {
- public static final String NAME = "nested_update";
-
private static final long serialVersionUID = 1L;
private final int nestedFields;
@@ -53,8 +51,8 @@ public class FieldNestedUpdateAgg extends FieldAggregator {
@Nullable private final Projection keyProjection;
@Nullable private final RecordEqualiser elementEqualiser;
- public FieldNestedUpdateAgg(ArrayType dataType, List<String> nestedKey) {
- super(dataType);
+ public FieldNestedUpdateAgg(String name, ArrayType dataType, List<String>
nestedKey) {
+ super(name, dataType);
RowType nestedType = (RowType) dataType.getElementType();
this.nestedFields = nestedType.getFieldCount();
if (nestedKey.isEmpty()) {
@@ -66,18 +64,10 @@ public class FieldNestedUpdateAgg extends FieldAggregator {
}
}
- @Override
- public String name() {
- return NAME;
- }
-
@Override
public Object agg(Object accumulator, Object inputField) {
- if (accumulator == null) {
- return inputField;
- }
- if (inputField == null) {
- return accumulator;
+ if (accumulator == null || inputField == null) {
+ return accumulator == null ? inputField : accumulator;
}
InternalArray acc = (InternalArray) accumulator;
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldPrimaryKeyAgg.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldPrimaryKeyAgg.java
index e8053be1e..3db4e9b32 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldPrimaryKeyAgg.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldPrimaryKeyAgg.java
@@ -23,17 +23,10 @@ import org.apache.paimon.types.DataType;
/** primary key aggregate a field of a row. */
public class FieldPrimaryKeyAgg extends FieldAggregator {
- public static final String NAME = "primary-key";
-
private static final long serialVersionUID = 1L;
- public FieldPrimaryKeyAgg(DataType dataType) {
- super(dataType);
- }
-
- @Override
- public String name() {
- return NAME;
+ public FieldPrimaryKeyAgg(String name, DataType dataType) {
+ super(name, dataType);
}
@Override
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldProductAgg.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldProductAgg.java
index c3fb18232..26a0c0c52 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldProductAgg.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldProductAgg.java
@@ -28,65 +28,58 @@ import static org.apache.paimon.data.Decimal.fromBigDecimal;
/** product value aggregate a field of a row. */
public class FieldProductAgg extends FieldAggregator {
- public static final String NAME = "product";
-
private static final long serialVersionUID = 1L;
- public FieldProductAgg(DataType dataType) {
- super(dataType);
- }
-
- @Override
- public String name() {
- return NAME;
+ public FieldProductAgg(String name, DataType dataType) {
+ super(name, dataType);
}
@Override
public Object agg(Object accumulator, Object inputField) {
+ if (accumulator == null || inputField == null) {
+ return accumulator == null ? inputField : accumulator;
+ }
+
Object product;
- if (accumulator == null || inputField == null) {
- product = (accumulator == null ? inputField : accumulator);
- } else {
- // ordered by type root definition
- switch (fieldType.getTypeRoot()) {
- case DECIMAL:
- Decimal mergeFieldDD = (Decimal) accumulator;
- Decimal inFieldDD = (Decimal) inputField;
- assert mergeFieldDD.scale() == inFieldDD.scale()
- : "Inconsistent scale of aggregate Decimal!";
- assert mergeFieldDD.precision() == inFieldDD.precision()
- : "Inconsistent precision of aggregate Decimal!";
- BigDecimal bigDecimal = mergeFieldDD.toBigDecimal();
- BigDecimal bigDecimal1 = inFieldDD.toBigDecimal();
- BigDecimal mul = bigDecimal.multiply(bigDecimal1);
- product = fromBigDecimal(mul, mergeFieldDD.precision(),
mergeFieldDD.scale());
- break;
- case TINYINT:
- product = (byte) ((byte) accumulator * (byte) inputField);
- break;
- case SMALLINT:
- product = (short) ((short) accumulator * (short)
inputField);
- break;
- case INTEGER:
- product = (int) accumulator * (int) inputField;
- break;
- case BIGINT:
- product = (long) accumulator * (long) inputField;
- break;
- case FLOAT:
- product = (float) accumulator * (float) inputField;
- break;
- case DOUBLE:
- product = (double) accumulator * (double) inputField;
- break;
- default:
- String msg =
- String.format(
- "type %s not support in %s",
- fieldType.getTypeRoot().toString(),
this.getClass().getName());
- throw new IllegalArgumentException(msg);
- }
+ // ordered by type root definition
+ switch (fieldType.getTypeRoot()) {
+ case DECIMAL:
+ Decimal mergeFieldDD = (Decimal) accumulator;
+ Decimal inFieldDD = (Decimal) inputField;
+ assert mergeFieldDD.scale() == inFieldDD.scale()
+ : "Inconsistent scale of aggregate Decimal!";
+ assert mergeFieldDD.precision() == inFieldDD.precision()
+ : "Inconsistent precision of aggregate Decimal!";
+ BigDecimal bigDecimal = mergeFieldDD.toBigDecimal();
+ BigDecimal bigDecimal1 = inFieldDD.toBigDecimal();
+ BigDecimal mul = bigDecimal.multiply(bigDecimal1);
+ product = fromBigDecimal(mul, mergeFieldDD.precision(),
mergeFieldDD.scale());
+ break;
+ case TINYINT:
+ product = (byte) ((byte) accumulator * (byte) inputField);
+ break;
+ case SMALLINT:
+ product = (short) ((short) accumulator * (short) inputField);
+ break;
+ case INTEGER:
+ product = (int) accumulator * (int) inputField;
+ break;
+ case BIGINT:
+ product = (long) accumulator * (long) inputField;
+ break;
+ case FLOAT:
+ product = (float) accumulator * (float) inputField;
+ break;
+ case DOUBLE:
+ product = (double) accumulator * (double) inputField;
+ break;
+ default:
+ String msg =
+ String.format(
+ "type %s not support in %s",
+ fieldType.getTypeRoot().toString(),
this.getClass().getName());
+ throw new IllegalArgumentException(msg);
}
return product;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldRoaringBitmap32Agg.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldRoaringBitmap32Agg.java
index 15cbc2b96..ef7ac20e8 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldRoaringBitmap32Agg.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldRoaringBitmap32Agg.java
@@ -26,29 +26,18 @@ import java.io.IOException;
/** roaring bitmap aggregate a field of a row. */
public class FieldRoaringBitmap32Agg extends FieldAggregator {
- public static final String NAME = "rbm32";
-
private static final long serialVersionUID = 1L;
private final RoaringBitmap32 roaringBitmapAcc;
private final RoaringBitmap32 roaringBitmapInput;
- public FieldRoaringBitmap32Agg(VarBinaryType dataType) {
- super(dataType);
+ public FieldRoaringBitmap32Agg(String name, VarBinaryType dataType) {
+ super(name, dataType);
this.roaringBitmapAcc = new RoaringBitmap32();
this.roaringBitmapInput = new RoaringBitmap32();
}
- @Override
- public String name() {
- return NAME;
- }
-
@Override
public Object agg(Object accumulator, Object inputField) {
- if (accumulator == null && inputField == null) {
- return null;
- }
-
if (accumulator == null || inputField == null) {
return accumulator == null ? inputField : accumulator;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldRoaringBitmap64Agg.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldRoaringBitmap64Agg.java
index aa9cff1fe..b1d096497 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldRoaringBitmap64Agg.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldRoaringBitmap64Agg.java
@@ -26,29 +26,18 @@ import java.io.IOException;
/** roaring bitmap aggregate a field of a row. */
public class FieldRoaringBitmap64Agg extends FieldAggregator {
- public static final String NAME = "rbm64";
-
private static final long serialVersionUID = 1L;
private final RoaringBitmap64 roaringBitmapAcc;
private final RoaringBitmap64 roaringBitmapInput;
- public FieldRoaringBitmap64Agg(VarBinaryType dataType) {
- super(dataType);
+ public FieldRoaringBitmap64Agg(String name, VarBinaryType dataType) {
+ super(name, dataType);
this.roaringBitmapAcc = new RoaringBitmap64();
this.roaringBitmapInput = new RoaringBitmap64();
}
- @Override
- public String name() {
- return NAME;
- }
-
@Override
public Object agg(Object accumulator, Object inputField) {
- if (accumulator == null && inputField == null) {
- return null;
- }
-
if (accumulator == null || inputField == null) {
return accumulator == null ? inputField : accumulator;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldSumAgg.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldSumAgg.java
index 18e4bbeaf..4b3ad12ae 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldSumAgg.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldSumAgg.java
@@ -25,118 +25,109 @@ import org.apache.paimon.utils.DecimalUtils;
/** sum aggregate a field of a row. */
public class FieldSumAgg extends FieldAggregator {
- public static final String NAME = "sum";
-
private static final long serialVersionUID = 1L;
- public FieldSumAgg(DataType dataType) {
- super(dataType);
- }
-
- @Override
- public String name() {
- return NAME;
+ public FieldSumAgg(String name, DataType dataType) {
+ super(name, dataType);
}
@Override
public Object agg(Object accumulator, Object inputField) {
+ if (accumulator == null || inputField == null) {
+ return accumulator == null ? inputField : accumulator;
+ }
Object sum;
- if (accumulator == null || inputField == null) {
- sum = (accumulator == null ? inputField : accumulator);
- } else {
- // ordered by type root definition
- switch (fieldType.getTypeRoot()) {
- case DECIMAL:
- Decimal mergeFieldDD = (Decimal) accumulator;
- Decimal inFieldDD = (Decimal) inputField;
- assert mergeFieldDD.scale() == inFieldDD.scale()
- : "Inconsistent scale of aggregate Decimal!";
- assert mergeFieldDD.precision() == inFieldDD.precision()
- : "Inconsistent precision of aggregate Decimal!";
- sum =
- DecimalUtils.add(
- mergeFieldDD,
- inFieldDD,
- mergeFieldDD.precision(),
- mergeFieldDD.scale());
- break;
- case TINYINT:
- sum = (byte) ((byte) accumulator + (byte) inputField);
- break;
- case SMALLINT:
- sum = (short) ((short) accumulator + (short) inputField);
- break;
- case INTEGER:
- sum = (int) accumulator + (int) inputField;
- break;
- case BIGINT:
- sum = (long) accumulator + (long) inputField;
- break;
- case FLOAT:
- sum = (float) accumulator + (float) inputField;
- break;
- case DOUBLE:
- sum = (double) accumulator + (double) inputField;
- break;
- default:
- String msg =
- String.format(
- "type %s not support in %s",
- fieldType.getTypeRoot().toString(), this.getClass().getName());
- throw new IllegalArgumentException(msg);
- }
+ // ordered by type root definition
+ switch (fieldType.getTypeRoot()) {
+ case DECIMAL:
+ Decimal mergeFieldDD = (Decimal) accumulator;
+ Decimal inFieldDD = (Decimal) inputField;
+ assert mergeFieldDD.scale() == inFieldDD.scale()
+ : "Inconsistent scale of aggregate Decimal!";
+ assert mergeFieldDD.precision() == inFieldDD.precision()
+ : "Inconsistent precision of aggregate Decimal!";
+ sum =
+ DecimalUtils.add(
+ mergeFieldDD,
+ inFieldDD,
+ mergeFieldDD.precision(),
+ mergeFieldDD.scale());
+ break;
+ case TINYINT:
+ sum = (byte) ((byte) accumulator + (byte) inputField);
+ break;
+ case SMALLINT:
+ sum = (short) ((short) accumulator + (short) inputField);
+ break;
+ case INTEGER:
+ sum = (int) accumulator + (int) inputField;
+ break;
+ case BIGINT:
+ sum = (long) accumulator + (long) inputField;
+ break;
+ case FLOAT:
+ sum = (float) accumulator + (float) inputField;
+ break;
+ case DOUBLE:
+ sum = (double) accumulator + (double) inputField;
+ break;
+ default:
+ String msg =
+ String.format(
+ "type %s not support in %s",
+ fieldType.getTypeRoot().toString(), this.getClass().getName());
+ throw new IllegalArgumentException(msg);
}
return sum;
}
@Override
public Object retract(Object accumulator, Object inputField) {
- Object sum;
if (accumulator == null || inputField == null) {
- sum = (accumulator == null ? negative(inputField) : accumulator);
- } else {
- switch (fieldType.getTypeRoot()) {
- case DECIMAL:
- Decimal mergeFieldDD = (Decimal) accumulator;
- Decimal inFieldDD = (Decimal) inputField;
- assert mergeFieldDD.scale() == inFieldDD.scale()
- : "Inconsistent scale of aggregate Decimal!";
- assert mergeFieldDD.precision() == inFieldDD.precision()
- : "Inconsistent precision of aggregate Decimal!";
- sum =
- DecimalUtils.subtract(
- mergeFieldDD,
- inFieldDD,
- mergeFieldDD.precision(),
- mergeFieldDD.scale());
- break;
- case TINYINT:
- sum = (byte) ((byte) accumulator - (byte) inputField);
- break;
- case SMALLINT:
- sum = (short) ((short) accumulator - (short) inputField);
- break;
- case INTEGER:
- sum = (int) accumulator - (int) inputField;
- break;
- case BIGINT:
- sum = (long) accumulator - (long) inputField;
- break;
- case FLOAT:
- sum = (float) accumulator - (float) inputField;
- break;
- case DOUBLE:
- sum = (double) accumulator - (double) inputField;
- break;
- default:
- String msg =
- String.format(
- "type %s not support in %s",
- fieldType.getTypeRoot().toString(), this.getClass().getName());
- throw new IllegalArgumentException(msg);
- }
+ return (accumulator == null ? negative(inputField) : accumulator);
+ }
+ Object sum;
+ switch (fieldType.getTypeRoot()) {
+ case DECIMAL:
+ Decimal mergeFieldDD = (Decimal) accumulator;
+ Decimal inFieldDD = (Decimal) inputField;
+ assert mergeFieldDD.scale() == inFieldDD.scale()
+ : "Inconsistent scale of aggregate Decimal!";
+ assert mergeFieldDD.precision() == inFieldDD.precision()
+ : "Inconsistent precision of aggregate Decimal!";
+ sum =
+ DecimalUtils.subtract(
+ mergeFieldDD,
+ inFieldDD,
+ mergeFieldDD.precision(),
+ mergeFieldDD.scale());
+ break;
+ case TINYINT:
+ sum = (byte) ((byte) accumulator - (byte) inputField);
+ break;
+ case SMALLINT:
+ sum = (short) ((short) accumulator - (short) inputField);
+ break;
+ case INTEGER:
+ sum = (int) accumulator - (int) inputField;
+ break;
+ case BIGINT:
+ sum = (long) accumulator - (long) inputField;
+ break;
+ case FLOAT:
+ sum = (float) accumulator - (float) inputField;
+ break;
+ case DOUBLE:
+ sum = (double) accumulator - (double) inputField;
+ break;
+ default:
+ String msg =
+ String.format(
+ "type %s not support in %s",
+ fieldType.getTypeRoot().toString(), this.getClass().getName());
+ throw new IllegalArgumentException(msg);
}
return sum;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldThetaSketchAgg.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldThetaSketchAgg.java
index 7182a6744..9622b4aff 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldThetaSketchAgg.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldThetaSketchAgg.java
@@ -24,25 +24,14 @@ import org.apache.paimon.utils.ThetaSketch;
/** ThetaSketch aggregate a field of a row. */
public class FieldThetaSketchAgg extends FieldAggregator {
- public static final String NAME = "theta_sketch";
-
private static final long serialVersionUID = 1L;
- public FieldThetaSketchAgg(VarBinaryType dataType) {
- super(dataType);
- }
-
- @Override
- public String name() {
- return NAME;
+ public FieldThetaSketchAgg(String name, VarBinaryType dataType) {
+ super(name, dataType);
}
@Override
public Object agg(Object accumulator, Object inputField) {
- if (accumulator == null && inputField == null) {
- return null;
- }
-
if (accumulator == null || inputField == null) {
return accumulator == null ? inputField : accumulator;
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldAggregatorFactory.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldAggregatorFactory.java
index 44f2439fe..d2ce0e476 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldAggregatorFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldAggregatorFactory.java
@@ -20,13 +20,52 @@ package org.apache.paimon.mergetree.compact.aggregate.factory;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.factories.Factory;
+import org.apache.paimon.factories.FactoryUtil;
import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
+import org.apache.paimon.mergetree.compact.aggregate.FieldIgnoreRetractAgg;
import org.apache.paimon.types.DataType;
+import javax.annotation.Nullable;
+
/** Factory for {@link FieldAggregator}. */
public interface FieldAggregatorFactory extends Factory {
FieldAggregator create(DataType fieldType, CoreOptions options, String field);
String identifier();
+
+ static FieldAggregator create(
+ DataType fieldType,
+ @Nullable String strAgg,
+ boolean ignoreRetract,
+ boolean isPrimaryKey,
+ CoreOptions options,
+ String field) {
+ FieldAggregator fieldAggregator;
+ if (isPrimaryKey) {
+ strAgg = FieldPrimaryKeyAggFactory.NAME;
+ } else if (strAgg == null) {
+ strAgg = FieldLastNonNullValueAggFactory.NAME;
+ }
+
+ FieldAggregatorFactory fieldAggregatorFactory =
+ FactoryUtil.discoverFactory(
+ FieldAggregator.class.getClassLoader(),
+ FieldAggregatorFactory.class,
+ strAgg);
+ if (fieldAggregatorFactory == null) {
+ throw new RuntimeException(
+ String.format(
+ "Use unsupported aggregation: %s or spell aggregate function incorrectly!",
+ strAgg));
+ }
+
+ fieldAggregator = fieldAggregatorFactory.create(fieldType, options, field);
+
+ if (ignoreRetract) {
+ fieldAggregator = new FieldIgnoreRetractAgg(fieldAggregator);
+ }
+
+ return fieldAggregator;
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldBoolAndAggFactory.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldBoolAndAggFactory.java
index 4a3a2dc88..45bb8708d 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldBoolAndAggFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldBoolAndAggFactory.java
@@ -19,19 +19,28 @@
package org.apache.paimon.mergetree.compact.aggregate.factory;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
import org.apache.paimon.mergetree.compact.aggregate.FieldBoolAndAgg;
+import org.apache.paimon.types.BooleanType;
import org.apache.paimon.types.DataType;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
/** Factory for #{@link FieldBoolAndAgg}. */
public class FieldBoolAndAggFactory implements FieldAggregatorFactory {
+
+ public static final String NAME = "bool_and";
+
@Override
- public FieldAggregator create(DataType fieldType, CoreOptions options,
String field) {
- return new FieldBoolAndAgg(fieldType);
+ public FieldBoolAndAgg create(DataType fieldType, CoreOptions options,
String field) {
+ checkArgument(
+ fieldType instanceof BooleanType,
+ "Data type for bool and column must be 'BooleanType' but was '%s'.",
+ fieldType);
+ return new FieldBoolAndAgg(identifier(), (BooleanType) fieldType);
}
@Override
public String identifier() {
- return FieldBoolAndAgg.NAME;
+ return NAME;
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldBoolOrAggFactory.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldBoolOrAggFactory.java
index 488325eae..266ccad6a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldBoolOrAggFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldBoolOrAggFactory.java
@@ -19,19 +19,28 @@
package org.apache.paimon.mergetree.compact.aggregate.factory;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
import org.apache.paimon.mergetree.compact.aggregate.FieldBoolOrAgg;
+import org.apache.paimon.types.BooleanType;
import org.apache.paimon.types.DataType;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
/** Factory for #{@link FieldBoolOrAgg}. */
public class FieldBoolOrAggFactory implements FieldAggregatorFactory {
+
+ public static final String NAME = "bool_or";
+
@Override
- public FieldAggregator create(DataType fieldType, CoreOptions options,
String field) {
- return new FieldBoolOrAgg(fieldType);
+ public FieldBoolOrAgg create(DataType fieldType, CoreOptions options,
String field) {
+ checkArgument(
+ fieldType instanceof BooleanType,
+ "Data type for bool or column must be 'BooleanType' but was '%s'.",
+ fieldType);
+ return new FieldBoolOrAgg(identifier(), (BooleanType) fieldType);
}
@Override
public String identifier() {
- return FieldBoolOrAgg.NAME;
+ return NAME;
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldCollectAggFactory.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldCollectAggFactory.java
index b20453e07..a4325d165 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldCollectAggFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldCollectAggFactory.java
@@ -19,7 +19,6 @@
package org.apache.paimon.mergetree.compact.aggregate.factory;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
import org.apache.paimon.mergetree.compact.aggregate.FieldCollectAgg;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataType;
@@ -28,17 +27,21 @@ import static
org.apache.paimon.utils.Preconditions.checkArgument;
/** Factory for #{@link FieldCollectAgg}. */
public class FieldCollectAggFactory implements FieldAggregatorFactory {
+
+ public static final String NAME = "collect";
+
@Override
- public FieldAggregator create(DataType fieldType, CoreOptions options,
String field) {
+ public FieldCollectAgg create(DataType fieldType, CoreOptions options,
String field) {
checkArgument(
fieldType instanceof ArrayType,
"Data type for collect column must be 'Array' but was '%s'.",
fieldType);
- return new FieldCollectAgg((ArrayType) fieldType,
options.fieldCollectAggDistinct(field));
+ return new FieldCollectAgg(
+ identifier(), (ArrayType) fieldType,
options.fieldCollectAggDistinct(field));
}
@Override
public String identifier() {
- return FieldCollectAgg.NAME;
+ return NAME;
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldFirstNonNullValueAggFactory.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldFirstNonNullValueAggFactory.java
index 141da6342..51e3a6d62 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldFirstNonNullValueAggFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldFirstNonNullValueAggFactory.java
@@ -19,19 +19,21 @@
package org.apache.paimon.mergetree.compact.aggregate.factory;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
import org.apache.paimon.mergetree.compact.aggregate.FieldFirstNonNullValueAgg;
import org.apache.paimon.types.DataType;
/** Factory for #{@link FieldFirstNonNullValueAgg}. */
public class FieldFirstNonNullValueAggFactory implements
FieldAggregatorFactory {
+
+ public static final String NAME = "first_non_null_value";
+
@Override
- public FieldAggregator create(DataType fieldType, CoreOptions options,
String field) {
- return new FieldFirstNonNullValueAgg(fieldType);
+ public FieldFirstNonNullValueAgg create(DataType fieldType, CoreOptions
options, String field) {
+ return new FieldFirstNonNullValueAgg(identifier(), fieldType);
}
@Override
public String identifier() {
- return FieldFirstNonNullValueAgg.NAME;
+ return NAME;
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldFirstNonNullValueAggLegacyFactory.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldFirstNonNullValueAggLegacyFactory.java
index 1d92dd3ec..507ecbb5c 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldFirstNonNullValueAggLegacyFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldFirstNonNullValueAggLegacyFactory.java
@@ -25,13 +25,16 @@ import org.apache.paimon.types.DataType;
/** Factory for legacy name of #{@link FieldFirstNonNullValueAgg}. */
public class FieldFirstNonNullValueAggLegacyFactory implements
FieldAggregatorFactory {
+
+ public static final String LEGACY_NAME = "first_not_null_value";
+
@Override
public FieldAggregator create(DataType fieldType, CoreOptions options,
String field) {
- return new FieldFirstNonNullValueAgg(fieldType);
+ return new FieldFirstNonNullValueAgg(identifier(), fieldType);
}
@Override
public String identifier() {
- return FieldFirstNonNullValueAgg.LEGACY_NAME;
+ return LEGACY_NAME;
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldFirstValueAggFactory.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldFirstValueAggFactory.java
index cc36928c6..84db12ffc 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldFirstValueAggFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldFirstValueAggFactory.java
@@ -19,19 +19,21 @@
package org.apache.paimon.mergetree.compact.aggregate.factory;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
import org.apache.paimon.mergetree.compact.aggregate.FieldFirstValueAgg;
import org.apache.paimon.types.DataType;
/** Factory for #{@link FieldFirstValueAgg}. */
public class FieldFirstValueAggFactory implements FieldAggregatorFactory {
+
+ public static final String NAME = "first_value";
+
@Override
- public FieldAggregator create(DataType fieldType, CoreOptions options,
String field) {
- return new FieldFirstValueAgg(fieldType);
+ public FieldFirstValueAgg create(DataType fieldType, CoreOptions options,
String field) {
+ return new FieldFirstValueAgg(identifier(), fieldType);
}
@Override
public String identifier() {
- return FieldFirstValueAgg.NAME;
+ return NAME;
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldHllSketchAggFactory.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldHllSketchAggFactory.java
index 9f57abaee..5777c6a41 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldHllSketchAggFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldHllSketchAggFactory.java
@@ -19,7 +19,6 @@
package org.apache.paimon.mergetree.compact.aggregate.factory;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
import org.apache.paimon.mergetree.compact.aggregate.FieldHllSketchAgg;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.VarBinaryType;
@@ -28,17 +27,20 @@ import static
org.apache.paimon.utils.Preconditions.checkArgument;
/** Factory for #{@link FieldHllSketchAgg}. */
public class FieldHllSketchAggFactory implements FieldAggregatorFactory {
+
+ public static final String NAME = "hll_sketch";
+
@Override
- public FieldAggregator create(DataType fieldType, CoreOptions options,
String field) {
+ public FieldHllSketchAgg create(DataType fieldType, CoreOptions options,
String field) {
checkArgument(
fieldType instanceof VarBinaryType,
"Data type for hll sketch column must be 'VarBinaryType' but was '%s'.",
fieldType);
- return new FieldHllSketchAgg((VarBinaryType) fieldType);
+ return new FieldHllSketchAgg(identifier(), (VarBinaryType) fieldType);
}
@Override
public String identifier() {
- return FieldHllSketchAgg.NAME;
+ return NAME;
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldLastNonNullValueAggFactory.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldLastNonNullValueAggFactory.java
index e3e2ff079..bbc6402bb 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldLastNonNullValueAggFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldLastNonNullValueAggFactory.java
@@ -19,19 +19,21 @@
package org.apache.paimon.mergetree.compact.aggregate.factory;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
import org.apache.paimon.mergetree.compact.aggregate.FieldLastNonNullValueAgg;
import org.apache.paimon.types.DataType;
/** Factory for #{@link FieldLastNonNullValueAgg}. */
public class FieldLastNonNullValueAggFactory implements FieldAggregatorFactory
{
+
+ public static final String NAME = "last_non_null_value";
+
@Override
- public FieldAggregator create(DataType fieldType, CoreOptions options,
String field) {
- return new FieldLastNonNullValueAgg(fieldType);
+ public FieldLastNonNullValueAgg create(DataType fieldType, CoreOptions
options, String field) {
+ return new FieldLastNonNullValueAgg(identifier(), fieldType);
}
@Override
public String identifier() {
- return FieldLastNonNullValueAgg.NAME;
+ return NAME;
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldLastValueAggFactory.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldLastValueAggFactory.java
index b3423a39e..c825825a1 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldLastValueAggFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldLastValueAggFactory.java
@@ -19,19 +19,21 @@
package org.apache.paimon.mergetree.compact.aggregate.factory;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
import org.apache.paimon.mergetree.compact.aggregate.FieldLastValueAgg;
import org.apache.paimon.types.DataType;
/** Factory for #{@link FieldLastValueAgg}. */
public class FieldLastValueAggFactory implements FieldAggregatorFactory {
+
+ public static final String NAME = "last_value";
+
@Override
- public FieldAggregator create(DataType fieldType, CoreOptions options,
String field) {
- return new FieldLastValueAgg(fieldType);
+ public FieldLastValueAgg create(DataType fieldType, CoreOptions options,
String field) {
+ return new FieldLastValueAgg(identifier(), fieldType);
}
@Override
public String identifier() {
- return FieldLastValueAgg.NAME;
+ return NAME;
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldListaggAggFactory.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldListaggAggFactory.java
index e5e85dbab..cdb9c128a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldListaggAggFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldListaggAggFactory.java
@@ -19,19 +19,28 @@
package org.apache.paimon.mergetree.compact.aggregate.factory;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
import org.apache.paimon.mergetree.compact.aggregate.FieldListaggAgg;
import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.VarCharType;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Factory for #{@link FieldListaggAgg}. */
public class FieldListaggAggFactory implements FieldAggregatorFactory {
+
+ public static final String NAME = "listagg";
+
@Override
- public FieldAggregator create(DataType fieldType, CoreOptions options,
String field) {
- return new FieldListaggAgg(fieldType, options, field);
+ public FieldListaggAgg create(DataType fieldType, CoreOptions options,
String field) {
+ checkArgument(
+ fieldType instanceof VarCharType,
+ "Data type for list agg column must be 'VarCharType' but was '%s'.",
+ fieldType);
+ return new FieldListaggAgg(identifier(), (VarCharType) fieldType,
options, field);
}
@Override
public String identifier() {
- return FieldListaggAgg.NAME;
+ return NAME;
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldMaxAggFactory.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldMaxAggFactory.java
index 2fda49a76..4e3c33171 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldMaxAggFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldMaxAggFactory.java
@@ -19,19 +19,21 @@
package org.apache.paimon.mergetree.compact.aggregate.factory;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
import org.apache.paimon.mergetree.compact.aggregate.FieldMaxAgg;
import org.apache.paimon.types.DataType;
/** Factory for #{@link FieldMaxAgg}. */
public class FieldMaxAggFactory implements FieldAggregatorFactory {
+
+ public static final String NAME = "max";
+
@Override
- public FieldAggregator create(DataType fieldType, CoreOptions options,
String field) {
- return new FieldMaxAgg(fieldType);
+ public FieldMaxAgg create(DataType fieldType, CoreOptions options, String
field) {
+ return new FieldMaxAgg(identifier(), fieldType);
}
@Override
public String identifier() {
- return FieldMaxAgg.NAME;
+ return NAME;
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldMergeMapAggFactory.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldMergeMapAggFactory.java
index ac2409b25..e10602f61 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldMergeMapAggFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldMergeMapAggFactory.java
@@ -19,7 +19,6 @@
package org.apache.paimon.mergetree.compact.aggregate.factory;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
import org.apache.paimon.mergetree.compact.aggregate.FieldMergeMapAgg;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.MapType;
@@ -28,17 +27,20 @@ import static
org.apache.paimon.utils.Preconditions.checkArgument;
/** Factory for #{@link FieldMergeMapAgg}. */
public class FieldMergeMapAggFactory implements FieldAggregatorFactory {
+
+ public static final String NAME = "merge_map";
+
@Override
- public FieldAggregator create(DataType fieldType, CoreOptions options,
String field) {
+ public FieldMergeMapAgg create(DataType fieldType, CoreOptions options,
String field) {
checkArgument(
fieldType instanceof MapType,
- "Data type of merge map column must be 'MAP' but was '%s'",
+ "Data type for merge map column must be 'MAP' but was '%s'",
fieldType);
- return new FieldMergeMapAgg((MapType) fieldType);
+ return new FieldMergeMapAgg(identifier(), (MapType) fieldType);
}
@Override
public String identifier() {
- return FieldMergeMapAgg.NAME;
+ return NAME;
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldMinAggFactory.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldMinAggFactory.java
index e939f8a73..4ac7c08b1 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldMinAggFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldMinAggFactory.java
@@ -19,19 +19,21 @@
package org.apache.paimon.mergetree.compact.aggregate.factory;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
import org.apache.paimon.mergetree.compact.aggregate.FieldMinAgg;
import org.apache.paimon.types.DataType;
/** Factory for #{@link FieldMinAgg}. */
public class FieldMinAggFactory implements FieldAggregatorFactory {
+
+ public static final String NAME = "min";
+
@Override
- public FieldAggregator create(DataType fieldType, CoreOptions options,
String field) {
- return new FieldMinAgg(fieldType);
+ public FieldMinAgg create(DataType fieldType, CoreOptions options, String
field) {
+ return new FieldMinAgg(identifier(), fieldType);
}
@Override
public String identifier() {
- return FieldMinAgg.NAME;
+ return NAME;
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedUpdateAggFactory.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedUpdateAggFactory.java
index 41b9f3e41..b92df6414 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedUpdateAggFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldNestedUpdateAggFactory.java
@@ -32,6 +32,9 @@ import static
org.apache.paimon.utils.Preconditions.checkArgument;
/** Factory for #{@link FieldNestedUpdateAgg}. */
public class FieldNestedUpdateAggFactory implements FieldAggregatorFactory {
+
+ public static final String NAME = "nested_update";
+
@Override
public FieldAggregator create(DataType fieldType, CoreOptions options,
String field) {
return createFieldNestedUpdateAgg(fieldType,
options.fieldNestedUpdateAggNestedKey(field));
@@ -39,20 +42,20 @@ public class FieldNestedUpdateAggFactory implements
FieldAggregatorFactory {
@Override
public String identifier() {
- return FieldNestedUpdateAgg.NAME;
+ return NAME;
}
- private static FieldAggregator createFieldNestedUpdateAgg(
- DataType fieldType, List<String> nestedKey) {
+ private FieldAggregator createFieldNestedUpdateAgg(DataType fieldType,
List<String> nestedKey) {
if (nestedKey == null) {
nestedKey = Collections.emptyList();
}
- String typeErrorMsg = "Data type of nested table column must be
'Array<Row>' but was '%s'.";
+ String typeErrorMsg =
+ "Data type for nested table column must be 'Array<Row>' but
was '%s'.";
checkArgument(fieldType instanceof ArrayType, typeErrorMsg, fieldType);
ArrayType arrayType = (ArrayType) fieldType;
checkArgument(arrayType.getElementType() instanceof RowType,
typeErrorMsg, fieldType);
- return new FieldNestedUpdateAgg(arrayType, nestedKey);
+ return new FieldNestedUpdateAgg(identifier(), arrayType, nestedKey);
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldPrimaryKeyAggFactory.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldPrimaryKeyAggFactory.java
index 312d29753..0e293bcf7 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldPrimaryKeyAggFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldPrimaryKeyAggFactory.java
@@ -25,13 +25,16 @@ import org.apache.paimon.types.DataType;
/** Factory for #{@link FieldPrimaryKeyAgg}. */
public class FieldPrimaryKeyAggFactory implements FieldAggregatorFactory {
+
+ public static final String NAME = "primary-key";
+
@Override
public FieldAggregator create(DataType fieldType, CoreOptions options,
String field) {
- return new FieldPrimaryKeyAgg(fieldType);
+ return new FieldPrimaryKeyAgg(identifier(), fieldType);
}
@Override
public String identifier() {
- return FieldPrimaryKeyAgg.NAME;
+ return NAME;
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldProductAggFactory.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldProductAggFactory.java
index 88be9c42a..7dbdd9f5a 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldProductAggFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldProductAggFactory.java
@@ -19,19 +19,28 @@
package org.apache.paimon.mergetree.compact.aggregate.factory;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
import org.apache.paimon.mergetree.compact.aggregate.FieldProductAgg;
import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeFamily;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Factory for #{@link FieldProductAgg}. */
public class FieldProductAggFactory implements FieldAggregatorFactory {
+
+ public static final String NAME = "product";
+
@Override
- public FieldAggregator create(DataType fieldType, CoreOptions options,
String field) {
- return new FieldProductAgg(fieldType);
+ public FieldProductAgg create(DataType fieldType, CoreOptions options,
String field) {
+ checkArgument(
+
fieldType.getTypeRoot().getFamilies().contains(DataTypeFamily.NUMERIC),
+ "Data type for product column must be 'NumericType' but was
'%s'.",
+ fieldType);
+ return new FieldProductAgg(identifier(), fieldType);
}
@Override
public String identifier() {
- return FieldProductAgg.NAME;
+ return NAME;
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldRoaringBitmap32AggFactory.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldRoaringBitmap32AggFactory.java
index 5b2a80b30..91103791f 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldRoaringBitmap32AggFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldRoaringBitmap32AggFactory.java
@@ -19,7 +19,6 @@
package org.apache.paimon.mergetree.compact.aggregate.factory;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
import org.apache.paimon.mergetree.compact.aggregate.FieldRoaringBitmap32Agg;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.VarBinaryType;
@@ -28,17 +27,20 @@ import static
org.apache.paimon.utils.Preconditions.checkArgument;
/** Factory for #{@link FieldRoaringBitmap32Agg}. */
public class FieldRoaringBitmap32AggFactory implements FieldAggregatorFactory {
+
+ public static final String NAME = "rbm32";
+
@Override
- public FieldAggregator create(DataType fieldType, CoreOptions options,
String field) {
+ public FieldRoaringBitmap32Agg create(DataType fieldType, CoreOptions
options, String field) {
checkArgument(
fieldType instanceof VarBinaryType,
"Data type for roaring bitmap column must be 'VarBinaryType'
but was '%s'.",
fieldType);
- return new FieldRoaringBitmap32Agg((VarBinaryType) fieldType);
+ return new FieldRoaringBitmap32Agg(identifier(), (VarBinaryType)
fieldType);
}
@Override
public String identifier() {
- return FieldRoaringBitmap32Agg.NAME;
+ return NAME;
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldRoaringBitmap64AggFactory.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldRoaringBitmap64AggFactory.java
index a0e9fe652..56f5554af 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldRoaringBitmap64AggFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldRoaringBitmap64AggFactory.java
@@ -19,7 +19,6 @@
package org.apache.paimon.mergetree.compact.aggregate.factory;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
import org.apache.paimon.mergetree.compact.aggregate.FieldRoaringBitmap64Agg;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.VarBinaryType;
@@ -28,17 +27,20 @@ import static
org.apache.paimon.utils.Preconditions.checkArgument;
/** Factory for #{@link FieldRoaringBitmap64Agg}. */
public class FieldRoaringBitmap64AggFactory implements FieldAggregatorFactory {
+
+ public static final String NAME = "rbm64";
+
@Override
- public FieldAggregator create(DataType fieldType, CoreOptions options,
String field) {
+ public FieldRoaringBitmap64Agg create(DataType fieldType, CoreOptions
options, String field) {
checkArgument(
fieldType instanceof VarBinaryType,
"Data type for roaring bitmap column must be 'VarBinaryType'
but was '%s'.",
fieldType);
- return new FieldRoaringBitmap64Agg((VarBinaryType) fieldType);
+ return new FieldRoaringBitmap64Agg(identifier(), (VarBinaryType)
fieldType);
}
@Override
public String identifier() {
- return FieldRoaringBitmap64Agg.NAME;
+ return NAME;
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldSumAggFactory.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldSumAggFactory.java
index 8470e6d17..5343f67b6 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldSumAggFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldSumAggFactory.java
@@ -19,19 +19,28 @@
package org.apache.paimon.mergetree.compact.aggregate.factory;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
import org.apache.paimon.mergetree.compact.aggregate.FieldSumAgg;
import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DataTypeFamily;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
/** Factory for #{@link FieldSumAgg}. */
public class FieldSumAggFactory implements FieldAggregatorFactory {
+
+ public static final String NAME = "sum";
+
@Override
- public FieldAggregator create(DataType fieldType, CoreOptions options,
String field) {
- return new FieldSumAgg(fieldType);
+ public FieldSumAgg create(DataType fieldType, CoreOptions options, String
field) {
+ checkArgument(
+
fieldType.getTypeRoot().getFamilies().contains(DataTypeFamily.NUMERIC),
+ "Data type for sum column must be 'NumericType' but was '%s'.",
+ fieldType);
+ return new FieldSumAgg(identifier(), fieldType);
}
@Override
public String identifier() {
- return FieldSumAgg.NAME;
+ return NAME;
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldThetaSketchAggFactory.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldThetaSketchAggFactory.java
index 6aa5d7bc4..c30fb7df7 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldThetaSketchAggFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/factory/FieldThetaSketchAggFactory.java
@@ -19,7 +19,6 @@
package org.apache.paimon.mergetree.compact.aggregate.factory;
import org.apache.paimon.CoreOptions;
-import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
import org.apache.paimon.mergetree.compact.aggregate.FieldThetaSketchAgg;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.VarBinaryType;
@@ -28,17 +27,20 @@ import static
org.apache.paimon.utils.Preconditions.checkArgument;
/** Factory for #{@link FieldThetaSketchAgg}. */
public class FieldThetaSketchAggFactory implements FieldAggregatorFactory {
+
+ public static final String NAME = "theta_sketch";
+
@Override
- public FieldAggregator create(DataType fieldType, CoreOptions options,
String field) {
+ public FieldThetaSketchAgg create(DataType fieldType, CoreOptions options,
String field) {
checkArgument(
fieldType instanceof VarBinaryType,
"Data type for theta sketch column must be 'VarBinaryType' but
was '%s'.",
fieldType);
- return new FieldThetaSketchAgg((VarBinaryType) fieldType);
+ return new FieldThetaSketchAgg(identifier(), (VarBinaryType)
fieldType);
}
@Override
public String identifier() {
- return FieldThetaSketchAgg.NAME;
+ return NAME;
}
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
index c8344c44d..28cb4c099 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/LookupChangelogMergeFunctionWrapperTest.java
@@ -26,8 +26,8 @@ import org.apache.paimon.data.InternalRow.FieldGetter;
import org.apache.paimon.lookup.LookupStrategy;
import org.apache.paimon.mergetree.compact.aggregate.AggregateMergeFunction;
import org.apache.paimon.mergetree.compact.aggregate.FieldAggregator;
-import org.apache.paimon.mergetree.compact.aggregate.FieldLastValueAgg;
-import org.apache.paimon.mergetree.compact.aggregate.FieldSumAgg;
+import
org.apache.paimon.mergetree.compact.aggregate.factory.FieldLastValueAggFactory;
+import
org.apache.paimon.mergetree.compact.aggregate.factory.FieldSumAggFactory;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypes;
@@ -293,7 +293,8 @@ public class LookupChangelogMergeFunctionWrapperTest {
row -> row.isNullAt(0) ?
null : row.getInt(0)
},
new FieldAggregator[] {
- new
FieldSumAgg(DataTypes.INT())
+ new FieldSumAggFactory()
+
.create(DataTypes.INT(), null, null)
}),
RowType.of(DataTypes.INT()),
RowType.of(DataTypes.INT())),
@@ -381,7 +382,8 @@ public class LookupChangelogMergeFunctionWrapperTest {
row -> row.isNullAt(0) ?
null : row.getInt(0)
},
new FieldAggregator[] {
- new
FieldLastValueAgg(DataTypes.INT())
+ new
FieldLastValueAggFactory()
+
.create(DataTypes.INT(), null, null)
}),
RowType.of(DataTypes.INT()),
RowType.of(DataTypes.INT())),
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
index 0fa1433e7..d32098b80 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
@@ -27,6 +27,25 @@ import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalArray;
import org.apache.paimon.data.InternalMap;
import org.apache.paimon.data.InternalRow;
+import
org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory;
+import
org.apache.paimon.mergetree.compact.aggregate.factory.FieldBoolAndAggFactory;
+import
org.apache.paimon.mergetree.compact.aggregate.factory.FieldBoolOrAggFactory;
+import
org.apache.paimon.mergetree.compact.aggregate.factory.FieldCollectAggFactory;
+import
org.apache.paimon.mergetree.compact.aggregate.factory.FieldFirstNonNullValueAggFactory;
+import
org.apache.paimon.mergetree.compact.aggregate.factory.FieldFirstValueAggFactory;
+import
org.apache.paimon.mergetree.compact.aggregate.factory.FieldHllSketchAggFactory;
+import
org.apache.paimon.mergetree.compact.aggregate.factory.FieldLastNonNullValueAggFactory;
+import
org.apache.paimon.mergetree.compact.aggregate.factory.FieldLastValueAggFactory;
+import
org.apache.paimon.mergetree.compact.aggregate.factory.FieldListaggAggFactory;
+import
org.apache.paimon.mergetree.compact.aggregate.factory.FieldMaxAggFactory;
+import
org.apache.paimon.mergetree.compact.aggregate.factory.FieldMergeMapAggFactory;
+import
org.apache.paimon.mergetree.compact.aggregate.factory.FieldMinAggFactory;
+import
org.apache.paimon.mergetree.compact.aggregate.factory.FieldNestedUpdateAggFactory;
+import
org.apache.paimon.mergetree.compact.aggregate.factory.FieldProductAggFactory;
+import
org.apache.paimon.mergetree.compact.aggregate.factory.FieldRoaringBitmap32AggFactory;
+import
org.apache.paimon.mergetree.compact.aggregate.factory.FieldRoaringBitmap64AggFactory;
+import
org.apache.paimon.mergetree.compact.aggregate.factory.FieldSumAggFactory;
+import
org.apache.paimon.mergetree.compact.aggregate.factory.FieldThetaSketchAggFactory;
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.BigIntType;
import org.apache.paimon.types.BooleanType;
@@ -67,14 +86,16 @@ public class FieldAggregatorTest {
@Test
public void testFieldBoolAndAgg() {
- FieldBoolAndAgg fieldBoolAndAgg = new FieldBoolAndAgg(new
BooleanType());
+ FieldBoolAndAgg fieldBoolAndAgg =
+ new FieldBoolAndAggFactory().create(new BooleanType(), null,
null);
assertThat(fieldBoolAndAgg.agg(false, true)).isEqualTo(false);
assertThat(fieldBoolAndAgg.agg(true, true)).isEqualTo(true);
}
@Test
public void testFieldBoolOrAgg() {
- FieldBoolOrAgg fieldBoolOrAgg = new FieldBoolOrAgg(new BooleanType());
+ FieldBoolOrAgg fieldBoolOrAgg =
+ new FieldBoolOrAggFactory().create(new BooleanType(), null,
null);
assertThat(fieldBoolOrAgg.agg(false, true)).isEqualTo(true);
assertThat(fieldBoolOrAgg.agg(false, false)).isEqualTo(false);
}
@@ -82,7 +103,7 @@ public class FieldAggregatorTest {
@Test
public void testFieldLastNonNullValueAgg() {
FieldLastNonNullValueAgg fieldLastNonNullValueAgg =
- new FieldLastNonNullValueAgg(new IntType());
+ new FieldLastNonNullValueAggFactory().create(new IntType(),
null, null);
Integer accumulator = null;
Integer inputField = 1;
assertThat(fieldLastNonNullValueAgg.agg(accumulator,
inputField)).isEqualTo(1);
@@ -94,7 +115,8 @@ public class FieldAggregatorTest {
@Test
public void testFieldLastValueAgg() {
- FieldLastValueAgg fieldLastValueAgg = new FieldLastValueAgg(new
IntType());
+ FieldLastValueAgg fieldLastValueAgg =
+ new FieldLastValueAggFactory().create(new IntType(), null,
null);
Integer accumulator = null;
Integer inputField = 1;
assertThat(fieldLastValueAgg.agg(accumulator,
inputField)).isEqualTo(1);
@@ -106,7 +128,8 @@ public class FieldAggregatorTest {
@Test
public void testFieldFirstValueAgg() {
- FieldFirstValueAgg fieldFirstValueAgg = new FieldFirstValueAgg(new
IntType());
+ FieldFirstValueAgg fieldFirstValueAgg =
+ new FieldFirstValueAggFactory().create(new IntType(), null,
null);
assertThat(fieldFirstValueAgg.agg(null, 1)).isEqualTo(1);
assertThat(fieldFirstValueAgg.agg(1, 2)).isEqualTo(1);
@@ -117,7 +140,7 @@ public class FieldAggregatorTest {
@Test
public void testFieldFirstNonNullValueAgg() {
FieldFirstNonNullValueAgg fieldFirstNonNullValueAgg =
- new FieldFirstNonNullValueAgg(new IntType());
+ new FieldFirstNonNullValueAggFactory().create(new IntType(),
null, null);
assertThat(fieldFirstNonNullValueAgg.agg(null, null)).isNull();
assertThat(fieldFirstNonNullValueAgg.agg(null, 1)).isEqualTo(1);
assertThat(fieldFirstNonNullValueAgg.agg(1, 2)).isEqualTo(1);
@@ -129,8 +152,8 @@ public class FieldAggregatorTest {
@Test
public void testFieldListAggWithDefaultDelimiter() {
FieldListaggAgg fieldListaggAgg =
- new FieldListaggAgg(
- new VarCharType(), new CoreOptions(new HashMap<>()),
"fieldName");
+ new FieldListaggAggFactory()
+ .create(new VarCharType(), new CoreOptions(new
HashMap<>()), "fieldName");
BinaryString accumulator = BinaryString.fromString("user1");
BinaryString inputField = BinaryString.fromString("user2");
assertThat(fieldListaggAgg.agg(accumulator, inputField).toString())
@@ -140,11 +163,13 @@ public class FieldAggregatorTest {
@Test
public void testFieldListAggWithCustomDelimiter() {
FieldListaggAgg fieldListaggAgg =
- new FieldListaggAgg(
- new VarCharType(),
- CoreOptions.fromMap(
-
ImmutableMap.of("fields.fieldName.list-agg-delimiter", "-")),
- "fieldName");
+ new FieldListaggAggFactory()
+ .create(
+ new VarCharType(),
+ CoreOptions.fromMap(
+ ImmutableMap.of(
+
"fields.fieldName.list-agg-delimiter", "-")),
+ "fieldName");
BinaryString accumulator = BinaryString.fromString("user1");
BinaryString inputField = BinaryString.fromString("user2");
assertThat(fieldListaggAgg.agg(accumulator, inputField).toString())
@@ -153,7 +178,7 @@ public class FieldAggregatorTest {
@Test
public void testFieldMaxAgg() {
- FieldMaxAgg fieldMaxAgg = new FieldMaxAgg(new IntType());
+ FieldMaxAgg fieldMaxAgg = new FieldMaxAggFactory().create(new
IntType(), null, null);
Integer accumulator = 1;
Integer inputField = 10;
assertThat(fieldMaxAgg.agg(accumulator, inputField)).isEqualTo(10);
@@ -161,7 +186,7 @@ public class FieldAggregatorTest {
@Test
public void testFieldMinAgg() {
- FieldMinAgg fieldMinAgg = new FieldMinAgg(new IntType());
+ FieldMinAgg fieldMinAgg = new FieldMinAggFactory().create(new
IntType(), null, null);
Integer accumulator = 1;
Integer inputField = 10;
assertThat(fieldMinAgg.agg(accumulator, inputField)).isEqualTo(1);
@@ -169,7 +194,7 @@ public class FieldAggregatorTest {
@Test
public void testFieldSumIntAgg() {
- FieldSumAgg fieldSumAgg = new FieldSumAgg(new IntType());
+ FieldSumAgg fieldSumAgg = new FieldSumAggFactory().create(new
IntType(), null, null);
assertThat(fieldSumAgg.agg(null, 10)).isEqualTo(10);
assertThat(fieldSumAgg.agg(1, 10)).isEqualTo(11);
assertThat(fieldSumAgg.retract(10, 5)).isEqualTo(5);
@@ -178,7 +203,8 @@ public class FieldAggregatorTest {
@Test
public void testFieldProductIntAgg() {
- FieldProductAgg fieldProductAgg = new FieldProductAgg(new IntType());
+ FieldProductAgg fieldProductAgg =
+ new FieldProductAggFactory().create(new IntType(), null, null);
assertThat(fieldProductAgg.agg(null, 10)).isEqualTo(10);
assertThat(fieldProductAgg.agg(1, 10)).isEqualTo(10);
assertThat(fieldProductAgg.retract(10, 5)).isEqualTo(2);
@@ -187,7 +213,7 @@ public class FieldAggregatorTest {
@Test
public void testFieldSumByteAgg() {
- FieldSumAgg fieldSumAgg = new FieldSumAgg(new TinyIntType());
+ FieldSumAgg fieldSumAgg = new FieldSumAggFactory().create(new
TinyIntType(), null, null);
assertThat(fieldSumAgg.agg(null, (byte) 10)).isEqualTo((byte) 10);
assertThat(fieldSumAgg.agg((byte) 1, (byte) 10)).isEqualTo((byte) 11);
assertThat(fieldSumAgg.retract((byte) 10, (byte) 5)).isEqualTo((byte)
5);
@@ -196,7 +222,8 @@ public class FieldAggregatorTest {
@Test
public void testFieldProductByteAgg() {
- FieldProductAgg fieldProductAgg = new FieldProductAgg(new
TinyIntType());
+ FieldProductAgg fieldProductAgg =
+ new FieldProductAggFactory().create(new TinyIntType(), null,
null);
assertThat(fieldProductAgg.agg(null, (byte) 10)).isEqualTo((byte) 10);
assertThat(fieldProductAgg.agg((byte) 1, (byte) 10)).isEqualTo((byte)
10);
assertThat(fieldProductAgg.retract((byte) 10, (byte)
5)).isEqualTo((byte) 2);
@@ -205,7 +232,8 @@ public class FieldAggregatorTest {
@Test
public void testFieldProductShortAgg() {
- FieldProductAgg fieldProductAgg = new FieldProductAgg(new
SmallIntType());
+ FieldProductAgg fieldProductAgg =
+ new FieldProductAggFactory().create(new SmallIntType(), null,
null);
assertThat(fieldProductAgg.agg(null, (short) 10)).isEqualTo((short)
10);
assertThat(fieldProductAgg.agg((short) 1, (short)
10)).isEqualTo((short) 10);
assertThat(fieldProductAgg.retract((short) 10, (short)
5)).isEqualTo((short) 2);
@@ -214,7 +242,7 @@ public class FieldAggregatorTest {
@Test
public void testFieldSumShortAgg() {
- FieldSumAgg fieldSumAgg = new FieldSumAgg(new SmallIntType());
+ FieldSumAgg fieldSumAgg = new FieldSumAggFactory().create(new
SmallIntType(), null, null);
assertThat(fieldSumAgg.agg(null, (short) 10)).isEqualTo((short) 10);
assertThat(fieldSumAgg.agg((short) 1, (short) 10)).isEqualTo((short)
11);
assertThat(fieldSumAgg.retract((short) 10, (short)
5)).isEqualTo((short) 5);
@@ -223,7 +251,7 @@ public class FieldAggregatorTest {
@Test
public void testFieldSumLongAgg() {
- FieldSumAgg fieldSumAgg = new FieldSumAgg(new BigIntType());
+ FieldSumAgg fieldSumAgg = new FieldSumAggFactory().create(new
BigIntType(), null, null);
assertThat(fieldSumAgg.agg(null, 10L)).isEqualTo(10L);
assertThat(fieldSumAgg.agg(1L, 10L)).isEqualTo(11L);
assertThat(fieldSumAgg.retract(10L, 5L)).isEqualTo(5L);
@@ -232,7 +260,8 @@ public class FieldAggregatorTest {
@Test
public void testFieldProductLongAgg() {
- FieldProductAgg fieldProductAgg = new FieldProductAgg(new
BigIntType());
+ FieldProductAgg fieldProductAgg =
+ new FieldProductAggFactory().create(new BigIntType(), null,
null);
assertThat(fieldProductAgg.agg(null, 10L)).isEqualTo(10L);
assertThat(fieldProductAgg.agg(1L, 10L)).isEqualTo(10L);
assertThat(fieldProductAgg.retract(10L, 5L)).isEqualTo(2L);
@@ -241,7 +270,8 @@ public class FieldAggregatorTest {
@Test
public void testFieldProductFloatAgg() {
- FieldProductAgg fieldProductAgg = new FieldProductAgg(new FloatType());
+ FieldProductAgg fieldProductAgg =
+ new FieldProductAggFactory().create(new FloatType(), null,
null);
assertThat(fieldProductAgg.agg(null, (float) 10)).isEqualTo((float)
10);
assertThat(fieldProductAgg.agg((float) 1, (float)
10)).isEqualTo((float) 10);
assertThat(fieldProductAgg.retract((float) 10, (float)
5)).isEqualTo((float) 2);
@@ -250,7 +280,7 @@ public class FieldAggregatorTest {
@Test
public void testFieldSumFloatAgg() {
- FieldSumAgg fieldSumAgg = new FieldSumAgg(new FloatType());
+ FieldSumAgg fieldSumAgg = new FieldSumAggFactory().create(new
FloatType(), null, null);
assertThat(fieldSumAgg.agg(null, (float) 10)).isEqualTo((float) 10);
assertThat(fieldSumAgg.agg((float) 1, (float) 10)).isEqualTo((float)
11);
assertThat(fieldSumAgg.retract((float) 10, (float)
5)).isEqualTo((float) 5);
@@ -259,7 +289,8 @@ public class FieldAggregatorTest {
@Test
public void testFieldProductDoubleAgg() {
- FieldProductAgg fieldProductAgg = new FieldProductAgg(new
DoubleType());
+ FieldProductAgg fieldProductAgg =
+ new FieldProductAggFactory().create(new DoubleType(), null,
null);
assertThat(fieldProductAgg.agg(null, (double) 10)).isEqualTo((double)
10);
assertThat(fieldProductAgg.agg((double) 1, (double)
10)).isEqualTo((double) 10);
assertThat(fieldProductAgg.retract((double) 10, (double)
5)).isEqualTo((double) 2);
@@ -268,7 +299,7 @@ public class FieldAggregatorTest {
@Test
public void testFieldSumDoubleAgg() {
- FieldSumAgg fieldSumAgg = new FieldSumAgg(new DoubleType());
+ FieldSumAgg fieldSumAgg = new FieldSumAggFactory().create(new
DoubleType(), null, null);
assertThat(fieldSumAgg.agg(null, (double) 10)).isEqualTo((double) 10);
assertThat(fieldSumAgg.agg((double) 1, (double)
10)).isEqualTo((double) 11);
assertThat(fieldSumAgg.retract((double) 10, (double)
5)).isEqualTo((double) 5);
@@ -277,7 +308,8 @@ public class FieldAggregatorTest {
@Test
public void testFieldProductDecimalAgg() {
- FieldProductAgg fieldProductAgg = new FieldProductAgg(new
DecimalType());
+ FieldProductAgg fieldProductAgg =
+ new FieldProductAggFactory().create(new DecimalType(), null,
null);
assertThat(fieldProductAgg.agg(null,
toDecimal(10))).isEqualTo(toDecimal(10));
assertThat(fieldProductAgg.agg(toDecimal(1),
toDecimal(10))).isEqualTo(toDecimal(10));
assertThat(fieldProductAgg.retract(toDecimal(10),
toDecimal(5))).isEqualTo(toDecimal(2));
@@ -286,7 +318,7 @@ public class FieldAggregatorTest {
@Test
public void testFieldSumDecimalAgg() {
- FieldSumAgg fieldSumAgg = new FieldSumAgg(new DecimalType());
+ FieldSumAgg fieldSumAgg = new FieldSumAggFactory().create(new
DecimalType(), null, null);
assertThat(fieldSumAgg.agg(null,
toDecimal(10))).isEqualTo(toDecimal(10));
assertThat(fieldSumAgg.agg(toDecimal(1),
toDecimal(10))).isEqualTo(toDecimal(11));
assertThat(fieldSumAgg.retract(toDecimal(10),
toDecimal(5))).isEqualTo(toDecimal(5));
@@ -307,6 +339,7 @@ public class FieldAggregatorTest {
DataTypes.FIELD(2, "v", DataTypes.STRING()));
FieldNestedUpdateAgg agg =
new FieldNestedUpdateAgg(
+ FieldNestedUpdateAggFactory.NAME,
DataTypes.ARRAY(
DataTypes.ROW(
DataTypes.FIELD(0, "k0",
DataTypes.INT()),
@@ -347,7 +380,10 @@ public class FieldAggregatorTest {
DataTypes.FIELD(1, "k1", DataTypes.INT()),
DataTypes.FIELD(2, "v", DataTypes.STRING()));
FieldNestedUpdateAgg agg =
- new FieldNestedUpdateAgg(DataTypes.ARRAY(elementRowType),
Collections.emptyList());
+ new FieldNestedUpdateAgg(
+ FieldNestedUpdateAggFactory.NAME,
+ DataTypes.ARRAY(elementRowType),
+ Collections.emptyList());
InternalArray accumulator = null;
InternalArray.ElementGetter elementGetter =
@@ -385,7 +421,13 @@ public class FieldAggregatorTest {
@Test
public void testFieldCollectAggWithDistinct() {
- FieldCollectAgg agg = new
FieldCollectAgg(DataTypes.ARRAY(DataTypes.INT()), true);
+ FieldCollectAgg agg =
+ new FieldCollectAggFactory()
+ .create(
+ DataTypes.ARRAY(DataTypes.INT()),
+ CoreOptions.fromMap(
+
ImmutableMap.of("fields.fieldName.distinct", "true")),
+ "fieldName");
InternalArray result;
InternalArray.ElementGetter elementGetter =
@@ -407,7 +449,13 @@ public class FieldAggregatorTest {
@Test
public void testFiledCollectAggWithRowType() {
RowType rowType = RowType.of(DataTypes.INT(), DataTypes.STRING());
- FieldCollectAgg agg = new FieldCollectAgg(DataTypes.ARRAY(rowType),
true);
+ FieldCollectAgg agg =
+ new FieldCollectAggFactory()
+ .create(
+ DataTypes.ARRAY(rowType),
+ CoreOptions.fromMap(
+
ImmutableMap.of("fields.fieldName.distinct", "true")),
+ "fieldName");
InternalArray result;
InternalArray.ElementGetter elementGetter =
InternalArray.createElementGetter(rowType);
@@ -438,7 +486,13 @@ public class FieldAggregatorTest {
@Test
public void testFiledCollectAggWithArrayType() {
ArrayType arrayType = new ArrayType(DataTypes.INT());
- FieldCollectAgg agg = new FieldCollectAgg(DataTypes.ARRAY(arrayType),
true);
+ FieldCollectAgg agg =
+ new FieldCollectAggFactory()
+ .create(
+ DataTypes.ARRAY(arrayType),
+ CoreOptions.fromMap(
+
ImmutableMap.of("fields.fieldName.distinct", "true")),
+ "fieldName");
InternalArray result;
InternalArray.ElementGetter elementGetter =
InternalArray.createElementGetter(arrayType);
@@ -469,7 +523,13 @@ public class FieldAggregatorTest {
@Test
public void testFiledCollectAggWithMapType() {
MapType mapType = new MapType(DataTypes.INT(), DataTypes.STRING());
- FieldCollectAgg agg = new FieldCollectAgg(DataTypes.ARRAY(mapType),
true);
+ FieldCollectAgg agg =
+ new FieldCollectAggFactory()
+ .create(
+ DataTypes.ARRAY(mapType),
+ CoreOptions.fromMap(
+
ImmutableMap.of("fields.fieldName.distinct", "true")),
+ "fieldName");
InternalArray result;
InternalArray.ElementGetter elementGetter =
InternalArray.createElementGetter(mapType);
@@ -497,7 +557,13 @@ public class FieldAggregatorTest {
@Test
public void testFieldCollectAggWithoutDistinct() {
- FieldCollectAgg agg = new
FieldCollectAgg(DataTypes.ARRAY(DataTypes.INT()), false);
+ FieldCollectAgg agg =
+ new FieldCollectAggFactory()
+ .create(
+ DataTypes.ARRAY(DataTypes.INT()),
+ CoreOptions.fromMap(
+
ImmutableMap.of("fields.fieldName.distinct", "false")),
+ "fieldName");
InternalArray result;
InternalArray.ElementGetter elementGetter =
@@ -522,7 +588,13 @@ public class FieldAggregatorTest {
InternalArray.ElementGetter elementGetter;
// primitive type
- agg = new FieldCollectAgg(DataTypes.ARRAY(DataTypes.INT()), true);
+ agg =
+ new FieldCollectAggFactory()
+ .create(
+ DataTypes.ARRAY(DataTypes.INT()),
+ CoreOptions.fromMap(
+
ImmutableMap.of("fields.fieldName.distinct", "true")),
+ "fieldName");
elementGetter = InternalArray.createElementGetter(DataTypes.INT());
InternalArray result =
(InternalArray)
@@ -533,7 +605,13 @@ public class FieldAggregatorTest {
// row type
RowType rowType = RowType.of(DataTypes.INT(), DataTypes.STRING());
- agg = new FieldCollectAgg(DataTypes.ARRAY(rowType), true);
+ agg =
+ new FieldCollectAggFactory()
+ .create(
+ DataTypes.ARRAY(rowType),
+ CoreOptions.fromMap(
+
ImmutableMap.of("fields.fieldName.distinct", "true")),
+ "fieldName");
elementGetter = InternalArray.createElementGetter(rowType);
Object[] accElements =
@@ -556,7 +634,13 @@ public class FieldAggregatorTest {
// array type
ArrayType arrayType = new ArrayType(DataTypes.INT());
- agg = new FieldCollectAgg(DataTypes.ARRAY(arrayType), true);
+ agg =
+ new FieldCollectAggFactory()
+ .create(
+ DataTypes.ARRAY(arrayType),
+ CoreOptions.fromMap(
+
ImmutableMap.of("fields.fieldName.distinct", "true")),
+ "fieldName");
elementGetter = InternalArray.createElementGetter(arrayType);
accElements =
@@ -578,7 +662,13 @@ public class FieldAggregatorTest {
// map type
MapType mapType = new MapType(DataTypes.INT(), DataTypes.STRING());
- agg = new FieldCollectAgg(DataTypes.ARRAY(mapType), true);
+ agg =
+ new FieldCollectAggFactory()
+ .create(
+ DataTypes.ARRAY(mapType),
+ CoreOptions.fromMap(
+
ImmutableMap.of("fields.fieldName.distinct", "true")),
+ "fieldName");
elementGetter = InternalArray.createElementGetter(mapType);
accElements =
@@ -604,7 +694,13 @@ public class FieldAggregatorTest {
InternalArray.ElementGetter elementGetter;
// primitive type
- agg = new FieldCollectAgg(DataTypes.ARRAY(DataTypes.INT()), true);
+ agg =
+ new FieldCollectAggFactory()
+ .create(
+ DataTypes.ARRAY(DataTypes.INT()),
+ CoreOptions.fromMap(
+
ImmutableMap.of("fields.fieldName.distinct", "true")),
+ "fieldName");
elementGetter = InternalArray.createElementGetter(DataTypes.INT());
InternalArray result =
(InternalArray)
@@ -615,7 +711,13 @@ public class FieldAggregatorTest {
// row type
RowType rowType = RowType.of(DataTypes.INT(), DataTypes.STRING());
- agg = new FieldCollectAgg(DataTypes.ARRAY(rowType), true);
+ agg =
+ new FieldCollectAggFactory()
+ .create(
+ DataTypes.ARRAY(rowType),
+ CoreOptions.fromMap(
+
ImmutableMap.of("fields.fieldName.distinct", "true")),
+ "fieldName");
elementGetter = InternalArray.createElementGetter(rowType);
Object[] accElements =
@@ -641,7 +743,13 @@ public class FieldAggregatorTest {
// array type
ArrayType arrayType = new ArrayType(DataTypes.INT());
- agg = new FieldCollectAgg(DataTypes.ARRAY(arrayType), true);
+ agg =
+ new FieldCollectAggFactory()
+ .create(
+ DataTypes.ARRAY(arrayType),
+ CoreOptions.fromMap(
+
ImmutableMap.of("fields.fieldName.distinct", "true")),
+ "fieldName");
elementGetter = InternalArray.createElementGetter(arrayType);
accElements =
@@ -666,7 +774,13 @@ public class FieldAggregatorTest {
// map type
MapType mapType = new MapType(DataTypes.INT(), DataTypes.STRING());
- agg = new FieldCollectAgg(DataTypes.ARRAY(mapType), true);
+ agg =
+ new FieldCollectAggFactory()
+ .create(
+ DataTypes.ARRAY(mapType),
+ CoreOptions.fromMap(
+ ImmutableMap.of("fields.fieldName.distinct", "true")),
+ "fieldName");
elementGetter = InternalArray.createElementGetter(mapType);
accElements =
@@ -691,7 +805,8 @@ public class FieldAggregatorTest {
@Test
public void testFieldMergeMapAgg() {
FieldMergeMapAgg agg =
- new FieldMergeMapAgg(DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()));
+ new FieldMergeMapAggFactory()
+ .create(DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()), null, null);
assertThat(agg.agg(null, null)).isNull();
@@ -710,7 +825,8 @@ public class FieldAggregatorTest {
@Test
public void testFieldMergeMapAggRetract() {
FieldMergeMapAgg agg =
- new FieldMergeMapAgg(DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()));
+ new FieldMergeMapAggFactory()
+ .create(DataTypes.MAP(DataTypes.INT(), DataTypes.STRING()), null, null);
Object result =
agg.retract(
new GenericMap(toMap(1, "A", 2, "B", 3, "C")),
@@ -720,7 +836,8 @@ public class FieldAggregatorTest {
@Test
public void testFieldThetaSketchAgg() {
- FieldThetaSketchAgg agg = new FieldThetaSketchAgg(DataTypes.VARBINARY(20));
+ FieldThetaSketchAgg agg =
+ new FieldThetaSketchAggFactory().create(DataTypes.VARBINARY(20), null, null);
byte[] inputVal = sketchOf(1);
byte[] acc1 = sketchOf(2, 3);
@@ -743,7 +860,8 @@ public class FieldAggregatorTest {
@Test
public void testFieldHllSketchAgg() {
- FieldHllSketchAgg agg = new FieldHllSketchAgg(DataTypes.VARBINARY(20));
+ FieldHllSketchAgg agg =
+ new FieldHllSketchAggFactory().create(DataTypes.VARBINARY(20), null, null);
byte[] inputVal = HllSketchUtil.sketchOf(1);
byte[] acc1 = HllSketchUtil.sketchOf(2, 3);
@@ -766,7 +884,8 @@ public class FieldAggregatorTest {
@Test
public void testFieldRoaringBitmap32Agg() {
- FieldRoaringBitmap32Agg agg = new FieldRoaringBitmap32Agg(DataTypes.VARBINARY(20));
+ FieldRoaringBitmap32Agg agg =
+ new FieldRoaringBitmap32AggFactory().create(DataTypes.VARBINARY(20), null, null);
byte[] inputVal = RoaringBitmap32.bitmapOf(1).serialize();
byte[] acc1 = RoaringBitmap32.bitmapOf(2, 3).serialize();
@@ -789,7 +908,8 @@ public class FieldAggregatorTest {
@Test
public void testFieldRoaringBitmap64Agg() throws IOException {
- FieldRoaringBitmap64Agg agg = new FieldRoaringBitmap64Agg(DataTypes.VARBINARY(20));
+ FieldRoaringBitmap64Agg agg =
+ new FieldRoaringBitmap64AggFactory().create(DataTypes.VARBINARY(20), null, null);
byte[] inputVal = RoaringBitmap64.bitmapOf(1L).serialize();
byte[] acc1 = RoaringBitmap64.bitmapOf(2L, 3L).serialize();
@@ -813,7 +933,7 @@ public class FieldAggregatorTest {
@Test
public void testCustomAgg() throws IOException {
FieldAggregator fieldAggregator =
- FieldAggregator.createFieldAggregator(
+ FieldAggregatorFactory.create(
DataTypes.STRING(),
"custom",
false,
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/TestCostomAgg.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/TestCustomAgg.java
similarity index 84%
rename from paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/TestCostomAgg.java
rename to paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/TestCustomAgg.java
index aedf6a373..3550ebe27 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/TestCostomAgg.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/TestCustomAgg.java
@@ -21,14 +21,10 @@ package org.apache.paimon.mergetree.compact.aggregate;
import org.apache.paimon.types.DataType;
/** Custom FieldAggregator for Test. */
-public class TestCostomAgg extends FieldAggregator {
- public TestCostomAgg(DataType dataType) {
- super(dataType);
- }
+public class TestCustomAgg extends FieldAggregator {
- @Override
- public String name() {
- return "custom";
+ public TestCustomAgg(String name, DataType dataType) {
+ super(name, dataType);
}
@Override
diff --git a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/TestCostomAggFactory.java b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/TestCustomAggFactory.java
similarity index 82%
rename from paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/TestCostomAggFactory.java
rename to paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/TestCustomAggFactory.java
index e8884bfb5..7e7715f6d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/TestCostomAggFactory.java
+++ b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/TestCustomAggFactory.java
@@ -22,15 +22,18 @@ import org.apache.paimon.CoreOptions;
import org.apache.paimon.mergetree.compact.aggregate.factory.FieldAggregatorFactory;
import org.apache.paimon.types.DataType;
-/** FieldAggregatorFactory for test. */
-public class TestCostomAggFactory implements FieldAggregatorFactory {
+/** FieldAggregatorFactory for {@link TestCustomAgg} test. */
+public class TestCustomAggFactory implements FieldAggregatorFactory {
+
+ public static final String NAME = "custom";
+
@Override
public FieldAggregator create(DataType fieldType, CoreOptions options, String field) {
- return new TestCostomAgg(fieldType);
+ return new TestCustomAgg(identifier(), fieldType);
}
@Override
public String identifier() {
- return "custom";
+ return NAME;
}
}
diff --git a/paimon-core/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-core/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
index f3e74bb89..7eb517ab9 100644
--- a/paimon-core/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ b/paimon-core/src/test/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -13,4 +13,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.paimon.mergetree.compact.aggregate.TestCostomAggFactory
\ No newline at end of file
+org.apache.paimon.mergetree.compact.aggregate.TestCustomAggFactory
\ No newline at end of file