marsupialtail commented on code in PR #13931:
URL: https://github.com/apache/arrow/pull/13931#discussion_r955629720


##########
cpp/src/arrow/util/async_generator.h:
##########
@@ -147,6 +147,51 @@ Future<std::vector<T>> CollectAsyncGenerator(AsyncGenerator<T> generator) {
   return Loop(LoopBody{std::move(generator), std::move(vec)});
 }
 
+/// \brief this is just like a MapGenerator but the map fun returns a thing instead of a
+/// future
+template <typename T, typename ApplyFn,
+          typename Applied = arrow::detail::result_of_t<ApplyFn(const T&)>,
+          typename V = typename EnsureResult<Applied>::type::ValueType>
+AsyncGenerator<V> MakeApplyGenerator(AsyncGenerator<T> source_gen, ApplyFn apply_fun,
+                                     internal::Executor* cpu_exec) {
+  struct State {
+    explicit State(AsyncGenerator<T> source_gen_, ApplyFn apply_fun_,
+                   internal::Executor* cpu_exec_)
+        : source_gen(std::move(source_gen_)),
+          apply_fun(std::move(apply_fun_)),
+          cpu_exec(cpu_exec_),
+          finished(false) {}
+
+    AsyncGenerator<T> source_gen;
+    ApplyFn apply_fun;
+    internal::Executor* cpu_exec;
+    bool finished;
+  };
+
+  auto state =
+      std::make_shared<State>(std::move(source_gen), std::move(apply_fun), cpu_exec);
+  return [state]() {
+    CallbackOptions options;
+    options.executor = state->cpu_exec;
+    options.should_schedule = ShouldSchedule::Always;
+
+    return state->source_gen().Then(
+        [state](const T& next) -> Result<V> {
+          if (IsIterationEnd(next)) {
+            return IterationTraits<V>::End();
+          } else {
+            auto value = state->apply_fun(next);
+            if (!value.ok()) {
+              return Status::NotImplemented("not implemented");
+            } else {
+              return value.ValueOrDie();
+            }

Review Comment:
   As opposed to just finishing the generator? It needs to return a non-OK status here, though the error message should be more descriptive than "not implemented". If the generator simply finishes instead of returning a non-OK status, some of the tests will fail. For example, if you read an invalid CSV file, the parse task returns a non-OK status, and that status needs to propagate up through this generator so that dataset inspection (or a similar operation) also fails with a non-OK status.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.