This is an automated email from the ASF dual-hosted git repository.
pitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new e2b44378ed GH-47642: [C++] Catch exceptions from initial_task in
AsyncTaskScheduler (#49860)
e2b44378ed is described below
commit e2b44378edf0ccceda07e8a6823890bd4ecc1952
Author: egolearner <[email protected]>
AuthorDate: Mon Jun 1 18:57:20 2026 +0800
GH-47642: [C++] Catch exceptions from initial_task in AsyncTaskScheduler
(#49860)
### Rationale for this change
If the `initial_task` passed to `AsyncTaskScheduler::Make` throws a C++
exception (rather than returning a failed `Status`), `OnTaskFinished` is never
called. This leaves `running_tasks_` permanently at 1, causing a `DCHECK`
failure in debug builds and an indefinite hang in release builds because the
scheduler's `finished` future is never completed. In Acero, this manifests as
`DeclarationToTable` (and similar APIs) hanging forever when a `SourceNode`
generator throws during `StartProducing` [...]
### What changes are included in this PR?
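Wrap the `initial_task` call in `AsyncTaskScheduler::Make` in a `try`/`catch`.
Any exception it throws is converted to a `Status::UnknownError` (including the
`what()` message when the exception derives from `std::exception`), and that
status is passed to `OnTaskFinished` as usual, so the scheduler's `finished`
future completes with an error instead of never completing.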
### Are these changes tested?
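Yes. A new `InitialTaskThrowsException` test in `async_util_test.cc` covers two
cases: the initial task throws with no other tasks, and the initial task throws
while a previously added task is still running.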
### Are there any user-facing changes?
No API changes.
* GitHub Issue: #47642
Lead-authored-by: egolearner <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
---
cpp/src/arrow/util/async_util.cc | 15 ++++++++++++++-
cpp/src/arrow/util/async_util_test.cc | 27 +++++++++++++++++++++++++++
2 files changed, 41 insertions(+), 1 deletion(-)
diff --git a/cpp/src/arrow/util/async_util.cc b/cpp/src/arrow/util/async_util.cc
index f8b979a3f5..76537fbac9 100644
--- a/cpp/src/arrow/util/async_util.cc
+++ b/cpp/src/arrow/util/async_util.cc
@@ -23,6 +23,7 @@
#include "arrow/util/tracing_internal.h"
#include <condition_variable>
+#include <exception>
#include <list>
#include <memory>
#include <mutex>
@@ -466,7 +467,19 @@ Future<>
AsyncTaskScheduler::Make(FnOnce<Status(AsyncTaskScheduler*)> initial_ta
auto scope = START_SCOPED_SPAN_SV(span, "AsyncTaskScheduler::InitialTask"sv);
auto scheduler =
std::make_unique<AsyncTaskSchedulerImpl>(std::move(stop_token),
std::move(abort_callback));
- Status initial_task_st = std::move(initial_task)(scheduler.get());
+ Status initial_task_st;
+ // GH-47642: We normally don't catch exceptions in Arrow C++ code, as the
error
+ // reporting model uses the Status object instead. Usually, an uncaught
exception
+ // will simply terminate the process, surfacing the programming error.
+ // However, an exception thrown from the initial task would result in a much
+ // harder to diagnose process hang.
+ try {
+ initial_task_st = std::move(initial_task)(scheduler.get());
+ } catch (const std::exception& e) {
+ initial_task_st = Status::UnknownError("Initial task threw an exception:
", e.what());
+ } catch (...) {
+ initial_task_st = Status::UnknownError("Initial task threw an unknown
exception");
+ }
scheduler->OnTaskFinished(std::move(initial_task_st));
// Keep scheduler alive until finished
return scheduler->OnFinished().Then([scheduler = std::move(scheduler)] {});
diff --git a/cpp/src/arrow/util/async_util_test.cc
b/cpp/src/arrow/util/async_util_test.cc
index 1f9aad453e..30b9eafe14 100644
--- a/cpp/src/arrow/util/async_util_test.cc
+++ b/cpp/src/arrow/util/async_util_test.cc
@@ -23,9 +23,11 @@
#include <memory>
#include <mutex>
#include <queue>
+#include <stdexcept>
#include <thread>
#include <unordered_set>
+#include <gmock/gmock-matchers.h>
#include <gtest/gtest.h>
#include "arrow/result.h"
@@ -204,6 +206,31 @@ TEST(AsyncTaskScheduler, InitialTaskFails) {
ASSERT_FINISHES_AND_RAISES(Invalid, finished);
}
+TEST(AsyncTaskScheduler, InitialTaskThrowsException) {
+ // If the initial task throws a C++ exception (not a Status), the scheduler
+ // should catch it, convert to a failed Status, and not hang indefinitely.
+ // See https://github.com/apache/arrow/issues/47642
+
+ // Case 1: initial task throws with no other tasks
+ Future<> finished =
+ AsyncTaskScheduler::Make([&](AsyncTaskScheduler* scheduler) -> Status {
+ throw std::runtime_error("some exception");
+ });
+ EXPECT_FINISHES_AND_RAISES_WITH_MESSAGE_THAT(
+ UnknownError, ::testing::HasSubstr("some exception"), finished);
+
+ // Case 2: initial task throws while another task is still running
+ Future<> task = Future<>::Make();
+ finished = AsyncTaskScheduler::Make([&](AsyncTaskScheduler* scheduler) ->
Status {
+ EXPECT_TRUE(scheduler->AddSimpleTask([&]() { return task; }, kDummyName));
+ throw std::runtime_error("some exception after adding task");
+ });
+ AssertNotFinished(finished);
+ task.MarkFinished();
+ EXPECT_FINISHES_AND_RAISES_WITH_MESSAGE_THAT(
+ UnknownError, ::testing::HasSubstr("some exception after adding task"),
finished);
+}
+
TEST(AsyncTaskScheduler, TaskDestroyedBeforeSchedulerEnds) {
bool my_task_destroyed = false;
Future<> task_fut = Future<>::Make();