lidavidm commented on code in PR #14257:
URL: https://github.com/apache/arrow/pull/14257#discussion_r988308882


##########
cpp/src/arrow/util/async_util.cc:
##########
@@ -190,22 +244,17 @@ class AsyncTaskSchedulerImpl : public AsyncTaskScheduler {
     struct Finalizer {
       void operator()(const Status& st) {
         std::unique_lock<std::mutex> lk(self->mutex_);
-        FnOnce<Status()> finish_callback;
+        FnOnce<Status(Status)> finish_callback =
+            std::move((*child_itr)->finish_callback_);
+        self->sub_schedulers_.erase(child_itr);
+        lk.unlock();
+        Status finish_st = std::move(finish_callback)(st);
+        lk.lock();
+        self->running_tasks_--;
         if (!st.ok()) {
-          self->running_tasks_--;
           self->AbortUnlocked(st, std::move(lk));
           return;
-        } else {
-          // We only eagerly erase the sub-scheduler on a successful 
completion.  This is
-          // because, if the sub-scheduler aborted, then the caller of 
MakeSubScheduler
-          // might still be planning to call End

Review Comment:
   Hmm, why is this not a concern anymore? Because OnFinished() won't actually finish until the owner calls End()?
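To make the question concrete, here is a minimal sketch of the ordering the new Finalizer uses. It moves the child's callback out and erases the child while holding the lock, runs the callback unlocked, then re-locks for bookkeeping, and it erases on success *and* on failure. `Status`, `ParentScheduler`, `AddChild`, `OnChildFinished`, and `abort_reason_` are hypothetical simplifications, not Arrow's types. How `finish_st` is used is not visible in this hunk, so that part is an assumption.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <list>
#include <mutex>
#include <string>
#include <utility>

// Hypothetical stand-in for arrow::Status: an empty message means OK.
struct Status {
  std::string msg;
  bool ok() const { return msg.empty(); }
};

// Hypothetical, heavily simplified parent scheduler (not Arrow's class).
class ParentScheduler {
 public:
  using FinishCallback = std::function<Status(Status)>;
  using ChildIt = std::list<FinishCallback>::iterator;

  // Registers a child; the child counts as one running task until it finishes.
  ChildIt AddChild(FinishCallback finish_callback) {
    std::lock_guard<std::mutex> lk(mutex_);
    ++running_tasks_;
    return children_.insert(children_.end(), std::move(finish_callback));
  }

  // Same shape as the new Finalizer: take the callback and erase the child
  // under the lock, run the callback unlocked, then re-lock for bookkeeping.
  void OnChildFinished(ChildIt child_itr, const Status& st) {
    std::unique_lock<std::mutex> lk(mutex_);
    FinishCallback finish_callback = std::move(*child_itr);
    children_.erase(child_itr);  // erased on success *and* on failure
    lk.unlock();
    // User code runs without the lock held, so it may call back into us.
    Status finish_st = finish_callback(st);
    lk.lock();
    --running_tasks_;
    // Assumption: the child's own error wins; otherwise a callback error is kept.
    Status err = !st.ok() ? st : finish_st;
    if (!err.ok() && abort_reason_.ok()) abort_reason_ = err;
  }

  std::size_t num_children() const {
    std::lock_guard<std::mutex> lk(mutex_);
    return children_.size();
  }
  int running_tasks() const {
    std::lock_guard<std::mutex> lk(mutex_);
    return running_tasks_;
  }
  Status abort_reason() const {
    std::lock_guard<std::mutex> lk(mutex_);
    return abort_reason_;
  }

 private:
  mutable std::mutex mutex_;
  std::list<FinishCallback> children_;
  int running_tasks_ = 0;
  Status abort_reason_;
};
```

In this sketch nothing in the parent refers to the child once `OnChildFinished` returns. Whether a late `End()` from the `MakeSubScheduler` caller is safe therefore depends entirely on the child object's own lifetime, which is what the question turns on.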



##########
cpp/src/arrow/util/async_util.h:
##########
@@ -311,6 +306,8 @@ class ARROW_EXPORT AsyncTaskScheduler {
   ///        The default (nullptr) will use a FIFO queue if there is a throttle.
   static std::unique_ptr<AsyncTaskScheduler> Make(Throttle* throttle = NULLPTR,
                                                   std::unique_ptr<Queue> queue = NULLPTR);
+
+  virtual bool IsEnded() = 0;

Review Comment:
   nit: `const`, docstring? (though it seems obvious)
   
   I suppose usage of this may also be prone to time-of-check/time-of-use errors, though I see it's only for testing, so that's OK.
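A minimal sketch of both points, assuming a hypothetical trimmed-down interface (`SchedulerLike`, `SimpleScheduler`, and a task-less `AddTask()` are illustrative, not Arrow's API). `IsEnded()` is `const` and documented. The TOCTOU-safe pattern relies on the boolean returned by `AddTask()`, which checks and acts under one lock, rather than calling `IsEnded()` first.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical, trimmed-down interface (not Arrow's AsyncTaskScheduler) with
// the reviewer's nit applied: IsEnded() is const and documented.
class SchedulerLike {
 public:
  virtual ~SchedulerLike() = default;

  /// \brief Try to submit a task; returns false if End() was already called.
  virtual bool AddTask() = 0;

  /// \brief Signal that no more tasks will be added.
  virtual void End() = 0;

  /// \brief Return true once End() has been called.
  ///
  /// Intended for tests. The result can be stale as soon as it is returned
  /// (time-of-check/time-of-use), so callers should not write
  /// `if (!IsEnded()) AddTask();` and should check AddTask()'s result instead.
  virtual bool IsEnded() const = 0;
};

class SimpleScheduler : public SchedulerLike {
 public:
  // The check and the "act" happen under one lock, so there is no window
  // between deciding the scheduler is still open and accepting the task.
  bool AddTask() override {
    std::lock_guard<std::mutex> lk(mutex_);
    if (ended_) return false;
    ++num_tasks_;
    return true;
  }

  void End() override {
    std::lock_guard<std::mutex> lk(mutex_);
    ended_ = true;
  }

  // Can be const because the mutex is declared mutable.
  bool IsEnded() const override {
    std::lock_guard<std::mutex> lk(mutex_);
    return ended_;
  }

  int num_tasks() const {
    std::lock_guard<std::mutex> lk(mutex_);
    return num_tasks_;
  }

 private:
  mutable std::mutex mutex_;
  bool ended_ = false;
  int num_tasks_ = 0;
};
```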



##########
cpp/src/arrow/util/async_util.cc:
##########
@@ -98,13 +98,53 @@ class FifoQueue : public AsyncTaskScheduler::Queue {
   std::list<std::unique_ptr<Task>> tasks_;
 };
 
+class AlreadyFailedScheduler : public AsyncTaskScheduler {
+ public:
+  explicit AlreadyFailedScheduler(Status failure_reason,
+                                  FnOnce<Status(Status)> finish_callback)
+      : failure_reason_(std::move(failure_reason)),
+        finish_callback_(std::move(finish_callback)) {}
+  bool AddTask(std::unique_ptr<Task> task) override { return false; }
+  void End() override {
+    std::ignore = std::move(finish_callback_)(failure_reason_);
+    self.reset();
+  }
+  Future<> OnFinished() const override {
+    DCHECK(false) << "You should not rely on sub-scheduler's OnFinished.  Use a "
+                     "finished callback when creating the sub-scheduler instead";
+    return Future<>::MakeFinished(Status::UnknownError("Unreachable code encountered"));
+  }
+  AsyncTaskScheduler* MakeSubScheduler(FnOnce<Status(Status)> finish_callback,
+                                       Throttle* throttle,
+                                       std::unique_ptr<Queue> queue) override {
+    return AlreadyFailedScheduler::Make(failure_reason_, std::move(finish_callback));
+  }

Review Comment:
   This is maybe an unimportant edge case, but it seems this leaves the parent sub-scheduler with an invalid callback, which might be an issue?
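In the diff, `finish_callback_` is invoked as `std::move(finish_callback_)(...)`, a one-shot call that consumes it. Any path that later reaches a moved-from or already-consumed callback is therefore holding an invalid one. A minimal sketch of one general way to make that state detectable rather than silent, assuming a hypothetical `OnceCallback` wrapper (not Arrow's `FnOnce`) that exposes `valid()` and asserts on a second invocation:

```cpp
#include <cassert>
#include <functional>
#include <optional>
#include <string>
#include <utility>

// Hypothetical one-shot callback wrapper (not Arrow's FnOnce). Invoking it
// consumes it, and a consumed or moved-from wrapper reports !valid().
template <typename R, typename... Args>
class OnceCallback {
 public:
  OnceCallback() = default;
  explicit OnceCallback(std::function<R(Args...)> fn) : fn_(std::move(fn)) {}

  // Moving leaves the source explicitly empty, so valid() is false afterwards.
  OnceCallback(OnceCallback&& other) : fn_(std::exchange(other.fn_, std::nullopt)) {}
  OnceCallback& operator=(OnceCallback&& other) {
    fn_ = std::exchange(other.fn_, std::nullopt);
    return *this;
  }

  bool valid() const { return fn_.has_value() && static_cast<bool>(*fn_); }

  // Rvalue-qualified like a one-shot call: the stored function is taken out
  // before it runs, so a second call trips the assert instead of misbehaving.
  R operator()(Args... args) && {
    assert(valid() && "one-shot callback invoked twice or after being moved from");
    std::function<R(Args...)> fn = std::move(*fn_);
    fn_.reset();
    return fn(std::forward<Args>(args)...);
  }

 private:
  std::optional<std::function<R(Args...)>> fn_;
};
```

With such a check, a scheduler's `End()` could skip or report a callback that is no longer valid instead of invoking an empty one. Whether that is the right fix here depends on how the parent's callback ends up invalid, which the hunk doesn't show.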


