pitrou commented on code in PR #14257:
URL: https://github.com/apache/arrow/pull/14257#discussion_r990184385
##########
cpp/src/arrow/util/async_util.cc:
##########
@@ -98,13 +98,54 @@ class FifoQueue : public AsyncTaskScheduler::Queue {
std::list<std::unique_ptr<Task>> tasks_;
};
+class AlreadyFailedScheduler : public AsyncTaskScheduler {
+ public:
+ explicit AlreadyFailedScheduler(Status failure_reason,
+ FnOnce<Status(Status)> finish_callback)
+ : failure_reason_(std::move(failure_reason)),
+ finish_callback_(std::move(finish_callback)) {}
+ bool AddTask(std::unique_ptr<Task> task) override { return false; }
+ void End() override {
+ std::ignore = std::move(finish_callback_)(failure_reason_);
+ self.reset();
+ }
+ Future<> OnFinished() const override {
+ Status::UnknownError(
+ "You should not rely on sub-scheduler's OnFinished. Use a "
+ "finished callback when creating the sub-scheduler instead")
+ .Abort();
+ }
+ AsyncTaskScheduler* MakeSubScheduler(FnOnce<Status(Status)> finish_callback,
+ Throttle* throttle,
+ std::unique_ptr<Queue> queue) override {
+ return AlreadyFailedScheduler::Make(failure_reason_,
std::move(finish_callback));
+ }
+ static AsyncTaskScheduler* Make(Status failure,
+ FnOnce<Status(Status)> finish_callback) {
+ DCHECK(!failure.ok());
+ std::unique_ptr<AlreadyFailedScheduler> instance =
+ std::make_unique<AlreadyFailedScheduler>(std::move(failure),
+ std::move(finish_callback));
+ AsyncTaskScheduler* view = instance.get();
+ instance->self = std::move(instance);
+ return view;
+ }
+ // This is deleted when ended so there is no possible way for this to return
true
+ bool IsEnded() override { return false; }
+
+ private:
+ Status failure_reason_;
+ FnOnce<Status(Status)> finish_callback_;
+ std::unique_ptr<AlreadyFailedScheduler> self;
Review Comment:
This is a weird pattern. It seems to make the object essentially immortal
unless End() is called. It would be much better to make the object owned by
something else.
##########
cpp/src/arrow/util/async_util.h:
##########
@@ -311,6 +306,13 @@ class ARROW_EXPORT AsyncTaskScheduler {
/// The default (nullptr) will use a FIFO queue if there is a
throttle.
static std::unique_ptr<AsyncTaskScheduler> Make(Throttle* throttle = NULLPTR,
std::unique_ptr<Queue> queue
= NULLPTR);
+
+ /// Check to see if the scheduler is currently ended
+ ///
+ /// This method is primarily for testing purposes and won't normally need to
be
+ /// called to use the scheduler. Note that a return value of false is not
conclusive as
+ /// the scheudler may end immediately after the call.
Review Comment:
```suggestion
/// the scheduler may end immediately after the call.
```
##########
cpp/src/arrow/util/async_util_test.cc:
##########
@@ -157,10 +154,60 @@ TEST(AsyncTaskScheduler, TaskFailsAfterEnd) {
ASSERT_FINISHES_AND_RAISES(Invalid, scheduler->OnFinished());
}
+FnOnce<Status(Status)> EmptyFinishCallback() {
+ return [](Status) { return Status::OK(); };
+}
+
+#ifndef ARROW_VALGRIND
+TEST(AsyncTaskScheduler, FailingTaskStress) {
+ // Test many tasks failling at the same time
Review Comment:
```suggestion
// Test many tasks failing at the same time
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]