rok commented on code in PR #14663:
URL: https://github.com/apache/arrow/pull/14663#discussion_r1028048227


##########
cpp/src/arrow/compute/exec/exec_plan.cc:
##########
@@ -614,6 +614,106 @@ Result<std::vector<ExecBatch>> 
DeclarationToExecBatches(Declaration declaration,
   return DeclarationToExecBatchesAsync(std::move(declaration), 
exec_context).result();
 }
 
+namespace {
+struct BatchConverter {
+  explicit BatchConverter(::arrow::internal::Executor* executor)
+      : exec_context(std::make_shared<ExecContext>(default_memory_pool(), 
executor)) {}
+
+  ~BatchConverter() {
+    if (!exec_plan) {
+      return;
+    }
+    if (exec_plan->finished().is_finished()) {
+      return;
+    }
+    exec_plan->StopProducing();
+    Status abandoned_status = exec_plan->finished().status();
+    if (!abandoned_status.ok()) {
+      abandoned_status.Warn();
+    }
+  }
+
+  Future<std::shared_ptr<RecordBatch>> operator()() {
+    return exec_batch_gen().Then(
+        [this](const std::optional<ExecBatch>& batch)
+            -> Future<std::shared_ptr<RecordBatch>> {
+          if (batch) {
+            return batch->ToRecordBatch(schema);
+          } else {
+            return exec_plan->finished().Then(
+                []() -> std::shared_ptr<RecordBatch> { return nullptr; });
+          }
+        },
+        [this](const Status& err) {
+          return exec_plan->finished().Then(
+              [err]() -> Result<std::shared_ptr<RecordBatch>> { return err; });
+        });
+  }
+
+  // TODO: Should use exec context owned by the plan
+  std::shared_ptr<ExecContext> exec_context;
+  AsyncGenerator<std::optional<ExecBatch>> exec_batch_gen;
+  std::shared_ptr<Schema> schema;
+  std::shared_ptr<ExecPlan> exec_plan;
+};
+
+Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> 
DeclarationToRecordBatchGenerator(
+    Declaration declaration, ::arrow::internal::Executor* executor,
+    std::shared_ptr<Schema>* out_schema) {
+  auto converter = std::make_shared<BatchConverter>(executor);
+  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> plan,
+                        ExecPlan::Make(converter->exec_context.get()));
+  Declaration with_sink = Declaration::Sequence(
+      {declaration,
+       {"sink", SinkNodeOptions(&converter->exec_batch_gen, 
&converter->schema)}});
+  ARROW_RETURN_NOT_OK(with_sink.AddToPlan(plan.get()));
+  ARROW_RETURN_NOT_OK(plan->StartProducing());
+  converter->exec_plan = std::move(plan);
+  *out_schema = converter->schema;
+  return [conv = std::move(converter)] { return (*conv)(); };
+}
+}  // namespace
+
+Result<std::unique_ptr<RecordBatchReader>> DeclarationToReader(Declaration 
declaration,
+                                                               bool 
use_threads) {
+  std::shared_ptr<Schema> schema;
+  auto batch_itr = std::make_unique<Iterator<std::shared_ptr<RecordBatch>>>(

Review Comment:
   Nit:
   ```suggestion
     auto batch_iterator = 
std::make_unique<Iterator<std::shared_ptr<RecordBatch>>>(
   ```



##########
cpp/src/arrow/compute/exec/exec_plan.cc:
##########
@@ -614,6 +614,106 @@ Result<std::vector<ExecBatch>> 
DeclarationToExecBatches(Declaration declaration,
   return DeclarationToExecBatchesAsync(std::move(declaration), 
exec_context).result();
 }
 
+namespace {
+struct BatchConverter {
+  explicit BatchConverter(::arrow::internal::Executor* executor)
+      : exec_context(std::make_shared<ExecContext>(default_memory_pool(), 
executor)) {}
+
+  ~BatchConverter() {
+    if (!exec_plan) {
+      return;
+    }
+    if (exec_plan->finished().is_finished()) {
+      return;
+    }
+    exec_plan->StopProducing();
+    Status abandoned_status = exec_plan->finished().status();
+    if (!abandoned_status.ok()) {
+      abandoned_status.Warn();
+    }
+  }
+
+  Future<std::shared_ptr<RecordBatch>> operator()() {
+    return exec_batch_gen().Then(
+        [this](const std::optional<ExecBatch>& batch)
+            -> Future<std::shared_ptr<RecordBatch>> {
+          if (batch) {
+            return batch->ToRecordBatch(schema);
+          } else {
+            return exec_plan->finished().Then(
+                []() -> std::shared_ptr<RecordBatch> { return nullptr; });
+          }
+        },
+        [this](const Status& err) {
+          return exec_plan->finished().Then(
+              [err]() -> Result<std::shared_ptr<RecordBatch>> { return err; });
+        });
+  }
+
+  // TODO: Should use exec context owned by the plan

Review Comment:
   Does this TODO need to be addressed before this PR is merged, or is a follow-up issue planned to track it?



##########
cpp/src/arrow/compute/exec/exec_plan.cc:
##########
@@ -614,6 +614,106 @@ Result<std::vector<ExecBatch>> 
DeclarationToExecBatches(Declaration declaration,
   return DeclarationToExecBatchesAsync(std::move(declaration), 
exec_context).result();
 }
 
+namespace {
+struct BatchConverter {
+  explicit BatchConverter(::arrow::internal::Executor* executor)
+      : exec_context(std::make_shared<ExecContext>(default_memory_pool(), 
executor)) {}
+
+  ~BatchConverter() {
+    if (!exec_plan) {
+      return;
+    }
+    if (exec_plan->finished().is_finished()) {
+      return;
+    }
+    exec_plan->StopProducing();
+    Status abandoned_status = exec_plan->finished().status();
+    if (!abandoned_status.ok()) {
+      abandoned_status.Warn();
+    }
+  }
+
+  Future<std::shared_ptr<RecordBatch>> operator()() {
+    return exec_batch_gen().Then(
+        [this](const std::optional<ExecBatch>& batch)
+            -> Future<std::shared_ptr<RecordBatch>> {
+          if (batch) {
+            return batch->ToRecordBatch(schema);
+          } else {
+            return exec_plan->finished().Then(
+                []() -> std::shared_ptr<RecordBatch> { return nullptr; });
+          }
+        },
+        [this](const Status& err) {
+          return exec_plan->finished().Then(
+              [err]() -> Result<std::shared_ptr<RecordBatch>> { return err; });
+        });
+  }
+
+  // TODO: Should use exec context owned by the plan
+  std::shared_ptr<ExecContext> exec_context;
+  AsyncGenerator<std::optional<ExecBatch>> exec_batch_gen;
+  std::shared_ptr<Schema> schema;
+  std::shared_ptr<ExecPlan> exec_plan;
+};
+
+Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> 
DeclarationToRecordBatchGenerator(
+    Declaration declaration, ::arrow::internal::Executor* executor,
+    std::shared_ptr<Schema>* out_schema) {
+  auto converter = std::make_shared<BatchConverter>(executor);
+  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<ExecPlan> plan,
+                        ExecPlan::Make(converter->exec_context.get()));
+  Declaration with_sink = Declaration::Sequence(
+      {declaration,
+       {"sink", SinkNodeOptions(&converter->exec_batch_gen, 
&converter->schema)}});
+  ARROW_RETURN_NOT_OK(with_sink.AddToPlan(plan.get()));
+  ARROW_RETURN_NOT_OK(plan->StartProducing());
+  converter->exec_plan = std::move(plan);
+  *out_schema = converter->schema;
+  return [conv = std::move(converter)] { return (*conv)(); };
+}
+}  // namespace
+
+Result<std::unique_ptr<RecordBatchReader>> DeclarationToReader(Declaration 
declaration,
+                                                               bool 
use_threads) {
+  std::shared_ptr<Schema> schema;
+  auto batch_itr = std::make_unique<Iterator<std::shared_ptr<RecordBatch>>>(
+      ::arrow::internal::IterateSynchronously<std::shared_ptr<RecordBatch>>(
+          [&](::arrow::internal::Executor* executor)
+              -> Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> {
+            return DeclarationToRecordBatchGenerator(declaration, executor, 
&schema);
+          },
+          use_threads));
+
+  struct PlanReader : RecordBatchReader {
+    PlanReader(std::shared_ptr<Schema> schema,
+               std::unique_ptr<Iterator<std::shared_ptr<RecordBatch>>> 
iterator)
+        : schema_(std::move(schema)), iterator_(std::move(iterator)) {}
+
+    std::shared_ptr<Schema> schema() const override { return schema_; }
+
+    Status ReadNext(std::shared_ptr<RecordBatch>* record_batch) override {
+      DCHECK(!!iterator_) << "call to ReadNext on already closed reader";
+      return iterator_->Next().Value(record_batch);
+    }
+
+    Status Close() override {
+      // End plan and read from generator until finished
+      std::shared_ptr<RecordBatch> batch;
+      do {
+        ARROW_RETURN_NOT_OK(ReadNext(&batch));
+      } while (batch != nullptr);
+      iterator_.reset();
+      return Status::OK();
+    }
+
+    std::shared_ptr<Schema> schema_;
+    std::unique_ptr<Iterator<std::shared_ptr<RecordBatch>>> iterator_;
+  };
+
+  return std::make_unique<PlanReader>(std::move(schema), std::move(batch_itr));

Review Comment:
   Nit:
   ```suggestion
     return std::make_unique<PlanReader>(std::move(schema), 
std::move(batch_iterator));
   ```



##########
cpp/src/arrow/dataset/file_base.cc:
##########
@@ -108,6 +131,18 @@ Future<std::optional<int64_t>> FileFormat::CountRows(
   return Future<std::optional<int64_t>>::MakeFinished(std::nullopt);
 }
 
+Future<std::shared_ptr<InspectedFragment>> FileFormat::InspectFragment(
+    const FileSource& source, const FragmentScanOptions* format_options,
+    compute::ExecContext* exec_context) const {
+  return Status::NotImplemented("The new scanner is not yet supported on this 
format");

Review Comment:
   Which scanner does "the new scanner" refer to? Consider naming it explicitly in the error message so users know which API is unsupported for this format.



##########
cpp/src/arrow/dataset/file_csv.cc:
##########
@@ -52,9 +53,99 @@ using internal::SerialExecutor;
 
 namespace dataset {
 
+struct CsvInspectedFragment : public InspectedFragment {
+  CsvInspectedFragment(std::vector<std::string> column_names,
+                       std::shared_ptr<io::InputStream> input_stream, int64_t 
num_bytes)
+      : InspectedFragment(std::move(column_names)),
+        input_stream(std::move(input_stream)),
+        num_bytes(num_bytes) {}
+  // We need to start reading the file in order to figure out the column names 
and
+  // so we save off the input stream
+  std::shared_ptr<io::InputStream> input_stream;
+  int64_t num_bytes;
+};
+
+class CsvFileScanner : public FragmentScanner {
+ public:
+  CsvFileScanner(std::shared_ptr<csv::StreamingReader> reader, int num_batches,
+                 int64_t best_guess_bytes_per_batch)
+      : reader_(std::move(reader)),
+        num_batches_(num_batches),
+        best_guess_bytes_per_batch_(best_guess_bytes_per_batch) {}
+
+  Future<std::shared_ptr<RecordBatch>> ScanBatch(int batch_number) override {
+    // We should be called in increasing order but let's verify that in case 
it changes.

Review Comment:
   Nit: do you mean:
   ```suggestion
       // This should be called in increasing order but let's verify that in 
case it changes.
   ```



##########
cpp/src/arrow/dataset/file_base.cc:
##########
@@ -108,6 +131,18 @@ Future<std::optional<int64_t>> FileFormat::CountRows(
   return Future<std::optional<int64_t>>::MakeFinished(std::nullopt);
 }
 
+Future<std::shared_ptr<InspectedFragment>> FileFormat::InspectFragment(
+    const FileSource& source, const FragmentScanOptions* format_options,
+    compute::ExecContext* exec_context) const {
+  return Status::NotImplemented("The new scanner is not yet supported on this 
format");
+}
+
+Future<std::shared_ptr<FragmentScanner>> FileFormat::BeginScan(
+    const FragmentScanRequest& request, const InspectedFragment& 
inspected_fragment,
+    const FragmentScanOptions* format_options, compute::ExecContext* 
exec_context) const {
+  return Status::NotImplemented("The new scanner is not yet supported on this 
format");

Review Comment:
   As above: which scanner? Consider naming it explicitly in the error message here as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to