westonpace commented on a change in pull request #9095:
URL: https://github.com/apache/arrow/pull/9095#discussion_r564820568



##########
File path: cpp/src/arrow/util/future.h
##########
@@ -580,4 +615,74 @@ inline std::vector<int> WaitForAny(const 
std::vector<Future<T>*>& futures,
   return waiter->MoveFinishedFutures();
 }
 
+template <typename T = detail::Empty>
+struct ControlFlow {
+  using BreakValueType = T;
+
+  bool IsBreak() const { return break_value_.has_value(); }
+
+  static Result<BreakValueType> MoveBreakValue(const ControlFlow& cf) {
+    return std::move(*cf.break_value_);
+  }
+
+  mutable util::optional<BreakValueType> break_value_;
+};
+
+struct Continue {
+  template <typename T>
+  operator ControlFlow<T>() && {  // NOLINT explicit
+    return {};
+  }
+};
+
+template <typename T = detail::Empty>
+ControlFlow<T> Break(T break_value = {}) {
+  return ControlFlow<T>{std::move(break_value)};
+}
+
+template <typename Iterate,
+          typename Control = typename 
detail::result_of_t<Iterate()>::ValueType,
+          typename BreakValueType = typename Control::BreakValueType>
+Future<BreakValueType> Loop(Iterate iterate) {
+  auto break_fut = Future<BreakValueType>::Make();
+
+  struct Callback {
+    bool CheckForTermination(const Result<Control>& maybe_control) {
+      if (!maybe_control.ok() || maybe_control->IsBreak()) {
+        Result<BreakValueType> maybe_break = 
maybe_control.Map(Control::MoveBreakValue);
+        break_fut.MarkFinished(std::move(maybe_break));
+        return true;
+      }
+      return false;
+    }
+
+    void operator()(const Result<Control>& maybe_control) && {
+      if (CheckForTermination(maybe_control)) return;
+
+      auto control_fut = iterate();
+      while (control_fut.is_finished()) {
+        // There's no need to AddCallback on a finished future; we can 
CheckForTermination
+        // now. This also avoids recursion and potential stack overflow.
+        if (CheckForTermination(control_fut.result())) return;
+
+        control_fut = iterate();
+      }
+      control_fut.AddCallback(std::move(*this));
+    }
+
+    Iterate iterate;
+    // If the future returned by control_fut is never completed then we will 
be hanging on
+    // to break_fut forever even if the listener has given up listening on it. 
 Instead we
+    // rely on the fact that a producer (the caller of Future<>::Make) is 
always
+    // responsible for completing the futures they create.
+    // TODO: Could avoid this kind of situation with "future abandonment" 
similar to mesos
+    Future<BreakValueType> break_fut;

Review comment:
       No.  It is quite normal for the future consumer to abandon it.  For 
example...
   
   ```
   AsyncVisit(...).Then(PrintSomething);
   ```
   Two futures are created.  Neither one is kept.  Also, the future created by 
the call to `Then` does not keep a strong reference to its predecessor.  Even 
if we add such a reference it doesn't really help the situation.  If we make 
the reference a `WeakFuture` then the future would immediately go out of scope 
and be destroyed.  
   
   The true owner of the future is the producer.  If the future producer never 
completes the future there will be a memory leak.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to