westonpace commented on a change in pull request #9533:
URL: https://github.com/apache/arrow/pull/9533#discussion_r592268811



##########
File path: cpp/src/arrow/util/iterator_test.cc
##########
@@ -726,6 +730,138 @@ TEST(TestAsyncUtil, CompleteBackgroundStressTest) {
   }
 }
 
+// Test helper that wraps an AsyncGenerator and records whether the wrapper was
+// invoked again before the future returned by the previous invocation had
+// completed (i.e. whether it was accessed in a reentrant manner).
+//
+// All copies of a checker share a single State through a shared_ptr, so a copy
+// can be handed to a consumer while the original is kept to call AssertValid().
+template <typename T>
+class ReentrantChecker {
+ public:
+  explicit ReentrantChecker(AsyncGenerator<T> source)
+      : state_(std::make_shared<State>(std::move(source))) {}
+
+  // Pulls the next future from the wrapped source.  If a previous call is still
+  // outstanding the checker is permanently marked invalid.
+  Future<T> operator()() {
+    // exchange() sets the in-flight flag and returns its previous value in one
+    // atomic step.  A separate load() followed by store() would let two
+    // concurrent callers both observe `false` and miss the overlap.
+    if (state_->in.exchange(true)) {
+      state_->valid.store(false);
+    }
+    auto result = state_->source();
+    return result.Then(Callback{state_});
+  }
+
+  // Fails the current test if a reentrant access was ever observed.
+  void AssertValid() {
+    EXPECT_TRUE(state_->valid.load())
+        << "The generator was accessed in a reentrant manner";
+  }
+
+ private:
+  struct State {
+    explicit State(AsyncGenerator<T> source_)
+        : source(std::move(source_)), in(false), valid(true) {}
+
+    AsyncGenerator<T> source;
+    // True from the moment the source is called until its future's callback runs.
+    std::atomic<bool> in;
+    // Cleared (and never set again) once an overlapping call is detected.
+    std::atomic<bool> valid;
+  };
+  // Continuation attached to each future from the source: clears the in-flight
+  // flag and passes the result through unchanged.
+  struct Callback {
+    Future<T> operator()(const Result<T>& result) {
+      state_->in.store(false);
+      return result;
+    }
+    std::shared_ptr<State> state_;
+  };
+
+  std::shared_ptr<State> state_;
+};
+
+// Drains a SerialReadaheadGenerator layered on a background vector generator
+// and checks that the underlying source was never polled reentrantly.
+TEST(TestAsyncUtil, SerialReadaheadSlowProducer) {
+  AsyncGenerator<TestInt> it = BackgroundAsyncVectorIt({1, 2, 3, 4, 5});
+  ReentrantChecker<TestInt> checker(std::move(it));
+  SerialReadaheadGenerator<TestInt> serial_readahead(checker, 2);
+  AssertAsyncGeneratorMatch({1, 2, 3, 4, 5},
+                            static_cast<AsyncGenerator<TestInt>>(serial_readahead));
+  checker.AssertValid();
+}
+
+TEST(TestAsyncUtil, SerialReadaheadSlowConsumer) {
+  int num_delivered = 0;
+  auto source = [&num_delivered]() {
+    if (num_delivered < 5) {
+      return Future<TestInt>::MakeFinished(num_delivered++);
+    } else {
+      return Future<TestInt>::MakeFinished(IterationTraits<TestInt>::End());
+    }
+  };
+  SerialReadaheadGenerator<TestInt> serial_readahead(std::move(source), 3);
+  SleepABit();
+  ASSERT_EQ(0, num_delivered);

Review comment:
       Keeping it lazy gives us flexibility and doesn't really cost anything.  
Imagine you scan a dataset and you find 100 files so you convert each file into 
an AsyncGenerator<RecordBatch>.  You probably don't want to start reading from 
all 100 files at once.
   
   If you really want to start reading right away then just call the generator 
as soon as you create it.  Creating a generator should be trivial so you 
shouldn't be losing any real time.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to