westonpace commented on a change in pull request #9644:
URL: https://github.com/apache/arrow/pull/9644#discussion_r598046231



##########
File path: cpp/src/arrow/csv/reader.cc
##########
@@ -679,101 +687,141 @@ class BaseStreamingReader : public ReaderMixin, public csv::StreamingReader {
   std::vector<std::shared_ptr<ColumnDecoder>> column_decoders_;
   std::shared_ptr<Schema> schema_;
   std::shared_ptr<RecordBatch> pending_batch_;
-  Iterator<std::shared_ptr<Buffer>> buffer_iterator_;
+  AsyncGenerator<std::shared_ptr<Buffer>> buffer_generator_;
+  Executor* cpu_executor_;
   bool eof_ = false;
 };
 
 /////////////////////////////////////////////////////////////////////////
 // Serial StreamingReader implementation
 
-class SerialStreamingReader : public BaseStreamingReader {
+class SerialStreamingReader : public BaseStreamingReader,
+                              public std::enable_shared_from_this<SerialStreamingReader> {
  public:
   using BaseStreamingReader::BaseStreamingReader;
 
-  Status Init() override {
+  Future<std::shared_ptr<csv::StreamingReader>> Init() override {
     ARROW_ASSIGN_OR_RAISE(auto istream_it,
                           io::MakeInputStreamIterator(input_, read_options_.block_size));
 
-    // Since we're converting serially, no need to readahead more than one block
-    int32_t block_queue_size = 1;
-    ARROW_ASSIGN_OR_RAISE(auto rh_it,
-                          MakeReadaheadIterator(std::move(istream_it), block_queue_size));
-    buffer_iterator_ = CSVBufferIterator::Make(std::move(rh_it));
-    task_group_ = internal::TaskGroup::MakeSerial(stop_token_);
+    ARROW_ASSIGN_OR_RAISE(auto bg_it, MakeBackgroundGenerator(std::move(istream_it),
+                                                              io_context_.executor()));
+
+    auto transferred_it = MakeTransferredGenerator(bg_it, cpu_executor_, true);
 
+    auto rh_it = MakeSerialReadaheadGenerator(std::move(transferred_it), 2);

Review comment:
   After some profiling I ended up needing to bump it up to 8.  In the other
CSV reader the readahead was based on the parallelism of the executor, which
isn't necessarily right.

   I don't think it will end up being configurable though, at least not here.
In the future I would like to convert readahead from a bunch of fixed constants
(or arguments) to a decision based on the amount of available RAM (with an
option to limit the RAM usage of the operation).  Even then there is a risk
that too much RAM ends up tied up in one fast spot, so there may still need to
be a max limit.  I think this is something that can be tackled later.  I'll
make a follow-up JIRA and tag these spots.
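
   To make the idea concrete, here is a rough sketch of what "readahead from a
RAM budget, with a max limit" could look like.  Nothing here exists in Arrow:
the function name `ReadaheadDepthForBudget` and all three parameters are
hypothetical, and how the budget itself would be determined is left open.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>

// Hypothetical helper (not an Arrow API): pick a readahead depth from a
// memory budget instead of a hard-coded constant such as 2 or 8.
//
//   memory_budget_bytes: RAM the operation is allowed to spend on readahead
//   block_size_bytes:    size of one buffered block (e.g. the CSV block size)
//   max_depth:           hard cap so one fast stage cannot absorb the budget
inline int32_t ReadaheadDepthForBudget(int64_t memory_budget_bytes,
                                       int64_t block_size_bytes,
                                       int32_t max_depth) {
  assert(block_size_bytes > 0);
  assert(max_depth >= 1);
  // Number of whole blocks that fit in the budget.
  const int64_t blocks_that_fit = memory_budget_bytes / block_size_bytes;
  // Always keep at least one block in flight, never more than the cap.
  const int64_t depth =
      std::max<int64_t>(1, std::min<int64_t>(blocks_that_fit, max_depth));
  return static_cast<int32_t>(depth);
}
```

   The result would then replace the literal passed as the second argument of
`MakeSerialReadaheadGenerator` in the diff above.  The floor of 1 keeps the
pipeline moving even when the budget is smaller than a single block.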




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

