This is an automated email from the ASF dual-hosted git repository.

gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new a6756b46609 [pipelineX](bug) Fix broadcast buffer reference count 
(#26545)
a6756b46609 is described below

commit a6756b46609e7a96bebce26658379f0516ea21d4
Author: Gabriel <[email protected]>
AuthorDate: Wed Nov 8 00:14:48 2023 +0800

    [pipelineX](bug) Fix broadcast buffer reference count (#26545)
---
 be/src/pipeline/exec/exchange_sink_buffer.cpp      |  9 +++------
 be/src/pipeline/exec/exchange_sink_buffer.h        |  7 ++++---
 be/src/pipeline/exec/exchange_sink_operator.cpp    | 21 ++++++++-------------
 be/src/pipeline/exec/result_file_sink_operator.cpp |  2 +-
 be/src/vec/sink/vdata_stream_sender.cpp            |  6 +++++-
 be/src/vec/sink/vdata_stream_sender.h              |  9 ++++-----
 6 files changed, 25 insertions(+), 29 deletions(-)

diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp 
b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index 577eb4a4622..f450fa69605 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -177,20 +177,17 @@ Status 
ExchangeSinkBuffer<Parent>::add_block(TransmitInfo<Parent>&& request) {
 }
 
 template <typename Parent>
-Status ExchangeSinkBuffer<Parent>::add_block(BroadcastTransmitInfo<Parent>&& 
request,
-                                             [[maybe_unused]] bool* sent) {
+Status ExchangeSinkBuffer<Parent>::add_block(BroadcastTransmitInfo<Parent>&& 
request) {
     if (_is_finishing) {
+        request.block_holder->unref();
         return Status::OK();
     }
     TUniqueId ins_id = request.channel->_fragment_instance_id;
     if (_is_receiver_eof(ins_id.lo)) {
+        request.block_holder->unref();
         return Status::EndOfFile("receiver eof");
     }
     bool send_now = false;
-    if (sent) {
-        *sent = true;
-    }
-    request.block_holder->ref();
     {
         std::unique_lock<std::mutex> 
lock(*_instance_to_package_queue_mutex[ins_id.lo]);
         // Do not have in process rpc, directly send
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.h 
b/be/src/pipeline/exec/exchange_sink_buffer.h
index d63dd2a55f9..2b30f6fac70 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.h
+++ b/be/src/pipeline/exec/exchange_sink_buffer.h
@@ -76,15 +76,16 @@ public:
     BroadcastPBlockHolder(pipeline::BroadcastDependency* dep) : _ref_count(0), 
_dep(dep) {}
     ~BroadcastPBlockHolder() noexcept = default;
 
+    void ref(int delta) noexcept { _ref_count._value.fetch_add(delta); }
     void unref() noexcept;
-    void ref() noexcept { _ref_count._value.fetch_add(1); }
+    void ref() noexcept { ref(1); }
 
     bool available() { return _ref_count._value == 0; }
 
     PBlock* get_block() { return &pblock; }
 
 private:
-    AtomicWrapper<uint32_t> _ref_count;
+    AtomicWrapper<int32_t> _ref_count;
     PBlock pblock;
     pipeline::BroadcastDependency* _dep;
 };
@@ -177,7 +178,7 @@ public:
     void register_sink(TUniqueId);
 
     Status add_block(TransmitInfo<Parent>&& request);
-    Status add_block(BroadcastTransmitInfo<Parent>&& request, [[maybe_unused]] 
bool* sent);
+    Status add_block(BroadcastTransmitInfo<Parent>&& request);
     bool can_write() const;
     bool is_pending_finish();
     void close();
diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp 
b/be/src/pipeline/exec/exchange_sink_operator.cpp
index e6bcccbabb2..44d6b6448ed 100644
--- a/be/src/pipeline/exec/exchange_sink_operator.cpp
+++ b/be/src/pipeline/exec/exchange_sink_operator.cpp
@@ -344,30 +344,25 @@ Status ExchangeSinkOperatorX::sink(RuntimeState* state, 
vectorized::Block* block
                     } else {
                         block_holder->get_block()->Clear();
                     }
-                    Status error_status = Status::OK();
-                    bool sent = false;
+                    local_state._broadcast_dependency->take_available_block();
+                    block_holder->ref(local_state.channels.size());
+                    Status status;
                     for (auto channel : local_state.channels) {
                         if (!channel->is_receiver_eof()) {
                             Status status;
                             if (channel->is_local()) {
+                                block_holder->unref();
                                 status = channel->send_local_block(&cur_block);
                             } else {
                                 SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
                                 status = channel->send_broadcast_block(
-                                        block_holder, &sent, source_state == 
SourceState::FINISHED);
-                            }
-                            if (status.is<ErrorCode::END_OF_FILE>()) {
-                                _handle_eof_channel(state, channel, status);
-                            } else if (!status.ok()) {
-                                error_status = status;
-                                break;
+                                        block_holder, source_state == 
SourceState::FINISHED);
                             }
+                            HANDLE_CHANNEL_STATUS(state, channel, status);
+                        } else {
+                            block_holder->unref();
                         }
                     }
-                    if (sent) {
-                        
local_state._broadcast_dependency->take_available_block();
-                    }
-                    RETURN_IF_ERROR(error_status);
                     cur_block.clear_column_data();
                     local_state._serializer.get_block()->set_muatable_columns(
                             cur_block.mutate_columns());
diff --git a/be/src/pipeline/exec/result_file_sink_operator.cpp 
b/be/src/pipeline/exec/result_file_sink_operator.cpp
index 217696c6939..3193c1b07c4 100644
--- a/be/src/pipeline/exec/result_file_sink_operator.cpp
+++ b/be/src/pipeline/exec/result_file_sink_operator.cpp
@@ -241,7 +241,7 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, 
Status exec_status)
                                 } else {
                                     
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
                                     status = 
channel->send_broadcast_block(_block_holder.get(),
-                                                                           
nullptr, true);
+                                                                           
true);
                                 }
                                 HANDLE_CHANNEL_STATUS(state, channel, status);
                             }
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp 
b/be/src/vec/sink/vdata_stream_sender.cpp
index a7066f981ba..728bd0c56f4 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -562,15 +562,19 @@ Status VDataStreamSender::send(RuntimeState* state, 
Block* block, bool eos) {
                         block_holder->get_block()->Clear();
                     }
                     Status status;
+                    block_holder->ref(_channels.size());
                     for (auto channel : _channels) {
                         if (!channel->is_receiver_eof()) {
                             if (channel->is_local()) {
+                                block_holder->unref();
                                 status = channel->send_local_block(&cur_block);
                             } else {
                                 SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
-                                status = 
channel->send_broadcast_block(block_holder, nullptr, eos);
+                                status = 
channel->send_broadcast_block(block_holder, eos);
                             }
                             HANDLE_CHANNEL_STATUS(state, channel, status);
+                        } else {
+                            block_holder->unref();
                         }
                     }
                     cur_block.clear_column_data();
diff --git a/be/src/vec/sink/vdata_stream_sender.h 
b/be/src/vec/sink/vdata_stream_sender.h
index 30a55f13296..10116be0334 100644
--- a/be/src/vec/sink/vdata_stream_sender.h
+++ b/be/src/vec/sink/vdata_stream_sender.h
@@ -278,8 +278,7 @@ public:
     virtual Status send_remote_block(PBlock* block, bool eos = false,
                                      Status exec_status = Status::OK());
 
-    virtual Status send_broadcast_block(BroadcastPBlockHolder* block, 
[[maybe_unused]] bool* sent,
-                                        bool eos = false) {
+    virtual Status send_broadcast_block(BroadcastPBlockHolder* block, bool eos 
= false) {
         return Status::InternalError("Send BroadcastPBlockHolder is not 
allowed!");
     }
 
@@ -494,17 +493,17 @@ public:
         return Status::OK();
     }
 
-    Status send_broadcast_block(BroadcastPBlockHolder* block, [[maybe_unused]] 
bool* sent,
-                                bool eos = false) override {
+    Status send_broadcast_block(BroadcastPBlockHolder* block, bool eos = 
false) override {
         COUNTER_UPDATE(Channel<Parent>::_parent->blocks_sent_counter(), 1);
         if (eos) {
             if (_eos_send) {
+                block->unref();
                 return Status::OK();
             }
             _eos_send = true;
         }
         if (eos || block->get_block()->column_metas_size()) {
-            RETURN_IF_ERROR(_buffer->add_block({this, block, eos}, sent));
+            RETURN_IF_ERROR(_buffer->add_block({this, block, eos}));
         }
         return Status::OK();
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to