lidavidm commented on code in PR #14257:
URL: https://github.com/apache/arrow/pull/14257#discussion_r988308882
##########
cpp/src/arrow/util/async_util.cc:
##########
@@ -190,22 +244,17 @@ class AsyncTaskSchedulerImpl : public AsyncTaskScheduler {
struct Finalizer {
void operator()(const Status& st) {
std::unique_lock<std::mutex> lk(self->mutex_);
- FnOnce<Status()> finish_callback;
+ FnOnce<Status(Status)> finish_callback =
+ std::move((*child_itr)->finish_callback_);
+ self->sub_schedulers_.erase(child_itr);
+ lk.unlock();
+ Status finish_st = std::move(finish_callback)(st);
+ lk.lock();
+ self->running_tasks_--;
if (!st.ok()) {
- self->running_tasks_--;
self->AbortUnlocked(st, std::move(lk));
return;
- } else {
- // We only eagerly erase the sub-scheduler on a successful completion. This is
- // because, if the sub-scheduler aborted, then the caller of MakeSubScheduler
- // might still be planning to call End
Review Comment:
Hmm, why is this not a concern anymore? Because OnFinished() won't actually
finish until the owner calls End()?
##########
cpp/src/arrow/util/async_util.h:
##########
@@ -311,6 +306,8 @@ class ARROW_EXPORT AsyncTaskScheduler {
/// The default (nullptr) will use a FIFO queue if there is a throttle.
static std::unique_ptr<AsyncTaskScheduler> Make(Throttle* throttle = NULLPTR,
std::unique_ptr<Queue> queue = NULLPTR);
+
+ virtual bool IsEnded() = 0;
Review Comment:
nit: make this `const` and add a docstring? (though the meaning seems obvious)
I suppose usage of this may also be prone to time-of-check/time-of-use
errors, though I see it's only for testing, so that's OK.
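To make the time-of-check/time-of-use point concrete, here is a minimal sketch. `ToyScheduler`, `AddTaskIfNotEnded`, and `num_tasks` are hypothetical names for illustration, not part of Arrow's `AsyncTaskScheduler`; the only assumption is a scheduler that guards its state with a mutex.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical stand-in for a scheduler; NOT the Arrow class.
class ToyScheduler {
 public:
  /// Returns true once End() has been called.
  /// The result is only a snapshot: another thread may call End()
  /// immediately after this returns false.
  bool IsEnded() const {
    std::lock_guard<std::mutex> lk(mutex_);
    return ended_;
  }

  void End() {
    std::lock_guard<std::mutex> lk(mutex_);
    ended_ = true;
  }

  // Racy caller-side pattern (time-of-check/time-of-use):
  //   if (!scheduler.IsEnded()) { /* End() may run here */ scheduler.Add(); }
  //
  // Safer alternative: perform the check and the action under one lock.
  bool AddTaskIfNotEnded() {
    std::lock_guard<std::mutex> lk(mutex_);
    if (ended_) return false;
    ++num_tasks_;
    return true;
  }

  int num_tasks() const {
    std::lock_guard<std::mutex> lk(mutex_);
    return num_tasks_;
  }

 private:
  mutable std::mutex mutex_;  // mutable so const accessors can lock
  bool ended_ = false;
  int num_tasks_ = 0;
};
```

Because `IsEnded()` here is a read-only snapshot, marking it `const` costs nothing, and the docstring is a natural place to warn that the value can be stale by the time the caller acts on it.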
##########
cpp/src/arrow/util/async_util.cc:
##########
@@ -98,13 +98,53 @@ class FifoQueue : public AsyncTaskScheduler::Queue {
std::list<std::unique_ptr<Task>> tasks_;
};
+class AlreadyFailedScheduler : public AsyncTaskScheduler {
+ public:
+ explicit AlreadyFailedScheduler(Status failure_reason,
+ FnOnce<Status(Status)> finish_callback)
+ : failure_reason_(std::move(failure_reason)),
+ finish_callback_(std::move(finish_callback)) {}
+ bool AddTask(std::unique_ptr<Task> task) override { return false; }
+ void End() override {
+ std::ignore = std::move(finish_callback_)(failure_reason_);
+ self.reset();
+ }
+ Future<> OnFinished() const override {
+ DCHECK(false) << "You should not rely on sub-scheduler's OnFinished. Use a "
+ "finished callback when creating the sub-scheduler instead";
+ return Future<>::MakeFinished(Status::UnknownError("Unreachable code encountered"));
+ }
+ AsyncTaskScheduler* MakeSubScheduler(FnOnce<Status(Status)> finish_callback,
+ Throttle* throttle,
+ std::unique_ptr<Queue> queue) override {
+ return AlreadyFailedScheduler::Make(failure_reason_, std::move(finish_callback));
+ }
Review Comment:
This is maybe an unimportant edge case but it seems this leaves the parent
sub-scheduler with an invalid callback, which might be an issue?
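One possible reading of this concern is that a call-once callback is left empty after it has been moved from or invoked, so a later call through it would be invalid. The sketch below illustrates that general hazard and one way to guard against it. `OnceCallback` and `InvokeIfValid` are hypothetical, simplified stand-ins and do not reproduce Arrow's `FnOnce`.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <utility>

// Hypothetical call-once wrapper; NOT arrow::internal::FnOnce.
class OnceCallback {
 public:
  OnceCallback() = default;
  explicit OnceCallback(std::function<int(int)> fn)
      : fn_(std::make_unique<std::function<int(int)>>(std::move(fn))) {}

  OnceCallback(OnceCallback&&) = default;
  OnceCallback& operator=(OnceCallback&&) = default;

  // True while the wrapper still owns a callable.
  explicit operator bool() const { return fn_ != nullptr; }

  // Callable only on an rvalue; consumes the stored function.
  int operator()(int arg) && {
    auto fn = std::move(fn_);  // a moved-from unique_ptr is null
    return (*fn)(arg);
  }

 private:
  std::unique_ptr<std::function<int(int)>> fn_;
};

// Invokes the callback only if it still holds a target; otherwise
// returns `fallback` instead of dereferencing an empty wrapper.
int InvokeIfValid(OnceCallback& cb, int arg, int fallback) {
  if (!cb) return fallback;
  return std::move(cb)(arg);
}
```

With a guard like this, a second invocation becomes a detectable no-op instead of undefined behavior. Whether such a guard is needed here depends on whether any path can reach the parent's callback after it has been consumed.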