This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new a9cd48b24c [core] Adjust distinct option behavior in FieldListaggAgg
operator (#6582)
a9cd48b24c is described below
commit a9cd48b24cb0832a371a34ee33bb218ef4f94651
Author: HOKNANG_LO <[email protected]>
AuthorDate: Tue Nov 25 15:14:38 2025 +0800
[core] Adjust distinct option behavior in FieldListaggAgg operator (#6582)
---
.../primary-key-table/merge-engine/aggregation.md | 1 +
.../org/apache/paimon/utils/BinaryStringUtils.java | 60 +++++++++
.../compact/aggregate/FieldListaggAgg.java | 42 +++++-
.../compact/aggregate/FieldAggregatorTest.java | 147 +++++++++++++++++++++
4 files changed, 249 insertions(+), 1 deletion(-)
diff --git a/docs/content/primary-key-table/merge-engine/aggregation.md
b/docs/content/primary-key-table/merge-engine/aggregation.md
index a575e48738..ed845eebfb 100644
--- a/docs/content/primary-key-table/merge-engine/aggregation.md
+++ b/docs/content/primary-key-table/merge-engine/aggregation.md
@@ -100,6 +100,7 @@ Current supported aggregate functions and data types are:
The listagg function concatenates multiple string values into a single
string.
It supports STRING data type.
Each field not part of the primary keys can be given a list agg delimiter,
specified by the fields.<field-name>.list-agg-delimiter table property,
otherwise it will use "," as default.
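+ You can set `fields.<field-name>.distinct=true` to deduplicate: each incoming value is split by the `fields.<field-name>.list-agg-delimiter`, and only its non-empty elements that do not already occur in the aggregated string are appended.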
### bool_and
The bool_and function evaluates whether all values in a boolean set are true.
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/BinaryStringUtils.java
b/paimon-common/src/main/java/org/apache/paimon/utils/BinaryStringUtils.java
index ae306c3764..74d1c726bf 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/BinaryStringUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/BinaryStringUtils.java
@@ -20,11 +20,13 @@ package org.apache.paimon.utils;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.memory.MemorySegment;
import org.apache.paimon.memory.MemorySegmentUtils;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.DataTypeChecks;
import java.time.DateTimeException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.TimeZone;
@@ -419,4 +421,62 @@ public class BinaryStringUtils {
}
return BinaryString.fromBytes(result);
}
+
+ public static BinaryString[] splitByWholeSeparatorPreserveAllTokens(
+ BinaryString str, BinaryString delimiter) {
+ int sizeInBytes = str.getSizeInBytes();
+ MemorySegment[] segments = str.getSegments();
+ int offset = str.getOffset();
+
+ if (sizeInBytes == 0) {
+ return EMPTY_STRING_ARRAY;
+ }
+
+ if (delimiter == null || BinaryString.EMPTY_UTF8.equals(delimiter)) {
+ // Split on whitespace.
+ return splitByWholeSeparatorPreserveAllTokens(str, fromString("
"));
+ }
+
+ int sepSize = delimiter.getSizeInBytes();
+ MemorySegment[] sepSegs = delimiter.getSegments();
+ int sepOffset = delimiter.getOffset();
+
+ final ArrayList<BinaryString> substrings = new ArrayList<>();
+ int beg = 0;
+ int end = 0;
+ while (end < sizeInBytes) {
+ end =
+ MemorySegmentUtils.find(
+ segments,
+ offset + beg,
+ sizeInBytes - beg,
+ sepSegs,
+ sepOffset,
+ sepSize)
+ - offset;
+
+ if (end > -1) {
+ if (end > beg) {
+
+ // The following is OK, because String.substring( beg, end
) excludes
+ // the character at the position 'end'.
+ substrings.add(BinaryString.fromAddress(segments, offset +
beg, end - beg));
+
+ // Set the starting point for the next search.
+ // The following is equivalent to beg = end +
(separatorLength - 1) + 1,
+ // which is the right calculation:
+ } else {
+ // We found a consecutive occurrence of the separator.
+ substrings.add(BinaryString.EMPTY_UTF8);
+ }
+ beg = end + sepSize;
+ } else {
+ // String.substring( beg ) goes from 'beg' to the end of the
String.
+ substrings.add(BinaryString.fromAddress(segments, offset +
beg, sizeInBytes - beg));
+ end = sizeInBytes;
+ }
+ }
+
+ return substrings.toArray(new BinaryString[0]);
+ }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldListaggAgg.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldListaggAgg.java
index 239e40bfac..def24e3ed6 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldListaggAgg.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldListaggAgg.java
@@ -23,6 +23,11 @@ import org.apache.paimon.data.BinaryString;
import org.apache.paimon.types.VarCharType;
import org.apache.paimon.utils.BinaryStringUtils;
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+
+import java.util.Arrays;
+import java.util.List;
+
/** listagg aggregate a field of a row. */
public class FieldListaggAgg extends FieldAggregator {
@@ -49,10 +54,45 @@ public class FieldListaggAgg extends FieldAggregator {
BinaryString mergeFieldSD = (BinaryString) accumulator;
BinaryString inFieldSD = (BinaryString) inputField;
- if (distinct && inFieldSD.getSizeInBytes() > 0 &&
mergeFieldSD.contains(inFieldSD)) {
+ if (inFieldSD.getSizeInBytes() <= 0) {
return mergeFieldSD;
}
+ if (mergeFieldSD.getSizeInBytes() <= 0) {
+ return inFieldSD;
+ }
+
+ if (distinct) {
+ BinaryString delimiterBinaryString =
BinaryString.fromString(delimiter);
+ BinaryString[] binaryStrings =
+ BinaryStringUtils.splitByWholeSeparatorPreserveAllTokens(
+ inFieldSD, delimiterBinaryString);
+
+ List<BinaryString> concatItems =
+ Arrays.stream(binaryStrings)
+ .filter(it -> it.getSizeInBytes() > 0 &&
!mergeFieldSD.contains(it))
+ .collect(
+ () -> Lists.newArrayList(mergeFieldSD),
+ (acc, r) -> {
+ if (!acc.isEmpty()) {
+ acc.add(delimiterBinaryString);
+ }
+ acc.add(r);
+ },
+ (l, r) -> {
+ if (!l.isEmpty() && !r.isEmpty()) {
+ l.add(delimiterBinaryString);
+ }
+ l.addAll(r);
+ });
+
+ if (concatItems.size() == 1) {
+ return concatItems.get(0);
+ }
+
+ return BinaryStringUtils.concat(concatItems);
+ }
+
return BinaryStringUtils.concat(
mergeFieldSD, BinaryString.fromString(delimiter), inFieldSD);
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
index 1c8506c25c..0b838c3609 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
@@ -187,6 +187,153 @@ public class FieldAggregatorTest {
assertEquals("user1,user2,user3", result.toString());
}
+ @Test
+ public void testFieldListAggWithCustomDelimiterAndEmptyStrings() {
+ FieldListaggAgg fieldListaggAgg =
+ new FieldListaggAggFactory()
+ .create(
+ new VarCharType(),
+ CoreOptions.fromMap(
+ ImmutableMap.of(
+ "fields.fieldName.distinct",
+ "true",
+
"fields.fieldName.list-agg-delimiter",
+ ";")),
+ "fieldName");
+
+ BinaryString result =
+ Stream.of(BinaryString.fromString(""),
BinaryString.fromString(""))
+ .sequential()
+ .reduce((l, r) -> (BinaryString)
fieldListaggAgg.agg(l, r))
+ .orElse(null);
+
+ assertNotNull(result);
+ assertEquals("", result.toString());
+ }
+
+ @Test
+ public void testFieldListAggWithDefaultDelimiterAndDistinctWithMultiUser()
{
+ FieldListaggAgg fieldListaggAgg =
+ new FieldListaggAggFactory()
+ .create(
+ new VarCharType(),
+ CoreOptions.fromMap(
+
ImmutableMap.of("fields.fieldName.distinct", "true")),
+ "fieldName");
+
+ BinaryString result =
+ Stream.of(
+ BinaryString.fromString("user1"),
+ BinaryString.fromString("user2"),
+ BinaryString.fromString("user1,user3"))
+ .sequential()
+ .reduce((l, r) -> (BinaryString)
fieldListaggAgg.agg(l, r))
+ .orElse(null);
+
+ assertNotNull(result);
+ assertEquals("user1,user2,user3", result.toString());
+ }
+
+ @Test
+ public void
testFieldListAggWithDefaultDelimiterAndDistinctWithEmptyLeftUser() {
+ FieldListaggAgg fieldListaggAgg =
+ new FieldListaggAggFactory()
+ .create(
+ new VarCharType(),
+ CoreOptions.fromMap(
+
ImmutableMap.of("fields.fieldName.distinct", "true")),
+ "fieldName");
+
+ BinaryString result =
+ Stream.of(
+ BinaryString.fromString(""),
+ BinaryString.fromString("user2"),
+ BinaryString.fromString("user1,user3"))
+ .sequential()
+ .reduce((l, r) -> (BinaryString)
fieldListaggAgg.agg(l, r))
+ .orElse(null);
+
+ assertNotNull(result);
+ assertEquals("user2,user1,user3", result.toString());
+ }
+
+ @Test
+ public void
testFieldListAggWithCustomDelimiterAndDistinctWithMultiKvString() {
+ FieldListaggAgg fieldListaggAgg =
+ new FieldListaggAggFactory()
+ .create(
+ new VarCharType(),
+ CoreOptions.fromMap(
+ ImmutableMap.of(
+ "fields.fieldName.distinct",
+ "true",
+
"fields.fieldName.list-agg-delimiter",
+ ";")),
+ "fieldName");
+
+ BinaryString result =
+ Stream.of(
+ BinaryString.fromString("k1=v1;k2=v2"),
+ BinaryString.fromString("k1=v1;k3=v3"),
+ BinaryString.fromString(""))
+ .sequential()
+ .reduce((l, r) -> (BinaryString)
fieldListaggAgg.agg(l, r))
+ .orElse(null);
+
+ assertNotNull(result);
+ assertEquals("k1=v1;k2=v2;k3=v3", result.toString());
+ }
+
+ @Test
+ public void
testFieldListAggWithCustomDelimiterDistinctMultiKvStringWithWhiteSpace() {
+ FieldListaggAgg fieldListaggAgg =
+ new FieldListaggAggFactory()
+ .create(
+ new VarCharType(),
+ CoreOptions.fromMap(
+ ImmutableMap.of(
+ "fields.fieldName.distinct",
+ "true",
+
"fields.fieldName.list-agg-delimiter",
+ " ")),
+ "fieldName");
+
+ BinaryString result =
+ Stream.of(
+ BinaryString.fromString("k1=v1 k2=v2"),
+ BinaryString.fromString(" k1=v1 k3=v3"),
+ BinaryString.fromString(" "))
+ .sequential()
+ .reduce((l, r) -> (BinaryString)
fieldListaggAgg.agg(l, r))
+ .orElse(null);
+
+ assertNotNull(result);
+ assertEquals("k1=v1 k2=v2 k3=v3", result.toString());
+ }
+
+ @Test
+ public void
testFieldListAggWithDefaultDelimiterAndDistinctWithMultiDuplicatedKvString() {
+ FieldListaggAgg fieldListaggAgg =
+ new FieldListaggAggFactory()
+ .create(
+ new VarCharType(),
+ CoreOptions.fromMap(
+
ImmutableMap.of("fields.fieldName.distinct", "true")),
+ "fieldName");
+
+ BinaryString result =
+ Stream.of(
+ BinaryString.fromString("k1=v1,k2=v2"),
+ BinaryString.fromString("k1=v1,k2=v3"),
+ BinaryString.fromString(""))
+ .sequential()
+ .reduce((l, r) -> (BinaryString)
fieldListaggAgg.agg(l, r))
+ .orElse(null);
+
+ assertNotNull(result);
+ assertEquals("k1=v1,k2=v2,k2=v3", result.toString());
+ }
+
@Test
public void testFieldListAggWithCustomDelimiter() {
FieldListaggAgg fieldListaggAgg =