pitrou commented on a change in pull request #12712:
URL: https://github.com/apache/arrow/pull/12712#discussion_r836423004



##########
File path: cpp/src/arrow/util/thread_pool.h
##########
@@ -276,25 +277,100 @@ class ARROW_EXPORT SerialExecutor : public Executor {
     return FutureToSync(fut);
   }
 
+  template <typename T>
+  static Iterator<T> RunGeneratorInSerialExecutor(

Review comment:
       Nit, but `SerialExecutor::RunGeneratorInSerialExecutor` seems a bit 
verbose. Perhaps something like `SerialExecutor::IterateGenerator`?

##########
File path: cpp/src/arrow/util/thread_pool.h
##########
@@ -276,25 +277,100 @@ class ARROW_EXPORT SerialExecutor : public Executor {
     return FutureToSync(fut);
   }
 
+  template <typename T>

Review comment:
       Add a small docstring here?

##########
File path: cpp/src/arrow/testing/async_test_util.h
##########
@@ -20,12 +20,37 @@
 #include <atomic>
 #include <memory>
 
+#include "arrow/testing/gtest_util.h"
 #include "arrow/util/async_generator.h"
 #include "arrow/util/future.h"
 
 namespace arrow {
 namespace util {
 
+template <typename T>
+AsyncGenerator<T> AsyncVectorIt(std::vector<T> v) {
+  return MakeVectorGenerator(std::move(v));
+}
+
+template <typename T>
+AsyncGenerator<T> FailsAt(AsyncGenerator<T> src, int failing_index) {

Review comment:
       Nit: `FailsAt` sounds like a predicate; perhaps `FailAt`?

##########
File path: cpp/src/arrow/util/thread_pool_test.cc
##########
@@ -261,6 +262,140 @@ TEST_P(TestRunSynchronously, PropagatedError) {
 INSTANTIATE_TEST_SUITE_P(TestRunSynchronously, TestRunSynchronously,
                          ::testing::Values(false, true));
 
+TEST(SerialExecutor, AsyncGenerator) {
+  std::vector<TestInt> values{1, 2, 3, 4, 5};
+  auto source = util::SlowdownABit(util::AsyncVectorIt(values));
+  Iterator<TestInt> iter = 
SerialExecutor::RunGeneratorInSerialExecutor<TestInt>(
+      [&source](Executor* executor) {
+        return MakeMappedGenerator(source, [executor](const TestInt& ti) {
+          return DeferNotOk(executor->Submit([ti] { return ti; }));
+        });
+      });
+  ASSERT_OK_AND_ASSIGN(auto vec, iter.ToVector());
+  ASSERT_EQ(vec, values);
+}
+
+TEST(SerialExecutor, AsyncGeneratorWithFollowUp) {
+  // Sometimes a task will generate follow-up tasks.  These should be run
+  // before the next task is started
+  bool follow_up_ran = false;
+  bool first = true;
+  Iterator<TestInt> iter =
+      SerialExecutor::RunGeneratorInSerialExecutor<TestInt>([&](Executor* 
executor) {
+        return [=, &first, &follow_up_ran]() -> Future<TestInt> {
+          if (first) {
+            first = false;
+            Future<TestInt> item =
+                DeferNotOk(executor->Submit([] { return TestInt(0); }));
+            RETURN_NOT_OK(executor->Spawn([&] { follow_up_ran = true; }));
+            return item;
+          }
+          return DeferNotOk(executor->Submit([] { return 
IterationEnd<TestInt>(); }));
+        };
+      });
+  ASSERT_FALSE(follow_up_ran);
+  ASSERT_OK_AND_EQ(TestInt(0), iter.Next());
+  ASSERT_FALSE(follow_up_ran);
+  ASSERT_OK_AND_EQ(IterationEnd<TestInt>(), iter.Next());
+  ASSERT_TRUE(follow_up_ran);
+}
+
+TEST(SerialExecutor, AsyncGeneratorWithAsyncFollowUp) {
+  // Simulates a situation where a user calls into the async generator, tasks 
(e.g. I/O
+  // readahead tasks) are spawned onto the I/O threadpool, the user gets a 
result, and
+  // then the I/O readahead tasks are completed while there is no calling 
thread in the
+  // async generator to hand the task off to (it should be queued up)
+  bool follow_up_ran = false;
+  bool first = true;
+  Executor* captured_executor;
+  Iterator<TestInt> iter =
+      SerialExecutor::RunGeneratorInSerialExecutor<TestInt>([&](Executor* 
executor) {
+        return [=, &first, &captured_executor]() -> Future<TestInt> {
+          if (first) {
+            captured_executor = executor;
+            first = false;
+            return DeferNotOk(executor->Submit([] {
+              // I/O tasks would be scheduled at this point
+              return TestInt(0);
+            }));
+          }
+          return DeferNotOk(executor->Submit([] { return 
IterationEnd<TestInt>(); }));
+        };
+      });
+  ASSERT_FALSE(follow_up_ran);
+  ASSERT_OK_AND_EQ(TestInt(0), iter.Next());
+  // I/O task completes and has reference to executor to submit continuation
+  ASSERT_OK(captured_executor->Spawn([&] { follow_up_ran = true; }));
+  // Follow-up task can't run right now because there is no thread in the 
executor
+  SleepABit();
+  ASSERT_FALSE(follow_up_ran);
+  // Follow-up should run as part of retrieving the next item
+  ASSERT_OK_AND_EQ(IterationEnd<TestInt>(), iter.Next());
+  ASSERT_TRUE(follow_up_ran);
+}
+
+TEST(SerialExecutor, AsyncGeneratorWithCleanup) {
+  // Sometimes a final task might generate follow-up tasks.  Unlike other 
follow-up

Review comment:
       The "unlike" looks a bit confusing to me: it seems to imply that other 
follow-up tasks might not be run before the iterator is finished.

##########
File path: cpp/src/arrow/util/thread_pool_test.cc
##########
@@ -261,6 +262,140 @@ TEST_P(TestRunSynchronously, PropagatedError) {
 INSTANTIATE_TEST_SUITE_P(TestRunSynchronously, TestRunSynchronously,
                          ::testing::Values(false, true));
 
+TEST(SerialExecutor, AsyncGenerator) {
+  std::vector<TestInt> values{1, 2, 3, 4, 5};
+  auto source = util::SlowdownABit(util::AsyncVectorIt(values));
+  Iterator<TestInt> iter = 
SerialExecutor::RunGeneratorInSerialExecutor<TestInt>(
+      [&source](Executor* executor) {
+        return MakeMappedGenerator(source, [executor](const TestInt& ti) {
+          return DeferNotOk(executor->Submit([ti] { return ti; }));
+        });
+      });
+  ASSERT_OK_AND_ASSIGN(auto vec, iter.ToVector());
+  ASSERT_EQ(vec, values);
+}
+
+TEST(SerialExecutor, AsyncGeneratorWithFollowUp) {
+  // Sometimes a task will generate follow-up tasks.  These should be run
+  // before the next task is started
+  bool follow_up_ran = false;
+  bool first = true;
+  Iterator<TestInt> iter =
+      SerialExecutor::RunGeneratorInSerialExecutor<TestInt>([&](Executor* 
executor) {
+        return [=, &first, &follow_up_ran]() -> Future<TestInt> {
+          if (first) {
+            first = false;
+            Future<TestInt> item =
+                DeferNotOk(executor->Submit([] { return TestInt(0); }));
+            RETURN_NOT_OK(executor->Spawn([&] { follow_up_ran = true; }));
+            return item;
+          }
+          return DeferNotOk(executor->Submit([] { return 
IterationEnd<TestInt>(); }));
+        };
+      });
+  ASSERT_FALSE(follow_up_ran);
+  ASSERT_OK_AND_EQ(TestInt(0), iter.Next());
+  ASSERT_FALSE(follow_up_ran);
+  ASSERT_OK_AND_EQ(IterationEnd<TestInt>(), iter.Next());
+  ASSERT_TRUE(follow_up_ran);
+}
+
+TEST(SerialExecutor, AsyncGeneratorWithAsyncFollowUp) {
+  // Simulates a situation where a user calls into the async generator, tasks 
(e.g. I/O
+  // readahead tasks) are spawned onto the I/O threadpool, the user gets a 
result, and
+  // then the I/O readahead tasks are completed while there is no calling 
thread in the
+  // async generator to hand the task off to (it should be queued up)
+  bool follow_up_ran = false;
+  bool first = true;
+  Executor* captured_executor;

Review comment:
       ```suggestion
     Executor* captured_executor = nullptr;
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to