This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
new 95f9b32c7fb [Bug](fix) Fix topn agg limit may get error result in when
refresh heap (#47844) (#47903)
95f9b32c7fb is described below
commit 95f9b32c7fba16dcafb413e180f4661ad721218e
Author: HappenLee <[email protected]>
AuthorDate: Fri Feb 14 15:21:44 2025 +0800
[Bug](fix) Fix topn agg limit may get error result in when refresh heap
(#47844) (#47903)
cherry pick #47844
---
be/src/pipeline/dependency.cpp | 12 +++
be/src/pipeline/dependency.h | 3 +
be/src/pipeline/exec/aggregation_sink_operator.cpp | 24 +-----
.../pipeline/operator/agg_shared_state_test.cpp | 95 ++++++++++++++++++++++
4 files changed, 114 insertions(+), 20 deletions(-)
diff --git a/be/src/pipeline/dependency.cpp b/be/src/pipeline/dependency.cpp
index 93117fa71a0..ffba01b05b2 100644
--- a/be/src/pipeline/dependency.cpp
+++ b/be/src/pipeline/dependency.cpp
@@ -399,6 +399,18 @@ Status
AggSharedState::_destroy_agg_status(vectorized::AggregateDataPtr data) {
return Status::OK();
}
+// Offer row `row_id` of `key_columns` as a candidate for the limit heap.
+// Preconditions (required by the code below): limit_heap is non-empty, and
+// limit_columns holds one column per key column.
+// The row is pushed BEFORE the heap top is popped, so the new row competes
+// with the existing entries: if it is the greatest element under
+// HeapLimitCursor's ordering, it is itself the one evicted. Popping first
+// would evict the previous top unconditionally, even when the new row is
+// greater. The heap size is unchanged by this call. The row is appended to
+// limit_columns even when it is evicted immediately; nothing here removes
+// rows from limit_columns.
+void AggSharedState::refresh_top_limit(size_t row_id,
+ const vectorized::ColumnRawPtrs&
key_columns) {
+ // Copy the candidate key into limit_columns; the heap cursor is built from
+ // its row index there (the last row).
+ // NOTE(review): `j` is int while size() is size_t; size_t avoids -Wsign-compare.
+ for (int j = 0; j < key_columns.size(); ++j) {
+ limit_columns[j]->insert_from(*key_columns[j], row_id);
+ }
+ limit_heap.emplace(limit_columns[0]->size() - 1, limit_columns,
order_directions,
+ null_directions);
+
+ // Evict the greatest entry, which may be the row just pushed.
+ limit_heap.pop();
+ // Row id (into limit_columns) of the new heap top, not a key value.
+ limit_columns_min = limit_heap.top()._row_id;
+}
+
LocalExchangeSharedState::~LocalExchangeSharedState() = default;
} // namespace doris::pipeline
diff --git a/be/src/pipeline/dependency.h b/be/src/pipeline/dependency.h
index fea6d9cb7bb..dd953292396 100644
--- a/be/src/pipeline/dependency.h
+++ b/be/src/pipeline/dependency.h
@@ -386,6 +386,9 @@ public:
std::priority_queue<HeapLimitCursor> limit_heap;
+ // Append row `row_id` of `key_columns` to limit_columns, push it onto
+ // limit_heap, then pop the heap top. The heap size is unchanged, and
+ // limit_columns_min is set to the new top's row id. Requires a non-empty
+ // limit_heap and one limit column per key column.
+ void refresh_top_limit(size_t row_id, const vectorized::ColumnRawPtrs&
key_columns);
+
private:
vectorized::MutableColumns _get_keys_hash_table();
diff --git a/be/src/pipeline/exec/aggregation_sink_operator.cpp
b/be/src/pipeline/exec/aggregation_sink_operator.cpp
index d253685c4ae..39dfb4e7f3c 100644
--- a/be/src/pipeline/exec/aggregation_sink_operator.cpp
+++ b/be/src/pipeline/exec/aggregation_sink_operator.cpp
@@ -597,23 +597,7 @@ bool
AggSinkLocalState::_emplace_into_hash_table_limit(vectorized::AggregateData
agg_method.init_serialized_keys(key_columns,
num_rows);
size_t i = 0;
- auto refresh_top_limit = [&, this]() {
- _shared_state->limit_heap.pop();
- for (int j = 0; j < key_columns.size(); ++j) {
-
_shared_state->limit_columns[j]->insert_from(*key_columns[j],
-
i);
- }
- _shared_state->limit_heap.emplace(
-
_shared_state->limit_columns[0]->size() - 1,
- _shared_state->limit_columns,
- _shared_state->order_directions,
- _shared_state->null_directions);
- _shared_state->limit_columns_min =
-
_shared_state->limit_heap.top()._row_id;
- };
-
- auto creator = [this, refresh_top_limit](const
auto& ctor, auto& key,
- auto&
origin) {
+ // [&] captures `i` and `key_columns` by reference. `i` must be read when
+ // the lambda runs: it is 0 here, and is presumably advanced by the
+ // emplace loop below (not visible in this hunk). A by-value capture
+ // would freeze it at 0.
+ auto creator = [&](const auto& ctor, auto& key,
auto& origin) {
try {
HashMethodType::try_presis_key_and_origin(key, origin,
*_agg_arena_pool);
@@ -625,7 +609,7 @@ bool
AggSinkLocalState::_emplace_into_hash_table_limit(vectorized::AggregateData
throw Exception(st.code(),
st.to_string());
}
ctor(key, mapped);
- refresh_top_limit();
+ // Offer row `i` to the limit heap once the new entry is constructed.
+ _shared_state->refresh_top_limit(i,
key_columns);
} catch (...) {
// Exception-safety - if it can not
allocate memory or create status,
// the destructors will not be called.
@@ -634,7 +618,7 @@ bool
AggSinkLocalState::_emplace_into_hash_table_limit(vectorized::AggregateData
}
};
- auto creator_for_null_key = [this,
refresh_top_limit](auto& mapped) {
+ // Same by-reference capture as `creator`, for the same reason (`i`).
+ auto creator_for_null_key = [&](auto& mapped) {
mapped = _agg_arena_pool->aligned_alloc(
Base::_parent->template
cast<AggSinkOperatorX>()
._total_size_of_aggregate_states,
@@ -644,7 +628,7 @@ bool
AggSinkLocalState::_emplace_into_hash_table_limit(vectorized::AggregateData
if (!st) {
throw Exception(st.code(), st.to_string());
}
- refresh_top_limit();
+ // Offer row `i` to the limit heap for the null-key entry as well.
+ _shared_state->refresh_top_limit(i,
key_columns);
};
SCOPED_TIMER(_hash_table_emplace_timer);
diff --git a/be/test/pipeline/operator/agg_shared_state_test.cpp
b/be/test/pipeline/operator/agg_shared_state_test.cpp
new file mode 100644
index 00000000000..e4ce200ed1e
--- /dev/null
+++ b/be/test/pipeline/operator/agg_shared_state_test.cpp
@@ -0,0 +1,95 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include "pipeline/dependency.h"
+#include "vec/columns/column_vector.h"
+#include "vec/data_types/data_type_number.h"
+
+namespace doris::pipeline {
+
+class AggSharedStateTest : public testing::Test {
+protected:
+ // Builds a one-column Int32 key set (rows 0..5) and seeds limit_heap with
+ // rows 0-3, then pops once so 3 rows remain, as a top-3 limit would.
+ void SetUp() override {
+ _shared_state = std::make_shared<AggSharedState>();
+
+ // Setup test data
+ auto int_type = std::make_shared<vectorized::DataTypeInt32>();
+ _shared_state->limit_columns.push_back(int_type->create_column());
+
+ // Setup order directions (ascending)
+ _shared_state->order_directions = {1};
+ _shared_state->null_directions = {1};
+
+ // Create test column
+ _test_column = int_type->create_column();
+ // NOTE(review): static_cast is the conventional downcast here; reinterpret_cast is unchecked.
+ auto* col_data =
reinterpret_cast<vectorized::ColumnVector<int>*>(_test_column.get());
+
+ // Insert test key rows 0..5 with values: 5, 3, 1, -1, 0, 2
+ col_data->insert(5);
+ col_data->insert(3);
+ col_data->insert(1);
+ col_data->insert(-1);
+ col_data->insert(0);
+ col_data->insert(2);
+
+ _key_columns.push_back(_test_column.get());
+ // Seed the heap with key rows 0-3 (values 5, 3, 1, -1). limit_columns row k
+ // is a copy of key row k, so row ids in the heap match key row ids.
+ for (int i = 0; i < 4; ++i) {
+ for (int j = 0; j < _key_columns.size(); ++j) {
+ _shared_state->limit_columns[j]->insert_from(*_key_columns[j],
i);
+ }
+ // build agg limit heap
+ _shared_state->limit_heap.emplace(
+ _shared_state->limit_columns[0]->size() - 1,
_shared_state->limit_columns,
+ _shared_state->order_directions,
_shared_state->null_directions);
+ }
+ // Pop the heap top (value 5) so 3 rows remain: values {3, 1, -1}.
+ // The new top is row 1 (value 3); record its row id.
+ _shared_state->limit_heap.pop();
+ _shared_state->limit_columns_min =
_shared_state->limit_heap.top()._row_id;
+ }
+
+ std::shared_ptr<AggSharedState> _shared_state;
+ vectorized::MutableColumnPtr _test_column; // owns the key values
+ vectorized::ColumnRawPtrs _key_columns; // holds _test_column.get()
+};
+
+TEST_F(AggSharedStateTest, TestRefreshTopLimit) {
+ // limit = 3 matches the 3 rows kept by SetUp. refresh_top_limit does not
+ // read `limit` itself; it keeps the heap size unchanged (3 here).
+ _shared_state->limit = 3;
+
+ // limit_columns_min holds the row id (into limit_columns) of the heap top,
+ // not a key value. After SetUp the top is row 1 (value 3).
+ EXPECT_EQ(_shared_state->limit_columns_min, 1);
+
+ // Row 4 (value 0) displaces 3: kept values {1, 0, -1}, top is row 2 (value 1).
+ _shared_state->refresh_top_limit(4, _key_columns);
+ EXPECT_EQ(_shared_state->limit_columns_min, 2);
+
+ // Row 5 (value 2) is greater than every kept value, so it is evicted at
+ // once and the kept set is unchanged. This is the regression case: popping
+ // before pushing would evict 1 and keep 2, making row 5 the top.
+ _shared_state->refresh_top_limit(5, _key_columns);
+ EXPECT_EQ(_shared_state->limit_columns_min, 2); // 1 should still be max
+
+ auto heap_size = _shared_state->limit_heap.size();
+ EXPECT_EQ(heap_size, 3);
+
+ EXPECT_EQ(_shared_state->limit_heap.top()._row_id, 2); // 1 should be the
top value
+ _shared_state->limit_heap.pop();
+ EXPECT_EQ(_shared_state->limit_heap.top()._row_id, 4); // 0 should be the
top value
+ _shared_state->limit_heap.pop();
+ EXPECT_EQ(_shared_state->limit_heap.top()._row_id, 3); // -1 should be the
top value
+}
+
+} // namespace doris::pipeline
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]