xiaokang commented on code in PR #40813: URL: https://github.com/apache/doris/pull/40813#discussion_r1825319468
########## regression-test/suites/inverted_index_p0/test_index_multi_topn.groovy: ########## @@ -0,0 +1,109 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + + +suite("test_index_multi_topn", "p0"){ + def indexTbNameV1 = "test_index_multi_topn_v1" + def indexTbNameV2 = "test_index_multi_topn_v2" + + sql "DROP TABLE IF EXISTS ${indexTbNameV1}" + sql "DROP TABLE IF EXISTS ${indexTbNameV2}" + + def create_table = {table_name, idx_version -> + sql """ + CREATE TABLE ${table_name} ( + `@timestamp` int(11) NULL COMMENT "", + `clientip` text NULL COMMENT "", + `request` text NULL COMMENT "", + `status` int NULL COMMENT "", + `size` int NULL COMMENT "" + ) ENGINE=OLAP + DUPLICATE KEY(`@timestamp`) + COMMENT "OLAP" + DISTRIBUTED BY RANDOM BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1", + "inverted_index_storage_format" = "${idx_version}", + "disable_auto_compaction" = "true" + ); + """ + } + + def load_httplogs_data = {table_name, label, read_flag, format_flag, file_name, ignore_failure=false, + expected_succ_rows = -1, load_to_single_tablet = 'true' -> + + // load the json data + streamLoad { + table "${table_name}" + + // set http request header params + set 'label', label + "_" + 
UUID.randomUUID().toString() + set 'read_json_by_line', read_flag + set 'format', format_flag + file file_name // import json file + time 10000 // limit inflight 10s + if (expected_succ_rows >= 0) { + set 'max_filter_ratio', '1' + } + + // if declared a check callback, the default check condition will ignore. + // So you must check all condition + check { result, exception, startTime, endTime -> + if (ignore_failure && expected_succ_rows < 0) { return } + if (exception != null) { + throw exception + } + log.info("Stream load result: ${result}".toString()) + def json = parseJson(result) + assertEquals("success", json.Status.toLowerCase()) + if (expected_succ_rows >= 0) { + assertEquals(json.NumberLoadedRows, expected_succ_rows) + } else { + assertEquals(json.NumberTotalRows, json.NumberLoadedRows + json.NumberUnselectedRows) + assertTrue(json.NumberLoadedRows > 0 && json.LoadBytes > 0) + } + } + } + } + + try { + create_table(indexTbNameV1, 'V1') + create_table(indexTbNameV2, 'V2') Review Comment: What's the difference between the two tables, and why do you need two tables? ########## be/src/vec/aggregate_functions/aggregate_function.h: ########## @@ -287,8 +288,14 @@ class IAggregateFunctionHelper : public IAggregateFunction { void add_batch_single_place(size_t batch_size, AggregateDataPtr place, const IColumn** columns, Arena* arena) const override { const Derived* derived = assert_cast<const Derived*>(this); - for (size_t i = 0; i < batch_size; ++i) { - derived->add(place, columns, i, arena); + + if constexpr (is_aggregate_function_multi_top<Derived>::value || Review Comment: trick ########## be/src/vec/aggregate_functions/aggregate_function_multi_top.h: ########## @@ -0,0 +1,52 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include "vec/core/types.h" + +namespace doris::vectorized { + +class AggregateFunctionMultiTop { +public: + static inline constexpr UInt64 TOP_K_MAX_SIZE = 0xFFFFFF; +}; + +template <typename NestFunction, bool result_is_nullable> +class AggregateFunctionNullVariadicInline; + +#define REGISTER_AGGREGATE_FUNCTION(Name) \ + template <typename T> \ + struct is_aggregate_function_##Name : std::is_base_of<AggregateFunctionMultiTop, T> {} + +REGISTER_AGGREGATE_FUNCTION(MultiTopN); Review Comment: use consistent lower case name style ########## be/src/vec/aggregate_functions/aggregate_function.h: ########## @@ -287,8 +288,14 @@ class IAggregateFunctionHelper : public IAggregateFunction { void add_batch_single_place(size_t batch_size, AggregateDataPtr place, const IColumn** columns, Arena* arena) const override { const Derived* derived = assert_cast<const Derived*>(this); - for (size_t i = 0; i < batch_size; ++i) { - derived->add(place, columns, i, arena); + + if constexpr (is_aggregate_function_multi_top<Derived>::value || + is_aggregate_function_multi_top_with_null_variadic_inline<Derived>::value) { + derived->add_range(place, columns, 0, batch_size, arena); Review Comment: add_range() calls add() in a loop just like the `else` branch. So is it necessary to add a branch for is_aggregate_function_multi_top? 
``` void add_range(AggregateDataPtr __restrict place, const IColumn** columns, ssize_t min, ssize_t max, Arena* arena) const { for (ssize_t row_num = min; row_num < max; ++row_num) { add(place, columns, row_num, arena); } } ``` ########## be/src/vec/aggregate_functions/aggregate_function_multi_topn.h: ########## @@ -0,0 +1,267 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include <rapidjson/encodings.h> +#include <rapidjson/stringbuffer.h> +#include <rapidjson/writer.h> + +#include <cstdint> +#include <string> + +#include "vec/aggregate_functions/aggregate_function.h" +#include "vec/aggregate_functions/aggregate_function_multi_top.h" +#include "vec/columns/column.h" +#include "vec/columns/column_array.h" +#include "vec/columns/column_string.h" +#include "vec/columns/column_struct.h" +#include "vec/columns/column_vector.h" +#include "vec/columns/columns_number.h" +#include "vec/common/assert_cast.h" +#include "vec/common/space_saving.h" +#include "vec/common/string_ref.h" +#include "vec/core/types.h" +#include "vec/data_types/data_type_array.h" +#include "vec/data_types/data_type_ipv4.h" +#include "vec/data_types/data_type_nullable.h" +#include "vec/data_types/data_type_struct.h" +#include "vec/io/io_helper.h" + +namespace doris::vectorized { + +inline constexpr UInt64 TOP_K_MAX_SIZE = 0xFFFFFF; + +struct AggregateFunctionTopKGenericData { + using Set = SpaceSaving<StringRef, StringRefHash>; + + Set value; +}; + +class AggregateFunctionMultiTopN final + : public IAggregateFunctionDataHelper<AggregateFunctionTopKGenericData, + AggregateFunctionMultiTopN>, + AggregateFunctionMultiTop { +private: + using State = AggregateFunctionTopKGenericData; + +public: + AggregateFunctionMultiTopN(const DataTypes& argument_types_) + : IAggregateFunctionDataHelper<AggregateFunctionTopKGenericData, + AggregateFunctionMultiTopN>(argument_types_), + column_size(argument_types_.size() - 2) {} + + String get_name() const override { return "multi_topn"; } Review Comment: approx_topn is better, since the key semantics are approximate and multi field is not necessary. ########## be/src/vec/aggregate_functions/aggregate_function_multi_topn.h: ########## @@ -0,0 +1,267 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <rapidjson/encodings.h> +#include <rapidjson/stringbuffer.h> +#include <rapidjson/writer.h> + +#include <cstdint> +#include <string> + +#include "vec/aggregate_functions/aggregate_function.h" +#include "vec/aggregate_functions/aggregate_function_multi_top.h" +#include "vec/columns/column.h" +#include "vec/columns/column_array.h" +#include "vec/columns/column_string.h" +#include "vec/columns/column_struct.h" +#include "vec/columns/column_vector.h" +#include "vec/columns/columns_number.h" +#include "vec/common/assert_cast.h" +#include "vec/common/space_saving.h" +#include "vec/common/string_ref.h" +#include "vec/core/types.h" +#include "vec/data_types/data_type_array.h" +#include "vec/data_types/data_type_ipv4.h" +#include "vec/data_types/data_type_nullable.h" +#include "vec/data_types/data_type_struct.h" +#include "vec/io/io_helper.h" + +namespace doris::vectorized { + +inline constexpr UInt64 TOP_K_MAX_SIZE = 0xFFFFFF; + +struct AggregateFunctionTopKGenericData { + using Set = SpaceSaving<StringRef, StringRefHash>; + + Set value; +}; + +class AggregateFunctionMultiTopN final + : public IAggregateFunctionDataHelper<AggregateFunctionTopKGenericData, + AggregateFunctionMultiTopN>, + AggregateFunctionMultiTop { +private: + using State = 
AggregateFunctionTopKGenericData; + +public: + AggregateFunctionMultiTopN(const DataTypes& argument_types_) + : IAggregateFunctionDataHelper<AggregateFunctionTopKGenericData, + AggregateFunctionMultiTopN>(argument_types_), + column_size(argument_types_.size() - 2) {} + + String get_name() const override { return "multi_topn"; } + + DataTypePtr get_return_type() const override { return std::make_shared<DataTypeString>(); } + + bool allocates_memory_in_arena() const override { return true; } + + void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& buf) const override { + this->data(place).value.write(buf); + + write_var_uint(threshold, buf); + write_var_uint(reserved, buf); + } + + void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf, Review Comment: add comment ########## fe/fe-core/src/main/java/org/apache/doris/nereids/trees/expressions/functions/agg/MultiTopN.java: ########## @@ -0,0 +1,94 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package org.apache.doris.nereids.trees.expressions.functions.agg; + +import org.apache.doris.catalog.FunctionSignature; +import org.apache.doris.nereids.exceptions.AnalysisException; +import org.apache.doris.nereids.trees.expressions.Expression; +import org.apache.doris.nereids.trees.expressions.functions.ExplicitlyCastableSignature; +import org.apache.doris.nereids.trees.expressions.visitor.ExpressionVisitor; +import org.apache.doris.nereids.types.StringType; +import org.apache.doris.nereids.types.coercion.AnyDataType; + +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; + +import java.util.List; + +/** + * AggregateFunction 'multi_topn'. This class is generated by GenerateFunction. + */ +public class MultiTopN extends NullableAggregateFunction + implements ExplicitlyCastableSignature { + + public static final List<FunctionSignature> SIGNATURES = ImmutableList.of( + FunctionSignature.ret(StringType.INSTANCE) + .varArgs(AnyDataType.INSTANCE_WITHOUT_INDEX) + ); + + public MultiTopN(Expression... varArgs) { + this(false, varArgs); + } + + public MultiTopN(boolean distinct, Expression... varArgs) { + this(distinct, false, varArgs); + } + + public MultiTopN(boolean distinct, boolean alwaysNullable, Expression... varArgs) { + super("multi_topn", distinct, alwaysNullable, varArgs); + } + + @Override + public void checkLegalityBeforeTypeCoercion() { + if (arity() < 3) { Review Comment: We should add a function without the argument `reserved` that uses a default value, so it is easier to use for general users. ########## be/src/vec/aggregate_functions/aggregate_function_multi_topn.h: ########## @@ -0,0 +1,267 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include <rapidjson/encodings.h> +#include <rapidjson/stringbuffer.h> +#include <rapidjson/writer.h> + +#include <cstdint> +#include <string> + +#include "vec/aggregate_functions/aggregate_function.h" +#include "vec/aggregate_functions/aggregate_function_multi_top.h" +#include "vec/columns/column.h" +#include "vec/columns/column_array.h" +#include "vec/columns/column_string.h" +#include "vec/columns/column_struct.h" +#include "vec/columns/column_vector.h" +#include "vec/columns/columns_number.h" +#include "vec/common/assert_cast.h" +#include "vec/common/space_saving.h" +#include "vec/common/string_ref.h" +#include "vec/core/types.h" +#include "vec/data_types/data_type_array.h" +#include "vec/data_types/data_type_ipv4.h" +#include "vec/data_types/data_type_nullable.h" +#include "vec/data_types/data_type_struct.h" +#include "vec/io/io_helper.h" + +namespace doris::vectorized { + +inline constexpr UInt64 TOP_K_MAX_SIZE = 0xFFFFFF; + +struct AggregateFunctionTopKGenericData { + using Set = SpaceSaving<StringRef, StringRefHash>; + + Set value; +}; + +class AggregateFunctionMultiTopN final + : public IAggregateFunctionDataHelper<AggregateFunctionTopKGenericData, + AggregateFunctionMultiTopN>, + AggregateFunctionMultiTop { +private: + using State = AggregateFunctionTopKGenericData; + +public: + AggregateFunctionMultiTopN(const DataTypes& argument_types_) + : 
IAggregateFunctionDataHelper<AggregateFunctionTopKGenericData, + AggregateFunctionMultiTopN>(argument_types_), + column_size(argument_types_.size() - 2) {} + + String get_name() const override { return "multi_topn"; } + + DataTypePtr get_return_type() const override { return std::make_shared<DataTypeString>(); } + + bool allocates_memory_in_arena() const override { return true; } + + void serialize(ConstAggregateDataPtr __restrict place, BufferWritable& buf) const override { + this->data(place).value.write(buf); + + write_var_uint(threshold, buf); + write_var_uint(reserved, buf); + } + + void deserialize(AggregateDataPtr __restrict place, BufferReadable& buf, + Arena* arena) const override { + auto readStringBinaryInto = [](Arena& arena, BufferReadable& buf) { + size_t size = 0; + read_var_uint(size, buf); + + if (UNLIKELY(size > DEFAULT_MAX_STRING_SIZE)) { + throw Exception(ErrorCode::INTERNAL_ERROR, "Too large string size."); + } + + char* data = arena.alloc(size); + buf.read(data, size); + + return StringRef(data, size); + }; + + auto& set = this->data(place).value; + set.clear(); + + size_t size = 0; + read_var_uint(size, buf); + if (UNLIKELY(size > TOP_K_MAX_SIZE)) { + throw Exception(ErrorCode::INTERNAL_ERROR, + "Too large size ({}) for aggregate function '{}' state (maximum is {})", + size, get_name(), TOP_K_MAX_SIZE); + } + + set.resize(size); + for (size_t i = 0; i < size; ++i) { + auto ref = readStringBinaryInto(*arena, buf); + uint64_t count = 0; + uint64_t error = 0; + read_var_uint(count, buf); + read_var_uint(error, buf); + set.insert(ref, count, error); + arena->rollback(ref.size); + } + + set.read_alpha_map(buf); + + read_var_uint(threshold, buf); + read_var_uint(reserved, buf); + } + + void add(AggregateDataPtr __restrict place, const IColumn** columns, ssize_t row_num, Review Comment: add comment ########## be/src/vec/aggregate_functions/aggregate_function_simple_factory.cpp: ########## @@ -114,20 +115,17 @@ AggregateFunctionSimpleFactory& 
AggregateFunctionSimpleFactory::instance() { register_aggregate_function_linear_histogram(instance); register_aggregate_function_map_agg(instance); register_aggregate_function_bitmap_agg(instance); - Review Comment: unnecessary change -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
