lidavidm commented on a change in pull request #9947: URL: https://github.com/apache/arrow/pull/9947#discussion_r610653229
########## File path: cpp/src/arrow/dataset/scanner.cc ########## @@ -67,7 +68,105 @@ Result<RecordBatchGenerator> ScanTask::ExecuteAsync() { bool ScanTask::supports_async() const { return false; } -Result<FragmentIterator> Scanner::GetFragments() { +Result<ScanTaskIterator> Scanner::Scan() { + // TODO(ARROW-12289) This is overridden in SyncScanner and will never be implemented in + // AsyncScanner. It is deprecated and will eventually go away. + return Status::NotImplemented("This scanner does not support the legacy Scan() method"); +} + +Result<EnumeratedRecordBatchIterator> Scanner::ScanBatchesUnordered() { + // If a scanner doesn't support unordered scanning (i.e. SyncScanner) then we just + // fall back to an ordered scan and assign the appropriate tagging + ARROW_ASSIGN_OR_RAISE(auto ordered_scan, ScanBatches()); + return AddPositioningToInOrderScan(std::move(ordered_scan)); +} + +Result<EnumeratedRecordBatchIterator> Scanner::AddPositioningToInOrderScan( + TaggedRecordBatchIterator scan) { + ARROW_ASSIGN_OR_RAISE(auto first, scan.Next()); + if (IsIterationEnd(first)) { + return MakeEmptyIterator<EnumeratedRecordBatch>(); + } + struct State { + State(TaggedRecordBatchIterator source, TaggedRecordBatch first) Review comment: `first` is unused here. (I guess it was meant to be used to initialize prev_batch?) ########## File path: cpp/src/arrow/dataset/scanner.h ########## @@ -138,47 +179,147 @@ ARROW_DS_EXPORT Result<ScanTaskIterator> ScanTaskIteratorFromRecordBatch( std::vector<std::shared_ptr<RecordBatch>> batches, std::shared_ptr<ScanOptions> options); -/// \brief Scanner is a materialized scan operation with context and options -/// bound. A scanner is the class that glues ScanTask, Fragment, -/// and Dataset. In python pseudo code, it performs the following: +template <typename T> +struct Enumerated { + T value; + int index; + bool last; +}; + +/// \brief Combines a record batch with the fragment that the record batch originated +/// from +/// +/// Knowing the source fragment can be useful for debugging & understanding loaded data +struct TaggedRecordBatch { + std::shared_ptr<RecordBatch> record_batch; + std::shared_ptr<Fragment> fragment; +}; Review comment: N.B. you also need `operator==` if you want to use range-for over an iterator of these values. ########## File path: cpp/src/arrow/dataset/scanner.cc ########## @@ -67,7 +68,105 @@ Result<RecordBatchGenerator> ScanTask::ExecuteAsync() { bool ScanTask::supports_async() const { return false; } -Result<FragmentIterator> Scanner::GetFragments() { +Result<ScanTaskIterator> Scanner::Scan() { + // TODO(ARROW-12289) This is overridden in SyncScanner and will never be implemented in + // AsyncScanner. It is deprecated and will eventually go away. + return Status::NotImplemented("This scanner does not support the legacy Scan() method"); +} + +Result<EnumeratedRecordBatchIterator> Scanner::ScanBatchesUnordered() { + // If a scanner doesn't support unordered scanning (i.e. SyncScanner) then we just + // fall back to an ordered scan and assign the appropriate tagging + ARROW_ASSIGN_OR_RAISE(auto ordered_scan, ScanBatches()); + return AddPositioningToInOrderScan(std::move(ordered_scan)); +} + +Result<EnumeratedRecordBatchIterator> Scanner::AddPositioningToInOrderScan( + TaggedRecordBatchIterator scan) { + ARROW_ASSIGN_OR_RAISE(auto first, scan.Next()); + if (IsIterationEnd(first)) { + return MakeEmptyIterator<EnumeratedRecordBatch>(); + } + struct State { + State(TaggedRecordBatchIterator source, TaggedRecordBatch first) + : source(std::move(source)), + batch_index(1), + finished(false), + prev_batch(TaggedRecordBatch{nullptr, nullptr}) {} + TaggedRecordBatchIterator source; + int batch_index; + int fragment_index; + bool finished; + TaggedRecordBatch prev_batch; + }; + struct TaggingIterator { Review comment: Overall this state pattern is common enough for iterators/async generators that I feel like we should encapsulate it instead of defining it ad-hoc every time. (Not for this PR, but as a future task.) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org