This is an automated email from the ASF dual-hosted git repository.
panxiaolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 18b5f70a7c0 [Bug](materialized-view) enable rewrite on select
materialized index with aggregate mode (#24691)
18b5f70a7c0 is described below
commit 18b5f70a7c0ee565df5208182d80b7a8d6d6ece9
Author: Pxl <[email protected]>
AuthorDate: Wed Sep 27 11:30:36 2023 +0800
[Bug](materialized-view) enable rewrite on select materialized index with
aggregate mode (#24691)
enable rewrite on select materialized index with aggregate mode
---
.../mv/SelectMaterializedIndexWithAggregate.java | 157 ++++++++-------------
.../test_o2.out} | 7 +-
.../testCountDistinctToBitmap.out | 8 ++
.../suites/mv_p0/test_o2/test_o2.groovy | 60 ++++++++
.../testCountDistinctToBitmap.groovy | 30 ++++
5 files changed, 156 insertions(+), 106 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/mv/SelectMaterializedIndexWithAggregate.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/mv/SelectMaterializedIndexWithAggregate.java
index 398f11b3765..abc85eb3f16 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/mv/SelectMaterializedIndexWithAggregate.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/mv/SelectMaterializedIndexWithAggregate.java
@@ -30,6 +30,7 @@ import org.apache.doris.nereids.parser.NereidsParser;
import org.apache.doris.nereids.rules.Rule;
import org.apache.doris.nereids.rules.RuleType;
import org.apache.doris.nereids.rules.rewrite.RewriteRuleFactory;
+import
org.apache.doris.nereids.rules.rewrite.mv.AbstractSelectMaterializedIndexRule.SlotContext;
import org.apache.doris.nereids.trees.expressions.Alias;
import org.apache.doris.nereids.trees.expressions.Cast;
import org.apache.doris.nereids.trees.expressions.ExprId;
@@ -65,6 +66,7 @@ import
org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
import org.apache.doris.nereids.trees.plans.logical.LogicalRepeat;
import org.apache.doris.nereids.types.BigIntType;
+import org.apache.doris.nereids.types.DataType;
import org.apache.doris.nereids.types.VarcharType;
import org.apache.doris.nereids.util.ExpressionUtils;
import org.apache.doris.planner.PlanNode;
@@ -657,12 +659,8 @@ public class SelectMaterializedIndexWithAggregate extends
AbstractSelectMaterial
* 2. filter indexes that have all the required columns.
* 3. select best index from all the candidate indexes that could use.
*/
- private SelectResult select(
- LogicalOlapScan scan,
- Set<Slot> requiredScanOutput,
- Set<Expression> predicates,
- List<AggregateFunction> aggregateFunctions,
- List<Expression> groupingExprs,
+ private SelectResult select(LogicalOlapScan scan, Set<Slot>
requiredScanOutput, Set<Expression> predicates,
+ List<AggregateFunction> aggregateFunctions, List<Expression>
groupingExprs,
Set<? extends Expression> requiredExpr) {
// remove virtual slot for grouping sets.
Set<Slot> nonVirtualRequiredScanOutput = requiredScanOutput.stream()
@@ -677,105 +675,57 @@ public class SelectMaterializedIndexWithAggregate
extends AbstractSelectMaterial
}
OlapTable table = scan.getTable();
- switch (scan.getTable().getKeysType()) {
- case AGG_KEYS:
- case UNIQUE_KEYS:
- case DUP_KEYS:
- break;
- default:
- throw new RuntimeException("Not supported keys type: " +
scan.getTable().getKeysType());
- }
Map<Boolean, List<MaterializedIndex>> indexesGroupByIsBaseOrNot =
table.getVisibleIndex()
.stream()
.collect(Collectors.groupingBy(index -> index.getId() ==
table.getBaseIndexId()));
- if (table.isDupKeysOrMergeOnWrite()) {
- // Duplicate-keys table could use base index and indexes that
pre-aggregation status is on.
- Set<MaterializedIndex> candidatesWithoutRewriting =
- indexesGroupByIsBaseOrNot.getOrDefault(false,
ImmutableList.of())
- .stream()
- .filter(index -> checkPreAggStatus(scan,
index.getId(), predicates,
- aggregateFunctions, groupingExprs).isOn())
- .collect(Collectors.toSet());
-
- // try to rewrite bitmap, hll by materialized index columns.
- List<AggRewriteResult> candidatesWithRewriting =
indexesGroupByIsBaseOrNot.getOrDefault(false,
- ImmutableList.of())
- .stream()
- .filter(index ->
!candidatesWithoutRewriting.contains(index))
- .map(index -> rewriteAgg(index, scan,
nonVirtualRequiredScanOutput, predicates,
- aggregateFunctions,
- groupingExprs))
- .filter(aggRewriteResult -> checkPreAggStatus(scan,
aggRewriteResult.index.getId(),
- predicates,
- // check pre-agg status of aggregate function that
couldn't rewrite.
- aggFuncsDiff(aggregateFunctions, aggRewriteResult),
- groupingExprs).isOn())
- .filter(result -> result.success)
- .collect(Collectors.toList());
-
- List<MaterializedIndex> haveAllRequiredColumns = Streams.concat(
- candidatesWithoutRewriting.stream()
- .filter(index -> containAllRequiredColumns(index,
scan, nonVirtualRequiredScanOutput,
- requiredExpr, predicates)),
- candidatesWithRewriting.stream()
- .filter(aggRewriteResult ->
containAllRequiredColumns(aggRewriteResult.index, scan,
- aggRewriteResult.requiredScanOutput,
- requiredExpr.stream().map(e ->
aggRewriteResult.exprRewriteMap.replaceAgg(e))
- .collect(Collectors.toSet()),
- predicates))
- .map(aggRewriteResult -> aggRewriteResult.index))
- .collect(Collectors.toList());
-
- long selectIndexId = selectBestIndex(haveAllRequiredColumns, scan,
predicates);
- Optional<AggRewriteResult> rewriteResultOpt =
candidatesWithRewriting.stream()
- .filter(aggRewriteResult -> aggRewriteResult.index.getId()
== selectIndexId)
- .findAny();
- // Pre-aggregation is set to `on` by default for duplicate-keys
table.
- return new SelectResult(PreAggStatus.on(), selectIndexId,
- rewriteResultOpt.map(r -> r.exprRewriteMap).orElse(new
ExprRewriteMap()));
- } else {
- if (scan.getPreAggStatus().isOff()) {
- return new SelectResult(scan.getPreAggStatus(),
- scan.getTable().getBaseIndexId(), new
ExprRewriteMap());
- }
-
- Set<MaterializedIndex> candidatesWithoutRewriting = new
HashSet<>();
- for (MaterializedIndex index :
indexesGroupByIsBaseOrNot.getOrDefault(false, ImmutableList.of())) {
- final PreAggStatus preAggStatus;
- if (preAggEnabledByHint(scan)) {
- preAggStatus = PreAggStatus.on();
- } else {
- preAggStatus = checkPreAggStatus(scan, index.getId(),
predicates,
- aggregateFunctions, groupingExprs);
- }
+ Set<MaterializedIndex> candidatesWithoutRewriting =
indexesGroupByIsBaseOrNot
+ .getOrDefault(false, ImmutableList.of()).stream()
+ .filter(index -> preAggEnabledByHint(scan)
+ || checkPreAggStatus(scan, index.getId(), predicates,
aggregateFunctions, groupingExprs).isOn())
+ .collect(Collectors.toSet());
+
+ // try to rewrite bitmap, hll by materialized index columns.
+ List<AggRewriteResult> candidatesWithRewriting =
indexesGroupByIsBaseOrNot
+ .getOrDefault(false, ImmutableList.of()).stream()
+ .filter(index -> !candidatesWithoutRewriting.contains(index))
+ .map(index -> rewriteAgg(index, scan,
nonVirtualRequiredScanOutput, predicates, aggregateFunctions,
+ groupingExprs))
+ .filter(aggRewriteResult -> checkPreAggStatus(scan,
aggRewriteResult.index.getId(), predicates,
+ // check pre-agg status of aggregate function that
couldn't rewrite.
+ aggFuncsDiff(aggregateFunctions, aggRewriteResult),
groupingExprs).isOn())
+ .filter(result -> result.success).collect(Collectors.toList());
+
+ List<MaterializedIndex> haveAllRequiredColumns = Streams.concat(
+ candidatesWithoutRewriting.stream()
+ .filter(index -> containAllRequiredColumns(index,
scan, nonVirtualRequiredScanOutput,
+ requiredExpr, predicates)),
+ candidatesWithRewriting.stream()
+ .filter(aggRewriteResult ->
containAllRequiredColumns(aggRewriteResult.index, scan,
+ aggRewriteResult.requiredScanOutput,
+ requiredExpr.stream().map(e ->
aggRewriteResult.exprRewriteMap.replaceAgg(e))
+ .collect(Collectors.toSet()),
+ predicates))
+ .map(aggRewriteResult -> aggRewriteResult.index))
+ .collect(Collectors.toList());
- if (preAggStatus.isOn()) {
- candidatesWithoutRewriting.add(index);
- }
- }
- SelectResult baseIndexSelectResult = new SelectResult(
- checkPreAggStatus(scan, scan.getTable().getBaseIndexId(),
- predicates, aggregateFunctions, groupingExprs),
- scan.getTable().getBaseIndexId(), new ExprRewriteMap());
- if (candidatesWithoutRewriting.isEmpty()) {
- // return early if pre agg status if off.
- return baseIndexSelectResult;
- } else {
- List<MaterializedIndex> rollupsWithAllRequiredCols =
- Stream.concat(candidatesWithoutRewriting.stream(),
indexesGroupByIsBaseOrNot.get(true).stream())
- .filter(index ->
containAllRequiredColumns(index, scan, nonVirtualRequiredScanOutput,
- requiredExpr, predicates))
- .collect(Collectors.toList());
-
- long selectedIndex =
selectBestIndex(rollupsWithAllRequiredCols, scan, predicates);
- if (selectedIndex == scan.getTable().getBaseIndexId()) {
- return baseIndexSelectResult;
- }
- return new SelectResult(PreAggStatus.on(), selectedIndex, new
ExprRewriteMap());
+ long selectIndexId = selectBestIndex(haveAllRequiredColumns, scan,
predicates);
+ // Pre-aggregation is set to `on` by default for duplicate-keys table.
+ // In other cases where mv is not hit, preagg may turn off from on.
+ if (!table.isDupKeysOrMergeOnWrite() && (new CheckContext(scan,
selectIndexId)).isBaseIndex()) {
+ PreAggStatus preagg = scan.getPreAggStatus();
+ if (preagg.isOn()) {
+ preagg = checkPreAggStatus(scan,
scan.getTable().getBaseIndexId(), predicates, aggregateFunctions,
+ groupingExprs);
}
+ return new SelectResult(preagg, selectIndexId, new
ExprRewriteMap());
}
+
+ Optional<AggRewriteResult> rewriteResultOpt =
candidatesWithRewriting.stream()
+ .filter(aggRewriteResult -> aggRewriteResult.index.getId() ==
selectIndexId).findAny();
+ return new SelectResult(PreAggStatus.on(), selectIndexId,
+ rewriteResultOpt.map(r -> r.exprRewriteMap).orElse(new
ExprRewriteMap()));
}
private List<AggregateFunction> aggFuncsDiff(List<AggregateFunction>
aggregateFunctions,
@@ -1191,6 +1141,13 @@ public class SelectMaterializedIndexWithAggregate
extends AbstractSelectMaterial
}
}
+ private static Expression castIfNeed(Expression expr, DataType targetType)
{
+ if (expr.getDataType().equals(targetType)) {
+ return expr;
+ }
+ return new Cast(expr, targetType);
+ }
+
private static class AggFuncRewriter extends
DefaultExpressionRewriter<RewriteContext> {
public static final AggFuncRewriter INSTANCE = new AggFuncRewriter();
@@ -1212,7 +1169,7 @@ public class SelectMaterializedIndexWithAggregate extends
AbstractSelectMaterial
// count(distinct col) ->
bitmap_union_count(mv_bitmap_union_col)
Optional<Slot> slotOpt =
ExpressionUtils.extractSlotOrCastOnSlot(count.child(0));
- Expression expr = new ToBitmapWithCheck(new
Cast(count.child(0), BigIntType.INSTANCE));
+ Expression expr = new
ToBitmapWithCheck(castIfNeed(count.child(0), BigIntType.INSTANCE));
// count distinct a value column.
if (slotOpt.isPresent() &&
!context.checkContext.keyNameToColumn.containsKey(
normalizeName(expr.toSql()))) {
@@ -1425,7 +1382,7 @@ public class SelectMaterializedIndexWithAggregate extends
AbstractSelectMaterial
// ndv on a value column.
if (slotOpt.isPresent() &&
!context.checkContext.keyNameToColumn.containsKey(
normalizeName(slotOpt.get().toSql()))) {
- Expression expr = new Cast(ndv.child(),
VarcharType.SYSTEM_DEFAULT);
+ Expression expr = castIfNeed(ndv.child(),
VarcharType.SYSTEM_DEFAULT);
String hllUnionColumn = normalizeName(
CreateMaterializedViewStmt.mvColumnBuilder(AggregateType.HLL_UNION,
CreateMaterializedViewStmt.mvColumnBuilder(new
HllHash(expr).toSql())));
@@ -1459,7 +1416,7 @@ public class SelectMaterializedIndexWithAggregate extends
AbstractSelectMaterial
Optional<Slot> slotOpt =
ExpressionUtils.extractSlotOrCastOnSlot(sum.child(0));
if (!sum.isDistinct() && slotOpt.isPresent()
&&
!context.checkContext.keyNameToColumn.containsKey(normalizeName(slotOpt.get().toSql())))
{
- Expression expr = new Cast(sum.child(), BigIntType.INSTANCE);
+ Expression expr = castIfNeed(sum.child(), BigIntType.INSTANCE);
String sumColumn =
normalizeName(CreateMaterializedViewStmt.mvColumnBuilder(AggregateType.SUM,
CreateMaterializedViewStmt.mvColumnBuilder(expr.toSql())));
Column mvColumn = context.checkContext.getColumn(sumColumn);
diff --git
a/regression-test/data/mv_p0/ut/testCountDistinctToBitmap/testCountDistinctToBitmap.out
b/regression-test/data/mv_p0/test_o2/test_o2.out
similarity index 60%
copy from
regression-test/data/mv_p0/ut/testCountDistinctToBitmap/testCountDistinctToBitmap.out
copy to regression-test/data/mv_p0/test_o2/test_o2.out
index 88913c5b65d..af029e6857d 100644
---
a/regression-test/data/mv_p0/ut/testCountDistinctToBitmap/testCountDistinctToBitmap.out
+++ b/regression-test/data/mv_p0/test_o2/test_o2.out
@@ -1,9 +1,4 @@
-- This file is automatically generated. You should know what you did if you
want to edit this
--- !select_star --
-2020-01-01 1 a 1
-2020-01-01 1 a 2
-2020-01-02 2 b 2
-
-- !select_mv --
-1 2
+2023-08-16T22:27 ax asd 2
diff --git
a/regression-test/data/mv_p0/ut/testCountDistinctToBitmap/testCountDistinctToBitmap.out
b/regression-test/data/mv_p0/ut/testCountDistinctToBitmap/testCountDistinctToBitmap.out
index 88913c5b65d..e5696983e12 100644
---
a/regression-test/data/mv_p0/ut/testCountDistinctToBitmap/testCountDistinctToBitmap.out
+++
b/regression-test/data/mv_p0/ut/testCountDistinctToBitmap/testCountDistinctToBitmap.out
@@ -7,3 +7,11 @@
-- !select_mv --
1 2
+-- !select_star --
+2020-01-01 1 a 1
+2020-01-01 1 a 2
+2020-01-02 2 b 2
+
+-- !select_mv --
+1 2
+
diff --git a/regression-test/suites/mv_p0/test_o2/test_o2.groovy
b/regression-test/suites/mv_p0/test_o2/test_o2.groovy
new file mode 100644
index 00000000000..c9990253584
--- /dev/null
+++ b/regression-test/suites/mv_p0/test_o2/test_o2.groovy
@@ -0,0 +1,60 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+import org.codehaus.groovy.runtime.IOGroovyMethods
+
+suite ("test_o2") {
+ sql """set enable_nereids_planner=true"""
+ sql """SET enable_fallback_to_original_planner=false"""
+ sql """ DROP TABLE IF EXISTS o2_order_events; """
+
+ sql """
+ CREATE TABLE `o2_order_events` (
+ `ts` datetime NULL,
+ `metric_name` varchar(20) NULL,
+ `city_id` int(11) NULL,
+ `platform` varchar(20) NULL,
+ `vendor_id` int(11) NULL,
+ `pos_id` int(11) NULL,
+ `is_instant_restaurant` boolean NULL,
+ `country_id` int(11) NULL,
+ `logistics_partner_id` int(11) NULL,
+ `rpf_order` int(11) NULL,
+ `rejected_message_id` int(11) NULL,
+ `count_value` int(11) SUM NULL DEFAULT "0"
+ ) ENGINE=OLAP
+ AGGREGATE KEY(`ts`, `metric_name`, `city_id`, `platform`, `vendor_id`,
`pos_id`, `is_instant_restaurant`, `country_id`, `logistics_partner_id`,
`rpf_order`, `rejected_message_id`)
+ COMMENT 'OLAP'
+ DISTRIBUTED BY HASH(`metric_name`, `platform`) BUCKETS 2
+ PROPERTIES (
+ "replication_allocation" = "tag.location.default: 1"
+ );
+ """
+
+ sql """insert into o2_order_events values ("2023-08-16 22:27:00
","ax",1,"asd",2,1,1,1,1,1,1,1);"""
+
+ createMV ("""
+ create materialized view o2_order_events_mv as select
ts,metric_name,platform,sum(count_value) from o2_order_events group by
ts,metric_name,platform;;""")
+
+ sql """insert into o2_order_events values ("2023-08-16 22:27:00
","ax",1,"asd",2,1,1,1,1,1,1,1);"""
+
+ explain {
+ sql("select ts,metric_name,platform,sum(count_value) from
o2_order_events group by ts,metric_name,platform;")
+ contains "(o2_order_events_mv)"
+ }
+ qt_select_mv "select ts,metric_name,platform,sum(count_value) from
o2_order_events group by ts,metric_name,platform;"
+}
diff --git
a/regression-test/suites/mv_p0/ut/testCountDistinctToBitmap/testCountDistinctToBitmap.groovy
b/regression-test/suites/mv_p0/ut/testCountDistinctToBitmap/testCountDistinctToBitmap.groovy
index 3957b26caba..0d046aa657d 100644
---
a/regression-test/suites/mv_p0/ut/testCountDistinctToBitmap/testCountDistinctToBitmap.groovy
+++
b/regression-test/suites/mv_p0/ut/testCountDistinctToBitmap/testCountDistinctToBitmap.groovy
@@ -47,4 +47,34 @@ suite ("testCountDistinctToBitmap") {
contains "(user_tags_mv)"
}
qt_select_mv "select user_id, count(distinct tag_id) a from user_tags
group by user_id having a>1 order by a;"
+
+
+ sql """ DROP TABLE IF EXISTS user_tags2; """
+
+ sql """ create table user_tags2 (
+ time_col date,
+ user_id bigint,
+ user_name varchar(20),
+ tag_id bigint)
+ partition by range (time_col) (partition p1 values less than
MAXVALUE) distributed by hash(time_col) buckets 3 properties('replication_num'
= '1');
+ """
+
+ sql """insert into user_tags2 values("2020-01-01",1,"a",1);"""
+ sql """insert into user_tags2 values("2020-01-02",2,"b",2);"""
+
+ createMV("create materialized view user_tags_mv as select user_id,
bitmap_union(to_bitmap(tag_id)) from user_tags2 group by user_id;")
+
+ sql """insert into user_tags2 values("2020-01-01",1,"a",2);"""
+
+ explain {
+ sql("select * from user_tags2 order by time_col;")
+ contains "(user_tags2)"
+ }
+ qt_select_star "select * from user_tags2 order by time_col,tag_id;"
+
+ explain {
+ sql("select user_id, count(distinct tag_id) a from user_tags2 group by
user_id having a>1 order by a;")
+ contains "(user_tags_mv)"
+ }
+ qt_select_mv "select user_id, count(distinct tag_id) a from user_tags2
group by user_id having a>1 order by a;"
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]