This is an automated email from the ASF dual-hosted git repository.
apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 7c4522872c GH-48846: [C++] Read message metadata and body in one go in
IPC file reader (#48975)
7c4522872c is described below
commit 7c4522872c0631c6f6dfc17b10753262cb514372
Author: Abhishek Bansal <[email protected]>
AuthorDate: Tue Feb 17 19:34:06 2026 +0530
GH-48846: [C++] Read message metadata and body in one go in IPC file reader
(#48975)
### Rationale for this change
ReadMessageAsync takes a body_length parameter and reads Message metadata +
body in one go, but the blocking version ReadMessage reads the body length
from the Message and issues a second read for the body. This PR adds a
ReadMessage overload that takes the body length as parameter and does a single
read like the async version does. This reduces the number of IOs issued by the
IPC file reader.
### What changes are included in this PR?
1. Add ReadMessage overload accepting body_length
2. Update IPC file reader to use the new ReadMessage overload when reading
full record batches
### Are these changes tested?
Yes, added TestReadMessage.ReadBodyWithLength and updated other tests to
use the new overload.
### Are there any user-facing changes?
No.
* GitHub Issue: #48846
Lead-authored-by: Abhishek Bansal <[email protected]>
Co-authored-by: Antoine Pitrou <[email protected]>
Signed-off-by: Antoine Pitrou <[email protected]>
---
cpp/src/arrow/ipc/message.cc | 48 +++++++++++++++----
cpp/src/arrow/ipc/message.h | 24 +++++++++-
cpp/src/arrow/ipc/read_write_test.cc | 90 ++++++++++++++++++++++++++++++------
cpp/src/arrow/ipc/reader.cc | 12 +++--
4 files changed, 146 insertions(+), 28 deletions(-)
diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc
index 8be09956f1..c21eb913c3 100644
--- a/cpp/src/arrow/ipc/message.cc
+++ b/cpp/src/arrow/ipc/message.cc
@@ -21,6 +21,7 @@
#include <cstddef>
#include <cstdint>
#include <memory>
+#include <optional>
#include <string>
#include <utility>
#include <vector>
@@ -363,9 +364,13 @@ Result<std::unique_ptr<Message>>
ReadMessage(std::shared_ptr<Buffer> metadata,
}
}
-Result<std::unique_ptr<Message>> ReadMessage(int64_t offset, int32_t
metadata_length,
- io::RandomAccessFile* file,
- const FieldsLoaderFunction&
fields_loader) {
+// Common helper for the two ReadMessage overloads that take a file + offset.
+// When body_length is provided, metadata and body are read in a single IO.
+// When body_length is absent, metadata is read first, then the body is read
+// separately.
+static Result<std::unique_ptr<Message>> ReadMessageInternal(
+ int64_t offset, int32_t metadata_length, std::optional<int64_t>
body_length,
+ io::RandomAccessFile* file, const FieldsLoaderFunction& fields_loader) {
std::unique_ptr<Message> result;
auto listener = std::make_shared<AssignMessageDecoderListener>(&result);
MessageDecoder decoder(listener);
@@ -375,15 +380,18 @@ Result<std::unique_ptr<Message>> ReadMessage(int64_t
offset, int32_t metadata_le
decoder.next_required_size());
}
- // TODO(GH-48846): we should take a body_length just like ReadMessageAsync
- // and read metadata + body in one go.
- ARROW_ASSIGN_OR_RAISE(auto metadata, file->ReadAt(offset, metadata_length));
+ // When body_length is known, read metadata + body in one IO call.
+ // Otherwise, read only metadata first.
+ ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> metadata,
+ file->ReadAt(offset, metadata_length +
body_length.value_or(0)));
+
if (metadata->size() < metadata_length) {
return Status::Invalid("Expected to read ", metadata_length,
" metadata bytes at offset ", offset, " but got ",
metadata->size());
}
- ARROW_RETURN_NOT_OK(decoder.Consume(metadata));
+
+ ARROW_RETURN_NOT_OK(decoder.Consume(SliceBuffer(metadata, 0,
metadata_length)));
switch (decoder.state()) {
case MessageDecoder::State::INITIAL:
@@ -398,14 +406,23 @@ Result<std::unique_ptr<Message>> ReadMessage(int64_t
offset, int32_t metadata_le
case MessageDecoder::State::BODY: {
std::shared_ptr<Buffer> body;
if (fields_loader) {
+ // Selective field loading: allocate a body buffer and read only the
+ // requested field ranges into it.
ARROW_ASSIGN_OR_RAISE(
body, AllocateBuffer(decoder.next_required_size(),
default_memory_pool()));
RETURN_NOT_OK(ReadFieldsSubset(offset, metadata_length, file,
fields_loader,
- metadata, decoder.next_required_size(),
body));
+ SliceBuffer(metadata, 0,
metadata_length),
+ decoder.next_required_size(), body));
+ } else if (body_length.has_value()) {
+ // Body was already read as part of the combined IO; just slice it out.
+ body = SliceBuffer(metadata, metadata_length,
+ std::min(*body_length, metadata->size() -
metadata_length));
} else {
+ // Body length was unknown; do a separate IO to read the body.
ARROW_ASSIGN_OR_RAISE(
body, file->ReadAt(offset + metadata_length,
decoder.next_required_size()));
}
+
if (body->size() < decoder.next_required_size()) {
return Status::IOError("Expected to be able to read ",
decoder.next_required_size(),
@@ -421,6 +438,21 @@ Result<std::unique_ptr<Message>> ReadMessage(int64_t
offset, int32_t metadata_le
}
}
+Result<std::unique_ptr<Message>> ReadMessage(int64_t offset, int32_t
metadata_length,
+ io::RandomAccessFile* file,
+ const FieldsLoaderFunction&
fields_loader) {
+ return ReadMessageInternal(offset, metadata_length,
/*body_length=*/std::nullopt, file,
+ fields_loader);
+}
+
+Result<std::unique_ptr<Message>> ReadMessage(const int64_t offset,
+ const int32_t metadata_length,
+ const int64_t body_length,
+ io::RandomAccessFile* file) {
+ return ReadMessageInternal(offset, metadata_length, body_length, file,
+ /*fields_loader=*/{});
+}
+
Future<std::shared_ptr<Message>> ReadMessageAsync(int64_t offset, int32_t
metadata_length,
int64_t body_length,
io::RandomAccessFile* file,
diff --git a/cpp/src/arrow/ipc/message.h b/cpp/src/arrow/ipc/message.h
index 1cd72ce993..df80b0eba2 100644
--- a/cpp/src/arrow/ipc/message.h
+++ b/cpp/src/arrow/ipc/message.h
@@ -449,7 +449,7 @@ class ARROW_EXPORT MessageReader {
// org::apache::arrow::flatbuf::RecordBatch*)
using FieldsLoaderFunction = std::function<Status(const void*,
io::RandomAccessFile*)>;
-/// \brief Read encapsulated RPC message from position in file
+/// \brief Read encapsulated IPC message from position in file
///
/// Read a length-prefixed message flatbuffer starting at the indicated file
/// offset. If the message has a body with non-zero length, it will also be
@@ -469,7 +469,27 @@ Result<std::unique_ptr<Message>> ReadMessage(
const int64_t offset, const int32_t metadata_length, io::RandomAccessFile*
file,
const FieldsLoaderFunction& fields_loader = {});
-/// \brief Read encapsulated RPC message from cached buffers
+/// \brief Read encapsulated IPC message from position in file
+///
+/// Read a length-prefixed message flatbuffer starting at the indicated file
+/// offset.
+///
+/// The metadata_length includes at least the length prefix and the flatbuffer.
+///
+/// \param[in] offset the position in the file where the message starts. The
+/// first 4 bytes after the offset are the message length
+/// \param[in] metadata_length the number of metadata bytes to read from file
+/// \param[in] body_length the number of bytes for the message body
+/// \param[in] file the seekable file interface to read from
+/// \return the message read
+
+ARROW_EXPORT
+Result<std::unique_ptr<Message>> ReadMessage(const int64_t offset,
+ const int32_t metadata_length,
+ const int64_t body_length,
+ io::RandomAccessFile* file);
+
+/// \brief Read encapsulated IPC message from cached buffers
///
/// The buffers should contain an entire message. Partial reads are not
handled.
///
diff --git a/cpp/src/arrow/ipc/read_write_test.cc
b/cpp/src/arrow/ipc/read_write_test.cc
index 9f7df541bd..86cd0e06ab 100644
--- a/cpp/src/arrow/ipc/read_write_test.cc
+++ b/cpp/src/arrow/ipc/read_write_test.cc
@@ -552,9 +552,15 @@ class TestIpcRoundTrip : public
::testing::TestWithParam<MakeRecordBatch*>,
ASSERT_OK(WriteRecordBatch(*batch, buffer_offset, mmap_.get(),
&metadata_length,
&body_length, options_));
- ASSERT_OK_AND_ASSIGN(std::unique_ptr<Message> message,
+ ASSERT_OK_AND_ASSIGN(std::unique_ptr<Message> message1,
ReadMessage(0, metadata_length, mmap_.get()));
- ASSERT_EQ(expected_version, message->metadata_version());
+ ASSERT_EQ(expected_version, message1->metadata_version());
+
+ ASSERT_OK_AND_ASSIGN(auto message2,
+ ReadMessage(0, metadata_length, body_length,
mmap_.get()));
+ ASSERT_EQ(expected_version, message2->metadata_version());
+
+ ASSERT_TRUE(message1->Equals(*message2));
}
};
@@ -613,6 +619,27 @@ TEST(TestReadMessage, CorruptedSmallInput) {
ASSERT_EQ(nullptr, message);
}
+TEST(TestReadMessage, ReadBodyWithLength) {
+ // Test the optimized ReadMessage(offset, meta_len, body_len, file) overload
+ std::shared_ptr<RecordBatch> batch;
+ ASSERT_OK(MakeIntRecordBatch(&batch));
+
+ ASSERT_OK_AND_ASSIGN(auto stream, io::BufferOutputStream::Create(0));
+ int32_t metadata_length;
+ int64_t body_length;
+ ASSERT_OK(WriteRecordBatch(*batch, 0, stream.get(), &metadata_length,
&body_length,
+ IpcWriteOptions::Defaults()));
+
+ ASSERT_OK_AND_ASSIGN(auto buffer, stream->Finish());
+ io::BufferReader reader(buffer);
+
+ ASSERT_OK_AND_ASSIGN(auto message,
+ ReadMessage(0, metadata_length, body_length, &reader));
+
+ ASSERT_EQ(body_length, message->body_length());
+ ASSERT_TRUE(message->Verify());
+}
+
TEST(TestMetadata, GetMetadataVersion) {
ASSERT_EQ(MetadataVersion::V1, ipc::internal::GetMetadataVersion(
flatbuf::MetadataVersion::MetadataVersion_V1));
@@ -1094,7 +1121,7 @@ TEST_F(RecursionLimits, ReadLimit) {
&schema));
ASSERT_OK_AND_ASSIGN(std::unique_ptr<Message> message,
- ReadMessage(0, metadata_length, mmap_.get()));
+ ReadMessage(0, metadata_length, body_length,
mmap_.get()));
io::BufferReader reader(message->body());
@@ -1119,7 +1146,7 @@ TEST_F(RecursionLimits, StressLimit) {
&schema));
ASSERT_OK_AND_ASSIGN(std::unique_ptr<Message> message,
- ReadMessage(0, metadata_length, mmap_.get()));
+ ReadMessage(0, metadata_length, body_length,
mmap_.get()));
DictionaryMemo empty_memo;
@@ -3018,25 +3045,56 @@ void GetReadRecordBatchReadRanges(
auto read_ranges = tracked->get_read_ranges();
- // there are 3 read IOs before reading body:
- // 1) read magic and footer length IO
- // 2) read footer IO
- // 3) read record batch metadata IO
- EXPECT_EQ(read_ranges.size(), 3 + expected_body_read_lengths.size());
const int32_t magic_size =
static_cast<int>(strlen(ipc::internal::kArrowMagicBytes));
// read magic and footer length IO
auto file_end_size = magic_size + sizeof(int32_t);
auto footer_length_offset = buffer->size() - file_end_size;
auto footer_length = bit_util::FromLittleEndian(
util::SafeLoadAs<int32_t>(buffer->data() + footer_length_offset));
+
+ // there are at least 2 read IOs before reading body:
+ // 1) read magic and footer length IO
+ // 2) footer IO
+ EXPECT_GE(read_ranges.size(), 2);
+
+ // read magic and footer length IO
EXPECT_EQ(read_ranges[0].length, file_end_size);
// read footer IO
EXPECT_EQ(read_ranges[1].length, footer_length);
- // read record batch metadata. The exact size is tricky to determine but it
doesn't
- // matter for this test and it should be smaller than the footer.
- EXPECT_LE(read_ranges[2].length, footer_length);
- for (uint32_t i = 0; i < expected_body_read_lengths.size(); i++) {
- EXPECT_EQ(read_ranges[3 + i].length, expected_body_read_lengths[i]);
+
+ if (included_fields.empty()) {
+ // When no fields are explicitly included, the reader optimizes by
+ // reading metadata and the entire body in a single IO.
+ // Thus, there are exactly 3 read IOs in total:
+ // 1) magic and footer length
+ // 2) footer
+ // 3) record batch metadata + body
+ EXPECT_EQ(read_ranges.size(), 3);
+
+ int64_t total_body = 0;
+ for (auto len : expected_body_read_lengths) total_body += len;
+
+ // In the optimized path (included_fields is empty), the 3rd read operation
+ // fetches both the message metadata (flatbuffer) and the entire message
body
+ // in one contiguous block. Therefore, its length must at least exceed the
+ // total body length by the size of the metadata.
+ EXPECT_GT(read_ranges[2].length, total_body);
+ EXPECT_LE(read_ranges[2].length, total_body + footer_length);
+ } else {
+ // When fields are filtered, we see 3 initial reads followed by N body
reads
+ // (one for each field/buffer range):
+ // 1) magic and footer length
+ // 2) footer
+ // 3) record batch metadata
+ // 4) individual body buffer reads
+ EXPECT_EQ(read_ranges.size(), 3 + expected_body_read_lengths.size());
+
+ // read record batch metadata. The exact size is tricky to determine but
it doesn't
+ // matter for this test and it should be smaller than the footer.
+ EXPECT_LE(read_ranges[2].length, footer_length);
+ for (uint32_t i = 0; i < expected_body_read_lengths.size(); i++) {
+ EXPECT_EQ(read_ranges[3 + i].length, expected_body_read_lengths[i]);
+ }
}
}
@@ -3186,7 +3244,9 @@ class PreBufferingTest : public
::testing::TestWithParam<bool> {
metadata_reads++;
}
}
- ASSERT_EQ(metadata_reads, reader_->num_record_batches() -
num_indices_pre_buffered);
+ // With the ReadMessage optimization, non-prebuffered reads fetch metadata and body
+ // in a single large read, so we no longer see small metadata-only reads here.
+ ASSERT_EQ(metadata_reads, 0);
ASSERT_EQ(data_reads, reader_->num_record_batches());
}
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index a47a629072..908a223a57 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -1236,9 +1236,15 @@ Result<std::unique_ptr<Message>> ReadMessageFromBlock(
const FileBlock& block, io::RandomAccessFile* file,
const FieldsLoaderFunction& fields_loader) {
RETURN_NOT_OK(CheckAligned(block));
- ARROW_ASSIGN_OR_RAISE(auto message, ReadMessage(block.offset,
block.metadata_length,
- file, fields_loader));
- return CheckBodyLength(std::move(message), block);
+ if (fields_loader) {
+ ARROW_ASSIGN_OR_RAISE(auto message, ReadMessage(block.offset,
block.metadata_length,
+ file, fields_loader));
+ return CheckBodyLength(std::move(message), block);
+ } else {
+ ARROW_ASSIGN_OR_RAISE(auto message, ReadMessage(block.offset,
block.metadata_length,
+ block.body_length, file));
+ return CheckBodyLength(std::move(message), block);
+ }
}
Future<std::shared_ptr<Message>> ReadMessageFromBlockAsync(