This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new c0d9a8d53c6 [minor](pipelineX) refine error message for broadcast shuffle buffer (#26442)
c0d9a8d53c6 is described below
commit c0d9a8d53c66c40910d9b1b42486be5cd7ec13ea
Author: Gabriel <[email protected]>
AuthorDate: Mon Nov 6 15:10:13 2023 +0800
[minor](pipelineX) refine error message for broadcast shuffle buffer (#26442)
---
be/src/pipeline/exec/exchange_sink_operator.cpp | 5 ++++-
be/src/pipeline/exec/exchange_sink_operator.h | 6 ++++--
be/src/vec/sink/vdata_stream_sender.h | 3 +--
3 files changed, 9 insertions(+), 5 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp
index 9e05d682864..a32fbe45c55 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -451,7 +451,10 @@ Status ExchangeSinkLocalState::get_next_available_buffer(
return Status::OK();
}
}
- return Status::InternalError("No broadcast buffer left!");
+ return Status::InternalError("No broadcast buffer left! Available blocks: " +
+ std::to_string(_broadcast_dependency->available_blocks()) +
+ " and number of buffer is " +
+ std::to_string(_broadcast_pb_blocks.size()));
}
template <typename Channels, typename HashValueType>
diff --git a/be/src/pipeline/exec/exchange_sink_operator.h b/be/src/pipeline/exec/exchange_sink_operator.h
index 05fa799e5cf..0d35f819442 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.h
+++ b/be/src/pipeline/exec/exchange_sink_operator.h
@@ -68,7 +68,7 @@ class ExchangeSinkQueueDependency final : public WriteDependency {
public:
ENABLE_FACTORY_CREATOR(ExchangeSinkQueueDependency);
ExchangeSinkQueueDependency(int id) : WriteDependency(id, "ResultQueueDependency") {}
- ~ExchangeSinkQueueDependency() = default;
+ ~ExchangeSinkQueueDependency() override = default;
void* shared_state() override { return nullptr; }
};
@@ -77,7 +77,7 @@ class BroadcastDependency final : public WriteDependency {
public:
ENABLE_FACTORY_CREATOR(BroadcastDependency);
BroadcastDependency(int id) : WriteDependency(id, "BroadcastDependency"), _available_block(0) {}
- virtual ~BroadcastDependency() = default;
+ ~BroadcastDependency() override = default;
[[nodiscard]] WriteDependency* write_blocked_by() override {
if (config::enable_fuzzy_mode && _available_block == 0 &&
@@ -107,6 +107,8 @@ public:
throw doris::Exception(ErrorCode::NOT_IMPLEMENTED_ERROR, "Should not reach here!");
}
+ int available_blocks() const { return _available_block; }
+
private:
std::atomic<int> _available_block;
};
diff --git a/be/src/vec/sink/vdata_stream_sender.h b/be/src/vec/sink/vdata_stream_sender.h
index 3a58514c8df..a09cb4b7d47 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -505,9 +505,8 @@ public:
if (eos) {
if (_eos_send) {
return Status::OK();
- } else {
- _eos_send = true;
}
+ _eos_send = true;
}
if (eos || block->get_block()->column_metas_size()) {
RETURN_IF_ERROR(_buffer->add_block({this, block, eos}, sent));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]