This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch branch-2.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-2.1 by this push:
new 405b50b1b7a [Improvement](queue) Return value of concurrent queue
should be processed (#45032)
405b50b1b7a is described below
commit 405b50b1b7a0eefa79228da1c57955ed3d060fa3
Author: Gabriel <[email protected]>
AuthorDate: Thu Dec 5 17:46:18 2024 +0800
[Improvement](queue) Return value of concurrent queue should be processed
(#44986) (#45032)
Pushing items into a concurrent queue can return false because of an
unexpected error (e.g. insufficient available memory).
---
.../pipeline/pipeline_x/local_exchange/local_exchanger.h | 14 ++++++++++++--
be/src/vec/exec/scan/scanner_context.cpp | 5 ++++-
2 files changed, 16 insertions(+), 3 deletions(-)
diff --git a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
index fda86b5bb55..6a6680c0bbd 100644
--- a/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
+++ b/be/src/pipeline/pipeline_x/local_exchange/local_exchanger.h
@@ -88,7 +88,11 @@ struct BlockQueue {
: eos(other.eos.load()), data_queue(std::move(other.data_queue)) {}
inline bool enqueue(BlockType const& item) {
if (!eos) {
- data_queue.enqueue(item);
+ if (!data_queue.enqueue(item)) [[unlikely]] {
+ throw Exception(ErrorCode::INTERNAL_ERROR,
+ "Exception occurs in data queue [size = {}] of
local exchange.",
+ data_queue.size_approx());
+ }
return true;
}
return false;
@@ -96,7 +100,11 @@ struct BlockQueue {
inline bool enqueue(BlockType&& item) {
if (!eos) {
- data_queue.enqueue(std::move(item));
+ if (!data_queue.enqueue(std::move(item))) [[unlikely]] {
+ throw Exception(ErrorCode::INTERNAL_ERROR,
+ "Exception occurs in data queue [size = {}] of
local exchange.",
+ data_queue.size_approx());
+ }
return true;
}
return false;
@@ -146,6 +154,8 @@ struct ShuffleBlockWrapper {
shared_state->exchanger->_free_block_limit *
shared_state->exchanger->_num_sources) {
data_block.clear_column_data();
+ // Free blocks is used to improve memory efficiency. Failure
during pushing back
+ // free block will not incur any bad result so just ignore the
return value.
shared_state->exchanger->_free_blocks.enqueue(std::move(data_block));
}
}
diff --git a/be/src/vec/exec/scan/scanner_context.cpp
b/be/src/vec/exec/scan/scanner_context.cpp
index ee34c5fb774..c2338a57818 100644
--- a/be/src/vec/exec/scan/scanner_context.cpp
+++ b/be/src/vec/exec/scan/scanner_context.cpp
@@ -63,7 +63,10 @@ ScannerContext::ScannerContext(RuntimeState* state, const
TupleDescriptor* outpu
_output_row_descriptor->tuple_descriptors().size() == 1);
_query_id = _state->get_query_ctx()->query_id();
ctx_id = UniqueId::gen_uid().to_string();
- _scanners.enqueue_bulk(scanners.begin(), scanners.size());
+ if (!_scanners.enqueue_bulk(scanners.begin(), scanners.size()))
[[unlikely]] {
+ throw Exception(ErrorCode::INTERNAL_ERROR,
+ "Exception occurs during scanners initialization.");
+ };
if (limit < 0) {
limit = -1;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]