westonpace commented on a change in pull request #9095: URL: https://github.com/apache/arrow/pull/9095#discussion_r564820568
########## File path: cpp/src/arrow/util/future.h ########## @@ -580,4 +615,74 @@ inline std::vector<int> WaitForAny(const std::vector<Future<T>*>& futures, return waiter->MoveFinishedFutures(); } +template <typename T = detail::Empty> +struct ControlFlow { + using BreakValueType = T; + + bool IsBreak() const { return break_value_.has_value(); } + + static Result<BreakValueType> MoveBreakValue(const ControlFlow& cf) { + return std::move(*cf.break_value_); + } + + mutable util::optional<BreakValueType> break_value_; +}; + +struct Continue { + template <typename T> + operator ControlFlow<T>() && { // NOLINT explicit + return {}; + } +}; + +template <typename T = detail::Empty> +ControlFlow<T> Break(T break_value = {}) { + return ControlFlow<T>{std::move(break_value)}; +} + +template <typename Iterate, + typename Control = typename detail::result_of_t<Iterate()>::ValueType, + typename BreakValueType = typename Control::BreakValueType> +Future<BreakValueType> Loop(Iterate iterate) { + auto break_fut = Future<BreakValueType>::Make(); + + struct Callback { + bool CheckForTermination(const Result<Control>& maybe_control) { + if (!maybe_control.ok() || maybe_control->IsBreak()) { + Result<BreakValueType> maybe_break = maybe_control.Map(Control::MoveBreakValue); + break_fut.MarkFinished(std::move(maybe_break)); + return true; + } + return false; + } + + void operator()(const Result<Control>& maybe_control) && { + if (CheckForTermination(maybe_control)) return; + + auto control_fut = iterate(); + while (control_fut.is_finished()) { + // There's no need to AddCallback on a finished future; we can CheckForTermination + // now. This also avoids recursion and potential stack overflow. 
+ if (CheckForTermination(control_fut.result())) return; + + control_fut = iterate(); + } + control_fut.AddCallback(std::move(*this)); + } + + Iterate iterate; + // If the future returned by control_fut is never completed then we will be hanging on + // to break_fut forever even if the listener has given up listening on it. Instead we + // rely on the fact that a producer (the caller of Future<>::Make) is always + // responsible for completing the futures they create. + // TODO: Could avoid this kind of situation with "future abandonment" similar to mesos + Future<BreakValueType> break_fut; Review comment: No. It is quite normal for the future consumer to abandon it. For example: ``` AsyncVisit(...).Then(PrintSomething); ``` Two futures are created. Neither one is kept. Also, the future created by the call to `Then` does not keep a strong reference to its predecessor. Even if we add such a reference, it doesn't really help the situation. If we make the reference a `WeakFuture`, then the future would immediately go out of scope and be destroyed. The true owner of the future is the producer. If the future producer never completes the future, there will be a memory leak. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org