westonpace commented on code in PR #13931:
URL: https://github.com/apache/arrow/pull/13931#discussion_r950606781
##########
cpp/src/arrow/csv/reader.cc:
##########
@@ -545,9 +546,10 @@ class BlockDecodingOperator {
/////////////////////////////////////////////////////////////////////////
// Base class for common functionality
+template <typename T = std::shared_ptr<io::InputStream>>
Review Comment:
I'm not sure this makes sense as a default parameter. A reader probably wouldn't guess that the input type defaults to `std::shared_ptr<io::InputStream>`. Let's just force it to be specified.
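A minimal sketch of the suggested change, assuming the mixin only needs to store its input. `InputStreamStub`, `RandomAccessFileStub`, and `ReaderMixinSketch` are hypothetical names, not Arrow types. Without a default argument, each alias has to spell out which input it wraps:

```cpp
#include <memory>
#include <type_traits>
#include <utility>

// Hypothetical stand-ins for io::InputStream and io::RandomAccessFile.
struct InputStreamStub {};
struct RandomAccessFileStub {};

// No default template argument: `ReaderMixinSketch<>` does not compile,
// so every use has to name the input type it wraps.
template <typename InputType>
class ReaderMixinSketch {
 public:
  using input_type = InputType;
  explicit ReaderMixinSketch(InputType input) : input_(std::move(input)) {}
  const InputType& input() const { return input_; }

 private:
  InputType input_;
};

// Call sites state the input type explicitly.
using StreamReaderSketch = ReaderMixinSketch<std::shared_ptr<InputStreamStub>>;
using FileReaderSketch = ReaderMixinSketch<std::shared_ptr<RandomAccessFileStub>>;

static_assert(std::is_same<StreamReaderSketch::input_type,
                           std::shared_ptr<InputStreamStub>>::value,
              "the wrapped type is visible at the call site");
```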
##########
cpp/src/arrow/csv/reader.cc:
##########
@@ -970,6 +972,181 @@ class StreamingReaderImpl : public ReaderMixin,
std::shared_ptr<std::atomic<int64_t>> bytes_decoded_;
};
+
+
+// this is just like a MapGenerator but the map fun returns a thing instead of a future
+template <typename T, typename ApplyFn,
+ typename Applied = arrow::detail::result_of_t<ApplyFn(const T&)>,
+ typename V = typename EnsureResult<Applied>::type::ValueType>
+AsyncGenerator<V> MakeApplyGenerator(AsyncGenerator<T> source_gen, ApplyFn apply_fun, internal::Executor* cpu_exec) {
Review Comment:
I think this should actually go in `async_generator.h`. I think I told you otherwise once before, but I was expecting it to be more tailored to the CSV reader. This seems pretty generic. Also, it should ideally have a few independent unit tests.
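A rough sketch of how the generic helper and a couple of its unit tests might look, assuming a simplified model where `AsyncGen<T>` is a `std::function` returning `std::future<std::optional<T>>` and `std::nullopt` plays the role of Arrow's end-of-stream marker. `MakeApplyGeneratorSketch`, `MakeVectorGenSketch`, and `TestApplyGeneratorSketch` are hypothetical; the real version would use Arrow's `AsyncGenerator`, `Future`, and the supplied `internal::Executor`.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <future>
#include <memory>
#include <optional>
#include <type_traits>
#include <utility>
#include <vector>

// Simplified model, not Arrow's API: an async generator returns one future
// per call, and std::nullopt marks end-of-stream.
template <typename T>
using AsyncGen = std::function<std::future<std::optional<T>>()>;

// Like a map generator, except apply_fn returns a plain value rather than a
// future. std::launch::async stands in for submitting to cpu_exec.
template <typename T, typename ApplyFn,
          typename V = std::invoke_result_t<ApplyFn, const T&>>
AsyncGen<V> MakeApplyGeneratorSketch(AsyncGen<T> source, ApplyFn apply_fn) {
  return [source = std::move(source), apply_fn]() {
    std::future<std::optional<T>> next = source();
    return std::async(std::launch::async,
                      [next = std::move(next), apply_fn]() mutable -> std::optional<V> {
                        std::optional<T> item = next.get();
                        if (!item) return std::nullopt;  // forward end-of-stream
                        return apply_fn(*item);
                      });
  };
}

// Test helper: yields the elements of `values`, then end-of-stream.
template <typename T>
AsyncGen<T> MakeVectorGenSketch(std::vector<T> values) {
  auto data = std::make_shared<std::vector<T>>(std::move(values));
  auto index = std::make_shared<std::size_t>(0);
  return [data, index]() {
    std::promise<std::optional<T>> done;
    if (*index < data->size()) {
      done.set_value(std::optional<T>((*data)[(*index)++]));
    } else {
      done.set_value(std::nullopt);
    }
    return done.get_future();
  };
}

// Example unit tests of the kind the review asks for.
void TestApplyGeneratorSketch() {
  auto gen = MakeApplyGeneratorSketch(MakeVectorGenSketch<int>({1, 2, 3}),
                                      [](const int& x) { return x * 10; });
  std::vector<int> out;
  while (std::optional<int> item = gen().get()) out.push_back(*item);
  assert((out == std::vector<int>{10, 20, 30}));
  assert(!gen().get().has_value());  // stays exhausted

  auto empty = MakeApplyGeneratorSketch(MakeVectorGenSketch<int>({}),
                                        [](const int& x) { return x + 1; });
  assert(!empty().get().has_value());
}
```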
##########
cpp/src/arrow/dataset/file_csv.cc:
##########
@@ -184,16 +186,45 @@ static inline Future<std::shared_ptr<csv::StreamingReader>> OpenReaderAsync(
auto span = tracer->StartSpan("arrow::dataset::CsvFileFormat::OpenReaderAsync");
#endif
ARROW_ASSIGN_OR_RAISE(auto reader_options, GetReadOptions(format, scan_options));
-
- ARROW_ASSIGN_OR_RAISE(auto input, source.OpenCompressed());
const auto& path = source.path();
- ARROW_ASSIGN_OR_RAISE(
+
+
+ auto actual_compression = Compression::type::UNCOMPRESSED;
+ // Guess compression from file extension
+ auto extension = fs::internal::GetAbstractPathExtension(path);
+ if (extension == "gz") {
+ actual_compression = Compression::type::GZIP;
+ } else {
+ auto maybe_compression = util::Codec::GetCompressionType(extension);
+ if (maybe_compression.ok()) {
+ ARROW_ASSIGN_OR_RAISE(actual_compression, maybe_compression);
+ }
+ }
+
+ Future<std::shared_ptr<csv::StreamingReader>> reader_fut;
+
+ if (actual_compression == Compression::type::UNCOMPRESSED) {
+ ARROW_ASSIGN_OR_RAISE(auto input, source.Open() )
+ reader_fut = DeferNotOk(input->io_context().executor()->Submit(
+ [=]() -> Future<std::shared_ptr<csv::StreamingReader>> {
+ ARROW_ASSIGN_OR_RAISE(auto temp_first_block, input->ReadAt(0, reader_options.block_size));
+ RETURN_NOT_OK(input->Seek(0));
Review Comment:
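Can you create a follow-up JIRA to remove this double-read (the first block is read with `ReadAt`, then the stream is seeked back to 0, so the first block ends up being read twice)? I don't think we need to worry about it now, but I'd like to get away from it at some point.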
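For the follow-up, one possible shape, sketched synchronously and under simplifying assumptions: keep the block that was already read with `ReadAt(0, block_size)` and hand it out first, then continue reading at the offset where it ended, so the `Seek(0)` and the second read of the first block go away. `CountingFileSketch`, `MakeBlockGenSketch`, and `TestNoDoubleReadSketch` are hypothetical names; the real code would deal with `Buffer`s and futures rather than `std::string`.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <optional>
#include <string>
#include <utility>

// Hypothetical in-memory file that counts bytes read, so a test can check
// that no byte is fetched twice.
struct CountingFileSketch {
  std::string data;
  int64_t bytes_read = 0;
  std::string ReadAt(int64_t offset, int64_t nbytes) {
    if (offset >= static_cast<int64_t>(data.size())) return "";
    std::string out = data.substr(static_cast<std::size_t>(offset),
                                  static_cast<std::size_t>(nbytes));
    bytes_read += static_cast<int64_t>(out.size());
    return out;
  }
};

// Yields the already-read `first_block` and then continues from the end of
// it, instead of seeking back to 0 and reading the first block again.
std::function<std::optional<std::string>()> MakeBlockGenSketch(
    std::shared_ptr<CountingFileSketch> file, std::string first_block,
    int64_t block_size) {
  auto pending_first =
      std::make_shared<std::optional<std::string>>(std::move(first_block));
  auto offset = std::make_shared<int64_t>(0);
  return [file, pending_first, offset, block_size]() -> std::optional<std::string> {
    if (pending_first->has_value()) {
      std::string block = std::move(**pending_first);
      pending_first->reset();
      *offset = static_cast<int64_t>(block.size());
      if (!block.empty()) return block;
    }
    std::string block = file->ReadAt(*offset, block_size);
    if (block.empty()) return std::nullopt;  // end of file
    *offset += static_cast<int64_t>(block.size());
    return block;
  };
}

// Example check: every byte comes back exactly once and is read only once.
void TestNoDoubleReadSketch() {
  auto file = std::make_shared<CountingFileSketch>();
  file->data = "a,b\n1,2\n3,4\n";
  const int64_t block_size = 5;
  std::string first = file->ReadAt(0, block_size);  // e.g. inspected up front
  auto gen = MakeBlockGenSketch(file, first, block_size);
  std::string all;
  while (std::optional<std::string> block = gen()) all += *block;
  assert(all == file->data);
  assert(file->bytes_read == static_cast<int64_t>(file->data.size()));
}
```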
##########
cpp/src/arrow/io/interfaces.h:
##########
@@ -343,5 +344,8 @@ ARROW_EXPORT
Result<Iterator<std::shared_ptr<Buffer>>> MakeInputStreamIterator(
std::shared_ptr<InputStream> stream, int64_t block_size);
+ARROW_EXPORT
+Result<AsyncGenerator<std::shared_ptr<Buffer>>> MakeRandomAccessFileGenerator(std::shared_ptr<RandomAccessFile> file, int64_t block_size);
Review Comment:
We will want to document this to point out that it is async reentrant (e.g., it can be consumed by a readahead generator). Or maybe this function could apply the readahead generator itself? So it would be:
```
Result<AsyncGenerator<std::shared_ptr<Buffer>>> MakeRandomAccessFileGenerator(
    std::shared_ptr<RandomAccessFile> file, int64_t block_size, int readahead);
```
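A sketch of what "async reentrant" means here, under a simplified `std::future` model (not Arrow's API): each call claims the next block index atomically and reads at a fixed offset, so a new read can be issued before earlier ones complete, which is what a readahead wrapper needs. `MemoryFileSketch`, `MakeFileGenSketch`, `MakeReadaheadSketch`, and `TestReentrantSketch` are hypothetical; `MakeReadaheadSketch` only stands in for Arrow's own readahead generator.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <functional>
#include <future>
#include <memory>
#include <optional>
#include <string>

// Simplified model, not Arrow's API: std::nullopt marks end-of-stream.
template <typename T>
using AsyncGen = std::function<std::future<std::optional<T>>()>;

// Hypothetical in-memory file; ReadAt is a positional read with no shared
// cursor, so concurrent calls are safe.
struct MemoryFileSketch {
  std::string data;
  std::string ReadAt(int64_t offset, int64_t nbytes) const {
    if (offset >= static_cast<int64_t>(data.size())) return "";
    return data.substr(static_cast<std::size_t>(offset),
                       static_cast<std::size_t>(nbytes));
  }
};

// Async reentrant: each call atomically claims the next block index, so a
// caller may invoke the generator again before earlier futures finish.
AsyncGen<std::string> MakeFileGenSketch(
    std::shared_ptr<const MemoryFileSketch> file, int64_t block_size) {
  auto next_index = std::make_shared<std::atomic<int64_t>>(0);
  return [file, block_size, next_index]() {
    const int64_t offset = next_index->fetch_add(1) * block_size;
    return std::async(std::launch::async,
                      [file, offset, block_size]() -> std::optional<std::string> {
                        std::string block = file->ReadAt(offset, block_size);
                        if (block.empty()) return std::nullopt;  // past EOF
                        return block;
                      });
  };
}

// Keeps up to `readahead` reads in flight and returns them in order. This
// relies on `source` being reentrant; calls to the returned generator itself
// must not overlap.
AsyncGen<std::string> MakeReadaheadSketch(AsyncGen<std::string> source,
                                          int readahead) {
  const std::size_t depth = readahead > 0 ? static_cast<std::size_t>(readahead) : 1;
  auto in_flight =
      std::make_shared<std::deque<std::future<std::optional<std::string>>>>();
  return [source, depth, in_flight]() {
    while (in_flight->size() < depth) in_flight->push_back(source());
    std::future<std::optional<std::string>> next = std::move(in_flight->front());
    in_flight->pop_front();
    return next;
  };
}

// Example: two reads requested back to back, then awaited out of order.
void TestReentrantSketch() {
  auto file = std::make_shared<MemoryFileSketch>();
  file->data = "abcdefghij";
  AsyncGen<std::string> gen = MakeFileGenSketch(file, 4);
  auto first = gen();
  auto second = gen();  // issued before `first` has been awaited
  assert(second.get().value() == "efgh");
  assert(first.get().value() == "abcd");
}
```

If the readahead were folded into `MakeRandomAccessFileGenerator` as proposed, its body would roughly amount to `MakeReadaheadSketch(MakeFileGenSketch(file, block_size), readahead)` in this model.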