This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a6756b46609 [pipelineX](bug) Fix broadcast buffer reference count
(#26545)
a6756b46609 is described below
commit a6756b46609e7a96bebce26658379f0516ea21d4
Author: Gabriel <[email protected]>
AuthorDate: Wed Nov 8 00:14:48 2023 +0800
[pipelineX](bug) Fix broadcast buffer reference count (#26545)
---
be/src/pipeline/exec/exchange_sink_buffer.cpp | 9 +++------
be/src/pipeline/exec/exchange_sink_buffer.h | 7 ++++---
be/src/pipeline/exec/exchange_sink_operator.cpp | 21 ++++++++-------------
be/src/pipeline/exec/result_file_sink_operator.cpp | 2 +-
be/src/vec/sink/vdata_stream_sender.cpp | 6 +++++-
be/src/vec/sink/vdata_stream_sender.h | 9 ++++-----
6 files changed, 25 insertions(+), 29 deletions(-)
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 577eb4a4622..f450fa69605 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -177,20 +177,17 @@ Status
ExchangeSinkBuffer<Parent>::add_block(TransmitInfo<Parent>&& request) {
}
template <typename Parent>
-Status ExchangeSinkBuffer<Parent>::add_block(BroadcastTransmitInfo<Parent>&&
request,
- [[maybe_unused]] bool* sent) {
+Status ExchangeSinkBuffer<Parent>::add_block(BroadcastTransmitInfo<Parent>&&
request) {
if (_is_finishing) {
+ request.block_holder->unref();
return Status::OK();
}
TUniqueId ins_id = request.channel->_fragment_instance_id;
if (_is_receiver_eof(ins_id.lo)) {
+ request.block_holder->unref();
return Status::EndOfFile("receiver eof");
}
bool send_now = false;
- if (sent) {
- *sent = true;
- }
- request.block_holder->ref();
{
std::unique_lock<std::mutex>
lock(*_instance_to_package_queue_mutex[ins_id.lo]);
// Do not have in process rpc, directly send
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h
b/be/src/pipeline/exec/exchange_sink_buffer.h
index d63dd2a55f9..2b30f6fac70 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -76,15 +76,16 @@ public:
BroadcastPBlockHolder(pipeline::BroadcastDependency* dep) : _ref_count(0),
_dep(dep) {}
~BroadcastPBlockHolder() noexcept = default;
+ void ref(int delta) noexcept { _ref_count._value.fetch_add(delta); }
void unref() noexcept;
- void ref() noexcept { _ref_count._value.fetch_add(1); }
+ void ref() noexcept { ref(1); }
bool available() { return _ref_count._value == 0; }
PBlock* get_block() { return &pblock; }
private:
- AtomicWrapper<uint32_t> _ref_count;
+ AtomicWrapper<int32_t> _ref_count;
PBlock pblock;
pipeline::BroadcastDependency* _dep;
};
@@ -177,7 +178,7 @@ public:
void register_sink(TUniqueId);
Status add_block(TransmitInfo<Parent>&& request);
- Status add_block(BroadcastTransmitInfo<Parent>&& request, [[maybe_unused]]
bool* sent);
+ Status add_block(BroadcastTransmitInfo<Parent>&& request);
bool can_write() const;
bool is_pending_finish();
void close();
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index e6bcccbabb2..44d6b6448ed 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -344,30 +344,25 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state,
vectorized::Block* block
} else {
block_holder->get_block()->Clear();
}
- Status error_status = Status::OK();
- bool sent = false;
+ local_state._broadcast_dependency->take_available_block();
+ block_holder->ref(local_state.channels.size());
+ Status status;
for (auto channel : local_state.channels) {
if (!channel->is_receiver_eof()) {
Status status;
if (channel->is_local()) {
+ block_holder->unref();
status = channel->send_local_block(&cur_block);
} else {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
status = channel->send_broadcast_block(
- block_holder, &sent, source_state ==
SourceState::FINISHED);
- }
- if (status.is<ErrorCode::END_OF_FILE>()) {
- _handle_eof_channel(state, channel, status);
- } else if (!status.ok()) {
- error_status = status;
- break;
+ block_holder, source_state ==
SourceState::FINISHED);
}
+ HANDLE_CHANNEL_STATUS(state, channel, status);
+ } else {
+ block_holder->unref();
}
}
- if (sent) {
-
local_state._broadcast_dependency->take_available_block();
- }
- RETURN_IF_ERROR(error_status);
cur_block.clear_column_data();
local_state._serializer.get_block()->set_muatable_columns(
cur_block.mutate_columns());
diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp
b/be/src/pipeline/exec/result_file_sink_operator.cpp
index 217696c6939..3193c1b07c4 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -241,7 +241,7 @@ Status ResultFileSinkLocalState::close(RuntimeState* state,
Status exec_status)
} else {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
status =
channel->send_broadcast_block(_block_holder.get(),
-
nullptr, true);
+
true);
}
HANDLE_CHANNEL_STATUS(state, channel, status);
}
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp
b/be/src/vec/sink/vdata_stream_sender.cpp
index a7066f981ba..728bd0c56f4 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -562,15 +562,19 @@ Status VDataStreamSender::send(RuntimeState* state,
Block* block, bool eos) {
block_holder->get_block()->Clear();
}
Status status;
+ block_holder->ref(_channels.size());
for (auto channel : _channels) {
if (!channel->is_receiver_eof()) {
if (channel->is_local()) {
+ block_holder->unref();
status = channel->send_local_block(&cur_block);
} else {
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
- status =
channel->send_broadcast_block(block_holder, nullptr, eos);
+ status =
channel->send_broadcast_block(block_holder, eos);
}
HANDLE_CHANNEL_STATUS(state, channel, status);
+ } else {
+ block_holder->unref();
}
}
cur_block.clear_column_data();
diff --git a/be/src/vec/sink/vdata_stream_sender.h
b/be/src/vec/sink/vdata_stream_sender.h
index 30a55f13296..10116be0334 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -278,8 +278,7 @@ public:
virtual Status send_remote_block(PBlock* block, bool eos = false,
Status exec_status = Status::OK());
- virtual Status send_broadcast_block(BroadcastPBlockHolder* block,
[[maybe_unused]] bool* sent,
- bool eos = false) {
+ virtual Status send_broadcast_block(BroadcastPBlockHolder* block, bool eos
= false) {
return Status::InternalError("Send BroadcastPBlockHolder is not
allowed!");
}
@@ -494,17 +493,17 @@ public:
return Status::OK();
}
- Status send_broadcast_block(BroadcastPBlockHolder* block, [[maybe_unused]]
bool* sent,
- bool eos = false) override {
+ Status send_broadcast_block(BroadcastPBlockHolder* block, bool eos =
false) override {
COUNTER_UPDATE(Channel<Parent>::_parent->blocks_sent_counter(), 1);
if (eos) {
if (_eos_send) {
+ block->unref();
return Status::OK();
}
_eos_send = true;
}
if (eos || block->get_block()->column_metas_size()) {
- RETURN_IF_ERROR(_buffer->add_block({this, block, eos}, sent));
+ RETURN_IF_ERROR(_buffer->add_block({this, block, eos}));
}
return Status::OK();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]