westonpace commented on a change in pull request #9589: URL: https://github.com/apache/arrow/pull/9589#discussion_r609060135
########## File path: cpp/src/arrow/dataset/scanner.cc ########## @@ -240,5 +241,121 @@ Result<std::shared_ptr<Table>> Scanner::ToTable() { FlattenRecordBatchVector(std::move(state->batches))); } +struct ToBatchesState : public std::enable_shared_from_this<ToBatchesState> { + explicit ToBatchesState(ScanTaskIterator scan_task_it, + std::shared_ptr<TaskGroup> task_group_) + : scan_tasks(std::move(scan_task_it)), task_group(std::move(task_group_)) {} + + /// Protecting mutating accesses to batches + std::mutex mutex; Review comment: Style nit: member variables should be at the bottom of a struct: https://google.github.io/styleguide/cppguide.html#Declaration_Order ########## File path: cpp/src/arrow/dataset/scanner.cc ########## @@ -240,5 +241,121 @@ Result<std::shared_ptr<Table>> Scanner::ToTable() { FlattenRecordBatchVector(std::move(state->batches))); } +struct ToBatchesState : public std::enable_shared_from_this<ToBatchesState> { + explicit ToBatchesState(ScanTaskIterator scan_task_it, + std::shared_ptr<TaskGroup> task_group_) + : scan_tasks(std::move(scan_task_it)), task_group(std::move(task_group_)) {} + + /// Protecting mutating accesses to batches + std::mutex mutex; + std::condition_variable ready; + ScanTaskIterator scan_tasks; + std::shared_ptr<TaskGroup> task_group; + int next_scan_task_id = 0; + bool no_more_tasks = false; + Status iteration_error; + std::vector<std::deque<std::shared_ptr<RecordBatch>>> task_batches; + std::vector<bool> task_drained; + size_t pop_cursor = 0; + + void ResizeBatches(size_t task_index) { + if (task_batches.size() <= task_index) { + task_batches.resize(task_index + 1); + task_drained.resize(task_index + 1); + } + } + + void Push(std::shared_ptr<RecordBatch> batch, size_t task_index) { + std::lock_guard<std::mutex> lock(mutex); + ResizeBatches(task_index); + task_batches[task_index].push_back(std::move(batch)); + ready.notify_one(); + } + + Status Finish(size_t task_index) { + std::lock_guard<std::mutex> lock(mutex); + 
ResizeBatches(task_index); + task_drained[task_index] = true; + ready.notify_one(); + return Status::OK(); + } + + void PushScanTask() { + if (no_more_tasks) return; + auto maybe_task = scan_tasks.Next(); + if (!maybe_task.ok()) { + no_more_tasks = true; + iteration_error = maybe_task.status(); Review comment: I'm not entirely certain it is safe to modify `iteration_error` outside the mutex. What happens if `Pop` is accessing it at the same time? ########## File path: cpp/src/arrow/dataset/scanner.cc ########## @@ -240,5 +241,121 @@ Result<std::shared_ptr<Table>> Scanner::ToTable() { FlattenRecordBatchVector(std::move(state->batches))); } +struct ToBatchesState : public std::enable_shared_from_this<ToBatchesState> { + explicit ToBatchesState(ScanTaskIterator scan_task_it, + std::shared_ptr<TaskGroup> task_group_) + : scan_tasks(std::move(scan_task_it)), task_group(std::move(task_group_)) {} + + /// Protecting mutating accesses to batches + std::mutex mutex; + std::condition_variable ready; + ScanTaskIterator scan_tasks; + std::shared_ptr<TaskGroup> task_group; + int next_scan_task_id = 0; + bool no_more_tasks = false; + Status iteration_error; + std::vector<std::deque<std::shared_ptr<RecordBatch>>> task_batches; + std::vector<bool> task_drained; + size_t pop_cursor = 0; + + void ResizeBatches(size_t task_index) { + if (task_batches.size() <= task_index) { + task_batches.resize(task_index + 1); + task_drained.resize(task_index + 1); + } + } + + void Push(std::shared_ptr<RecordBatch> batch, size_t task_index) { + std::lock_guard<std::mutex> lock(mutex); + ResizeBatches(task_index); + task_batches[task_index].push_back(std::move(batch)); + ready.notify_one(); + } + + Status Finish(size_t task_index) { + std::lock_guard<std::mutex> lock(mutex); + ResizeBatches(task_index); + task_drained[task_index] = true; + ready.notify_one(); + return Status::OK(); + } + + void PushScanTask() { + if (no_more_tasks) return; + auto maybe_task = scan_tasks.Next(); + if 
(!maybe_task.ok()) { + no_more_tasks = true; + iteration_error = maybe_task.status(); + return; + } + auto scan_task = maybe_task.ValueOrDie(); + if (IsIterationEnd(scan_task)) { + no_more_tasks = true; + return; + } + auto state = shared_from_this(); + auto id = next_scan_task_id++; + ResizeBatches(id); Review comment: This is also not safe to do outside the mutex. ########## File path: cpp/src/arrow/dataset/scanner_test.cc ########## @@ -151,6 +157,64 @@ TEST_F(TestScanner, ToTable) { AssertTablesEqual(*expected, *actual); } +TEST_F(TestScanner, ToBatches) { + SetSchema({field("i32", int32()), field("f64", float64())}); + auto batch = ConstantArrayGenerator::Zeroes(kBatchSize, schema_); + + for (bool use_threads : {false, true}) { + options_->use_threads = use_threads; + auto scanner = MakeScanner(batch); Review comment: Is this just testing a scan of one scan task & one batch? It seems we would want to test more than that. ########## File path: cpp/src/arrow/dataset/scanner.cc ########## @@ -240,5 +241,121 @@ Result<std::shared_ptr<Table>> Scanner::ToTable() { FlattenRecordBatchVector(std::move(state->batches))); } +struct ToBatchesState : public std::enable_shared_from_this<ToBatchesState> { + explicit ToBatchesState(ScanTaskIterator scan_task_it, + std::shared_ptr<TaskGroup> task_group_) + : scan_tasks(std::move(scan_task_it)), task_group(std::move(task_group_)) {} + + /// Protecting mutating accesses to batches + std::mutex mutex; + std::condition_variable ready; + ScanTaskIterator scan_tasks; + std::shared_ptr<TaskGroup> task_group; + int next_scan_task_id = 0; + bool no_more_tasks = false; + Status iteration_error; + std::vector<std::deque<std::shared_ptr<RecordBatch>>> task_batches; + std::vector<bool> task_drained; + size_t pop_cursor = 0; + + void ResizeBatches(size_t task_index) { + if (task_batches.size() <= task_index) { + task_batches.resize(task_index + 1); + task_drained.resize(task_index + 1); + } + } + + void Push(std::shared_ptr<RecordBatch> 
batch, size_t task_index) { + std::lock_guard<std::mutex> lock(mutex); + ResizeBatches(task_index); + task_batches[task_index].push_back(std::move(batch)); + ready.notify_one(); + } + + Status Finish(size_t task_index) { + std::lock_guard<std::mutex> lock(mutex); + ResizeBatches(task_index); + task_drained[task_index] = true; + ready.notify_one(); + return Status::OK(); + } + + void PushScanTask() { + if (no_more_tasks) return; + auto maybe_task = scan_tasks.Next(); + if (!maybe_task.ok()) { + no_more_tasks = true; + iteration_error = maybe_task.status(); + return; + } + auto scan_task = maybe_task.ValueOrDie(); + if (IsIterationEnd(scan_task)) { + no_more_tasks = true; + return; + } + auto state = shared_from_this(); + auto id = next_scan_task_id++; + ResizeBatches(id); + task_group->Append([state, id, scan_task]() { + ARROW_ASSIGN_OR_RAISE(auto batch_it, scan_task->Execute()); + for (auto maybe_batch : batch_it) { + ARROW_ASSIGN_OR_RAISE(auto batch, maybe_batch); + state->Push(std::move(batch), id); + } + return state->Finish(id); + }); + } + + Result<std::shared_ptr<RecordBatch>> Pop() { + std::unique_lock<std::mutex> lock(mutex); + ready.wait(lock, [this, &lock] { + while (pop_cursor < task_batches.size()) { + // queue for current scan task contains at least one batch, pop that + if (!task_batches[pop_cursor].empty()) return true; + // queue is empty but will be appended to eventually, wait for that + if (!task_drained[pop_cursor]) return false; + + // Finished draining current scan task, enqueue a new one + ++pop_cursor; + // Must unlock since serial task group will execute synchronously + lock.unlock(); + PushScanTask(); + lock.lock(); + } + DCHECK(no_more_tasks); + // all scan tasks drained (or getting next task failed), terminate + return true; + }); + + if (pop_cursor == task_batches.size()) { + // Don't report an error until we yield up everything we can first Review comment: Hmm, in 7001 we generally purge all state on error and deliver it as quickly as 
possible. I'm not saying this approach is wrong. But we should probably decide on the best approach. ########## File path: cpp/src/arrow/dataset/scanner.cc ########## @@ -240,5 +241,121 @@ Result<std::shared_ptr<Table>> Scanner::ToTable() { FlattenRecordBatchVector(std::move(state->batches))); } +struct ToBatchesState : public std::enable_shared_from_this<ToBatchesState> { + explicit ToBatchesState(ScanTaskIterator scan_task_it, + std::shared_ptr<TaskGroup> task_group_) + : scan_tasks(std::move(scan_task_it)), task_group(std::move(task_group_)) {} + + /// Protecting mutating accesses to batches + std::mutex mutex; + std::condition_variable ready; + ScanTaskIterator scan_tasks; + std::shared_ptr<TaskGroup> task_group; + int next_scan_task_id = 0; + bool no_more_tasks = false; + Status iteration_error; + std::vector<std::deque<std::shared_ptr<RecordBatch>>> task_batches; + std::vector<bool> task_drained; + size_t pop_cursor = 0; + + void ResizeBatches(size_t task_index) { + if (task_batches.size() <= task_index) { + task_batches.resize(task_index + 1); + task_drained.resize(task_index + 1); + } + } + + void Push(std::shared_ptr<RecordBatch> batch, size_t task_index) { + std::lock_guard<std::mutex> lock(mutex); + ResizeBatches(task_index); + task_batches[task_index].push_back(std::move(batch)); + ready.notify_one(); Review comment: Nit: best to relinquish the mutex before calling notify ########## File path: python/pyarrow/tests/test_dataset.py ########## @@ -334,7 +333,8 @@ def test_dataset_execute_iterator(dataset): # ARROW-11596: this would segfault due to Cython raising # StopIteration without holding the GIL. (Fixed on Cython master, # post 3.0a6) - tasks = dataset.scan() + with pytest.deprecated_call(): Review comment: At the very least we should create a JIRA to migrate these over to the new scan. Although I'm wondering if we want to just do that now because that would give us a lot more coverage of the non-deprecated path. 
########## File path: cpp/src/arrow/dataset/scanner.h ########## @@ -165,6 +166,9 @@ class ARROW_DS_EXPORT Scanner { /// Scan result in memory before creating the Table. Result<std::shared_ptr<Table>> ToTable(); + /// \brief ToBatches returns an iterator over all Batches yielded by this scan. + Result<RecordBatchIterator> ToBatches(); Review comment: Nit: If I see `ToBatches` I expect `RecordBatchVector`. I had named it `ScanBatches` but I don't feel too strongly on this point. ########## File path: cpp/src/arrow/dataset/scanner.cc ########## @@ -240,5 +241,121 @@ Result<std::shared_ptr<Table>> Scanner::ToTable() { FlattenRecordBatchVector(std::move(state->batches))); } +struct ToBatchesState : public std::enable_shared_from_this<ToBatchesState> { + explicit ToBatchesState(ScanTaskIterator scan_task_it, + std::shared_ptr<TaskGroup> task_group_) + : scan_tasks(std::move(scan_task_it)), task_group(std::move(task_group_)) {} + + /// Protecting mutating accesses to batches + std::mutex mutex; + std::condition_variable ready; + ScanTaskIterator scan_tasks; + std::shared_ptr<TaskGroup> task_group; + int next_scan_task_id = 0; + bool no_more_tasks = false; + Status iteration_error; + std::vector<std::deque<std::shared_ptr<RecordBatch>>> task_batches; + std::vector<bool> task_drained; + size_t pop_cursor = 0; + + void ResizeBatches(size_t task_index) { + if (task_batches.size() <= task_index) { + task_batches.resize(task_index + 1); + task_drained.resize(task_index + 1); + } + } + + void Push(std::shared_ptr<RecordBatch> batch, size_t task_index) { + std::lock_guard<std::mutex> lock(mutex); + ResizeBatches(task_index); + task_batches[task_index].push_back(std::move(batch)); + ready.notify_one(); + } + + Status Finish(size_t task_index) { + std::lock_guard<std::mutex> lock(mutex); + ResizeBatches(task_index); + task_drained[task_index] = true; + ready.notify_one(); + return Status::OK(); + } + + void PushScanTask() { + if (no_more_tasks) return; + auto maybe_task = 
scan_tasks.Next(); + if (!maybe_task.ok()) { + no_more_tasks = true; + iteration_error = maybe_task.status(); + return; + } + auto scan_task = maybe_task.ValueOrDie(); + if (IsIterationEnd(scan_task)) { + no_more_tasks = true; + return; + } + auto state = shared_from_this(); + auto id = next_scan_task_id++; + ResizeBatches(id); + task_group->Append([state, id, scan_task]() { + ARROW_ASSIGN_OR_RAISE(auto batch_it, scan_task->Execute()); + for (auto maybe_batch : batch_it) { + ARROW_ASSIGN_OR_RAISE(auto batch, maybe_batch); + state->Push(std::move(batch), id); + } + return state->Finish(id); + }); + } + + Result<std::shared_ptr<RecordBatch>> Pop() { + std::unique_lock<std::mutex> lock(mutex); + ready.wait(lock, [this, &lock] { + while (pop_cursor < task_batches.size()) { + // queue for current scan task contains at least one batch, pop that + if (!task_batches[pop_cursor].empty()) return true; + // queue is empty but will be appended to eventually, wait for that + if (!task_drained[pop_cursor]) return false; + + // Finished draining current scan task, enqueue a new one + ++pop_cursor; + // Must unlock since serial task group will execute synchronously + lock.unlock(); + PushScanTask(); + lock.lock(); + } + DCHECK(no_more_tasks); + // all scan tasks drained (or getting next task failed), terminate Review comment: I think I'm just not seeing it, but what causes the above loop to exit if there is an error while scanning? 
########## File path: r/R/dataset-scan.R ########## @@ -142,17 +146,13 @@ map_batches <- function(X, FUN, ..., .data.frame = TRUE) { scanner <- Scanner$create(ensure_group_vars(X)) FUN <- as_mapper(FUN) # message("Making ScanTasks") - lapply(scanner$Scan(), function(scan_task) { - # This outer lapply could be parallelized - # message("Making Batches") - lapply(scan_task$Execute(), function(batch) { - # message("Processing Batch") - # This inner lapply cannot be parallelized - # TODO: wrap batch in arrow_dplyr_query with X$selected_columns, - # X$temp_columns, and X$group_by_vars - # if X is arrow_dplyr_query, if some other arg (.dplyr?) == TRUE - FUN(batch, ...) - }) + lapply(scanner$ToBatches(), function(batch) { + # message("Processing Batch") + # This inner lapply cannot be parallelized Review comment: Nit: Does this comment still make sense? ########## File path: cpp/src/arrow/dataset/scanner_test.cc ########## @@ -151,6 +157,64 @@ TEST_F(TestScanner, ToTable) { AssertTablesEqual(*expected, *actual); } +TEST_F(TestScanner, ToBatches) { + SetSchema({field("i32", int32()), field("f64", float64())}); + auto batch = ConstantArrayGenerator::Zeroes(kBatchSize, schema_); + + for (bool use_threads : {false, true}) { Review comment: Consider making a parameterized test instead so it is easier to trace failures if needed? 
########## File path: cpp/src/arrow/dataset/scanner.cc ########## @@ -240,5 +241,121 @@ Result<std::shared_ptr<Table>> Scanner::ToTable() { FlattenRecordBatchVector(std::move(state->batches))); } +struct ToBatchesState : public std::enable_shared_from_this<ToBatchesState> { + explicit ToBatchesState(ScanTaskIterator scan_task_it, + std::shared_ptr<TaskGroup> task_group_) + : scan_tasks(std::move(scan_task_it)), task_group(std::move(task_group_)) {} + + /// Protecting mutating accesses to batches + std::mutex mutex; + std::condition_variable ready; + ScanTaskIterator scan_tasks; + std::shared_ptr<TaskGroup> task_group; + int next_scan_task_id = 0; + bool no_more_tasks = false; + Status iteration_error; + std::vector<std::deque<std::shared_ptr<RecordBatch>>> task_batches; + std::vector<bool> task_drained; + size_t pop_cursor = 0; + + void ResizeBatches(size_t task_index) { + if (task_batches.size() <= task_index) { + task_batches.resize(task_index + 1); + task_drained.resize(task_index + 1); + } + } + + void Push(std::shared_ptr<RecordBatch> batch, size_t task_index) { + std::lock_guard<std::mutex> lock(mutex); + ResizeBatches(task_index); + task_batches[task_index].push_back(std::move(batch)); + ready.notify_one(); + } + + Status Finish(size_t task_index) { + std::lock_guard<std::mutex> lock(mutex); + ResizeBatches(task_index); + task_drained[task_index] = true; + ready.notify_one(); + return Status::OK(); + } + + void PushScanTask() { + if (no_more_tasks) return; + auto maybe_task = scan_tasks.Next(); + if (!maybe_task.ok()) { + no_more_tasks = true; + iteration_error = maybe_task.status(); + return; + } + auto scan_task = maybe_task.ValueOrDie(); + if (IsIterationEnd(scan_task)) { + no_more_tasks = true; + return; + } + auto state = shared_from_this(); + auto id = next_scan_task_id++; + ResizeBatches(id); + task_group->Append([state, id, scan_task]() { + ARROW_ASSIGN_OR_RAISE(auto batch_it, scan_task->Execute()); + for (auto maybe_batch : batch_it) { + 
ARROW_ASSIGN_OR_RAISE(auto batch, maybe_batch); + state->Push(std::move(batch), id); + } + return state->Finish(id); + }); + } + + Result<std::shared_ptr<RecordBatch>> Pop() { + std::unique_lock<std::mutex> lock(mutex); + ready.wait(lock, [this, &lock] { + while (pop_cursor < task_batches.size()) { + // queue for current scan task contains at least one batch, pop that + if (!task_batches[pop_cursor].empty()) return true; + // queue is empty but will be appended to eventually, wait for that + if (!task_drained[pop_cursor]) return false; + + // Finished draining current scan task, enqueue a new one + ++pop_cursor; + // Must unlock since serial task group will execute synchronously + lock.unlock(); + PushScanTask(); + lock.lock(); + } + DCHECK(no_more_tasks); + // all scan tasks drained (or getting next task failed), terminate + return true; + }); + + if (pop_cursor == task_batches.size()) { + // Don't report an error until we yield up everything we can first + RETURN_NOT_OK(iteration_error); + return nullptr; + } + + auto batch = std::move(task_batches[pop_cursor].front()); + task_batches[pop_cursor].pop_front(); + return batch; + } +}; + +constexpr int kToBatchesReadaheadLevel = 2; Review comment: So we have scan task readahead but no limit on the batch readahead? This is probably ok. If a user really needed a limit they could use scan instead. Both the IPC and CSV readers are single-scan-task, multiple-batch readers, so it would mean there is no back-pressure on memory use for the IPC and CSV readers. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org