westonpace commented on a change in pull request #9643:
URL: https://github.com/apache/arrow/pull/9643#discussion_r593082143
########## File path: cpp/src/arrow/util/iterator_test.cc ##########
@@ -589,15 +675,255 @@ TEST(TestAsyncUtil, Collect) { ASSERT_EQ(expected, collected_val); } +TEST(TestAsyncUtil, Map) { + std::vector<TestInt> input = {1, 2, 3}; + auto generator = AsyncVectorIt(input); + std::function<TestStr(const TestInt&)> mapper = [](const TestInt& in) { + return std::to_string(in.value); + }; + auto mapped = MakeMappedGenerator(std::move(generator), mapper); + std::vector<TestStr> expected{"1", "2", "3"}; + AssertAsyncGeneratorMatch(expected, mapped); +} + +TEST(TestAsyncUtil, MapAsync) { + std::vector<TestInt> input = {1, 2, 3}; + auto generator = AsyncVectorIt(input); + std::function<Future<TestStr>(const TestInt&)> mapper = [](const TestInt& in) { + return SleepAsync(1e-3).Then([in](const Result<detail::Empty>& empty) { + return TestStr(std::to_string(in.value)); + }); + }; + auto mapped = MakeMappedGenerator(std::move(generator), mapper); + std::vector<TestStr> expected{"1", "2", "3"}; + AssertAsyncGeneratorMatch(expected, mapped); +} + +TEST(TestAsyncUtil, MapReentrant) { + std::vector<TestInt> input = {1, 2}; + auto source = AsyncVectorIt(input); + TrackingGenerator<TestInt> tracker(std::move(source)); + source = MakeTransferredGenerator(AsyncGenerator<TestInt>(tracker), + internal::GetCpuThreadPool()); + + std::atomic<int> map_tasks_running(0); + // Mapper blocks until signal, should start multiple map tasks + std::atomic<bool> can_proceed(false); + std::function<Future<TestStr>(const TestInt&)> mapper = + [&can_proceed, &map_tasks_running](const TestInt& in) -> Future<TestStr> { + auto fut = Future<TestStr>::Make(); + map_tasks_running.fetch_add(1); + std::thread([fut, in, &can_proceed]() mutable { + while (!can_proceed.load()) { + SleepABit(); + } + fut.MarkFinished(TestStr(std::to_string(in.value))); + }).detach(); + return fut;
Review comment: Done.
########## File path: cpp/src/arrow/util/iterator_test.cc ##########
@@ -570,8 +616,48 @@ TEST(ReadaheadIterator, NextError) { // -------------------------------------------------------------------- // Asynchronous iterator tests +template <typename T> +class ReentrantChecker { + public: + explicit ReentrantChecker(AsyncGenerator<T> source) + : state_(std::make_shared<State>(std::move(source))) {} + + Future<T> operator()() { + if (state_->in.load()) { + state_->valid.store(false); + } + state_->in.store(true); + auto result = state_->source(); + return result.Then(Callback{state_}); + } + + void AssertValid() { + EXPECT_EQ(true, state_->valid.load()) + << "The generator was accessed in a reentrant manner"; + } + + private: + struct State { + explicit State(AsyncGenerator<T> source_) + : source(std::move(source_)), in(false), valid(true) {} + + AsyncGenerator<T> source; + std::atomic<bool> in;
Review comment: Done.
----------------------------------------------------------------
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org