This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new b55ef815d [core] introduce theta_sketch aggregate function. (#3723)
b55ef815d is described below

commit b55ef815d4ec9e7adec40f3b71b309bd02d3ea6e
Author: zhoulii <[email protected]>
AuthorDate: Thu Jul 11 11:17:37 2024 +0800

    [core] introduce theta_sketch aggregate function. (#3723)
---
 docs/content/primary-key-table/merge-engine.md     | 59 ++++++++++++++++++++++
 paimon-common/pom.xml                              | 11 ++++
 .../java/org/apache/paimon/utils/ThetaSketch.java  | 46 +++++++++++++++++
 paimon-common/src/main/resources/META-INF/NOTICE   |  2 +
 .../compact/aggregate/FieldAggregator.java         |  8 +++
 .../compact/aggregate/FieldThetaSketchAgg.java     | 52 +++++++++++++++++++
 .../compact/aggregate/FieldAggregatorTest.java     | 24 +++++++++
 .../apache/paimon/flink/PreAggregationITCase.java  | 47 +++++++++++++++++
 8 files changed, 249 insertions(+)

diff --git a/docs/content/primary-key-table/merge-engine.md 
b/docs/content/primary-key-table/merge-engine.md
index 2b52ac703..a01a26c28 100644
--- a/docs/content/primary-key-table/merge-engine.md
+++ b/docs/content/primary-key-table/merge-engine.md
@@ -323,6 +323,65 @@ Current supported aggregate functions and data types are:
 * `merge_map`:
   The merge_map function merge input maps. It only supports MAP type.
 
+* `theta_sketch`:
+  The theta_sketch function aggregates multiple serialized Sketch objects into 
a single Sketch.
+  It supports VARBINARY data type.
+
+  An example:
+
+  {{< tabs "theta_sketch-example" >}}
+  
+  {{< tab "Flink" >}}
+
+  ```sql
+  -- source table
+  CREATE TABLE VISITS (
+    id INT PRIMARY KEY NOT ENFORCED,
+    user_id STRING
+  );
+  
+  -- agg table
+  CREATE TABLE UV_AGG (
+    id INT PRIMARY KEY NOT ENFORCED,
+    uv VARBINARY
+  ) WITH (
+    'merge-engine' = 'aggregation',
+    'fields.uv.aggregate-function' = 'theta_sketch'
+  );
+  
+  -- Register the following class as a Flink function with the name "SKETCH" 
+  -- which is used to transform input to sketch bytes array:
+  --
+  -- public static class SketchFunction extends ScalarFunction {
+  --   public byte[] eval(String user_id) {
+  --     UpdateSketch updateSketch = UpdateSketch.builder().build();
+  --     updateSketch.update(user_id);
+  --     return updateSketch.compact().toByteArray();
+  --   }
+  -- }
+  --
+  INSERT INTO UV_AGG SELECT id, SKETCH(user_id) FROM VISITS;
+
+  -- Register the following class as a Flink function with the name 
"SKETCH_COUNT"
+  -- which is used to get cardinality from sketch bytes array:
+  -- 
+  -- public static class SketchCountFunction extends ScalarFunction { 
+  --   public Double eval(byte[] sketchBytes) {
+  --     if (sketchBytes == null) {
+  --       return 0d; 
+  --     } 
+  --     return 
Sketches.wrapCompactSketch(Memory.wrap(sketchBytes)).getEstimate(); 
+  --   } 
+  -- }
+  --
+  -- Then we can get user cardinality based on the aggregated field.
+  SELECT id, SKETCH_COUNT(uv) as uv FROM UV_AGG;
+  ```
+
+  {{< /tab >}}
+  
+  {{< /tabs >}}
+
 {{< hint info >}}
 For streaming queries, `aggregation` merge engine must be used together with 
`lookup` or `full-compaction`
 [changelog producer]({{< ref "primary-key-table/changelog-producer" >}}). 
('input' changelog producer is also supported, but only returns input records.)
diff --git a/paimon-common/pom.xml b/paimon-common/pom.xml
index ab95c8332..78183158d 100644
--- a/paimon-common/pom.xml
+++ b/paimon-common/pom.xml
@@ -159,6 +159,12 @@ under the License.
             <version>0.2.2</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.datasketches</groupId>
+            <artifactId>datasketches-java</artifactId>
+            <version>4.2.0</version>
+        </dependency>
+
         <!-- Test -->
 
         <dependency>
@@ -294,6 +300,7 @@ under the License.
                                     
<include>net.openhft:zero-allocation-hashing</include>
                                     
<include>com.github.davidmoten:hilbert-curve</include>
                                     
<include>com.github.davidmoten:guava-mini</include>
+                                    
<include>org.apache.datasketches:*</include>
                                 </includes>
                             </artifactSet>
                             <filters>
@@ -345,6 +352,10 @@ under the License.
                                     
<pattern>com.github.davidmoten.guavamini</pattern>
                                     
<shadedPattern>org.apache.paimon.shade.com.github.davidmoten.guavamini</shadedPattern>
                                 </relocation>
+                                <relocation>
+                                    <pattern>org.apache.datasketches</pattern>
+                                    
<shadedPattern>org.apache.paimon.shade.org.apache.datasketches</shadedPattern>
+                                </relocation>
                             </relocations>
                             <minimizeJar>true</minimizeJar>
                         </configuration>
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/ThetaSketch.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/ThetaSketch.java
new file mode 100644
index 000000000..4d3ee9a84
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/ThetaSketch.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.annotation.VisibleForTesting;
+
+import org.apache.datasketches.memory.Memory;
+import org.apache.datasketches.theta.Sketches;
+import org.apache.datasketches.theta.Union;
+import org.apache.datasketches.theta.UpdateSketch;
+
+/** Static helpers for unioning and building serialized theta sketches (byte arrays). */
+public class ThetaSketch {
+    /** Feeds both serialized sketches into one Union and returns its result as bytes. */
+    public static byte[] union(byte[] sketchBytes1, byte[] sketchBytes2) {
+        Union union = Sketches.setOperationBuilder().buildUnion();
+        union.union(Memory.wrap(sketchBytes1));
+        union.union(Memory.wrap(sketchBytes2));
+        return union.getResult().toByteArray();
+    }
+
+    @VisibleForTesting
+    public static byte[] sketchOf(int... values) {
+        UpdateSketch updateSketch = UpdateSketch.builder().build();
+        for (int value : values) {
+            updateSketch.update(value);
+        }
+        return updateSketch.compact().toByteArray(); // serialize the compacted sketch
+    }
+}
diff --git a/paimon-common/src/main/resources/META-INF/NOTICE 
b/paimon-common/src/main/resources/META-INF/NOTICE
index 4cc302361..138f5d154 100644
--- a/paimon-common/src/main/resources/META-INF/NOTICE
+++ b/paimon-common/src/main/resources/META-INF/NOTICE
@@ -6,6 +6,8 @@ The Apache Software Foundation (http://www.apache.org/).
 
 This project bundles the following dependencies under the Apache Software 
License 2.0 (http://www.apache.org/licenses/LICENSE-2.0.txt)
 - org.roaringbitmap:RoaringBitmap:1.0.5
+- org.apache.datasketches:datasketches-java:4.2.0
+- org.apache.datasketches:datasketches-memory:2.2.0
 
 This project bundles the following dependencies under the BSD 3-clause license.
 You find them under licenses/LICENSE.antlr-runtime and licenses/LICENSE.janino.
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregator.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregator.java
index eee8fa3f9..1ce92c4cd 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregator.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregator.java
@@ -23,6 +23,7 @@ import org.apache.paimon.types.ArrayType;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.MapType;
 import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.VarBinaryType;
 
 import javax.annotation.Nullable;
 
@@ -116,6 +117,13 @@ public abstract class FieldAggregator implements 
Serializable {
                                 fieldType);
                         fieldAggregator = new FieldMergeMapAgg((MapType) 
fieldType);
                         break;
+                    case FieldThetaSketchAgg.NAME:
+                        checkArgument(
+                                fieldType instanceof VarBinaryType,
+                                "Data type for theta sketch column must be 
'VarBinaryType' but was '%s'.",
+                                fieldType);
+                        fieldAggregator = new 
FieldThetaSketchAgg((VarBinaryType) fieldType);
+                        break;
                     default:
                         throw new RuntimeException(
                                 String.format(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldThetaSketchAgg.java
 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldThetaSketchAgg.java
new file mode 100644
index 000000000..7182a6744
--- /dev/null
+++ 
b/paimon-core/src/main/java/org/apache/paimon/mergetree/compact/aggregate/FieldThetaSketchAgg.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.mergetree.compact.aggregate;
+
+import org.apache.paimon.types.VarBinaryType;
+import org.apache.paimon.utils.ThetaSketch;
+
+/** Aggregates a VARBINARY field by unioning serialized theta sketches. */
+public class FieldThetaSketchAgg extends FieldAggregator {
+
+    public static final String NAME = "theta_sketch";
+
+    private static final long serialVersionUID = 1L;
+
+    public FieldThetaSketchAgg(VarBinaryType dataType) {
+        super(dataType);
+    }
+
+    @Override
+    public String name() {
+        return NAME;
+    }
+
+    @Override
+    public Object agg(Object accumulator, Object inputField) {
+        if (accumulator == null && inputField == null) {
+            return null;
+        }
+        // Exactly one side is null here: return the non-null side unchanged.
+        if (accumulator == null || inputField == null) {
+            return accumulator == null ? inputField : accumulator;
+        }
+        // Both sides are present: both are cast to byte[] and unioned.
+        return ThetaSketch.union((byte[]) accumulator, (byte[]) inputField);
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
index 8ca08e0aa..cc3f6813d 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/mergetree/compact/aggregate/FieldAggregatorTest.java
@@ -52,6 +52,7 @@ import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.apache.paimon.utils.ThetaSketch.sketchOf;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** test whether {@link FieldAggregator}' subclasses behaviors are expected. */
@@ -725,6 +726,29 @@ public class FieldAggregatorTest {
         
assertThat(toJavaMap(result)).containsExactlyInAnyOrderEntriesOf(toMap(3, "C"));
     }
 
+    @Test
+    public void testFieldThetaSketchAgg() {
+        FieldThetaSketchAgg agg = new 
FieldThetaSketchAgg(DataTypes.VARBINARY(20));
+
+        byte[] inputVal = sketchOf(1);
+        byte[] acc1 = sketchOf(2, 3);
+        byte[] acc2 = sketchOf(1, 2, 3);
+        // Both sides null: nothing to aggregate.
+        assertThat(agg.agg(null, null)).isNull();
+        // Null accumulator: the input is returned as-is.
+        byte[] result1 = (byte[]) agg.agg(null, inputVal);
+        assertThat(inputVal).isEqualTo(result1);
+        // Null input: the accumulator is returned as-is.
+        byte[] result2 = (byte[]) agg.agg(acc1, null);
+        assertThat(result2).isEqualTo(acc1);
+        // {2, 3} union {1} is expected to serialize identically to sketchOf(1, 2, 3).
+        byte[] result3 = (byte[]) agg.agg(acc1, inputVal);
+        assertThat(result3).isEqualTo(acc2);
+        // Adding an already-contained value is expected to leave the bytes unchanged.
+        byte[] result4 = (byte[]) agg.agg(acc2, inputVal);
+        assertThat(result4).isEqualTo(acc2);
+    }
+
     private Map<Object, Object> toMap(Object... kvs) {
         Map<Object, Object> result = new HashMap<>();
         for (int i = 0; i < kvs.length; i += 2) {
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
index ad7968341..2403b876a 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/PreAggregationITCase.java
@@ -23,6 +23,7 @@ import 
org.apache.paimon.mergetree.compact.aggregate.FieldMergeMapAgg;
 import org.apache.paimon.mergetree.compact.aggregate.FieldNestedUpdateAgg;
 import org.apache.paimon.utils.BlockingIterator;
 
+import org.apache.commons.codec.binary.Hex;
 import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.planner.factories.TestValuesTableFactory;
 import org.apache.flink.types.Row;
@@ -46,6 +47,7 @@ import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.apache.paimon.utils.ThetaSketch.sketchOf;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
@@ -1810,4 +1812,49 @@ public class PreAggregationITCase {
                     "Pre-aggregate continuous reading is not supported");
         }
     }
+
+    /** ITCase for {@link 
org.apache.paimon.mergetree.compact.aggregate.FieldThetaSketchAgg}. */
+    public static class ThetaSketchAggAggregationITCase extends 
CatalogITCaseBase {
+
+        @Test
+        public void testThetaSketchAgg() {
+            sql(
+                    "CREATE TABLE test_collect("
+                            + "  id INT PRIMARY KEY NOT ENFORCED,"
+                            + "  f0 VARBINARY"
+                            + ") WITH ("
+                            + "  'merge-engine' = 'aggregation',"
+                            + "  'fields.f0.aggregate-function' = 
'theta_sketch'"
+                            + ")");
+            // Hex-encode sketch bytes so they can be embedded as x'...' SQL binary literals.
+            String str1 = Hex.encodeHexString(sketchOf(1)).toUpperCase();
+            String str2 = Hex.encodeHexString(sketchOf(2)).toUpperCase();
+            String str3 = Hex.encodeHexString(sketchOf(3)).toUpperCase();
+            // First batch: one row per key; key 1 carries a NULL sketch.
+            sql(
+                    String.format(
+                            "INSERT INTO test_collect VALUES (1, CAST (NULL AS 
VARBINARY)),(2, CAST(x'%s' AS VARBINARY)), (3, CAST(x'%s' AS VARBINARY))",
+                            str1, str2));
+            // After the first batch each key holds exactly what was inserted for it.
+            List<Row> result = queryAndSort("SELECT * FROM test_collect");
+            checkOneRecord(result.get(0), 1, null);
+            checkOneRecord(result.get(1), 2, sketchOf(1));
+            checkOneRecord(result.get(2), 3, sketchOf(2));
+            // Second batch: key 2 receives the same sketch twice, keys 1 and 3 once each.
+            sql(
+                    String.format(
+                            "INSERT INTO test_collect VALUES (1, CAST (x'%s' 
AS VARBINARY)),(2, CAST(x'%s' AS VARBINARY)), (2, CAST(x'%s' AS VARBINARY)), 
(3, CAST(x'%s' AS VARBINARY))",
+                            str1, str2, str2, str3));
+            // Sketches are expected to be unioned per key across both batches.
+            result = queryAndSort("SELECT * FROM test_collect");
+            checkOneRecord(result.get(0), 1, sketchOf(1));
+            checkOneRecord(result.get(1), 2, sketchOf(1, 2));
+            checkOneRecord(result.get(2), 3, sketchOf(2, 3));
+        }
+        /** Asserts that the row's first field equals id and its second equals expected. */
+        private void checkOneRecord(Row row, int id, byte[] expected) {
+            assertThat(row.getField(0)).isEqualTo(id);
+            assertThat(row.getField(1)).isEqualTo(expected);
+        }
+    }
 }

Reply via email to