lidavidm commented on a change in pull request #9656:
URL: https://github.com/apache/arrow/pull/9656#discussion_r621532088



##########
File path: cpp/src/arrow/ipc/reader.h
##########
@@ -147,6 +149,26 @@ class ARROW_EXPORT RecordBatchFileReader {
       const std::shared_ptr<io::RandomAccessFile>& file, int64_t footer_offset,
       const IpcReadOptions& options = IpcReadOptions::Defaults());
 
+  /// \brief Open a file asynchronously (owns the file).
+  static Future<std::shared_ptr<RecordBatchFileReader>> OpenAsync(
+      const std::shared_ptr<io::RandomAccessFile>& file,

Review comment:
       This is just mirroring the sync API. If we wanted to cut down on 
overloads, I'd rather keep only this overload, since it keeps the file alive 
(the other one would be prone to dangling pointers and isn't used by datasets).

##########
File path: cpp/src/arrow/ipc/read_write_test.cc
##########
@@ -1009,6 +1010,47 @@ struct FileWriterHelper {
   int64_t footer_offset_;
 };
 
+struct FileGeneratorWriterHelper : public FileWriterHelper {
+  Status ReadBatches(const IpcReadOptions& options, RecordBatchVector* 
out_batches,
+                     ReadStats* out_stats = nullptr) override {
+    auto buf_reader = std::make_shared<io::BufferReader>(buffer_);
+    AsyncGenerator<std::shared_ptr<RecordBatch>> generator;
+
+    {
+      auto fut =
+          RecordBatchFileReader::OpenAsync(buf_reader.get(), footer_offset_, 
options);
+      RETURN_NOT_OK(fut.status());
+      EXPECT_FINISHES_OK_AND_ASSIGN(auto reader, fut);
+      EXPECT_EQ(num_batches_written_, reader->num_record_batches());
+      // Generator's lifetime is independent of the reader's

Review comment:
       Right, I meant "the generator will keep the reader alive for us".

##########
File path: cpp/src/arrow/ipc/message.cc
##########
@@ -324,6 +325,60 @@ Result<std::unique_ptr<Message>> ReadMessage(int64_t 
offset, int32_t metadata_le
   }
 }
 
+Future<std::shared_ptr<Message>> ReadMessageAsync(int64_t offset, int32_t 
metadata_length,
+                                                  int64_t body_length,
+                                                  io::RandomAccessFile* file,
+                                                  const io::IOContext& 
context) {
+  struct State {
+    std::unique_ptr<Message> result;
+    std::shared_ptr<MessageDecoderListener> listener;
+    std::shared_ptr<MessageDecoder> decoder;
+  };
+  auto state = std::make_shared<State>();

Review comment:
       Unfortunately yes, since MessageDecoder takes shared_ptr/unique_ptr 
directly.

##########
File path: cpp/src/arrow/ipc/read_write_test.cc
##########
@@ -1009,6 +1010,47 @@ struct FileWriterHelper {
   int64_t footer_offset_;
 };
 
+struct FileGeneratorWriterHelper : public FileWriterHelper {
+  Status ReadBatches(const IpcReadOptions& options, RecordBatchVector* 
out_batches,
+                     ReadStats* out_stats = nullptr) override {
+    auto buf_reader = std::make_shared<io::BufferReader>(buffer_);
+    AsyncGenerator<std::shared_ptr<RecordBatch>> generator;
+
+    {
+      auto fut =
+          RecordBatchFileReader::OpenAsync(buf_reader.get(), footer_offset_, 
options);
+      RETURN_NOT_OK(fut.status());
+      EXPECT_FINISHES_OK_AND_ASSIGN(auto reader, fut);
+      EXPECT_EQ(num_batches_written_, reader->num_record_batches());
+      // Generator's lifetime is independent of the reader's
+      ARROW_ASSIGN_OR_RAISE(generator, reader->GetRecordBatchGenerator());
+    }
+
+    // Generator is async-reentrant
+    std::vector<Future<std::shared_ptr<RecordBatch>>> futures;
+    for (int i = 0; i < num_batches_written_; ++i) {
+      futures.push_back(generator());
+    }
+    auto fut = generator();
+    EXPECT_FINISHES_OK_AND_ASSIGN(auto extra_read, fut);

Review comment:
       The `ASSERT` variants don't work in non-void-returning contexts, so I'll 
use the `EXPECT` variants instead.

##########
File path: cpp/src/arrow/ipc/reader.cc
##########
@@ -1175,6 +1284,92 @@ Result<std::shared_ptr<RecordBatchFileReader>> 
RecordBatchFileReader::Open(
   return result;
 }
 
+Future<std::shared_ptr<RecordBatchFileReader>> 
RecordBatchFileReader::OpenAsync(
+    const std::shared_ptr<io::RandomAccessFile>& file, const IpcReadOptions& 
options) {
+  ARROW_ASSIGN_OR_RAISE(int64_t footer_offset, file->GetSize());
+  return OpenAsync(std::move(file), footer_offset, options);
+}
+
+Future<std::shared_ptr<RecordBatchFileReader>> 
RecordBatchFileReader::OpenAsync(
+    io::RandomAccessFile* file, const IpcReadOptions& options) {
+  ARROW_ASSIGN_OR_RAISE(int64_t footer_offset, file->GetSize());
+  return OpenAsync(file, footer_offset, options);
+}
+
+Future<std::shared_ptr<RecordBatchFileReader>> 
RecordBatchFileReader::OpenAsync(
+    const std::shared_ptr<io::RandomAccessFile>& file, int64_t footer_offset,
+    const IpcReadOptions& options) {
+  auto result = std::make_shared<RecordBatchFileReaderImpl>();
+  return result->OpenAsync(file, footer_offset, options)
+      .Then(
+          [=](...) -> Result<std::shared_ptr<RecordBatchFileReader>> { return 
result; });
+}
+
+Future<std::shared_ptr<RecordBatchFileReader>> 
RecordBatchFileReader::OpenAsync(
+    io::RandomAccessFile* file, int64_t footer_offset, const IpcReadOptions& 
options) {
+  auto result = std::make_shared<RecordBatchFileReaderImpl>();
+  return result->OpenAsync(file, footer_offset, options)
+      .Then(
+          [=](...) -> Result<std::shared_ptr<RecordBatchFileReader>> { return 
result; });
+}
+
+Future<IpcFileRecordBatchGenerator::Item> 
IpcFileRecordBatchGenerator::operator()() {
+  auto state = state_;
+  if (!read_dictionaries_.is_valid()) {
+    std::vector<Future<std::shared_ptr<Message>>> 
messages(state->num_dictionaries());
+    for (int i = 0; i < state->num_dictionaries(); i++) {
+      auto block = 
FileBlockFromFlatbuffer(state->footer_->dictionaries()->Get(i));
+      messages[i] = ReadMessageFromBlockAsync(block, state->file_, 
io_context_);
+    }
+    auto read_messages = All(std::move(messages));
+    if (executor_) read_messages = executor_->Transfer(read_messages);
+    read_dictionaries_ = read_messages.Then(
+        [=](const std::vector<Result<std::shared_ptr<Message>>> maybe_messages)
+            -> Result<std::shared_ptr<Message>> {
+          std::vector<std::shared_ptr<Message>> 
messages(state->num_dictionaries());
+          for (size_t i = 0; i < messages.size(); i++) {
+            ARROW_ASSIGN_OR_RAISE(messages[i], maybe_messages[i]);
+          }
+          return ReadDictionaries(state.get(), std::move(messages));
+        });
+  }
+  if (index_ >= state_->num_record_batches()) {
+    return Future<Item>::MakeFinished(IterationTraits<Item>::End());
+  }
+  auto block = 
FileBlockFromFlatbuffer(state->footer_->recordBatches()->Get(index_++));
+  auto read_message = ReadMessageFromBlockAsync(block, state->file_, 
io_context_);
+  std::vector<Future<std::shared_ptr<Message>>> 
dependencies{read_dictionaries_,
+                                                             
std::move(read_message)};
+  auto read_messages = All(dependencies);
+  if (executor_) read_messages = executor_->Transfer(read_messages);
+  return read_messages.Then(
+      [=](const std::vector<Result<std::shared_ptr<Message>>> maybe_messages)
+          -> Result<Item> {
+        RETURN_NOT_OK(maybe_messages[0]);  // Make sure dictionaries were read
+        ARROW_ASSIGN_OR_RAISE(auto message, maybe_messages[1]);
+        return ReadRecordBatch(state.get(), message.get());
+      });
+}
+
+Result<std::shared_ptr<Message>> IpcFileRecordBatchGenerator::ReadDictionaries(
+    RecordBatchFileReaderImpl* state,
+    std::vector<std::shared_ptr<Message>> dictionary_messages) {
+  IpcReadContext context(&state->dictionary_memo_, state->options_, 
state->swap_endian_);
+  for (const auto& message : dictionary_messages) {
+    RETURN_NOT_OK(ReadOneDictionary(message.get(), context));
+  }
+  return nullptr;
+}
+
+Result<std::shared_ptr<RecordBatch>> 
IpcFileRecordBatchGenerator::ReadRecordBatch(
+    RecordBatchFileReaderImpl* state, Message* message) {
+  CHECK_HAS_BODY(*message);

Review comment:
       I am not sure why it was originally done that way, I guess to be extra 
sure it was inlined?

##########
File path: cpp/src/arrow/ipc/read_write_test.cc
##########
@@ -1009,6 +1010,47 @@ struct FileWriterHelper {
   int64_t footer_offset_;
 };
 
+struct FileGeneratorWriterHelper : public FileWriterHelper {
+  Status ReadBatches(const IpcReadOptions& options, RecordBatchVector* 
out_batches,
+                     ReadStats* out_stats = nullptr) override {
+    auto buf_reader = std::make_shared<io::BufferReader>(buffer_);
+    AsyncGenerator<std::shared_ptr<RecordBatch>> generator;
+
+    {
+      auto fut =
+          RecordBatchFileReader::OpenAsync(buf_reader.get(), footer_offset_, 
options);
+      RETURN_NOT_OK(fut.status());
+      EXPECT_FINISHES_OK_AND_ASSIGN(auto reader, fut);
+      EXPECT_EQ(num_batches_written_, reader->num_record_batches());
+      // Generator's lifetime is independent of the reader's
+      ARROW_ASSIGN_OR_RAISE(generator, reader->GetRecordBatchGenerator());
+    }
+
+    // Generator is async-reentrant
+    std::vector<Future<std::shared_ptr<RecordBatch>>> futures;
+    for (int i = 0; i < num_batches_written_; ++i) {
+      futures.push_back(generator());
+    }
+    auto fut = generator();
+    EXPECT_FINISHES_OK_AND_ASSIGN(auto extra_read, fut);
+    EXPECT_EQ(nullptr, extra_read);
+    for (auto& future : futures) {
+      EXPECT_FINISHES_OK_AND_ASSIGN(auto batch, future);
+      out_batches->push_back(batch);
+    }
+
+    // The generator doesn't track stats.
+    EXPECT_EQ(nullptr, out_stats);
+
+    return Status::OK();
+  }
+
+  Status Read(const IpcReadOptions& options, RecordBatchVector* out_batches,

Review comment:
       They aren't called ever so I assume they are vestigial (I will delete 
them).

##########
File path: cpp/src/arrow/ipc/reader.cc
##########
@@ -1175,6 +1284,92 @@ Result<std::shared_ptr<RecordBatchFileReader>> 
RecordBatchFileReader::Open(
   return result;
 }
 
+Future<std::shared_ptr<RecordBatchFileReader>> 
RecordBatchFileReader::OpenAsync(
+    const std::shared_ptr<io::RandomAccessFile>& file, const IpcReadOptions& 
options) {
+  ARROW_ASSIGN_OR_RAISE(int64_t footer_offset, file->GetSize());
+  return OpenAsync(std::move(file), footer_offset, options);
+}
+
+Future<std::shared_ptr<RecordBatchFileReader>> 
RecordBatchFileReader::OpenAsync(
+    io::RandomAccessFile* file, const IpcReadOptions& options) {
+  ARROW_ASSIGN_OR_RAISE(int64_t footer_offset, file->GetSize());
+  return OpenAsync(file, footer_offset, options);
+}
+
+Future<std::shared_ptr<RecordBatchFileReader>> 
RecordBatchFileReader::OpenAsync(
+    const std::shared_ptr<io::RandomAccessFile>& file, int64_t footer_offset,
+    const IpcReadOptions& options) {
+  auto result = std::make_shared<RecordBatchFileReaderImpl>();
+  return result->OpenAsync(file, footer_offset, options)
+      .Then(
+          [=](...) -> Result<std::shared_ptr<RecordBatchFileReader>> { return 
result; });
+}
+
+Future<std::shared_ptr<RecordBatchFileReader>> 
RecordBatchFileReader::OpenAsync(
+    io::RandomAccessFile* file, int64_t footer_offset, const IpcReadOptions& 
options) {
+  auto result = std::make_shared<RecordBatchFileReaderImpl>();
+  return result->OpenAsync(file, footer_offset, options)
+      .Then(
+          [=](...) -> Result<std::shared_ptr<RecordBatchFileReader>> { return 
result; });
+}
+
+Future<IpcFileRecordBatchGenerator::Item> 
IpcFileRecordBatchGenerator::operator()() {
+  auto state = state_;
+  if (!read_dictionaries_.is_valid()) {
+    std::vector<Future<std::shared_ptr<Message>>> 
messages(state->num_dictionaries());
+    for (int i = 0; i < state->num_dictionaries(); i++) {
+      auto block = 
FileBlockFromFlatbuffer(state->footer_->dictionaries()->Get(i));
+      messages[i] = ReadMessageFromBlockAsync(block, state->file_, 
io_context_);

Review comment:
       I intend to rebase on ARROW-12522 and add coalescing.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to