This is an automated email from the ASF dual-hosted git repository. lincoln pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 1c6943c3f713437edf93b547e1f42addc6ed98b4 Author: dylanhz <[email protected]> AuthorDate: Tue Mar 24 21:10:21 2026 +0800 [FLINK-39187][table] Add the built-in function BITMAP_BUILD_AGG --- .../docs/sql/functions/built-in-functions.md | 4 + .../docs/sql/functions/built-in-functions.md | 4 + docs/data/sql_functions.yml | 10 ++ docs/data/sql_functions_zh.yml | 10 ++ .../docs/reference/pyflink.table/expressions.rst | 1 + flink-python/pyflink/table/expression.py | 8 + .../pyflink/table/tests/test_expression.py | 1 + .../flink/table/api/internal/BaseExpressions.java | 10 ++ .../functions/BuiltInFunctionDefinitions.java | 12 ++ .../planner/plan/utils/AggFunctionFactory.scala | 12 ++ .../planner/functions/BitmapAggFunctionITCase.java | 146 +++++++++++++++ .../runtime/batch/sql/OverAggregateITCase.scala | 30 ++++ .../runtime/stream/sql/AggregateITCase.scala | 71 ++++++++ .../runtime/stream/sql/OverAggregateITCase.scala | 39 ++++ .../runtime/stream/sql/WindowAggregateITCase.scala | 26 +++ .../runtime/stream/table/AggregateITCase.scala | 29 +++ .../aggregate/BitmapBuildAggFunction.java | 97 ++++++++++ .../BitmapBuildWithRetractAggFunction.java | 199 +++++++++++++++++++++ 18 files changed, 709 insertions(+) diff --git a/docs/content.zh/docs/sql/functions/built-in-functions.md b/docs/content.zh/docs/sql/functions/built-in-functions.md index 70fa6bc5e84..f931534ce6e 100644 --- a/docs/content.zh/docs/sql/functions/built-in-functions.md +++ b/docs/content.zh/docs/sql/functions/built-in-functions.md @@ -122,6 +122,10 @@ JSON 函数使用符合 ISO/IEC TR 19075-6 SQL标准的 JSON 路径表达式。 {{< sql_functions_zh "aggregate" >}} +### 位图聚合函数 + +{{< sql_functions_zh "bitmapagg" >}} + 时间间隔单位和时间点单位标识符 --------------------------------------- diff --git a/docs/content/docs/sql/functions/built-in-functions.md b/docs/content/docs/sql/functions/built-in-functions.md index f85ae93e486..3ba23b32f33 100644 --- a/docs/content/docs/sql/functions/built-in-functions.md +++ b/docs/content/docs/sql/functions/built-in-functions.md @@ -125,6 +125,10 @@ The aggregate functions take an expression across all the rows as the input and {{< sql_functions "aggregate" >}} +### Bitmap Aggregate Functions + +{{< sql_functions "bitmapagg" >}} + Time Interval and Point Unit Specifiers --------------------------------------- diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml index 8f92038de69..fd7fc55c949 100644 --- a/docs/data/sql_functions.yml +++ b/docs/data/sql_functions.yml @@ -1568,6 +1568,16 @@ aggregate: FROM orders ``` +bitmapagg: + - sql: BITMAP_BUILD_AGG(value) + table: value.bitmapBuildAgg() + description: | + Aggregates 32-bit integers into a bitmap. + + `value INT` + + Returns a `BITMAP`. + catalog: - sql: CURRENT_DATABASE() table: currentDatabase() diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml index c28996064d7..c74b830968a 100644 --- a/docs/data/sql_functions_zh.yml +++ b/docs/data/sql_functions_zh.yml @@ -1652,6 +1652,16 @@ aggregate: FROM orders ``` +bitmapagg: + - sql: BITMAP_BUILD_AGG(value) + table: value.bitmapBuildAgg() + description: | + 将 32 位整数聚合成位图。 + + `value INT` + + 返回一个 `BITMAP`。 + catalog: - sql: CURRENT_DATABASE() table: currentDatabase() diff --git a/flink-python/docs/reference/pyflink.table/expressions.rst b/flink-python/docs/reference/pyflink.table/expressions.rst index 9ad848c167f..727f5e979b0 100644 --- a/flink-python/docs/reference/pyflink.table/expressions.rst +++ b/flink-python/docs/reference/pyflink.table/expressions.rst @@ -348,6 +348,7 @@ Bitmap functions Expression.bitmap_and Expression.bitmap_andnot Expression.bitmap_build + Expression.bitmap_build_agg Expression.bitmap_cardinality Expression.bitmap_from_bytes Expression.bitmap_or diff --git a/flink-python/pyflink/table/expression.py b/flink-python/pyflink/table/expression.py index a251a2dfcda..9241bbadf55 100644 --- a/flink-python/pyflink/table/expression.py +++ b/flink-python/pyflink/table/expression.py @@ -2309,6 +2309,14 @@ class Expression(Generic[T]): """ return _unary_op("bitmapBuild")(self) + def bitmap_build_agg(self): + """ + Aggregates 32-bit integers into a bitmap. + + :return: a BITMAP expression + """ + return _unary_op("bitmapBuildAgg")(self) + def bitmap_cardinality(self) -> 'Expression': """ Returns the cardinality of a bitmap. diff --git a/flink-python/pyflink/table/tests/test_expression.py b/flink-python/pyflink/table/tests/test_expression.py index edc0dab1770..f6d10fc99c5 100644 --- a/flink-python/pyflink/table/tests/test_expression.py +++ b/flink-python/pyflink/table/tests/test_expression.py @@ -269,6 +269,7 @@ class PyFlinkBatchExpressionTests(PyFlinkTestCase): self.assertEqual("BITMAP_AND(a, b)", str(expr1.bitmap_and(expr2))) self.assertEqual("BITMAP_ANDNOT(a, b)", str(expr1.bitmap_andnot(expr2))) self.assertEqual("BITMAP_BUILD(a)", str(expr1.bitmap_build())) + self.assertEqual("BITMAP_BUILD_AGG(a)", str(expr1.bitmap_build_agg())) self.assertEqual("BITMAP_CARDINALITY(a)", str(expr1.bitmap_cardinality())) self.assertEqual("BITMAP_FROM_BYTES(a)", str(expr1.bitmap_from_bytes())) self.assertEqual("BITMAP_OR(a, b)", str(expr1.bitmap_or(expr2))) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java index 3704f80ea8b..086ef39867e 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java @@ -79,6 +79,7 @@ import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BIN; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_AND; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_ANDNOT; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_BUILD; +import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_BUILD_AGG; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_CARDINALITY; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_FROM_BYTES; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BITMAP_OR; @@ -2651,6 +2652,15 @@ public abstract class BaseExpressions<InType, OutType> { return toApiSpecificExpression(unresolvedCall(BITMAP_BUILD, toExpr())); } + /** + * Aggregates 32-bit integers into a bitmap. + * + * @return a BITMAP expression + */ + public OutType bitmapBuildAgg() { + return toApiSpecificExpression(unresolvedCall(BITMAP_BUILD_AGG, toExpr())); + } + /** * Returns the cardinality of a bitmap. * diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java index 7dac1d91cc8..44c40421e30 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java @@ -3037,6 +3037,18 @@ public final class BuiltInFunctionDefinitions { "org.apache.flink.table.runtime.functions.scalar.BitmapBuildFunction") .build(); + public static final BuiltInFunctionDefinition BITMAP_BUILD_AGG = + BuiltInFunctionDefinition.newBuilder() + .name("BITMAP_BUILD_AGG") + .kind(AGGREGATE) + .inputTypeStrategy( + sequence( + Collections.singletonList("value"), + Collections.singletonList(logical(LogicalTypeRoot.INTEGER)))) + .outputTypeStrategy(explicit(DataTypes.BITMAP())) + .runtimeProvided() + .build(); + public static final BuiltInFunctionDefinition BITMAP_CARDINALITY = BuiltInFunctionDefinition.newBuilder() .name("BITMAP_CARDINALITY") diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala index ce6bbece424..3eecd384abf 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/utils/AggFunctionFactory.scala @@ -174,6 +174,8 @@ class AggFunctionFactory( // built-in imperativeFunction case BuiltInFunctionDefinitions.PERCENTILE => createPercentileAggFunction(argTypes) + case BuiltInFunctionDefinitions.BITMAP_BUILD_AGG => + createBitmapBuildAggFunction(argTypes, index) // DeclarativeAggregateFunction & UDF case _ => bridge.getDefinition.asInstanceOf[UserDefinedFunction] @@ -649,4 +651,14 @@ class AggFunctionFactory( new SinglePercentileAggFunction(firstArg, secondArg) } } + + private def createBitmapBuildAggFunction( + argTypes: Array[LogicalType], + index: Int): UserDefinedFunction = { + if (aggCallNeedRetractions(index)) { + new BitmapBuildWithRetractAggFunction(argTypes(0)) + } else { + new BitmapBuildAggFunction(argTypes(0)) + } + } } diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BitmapAggFunctionITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BitmapAggFunctionITCase.java new file mode 100644 index 00000000000..8312da400e3 --- /dev/null +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/BitmapAggFunctionITCase.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.planner.functions; + +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.types.Row; +import org.apache.flink.types.bitmap.Bitmap; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.stream.Stream; + +import static org.apache.flink.table.api.DataTypes.BIGINT; +import static org.apache.flink.table.api.DataTypes.BITMAP; +import static org.apache.flink.table.api.DataTypes.INT; +import static org.apache.flink.table.api.DataTypes.ROW; +import static org.apache.flink.table.api.DataTypes.STRING; +import static org.apache.flink.table.api.Expressions.$; +import static org.apache.flink.types.RowKind.DELETE; +import static org.apache.flink.types.RowKind.INSERT; +import static org.apache.flink.types.RowKind.UPDATE_AFTER; +import static org.apache.flink.types.RowKind.UPDATE_BEFORE; + +/** Tests for built-in bitmap aggregation functions. */ +class BitmapAggFunctionITCase extends BuiltInAggregateFunctionTestBase { + + @Override + Stream<TestSpec> getTestCaseSpecs() { + final List<TestSpec> specs = new ArrayList<>(); + specs.addAll(bitmapBuildAggTestCases()); + return specs.stream(); + } + + private List<TestSpec> bitmapBuildAggTestCases() { + return Arrays.asList( + TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_BUILD_AGG) + .withDescription("without retraction") + .withSource( + ROW(INT(), STRING()), + Arrays.asList( + Row.ofKind(INSERT, 1, "A"), + Row.ofKind(INSERT, 2, "A"), + Row.ofKind(INSERT, null, "A"), + Row.ofKind(INSERT, 4, "A"), + Row.ofKind(INSERT, 3, "A"), + Row.ofKind(INSERT, 2, "B"), + Row.ofKind(INSERT, -1, "A"), + Row.ofKind(INSERT, 1, "B"), + Row.ofKind(INSERT, -1, "B"), + Row.ofKind(INSERT, null, "B"), + Row.ofKind(INSERT, null, "C"))) + .testResult( + source -> + "SELECT f1, BITMAP_BUILD_AGG(f0) FROM " + + source + + " GROUP BY f1", + TableApiAggSpec.groupBySelect( + Collections.singletonList($("f1")), + $("f1"), + $("f0").bitmapBuildAgg()), + ROW(STRING(), BITMAP()), + ROW(STRING(), BITMAP()), + Arrays.asList( + Row.of("A", Bitmap.fromArray(new int[] {-1, 1, 2, 3, 4})), + Row.of("B", Bitmap.fromArray(new int[] {-1, 1, 2})), + Row.of("C", null))), + TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_BUILD_AGG) + .withDescription("with retraction") + .withSource( + ROW(INT(), STRING()), + Arrays.asList( + Row.ofKind(INSERT, 1, "A"), + Row.ofKind(INSERT, null, "A"), + Row.ofKind(DELETE, 1, "A"), + Row.ofKind(INSERT, 1, "B"), + Row.ofKind(DELETE, 1, "B"), + Row.ofKind(INSERT, null, "B"), + Row.ofKind(INSERT, 3, "B"), + Row.ofKind(DELETE, 2, "B"), // count < 0 + Row.ofKind(INSERT, -1, "B"), + Row.ofKind(INSERT, 2, "B"), + Row.ofKind(INSERT, 2, "B"), + Row.ofKind(INSERT, 1, "B"), + Row.ofKind(DELETE, 1, "B"), + Row.ofKind(UPDATE_BEFORE, 3, "B"), + Row.ofKind(UPDATE_AFTER, 1, "B"), + Row.ofKind(INSERT, 2, "C"), + Row.ofKind(INSERT, 1, "C"), + Row.ofKind(INSERT, -1, "C"), + Row.ofKind(INSERT, null, "C"), + Row.ofKind(DELETE, 1, "C"), + Row.ofKind(DELETE, -1, "C"), + Row.ofKind(DELETE, null, "C"), + Row.ofKind(UPDATE_BEFORE, 2, "C"), + Row.ofKind(UPDATE_AFTER, 1, "C"))) + .testResult( + source -> + "SELECT f1, BITMAP_BUILD_AGG(f0) FROM " + + source + + " GROUP BY f1", + TableApiAggSpec.groupBySelect( + Collections.singletonList($("f1")), + $("f1"), + $("f0").bitmapBuildAgg()), + ROW(STRING(), BITMAP()), + ROW(STRING(), BITMAP()), + Arrays.asList( + Row.of("A", null), + Row.of("B", Bitmap.fromArray(new int[] {-1, 1, 2})), + Row.of("C", Bitmap.fromArray(new int[] {1})))), + TestSpec.forFunction(BuiltInFunctionDefinitions.BITMAP_BUILD_AGG) + .withDescription("Validation Error") + .withSource( + ROW(BIGINT(), STRING()), + Collections.singletonList(Row.ofKind(INSERT, 1L, "A"))) + .testValidationError( + source -> + "SELECT f1, BITMAP_BUILD_AGG(f0) FROM " + + source + + " GROUP BY f1", + TableApiAggSpec.groupBySelect( + Collections.singletonList($("f1")), + $("f1"), + $("f0").bitmapBuildAgg()), + "Invalid input arguments. Expected signatures are:\n" + + "BITMAP_BUILD_AGG(value <INTEGER>)")); + } +} diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverAggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverAggregateITCase.scala index b08a944b571..c9ca677c07a 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverAggregateITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/batch/sql/OverAggregateITCase.scala @@ -2974,6 +2974,36 @@ class OverAggregateITCase extends BatchTestBase { ) ) } + + @Test + def testBitmapBuildAgg(): Unit = { + checkResult( + "SELECT " + + "d, e, " + + "BITMAP_BUILD_AGG(d) OVER (ORDER BY e ROWS BETWEEN 4 PRECEDING AND CURRENT ROW), " + + "BITMAP_BUILD_AGG(CAST(e AS INT)) OVER (ORDER BY e ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) " + + "FROM NullTable5", + Seq( + row(1, 1L, "{1}", "{1}"), + row(2, 2L, "{1,2}", "{1,2}"), + row(2, 3L, "{1,2}", "{1,2,3}"), + row(3, 4L, "{1,2,3}", "{1,2,3,4}"), + row(3, 5L, "{1,2,3}", "{1,2,3,4,5}"), + row(3, 6L, "{2,3}", "{2,3,4,5,6}"), + row(4, 7L, "{2,3,4}", "{3,4,5,6,7}"), + row(4, 8L, "{3,4}", "{4,5,6,7,8}"), + row(4, 9L, "{3,4}", "{5,6,7,8,9}"), + row(4, 10L, "{3,4}", "{6,7,8,9,10}"), + row(5, 11L, "{4,5}", "{7,8,9,10,11}"), + row(5, 12L, "{4,5}", "{8,9,10,11,12}"), + row(5, 13L, "{4,5}", "{9,10,11,12,13}"), + row(5, 14L, "{4,5}", "{10,11,12,13,14}"), + row(5, 15L, "{5}", "{11,12,13,14,15}"), + row(null, 999L, "{5}", "{12,13,14,15,999}"), + row(null, 999L, "{5}", "{13,14,15,999}") + ) + ) + } } /** The initial accumulator for count aggregate function */ diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala index 3d0e189294e..cd2adf0154a 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala @@ -2119,6 +2119,77 @@ class AggregateITCase( tEnv.dropTemporarySystemFunction("PERCENTILE") } + + @TestTemplate + def testBitmapBuildAgg(): Unit = { + val data = new mutable.MutableList[(Int, Int, String)] + for (i <- 0 until 5) { + data.+=((i, -i, "a")) + data.+=((i * 2, -i * 2, "b")) + } + + val sql = + """ + |SELECT + | c, + | CAST(BITMAP_BUILD_AGG(a) AS STRING), + | CAST(BITMAP_BUILD_AGG(b) AS STRING) + |FROM MyTable + |GROUP BY c + """.stripMargin + + val t = failingDataSource(data).toTable(tEnv, 'a, 'b, 'c) + tEnv.createTemporaryView("MyTable", t) + + val sink = new TestingRetractSink + tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1) + env.execute() + + val expected = List( + s"a,{0,1,2,3,4},{0,${Integer.toUnsignedLong(-4)},${Integer.toUnsignedLong(-3)},${Integer + .toUnsignedLong(-2)},${Integer.toUnsignedLong(-1)}}", + s"b,{0,2,4,6,8},{0,${Integer.toUnsignedLong(-8)},${Integer.toUnsignedLong(-6)},${Integer + .toUnsignedLong(-4)},${Integer.toUnsignedLong(-2)}}" + ) + assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted) + } + + @TestTemplate + def testBitmapBuildAggWithRetract(): Unit = { + val data = new mutable.MutableList[(Int, Int, String)] + for (i <- 0 until 5) { + data.+=((i, i, "a")) + data.+=((i + 1, i * 2, "b")) + data.+=((i + 2, i * 3, "c")) + } + + val inner = + """ + |SELECT + | MAX(a) AS ma, + | LAST_VALUE(b) AS la, + | c + |FROM MyTable + |GROUP BY c + """.stripMargin + val sql = + s""" + |SELECT + | CAST(BITMAP_BUILD_AGG(ma) AS STRING), + | CAST(BITMAP_BUILD_AGG(la) AS STRING) + |FROM ($inner) + """.stripMargin + + val t = failingDataSource(data).toTable(tEnv, 'a, 'b, 'c) + tEnv.createTemporaryView("MyTable", t) + + val sink = new TestingRetractSink + tEnv.sqlQuery(sql).toRetractStream[Row].addSink(sink).setParallelism(1) + env.execute() + + val expected = List(s"{4,5,6},{4,8,12}") + assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted) + } } object AggregateITCase { diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverAggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverAggregateITCase.scala index 82c58da9a87..c4deb3500a9 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverAggregateITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/OverAggregateITCase.scala @@ -1499,6 +1499,45 @@ class OverAggregateITCase(mode: StateBackendMode, unboundedOverVersion: Int) } } } + + @TestTemplate + def testBitmapBuildAgg(): Unit = { + val sql = + """ + |SELECT + | a, b, + | BITMAP_BUILD_AGG(a) OVER (ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW), + | BITMAP_BUILD_AGG(CAST(b AS INT)) OVER (ORDER BY proctime ROWS BETWEEN 4 PRECEDING AND CURRENT ROW) + |FROM MyTable + """.stripMargin + + val t = + failingDataSource(TestData.tupleData5).toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) + tEnv.createTemporaryView("MyTable", t) + + val sink = new TestingAppendSink() + tEnv.sqlQuery(sql).toDataStream.addSink(sink).setParallelism(1) + env.execute() + + val expected = List( + "1,1,{1},{1}", + "2,2,{1,2},{1,2}", + "2,3,{1,2},{1,2,3}", + "3,4,{1,2,3},{1,2,3,4}", + "3,5,{1,2,3},{1,2,3,4,5}", + "3,6,{2,3},{2,3,4,5,6}", + "4,7,{2,3,4},{3,4,5,6,7}", + "4,8,{3,4},{4,5,6,7,8}", + "4,9,{3,4},{5,6,7,8,9}", + "4,10,{3,4},{6,7,8,9,10}", + "5,11,{4,5},{7,8,9,10,11}", + "5,12,{4,5},{8,9,10,11,12}", + "5,13,{4,5},{9,10,11,12,13}", + "5,14,{4,5},{10,11,12,13,14}", + "5,15,{5},{11,12,13,14,15}" + ) + assertThat(sink.getAppendResults).isEqualTo(expected) + } } object OverAggregateITCase { diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala index 12ac1f73680..0c498882137 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/WindowAggregateITCase.scala @@ -988,6 +988,32 @@ class WindowAggregateITCase( } } + @TestTemplate + def testBitmapBuildAggOnEventTimeTumbleWindow(): Unit = { + val sql = + """ + |SELECT + | window_start, + | window_end, + | BITMAP_BUILD_AGG(`int`) as `bitmap` + |FROM TABLE( + | TUMBLE(TABLE T1_CDC, DESCRIPTOR(rowtime), INTERVAL '5' SECOND)) + |GROUP BY window_start, window_end + """.stripMargin + + val sink = new TestingAppendSink + tEnv.sqlQuery(sql).toDataStream.addSink(sink) + env.execute() + + val expected = List( + "2020-10-10T00:00,2020-10-10T00:00:05,{2,5,22}", + "2020-10-10T00:00:05,2020-10-10T00:00:10,{3,6}", + "2020-10-10T00:00:15,2020-10-10T00:00:20,{4}" + ) + val result = sink.getAppendResults.sorted + assertThat(result.mkString("\n")).isEqualTo(expected.mkString("\n")) + } + private def verifyWindowAgg( tvfFromClause: String, allExpectedData: Seq[String], diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala index 067bfd60905..29a6e80a2e7 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/table/AggregateITCase.scala @@ -606,4 +606,33 @@ class AggregateITCase(mode: StateBackendMode) extends StreamingWithStateTestBase } } } + + @TestTemplate + def testBitmapBuildAgg(): Unit = { + val data = new mutable.MutableList[(Int, Int, String)] + for (i <- 0 until 5) { + data.+=((i, -i, "a")) + data.+=((i * 2, -i * 2, "b")) + } + + val t = failingDataSource(data) + .toTable(tEnv, 'a, 'b, 'c) + .groupBy('c) + .select( + 'c, + 'a.bitmapBuildAgg().cast(DataTypes.STRING()), + 'b.bitmapBuildAgg().cast(DataTypes.STRING())) + + val sink = new TestingRetractSink + t.toRetractStream[Row].addSink(sink).setParallelism(1) + env.execute() + + val expected = List( + s"a,{0,1,2,3,4},{0,${Integer.toUnsignedLong(-4)},${Integer.toUnsignedLong(-3)},${Integer + .toUnsignedLong(-2)},${Integer.toUnsignedLong(-1)}}", + s"b,{0,2,4,6,8},{0,${Integer.toUnsignedLong(-8)},${Integer.toUnsignedLong(-6)},${Integer + .toUnsignedLong(-4)},${Integer.toUnsignedLong(-2)}}" + ) + assertThat(sink.getRetractResults.sorted).isEqualTo(expected.sorted) + } } diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapBuildAggFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapBuildAggFunction.java new file mode 100644 index 00000000000..4e0215019e8 --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapBuildAggFunction.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.aggregate; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.types.bitmap.Bitmap; +import org.apache.flink.types.bitmap.RoaringBitmapData; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.List; + +import static org.apache.flink.table.types.utils.DataTypeUtils.toInternalDataType; + +/** Built-in BITMAP_BUILD_AGG aggregate function. */ +@Internal +public final class BitmapBuildAggFunction extends BuiltInAggregateFunction<Bitmap, Bitmap> { + + private final transient DataType valueDataType; + + public BitmapBuildAggFunction(LogicalType valueType) { + this.valueDataType = toInternalDataType(valueType); + } + + // -------------------------------------------------------------------------------------------- + // Planning + // -------------------------------------------------------------------------------------------- + + @Override + public List<DataType> getArgumentDataTypes() { + return Collections.singletonList(valueDataType); + } + + @Override + public DataType getAccumulatorDataType() { + return DataTypes.BITMAP().notNull(); + } + + @Override + public DataType getOutputDataType() { + return DataTypes.BITMAP(); + } + + // -------------------------------------------------------------------------------------------- + // Accumulator + // -------------------------------------------------------------------------------------------- + + @Override + public Bitmap createAccumulator() { + return Bitmap.empty(); + } + + public void resetAccumulator(Bitmap acc) { + acc.clear(); + } + + @Override + public Bitmap getValue(Bitmap acc) { + return acc.isEmpty() ? null : RoaringBitmapData.from(acc); + } + + // -------------------------------------------------------------------------------------------- + // Runtime + // -------------------------------------------------------------------------------------------- + + public void accumulate(Bitmap acc, @Nullable Integer value) { + if (value != null) { + acc.add(value); + } + } + + public void merge(Bitmap acc, Iterable<Bitmap> its) { + for (Bitmap other : its) { + acc.or(other); + } + } +} diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapBuildWithRetractAggFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapBuildWithRetractAggFunction.java new file mode 100644 index 00000000000..c9599b7f31e --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/aggregate/BitmapBuildWithRetractAggFunction.java @@ -0,0 +1,199 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.aggregate; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.api.dataview.MapView; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.logical.LogicalType; +import org.apache.flink.types.bitmap.Bitmap; +import org.apache.flink.types.bitmap.RoaringBitmapData; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Objects; + +import static org.apache.flink.table.types.utils.DataTypeUtils.toInternalDataType; + +/** Built-in BITMAP_BUILD_AGG with retraction aggregate function. */ +@Internal +public final class BitmapBuildWithRetractAggFunction + extends BuiltInAggregateFunction< + Bitmap, BitmapBuildWithRetractAggFunction.BitmapBuildWithRetractAccumulator> { + + private final transient DataType valueDataType; + + public BitmapBuildWithRetractAggFunction(LogicalType valueType) { + this.valueDataType = toInternalDataType(valueType); + } + + // -------------------------------------------------------------------------------------------- + // Planning + // -------------------------------------------------------------------------------------------- + + @Override + public List<DataType> getArgumentDataTypes() { + return Collections.singletonList(valueDataType); + } + + @Override + public DataType getAccumulatorDataType() { + return DataTypes.STRUCTURED( + BitmapBuildWithRetractAccumulator.class, + DataTypes.FIELD("bitmap", DataTypes.BITMAP().notNull()), + DataTypes.FIELD( + "valueCount", + MapView.newMapViewDataType( + DataTypes.INT().notNull(), DataTypes.INT().notNull()))); + } + + @Override + public DataType getOutputDataType() { + return DataTypes.BITMAP(); + } + + // -------------------------------------------------------------------------------------------- + // Accumulator + // -------------------------------------------------------------------------------------------- + + /** Accumulator for BITMAP_BUILD_AGG with retraction. */ + public static class BitmapBuildWithRetractAccumulator { + + // bitmap should reflect the actual data based on valueCount + public RoaringBitmapData bitmap = RoaringBitmapData.empty(); + public MapView<Integer, Integer> valueCount = new MapView<>(); + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (obj == null || getClass() != obj.getClass()) { + return false; + } + BitmapBuildWithRetractAccumulator that = (BitmapBuildWithRetractAccumulator) obj; + return Objects.equals(bitmap, that.bitmap) + && Objects.equals(valueCount, that.valueCount); + } + + @Override + public int hashCode() { + return Objects.hash(bitmap, valueCount); + } + } + + @Override + public BitmapBuildWithRetractAccumulator createAccumulator() { + return new BitmapBuildWithRetractAccumulator(); + } + + public void resetAccumulator(BitmapBuildWithRetractAccumulator acc) { + acc.bitmap.clear(); + acc.valueCount.clear(); + } + + @Override + public Bitmap getValue(BitmapBuildWithRetractAccumulator acc) { + return acc.bitmap.isEmpty() ? null : RoaringBitmapData.from(acc.bitmap); + } + + // -------------------------------------------------------------------------------------------- + // Runtime + // -------------------------------------------------------------------------------------------- + + public void accumulate(BitmapBuildWithRetractAccumulator acc, @Nullable Integer value) + throws Exception { + if (value != null) { + Integer count = acc.valueCount.get(value); + count = count == null ? 1 : count + 1; + + if (count == 0) { + acc.valueCount.remove(value); + } else { + acc.valueCount.put(value, count); + // add value to bitmap if count changes from 0 to 1 or expires + if (count == 1) { + acc.bitmap.add(value); + } + } + } + } + + public void retract(BitmapBuildWithRetractAccumulator acc, @Nullable Integer value) + throws Exception { + if (value != null) { + Integer count = acc.valueCount.get(value); + count = count == null ? -1 : count - 1; + + if (count == 0) { + acc.valueCount.remove(value); + // remove value from bitmap if count changes from 1 to 0 + acc.bitmap.remove(value); + } else { + acc.valueCount.put(value, count); + if (count == -1) { + // remove value from bitmap if count expires + acc.bitmap.remove(value); + } + } + } + } + + public void merge( + BitmapBuildWithRetractAccumulator acc, Iterable<BitmapBuildWithRetractAccumulator> its) + throws Exception { + for (BitmapBuildWithRetractAccumulator other : its) { + for (Map.Entry<Integer, Integer> entry : other.valueCount.entries()) { + Integer value = entry.getKey(); + // count != 0 + Integer count = entry.getValue(); + + Integer curCount = acc.valueCount.get(value); + curCount = curCount == null ? count : curCount + count; + + if (curCount == 0) { + acc.valueCount.remove(value); + + if (curCount > count) { + // preCount > 0, value is in bitmap + acc.bitmap.remove(value); + } + } else { + acc.valueCount.put(value, curCount); + + if (0 < curCount && curCount <= count) { + // preCount < 0, value is not in bitmap + // preCount = 0, unknown (expiration) + acc.bitmap.add(value); + } + + if (0 > curCount && curCount >= count) { + // preCount > 0, value is in bitmap + // preCount = 0, unknown (expiration) + acc.bitmap.remove(value); + } + } + } + } + } +}
