pitrou commented on a change in pull request #9644:
URL: https://github.com/apache/arrow/pull/9644#discussion_r601474004



##########
File path: cpp/src/arrow/csv/reader.cc
##########
@@ -672,101 +687,142 @@ class BaseStreamingReader : public ReaderMixin, public 
csv::StreamingReader {
   std::vector<std::shared_ptr<ColumnDecoder>> column_decoders_;
   std::shared_ptr<Schema> schema_;
   std::shared_ptr<RecordBatch> pending_batch_;
-  Iterator<std::shared_ptr<Buffer>> buffer_iterator_;
+  AsyncGenerator<std::shared_ptr<Buffer>> buffer_generator_;
+  Executor* cpu_executor_;
   bool eof_ = false;
 };
 
 /////////////////////////////////////////////////////////////////////////
 // Serial StreamingReader implementation
 
-class SerialStreamingReader : public BaseStreamingReader {
+class SerialStreamingReader : public BaseStreamingReader,
+                              public 
std::enable_shared_from_this<SerialStreamingReader> {
  public:
   using BaseStreamingReader::BaseStreamingReader;
 
-  Status Init() override {
+  Future<std::shared_ptr<csv::StreamingReader>> Init() override {
     ARROW_ASSIGN_OR_RAISE(auto istream_it,
                           io::MakeInputStreamIterator(input_, 
read_options_.block_size));
 
-    // Since we're converting serially, no need to readahead more than one 
block
-    int32_t block_queue_size = 1;
-    ARROW_ASSIGN_OR_RAISE(auto rh_it,
-                          MakeReadaheadIterator(std::move(istream_it), 
block_queue_size));
-    buffer_iterator_ = CSVBufferIterator::Make(std::move(rh_it));
-    task_group_ = internal::TaskGroup::MakeSerial(stop_token_);
+    ARROW_ASSIGN_OR_RAISE(auto bg_it, 
MakeBackgroundGenerator(std::move(istream_it),
+                                                              
io_context_.executor()));
+
+    auto rh_it = MakeSerialReadaheadGenerator(std::move(bg_it), 8);
+
+    auto transferred_it = MakeTransferredGenerator(rh_it, cpu_executor_);
 
+    buffer_generator_ = 
CSVBufferIterator::MakeAsync(std::move(transferred_it));
+    task_group_ = internal::TaskGroup::MakeSerial(io_context_.stop_token());
+
+    auto self = shared_from_this();
     // Read schema from first batch
-    ARROW_ASSIGN_OR_RAISE(pending_batch_, ReadNext());
-    DCHECK_NE(schema_, nullptr);
-    return Status::OK();
+    return ReadNextAsync().Then([self](const std::shared_ptr<RecordBatch>& 
first_batch)
+                                    -> 
Result<std::shared_ptr<csv::StreamingReader>> {
+      self->pending_batch_ = first_batch;
+      DCHECK_NE(self->schema_, nullptr);
+      return self;
+    });
   }
 
- protected:
-  Result<std::shared_ptr<RecordBatch>> ReadNext() override {
-    if (eof_) {
-      return nullptr;
-    }
-    if (stop_token_.IsStopRequested()) {
-      eof_ = true;
-      return stop_token_.Poll();
-    }
-    if (!block_iterator_) {
-      Status st = SetupReader();
-      if (!st.ok()) {
-        // Can't setup reader => bail out
-        eof_ = true;
-        return st;
-      }
+  Result<std::shared_ptr<RecordBatch>> DecodeBatchAndUpdateSchema() {
+    auto maybe_batch = DecodeNextBatch();

Review comment:
       `ColumnDecoder` is used only by the streaming reader. In the current 
behaviour, blocks are appended before they are waited on, so the wait in 
`ColumnDecoder::NextChunk` returns immediately. With the switch to async 
execution, however, blocks can be appended after the wait has started.
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to