This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 37d0eb73f1a [Bug](sink) fix cancel_at_time not work at parallel_sink/parallel_outfile (#61740)
37d0eb73f1a is described below

commit 37d0eb73f1a86788c3a45e1b1aee2d49581c9d0b
Author: Pxl <[email protected]>
AuthorDate: Thu Mar 26 15:43:21 2026 +0800

    [Bug](sink) fix cancel_at_time not work at parallel_sink/parallel_outfile (#61740)
    
    pick #61739
---
 be/src/exec/operator/result_file_sink_operator.cpp   | 17 +++++++++++++----
 be/src/exec/operator/result_sink_operator.cpp        | 17 +++++++++++++----
 be/src/runtime/result_block_buffer.cpp               |  6 +++++-
 be/src/runtime/result_block_buffer.h                 | 18 ++++++++++++++++--
 be/test/exec/sink/arrow_result_block_buffer_test.cpp | 12 +++++++++---
 be/test/exec/sink/result_block_buffer_test.cpp       | 12 +++++++++---
 6 files changed, 65 insertions(+), 17 deletions(-)

diff --git a/be/src/exec/operator/result_file_sink_operator.cpp 
b/be/src/exec/operator/result_file_sink_operator.cpp
index dc282a53fcb..97de630d3e7 100644
--- a/be/src/exec/operator/result_file_sink_operator.cpp
+++ b/be/src/exec/operator/result_file_sink_operator.cpp
@@ -133,11 +133,20 @@ Status ResultFileSinkLocalState::close(RuntimeState* 
state, Status exec_status)
     if (_sender) {
         int64_t written_rows = _writer == nullptr ? 0 : 
_writer->get_written_rows();
         
state->get_query_ctx()->resource_ctx()->io_context()->update_returned_rows(written_rows);
-        RETURN_IF_ERROR(_sender->close(state->fragment_instance_id(), 
final_status, written_rows));
+        bool is_fully_closed = false;
+        RETURN_IF_ERROR(_sender->close(state->fragment_instance_id(), 
final_status, written_rows,
+                                       is_fully_closed));
+        // Schedule deferred cleanup only when the last instance closes the 
shared
+        // buffer.  In parallel outfile mode the buffer is keyed by query_id; 
in
+        // non-parallel mode it is keyed by fragment_instance_id.  Either way,
+        // _sender->buffer_id() returns the correct registration key, so there 
is
+        // no need to branch on enable_parallel_outfile here.
+        if (is_fully_closed) {
+            state->exec_env()->result_mgr()->cancel_at_time(
+                    time(nullptr) + 
config::result_buffer_cancelled_interval_time,
+                    _sender->buffer_id());
+        }
     }
-    state->exec_env()->result_mgr()->cancel_at_time(
-            time(nullptr) + config::result_buffer_cancelled_interval_time,
-            state->fragment_instance_id());
 
     return Base::close(state, exec_status);
 }
diff --git a/be/src/exec/operator/result_sink_operator.cpp 
b/be/src/exec/operator/result_sink_operator.cpp
index b361cb7b6e1..da684ffc8d1 100644
--- a/be/src/exec/operator/result_sink_operator.cpp
+++ b/be/src/exec/operator/result_sink_operator.cpp
@@ -197,11 +197,20 @@ Status ResultSinkLocalState::close(RuntimeState* state, 
Status exec_status) {
             
state->get_query_ctx()->resource_ctx()->io_context()->update_returned_rows(
                     written_rows);
         }
-        RETURN_IF_ERROR(_sender->close(state->fragment_instance_id(), 
final_status, written_rows));
+        bool is_fully_closed = false;
+        RETURN_IF_ERROR(_sender->close(state->fragment_instance_id(), 
final_status, written_rows,
+                                       is_fully_closed));
+        // Schedule deferred cleanup only when the last instance closes the 
shared
+        // buffer.  In parallel result-sink mode the buffer is keyed by 
query_id;
+        // in non-parallel mode it is keyed by fragment_instance_id.  Either 
way,
+        // _sender->buffer_id() returns the correct registration key, so there 
is
+        // no need to branch on enable_parallel_result_sink here.
+        if (is_fully_closed) {
+            state->exec_env()->result_mgr()->cancel_at_time(
+                    time(nullptr) + 
config::result_buffer_cancelled_interval_time,
+                    _sender->buffer_id());
+        }
     }
-    state->exec_env()->result_mgr()->cancel_at_time(
-            time(nullptr) + config::result_buffer_cancelled_interval_time,
-            state->fragment_instance_id());
     RETURN_IF_ERROR(Base::close(state, exec_status));
     return final_status;
 }
diff --git a/be/src/runtime/result_block_buffer.cpp 
b/be/src/runtime/result_block_buffer.cpp
index 828af5da2c7..df595f29cc0 100644
--- a/be/src/runtime/result_block_buffer.cpp
+++ b/be/src/runtime/result_block_buffer.cpp
@@ -60,7 +60,7 @@ ResultBlockBuffer<ResultCtxType>::ResultBlockBuffer(TUniqueId 
id, RuntimeState*
 
 template <typename ResultCtxType>
 Status ResultBlockBuffer<ResultCtxType>::close(const TUniqueId& id, Status 
exec_status,
-                                               int64_t num_rows) {
+                                               int64_t num_rows, bool& 
is_fully_closed) {
     std::unique_lock<std::mutex> l(_lock);
     _returned_rows.fetch_add(num_rows);
     // close will be called multiple times and error status needs to be 
collected.
@@ -77,9 +77,13 @@ Status ResultBlockBuffer<ResultCtxType>::close(const 
TUniqueId& id, Status exec_
                                         print_id(id));
     }
     if (!_result_sink_dependencies.empty()) {
+        // Still waiting for other instances to finish; this is not the final 
close.
+        is_fully_closed = false;
         return _status;
     }
 
+    // All instances have closed: the buffer is now fully closed.
+    is_fully_closed = true;
     _is_close = true;
     _arrow_data_arrival.notify_all();
 
diff --git a/be/src/runtime/result_block_buffer.h 
b/be/src/runtime/result_block_buffer.h
index 40df3b0538b..a14d9e831ff 100644
--- a/be/src/runtime/result_block_buffer.h
+++ b/be/src/runtime/result_block_buffer.h
@@ -56,9 +56,21 @@ public:
     ResultBlockBufferBase() = default;
     virtual ~ResultBlockBufferBase() = default;
 
-    virtual Status close(const TUniqueId& id, Status exec_status, int64_t 
num_rows) = 0;
+    // Close one fragment instance's contribution to this buffer.  When the 
last
+    // registered instance calls close(), |is_fully_closed| is set to true,
+    // indicating that no more producers will write to this buffer and callers 
may
+    // safely schedule deferred cleanup.  The buffer is keyed in 
ResultBufferMgr
+    // under buffer_id(); use that id (not the per-instance 
fragment_instance_id)
+    // when scheduling cancel_at_time() for the deferred cleanup.
+    virtual Status close(const TUniqueId& id, Status exec_status, int64_t 
num_rows,
+                         bool& is_fully_closed) = 0;
     virtual void cancel(const Status& reason) = 0;
 
+    // The id under which this buffer was registered in ResultBufferMgr.
+    // In parallel result-sink mode this equals query_id; in non-parallel mode
+    // it equals fragment_instance_id.
+    [[nodiscard]] virtual const TUniqueId& buffer_id() const = 0;
+
     [[nodiscard]] virtual std::shared_ptr<MemTrackerLimiter> mem_tracker() = 0;
     virtual void set_dependency(const TUniqueId& id,
                                 std::shared_ptr<Dependency> 
result_sink_dependency) = 0;
@@ -74,9 +86,11 @@ public:
 
     Status add_batch(RuntimeState* state, std::shared_ptr<InBlockType>& 
result);
     Status get_batch(std::shared_ptr<ResultCtxType> ctx);
-    Status close(const TUniqueId& id, Status exec_status, int64_t num_rows) 
override;
+    Status close(const TUniqueId& id, Status exec_status, int64_t num_rows,
+                 bool& is_fully_closed) override;
     void cancel(const Status& reason) override;
 
+    [[nodiscard]] const TUniqueId& buffer_id() const override { return 
_fragment_id; }
     [[nodiscard]] std::shared_ptr<MemTrackerLimiter> mem_tracker() override { 
return _mem_tracker; }
     void set_dependency(const TUniqueId& id,
                         std::shared_ptr<Dependency> result_sink_dependency) 
override;
diff --git a/be/test/exec/sink/arrow_result_block_buffer_test.cpp 
b/be/test/exec/sink/arrow_result_block_buffer_test.cpp
index 3cb6939b78c..a87a03d1542 100644
--- a/be/test/exec/sink/arrow_result_block_buffer_test.cpp
+++ b/be/test/exec/sink/arrow_result_block_buffer_test.cpp
@@ -173,7 +173,9 @@ TEST_F(ArrowResultBlockBufferTest, 
TestArrowResultBlockBuffer) {
         EXPECT_FALSE(fail);
     }
     {
-        EXPECT_TRUE(buffer.close(ins_id, Status::OK(), 0).ok());
+        bool is_fully_closed = false;
+        EXPECT_TRUE(buffer.close(ins_id, Status::OK(), 0, 
is_fully_closed).ok());
+        EXPECT_TRUE(is_fully_closed);
         EXPECT_EQ(buffer._instance_rows[ins_id], 0);
         EXPECT_TRUE(buffer._instance_rows_in_queue.empty());
         EXPECT_EQ(buffer._waiting_rpc.size(), 0);
@@ -305,8 +307,10 @@ TEST_F(ArrowResultBlockBufferTest, TestErrorClose) {
         EXPECT_FALSE(fail);
     }
     {
-        EXPECT_EQ(buffer.close(ins_id, Status::InternalError(""), 0).code(),
+        bool is_fully_closed = false;
+        EXPECT_EQ(buffer.close(ins_id, Status::InternalError(""), 0, 
is_fully_closed).code(),
                   ErrorCode::INTERNAL_ERROR);
+        EXPECT_TRUE(is_fully_closed);
         EXPECT_EQ(buffer._instance_rows[ins_id], 0);
         EXPECT_TRUE(buffer._instance_rows_in_queue.empty());
         EXPECT_EQ(buffer._waiting_rpc.size(), 0);
@@ -324,8 +328,10 @@ TEST_F(ArrowResultBlockBufferTest, TestErrorClose) {
         new_ins_id.lo = 1;
         auto new_dep = Dependency::create_shared(0, 0, "Test", true);
         buffer.set_dependency(new_ins_id, new_dep);
-        EXPECT_EQ(buffer.close(ins_id, Status::InternalError(""), 0).code(),
+        bool is_fully_closed = true; // will be set to false since new_dep 
remains
+        EXPECT_EQ(buffer.close(ins_id, Status::InternalError(""), 0, 
is_fully_closed).code(),
                   ErrorCode::INTERNAL_ERROR);
+        EXPECT_FALSE(is_fully_closed);
         EXPECT_FALSE(data);
         EXPECT_FALSE(close);
         EXPECT_FALSE(fail);
diff --git a/be/test/exec/sink/result_block_buffer_test.cpp 
b/be/test/exec/sink/result_block_buffer_test.cpp
index 427bf0d8e71..46ea26c3d79 100644
--- a/be/test/exec/sink/result_block_buffer_test.cpp
+++ b/be/test/exec/sink/result_block_buffer_test.cpp
@@ -160,7 +160,9 @@ TEST_F(MysqlResultBlockBufferTest, 
TestMySQLResultBlockBuffer) {
         EXPECT_FALSE(fail);
     }
     {
-        EXPECT_TRUE(buffer.close(ins_id, Status::OK(), 0).ok());
+        bool is_fully_closed = false;
+        EXPECT_TRUE(buffer.close(ins_id, Status::OK(), 0, 
is_fully_closed).ok());
+        EXPECT_TRUE(is_fully_closed);
         EXPECT_EQ(buffer._instance_rows[ins_id], 0);
         EXPECT_TRUE(buffer._instance_rows_in_queue.empty());
         EXPECT_EQ(buffer._waiting_rpc.size(), 0);
@@ -289,8 +291,10 @@ TEST_F(MysqlResultBlockBufferTest, TestErrorClose) {
         EXPECT_FALSE(fail);
     }
     {
-        EXPECT_EQ(buffer.close(ins_id, Status::InternalError(""), 0).code(),
+        bool is_fully_closed = false;
+        EXPECT_EQ(buffer.close(ins_id, Status::InternalError(""), 0, 
is_fully_closed).code(),
                   ErrorCode::INTERNAL_ERROR);
+        EXPECT_TRUE(is_fully_closed);
         EXPECT_EQ(buffer._instance_rows[ins_id], 0);
         EXPECT_TRUE(buffer._instance_rows_in_queue.empty());
         EXPECT_EQ(buffer._waiting_rpc.size(), 0);
@@ -308,8 +312,10 @@ TEST_F(MysqlResultBlockBufferTest, TestErrorClose) {
         new_ins_id.lo = 1;
         auto new_dep = Dependency::create_shared(0, 0, "Test", true);
         buffer.set_dependency(new_ins_id, new_dep);
-        EXPECT_EQ(buffer.close(ins_id, Status::InternalError(""), 0).code(),
+        bool is_fully_closed = true; // will be set to false since new_dep 
remains
+        EXPECT_EQ(buffer.close(ins_id, Status::InternalError(""), 0, 
is_fully_closed).code(),
                   ErrorCode::INTERNAL_ERROR);
+        EXPECT_FALSE(is_fully_closed);
         EXPECT_FALSE(data);
         EXPECT_FALSE(close);
         EXPECT_FALSE(fail);


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to