This is an automated email from the ASF dual-hosted git repository.

apitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7c4522872c GH-48846: [C++] Read message metadata and body in one go in 
IPC file reader (#48975)
7c4522872c is described below

commit 7c4522872c0631c6f6dfc17b10753262cb514372
Author: Abhishek Bansal <[email protected]>
AuthorDate: Tue Feb 17 19:34:06 2026 +0530

    GH-48846: [C++] Read message metadata and body in one go in IPC file reader 
(#48975)
    
    ### Rationale for this change
    
    ReadMessageAsync takes a body_length parameter and reads the message metadata
and body in a single IO, but the blocking ReadMessage first reads the metadata,
extracts the body length from it, and then issues a second read for the body.
This PR adds a ReadMessage overload that takes the body length as a parameter
and does a single read, like the async version. This reduces the number of IOs
issued by the IPC file reader.
    
    ### What changes are included in this PR?
    1. Add ReadMessage overload accepting body_length
    2. Update IPC file reader to use the new ReadMessage overload when reading 
full record batches
    
    ### Are these changes tested?
    Yes, added TestReadMessage.ReadBodyWithLength and updated other tests to 
use the new overload.
    
    ### Are there any user-facing changes?
    No.
    
    * GitHub Issue: #48846
    
    Lead-authored-by: Abhishek Bansal <[email protected]>
    Co-authored-by: Antoine Pitrou <[email protected]>
    Signed-off-by: Antoine Pitrou <[email protected]>
---
 cpp/src/arrow/ipc/message.cc         | 48 +++++++++++++++----
 cpp/src/arrow/ipc/message.h          | 24 +++++++++-
 cpp/src/arrow/ipc/read_write_test.cc | 90 ++++++++++++++++++++++++++++++------
 cpp/src/arrow/ipc/reader.cc          | 12 +++--
 4 files changed, 146 insertions(+), 28 deletions(-)

diff --git a/cpp/src/arrow/ipc/message.cc b/cpp/src/arrow/ipc/message.cc
index 8be09956f1..c21eb913c3 100644
--- a/cpp/src/arrow/ipc/message.cc
+++ b/cpp/src/arrow/ipc/message.cc
@@ -21,6 +21,7 @@
 #include <cstddef>
 #include <cstdint>
 #include <memory>
+#include <optional>
 #include <string>
 #include <utility>
 #include <vector>
@@ -363,9 +364,13 @@ Result<std::unique_ptr<Message>> 
ReadMessage(std::shared_ptr<Buffer> metadata,
   }
 }
 
-Result<std::unique_ptr<Message>> ReadMessage(int64_t offset, int32_t 
metadata_length,
-                                             io::RandomAccessFile* file,
-                                             const FieldsLoaderFunction& 
fields_loader) {
+// Common helper for the two ReadMessage overloads that take a file + offset.
+// When body_length is provided, metadata and body are read in a single IO.
+// When body_length is absent, metadata is read first, then the body is read
+// separately.
+static Result<std::unique_ptr<Message>> ReadMessageInternal(
+    int64_t offset, int32_t metadata_length, std::optional<int64_t> 
body_length,
+    io::RandomAccessFile* file, const FieldsLoaderFunction& fields_loader) {
   std::unique_ptr<Message> result;
   auto listener = std::make_shared<AssignMessageDecoderListener>(&result);
   MessageDecoder decoder(listener);
@@ -375,15 +380,18 @@ Result<std::unique_ptr<Message>> ReadMessage(int64_t 
offset, int32_t metadata_le
                            decoder.next_required_size());
   }
 
-  // TODO(GH-48846): we should take a body_length just like ReadMessageAsync
-  // and read metadata + body in one go.
-  ARROW_ASSIGN_OR_RAISE(auto metadata, file->ReadAt(offset, metadata_length));
+  // When body_length is known, read metadata + body in one IO call.
+  // Otherwise, read only metadata first.
+  ARROW_ASSIGN_OR_RAISE(std::shared_ptr<Buffer> metadata,
+                        file->ReadAt(offset, metadata_length + 
body_length.value_or(0)));
+
   if (metadata->size() < metadata_length) {
     return Status::Invalid("Expected to read ", metadata_length,
                            " metadata bytes at offset ", offset, " but got ",
                            metadata->size());
   }
-  ARROW_RETURN_NOT_OK(decoder.Consume(metadata));
+
+  ARROW_RETURN_NOT_OK(decoder.Consume(SliceBuffer(metadata, 0, 
metadata_length)));
 
   switch (decoder.state()) {
     case MessageDecoder::State::INITIAL:
@@ -398,14 +406,23 @@ Result<std::unique_ptr<Message>> ReadMessage(int64_t 
offset, int32_t metadata_le
     case MessageDecoder::State::BODY: {
       std::shared_ptr<Buffer> body;
       if (fields_loader) {
+        // Selective field loading: allocate a body buffer and read only the
+        // requested field ranges into it.
         ARROW_ASSIGN_OR_RAISE(
             body, AllocateBuffer(decoder.next_required_size(), 
default_memory_pool()));
         RETURN_NOT_OK(ReadFieldsSubset(offset, metadata_length, file, 
fields_loader,
-                                       metadata, decoder.next_required_size(), 
body));
+                                       SliceBuffer(metadata, 0, 
metadata_length),
+                                       decoder.next_required_size(), body));
+      } else if (body_length.has_value()) {
+        // Body was already read as part of the combined IO; just slice it out.
+        body = SliceBuffer(metadata, metadata_length,
+                           std::min(*body_length, metadata->size() - 
metadata_length));
       } else {
+        // Body length was unknown; do a separate IO to read the body.
         ARROW_ASSIGN_OR_RAISE(
             body, file->ReadAt(offset + metadata_length, 
decoder.next_required_size()));
       }
+
       if (body->size() < decoder.next_required_size()) {
         return Status::IOError("Expected to be able to read ",
                                decoder.next_required_size(),
@@ -421,6 +438,21 @@ Result<std::unique_ptr<Message>> ReadMessage(int64_t 
offset, int32_t metadata_le
   }
 }
 
+Result<std::unique_ptr<Message>> ReadMessage(int64_t offset, int32_t 
metadata_length,
+                                             io::RandomAccessFile* file,
+                                             const FieldsLoaderFunction& 
fields_loader) {
+  return ReadMessageInternal(offset, metadata_length, 
/*body_length=*/std::nullopt, file,
+                             fields_loader);
+}
+
+Result<std::unique_ptr<Message>> ReadMessage(const int64_t offset,
+                                             const int32_t metadata_length,
+                                             const int64_t body_length,
+                                             io::RandomAccessFile* file) {
+  return ReadMessageInternal(offset, metadata_length, body_length, file,
+                             /*fields_loader=*/{});
+}
+
 Future<std::shared_ptr<Message>> ReadMessageAsync(int64_t offset, int32_t 
metadata_length,
                                                   int64_t body_length,
                                                   io::RandomAccessFile* file,
diff --git a/cpp/src/arrow/ipc/message.h b/cpp/src/arrow/ipc/message.h
index 1cd72ce993..df80b0eba2 100644
--- a/cpp/src/arrow/ipc/message.h
+++ b/cpp/src/arrow/ipc/message.h
@@ -449,7 +449,7 @@ class ARROW_EXPORT MessageReader {
 // org::apache::arrow::flatbuf::RecordBatch*)
 using FieldsLoaderFunction = std::function<Status(const void*, 
io::RandomAccessFile*)>;
 
-/// \brief Read encapsulated RPC message from position in file
+/// \brief Read encapsulated IPC message from position in file
 ///
 /// Read a length-prefixed message flatbuffer starting at the indicated file
 /// offset. If the message has a body with non-zero length, it will also be
@@ -469,7 +469,27 @@ Result<std::unique_ptr<Message>> ReadMessage(
     const int64_t offset, const int32_t metadata_length, io::RandomAccessFile* 
file,
     const FieldsLoaderFunction& fields_loader = {});
 
-/// \brief Read encapsulated RPC message from cached buffers
+/// \brief Read encapsulated IPC message from position in file
+///
+/// Read a length-prefixed message flatbuffer starting at the indicated file
+/// offset.
+///
+/// The metadata_length includes at least the length prefix and the flatbuffer.
+///
+/// \param[in] offset the position in the file where the message starts. The
+/// first 4 bytes after the offset are the message length
+/// \param[in] metadata_length the number of metadata bytes preceding the body
+/// \param[in] body_length the number of body bytes, read in the same IO call
+/// \param[in] file the seekable file interface to read from
+/// \return the message read
+
+ARROW_EXPORT
+Result<std::unique_ptr<Message>> ReadMessage(const int64_t offset,
+                                             const int32_t metadata_length,
+                                             const int64_t body_length,
+                                             io::RandomAccessFile* file);
+
+/// \brief Read encapsulated IPC message from cached buffers
 ///
 /// The buffers should contain an entire message.  Partial reads are not 
handled.
 ///
diff --git a/cpp/src/arrow/ipc/read_write_test.cc 
b/cpp/src/arrow/ipc/read_write_test.cc
index 9f7df541bd..86cd0e06ab 100644
--- a/cpp/src/arrow/ipc/read_write_test.cc
+++ b/cpp/src/arrow/ipc/read_write_test.cc
@@ -552,9 +552,15 @@ class TestIpcRoundTrip : public 
::testing::TestWithParam<MakeRecordBatch*>,
     ASSERT_OK(WriteRecordBatch(*batch, buffer_offset, mmap_.get(), 
&metadata_length,
                                &body_length, options_));
 
-    ASSERT_OK_AND_ASSIGN(std::unique_ptr<Message> message,
+    ASSERT_OK_AND_ASSIGN(std::unique_ptr<Message> message1,
                          ReadMessage(0, metadata_length, mmap_.get()));
-    ASSERT_EQ(expected_version, message->metadata_version());
+    ASSERT_EQ(expected_version, message1->metadata_version());
+
+    ASSERT_OK_AND_ASSIGN(auto message2,
+                         ReadMessage(0, metadata_length, body_length, 
mmap_.get()));
+    ASSERT_EQ(expected_version, message2->metadata_version());
+
+    ASSERT_TRUE(message1->Equals(*message2));
   }
 };
 
@@ -613,6 +619,27 @@ TEST(TestReadMessage, CorruptedSmallInput) {
   ASSERT_EQ(nullptr, message);
 }
 
+TEST(TestReadMessage, ReadBodyWithLength) {
+  // Test the optimized ReadMessage(offset, meta_len, body_len, file) overload
+  std::shared_ptr<RecordBatch> batch;
+  ASSERT_OK(MakeIntRecordBatch(&batch));
+
+  ASSERT_OK_AND_ASSIGN(auto stream, io::BufferOutputStream::Create(0));
+  int32_t metadata_length;
+  int64_t body_length;
+  ASSERT_OK(WriteRecordBatch(*batch, 0, stream.get(), &metadata_length, 
&body_length,
+                             IpcWriteOptions::Defaults()));
+
+  ASSERT_OK_AND_ASSIGN(auto buffer, stream->Finish());
+  io::BufferReader reader(buffer);
+
+  ASSERT_OK_AND_ASSIGN(auto message,
+                       ReadMessage(0, metadata_length, body_length, &reader));
+
+  ASSERT_EQ(body_length, message->body_length());
+  ASSERT_TRUE(message->Verify());
+}
+
 TEST(TestMetadata, GetMetadataVersion) {
   ASSERT_EQ(MetadataVersion::V1, ipc::internal::GetMetadataVersion(
                                      
flatbuf::MetadataVersion::MetadataVersion_V1));
@@ -1094,7 +1121,7 @@ TEST_F(RecursionLimits, ReadLimit) {
                         &schema));
 
   ASSERT_OK_AND_ASSIGN(std::unique_ptr<Message> message,
-                       ReadMessage(0, metadata_length, mmap_.get()));
+                       ReadMessage(0, metadata_length, body_length, 
mmap_.get()));
 
   io::BufferReader reader(message->body());
 
@@ -1119,7 +1146,7 @@ TEST_F(RecursionLimits, StressLimit) {
                           &schema));
 
     ASSERT_OK_AND_ASSIGN(std::unique_ptr<Message> message,
-                         ReadMessage(0, metadata_length, mmap_.get()));
+                         ReadMessage(0, metadata_length, body_length, 
mmap_.get()));
 
     DictionaryMemo empty_memo;
 
@@ -3018,25 +3045,56 @@ void GetReadRecordBatchReadRanges(
 
   auto read_ranges = tracked->get_read_ranges();
 
-  // there are 3 read IOs before reading body:
-  // 1) read magic and footer length IO
-  // 2) read footer IO
-  // 3) read record batch metadata IO
-  EXPECT_EQ(read_ranges.size(), 3 + expected_body_read_lengths.size());
   const int32_t magic_size = 
static_cast<int>(strlen(ipc::internal::kArrowMagicBytes));
   // read magic and footer length IO
   auto file_end_size = magic_size + sizeof(int32_t);
   auto footer_length_offset = buffer->size() - file_end_size;
   auto footer_length = bit_util::FromLittleEndian(
       util::SafeLoadAs<int32_t>(buffer->data() + footer_length_offset));
+
+  // there are at least 2 read IOs before reading body:
+  // 1) read magic and footer length IO
+  // 2) footer IO
+  EXPECT_GE(read_ranges.size(), 2);
+
+  // read magic and footer length IO
   EXPECT_EQ(read_ranges[0].length, file_end_size);
   // read footer IO
   EXPECT_EQ(read_ranges[1].length, footer_length);
-  // read record batch metadata.  The exact size is tricky to determine but it 
doesn't
-  // matter for this test and it should be smaller than the footer.
-  EXPECT_LE(read_ranges[2].length, footer_length);
-  for (uint32_t i = 0; i < expected_body_read_lengths.size(); i++) {
-    EXPECT_EQ(read_ranges[3 + i].length, expected_body_read_lengths[i]);
+
+  if (included_fields.empty()) {
+    // When no fields are explicitly included, the reader optimizes by
+    // reading metadata and the entire body in a single IO.
+    // Thus, there are exactly 3 read IOs in total:
+    // 1) magic and footer length
+    // 2) footer
+    // 3) record batch metadata + body
+    EXPECT_EQ(read_ranges.size(), 3);
+
+    int64_t total_body = 0;
+    for (auto len : expected_body_read_lengths) total_body += len;
+
+    // In the optimized path (included_fields is empty), the 3rd read operation
+    // fetches the message metadata (flatbuffer) and the entire message body in
+    // one contiguous block, so its length is the body length plus the metadata
+    // length: strictly greater than the body length, but at most body + footer.
+    EXPECT_GT(read_ranges[2].length, total_body);
+    EXPECT_LE(read_ranges[2].length, total_body + footer_length);
+  } else {
+    // When fields are filtered, we see 3 initial reads followed by N body 
reads
+    // (one for each field/buffer range):
+    // 1) magic and footer length
+    // 2) footer
+    // 3) record batch metadata
+    // 4) individual body buffer reads
+    EXPECT_EQ(read_ranges.size(), 3 + expected_body_read_lengths.size());
+
+    // read record batch metadata.  The exact size is tricky to determine but 
it doesn't
+    // matter for this test and it should be smaller than the footer.
+    EXPECT_LE(read_ranges[2].length, footer_length);
+    for (uint32_t i = 0; i < expected_body_read_lengths.size(); i++) {
+      EXPECT_EQ(read_ranges[3 + i].length, expected_body_read_lengths[i]);
+    }
   }
 }
 
@@ -3186,7 +3244,9 @@ class PreBufferingTest : public 
::testing::TestWithParam<bool> {
         metadata_reads++;
       }
     }
-    ASSERT_EQ(metadata_reads, reader_->num_record_batches() - 
num_indices_pre_buffered);
+    // With the single-IO ReadMessage overload, non-prebuffered reads fetch metadata
+    // and body together in one read, so no small metadata-only reads are seen here.
+    ASSERT_EQ(metadata_reads, 0);
     ASSERT_EQ(data_reads, reader_->num_record_batches());
   }
 
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index a47a629072..908a223a57 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -1236,9 +1236,15 @@ Result<std::unique_ptr<Message>> ReadMessageFromBlock(
     const FileBlock& block, io::RandomAccessFile* file,
     const FieldsLoaderFunction& fields_loader) {
   RETURN_NOT_OK(CheckAligned(block));
-  ARROW_ASSIGN_OR_RAISE(auto message, ReadMessage(block.offset, 
block.metadata_length,
-                                                  file, fields_loader));
-  return CheckBodyLength(std::move(message), block);
+  if (fields_loader) {
+    ARROW_ASSIGN_OR_RAISE(auto message, ReadMessage(block.offset, 
block.metadata_length,
+                                                    file, fields_loader));
+    return CheckBodyLength(std::move(message), block);
+  } else {
+    ARROW_ASSIGN_OR_RAISE(auto message, ReadMessage(block.offset, 
block.metadata_length,
+                                                    block.body_length, file));
+    return CheckBodyLength(std::move(message), block);
+  }
 }
 
 Future<std::shared_ptr<Message>> ReadMessageFromBlockAsync(

Reply via email to