This is an automated email from the ASF dual-hosted git repository.
zanmato pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 0e7e70cfde GH-47384: [C++][Acero] Isolate BackpressureHandler from
ExecNode (#47386)
0e7e70cfde is described below
commit 0e7e70cfdef4efa287495272649c071a700c34fa
Author: gitmodimo <[email protected]>
AuthorDate: Fri Sep 19 20:20:49 2025 +0200
GH-47384: [C++][Acero] Isolate BackpressureHandler from ExecNode (#47386)
### Rationale for this change
Current BackpressureHandler needs to be provided with ExecNode, however the
backpressure concept can be applied outside of ExecNode. It is currently needed
to Facilitate ForceShutdown in AsofJoinNode. Current implementation however is
not elegant and may still lead to deadlock after ForceShutdown in extreme
case - when several batched get pushed after ForceShutdown and exceed the
threshold.
### What changes are included in this PR?
-Remove ForceShutdown from BackpressureHandler.
-Reimplement ForceShutdown in BackpressureConcurrentQueue as onetime
nonrecoverable queue clear that effectively unpauses source using handler.
### Are these changes tested?
There is no new functionality. Current tests should cover it.
### Are there any user-facing changes?
No
* GitHub Issue: #47384
Lead-authored-by: RafaĆ Hibner <[email protected]>
Co-authored-by: Rossi Sun <[email protected]>
Co-authored-by: gitmodimo <[email protected]>
Signed-off-by: Rossi Sun <[email protected]>
---
cpp/src/arrow/acero/asof_join_node.cc | 23 ++++++++++++--------
cpp/src/arrow/acero/backpressure_handler.h | 18 ++++-----------
cpp/src/arrow/acero/concurrent_queue_internal.h | 12 ++++++++--
cpp/src/arrow/acero/sorted_merge_node.cc | 2 +-
cpp/src/arrow/acero/util_test.cc | 29 ++++++++++++++++++++-----
5 files changed, 52 insertions(+), 32 deletions(-)
diff --git a/cpp/src/arrow/acero/asof_join_node.cc
b/cpp/src/arrow/acero/asof_join_node.cc
index 55fa45543e..3970050e50 100644
--- a/cpp/src/arrow/acero/asof_join_node.cc
+++ b/cpp/src/arrow/acero/asof_join_node.cc
@@ -514,9 +514,9 @@ class InputState : public
util::SerialSequencingQueue::Processor {
std::unique_ptr<BackpressureControl> backpressure_control =
std::make_unique<BackpressureController>(
/*node=*/asof_input, /*output=*/asof_node, backpressure_counter);
- ARROW_ASSIGN_OR_RAISE(
- auto handler, BackpressureHandler::Make(asof_input, low_threshold,
high_threshold,
-
std::move(backpressure_control)));
+ ARROW_ASSIGN_OR_RAISE(auto handler,
+ BackpressureHandler::Make(low_threshold,
high_threshold,
+
std::move(backpressure_control)));
return std::make_unique<InputState>(index, tolerance, must_hash,
may_rehash,
key_hasher, asof_node,
std::move(handler), schema,
time_col_index, key_col_index);
@@ -763,10 +763,10 @@ class InputState : public
util::SerialSequencingQueue::Processor {
total_batches_ = n;
}
- Status ForceShutdown() {
+ void ForceShutdown() {
// Force the upstream input node to unpause. Necessary to avoid deadlock
when we
// terminate the process thread
- return queue_.ForceShutdown();
+ queue_.ForceShutdown();
}
private:
@@ -1046,8 +1046,10 @@ class AsofJoinNode : public ExecNode {
if (st.ok()) {
st = output_->InputFinished(this, batches_produced_);
}
- for (const auto& s : state_) {
- st &= s->ForceShutdown();
+ for (size_t i = 0; i < state_.size(); ++i) {
+ const auto& s = state_[i];
+ s->ForceShutdown();
+ st &= inputs_[i]->StopProducing();
}
}));
}
@@ -1499,8 +1501,11 @@ class AsofJoinNode : public ExecNode {
if (st.ok()) {
st = output_->InputFinished(this, batches_produced_);
}
- for (const auto& s : state_) {
- st &= s->ForceShutdown();
+
+ for (size_t i = 0; i < state_.size(); ++i) {
+ const auto& s = state_[i];
+ s->ForceShutdown();
+ st &= inputs_[i]->StopProducing();
}
}
diff --git a/cpp/src/arrow/acero/backpressure_handler.h
b/cpp/src/arrow/acero/backpressure_handler.h
index db6c379935..c6a47e6019 100644
--- a/cpp/src/arrow/acero/backpressure_handler.h
+++ b/cpp/src/arrow/acero/backpressure_handler.h
@@ -25,16 +25,15 @@ namespace arrow::acero {
class BackpressureHandler {
private:
- BackpressureHandler(ExecNode* input, size_t low_threshold, size_t
high_threshold,
+ BackpressureHandler(size_t low_threshold, size_t high_threshold,
std::unique_ptr<BackpressureControl>
backpressure_control)
- : input_(input),
- low_threshold_(low_threshold),
+ : low_threshold_(low_threshold),
high_threshold_(high_threshold),
backpressure_control_(std::move(backpressure_control)) {}
public:
static Result<BackpressureHandler> Make(
- ExecNode* input, size_t low_threshold, size_t high_threshold,
+ size_t low_threshold, size_t high_threshold,
std::unique_ptr<BackpressureControl> backpressure_control) {
if (low_threshold >= high_threshold) {
return Status::Invalid("low threshold (", low_threshold,
@@ -43,7 +42,7 @@ class BackpressureHandler {
if (backpressure_control == NULLPTR) {
return Status::Invalid("null backpressure control parameter");
}
- BackpressureHandler backpressure_handler(input, low_threshold,
high_threshold,
+ BackpressureHandler backpressure_handler(low_threshold, high_threshold,
std::move(backpressure_control));
return backpressure_handler;
}
@@ -56,16 +55,7 @@ class BackpressureHandler {
}
}
- Status ForceShutdown() {
- // It may be unintuitive to call Resume() here, but this is to avoid a
deadlock.
- // Since acero's executor won't terminate if any one node is paused, we
need to
- // force resume the node before stopping production.
- backpressure_control_->Resume();
- return input_->StopProducing();
- }
-
private:
- ExecNode* input_;
size_t low_threshold_;
size_t high_threshold_;
std::unique_ptr<BackpressureControl> backpressure_control_;
diff --git a/cpp/src/arrow/acero/concurrent_queue_internal.h
b/cpp/src/arrow/acero/concurrent_queue_internal.h
index a751db7026..b91daae8b0 100644
--- a/cpp/src/arrow/acero/concurrent_queue_internal.h
+++ b/cpp/src/arrow/acero/concurrent_queue_internal.h
@@ -113,7 +113,7 @@ class ConcurrentQueue {
};
template <typename T>
-class BackpressureConcurrentQueue : public ConcurrentQueue<T> {
+class BackpressureConcurrentQueue : private ConcurrentQueue<T> {
private:
struct DoHandle {
explicit DoHandle(BackpressureConcurrentQueue& queue)
@@ -134,6 +134,9 @@ class BackpressureConcurrentQueue : public
ConcurrentQueue<T> {
explicit BackpressureConcurrentQueue(BackpressureHandler handler)
: handler_(std::move(handler)) {}
+ using ConcurrentQueue<T>::Empty;
+ using ConcurrentQueue<T>::Front;
+
// Pops the last item from the queue but waits if the queue is empty until
new items are
// pushed.
T WaitAndPop() {
@@ -152,6 +155,7 @@ class BackpressureConcurrentQueue : public
ConcurrentQueue<T> {
// Pushes an item to the queue
void Push(const T& item) {
+ if (shutdown_) return;
std::unique_lock<std::mutex> lock(ConcurrentQueue<T>::GetMutex());
DoHandle do_handle(*this);
ConcurrentQueue<T>::PushUnlocked(item);
@@ -164,10 +168,14 @@ class BackpressureConcurrentQueue : public
ConcurrentQueue<T> {
ConcurrentQueue<T>::ClearUnlocked();
}
- Status ForceShutdown() { return handler_.ForceShutdown(); }
+ void ForceShutdown() {
+ shutdown_ = true;
+ Clear();
+ }
private:
BackpressureHandler handler_;
+ std::atomic<bool> shutdown_{false};
};
} // namespace arrow::acero
diff --git a/cpp/src/arrow/acero/sorted_merge_node.cc
b/cpp/src/arrow/acero/sorted_merge_node.cc
index 37997232cd..43f5b7b930 100644
--- a/cpp/src/arrow/acero/sorted_merge_node.cc
+++ b/cpp/src/arrow/acero/sorted_merge_node.cc
@@ -119,7 +119,7 @@ class InputState {
std::unique_ptr<arrow::acero::BackpressureControl> backpressure_control =
std::make_unique<BackpressureController>(input, output,
backpressure_counter);
ARROW_ASSIGN_OR_RAISE(auto handler,
- BackpressureHandler::Make(input, low_threshold,
high_threshold,
+ BackpressureHandler::Make(low_threshold,
high_threshold,
std::move(backpressure_control)));
return PtrType(new InputState(index, std::move(handler), schema,
time_col_index));
}
diff --git a/cpp/src/arrow/acero/util_test.cc b/cpp/src/arrow/acero/util_test.cc
index d86577d358..557574a40c 100644
--- a/cpp/src/arrow/acero/util_test.cc
+++ b/cpp/src/arrow/acero/util_test.cc
@@ -263,8 +263,7 @@ class TestBackpressureControl : public BackpressureControl {
TEST(BackpressureConcurrentQueue, BasicTest) {
BackpressureTestExecNode dummy_node;
auto ctrl = std::make_unique<TestBackpressureControl>(&dummy_node);
- ASSERT_OK_AND_ASSIGN(auto handler,
- BackpressureHandler::Make(&dummy_node, 2, 4,
std::move(ctrl)));
+ ASSERT_OK_AND_ASSIGN(auto handler, BackpressureHandler::Make(2, 4,
std::move(ctrl)));
BackpressureConcurrentQueue<int> queue(std::move(handler));
ConcurrentQueueBasicTest(queue);
@@ -275,8 +274,7 @@ TEST(BackpressureConcurrentQueue, BasicTest) {
TEST(BackpressureConcurrentQueue, BackpressureTest) {
BackpressureTestExecNode dummy_node;
auto ctrl = std::make_unique<TestBackpressureControl>(&dummy_node);
- ASSERT_OK_AND_ASSIGN(auto handler,
- BackpressureHandler::Make(&dummy_node, 2, 4,
std::move(ctrl)));
+ ASSERT_OK_AND_ASSIGN(auto handler, BackpressureHandler::Make(2, 4,
std::move(ctrl)));
BackpressureConcurrentQueue<int> queue(std::move(handler));
queue.Push(6);
@@ -299,9 +297,28 @@ TEST(BackpressureConcurrentQueue, BackpressureTest) {
queue.Push(11);
ASSERT_TRUE(dummy_node.paused);
ASSERT_FALSE(dummy_node.stopped);
- ASSERT_OK(queue.ForceShutdown());
+ queue.ForceShutdown();
+ ASSERT_FALSE(dummy_node.paused);
+}
+
+TEST(BackpressureConcurrentQueue, BackpressureTestStayUnpaused) {
+ BackpressureTestExecNode dummy_node;
+ auto ctrl = std::make_unique<TestBackpressureControl>(&dummy_node);
+ ASSERT_OK_AND_ASSIGN(
+ auto handler, BackpressureHandler::Make(/*low_threshold=*/2,
/*high_threshold=*/4,
+ std::move(ctrl)));
+ BackpressureConcurrentQueue<int> queue(std::move(handler));
+
+ queue.Push(6);
+ queue.Push(7);
+ queue.Push(8);
+ ASSERT_FALSE(dummy_node.paused);
+ ASSERT_FALSE(dummy_node.stopped);
+ queue.ForceShutdown();
+ for (int i = 0; i < 10; ++i) {
+ queue.Push(i);
+ }
ASSERT_FALSE(dummy_node.paused);
- ASSERT_TRUE(dummy_node.stopped);
}
} // namespace acero