westonpace commented on code in PR #13931:
URL: https://github.com/apache/arrow/pull/13931#discussion_r955544496
##########
cpp/src/arrow/csv/reader.cc:
##########
@@ -545,11 +546,12 @@ class BlockDecodingOperator {
/////////////////////////////////////////////////////////////////////////
// Base class for common functionality
+template <typename T>
Review Comment:
I wonder if this might be more readable as `InputType` instead of `T`.
##########
cpp/src/arrow/csv/reader.cc:
##########
@@ -970,6 +973,150 @@ class StreamingReaderImpl : public ReaderMixin,
std::shared_ptr<std::atomic<int64_t>> bytes_decoded_;
};
+/////////////////////////////////////////////////////////////////////////
+// Base class for streaming readers
+
+class TonyReaderImpl : public
ReaderMixin<std::shared_ptr<io::RandomAccessFile>>,
Review Comment:
`TonyReader` has a nice ring to it but I think `StreamingFileReader` might
be easier to understand for future readers.
##########
cpp/src/arrow/io/interfaces.cc:
##########
@@ -138,6 +137,42 @@ Result<Iterator<std::shared_ptr<Buffer>>>
MakeInputStreamIterator(
return Iterator<std::shared_ptr<Buffer>>(InputStreamBlockIterator(stream,
block_size));
}
+Result<AsyncGenerator<std::shared_ptr<Buffer>>> MakeRandomAccessFileGenerator(
+ std::shared_ptr<RandomAccessFile> file, int64_t block_size) {
+ struct State {
+ explicit State(std::shared_ptr<RandomAccessFile> file_, int64_t
block_size_)
+ : file(std::move(file_)), block_size(block_size_), position(0) {}
+
+ Status init() {
+ RETURN_NOT_OK(file->Seek(0));
+ // if seek worked this will also work.
+ total_size = file->GetSize().ValueOrDie();
+ return Status::OK();
Review Comment:
```suggestion
return file->GetSize();
```
You are probably right, but I'm not sure avoiding this `if` check is worth
making this method slightly more confusing, given that it shouldn't really be
called all that often.
##########
cpp/src/arrow/csv/reader.cc:
##########
@@ -970,6 +973,150 @@ class StreamingReaderImpl : public ReaderMixin,
std::shared_ptr<std::atomic<int64_t>> bytes_decoded_;
};
+/////////////////////////////////////////////////////////////////////////
+// Base class for streaming readers
+
+class TonyReaderImpl : public
ReaderMixin<std::shared_ptr<io::RandomAccessFile>>,
+ public csv::StreamingReader,
+ public std::enable_shared_from_this<TonyReaderImpl> {
+ public:
+ TonyReaderImpl(io::IOContext io_context,
std::shared_ptr<io::RandomAccessFile> input,
+ const ReadOptions& read_options, const ParseOptions&
parse_options,
+ const ConvertOptions& convert_options, bool count_rows)
+ : ReaderMixin(io_context, std::move(input), read_options, parse_options,
+ convert_options, count_rows),
+ bytes_decoded_(std::make_shared<std::atomic<int64_t>>(0)) {}
+
+ Future<> Init(Executor* cpu_executor) {
+ ARROW_ASSIGN_OR_RAISE(
+ AsyncGenerator<std::shared_ptr<Buffer>> ifile_gen,
+ io::MakeRandomAccessFileGenerator(input_, read_options_.block_size));
+
+ // TODO Consider exposing readahead as a read option (ARROW-12090)
+ auto prefetch_gen =
+ MakeReadaheadGenerator(ifile_gen,
io_context_.executor()->GetCapacity());
+
+ auto transferred_it = MakeTransferredGenerator(prefetch_gen, cpu_executor);
+
+ auto buffer_generator =
CSVBufferIterator::MakeAsync(std::move(transferred_it));
+
+ int max_readahead = cpu_executor->GetCapacity();
+ auto self = shared_from_this();
+
+ return buffer_generator().Then([self, buffer_generator, max_readahead,
cpu_executor](
+ const std::shared_ptr<Buffer>&
first_buffer) {
+ return self->InitAfterFirstBuffer(first_buffer, buffer_generator,
max_readahead,
+ cpu_executor);
+ });
+ }
Review Comment:
Is this the only method that differs from `StreamingReaderImpl`? Is there
any way the two streaming readers could extend some kind of
`StreamingReaderBase` to avoid duplication?
##########
cpp/src/arrow/csv/reader.h:
##########
@@ -101,6 +102,10 @@ class ARROW_EXPORT StreamingReader : public
RecordBatchReader {
io::IOContext io_context, std::shared_ptr<io::InputStream> input,
arrow::internal::Executor* cpu_executor, const ReadOptions&, const
ParseOptions&,
const ConvertOptions&);
+ static Future<std::shared_ptr<StreamingReader>> MakeAsync(
Review Comment:
Let's add at least a brief comment here explaining the difference between
the two reader types.
##########
cpp/src/arrow/io/interfaces.cc:
##########
@@ -19,6 +19,7 @@
#include <algorithm>
#include <cstdint>
+#include <iostream>
Review Comment:
```suggestion
```
##########
cpp/src/arrow/util/async_generator.h:
##########
@@ -147,6 +147,51 @@ Future<std::vector<T>>
CollectAsyncGenerator(AsyncGenerator<T> generator) {
return Loop(LoopBody{std::move(generator), std::move(vec)});
}
+/// \brief this is just like a MapGenerator but the map fun returns a thing
instead of a
+/// future
+template <typename T, typename ApplyFn,
+ typename Applied = arrow::detail::result_of_t<ApplyFn(const T&)>,
+ typename V = typename EnsureResult<Applied>::type::ValueType>
+AsyncGenerator<V> MakeApplyGenerator(AsyncGenerator<T> source_gen, ApplyFn
apply_fun,
+ internal::Executor* cpu_exec) {
+ struct State {
+ explicit State(AsyncGenerator<T> source_gen_, ApplyFn apply_fun_,
+ internal::Executor* cpu_exec_)
+ : source_gen(std::move(source_gen_)),
+ apply_fun(std::move(apply_fun_)),
+ cpu_exec(cpu_exec_),
+ finished(false) {}
+
+ AsyncGenerator<T> source_gen;
+ ApplyFn apply_fun;
+ internal::Executor* cpu_exec;
+ bool finished;
+ };
+
+ auto state =
+ std::make_shared<State>(std::move(source_gen), std::move(apply_fun),
cpu_exec);
+ return [state]() {
+ CallbackOptions options;
+ options.executor = state->cpu_exec;
+ options.should_schedule = ShouldSchedule::Always;
+
+ return state->source_gen().Then(
+ [state](const T& next) -> Result<V> {
+ if (IsIterationEnd(next)) {
+ return IterationTraits<V>::End();
+ } else {
+ auto value = state->apply_fun(next);
+ if (!value.ok()) {
+ return Status::NotImplemented("not implemented");
+ } else {
+ return value.ValueOrDie();
+ }
Review Comment:
```suggestion
return state->apply_fun(next);
```
I'm not entirely sure why you are rewriting the failure to "not implemented".
##########
cpp/src/arrow/csv/reader.cc:
##########
@@ -20,6 +20,7 @@
#include <cstdint>
#include <cstring>
#include <functional>
+#include <iostream>
Review Comment:
```suggestion
```
##########
cpp/src/arrow/io/interfaces.cc:
##########
@@ -138,6 +137,42 @@ Result<Iterator<std::shared_ptr<Buffer>>>
MakeInputStreamIterator(
return Iterator<std::shared_ptr<Buffer>>(InputStreamBlockIterator(stream,
block_size));
}
+Result<AsyncGenerator<std::shared_ptr<Buffer>>> MakeRandomAccessFileGenerator(
+ std::shared_ptr<RandomAccessFile> file, int64_t block_size) {
+ struct State {
+ explicit State(std::shared_ptr<RandomAccessFile> file_, int64_t
block_size_)
+ : file(std::move(file_)), block_size(block_size_), position(0) {}
+
+ Status init() {
Review Comment:
```suggestion
Status Init() {
```
##########
cpp/src/arrow/dataset/file_csv.cc:
##########
@@ -18,6 +18,7 @@
#include "arrow/dataset/file_csv.h"
#include <algorithm>
+#include <iostream>
Review Comment:
```suggestion
```
##########
cpp/src/arrow/csv/reader.cc:
##########
@@ -1166,10 +1313,26 @@ Future<std::shared_ptr<StreamingReader>>
MakeStreamingReader(
});
}
+Future<std::shared_ptr<StreamingReader>> MakeStreamingReader(
Review Comment:
I'll defer to the input of others, but I think an overload of
`MakeStreamingReader` based solely on the type of `input` is a rather subtle
API. Could we maybe just rename this to `MakeStreamingFileReader`?
##########
cpp/src/arrow/util/async_generator.h:
##########
@@ -147,6 +147,51 @@ Future<std::vector<T>>
CollectAsyncGenerator(AsyncGenerator<T> generator) {
return Loop(LoopBody{std::move(generator), std::move(vec)});
}
+/// \brief this is just like a MapGenerator but the map fun returns a thing
instead of a
+/// future
Review Comment:
Can you expand on this description? I'm pretty sure the important part is
that you are launching a new thread task with each input, right?
##########
cpp/src/arrow/csv/reader.cc:
##########
@@ -1287,6 +1450,14 @@ Future<std::shared_ptr<StreamingReader>>
StreamingReader::MakeAsync(
parse_options, convert_options);
}
+Future<std::shared_ptr<StreamingReader>> StreamingReader::MakeAsync(
+ io::IOContext io_context, std::shared_ptr<io::RandomAccessFile> input,
+ Executor* cpu_executor, const ReadOptions& read_options,
+ const ParseOptions& parse_options, const ConvertOptions& convert_options) {
+ return MakeStreamingReader(io_context, std::move(input), cpu_executor,
read_options,
+ parse_options, convert_options);
+}
+
Review Comment:
I suppose the same comment I had for `MakeStreamingReader` applies here too,
but `MakeFileAsync` doesn't really make sense, so maybe we can leave this
method name alone and just live with an overload. I'm open to suggestions.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]