This is an automated email from the ASF dual-hosted git repository.

pitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new de5b933764 GH-49966: [C++] Detect different endianness between IPC file and stream in IPC file fuzzer (#49968)
de5b933764 is described below

commit de5b9337649dad11d5f6ecd6de5dd480d7fe4fe9
Author: Antoine Pitrou <[email protected]>
AuthorDate: Thu Jun 4 18:42:30 2026 +0200

    GH-49966: [C++] Detect different endianness between IPC file and stream in IPC file fuzzer (#49968)
    
    ### Rationale for this change
    
    In the IPC file format, the IPC file footer has a copy of the embedded IPC
    stream's schema. However, the two copies may be different in case of an
    invalid/corrupted IPC file. The fuzzer would then fail with differing contents
    between IPC file and stream, as only one of them would have undergone
    endianness swapping.
    
    Issue found by OSS-Fuzz: https://issues.oss-fuzz.com/issues/506111650
    
    ### What changes are included in this PR?
    
    1. Add a member variable in `ipc::ReadStats` that reflects the original
       endianness of the IPC schema (before the endianness was normalized to native)
    2. Compare the original endianness of the IPC stream and file schemas in
       the IPC file fuzzer, and skip comparing the contents if the endianness differs
    3. Add unit tests for endianness detection and conversion based on the
       corresponding "golden" integration files
    
    ### Are these changes tested?
    
    Yes, by additional unit tests and additional fuzz regression file.
    
    ### Are there any user-facing changes?
    
    No.
    
    * GitHub Issue: #49966
    
    Authored-by: Antoine Pitrou <[email protected]>
    Signed-off-by: Antoine Pitrou <[email protected]>
---
 cpp/src/arrow/flight/test_util.cc    |  14 +--
 cpp/src/arrow/ipc/CMakeLists.txt     |   1 +
 cpp/src/arrow/ipc/endianness_test.cc | 175 +++++++++++++++++++++++++++++++++++
 cpp/src/arrow/ipc/read_write_test.cc |  10 +-
 cpp/src/arrow/ipc/reader.cc          |  38 ++++++--
 cpp/src/arrow/ipc/reader.h           |   5 +
 cpp/src/arrow/testing/util.cc        |  17 +++-
 cpp/src/arrow/testing/util.h         |   5 +-
 testing                              |   2 +-
 9 files changed, 233 insertions(+), 34 deletions(-)

diff --git a/cpp/src/arrow/flight/test_util.cc 
b/cpp/src/arrow/flight/test_util.cc
index 46c8f4c984..8635f47306 100644
--- a/cpp/src/arrow/flight/test_util.cc
+++ b/cpp/src/arrow/flight/test_util.cc
@@ -265,8 +265,7 @@ std::vector<ActionType> ExampleActionTypes() {
 }
 
 Status ExampleTlsCertificates(std::vector<CertKeyPair>* out) {
-  std::string root;
-  RETURN_NOT_OK(GetTestResourceRoot(&root));
+  ARROW_ASSIGN_OR_RAISE(auto root, GetTestResourceRoot());
 
   *out = std::vector<CertKeyPair>();
   for (int i = 0; i < 2; i++) {
@@ -299,16 +298,11 @@ Status ExampleTlsCertificates(std::vector<CertKeyPair>* 
out) {
 }
 
 Status ExampleTlsCertificateRoot(CertKeyPair* out) {
-  std::string root;
-  RETURN_NOT_OK(GetTestResourceRoot(&root));
-
-  std::stringstream path;
-  path << root << "/flight/root-ca.pem";
-
+  ARROW_ASSIGN_OR_RAISE(auto path, GetTestResourcePath("flight/root-ca.pem"));
   try {
-    std::ifstream cert_file(path.str());
+    std::ifstream cert_file(path);
     if (!cert_file) {
-      return Status::IOError("Could not open certificate: " + path.str());
+      return Status::IOError("Could not open certificate: " + path);
     }
     std::stringstream cert;
     cert << cert_file.rdbuf();
diff --git a/cpp/src/arrow/ipc/CMakeLists.txt b/cpp/src/arrow/ipc/CMakeLists.txt
index 6e73c71d89..af8f776781 100644
--- a/cpp/src/arrow/ipc/CMakeLists.txt
+++ b/cpp/src/arrow/ipc/CMakeLists.txt
@@ -38,6 +38,7 @@ function(ADD_ARROW_IPC_TEST REL_TEST_NAME)
 endfunction()
 
 add_arrow_test(feather_test)
+add_arrow_ipc_test(endianness_test)
 add_arrow_ipc_test(message_internal_test)
 add_arrow_ipc_test(read_write_test)
 add_arrow_ipc_test(tensor_test)
diff --git a/cpp/src/arrow/ipc/endianness_test.cc 
b/cpp/src/arrow/ipc/endianness_test.cc
new file mode 100644
index 0000000000..3f84345cb9
--- /dev/null
+++ b/cpp/src/arrow/ipc/endianness_test.cc
@@ -0,0 +1,175 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <limits>
+#include <memory>
+#include <string>
+
+#include <gtest/gtest.h>
+
+#include "arrow/io/file.h"
+#include "arrow/io/test_common.h"
+#include "arrow/ipc/reader.h"
+#include "arrow/status.h"
+#include "arrow/table.h"
+#include "arrow/testing/gtest_util.h"
+#include "arrow/testing/util.h"
+#include "arrow/type_fwd.h"
+
+namespace arrow::ipc::test {
+
+struct EndiannessTestParam {
+  std::string integration_file_basename;
+  // Whether it's still safe to access data in non-native endianness.
+  // This is false for any type that involves indirect addressing through 
offsets.
+  bool can_access_foreign_endian = false;
+  // Escape hatch for integration files with invalid data (e.g. decimal values
+  // exceeding the advertised precision).
+  bool can_validate_full = true;
+};
+
+// To avoid Valgrind errors
+void PrintTo(const EndiannessTestParam& v, std::ostream* os) {
+  *os << v.integration_file_basename;
+}
+
+class TestEndianness : public ::testing::TestWithParam<EndiannessTestParam> {
+ public:
+  void SetUp() { param_ = GetParam(); }
+
+  std::string GetIpcFilePath(Endianness endianness) const {
+    return GetEndiannessBasePath(endianness) + "/" + 
param_.integration_file_basename +
+           ".arrow_file";
+  }
+
+  std::string GetIpcStreamPath(Endianness endianness) const {
+    return GetEndiannessBasePath(endianness) + "/" + 
param_.integration_file_basename +
+           ".stream";
+  }
+
+  Endianness native_endianness() const { return Endianness::Native; }
+
+  Endianness foreign_endianness() const {
+    return Endianness::Native == Endianness::Little ? Endianness::Big
+                                                    : Endianness::Little;
+  }
+
+  template <typename OpenReaderFunc>
+  void TestReader(OpenReaderFunc&& open_reader) {
+    ARROW_SCOPED_TRACE("Native endianness");
+    ASSERT_OK_AND_ASSIGN(
+        auto reader, open_reader(param_.integration_file_basename, 
native_endianness(),
+                                 /*ensure_native_endian=*/false));
+    ASSERT_OK_AND_ASSIGN(auto native_table, reader->ToTable());
+    ASSERT_EQ(reader->stats().original_endianness, native_endianness());
+    if (param_.can_validate_full) {
+      ASSERT_OK(native_table->ValidateFull());
+    }
+
+    for (const bool ensure_native_endian : {true, false}) {
+      ARROW_SCOPED_TRACE("Foreign endianness: ensure_native_endian = ",
+                         ensure_native_endian);
+      ASSERT_OK_AND_ASSIGN(
+          auto reader, open_reader(param_.integration_file_basename, 
foreign_endianness(),
+                                   ensure_native_endian));
+      ASSERT_OK_AND_ASSIGN(auto foreign_table, reader->ToTable());
+      ASSERT_NE(reader->stats().original_endianness, native_endianness());
+      ASSERT_EQ(reader->stats().original_endianness, foreign_endianness());
+      if (ensure_native_endian) {
+        if (param_.can_validate_full) {
+          ASSERT_OK(foreign_table->ValidateFull());
+        }
+        AssertTablesEqual(*native_table, *foreign_table);
+      } else if (param_.can_access_foreign_endian) {
+        ASSERT_FALSE(foreign_table->Equals(*native_table));
+      }
+    }
+  }
+
+ protected:
+  std::string GetEndiannessBasePath(Endianness endianness) const {
+    return endianness == Endianness::Little
+               ? "arrow-ipc-stream/integration/1.0.0-littleendian"
+               : "arrow-ipc-stream/integration/1.0.0-bigendian";
+  }
+
+  EndiannessTestParam param_;
+};
+
+TEST_P(TestEndianness, StreamReader) {
+  auto open_stream = [this](const std::string& basename, Endianness endianness,
+                            bool ensure_native_endian)
+      -> Result<std::shared_ptr<arrow::ipc::RecordBatchStreamReader>> {
+    auto options = IpcReadOptions::Defaults();
+    options.ensure_native_endian = ensure_native_endian;
+    ARROW_ASSIGN_OR_RAISE(auto path, 
GetTestResourcePath(GetIpcStreamPath(endianness)));
+    ARROW_ASSIGN_OR_RAISE(auto file, io::ReadableFile::Open(path));
+    ARROW_ASSIGN_OR_RAISE(auto reader, RecordBatchStreamReader::Open(file, 
options));
+    return reader;
+  };
+
+  TestReader(open_stream);
+}
+
+TEST_P(TestEndianness, FileReader) {
+  auto open_file = [this](const std::string& basename, Endianness endianness,
+                          bool ensure_native_endian)
+      -> Result<std::shared_ptr<arrow::ipc::RecordBatchFileReader>> {
+    auto options = IpcReadOptions::Defaults();
+    options.ensure_native_endian = ensure_native_endian;
+    ARROW_ASSIGN_OR_RAISE(auto path, 
GetTestResourcePath(GetIpcFilePath(endianness)));
+    ARROW_ASSIGN_OR_RAISE(auto file, io::ReadableFile::Open(path));
+    ARROW_ASSIGN_OR_RAISE(auto reader, RecordBatchFileReader::Open(file, 
options));
+    return reader;
+  };
+
+  TestReader(open_file);
+}
+
+const std::vector<EndiannessTestParam> kEndiannessTestParams{
+    {"generated_datetime", /*can_access_foreign_endian=*/true,
+     /*can_validate_full=*/false},
+    {"generated_decimal", /*can_access_foreign_endian=*/true,
+     /*can_validate_full=*/false},
+    {"generated_decimal256", /*can_access_foreign_endian=*/true,
+     /*can_validate_full=*/false},
+    {"generated_dictionary"},
+    {"generated_dictionary_unsigned"},
+    {"generated_extension"},
+    {"generated_interval", /*can_access_foreign_endian=*/true},
+    {"generated_map"},
+    {"generated_nested"},
+    {"generated_nested_dictionary"},
+    {"generated_nested_large_offsets"},
+    {"generated_null", /*can_access_foreign_endian=*/true},
+    {"generated_null_trivial", /*can_access_foreign_endian=*/true},
+    {"generated_primitive"},
+    {"generated_primitive_large_offsets"},
+    {"generated_primitive_no_batches"},
+    {"generated_primitive_zerolength"},
+    {"generated_recursive_nested"},
+    {"generated_union"},
+};
+
+INSTANTIATE_TEST_SUITE_P(TestEndianness, TestEndianness,
+                         ::testing::ValuesIn(kEndiannessTestParams),
+                         [](const 
::testing::TestParamInfo<EndiannessTestParam>& info) {
+                           return info.param.integration_file_basename;
+                         });
+
+}  // namespace arrow::ipc::test
diff --git a/cpp/src/arrow/ipc/read_write_test.cc 
b/cpp/src/arrow/ipc/read_write_test.cc
index 15cf0258b2..74e16dd8b1 100644
--- a/cpp/src/arrow/ipc/read_write_test.cc
+++ b/cpp/src/arrow/ipc/read_write_test.cc
@@ -361,14 +361,10 @@ TEST_F(TestSchemaMetadata, KeyValueMetadata) {
 
 TEST_F(TestSchemaMetadata, MetadataVersionForwardCompatibility) {
   // ARROW-9399
-  std::string root;
-  ASSERT_OK(GetTestResourceRoot(&root));
-
   // schema_v6.arrow with currently nonexistent MetadataVersion::V6
-  std::stringstream schema_v6_path;
-  schema_v6_path << root << "/forward-compatibility/schema_v6.arrow";
-
-  ASSERT_OK_AND_ASSIGN(auto schema_v6_file, 
io::ReadableFile::Open(schema_v6_path.str()));
+  ASSERT_OK_AND_ASSIGN(auto schema_v6_path,
+                       
GetTestResourcePath("forward-compatibility/schema_v6.arrow"));
+  ASSERT_OK_AND_ASSIGN(auto schema_v6_file, 
io::ReadableFile::Open(schema_v6_path));
 
   DictionaryMemo placeholder_memo;
   ASSERT_RAISES(Invalid, ReadSchema(schema_v6_file.get(), &placeholder_memo));
diff --git a/cpp/src/arrow/ipc/reader.cc b/cpp/src/arrow/ipc/reader.cc
index 512305d657..22a96111f4 100644
--- a/cpp/src/arrow/ipc/reader.cc
+++ b/cpp/src/arrow/ipc/reader.cc
@@ -857,13 +857,15 @@ Status UnpackSchemaMessage(const void* opaque_schema, 
const IpcReadOptions& opti
                            DictionaryMemo* dictionary_memo,
                            std::shared_ptr<Schema>* schema,
                            std::shared_ptr<Schema>* out_schema,
-                           std::vector<bool>* field_inclusion_mask, bool* 
swap_endian) {
+                           std::vector<bool>* field_inclusion_mask,
+                           Endianness* original_endianness, bool* swap_endian) 
{
   RETURN_NOT_OK(internal::GetSchema(opaque_schema, dictionary_memo, schema));
 
   // If we are selecting only certain fields, populate the inclusion mask now
   // for fast lookups
   RETURN_NOT_OK(GetInclusionMaskAndOutSchema(*schema, options.included_fields,
                                              field_inclusion_mask, 
out_schema));
+  *original_endianness = out_schema->get()->endianness();
   *swap_endian = options.ensure_native_endian && 
!out_schema->get()->is_native_endian();
   if (*swap_endian) {
     // create a new schema with native endianness before swapping endian in 
ArrayData
@@ -877,12 +879,14 @@ Status UnpackSchemaMessage(const Message& message, const 
IpcReadOptions& options
                            DictionaryMemo* dictionary_memo,
                            std::shared_ptr<Schema>* schema,
                            std::shared_ptr<Schema>* out_schema,
-                           std::vector<bool>* field_inclusion_mask, bool* 
swap_endian) {
+                           std::vector<bool>* field_inclusion_mask,
+                           Endianness* original_endianness, bool* swap_endian) 
{
   CHECK_MESSAGE_TYPE(MessageType::SCHEMA, message.type());
   CHECK_HAS_NO_BODY(message);
 
   return UnpackSchemaMessage(message.header(), options, dictionary_memo, 
schema,
-                             out_schema, field_inclusion_mask, swap_endian);
+                             out_schema, field_inclusion_mask, 
original_endianness,
+                             swap_endian);
 }
 
 Status ReadDictionary(const Buffer& metadata, IpcReadContext context,
@@ -1061,7 +1065,7 @@ class StreamDecoderInternal : public 
MessageDecoderListener {
   Status OnSchemaMessageDecoded(std::unique_ptr<Message> message) {
     RETURN_NOT_OK(UnpackSchemaMessage(*message, options_, &dictionary_memo_, 
&schema_,
                                       &filtered_schema_, 
&field_inclusion_mask_,
-                                      &swap_endian_));
+                                      &stats_.original_endianness, 
&swap_endian_));
 
     num_required_initial_dictionaries_ = dictionary_memo_.fields().num_dicts();
     num_read_initial_dictionaries_ = 0;
@@ -1498,7 +1502,7 @@ class RecordBatchFileReaderImpl : public 
RecordBatchFileReader {
     // Get the schema and record any observed dictionaries
     RETURN_NOT_OK(UnpackSchemaMessage(footer_->schema(), options, 
&dictionary_memo_,
                                       &schema_, &out_schema_, 
&field_inclusion_mask_,
-                                      &swap_endian_));
+                                      &original_endianness_, &swap_endian_));
     stats_.num_messages.fetch_add(1, std::memory_order_relaxed);
     return Status::OK();
   }
@@ -1528,7 +1532,8 @@ class RecordBatchFileReaderImpl : public 
RecordBatchFileReader {
       // Get the schema and record any observed dictionaries
       RETURN_NOT_OK(UnpackSchemaMessage(
           self->footer_->schema(), options, &self->dictionary_memo_, 
&self->schema_,
-          &self->out_schema_, &self->field_inclusion_mask_, 
&self->swap_endian_));
+          &self->out_schema_, &self->field_inclusion_mask_, 
&self->original_endianness_,
+          &self->swap_endian_));
       self->stats_.num_messages.fetch_add(1, std::memory_order_relaxed);
       return Status::OK();
     });
@@ -1538,7 +1543,11 @@ class RecordBatchFileReaderImpl : public 
RecordBatchFileReader {
 
   std::shared_ptr<const KeyValueMetadata> metadata() const override { return 
metadata_; }
 
-  ReadStats stats() const override { return stats_.poll(); }
+  ReadStats stats() const override {
+    auto stats = stats_.poll();
+    stats.original_endianness = original_endianness_;
+    return stats;
+  }
 
   Result<AsyncGenerator<std::shared_ptr<RecordBatch>>> GetRecordBatchGenerator(
       const bool coalesce, const io::IOContext& io_context,
@@ -1629,7 +1638,10 @@ class RecordBatchFileReaderImpl : public 
RecordBatchFileReader {
   };
 
   Result<FileBlock> GetRecordBatchBlock(int i) const {
-    return FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i), 
footer_offset_);
+    ARROW_ASSIGN_OR_RAISE(
+        auto block,
+        FileBlockFromFlatbuffer(footer_->recordBatches()->Get(i), 
footer_offset_));
+    return block;
   }
 
   Result<FileBlock> GetDictionaryBlock(int i) const {
@@ -1951,6 +1963,7 @@ class RecordBatchFileReaderImpl : public 
RecordBatchFileReader {
   std::shared_ptr<Schema> out_schema_;
 
   AtomicReadStats stats_;
+  Endianness original_endianness_;
   std::shared_ptr<io::internal::ReadRangeCache> metadata_cache_;
   std::unordered_set<int> cached_data_blocks_;
   Future<> dictionary_load_finished_;
@@ -2792,6 +2805,7 @@ Status FuzzIpcFile(const uint8_t* data, int64_t size) {
   struct IpcReadResult {
     std::shared_ptr<Schema> schema;
     std::vector<RecordBatchWithMetadata> batches = {};
+    ReadStats stats = {};
   };
 
   // Try to read the IPC file as a stream to compare the results (differential 
fuzzing)
@@ -2813,7 +2827,7 @@ Status FuzzIpcFile(const uint8_t* data, int64_t size) {
       RETURN_NOT_OK(ValidateFuzzBatch(batch));
       batches.push_back(batch);
     }
-    return IpcReadResult{batch_reader->schema(), batches};
+    return IpcReadResult{batch_reader->schema(), batches, 
batch_reader->stats()};
   };
 
   auto do_file_read = [&](bool pre_buffer) -> Result<IpcReadResult> {
@@ -2836,6 +2850,7 @@ Status FuzzIpcFile(const uint8_t* data, int64_t size) {
       result.batches.push_back(batch);
     }
     RETURN_NOT_OK(st);
+    result.stats = batch_reader->stats();
     return result;
   };
 
@@ -2887,6 +2902,11 @@ Status FuzzIpcFile(const uint8_t* data, int64_t size) {
         final_status &= Status::Invalid(
             "Different number of batches between IPC stream and IPC file 
footer, "
             "skipping comparison");
+      } else if (maybe_read_result->stats.original_endianness !=
+                 maybe_stream_result->stats.original_endianness) {
+        final_status &= Status::Invalid(
+            "Different endianness between IPC stream and IPC file footer, "
+            "skipping comparison");
       } else {
         compare_result(*maybe_stream_result);
       }
diff --git a/cpp/src/arrow/ipc/reader.h b/cpp/src/arrow/ipc/reader.h
index 888f59a627..b5184e967c 100644
--- a/cpp/src/arrow/ipc/reader.h
+++ b/cpp/src/arrow/ipc/reader.h
@@ -59,6 +59,11 @@ struct ReadStats {
   /// Number of replaced dictionaries (i.e. where a dictionary batch replaces
   /// an existing dictionary with an unrelated new dictionary).
   int64_t num_replaced_dictionaries = 0;
+
+  /// The original endianness of the schema in the IPC file or stream.
+  /// This is not reflected in the schema returned by the RecordBatchReader
+  /// unless IpcReadOptions.ensure_native_endian is set to false.
+  Endianness original_endianness = Endianness::Native;
 };
 
 /// \brief Synchronous batch stream reader that reads from io::InputStream
diff --git a/cpp/src/arrow/testing/util.cc b/cpp/src/arrow/testing/util.cc
index 062028481f..5edec7cd21 100644
--- a/cpp/src/arrow/testing/util.cc
+++ b/cpp/src/arrow/testing/util.cc
@@ -112,14 +112,21 @@ Status MakeRandomByteBuffer(int64_t length, MemoryPool* 
pool,
   return Status::OK();
 }
 
-Status GetTestResourceRoot(std::string* out) {
-  const char* c_root = std::getenv("ARROW_TEST_DATA");
-  if (!c_root) {
+Result<std::string> GetTestResourceRoot() {
+  auto maybe_var = ::arrow::internal::GetEnvVar("ARROW_TEST_DATA");
+  if (maybe_var.status().IsKeyError()) {
     return Status::IOError(
         "Test resources not found, set ARROW_TEST_DATA to <repo 
root>/testing/data");
   }
-  *out = std::string(c_root);
-  return Status::OK();
+  return maybe_var;
+}
+
+Result<std::string> GetTestResourcePath(std::string subpath) {
+  ARROW_ASSIGN_OR_RAISE(auto root, GetTestResourceRoot());
+  if (!root.ends_with('/') && !subpath.starts_with('/')) {
+    root += '/';
+  }
+  return root + subpath;
 }
 
 // TODO(GH-48593): Remove when libc++ supports std::chrono timezones.
diff --git a/cpp/src/arrow/testing/util.h b/cpp/src/arrow/testing/util.h
index aa948e0e3e..729ef27623 100644
--- a/cpp/src/arrow/testing/util.h
+++ b/cpp/src/arrow/testing/util.h
@@ -31,6 +31,7 @@
 
 #include "arrow/buffer.h"
 #include "arrow/record_batch.h"
+#include "arrow/result.h"
 #include "arrow/status.h"
 #include "arrow/testing/visibility.h"
 #include "arrow/type_fwd.h"
@@ -109,8 +110,8 @@ UnionTypeFactories() {
 }
 
 // Return the value of the ARROW_TEST_DATA environment variable or return error
-// Status
-ARROW_TESTING_EXPORT Status GetTestResourceRoot(std::string*);
+ARROW_TESTING_EXPORT Result<std::string> GetTestResourceRoot();
+ARROW_TESTING_EXPORT Result<std::string> GetTestResourcePath(std::string 
subpath);
 
 /// \deprecated Deprecated in 24.0.0. Only needed for Clang/libc++ on Windows.
 // TODO(GH-48593): Remove when libc++ supports std::chrono timezones.
diff --git a/testing b/testing
index 9bf001b7d6..9cfebfef89 160000
--- a/testing
+++ b/testing
@@ -1 +1 @@
-Subproject commit 9bf001b7d6ad318e222c06e760bccf480f2f550c
+Subproject commit 9cfebfef8982fb8612e0a2c59059752bd32321a3

Reply via email to