This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b55ef815d [core] introduce theta_sketch aggregate function. (#3723)
b55ef815d is described below
commit b55ef815d4ec9e7adec40f3b71b309bd02d3ea6e
Author: zhoulii <[email protected]>
AuthorDate: Thu Jul 11 11:17:37 2024 +0800
[core] introduce theta_sketch aggregate function. (#3723)
---
docs/content/primary-key-table/merge-engine.md | 59 ++++++++++++++++++++++
paimon-common/pom.xml | 11 ++++
.../java/org/apache/paimon/utils/ThetaSketch.java | 46 +++++++++++++++++
paimon-common/src/main/resources/META-INF/NOTICE | 2 +
.../compact/aggregate/FieldAggregator.java | 8 +++
.../compact/aggregate/FieldThetaSketchAgg.java | 52 +++++++++++++++++++
.../compact/aggregate/FieldAggregatorTest.java | 24 +++++++++
.../apache/paimon/flink/PreAggregationITCase.java | 47 +++++++++++++++++
8 files changed, 249 insertions(+)
diff --git a/docs/content/primary-key-table/merge-engine.md
b/docs/content/primary-key-table/merge-engine.md
index 2b52ac703..a01a26c28 100644
--- a/docs/content/primary-key-table/merge-engine.md
+++ b/docs/content/primary-key-table/merge-engine.md
@@ -323,6 +323,65 @@ Current supported aggregate functions and data types are:
* `merge_map`:
The merge_map function merge input maps. It only supports MAP type.
+* `theta_sketch`:
+ The theta_sketch function aggregates multiple serialized Theta sketches (Apache DataSketches) into a single sketch.
+ It only supports VARBINARY data type.
+
+ An example:
+
+ {{< tabs "theta_sketch-example" >}}
+
+ {{< tab "Flink" >}}
+
+ ```sql
+ -- source table
+ CREATE TABLE VISITS (
+ id INT PRIMARY KEY NOT ENFORCED,
+ user_id STRING
+ );
+
+ -- agg table
+ CREATE TABLE UV_AGG (
+ id INT PRIMARY KEY NOT ENFORCED,
+ uv VARBINARY
+ ) WITH (
+ 'merge-engine' = 'aggregation',
+ 'fields.uv.aggregate-function' = 'theta_sketch'
+ );
+
+ -- Register the following class as a Flink function with the name "SKETCH",
+ -- which transforms the input into a serialized sketch (byte array):
+ --
+ -- public static class SketchFunction extends ScalarFunction {
+ -- public byte[] eval(String user_id) {
+ -- UpdateSketch updateSketch = UpdateSketch.builder().build();
+ -- updateSketch.update(user_id);
+ -- return updateSketch.compact().toByteArray();
+ -- }
+ -- }
+ --
+ INSERT INTO UV_AGG SELECT id, SKETCH(user_id) FROM VISITS;
+
+ -- Register the following class as a Flink function with the name "SKETCH_COUNT",
+ -- which returns the estimated cardinality of a serialized sketch (byte array):
+ --
+ -- public static class SketchCountFunction extends ScalarFunction {
+ -- public Double eval(byte[] sketchBytes) {
+ -- if (sketchBytes == null) {
+ -- return 0d;
+ -- }
+ -- return Sketches.wrapCompactSketch(Memory.wrap(sketchBytes)).getEstimate();
+ -- }
+ -- }
+ --
+ -- Then we can get the user cardinality from the aggregated field.
+ SELECT id, SKETCH_COUNT(uv) as uv FROM UV_AGG;
+ ```
+
+ {{< /tab >}}
+
+ {{< /tabs >}}
+
{{< hint info >}}
For streaming queries, `aggregation` merge engine must be used together with
`lookup` or `full-compaction`
[changelog producer]({{< ref "primary-key-table/changelog-producer" >}}).
('input' changelog producer is also supported, but only returns input records.)
diff --git a/paimon-common/pom.xml b/paimon-common/pom.xml
index ab95c8332..78183158d 100644
--- a/paimon-common/pom.xml
+++ b/paimon-common/pom.xml
@@ -159,6 +159,12 @@ under the License.
<version>0.2.2</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.datasketches</groupId>
+ <artifactId>datasketches-java</artifactId>
+ <version>4.2.0</version>
+ </dependency>
+
<!-- Test -->
<dependency>
@@ -294,6 +300,7 @@ under the License.
<include>net.openhft:zero-allocation-hashing</include>
<include>com.github.davidmoten:hilbert-curve</include>
<include>com.github.davidmoten:guava-mini</include>
+ <include>org.apache.datasketches:*</include>
</includes>
</artifactSet>
<filters>
@@ -345,6 +352,10 @@ under the License.
<pattern>com.github.davidmoten.guavamini</pattern>
<shadedPattern>org.apache.paimon.shade.com.github.davidmoten.guavamini</shadedPattern>
</relocation>
+ <relocation>
+ <pattern>org.apache.datasketches</pattern>
+ <shadedPattern>org.apache.paimon.shade.org.apache.datasketches</shadedPattern>
+ </relocation>
</relocations>
<minimizeJar>true</minimizeJar>
</configuration>
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/ThetaSketch.java
b/paimon-common/src/main/java/org/apache/paimon/utils/ThetaSketch.java
new file mode 100644
index 000000000..4d3ee9a84
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/ThetaSketch.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+
+import org.apache.datasketches.memory.Memory;
+import org.apache.datasketches.theta.Sketches;
+import org.apache.datasketches.theta.Union;
+import org.apache.datasketches.theta.UpdateSketch;
+
+/**
+ * Helpers for serialized Apache DataSketches theta sketches.
+ *
+ * <p>Sketches are passed around as the byte arrays produced by {@code CompactSketch#toByteArray()}.
+ */
+public class ThetaSketch {
+
+    /**
+     * Unions two serialized theta sketches and returns the serialized result.
+     *
+     * <p>The union is built with the default {@link Sketches#setOperationBuilder()} settings (no
+     * nominal-entries override). NOTE(review): inputs built with a larger nominal-entries value
+     * than the builder default may lose accuracy in the union; confirm the default is acceptable.
+     *
+     * @param sketchBytes1 the first serialized sketch; null is not handled here
+     * @param sketchBytes2 the second serialized sketch; null is not handled here
+     * @return the serialized (compact) union of both sketches
+     */
+    public static byte[] union(byte[] sketchBytes1, byte[] sketchBytes2) {
+        Union union = Sketches.setOperationBuilder().buildUnion();
+        union.union(Memory.wrap(sketchBytes1));
+        union.union(Memory.wrap(sketchBytes2));
+        return union.getResult().toByteArray();
+    }
+
+    /**
+     * Builds a serialized compact theta sketch containing the given values.
+     *
+     * @param values the integers to add to the sketch
+     * @return the serialized compact sketch
+     */
+    @VisibleForTesting
+    public static byte[] sketchOf(int... values) {
+        UpdateSketch updateSketch = UpdateSketch.builder().build();
+        for (int value : values) {
+            updateSketch.update(value);
+        }
+        return updateSketch.compact().toByteArray();
+    }
+}
diff --git a/paimon-common/src/main/resources/META-INF/NOTICE
b/paimon-common/src/main/resources/META-INF/NOTICE
index 4cc302361..138f5d154 100644
--- a/paimon-common/src/main/resources/META-INF/NOTICE
+++ b/paimon-common/src/main/resources/META-INF/NOTICE
@@ -6,6 +6,8 @@ The Apache Software Foundation (http://www.apache.org/).
This project bundles the following dependencies under the Apache Software
License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
- org.roaringbitmap:RoaringBitmap:1.0.5
+- org.apache.datasketches:datasketches-java:4.2.0
+- org.apache.datasketches:datasketches-memory:2.2.0
This project bundles the following dependencies under the BSD 3-clause license.
You find them under licenses/LICENSE.antlr-runtime and licenses/LICENSE.janino.
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregator.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregator.java
index eee8fa3f9..1ce92c4cd 100644
---
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregator.java
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregator.java
@@ -23,6 +23,7 @@ import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VarBinaryType;
import javax.annotation.Nullable;
@@ -116,6 +117,13 @@ public abstract class FieldAggregator implements
Serializable {
fieldType);
fieldAggregator = new FieldMergeMapAgg((MapType)
fieldType);
break;
+ case FieldThetaSketchAgg.NAME:
+ checkArgument(
+ fieldType instanceof VarBinaryType,
+ "Data type for theta sketch column must be
'VarBinaryType' but was '%s'.",
+ fieldType);
+ fieldAggregator = new
FieldThetaSketchAgg((VarBinaryType) fieldType);
+ break;
default:
throw new RuntimeException(
String.format(
diff --git
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldThetaSketchAgg.java
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldThetaSketchAgg.java
new file mode 100644
index 000000000..7182a6744
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldThetaSketchAgg.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.compact.aggregate;
+
+import org.apache.paimon.types.VarBinaryType;
+import org.apache.paimon.utils.ThetaSketch;
+
+/**
+ * Aggregates a {@code VARBINARY} field holding serialized theta sketches by unioning them with
+ * {@link ThetaSketch#union(byte[], byte[])}.
+ */
+public class FieldThetaSketchAgg extends FieldAggregator {
+
+    private static final long serialVersionUID = 1L;
+
+    public static final String NAME = "theta_sketch";
+
+    public FieldThetaSketchAgg(VarBinaryType dataType) {
+        super(dataType);
+    }
+
+    @Override
+    public String name() {
+        return NAME;
+    }
+
+    /**
+     * Merges {@code inputField} into {@code accumulator}. A null on either side is skipped and
+     * the other value is returned as-is, so two nulls yield null.
+     */
+    @Override
+    public Object agg(Object accumulator, Object inputField) {
+        if (accumulator == null) {
+            // Covers the both-null case as well: inputField is returned (null).
+            return inputField;
+        }
+        if (inputField == null) {
+            return accumulator;
+        }
+        return ThetaSketch.union((byte[]) accumulator, (byte[]) inputField);
+    }
+}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
index 8ca08e0aa..cc3f6813d 100644
---
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
@@ -52,6 +52,7 @@ import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static org.apache.paimon.utils.ThetaSketch.sketchOf;
import static org.assertj.core.api.Assertions.assertThat;
/** test whether {@link FieldAggregator}' subclasses behaviors are expected. */
@@ -725,6 +726,29 @@ public class FieldAggregatorTest {
assertThat(toJavaMap(result)).containsExactlyInAnyOrderEntriesOf(toMap(3, "C"));
}
+    /** Covers null handling and union semantics of {@link FieldThetaSketchAgg}. */
+    @Test
+    public void testFieldThetaSketchAgg() {
+        FieldThetaSketchAgg agg = new FieldThetaSketchAgg(DataTypes.VARBINARY(20));
+
+        byte[] inputVal = sketchOf(1);
+        byte[] acc1 = sketchOf(2, 3);
+        byte[] acc2 = sketchOf(1, 2, 3);
+
+        // Both sides null: nothing to aggregate.
+        assertThat(agg.agg(null, null)).isNull();
+
+        // A null side is skipped and the other value is returned unchanged.
+        byte[] result1 = (byte[]) agg.agg(null, inputVal);
+        assertThat(result1).isEqualTo(inputVal);
+
+        byte[] result2 = (byte[]) agg.agg(acc1, null);
+        assertThat(result2).isEqualTo(acc1);
+
+        // The union of {2, 3} and {1} equals the sketch of {1, 2, 3}, in either argument order.
+        byte[] result3 = (byte[]) agg.agg(acc1, inputVal);
+        assertThat(result3).isEqualTo(acc2);
+        assertThat((byte[]) agg.agg(inputVal, acc1)).isEqualTo(acc2);
+
+        // Adding an element that is already present leaves the sketch unchanged.
+        byte[] result4 = (byte[]) agg.agg(acc2, inputVal);
+        assertThat(result4).isEqualTo(acc2);
+    }
+
private Map<Object, Object> toMap(Object... kvs) {
Map<Object, Object> result = new HashMap<>();
for (int i = 0; i < kvs.length; i += 2) {
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
index ad7968341..2403b876a 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
@@ -23,6 +23,7 @@ import
org.apache.paimon.mergetree.compact.aggregate.FieldMergeMapAgg;
import org.apache.paimon.mergetree.compact.aggregate.FieldNestedUpdateAgg;
import org.apache.paimon.utils.BlockingIterator;
+import org.apache.commons.codec.binary.Hex;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.apache.flink.table.planner.factories.TestValuesTableFactory;
import org.apache.flink.types.Row;
@@ -46,6 +47,7 @@ import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static org.apache.paimon.utils.ThetaSketch.sketchOf;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -1810,4 +1812,49 @@ public class PreAggregationITCase {
"Pre-aggregate continuous reading is not supported");
}
}
+
+ /** ITCase for {@link
org.apache.paimon.mergetree.compact.aggregate.FieldThetaSketchAgg}. */
+ public static class ThetaSketchAggAggregationITCase extends
CatalogITCaseBase {
+
+ @Test
+ public void testThetaSketchAgg() {
+ sql(
+ "CREATE TABLE test_collect("
+ + " id INT PRIMARY KEY NOT ENFORCED,"
+ + " f0 VARBINARY"
+ + ") WITH ("
+ + " 'merge-engine' = 'aggregation',"
+ + " 'fields.f0.aggregate-function' =
'theta_sketch'"
+ + ")");
+
+ String str1 = Hex.encodeHexString(sketchOf(1)).toUpperCase();
+ String str2 = Hex.encodeHexString(sketchOf(2)).toUpperCase();
+ String str3 = Hex.encodeHexString(sketchOf(3)).toUpperCase();
+
+ sql(
+ String.format(
+ "INSERT INTO test_collect VALUES (1, CAST (NULL AS
VARBINARY)),(2, CAST(x'%s' AS VARBINARY)), (3, CAST(x'%s' AS VARBINARY))",
+ str1, str2));
+
+ List<Row> result = queryAndSort("SELECT * FROM test_collect");
+ checkOneRecord(result.get(0), 1, null);
+ checkOneRecord(result.get(1), 2, sketchOf(1));
+ checkOneRecord(result.get(2), 3, sketchOf(2));
+
+ sql(
+ String.format(
+ "INSERT INTO test_collect VALUES (1, CAST (x'%s'
AS VARBINARY)),(2, CAST(x'%s' AS VARBINARY)), (2, CAST(x'%s' AS VARBINARY)),
(3, CAST(x'%s' AS VARBINARY))",
+ str1, str2, str2, str3));
+
+ result = queryAndSort("SELECT * FROM test_collect");
+ checkOneRecord(result.get(0), 1, sketchOf(1));
+ checkOneRecord(result.get(1), 2, sketchOf(1, 2));
+ checkOneRecord(result.get(2), 3, sketchOf(2, 3));
+ }
+
+ private void checkOneRecord(Row row, int id, byte[] expected) {
+ assertThat(row.getField(0)).isEqualTo(id);
+ assertThat(row.getField(1)).isEqualTo(expected);
+ }
+ }
}