westonpace commented on a change in pull request #9095:
URL: https://github.com/apache/arrow/pull/9095#discussion_r564825292



##########
File path: cpp/src/arrow/util/future.h
##########
@@ -580,4 +615,74 @@ inline std::vector<int> WaitForAny(const 
std::vector<Future<T>*>& futures,
   return waiter->MoveFinishedFutures();
 }
 
+template <typename T = detail::Empty>
+struct ControlFlow {
+  using BreakValueType = T;
+
+  bool IsBreak() const { return break_value_.has_value(); }
+
+  static Result<BreakValueType> MoveBreakValue(const ControlFlow& cf) {
+    return std::move(*cf.break_value_);
+  }
+
+  mutable util::optional<BreakValueType> break_value_;
+};
+
+struct Continue {
+  template <typename T>
+  operator ControlFlow<T>() && {  // NOLINT explicit
+    return {};
+  }
+};
+
+template <typename T = detail::Empty>
+ControlFlow<T> Break(T break_value = {}) {
+  return ControlFlow<T>{std::move(break_value)};
+}
+
+template <typename Iterate,
+          typename Control = typename 
detail::result_of_t<Iterate()>::ValueType,
+          typename BreakValueType = typename Control::BreakValueType>
+Future<BreakValueType> Loop(Iterate iterate) {
+  auto break_fut = Future<BreakValueType>::Make();
+
+  struct Callback {
+    bool CheckForTermination(const Result<Control>& maybe_control) {
+      if (!maybe_control.ok() || maybe_control->IsBreak()) {
+        Result<BreakValueType> maybe_break = 
maybe_control.Map(Control::MoveBreakValue);
+        break_fut.MarkFinished(std::move(maybe_break));
+        return true;
+      }
+      return false;
+    }
+
+    void operator()(const Result<Control>& maybe_control) && {
+      if (CheckForTermination(maybe_control)) return;
+
+      auto control_fut = iterate();
+      while (control_fut.is_finished()) {
+        // There's no need to AddCallback on a finished future; we can 
CheckForTermination
+        // now. This also avoids recursion and potential stack overflow.

Review comment:
       There is a mutex inside of `AddCallback`.  We could create a 
`TryAddCallback` which returns false without running the callback if the future 
is finished.  This method could be guaranteed to never run the callback 
synchronously.  I will add this.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to