This is an automated email from the ASF dual-hosted git repository.
kxiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 9e426b4735e [Fix](Variant) support materialize view for variant and
accessing variant subcolumns (#30603)
9e426b4735e is described below
commit 9e426b4735e21596051b208fc6fd24cc6091f527
Author: lihangyu <[email protected]>
AuthorDate: Tue Feb 6 18:14:18 2024 +0800
[Fix](Variant) support materialize view for variant and accessing variant
subcolumns (#30603)
* [Fix](Variant) support materialize view for variant and accessing variant
subcolumns
1. fix schema change losing the column path, which led to invalid data reads
2. support the element_at function on the BE side, using simdjson to parse data
3. fix multi-slot expressions
---
be/src/olap/rowset/segment_v2/segment.cpp | 4 +-
be/src/vec/columns/column_object.cpp | 5 +
be/src/vec/columns/column_object.h | 2 +
be/src/vec/functions/function_variant_element.cpp | 178 +++++++++++++++++++++
be/src/vec/functions/simple_function_factory.h | 10 +-
.../main/java/org/apache/doris/common/Config.java | 2 -
.../java/org/apache/doris/analysis/Analyzer.java | 2 +-
.../glue/translator/PhysicalPlanTranslator.java | 31 ++--
.../rules/expression/ExpressionOptimization.java | 4 +-
.../rules/expression/rules/ElementAtToSlot.java | 89 -----------
.../rules/expression/rules/FunctionBinder.java | 5 +-
.../expressions/functions/scalar/ElementAt.java | 4 +-
.../scalar/PushDownToProjectionFunction.java | 50 +++++-
.../java/org/apache/doris/qe/SessionVariable.java | 5 +
.../doris/rewrite/ElementAtToSlotRefRule.java | 6 -
regression-test/data/variant_p0/mv/multi_slot.out | 43 +++++
.../suites/variant_p0/mv/multi_slot.groovy | 93 +++++++++++
.../variant_p0/schema_change/schema_change.groovy | 4 +-
18 files changed, 408 insertions(+), 129 deletions(-)
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp
b/be/src/olap/rowset/segment_v2/segment.cpp
index 68807b8473b..255c27243a5 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -449,8 +449,8 @@ Status Segment::new_column_iterator_with_path(const
TabletColumn& tablet_column,
// Alter table operation should read the whole variant column, since
it does not aware of
// subcolumns of variant during processing rewriting rowsets.
// This is slow, since it needs to read all sub columns and merge them
into a single column
- RETURN_IF_ERROR(HierarchicalDataReader::create(iter,
tablet_column.path_info(), node, root,
- output_as_raw_json));
+ RETURN_IF_ERROR(
+ HierarchicalDataReader::create(iter, root_path, node, root,
output_as_raw_json));
return Status::OK();
}
diff --git a/be/src/vec/columns/column_object.cpp
b/be/src/vec/columns/column_object.cpp
index aff38c56a80..552ad31809a 100644
--- a/be/src/vec/columns/column_object.cpp
+++ b/be/src/vec/columns/column_object.cpp
@@ -586,6 +586,11 @@ ColumnObject::ColumnObject(bool is_nullable_, bool
create_root_)
}
}
+ColumnObject::ColumnObject(bool is_nullable_, DataTypePtr type,
MutableColumnPtr&& column)
+ : is_nullable(is_nullable_) {
+ add_sub_column({}, std::move(column), type);
+}
+
ColumnObject::ColumnObject(Subcolumns&& subcolumns_, bool is_nullable_)
: is_nullable(is_nullable_),
subcolumns(std::move(subcolumns_)),
diff --git a/be/src/vec/columns/column_object.h
b/be/src/vec/columns/column_object.h
index 7f328992a25..407419ff5c4 100644
--- a/be/src/vec/columns/column_object.h
+++ b/be/src/vec/columns/column_object.h
@@ -231,6 +231,8 @@ public:
explicit ColumnObject(bool is_nullable_, bool create_root = true);
+ explicit ColumnObject(bool is_nullable_, DataTypePtr type,
MutableColumnPtr&& column);
+
ColumnObject(Subcolumns&& subcolumns_, bool is_nullable_);
~ColumnObject() override = default;
diff --git a/be/src/vec/functions/function_variant_element.cpp
b/be/src/vec/functions/function_variant_element.cpp
new file mode 100644
index 00000000000..89256635279
--- /dev/null
+++ b/be/src/vec/functions/function_variant_element.cpp
@@ -0,0 +1,178 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <glog/logging.h>
+#include <stddef.h>
+
+#include <memory>
+#include <ostream>
+#include <string>
+#include <string_view>
+#include <utility>
+#include <vector>
+
+#include "common/status.h"
+#include "exprs/json_functions.h"
+#include "simdjson.h"
+#include "vec/columns/column.h"
+#include "vec/columns/column_nullable.h"
+#include "vec/columns/column_object.h"
+#include "vec/columns/column_string.h"
+#include "vec/common/assert_cast.h"
+#include "vec/common/string_ref.h"
+#include "vec/core/block.h"
+#include "vec/data_types/data_type.h"
+#include "vec/data_types/data_type_nothing.h"
+#include "vec/data_types/data_type_nullable.h"
+#include "vec/data_types/data_type_object.h"
+#include "vec/data_types/data_type_string.h"
+#include "vec/functions/function.h"
+#include "vec/functions/function_helpers.h"
+#include "vec/functions/simple_function_factory.h"
+
+namespace doris::vectorized {
+
+class FunctionVariantElement : public IFunction {
+public:
+ static constexpr auto name = "element_at";
+ static FunctionPtr create() { return
std::make_shared<FunctionVariantElement>(); }
+
+ // Get function name.
+ String get_name() const override { return name; }
+
+ bool use_default_implementation_for_nulls() const override { return true; }
+
+ size_t get_number_of_arguments() const override { return 2; }
+
+ ColumnNumbers get_arguments_that_are_always_constant() const override {
return {1}; }
+
+ DataTypes get_variadic_argument_types_impl() const override {
+ return {std::make_shared<vectorized::DataTypeObject>(),
std::make_shared<DataTypeString>()};
+ }
+
+ DataTypePtr get_return_type_impl(const DataTypes& arguments) const
override {
+
DCHECK((WhichDataType(remove_nullable(arguments[0]))).is_variant_type())
+ << "First argument for function: " << name
+ << " should be DataTypeObject but it has type " <<
arguments[0]->get_name() << ".";
+ DCHECK(is_string(arguments[1]))
+ << "Second argument for function: " << name << " should be
String but it has type "
+ << arguments[1]->get_name() << ".";
+ return make_nullable(std::make_shared<DataTypeObject>());
+ }
+
+ Status execute_impl(FunctionContext* context, Block& block, const
ColumnNumbers& arguments,
+ size_t result, size_t input_rows_count) const override
{
+ const auto* variant_col = check_and_get_column<ColumnObject>(
+ block.get_by_position(arguments[0]).column.get());
+ if (!variant_col) {
+ return Status::RuntimeError(
+ fmt::format("unsupported types for function {}({}, {})",
get_name(),
+
block.get_by_position(arguments[0]).type->get_name(),
+
block.get_by_position(arguments[1]).type->get_name()));
+ }
+ if (block.empty()) {
+ block.replace_by_position(result,
block.get_by_position(result).type->create_column());
+ return Status::OK();
+ }
+
+ auto index_column = block.get_by_position(arguments[1]).column;
+ ColumnPtr result_column;
+ RETURN_IF_ERROR(get_element_column(*variant_col, index_column,
&result_column));
+ block.replace_by_position(result, result_column);
+ return Status::OK();
+ }
+
+private:
+ static Status get_element_column(const ColumnObject& src, const ColumnPtr&
index_column,
+ ColumnPtr* result) {
+ std::string field_name = index_column->get_data_at(0).to_string();
+ if (src.empty()) {
+ *result = ColumnObject::create(true);
+ return Status::OK();
+ }
+ if (src.is_scalar_variant() &&
+
WhichDataType(remove_nullable(src.get_root_type())).is_string_or_fixed_string())
{
+ // use parser to extract from root
+ auto type = std::make_shared<DataTypeString>();
+ MutableColumnPtr result_column = type->create_column();
+ const ColumnString& docs =
+
*check_and_get_column<ColumnString>(remove_nullable(src.get_root()).get());
+ simdjson::ondemand::parser parser;
+ std::vector<JsonPath> parsed_paths;
+ if (field_name.empty() || field_name[0] != '$') {
+ field_name = "$." + field_name;
+ }
+ JsonFunctions::parse_json_paths(field_name, &parsed_paths);
+ for (size_t i = 0; i < docs.size(); ++i) {
+ if (!extract_from_document(parser, docs.get_data_at(i),
parsed_paths,
+
assert_cast<ColumnString*>(result_column.get()))) {
+ VLOG_DEBUG << "failed to parse " << docs.get_data_at(i) <<
", field "
+ << field_name;
+ result_column->insert_default();
+ }
+ }
+ *result = ColumnObject::create(true, type,
std::move(result_column));
+ return Status::OK();
+ } else {
+ return Status::NotSupported("Not support element_at with none
scalar variant {}",
+ src.debug_string());
+ }
+ }
+
+ static Status extract_from_document(simdjson::ondemand::parser& parser,
const StringRef& doc,
+ const std::vector<JsonPath>& paths,
ColumnString* column) {
+ try {
+ simdjson::padded_string json_str {doc.data, doc.size};
+ simdjson::ondemand::document doc = parser.iterate(json_str);
+ simdjson::ondemand::object object = doc.get_object();
+ simdjson::ondemand::value value;
+ RETURN_IF_ERROR(JsonFunctions::extract_from_object(object, paths,
&value));
+ _write_data_to_column(value, column);
+ } catch (simdjson::simdjson_error& e) {
+ VLOG_DEBUG << "simdjson parse exception: " << e.what();
+ return Status::DataQualityError("simdjson parse exception {}",
e.what());
+ }
+ return Status::OK();
+ }
+
+ static void _write_data_to_column(simdjson::ondemand::value& value,
ColumnString* column) {
+ switch (value.type()) {
+ case simdjson::ondemand::json_type::null: {
+ column->insert_default();
+ break;
+ }
+ case simdjson::ondemand::json_type::boolean: {
+ if (value.get_bool()) {
+ column->insert_data("1", 1);
+ } else {
+ column->insert_data("0", 1);
+ }
+ break;
+ }
+ default: {
+ auto value_str = simdjson::to_json_string(value).value();
+ column->insert_data(value_str.data(), value_str.length());
+ }
+ }
+ }
+};
+
+void register_function_variant_element(SimpleFunctionFactory& factory) {
+ factory.register_function<FunctionVariantElement>();
+}
+
+} // namespace doris::vectorized
diff --git a/be/src/vec/functions/simple_function_factory.h
b/be/src/vec/functions/simple_function_factory.h
index 5c47198fd6b..9bedc204cb0 100644
--- a/be/src/vec/functions/simple_function_factory.h
+++ b/be/src/vec/functions/simple_function_factory.h
@@ -90,6 +90,7 @@ void register_function_array(SimpleFunctionFactory& factory);
void register_function_map(SimpleFunctionFactory& factory);
void register_function_struct(SimpleFunctionFactory& factory);
void register_function_struct_element(SimpleFunctionFactory& factory);
+void register_function_variant_element(SimpleFunctionFactory& factory);
void register_function_geo(SimpleFunctionFactory& factory);
void register_function_multi_string_position(SimpleFunctionFactory& factory);
void register_function_multi_string_search(SimpleFunctionFactory& factory);
@@ -178,8 +179,12 @@ public:
auto iter = function_creators.find(key_str);
if (iter == function_creators.end()) {
- LOG(WARNING) << fmt::format("Function signature {} is not found",
key_str);
- return nullptr;
+ // use original name as signature without variadic arguments
+ iter = function_creators.find(name);
+ if (iter == function_creators.end()) {
+ LOG(WARNING) << fmt::format("Function signature {} is not
found", key_str);
+ return nullptr;
+ }
}
return iter->second()->build(arguments, return_type);
@@ -280,6 +285,7 @@ public:
register_function_ip(instance);
register_function_tokenize(instance);
register_function_ignore(instance);
+ register_function_variant_element(instance);
});
return instance;
}
diff --git a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
index be58c139852..a7f642944ed 100644
--- a/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
+++ b/fe/fe-common/src/main/java/org/apache/doris/common/Config.java
@@ -2521,8 +2521,6 @@ public class Config extends ConfigBase {
@ConfField
public static String cloud_sql_server_cluster_id =
"RESERVED_CLUSTER_ID_FOR_SQL_SERVER";
- @ConfField(mutable = true)
- public static boolean enable_variant_access_in_original_planner = false;
//==========================================================================
// end of cloud config
//==========================================================================
diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
index 38b7403960c..0b4317f8714 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/Analyzer.java
@@ -1039,7 +1039,7 @@ public class Analyzer {
LOG.debug("register column ref table {}, colName {}, col {}", tblName,
colName, col.toSql());
if (col.getType().isVariantType() || (subColNames != null &&
!subColNames.isEmpty())) {
- if (!Config.enable_variant_access_in_original_planner
+ if (getContext() != null &&
!getContext().getSessionVariable().enableVariantAccessInOriginalPlanner
&& (subColNames != null && !subColNames.isEmpty())) {
ErrorReport.reportAnalysisException("Variant sub-column access
is disabled in original planner,"
+ "set enable_variant_access_in_original_planner =
true in session variable");
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index cc8b2483ff8..94b2f78cf11 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -1657,28 +1657,31 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
return inputFragment;
}
- // Get top most PushDownToProjectionFunction from expression
- private Expression getOriginalFunctionForRewritten(NamedExpression
expression) {
- List<Expression> targetExpr =
expression.collectFirst(PushDownToProjectionFunction.class::isInstance);
- if (!targetExpr.isEmpty()) {
- return targetExpr.get(0);
- }
- return null;
+ // collect all valid PushDownToProjectionFunction from expression
+ private List<Expression>
getPushDownToProjectionFunctionForRewritten(NamedExpression expression) {
+ List<Expression> targetExprList =
expression.collectToList(PushDownToProjectionFunction.class::isInstance);
+ return targetExprList.stream()
+ .filter(PushDownToProjectionFunction::validToPushDown)
+ .collect(Collectors.toList());
}
// register rewritten slots from original PushDownToProjectionFunction
private void registerRewrittenSlot(PhysicalProject<? extends Plan>
project, OlapScanNode olapScanNode) {
// register slots that are rewritten from element_at/etc..
- for (NamedExpression expr : project.getProjects()) {
+ List<Expression> allPushDownProjectionFunctions =
project.getProjects().stream()
+ .map(this::getPushDownToProjectionFunctionForRewritten)
+ .flatMap(List::stream)
+ .collect(Collectors.toList());
+ for (Expression expr : allPushDownProjectionFunctions) {
+ PushDownToProjectionFunction function =
(PushDownToProjectionFunction) expr;
if (context != null
&& context.getConnectContext() != null
&& context.getConnectContext().getStatementContext() !=
null) {
- Slot rewrittenSlot = context.getConnectContext()
-
.getStatementContext().getRewrittenSlotRefByOriginalExpr(getOriginalFunctionForRewritten(expr));
- if (rewrittenSlot != null) {
- TupleDescriptor tupleDescriptor =
context.getTupleDesc(olapScanNode.getTupleId());
- context.createSlotDesc(tupleDescriptor, (SlotReference)
rewrittenSlot);
- }
+ Slot argumentSlot =
function.getInputSlots().stream().findFirst().get();
+ Expression rewrittenSlot =
PushDownToProjectionFunction.rewriteToSlot(
+ function, (SlotReference) argumentSlot);
+ TupleDescriptor tupleDescriptor =
context.getTupleDesc(olapScanNode.getTupleId());
+ context.createSlotDesc(tupleDescriptor, (SlotReference)
rewrittenSlot);
}
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/ExpressionOptimization.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/ExpressionOptimization.java
index 6064a8d210a..e7b3a308f0f 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/ExpressionOptimization.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/ExpressionOptimization.java
@@ -21,7 +21,6 @@ import
org.apache.doris.nereids.rules.expression.rules.ArrayContainToArrayOverla
import org.apache.doris.nereids.rules.expression.rules.CaseWhenToIf;
import org.apache.doris.nereids.rules.expression.rules.DateFunctionRewrite;
import org.apache.doris.nereids.rules.expression.rules.DistinctPredicatesRule;
-import org.apache.doris.nereids.rules.expression.rules.ElementAtToSlot;
import org.apache.doris.nereids.rules.expression.rules.ExtractCommonFactorRule;
import org.apache.doris.nereids.rules.expression.rules.OrToIn;
import
org.apache.doris.nereids.rules.expression.rules.SimplifyComparisonPredicate;
@@ -49,8 +48,7 @@ public class ExpressionOptimization extends ExpressionRewrite
{
OrToIn.INSTANCE,
ArrayContainToArrayOverlap.INSTANCE,
CaseWhenToIf.INSTANCE,
- TopnToMax.INSTANCE,
- ElementAtToSlot.INSTANCE
+ TopnToMax.INSTANCE
);
private static final ExpressionRuleExecutor EXECUTOR = new
ExpressionRuleExecutor(OPTIMIZE_REWRITE_RULES);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/ElementAtToSlot.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/ElementAtToSlot.java
deleted file mode 100644
index adc050c9871..00000000000
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/ElementAtToSlot.java
+++ /dev/null
@@ -1,89 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-package org.apache.doris.nereids.rules.expression.rules;
-
-import org.apache.doris.nereids.StatementContext;
-import org.apache.doris.nereids.rules.expression.ExpressionRewriteContext;
-import org.apache.doris.nereids.rules.expression.ExpressionRewriteRule;
-import org.apache.doris.nereids.trees.expressions.Expression;
-import org.apache.doris.nereids.trees.expressions.SlotReference;
-import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
-import org.apache.doris.nereids.trees.expressions.functions.scalar.ElementAt;
-import org.apache.doris.nereids.trees.expressions.literal.VarcharLiteral;
-import
org.apache.doris.nereids.trees.expressions.visitor.DefaultExpressionRewriter;
-import org.apache.doris.qe.ConnectContext;
-
-import java.util.List;
-import java.util.Optional;
-import java.util.stream.Collectors;
-
-/**
- * Transform element_at function to SlotReference for variant sub-column
access.
- * This optimization will help query engine to prune as many sub columns as
possible
- * to speed up query.
- * eg: element_at(element_at(v, "a"), "b") -> SlotReference(column=v,
subColLabels=["a", "b"])
- */
-public class ElementAtToSlot extends
DefaultExpressionRewriter<ExpressionRewriteContext> implements
- ExpressionRewriteRule<ExpressionRewriteContext> {
-
- public static final ElementAtToSlot INSTANCE = new ElementAtToSlot();
-
- @Override
- public Expression rewrite(Expression expr, ExpressionRewriteContext ctx) {
- return expr.accept(this, ctx);
- }
-
- /**
- * Rewrites an {@link ElementAt} instance to a {@link SlotReference}.
- * This method is used to transform an ElementAt expr into a SlotReference,
- * based on the provided topColumnSlot and the context of the statement.
- *
- * @param elementAt The {@link ElementAt} instance to be rewritten.
- * @param topColumnSlot The {@link SlotReference} that represents the top
column slot.
- * @return A {@link SlotReference} that represents the rewritten element.
- * If a target column slot is found in the context, it is returned
to avoid duplicates.
- * Otherwise, a new SlotReference is created and added to the
context.
- */
- public static Expression rewriteToSlot(ElementAt elementAt, SlotReference
topColumnSlot) {
- // rewrite to slotRef
- StatementContext ctx = ConnectContext.get().getStatementContext();
- List<String> fullPaths = elementAt.collectToList(node -> node
instanceof VarcharLiteral).stream()
- .map(node -> ((VarcharLiteral) node).getValue())
- .collect(Collectors.toList());
- SlotReference targetColumnSlot = ctx.getPathSlot(topColumnSlot,
fullPaths);
- if (targetColumnSlot != null) {
- // avoid duplicated slots
- return targetColumnSlot;
- }
- SlotReference slotRef = new
SlotReference(StatementScopeIdGenerator.newExprId(),
- topColumnSlot.getName(), topColumnSlot.getDataType(),
- topColumnSlot.nullable(), topColumnSlot.getQualifier(),
topColumnSlot.getTable().get(),
- topColumnSlot.getColumn().get(),
Optional.of(topColumnSlot.getInternalName()),
- fullPaths);
- ctx.addPathSlotRef(topColumnSlot, fullPaths, slotRef, elementAt);
- ctx.addSlotToRelation(slotRef, ctx.getRelationBySlot(topColumnSlot));
-
- return slotRef;
- }
-
- @Override
- public Expression visitElementAt(ElementAt elementAt,
ExpressionRewriteContext context) {
- // todo
- return elementAt;
- }
-}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/FunctionBinder.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/FunctionBinder.java
index f60f38f7649..bf0812cdc21 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/FunctionBinder.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/rules/FunctionBinder.java
@@ -205,15 +205,14 @@ public class FunctionBinder extends
AbstractExpressionRewriteRule {
if (ConnectContext.get() != null
&& ConnectContext.get().getSessionVariable() != null
&&
!ConnectContext.get().getSessionVariable().isEnableRewriteElementAtToSlot()) {
- throw new AnalysisException(
- "set enable_rewrite_element_at_to_slot=true when using
element_at function for variant type");
+ return boundFunction;
}
Slot slot = elementAt.getInputSlots().stream().findFirst().get();
if (slot.hasUnbound()) {
slot = (Slot) super.visit(slot, context);
}
// rewrite to slot and bound this slot
- return ElementAtToSlot.rewriteToSlot(elementAt, (SlotReference)
slot);
+ return PushDownToProjectionFunction.rewriteToSlot(elementAt,
(SlotReference) slot);
}
return boundFunction;
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/ElementAt.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/ElementAt.java
index 6bd5f1bd8e9..d4fe6d50438 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/ElementAt.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/ElementAt.java
@@ -39,8 +39,8 @@ import java.util.List;
/**
* ScalarFunction 'element_at'. This class is generated by GenerateFunction.
*/
-public class ElementAt extends ScalarFunction
- implements BinaryExpression, ExplicitlyCastableSignature,
AlwaysNullable, PushDownToProjectionFunction {
+public class ElementAt extends PushDownToProjectionFunction
+ implements BinaryExpression, ExplicitlyCastableSignature,
AlwaysNullable {
public static final List<FunctionSignature> SIGNATURES = ImmutableList.of(
FunctionSignature.ret(new FollowToAnyDataType(0))
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/PushDownToProjectionFunction.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/PushDownToProjectionFunction.java
index e2473ea0954..362a84bc943 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/PushDownToProjectionFunction.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/scalar/PushDownToProjectionFunction.java
@@ -17,24 +17,68 @@
package org.apache.doris.nereids.trees.expressions.functions.scalar;
+import org.apache.doris.nereids.StatementContext;
import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.expressions.StatementScopeIdGenerator;
+import org.apache.doris.nereids.trees.expressions.literal.VarcharLiteral;
+import org.apache.doris.qe.ConnectContext;
+import java.util.List;
+import java.util.Optional;
import java.util.stream.Collectors;
/**
* Function that could be rewritten and pushed down to projection
*/
-public interface PushDownToProjectionFunction {
+public abstract class PushDownToProjectionFunction extends ScalarFunction {
+ public PushDownToProjectionFunction(String name, Expression... arguments) {
+ super(name, arguments);
+ }
+
/**
* check if specified function could be pushed down to project
* @param pushDownExpr expr to check
* @return if it is valid to push down input expr
*/
- static boolean validToPushDown(Expression pushDownExpr) {
+ public static boolean validToPushDown(Expression pushDownExpr) {
// Currently only element at for variant type could be pushed down
- return !pushDownExpr.collectToList(
+ return pushDownExpr != null && !pushDownExpr.collectToList(
PushDownToProjectionFunction.class::isInstance).stream().filter(
x -> ((Expression)
x).getDataType().isVariantType()).collect(
Collectors.toList()).isEmpty();
}
+
+ /**
+ * Rewrites an {@link PushDownToProjectionFunction} instance to a {@link
SlotReference}.
+ * This method is used to transform an PushDownToProjectionFunction expr
into a SlotReference,
+ * based on the provided topColumnSlot and the context of the statement.
+ *
+ * @param pushedFunction The {@link PushDownToProjectionFunction} instance
to be rewritten.
+ * @param topColumnSlot The {@link SlotReference} that represents the top
column slot.
+ * @return A {@link SlotReference} that represents the rewritten element.
+ * If a target column slot is found in the context, it is returned
to avoid duplicates.
+ * Otherwise, a new SlotReference is created and added to the
context.
+ */
+ public static Expression rewriteToSlot(PushDownToProjectionFunction
pushedFunction, SlotReference topColumnSlot) {
+ // rewrite to slotRef
+ StatementContext ctx = ConnectContext.get().getStatementContext();
+ List<String> fullPaths = pushedFunction.collectToList(node -> node
instanceof VarcharLiteral).stream()
+ .map(node -> ((VarcharLiteral) node).getValue())
+ .collect(Collectors.toList());
+ SlotReference targetColumnSlot = ctx.getPathSlot(topColumnSlot,
fullPaths);
+ if (targetColumnSlot != null) {
+ // avoid duplicated slots
+ return targetColumnSlot;
+ }
+ SlotReference slotRef = new
SlotReference(StatementScopeIdGenerator.newExprId(),
+ topColumnSlot.getName(), topColumnSlot.getDataType(),
+ topColumnSlot.nullable(), topColumnSlot.getQualifier(),
topColumnSlot.getTable().get(),
+ topColumnSlot.getColumn().get(),
Optional.of(topColumnSlot.getInternalName()),
+ fullPaths);
+ ctx.addPathSlotRef(topColumnSlot, fullPaths, slotRef, pushedFunction);
+ ctx.addSlotToRelation(slotRef, ctx.getRelationBySlot(topColumnSlot));
+
+ return slotRef;
+ }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
index 75e94c0c135..6cbca07c61c 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java
@@ -186,6 +186,8 @@ public class SessionVariable implements Serializable,
Writable {
public static final String DELETE_WITHOUT_PARTITION =
"delete_without_partition";
+ public static final String ENABLE_VARIANT_ACCESS_IN_ORIGINAL_PLANNER =
"enable_variant_access_in_original_planner";
+
// set the default parallelism for send batch when execute InsertStmt
operation,
// if the value for parallelism exceed
`max_send_batch_parallelism_per_job` in BE config,
// then the coordinator be will use the value of
`max_send_batch_parallelism_per_job`
@@ -793,6 +795,9 @@ public class SessionVariable implements Serializable,
Writable {
@VariableMgr.VarAttr(name = SEND_BATCH_PARALLELISM, needForward = true)
public int sendBatchParallelism = 1;
+ @VariableMgr.VarAttr(name = ENABLE_VARIANT_ACCESS_IN_ORIGINAL_PLANNER)
+ public boolean enableVariantAccessInOriginalPlanner = false;
+
@VariableMgr.VarAttr(name = EXTRACT_WIDE_RANGE_EXPR, needForward = true)
public boolean extractWideRangeExpr = true;
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/rewrite/ElementAtToSlotRefRule.java
b/fe/fe-core/src/main/java/org/apache/doris/rewrite/ElementAtToSlotRefRule.java
index e6234b002f1..e1d32530ec9 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/rewrite/ElementAtToSlotRefRule.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/rewrite/ElementAtToSlotRefRule.java
@@ -42,12 +42,6 @@ public class ElementAtToSlotRefRule implements
ExprRewriteRule {
@Override
public Expr apply(Expr expr, Analyzer analyzer, ClauseType clauseType)
throws AnalysisException {
- // Only check element at of variant all rewrited to slots
- List<Expr> elementAtFunctions = Lists.newArrayList();
- getElementAtFunction(expr, elementAtFunctions);
- if (!elementAtFunctions.isEmpty()) {
- throw new AnalysisException("element_at should not appear in
common rewrite stage");
- }
return expr;
}
diff --git a/regression-test/data/variant_p0/mv/multi_slot.out
b/regression-test/data/variant_p0/mv/multi_slot.out
new file mode 100644
index 00000000000..2499e282895
--- /dev/null
+++ b/regression-test/data/variant_p0/mv/multi_slot.out
@@ -0,0 +1,43 @@
+-- This file is automatically generated. You should know what you did if you
want to edit this
+-- !select_star --
+\N \N
+456 \N
+
+-- !select_star --
+\N \N
+\N \N
+1 \N
+1 \N
+1 \N
+3 \N
+5 \N
+
+-- !select_star --
+-4 {"k1":-4,"k2":-4,"k3":"d"}
+-5 {"k1":-4,"k2":-4,"k4":"d"}
+-6 {"k1":-4,"k2":-4,"k4":{"k44":789}}
+1 {"k1":1,"k2":1,"k3":"a"}
+2 {"k1":2,"k2":2,"k3":"b"}
+3 {"k1":-3,"k3":"c"}
+4 {"k1":-3,"k4":{"k44":456}}
+
+-- !select_star --
+\N \N
+456 \N
+789 \N
+
+-- !select_mv --
+\N \N
+1 \N
+3 \N
+5 \N
+
+-- !select_mv --
+\N \N
+\N \N
+1 \N
+1 \N
+1 \N
+3 \N
+5 \N
+
diff --git a/regression-test/suites/variant_p0/mv/multi_slot.groovy
b/regression-test/suites/variant_p0/mv/multi_slot.groovy
new file mode 100644
index 00000000000..98b8f7f549e
--- /dev/null
+++ b/regression-test/suites/variant_p0/mv/multi_slot.groovy
@@ -0,0 +1,93 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite ("multi_slot") {
+ sql """ DROP TABLE IF EXISTS multi_slot; """
+
+ sql """
+ create table multi_slot(
+ k int null,
+ v variant null
+ )
+ duplicate key (k)
+ distributed BY hash(k) buckets 3
+ properties("replication_num" = "1");
+ """
+
+ sql """insert into multi_slot select 1,'{"k1" : 1, "k2" : 1, "k3" :
"a"}';"""
+ sql """insert into multi_slot select 2,'{"k1" : 2, "k2" : 2, "k3" :
"b"}';"""
+ sql """insert into multi_slot select 3,'{"k1" : -3, "k2" : null, "k3" :
"c"}';"""
+ sql """insert into multi_slot select 4,'{"k1" : -3, "k2" : null, "k4" :
{"k44" : 456}}';"""
+ order_qt_select_star "select abs(cast(v['k4']['k44'] as int)),
sum(abs(cast(v['k2'] as int)+2)+cast(v['k3'] as int)+3) from multi_slot group
by abs(cast(v['k4']['k44'] as int))"
+
+ createMV ("create materialized view k1a2p2ap3p as select abs(cast(v['k1']
as int))+cast(v['k2'] as int)+1,abs(cast(v['k2'] as int)+2)+cast(v['k3'] as
int)+3 from multi_slot;")
+
+ createMV("create materialized view k1a2p2ap3ps as select abs(cast(v['k1']
as int))+cast(v['k2'] as int)+1,sum(abs(cast(v['k2'] as int)+2)+cast(v['k3'] as
int)+3) from multi_slot group by abs(cast(v['k1'] as int))+cast(v['k2'] as
int)+1;")
+
+ createMV("create materialized view k1a2p2ap3psp as select
abs(cast(v['k4']['k44'] as int)), sum(abs(cast(v['k2'] as int)+2)+cast(v['k3']
as int)+3) from multi_slot group by abs(cast(v['k4']['k44'] as int));")
+
+ sql """insert into multi_slot select -4,'{"k1" : -4, "k2" : -4, "k3" :
"d"}';"""
+ sql """insert into multi_slot select -5,'{"k1" : -4, "k2" : -4, "k4" :
"d"}';"""
+ sql """insert into multi_slot select -6,'{"k1" : -4, "k2" : -4, "k4" :
{"k44" : 789}}';"""
+
+ sql "SET experimental_enable_nereids_planner=true"
+ sql "SET enable_fallback_to_original_planner=false"
+
+
+ order_qt_select_star "select abs(cast(v['k1'] as int))+cast(v['k2'] as
int)+1,abs(cast(v['k2'] as int)+2)+cast(v['k3'] as int)+3 from multi_slot;"
+ order_qt_select_star "select * from multi_slot order by cast(v['k1'] as
int);"
+ // TODO fix and remove enable_rewrite_element_at_to_slot
+ order_qt_select_star "select
/*+SET_VAR(enable_rewrite_element_at_to_slot=false) */ abs(cast(v['k4']['k44']
as int)), sum(abs(cast(v['k2'] as int)+2)+cast(v['k3'] as int)+3) from
multi_slot group by abs(cast(v['k4']['k44'] as int))"
+
+ def retry_times = 60
+ for (def i = 0; i < retry_times; ++i) {
+ boolean is_k1a2p2ap3p = false
+ boolean is_k1a2p2ap3ps = false
+ boolean is_d_table = false
+ explain {
+ sql("select /*+SET_VAR(enable_rewrite_element_at_to_slot=false)
*/ abs(cast(v['k1'] as int))+cast(v['k2'] as int)+1,sum(abs(cast(v['k2'] as
int)+2)+cast(v['k3'] as int)+3) from multi_slot group by abs(cast(v['k1'] as
int))+cast(v['k2'] as int)+1 order by abs(cast(v['k1'] as int))+cast(v['k2'] as
int)+1")
+ check { explainStr, ex, startTime, endTime ->
+ if (ex != null) {
+ throw ex;
+ }
+ logger.info("explain result: ${explainStr}".toString())
+ is_k1a2p2ap3p = explainStr.contains"(k1a2p2ap3p)"
+ is_k1a2p2ap3ps = explainStr.contains("(k1a2p2ap3ps)")
+ is_d_table = explainStr.contains("(multi_slot)")
+ assert is_k1a2p2ap3p || is_k1a2p2ap3ps || is_d_table
+ }
+ }
+ // FIXME: with multiple MVs the selector may pick the base table forever,
+ // so this PR also treats selecting the base table as success.
+ // We should remove is_d_table in the future.
+ if (is_d_table || is_k1a2p2ap3p || is_k1a2p2ap3ps) {
+ break
+ }
+ if (i + 1 == retry_times) {
+ throw new IllegalStateException("retry and failed too much")
+ }
+ sleep(1000)
+ }
+ order_qt_select_mv "select
/*+SET_VAR(enable_rewrite_element_at_to_slot=false) */ abs(cast(v['k1'] as
int))+cast(v['k2'] as int)+1,sum(abs(cast(v['k2'] as int)+2)+cast(v['k3'] as
int)+3) from multi_slot group by abs(cast(v['k1'] as int))+cast(v['k2'] as
int)+1 order by abs(cast(v['k1'] as int))+cast(v['k2'] as int)+1;"
+
+ explain {
+ sql("select abs(cast(v['k1'] as int))+cast(v['k2'] as
int)+1,abs(cast(v['k2'] as int)+2)+cast(v['k3'] as int)+3 from multi_slot order
by abs(cast(v['k1'] as int))+cast(v['k2'] as int)+1,abs(cast(v['k2'] as
int)+2)+cast(v['k3'] as int)+3")
+ contains "(k1a2p2ap3p)"
+ }
+ order_qt_select_mv "select abs(cast(v['k1'] as int))+cast(v['k2'] as
int)+1,abs(cast(v['k2'] as int)+2)+cast(v['k3'] as int)+3 from multi_slot order
by abs(cast(v['k1'] as int))+cast(v['k2'] as int)+1,abs(cast(v['k2'] as
int)+2)+cast(v['k3'] as int)+3;"
+
+}
diff --git
a/regression-test/suites/variant_p0/schema_change/schema_change.groovy
b/regression-test/suites/variant_p0/schema_change/schema_change.groovy
index fe593553fda..42cef32c8e5 100644
--- a/regression-test/suites/variant_p0/schema_change/schema_change.groovy
+++ b/regression-test/suites/variant_p0/schema_change/schema_change.groovy
@@ -77,6 +77,6 @@ suite("regression_test_variant_schema_change",
"variant_type"){
sql """INSERT INTO ${table_name} SELECT k, v, v from ${table_name} limit
8101"""
sql """DROP MATERIALIZED VIEW var_cnt ON ${table_name}"""
sql """INSERT INTO ${table_name} SELECT k, v,v from ${table_name} limit
1111"""
- // TODO support select from mv
- // qt_sql """select v['k1'], cast(v['k2'] as string) from ${table_name}
order by k desc limit 10"""
+ // select from mv
+ qt_sql """select v['k1'], cast(v['k2'] as string) from ${table_name} order
by k desc limit 10"""
}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]