bkietz commented on a change in pull request #9095:
URL: https://github.com/apache/arrow/pull/9095#discussion_r554155616



##########
File path: cpp/src/arrow/util/iterator.h
##########
@@ -35,6 +36,13 @@
 
 namespace arrow {
 
+namespace detail {
+
+template <typename Signature>
+using result_of_t = typename std::result_of<Signature>::type;

Review comment:
       It's technically legal to have this both here and in future.h, but for
clarity we should probably have a single alias declaration. util/functional.h
would be a reasonable place to put it.

##########
File path: cpp/src/arrow/util/iterator.h
##########
@@ -186,6 +194,109 @@ class Iterator : public 
util::EqualityComparable<Iterator<T>> {
   Result<T> (*next_)(void*) = NULLPTR;
 };
 
+template <typename T>
+struct TransformFlow {
+  using YieldValueType = T;
+
+  TransformFlow(YieldValueType value, bool ready_for_next)
+      : finished_(false),
+        ready_for_next_(ready_for_next),
+        status_(),
+        yield_value_(std::move(value)) {}
+  TransformFlow(bool finished, bool ready_for_next)
+      : finished_(finished), ready_for_next_(ready_for_next), status_(), 
yield_value_() {}
+  TransformFlow(Status s)  // NOLINT runtime/explicit
+      : finished_(true), ready_for_next_(false), status_(s), yield_value_() {}
+
+  bool HasValue() const { return yield_value_.has_value(); }
+  bool Finished() const { return finished_; }
+  Status status() const { return status_; }
+  bool Ok() const { return status_.ok(); }
+  bool ReadyForNext() const { return ready_for_next_; }
+  T Value() const { return *yield_value_; }
+
+  bool finished_;
+  bool ready_for_next_;
+  Status status_;
+  util::optional<YieldValueType> yield_value_;
+};
+
+struct TransformFinish {
+  template <typename T>
+  operator TransformFlow<T>() && {  // NOLINT explicit
+    return TransformFlow<T>(true, true);
+  }
+};
+
+struct TransformSkip {
+  template <typename T>
+  operator TransformFlow<T>() && {  // NOLINT explicit
+    return TransformFlow<T>(false, true);
+  }
+};
+
+template <typename T>
+TransformFlow<T> TransformYield(T value = {}, bool ready_for_next = true) {
+  return TransformFlow<T>(std::move(value), ready_for_next);
+}
+
+template <typename T, typename V>
+using Transformer = std::function<TransformFlow<V>(T)>;
+
+template <typename T, typename V>
+class TransformIterator {
+ public:
+  explicit TransformIterator(Iterator<T> it, Transformer<T, V> transformer)
+      : it_(std::move(it)),
+        transformer_(std::move(transformer)),
+        last_value_(),
+        finished_() {}
+
+  util::optional<Result<V>> Pump() {
+    while (!finished_ && last_value_.has_value()) {
+      TransformFlow<V> next = transformer_(*last_value_);
+      if (next.ReadyForNext()) {
+        last_value_.reset();
+      }
+      if (next.Finished()) {
+        finished_ = true;
+      }
+      if (!next.Ok()) {
+        return next.status();
+      }
+      if (next.HasValue()) {
+        return next.Value();
+      }
+    }
+    if (finished_) {
+      return IterationTraits<V>::End();
+    }
+    return util::optional<V>();
+  }
+
+  Result<V> Next() {
+    while (!finished_) {
+      util::optional<Result<V>> next = Pump();
+      if (next.has_value()) {
+        return *next;
+      }
+      ARROW_ASSIGN_OR_RAISE(last_value_, it_.Next());
+    }
+    return IterationTraits<V>::End();
+  }
+
+ private:
+  Iterator<T> it_;
+  Transformer<T, V> transformer_;
+  util::optional<T> last_value_;
+  bool finished_;

Review comment:
       ```suggestion
     bool finished_ = false;
   ```

##########
File path: cpp/src/arrow/util/iterator.h
##########
@@ -186,6 +194,109 @@ class Iterator : public 
util::EqualityComparable<Iterator<T>> {
   Result<T> (*next_)(void*) = NULLPTR;
 };
 
+template <typename T>
+struct TransformFlow {
+  using YieldValueType = T;
+
+  TransformFlow(YieldValueType value, bool ready_for_next)
+      : finished_(false),
+        ready_for_next_(ready_for_next),
+        status_(),
+        yield_value_(std::move(value)) {}
+  TransformFlow(bool finished, bool ready_for_next)
+      : finished_(finished), ready_for_next_(ready_for_next), status_(), 
yield_value_() {}
+  TransformFlow(Status s)  // NOLINT runtime/explicit
+      : finished_(true), ready_for_next_(false), status_(s), yield_value_() {}
+
+  bool HasValue() const { return yield_value_.has_value(); }
+  bool Finished() const { return finished_; }
+  Status status() const { return status_; }
+  bool Ok() const { return status_.ok(); }
+  bool ReadyForNext() const { return ready_for_next_; }
+  T Value() const { return *yield_value_; }
+
+  bool finished_;
+  bool ready_for_next_;
+  Status status_;
+  util::optional<YieldValueType> yield_value_;
+};
+
+struct TransformFinish {
+  template <typename T>
+  operator TransformFlow<T>() && {  // NOLINT explicit
+    return TransformFlow<T>(true, true);
+  }
+};
+
+struct TransformSkip {
+  template <typename T>
+  operator TransformFlow<T>() && {  // NOLINT explicit
+    return TransformFlow<T>(false, true);
+  }
+};
+
+template <typename T>
+TransformFlow<T> TransformYield(T value = {}, bool ready_for_next = true) {
+  return TransformFlow<T>(std::move(value), ready_for_next);
+}
+
+template <typename T, typename V>
+using Transformer = std::function<TransformFlow<V>(T)>;
+
+template <typename T, typename V>
+class TransformIterator {
+ public:
+  explicit TransformIterator(Iterator<T> it, Transformer<T, V> transformer)
+      : it_(std::move(it)),
+        transformer_(std::move(transformer)),
+        last_value_(),
+        finished_() {}
+
+  util::optional<Result<V>> Pump() {
+    while (!finished_ && last_value_.has_value()) {

Review comment:
       This loop is very confusing. Could you rewrite it with a single
condition (`while (!finished_)`, maybe) and then include a break statement below?

##########
File path: cpp/src/arrow/util/iterator.h
##########
@@ -186,6 +194,109 @@ class Iterator : public 
util::EqualityComparable<Iterator<T>> {
   Result<T> (*next_)(void*) = NULLPTR;
 };
 
+template <typename T>
+struct TransformFlow {
+  using YieldValueType = T;
+
+  TransformFlow(YieldValueType value, bool ready_for_next)
+      : finished_(false),
+        ready_for_next_(ready_for_next),
+        status_(),
+        yield_value_(std::move(value)) {}
+  TransformFlow(bool finished, bool ready_for_next)
+      : finished_(finished), ready_for_next_(ready_for_next), status_(), 
yield_value_() {}
+  TransformFlow(Status s)  // NOLINT runtime/explicit
+      : finished_(true), ready_for_next_(false), status_(s), yield_value_() {}
+
+  bool HasValue() const { return yield_value_.has_value(); }
+  bool Finished() const { return finished_; }
+  Status status() const { return status_; }
+  bool Ok() const { return status_.ok(); }
+  bool ReadyForNext() const { return ready_for_next_; }
+  T Value() const { return *yield_value_; }
+
+  bool finished_;
+  bool ready_for_next_;
+  Status status_;
+  util::optional<YieldValueType> yield_value_;
+};
+
+struct TransformFinish {
+  template <typename T>
+  operator TransformFlow<T>() && {  // NOLINT explicit
+    return TransformFlow<T>(true, true);
+  }
+};
+
+struct TransformSkip {
+  template <typename T>
+  operator TransformFlow<T>() && {  // NOLINT explicit
+    return TransformFlow<T>(false, true);
+  }
+};
+
+template <typename T>
+TransformFlow<T> TransformYield(T value = {}, bool ready_for_next = true) {
+  return TransformFlow<T>(std::move(value), ready_for_next);
+}
+
+template <typename T, typename V>
+using Transformer = std::function<TransformFlow<V>(T)>;
+
+template <typename T, typename V>
+class TransformIterator {
+ public:
+  explicit TransformIterator(Iterator<T> it, Transformer<T, V> transformer)
+      : it_(std::move(it)),
+        transformer_(std::move(transformer)),
+        last_value_(),
+        finished_() {}
+
+  util::optional<Result<V>> Pump() {
+    while (!finished_ && last_value_.has_value()) {
+      TransformFlow<V> next = transformer_(*last_value_);
+      if (next.ReadyForNext()) {
+        last_value_.reset();
+      }
+      if (next.Finished()) {
+        finished_ = true;
+      }
+      if (!next.Ok()) {
+        return next.status();
+      }
+      if (next.HasValue()) {
+        return next.Value();
+      }
+    }
+    if (finished_) {
+      return IterationTraits<V>::End();
+    }
+    return util::optional<V>();
+  }
+
+  Result<V> Next() {
+    while (!finished_) {
+      util::optional<Result<V>> next = Pump();
+      if (next.has_value()) {
+        return *next;

Review comment:
       ```suggestion
           return std::move(*next);
   ```

##########
File path: cpp/src/arrow/util/future.h
##########
@@ -580,4 +611,73 @@ inline std::vector<int> WaitForAny(const 
std::vector<Future<T>*>& futures,
   return waiter->MoveFinishedFutures();
 }
 
+template <typename T = detail::Empty>
+struct ControlFlow {
+  using BreakValueType = T;
+
+  bool IsBreak() const { return break_value_.has_value(); }
+
+  static Result<BreakValueType> MoveBreakValue(const ControlFlow& cf) {
+    return std::move(*cf.break_value_);
+  }
+
+  mutable util::optional<BreakValueType> break_value_;
+};
+
+struct Continue {
+  template <typename T>
+  operator ControlFlow<T>() && {  // NOLINT explicit
+    return {};
+  }
+};
+
+template <typename T = detail::Empty>
+ControlFlow<T> Break(T break_value = {}) {
+  return ControlFlow<T>{std::move(break_value)};
+}
+
+template <typename Iterate,
+          typename Control = typename 
detail::result_of_t<Iterate()>::ValueType,
+          typename BreakValueType = typename Control::BreakValueType>
+Future<BreakValueType> Loop(Iterate iterate) {

Review comment:
       Maybe the typedef for AsyncGenerator would be useful here; then we could 
write
   
   ```suggestion
   template <typename T>
   using AsyncGenerator = std::function<Future<T>()>;
   
   template <typename BreakValue>
   Future<BreakValue> Loop(AsyncGenerator<ControlFlow<BreakValue>> iterate) {
   ```

##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -832,6 +832,145 @@ TEST(FutureCompletionTest, FutureVoid) {
   }
 }
 
+TEST(FutureAllTest, Simple) {
+  auto f1 = Future<int>::Make();
+  auto f2 = Future<int>::Make();
+  std::vector<Future<int>> futures = {f1, f2};
+  auto combined = arrow::All(futures);
+
+  ARROW_UNUSED(combined.Then([](std::vector<Result<int>> results) {
+    ASSERT_EQ(2, results.size());
+    ASSERT_EQ(1, *results[0]);
+    ASSERT_EQ(2, *results[1]);
+  }));
+
+  // Finish in reverse order, results should still be delivered in proper order
+  AssertNotFinished(combined);
+  f2.MarkFinished(2);
+  AssertNotFinished(combined);
+  f1.MarkFinished(1);
+  AssertSuccessful(combined);
+}
+
+TEST(FutureAllTest, Failure) {
+  auto f1 = Future<int>::Make();
+  auto f2 = Future<int>::Make();
+  auto f3 = Future<int>::Make();
+  std::vector<Future<int>> futures = {f1, f2, f3};
+  auto combined = arrow::All(futures);
+
+  ARROW_UNUSED(combined.Then([](std::vector<Result<int>> results) {
+    ASSERT_EQ(3, results.size());
+    ASSERT_EQ(1, *results[0]);
+    ASSERT_EQ(Status::IOError("XYZ"), results[1].status());
+    ASSERT_EQ(3, *results[2]);
+  }));
+
+  f1.MarkFinished(1);
+  f2.MarkFinished(Status::IOError("XYZ"));
+  f3.MarkFinished(3);
+
+  AssertFinished(combined);
+}
+
+TEST(FutureLoopTest, Sync) {
+  struct {
+    int i = 0;
+    Future<int> Get() { return Future<int>::MakeFinished(i++); }
+  } IntSource;
+
+  bool do_fail = false;
+  std::vector<int> ints;
+  auto loop_body = [&] {
+    return IntSource.Get().Then([&](int i) -> Result<ControlFlow<int>> {
+      if (do_fail && i == 3) {
+        return Status::IOError("xxx");
+      }
+
+      if (i == 5) {
+        int sum = 0;
+        for (int i : ints) sum += i;
+        return Break(sum);
+      }
+
+      ints.push_back(i);
+      return Continue();
+    });
+  };
+
+  {
+    auto sum_fut = Loop(loop_body);
+    AssertSuccessful(sum_fut);
+
+    ASSERT_OK_AND_ASSIGN(auto sum, sum_fut.result());
+    ASSERT_EQ(sum, 0 + 1 + 2 + 3 + 4);
+  }
+
+  {
+    do_fail = true;
+    IntSource.i = 0;
+    auto sum_fut = Loop(loop_body);
+    AssertFailed(sum_fut);
+    ASSERT_RAISES(IOError, sum_fut.result());
+  }
+}
+
+TEST(FutureLoopTest, EmptyBreakValue) {
+  Future<> none_fut =
+      Loop([&] { return Future<>::MakeFinished().Then([&](...) { return 
Break(); }); });
+  AssertSuccessful(none_fut);
+}
+
+TEST(FutureLoopTest, MoveOnlyBreakValue) {

Review comment:
       It'd be good to have a stress test for `Loop` as well

##########
File path: cpp/src/arrow/util/iterator.h
##########
@@ -186,6 +194,109 @@ class Iterator : public 
util::EqualityComparable<Iterator<T>> {
   Result<T> (*next_)(void*) = NULLPTR;
 };
 
+template <typename T>
+struct TransformFlow {
+  using YieldValueType = T;
+
+  TransformFlow(YieldValueType value, bool ready_for_next)
+      : finished_(false),
+        ready_for_next_(ready_for_next),
+        status_(),
+        yield_value_(std::move(value)) {}
+  TransformFlow(bool finished, bool ready_for_next)
+      : finished_(finished), ready_for_next_(ready_for_next), status_(), 
yield_value_() {}
+  TransformFlow(Status s)  // NOLINT runtime/explicit
+      : finished_(true), ready_for_next_(false), status_(s), yield_value_() {}
+
+  bool HasValue() const { return yield_value_.has_value(); }
+  bool Finished() const { return finished_; }
+  Status status() const { return status_; }
+  bool Ok() const { return status_.ok(); }
+  bool ReadyForNext() const { return ready_for_next_; }
+  T Value() const { return *yield_value_; }
+
+  bool finished_;
+  bool ready_for_next_;
+  Status status_;
+  util::optional<YieldValueType> yield_value_;
+};
+
+struct TransformFinish {
+  template <typename T>
+  operator TransformFlow<T>() && {  // NOLINT explicit
+    return TransformFlow<T>(true, true);
+  }
+};
+
+struct TransformSkip {
+  template <typename T>
+  operator TransformFlow<T>() && {  // NOLINT explicit
+    return TransformFlow<T>(false, true);
+  }
+};
+
+template <typename T>
+TransformFlow<T> TransformYield(T value = {}, bool ready_for_next = true) {
+  return TransformFlow<T>(std::move(value), ready_for_next);
+}
+
+template <typename T, typename V>
+using Transformer = std::function<TransformFlow<V>(T)>;
+
+template <typename T, typename V>
+class TransformIterator {
+ public:
+  explicit TransformIterator(Iterator<T> it, Transformer<T, V> transformer)
+      : it_(std::move(it)),
+        transformer_(std::move(transformer)),
+        last_value_(),
+        finished_() {}
+
+  util::optional<Result<V>> Pump() {
+    while (!finished_ && last_value_.has_value()) {

Review comment:
       Could you also include a comment describing the control flow, the 
contract of transformer functions, ...?

##########
File path: cpp/src/arrow/util/iterator.h
##########
@@ -186,6 +194,109 @@ class Iterator : public 
util::EqualityComparable<Iterator<T>> {
   Result<T> (*next_)(void*) = NULLPTR;
 };
 
+template <typename T>
+struct TransformFlow {
+  using YieldValueType = T;
+
+  TransformFlow(YieldValueType value, bool ready_for_next)
+      : finished_(false),
+        ready_for_next_(ready_for_next),
+        status_(),
+        yield_value_(std::move(value)) {}
+  TransformFlow(bool finished, bool ready_for_next)
+      : finished_(finished), ready_for_next_(ready_for_next), status_(), 
yield_value_() {}
+  TransformFlow(Status s)  // NOLINT runtime/explicit
+      : finished_(true), ready_for_next_(false), status_(s), yield_value_() {}

Review comment:
       Instead of having TransformFlow potentially be an error, perhaps we
could just rely on Result and have
`using Transformer = std::function<Result<TransformFlow<V>>(T)>;`?

##########
File path: cpp/src/arrow/util/iterator.h
##########
@@ -186,6 +194,109 @@ class Iterator : public 
util::EqualityComparable<Iterator<T>> {
   Result<T> (*next_)(void*) = NULLPTR;
 };
 
+template <typename T>
+struct TransformFlow {
+  using YieldValueType = T;
+
+  TransformFlow(YieldValueType value, bool ready_for_next)
+      : finished_(false),
+        ready_for_next_(ready_for_next),
+        status_(),
+        yield_value_(std::move(value)) {}
+  TransformFlow(bool finished, bool ready_for_next)
+      : finished_(finished), ready_for_next_(ready_for_next), status_(), 
yield_value_() {}
+  TransformFlow(Status s)  // NOLINT runtime/explicit
+      : finished_(true), ready_for_next_(false), status_(s), yield_value_() {}
+
+  bool HasValue() const { return yield_value_.has_value(); }
+  bool Finished() const { return finished_; }
+  Status status() const { return status_; }
+  bool Ok() const { return status_.ok(); }
+  bool ReadyForNext() const { return ready_for_next_; }
+  T Value() const { return *yield_value_; }
+
+  bool finished_;
+  bool ready_for_next_;
+  Status status_;
+  util::optional<YieldValueType> yield_value_;
+};
+
+struct TransformFinish {
+  template <typename T>
+  operator TransformFlow<T>() && {  // NOLINT explicit
+    return TransformFlow<T>(true, true);
+  }
+};
+
+struct TransformSkip {
+  template <typename T>
+  operator TransformFlow<T>() && {  // NOLINT explicit
+    return TransformFlow<T>(false, true);
+  }
+};
+
+template <typename T>
+TransformFlow<T> TransformYield(T value = {}, bool ready_for_next = true) {
+  return TransformFlow<T>(std::move(value), ready_for_next);
+}
+
+template <typename T, typename V>
+using Transformer = std::function<TransformFlow<V>(T)>;
+
+template <typename T, typename V>
+class TransformIterator {
+ public:
+  explicit TransformIterator(Iterator<T> it, Transformer<T, V> transformer)
+      : it_(std::move(it)),
+        transformer_(std::move(transformer)),
+        last_value_(),
+        finished_() {}
+
+  util::optional<Result<V>> Pump() {

Review comment:
       I'd usually expect `Result<>` to be outermost in a return type:
   ```suggestion
     Result<util::optional<V>> Pump() {
   ```
   Additionally, that'd enable you to use `ARROW_RETURN_NOT_OK` below

##########
File path: cpp/src/arrow/util/iterator.h
##########
@@ -186,6 +194,109 @@ class Iterator : public 
util::EqualityComparable<Iterator<T>> {
   Result<T> (*next_)(void*) = NULLPTR;
 };
 
+template <typename T>
+struct TransformFlow {
+  using YieldValueType = T;
+
+  TransformFlow(YieldValueType value, bool ready_for_next)
+      : finished_(false),
+        ready_for_next_(ready_for_next),
+        status_(),
+        yield_value_(std::move(value)) {}
+  TransformFlow(bool finished, bool ready_for_next)
+      : finished_(finished), ready_for_next_(ready_for_next), status_(), 
yield_value_() {}
+  TransformFlow(Status s)  // NOLINT runtime/explicit
+      : finished_(true), ready_for_next_(false), status_(s), yield_value_() {}
+
+  bool HasValue() const { return yield_value_.has_value(); }
+  bool Finished() const { return finished_; }
+  Status status() const { return status_; }
+  bool Ok() const { return status_.ok(); }
+  bool ReadyForNext() const { return ready_for_next_; }
+  T Value() const { return *yield_value_; }
+
+  bool finished_;
+  bool ready_for_next_;

Review comment:
       Use default member initializers for fields (such as `bool`) that would otherwise be left uninitialized:
   ```suggestion
     bool finished_ = false;
     bool ready_for_next_ = false;
   ```

##########
File path: cpp/src/arrow/util/iterator.h
##########
@@ -186,6 +194,109 @@ class Iterator : public 
util::EqualityComparable<Iterator<T>> {
   Result<T> (*next_)(void*) = NULLPTR;
 };
 
+template <typename T>
+struct TransformFlow {
+  using YieldValueType = T;
+
+  TransformFlow(YieldValueType value, bool ready_for_next)
+      : finished_(false),
+        ready_for_next_(ready_for_next),
+        status_(),
+        yield_value_(std::move(value)) {}
+  TransformFlow(bool finished, bool ready_for_next)
+      : finished_(finished), ready_for_next_(ready_for_next), status_(), 
yield_value_() {}
+  TransformFlow(Status s)  // NOLINT runtime/explicit
+      : finished_(true), ready_for_next_(false), status_(s), yield_value_() {}
+
+  bool HasValue() const { return yield_value_.has_value(); }
+  bool Finished() const { return finished_; }
+  Status status() const { return status_; }
+  bool Ok() const { return status_.ok(); }
+  bool ReadyForNext() const { return ready_for_next_; }
+  T Value() const { return *yield_value_; }
+
+  bool finished_;
+  bool ready_for_next_;
+  Status status_;
+  util::optional<YieldValueType> yield_value_;
+};
+
+struct TransformFinish {
+  template <typename T>
+  operator TransformFlow<T>() && {  // NOLINT explicit
+    return TransformFlow<T>(true, true);
+  }
+};
+
+struct TransformSkip {
+  template <typename T>
+  operator TransformFlow<T>() && {  // NOLINT explicit
+    return TransformFlow<T>(false, true);
+  }
+};
+
+template <typename T>
+TransformFlow<T> TransformYield(T value = {}, bool ready_for_next = true) {
+  return TransformFlow<T>(std::move(value), ready_for_next);
+}
+
+template <typename T, typename V>
+using Transformer = std::function<TransformFlow<V>(T)>;
+
+template <typename T, typename V>
+class TransformIterator {
+ public:
+  explicit TransformIterator(Iterator<T> it, Transformer<T, V> transformer)
+      : it_(std::move(it)),
+        transformer_(std::move(transformer)),
+        last_value_(),
+        finished_() {}
+
+  util::optional<Result<V>> Pump() {
+    while (!finished_ && last_value_.has_value()) {
+      TransformFlow<V> next = transformer_(*last_value_);
+      if (next.ReadyForNext()) {
+        last_value_.reset();
+      }
+      if (next.Finished()) {
+        finished_ = true;
+      }
+      if (!next.Ok()) {
+        return next.status();
+      }
+      if (next.HasValue()) {
+        return next.Value();
+      }
+    }
+    if (finished_) {
+      return IterationTraits<V>::End();
+    }
+    return util::optional<V>();

Review comment:
       This doesn't match the return type of the function; please use
`return {};` or `return util::nullopt;` to be clearer.

##########
File path: cpp/src/arrow/util/future.h
##########
@@ -556,6 +560,33 @@ inline bool WaitForAll(const std::vector<Future<T>*>& 
futures,
   return waiter->Wait(seconds);
 }
 
+template <typename T>

Review comment:
       ```suggestion
   /// \brief Create a Future which completes when all of `futures` complete.
   ///
   /// The future's result is a vector of the results of `futures`.
   /// Note that this future will never be marked "failed"; failed results
   /// will be stored in the result vector alongside successful results.
   template <typename T>
   ```

##########
File path: cpp/src/arrow/util/iterator.cc
##########
@@ -67,6 +67,8 @@ class ReadaheadQueue::Impl : public 
std::enable_shared_from_this<ReadaheadQueue:
   }
 
   Status PopDone(std::unique_ptr<ReadaheadPromise>* out) {
+    DCHECK_GT(max_readahead_, 0);  // This function has no purpose and should 
not be
+                                   // called if using the queue unbounded

Review comment:
       `DCHECK` macros (except `DCHECK_OK`) can be streamed into, which makes 
assert failures more helpful and searchable:
   ```suggestion
       DCHECK_GT(max_readahead_, 0) << "PopDone should never be called if using 
the queue unbounded";
   ```

##########
File path: cpp/src/arrow/util/future_test.cc
##########
@@ -832,6 +832,145 @@ TEST(FutureCompletionTest, FutureVoid) {
   }
 }
 
+TEST(FutureAllTest, Simple) {
+  auto f1 = Future<int>::Make();
+  auto f2 = Future<int>::Make();
+  std::vector<Future<int>> futures = {f1, f2};
+  auto combined = arrow::All(futures);
+
+  ARROW_UNUSED(combined.Then([](std::vector<Result<int>> results) {
+    ASSERT_EQ(2, results.size());
+    ASSERT_EQ(1, *results[0]);
+    ASSERT_EQ(2, *results[1]);
+  }));
+
+  // Finish in reverse order, results should still be delivered in proper order
+  AssertNotFinished(combined);
+  f2.MarkFinished(2);
+  AssertNotFinished(combined);
+  f1.MarkFinished(1);
+  AssertSuccessful(combined);
+}
+
+TEST(FutureAllTest, Failure) {
+  auto f1 = Future<int>::Make();
+  auto f2 = Future<int>::Make();
+  auto f3 = Future<int>::Make();
+  std::vector<Future<int>> futures = {f1, f2, f3};
+  auto combined = arrow::All(futures);
+
+  ARROW_UNUSED(combined.Then([](std::vector<Result<int>> results) {
+    ASSERT_EQ(3, results.size());
+    ASSERT_EQ(1, *results[0]);
+    ASSERT_EQ(Status::IOError("XYZ"), results[1].status());
+    ASSERT_EQ(3, *results[2]);
+  }));
+
+  f1.MarkFinished(1);
+  f2.MarkFinished(Status::IOError("XYZ"));
+  f3.MarkFinished(3);
+
+  AssertFinished(combined);
+}
+
+TEST(FutureLoopTest, Sync) {
+  struct {
+    int i = 0;
+    Future<int> Get() { return Future<int>::MakeFinished(i++); }
+  } IntSource;
+
+  bool do_fail = false;
+  std::vector<int> ints;
+  auto loop_body = [&] {
+    return IntSource.Get().Then([&](int i) -> Result<ControlFlow<int>> {
+      if (do_fail && i == 3) {
+        return Status::IOError("xxx");
+      }
+
+      if (i == 5) {
+        int sum = 0;
+        for (int i : ints) sum += i;
+        return Break(sum);
+      }
+
+      ints.push_back(i);
+      return Continue();
+    });
+  };
+
+  {
+    auto sum_fut = Loop(loop_body);
+    AssertSuccessful(sum_fut);
+
+    ASSERT_OK_AND_ASSIGN(auto sum, sum_fut.result());
+    ASSERT_EQ(sum, 0 + 1 + 2 + 3 + 4);
+  }
+
+  {
+    do_fail = true;
+    IntSource.i = 0;
+    auto sum_fut = Loop(loop_body);
+    AssertFailed(sum_fut);
+    ASSERT_RAISES(IOError, sum_fut.result());
+  }
+}
+
+TEST(FutureLoopTest, EmptyBreakValue) {
+  Future<> none_fut =
+      Loop([&] { return Future<>::MakeFinished().Then([&](...) { return 
Break(); }); });
+  AssertSuccessful(none_fut);
+}
+
+TEST(FutureLoopTest, MoveOnlyBreakValue) {
+  Future<MoveOnlyDataType> one_fut = Loop([&] {
+    return Future<int>::MakeFinished(1).Then(
+        [&](int i) { return Break(MoveOnlyDataType(i)); });
+  });
+  AssertSuccessful(one_fut);
+  ASSERT_OK_AND_ASSIGN(auto one, std::move(one_fut).result());
+  ASSERT_EQ(one, 1);
+}
+
+TEST(FutureLoopTest, StackOverflow) {
+  // Looping over futures is normally a rather recursive task.  If the futures 
complete
+  // synchronously (because they are already finished) it could lead to a 
stack overflow
+  // if care is not taken.
+  int counter = 0;
+  auto loop_body = [&counter]() -> Future<ControlFlow<int>> {
+    while (counter < 1000000) {
+      counter++;
+      return Future<ControlFlow<int>>::MakeFinished(Continue());

Review comment:
       Maybe it'd be useful to have an implicit constructor
`Future<T>(Result<T>)` for finished futures; then I think we'd be able to just
write
   ```c++
         return Continue();
   ```

##########
File path: cpp/src/arrow/util/future.h
##########
@@ -580,4 +611,73 @@ inline std::vector<int> WaitForAny(const 
std::vector<Future<T>*>& futures,
   return waiter->MoveFinishedFutures();
 }
 
+template <typename T = detail::Empty>
+struct ControlFlow {
+  using BreakValueType = T;
+
+  bool IsBreak() const { return break_value_.has_value(); }
+
+  static Result<BreakValueType> MoveBreakValue(const ControlFlow& cf) {
+    return std::move(*cf.break_value_);
+  }
+
+  mutable util::optional<BreakValueType> break_value_;
+};
+
+struct Continue {
+  template <typename T>
+  operator ControlFlow<T>() && {  // NOLINT explicit
+    return {};
+  }
+};
+
+template <typename T = detail::Empty>
+ControlFlow<T> Break(T break_value = {}) {
+  return ControlFlow<T>{std::move(break_value)};
+}
+
+template <typename Iterate,

Review comment:
       ```suggestion
   /// \brief Loop through an asynchronous sequence
   ///
   /// \param[in] iterate A generator of Future<ControlFlow<BreakValue>>. On 
completion of each yielded
   /// future the resulting ControlFlow will be examined. A Break will 
terminate the loop, while a Continue
   /// will re-invoke `iterate`.
   /// \return A future which will complete when a Future returned by iterate 
completes with a Break
   template <typename Iterate,
   ```

##########
File path: cpp/src/arrow/util/iterator_test.cc
##########
@@ -129,11 +130,47 @@ template <typename T>
 inline Iterator<T> EmptyIt() {
   return MakeEmptyIterator<T>();
 }
-
 inline Iterator<TestInt> VectorIt(std::vector<TestInt> v) {
   return MakeVectorIterator<TestInt>(std::move(v));
 }
 
+std::function<Future<TestInt>()> AsyncVectorIt(std::vector<TestInt> v) {
+  auto index = std::make_shared<size_t>(0);
+  auto vec = std::make_shared<std::vector<TestInt>>(std::move(v));
+  return [index, vec]() -> Future<TestInt> {
+    if (*index >= vec->size()) {
+      return Future<TestInt>::MakeFinished(IterationTraits<TestInt>::End());
+    }
+    auto next = (*vec)[*index];
+    (*index)++;
+    return Future<TestInt>::MakeFinished(next);
+  };

Review comment:
       ```suggestion
     size_t index = 0;
     return [index, v]() mutable -> Future<TestInt> {
       if (index >= vec.size()) {
         return Future<TestInt>::MakeFinished(IterationTraits<TestInt>::End());
       }
       return Future<TestInt>::MakeFinished(v[index++]);
     };
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to