lidavidm commented on a change in pull request #9644: URL: https://github.com/apache/arrow/pull/9644#discussion_r597663681
##########
File path: cpp/src/arrow/record_batch.h
##########
@@ -207,6 +208,14 @@ class ARROW_EXPORT RecordBatchReader {
   /// \return Status
   virtual Status ReadNext(std::shared_ptr<RecordBatch>* batch) = 0;
+  // Fallback to sync implementation until all other readers are converted(ARROW-11770)
+  // and then this could become pure virtual with ReadNext falling back to async impl.
+  virtual Future<std::shared_ptr<RecordBatch>> ReadNextAsync() {
+    std::shared_ptr<RecordBatch> batch;
+    ARROW_RETURN_NOT_OK(ReadNext(&batch));
+    return Future<std::shared_ptr<RecordBatch>>::MakeFinished(std::move(batch));
+  }
+

Review comment:
It looks like across CSV, Parquet, and Feather, we now have two distinct approaches to async reading: here we add a method to asynchronously read the next batch, while in Parquet/Feather we add a method to convert a reader to a generator of batches. We should probably pick one for consistency's sake.

##########
File path: cpp/src/arrow/util/thread_pool.h
##########
@@ -102,16 +102,34 @@ class ARROW_EXPORT Executor {
   // CPU heavy work off the I/O thread pool. So the I/O task should transfer
   // the future to the CPU executor before returning.
   template <typename T>
-  Future<T> Transfer(Future<T> future) {
+  Future<T> Transfer(Future<T> future, bool force_spawn = false) {
     auto transferred = Future<T>::Make();
-    future.AddCallback([this, transferred](const Result<T>& result) mutable {
+    auto callback = [this, transferred](const Result<T>& result) mutable {
       auto spawn_status = Spawn([transferred, result]() mutable {
         transferred.MarkFinished(std::move(result));
       });
       if (!spawn_status.ok()) {
         transferred.MarkFinished(spawn_status);
       }
-    });
+    };
+    auto callback_factory = [&callback]() { return callback; };
+    auto callback_added = future.TryAddCallback(callback_factory);
+    if (!callback_added) {
+      if (force_spawn) {
+        auto spawn_status = Spawn([future, transferred]() mutable {
+          transferred.MarkFinished(future.result());
+        });
+        if (!spawn_status.ok()) {
+          transferred.MarkFinished(spawn_status);
+        }
+        return transferred;

Review comment:
I'm a little confused what the new parameter accomplishes. Since the new future essentially completes immediately anyways, there's still no guarantee that future callers of transferred.AddCallback won't just have their callbacks synchronously executed as soon as they add them on the current thread.
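
To make the concern about late callbacks concrete, here is a minimal sketch. It uses a toy future, not Arrow's Future<T>: ToyFuture and TraceLateCallback are hypothetical names, and the sketch assumes the semantics described in the comment, namely that a callback added to an already-finished future runs inline. Under that assumption, once the transferred future has completed, the callback body runs inside AddCallback itself, on whatever thread the caller happens to be on.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Toy stand-in for a future (hypothetical; NOT Arrow's Future<T>).
// Single-threaded on purpose: it only models *when* a callback runs.
class ToyFuture {
 public:
  ToyFuture() : state_(std::make_shared<State>()) {}

  // Complete the future and run any callbacks that were registered earlier.
  void MarkFinished(int value) {
    assert(!state_->finished);
    state_->finished = true;
    state_->value = value;
    std::vector<std::function<void(int)>> pending;
    pending.swap(state_->callbacks);
    for (auto& cb : pending) cb(value);
  }

  // If the future is still pending, queue the callback for later.
  // If it is already finished, run the callback right here, in the caller.
  void AddCallback(std::function<void(int)> cb) {
    if (!state_->finished) {
      state_->callbacks.push_back(std::move(cb));
      return;
    }
    cb(state_->value);
  }

 private:
  struct State {
    bool finished = false;
    int value = 0;
    std::vector<std::function<void(int)>> callbacks;
  };
  std::shared_ptr<State> state_;
};

// Records the order of events when a callback is added after completion.
std::vector<std::string> TraceLateCallback() {
  std::vector<std::string> trace;
  ToyFuture transferred;
  // Stands in for the spawned task having already finished the future.
  transferred.MarkFinished(42);

  trace.push_back("before AddCallback");
  transferred.AddCallback([&trace](int) { trace.push_back("callback"); });
  trace.push_back("after AddCallback");
  return trace;
}
```

In this sketch the "callback" entry lands between the other two, i.e. the callback has already run by the time AddCallback returns. Whether that matters for Transfer depends on how quickly callers attach their callbacks relative to the spawned task.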
##########
File path: cpp/src/arrow/csv/reader.cc
##########
@@ -679,101 +687,141 @@ class BaseStreamingReader : public ReaderMixin, public csv::StreamingReader {
   std::vector<std::shared_ptr<ColumnDecoder>> column_decoders_;
   std::shared_ptr<Schema> schema_;
   std::shared_ptr<RecordBatch> pending_batch_;
-  Iterator<std::shared_ptr<Buffer>> buffer_iterator_;
+  AsyncGenerator<std::shared_ptr<Buffer>> buffer_generator_;
+  Executor* cpu_executor_;
   bool eof_ = false;
 };
 /////////////////////////////////////////////////////////////////////////
 // Serial StreamingReader implementation
-class SerialStreamingReader : public BaseStreamingReader {
+class SerialStreamingReader : public BaseStreamingReader,
+                              public std::enable_shared_from_this<SerialStreamingReader> {
  public:
   using BaseStreamingReader::BaseStreamingReader;
-  Status Init() override {
+  Future<std::shared_ptr<csv::StreamingReader>> Init() override {
     ARROW_ASSIGN_OR_RAISE(auto istream_it,
                           io::MakeInputStreamIterator(input_, read_options_.block_size));
-    // Since we're converting serially, no need to readahead more than one block
-    int32_t block_queue_size = 1;
-    ARROW_ASSIGN_OR_RAISE(auto rh_it,
-                          MakeReadaheadIterator(std::move(istream_it), block_queue_size));
-    buffer_iterator_ = CSVBufferIterator::Make(std::move(rh_it));
-    task_group_ = internal::TaskGroup::MakeSerial(stop_token_);
+    ARROW_ASSIGN_OR_RAISE(auto bg_it, MakeBackgroundGenerator(std::move(istream_it),
+                                                              io_context_.executor()));
+
+    auto transferred_it = MakeTransferredGenerator(bg_it, cpu_executor_, true);
+    auto rh_it = MakeSerialReadaheadGenerator(std::move(transferred_it), 2);

Review comment:
The readahead here probably needs to be configurable eventually.
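
As a rough sketch of what a configurable readahead depth would control: the block below is a synchronous toy, not MakeSerialReadaheadGenerator, and every name in it (ReadaheadOptionsSketch, max_readahead, ToyReadahead, BlocksPulledUpFront) is hypothetical rather than an existing Arrow option or API. It only shows that the depth decides how many blocks are pulled from the source ahead of the consumer.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <utility>

// Hypothetical option struct; the default mirrors the literal 2 in the diff.
struct ReadaheadOptionsSketch {
  std::size_t max_readahead = 2;
};

// Toy synchronous readahead: keeps up to max_readahead items buffered
// ahead of the consumer. The source returns false at end of stream.
template <typename T>
class ToyReadahead {
 public:
  using Source = std::function<bool(T*)>;

  ToyReadahead(Source source, const ReadaheadOptionsSketch& options)
      : source_(std::move(source)), max_readahead_(options.max_readahead) {
    assert(max_readahead_ > 0);
    Fill();  // eagerly buffer up to the configured depth
  }

  // Pops the next buffered item; returns false once the stream is drained.
  bool Next(T* out) {
    if (queue_.empty()) return false;
    *out = std::move(queue_.front());
    queue_.pop_front();
    Fill();  // top the buffer back up after consuming
    return true;
  }

 private:
  void Fill() {
    while (!exhausted_ && queue_.size() < max_readahead_) {
      T item;
      if (!source_(&item)) {
        exhausted_ = true;
        break;
      }
      queue_.push_back(std::move(item));
    }
  }

  Source source_;
  std::size_t max_readahead_;
  std::deque<T> queue_;
  bool exhausted_ = false;
};

// Counts how many blocks are pulled from a 10-block source before the
// consumer has asked for anything.
std::size_t BlocksPulledUpFront(std::size_t max_readahead) {
  std::size_t pulled = 0;
  ReadaheadOptionsSketch options;
  options.max_readahead = max_readahead;
  ToyReadahead<int> reader(
      [&pulled](int* out) {
        if (pulled >= 10) return false;
        *out = static_cast<int>(pulled++);
        return true;
      },
      options);
  return pulled;
}
```

With this toy, a depth of 1 pulls a single block up front (the behaviour of the removed block_queue_size = 1 code path), while larger depths trade memory for more buffered work. Where such a knob would live in the real reader options is left open here.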