westonpace commented on a change in pull request #9589:
URL: https://github.com/apache/arrow/pull/9589#discussion_r583932908
##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -224,5 +225,100 @@ Result<std::shared_ptr<Table>> Scanner::ToTable() {
FlattenRecordBatchVector(std::move(state->batches)));
}
+struct ToBatchesState {
+ explicit ToBatchesState(size_t n_tasks)
+ : batches(n_tasks), task_drained(n_tasks, false) {}
+
+ /// Protecting mutating accesses to batches
+ std::mutex mutex;
+ std::vector<std::deque<std::shared_ptr<RecordBatch>>> batches;
+ std::vector<bool> task_drained;
+ size_t pop_cursor = 0;
+
+ void Push(std::shared_ptr<RecordBatch> b, size_t i_task) {
+ std::lock_guard<std::mutex> lock(mutex);
+ if (batches.size() <= i_task) {
+ batches.resize(i_task + 1);
+ task_drained.resize(i_task + 1);
+ }
+ batches[i_task].push_back(std::move(b));
+ }
+
+ Status Finish(size_t position) {
+ std::lock_guard<std::mutex> lock(mutex);
+ task_drained[position] = true;
+ return Status::OK();
+ }
+
+ std::shared_ptr<RecordBatch> Pop() {
+ std::unique_lock<std::mutex> lock(mutex);
+ std::condition_variable().wait_for(lock, std::chrono::milliseconds{1}, [this] {
Review comment:
This seems a little off. It looks like you're saying "if the current
batch's queue is still being filled, then wait up to 1ms for it to get a new
item." But what happens when that 1ms expires? Also, you're ignoring the
return value of `wait_for`, and the `std::condition_variable` is a temporary
that `Push` and `Finish` never notify, so the wait can only ever time out.
For example, let's pretend that there are two scan tasks.
Scan task 1 RB 1 arrives at 10ms
Scan task 1 RB 2 arrives at 20ms
Scan task 2 RB 1 arrives at 30ms
The consumer pops batches very quickly, so it has already consumed RB 1
and calls `Pop` on `batches[0]` again at timestamp 15ms. It will sit in this
wait for about 1ms (because `task_drained[0] == false`) and then break out
with the predicate still false. At that point `pop_cursor` is still 0, so the
`pop_cursor == batches.size()` check doesn't catch it and `front()` gets
called on an empty deque.
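For what it's worth, here is roughly the shape I'd expect instead: make the
condition variable a member so `Push`/`Finish` can notify it, and wait on the
predicate without a timeout. This is just an illustrative sketch I haven't run
against the PR (`RecordBatch` is stubbed out so it compiles standalone, and
`Finish` returns void instead of `Status` for brevity):

```cpp
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <memory>
#include <mutex>
#include <vector>

// Stub so this sketch compiles on its own; the real code uses arrow::RecordBatch.
struct RecordBatch {};

struct ToBatchesState {
  explicit ToBatchesState(size_t n_tasks)
      : batches(n_tasks), task_drained(n_tasks, false) {}

  std::mutex mutex;
  std::condition_variable ready;  // member, so producers can notify it
  std::vector<std::deque<std::shared_ptr<RecordBatch>>> batches;
  std::vector<bool> task_drained;
  size_t pop_cursor = 0;

  void Push(std::shared_ptr<RecordBatch> b, size_t i_task) {
    {
      std::lock_guard<std::mutex> lock(mutex);
      batches[i_task].push_back(std::move(b));  // sized by the constructor
    }
    ready.notify_one();  // wake a consumer blocked in Pop
  }

  void Finish(size_t i_task) {
    {
      std::lock_guard<std::mutex> lock(mutex);
      task_drained[i_task] = true;
    }
    ready.notify_one();  // the cursor may now advance past this task
  }

  std::shared_ptr<RecordBatch> Pop() {
    std::unique_lock<std::mutex> lock(mutex);
    // No timeout: we only wake when a producer notifies, and we only return
    // once the predicate actually holds.
    ready.wait(lock, [this] {
      while (pop_cursor < batches.size()) {
        if (!batches[pop_cursor].empty()) return true;  // batch available
        if (!task_drained[pop_cursor]) return false;    // producer still running
        ++pop_cursor;                                   // this task is exhausted
      }
      return true;  // every task drained: terminate
    });
    if (pop_cursor == batches.size()) return nullptr;
    auto batch = std::move(batches[pop_cursor].front());
    batches[pop_cursor].pop_front();
    return batch;
  }
};
```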
##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -224,5 +225,100 @@ Result<std::shared_ptr<Table>> Scanner::ToTable() {
FlattenRecordBatchVector(std::move(state->batches)));
}
+struct ToBatchesState {
+ explicit ToBatchesState(size_t n_tasks)
+ : batches(n_tasks), task_drained(n_tasks, false) {}
+
+ /// Protecting mutating accesses to batches
+ std::mutex mutex;
+ std::vector<std::deque<std::shared_ptr<RecordBatch>>> batches;
+ std::vector<bool> task_drained;
+ size_t pop_cursor = 0;
+
+ void Push(std::shared_ptr<RecordBatch> b, size_t i_task) {
+ std::lock_guard<std::mutex> lock(mutex);
+ if (batches.size() <= i_task) {
+ batches.resize(i_task + 1);
+ task_drained.resize(i_task + 1);
+ }
+ batches[i_task].push_back(std::move(b));
+ }
+
+ Status Finish(size_t position) {
+ std::lock_guard<std::mutex> lock(mutex);
+ task_drained[position] = true;
+ return Status::OK();
+ }
+
+ std::shared_ptr<RecordBatch> Pop() {
+ std::unique_lock<std::mutex> lock(mutex);
+ std::condition_variable().wait_for(lock, std::chrono::milliseconds{1}, [this] {
+ while (pop_cursor < batches.size()) {
+ // queue for current scan task contains at least one batch, pop that
+ if (!batches[pop_cursor].empty()) return true;
+
+ // queue is empty but will be appended to eventually, wait for that
+ if (!task_drained[pop_cursor]) return false;
+
+ ++pop_cursor;
+ }
+ // all scan tasks drained, terminate
+ return true;
+ });
+
+ if (pop_cursor == batches.size()) return nullptr;
+
+ auto batch = std::move(batches[pop_cursor].front());
+ batches[pop_cursor].pop_front();
+ return batch;
+ }
+};
+
+Result<RecordBatchIterator> Scanner::ToBatches() {
+ ARROW_ASSIGN_OR_RAISE(auto scan_task_it, Scan());
+ ARROW_ASSIGN_OR_RAISE(auto scan_task_vector, scan_task_it.ToVector());
+
+ auto task_group = scan_context_->TaskGroup();
+ auto state = std::make_shared<ToBatchesState>(scan_task_vector.size());
+
+ size_t scan_task_id = 0;
+ for (auto scan_task : scan_task_vector) {
+ auto id = scan_task_id++;
+ task_group->Append([state, id, scan_task] {
Review comment:
Again, what I'm working on will work around this, so maybe don't stress
about it at the moment, but there's no back-pressure here. If the batch
consumer is not fast enough and the dataset is larger than RAM, the system
will run out of RAM. It's a bit odd because you are fixing ARROW-11800 here
(though without back-pressure) and then I'll be breaking it again with my
implementation (the first pass of my impl will have back-pressure and parse
loaded buffers a bit more serially).
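To make the back-pressure point concrete, the usual shape of a fix is a
bounded buffer whose producers block while it's full, something like this
generic sketch (none of this is Arrow API, just an illustration):

```cpp
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>

// Hypothetical bounded queue: Push blocks once `capacity` items are buffered,
// so scan tasks cannot run arbitrarily far ahead of the consumer.
template <typename T>
class BoundedQueue {
 public:
  explicit BoundedQueue(size_t capacity) : capacity_(capacity) {}

  void Push(T item) {
    std::unique_lock<std::mutex> lock(mutex_);
    not_full_.wait(lock, [this] { return items_.size() < capacity_; });
    items_.push_back(std::move(item));
    not_empty_.notify_one();
  }

  T Pop() {
    std::unique_lock<std::mutex> lock(mutex_);
    not_empty_.wait(lock, [this] { return !items_.empty(); });
    T item = std::move(items_.front());
    items_.pop_front();
    not_full_.notify_one();  // a blocked producer may resume
    return item;
  }

 private:
  const size_t capacity_;
  std::mutex mutex_;
  std::condition_variable not_full_;
  std::condition_variable not_empty_;
  std::deque<T> items_;
};
```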
##########
File path: cpp/src/arrow/dataset/scanner.cc
##########
@@ -224,5 +225,100 @@ Result<std::shared_ptr<Table>> Scanner::ToTable() {
FlattenRecordBatchVector(std::move(state->batches)));
}
+struct ToBatchesState {
+ explicit ToBatchesState(size_t n_tasks)
+ : batches(n_tasks), task_drained(n_tasks, false) {}
+
+ /// Protecting mutating accesses to batches
+ std::mutex mutex;
+ std::vector<std::deque<std::shared_ptr<RecordBatch>>> batches;
+ std::vector<bool> task_drained;
+ size_t pop_cursor = 0;
+
+ void Push(std::shared_ptr<RecordBatch> b, size_t i_task) {
+ std::lock_guard<std::mutex> lock(mutex);
+ if (batches.size() <= i_task) {
+ batches.resize(i_task + 1);
+ task_drained.resize(i_task + 1);
+ }
+ batches[i_task].push_back(std::move(b));
+ }
+
+ Status Finish(size_t position) {
+ std::lock_guard<std::mutex> lock(mutex);
+ task_drained[position] = true;
+ return Status::OK();
+ }
+
+ std::shared_ptr<RecordBatch> Pop() {
+ std::unique_lock<std::mutex> lock(mutex);
+ std::condition_variable().wait_for(lock, std::chrono::milliseconds{1}, [this] {
+ while (pop_cursor < batches.size()) {
+ // queue for current scan task contains at least one batch, pop that
+ if (!batches[pop_cursor].empty()) return true;
+
+ // queue is empty but will be appended to eventually, wait for that
+ if (!task_drained[pop_cursor]) return false;
+
+ ++pop_cursor;
+ }
+ // all scan tasks drained, terminate
+ return true;
+ });
+
+ if (pop_cursor == batches.size()) return nullptr;
+
+ auto batch = std::move(batches[pop_cursor].front());
+ batches[pop_cursor].pop_front();
+ return batch;
+ }
+};
+
+Result<RecordBatchIterator> Scanner::ToBatches() {
+ ARROW_ASSIGN_OR_RAISE(auto scan_task_it, Scan());
+ ARROW_ASSIGN_OR_RAISE(auto scan_task_vector, scan_task_it.ToVector());
Review comment:
I'll be replacing this with something better, so I don't know how much we
care to worry, but this is not ideal. Collecting the scan task iterator into a
vector forces every task to be created before any of them runs; with parquet,
for example, this would fetch metadata for every file in the scan before
starting to read any individual file. It introduces more latency than
necessary.
Also, I'm not sure how this will interact with parquet preloading.
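To illustrate, pulling tasks lazily from the iterator (a sketch of the shape,
not tested; it leans on `Iterator::Next` and `IsIterationEnd` from
`arrow/util/iterator.h`) would let the first file's batches flow before the
last file's metadata has been fetched:

```cpp
// Instead of scan_task_it.ToVector(), pull one task at a time so that
// per-task setup (e.g. parquet metadata reads) is not all paid up front.
size_t scan_task_id = 0;
while (true) {
  ARROW_ASSIGN_OR_RAISE(auto scan_task, scan_task_it.Next());
  if (IsIterationEnd(scan_task)) break;  // iterator exhausted
  auto id = scan_task_id++;
  task_group->Append([state, id, scan_task] {
    // ... same task body as in the diff above ...
    return Status::OK();
  });
}
```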
##########
File path: cpp/src/arrow/dataset/scanner.h
##########
@@ -163,12 +164,20 @@ class ARROW_DS_EXPORT Scanner {
/// in a concurrent fashion and outlive the iterator.
Result<ScanTaskIterator> Scan();
+ /// \brief Apply a visitor to each RecordBatch as it is scanned. If multiple
+ /// threads are used, the visitor will be invoked from those threads and is
+ /// responsible for any synchronization.
+ Status Scan(std::function<Status(std::shared_ptr<RecordBatch>)> visitor);
Review comment:
Why both `Scan(visitor)` and `ToBatches`? Couldn't you just do
`ToBatches().Visit(visitor)`?
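i.e. something along these lines (sketch only, assuming `Iterator::Visit`
from `arrow/util/iterator.h`, which applies the callable to each element until
it errors or the iterator is exhausted):

```cpp
Status Scanner::Scan(std::function<Status(std::shared_ptr<RecordBatch>)> visitor) {
  ARROW_ASSIGN_OR_RAISE(auto batch_it, ToBatches());
  return batch_it.Visit(std::move(visitor));
}
```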