pitrou commented on a change in pull request #9644:
URL: https://github.com/apache/arrow/pull/9644#discussion_r601281174



##########
File path: cpp/src/arrow/csv/reader.cc
##########
@@ -672,101 +687,142 @@ class BaseStreamingReader : public ReaderMixin, public csv::StreamingReader {
   std::vector<std::shared_ptr<ColumnDecoder>> column_decoders_;
   std::shared_ptr<Schema> schema_;
   std::shared_ptr<RecordBatch> pending_batch_;
-  Iterator<std::shared_ptr<Buffer>> buffer_iterator_;
+  AsyncGenerator<std::shared_ptr<Buffer>> buffer_generator_;
+  Executor* cpu_executor_;
   bool eof_ = false;
 };
 
 /////////////////////////////////////////////////////////////////////////
 // Serial StreamingReader implementation
 
-class SerialStreamingReader : public BaseStreamingReader {
+class SerialStreamingReader : public BaseStreamingReader,
+                              public std::enable_shared_from_this<SerialStreamingReader> {
  public:
   using BaseStreamingReader::BaseStreamingReader;
 
-  Status Init() override {
+  Future<std::shared_ptr<csv::StreamingReader>> Init() override {
     ARROW_ASSIGN_OR_RAISE(auto istream_it,
                          io::MakeInputStreamIterator(input_, read_options_.block_size));
 
-    // Since we're converting serially, no need to readahead more than one block
-    int32_t block_queue_size = 1;
-    ARROW_ASSIGN_OR_RAISE(auto rh_it,
-                          MakeReadaheadIterator(std::move(istream_it), block_queue_size));
-    buffer_iterator_ = CSVBufferIterator::Make(std::move(rh_it));
-    task_group_ = internal::TaskGroup::MakeSerial(stop_token_);
+    ARROW_ASSIGN_OR_RAISE(auto bg_it, MakeBackgroundGenerator(std::move(istream_it),
+                                                              io_context_.executor()));
+
+    auto rh_it = MakeSerialReadaheadGenerator(std::move(bg_it), 8);

Review comment:
       Is there a reason for 8? The serial path previously read ahead only one block.

##########
File path: cpp/src/arrow/util/thread_pool.h
##########
@@ -104,15 +104,22 @@ class ARROW_EXPORT Executor {
   template <typename T>
   Future<T> Transfer(Future<T> future) {
     auto transferred = Future<T>::Make();
-    future.AddCallback([this, transferred](const Result<T>& result) mutable {
+    auto callback = [this, transferred](const Result<T>& result) mutable {
       auto spawn_status = Spawn([transferred, result]() mutable {
         transferred.MarkFinished(std::move(result));
       });
       if (!spawn_status.ok()) {
         transferred.MarkFinished(spawn_status);
       }
-    });
-    return transferred;
+    };
+    auto callback_factory = [&callback]() { return callback; };
+    if (future.TryAddCallback(callback_factory)) {
+      return transferred;
+    }
+    // If the future is already finished and we aren't going to force spawn a thread
+    // then we don't need to add another layer of callback and can return the original
+    // future

Review comment:
       Doesn't this change the semantics? I may want `Transfer` to always call 
the returned future's callbacks on the thread pool. It seems this breaks the 
guarantee. Perhaps you want to add a `TransferIfNotFinished` instead.
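A minimal sketch of the concern (hypothetical `TransferAlways` / `TransferIfNotFinished` names, using only the `Future` and `Executor` calls visible in this diff): with the proposed fast path, an already-finished future is returned unchanged, so callbacks the caller attaches afterwards may run on the calling thread instead of on the executor.

```cpp
#include <utility>
#include <arrow/util/future.h>
#include <arrow/util/thread_pool.h>

using arrow::Future;
using arrow::Result;
using arrow::internal::Executor;

// Old behaviour: always hop onto `executor`, even for finished futures.
Future<int> TransferAlways(Executor* executor, Future<int> future) {
  auto transferred = Future<int>::Make();
  future.AddCallback([executor, transferred](const Result<int>& result) mutable {
    auto st = executor->Spawn(
        [transferred, result]() mutable { transferred.MarkFinished(std::move(result)); });
    if (!st.ok()) transferred.MarkFinished(st);
  });
  return transferred;
}

// Proposed behaviour: an already-finished future is returned as-is, so later
// continuations are not forced onto `executor`.
Future<int> TransferIfNotFinished(Executor* executor, Future<int> future) {
  if (future.is_finished()) return future;
  return TransferAlways(executor, std::move(future));
}
```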

##########
File path: cpp/src/arrow/csv/reader.cc
##########
@@ -199,6 +199,19 @@ class SerialBlockReader : public BlockReader {
    return MakeTransformedIterator(std::move(buffer_iterator), block_reader_fn);
   }
 
+  static AsyncGenerator<CSVBlock> MakeAsyncIterator(
+      AsyncGenerator<std::shared_ptr<Buffer>> buffer_generator,
+      std::unique_ptr<Chunker> chunker, std::shared_ptr<Buffer> first_buffer) {
+    auto block_reader =
+        std::make_shared<SerialBlockReader>(std::move(chunker), first_buffer);
+    // Wrap shared pointer in callable
+    Transformer<std::shared_ptr<Buffer>, CSVBlock> block_reader_fn =
+        [block_reader](std::shared_ptr<Buffer> next) {
+          return (*block_reader)(std::move(next));
+        };
+    return MakeAsyncGenerator(std::move(buffer_generator), block_reader_fn);

Review comment:
       Hmm, why isn't this called `MakeTransformedGenerator`, to mirror `MakeTransformedIterator`?

##########
File path: cpp/src/arrow/csv/reader.cc
##########
@@ -574,37 +585,41 @@ class BaseTableReader : public ReaderMixin, public csv::TableReader {
 
 class BaseStreamingReader : public ReaderMixin, public csv::StreamingReader {
  public:
-  using ReaderMixin::ReaderMixin;
+  BaseStreamingReader(io::IOContext io_context, Executor* cpu_executor,
+                      std::shared_ptr<io::InputStream> input,
+                      const ReadOptions& read_options, const ParseOptions& parse_options,
+                      const ConvertOptions& convert_options)
+      : ReaderMixin(io_context, std::move(input), read_options, parse_options,
+                    convert_options),
+        cpu_executor_(cpu_executor) {}
 
-  virtual Status Init() = 0;
+  virtual Future<std::shared_ptr<csv::StreamingReader>> Init() = 0;
 
   std::shared_ptr<Schema> schema() const override { return schema_; }
 
   Status ReadNext(std::shared_ptr<RecordBatch>* batch) override {
-    do {
-      RETURN_NOT_OK(ReadNext().Value(batch));
-    } while (*batch != nullptr && (*batch)->num_rows() == 0);
+    auto next_fut = ReadNextAsync();
+    auto next_result = next_fut.result();
+    ARROW_ASSIGN_OR_RAISE(*batch, next_result);
     return Status::OK();

Review comment:
       Or simply `return next_result.Value(batch);`

##########
File path: cpp/src/arrow/csv/reader.cc
##########
@@ -672,101 +687,142 @@ class BaseStreamingReader : public ReaderMixin, public csv::StreamingReader {
   std::vector<std::shared_ptr<ColumnDecoder>> column_decoders_;
   std::shared_ptr<Schema> schema_;
   std::shared_ptr<RecordBatch> pending_batch_;
-  Iterator<std::shared_ptr<Buffer>> buffer_iterator_;
+  AsyncGenerator<std::shared_ptr<Buffer>> buffer_generator_;
+  Executor* cpu_executor_;
   bool eof_ = false;
 };
 
 /////////////////////////////////////////////////////////////////////////
 // Serial StreamingReader implementation
 
-class SerialStreamingReader : public BaseStreamingReader {
+class SerialStreamingReader : public BaseStreamingReader,
+                              public std::enable_shared_from_this<SerialStreamingReader> {
  public:
   using BaseStreamingReader::BaseStreamingReader;
 
-  Status Init() override {
+  Future<std::shared_ptr<csv::StreamingReader>> Init() override {
     ARROW_ASSIGN_OR_RAISE(auto istream_it,
                          io::MakeInputStreamIterator(input_, read_options_.block_size));
 
-    // Since we're converting serially, no need to readahead more than one block
-    int32_t block_queue_size = 1;
-    ARROW_ASSIGN_OR_RAISE(auto rh_it,
-                          MakeReadaheadIterator(std::move(istream_it), block_queue_size));
-    buffer_iterator_ = CSVBufferIterator::Make(std::move(rh_it));
-    task_group_ = internal::TaskGroup::MakeSerial(stop_token_);
+    ARROW_ASSIGN_OR_RAISE(auto bg_it, MakeBackgroundGenerator(std::move(istream_it),
+                                                              io_context_.executor()));
+
+    auto rh_it = MakeSerialReadaheadGenerator(std::move(bg_it), 8);
+
+    auto transferred_it = MakeTransferredGenerator(rh_it, cpu_executor_);
 
+    buffer_generator_ = CSVBufferIterator::MakeAsync(std::move(transferred_it));
+    task_group_ = internal::TaskGroup::MakeSerial(io_context_.stop_token());
+
+    auto self = shared_from_this();
     // Read schema from first batch
-    ARROW_ASSIGN_OR_RAISE(pending_batch_, ReadNext());
-    DCHECK_NE(schema_, nullptr);
-    return Status::OK();
+    return ReadNextAsync().Then([self](const std::shared_ptr<RecordBatch>& first_batch)
+                                    -> Result<std::shared_ptr<csv::StreamingReader>> {
+      self->pending_batch_ = first_batch;
+      DCHECK_NE(self->schema_, nullptr);
+      return self;
+    });
   }
 
- protected:
-  Result<std::shared_ptr<RecordBatch>> ReadNext() override {
-    if (eof_) {
-      return nullptr;
-    }
-    if (stop_token_.IsStopRequested()) {
-      eof_ = true;
-      return stop_token_.Poll();
-    }
-    if (!block_iterator_) {
-      Status st = SetupReader();
-      if (!st.ok()) {
-        // Can't setup reader => bail out
-        eof_ = true;
-        return st;
-      }
+  Result<std::shared_ptr<RecordBatch>> DecodeBatchAndUpdateSchema() {
+    auto maybe_batch = DecodeNextBatch();

Review comment:
       This call will block until the next CSV block is parsed and appended to 
the column decoders.
   I think you have to make `ColumnDecoder` async instead...
   (see `ColumnDecoder::NextChunk`)
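
A minimal sketch of that direction (hypothetical names; the chunk type and the interface are assumptions, not the actual `ColumnDecoder` API): the blocking decode step is pushed onto an executor and exposed as a `Future`, so the caller can chain on it instead of waiting.

```cpp
#include <memory>
#include <arrow/array.h>
#include <arrow/result.h>
#include <arrow/util/future.h>
#include <arrow/util/thread_pool.h>

// Hypothetical async chunk source, not Arrow's ColumnDecoder: the blocking
// decode work runs on `executor_` and the caller gets a Future to chain on.
class AsyncChunkSource {
 public:
  explicit AsyncChunkSource(arrow::internal::Executor* executor) : executor_(executor) {}

  arrow::Future<std::shared_ptr<arrow::Array>> NextChunkAsync() {
    auto fut = arrow::Future<std::shared_ptr<arrow::Array>>::Make();
    // Run the (potentially blocking) parse/convert step off the calling thread.
    auto st = executor_->Spawn(
        [this, fut]() mutable { fut.MarkFinished(DecodeOneChunk()); });
    if (!st.ok()) fut.MarkFinished(st);
    return fut;
  }

 private:
  // Stand-in for the blocking work a NextChunk-style call currently does.
  arrow::Result<std::shared_ptr<arrow::Array>> DecodeOneChunk() {
    return std::shared_ptr<arrow::Array>{};
  }

  arrow::internal::Executor* executor_;
};
```

The consuming side would look much like `Init()` in the diff above, which already chains on `ReadNextAsync()` with `Future::Then`.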



