This is an automated email from the ASF dual-hosted git repository.

pitrou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 55958e01be GH-50156: [C++][Parquet] Ignore min/max for unknown column order (#50157)
55958e01be is described below

commit 55958e01be79ff3b913e749fe53de152520960ef
Author: Gang Wu <[email protected]>
AuthorDate: Tue Jun 16 15:37:18 2026 +0800

    GH-50156: [C++][Parquet] Ignore min/max for unknown column order (#50157)
    
    ### Rationale for this change
    
    Parquet column order defines how min/max statistics should be interpreted. If a reader sees an unsupported ColumnOrder, it cannot safely use chunk statistics or page index min/max values for that column.
    
    ### What changes are included in this PR?
    
    - Added an internal `ColumnOrder::UNKNOWN` state for unsupported Thrift column order.
    - Kept missing ColumnOrder as `UNDEFINED`, preserving legacy min/max behavior.
    - Ignored chunk-level min/max statistics when column order or sort order is unknown.
    - Exposed `ColumnDescriptor::can_use_min_max()` to indicate whether min/max fields can be trusted.
    
    ### Are these changes tested?
    
    Added regression tests for unsupported column order, missing column order, and page index guards.
    
    ### Are there any user-facing changes?
    
    No.
    
    * GitHub Issue: #50156
    
    Lead-authored-by: Gang Wu <[email protected]>
    Co-authored-by: Copilot Autofix powered by AI <[email protected]>
    Signed-off-by: Antoine Pitrou <[email protected]>
---
 cpp/src/parquet/column_io_benchmark.cc   |  3 +-
 cpp/src/parquet/column_reader.cc         | 35 ++++++++----
 cpp/src/parquet/column_reader.h          | 10 ++++
 cpp/src/parquet/column_writer_test.cc    |  9 +--
 cpp/src/parquet/file_deserialize_test.cc | 34 ++++++++---
 cpp/src/parquet/file_reader.cc           |  5 +-
 cpp/src/parquet/metadata.cc              | 78 +++++++++++++------------
 cpp/src/parquet/metadata_test.cc         | 98 ++++++++++++++++++++++++++++++++
 cpp/src/parquet/schema.h                 | 16 ++++++
 cpp/src/parquet/schema_test.cc           | 26 +++++++++
 cpp/src/parquet/statistics_test.cc       |  7 +++
 cpp/src/parquet/thrift_internal.h        | 40 ++++++++++---
 cpp/src/parquet/types.cc                 |  1 +
 cpp/src/parquet/types.h                  | 12 +++-
 14 files changed, 303 insertions(+), 71 deletions(-)

diff --git a/cpp/src/parquet/column_io_benchmark.cc 
b/cpp/src/parquet/column_io_benchmark.cc
index 4b29a1284d..0dc1b0f8c9 100644
--- a/cpp/src/parquet/column_io_benchmark.cc
+++ b/cpp/src/parquet/column_io_benchmark.cc
@@ -161,7 +161,8 @@ std::shared_ptr<Int64Reader> 
BuildReader(std::shared_ptr<Buffer>& buffer,
                                          int64_t num_values, Compression::type 
codec,
                                          ColumnDescriptor* schema) {
   auto source = std::make_shared<::arrow::io::BufferReader>(buffer);
-  std::unique_ptr<PageReader> page_reader = PageReader::Open(source, 
num_values, codec);
+  std::unique_ptr<PageReader> page_reader =
+      PageReader::Open(source, num_values, codec, ReaderProperties(), *schema);
   return std::static_pointer_cast<Int64Reader>(
       ColumnReader::Make(schema, std::move(page_reader)));
 }
diff --git a/cpp/src/parquet/column_reader.cc b/cpp/src/parquet/column_reader.cc
index 79b837f755..7241632e4b 100644
--- a/cpp/src/parquet/column_reader.cc
+++ b/cpp/src/parquet/column_reader.cc
@@ -200,10 +200,10 @@ namespace {
 
 // Extracts encoded statistics from V1 and V2 data page headers
 template <typename H>
-EncodedStatistics ExtractStatsFromHeader(const H& header) {
+EncodedStatistics ExtractStatsFromHeader(const H& header, 
StatisticsMinMaxField min_max) {
   EncodedStatistics page_statistics;
   if (header.__isset.statistics) {
-    page_statistics = FromThrift(header.statistics);
+    page_statistics = FromThrift(header.statistics, min_max);
   }
   return page_statistics;
 }
@@ -225,13 +225,15 @@ class SerializedPageReader : public PageReader {
  public:
   SerializedPageReader(std::shared_ptr<ArrowInputStream> stream, int64_t 
total_num_values,
                        Compression::type codec, const ReaderProperties& 
properties,
-                       const CryptoContext* crypto_ctx, bool always_compressed)
+                       const CryptoContext* crypto_ctx, bool always_compressed,
+                       StatisticsMinMaxField stats_min_max_field)
       : properties_(properties),
         stream_(std::move(stream)),
         decompression_buffer_(AllocateBuffer(properties_.memory_pool(), 0)),
         page_ordinal_(0),
         seen_num_values_(0),
-        total_num_values_(total_num_values) {
+        total_num_values_(total_num_values),
+        stats_min_max_field_(stats_min_max_field) {
     if (crypto_ctx != nullptr) {
       crypto_ctx_ = *crypto_ctx;
       InitDecryption();
@@ -302,6 +304,8 @@ class SerializedPageReader : public PageReader {
   // Number of values in all the data pages
   int64_t total_num_values_;
 
+  StatisticsMinMaxField stats_min_max_field_;
+
   // data_page_aad_ and data_page_header_aad_ contain the AAD for data page 
and data page
   // header in a single column respectively.
   // While calculating AAD for different pages in a single column the pages 
AAD is
@@ -349,7 +353,7 @@ bool 
SerializedPageReader::ShouldSkipPage(EncodedStatistics* data_page_statistic
   if (page_type == PageType::DATA_PAGE) {
     const format::DataPageHeader& header = 
current_page_header_.data_page_header;
     CheckNumValuesInHeader(header.num_values);
-    *data_page_statistics = ExtractStatsFromHeader(header);
+    *data_page_statistics = ExtractStatsFromHeader(header, 
stats_min_max_field_);
     seen_num_values_ += header.num_values;
     if (data_page_filter_) {
       const EncodedStatistics* filter_statistics =
@@ -370,7 +374,7 @@ bool 
SerializedPageReader::ShouldSkipPage(EncodedStatistics* data_page_statistic
         header.repetition_levels_byte_length < 0) {
       throw ParquetException("Invalid page header (negative levels byte 
length)");
     }
-    *data_page_statistics = ExtractStatsFromHeader(header);
+    *data_page_statistics = ExtractStatsFromHeader(header, 
stats_min_max_field_);
     seen_num_values_ += header.num_values;
     if (data_page_filter_) {
       const EncodedStatistics* filter_statistics =
@@ -591,6 +595,16 @@ std::shared_ptr<Buffer> 
SerializedPageReader::DecompressIfNeeded(
 
 }  // namespace
 
+std::unique_ptr<PageReader> PageReader::Open(
+    std::shared_ptr<ArrowInputStream> stream, int64_t total_num_values,
+    Compression::type codec, const ReaderProperties& properties,
+    const ColumnDescriptor& descr, bool always_compressed, const 
CryptoContext* ctx) {
+  const auto stats_min_max_field = GetStatisticsMinMaxField(descr);
+  return std::unique_ptr<PageReader>(
+      new SerializedPageReader(std::move(stream), total_num_values, codec, 
properties,
+                               ctx, always_compressed, stats_min_max_field));
+}
+
 std::unique_ptr<PageReader> PageReader::Open(std::shared_ptr<ArrowInputStream> 
stream,
                                              int64_t total_num_values,
                                              Compression::type codec,
@@ -598,7 +612,8 @@ std::unique_ptr<PageReader> 
PageReader::Open(std::shared_ptr<ArrowInputStream> s
                                              bool always_compressed,
                                              const CryptoContext* ctx) {
   return std::unique_ptr<PageReader>(new SerializedPageReader(
-      std::move(stream), total_num_values, codec, properties, ctx, 
always_compressed));
+      std::move(stream), total_num_values, codec, properties, ctx, 
always_compressed,
+      StatisticsMinMaxField::kMinValueMaxValue));
 }
 
 std::unique_ptr<PageReader> PageReader::Open(std::shared_ptr<ArrowInputStream> 
stream,
@@ -607,9 +622,9 @@ std::unique_ptr<PageReader> 
PageReader::Open(std::shared_ptr<ArrowInputStream> s
                                              bool always_compressed,
                                              ::arrow::MemoryPool* pool,
                                              const CryptoContext* ctx) {
-  return std::unique_ptr<PageReader>(
-      new SerializedPageReader(std::move(stream), total_num_values, codec,
-                               ReaderProperties(pool), ctx, 
always_compressed));
+  return std::unique_ptr<PageReader>(new SerializedPageReader(
+      std::move(stream), total_num_values, codec, ReaderProperties(pool), ctx,
+      always_compressed, StatisticsMinMaxField::kMinValueMaxValue));
 }
 
 namespace {
diff --git a/cpp/src/parquet/column_reader.h b/cpp/src/parquet/column_reader.h
index ac4469b190..ad20d4a9e6 100644
--- a/cpp/src/parquet/column_reader.h
+++ b/cpp/src/parquet/column_reader.h
@@ -117,11 +117,21 @@ class PARQUET_EXPORT PageReader {
  public:
   virtual ~PageReader() = default;
 
+  static std::unique_ptr<PageReader> Open(std::shared_ptr<ArrowInputStream> 
stream,
+                                          int64_t total_num_values,
+                                          Compression::type codec,
+                                          const ReaderProperties& properties,
+                                          const ColumnDescriptor& descr,
+                                          bool always_compressed = false,
+                                          const CryptoContext* ctx = NULLPTR);
+
+  PARQUET_DEPRECATED("Deprecated in 25.0.0. Use the ColumnDescriptor overload 
instead.")
   static std::unique_ptr<PageReader> Open(
       std::shared_ptr<ArrowInputStream> stream, int64_t total_num_values,
       Compression::type codec, bool always_compressed = false,
       ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(),
       const CryptoContext* ctx = NULLPTR);
+  PARQUET_DEPRECATED("Deprecated in 25.0.0. Use the ColumnDescriptor overload 
instead.")
   static std::unique_ptr<PageReader> Open(std::shared_ptr<ArrowInputStream> 
stream,
                                           int64_t total_num_values,
                                           Compression::type codec,
diff --git a/cpp/src/parquet/column_writer_test.cc 
b/cpp/src/parquet/column_writer_test.cc
index a453949172..69ec07c1b7 100644
--- a/cpp/src/parquet/column_writer_test.cc
+++ b/cpp/src/parquet/column_writer_test.cc
@@ -100,8 +100,8 @@ class TestPrimitiveWriter : public 
PrimitiveTypedTest<TestType> {
     auto source = std::make_shared<::arrow::io::BufferReader>(buffer);
     ReaderProperties readerProperties;
     readerProperties.set_page_checksum_verification(page_checksum_verify);
-    std::unique_ptr<PageReader> page_reader =
-        PageReader::Open(std::move(source), num_rows, compression, 
readerProperties);
+    std::unique_ptr<PageReader> page_reader = PageReader::Open(
+        std::move(source), num_rows, compression, readerProperties, 
*this->descr_);
     reader_ = std::static_pointer_cast<TypedColumnReader<TestType>>(
         ColumnReader::Make(this->descr_, std::move(page_reader)));
   }
@@ -2058,8 +2058,9 @@ TEST_F(TestValuesWriterInt32Type, 
AvoidCompressedInDataPageV2) {
     ASSERT_OK_AND_ASSIGN(auto buffer, this->sink_->Finish());
     auto source = std::make_shared<::arrow::io::BufferReader>(buffer);
     ReaderProperties readerProperties;
-    std::unique_ptr<PageReader> page_reader = PageReader::Open(
-        std::move(source), total_num_values, compression, readerProperties);
+    std::unique_ptr<PageReader> page_reader =
+        PageReader::Open(std::move(source), total_num_values, compression,
+                         readerProperties, *this->descr_);
     auto data_page = 
std::static_pointer_cast<DataPageV2>(page_reader->NextPage());
     ASSERT_TRUE(data_page != nullptr);
     ASSERT_FALSE(data_page->is_compressed());
diff --git a/cpp/src/parquet/file_deserialize_test.cc 
b/cpp/src/parquet/file_deserialize_test.cc
index 7fa5e2f167..e81f7689a6 100644
--- a/cpp/src/parquet/file_deserialize_test.cc
+++ b/cpp/src/parquet/file_deserialize_test.cc
@@ -46,6 +46,14 @@ namespace parquet {
 using ::arrow::io::BufferReader;
 using ::parquet::DataPageStats;
 
+// Make a column descriptor with an undefined column order.
+ColumnDescriptor MakeColumnDescriptor() {
+  auto node = schema::Int32("int_col", Repetition::REQUIRED);
+  static_cast<schema::PrimitiveNode*>(node.get())
+      ->SetColumnOrder(ColumnOrder::undefined_);
+  return ColumnDescriptor(node, /*max_definition_level=*/0, 
/*max_repetition_level=*/0);
+}
+
 // Adds page statistics occupying a certain amount of bytes (for testing very
 // large page headers)
 template <typename H>
@@ -110,6 +118,8 @@ static std::vector<Compression::type> 
GetSupportedCodecTypes() {
 
 class TestPageSerde : public ::testing::Test {
  public:
+  TestPageSerde() : descr_(MakeColumnDescriptor()) {}
+
   void SetUp() {
     data_page_header_.encoding = format::Encoding::PLAIN;
     data_page_header_.definition_level_encoding = format::Encoding::RLE;
@@ -124,7 +134,7 @@ class TestPageSerde : public ::testing::Test {
     EndStream();
 
     auto stream = std::make_shared<::arrow::io::BufferReader>(out_buffer_);
-    page_reader_ = PageReader::Open(stream, num_rows, codec, properties);
+    page_reader_ = PageReader::Open(stream, num_rows, codec, properties, 
descr_);
   }
 
   void WriteDataPageHeader(int max_serialized_len = 1024, int32_t 
uncompressed_size = 0,
@@ -204,6 +214,7 @@ class TestPageSerde : public ::testing::Test {
   std::shared_ptr<::arrow::io::BufferOutputStream> out_stream_;
   std::shared_ptr<Buffer> out_buffer_;
 
+  ColumnDescriptor descr_;
   std::unique_ptr<PageReader> page_reader_;
   format::PageHeader page_header_;
   format::DataPageHeader data_page_header_;
@@ -466,7 +477,8 @@ TYPED_TEST(PageFilterTest, TestPageWithoutStatistics) {
 
   auto stream = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
   this->page_reader_ =
-      PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+      PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED,
+                       ReaderProperties(), this->descr_);
 
   int num_pages = 0;
   bool is_stats_null = false;
@@ -492,7 +504,8 @@ TYPED_TEST(PageFilterTest, TestPageFilterCallback) {
      // are right.
     auto stream = 
std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
     this->page_reader_ =
-        PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+        PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED,
+                         ReaderProperties(), this->descr_);
 
     std::vector<EncodedStatistics> read_stats;
     std::vector<int64_t> read_num_values;
@@ -526,7 +539,8 @@ TYPED_TEST(PageFilterTest, TestPageFilterCallback) {
   {  // Skip all pages.
     auto stream = 
std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
     this->page_reader_ =
-        PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+        PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED,
+                         ReaderProperties(), this->descr_);
 
     auto skip_all_pages = [](const DataPageStats& stats) -> bool { return 
true; };
 
@@ -538,7 +552,8 @@ TYPED_TEST(PageFilterTest, TestPageFilterCallback) {
   {  // Skip every other page.
     auto stream = 
std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
     this->page_reader_ =
-        PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+        PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED,
+                         ReaderProperties(), this->descr_);
 
     // Skip pages with even number of values.
     auto skip_even_pages = [](const DataPageStats& stats) -> bool {
@@ -569,7 +584,8 @@ TYPED_TEST(PageFilterTest, TestChangingPageFilter) {
 
   auto stream = std::make_shared<::arrow::io::BufferReader>(this->out_buffer_);
   this->page_reader_ =
-      PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED);
+      PageReader::Open(stream, this->total_rows_, Compression::UNCOMPRESSED,
+                       ReaderProperties(), this->descr_);
 
   // This callback will always return false.
   auto read_all_pages = [](const DataPageStats& stats) -> bool { return false; 
};
@@ -604,7 +620,8 @@ TEST_F(TestPageSerde, DoesNotFilterDictionaryPages) {
 
   // Try to read it back while asking for all data pages to be skipped.
   auto stream = std::make_shared<::arrow::io::BufferReader>(out_buffer_);
-  page_reader_ = PageReader::Open(stream, /*num_rows=*/100, 
Compression::UNCOMPRESSED);
+  page_reader_ = PageReader::Open(stream, /*num_rows=*/100, 
Compression::UNCOMPRESSED,
+                                  ReaderProperties(), descr_);
 
   auto skip_all_pages = [](const DataPageStats& stats) -> bool { return true; 
};
 
@@ -641,7 +658,8 @@ TEST_F(TestPageSerde, SkipsNonDataPages) {
   EndStream();
 
   auto stream = std::make_shared<::arrow::io::BufferReader>(out_buffer_);
-  page_reader_ = PageReader::Open(stream, /*num_rows=*/100, 
Compression::UNCOMPRESSED);
+  page_reader_ = PageReader::Open(stream, /*num_rows=*/100, 
Compression::UNCOMPRESSED,
+                                  ReaderProperties(), descr_);
 
   // Only the two data pages are returned.
   std::shared_ptr<Page> current_page = page_reader_->NextPage();
diff --git a/cpp/src/parquet/file_reader.cc b/cpp/src/parquet/file_reader.cc
index d0552adcee..2f46a5e296 100644
--- a/cpp/src/parquet/file_reader.cc
+++ b/cpp/src/parquet/file_reader.cc
@@ -239,6 +239,7 @@ class SerializedRowGroup : public RowGroupReader::Contents {
   std::unique_ptr<PageReader> GetColumnPageReader(int i) override {
     // Read column chunk from the file
     auto col = row_group_metadata_->ColumnChunk(i);
+    const ColumnDescriptor* descr = row_group_metadata_->schema()->Column(i);
 
     ::arrow::io::ReadRange col_range =
         ComputeColumnChunkRange(file_metadata_, source_size_, 
row_group_ordinal_, i);
@@ -263,7 +264,7 @@ class SerializedRowGroup : public RowGroupReader::Contents {
     // Column is encrypted only if crypto_metadata exists.
     if (!crypto_metadata) {
       return PageReader::Open(stream, col->num_values(), col->compression(), 
properties_,
-                              always_compressed);
+                              *descr, always_compressed);
     }
 
     // The column is encrypted
@@ -286,7 +287,7 @@ class SerializedRowGroup : public RowGroupReader::Contents {
                       std::move(meta_decryptor_factory),
                       std::move(data_decryptor_factory)};
     return PageReader::Open(stream, col->num_values(), col->compression(), 
properties_,
-                            always_compressed, &ctx);
+                            *descr, always_compressed, &ctx);
   }
 
  private:
diff --git a/cpp/src/parquet/metadata.cc b/cpp/src/parquet/metadata.cc
index 46cd1e4424..98f60df63d 100644
--- a/cpp/src/parquet/metadata.cc
+++ b/cpp/src/parquet/metadata.cc
@@ -90,40 +90,48 @@ std::string ParquetVersionToString(ParquetVersion::type 
ver) {
   return "UNKNOWN";
 }
 
+namespace {
+
 template <typename DType>
-static std::shared_ptr<Statistics> MakeTypedColumnStats(
-    const format::ColumnMetaData& metadata, const ColumnDescriptor* descr,
-    ::arrow::MemoryPool* pool) {
-  std::optional<bool> min_exact =
-      metadata.statistics.__isset.is_min_value_exact
-          ? std::optional<bool>(metadata.statistics.is_min_value_exact)
-          : std::nullopt;
-  std::optional<bool> max_exact =
-      metadata.statistics.__isset.is_max_value_exact
-          ? std::optional<bool>(metadata.statistics.is_max_value_exact)
-          : std::nullopt;
-  // If ColumnOrder is defined, return max_value and min_value
-  if (descr->column_order().get_order() == ColumnOrder::TYPE_DEFINED_ORDER) {
-    return MakeStatistics<DType>(
-        descr, metadata.statistics.min_value, metadata.statistics.max_value,
-        metadata.num_values - metadata.statistics.null_count,
-        metadata.statistics.null_count, metadata.statistics.distinct_count,
-        metadata.statistics.__isset.max_value && 
metadata.statistics.__isset.min_value,
-        metadata.statistics.__isset.null_count,
-        metadata.statistics.__isset.distinct_count, min_exact, max_exact, 
pool);
-  }
-  // Default behavior
+std::shared_ptr<Statistics> MakeTypedColumnStats(const format::ColumnMetaData& 
metadata,
+                                                 const ColumnDescriptor* descr,
+                                                 ::arrow::MemoryPool* pool) {
+  const auto& statistics = metadata.statistics;
+  const std::string kEmpty = "";
+  const std::string* encoded_min = &kEmpty;
+  const std::string* encoded_max = &kEmpty;
+  bool has_min_max = false;
+  std::optional<bool> min_exact = std::nullopt;
+  std::optional<bool> max_exact = std::nullopt;
+
+  switch (GetStatisticsMinMaxField(*descr)) {
+    case StatisticsMinMaxField::kMinValueMaxValue:
+      encoded_min = &statistics.min_value;
+      encoded_max = &statistics.max_value;
+      has_min_max = statistics.__isset.max_value && 
statistics.__isset.min_value;
+      min_exact = statistics.__isset.is_min_value_exact
+                      ? std::optional<bool>(statistics.is_min_value_exact)
+                      : std::nullopt;
+      max_exact = statistics.__isset.is_max_value_exact
+                      ? std::optional<bool>(statistics.is_max_value_exact)
+                      : std::nullopt;
+      break;
+    case StatisticsMinMaxField::kLegacyMinMax:
+      encoded_min = &statistics.min;
+      encoded_max = &statistics.max;
+      has_min_max = statistics.__isset.max && statistics.__isset.min;
+      break;
+    case StatisticsMinMaxField::kInvalid:
+      break;
+  }
+
   return MakeStatistics<DType>(
-      descr, metadata.statistics.min, metadata.statistics.max,
-      metadata.num_values - metadata.statistics.null_count,
-      metadata.statistics.null_count, metadata.statistics.distinct_count,
-      metadata.statistics.__isset.max && metadata.statistics.__isset.min,
-      metadata.statistics.__isset.null_count, 
metadata.statistics.__isset.distinct_count,
-      min_exact, max_exact, pool);
+      descr, *encoded_min, *encoded_max, metadata.num_values - 
statistics.null_count,
+      statistics.null_count, statistics.distinct_count, has_min_max,
+      statistics.__isset.null_count, statistics.__isset.distinct_count, 
min_exact,
+      max_exact, pool);
 }
 
-namespace {
-
 std::shared_ptr<geospatial::GeoStatistics> MakeColumnGeometryStats(
     const format::ColumnMetaData& metadata, const ColumnDescriptor* descr) {
   if (metadata.__isset.geospatial_statistics) {
@@ -336,12 +344,8 @@ class ColumnChunkMetaData::ColumnChunkMetaDataImpl {
     {
       const std::lock_guard<std::mutex> guard(stats_mutex_);
       if (possible_encoded_stats_ == nullptr) {
-        possible_encoded_stats_ =
-            
std::make_shared<EncodedStatistics>(FromThrift(column_metadata_->statistics));
-        if (descr_->sort_order() == SortOrder::UNKNOWN) {
-          // If the column SortOrder is Unknown we can't trust max/min.
-          possible_encoded_stats_->ClearMinMax();
-        }
+        possible_encoded_stats_ = std::make_shared<EncodedStatistics>(
+            FromThrift(column_metadata_->statistics, 
GetStatisticsMinMaxField(*descr_)));
       }
     }
     return writer_version_->HasCorrectStatistics(type(), 
*possible_encoded_stats_,
@@ -1037,7 +1041,7 @@ class FileMetaData::FileMetaDataImpl {
         if (column_order.__isset.TYPE_ORDER) {
           column_orders.push_back(ColumnOrder::type_defined_);
         } else {
-          column_orders.push_back(ColumnOrder::undefined_);
+          column_orders.push_back(ColumnOrder::unknown_);
         }
       }
     } else {
diff --git a/cpp/src/parquet/metadata_test.cc b/cpp/src/parquet/metadata_test.cc
index 572f053179..ac45be1fac 100644
--- a/cpp/src/parquet/metadata_test.cc
+++ b/cpp/src/parquet/metadata_test.cc
@@ -275,6 +275,104 @@ TEST(Metadata, TestBuildAccess) {
   ASSERT_TRUE(f_accessor_1->Equals(*f_accessor->Subset({2, 0})));
 }
 
+namespace {
+
+std::string EncodeInt32(int32_t value) {
+  return std::string(reinterpret_cast<const char*>(&value), sizeof(value));
+}
+
+constexpr int32_t kLegacyMin = 100, kLegacyMax = 200;
+
+std::string SerializeMetadata(const format::FileMetaData& thrift_metadata) {
+  std::string out;
+  ThriftSerializer{}.SerializeToString(&thrift_metadata, &out);
+  return out;
+}
+
+std::shared_ptr<FileMetaData> ParseMetadata(std::string serialized_metadata) {
+  uint32_t decoded_len = static_cast<uint32_t>(serialized_metadata.size());
+  return FileMetaData::Make(serialized_metadata.data(), &decoded_len);
+}
+
+format::FileMetaData SingleInt32MetadataWithStats() {
+  format::FileMetaData metadata;
+  format::SchemaElement root, leaf;
+  schema::NodeVector fields = {schema::Int32("int_col", Repetition::REQUIRED)};
+  schema::GroupNode::Make("schema", Repetition::REPEATED, 
fields)->ToParquet(&root);
+  fields.back()->ToParquet(&leaf);
+  metadata.schema = {std::move(root), std::move(leaf)};
+
+  auto& column = metadata.row_groups.emplace_back().columns.emplace_back();
+  column.__isset.meta_data = true;
+  auto& column_metadata = column.meta_data;
+  column_metadata.__set_type(format::Type::INT32);
+  column_metadata.__isset.statistics = true;
+  auto& statistics = column_metadata.statistics;
+  statistics.__set_min(EncodeInt32(kLegacyMin));
+  statistics.__set_max(EncodeInt32(kLegacyMax));
+  statistics.__set_min_value(EncodeInt32(kLegacyMin));
+  statistics.__set_max_value(EncodeInt32(kLegacyMax));
+  
metadata.column_orders.emplace_back().__set_TYPE_ORDER(format::TypeDefinedOrder{});
+  metadata.__isset.column_orders = true;
+  return metadata;
+}
+
+std::unique_ptr<ColumnChunkMetaData> GetOnlyColumnChunk(
+    const FileMetaData& metadata, ColumnOrder::type expected_order) {
+  EXPECT_EQ(expected_order, 
metadata.schema()->Column(0)->column_order().get_order());
+  return metadata.RowGroup(0)->ColumnChunk(0);
+}
+
+void AssertColumnChunkHasNoMinMax(const FileMetaData& metadata,
+                                  ColumnOrder::type expected_order) {
+  auto column = GetOnlyColumnChunk(metadata, expected_order);
+  ASSERT_NE(nullptr, column->encoded_statistics());
+  EXPECT_FALSE(column->encoded_statistics()->has_min);
+  EXPECT_FALSE(column->encoded_statistics()->has_max);
+  ASSERT_NE(nullptr, column->statistics());
+  EXPECT_FALSE(column->statistics()->HasMinMax());
+}
+
+void AssertColumnChunkMinMax(const FileMetaData& metadata,
+                             ColumnOrder::type expected_order, int32_t min, 
int32_t max) {
+  auto column = GetOnlyColumnChunk(metadata, expected_order);
+  const std::string encoded_min = EncodeInt32(min);
+  const std::string encoded_max = EncodeInt32(max);
+
+  ASSERT_NE(nullptr, column->encoded_statistics());
+  EXPECT_EQ(encoded_min, column->encoded_statistics()->min());
+  EXPECT_EQ(encoded_max, column->encoded_statistics()->max());
+  ASSERT_NE(nullptr, column->statistics());
+  EXPECT_EQ(encoded_min, column->statistics()->EncodeMin());
+  EXPECT_EQ(encoded_max, column->statistics()->EncodeMax());
+}
+
+}  // namespace
+
+TEST(Metadata, UnknownColumnOrderIgnoresMinMax) {
+  format::FileMetaData thrift_metadata = SingleInt32MetadataWithStats();
+  // Simulate an unsupported ColumnOrder value: unknown union fields are 
skipped by
+  // Thrift, leaving an entry with no known field set.
+  thrift_metadata.column_orders.clear();
+  thrift_metadata.column_orders.emplace_back();
+  thrift_metadata.__isset.column_orders = true;
+
+  auto metadata = ParseMetadata(SerializeMetadata(thrift_metadata));
+  AssertColumnChunkHasNoMinMax(*metadata, ColumnOrder::UNKNOWN);
+}
+
+TEST(Metadata, MissingColumnOrderUsesLegacyMinMax) {
+  format::FileMetaData thrift_metadata = SingleInt32MetadataWithStats();
+  thrift_metadata.column_orders.clear();
+  thrift_metadata.__isset.column_orders = false;
+  auto& statistics = 
thrift_metadata.row_groups.at(0).columns.at(0).meta_data.statistics;
+  statistics.__set_min_value(EncodeInt32(kLegacyMin - 100));
+  statistics.__set_max_value(EncodeInt32(kLegacyMax + 100));
+
+  auto metadata = ParseMetadata(SerializeMetadata(thrift_metadata));
+  AssertColumnChunkMinMax(*metadata, ColumnOrder::UNDEFINED, kLegacyMin, 
kLegacyMax);
+}
+
 TEST(Metadata, TestV1Version) {
   // PARQUET-839
   parquet::schema::NodeVector fields;
diff --git a/cpp/src/parquet/schema.h b/cpp/src/parquet/schema.h
index 1addc73bd3..65732603ea 100644
--- a/cpp/src/parquet/schema.h
+++ b/cpp/src/parquet/schema.h
@@ -382,6 +382,22 @@ class PARQUET_EXPORT ColumnDescriptor {
     return la ? GetSortOrder(la, pt) : GetSortOrder(converted_type(), pt);
   }
 
+  // Whether ColumnOrder-governed min/max values have a supported ordering.
+  bool can_use_min_max() const {
+    switch (column_order().get_order()) {
+      case ColumnOrder::TYPE_DEFINED_ORDER:
+        return sort_order() != SortOrder::UNKNOWN;
+      case ColumnOrder::UNDEFINED:
+        // If there is no defined column order, the obsolete min and max fields
+        // in the Statistics object are to be used, and they are always sorted
+        // by signed comparison, so this has to be the column type's sort 
order.
+        return sort_order() == SortOrder::SIGNED;
+      case ColumnOrder::UNKNOWN:
+        return false;
+    }
+    return false;
+  }
+
   const std::string& name() const { return primitive_node_->name(); }
 
   const std::shared_ptr<schema::ColumnPath> path() const;
diff --git a/cpp/src/parquet/schema_test.cc b/cpp/src/parquet/schema_test.cc
index 2950a7df70..859f14a34d 100644
--- a/cpp/src/parquet/schema_test.cc
+++ b/cpp/src/parquet/schema_test.cc
@@ -660,6 +660,32 @@ TEST(TestColumnDescriptor, TestAttrs) {
   ASSERT_EQ(expected_descr, descr2.ToString());
 }
 
+TEST(TestColumnDescriptor, CanUseStats) {
+  NodePtr node = Int32("name");
+  ColumnDescriptor descr(node, 0, 0);
+  // Type-defined column order is usable when the type has a known sort order.
+  EXPECT_TRUE(descr.can_use_min_max());
+
+  auto primitive_node = std::static_pointer_cast<PrimitiveNode>(node);
+  primitive_node->SetColumnOrder(ColumnOrder::undefined_);
+  // Missing column order falls back to legacy min/max, which are signed.
+  EXPECT_TRUE(ColumnDescriptor(node, 0, 0).can_use_min_max());
+
+  primitive_node->SetColumnOrder(ColumnOrder::unknown_);
+  // Unsupported column order means min/max ordering is unknown to this reader.
+  EXPECT_FALSE(ColumnDescriptor(node, 0, 0).can_use_min_max());
+
+  node = PrimitiveNode::Make("name", Repetition::REQUIRED, Type::BYTE_ARRAY);
+  primitive_node = std::static_pointer_cast<PrimitiveNode>(node);
+  primitive_node->SetColumnOrder(ColumnOrder::undefined_);
+  // Legacy min/max are signed, so they cannot represent unsigned byte 
ordering.
+  EXPECT_FALSE(ColumnDescriptor(node, 0, 0).can_use_min_max());
+
+  node = PrimitiveNode::Make("name", Repetition::REQUIRED, Type::INT96);
+  // INT96 has no defined sort order in the Parquet type-defined ordering.
+  EXPECT_FALSE(ColumnDescriptor(node, 0, 0).can_use_min_max());
+}
+
 class TestSchemaDescriptor : public ::testing::Test {
  public:
   void setUp() {}
diff --git a/cpp/src/parquet/statistics_test.cc 
b/cpp/src/parquet/statistics_test.cc
index 905502cb0a..0d300b856c 100644
--- a/cpp/src/parquet/statistics_test.cc
+++ b/cpp/src/parquet/statistics_test.cc
@@ -1660,6 +1660,13 @@ TEST(TestStatisticsSortOrder, UNKNOWN) {
   ASSERT_EQ(1, enc_stats->null_count);
   ASSERT_FALSE(enc_stats->is_max_value_exact.has_value());
   ASSERT_FALSE(enc_stats->is_min_value_exact.has_value());
+
+  // Unknown sort order should not cause min/max to be set
+  std::shared_ptr<Statistics> stats = column_chunk->statistics();
+  ASSERT_NE(nullptr, stats);
+  ASSERT_FALSE(stats->HasMinMax());
+  ASSERT_TRUE(stats->HasNullCount());
+  ASSERT_EQ(1, stats->null_count());
 }
 
 // Test statistics for binary column with UNSIGNED sort order
diff --git a/cpp/src/parquet/thrift_internal.h b/cpp/src/parquet/thrift_internal.h
index 5d14ae4e28..971e6ccebc 100644
--- a/cpp/src/parquet/thrift_internal.h
+++ b/cpp/src/parquet/thrift_internal.h
@@ -44,6 +44,7 @@
 #include "parquet/geospatial/statistics.h"
 #include "parquet/platform.h"
 #include "parquet/properties.h"
+#include "parquet/schema.h"
 #include "parquet/size_statistics.h"
 #include "parquet/statistics.h"
 #include "parquet/types.h"
@@ -252,12 +253,39 @@ static inline AadMetadata FromThrift(format::AesGcmCtrV1 aesGcmCtrV1) {
                      aesGcmCtrV1.supply_aad_prefix};
 }
 
-static inline EncodedStatistics FromThrift(const format::Statistics& stats) {
+// Selects how thrift Statistics min/max fields should populate EncodedStatistics.
+enum class StatisticsMinMaxField {
+  // Do not populate min/max, because the ordering is undefined or unsupported.
+  kInvalid,
+  // Populate min/max from the min_value/max_value fields.
+  kMinValueMaxValue,
+  // Populate min/max from the legacy min/max fields.
+  kLegacyMinMax,
+};
+
+// Keep this field-selection logic consistent with ColumnDescriptor::can_use_min_max().
+static inline StatisticsMinMaxField GetStatisticsMinMaxField(
+    const ColumnDescriptor& descr) {
+  switch (descr.column_order().get_order()) {
+    case ColumnOrder::TYPE_DEFINED_ORDER:
+      return descr.sort_order() != SortOrder::UNKNOWN
+                 ? StatisticsMinMaxField::kMinValueMaxValue
+                 : StatisticsMinMaxField::kInvalid;
+    case ColumnOrder::UNDEFINED:
+      return descr.sort_order() == SortOrder::SIGNED
+                 ? StatisticsMinMaxField::kLegacyMinMax
+                 : StatisticsMinMaxField::kInvalid;
+    case ColumnOrder::UNKNOWN:
+      return StatisticsMinMaxField::kInvalid;
+  }
+  return StatisticsMinMaxField::kInvalid;
+}
+
+static inline EncodedStatistics FromThrift(const format::Statistics& stats,
+                                           StatisticsMinMaxField min_max) {
   EncodedStatistics out;
 
-  // Use the new V2 min-max statistics over the former one if it is filled
-  if (stats.__isset.max_value || stats.__isset.min_value) {
-    // TODO: check if the column_order is TYPE_DEFINED_ORDER.
+  if (min_max == StatisticsMinMaxField::kMinValueMaxValue) {
     if (stats.__isset.max_value) {
       out.set_max(stats.max_value);
       if (stats.__isset.is_max_value_exact) {
@@ -270,9 +298,7 @@ static inline EncodedStatistics FromThrift(const format::Statistics& stats) {
         out.is_min_value_exact = stats.is_min_value_exact;
       }
     }
-  } else if (stats.__isset.max || stats.__isset.min) {
-    // TODO: check created_by to see if it is corrupted for some types.
-    // TODO: check if the sort_order is SIGNED.
+  } else if (min_max == StatisticsMinMaxField::kLegacyMinMax) {
     if (stats.__isset.max) {
       out.set_max(stats.max);
     }
diff --git a/cpp/src/parquet/types.cc b/cpp/src/parquet/types.cc
index fb4eb92a75..ff5bd4eafb 100644
--- a/cpp/src/parquet/types.cc
+++ b/cpp/src/parquet/types.cc
@@ -444,6 +444,7 @@ SortOrder::type GetSortOrder(const std::shared_ptr<const LogicalType>& logical_t
 
 ColumnOrder ColumnOrder::undefined_ = ColumnOrder(ColumnOrder::UNDEFINED);
 ColumnOrder ColumnOrder::type_defined_ = ColumnOrder(ColumnOrder::TYPE_DEFINED_ORDER);
+ColumnOrder ColumnOrder::unknown_ = ColumnOrder(ColumnOrder::UNKNOWN);
 
 // Static methods for LogicalType class
 
diff --git a/cpp/src/parquet/types.h b/cpp/src/parquet/types.h
index ad4df5119e..687353aa9b 100644
--- a/cpp/src/parquet/types.h
+++ b/cpp/src/parquet/types.h
@@ -597,9 +597,16 @@ struct PageType {
 
 bool PageCanUseChecksum(PageType::type pageType);
 
-class ColumnOrder {
+class PARQUET_EXPORT ColumnOrder {
  public:
-  enum type { UNDEFINED, TYPE_DEFINED_ORDER };
+  enum type {
+    // File metadata has no column order, only legacy min/max in stats are defined.
+    UNDEFINED,
+    // File metadata uses TypeDefinedOrder from the Parquet format.
+    TYPE_DEFINED_ORDER,
+    // Column order value unsupported by this reader.
+    UNKNOWN
+  };
   explicit ColumnOrder(ColumnOrder::type column_order) : column_order_(column_order) {}
   // Default to Type Defined Order
   ColumnOrder() : column_order_(type::TYPE_DEFINED_ORDER) {}
@@ -607,6 +614,7 @@ class ColumnOrder {
 
   static ColumnOrder undefined_;
   static ColumnOrder type_defined_;
+  static ColumnOrder unknown_;
 
  private:
   ColumnOrder::type column_order_;


Reply via email to