This is an automated email from the ASF dual-hosted git repository.

wjones127 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/main by this push:
     new 0cf4ffaa90 GH-34053: [C++][Parquet] Write parquet page index (#34054)
0cf4ffaa90 is described below

commit 0cf4ffaa90de6f036c363ac95d83474d6a1dfcc2
Author: Gang Wu <[email protected]>
AuthorDate: Tue Apr 11 07:48:31 2023 +0800

    GH-34053: [C++][Parquet] Write parquet page index (#34054)
    
    ### Rationale for this change
    
    Parquet C++ reader supports reading page index from file, but the writer 
does not yet support writing it.
    
    ### What changes are included in this PR?
    
    Parquet file writer collects page index from all data pages and serializes 
page index into the file.
    
    ### Are these changes tested?
    
    Not yet, will be added later.
    
    ### Are there any user-facing changes?
    
    `WriterProperties::enable_write_page_index()` and 
`WriterProperties::disable_write_page_index()` have been added to toggle it on 
and off.
    * Closes: #34053
    
    Authored-by: Gang Wu <[email protected]>
    Signed-off-by: Will Jones <[email protected]>
---
 cpp/src/parquet/arrow/arrow_reader_writer_test.cc | 285 ++++++++++++++-
 cpp/src/parquet/column_page.h                     |  23 +-
 cpp/src/parquet/column_writer.cc                  |  92 ++++-
 cpp/src/parquet/column_writer.h                   |  10 +-
 cpp/src/parquet/file_writer.cc                    |  61 +++-
 cpp/src/parquet/metadata.cc                       |  38 ++
 cpp/src/parquet/metadata.h                        |  18 +
 cpp/src/parquet/page_index.cc                     | 383 ++++++++++++++++++++
 cpp/src/parquet/page_index.h                      | 114 ++++++
 cpp/src/parquet/page_index_test.cc                | 414 ++++++++++++++++++++++
 cpp/src/parquet/properties.h                      |  36 +-
 cpp/src/parquet/statistics.cc                     |   4 +-
 cpp/src/parquet/statistics.h                      |   6 +
 cpp/src/parquet/thrift_internal.h                 |  12 +
 14 files changed, 1458 insertions(+), 38 deletions(-)

diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc 
b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
index eba964f967..b5456e89c6 100644
--- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
+++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -21,6 +21,7 @@
 #pragma warning(disable : 4800)
 #endif
 
+#include "gmock/gmock.h"
 #include "gtest/gtest.h"
 
 #include <cstdint>
@@ -66,6 +67,7 @@
 #include "parquet/arrow/writer.h"
 #include "parquet/column_writer.h"
 #include "parquet/file_writer.h"
+#include "parquet/page_index.h"
 #include "parquet/test_util.h"
 
 using arrow::Array;
@@ -5081,10 +5083,21 @@ TEST(TestArrowReadWrite, WriteAndReadRecordBatch) {
 
   // Verify the single record batch has been sliced into two row groups by
   // WriterProperties::max_row_group_length().
-  int num_row_groups = 
arrow_reader->parquet_reader()->metadata()->num_row_groups();
+  auto file_metadata = arrow_reader->parquet_reader()->metadata();
+  int num_row_groups = file_metadata->num_row_groups();
   ASSERT_EQ(2, num_row_groups);
-  ASSERT_EQ(10, 
arrow_reader->parquet_reader()->metadata()->RowGroup(0)->num_rows());
-  ASSERT_EQ(2, 
arrow_reader->parquet_reader()->metadata()->RowGroup(1)->num_rows());
+  ASSERT_EQ(10, file_metadata->RowGroup(0)->num_rows());
+  ASSERT_EQ(2, file_metadata->RowGroup(1)->num_rows());
+
+  // Verify that page index is not written by default.
+  for (int i = 0; i < num_row_groups; ++i) {
+    auto row_group_metadata = file_metadata->RowGroup(i);
+    for (int j = 0; j < row_group_metadata->num_columns(); ++j) {
+      auto column_metadata = row_group_metadata->ColumnChunk(j);
+      EXPECT_FALSE(column_metadata->GetColumnIndexLocation().has_value());
+      EXPECT_FALSE(column_metadata->GetOffsetIndexLocation().has_value());
+    }
+  }
 
   // Verify batch data read via RecordBatch
   std::unique_ptr<::arrow::RecordBatchReader> batch_reader;
@@ -5146,5 +5159,271 @@ TEST(TestArrowReadWrite, FuzzReader) {
   }
 }
 
+namespace {
+
+struct ColumnIndexObject {
+  std::vector<bool> null_pages;
+  std::vector<std::string> min_values;
+  std::vector<std::string> max_values;
+  BoundaryOrder::type boundary_order = BoundaryOrder::Unordered;
+  std::vector<int64_t> null_counts;
+
+  ColumnIndexObject() = default;
+
+  ColumnIndexObject(const std::vector<bool>& null_pages,
+                    const std::vector<std::string>& min_values,
+                    const std::vector<std::string>& max_values,
+                    BoundaryOrder::type boundary_order,
+                    const std::vector<int64_t>& null_counts)
+      : null_pages(null_pages),
+        min_values(min_values),
+        max_values(max_values),
+        boundary_order(boundary_order),
+        null_counts(null_counts) {}
+
+  explicit ColumnIndexObject(const ColumnIndex* column_index) {
+    if (column_index == nullptr) {
+      return;
+    }
+    null_pages = column_index->null_pages();
+    min_values = column_index->encoded_min_values();
+    max_values = column_index->encoded_max_values();
+    boundary_order = column_index->boundary_order();
+    if (column_index->has_null_counts()) {
+      null_counts = column_index->null_counts();
+    }
+  }
+
+  bool operator==(const ColumnIndexObject& b) const {
+    return null_pages == b.null_pages && min_values == b.min_values &&
+           max_values == b.max_values && boundary_order == b.boundary_order &&
+           null_counts == b.null_counts;
+  }
+};
+
+auto encode_int64 = [](int64_t value) {
+  return std::string(reinterpret_cast<const char*>(&value), sizeof(int64_t));
+};
+
+auto encode_double = [](double value) {
+  return std::string(reinterpret_cast<const char*>(&value), sizeof(double));
+};
+
+}  // namespace
+
+class ParquetPageIndexRoundTripTest : public ::testing::Test {
+ public:
+  void WriteFile(const std::shared_ptr<WriterProperties>& writer_properties,
+                 const std::shared_ptr<::arrow::Table>& table) {
+    // Get schema from table.
+    auto schema = table->schema();
+    std::shared_ptr<SchemaDescriptor> parquet_schema;
+    auto arrow_writer_properties = default_arrow_writer_properties();
+    ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties,
+                                       *arrow_writer_properties, 
&parquet_schema));
+    auto schema_node = 
std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
+
+    // Write table to buffer.
+    auto sink = CreateOutputStream();
+    auto pool = ::arrow::default_memory_pool();
+    auto writer = ParquetFileWriter::Open(sink, schema_node, 
writer_properties);
+    std::unique_ptr<FileWriter> arrow_writer;
+    ASSERT_OK(FileWriter::Make(pool, std::move(writer), schema, 
arrow_writer_properties,
+                               &arrow_writer));
+    ASSERT_OK_NO_THROW(arrow_writer->WriteTable(*table));
+    ASSERT_OK_NO_THROW(arrow_writer->Close());
+    ASSERT_OK_AND_ASSIGN(buffer_, sink->Finish());
+  }
+
+  void ReadPageIndexes(int expect_num_row_groups, int expect_num_pages) {
+    auto read_properties = default_arrow_reader_properties();
+    auto reader = 
ParquetFileReader::Open(std::make_shared<BufferReader>(buffer_));
+
+    auto metadata = reader->metadata();
+    ASSERT_EQ(expect_num_row_groups, metadata->num_row_groups());
+
+    auto page_index_reader = reader->GetPageIndexReader();
+    ASSERT_NE(page_index_reader, nullptr);
+
+    int64_t offset_lower_bound = 0;
+    for (int rg = 0; rg < metadata->num_row_groups(); ++rg) {
+      auto row_group_index_reader = page_index_reader->RowGroup(rg);
+      ASSERT_NE(row_group_index_reader, nullptr);
+
+      for (int col = 0; col < metadata->num_columns(); ++col) {
+        auto column_index = row_group_index_reader->GetColumnIndex(col);
+        column_indexes_.emplace_back(column_index.get());
+
+        auto offset_index = row_group_index_reader->GetOffsetIndex(col);
+        CheckOffsetIndex(offset_index.get(), expect_num_pages, 
&offset_lower_bound);
+      }
+    }
+  }
+
+ private:
+  void CheckOffsetIndex(const OffsetIndex* offset_index, int expect_num_pages,
+                        int64_t* offset_lower_bound_in_out) {
+    ASSERT_NE(offset_index, nullptr);
+    const auto& locations = offset_index->page_locations();
+    ASSERT_EQ(static_cast<size_t>(expect_num_pages), locations.size());
+    int64_t prev_first_row_index = -1;
+    for (const auto& location : locations) {
+      // Make sure first_row_index is in strictly ascending order within a row 
group.
+      ASSERT_GT(location.first_row_index, prev_first_row_index);
+      // Make sure page offsets are in ascending order across the file.
+      ASSERT_GE(location.offset, *offset_lower_bound_in_out);
+      // Make sure page size is positive.
+      ASSERT_GT(location.compressed_page_size, 0);
+      prev_first_row_index = location.first_row_index;
+      *offset_lower_bound_in_out = location.offset + 
location.compressed_page_size;
+    }
+  }
+
+ protected:
+  std::shared_ptr<Buffer> buffer_;
+  std::vector<ColumnIndexObject> column_indexes_;
+};
+
+TEST_F(ParquetPageIndexRoundTripTest, SimpleRoundTrip) {
+  auto writer_properties = WriterProperties::Builder()
+                               .enable_write_page_index()
+                               ->max_row_group_length(4)
+                               ->build();
+  auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::int64()),
+                                 ::arrow::field("c1", ::arrow::utf8()),
+                                 ::arrow::field("c2", 
::arrow::list(::arrow::int64()))});
+  WriteFile(writer_properties, ::arrow::TableFromJSON(schema, {R"([
+      [1,     "a",  [1]      ],
+      [2,     "b",  [1, 2]   ],
+      [3,     "c",  [null]   ],
+      [null,  "d",  []       ],
+      [5,     null, [3, 3, 3]],
+      [6,     "f",  null     ]
+    ])"}));
+
+  ReadPageIndexes(/*expect_num_row_groups=*/2, /*expect_num_pages=*/1);
+
+  EXPECT_THAT(
+      column_indexes_,
+      ::testing::ElementsAre(
+          ColumnIndexObject{/*null_pages=*/{false}, 
/*min_values=*/{encode_int64(1)},
+                            /*max_values=*/{encode_int64(3)}, 
BoundaryOrder::Ascending,
+                            /*null_counts=*/{1}},
+          ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{"a"},
+                            /*max_values=*/{"d"}, BoundaryOrder::Ascending,
+                            /*null_counts=*/{0}},
+          ColumnIndexObject{/*null_pages=*/{false}, 
/*min_values=*/{encode_int64(1)},
+                            /*max_values=*/{encode_int64(2)}, 
BoundaryOrder::Ascending,
+                            /*null_counts=*/{2}},
+          ColumnIndexObject{/*null_pages=*/{false}, 
/*min_values=*/{encode_int64(5)},
+                            /*max_values=*/{encode_int64(6)}, 
BoundaryOrder::Ascending,
+                            /*null_counts=*/{0}},
+          ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{"f"},
+                            /*max_values=*/{"f"}, BoundaryOrder::Ascending,
+                            /*null_counts=*/{1}},
+          ColumnIndexObject{/*null_pages=*/{false}, 
/*min_values=*/{encode_int64(3)},
+                            /*max_values=*/{encode_int64(3)}, 
BoundaryOrder::Ascending,
+                            /*null_counts=*/{1}}));
+}
+
+TEST_F(ParquetPageIndexRoundTripTest, DropLargeStats) {
+  auto writer_properties = WriterProperties::Builder()
+                               .enable_write_page_index()
+                               ->max_row_group_length(1) /* write single-row 
row group */
+                               ->max_statistics_size(20) /* drop stats larger 
than it */
+                               ->build();
+  auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::utf8())});
+  WriteFile(writer_properties, ::arrow::TableFromJSON(schema, {R"([
+      ["short_string"],
+      ["very_large_string_to_drop_stats"]
+    ])"}));
+
+  ReadPageIndexes(/*expect_num_row_groups=*/2, /*expect_num_pages=*/1);
+
+  EXPECT_THAT(
+      column_indexes_,
+      ::testing::ElementsAre(
+          ColumnIndexObject{/*null_pages=*/{false}, 
/*min_values=*/{"short_string"},
+                            /*max_values=*/{"short_string"}, 
BoundaryOrder::Ascending,
+                            /*null_counts=*/{0}},
+          ColumnIndexObject{}));
+}
+
+TEST_F(ParquetPageIndexRoundTripTest, MultiplePages) {
+  auto writer_properties = WriterProperties::Builder()
+                               .enable_write_page_index()
+                               ->data_pagesize(1) /* write multiple pages */
+                               ->build();
+  auto schema = ::arrow::schema(
+      {::arrow::field("c0", ::arrow::int64()), ::arrow::field("c1", 
::arrow::utf8())});
+  WriteFile(
+      writer_properties,
+      ::arrow::TableFromJSON(
+          schema, {R"([[1, "a"], [2, "b"]])", R"([[3, "c"], [4, "d"]])",
+                   R"([[null, null], [6, "f"]])", R"([[null, null], [null, 
null]])"}));
+
+  ReadPageIndexes(/*expect_num_row_groups=*/1, /*expect_num_pages=*/4);
+
+  EXPECT_THAT(
+      column_indexes_,
+      ::testing::ElementsAre(
+          ColumnIndexObject{
+              /*null_pages=*/{false, false, false, true},
+              /*min_values=*/{encode_int64(1), encode_int64(3), 
encode_int64(6), ""},
+              /*max_values=*/{encode_int64(2), encode_int64(4), 
encode_int64(6), ""},
+              BoundaryOrder::Ascending,
+              /*null_counts=*/{0, 0, 1, 2}},
+          ColumnIndexObject{/*null_pages=*/{false, false, false, true},
+                            /*min_values=*/{"a", "c", "f", ""},
+                            /*max_values=*/{"b", "d", "f", ""}, 
BoundaryOrder::Ascending,
+                            /*null_counts=*/{0, 0, 1, 2}}));
+}
+
+TEST_F(ParquetPageIndexRoundTripTest, DoubleWithNaNs) {
+  auto writer_properties = WriterProperties::Builder()
+                               .enable_write_page_index()
+                               ->max_row_group_length(3) /* 3 rows per row 
group */
+                               ->build();
+
+  // Create table to write with NaNs.
+  auto vectors = std::vector<std::shared_ptr<Array>>(4);
+  // NaN will be ignored in min/max stats.
+  ::arrow::ArrayFromVector<::arrow::DoubleType>({1.0, NAN, 0.1}, &vectors[0]);
+  // Lower bound will use -0.0.
+  ::arrow::ArrayFromVector<::arrow::DoubleType>({+0.0, NAN, +0.0}, 
&vectors[1]);
+  // Upper bound will use -0.0.
+  ::arrow::ArrayFromVector<::arrow::DoubleType>({-0.0, NAN, -0.0}, 
&vectors[2]);
+  // Pages with all NaNs will not build column index.
+  ::arrow::ArrayFromVector<::arrow::DoubleType>({NAN, NAN, NAN}, &vectors[3]);
+  ASSERT_OK_AND_ASSIGN(auto chunked_array,
+                       arrow::ChunkedArray::Make(vectors, ::arrow::float64()));
+
+  auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::float64())});
+  auto table = Table::Make(schema, {chunked_array});
+  WriteFile(writer_properties, table);
+
+  ReadPageIndexes(/*expect_num_row_groups=*/4, /*expect_num_pages=*/1);
+
+  EXPECT_THAT(
+      column_indexes_,
+      ::testing::ElementsAre(
+          ColumnIndexObject{/*null_pages=*/{false},
+                            /*min_values=*/{encode_double(0.1)},
+                            /*max_values=*/{encode_double(1.0)}, 
BoundaryOrder::Ascending,
+                            /*null_counts=*/{0}},
+          ColumnIndexObject{/*null_pages=*/{false},
+                            /*min_values=*/{encode_double(-0.0)},
+                            /*max_values=*/{encode_double(+0.0)},
+                            BoundaryOrder::Ascending,
+                            /*null_counts=*/{0}},
+          ColumnIndexObject{/*null_pages=*/{false},
+                            /*min_values=*/{encode_double(-0.0)},
+                            /*max_values=*/{encode_double(+0.0)},
+                            BoundaryOrder::Ascending,
+                            /*null_counts=*/{0}},
+          ColumnIndexObject{
+              /* Page with only NaN values does not have column index built 
*/}));
+}
+
 }  // namespace arrow
 }  // namespace parquet
diff --git a/cpp/src/parquet/column_page.h b/cpp/src/parquet/column_page.h
index 2fab77ed01..905f805b8c 100644
--- a/cpp/src/parquet/column_page.h
+++ b/cpp/src/parquet/column_page.h
@@ -23,6 +23,7 @@
 
 #include <cstdint>
 #include <memory>
+#include <optional>
 #include <string>
 
 #include "parquet/statistics.h"
@@ -64,23 +65,31 @@ class DataPage : public Page {
   Encoding::type encoding() const { return encoding_; }
   int64_t uncompressed_size() const { return uncompressed_size_; }
   const EncodedStatistics& statistics() const { return statistics_; }
+  /// Return the row ordinal within the row group of the first row in the data 
page.
+  /// Currently it is only set on data pages created by ColumnWriter in 
order
+  /// to collect page index.
+  std::optional<int64_t> first_row_index() const { return first_row_index_; }
 
   virtual ~DataPage() = default;
 
  protected:
   DataPage(PageType::type type, const std::shared_ptr<Buffer>& buffer, int32_t 
num_values,
            Encoding::type encoding, int64_t uncompressed_size,
-           const EncodedStatistics& statistics = EncodedStatistics())
+           const EncodedStatistics& statistics = EncodedStatistics(),
+           std::optional<int64_t> first_row_index = std::nullopt)
       : Page(buffer, type),
         num_values_(num_values),
         encoding_(encoding),
         uncompressed_size_(uncompressed_size),
-        statistics_(statistics) {}
+        statistics_(statistics),
+        first_row_index_(std::move(first_row_index)) {}
 
   int32_t num_values_;
   Encoding::type encoding_;
   int64_t uncompressed_size_;
   EncodedStatistics statistics_;
+  /// Row ordinal within the row group of the first row in the data page.
+  std::optional<int64_t> first_row_index_;
 };
 
 class DataPageV1 : public DataPage {
@@ -88,9 +97,10 @@ class DataPageV1 : public DataPage {
   DataPageV1(const std::shared_ptr<Buffer>& buffer, int32_t num_values,
              Encoding::type encoding, Encoding::type definition_level_encoding,
              Encoding::type repetition_level_encoding, int64_t 
uncompressed_size,
-             const EncodedStatistics& statistics = EncodedStatistics())
+             const EncodedStatistics& statistics = EncodedStatistics(),
+             std::optional<int64_t> first_row_index = std::nullopt)
       : DataPage(PageType::DATA_PAGE, buffer, num_values, encoding, 
uncompressed_size,
-                 statistics),
+                 statistics, std::move(first_row_index)),
         definition_level_encoding_(definition_level_encoding),
         repetition_level_encoding_(repetition_level_encoding) {}
 
@@ -109,9 +119,10 @@ class DataPageV2 : public DataPage {
              int32_t num_rows, Encoding::type encoding,
              int32_t definition_levels_byte_length, int32_t 
repetition_levels_byte_length,
              int64_t uncompressed_size, bool is_compressed = false,
-             const EncodedStatistics& statistics = EncodedStatistics())
+             const EncodedStatistics& statistics = EncodedStatistics(),
+             std::optional<int64_t> first_row_index = std::nullopt)
       : DataPage(PageType::DATA_PAGE_V2, buffer, num_values, encoding, 
uncompressed_size,
-                 statistics),
+                 statistics, std::move(first_row_index)),
         num_nulls_(num_nulls),
         num_rows_(num_rows),
         definition_levels_byte_length_(definition_levels_byte_length),
diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc
index fd1b0fc1e2..222fc853e3 100644
--- a/cpp/src/parquet/column_writer.cc
+++ b/cpp/src/parquet/column_writer.cc
@@ -49,6 +49,7 @@
 #include "parquet/encryption/internal_file_encryptor.h"
 #include "parquet/level_conversion.h"
 #include "parquet/metadata.h"
+#include "parquet/page_index.h"
 #include "parquet/platform.h"
 #include "parquet/properties.h"
 #include "parquet/schema.h"
@@ -253,7 +254,9 @@ class SerializedPageWriter : public PageWriter {
                        bool use_page_checksum_verification,
                        MemoryPool* pool = ::arrow::default_memory_pool(),
                        std::shared_ptr<Encryptor> meta_encryptor = nullptr,
-                       std::shared_ptr<Encryptor> data_encryptor = nullptr)
+                       std::shared_ptr<Encryptor> data_encryptor = nullptr,
+                       ColumnIndexBuilder* column_index_builder = nullptr,
+                       OffsetIndexBuilder* offset_index_builder = nullptr)
       : sink_(std::move(sink)),
         metadata_(metadata),
         pool_(pool),
@@ -268,7 +271,9 @@ class SerializedPageWriter : public PageWriter {
         page_checksum_verification_(use_page_checksum_verification),
         meta_encryptor_(std::move(meta_encryptor)),
         data_encryptor_(std::move(data_encryptor)),
-        encryption_buffer_(AllocateBuffer(pool, 0)) {
+        encryption_buffer_(AllocateBuffer(pool, 0)),
+        column_index_builder_(column_index_builder),
+        offset_index_builder_(offset_index_builder) {
     if (data_encryptor_ != nullptr || meta_encryptor_ != nullptr) {
       InitEncryption();
     }
@@ -339,6 +344,10 @@ class SerializedPageWriter : public PageWriter {
     if (meta_encryptor_ != nullptr) {
       UpdateEncryption(encryption::kColumnMetaData);
     }
+
+    // Serialized page writer does not need to adjust page offsets.
+    FinishPageIndexes(/*final_position=*/0);
+
     // index_page_offset = -1 since they are not supported
     metadata_->Finish(num_values_, dictionary_page_offset_, -1, 
data_page_offset_,
                       total_compressed_size_, total_uncompressed_size_, 
has_dictionary,
@@ -417,6 +426,25 @@ class SerializedPageWriter : public PageWriter {
         thrift_serializer_->Serialize(&page_header, sink_.get(), 
meta_encryptor_);
     PARQUET_THROW_NOT_OK(sink_->Write(output_data_buffer, output_data_len));
 
+    /// Collect page index
+    if (column_index_builder_ != nullptr) {
+      column_index_builder_->AddPage(page.statistics());
+    }
+    if (offset_index_builder_ != nullptr) {
+      const int64_t compressed_size = output_data_len + header_size;
+      if (compressed_size > std::numeric_limits<int32_t>::max()) {
+        throw ParquetException("Compressed page size overflows to INT32_MAX.");
+      }
+      if (!page.first_row_index().has_value()) {
+        throw ParquetException("First row index is not set in data page.");
+      }
+      /// start_pos is a relative offset in the buffered mode. It should be
+      /// adjusted via OffsetIndexBuilder::Finish() after BufferedPageWriter
+      /// has flushed all data pages.
+      offset_index_builder_->AddPage(start_pos, 
static_cast<int32_t>(compressed_size),
+                                     *page.first_row_index());
+    }
+
     total_uncompressed_size_ += uncompressed_size + header_size;
     total_compressed_size_ += output_data_len + header_size;
     num_values_ += page.num_values();
@@ -458,6 +486,17 @@ class SerializedPageWriter : public PageWriter {
     page_header.__set_data_page_header_v2(data_page_header);
   }
 
+  /// \brief Finish page index builders and update the stream offset to adjust
+  /// page offsets.
+  void FinishPageIndexes(int64_t final_position) {
+    if (column_index_builder_ != nullptr) {
+      column_index_builder_->Finish();
+    }
+    if (offset_index_builder_ != nullptr) {
+      offset_index_builder_->Finish(final_position);
+    }
+  }
+
   bool has_compressor() override { return (compressor_ != nullptr); }
 
   int64_t num_values() { return num_values_; }
@@ -563,6 +602,9 @@ class SerializedPageWriter : public PageWriter {
 
   std::map<Encoding::type, int32_t> dict_encoding_stats_;
   std::map<Encoding::type, int32_t> data_encoding_stats_;
+
+  ColumnIndexBuilder* column_index_builder_;
+  OffsetIndexBuilder* offset_index_builder_;
 };
 
 // This implementation of the PageWriter writes to the final sink on Close .
@@ -574,13 +616,16 @@ class BufferedPageWriter : public PageWriter {
                      bool use_page_checksum_verification,
                      MemoryPool* pool = ::arrow::default_memory_pool(),
                      std::shared_ptr<Encryptor> meta_encryptor = nullptr,
-                     std::shared_ptr<Encryptor> data_encryptor = nullptr)
+                     std::shared_ptr<Encryptor> data_encryptor = nullptr,
+                     ColumnIndexBuilder* column_index_builder = nullptr,
+                     OffsetIndexBuilder* offset_index_builder = nullptr)
       : final_sink_(std::move(sink)), metadata_(metadata), 
has_dictionary_pages_(false) {
     in_memory_sink_ = CreateOutputStream(pool);
     pager_ = std::make_unique<SerializedPageWriter>(
         in_memory_sink_, codec, compression_level, metadata, row_group_ordinal,
         current_column_ordinal, use_page_checksum_verification, pool,
-        std::move(meta_encryptor), std::move(data_encryptor));
+        std::move(meta_encryptor), std::move(data_encryptor), 
column_index_builder,
+        offset_index_builder);
   }
 
   int64_t WriteDictionaryPage(const DictionaryPage& page) override {
@@ -606,6 +651,9 @@ class BufferedPageWriter : public PageWriter {
     // Write metadata at end of column chunk
     metadata_->WriteTo(in_memory_sink_.get());
 
+    // Buffered page writer needs to adjust page offsets.
+    pager_->FinishPageIndexes(final_position);
+
     // flush everything to the serialized sink
     PARQUET_ASSIGN_OR_THROW(auto buffer, in_memory_sink_->Finish());
     PARQUET_THROW_NOT_OK(final_sink_->Write(buffer));
@@ -638,17 +686,20 @@ std::unique_ptr<PageWriter> PageWriter::Open(
     int compression_level, ColumnChunkMetaDataBuilder* metadata,
     int16_t row_group_ordinal, int16_t column_chunk_ordinal, MemoryPool* pool,
     bool buffered_row_group, std::shared_ptr<Encryptor> meta_encryptor,
-    std::shared_ptr<Encryptor> data_encryptor, bool 
page_write_checksum_enabled) {
+    std::shared_ptr<Encryptor> data_encryptor, bool 
page_write_checksum_enabled,
+    ColumnIndexBuilder* column_index_builder, OffsetIndexBuilder* 
offset_index_builder) {
   if (buffered_row_group) {
     return std::unique_ptr<PageWriter>(new BufferedPageWriter(
         std::move(sink), codec, compression_level, metadata, row_group_ordinal,
         column_chunk_ordinal, page_write_checksum_enabled, pool,
-        std::move(meta_encryptor), std::move(data_encryptor)));
+        std::move(meta_encryptor), std::move(data_encryptor), 
column_index_builder,
+        offset_index_builder));
   } else {
     return std::unique_ptr<PageWriter>(new SerializedPageWriter(
         std::move(sink), codec, compression_level, metadata, row_group_ordinal,
         column_chunk_ordinal, page_write_checksum_enabled, pool,
-        std::move(meta_encryptor), std::move(data_encryptor)));
+        std::move(meta_encryptor), std::move(data_encryptor), 
column_index_builder,
+        offset_index_builder));
   }
 }
 
@@ -925,6 +976,7 @@ void ColumnWriterImpl::BuildDataPageV1(int64_t 
definition_levels_rle_size,
   }
 
   int32_t num_values = static_cast<int32_t>(num_buffered_values_);
+  int64_t first_row_index = rows_written_ - num_buffered_rows_;
 
   // Write the page to OutputStream eagerly if there is no dictionary or
   // if dictionary encoding has fallen back to PLAIN
@@ -934,13 +986,13 @@ void ColumnWriterImpl::BuildDataPageV1(int64_t 
definition_levels_rle_size,
         compressed_data->CopySlice(0, compressed_data->size(), allocator_));
     std::unique_ptr<DataPage> page_ptr = std::make_unique<DataPageV1>(
         compressed_data_copy, num_values, encoding_, Encoding::RLE, 
Encoding::RLE,
-        uncompressed_size, page_stats);
+        uncompressed_size, page_stats, first_row_index);
     total_compressed_bytes_ += page_ptr->size() + sizeof(format::PageHeader);
 
     data_pages_.push_back(std::move(page_ptr));
   } else {  // Eagerly write pages
     DataPageV1 page(compressed_data, num_values, encoding_, Encoding::RLE, 
Encoding::RLE,
-                    uncompressed_size, page_stats);
+                    uncompressed_size, page_stats, first_row_index);
     WriteDataPage(page);
   }
 }
@@ -977,6 +1029,7 @@ void ColumnWriterImpl::BuildDataPageV2(int64_t 
definition_levels_rle_size,
   int32_t num_rows = static_cast<int32_t>(num_buffered_rows_);
   int32_t def_levels_byte_length = 
static_cast<int32_t>(definition_levels_rle_size);
   int32_t rep_levels_byte_length = 
static_cast<int32_t>(repetition_levels_rle_size);
+  int64_t first_row_index = rows_written_ - num_buffered_rows_;
 
   // page_stats.null_count is not set when page_statistics_ is nullptr. It is 
only used
   // here for safety check.
@@ -989,13 +1042,14 @@ void ColumnWriterImpl::BuildDataPageV2(int64_t 
definition_levels_rle_size,
                             combined->CopySlice(0, combined->size(), 
allocator_));
     std::unique_ptr<DataPage> page_ptr = std::make_unique<DataPageV2>(
         combined, num_values, null_count, num_rows, encoding_, 
def_levels_byte_length,
-        rep_levels_byte_length, uncompressed_size, pager_->has_compressor(), 
page_stats);
+        rep_levels_byte_length, uncompressed_size, pager_->has_compressor(), 
page_stats,
+        first_row_index);
     total_compressed_bytes_ += page_ptr->size() + sizeof(format::PageHeader);
     data_pages_.push_back(std::move(page_ptr));
   } else {
     DataPageV2 page(combined, num_values, null_count, num_rows, encoding_,
                     def_levels_byte_length, rep_levels_byte_length, 
uncompressed_size,
-                    pager_->has_compressor(), page_stats);
+                    pager_->has_compressor(), page_stats, first_row_index);
     WriteDataPage(page);
   }
 }
@@ -1313,7 +1367,8 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, 
public TypedColumnWriter<
   const WriterProperties* properties() override { return properties_; }
 
   bool pages_change_on_record_boundaries() const {
-    return properties_->data_page_version() == ParquetDataPageVersion::V2;
+    return properties_->data_page_version() == ParquetDataPageVersion::V2 ||
+           properties_->write_page_index();
   }
 
  private:
@@ -1515,6 +1570,19 @@ class TypedColumnWriterImpl : public ColumnWriterImpl, 
public TypedColumnWriter<
     }
   }
 
+  /// \brief Write values with spaces and update page statistics accordingly.
+  ///
+  /// \param values input buffer of values to write, including spaces.
+  /// \param num_values number of non-null values in the values buffer.
+  /// \param num_spaced_values length of values buffer, including spaces but 
not
+  ///   counting some nulls from ancestor (e.g. empty lists).
+  /// \param valid_bits validity bitmap of values buffer, which does not 
include some
+  ///   nulls from ancestor (e.g. empty lists).
+  /// \param valid_bits_offset offset to valid_bits bitmap.
+  /// \param num_levels number of levels to write, including nulls from values 
buffer
+  ///   and nulls from ancestor (e.g. empty lists).
+  /// \param num_nulls number of nulls in the values buffer as well as nulls 
from the
+  ///   ancestor (e.g. empty lists).
   void WriteValuesSpaced(const T* values, int64_t num_values, int64_t 
num_spaced_values,
                          const uint8_t* valid_bits, int64_t valid_bits_offset,
                          int64_t num_levels, int64_t num_nulls) {
diff --git a/cpp/src/parquet/column_writer.h b/cpp/src/parquet/column_writer.h
index 4dd4b10ccc..792b108ac8 100644
--- a/cpp/src/parquet/column_writer.h
+++ b/cpp/src/parquet/column_writer.h
@@ -42,11 +42,13 @@ class RleEncoder;
 namespace parquet {
 
 struct ArrowWriteContext;
+class ColumnChunkMetaDataBuilder;
 class ColumnDescriptor;
+class ColumnIndexBuilder;
 class DataPage;
 class DictionaryPage;
-class ColumnChunkMetaDataBuilder;
 class Encryptor;
+class OffsetIndexBuilder;
 class WriterProperties;
 
 class PARQUET_EXPORT LevelEncoder {
@@ -91,7 +93,11 @@ class PARQUET_EXPORT PageWriter {
       bool buffered_row_group = false,
       std::shared_ptr<Encryptor> header_encryptor = NULLPTR,
       std::shared_ptr<Encryptor> data_encryptor = NULLPTR,
-      bool page_write_checksum_enabled = false);
+      bool page_write_checksum_enabled = false,
+      // column_index_builder MUST outlive the PageWriter
+      ColumnIndexBuilder* column_index_builder = NULLPTR,
+      // offset_index_builder MUST outlive the PageWriter
+      OffsetIndexBuilder* offset_index_builder = NULLPTR);
 
   // The Column Writer decides if dictionary encoding is used if set and
   // if the dictionary encoding has fallen back to default encoding on 
reaching dictionary
diff --git a/cpp/src/parquet/file_writer.cc b/cpp/src/parquet/file_writer.cc
index ec2e0e8a8a..481d5b6d30 100644
--- a/cpp/src/parquet/file_writer.cc
+++ b/cpp/src/parquet/file_writer.cc
@@ -28,9 +28,9 @@
 #include "parquet/encryption/encryption_internal.h"
 #include "parquet/encryption/internal_file_encryptor.h"
 #include "parquet/exception.h"
+#include "parquet/page_index.h"
 #include "parquet/platform.h"
 #include "parquet/schema.h"
-#include "parquet/types.h"
 
 using arrow::MemoryPool;
 
@@ -89,7 +89,8 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
   RowGroupSerializer(std::shared_ptr<ArrowOutputStream> sink,
                      RowGroupMetaDataBuilder* metadata, int16_t 
row_group_ordinal,
                      const WriterProperties* properties, bool 
buffered_row_group = false,
-                     InternalFileEncryptor* file_encryptor = nullptr)
+                     InternalFileEncryptor* file_encryptor = nullptr,
+                     PageIndexBuilder* page_index_builder = nullptr)
       : sink_(std::move(sink)),
         metadata_(metadata),
         properties_(properties),
@@ -100,7 +101,8 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
         next_column_index_(0),
         num_rows_(0),
         buffered_row_group_(buffered_row_group),
-        file_encryptor_(file_encryptor) {
+        file_encryptor_(file_encryptor),
+        page_index_builder_(page_index_builder) {
     if (buffered_row_group) {
       InitColumns();
     } else {
@@ -135,8 +137,7 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
           column_writers_[0]->total_compressed_bytes_written();
     }
 
-    ++next_column_index_;
-
+    const int32_t column_ordinal = next_column_index_++;
     const auto& path = col_meta->descr()->path();
     auto meta_encryptor =
         file_encryptor_ ? 
file_encryptor_->GetColumnMetaEncryptor(path->ToDotString())
@@ -144,11 +145,17 @@ class RowGroupSerializer : public 
RowGroupWriter::Contents {
     auto data_encryptor =
         file_encryptor_ ? 
file_encryptor_->GetColumnDataEncryptor(path->ToDotString())
                         : nullptr;
+    auto ci_builder = page_index_builder_
+                          ? 
page_index_builder_->GetColumnIndexBuilder(column_ordinal)
+                          : nullptr;
+    auto oi_builder = page_index_builder_
+                          ? 
page_index_builder_->GetOffsetIndexBuilder(column_ordinal)
+                          : nullptr;
     std::unique_ptr<PageWriter> pager = PageWriter::Open(
         sink_, properties_->compression(path), 
properties_->compression_level(path),
-        col_meta, row_group_ordinal_, static_cast<int16_t>(next_column_index_ 
- 1),
+        col_meta, row_group_ordinal_, static_cast<int16_t>(column_ordinal),
         properties_->memory_pool(), false, meta_encryptor, data_encryptor,
-        properties_->page_checksum_enabled());
+        properties_->page_checksum_enabled(), ci_builder, oi_builder);
     column_writers_[0] = ColumnWriter::Make(col_meta, std::move(pager), 
properties_);
     return column_writers_[0].get();
   }
@@ -240,6 +247,7 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
   mutable int64_t num_rows_;
   bool buffered_row_group_;
   InternalFileEncryptor* file_encryptor_;
+  PageIndexBuilder* page_index_builder_;
 
   void CheckRowsWritten() const {
     // verify when only one column is written at a time
@@ -267,18 +275,25 @@ class RowGroupSerializer : public 
RowGroupWriter::Contents {
     for (int i = 0; i < num_columns(); i++) {
       auto col_meta = metadata_->NextColumnChunk();
       const auto& path = col_meta->descr()->path();
+      const int32_t column_ordinal = next_column_index_++;
       auto meta_encryptor =
           file_encryptor_ ? 
file_encryptor_->GetColumnMetaEncryptor(path->ToDotString())
                           : nullptr;
       auto data_encryptor =
           file_encryptor_ ? 
file_encryptor_->GetColumnDataEncryptor(path->ToDotString())
                           : nullptr;
+      auto ci_builder = page_index_builder_
+                            ? 
page_index_builder_->GetColumnIndexBuilder(column_ordinal)
+                            : nullptr;
+      auto oi_builder = page_index_builder_
+                            ? 
page_index_builder_->GetOffsetIndexBuilder(column_ordinal)
+                            : nullptr;
       std::unique_ptr<PageWriter> pager = PageWriter::Open(
           sink_, properties_->compression(path), 
properties_->compression_level(path),
           col_meta, static_cast<int16_t>(row_group_ordinal_),
-          static_cast<int16_t>(next_column_index_++), 
properties_->memory_pool(),
+          static_cast<int16_t>(column_ordinal), properties_->memory_pool(),
           buffered_row_group_, meta_encryptor, data_encryptor,
-          properties_->page_checksum_enabled());
+          properties_->page_checksum_enabled(), ci_builder, oi_builder);
       column_writers_.push_back(
           ColumnWriter::Make(col_meta, std::move(pager), properties_));
     }
@@ -317,6 +332,8 @@ class FileSerializer : public ParquetFileWriter::Contents {
       }
       row_group_writer_.reset();
 
+      WritePageIndex();
+
       // Write magic bytes and metadata
       auto file_encryption_properties = 
properties_->file_encryption_properties();
 
@@ -345,9 +362,12 @@ class FileSerializer : public ParquetFileWriter::Contents {
     }
     num_row_groups_++;
     auto rg_metadata = metadata_->AppendRowGroup();
+    if (page_index_builder_) {
+      page_index_builder_->AppendRowGroup();
+    }
     std::unique_ptr<RowGroupWriter::Contents> contents(new RowGroupSerializer(
         sink_, rg_metadata, static_cast<int16_t>(num_row_groups_ - 1), 
properties_.get(),
-        buffered_row_group, file_encryptor_.get()));
+        buffered_row_group, file_encryptor_.get(), page_index_builder_.get()));
     row_group_writer_ = std::make_unique<RowGroupWriter>(std::move(contents));
     return row_group_writer_.get();
   }
@@ -412,6 +432,21 @@ class FileSerializer : public ParquetFileWriter::Contents {
     }
   }
 
+  void WritePageIndex() {
+    if (page_index_builder_ != nullptr) {
+      if (properties_->file_encryption_properties()) {
+        throw ParquetException("Encryption is not supported with page index");
+      }
+
+      // Serialize page index after all row groups have been written and report
+      // location to the file metadata.
+      PageIndexLocation page_index_location;
+      page_index_builder_->Finish();
+      page_index_builder_->WriteTo(sink_.get(), &page_index_location);
+      metadata_->SetPageIndexLocation(page_index_location);
+    }
+  }
+
   std::shared_ptr<ArrowOutputStream> sink_;
   bool is_open_;
   const std::shared_ptr<WriterProperties> properties_;
@@ -420,7 +455,7 @@ class FileSerializer : public ParquetFileWriter::Contents {
   std::unique_ptr<FileMetaDataBuilder> metadata_;
   // Only one of the row group writers is active at a time
   std::unique_ptr<RowGroupWriter> row_group_writer_;
-
+  std::unique_ptr<PageIndexBuilder> page_index_builder_;
   std::unique_ptr<InternalFileEncryptor> file_encryptor_;
 
   void StartFile() {
@@ -459,6 +494,10 @@ class FileSerializer : public ParquetFileWriter::Contents {
         PARQUET_THROW_NOT_OK(sink_->Write(kParquetMagic, 4));
       }
     }
+
+    if (properties_->write_page_index()) {
+      page_index_builder_ = PageIndexBuilder::Make(&schema_);
+    }
   }
 };
 
diff --git a/cpp/src/parquet/metadata.cc b/cpp/src/parquet/metadata.cc
index e47257bf89..b6c240115f 100644
--- a/cpp/src/parquet/metadata.cc
+++ b/cpp/src/parquet/metadata.cc
@@ -1763,6 +1763,40 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl {
     return current_row_group_builder_.get();
   }
 
+  void SetPageIndexLocation(const PageIndexLocation& location) {
+    auto set_index_location =
+        [this](size_t row_group_ordinal,
+               const PageIndexLocation::FileIndexLocation& file_index_location,
+               bool column_index) {
+          auto& row_group_metadata = this->row_groups_.at(row_group_ordinal);
+          auto iter = file_index_location.find(row_group_ordinal);
+          if (iter != file_index_location.cend()) {
+            const auto& row_group_index_location = iter->second;
+            for (size_t i = 0; i < row_group_index_location.size(); ++i) {
+              if (i >= row_group_metadata.columns.size()) {
+                throw ParquetException("Cannot find metadata for column 
ordinal ", i);
+              }
+              auto& column_metadata = row_group_metadata.columns.at(i);
+              const auto& index_location = row_group_index_location.at(i);
+              if (index_location.has_value()) {
+                if (column_index) {
+                  
column_metadata.__set_column_index_offset(index_location->offset);
+                  
column_metadata.__set_column_index_length(index_location->length);
+                } else {
+                  
column_metadata.__set_offset_index_offset(index_location->offset);
+                  
column_metadata.__set_offset_index_length(index_location->length);
+                }
+              }
+            }
+          }
+        };
+
+    for (size_t i = 0; i < row_groups_.size(); ++i) {
+      set_index_location(i, location.column_index_location, true);
+      set_index_location(i, location.offset_index_location, false);
+    }
+  }
+
   std::unique_ptr<FileMetaData> Finish() {
     int64_t total_rows = 0;
     for (auto row_group : row_groups_) {
@@ -1888,6 +1922,10 @@ RowGroupMetaDataBuilder* 
FileMetaDataBuilder::AppendRowGroup() {
   return impl_->AppendRowGroup();
 }
 
+void FileMetaDataBuilder::SetPageIndexLocation(const PageIndexLocation& 
location) {
+  impl_->SetPageIndexLocation(location);
+}
+
 std::unique_ptr<FileMetaData> FileMetaDataBuilder::Finish() { return 
impl_->Finish(); }
 
 std::unique_ptr<FileCryptoMetaData> FileMetaDataBuilder::GetCryptoMetaData() {
diff --git a/cpp/src/parquet/metadata.h b/cpp/src/parquet/metadata.h
index 277f59a1b6..efcb17be04 100644
--- a/cpp/src/parquet/metadata.h
+++ b/cpp/src/parquet/metadata.h
@@ -506,6 +506,21 @@ class PARQUET_EXPORT RowGroupMetaDataBuilder {
   std::unique_ptr<RowGroupMetaDataBuilderImpl> impl_;
 };
 
+/// \brief Public struct for location to all page indexes in a parquet file.
+struct PageIndexLocation {
+  /// Alias type of page index location of a row group. The index location
+  /// is located by column ordinal. If the column does not have the page index,
+  /// its value is set to std::nullopt.
+  using RowGroupIndexLocation = std::vector<std::optional<IndexLocation>>;
+  /// Alias type of page index location of a parquet file. The index location
+  /// is located by the row group ordinal.
+  using FileIndexLocation = std::map<size_t, RowGroupIndexLocation>;
+  /// Row group column index locations which uses row group ordinal as the key.
+  FileIndexLocation column_index_location;
+  /// Row group offset index locations which uses row group ordinal as the key.
+  FileIndexLocation offset_index_location;
+};
+
 class PARQUET_EXPORT FileMetaDataBuilder {
  public:
   // API convenience to get a MetaData reader
@@ -518,6 +533,9 @@ class PARQUET_EXPORT FileMetaDataBuilder {
   // The prior RowGroupMetaDataBuilder (if any) is destroyed
   RowGroupMetaDataBuilder* AppendRowGroup();
 
+  // Update location to all page indexes in the parquet file
+  void SetPageIndexLocation(const PageIndexLocation& location);
+
   // Complete the Thrift structure
   std::unique_ptr<FileMetaData> Finish();
 
diff --git a/cpp/src/parquet/page_index.cc b/cpp/src/parquet/page_index.cc
index f24cecf43b..c2a42547ee 100644
--- a/cpp/src/parquet/page_index.cc
+++ b/cpp/src/parquet/page_index.cc
@@ -426,6 +426,355 @@ class PageIndexReaderImpl : public PageIndexReader {
   std::unordered_map<int32_t, RowGroupIndexReadRange> index_read_ranges_;
 };
 
+/// \brief Internal state of page index builder.
+enum class BuilderState {
+  /// Created but no data has been written yet.
+  kCreated,
+  /// Some data are written but not yet finished.
+  kStarted,
+  /// All data are written and no more write is allowed.
+  kFinished,
+  /// The builder has corrupted or empty data and is therefore discarded.
+  kDiscarded
+};
+
+template <typename DType>
+class ColumnIndexBuilderImpl final : public ColumnIndexBuilder {
+ public:
+  using T = typename DType::c_type;
+
+  explicit ColumnIndexBuilderImpl(const ColumnDescriptor* descr) : 
descr_(descr) {
+    /// Initialize the null_counts vector as set. Invalid null_counts vector 
from
+    /// any page will invalidate the null_counts vector of the column index.
+    column_index_.__isset.null_counts = true;
+    column_index_.boundary_order = format::BoundaryOrder::UNORDERED;
+  }
+
+  void AddPage(const EncodedStatistics& stats) override {
+    if (state_ == BuilderState::kFinished) {
+      throw ParquetException("Cannot add page to finished 
ColumnIndexBuilder.");
+    } else if (state_ == BuilderState::kDiscarded) {
+      /// The column index is discarded. Do nothing.
+      return;
+    }
+
+    state_ = BuilderState::kStarted;
+
+    if (stats.all_null_value) {
+      column_index_.null_pages.emplace_back(true);
+      column_index_.min_values.emplace_back("");
+      column_index_.max_values.emplace_back("");
+    } else if (stats.has_min && stats.has_max) {
+      const size_t page_ordinal = column_index_.null_pages.size();
+      non_null_page_indices_.emplace_back(page_ordinal);
+      column_index_.min_values.emplace_back(stats.min());
+      column_index_.max_values.emplace_back(stats.max());
+      column_index_.null_pages.emplace_back(false);
+    } else {
+      /// This is a non-null page but it lacks meaningful min/max values.
+      /// Discard the column index.
+      state_ = BuilderState::kDiscarded;
+      return;
+    }
+
+    if (column_index_.__isset.null_counts && stats.has_null_count) {
+      column_index_.null_counts.emplace_back(stats.null_count);
+    } else {
+      column_index_.__isset.null_counts = false;
+      column_index_.null_counts.clear();
+    }
+  }
+
+  void Finish() override {
+    switch (state_) {
+      case BuilderState::kCreated: {
+        /// No page is added. Discard the column index.
+        state_ = BuilderState::kDiscarded;
+        return;
+      }
+      case BuilderState::kFinished:
+        throw ParquetException("ColumnIndexBuilder is already finished.");
+      case BuilderState::kDiscarded:
+        // The column index is discarded. Do nothing.
+        return;
+      case BuilderState::kStarted:
+        break;
+    }
+
+    state_ = BuilderState::kFinished;
+
+    /// Clear null_counts vector because at least one page does not provide it.
+    if (!column_index_.__isset.null_counts) {
+      column_index_.null_counts.clear();
+    }
+
+    /// Decode min/max values according to the data type.
+    const size_t non_null_page_count = non_null_page_indices_.size();
+    std::vector<T> min_values, max_values;
+    min_values.resize(non_null_page_count);
+    max_values.resize(non_null_page_count);
+    auto decoder = MakeTypedDecoder<DType>(Encoding::PLAIN, descr_);
+    for (size_t i = 0; i < non_null_page_count; ++i) {
+      auto page_ordinal = non_null_page_indices_.at(i);
+      Decode<DType>(decoder, column_index_.min_values.at(page_ordinal), 
&min_values, i);
+      Decode<DType>(decoder, column_index_.max_values.at(page_ordinal), 
&max_values, i);
+    }
+
+    /// Decide the boundary order from decoded min/max values.
+    auto boundary_order = DetermineBoundaryOrder(min_values, max_values);
+    column_index_.__set_boundary_order(ToThrift(boundary_order));
+  }
+
+  void WriteTo(::arrow::io::OutputStream* sink) const override {
+    if (state_ == BuilderState::kFinished) {
+      ThriftSerializer{}.Serialize(&column_index_, sink);
+    }
+  }
+
+  std::unique_ptr<ColumnIndex> Build() const override {
+    if (state_ == BuilderState::kFinished) {
+      return std::make_unique<TypedColumnIndexImpl<DType>>(*descr_, 
column_index_);
+    }
+    return nullptr;
+  }
+
+ private:
+  BoundaryOrder::type DetermineBoundaryOrder(const std::vector<T>& min_values,
+                                             const std::vector<T>& max_values) 
const {
+    DCHECK_EQ(min_values.size(), max_values.size());
+    if (min_values.empty()) {
+      return BoundaryOrder::Unordered;
+    }
+
+    std::shared_ptr<TypedComparator<DType>> comparator;
+    try {
+      comparator = MakeComparator<DType>(descr_);
+    } catch (const ParquetException&) {
+      /// Simply return unordered for unsupported comparator.
+      return BoundaryOrder::Unordered;
+    }
+
+    /// Check if both min_values and max_values are in ascending order.
+    bool is_ascending = true;
+    for (size_t i = 1; i < min_values.size(); ++i) {
+      if (comparator->Compare(min_values[i], min_values[i - 1]) ||
+          comparator->Compare(max_values[i], max_values[i - 1])) {
+        is_ascending = false;
+        break;
+      }
+    }
+    if (is_ascending) {
+      return BoundaryOrder::Ascending;
+    }
+
+    /// Check if both min_values and max_values are in descending order.
+    bool is_descending = true;
+    for (size_t i = 1; i < min_values.size(); ++i) {
+      if (comparator->Compare(min_values[i - 1], min_values[i]) ||
+          comparator->Compare(max_values[i - 1], max_values[i])) {
+        is_descending = false;
+        break;
+      }
+    }
+    if (is_descending) {
+      return BoundaryOrder::Descending;
+    }
+
+    /// Neither ascending nor descending is detected.
+    return BoundaryOrder::Unordered;
+  }
+
+  const ColumnDescriptor* descr_;
+  format::ColumnIndex column_index_;
+  std::vector<size_t> non_null_page_indices_;
+  BuilderState state_ = BuilderState::kCreated;
+};
+
+class OffsetIndexBuilderImpl final : public OffsetIndexBuilder {
+ public:
+  OffsetIndexBuilderImpl() = default;
+
+  void AddPage(int64_t offset, int32_t compressed_page_size,
+               int64_t first_row_index) override {
+    if (state_ == BuilderState::kFinished) {
+      throw ParquetException("Cannot add page to finished 
OffsetIndexBuilder.");
+    } else if (state_ == BuilderState::kDiscarded) {
+      /// The offset index is discarded. Do nothing.
+      return;
+    }
+
+    state_ = BuilderState::kStarted;
+
+    format::PageLocation page_location;
+    page_location.__set_offset(offset);
+    page_location.__set_compressed_page_size(compressed_page_size);
+    page_location.__set_first_row_index(first_row_index);
+    offset_index_.page_locations.emplace_back(std::move(page_location));
+  }
+
+  void Finish(int64_t final_position) override {
+    switch (state_) {
+      case BuilderState::kCreated: {
+        /// No pages are added. Simply discard the offset index.
+        state_ = BuilderState::kDiscarded;
+        break;
+      }
+      case BuilderState::kStarted: {
+        /// Adjust page offsets according to the final position.
+        if (final_position > 0) {
+          for (auto& page_location : offset_index_.page_locations) {
+            page_location.__set_offset(page_location.offset + final_position);
+          }
+        }
+        state_ = BuilderState::kFinished;
+        break;
+      }
+      case BuilderState::kFinished:
+      case BuilderState::kDiscarded:
+        throw ParquetException("OffsetIndexBuilder is already finished");
+    }
+  }
+
+  void WriteTo(::arrow::io::OutputStream* sink) const override {
+    if (state_ == BuilderState::kFinished) {
+      ThriftSerializer{}.Serialize(&offset_index_, sink);
+    }
+  }
+
+  std::unique_ptr<OffsetIndex> Build() const override {
+    if (state_ == BuilderState::kFinished) {
+      return std::make_unique<OffsetIndexImpl>(offset_index_);
+    }
+    return nullptr;
+  }
+
+ private:
+  format::OffsetIndex offset_index_;
+  BuilderState state_ = BuilderState::kCreated;
+};
+
+class PageIndexBuilderImpl final : public PageIndexBuilder {
+ public:
+  explicit PageIndexBuilderImpl(const SchemaDescriptor* schema) : 
schema_(schema) {}
+
+  void AppendRowGroup() override {
+    if (finished_) {
+      throw ParquetException(
+          "Cannot call AppendRowGroup() to finished PageIndexBuilder.");
+    }
+
+    // Append new builders of next row group.
+    const auto num_columns = static_cast<size_t>(schema_->num_columns());
+    column_index_builders_.emplace_back();
+    offset_index_builders_.emplace_back();
+    column_index_builders_.back().resize(num_columns);
+    offset_index_builders_.back().resize(num_columns);
+
+    DCHECK_EQ(column_index_builders_.size(), offset_index_builders_.size());
+    DCHECK_EQ(column_index_builders_.back().size(), num_columns);
+    DCHECK_EQ(offset_index_builders_.back().size(), num_columns);
+  }
+
+  ColumnIndexBuilder* GetColumnIndexBuilder(int32_t i) override {
+    CheckState(i);
+    std::unique_ptr<ColumnIndexBuilder>& builder = 
column_index_builders_.back()[i];
+    if (builder == nullptr) {
+      builder = ColumnIndexBuilder::Make(schema_->Column(i));
+    }
+    return builder.get();
+  }
+
+  OffsetIndexBuilder* GetOffsetIndexBuilder(int32_t i) override {
+    CheckState(i);
+    std::unique_ptr<OffsetIndexBuilder>& builder = 
offset_index_builders_.back()[i];
+    if (builder == nullptr) {
+      builder = OffsetIndexBuilder::Make();
+    }
+    return builder.get();
+  }
+
+  void Finish() override { finished_ = true; }
+
+  void WriteTo(::arrow::io::OutputStream* sink,
+               PageIndexLocation* location) const override {
+    if (!finished_) {
+      throw ParquetException("Cannot call WriteTo() to unfinished 
PageIndexBuilder.");
+    }
+
+    location->column_index_location.clear();
+    location->offset_index_location.clear();
+
+    /// Serialize column index ordered by row group ordinal and then column 
ordinal.
+    SerializeIndex(column_index_builders_, sink, 
&location->column_index_location);
+
+    /// Serialize offset index ordered by row group ordinal and then column 
ordinal.
+    SerializeIndex(offset_index_builders_, sink, 
&location->offset_index_location);
+  }
+
+ private:
+  /// Make sure column ordinal is not out of bound and the builder is in good 
state.
+  void CheckState(int32_t column_ordinal) const {
+    if (finished_) {
+      throw ParquetException("PageIndexBuilder is already finished.");
+    }
+    if (column_ordinal < 0 || column_ordinal >= schema_->num_columns()) {
+      throw ParquetException("Invalid column ordinal: ", column_ordinal);
+    }
+    if (offset_index_builders_.empty() || column_index_builders_.empty()) {
+      throw ParquetException("No row group appended to PageIndexBuilder.");
+    }
+  }
+
+  template <typename Builder>
+  void SerializeIndex(
+      const std::vector<std::vector<std::unique_ptr<Builder>>>& 
page_index_builders,
+      ::arrow::io::OutputStream* sink,
+      std::map<size_t, std::vector<std::optional<IndexLocation>>>* location) 
const {
+    const auto num_columns = static_cast<size_t>(schema_->num_columns());
+
+    /// Serialize the same kind of page index row group by row group.
+    for (size_t row_group = 0; row_group < page_index_builders.size(); 
++row_group) {
+      const auto& row_group_page_index_builders = 
page_index_builders[row_group];
+      DCHECK_EQ(row_group_page_index_builders.size(), num_columns);
+
+      bool has_valid_index = false;
+      std::vector<std::optional<IndexLocation>> locations(num_columns, 
std::nullopt);
+
+      /// In the same row group, serialize the same kind of page index column 
by column.
+      for (size_t column = 0; column < num_columns; ++column) {
+        const auto& column_page_index_builder = 
row_group_page_index_builders[column];
+        if (column_page_index_builder != nullptr) {
+          /// Try serializing the page index.
+          PARQUET_ASSIGN_OR_THROW(int64_t pos_before_write, sink->Tell());
+          column_page_index_builder->WriteTo(sink);
+          PARQUET_ASSIGN_OR_THROW(int64_t pos_after_write, sink->Tell());
+          int64_t len = pos_after_write - pos_before_write;
+
+          /// The page index is not serialized and skip reporting its location
+          if (len == 0) {
+            continue;
+          }
+
+          if (len > std::numeric_limits<int32_t>::max()) {
+            throw ParquetException("Page index size overflows to INT32_MAX");
+          }
+          locations[column] = {pos_before_write, static_cast<int32_t>(len)};
+          has_valid_index = true;
+        }
+      }
+
+      if (has_valid_index) {
+        location->emplace(row_group, std::move(locations));
+      }
+    }
+  }
+
+  const SchemaDescriptor* schema_;
+  std::vector<std::vector<std::unique_ptr<ColumnIndexBuilder>>> 
column_index_builders_;
+  std::vector<std::vector<std::unique_ptr<OffsetIndexBuilder>>> 
offset_index_builders_;
+  bool finished_ = false;
+};
+
 }  // namespace
 
 RowGroupIndexReadRange PageIndexReader::DeterminePageIndexRangesInRowGroup(
@@ -531,4 +880,38 @@ std::shared_ptr<PageIndexReader> PageIndexReader::Make(
                                                std::move(file_decryptor));
 }
 
+std::unique_ptr<ColumnIndexBuilder> ColumnIndexBuilder::Make(
+    const ColumnDescriptor* descr) {
+  switch (descr->physical_type()) {
+    case Type::BOOLEAN:
+      return std::make_unique<ColumnIndexBuilderImpl<BooleanType>>(descr);
+    case Type::INT32:
+      return std::make_unique<ColumnIndexBuilderImpl<Int32Type>>(descr);
+    case Type::INT64:
+      return std::make_unique<ColumnIndexBuilderImpl<Int64Type>>(descr);
+    case Type::INT96:
+      return std::make_unique<ColumnIndexBuilderImpl<Int96Type>>(descr);
+    case Type::FLOAT:
+      return std::make_unique<ColumnIndexBuilderImpl<FloatType>>(descr);
+    case Type::DOUBLE:
+      return std::make_unique<ColumnIndexBuilderImpl<DoubleType>>(descr);
+    case Type::BYTE_ARRAY:
+      return std::make_unique<ColumnIndexBuilderImpl<ByteArrayType>>(descr);
+    case Type::FIXED_LEN_BYTE_ARRAY:
+      return std::make_unique<ColumnIndexBuilderImpl<FLBAType>>(descr);
+    case Type::UNDEFINED:
+      return nullptr;
+  }
+  ::arrow::Unreachable("Cannot make ColumnIndexBuilder of an unknown type");
+  return nullptr;
+}
+
+std::unique_ptr<OffsetIndexBuilder> OffsetIndexBuilder::Make() {
+  return std::make_unique<OffsetIndexBuilderImpl>();
+}
+
+std::unique_ptr<PageIndexBuilder> PageIndexBuilder::Make(const 
SchemaDescriptor* schema) {
+  return std::make_unique<PageIndexBuilderImpl>(schema);
+}
+
 }  // namespace parquet
diff --git a/cpp/src/parquet/page_index.h b/cpp/src/parquet/page_index.h
index bbd9d794c1..b6d27cfc41 100644
--- a/cpp/src/parquet/page_index.h
+++ b/cpp/src/parquet/page_index.h
@@ -26,8 +26,10 @@
 namespace parquet {
 
 class ColumnDescriptor;
+class EncodedStatistics;
 class FileMetaData;
 class InternalFileDecryptor;
+struct PageIndexLocation;
 class ReaderProperties;
 class RowGroupMetaData;
 class RowGroupPageIndexReader;
@@ -250,4 +252,116 @@ class PARQUET_EXPORT PageIndexReader {
       const RowGroupMetaData& row_group_metadata, const std::vector<int32_t>& 
columns);
 };
 
+/// \brief Interface for collecting column index of data pages in a column 
chunk.
+class PARQUET_EXPORT ColumnIndexBuilder {
+ public:
+  /// \brief API convenience to create a ColumnIndexBuilder.
+  static std::unique_ptr<ColumnIndexBuilder> Make(const ColumnDescriptor* 
descr);
+
+  virtual ~ColumnIndexBuilder() = default;
+
+  /// \brief Add statistics of a data page.
+  ///
+  /// If the ColumnIndexBuilder has seen any corrupted statistics, it will
+  /// not update statistics any more.
+  ///
+  /// \param stats Page statistics in the encoded form.
+  virtual void AddPage(const EncodedStatistics& stats) = 0;
+
+  /// \brief Complete the column index.
+  ///
+  /// Once called, AddPage() can no longer be called.
+  /// WriteTo() and Build() can only be called after Finish() has been called.
+  virtual void Finish() = 0;
+
+  /// \brief Serialize the column index thrift message.
+  ///
+  /// If the ColumnIndexBuilder has seen any corrupted statistics, it will
+  /// not write any data to the sink.
+  ///
+  /// \param[out] sink output stream to write the serialized message.
+  virtual void WriteTo(::arrow::io::OutputStream* sink) const = 0;
+
+  /// \brief Create a ColumnIndex directly.
+  ///
+  /// \return If the ColumnIndexBuilder has seen any corrupted statistics, it 
simply
+  /// returns nullptr. Otherwise the column index is built and returned.
+  virtual std::unique_ptr<ColumnIndex> Build() const = 0;
+};
+
+/// \brief Interface for collecting offset index of data pages in a column 
chunk.
+class PARQUET_EXPORT OffsetIndexBuilder {
+ public:
+  /// \brief API convenience to create an OffsetIndexBuilder.
+  static std::unique_ptr<OffsetIndexBuilder> Make();
+
+  virtual ~OffsetIndexBuilder() = default;
+
+  /// \brief Add page location of a data page.
+  virtual void AddPage(int64_t offset, int32_t compressed_page_size,
+                       int64_t first_row_index) = 0;
+
+  /// \brief Add page location of a data page.
+  void AddPage(const PageLocation& page_location) {
+    AddPage(page_location.offset, page_location.compressed_page_size,
+            page_location.first_row_index);
+  }
+
+  /// \brief Complete the offset index.
+  ///
+  /// In the buffered row group mode, data pages are flushed into memory
+  /// sink and the OffsetIndexBuilder has only collected the relative offset
+  /// which requires adjustment once they are flushed to the file.
+  ///
+  /// \param final_position Final stream offset to add for page offset 
adjustment.
+  virtual void Finish(int64_t final_position) = 0;
+
+  /// \brief Serialize the offset index thrift message.
+  ///
+  /// \param[out] sink output stream to write the serialized message.
+  virtual void WriteTo(::arrow::io::OutputStream* sink) const = 0;
+
+  /// \brief Create an OffsetIndex directly.
+  virtual std::unique_ptr<OffsetIndex> Build() const = 0;
+};
+
+/// \brief Interface for collecting page index of a parquet file.
+class PARQUET_EXPORT PageIndexBuilder {
+ public:
+  /// \brief API convenience to create a PageIndexBuilder.
+  static std::unique_ptr<PageIndexBuilder> Make(const SchemaDescriptor* 
schema);
+
+  virtual ~PageIndexBuilder() = default;
+
+  /// \brief Start a new row group.
+  virtual void AppendRowGroup() = 0;
+
+  /// \brief Get the ColumnIndexBuilder from column ordinal.
+  ///
+  /// \param i Column ordinal.
+  /// \return ColumnIndexBuilder for the column and its memory ownership 
belongs to
+  /// the PageIndexBuilder.
+  virtual ColumnIndexBuilder* GetColumnIndexBuilder(int32_t i) = 0;
+
+  /// \brief Get the OffsetIndexBuilder from column ordinal.
+  ///
+  /// \param i Column ordinal.
+  /// \return OffsetIndexBuilder for the column and its memory ownership 
belongs to
+  /// the PageIndexBuilder.
+  virtual OffsetIndexBuilder* GetOffsetIndexBuilder(int32_t i) = 0;
+
+  /// \brief Complete the page index builder and no more write is allowed.
+  virtual void Finish() = 0;
+
+  /// \brief Serialize the page index thrift message.
+  ///
+  /// Only valid column indexes and offset indexes are serialized and their 
locations
+  /// are set.
+  ///
+  /// \param[out] sink The output stream to write the page index.
+  /// \param[out] location Locations of all page indexes relative to the start of sink.
+  virtual void WriteTo(::arrow::io::OutputStream* sink,
+                       PageIndexLocation* location) const = 0;
+};
+
 }  // namespace parquet
diff --git a/cpp/src/parquet/page_index_test.cc 
b/cpp/src/parquet/page_index_test.cc
index 46599960b8..5bfe38522a 100644
--- a/cpp/src/parquet/page_index_test.cc
+++ b/cpp/src/parquet/page_index_test.cc
@@ -18,9 +18,11 @@
 #include "parquet/page_index.h"
 
 #include <gtest/gtest.h>
+#include <memory>
 
 #include "arrow/io/file.h"
 #include "parquet/file_reader.h"
+#include "parquet/metadata.h"
 #include "parquet/schema.h"
 #include "parquet/test_util.h"
 #include "parquet/thrift_internal.h"
@@ -416,4 +418,416 @@ TEST(PageIndex, 
DeterminePageIndexRangesInRowGroupWithMissingPageIndex) {
                          -1);
 }
 
+TEST(PageIndex, WriteOffsetIndex) {
+  /// Create offset index via the OffsetIndexBuilder interface.
+  auto builder = OffsetIndexBuilder::Make();
+  const size_t num_pages = 5;
+  const std::vector<int64_t> offsets = {100, 200, 300, 400, 500};
+  const std::vector<int32_t> page_sizes = {1024, 2048, 3072, 4096, 8192};
+  const std::vector<int64_t> first_row_indices = {0, 10000, 20000, 30000, 
40000};
+  for (size_t i = 0; i < num_pages; ++i) {
+    builder->AddPage(offsets[i], page_sizes[i], first_row_indices[i]);
+  }
+  const int64_t final_position = 4096;
+  builder->Finish(final_position);
+
+  std::vector<std::unique_ptr<OffsetIndex>> offset_indexes;
+  /// 1st element is the offset index just built.
+  offset_indexes.emplace_back(builder->Build());
+  /// 2nd element is the offset index restored by serialize-then-deserialize 
round trip.
+  auto sink = CreateOutputStream();
+  builder->WriteTo(sink.get());
+  PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
+  offset_indexes.emplace_back(OffsetIndex::Make(buffer->data(),
+                                                
static_cast<uint32_t>(buffer->size()),
+                                                default_reader_properties()));
+
+  /// Verify the data of the offset index.
+  for (const auto& offset_index : offset_indexes) {
+    ASSERT_EQ(num_pages, offset_index->page_locations().size());
+    for (size_t i = 0; i < num_pages; ++i) {
+      const auto& page_location = offset_index->page_locations().at(i);
+      ASSERT_EQ(offsets[i] + final_position, page_location.offset);
+      ASSERT_EQ(page_sizes[i], page_location.compressed_page_size);
+      ASSERT_EQ(first_row_indices[i], page_location.first_row_index);
+    }
+  }
+}
+
+/// \brief Build a ColumnIndex from per-page statistics and verify both the directly
+/// built index and the one restored by a serialize-then-deserialize round trip.
+///
+/// \param node Schema node of the column under test.
+/// \param page_stats Encoded statistics, one entry per data page.
+/// \param boundary_order Boundary order the resulting column index must report.
+/// \param has_null_counts Whether the resulting column index must carry null counts.
+void TestWriteTypedColumnIndex(schema::NodePtr node,
+                               const std::vector<EncodedStatistics>& page_stats,
+                               BoundaryOrder::type boundary_order, bool has_null_counts) {
+  auto descr = std::make_unique<ColumnDescriptor>(node, /*max_definition_level=*/1, 0);
+
+  auto builder = ColumnIndexBuilder::Make(descr.get());
+  for (const auto& stats : page_stats) {
+    builder->AddPage(stats);
+  }
+  ASSERT_NO_THROW(builder->Finish());
+
+  std::vector<std::unique_ptr<ColumnIndex>> column_indexes;
+  /// 1st element is the column index just built.
+  column_indexes.emplace_back(builder->Build());
+  /// 2nd element is the column index restored by serialize-then-deserialize round trip.
+  auto sink = CreateOutputStream();
+  builder->WriteTo(sink.get());
+  PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
+  column_indexes.emplace_back(ColumnIndex::Make(*descr, buffer->data(),
+                                                static_cast<uint32_t>(buffer->size()),
+                                                default_reader_properties()));
+
+  /// Verify the data of the column index.
+  for (const auto& column_index : column_indexes) {
+    // Build() may yield nullptr (see WriteColumnIndexWithCorruptedStats); fail with a
+    // clear assertion instead of dereferencing a null pointer.
+    ASSERT_NE(nullptr, column_index);
+    ASSERT_EQ(boundary_order, column_index->boundary_order());
+    ASSERT_EQ(has_null_counts, column_index->has_null_counts());
+    const size_t num_pages = column_index->null_pages().size();
+    // Every input page must be present. Without this check the loop below would pass
+    // vacuously on a short index or read past the end of page_stats on a long one.
+    ASSERT_EQ(page_stats.size(), num_pages);
+    for (size_t i = 0; i < num_pages; ++i) {
+      ASSERT_EQ(page_stats[i].all_null_value, column_index->null_pages()[i]);
+      ASSERT_EQ(page_stats[i].min(), column_index->encoded_min_values()[i]);
+      ASSERT_EQ(page_stats[i].max(), column_index->encoded_max_values()[i]);
+      if (has_null_counts) {
+        ASSERT_EQ(page_stats[i].null_count, column_index->null_counts()[i]);
+      }
+    }
+  }
+}
+
+TEST(PageIndex, WriteInt32ColumnIndex) {
+  auto encode = [=](int32_t value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(int32_t));
+  };
+
+  // Integer values in the ascending order.
+  std::vector<EncodedStatistics> page_stats(3);
+  page_stats.at(0).set_null_count(1).set_min(encode(1)).set_max(encode(2));
+  page_stats.at(1).set_null_count(2).set_min(encode(2)).set_max(encode(3));
+  page_stats.at(2).set_null_count(3).set_min(encode(3)).set_max(encode(4));
+
+  TestWriteTypedColumnIndex(schema::Int32("c1"), page_stats, 
BoundaryOrder::Ascending,
+                            /*has_null_counts=*/true);
+}
+
+TEST(PageIndex, WriteInt64ColumnIndex) {
+  auto encode = [=](int64_t value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(int64_t));
+  };
+
+  // Integer values in the descending order.
+  std::vector<EncodedStatistics> page_stats(3);
+  page_stats.at(0).set_null_count(4).set_min(encode(-1)).set_max(encode(-2));
+  page_stats.at(1).set_null_count(0).set_min(encode(-2)).set_max(encode(-3));
+  page_stats.at(2).set_null_count(4).set_min(encode(-3)).set_max(encode(-4));
+
+  TestWriteTypedColumnIndex(schema::Int64("c1"), page_stats, 
BoundaryOrder::Descending,
+                            /*has_null_counts=*/true);
+}
+
+TEST(PageIndex, WriteFloatColumnIndex) {
+  auto encode = [=](float value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(float));
+  };
+
+  // Float values with no specific order.
+  std::vector<EncodedStatistics> page_stats(3);
+  
page_stats.at(0).set_null_count(0).set_min(encode(2.2F)).set_max(encode(4.4F));
+  
page_stats.at(1).set_null_count(0).set_min(encode(1.1F)).set_max(encode(5.5F));
+  
page_stats.at(2).set_null_count(0).set_min(encode(3.3F)).set_max(encode(6.6F));
+
+  TestWriteTypedColumnIndex(schema::Float("c1"), page_stats, 
BoundaryOrder::Unordered,
+                            /*has_null_counts=*/true);
+}
+
+TEST(PageIndex, WriteDoubleColumnIndex) {
+  auto encode = [=](double value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(double));
+  };
+
+  // Double values with no specific order and without null count.
+  std::vector<EncodedStatistics> page_stats(3);
+  page_stats.at(0).set_min(encode(1.2)).set_max(encode(4.4));
+  page_stats.at(1).set_min(encode(2.2)).set_max(encode(5.5));
+  page_stats.at(2).set_min(encode(3.3)).set_max(encode(-6.6));
+
+  TestWriteTypedColumnIndex(schema::Double("c1"), page_stats, 
BoundaryOrder::Unordered,
+                            /*has_null_counts=*/false);
+}
+
+TEST(PageIndex, WriteByteArrayColumnIndex) {
+  // Byte array pages that all share the same min/max statistics.
+  std::vector<EncodedStatistics> page_stats(3);
+  page_stats.at(0).set_min("bar").set_max("foo");
+  page_stats.at(1).set_min("bar").set_max("foo");
+  page_stats.at(2).set_min("bar").set_max("foo");
+
+  TestWriteTypedColumnIndex(schema::ByteArray("c1"), page_stats, 
BoundaryOrder::Ascending,
+                            /*has_null_counts=*/false);
+}
+
+TEST(PageIndex, WriteFLBAColumnIndex) {
+  // FLBA values in the ascending order with some null pages
+  std::vector<EncodedStatistics> page_stats(5);
+  page_stats.at(0).set_min("abc").set_max("ABC");
+  page_stats.at(1).all_null_value = true;
+  page_stats.at(2).set_min("foo").set_max("FOO");
+  page_stats.at(3).all_null_value = true;
+  page_stats.at(4).set_min("xyz").set_max("XYZ");
+
+  auto node =
+      schema::PrimitiveNode::Make("c1", Repetition::OPTIONAL, 
Type::FIXED_LEN_BYTE_ARRAY,
+                                  ConvertedType::NONE, /*length=*/3);
+  TestWriteTypedColumnIndex(std::move(node), page_stats, 
BoundaryOrder::Ascending,
+                            /*has_null_counts=*/false);
+}
+
+TEST(PageIndex, WriteColumnIndexWithAllNullPages) {
+  // All values are null.
+  std::vector<EncodedStatistics> page_stats(3);
+  page_stats.at(0).set_null_count(100).all_null_value = true;
+  page_stats.at(1).set_null_count(100).all_null_value = true;
+  page_stats.at(2).set_null_count(100).all_null_value = true;
+
+  TestWriteTypedColumnIndex(schema::Int32("c1"), page_stats, 
BoundaryOrder::Unordered,
+                            /*has_null_counts=*/true);
+}
+
+TEST(PageIndex, WriteColumnIndexWithInvalidNullCounts) {
+  auto encode = [=](int32_t value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(int32_t));
+  };
+
+  // Some pages do not provide null_count
+  std::vector<EncodedStatistics> page_stats(3);
+  page_stats.at(0).set_min(encode(1)).set_max(encode(2)).set_null_count(0);
+  page_stats.at(1).set_min(encode(1)).set_max(encode(3));
+  page_stats.at(2).set_min(encode(2)).set_max(encode(3)).set_null_count(0);
+
+  TestWriteTypedColumnIndex(schema::Int32("c1"), page_stats, 
BoundaryOrder::Ascending,
+                            /*has_null_counts=*/false);
+}
+
+TEST(PageIndex, WriteColumnIndexWithCorruptedStats) {
+  auto encode = [=](int32_t value) {
+    return std::string(reinterpret_cast<const char*>(&value), sizeof(int32_t));
+  };
+
+  // 2nd page does not set anything
+  std::vector<EncodedStatistics> page_stats(3);
+  page_stats.at(0).set_min(encode(1)).set_max(encode(2));
+  page_stats.at(2).set_min(encode(3)).set_max(encode(4));
+
+  ColumnDescriptor descr(schema::Int32("c1"), /*max_definition_level=*/1, 0);
+  auto builder = ColumnIndexBuilder::Make(&descr);
+  for (const auto& stats : page_stats) {
+    builder->AddPage(stats);
+  }
+  ASSERT_NO_THROW(builder->Finish());
+  ASSERT_EQ(nullptr, builder->Build());
+
+  auto sink = CreateOutputStream();
+  builder->WriteTo(sink.get());
+  PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
+  EXPECT_EQ(0, buffer->size());
+}
+
+TEST(PageIndex, TestPageIndexBuilderWithZeroRowGroup) {
+  schema::NodeVector fields = {schema::Int32("c1"), schema::ByteArray("c2")};
+  schema::NodePtr root = schema::GroupNode::Make("schema", 
Repetition::REPEATED, fields);
+  SchemaDescriptor schema;
+  schema.Init(root);
+
+  auto builder = PageIndexBuilder::Make(&schema);
+
+  // AppendRowGroup() is not called and expect throw.
+  ASSERT_THROW(builder->GetColumnIndexBuilder(0), ParquetException);
+  ASSERT_THROW(builder->GetOffsetIndexBuilder(0), ParquetException);
+
+  // Finish the builder without calling AppendRowGroup().
+  ASSERT_NO_THROW(builder->Finish());
+
+  // Verify WriteTo does not write anything.
+  auto sink = CreateOutputStream();
+  PageIndexLocation location;
+  builder->WriteTo(sink.get(), &location);
+  PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
+  ASSERT_EQ(0, buffer->size());
+  ASSERT_TRUE(location.column_index_location.empty());
+  ASSERT_TRUE(location.offset_index_location.empty());
+}
+
+class PageIndexBuilderTest : public ::testing::Test {
+ public:
+  void WritePageIndexes(int num_row_groups, int num_columns,
+                        const std::vector<std::vector<EncodedStatistics>>& 
page_stats,
+                        const std::vector<std::vector<PageLocation>>& 
page_locations,
+                        int final_position) {
+    auto builder = PageIndexBuilder::Make(&schema_);
+    for (int row_group = 0; row_group < num_row_groups; ++row_group) {
+      ASSERT_NO_THROW(builder->AppendRowGroup());
+
+      for (int column = 0; column < num_columns; ++column) {
+        if (static_cast<size_t>(column) < page_stats[row_group].size()) {
+          auto column_index_builder = builder->GetColumnIndexBuilder(column);
+          
ASSERT_NO_THROW(column_index_builder->AddPage(page_stats[row_group][column]));
+          ASSERT_NO_THROW(column_index_builder->Finish());
+        }
+
+        if (static_cast<size_t>(column) < page_locations[row_group].size()) {
+          auto offset_index_builder = builder->GetOffsetIndexBuilder(column);
+          ASSERT_NO_THROW(
+              
offset_index_builder->AddPage(page_locations[row_group][column]));
+          ASSERT_NO_THROW(offset_index_builder->Finish(final_position));
+        }
+      }
+    }
+    ASSERT_NO_THROW(builder->Finish());
+
+    auto sink = CreateOutputStream();
+    builder->WriteTo(sink.get(), &page_index_location_);
+    PARQUET_ASSIGN_OR_THROW(buffer_, sink->Finish());
+
+    ASSERT_EQ(static_cast<size_t>(num_row_groups),
+              page_index_location_.column_index_location.size());
+    ASSERT_EQ(static_cast<size_t>(num_row_groups),
+              page_index_location_.offset_index_location.size());
+    for (int row_group = 0; row_group < num_row_groups; ++row_group) {
+      ASSERT_EQ(static_cast<size_t>(num_columns),
+                page_index_location_.column_index_location[row_group].size());
+      ASSERT_EQ(static_cast<size_t>(num_columns),
+                page_index_location_.offset_index_location[row_group].size());
+    }
+  }
+
+  void CheckColumnIndex(int row_group, int column, const EncodedStatistics& 
stats) {
+    auto column_index = ReadColumnIndex(row_group, column);
+    ASSERT_NE(nullptr, column_index);
+    ASSERT_EQ(size_t{1}, column_index->null_pages().size());
+    ASSERT_EQ(stats.all_null_value, column_index->null_pages()[0]);
+    ASSERT_EQ(stats.min(), column_index->encoded_min_values()[0]);
+    ASSERT_EQ(stats.max(), column_index->encoded_max_values()[0]);
+    ASSERT_EQ(stats.has_null_count, column_index->has_null_counts());
+    if (stats.has_null_count) {
+      ASSERT_EQ(stats.null_count, column_index->null_counts()[0]);
+    }
+  }
+
+  void CheckOffsetIndex(int row_group, int column, const PageLocation& 
expected_location,
+                        int64_t final_location) {
+    auto offset_index = ReadOffsetIndex(row_group, column);
+    ASSERT_NE(nullptr, offset_index);
+    ASSERT_EQ(size_t{1}, offset_index->page_locations().size());
+    const auto& location = offset_index->page_locations()[0];
+    ASSERT_EQ(expected_location.offset + final_location, location.offset);
+    ASSERT_EQ(expected_location.compressed_page_size, 
location.compressed_page_size);
+    ASSERT_EQ(expected_location.first_row_index, location.first_row_index);
+  }
+
+ protected:
+  std::unique_ptr<ColumnIndex> ReadColumnIndex(int row_group, int column) {
+    auto location = 
page_index_location_.column_index_location[row_group][column];
+    if (!location.has_value()) {
+      return nullptr;
+    }
+    auto properties = default_reader_properties();
+    return ColumnIndex::Make(*schema_.Column(column), buffer_->data() + 
location->offset,
+                             static_cast<uint32_t>(location->length), 
properties);
+  }
+
+  std::unique_ptr<OffsetIndex> ReadOffsetIndex(int row_group, int column) {
+    auto location = 
page_index_location_.offset_index_location[row_group][column];
+    if (!location.has_value()) {
+      return nullptr;
+    }
+    auto properties = default_reader_properties();
+    return OffsetIndex::Make(buffer_->data() + location->offset,
+                             static_cast<uint32_t>(location->length), 
properties);
+  }
+
+  SchemaDescriptor schema_;
+  std::shared_ptr<Buffer> buffer_;
+  PageIndexLocation page_index_location_;
+};
+
+TEST_F(PageIndexBuilderTest, SingleRowGroup) {
+  schema::NodePtr root = schema::GroupNode::Make(
+      "schema", Repetition::REPEATED,
+      {schema::ByteArray("c1"), schema::ByteArray("c2"), 
schema::ByteArray("c3")});
+  schema_.Init(root);
+
+  // Prepare page stats and page locations for single row group.
+  // Note that the 3rd column does not have any stats and its page index is 
disabled.
+  const int num_row_groups = 1;
+  const int num_columns = 3;
+  const std::vector<std::vector<EncodedStatistics>> page_stats = {
+      /*row_group_id=0*/
+      {/*column_id=0*/ 
EncodedStatistics().set_null_count(0).set_min("a").set_max("b"),
+       /*column_id=1*/ 
EncodedStatistics().set_null_count(0).set_min("A").set_max("B")}};
+  const std::vector<std::vector<PageLocation>> page_locations = {
+      /*row_group_id=0*/
+      {/*column_id=0*/ {/*offset=*/128, /*compressed_page_size=*/512,
+                        /*first_row_index=*/0},
+       /*column_id=1*/ {/*offset=*/1024, /*compressed_page_size=*/512,
+                        /*first_row_index=*/0}}};
+  const int64_t final_position = 200;
+
+  WritePageIndexes(num_row_groups, num_columns, page_stats, page_locations,
+                   final_position);
+
+  // Verify that first two columns have good page indexes.
+  for (int column = 0; column < 2; ++column) {
+    CheckColumnIndex(/*row_group=*/0, column, page_stats[0][column]);
+    CheckOffsetIndex(/*row_group=*/0, column, page_locations[0][column], 
final_position);
+  }
+
+  // Verify the 3rd column does not have page indexes.
+  ASSERT_EQ(nullptr, ReadColumnIndex(/*row_group=*/0, /*column=*/2));
+  ASSERT_EQ(nullptr, ReadOffsetIndex(/*row_group=*/0, /*column=*/2));
+}
+
+TEST_F(PageIndexBuilderTest, TwoRowGroups) {
+  schema::NodePtr root = schema::GroupNode::Make(
+      "schema", Repetition::REPEATED, {schema::ByteArray("c1"), 
schema::ByteArray("c2")});
+  schema_.Init(root);
+
+  // Prepare page stats and page locations for two row groups.
+  // Note that the 1st column in the 2nd row group has corrupted stats.
+  const int num_row_groups = 2;
+  const int num_columns = 2;
+  const std::vector<std::vector<EncodedStatistics>> page_stats = {
+      /*row_group_id=0*/
+      {/*column_id=0*/ EncodedStatistics().set_min("a").set_max("b"),
+       /*column_id=1*/ 
EncodedStatistics().set_null_count(0).set_min("A").set_max("B")},
+      /*row_group_id=1*/
+      {/*column_id=0*/ EncodedStatistics() /* corrupted stats */,
+       /*column_id=1*/ 
EncodedStatistics().set_null_count(0).set_min("bar").set_max(
+           "foo")}};
+  const std::vector<std::vector<PageLocation>> page_locations = {
+      /*row_group_id=0*/
+      {/*column_id=0*/ {/*offset=*/128, /*compressed_page_size=*/512,
+                        /*first_row_index=*/0},
+       /*column_id=1*/ {/*offset=*/1024, /*compressed_page_size=*/512,
+                        /*first_row_index=*/0}},
+      /*row_group_id=1*/
+      {/*column_id=0*/ {/*offset=*/128, /*compressed_page_size=*/512,
+                        /*first_row_index=*/0},
+       /*column_id=1*/ {/*offset=*/1024, /*compressed_page_size=*/512,
+                        /*first_row_index=*/0}}};
+  const int64_t final_position = 200;
+
+  WritePageIndexes(num_row_groups, num_columns, page_stats, page_locations,
+                   final_position);
+
+  // Verify that all columns have good column indexes except the 1st column in 
the 2nd row
+  // group.
+  CheckColumnIndex(/*row_group=*/0, /*column=*/0, page_stats[0][0]);
+  CheckColumnIndex(/*row_group=*/0, /*column=*/1, page_stats[0][1]);
+  CheckColumnIndex(/*row_group=*/1, /*column=*/1, page_stats[1][1]);
+  ASSERT_EQ(nullptr, ReadColumnIndex(/*row_group=*/1, /*column=*/0));
+
+  // Verify that two columns have good offset indexes.
+  CheckOffsetIndex(/*row_group=*/0, /*column=*/0, page_locations[0][0], 
final_position);
+  CheckOffsetIndex(/*row_group=*/0, /*column=*/1, page_locations[0][1], 
final_position);
+  CheckOffsetIndex(/*row_group=*/1, /*column=*/0, page_locations[1][0], 
final_position);
+  CheckOffsetIndex(/*row_group=*/1, /*column=*/1, page_locations[1][1], 
final_position);
+}
+
 }  // namespace parquet
diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h
index 572194f4ee..d892788960 100644
--- a/cpp/src/parquet/properties.h
+++ b/cpp/src/parquet/properties.h
@@ -208,7 +208,8 @@ class PARQUET_EXPORT WriterProperties {
           data_page_version_(ParquetDataPageVersion::V1),
           created_by_(DEFAULT_CREATED_BY),
           store_decimal_as_integer_(false),
-          page_checksum_enabled_(false) {}
+          page_checksum_enabled_(false),
+          write_page_index_(false) {}
     virtual ~Builder() {}
 
     /// Specify the memory pool for the writer. Default default_memory_pool.
@@ -501,6 +502,28 @@ class PARQUET_EXPORT WriterProperties {
       return this;
     }
 
+    /// Enable writing page index.
+    ///
+    /// Page index contains statistics for data pages and can be used to skip 
pages
+    /// when scanning data in ordered and unordered columns.
+    ///
+    /// Please check the link below for more details:
+    /// https://github.com/apache/parquet-format/blob/master/PageIndex.md
+    ///
+    /// Default disabled.
+    Builder* enable_write_page_index() {
+      write_page_index_ = true;
+      return this;
+    }
+
+    /// Disable writing page index.
+    ///
+    /// Default disabled.
+    Builder* disable_write_page_index() {
+      write_page_index_ = false;
+      return this;
+    }
+
     /// \brief Build the WriterProperties with the builder parameters.
     /// \return The WriterProperties defined by the builder.
     std::shared_ptr<WriterProperties> build() {
@@ -526,7 +549,8 @@ class PARQUET_EXPORT WriterProperties {
           pool_, dictionary_pagesize_limit_, write_batch_size_, 
max_row_group_length_,
           pagesize_, version_, created_by_, page_checksum_enabled_,
           std::move(file_encryption_properties_), default_column_properties_,
-          column_properties, data_page_version_, store_decimal_as_integer_));
+          column_properties, data_page_version_, store_decimal_as_integer_,
+          write_page_index_));
     }
 
    private:
@@ -540,6 +564,7 @@ class PARQUET_EXPORT WriterProperties {
     std::string created_by_;
     bool store_decimal_as_integer_;
     bool page_checksum_enabled_;
+    bool write_page_index_;
 
     std::shared_ptr<FileEncryptionProperties> file_encryption_properties_;
 
@@ -574,6 +599,8 @@ class PARQUET_EXPORT WriterProperties {
 
   inline bool page_checksum_enabled() const { return page_checksum_enabled_; }
 
+  inline bool write_page_index() const { return write_page_index_; }
+
   inline Encoding::type dictionary_index_encoding() const {
     if (parquet_version_ == ParquetVersion::PARQUET_1_0) {
       return Encoding::PLAIN_DICTIONARY;
@@ -642,7 +669,8 @@ class PARQUET_EXPORT WriterProperties {
       std::shared_ptr<FileEncryptionProperties> file_encryption_properties,
       const ColumnProperties& default_column_properties,
       const std::unordered_map<std::string, ColumnProperties>& 
column_properties,
-      ParquetDataPageVersion data_page_version, bool 
store_short_decimal_as_integer)
+      ParquetDataPageVersion data_page_version, bool 
store_short_decimal_as_integer,
+      bool write_page_index)
       : pool_(pool),
         dictionary_pagesize_limit_(dictionary_pagesize_limit),
         write_batch_size_(write_batch_size),
@@ -653,6 +681,7 @@ class PARQUET_EXPORT WriterProperties {
         parquet_created_by_(created_by),
         store_decimal_as_integer_(store_short_decimal_as_integer),
         page_checksum_enabled_(page_write_checksum_enabled),
+        write_page_index_(write_page_index),
         file_encryption_properties_(file_encryption_properties),
         default_column_properties_(default_column_properties),
         column_properties_(column_properties) {}
@@ -667,6 +696,7 @@ class PARQUET_EXPORT WriterProperties {
   std::string parquet_created_by_;
   bool store_decimal_as_integer_;
   bool page_checksum_enabled_;
+  bool write_page_index_;
 
   std::shared_ptr<FileEncryptionProperties> file_encryption_properties_;
 
diff --git a/cpp/src/parquet/statistics.cc b/cpp/src/parquet/statistics.cc
index d0130605cd..aa6df7e32a 100644
--- a/cpp/src/parquet/statistics.cc
+++ b/cpp/src/parquet/statistics.cc
@@ -610,6 +610,8 @@ class TypedStatisticsImpl : public TypedStatistics<DType> {
     if (HasNullCount()) {
       s.set_null_count(this->null_count());
     }
+    // num_values_ is reliable and it means number of non-null values.
+    s.all_null_value = num_values_ == 0;
     return s;
   }
 
@@ -625,7 +627,7 @@ class TypedStatisticsImpl : public TypedStatistics<DType> {
   T min_;
   T max_;
   ::arrow::MemoryPool* pool_;
-  int64_t num_values_ = 0;
+  int64_t num_values_ = 0;  // # of non-null values.
   EncodedStatistics statistics_;
   std::shared_ptr<TypedComparator<DType>> comparator_;
   std::shared_ptr<ResizableBuffer> min_buffer_, max_buffer_;
diff --git a/cpp/src/parquet/statistics.h b/cpp/src/parquet/statistics.h
index 71d9b662ba..3f168a938f 100644
--- a/cpp/src/parquet/statistics.h
+++ b/cpp/src/parquet/statistics.h
@@ -137,6 +137,12 @@ class PARQUET_EXPORT EncodedStatistics {
   bool has_null_count = false;
   bool has_distinct_count = false;
 
+  // When all values in the statistics are null, it is set to true.
+  // Otherwise, at least one value is not null, or we are not sure at all.
+  // Page index requires this information to decide whether a data page
+  // is a null page or not.
+  bool all_null_value = false;
+
   // From parquet-mr
   // Don't write stats larger than the max size rather than truncating. The
   // rationale is that some engines may use the minimum value in the page as
diff --git a/cpp/src/parquet/thrift_internal.h 
b/cpp/src/parquet/thrift_internal.h
index 9cc702dfcd..56e2a67c8a 100644
--- a/cpp/src/parquet/thrift_internal.h
+++ b/cpp/src/parquet/thrift_internal.h
@@ -295,6 +295,18 @@ static inline format::CompressionCodec::type 
ToThrift(Compression::type type) {
   }
 }
 
+static inline format::BoundaryOrder::type ToThrift(BoundaryOrder::type type) {
+  switch (type) {
+    case BoundaryOrder::Unordered:
+    case BoundaryOrder::Ascending:
+    case BoundaryOrder::Descending:
+      return static_cast<format::BoundaryOrder::type>(type);
+    default:
+      DCHECK(false) << "Cannot reach here";
+      return format::BoundaryOrder::UNORDERED;
+  }
+}
+
 static inline format::Statistics ToThrift(const EncodedStatistics& stats) {
   format::Statistics statistics;
   if (stats.has_min) {


Reply via email to