westonpace commented on a change in pull request #9589:
URL: https://github.com/apache/arrow/pull/9589#discussion_r609060135



##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -240,5 +241,121 @@ Result<std::shared_ptr<Table>> Scanner::ToTable() {
                                   FlattenRecordBatchVector(std::move(state->batches)));
 }
 
+struct ToBatchesState : public std::enable_shared_from_this<ToBatchesState> {
+  explicit ToBatchesState(ScanTaskIterator scan_task_it,
+                          std::shared_ptr<TaskGroup> task_group_)
+      : scan_tasks(std::move(scan_task_it)), task_group(std::move(task_group_)) {}
+
+  /// Protecting mutating accesses to batches
+  std::mutex mutex;

Review comment:
       Style nit: member variables should be at the bottom of a struct:
   
   https://google.github.io/styleguide/cppguide.html#Declaration_Order
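
    For reference, a minimal sketch of that ordering applied to this struct
    (method bodies elided; only the layout changes, not any behavior):

    ```cpp
    struct ToBatchesState : public std::enable_shared_from_this<ToBatchesState> {
      explicit ToBatchesState(ScanTaskIterator scan_task_it,
                              std::shared_ptr<TaskGroup> task_group_)
          : scan_tasks(std::move(scan_task_it)), task_group(std::move(task_group_)) {}

      void ResizeBatches(size_t task_index);
      void Push(std::shared_ptr<RecordBatch> batch, size_t task_index);
      Status Finish(size_t task_index);
      void PushScanTask();
      Result<std::shared_ptr<RecordBatch>> Pop();

      // Data members grouped at the bottom, per the style guide.
      std::mutex mutex;
      std::condition_variable ready;
      ScanTaskIterator scan_tasks;
      std::shared_ptr<TaskGroup> task_group;
      int next_scan_task_id = 0;
      bool no_more_tasks = false;
      Status iteration_error;
      std::vector<std::deque<std::shared_ptr<RecordBatch>>> task_batches;
      std::vector<bool> task_drained;
      size_t pop_cursor = 0;
    };
    ```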
   

##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -240,5 +241,121 @@ Result<std::shared_ptr<Table>> Scanner::ToTable() {
                                   FlattenRecordBatchVector(std::move(state->batches)));
 }
 
+struct ToBatchesState : public std::enable_shared_from_this<ToBatchesState> {
+  explicit ToBatchesState(ScanTaskIterator scan_task_it,
+                          std::shared_ptr<TaskGroup> task_group_)
+      : scan_tasks(std::move(scan_task_it)), task_group(std::move(task_group_)) {}
+
+  /// Protecting mutating accesses to batches
+  std::mutex mutex;
+  std::condition_variable ready;
+  ScanTaskIterator scan_tasks;
+  std::shared_ptr<TaskGroup> task_group;
+  int next_scan_task_id = 0;
+  bool no_more_tasks = false;
+  Status iteration_error;
+  std::vector<std::deque<std::shared_ptr<RecordBatch>>> task_batches;
+  std::vector<bool> task_drained;
+  size_t pop_cursor = 0;
+
+  void ResizeBatches(size_t task_index) {
+    if (task_batches.size() <= task_index) {
+      task_batches.resize(task_index + 1);
+      task_drained.resize(task_index + 1);
+    }
+  }
+
+  void Push(std::shared_ptr<RecordBatch> batch, size_t task_index) {
+    std::lock_guard<std::mutex> lock(mutex);
+    ResizeBatches(task_index);
+    task_batches[task_index].push_back(std::move(batch));
+    ready.notify_one();
+  }
+
+  Status Finish(size_t task_index) {
+    std::lock_guard<std::mutex> lock(mutex);
+    ResizeBatches(task_index);
+    task_drained[task_index] = true;
+    ready.notify_one();
+    return Status::OK();
+  }
+
+  void PushScanTask() {
+    if (no_more_tasks) return;
+    auto maybe_task = scan_tasks.Next();
+    if (!maybe_task.ok()) {
+      no_more_tasks = true;
+      iteration_error = maybe_task.status();

Review comment:
       I'm not entirely certain it is safe to modify `iteration_error` outside the mutex.  What happens if `Pop` is accessing it at the same time?
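
    One possible fix, as a sketch: take the mutex around the write and notify,
    so `Pop` only ever observes `iteration_error` (and `no_more_tasks`) under
    the lock.  This should be safe here because `Pop` releases the lock before
    calling `PushScanTask`:

    ```cpp
    void PushScanTask() {
      if (no_more_tasks) return;  // may also want the lock, depending on callers
      auto maybe_task = scan_tasks.Next();
      if (!maybe_task.ok()) {
        {
          std::lock_guard<std::mutex> lock(mutex);
          no_more_tasks = true;
          iteration_error = maybe_task.status();
        }
        ready.notify_one();  // wake Pop so it can observe the error
        return;
      }
      // ... rest unchanged ...
    }
    ```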

##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -240,5 +241,121 @@ Result<std::shared_ptr<Table>> Scanner::ToTable() {
                                   FlattenRecordBatchVector(std::move(state->batches)));
 }
 
+struct ToBatchesState : public std::enable_shared_from_this<ToBatchesState> {
+  explicit ToBatchesState(ScanTaskIterator scan_task_it,
+                          std::shared_ptr<TaskGroup> task_group_)
+      : scan_tasks(std::move(scan_task_it)), task_group(std::move(task_group_)) {}
+
+  /// Protecting mutating accesses to batches
+  std::mutex mutex;
+  std::condition_variable ready;
+  ScanTaskIterator scan_tasks;
+  std::shared_ptr<TaskGroup> task_group;
+  int next_scan_task_id = 0;
+  bool no_more_tasks = false;
+  Status iteration_error;
+  std::vector<std::deque<std::shared_ptr<RecordBatch>>> task_batches;
+  std::vector<bool> task_drained;
+  size_t pop_cursor = 0;
+
+  void ResizeBatches(size_t task_index) {
+    if (task_batches.size() <= task_index) {
+      task_batches.resize(task_index + 1);
+      task_drained.resize(task_index + 1);
+    }
+  }
+
+  void Push(std::shared_ptr<RecordBatch> batch, size_t task_index) {
+    std::lock_guard<std::mutex> lock(mutex);
+    ResizeBatches(task_index);
+    task_batches[task_index].push_back(std::move(batch));
+    ready.notify_one();
+  }
+
+  Status Finish(size_t task_index) {
+    std::lock_guard<std::mutex> lock(mutex);
+    ResizeBatches(task_index);
+    task_drained[task_index] = true;
+    ready.notify_one();
+    return Status::OK();
+  }
+
+  void PushScanTask() {
+    if (no_more_tasks) return;
+    auto maybe_task = scan_tasks.Next();
+    if (!maybe_task.ok()) {
+      no_more_tasks = true;
+      iteration_error = maybe_task.status();
+      return;
+    }
+    auto scan_task = maybe_task.ValueOrDie();
+    if (IsIterationEnd(scan_task)) {
+      no_more_tasks = true;
+      return;
+    }
+    auto state = shared_from_this();
+    auto id = next_scan_task_id++;
+    ResizeBatches(id);

Review comment:
       This is also not safe to do outside the mutex.
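
    A sketch of one way to guard it; `Pop` already unlocks before calling
    `PushScanTask`, so taking the mutex here should not self-deadlock:

    ```cpp
    auto state = shared_from_this();
    size_t id;
    {
      std::lock_guard<std::mutex> lock(mutex);
      id = next_scan_task_id++;
      ResizeBatches(id);  // now protected against concurrent Push/Finish/Pop
    }
    task_group->Append([state, id, scan_task]() {
      // ... unchanged ...
    });
    ```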

##########
File path: cpp/src/arrow/dataset/scanner_test.cc
##########
@@ -151,6 +157,64 @@ TEST_F(TestScanner, ToTable) {
   AssertTablesEqual(*expected, *actual);
 }
 
+TEST_F(TestScanner, ToBatches) {
+  SetSchema({field("i32", int32()), field("f64", float64())});
+  auto batch = ConstantArrayGenerator::Zeroes(kBatchSize, schema_);
+
+  for (bool use_threads : {false, true}) {
+    options_->use_threads = use_threads;
+    auto scanner = MakeScanner(batch);

Review comment:
       Is this just testing a scan of one scan task & one batch?  It seems we would want to test more than that.
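
    For example, something along these lines (a sketch; the
    `MakeMultiFragmentScanner` helper and the `kNumFragments`/`kNumBatches`
    constants are hypothetical and would need fixture support):

    ```cpp
    // Scan kNumFragments fragments of kNumBatches batches each and verify
    // every batch comes back exactly once.
    auto scanner = MakeMultiFragmentScanner(batch, kNumFragments, kNumBatches);
    ASSERT_OK_AND_ASSIGN(auto batch_it, scanner->ToBatches());
    int64_t batch_count = 0;
    for (auto maybe_batch : batch_it) {
      ASSERT_OK_AND_ASSIGN(auto scanned_batch, maybe_batch);
      AssertBatchesEqual(*batch, *scanned_batch);
      ++batch_count;
    }
    ASSERT_EQ(batch_count, kNumFragments * kNumBatches);
    ```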

##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -240,5 +241,121 @@ Result<std::shared_ptr<Table>> Scanner::ToTable() {
                                   FlattenRecordBatchVector(std::move(state->batches)));
 }
 
+struct ToBatchesState : public std::enable_shared_from_this<ToBatchesState> {
+  explicit ToBatchesState(ScanTaskIterator scan_task_it,
+                          std::shared_ptr<TaskGroup> task_group_)
+      : scan_tasks(std::move(scan_task_it)), task_group(std::move(task_group_)) {}
+
+  /// Protecting mutating accesses to batches
+  std::mutex mutex;
+  std::condition_variable ready;
+  ScanTaskIterator scan_tasks;
+  std::shared_ptr<TaskGroup> task_group;
+  int next_scan_task_id = 0;
+  bool no_more_tasks = false;
+  Status iteration_error;
+  std::vector<std::deque<std::shared_ptr<RecordBatch>>> task_batches;
+  std::vector<bool> task_drained;
+  size_t pop_cursor = 0;
+
+  void ResizeBatches(size_t task_index) {
+    if (task_batches.size() <= task_index) {
+      task_batches.resize(task_index + 1);
+      task_drained.resize(task_index + 1);
+    }
+  }
+
+  void Push(std::shared_ptr<RecordBatch> batch, size_t task_index) {
+    std::lock_guard<std::mutex> lock(mutex);
+    ResizeBatches(task_index);
+    task_batches[task_index].push_back(std::move(batch));
+    ready.notify_one();
+  }
+
+  Status Finish(size_t task_index) {
+    std::lock_guard<std::mutex> lock(mutex);
+    ResizeBatches(task_index);
+    task_drained[task_index] = true;
+    ready.notify_one();
+    return Status::OK();
+  }
+
+  void PushScanTask() {
+    if (no_more_tasks) return;
+    auto maybe_task = scan_tasks.Next();
+    if (!maybe_task.ok()) {
+      no_more_tasks = true;
+      iteration_error = maybe_task.status();
+      return;
+    }
+    auto scan_task = maybe_task.ValueOrDie();
+    if (IsIterationEnd(scan_task)) {
+      no_more_tasks = true;
+      return;
+    }
+    auto state = shared_from_this();
+    auto id = next_scan_task_id++;
+    ResizeBatches(id);
+    task_group->Append([state, id, scan_task]() {
+      ARROW_ASSIGN_OR_RAISE(auto batch_it, scan_task->Execute());
+      for (auto maybe_batch : batch_it) {
+        ARROW_ASSIGN_OR_RAISE(auto batch, maybe_batch);
+        state->Push(std::move(batch), id);
+      }
+      return state->Finish(id);
+    });
+  }
+
+  Result<std::shared_ptr<RecordBatch>> Pop() {
+    std::unique_lock<std::mutex> lock(mutex);
+    ready.wait(lock, [this, &lock] {
+      while (pop_cursor < task_batches.size()) {
+        // queue for current scan task contains at least one batch, pop that
+        if (!task_batches[pop_cursor].empty()) return true;
+        // queue is empty but will be appended to eventually, wait for that
+        if (!task_drained[pop_cursor]) return false;
+
+        // Finished draining current scan task, enqueue a new one
+        ++pop_cursor;
+        // Must unlock since serial task group will execute synchronously
+        lock.unlock();
+        PushScanTask();
+        lock.lock();
+      }
+      DCHECK(no_more_tasks);
+      // all scan tasks drained (or getting next task failed), terminate
+      return true;
+    });
+
+    if (pop_cursor == task_batches.size()) {
+      // Don't report an error until we yield up everything we can first

Review comment:
       Hmm, in 7001 we generally purge all state on error and deliver it as quickly as possible.  I'm not saying this approach is wrong.  But we should probably decide on the best approach.
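
    For comparison, a sketch of the purge-and-deliver alternative (the method
    name is hypothetical): drop everything buffered so the next `Pop` returns
    the error immediately rather than after draining.

    ```cpp
    void AbortWithError(Status error) {
      {
        std::lock_guard<std::mutex> lock(mutex);
        iteration_error = std::move(error);
        task_batches.clear();  // discard anything already buffered
        task_drained.clear();
        pop_cursor = 0;
        no_more_tasks = true;
      }
      ready.notify_all();  // every waiter then sees the error right away
    }
    ```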

##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -240,5 +241,121 @@ Result<std::shared_ptr<Table>> Scanner::ToTable() {
                                   FlattenRecordBatchVector(std::move(state->batches)));
 }
 
+struct ToBatchesState : public std::enable_shared_from_this<ToBatchesState> {
+  explicit ToBatchesState(ScanTaskIterator scan_task_it,
+                          std::shared_ptr<TaskGroup> task_group_)
+      : scan_tasks(std::move(scan_task_it)), task_group(std::move(task_group_)) {}
+
+  /// Protecting mutating accesses to batches
+  std::mutex mutex;
+  std::condition_variable ready;
+  ScanTaskIterator scan_tasks;
+  std::shared_ptr<TaskGroup> task_group;
+  int next_scan_task_id = 0;
+  bool no_more_tasks = false;
+  Status iteration_error;
+  std::vector<std::deque<std::shared_ptr<RecordBatch>>> task_batches;
+  std::vector<bool> task_drained;
+  size_t pop_cursor = 0;
+
+  void ResizeBatches(size_t task_index) {
+    if (task_batches.size() <= task_index) {
+      task_batches.resize(task_index + 1);
+      task_drained.resize(task_index + 1);
+    }
+  }
+
+  void Push(std::shared_ptr<RecordBatch> batch, size_t task_index) {
+    std::lock_guard<std::mutex> lock(mutex);
+    ResizeBatches(task_index);
+    task_batches[task_index].push_back(std::move(batch));
+    ready.notify_one();

Review comment:
       Nit: best to relinquish the mutex before calling notify
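
    i.e., something like this sketch, so the woken consumer does not
    immediately block re-acquiring the mutex:

    ```cpp
    void Push(std::shared_ptr<RecordBatch> batch, size_t task_index) {
      {
        std::lock_guard<std::mutex> lock(mutex);
        ResizeBatches(task_index);
        task_batches[task_index].push_back(std::move(batch));
      }  // release the mutex before waking the consumer
      ready.notify_one();
    }
    ```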

##########
File path: python/pyarrow/tests/test_dataset.py
##########
@@ -334,7 +333,8 @@ def test_dataset_execute_iterator(dataset):
     # ARROW-11596: this would segfault due to Cython raising
     # StopIteration without holding the GIL. (Fixed on Cython master,
     # post 3.0a6)
-    tasks = dataset.scan()
+    with pytest.deprecated_call():

Review comment:
       At the very least we should create a JIRA to migrate these over to the new scan.  Although I'm wondering if we want to just do that now because that would give us a lot more coverage of the non-deprecated path.

##########
File path: cpp/src/arrow/dataset/scanner.h
##########
@@ -165,6 +166,9 @@ class ARROW_DS_EXPORT Scanner {
   /// Scan result in memory before creating the Table.
   Result<std::shared_ptr<Table>> ToTable();
 
+  /// \brief ToBatches returns an iterator over all Batches yielded by this scan.
+  Result<RecordBatchIterator> ToBatches();

Review comment:
       Nit: If I see `ToBatches` I expect `RecordBatchVector`.  I had named it `ScanBatches` but I don't feel too strongly on this point.

##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -240,5 +241,121 @@ Result<std::shared_ptr<Table>> Scanner::ToTable() {
                                   FlattenRecordBatchVector(std::move(state->batches)));
 }
 
+struct ToBatchesState : public std::enable_shared_from_this<ToBatchesState> {
+  explicit ToBatchesState(ScanTaskIterator scan_task_it,
+                          std::shared_ptr<TaskGroup> task_group_)
+      : scan_tasks(std::move(scan_task_it)), task_group(std::move(task_group_)) {}
+
+  /// Protecting mutating accesses to batches
+  std::mutex mutex;
+  std::condition_variable ready;
+  ScanTaskIterator scan_tasks;
+  std::shared_ptr<TaskGroup> task_group;
+  int next_scan_task_id = 0;
+  bool no_more_tasks = false;
+  Status iteration_error;
+  std::vector<std::deque<std::shared_ptr<RecordBatch>>> task_batches;
+  std::vector<bool> task_drained;
+  size_t pop_cursor = 0;
+
+  void ResizeBatches(size_t task_index) {
+    if (task_batches.size() <= task_index) {
+      task_batches.resize(task_index + 1);
+      task_drained.resize(task_index + 1);
+    }
+  }
+
+  void Push(std::shared_ptr<RecordBatch> batch, size_t task_index) {
+    std::lock_guard<std::mutex> lock(mutex);
+    ResizeBatches(task_index);
+    task_batches[task_index].push_back(std::move(batch));
+    ready.notify_one();
+  }
+
+  Status Finish(size_t task_index) {
+    std::lock_guard<std::mutex> lock(mutex);
+    ResizeBatches(task_index);
+    task_drained[task_index] = true;
+    ready.notify_one();
+    return Status::OK();
+  }
+
+  void PushScanTask() {
+    if (no_more_tasks) return;
+    auto maybe_task = scan_tasks.Next();
+    if (!maybe_task.ok()) {
+      no_more_tasks = true;
+      iteration_error = maybe_task.status();
+      return;
+    }
+    auto scan_task = maybe_task.ValueOrDie();
+    if (IsIterationEnd(scan_task)) {
+      no_more_tasks = true;
+      return;
+    }
+    auto state = shared_from_this();
+    auto id = next_scan_task_id++;
+    ResizeBatches(id);
+    task_group->Append([state, id, scan_task]() {
+      ARROW_ASSIGN_OR_RAISE(auto batch_it, scan_task->Execute());
+      for (auto maybe_batch : batch_it) {
+        ARROW_ASSIGN_OR_RAISE(auto batch, maybe_batch);
+        state->Push(std::move(batch), id);
+      }
+      return state->Finish(id);
+    });
+  }
+
+  Result<std::shared_ptr<RecordBatch>> Pop() {
+    std::unique_lock<std::mutex> lock(mutex);
+    ready.wait(lock, [this, &lock] {
+      while (pop_cursor < task_batches.size()) {
+        // queue for current scan task contains at least one batch, pop that
+        if (!task_batches[pop_cursor].empty()) return true;
+        // queue is empty but will be appended to eventually, wait for that
+        if (!task_drained[pop_cursor]) return false;
+
+        // Finished draining current scan task, enqueue a new one
+        ++pop_cursor;
+        // Must unlock since serial task group will execute synchronously
+        lock.unlock();
+        PushScanTask();
+        lock.lock();
+      }
+      DCHECK(no_more_tasks);
+      // all scan tasks drained (or getting next task failed), terminate

Review comment:
       I think I'm just not seeing it, but what causes the above loop to exit if there is an error scanning?
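
    If the loop can hang on a task that errors mid-scan, one sketch of a fix
    (the extra `Status` parameter is hypothetical) is to route task failures
    through `Finish` so the task is still marked drained:

    ```cpp
    Status Finish(size_t task_index, Status task_status) {
      std::lock_guard<std::mutex> lock(mutex);
      ResizeBatches(task_index);
      task_drained[task_index] = true;  // predicate can now advance past this task
      if (!task_status.ok() && iteration_error.ok()) {
        iteration_error = task_status;  // surfaced by Pop once queues drain
      }
      ready.notify_one();
      return task_status;
    }
    ```

    The scan-task lambda would then need to pass its `Status` into `Finish`
    instead of returning early via `ARROW_ASSIGN_OR_RAISE`.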

##########
File path: r/R/dataset-scan.R
##########
@@ -142,17 +146,13 @@ map_batches <- function(X, FUN, ..., .data.frame = TRUE) {
   scanner <- Scanner$create(ensure_group_vars(X))
   FUN <- as_mapper(FUN)
   # message("Making ScanTasks")
-  lapply(scanner$Scan(), function(scan_task) {
-    # This outer lapply could be parallelized
-    # message("Making Batches")
-    lapply(scan_task$Execute(), function(batch) {
-      # message("Processing Batch")
-      # This inner lapply cannot be parallelized
-      # TODO: wrap batch in arrow_dplyr_query with X$selected_columns,
-      # X$temp_columns, and X$group_by_vars
-      # if X is arrow_dplyr_query, if some other arg (.dplyr?) == TRUE
-      FUN(batch, ...)
-    })
+  lapply(scanner$ToBatches(), function(batch) {
+    # message("Processing Batch")
+    # This inner lapply cannot be parallelized

Review comment:
       Nit: Does this comment still make sense?

##########
File path: cpp/src/arrow/dataset/scanner_test.cc
##########
@@ -151,6 +157,64 @@ TEST_F(TestScanner, ToTable) {
   AssertTablesEqual(*expected, *actual);
 }
 
+TEST_F(TestScanner, ToBatches) {
+  SetSchema({field("i32", int32()), field("f64", float64())});
+  auto batch = ConstantArrayGenerator::Zeroes(kBatchSize, schema_);
+
+  for (bool use_threads : {false, true}) {

Review comment:
       Consider making a parameterized test instead so it is easier to trace failures if needed?
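
    e.g., a sketch using gtest value-parameterized tests (the fixture name is
    illustrative):

    ```cpp
    class TestScannerThreading : public TestScanner,
                                 public ::testing::WithParamInterface<bool> {};

    TEST_P(TestScannerThreading, ToBatches) {
      SetSchema({field("i32", int32()), field("f64", float64())});
      auto batch = ConstantArrayGenerator::Zeroes(kBatchSize, schema_);
      options_->use_threads = GetParam();
      auto scanner = MakeScanner(batch);
      // ... body of the existing loop iteration ...
    }

    INSTANTIATE_TEST_SUITE_P(UseThreads, TestScannerThreading,
                             ::testing::Values(false, true));
    ```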

##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -240,5 +241,121 @@ Result<std::shared_ptr<Table>> Scanner::ToTable() {
                                   FlattenRecordBatchVector(std::move(state->batches)));
 }
 
+struct ToBatchesState : public std::enable_shared_from_this<ToBatchesState> {
+  explicit ToBatchesState(ScanTaskIterator scan_task_it,
+                          std::shared_ptr<TaskGroup> task_group_)
+      : scan_tasks(std::move(scan_task_it)), task_group(std::move(task_group_)) {}
+
+  /// Protecting mutating accesses to batches
+  std::mutex mutex;
+  std::condition_variable ready;
+  ScanTaskIterator scan_tasks;
+  std::shared_ptr<TaskGroup> task_group;
+  int next_scan_task_id = 0;
+  bool no_more_tasks = false;
+  Status iteration_error;
+  std::vector<std::deque<std::shared_ptr<RecordBatch>>> task_batches;
+  std::vector<bool> task_drained;
+  size_t pop_cursor = 0;
+
+  void ResizeBatches(size_t task_index) {
+    if (task_batches.size() <= task_index) {
+      task_batches.resize(task_index + 1);
+      task_drained.resize(task_index + 1);
+    }
+  }
+
+  void Push(std::shared_ptr<RecordBatch> batch, size_t task_index) {
+    std::lock_guard<std::mutex> lock(mutex);
+    ResizeBatches(task_index);
+    task_batches[task_index].push_back(std::move(batch));
+    ready.notify_one();
+  }
+
+  Status Finish(size_t task_index) {
+    std::lock_guard<std::mutex> lock(mutex);
+    ResizeBatches(task_index);
+    task_drained[task_index] = true;
+    ready.notify_one();
+    return Status::OK();
+  }
+
+  void PushScanTask() {
+    if (no_more_tasks) return;
+    auto maybe_task = scan_tasks.Next();
+    if (!maybe_task.ok()) {
+      no_more_tasks = true;
+      iteration_error = maybe_task.status();
+      return;
+    }
+    auto scan_task = maybe_task.ValueOrDie();
+    if (IsIterationEnd(scan_task)) {
+      no_more_tasks = true;
+      return;
+    }
+    auto state = shared_from_this();
+    auto id = next_scan_task_id++;
+    ResizeBatches(id);
+    task_group->Append([state, id, scan_task]() {
+      ARROW_ASSIGN_OR_RAISE(auto batch_it, scan_task->Execute());
+      for (auto maybe_batch : batch_it) {
+        ARROW_ASSIGN_OR_RAISE(auto batch, maybe_batch);
+        state->Push(std::move(batch), id);
+      }
+      return state->Finish(id);
+    });
+  }
+
+  Result<std::shared_ptr<RecordBatch>> Pop() {
+    std::unique_lock<std::mutex> lock(mutex);
+    ready.wait(lock, [this, &lock] {
+      while (pop_cursor < task_batches.size()) {
+        // queue for current scan task contains at least one batch, pop that
+        if (!task_batches[pop_cursor].empty()) return true;
+        // queue is empty but will be appended to eventually, wait for that
+        if (!task_drained[pop_cursor]) return false;
+
+        // Finished draining current scan task, enqueue a new one
+        ++pop_cursor;
+        // Must unlock since serial task group will execute synchronously
+        lock.unlock();
+        PushScanTask();
+        lock.lock();
+      }
+      DCHECK(no_more_tasks);
+      // all scan tasks drained (or getting next task failed), terminate
+      return true;
+    });
+
+    if (pop_cursor == task_batches.size()) {
+      // Don't report an error until we yield up everything we can first
+      RETURN_NOT_OK(iteration_error);
+      return nullptr;
+    }
+
+    auto batch = std::move(task_batches[pop_cursor].front());
+    task_batches[pop_cursor].pop_front();
+    return batch;
+  }
+};
+
+constexpr int kToBatchesReadaheadLevel = 2;

Review comment:
       So we have scan task readahead but no limit on the batch readahead?  This is probably ok; if a user really needed it they could use `Scan`.  Both the IPC and CSV readers are single-scan-task, multiple-batches, so there would be no backpressure on memory for the IPC and CSV readers.
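
    If a cap on batch readahead were ever needed, a sketch (the constant and
    the extra condition variable are hypothetical, and blocking the producer
    would need care with the serial task group, which runs tasks inline):

    ```cpp
    constexpr size_t kMaxQueuedBatchesPerTask = 32;  // hypothetical knob
    std::condition_variable space_available;         // additional member

    void Push(std::shared_ptr<RecordBatch> batch, size_t task_index) {
      std::unique_lock<std::mutex> lock(mutex);
      ResizeBatches(task_index);
      // Backpressure: wait until the consumer drains this task's queue a bit
      space_available.wait(lock, [this, task_index] {
        return task_batches[task_index].size() < kMaxQueuedBatchesPerTask;
      });
      task_batches[task_index].push_back(std::move(batch));
      lock.unlock();
      ready.notify_one();
    }
    // Pop would call space_available.notify_one() after removing a batch.
    ```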




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

