marsupialtail commented on code in PR #13931:
URL: https://github.com/apache/arrow/pull/13931#discussion_r955628813
##########
cpp/src/arrow/csv/reader.cc:
##########
@@ -970,6 +973,150 @@ class StreamingReaderImpl : public ReaderMixin,
std::shared_ptr<std::atomic<int64_t>> bytes_decoded_;
};
+/////////////////////////////////////////////////////////////////////////
+// Base class for streaming readers
+
+class TonyReaderImpl : public
ReaderMixin<std::shared_ptr<io::RandomAccessFile>>,
+ public csv::StreamingReader,
+ public std::enable_shared_from_this<TonyReaderImpl> {
+ public:
+ TonyReaderImpl(io::IOContext io_context,
std::shared_ptr<io::RandomAccessFile> input,
+ const ReadOptions& read_options, const ParseOptions&
parse_options,
+ const ConvertOptions& convert_options, bool count_rows)
+ : ReaderMixin(io_context, std::move(input), read_options, parse_options,
+ convert_options, count_rows),
+ bytes_decoded_(std::make_shared<std::atomic<int64_t>>(0)) {}
+
+ Future<> Init(Executor* cpu_executor) {
+ ARROW_ASSIGN_OR_RAISE(
+ AsyncGenerator<std::shared_ptr<Buffer>> ifile_gen,
+ io::MakeRandomAccessFileGenerator(input_, read_options_.block_size));
+
+ // TODO Consider exposing readahead as a read option (ARROW-12090)
+ auto prefetch_gen =
+ MakeReadaheadGenerator(ifile_gen,
io_context_.executor()->GetCapacity());
+
+ auto transferred_it = MakeTransferredGenerator(prefetch_gen, cpu_executor);
+
+ auto buffer_generator =
CSVBufferIterator::MakeAsync(std::move(transferred_it));
+
+ int max_readahead = cpu_executor->GetCapacity();
+ auto self = shared_from_this();
+
+ return buffer_generator().Then([self, buffer_generator, max_readahead,
cpu_executor](
+ const std::shared_ptr<Buffer>&
first_buffer) {
+ return self->InitAfterFirstBuffer(first_buffer, buffer_generator,
max_readahead,
+ cpu_executor);
+ });
+ }
Review Comment:
The hope is that I will keep improving this TonyReaderImpl (or whatever I
change the name to) to include more and more optimizations in subsequent PRs,
e.g. parallel decoding, so the amount of duplicated code is hopefully only a
temporary state.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]