westonpace commented on code in PR #13931:
URL: https://github.com/apache/arrow/pull/13931#discussion_r950606781


##########
cpp/src/arrow/csv/reader.cc:
##########
@@ -545,9 +546,10 @@ class BlockDecodingOperator {
 /////////////////////////////////////////////////////////////////////////
 // Base class for common functionality
 
+template <typename T = std::shared_ptr<io::InputStream>>

Review Comment:
   I'm not sure this makes sense as a default parameter. A reader probably wouldn't guess it intuitively, so let's just force it to be specified.
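For illustration, here is a minimal sketch of what forcing the parameter to be specified could look like. `ReaderBase`, `InputStream`, and `BufferSource` are hypothetical stand-ins, not the Arrow classes in this PR. The two aliases are one optional way to keep call sites short while keeping the input type visible.

```cpp
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-ins for the two input kinds a CSV reader base might wrap.
struct InputStream { std::string kind = "stream"; };
struct BufferSource { std::string kind = "async buffer source"; };

// No default template argument: every use site has to name its input type.
template <typename Input>
class ReaderBase {
 public:
  explicit ReaderBase(Input input) : input_(std::move(input)) {}
  const Input& input() const { return input_; }

 private:
  Input input_;
};

// Named aliases keep call sites short while leaving the choice explicit.
using StreamReaderBase = ReaderBase<std::shared_ptr<InputStream>>;
using SourceReaderBase = ReaderBase<std::shared_ptr<BufferSource>>;
```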



##########
cpp/src/arrow/csv/reader.cc:
##########
@@ -970,6 +972,181 @@ class StreamingReaderImpl : public ReaderMixin,
   std::shared_ptr<std::atomic<int64_t>> bytes_decoded_;
 };
 
+
+
+// this is just like a MapGenerator but the map fun returns a thing instead of a future
+template <typename T, typename ApplyFn,
+          typename Applied = arrow::detail::result_of_t<ApplyFn(const T&)>,
+          typename V = typename EnsureResult<Applied>::type::ValueType>
+AsyncGenerator<V> MakeApplyGenerator(AsyncGenerator<T> source_gen, ApplyFn apply_fun, internal::Executor* cpu_exec) {

Review Comment:
   I think this should actually go in `async_generator.h`. I think I told you otherwise once before, but I was thinking it was going to be more tailored to the CSV reader. This seems pretty generic. Also, it should ideally have a few independent unit tests.
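A self-contained sketch of the generic behavior and of the kind of independent unit test suggested here. It assumes a simplified model, not Arrow's API: `AsyncGen<T>` is a `std::function` returning a `std::future<std::optional<T>>`, `std::nullopt` marks end-of-stream (Arrow uses `IterationEnd`), and `std::async` stands in for submitting work to `cpu_exec`. `MakeVectorGenerator` and `CollectAll` are hypothetical test helpers.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <future>
#include <memory>
#include <optional>
#include <type_traits>
#include <utility>
#include <vector>

// Stand-in for an async generator: each call yields a future of the next item.
template <typename T>
using AsyncGen = std::function<std::future<std::optional<T>>()>;

// Like a mapping generator, but apply_fn returns a plain value, not a future.
// std::async stands in for running apply_fn on a CPU executor.
template <typename T, typename ApplyFn,
          typename V = std::invoke_result_t<ApplyFn, const T&>>
AsyncGen<V> MakeApplyGenerator(AsyncGen<T> source, ApplyFn apply_fn) {
  assert(source && "source generator must not be empty");
  return [source, apply_fn]() {
    // Pull from the source on the caller's thread so items stay in order.
    std::future<std::optional<T>> upstream = source();
    return std::async(std::launch::async,
                      [upstream = std::move(upstream), apply_fn]() mutable
                      -> std::optional<V> {
                        std::optional<T> item = upstream.get();
                        if (!item) return std::nullopt;  // propagate end-of-stream
                        return apply_fn(*item);
                      });
  };
}

// Test helper: a source that yields the given values, then end-of-stream.
template <typename T>
AsyncGen<T> MakeVectorGenerator(std::vector<T> values) {
  auto state = std::make_shared<std::pair<std::vector<T>, std::size_t>>(
      std::move(values), 0);
  return [state]() {
    std::promise<std::optional<T>> done;
    if (state->second < state->first.size()) {
      done.set_value(std::optional<T>(state->first[state->second++]));
    } else {
      done.set_value(std::nullopt);
    }
    return done.get_future();
  };
}

// Test helper: drain a generator until end-of-stream.
template <typename T>
std::vector<T> CollectAll(AsyncGen<T> gen) {
  std::vector<T> out;
  while (std::optional<T> item = gen().get()) {
    out.push_back(std::move(*item));
  }
  return out;
}
```

The cases worth covering in real tests are roughly the ones above: in-order mapping, a change of value type, and an empty source.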



##########
cpp/src/arrow/dataset/file_csv.cc:
##########
@@ -184,16 +186,45 @@ static inline Future<std::shared_ptr<csv::StreamingReader>> OpenReaderAsync(
   auto span = tracer->StartSpan("arrow::dataset::CsvFileFormat::OpenReaderAsync");
 #endif
   ARROW_ASSIGN_OR_RAISE(auto reader_options, GetReadOptions(format, scan_options));
-
-  ARROW_ASSIGN_OR_RAISE(auto input, source.OpenCompressed());
   const auto& path = source.path();
-  ARROW_ASSIGN_OR_RAISE(
+
+  
+  auto actual_compression = Compression::type::UNCOMPRESSED;
+    // Guess compression from file extension
+  auto extension = fs::internal::GetAbstractPathExtension(path);
+  if (extension == "gz") {
+    actual_compression = Compression::type::GZIP;
+  } else {
+    auto maybe_compression = util::Codec::GetCompressionType(extension);
+    if (maybe_compression.ok()) {
+      ARROW_ASSIGN_OR_RAISE(actual_compression, maybe_compression);
+    }
+  }
+  
+  Future<std::shared_ptr<csv::StreamingReader>> reader_fut;
+
+  if (actual_compression == Compression::type::UNCOMPRESSED) {
+    ARROW_ASSIGN_OR_RAISE(auto input, source.Open() )
+    reader_fut = DeferNotOk(input->io_context().executor()->Submit(
+      [=]() -> Future<std::shared_ptr<csv::StreamingReader>> {
+        ARROW_ASSIGN_OR_RAISE(auto temp_first_block, input->ReadAt(0, reader_options.block_size));
+        RETURN_NOT_OK(input->Seek(0));

Review Comment:
   Can you create a follow-up JIRA to remove this double-read? I don't think we need to worry about it now, but I'd like to get away from it at some point.
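One possible shape for that follow-up, sketched under assumptions: the block read up front is kept in memory and replayed as the first output, and reading resumes just past it instead of seeking back to 0. `CountingFile` and `ReuseFirstBlock` are hypothetical names, the "file" is an in-memory string, and the generator is synchronous for brevity.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <memory>
#include <optional>
#include <string>
#include <utility>

// Hypothetical random-access file that counts how many bytes were read.
class CountingFile {
 public:
  explicit CountingFile(std::string data) : data_(std::move(data)) {}
  std::string ReadAt(int64_t offset, int64_t nbytes) {
    if (offset >= size()) return {};
    std::string out = data_.substr(offset, nbytes);
    bytes_read_ += static_cast<int64_t>(out.size());
    return out;
  }
  int64_t size() const { return static_cast<int64_t>(data_.size()); }
  int64_t bytes_read() const { return bytes_read_; }

 private:
  std::string data_;
  int64_t bytes_read_ = 0;
};

using BlockGen = std::function<std::optional<std::string>()>;

// Yields the already-read first block, then continues from the offset right
// after it, so no byte of the file is read twice.
BlockGen ReuseFirstBlock(std::shared_ptr<CountingFile> file,
                         std::string first_block, int64_t block_size) {
  assert(block_size > 0);
  const int64_t first_size = static_cast<int64_t>(first_block.size());
  auto pending = std::make_shared<std::optional<std::string>>(std::move(first_block));
  auto offset = std::make_shared<int64_t>(first_size);
  return [file, pending, offset, block_size]() -> std::optional<std::string> {
    if (pending->has_value()) {
      std::string block = std::move(**pending);
      pending->reset();
      if (!block.empty()) return block;  // replay the cached first block
    }
    if (*offset >= file->size()) return std::nullopt;  // end of file
    std::string block = file->ReadAt(*offset, block_size);
    *offset += static_cast<int64_t>(block.size());
    return block;
  };
}
```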



##########
cpp/src/arrow/io/interfaces.h:
##########
@@ -343,5 +344,8 @@ ARROW_EXPORT
 Result<Iterator<std::shared_ptr<Buffer>>> MakeInputStreamIterator(
     std::shared_ptr<InputStream> stream, int64_t block_size);
 
+ARROW_EXPORT
+Result<AsyncGenerator<std::shared_ptr<Buffer>>> MakeRandomAccessFileGenerator(std::shared_ptr<RandomAccessFile> file, int64_t block_size);
block_size);

Review Comment:
   We will want to document this to point out that it is async reentrant (e.g. it can safely be consumed through a readahead generator). Or maybe this function could apply the readahead generator itself? So it would be:
   
   ```
    Result<AsyncGenerator<std::shared_ptr<Buffer>>> MakeRandomAccessFileGenerator(std::shared_ptr<RandomAccessFile> file, int64_t block_size, int readahead);
   ```
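A minimal sketch of why async reentrancy matters and of a combined factory taking a `readahead` parameter. It uses stand-ins rather than Arrow's types: `Buffer` is a `std::string`, the file is an in-memory string, `AsyncGen<T>` returns a `std::future<std::optional<T>>`, and `std::async` replaces the I/O executor. `MakeFileGenerator`, `MakeReadahead`, and `MakeReadaheadFileGenerator` are hypothetical names; Arrow's own readahead generator is not reproduced here.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <deque>
#include <functional>
#include <future>
#include <memory>
#include <optional>
#include <string>

using Buffer = std::string;  // stand-in for a buffer type
template <typename T>
using AsyncGen = std::function<std::future<std::optional<T>>()>;

// Async reentrant: each call claims its byte range with an atomic fetch_add
// before any I/O starts, so several calls may be outstanding at once and the
// k-th call always receives the k-th block.
AsyncGen<Buffer> MakeFileGenerator(std::shared_ptr<const std::string> file,
                                   int64_t block_size) {
  assert(block_size > 0);
  auto next_offset = std::make_shared<std::atomic<int64_t>>(0);
  return [file, block_size, next_offset]() {
    const int64_t offset = next_offset->fetch_add(block_size);
    return std::async(std::launch::async,
                      [file, offset, block_size]() -> std::optional<Buffer> {
                        if (offset >= static_cast<int64_t>(file->size())) {
                          return std::nullopt;  // end of file
                        }
                        return file->substr(offset, block_size);
                      });
  };
}

// Keeps up to `readahead` requests in flight. This is only safe because the
// source is async reentrant; the wrapper itself expects a single consumer.
template <typename T>
AsyncGen<T> MakeReadahead(AsyncGen<T> source, int readahead) {
  assert(readahead >= 1);
  auto in_flight = std::make_shared<std::deque<std::future<std::optional<T>>>>();
  return [source, readahead, in_flight]() {
    while (static_cast<int>(in_flight->size()) < readahead) {
      in_flight->push_back(source());
    }
    std::future<std::optional<T>> next = std::move(in_flight->front());
    in_flight->pop_front();
    return next;
  };
}

// Combined factory with the suggested (file, block_size, readahead) shape.
AsyncGen<Buffer> MakeReadaheadFileGenerator(std::shared_ptr<const std::string> file,
                                            int64_t block_size, int readahead) {
  return MakeReadahead(MakeFileGenerator(std::move(file), block_size), readahead);
}
```

The key design point is that the offset is claimed when the call is made, not when the read completes, which is what lets a readahead wrapper keep several reads outstanding without reordering blocks.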



-- 
This is an automated message from the Apache Git Service.