This is an automated email from the ASF dual-hosted git repository.

HappenLee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 8f696e0ecaf [fix](be) Preserve collect aggregate limit during merge (#63361)
8f696e0ecaf is described below

commit 8f696e0ecaf400ceb4fa0895f3059aa8b6b9a444
Author: Jerry Hu <[email protected]>
AuthorDate: Tue May 19 11:42:15 2026 +0800

    [fix](be) Preserve collect aggregate limit during merge (#63361)
    
    Problem Summary: collect_list and collect_set aggregate states with a
    limit could lose their initialized max_size during merge, because several
    merge paths unconditionally overwrote the left state's max_size with the
    right state's max_size. If the right state was still uninitialized
    (max_size == -1), later merge/add operations could ignore the requested
    limit.
    
    ### Release note
    
    None
    
    ### Check List (For Author)
    
    - Test: Unit Test
        - build-support/clang-format.sh
        - build-support/check-format.sh
    - ./run-be-ut.sh --run --filter=VAggCollectTest.*:AggregateFunctionCollectTest.*
    - build-support/run-clang-tidy.sh --build-dir be/ut_build_ASAN (failed
    locally: clang-tidy could not analyze the code because stddef.h was
    missing from the toolchain include path; it also reported a pre-existing
    unmatched NOLINTEND diagnostic in be/src/core/types.h)
    - Behavior changed: Yes. collect_list/collect_set merge now preserves an
    already-initialized max_size limit instead of overwriting it with the
    max_size of an uninitialized right-hand state.
    - Does this need documentation: No
---
 .../exprs/aggregate/aggregate_function_collect.h   |  9 +--
 be/test/exprs/aggregate/agg_collect_test.cpp       | 72 ++++++++++++++++++++--
 2 files changed, 70 insertions(+), 11 deletions(-)

diff --git a/be/src/exprs/aggregate/aggregate_function_collect.h 
b/be/src/exprs/aggregate/aggregate_function_collect.h
index b78e86b0fc4..cd787627ef9 100644
--- a/be/src/exprs/aggregate/aggregate_function_collect.h
+++ b/be/src/exprs/aggregate/aggregate_function_collect.h
@@ -17,11 +17,10 @@
 
 #pragma once
 
-#include <assert.h>
 #include <glog/logging.h>
-#include <string.h>
 
 #include <cstddef>
+#include <cstring>
 #include <limits>
 #include <memory>
 #include <new>
@@ -137,7 +136,6 @@ struct AggregateFunctionCollectSetData<T, HasLimit> {
         if (max_size == -1) {
             max_size = rhs.max_size;
         }
-        max_size = rhs.max_size;
 
         for (const auto& rhs_elem : rhs.data_set) {
             if constexpr (HasLimit) {
@@ -205,7 +203,6 @@ struct AggregateFunctionCollectListData {
             if (max_size == -1) {
                 max_size = rhs.max_size;
             }
-            max_size = rhs.max_size;
             for (auto& rhs_elem : rhs.data) {
                 if (size() >= max_size) {
                     return;
@@ -237,7 +234,7 @@ struct AggregateFunctionCollectListData {
         auto& vec = assert_cast<ColVecType&>(to).get_data();
         size_t old_size = vec.size();
         vec.resize(old_size + size());
-        memcpy(vec.data() + old_size, data.data(), size() * 
sizeof(ElementType));
+        std::memcpy(vec.data() + old_size, data.data(), size() * 
sizeof(ElementType));
     }
 };
 
@@ -263,7 +260,6 @@ struct AggregateFunctionCollectListData<T, HasLimit> {
             if (max_size == -1) {
                 max_size = rhs.max_size;
             }
-            max_size = rhs.max_size;
 
             data->insert_range_from(*rhs.data, 0,
                                     std::min(static_cast<size_t>(max_size - 
size()), rhs.size()));
@@ -332,7 +328,6 @@ struct AggregateFunctionCollectListData<T, HasLimit> {
             if (max_size == -1) {
                 max_size = rhs.max_size;
             }
-            max_size = rhs.max_size;
 
             column_data->insert_range_from(
                     *rhs.column_data, 0,
diff --git a/be/test/exprs/aggregate/agg_collect_test.cpp 
b/be/test/exprs/aggregate/agg_collect_test.cpp
index 3fb0c8c020c..d5394ab86c4 100644
--- a/be/test/exprs/aggregate/agg_collect_test.cpp
+++ b/be/test/exprs/aggregate/agg_collect_test.cpp
@@ -17,9 +17,9 @@
 
 #include <gtest/gtest-message.h>
 #include <gtest/gtest-test-part.h>
-#include <stddef.h>
-#include <stdint.h>
 
+#include <cstddef>
+#include <cstdint>
 #include <memory>
 #include <ostream>
 #include <string>
@@ -40,6 +40,7 @@
 #include "core/types.h"
 #include "exprs/aggregate/agg_function_test.h"
 #include "exprs/aggregate/aggregate_function.h"
+#include "exprs/aggregate/aggregate_function_collect.h"
 #include "exprs/aggregate/aggregate_function_simple_factory.h"
 #include "gtest/gtest_pred_impl.h"
 
@@ -53,12 +54,12 @@ void 
register_aggregate_function_collect_list(AggregateFunctionSimpleFactory& fa
 
 class VAggCollectTest : public testing::Test {
 public:
-    void SetUp() {
+    void SetUp() override {
         AggregateFunctionSimpleFactory factory = 
AggregateFunctionSimpleFactory::instance();
         register_aggregate_function_collect_list(factory);
     }
 
-    void TearDown() {}
+    void TearDown() override {}
 
     bool is_distinct(const std::string& fn_name) { return fn_name == 
"collect_set"; }
 
@@ -217,6 +218,69 @@ TEST_F(VAggCollectTest, test_complex_data_type) {
     test_agg_collect<DataTypeString>("array_agg", 5, true);
 }
 
+TEST_F(VAggCollectTest, test_merge_preserve_initialized_max_size) {
+    {
+        const DataTypes argument_types {std::make_shared<DataTypeInt32>()};
+        AggregateFunctionCollectListData<TYPE_INT, true> lhs(argument_types);
+        AggregateFunctionCollectListData<TYPE_INT, true> rhs(argument_types);
+        lhs.max_size = 2;
+        lhs.data.push_back(1);
+        lhs.data.push_back(2);
+        rhs.data.push_back(3);
+        rhs.data.push_back(4);
+
+        lhs.merge(rhs);
+
+        EXPECT_EQ(lhs.max_size, 2);
+        EXPECT_EQ(lhs.size(), 2);
+    }
+
+    {
+        const DataTypes argument_types {std::make_shared<DataTypeString>()};
+        AggregateFunctionCollectSetData<TYPE_STRING, true> lhs(argument_types);
+        AggregateFunctionCollectSetData<TYPE_STRING, true> rhs(argument_types);
+        lhs.max_size = 1;
+        lhs.data_set.insert(StringRef("lhs", sizeof("lhs") - 1));
+        rhs.data_set.insert(StringRef("rhs", sizeof("rhs") - 1));
+        Arena arena;
+
+        lhs.merge(rhs, arena);
+
+        EXPECT_EQ(lhs.max_size, 1);
+        EXPECT_EQ(lhs.size(), 1);
+    }
+
+    {
+        const DataTypes argument_types {std::make_shared<DataTypeString>()};
+        AggregateFunctionCollectListData<TYPE_STRING, true> 
lhs(argument_types);
+        AggregateFunctionCollectListData<TYPE_STRING, true> 
rhs(argument_types);
+        lhs.max_size = 1;
+        lhs.data->insert_data("lhs", sizeof("lhs") - 1);
+        rhs.data->insert_data("rhs", sizeof("rhs") - 1);
+
+        lhs.merge(rhs);
+
+        EXPECT_EQ(lhs.max_size, 1);
+        EXPECT_EQ(lhs.size(), 1);
+    }
+
+    {
+        const DataTypePtr nested_type = std::make_shared<DataTypeInt32>();
+        const DataTypes argument_types {
+                std::make_shared<DataTypeArray>(make_nullable(nested_type))};
+        AggregateFunctionCollectListData<TYPE_ARRAY, true> lhs(argument_types);
+        AggregateFunctionCollectListData<TYPE_ARRAY, true> rhs(argument_types);
+        lhs.max_size = 1;
+        lhs.column_data->insert_default();
+        rhs.column_data->insert_default();
+
+        lhs.merge(rhs);
+
+        EXPECT_EQ(lhs.max_size, 1);
+        EXPECT_EQ(lhs.size(), 1);
+    }
+}
+
 struct AggregateFunctionCollectTest : public AggregateFunctiontest {};
 
 TEST_F(AggregateFunctionCollectTest, test_collect_list_aint64) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to