pitrou commented on a change in pull request #9528:
URL: https://github.com/apache/arrow/pull/9528#discussion_r588183226



##########
File path: cpp/src/arrow/util/task_group.cc
##########
@@ -67,34 +75,54 @@ class SerialTaskGroup : public TaskGroup {
 
 class ThreadedTaskGroup : public TaskGroup {
  public:
-  explicit ThreadedTaskGroup(Executor* executor)
-      : executor_(executor), nremaining_(0), ok_(true) {}
+  ThreadedTaskGroup(Executor* executor, StopToken stop_token)
+      : executor_(executor),
+        stop_token_(std::move(stop_token)),
+        nremaining_(0),
+        ok_(true) {}
 
   ~ThreadedTaskGroup() override {
     // Make sure all pending tasks are finished, so that dangling references
     // to this don't persist.
     ARROW_UNUSED(Finish());
   }
 
-  void AppendReal(std::function<Status()> task) override {
+  void AppendReal(FnOnce<Status()> task) override {
     DCHECK(!finished_);
+    if (stop_token_.IsStopRequested()) {
+      UpdateStatus(stop_token_.Poll());
+      return;
+    }
+
     // The hot path is unlocked thanks to atomics
     // Only if an error occurs is the lock taken
     if (ok_.load(std::memory_order_acquire)) {
       nremaining_.fetch_add(1, std::memory_order_acquire);
 
       auto self = checked_pointer_cast<ThreadedTaskGroup>(shared_from_this());
-      Status st = executor_->Spawn(std::bind(
-          [](const std::shared_ptr<ThreadedTaskGroup>& self,
-             const std::function<Status()>& task) {
-            if (self->ok_.load(std::memory_order_acquire)) {
+
+      struct Callable {
+        void operator()() {
+          if (self_->ok_.load(std::memory_order_acquire)) {
+            Status st;
+            if (stop_token_.IsStopRequested()) {
+              st = stop_token_.Poll();
+            } else {
               // XXX what about exceptions?
-              Status st = task();
-              self->UpdateStatus(std::move(st));
+              st = std::move(task_)();
             }
-            self->OneTaskDone();
-          },
-          std::move(self), std::move(task)));
+            self_->UpdateStatus(std::move(st));
+          }
+          self_->OneTaskDone();
+        }
+
+        std::shared_ptr<ThreadedTaskGroup> self_;
+        FnOnce<Status()> task_;
+        StopToken stop_token_;

Review comment:
       Note to self: `Callable` already holds `self_`, so it can use `self_->stop_token_` instead of storing its own `StopToken` copy.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to