westonpace commented on a change in pull request #9095: URL: https://github.com/apache/arrow/pull/9095#discussion_r555266265
########## File path: cpp/src/arrow/util/future_test.cc ########## @@ -832,6 +832,145 @@ TEST(FutureCompletionTest, FutureVoid) { } } +TEST(FutureAllTest, Simple) { + auto f1 = Future<int>::Make(); + auto f2 = Future<int>::Make(); + std::vector<Future<int>> futures = {f1, f2}; + auto combined = arrow::All(futures); + + ARROW_UNUSED(combined.Then([](std::vector<Result<int>> results) { + ASSERT_EQ(2, results.size()); + ASSERT_EQ(1, *results[0]); + ASSERT_EQ(2, *results[1]); + })); + + // Finish in reverse order, results should still be delivered in proper order + AssertNotFinished(combined); + f2.MarkFinished(2); + AssertNotFinished(combined); + f1.MarkFinished(1); + AssertSuccessful(combined); +} + +TEST(FutureAllTest, Failure) { + auto f1 = Future<int>::Make(); + auto f2 = Future<int>::Make(); + auto f3 = Future<int>::Make(); + std::vector<Future<int>> futures = {f1, f2, f3}; + auto combined = arrow::All(futures); + + ARROW_UNUSED(combined.Then([](std::vector<Result<int>> results) { + ASSERT_EQ(3, results.size()); + ASSERT_EQ(1, *results[0]); + ASSERT_EQ(Status::IOError("XYZ"), results[1].status()); + ASSERT_EQ(3, *results[2]); + })); + + f1.MarkFinished(1); + f2.MarkFinished(Status::IOError("XYZ")); + f3.MarkFinished(3); + + AssertFinished(combined); +} + +TEST(FutureLoopTest, Sync) { + struct { + int i = 0; + Future<int> Get() { return Future<int>::MakeFinished(i++); } + } IntSource; + + bool do_fail = false; + std::vector<int> ints; + auto loop_body = [&] { + return IntSource.Get().Then([&](int i) -> Result<ControlFlow<int>> { + if (do_fail && i == 3) { + return Status::IOError("xxx"); + } + + if (i == 5) { + int sum = 0; + for (int i : ints) sum += i; + return Break(sum); + } + + ints.push_back(i); + return Continue(); + }); + }; + + { + auto sum_fut = Loop(loop_body); + AssertSuccessful(sum_fut); + + ASSERT_OK_AND_ASSIGN(auto sum, sum_fut.result()); + ASSERT_EQ(sum, 0 + 1 + 2 + 3 + 4); + } + + { + do_fail = true; + IntSource.i = 0; + auto sum_fut = Loop(loop_body); + AssertFailed(sum_fut); + ASSERT_RAISES(IOError, sum_fut.result()); + } +} + +TEST(FutureLoopTest, EmptyBreakValue) { + Future<> none_fut = + Loop([&] { return Future<>::MakeFinished().Then([&](...) { return Break(); }); }); + AssertSuccessful(none_fut); +} + +TEST(FutureLoopTest, MoveOnlyBreakValue) { + Future<MoveOnlyDataType> one_fut = Loop([&] { + return Future<int>::MakeFinished(1).Then( + [&](int i) { return Break(MoveOnlyDataType(i)); }); + }); + AssertSuccessful(one_fut); + ASSERT_OK_AND_ASSIGN(auto one, std::move(one_fut).result()); + ASSERT_EQ(one, 1); +} + +TEST(FutureLoopTest, StackOverflow) { + // Looping over futures is normally a rather recursive task. If the futures complete + // synchronously (because they are already finished) it could lead to a stack overflow + // if care is not taken. + int counter = 0; + auto loop_body = [&counter]() -> Future<ControlFlow<int>> { + while (counter < 1000000) { + counter++; + return Future<ControlFlow<int>>::MakeFinished(Continue()); Review comment: Hmm...I'm a little torn here. Could this make return type inference even harder? For example, if you change ``` auto some_future = [] { if (some_base_case) { return Future<T>::MakeFinished(0); } return SomeAsyncFunction.Then(...); }; ``` to... ``` auto some_future = [] { if (some_base_case) { return 0; } return SomeAsyncFunction.Then(...); }; ``` ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org