lidavidm commented on a change in pull request #9947:
URL: https://github.com/apache/arrow/pull/9947#discussion_r610653229



##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -67,7 +68,105 @@ Result<RecordBatchGenerator> ScanTask::ExecuteAsync() {
 
 bool ScanTask::supports_async() const { return false; }
 
-Result<FragmentIterator> Scanner::GetFragments() {
+Result<ScanTaskIterator> Scanner::Scan() {
+  // TODO(ARROW-12289) This is overridden in SyncScanner and will never be 
implemented in
+  // AsyncScanner.  It is deprecated and will eventually go away.
+  return Status::NotImplemented("This scanner does not support the legacy 
Scan() method");
+}
+
+Result<EnumeratedRecordBatchIterator> Scanner::ScanBatchesUnordered() {
+  // If a scanner doesn't support unordered scanning (i.e. SyncScanner) then 
we just
+  // fall back to an ordered scan and assign the appropriate tagging
+  ARROW_ASSIGN_OR_RAISE(auto ordered_scan, ScanBatches());
+  return AddPositioningToInOrderScan(std::move(ordered_scan));
+}
+
+Result<EnumeratedRecordBatchIterator> Scanner::AddPositioningToInOrderScan(
+    TaggedRecordBatchIterator scan) {
+  ARROW_ASSIGN_OR_RAISE(auto first, scan.Next());
+  if (IsIterationEnd(first)) {
+    return MakeEmptyIterator<EnumeratedRecordBatch>();
+  }
+  struct State {
+    State(TaggedRecordBatchIterator source, TaggedRecordBatch first)

Review comment:
       `first` is unused here. (I guess it was meant to be used to initialize 
prev_batch?)

##########
File path: cpp/src/arrow/dataset/scanner.h
##########
@@ -138,47 +179,147 @@ ARROW_DS_EXPORT Result<ScanTaskIterator> 
ScanTaskIteratorFromRecordBatch(
     std::vector<std::shared_ptr<RecordBatch>> batches,
     std::shared_ptr<ScanOptions> options);
 
-/// \brief Scanner is a materialized scan operation with context and options
-/// bound. A scanner is the class that glues ScanTask, Fragment,
-/// and Dataset. In python pseudo code, it performs the following:
+template <typename T>
+struct Enumerated {
+  T value;
+  int index;
+  bool last;
+};
+
+/// \brief Combines a record batch with the fragment that the record batch 
originated
+/// from
+///
+/// Knowing the source fragment can be useful for debugging & understanding 
loaded data
+struct TaggedRecordBatch {
+  std::shared_ptr<RecordBatch> record_batch;
+  std::shared_ptr<Fragment> fragment;
+};

Review comment:
       N.B. you also need `operator==` if you want to use range-for over an 
iterator of these values.

##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -67,7 +68,105 @@ Result<RecordBatchGenerator> ScanTask::ExecuteAsync() {
 
 bool ScanTask::supports_async() const { return false; }
 
-Result<FragmentIterator> Scanner::GetFragments() {
+Result<ScanTaskIterator> Scanner::Scan() {
+  // TODO(ARROW-12289) This is overridden in SyncScanner and will never be 
implemented in
+  // AsyncScanner.  It is deprecated and will eventually go away.
+  return Status::NotImplemented("This scanner does not support the legacy 
Scan() method");
+}
+
+Result<EnumeratedRecordBatchIterator> Scanner::ScanBatchesUnordered() {
+  // If a scanner doesn't support unordered scanning (i.e. SyncScanner) then 
we just
+  // fall back to an ordered scan and assign the appropriate tagging
+  ARROW_ASSIGN_OR_RAISE(auto ordered_scan, ScanBatches());
+  return AddPositioningToInOrderScan(std::move(ordered_scan));
+}
+
+Result<EnumeratedRecordBatchIterator> Scanner::AddPositioningToInOrderScan(
+    TaggedRecordBatchIterator scan) {
+  ARROW_ASSIGN_OR_RAISE(auto first, scan.Next());
+  if (IsIterationEnd(first)) {
+    return MakeEmptyIterator<EnumeratedRecordBatch>();
+  }
+  struct State {
+    State(TaggedRecordBatchIterator source, TaggedRecordBatch first)
+        : source(std::move(source)),
+          batch_index(1),
+          finished(false),
+          prev_batch(TaggedRecordBatch{nullptr, nullptr}) {}
+    TaggedRecordBatchIterator source;
+    int batch_index;
+    int fragment_index;
+    bool finished;
+    TaggedRecordBatch prev_batch;
+  };
+  struct TaggingIterator {

Review comment:
       Overall this state pattern is common enough for iterators/async 
generators that I feel like we should encapsulate it instead of defining it 
ad-hoc every time. (Not for this PR, but as a future task.)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to