lidavidm commented on a change in pull request #9607:
URL: https://github.com/apache/arrow/pull/9607#discussion_r605010331



##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -72,24 +104,251 @@ Result<FragmentIterator> Scanner::GetFragments() {
   return GetFragmentsFromDatasets({dataset_}, scan_options_->filter);
 }
 
+Result<FragmentIterator> Scanner::GetFragments() {
+  auto fut = GetFragmentsAsync();
+  fut.Wait();

Review comment:
       nit: `Future::result()` already calls `Wait()` for you, so the explicit `fut.Wait()` is redundant.

##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -1029,6 +1102,65 @@ AsyncGenerator<T> 
MakeConcatenatedGenerator(AsyncGenerator<AsyncGenerator<T>> so
   return MergedGenerator<T>(std::move(source), 1);
 }
 
+template <typename T>
+struct Enumerated {
+  util::optional<T> value;
+  int index;
+  bool last;
+};
+
+template <typename T>
+struct IterationTraits<Enumerated<T>> {
+  static Enumerated<T> End() { return Enumerated<T>{{}, -1, false}; }
+  static bool IsEnd(const Enumerated<T>& val) { return !val.value.has_value(); 
}
+};
+
+template <typename T>
+class EnumeratingGenerator {
+ public:
+  EnumeratingGenerator(AsyncGenerator<T> source, T initial_value)
+      : state_(std::make_shared<State>(std::move(source), 
std::move(initial_value))) {}
+
+  Future<Enumerated<T>> operator()() {

Review comment:
       I'm certainly missing something, but why can't this just capture a counter 
and increment it in operator() instead of keeping track of the state/previous 
item?

##########
File path: cpp/src/arrow/util/async_generator.h
##########
@@ -640,6 +666,40 @@ class SerialReadaheadGenerator {
   std::shared_ptr<State> state_;
 };
 
+template <typename T>
+class FutureFirstGenerator {
+ public:
+  explicit FutureFirstGenerator(Future<AsyncGenerator<T>> future)
+      : state_(std::make_shared<State>(std::move(future))) {}
+
+  Future<T> operator()() {
+    if (state_->source_) {
+      return state_->source_();
+    } else {
+      auto state = state_;
+      return state_->future_.Then([state](const AsyncGenerator<T>& source) {
+        state->source_ = source;
+        return state->source_();
+      });
+    }
+  }
+
+ private:
+  struct State {
+    explicit State(Future<AsyncGenerator<T>> future) : future_(future), 
source_() {}
+
+    Future<AsyncGenerator<T>> future_;
+    AsyncGenerator<T> source_;
+  };
+
+  std::shared_ptr<State> state_;
+};
+
+template <typename T>

Review comment:
       nit: document that this generator is not async-reentrant.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to