pitrou commented on a change in pull request #9656:
URL: https://github.com/apache/arrow/pull/9656#discussion_r590494650
##########
File path: cpp/src/arrow/ipc/reader.cc
##########
@@ -958,10 +959,196 @@ Result<std::shared_ptr<RecordBatchStreamReader>>
RecordBatchStreamReader::Open(
// ----------------------------------------------------------------------
// Reader implementation
+// Common functions used in both the random-access file reader and the
+// asynchronous generator
static inline FileBlock FileBlockFromFlatbuffer(const flatbuf::Block* block) {
return FileBlock{block->offset(), block->metaDataLength(),
block->bodyLength()};
}
+Result<std::unique_ptr<Message>> ReadMessageFromBlock(const FileBlock& block,
+ io::RandomAccessFile*
file) {
+ if (!BitUtil::IsMultipleOf8(block.offset) ||
+ !BitUtil::IsMultipleOf8(block.metadata_length) ||
+ !BitUtil::IsMultipleOf8(block.body_length)) {
+ return Status::Invalid("Unaligned block in IPC file");
+ }
+
+ // TODO(wesm): this breaks integration tests, see ARROW-3256
+ // DCHECK_EQ((*out)->body_length(), block.body_length);
+
+ ARROW_ASSIGN_OR_RAISE(auto message,
+ ReadMessage(block.offset, block.metadata_length,
file));
+ return std::move(message);
+}
+
+Status ReadOneDictionary(Message* message, const IpcReadContext& context) {
+ CHECK_HAS_BODY(*message);
+ ARROW_ASSIGN_OR_RAISE(auto reader, Buffer::GetReader(message->body()));
+ DictionaryKind kind;
+ RETURN_NOT_OK(ReadDictionary(*message->metadata(), context, &kind,
reader.get()));
+ if (kind != DictionaryKind::New) {
+ return Status::Invalid(
+ "Unsupported dictionary replacement or "
+ "dictionary delta in IPC file");
+ }
+ return Status::OK();
+}
+
+/// Common state used by the IPC message generator and record batch generator.
+struct ARROW_EXPORT IpcFileRecordBatchGeneratorState {
Review comment:
This looks a bit tedious. You're essentially copying most of those
fields from `RandomAccessFile`?
##########
File path: cpp/src/arrow/ipc/reader.cc
##########
@@ -958,10 +959,196 @@ Result<std::shared_ptr<RecordBatchStreamReader>>
RecordBatchStreamReader::Open(
// ----------------------------------------------------------------------
// Reader implementation
+// Common functions used in both the random-access file reader and the
+// asynchronous generator
static inline FileBlock FileBlockFromFlatbuffer(const flatbuf::Block* block) {
return FileBlock{block->offset(), block->metaDataLength(),
block->bodyLength()};
}
+Result<std::unique_ptr<Message>> ReadMessageFromBlock(const FileBlock& block,
Review comment:
Can you make those functions static or put them in the anonymous
namespace?
##########
File path: cpp/src/arrow/ipc/reader.cc
##########
@@ -1022,6 +1209,31 @@ class RecordBatchFileReaderImpl : public
RecordBatchFileReader {
ReadStats stats() const override { return stats_; }
+ Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> GetRecordBatchGenerator(
+ int readahead_messages, const io::IOContext& io_context) override {
+ auto state = std::make_shared<IpcFileRecordBatchGeneratorState>();
+ state->num_dictionaries_ = num_dictionaries();
+ state->num_record_batches_ = num_record_batches();
+ state->file_ = file_;
+ state->options_ = options_;
+ state->owned_file_ = owned_file_;
+ state->footer_buffer_ = footer_buffer_;
+ state->footer_ = footer_;
+ // Must regenerate uncopyable DictionaryMemo
Review comment:
Would be nice to avoid this.
##########
File path: cpp/src/arrow/ipc/reader.cc
##########
@@ -1022,6 +1209,31 @@ class RecordBatchFileReaderImpl : public
RecordBatchFileReader {
ReadStats stats() const override { return stats_; }
+ Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> GetRecordBatchGenerator(
+ int readahead_messages, const io::IOContext& io_context) override {
+ auto state = std::make_shared<IpcFileRecordBatchGeneratorState>();
+ state->num_dictionaries_ = num_dictionaries();
+ state->num_record_batches_ = num_record_batches();
+ state->file_ = file_;
+ state->options_ = options_;
+ state->owned_file_ = owned_file_;
+ state->footer_buffer_ = footer_buffer_;
+ state->footer_ = footer_;
+ // Must regenerate uncopyable DictionaryMemo
+ RETURN_NOT_OK(UnpackSchemaMessage(state->footer_->schema(),
state->options_,
+ &state->dictionary_memo_,
&state->schema_,
+ &state->out_schema_,
&state->field_inclusion_mask_,
+ &state->swap_endian_));
+ AsyncGenerator<std::shared_ptr<Message>> message_generator =
+ IpcMessageGenerator(state, io_context);
+ if (readahead_messages > 0) {
+ message_generator =
+ MakeReadaheadGenerator(std::move(message_generator),
readahead_messages);
+ }
+ return IpcFileRecordBatchGenerator(state, message_generator,
+ arrow::internal::GetCpuThreadPool());
Review comment:
I don't think it's desirable to force all processing onto the global
thread pool unconditionally.
(Also, are you sure the processing is heavy enough to benefit from it?)
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org