This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.1 by this push:
     new 37d0eb73f1a [Bug](sink) fix cancel_at_time not work at parallel_sink/parallel_outfile (#61740)
37d0eb73f1a is described below
commit 37d0eb73f1a86788c3a45e1b1aee2d49581c9d0b
Author: Pxl <[email protected]>
AuthorDate: Thu Mar 26 15:43:21 2026 +0800
    [Bug](sink) fix cancel_at_time not work at parallel_sink/parallel_outfile (#61740)
pick #61739
---
be/src/exec/operator/result_file_sink_operator.cpp | 17 +++++++++++++----
be/src/exec/operator/result_sink_operator.cpp | 17 +++++++++++++----
be/src/runtime/result_block_buffer.cpp | 6 +++++-
be/src/runtime/result_block_buffer.h | 18 ++++++++++++++++--
be/test/exec/sink/arrow_result_block_buffer_test.cpp | 12 +++++++++---
be/test/exec/sink/result_block_buffer_test.cpp | 12 +++++++++---
6 files changed, 65 insertions(+), 17 deletions(-)
diff --git a/be/src/exec/operator/result_file_sink_operator.cpp b/be/src/exec/operator/result_file_sink_operator.cpp
index dc282a53fcb..97de630d3e7 100644
--- a/be/src/exec/operator/result_file_sink_operator.cpp
+++ b/be/src/exec/operator/result_file_sink_operator.cpp
@@ -133,11 +133,20 @@ Status ResultFileSinkLocalState::close(RuntimeState* state, Status exec_status)
     if (_sender) {
         int64_t written_rows = _writer == nullptr ? 0 : _writer->get_written_rows();
         state->get_query_ctx()->resource_ctx()->io_context()->update_returned_rows(written_rows);
-        RETURN_IF_ERROR(_sender->close(state->fragment_instance_id(), final_status, written_rows));
+        bool is_fully_closed = false;
+        RETURN_IF_ERROR(_sender->close(state->fragment_instance_id(), final_status, written_rows,
+                                       is_fully_closed));
+        // Schedule deferred cleanup only when the last instance closes the shared
+        // buffer. In parallel outfile mode the buffer is keyed by query_id; in
+        // non-parallel mode it is keyed by fragment_instance_id. Either way,
+        // _sender->buffer_id() returns the correct registration key, so there is
+        // no need to branch on enable_parallel_outfile here.
+        if (is_fully_closed) {
+            state->exec_env()->result_mgr()->cancel_at_time(
+                    time(nullptr) + config::result_buffer_cancelled_interval_time,
+                    _sender->buffer_id());
+        }
     }
-    state->exec_env()->result_mgr()->cancel_at_time(
-            time(nullptr) + config::result_buffer_cancelled_interval_time,
-            state->fragment_instance_id());
     return Base::close(state, exec_status);
 }
diff --git a/be/src/exec/operator/result_sink_operator.cpp b/be/src/exec/operator/result_sink_operator.cpp
index b361cb7b6e1..da684ffc8d1 100644
--- a/be/src/exec/operator/result_sink_operator.cpp
+++ b/be/src/exec/operator/result_sink_operator.cpp
@@ -197,11 +197,20 @@ Status ResultSinkLocalState::close(RuntimeState* state, Status exec_status) {
             state->get_query_ctx()->resource_ctx()->io_context()->update_returned_rows(written_rows);
         }
-        RETURN_IF_ERROR(_sender->close(state->fragment_instance_id(), final_status, written_rows));
+        bool is_fully_closed = false;
+        RETURN_IF_ERROR(_sender->close(state->fragment_instance_id(), final_status, written_rows,
+                                       is_fully_closed));
+        // Schedule deferred cleanup only when the last instance closes the shared
+        // buffer. In parallel result-sink mode the buffer is keyed by query_id;
+        // in non-parallel mode it is keyed by fragment_instance_id. Either way,
+        // _sender->buffer_id() returns the correct registration key, so there is
+        // no need to branch on enable_parallel_result_sink here.
+        if (is_fully_closed) {
+            state->exec_env()->result_mgr()->cancel_at_time(
+                    time(nullptr) + config::result_buffer_cancelled_interval_time,
+                    _sender->buffer_id());
+        }
     }
-    state->exec_env()->result_mgr()->cancel_at_time(
-            time(nullptr) + config::result_buffer_cancelled_interval_time,
-            state->fragment_instance_id());
     RETURN_IF_ERROR(Base::close(state, exec_status));
     return final_status;
 }
diff --git a/be/src/runtime/result_block_buffer.cpp b/be/src/runtime/result_block_buffer.cpp
index 828af5da2c7..df595f29cc0 100644
--- a/be/src/runtime/result_block_buffer.cpp
+++ b/be/src/runtime/result_block_buffer.cpp
@@ -60,7 +60,7 @@ ResultBlockBuffer<ResultCtxType>::ResultBlockBuffer(TUniqueId id, RuntimeState*
 template <typename ResultCtxType>
 Status ResultBlockBuffer<ResultCtxType>::close(const TUniqueId& id, Status exec_status,
-                                               int64_t num_rows) {
+                                               int64_t num_rows, bool& is_fully_closed) {
     std::unique_lock<std::mutex> l(_lock);
     _returned_rows.fetch_add(num_rows);
     // close will be called multiple times and error status needs to be collected.
@@ -77,9 +77,13 @@ Status ResultBlockBuffer<ResultCtxType>::close(const TUniqueId& id, Status exec_
                             print_id(id));
     }
     if (!_result_sink_dependencies.empty()) {
+        // Still waiting for other instances to finish; this is not the final close.
+        is_fully_closed = false;
         return _status;
     }
+    // All instances have closed: the buffer is now fully closed.
+    is_fully_closed = true;
     _is_close = true;
     _arrow_data_arrival.notify_all();
diff --git a/be/src/runtime/result_block_buffer.h b/be/src/runtime/result_block_buffer.h
index 40df3b0538b..a14d9e831ff 100644
--- a/be/src/runtime/result_block_buffer.h
+++ b/be/src/runtime/result_block_buffer.h
@@ -56,9 +56,21 @@ public:
     ResultBlockBufferBase() = default;
     virtual ~ResultBlockBufferBase() = default;
-    virtual Status close(const TUniqueId& id, Status exec_status, int64_t num_rows) = 0;
+    // Close one fragment instance's contribution to this buffer. When the last
+    // registered instance calls close(), |is_fully_closed| is set to true,
+    // indicating that no more producers will write to this buffer and callers may
+    // safely schedule deferred cleanup. The buffer is keyed in ResultBufferMgr
+    // under buffer_id(); use that id (not the per-instance fragment_instance_id)
+    // when scheduling cancel_at_time() for the deferred cleanup.
+    virtual Status close(const TUniqueId& id, Status exec_status, int64_t num_rows,
+                         bool& is_fully_closed) = 0;
     virtual void cancel(const Status& reason) = 0;
+    // The id under which this buffer was registered in ResultBufferMgr.
+    // In parallel result-sink mode this equals query_id; in non-parallel mode
+    // it equals fragment_instance_id.
+    [[nodiscard]] virtual const TUniqueId& buffer_id() const = 0;
+
     [[nodiscard]] virtual std::shared_ptr<MemTrackerLimiter> mem_tracker() = 0;
     virtual void set_dependency(const TUniqueId& id,
                                 std::shared_ptr<Dependency> result_sink_dependency) = 0;
@@ -74,9 +86,11 @@ public:
     Status add_batch(RuntimeState* state, std::shared_ptr<InBlockType>& result);
     Status get_batch(std::shared_ptr<ResultCtxType> ctx);
-    Status close(const TUniqueId& id, Status exec_status, int64_t num_rows) override;
+    Status close(const TUniqueId& id, Status exec_status, int64_t num_rows,
+                 bool& is_fully_closed) override;
     void cancel(const Status& reason) override;
+    [[nodiscard]] const TUniqueId& buffer_id() const override { return _fragment_id; }
     [[nodiscard]] std::shared_ptr<MemTrackerLimiter> mem_tracker() override { return _mem_tracker; }
     void set_dependency(const TUniqueId& id,
                         std::shared_ptr<Dependency> result_sink_dependency) override;
diff --git a/be/test/exec/sink/arrow_result_block_buffer_test.cpp b/be/test/exec/sink/arrow_result_block_buffer_test.cpp
index 3cb6939b78c..a87a03d1542 100644
--- a/be/test/exec/sink/arrow_result_block_buffer_test.cpp
+++ b/be/test/exec/sink/arrow_result_block_buffer_test.cpp
@@ -173,7 +173,9 @@ TEST_F(ArrowResultBlockBufferTest, TestArrowResultBlockBuffer) {
         EXPECT_FALSE(fail);
     }
     {
-        EXPECT_TRUE(buffer.close(ins_id, Status::OK(), 0).ok());
+        bool is_fully_closed = false;
+        EXPECT_TRUE(buffer.close(ins_id, Status::OK(), 0, is_fully_closed).ok());
+        EXPECT_TRUE(is_fully_closed);
         EXPECT_EQ(buffer._instance_rows[ins_id], 0);
         EXPECT_TRUE(buffer._instance_rows_in_queue.empty());
         EXPECT_EQ(buffer._waiting_rpc.size(), 0);
@@ -305,8 +307,10 @@ TEST_F(ArrowResultBlockBufferTest, TestErrorClose) {
         EXPECT_FALSE(fail);
     }
     {
-        EXPECT_EQ(buffer.close(ins_id, Status::InternalError(""), 0).code(),
+        bool is_fully_closed = false;
+        EXPECT_EQ(buffer.close(ins_id, Status::InternalError(""), 0, is_fully_closed).code(),
                   ErrorCode::INTERNAL_ERROR);
+        EXPECT_TRUE(is_fully_closed);
         EXPECT_EQ(buffer._instance_rows[ins_id], 0);
         EXPECT_TRUE(buffer._instance_rows_in_queue.empty());
         EXPECT_EQ(buffer._waiting_rpc.size(), 0);
@@ -324,8 +328,10 @@ TEST_F(ArrowResultBlockBufferTest, TestErrorClose) {
         new_ins_id.lo = 1;
         auto new_dep = Dependency::create_shared(0, 0, "Test", true);
         buffer.set_dependency(new_ins_id, new_dep);
-        EXPECT_EQ(buffer.close(ins_id, Status::InternalError(""), 0).code(),
+        bool is_fully_closed = true; // will be set to false since new_dep remains
+        EXPECT_EQ(buffer.close(ins_id, Status::InternalError(""), 0, is_fully_closed).code(),
                   ErrorCode::INTERNAL_ERROR);
+        EXPECT_FALSE(is_fully_closed);
         EXPECT_FALSE(data);
         EXPECT_FALSE(close);
         EXPECT_FALSE(fail);
diff --git a/be/test/exec/sink/result_block_buffer_test.cpp b/be/test/exec/sink/result_block_buffer_test.cpp
index 427bf0d8e71..46ea26c3d79 100644
--- a/be/test/exec/sink/result_block_buffer_test.cpp
+++ b/be/test/exec/sink/result_block_buffer_test.cpp
@@ -160,7 +160,9 @@ TEST_F(MysqlResultBlockBufferTest, TestMySQLResultBlockBuffer) {
         EXPECT_FALSE(fail);
     }
     {
-        EXPECT_TRUE(buffer.close(ins_id, Status::OK(), 0).ok());
+        bool is_fully_closed = false;
+        EXPECT_TRUE(buffer.close(ins_id, Status::OK(), 0, is_fully_closed).ok());
+        EXPECT_TRUE(is_fully_closed);
         EXPECT_EQ(buffer._instance_rows[ins_id], 0);
         EXPECT_TRUE(buffer._instance_rows_in_queue.empty());
         EXPECT_EQ(buffer._waiting_rpc.size(), 0);
@@ -289,8 +291,10 @@ TEST_F(MysqlResultBlockBufferTest, TestErrorClose) {
         EXPECT_FALSE(fail);
     }
     {
-        EXPECT_EQ(buffer.close(ins_id, Status::InternalError(""), 0).code(),
+        bool is_fully_closed = false;
+        EXPECT_EQ(buffer.close(ins_id, Status::InternalError(""), 0, is_fully_closed).code(),
                   ErrorCode::INTERNAL_ERROR);
+        EXPECT_TRUE(is_fully_closed);
         EXPECT_EQ(buffer._instance_rows[ins_id], 0);
         EXPECT_TRUE(buffer._instance_rows_in_queue.empty());
         EXPECT_EQ(buffer._waiting_rpc.size(), 0);
@@ -308,8 +312,10 @@ TEST_F(MysqlResultBlockBufferTest, TestErrorClose) {
         new_ins_id.lo = 1;
         auto new_dep = Dependency::create_shared(0, 0, "Test", true);
         buffer.set_dependency(new_ins_id, new_dep);
-        EXPECT_EQ(buffer.close(ins_id, Status::InternalError(""), 0).code(),
+        bool is_fully_closed = true; // will be set to false since new_dep remains
+        EXPECT_EQ(buffer.close(ins_id, Status::InternalError(""), 0, is_fully_closed).code(),
                   ErrorCode::INTERNAL_ERROR);
+        EXPECT_FALSE(is_fully_closed);
         EXPECT_FALSE(data);
         EXPECT_FALSE(close);
         EXPECT_FALSE(fail);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]