bkietz commented on a change in pull request #9947:
URL: https://github.com/apache/arrow/pull/9947#discussion_r611719695



##########
File path: cpp/src/arrow/dataset/scanner.h
##########
@@ -138,47 +179,147 @@ ARROW_DS_EXPORT Result<ScanTaskIterator> 
ScanTaskIteratorFromRecordBatch(
     std::vector<std::shared_ptr<RecordBatch>> batches,
     std::shared_ptr<ScanOptions> options);
 
-/// \brief Scanner is a materialized scan operation with context and options
-/// bound. A scanner is the class that glues ScanTask, Fragment,
-/// and Dataset. In python pseudo code, it performs the following:
+template <typename T>
+struct Enumerated {
+  T value;
+  int index;
+  bool last;
+};
+
+/// \brief Combines a record batch with the fragment that the record batch 
originated
+/// from
+///
+/// Knowing the source fragment can be useful for debugging & understanding 
loaded data
+struct TaggedRecordBatch {
+  std::shared_ptr<RecordBatch> record_batch;
+  std::shared_ptr<Fragment> fragment;
+};
+using TaggedRecordBatchGenerator = std::function<Future<TaggedRecordBatch>()>;
+using TaggedRecordBatchIterator = Iterator<TaggedRecordBatch>;
+
+/// \brief Combines a tagged batch with positional information
+///
+/// This is returned when scanning batches in an unordered fashion.  This 
information is
+/// needed if you ever want to reassemble the batches in order
+struct EnumeratedRecordBatch {
+  Enumerated<std::shared_ptr<RecordBatch>> record_batch;
+  Enumerated<std::shared_ptr<Fragment>> fragment;
+};
+using EnumeratedRecordBatchGenerator = 
std::function<Future<EnumeratedRecordBatch>()>;
+using EnumeratedRecordBatchIterator = Iterator<EnumeratedRecordBatch>;
+
+}  // namespace dataset
+
+template <>
+struct IterationTraits<dataset::TaggedRecordBatch> {
+  static dataset::TaggedRecordBatch End() {
+    return dataset::TaggedRecordBatch{NULL, NULL};
+  }
+  static bool IsEnd(const dataset::TaggedRecordBatch& val) {
+    return val.record_batch == NULL;
+  }
+};
+
+template <>
+struct IterationTraits<dataset::EnumeratedRecordBatch> {
+  static dataset::EnumeratedRecordBatch End() {
+    return dataset::EnumeratedRecordBatch{{NULL, -1, false}, {NULL, -1, 
false}};
+  }
+  static bool IsEnd(const dataset::EnumeratedRecordBatch& val) {
+    return val.fragment.value == NULL;
+  }
+};
+
+namespace dataset {
+/// \brief A scanner glues together several dataset classes to load in data.
+/// The dataset contains a collection of fragments and partitioning rules.
+///
+/// The fragments identify independently loadable units of data (i.e. each 
fragment has
+/// a potentially unique schema and possibly even format.  It should be 
possible to read
+/// fragments in parallel if desired).
+///
+/// The fragment's format contains the logic necessary to actually create a 
task to load
+/// the fragment into memory.  That task may or may not support parallel 
execution of
+/// its own.
 ///
-///  def Scan():
-///    for fragment in self.dataset.GetFragments(this.options.filter):
-///      for scan_task in fragment.Scan(this.options):
-///        yield scan_task
+/// The scanner is then responsible for creating scan tasks from every 
fragment in the
+/// dataset and (potentially) sequencing the loaded record batches together.
+///
+/// The scanner should not buffer the entire dataset in memory (unless asked) 
but should
+/// return record batches as soon as they are ready to scan.  Various readahead

Review comment:
       ```suggestion
   /// The scanner should not buffer the entire dataset in memory (unless 
asked) instead
   /// yielding record batches as soon as they are ready to scan.  Various 
readahead
   ```

##########
File path: cpp/src/arrow/dataset/test_util.h
##########
@@ -185,6 +196,45 @@ class DatasetFixtureMixin : public ::testing::Test {
     }
   }
 
+  /// \brief Ensure that record batches found in reader are equals to the
+  /// record batches yielded by a scanner.
+  void AssertScanBatchesEquals(RecordBatchReader* expected, Scanner* scanner,
+                               bool ensure_drained = true) {
+    ASSERT_OK_AND_ASSIGN(auto it, scanner->ScanBatches());
+
+    ARROW_EXPECT_OK(it.Visit([&](TaggedRecordBatch batch) -> Status {
+      AssertBatchEquals(expected, batch.record_batch.get());
+      return Status::OK();
+    }));
+
+    if (ensure_drained) {
+      EnsureRecordBatchReaderDrained(expected);
+    }
+  }
+
+  /// \brief Ensure that record batches found in reader are equals to the
+  /// record batches yielded by a scanner.

Review comment:
       ```suggestion
     /// record batches yielded by a scanner. Each fragment in the scanner is
     /// expected to have a single batch.
   ```

##########
File path: cpp/src/arrow/dataset/scanner.h
##########
@@ -138,47 +179,147 @@ ARROW_DS_EXPORT Result<ScanTaskIterator> 
ScanTaskIteratorFromRecordBatch(
     std::vector<std::shared_ptr<RecordBatch>> batches,
     std::shared_ptr<ScanOptions> options);
 
-/// \brief Scanner is a materialized scan operation with context and options
-/// bound. A scanner is the class that glues ScanTask, Fragment,
-/// and Dataset. In python pseudo code, it performs the following:
+template <typename T>
+struct Enumerated {
+  T value;
+  int index;
+  bool last;
+};
+
+/// \brief Combines a record batch with the fragment that the record batch 
originated
+/// from
+///
+/// Knowing the source fragment can be useful for debugging & understanding 
loaded data
+struct TaggedRecordBatch {
+  std::shared_ptr<RecordBatch> record_batch;
+  std::shared_ptr<Fragment> fragment;
+};
+using TaggedRecordBatchGenerator = std::function<Future<TaggedRecordBatch>()>;
+using TaggedRecordBatchIterator = Iterator<TaggedRecordBatch>;
+
+/// \brief Combines a tagged batch with positional information
+///
+/// This is returned when scanning batches in an unordered fashion.  This 
information is
+/// needed if you ever want to reassemble the batches in order
+struct EnumeratedRecordBatch {
+  Enumerated<std::shared_ptr<RecordBatch>> record_batch;
+  Enumerated<std::shared_ptr<Fragment>> fragment;
+};
+using EnumeratedRecordBatchGenerator = 
std::function<Future<EnumeratedRecordBatch>()>;
+using EnumeratedRecordBatchIterator = Iterator<EnumeratedRecordBatch>;
+
+}  // namespace dataset
+
+template <>
+struct IterationTraits<dataset::TaggedRecordBatch> {
+  static dataset::TaggedRecordBatch End() {
+    return dataset::TaggedRecordBatch{NULL, NULL};
+  }
+  static bool IsEnd(const dataset::TaggedRecordBatch& val) {
+    return val.record_batch == NULL;
+  }
+};
+
+template <>
+struct IterationTraits<dataset::EnumeratedRecordBatch> {
+  static dataset::EnumeratedRecordBatch End() {
+    return dataset::EnumeratedRecordBatch{{NULL, -1, false}, {NULL, -1, 
false}};
+  }
+  static bool IsEnd(const dataset::EnumeratedRecordBatch& val) {
+    return val.fragment.value == NULL;
+  }
+};
+
+namespace dataset {
+/// \brief A scanner glues together several dataset classes to load in data.
+/// The dataset contains a collection of fragments and partitioning rules.
+///
+/// The fragments identify independently loadable units of data (i.e. each 
fragment has
+/// a potentially unique schema and possibly even format.  It should be 
possible to read
+/// fragments in parallel if desired).
+///
+/// The fragment's format contains the logic necessary to actually create a 
task to load
+/// the fragment into memory.  That task may or may not support parallel 
execution of
+/// its own.
 ///
-///  def Scan():
-///    for fragment in self.dataset.GetFragments(this.options.filter):
-///      for scan_task in fragment.Scan(this.options):
-///        yield scan_task
+/// The scanner is then responsible for creating scan tasks from every 
fragment in the
+/// dataset and (potentially) sequencing the loaded record batches together.
+///
+/// The scanner should not buffer the entire dataset in memory (unless asked) 
but should
+/// return record batches as soon as they are ready to scan.  Various readahead
+/// properties control how much data is allowed to be scanned before pausing 
to let a
+/// slow consumer catchup.
+///
+/// Today the scanner also delegates projection & filtering although that may 
change in

Review comment:
       ```suggestion
   /// Today the scanner also handles projection & filtering although that may 
change in
   ```

##########
File path: cpp/src/arrow/dataset/test_util.h
##########
@@ -136,6 +138,15 @@ class DatasetFixtureMixin : public ::testing::Test {
     }
   }
 
+  /// \brief Ensure a record batch yielded by the fragment matches the next 
batch yielded
+  /// by the reader

Review comment:
       ```suggestion
     /// \brief Assert the value of the next batch yielded by the reader
   ```

##########
File path: cpp/src/arrow/dataset/test_util.h
##########
@@ -136,6 +138,15 @@ class DatasetFixtureMixin : public ::testing::Test {
     }
   }
 
+  /// \brief Ensure a record batch yielded by the fragment matches the next 
batch yielded
+  /// by the reader
+  void AssertBatchEquals(RecordBatchReader* expected, RecordBatch* batch) {

Review comment:
       Nit:
   ```suggestion
     void AssertBatchEquals(RecordBatchReader* expected, const RecordBatch& 
batch) {
   ```

##########
File path: cpp/src/arrow/dataset/scanner.h
##########
@@ -138,47 +179,147 @@ ARROW_DS_EXPORT Result<ScanTaskIterator> 
ScanTaskIteratorFromRecordBatch(
     std::vector<std::shared_ptr<RecordBatch>> batches,
     std::shared_ptr<ScanOptions> options);
 
-/// \brief Scanner is a materialized scan operation with context and options
-/// bound. A scanner is the class that glues ScanTask, Fragment,
-/// and Dataset. In python pseudo code, it performs the following:
+template <typename T>
+struct Enumerated {
+  T value;
+  int index;
+  bool last;
+};
+
+/// \brief Combines a record batch with the fragment that the record batch 
originated
+/// from
+///
+/// Knowing the source fragment can be useful for debugging & understanding 
loaded data
+struct TaggedRecordBatch {
+  std::shared_ptr<RecordBatch> record_batch;
+  std::shared_ptr<Fragment> fragment;
+};
+using TaggedRecordBatchGenerator = std::function<Future<TaggedRecordBatch>()>;
+using TaggedRecordBatchIterator = Iterator<TaggedRecordBatch>;
+
+/// \brief Combines a tagged batch with positional information
+///
+/// This is returned when scanning batches in an unordered fashion.  This 
information is
+/// needed if you ever want to reassemble the batches in order
+struct EnumeratedRecordBatch {
+  Enumerated<std::shared_ptr<RecordBatch>> record_batch;
+  Enumerated<std::shared_ptr<Fragment>> fragment;
+};
+using EnumeratedRecordBatchGenerator = 
std::function<Future<EnumeratedRecordBatch>()>;
+using EnumeratedRecordBatchIterator = Iterator<EnumeratedRecordBatch>;
+
+}  // namespace dataset
+
+template <>
+struct IterationTraits<dataset::TaggedRecordBatch> {
+  static dataset::TaggedRecordBatch End() {
+    return dataset::TaggedRecordBatch{NULL, NULL};
+  }
+  static bool IsEnd(const dataset::TaggedRecordBatch& val) {
+    return val.record_batch == NULL;
+  }
+};
+
+template <>
+struct IterationTraits<dataset::EnumeratedRecordBatch> {
+  static dataset::EnumeratedRecordBatch End() {
+    return dataset::EnumeratedRecordBatch{{NULL, -1, false}, {NULL, -1, 
false}};
+  }
+  static bool IsEnd(const dataset::EnumeratedRecordBatch& val) {
+    return val.fragment.value == NULL;
+  }
+};
+
+namespace dataset {
+/// \brief A scanner glues together several dataset classes to load in data.
+/// The dataset contains a collection of fragments and partitioning rules.
+///
+/// The fragments identify independently loadable units of data (i.e. each 
fragment has
+/// a potentially unique schema and possibly even format.  It should be 
possible to read
+/// fragments in parallel if desired).
+///
+/// The fragment's format contains the logic necessary to actually create a 
task to load
+/// the fragment into memory.  That task may or may not support parallel 
execution of
+/// its own.
 ///
-///  def Scan():
-///    for fragment in self.dataset.GetFragments(this.options.filter):
-///      for scan_task in fragment.Scan(this.options):
-///        yield scan_task
+/// The scanner is then responsible for creating scan tasks from every 
fragment in the
+/// dataset and (potentially) sequencing the loaded record batches together.
+///
+/// The scanner should not buffer the entire dataset in memory (unless asked) 
but should
+/// return record batches as soon as they are ready to scan.  Various readahead
+/// properties control how much data is allowed to be scanned before pausing 
to let a
+/// slow consumer catchup.
+///
+/// Today the scanner also delegates projection & filtering although that may 
change in
+/// the future.
 class ARROW_DS_EXPORT Scanner {
  public:
-  Scanner(std::shared_ptr<Dataset> dataset, std::shared_ptr<ScanOptions> 
scan_options)
-      : dataset_(std::move(dataset)), scan_options_(std::move(scan_options)) {}
-
-  Scanner(std::shared_ptr<Fragment> fragment, std::shared_ptr<ScanOptions> 
scan_options)
-      : fragment_(std::move(fragment)), scan_options_(std::move(scan_options)) 
{}
+  virtual ~Scanner() = default;
 
   /// \brief The Scan operator returns a stream of ScanTask. The caller is
   /// responsible to dispatch/schedule said tasks. Tasks should be safe to run
   /// in a concurrent fashion and outlive the iterator.
-  Result<ScanTaskIterator> Scan();
-
+  ///
+  /// Note: Not supported by the async scanner
+  /// TODO(ARROW-11797) Deprecate Scan()
+  virtual Result<ScanTaskIterator> Scan();
   /// \brief Convert a Scanner into a Table.
   ///
   /// Use this convenience utility with care. This will serially materialize 
the
   /// Scan result in memory before creating the Table.
-  Result<std::shared_ptr<Table>> ToTable();
+  virtual Result<std::shared_ptr<Table>> ToTable() = 0;
+  /// \brief Scan the dataset into a stream of record batches.  Each batch is 
tagged
+  /// with the fragment it originated from.  The batches will arrive in order. 
 The
+  /// order of fragments is determined by the dataset.
+  ///
+  /// Note: The scanner will perform some readahead but will avoid 
materializing too
+  /// much in memory (this is goverended by the readahead options and 
use_threads option).
+  /// If the readahead queue fills up then I/O will pause until the calling 
thread catches
+  /// up.
+  virtual Result<TaggedRecordBatchIterator> ScanBatches() = 0;
+  /// \brief Scan the dataset into a stream of record batches.  Unlike 
ScanBatches this
+  /// method may allow record batches to be returned out of order.  This 
allows for more
+  /// efficient scanning some fragments may be accessed more quickly than 
others (e.g. may

Review comment:
       ```suggestion
     /// efficient scanning: some fragments may be accessed more quickly than 
others (e.g. may
   ```

##########
File path: cpp/src/arrow/dataset/test_util.h
##########
@@ -213,7 +263,7 @@ class DatasetFixtureMixin : public ::testing::Test {
 
   std::shared_ptr<Schema> schema_;
   std::shared_ptr<ScanOptions> options_;
-};
+};  // namespace dataset

Review comment:
       ```suggestion
   };
   ```

##########
File path: cpp/src/arrow/dataset/file_base.cc
##########
@@ -424,68 +424,21 @@ Status FileSystemDataset::Write(const 
FileSystemDatasetWriteOptions& write_optio
                                 std::shared_ptr<Scanner> scanner) {
   RETURN_NOT_OK(ValidateBasenameTemplate(write_options.basename_template));
 
-  auto task_group = scanner->options()->TaskGroup();
-
-  // Things we'll un-lazy for the sake of simplicity, with the tradeoff they 
represent:
-  //
-  // - Fragment iteration. Keeping this lazy would allow us to start 
partitioning/writing
-  //   any fragments we have before waiting for discovery to complete. This 
isn't
-  //   currently implemented for FileSystemDataset anyway: ARROW-8613
-  //
-  // - ScanTask iteration. Keeping this lazy would save some unnecessary 
blocking when
-  //   writing Fragments which produce scan tasks slowly. No Fragments do this.
-  //
-  // NB: neither of these will have any impact whatsoever on the common case 
of writing
-  //     an in-memory table to disk.
-  ARROW_ASSIGN_OR_RAISE(auto fragment_it, scanner->GetFragments());
-  ARROW_ASSIGN_OR_RAISE(FragmentVector fragments, fragment_it.ToVector());
-  ScanTaskVector scan_tasks;
-  std::vector<Future<>> scan_futs;
-
-  for (const auto& fragment : fragments) {
-    auto options = std::make_shared<ScanOptions>(*scanner->options());
-    // Avoid contention with multithreaded readers
-    options->use_threads = false;
-    ARROW_ASSIGN_OR_RAISE(auto scan_task_it,
-                          Scanner(fragment, std::move(options)).Scan());
-    for (auto maybe_scan_task : scan_task_it) {
-      ARROW_ASSIGN_OR_RAISE(auto scan_task, maybe_scan_task);
-      scan_tasks.push_back(std::move(scan_task));
-    }
-  }
-
-  // Store a mapping from partitions (represened by their formatted partition 
expressions)
-  // to a WriteQueue which flushes batches into that partition's output file. 
In principle
-  // any thread could produce a batch for any partition, so each task 
alternates between
-  // pushing batches and flushing them to disk.
+  // Store a mapping from partitions (represented by their formatted partition
+  // expressions) to a WriteQueue which flushes batches into that partition's 
output file.
+  // In principle any thread could produce a batch for any partition, so each 
task
+  // alternates between pushing batches and flushing them to disk.
   WriteState state(write_options);
 
-  for (const auto& scan_task : scan_tasks) {
-    if (scan_task->supports_async()) {
-      ARROW_ASSIGN_OR_RAISE(auto batches_gen, scan_task->ExecuteAsync());
-      std::function<Status(std::shared_ptr<RecordBatch> batch)> batch_visitor =
-          [&, scan_task](std::shared_ptr<RecordBatch> batch) {
-            return WriteNextBatch(state, scan_task, std::move(batch));
-          };
-      scan_futs.push_back(VisitAsyncGenerator(batches_gen, batch_visitor));
-    } else {
-      task_group->Append([&, scan_task] {
-        ARROW_ASSIGN_OR_RAISE(auto batches, scan_task->Execute());
-
-        for (auto maybe_batch : batches) {
-          ARROW_ASSIGN_OR_RAISE(auto batch, maybe_batch);
-          RETURN_NOT_OK(WriteNextBatch(state, scan_task, std::move(batch)));
-        }
+  ARROW_ASSIGN_OR_RAISE(auto batch_it, scanner->ScanBatches());
 
-        return Status::OK();
-      });
-    }
-  }
-  RETURN_NOT_OK(task_group->Finish());
-  auto scan_futs_all_done = AllComplete(scan_futs);
-  RETURN_NOT_OK(scan_futs_all_done.status());
+  RETURN_NOT_OK(batch_it.Visit([&state](TaggedRecordBatch tagged_batch) {

Review comment:
       This seems to remove parallel execution of batch partitioning. If that's 
intended then please call it out and add a comment with the follow up JIRA noted




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to