lidavidm commented on a change in pull request #9644:
URL: https://github.com/apache/arrow/pull/9644#discussion_r597663681



##########
File path: cpp/src/arrow/record_batch.h
##########
@@ -207,6 +208,14 @@ class ARROW_EXPORT RecordBatchReader {
   /// \return Status
   virtual Status ReadNext(std::shared_ptr<RecordBatch>* batch) = 0;
 
+  // Fallback to sync implementation until all other readers are 
converted(ARROW-11770)
+  // and then this could become pure virtual with ReadNext falling back to 
async impl.
+  virtual Future<std::shared_ptr<RecordBatch>> ReadNextAsync() {
+    std::shared_ptr<RecordBatch> batch;
+    ARROW_RETURN_NOT_OK(ReadNext(&batch));
+    return 
Future<std::shared_ptr<RecordBatch>>::MakeFinished(std::move(batch));
+  }
+

Review comment:
       It looks like across CSV, Parquet, and Feather, we now have two distinct 
approaches to async reading: here we add a method to asynchronously read the 
next batch, while in Parquet/Feather we add a method to convert a reader to a 
generator of batches. We should probably pick one for consistency's sake.

##########
File path: cpp/src/arrow/util/thread_pool.h
##########
@@ -102,16 +102,34 @@ class ARROW_EXPORT Executor {
   // CPU heavy work off the I/O thread pool.  So the I/O task should transfer
   // the future to the CPU executor before returning.
   template <typename T>
-  Future<T> Transfer(Future<T> future) {
+  Future<T> Transfer(Future<T> future, bool force_spawn = false) {
     auto transferred = Future<T>::Make();
-    future.AddCallback([this, transferred](const Result<T>& result) mutable {
+    auto callback = [this, transferred](const Result<T>& result) mutable {
       auto spawn_status = Spawn([transferred, result]() mutable {
         transferred.MarkFinished(std::move(result));
       });
       if (!spawn_status.ok()) {
         transferred.MarkFinished(spawn_status);
       }
-    });
+    };
+    auto callback_factory = [&callback]() { return callback; };
+    auto callback_added = future.TryAddCallback(callback_factory);
+    if (!callback_added) {
+      if (force_spawn) {
+        auto spawn_status = Spawn([future, transferred]() mutable {
+          transferred.MarkFinished(future.result());
+        });
+        if (!spawn_status.ok()) {
+          transferred.MarkFinished(spawn_status);
+        }
+        return transferred;

Review comment:
       I'm a little confused about what the new parameter accomplishes. Since the new 
future essentially completes immediately anyway, there's still no guarantee 
that later callers of transferred.AddCallback won't just have their callbacks 
executed synchronously on the current thread as soon as they add them.

##########
File path: cpp/src/arrow/csv/reader.cc
##########
@@ -679,101 +687,141 @@ class BaseStreamingReader : public ReaderMixin, public 
csv::StreamingReader {
   std::vector<std::shared_ptr<ColumnDecoder>> column_decoders_;
   std::shared_ptr<Schema> schema_;
   std::shared_ptr<RecordBatch> pending_batch_;
-  Iterator<std::shared_ptr<Buffer>> buffer_iterator_;
+  AsyncGenerator<std::shared_ptr<Buffer>> buffer_generator_;
+  Executor* cpu_executor_;
   bool eof_ = false;
 };
 
 /////////////////////////////////////////////////////////////////////////
 // Serial StreamingReader implementation
 
-class SerialStreamingReader : public BaseStreamingReader {
+class SerialStreamingReader : public BaseStreamingReader,
+                              public 
std::enable_shared_from_this<SerialStreamingReader> {
  public:
   using BaseStreamingReader::BaseStreamingReader;
 
-  Status Init() override {
+  Future<std::shared_ptr<csv::StreamingReader>> Init() override {
     ARROW_ASSIGN_OR_RAISE(auto istream_it,
                           io::MakeInputStreamIterator(input_, 
read_options_.block_size));
 
-    // Since we're converting serially, no need to readahead more than one 
block
-    int32_t block_queue_size = 1;
-    ARROW_ASSIGN_OR_RAISE(auto rh_it,
-                          MakeReadaheadIterator(std::move(istream_it), 
block_queue_size));
-    buffer_iterator_ = CSVBufferIterator::Make(std::move(rh_it));
-    task_group_ = internal::TaskGroup::MakeSerial(stop_token_);
+    ARROW_ASSIGN_OR_RAISE(auto bg_it, 
MakeBackgroundGenerator(std::move(istream_it),
+                                                              
io_context_.executor()));
+
+    auto transferred_it = MakeTransferredGenerator(bg_it, cpu_executor_, true);
 
+    auto rh_it = MakeSerialReadaheadGenerator(std::move(transferred_it), 2);

Review comment:
       The readahead here probably needs to be configurable eventually.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to