This is an automated email from the ASF dual-hosted git repository.
wjones127 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/main by this push:
new 0cf4ffaa90 GH-34053: [C++][Parquet] Write parquet page index (#34054)
0cf4ffaa90 is described below
commit 0cf4ffaa90de6f036c363ac95d83474d6a1dfcc2
Author: Gang Wu <[email protected]>
AuthorDate: Tue Apr 11 07:48:31 2023 +0800
GH-34053: [C++][Parquet] Write parquet page index (#34054)
### Rationale for this change
The Parquet C++ reader supports reading the page index from a file, but the
writer does not yet support writing it.
### What changes are included in this PR?
The Parquet file writer collects the page index from all data pages and
serializes it into the file.
### Are these changes tested?
Not yet, will be added later.
### Are there any user-facing changes?
`WriterProperties::enable_write_page_index()` and
`WriterProperties::disable_write_page_index()` have been added to toggle it on
and off.
* Closes: #34053
Authored-by: Gang Wu <[email protected]>
Signed-off-by: Will Jones <[email protected]>
---
cpp/src/parquet/arrow/arrow_reader_writer_test.cc | 285 ++++++++++++++-
cpp/src/parquet/column_page.h | 23 +-
cpp/src/parquet/column_writer.cc | 92 ++++-
cpp/src/parquet/column_writer.h | 10 +-
cpp/src/parquet/file_writer.cc | 61 +++-
cpp/src/parquet/metadata.cc | 38 ++
cpp/src/parquet/metadata.h | 18 +
cpp/src/parquet/page_index.cc | 383 ++++++++++++++++++++
cpp/src/parquet/page_index.h | 114 ++++++
cpp/src/parquet/page_index_test.cc | 414 ++++++++++++++++++++++
cpp/src/parquet/properties.h | 36 +-
cpp/src/parquet/statistics.cc | 4 +-
cpp/src/parquet/statistics.h | 6 +
cpp/src/parquet/thrift_internal.h | 12 +
14 files changed, 1458 insertions(+), 38 deletions(-)
diff --git a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
index eba964f967..b5456e89c6 100644
--- a/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
+++ b/cpp/src/parquet/arrow/arrow_reader_writer_test.cc
@@ -21,6 +21,7 @@
#pragma warning(disable : 4800)
#endif
+#include "gmock/gmock.h"
#include "gtest/gtest.h"
#include <cstdint>
@@ -66,6 +67,7 @@
#include "parquet/arrow/writer.h"
#include "parquet/column_writer.h"
#include "parquet/file_writer.h"
+#include "parquet/page_index.h"
#include "parquet/test_util.h"
using arrow::Array;
@@ -5081,10 +5083,21 @@ TEST(TestArrowReadWrite, WriteAndReadRecordBatch) {
// Verify the single record batch has been sliced into two row groups by
// WriterProperties::max_row_group_length().
- int num_row_groups =
arrow_reader->parquet_reader()->metadata()->num_row_groups();
+ auto file_metadata = arrow_reader->parquet_reader()->metadata();
+ int num_row_groups = file_metadata->num_row_groups();
ASSERT_EQ(2, num_row_groups);
- ASSERT_EQ(10,
arrow_reader->parquet_reader()->metadata()->RowGroup(0)->num_rows());
- ASSERT_EQ(2,
arrow_reader->parquet_reader()->metadata()->RowGroup(1)->num_rows());
+ ASSERT_EQ(10, file_metadata->RowGroup(0)->num_rows());
+ ASSERT_EQ(2, file_metadata->RowGroup(1)->num_rows());
+
+ // Verify that page index is not written by default.
+ for (int i = 0; i < num_row_groups; ++i) {
+ auto row_group_metadata = file_metadata->RowGroup(i);
+ for (int j = 0; j < row_group_metadata->num_columns(); ++j) {
+ auto column_metadata = row_group_metadata->ColumnChunk(j);
+ EXPECT_FALSE(column_metadata->GetColumnIndexLocation().has_value());
+ EXPECT_FALSE(column_metadata->GetOffsetIndexLocation().has_value());
+ }
+ }
// Verify batch data read via RecordBatch
std::unique_ptr<::arrow::RecordBatchReader> batch_reader;
@@ -5146,5 +5159,271 @@ TEST(TestArrowReadWrite, FuzzReader) {
}
}
+namespace {
+
+struct ColumnIndexObject {
+ std::vector<bool> null_pages;
+ std::vector<std::string> min_values;
+ std::vector<std::string> max_values;
+ BoundaryOrder::type boundary_order = BoundaryOrder::Unordered;
+ std::vector<int64_t> null_counts;
+
+ ColumnIndexObject() = default;
+
+ ColumnIndexObject(const std::vector<bool>& null_pages,
+ const std::vector<std::string>& min_values,
+ const std::vector<std::string>& max_values,
+ BoundaryOrder::type boundary_order,
+ const std::vector<int64_t>& null_counts)
+ : null_pages(null_pages),
+ min_values(min_values),
+ max_values(max_values),
+ boundary_order(boundary_order),
+ null_counts(null_counts) {}
+
+ explicit ColumnIndexObject(const ColumnIndex* column_index) {
+ if (column_index == nullptr) {
+ return;
+ }
+ null_pages = column_index->null_pages();
+ min_values = column_index->encoded_min_values();
+ max_values = column_index->encoded_max_values();
+ boundary_order = column_index->boundary_order();
+ if (column_index->has_null_counts()) {
+ null_counts = column_index->null_counts();
+ }
+ }
+
+ bool operator==(const ColumnIndexObject& b) const {
+ return null_pages == b.null_pages && min_values == b.min_values &&
+ max_values == b.max_values && boundary_order == b.boundary_order &&
+ null_counts == b.null_counts;
+ }
+};
+
+auto encode_int64 = [](int64_t value) {
+ return std::string(reinterpret_cast<const char*>(&value), sizeof(int64_t));
+};
+
+auto encode_double = [](double value) {
+ return std::string(reinterpret_cast<const char*>(&value), sizeof(double));
+};
+
+} // namespace
+
+class ParquetPageIndexRoundTripTest : public ::testing::Test {
+ public:
+ void WriteFile(const std::shared_ptr<WriterProperties>& writer_properties,
+ const std::shared_ptr<::arrow::Table>& table) {
+ // Get schema from table.
+ auto schema = table->schema();
+ std::shared_ptr<SchemaDescriptor> parquet_schema;
+ auto arrow_writer_properties = default_arrow_writer_properties();
+ ASSERT_OK_NO_THROW(ToParquetSchema(schema.get(), *writer_properties,
+ *arrow_writer_properties,
&parquet_schema));
+ auto schema_node =
std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
+
+ // Write table to buffer.
+ auto sink = CreateOutputStream();
+ auto pool = ::arrow::default_memory_pool();
+ auto writer = ParquetFileWriter::Open(sink, schema_node,
writer_properties);
+ std::unique_ptr<FileWriter> arrow_writer;
+ ASSERT_OK(FileWriter::Make(pool, std::move(writer), schema,
arrow_writer_properties,
+ &arrow_writer));
+ ASSERT_OK_NO_THROW(arrow_writer->WriteTable(*table));
+ ASSERT_OK_NO_THROW(arrow_writer->Close());
+ ASSERT_OK_AND_ASSIGN(buffer_, sink->Finish());
+ }
+
+ void ReadPageIndexes(int expect_num_row_groups, int expect_num_pages) {
+ auto read_properties = default_arrow_reader_properties();
+ auto reader =
ParquetFileReader::Open(std::make_shared<BufferReader>(buffer_));
+
+ auto metadata = reader->metadata();
+ ASSERT_EQ(expect_num_row_groups, metadata->num_row_groups());
+
+ auto page_index_reader = reader->GetPageIndexReader();
+ ASSERT_NE(page_index_reader, nullptr);
+
+ int64_t offset_lower_bound = 0;
+ for (int rg = 0; rg < metadata->num_row_groups(); ++rg) {
+ auto row_group_index_reader = page_index_reader->RowGroup(rg);
+ ASSERT_NE(row_group_index_reader, nullptr);
+
+ for (int col = 0; col < metadata->num_columns(); ++col) {
+ auto column_index = row_group_index_reader->GetColumnIndex(col);
+ column_indexes_.emplace_back(column_index.get());
+
+ auto offset_index = row_group_index_reader->GetOffsetIndex(col);
+ CheckOffsetIndex(offset_index.get(), expect_num_pages,
&offset_lower_bound);
+ }
+ }
+ }
+
+ private:
+ void CheckOffsetIndex(const OffsetIndex* offset_index, int expect_num_pages,
+ int64_t* offset_lower_bound_in_out) {
+ ASSERT_NE(offset_index, nullptr);
+ const auto& locations = offset_index->page_locations();
+ ASSERT_EQ(static_cast<size_t>(expect_num_pages), locations.size());
+ int64_t prev_first_row_index = -1;
+ for (const auto& location : locations) {
+ // Make sure first_row_index is in the ascending order within a row
group.
+ ASSERT_GT(location.first_row_index, prev_first_row_index);
+ // Make sure page offset is in the ascending order across the file.
+ ASSERT_GE(location.offset, *offset_lower_bound_in_out);
+ // Make sure page size is positive.
+ ASSERT_GT(location.compressed_page_size, 0);
+ prev_first_row_index = location.first_row_index;
+ *offset_lower_bound_in_out = location.offset +
location.compressed_page_size;
+ }
+ }
+
+ protected:
+ std::shared_ptr<Buffer> buffer_;
+ std::vector<ColumnIndexObject> column_indexes_;
+};
+
+TEST_F(ParquetPageIndexRoundTripTest, SimpleRoundTrip) {
+ auto writer_properties = WriterProperties::Builder()
+ .enable_write_page_index()
+ ->max_row_group_length(4)
+ ->build();
+ auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::int64()),
+ ::arrow::field("c1", ::arrow::utf8()),
+ ::arrow::field("c2",
::arrow::list(::arrow::int64()))});
+ WriteFile(writer_properties, ::arrow::TableFromJSON(schema, {R"([
+ [1, "a", [1] ],
+ [2, "b", [1, 2] ],
+ [3, "c", [null] ],
+ [null, "d", [] ],
+ [5, null, [3, 3, 3]],
+ [6, "f", null ]
+ ])"}));
+
+ ReadPageIndexes(/*expect_num_row_groups=*/2, /*expect_num_pages=*/1);
+
+ EXPECT_THAT(
+ column_indexes_,
+ ::testing::ElementsAre(
+ ColumnIndexObject{/*null_pages=*/{false},
/*min_values=*/{encode_int64(1)},
+ /*max_values=*/{encode_int64(3)},
BoundaryOrder::Ascending,
+ /*null_counts=*/{1}},
+ ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{"a"},
+ /*max_values=*/{"d"}, BoundaryOrder::Ascending,
+ /*null_counts=*/{0}},
+ ColumnIndexObject{/*null_pages=*/{false},
/*min_values=*/{encode_int64(1)},
+ /*max_values=*/{encode_int64(2)},
BoundaryOrder::Ascending,
+ /*null_counts=*/{2}},
+ ColumnIndexObject{/*null_pages=*/{false},
/*min_values=*/{encode_int64(5)},
+ /*max_values=*/{encode_int64(6)},
BoundaryOrder::Ascending,
+ /*null_counts=*/{0}},
+ ColumnIndexObject{/*null_pages=*/{false}, /*min_values=*/{"f"},
+ /*max_values=*/{"f"}, BoundaryOrder::Ascending,
+ /*null_counts=*/{1}},
+ ColumnIndexObject{/*null_pages=*/{false},
/*min_values=*/{encode_int64(3)},
+ /*max_values=*/{encode_int64(3)},
BoundaryOrder::Ascending,
+ /*null_counts=*/{1}}));
+}
+
+TEST_F(ParquetPageIndexRoundTripTest, DropLargeStats) {
+ auto writer_properties = WriterProperties::Builder()
+ .enable_write_page_index()
+ ->max_row_group_length(1) /* write single-row
row group */
+ ->max_statistics_size(20) /* drop stats larger
than it */
+ ->build();
+ auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::utf8())});
+ WriteFile(writer_properties, ::arrow::TableFromJSON(schema, {R"([
+ ["short_string"],
+ ["very_large_string_to_drop_stats"]
+ ])"}));
+
+ ReadPageIndexes(/*expect_num_row_groups=*/2, /*expect_num_pages=*/1);
+
+ EXPECT_THAT(
+ column_indexes_,
+ ::testing::ElementsAre(
+ ColumnIndexObject{/*null_pages=*/{false},
/*min_values=*/{"short_string"},
+ /*max_values=*/{"short_string"},
BoundaryOrder::Ascending,
+ /*null_counts=*/{0}},
+ ColumnIndexObject{}));
+}
+
+TEST_F(ParquetPageIndexRoundTripTest, MultiplePages) {
+ auto writer_properties = WriterProperties::Builder()
+ .enable_write_page_index()
+ ->data_pagesize(1) /* write multiple pages */
+ ->build();
+ auto schema = ::arrow::schema(
+ {::arrow::field("c0", ::arrow::int64()), ::arrow::field("c1",
::arrow::utf8())});
+ WriteFile(
+ writer_properties,
+ ::arrow::TableFromJSON(
+ schema, {R"([[1, "a"], [2, "b"]])", R"([[3, "c"], [4, "d"]])",
+ R"([[null, null], [6, "f"]])", R"([[null, null], [null,
null]])"}));
+
+ ReadPageIndexes(/*expect_num_row_groups=*/1, /*expect_num_pages=*/4);
+
+ EXPECT_THAT(
+ column_indexes_,
+ ::testing::ElementsAre(
+ ColumnIndexObject{
+ /*null_pages=*/{false, false, false, true},
+ /*min_values=*/{encode_int64(1), encode_int64(3),
encode_int64(6), ""},
+ /*max_values=*/{encode_int64(2), encode_int64(4),
encode_int64(6), ""},
+ BoundaryOrder::Ascending,
+ /*null_counts=*/{0, 0, 1, 2}},
+ ColumnIndexObject{/*null_pages=*/{false, false, false, true},
+ /*min_values=*/{"a", "c", "f", ""},
+ /*max_values=*/{"b", "d", "f", ""},
BoundaryOrder::Ascending,
+ /*null_counts=*/{0, 0, 1, 2}}));
+}
+
+TEST_F(ParquetPageIndexRoundTripTest, DoubleWithNaNs) {
+ auto writer_properties = WriterProperties::Builder()
+ .enable_write_page_index()
+ ->max_row_group_length(3) /* 3 rows per row
group */
+ ->build();
+
+ // Create table to write with NaNs.
+ auto vectors = std::vector<std::shared_ptr<Array>>(4);
+ // NaN will be ignored in min/max stats.
+ ::arrow::ArrayFromVector<::arrow::DoubleType>({1.0, NAN, 0.1}, &vectors[0]);
+ // Lower bound will use -0.0.
+ ::arrow::ArrayFromVector<::arrow::DoubleType>({+0.0, NAN, +0.0},
&vectors[1]);
+ // Upper bound will use -0.0.
+ ::arrow::ArrayFromVector<::arrow::DoubleType>({-0.0, NAN, -0.0},
&vectors[2]);
+ // Pages with all NaNs will not build column index.
+ ::arrow::ArrayFromVector<::arrow::DoubleType>({NAN, NAN, NAN}, &vectors[3]);
+ ASSERT_OK_AND_ASSIGN(auto chunked_array,
+ arrow::ChunkedArray::Make(vectors, ::arrow::float64()));
+
+ auto schema = ::arrow::schema({::arrow::field("c0", ::arrow::float64())});
+ auto table = Table::Make(schema, {chunked_array});
+ WriteFile(writer_properties, table);
+
+ ReadPageIndexes(/*expect_num_row_groups=*/4, /*expect_num_pages=*/1);
+
+ EXPECT_THAT(
+ column_indexes_,
+ ::testing::ElementsAre(
+ ColumnIndexObject{/*null_pages=*/{false},
+ /*min_values=*/{encode_double(0.1)},
+ /*max_values=*/{encode_double(1.0)},
BoundaryOrder::Ascending,
+ /*null_counts=*/{0}},
+ ColumnIndexObject{/*null_pages=*/{false},
+ /*min_values=*/{encode_double(-0.0)},
+ /*max_values=*/{encode_double(+0.0)},
+ BoundaryOrder::Ascending,
+ /*null_counts=*/{0}},
+ ColumnIndexObject{/*null_pages=*/{false},
+ /*min_values=*/{encode_double(-0.0)},
+ /*max_values=*/{encode_double(+0.0)},
+ BoundaryOrder::Ascending,
+ /*null_counts=*/{0}},
+ ColumnIndexObject{
+ /* Page with only NaN values does not have column index built
*/}));
+}
+
} // namespace arrow
} // namespace parquet
diff --git a/cpp/src/parquet/column_page.h b/cpp/src/parquet/column_page.h
index 2fab77ed01..905f805b8c 100644
--- a/cpp/src/parquet/column_page.h
+++ b/cpp/src/parquet/column_page.h
@@ -23,6 +23,7 @@
#include <cstdint>
#include <memory>
+#include <optional>
#include <string>
#include "parquet/statistics.h"
@@ -64,23 +65,31 @@ class DataPage : public Page {
Encoding::type encoding() const { return encoding_; }
int64_t uncompressed_size() const { return uncompressed_size_; }
const EncodedStatistics& statistics() const { return statistics_; }
+ /// Return the row ordinal within the row group to the first row in the data
page.
+ /// Currently it is only present from data pages created by ColumnWriter in
order
+ /// to collect page index.
+ std::optional<int64_t> first_row_index() const { return first_row_index_; }
virtual ~DataPage() = default;
protected:
DataPage(PageType::type type, const std::shared_ptr<Buffer>& buffer, int32_t
num_values,
Encoding::type encoding, int64_t uncompressed_size,
- const EncodedStatistics& statistics = EncodedStatistics())
+ const EncodedStatistics& statistics = EncodedStatistics(),
+ std::optional<int64_t> first_row_index = std::nullopt)
: Page(buffer, type),
num_values_(num_values),
encoding_(encoding),
uncompressed_size_(uncompressed_size),
- statistics_(statistics) {}
+ statistics_(statistics),
+ first_row_index_(std::move(first_row_index)) {}
int32_t num_values_;
Encoding::type encoding_;
int64_t uncompressed_size_;
EncodedStatistics statistics_;
+ /// Row ordinal within the row group to the first row in the data page.
+ std::optional<int64_t> first_row_index_;
};
class DataPageV1 : public DataPage {
@@ -88,9 +97,10 @@ class DataPageV1 : public DataPage {
DataPageV1(const std::shared_ptr<Buffer>& buffer, int32_t num_values,
Encoding::type encoding, Encoding::type definition_level_encoding,
Encoding::type repetition_level_encoding, int64_t
uncompressed_size,
- const EncodedStatistics& statistics = EncodedStatistics())
+ const EncodedStatistics& statistics = EncodedStatistics(),
+ std::optional<int64_t> first_row_index = std::nullopt)
: DataPage(PageType::DATA_PAGE, buffer, num_values, encoding,
uncompressed_size,
- statistics),
+ statistics, std::move(first_row_index)),
definition_level_encoding_(definition_level_encoding),
repetition_level_encoding_(repetition_level_encoding) {}
@@ -109,9 +119,10 @@ class DataPageV2 : public DataPage {
int32_t num_rows, Encoding::type encoding,
int32_t definition_levels_byte_length, int32_t
repetition_levels_byte_length,
int64_t uncompressed_size, bool is_compressed = false,
- const EncodedStatistics& statistics = EncodedStatistics())
+ const EncodedStatistics& statistics = EncodedStatistics(),
+ std::optional<int64_t> first_row_index = std::nullopt)
: DataPage(PageType::DATA_PAGE_V2, buffer, num_values, encoding,
uncompressed_size,
- statistics),
+ statistics, std::move(first_row_index)),
num_nulls_(num_nulls),
num_rows_(num_rows),
definition_levels_byte_length_(definition_levels_byte_length),
diff --git a/cpp/src/parquet/column_writer.cc b/cpp/src/parquet/column_writer.cc
index fd1b0fc1e2..222fc853e3 100644
--- a/cpp/src/parquet/column_writer.cc
+++ b/cpp/src/parquet/column_writer.cc
@@ -49,6 +49,7 @@
#include "parquet/encryption/internal_file_encryptor.h"
#include "parquet/level_conversion.h"
#include "parquet/metadata.h"
+#include "parquet/page_index.h"
#include "parquet/platform.h"
#include "parquet/properties.h"
#include "parquet/schema.h"
@@ -253,7 +254,9 @@ class SerializedPageWriter : public PageWriter {
bool use_page_checksum_verification,
MemoryPool* pool = ::arrow::default_memory_pool(),
std::shared_ptr<Encryptor> meta_encryptor = nullptr,
- std::shared_ptr<Encryptor> data_encryptor = nullptr)
+ std::shared_ptr<Encryptor> data_encryptor = nullptr,
+ ColumnIndexBuilder* column_index_builder = nullptr,
+ OffsetIndexBuilder* offset_index_builder = nullptr)
: sink_(std::move(sink)),
metadata_(metadata),
pool_(pool),
@@ -268,7 +271,9 @@ class SerializedPageWriter : public PageWriter {
page_checksum_verification_(use_page_checksum_verification),
meta_encryptor_(std::move(meta_encryptor)),
data_encryptor_(std::move(data_encryptor)),
- encryption_buffer_(AllocateBuffer(pool, 0)) {
+ encryption_buffer_(AllocateBuffer(pool, 0)),
+ column_index_builder_(column_index_builder),
+ offset_index_builder_(offset_index_builder) {
if (data_encryptor_ != nullptr || meta_encryptor_ != nullptr) {
InitEncryption();
}
@@ -339,6 +344,10 @@ class SerializedPageWriter : public PageWriter {
if (meta_encryptor_ != nullptr) {
UpdateEncryption(encryption::kColumnMetaData);
}
+
+ // Serialized page writer does not need to adjust page offsets.
+ FinishPageIndexes(/*final_position=*/0);
+
// index_page_offset = -1 since they are not supported
metadata_->Finish(num_values_, dictionary_page_offset_, -1,
data_page_offset_,
total_compressed_size_, total_uncompressed_size_,
has_dictionary,
@@ -417,6 +426,25 @@ class SerializedPageWriter : public PageWriter {
thrift_serializer_->Serialize(&page_header, sink_.get(),
meta_encryptor_);
PARQUET_THROW_NOT_OK(sink_->Write(output_data_buffer, output_data_len));
+ /// Collect page index
+ if (column_index_builder_ != nullptr) {
+ column_index_builder_->AddPage(page.statistics());
+ }
+ if (offset_index_builder_ != nullptr) {
+ const int64_t compressed_size = output_data_len + header_size;
+ if (compressed_size > std::numeric_limits<int32_t>::max()) {
+ throw ParquetException("Compressed page size overflows to INT32_MAX.");
+ }
+ if (!page.first_row_index().has_value()) {
+ throw ParquetException("First row index is not set in data page.");
+ }
+ /// start_pos is a relative offset in the buffered mode. It should be
+ /// adjusted via OffsetIndexBuilder::Finish() after BufferedPageWriter
+ /// has flushed all data pages.
+ offset_index_builder_->AddPage(start_pos,
static_cast<int32_t>(compressed_size),
+ *page.first_row_index());
+ }
+
total_uncompressed_size_ += uncompressed_size + header_size;
total_compressed_size_ += output_data_len + header_size;
num_values_ += page.num_values();
@@ -458,6 +486,17 @@ class SerializedPageWriter : public PageWriter {
page_header.__set_data_page_header_v2(data_page_header);
}
+ /// \brief Finish page index builders and update the stream offset to adjust
+ /// page offsets.
+ void FinishPageIndexes(int64_t final_position) {
+ if (column_index_builder_ != nullptr) {
+ column_index_builder_->Finish();
+ }
+ if (offset_index_builder_ != nullptr) {
+ offset_index_builder_->Finish(final_position);
+ }
+ }
+
bool has_compressor() override { return (compressor_ != nullptr); }
int64_t num_values() { return num_values_; }
@@ -563,6 +602,9 @@ class SerializedPageWriter : public PageWriter {
std::map<Encoding::type, int32_t> dict_encoding_stats_;
std::map<Encoding::type, int32_t> data_encoding_stats_;
+
+ ColumnIndexBuilder* column_index_builder_;
+ OffsetIndexBuilder* offset_index_builder_;
};
// This implementation of the PageWriter writes to the final sink on Close .
@@ -574,13 +616,16 @@ class BufferedPageWriter : public PageWriter {
bool use_page_checksum_verification,
MemoryPool* pool = ::arrow::default_memory_pool(),
std::shared_ptr<Encryptor> meta_encryptor = nullptr,
- std::shared_ptr<Encryptor> data_encryptor = nullptr)
+ std::shared_ptr<Encryptor> data_encryptor = nullptr,
+ ColumnIndexBuilder* column_index_builder = nullptr,
+ OffsetIndexBuilder* offset_index_builder = nullptr)
: final_sink_(std::move(sink)), metadata_(metadata),
has_dictionary_pages_(false) {
in_memory_sink_ = CreateOutputStream(pool);
pager_ = std::make_unique<SerializedPageWriter>(
in_memory_sink_, codec, compression_level, metadata, row_group_ordinal,
current_column_ordinal, use_page_checksum_verification, pool,
- std::move(meta_encryptor), std::move(data_encryptor));
+ std::move(meta_encryptor), std::move(data_encryptor),
column_index_builder,
+ offset_index_builder);
}
int64_t WriteDictionaryPage(const DictionaryPage& page) override {
@@ -606,6 +651,9 @@ class BufferedPageWriter : public PageWriter {
// Write metadata at end of column chunk
metadata_->WriteTo(in_memory_sink_.get());
+ // Buffered page writer needs to adjust page offsets.
+ pager_->FinishPageIndexes(final_position);
+
// flush everything to the serialized sink
PARQUET_ASSIGN_OR_THROW(auto buffer, in_memory_sink_->Finish());
PARQUET_THROW_NOT_OK(final_sink_->Write(buffer));
@@ -638,17 +686,20 @@ std::unique_ptr<PageWriter> PageWriter::Open(
int compression_level, ColumnChunkMetaDataBuilder* metadata,
int16_t row_group_ordinal, int16_t column_chunk_ordinal, MemoryPool* pool,
bool buffered_row_group, std::shared_ptr<Encryptor> meta_encryptor,
- std::shared_ptr<Encryptor> data_encryptor, bool
page_write_checksum_enabled) {
+ std::shared_ptr<Encryptor> data_encryptor, bool
page_write_checksum_enabled,
+ ColumnIndexBuilder* column_index_builder, OffsetIndexBuilder*
offset_index_builder) {
if (buffered_row_group) {
return std::unique_ptr<PageWriter>(new BufferedPageWriter(
std::move(sink), codec, compression_level, metadata, row_group_ordinal,
column_chunk_ordinal, page_write_checksum_enabled, pool,
- std::move(meta_encryptor), std::move(data_encryptor)));
+ std::move(meta_encryptor), std::move(data_encryptor),
column_index_builder,
+ offset_index_builder));
} else {
return std::unique_ptr<PageWriter>(new SerializedPageWriter(
std::move(sink), codec, compression_level, metadata, row_group_ordinal,
column_chunk_ordinal, page_write_checksum_enabled, pool,
- std::move(meta_encryptor), std::move(data_encryptor)));
+ std::move(meta_encryptor), std::move(data_encryptor),
column_index_builder,
+ offset_index_builder));
}
}
@@ -925,6 +976,7 @@ void ColumnWriterImpl::BuildDataPageV1(int64_t
definition_levels_rle_size,
}
int32_t num_values = static_cast<int32_t>(num_buffered_values_);
+ int64_t first_row_index = rows_written_ - num_buffered_rows_;
// Write the page to OutputStream eagerly if there is no dictionary or
// if dictionary encoding has fallen back to PLAIN
@@ -934,13 +986,13 @@ void ColumnWriterImpl::BuildDataPageV1(int64_t
definition_levels_rle_size,
compressed_data->CopySlice(0, compressed_data->size(), allocator_));
std::unique_ptr<DataPage> page_ptr = std::make_unique<DataPageV1>(
compressed_data_copy, num_values, encoding_, Encoding::RLE,
Encoding::RLE,
- uncompressed_size, page_stats);
+ uncompressed_size, page_stats, first_row_index);
total_compressed_bytes_ += page_ptr->size() + sizeof(format::PageHeader);
data_pages_.push_back(std::move(page_ptr));
} else { // Eagerly write pages
DataPageV1 page(compressed_data, num_values, encoding_, Encoding::RLE,
Encoding::RLE,
- uncompressed_size, page_stats);
+ uncompressed_size, page_stats, first_row_index);
WriteDataPage(page);
}
}
@@ -977,6 +1029,7 @@ void ColumnWriterImpl::BuildDataPageV2(int64_t
definition_levels_rle_size,
int32_t num_rows = static_cast<int32_t>(num_buffered_rows_);
int32_t def_levels_byte_length =
static_cast<int32_t>(definition_levels_rle_size);
int32_t rep_levels_byte_length =
static_cast<int32_t>(repetition_levels_rle_size);
+ int64_t first_row_index = rows_written_ - num_buffered_rows_;
// page_stats.null_count is not set when page_statistics_ is nullptr. It is
only used
// here for safety check.
@@ -989,13 +1042,14 @@ void ColumnWriterImpl::BuildDataPageV2(int64_t
definition_levels_rle_size,
combined->CopySlice(0, combined->size(),
allocator_));
std::unique_ptr<DataPage> page_ptr = std::make_unique<DataPageV2>(
combined, num_values, null_count, num_rows, encoding_,
def_levels_byte_length,
- rep_levels_byte_length, uncompressed_size, pager_->has_compressor(),
page_stats);
+ rep_levels_byte_length, uncompressed_size, pager_->has_compressor(),
page_stats,
+ first_row_index);
total_compressed_bytes_ += page_ptr->size() + sizeof(format::PageHeader);
data_pages_.push_back(std::move(page_ptr));
} else {
DataPageV2 page(combined, num_values, null_count, num_rows, encoding_,
def_levels_byte_length, rep_levels_byte_length,
uncompressed_size,
- pager_->has_compressor(), page_stats);
+ pager_->has_compressor(), page_stats, first_row_index);
WriteDataPage(page);
}
}
@@ -1313,7 +1367,8 @@ class TypedColumnWriterImpl : public ColumnWriterImpl,
public TypedColumnWriter<
const WriterProperties* properties() override { return properties_; }
bool pages_change_on_record_boundaries() const {
- return properties_->data_page_version() == ParquetDataPageVersion::V2;
+ return properties_->data_page_version() == ParquetDataPageVersion::V2 ||
+ properties_->write_page_index();
}
private:
@@ -1515,6 +1570,19 @@ class TypedColumnWriterImpl : public ColumnWriterImpl,
public TypedColumnWriter<
}
}
+ /// \brief Write values with spaces and update page statistics accordingly.
+ ///
+ /// \param values input buffer of values to write, including spaces.
+ /// \param num_values number of non-null values in the values buffer.
+ /// \param num_spaced_values length of values buffer, including spaces and
does not
+ /// count some nulls from ancestor (e.g. empty lists).
+ /// \param valid_bits validity bitmap of values buffer, which does not
include some
+ /// nulls from ancestor (e.g. empty lists).
+ /// \param valid_bits_offset offset to valid_bits bitmap.
+ /// \param num_levels number of levels to write, including nulls from values
buffer
+ /// and nulls from ancestor (e.g. empty lists).
+ /// \param num_nulls number of nulls in the values buffer as well as nulls
from the
+ /// ancestor (e.g. empty lists).
void WriteValuesSpaced(const T* values, int64_t num_values, int64_t
num_spaced_values,
const uint8_t* valid_bits, int64_t valid_bits_offset,
int64_t num_levels, int64_t num_nulls) {
diff --git a/cpp/src/parquet/column_writer.h b/cpp/src/parquet/column_writer.h
index 4dd4b10ccc..792b108ac8 100644
--- a/cpp/src/parquet/column_writer.h
+++ b/cpp/src/parquet/column_writer.h
@@ -42,11 +42,13 @@ class RleEncoder;
namespace parquet {
struct ArrowWriteContext;
+class ColumnChunkMetaDataBuilder;
class ColumnDescriptor;
+class ColumnIndexBuilder;
class DataPage;
class DictionaryPage;
-class ColumnChunkMetaDataBuilder;
class Encryptor;
+class OffsetIndexBuilder;
class WriterProperties;
class PARQUET_EXPORT LevelEncoder {
@@ -91,7 +93,11 @@ class PARQUET_EXPORT PageWriter {
bool buffered_row_group = false,
std::shared_ptr<Encryptor> header_encryptor = NULLPTR,
std::shared_ptr<Encryptor> data_encryptor = NULLPTR,
- bool page_write_checksum_enabled = false);
+ bool page_write_checksum_enabled = false,
+ // column_index_builder MUST outlive the PageWriter
+ ColumnIndexBuilder* column_index_builder = NULLPTR,
+ // offset_index_builder MUST outlive the PageWriter
+ OffsetIndexBuilder* offset_index_builder = NULLPTR);
// The Column Writer decides if dictionary encoding is used if set and
// if the dictionary encoding has fallen back to default encoding on
reaching dictionary
diff --git a/cpp/src/parquet/file_writer.cc b/cpp/src/parquet/file_writer.cc
index ec2e0e8a8a..481d5b6d30 100644
--- a/cpp/src/parquet/file_writer.cc
+++ b/cpp/src/parquet/file_writer.cc
@@ -28,9 +28,9 @@
#include "parquet/encryption/encryption_internal.h"
#include "parquet/encryption/internal_file_encryptor.h"
#include "parquet/exception.h"
+#include "parquet/page_index.h"
#include "parquet/platform.h"
#include "parquet/schema.h"
-#include "parquet/types.h"
using arrow::MemoryPool;
@@ -89,7 +89,8 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
RowGroupSerializer(std::shared_ptr<ArrowOutputStream> sink,
RowGroupMetaDataBuilder* metadata, int16_t
row_group_ordinal,
const WriterProperties* properties, bool
buffered_row_group = false,
- InternalFileEncryptor* file_encryptor = nullptr)
+ InternalFileEncryptor* file_encryptor = nullptr,
+ PageIndexBuilder* page_index_builder = nullptr)
: sink_(std::move(sink)),
metadata_(metadata),
properties_(properties),
@@ -100,7 +101,8 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
next_column_index_(0),
num_rows_(0),
buffered_row_group_(buffered_row_group),
- file_encryptor_(file_encryptor) {
+ file_encryptor_(file_encryptor),
+ page_index_builder_(page_index_builder) {
if (buffered_row_group) {
InitColumns();
} else {
@@ -135,8 +137,7 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
column_writers_[0]->total_compressed_bytes_written();
}
- ++next_column_index_;
-
+ const int32_t column_ordinal = next_column_index_++;
const auto& path = col_meta->descr()->path();
auto meta_encryptor =
file_encryptor_ ?
file_encryptor_->GetColumnMetaEncryptor(path->ToDotString())
@@ -144,11 +145,17 @@ class RowGroupSerializer : public
RowGroupWriter::Contents {
auto data_encryptor =
file_encryptor_ ?
file_encryptor_->GetColumnDataEncryptor(path->ToDotString())
: nullptr;
+ auto ci_builder = page_index_builder_
+ ?
page_index_builder_->GetColumnIndexBuilder(column_ordinal)
+ : nullptr;
+ auto oi_builder = page_index_builder_
+ ?
page_index_builder_->GetOffsetIndexBuilder(column_ordinal)
+ : nullptr;
std::unique_ptr<PageWriter> pager = PageWriter::Open(
sink_, properties_->compression(path),
properties_->compression_level(path),
- col_meta, row_group_ordinal_, static_cast<int16_t>(next_column_index_
- 1),
+ col_meta, row_group_ordinal_, static_cast<int16_t>(column_ordinal),
properties_->memory_pool(), false, meta_encryptor, data_encryptor,
- properties_->page_checksum_enabled());
+ properties_->page_checksum_enabled(), ci_builder, oi_builder);
column_writers_[0] = ColumnWriter::Make(col_meta, std::move(pager),
properties_);
return column_writers_[0].get();
}
@@ -240,6 +247,7 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
mutable int64_t num_rows_;
bool buffered_row_group_;
InternalFileEncryptor* file_encryptor_;
+ PageIndexBuilder* page_index_builder_;
void CheckRowsWritten() const {
// verify when only one column is written at a time
@@ -267,18 +275,25 @@ class RowGroupSerializer : public
RowGroupWriter::Contents {
for (int i = 0; i < num_columns(); i++) {
auto col_meta = metadata_->NextColumnChunk();
const auto& path = col_meta->descr()->path();
+ const int32_t column_ordinal = next_column_index_++;
auto meta_encryptor =
file_encryptor_ ?
file_encryptor_->GetColumnMetaEncryptor(path->ToDotString())
: nullptr;
auto data_encryptor =
file_encryptor_ ?
file_encryptor_->GetColumnDataEncryptor(path->ToDotString())
: nullptr;
+ auto ci_builder = page_index_builder_
+ ?
page_index_builder_->GetColumnIndexBuilder(column_ordinal)
+ : nullptr;
+ auto oi_builder = page_index_builder_
+ ?
page_index_builder_->GetOffsetIndexBuilder(column_ordinal)
+ : nullptr;
std::unique_ptr<PageWriter> pager = PageWriter::Open(
sink_, properties_->compression(path),
properties_->compression_level(path),
col_meta, static_cast<int16_t>(row_group_ordinal_),
- static_cast<int16_t>(next_column_index_++),
properties_->memory_pool(),
+ static_cast<int16_t>(column_ordinal), properties_->memory_pool(),
buffered_row_group_, meta_encryptor, data_encryptor,
- properties_->page_checksum_enabled());
+ properties_->page_checksum_enabled(), ci_builder, oi_builder);
column_writers_.push_back(
ColumnWriter::Make(col_meta, std::move(pager), properties_));
}
@@ -317,6 +332,8 @@ class FileSerializer : public ParquetFileWriter::Contents {
}
row_group_writer_.reset();
+ WritePageIndex();
+
// Write magic bytes and metadata
auto file_encryption_properties =
properties_->file_encryption_properties();
@@ -345,9 +362,12 @@ class FileSerializer : public ParquetFileWriter::Contents {
}
num_row_groups_++;
auto rg_metadata = metadata_->AppendRowGroup();
+ if (page_index_builder_) {
+ page_index_builder_->AppendRowGroup();
+ }
std::unique_ptr<RowGroupWriter::Contents> contents(new RowGroupSerializer(
sink_, rg_metadata, static_cast<int16_t>(num_row_groups_ - 1),
properties_.get(),
- buffered_row_group, file_encryptor_.get()));
+ buffered_row_group, file_encryptor_.get(), page_index_builder_.get()));
row_group_writer_ = std::make_unique<RowGroupWriter>(std::move(contents));
return row_group_writer_.get();
}
@@ -412,6 +432,21 @@ class FileSerializer : public ParquetFileWriter::Contents {
}
}
+ void WritePageIndex() {
+ if (page_index_builder_ != nullptr) {
+ if (properties_->file_encryption_properties()) {
+ throw ParquetException("Encryption is not supported with page index");
+ }
+
+ // Serialize page index after all row groups have been written and report
+ // location to the file metadata.
+ PageIndexLocation page_index_location;
+ page_index_builder_->Finish();
+ page_index_builder_->WriteTo(sink_.get(), &page_index_location);
+ metadata_->SetPageIndexLocation(page_index_location);
+ }
+ }
+
std::shared_ptr<ArrowOutputStream> sink_;
bool is_open_;
const std::shared_ptr<WriterProperties> properties_;
@@ -420,7 +455,7 @@ class FileSerializer : public ParquetFileWriter::Contents {
std::unique_ptr<FileMetaDataBuilder> metadata_;
// Only one of the row group writers is active at a time
std::unique_ptr<RowGroupWriter> row_group_writer_;
-
+ std::unique_ptr<PageIndexBuilder> page_index_builder_;
std::unique_ptr<InternalFileEncryptor> file_encryptor_;
void StartFile() {
@@ -459,6 +494,10 @@ class FileSerializer : public ParquetFileWriter::Contents {
PARQUET_THROW_NOT_OK(sink_->Write(kParquetMagic, 4));
}
}
+
+ if (properties_->write_page_index()) {
+ page_index_builder_ = PageIndexBuilder::Make(&schema_);
+ }
}
};
diff --git a/cpp/src/parquet/metadata.cc b/cpp/src/parquet/metadata.cc
index e47257bf89..b6c240115f 100644
--- a/cpp/src/parquet/metadata.cc
+++ b/cpp/src/parquet/metadata.cc
@@ -1763,6 +1763,40 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl {
return current_row_group_builder_.get();
}
+ void SetPageIndexLocation(const PageIndexLocation& location) {
+ auto set_index_location =
+ [this](size_t row_group_ordinal,
+ const PageIndexLocation::FileIndexLocation& file_index_location,
+ bool column_index) {
+ auto& row_group_metadata = this->row_groups_.at(row_group_ordinal);
+ auto iter = file_index_location.find(row_group_ordinal);
+ if (iter != file_index_location.cend()) {
+ const auto& row_group_index_location = iter->second;
+ for (size_t i = 0; i < row_group_index_location.size(); ++i) {
+ if (i >= row_group_metadata.columns.size()) {
+ throw ParquetException("Cannot find metadata for column
ordinal ", i);
+ }
+ auto& column_metadata = row_group_metadata.columns.at(i);
+ const auto& index_location = row_group_index_location.at(i);
+ if (index_location.has_value()) {
+ if (column_index) {
+
column_metadata.__set_column_index_offset(index_location->offset);
+
column_metadata.__set_column_index_length(index_location->length);
+ } else {
+
column_metadata.__set_offset_index_offset(index_location->offset);
+
column_metadata.__set_offset_index_length(index_location->length);
+ }
+ }
+ }
+ }
+ };
+
+ for (size_t i = 0; i < row_groups_.size(); ++i) {
+ set_index_location(i, location.column_index_location, true);
+ set_index_location(i, location.offset_index_location, false);
+ }
+ }
+
std::unique_ptr<FileMetaData> Finish() {
int64_t total_rows = 0;
for (auto row_group : row_groups_) {
@@ -1888,6 +1922,10 @@ RowGroupMetaDataBuilder*
FileMetaDataBuilder::AppendRowGroup() {
return impl_->AppendRowGroup();
}
+void FileMetaDataBuilder::SetPageIndexLocation(const PageIndexLocation&
location) {
+ impl_->SetPageIndexLocation(location);
+}
+
std::unique_ptr<FileMetaData> FileMetaDataBuilder::Finish() { return
impl_->Finish(); }
std::unique_ptr<FileCryptoMetaData> FileMetaDataBuilder::GetCryptoMetaData() {
diff --git a/cpp/src/parquet/metadata.h b/cpp/src/parquet/metadata.h
index 277f59a1b6..efcb17be04 100644
--- a/cpp/src/parquet/metadata.h
+++ b/cpp/src/parquet/metadata.h
@@ -506,6 +506,21 @@ class PARQUET_EXPORT RowGroupMetaDataBuilder {
std::unique_ptr<RowGroupMetaDataBuilderImpl> impl_;
};
+/// \brief Public struct for location to all page indexes in a parquet file.
+struct PageIndexLocation {
+ /// Alias type of page index location of a row group. The index location
+ /// is located by column ordinal. If the column does not have the page index,
+ /// its value is set to std::nullopt.
+ using RowGroupIndexLocation = std::vector<std::optional<IndexLocation>>;
+ /// Alias type of page index location of a parquet file. The index location
+ /// is located by the row group ordinal.
+ using FileIndexLocation = std::map<size_t, RowGroupIndexLocation>;
+ /// Row group column index locations which uses row group ordinal as the key.
+ FileIndexLocation column_index_location;
+ /// Row group offset index locations which uses row group ordinal as the key.
+ FileIndexLocation offset_index_location;
+};
+
class PARQUET_EXPORT FileMetaDataBuilder {
public:
// API convenience to get a MetaData reader
@@ -518,6 +533,9 @@ class PARQUET_EXPORT FileMetaDataBuilder {
// The prior RowGroupMetaDataBuilder (if any) is destroyed
RowGroupMetaDataBuilder* AppendRowGroup();
+ // Update location to all page indexes in the parquet file
+ void SetPageIndexLocation(const PageIndexLocation& location);
+
// Complete the Thrift structure
std::unique_ptr<FileMetaData> Finish();
diff --git a/cpp/src/parquet/page_index.cc b/cpp/src/parquet/page_index.cc
index f24cecf43b..c2a42547ee 100644
--- a/cpp/src/parquet/page_index.cc
+++ b/cpp/src/parquet/page_index.cc
@@ -426,6 +426,355 @@ class PageIndexReaderImpl : public PageIndexReader {
std::unordered_map<int32_t, RowGroupIndexReadRange> index_read_ranges_;
};
/// \brief Internal state of page index builder.
enum class BuilderState {
  /// Created, but no data has been written yet.
  kCreated,
  /// Some data are written but not yet finished.
  kStarted,
  /// All data are written and no more write is allowed.
  kFinished,
  /// The builder has corrupted data or empty data and therefore discarded.
  kDiscarded
};
+
+template <typename DType>
+class ColumnIndexBuilderImpl final : public ColumnIndexBuilder {
+ public:
+ using T = typename DType::c_type;
+
+ explicit ColumnIndexBuilderImpl(const ColumnDescriptor* descr) :
descr_(descr) {
+ /// Initialize the null_counts vector as set. Invalid null_counts vector
from
+ /// any page will invalidate the null_counts vector of the column index.
+ column_index_.__isset.null_counts = true;
+ column_index_.boundary_order = format::BoundaryOrder::UNORDERED;
+ }
+
+ void AddPage(const EncodedStatistics& stats) override {
+ if (state_ == BuilderState::kFinished) {
+ throw ParquetException("Cannot add page to finished
ColumnIndexBuilder.");
+ } else if (state_ == BuilderState::kDiscarded) {
+ /// The offset index is discarded. Do nothing.
+ return;
+ }
+
+ state_ = BuilderState::kStarted;
+
+ if (stats.all_null_value) {
+ column_index_.null_pages.emplace_back(true);
+ column_index_.min_values.emplace_back("");
+ column_index_.max_values.emplace_back("");
+ } else if (stats.has_min && stats.has_max) {
+ const size_t page_ordinal = column_index_.null_pages.size();
+ non_null_page_indices_.emplace_back(page_ordinal);
+ column_index_.min_values.emplace_back(stats.min());
+ column_index_.max_values.emplace_back(stats.max());
+ column_index_.null_pages.emplace_back(false);
+ } else {
+ /// This is a non-null page but it lacks of meaningful min/max values.
+ /// Discard the column index.
+ state_ = BuilderState::kDiscarded;
+ return;
+ }
+
+ if (column_index_.__isset.null_counts && stats.has_null_count) {
+ column_index_.null_counts.emplace_back(stats.null_count);
+ } else {
+ column_index_.__isset.null_counts = false;
+ column_index_.null_counts.clear();
+ }
+ }
+
+ void Finish() override {
+ switch (state_) {
+ case BuilderState::kCreated: {
+ /// No page is added. Discard the column index.
+ state_ = BuilderState::kDiscarded;
+ return;
+ }
+ case BuilderState::kFinished:
+ throw ParquetException("ColumnIndexBuilder is already finished.");
+ case BuilderState::kDiscarded:
+ // The column index is discarded. Do nothing.
+ return;
+ case BuilderState::kStarted:
+ break;
+ }
+
+ state_ = BuilderState::kFinished;
+
+ /// Clear null_counts vector because at least one page does not provide it.
+ if (!column_index_.__isset.null_counts) {
+ column_index_.null_counts.clear();
+ }
+
+ /// Decode min/max values according to the data type.
+ const size_t non_null_page_count = non_null_page_indices_.size();
+ std::vector<T> min_values, max_values;
+ min_values.resize(non_null_page_count);
+ max_values.resize(non_null_page_count);
+ auto decoder = MakeTypedDecoder<DType>(Encoding::PLAIN, descr_);
+ for (size_t i = 0; i < non_null_page_count; ++i) {
+ auto page_ordinal = non_null_page_indices_.at(i);
+ Decode<DType>(decoder, column_index_.min_values.at(page_ordinal),
&min_values, i);
+ Decode<DType>(decoder, column_index_.max_values.at(page_ordinal),
&max_values, i);
+ }
+
+ /// Decide the boundary order from decoded min/max values.
+ auto boundary_order = DetermineBoundaryOrder(min_values, max_values);
+ column_index_.__set_boundary_order(ToThrift(boundary_order));
+ }
+
+ void WriteTo(::arrow::io::OutputStream* sink) const override {
+ if (state_ == BuilderState::kFinished) {
+ ThriftSerializer{}.Serialize(&column_index_, sink);
+ }
+ }
+
+ std::unique_ptr<ColumnIndex> Build() const override {
+ if (state_ == BuilderState::kFinished) {
+ return std::make_unique<TypedColumnIndexImpl<DType>>(*descr_,
column_index_);
+ }
+ return nullptr;
+ }
+
+ private:
+ BoundaryOrder::type DetermineBoundaryOrder(const std::vector<T>& min_values,
+ const std::vector<T>& max_values)
const {
+ DCHECK_EQ(min_values.size(), max_values.size());
+ if (min_values.empty()) {
+ return BoundaryOrder::Unordered;
+ }
+
+ std::shared_ptr<TypedComparator<DType>> comparator;
+ try {
+ comparator = MakeComparator<DType>(descr_);
+ } catch (const ParquetException&) {
+ /// Simply return unordered for unsupported comparator.
+ return BoundaryOrder::Unordered;
+ }
+
+ /// Check if both min_values and max_values are in ascending order.
+ bool is_ascending = true;
+ for (size_t i = 1; i < min_values.size(); ++i) {
+ if (comparator->Compare(min_values[i], min_values[i - 1]) ||
+ comparator->Compare(max_values[i], max_values[i - 1])) {
+ is_ascending = false;
+ break;
+ }
+ }
+ if (is_ascending) {
+ return BoundaryOrder::Ascending;
+ }
+
+ /// Check if both min_values and max_values are in descending order.
+ bool is_descending = true;
+ for (size_t i = 1; i < min_values.size(); ++i) {
+ if (comparator->Compare(min_values[i - 1], min_values[i]) ||
+ comparator->Compare(max_values[i - 1], max_values[i])) {
+ is_descending = false;
+ break;
+ }
+ }
+ if (is_descending) {
+ return BoundaryOrder::Descending;
+ }
+
+ /// Neither ascending nor descending is detected.
+ return BoundaryOrder::Unordered;
+ }
+
+ const ColumnDescriptor* descr_;
+ format::ColumnIndex column_index_;
+ std::vector<size_t> non_null_page_indices_;
+ BuilderState state_ = BuilderState::kCreated;
+};
+
+class OffsetIndexBuilderImpl final : public OffsetIndexBuilder {
+ public:
+ OffsetIndexBuilderImpl() = default;
+
+ void AddPage(int64_t offset, int32_t compressed_page_size,
+ int64_t first_row_index) override {
+ if (state_ == BuilderState::kFinished) {
+ throw ParquetException("Cannot add page to finished
OffsetIndexBuilder.");
+ } else if (state_ == BuilderState::kDiscarded) {
+ /// The offset index is discarded. Do nothing.
+ return;
+ }
+
+ state_ = BuilderState::kStarted;
+
+ format::PageLocation page_location;
+ page_location.__set_offset(offset);
+ page_location.__set_compressed_page_size(compressed_page_size);
+ page_location.__set_first_row_index(first_row_index);
+ offset_index_.page_locations.emplace_back(std::move(page_location));
+ }
+
+ void Finish(int64_t final_position) override {
+ switch (state_) {
+ case BuilderState::kCreated: {
+ /// No pages are added. Simply discard the offset index.
+ state_ = BuilderState::kDiscarded;
+ break;
+ }
+ case BuilderState::kStarted: {
+ /// Adjust page offsets according the final position.
+ if (final_position > 0) {
+ for (auto& page_location : offset_index_.page_locations) {
+ page_location.__set_offset(page_location.offset + final_position);
+ }
+ }
+ state_ = BuilderState::kFinished;
+ break;
+ }
+ case BuilderState::kFinished:
+ case BuilderState::kDiscarded:
+ throw ParquetException("OffsetIndexBuilder is already finished");
+ }
+ }
+
+ void WriteTo(::arrow::io::OutputStream* sink) const override {
+ if (state_ == BuilderState::kFinished) {
+ ThriftSerializer{}.Serialize(&offset_index_, sink);
+ }
+ }
+
+ std::unique_ptr<OffsetIndex> Build() const override {
+ if (state_ == BuilderState::kFinished) {
+ return std::make_unique<OffsetIndexImpl>(offset_index_);
+ }
+ return nullptr;
+ }
+
+ private:
+ format::OffsetIndex offset_index_;
+ BuilderState state_ = BuilderState::kCreated;
+};
+
+class PageIndexBuilderImpl final : public PageIndexBuilder {
+ public:
+ explicit PageIndexBuilderImpl(const SchemaDescriptor* schema) :
schema_(schema) {}
+
+ void AppendRowGroup() override {
+ if (finished_) {
+ throw ParquetException(
+ "Cannot call AppendRowGroup() to finished PageIndexBuilder.");
+ }
+
+ // Append new builders of next row group.
+ const auto num_columns = static_cast<size_t>(schema_->num_columns());
+ column_index_builders_.emplace_back();
+ offset_index_builders_.emplace_back();
+ column_index_builders_.back().resize(num_columns);
+ offset_index_builders_.back().resize(num_columns);
+
+ DCHECK_EQ(column_index_builders_.size(), offset_index_builders_.size());
+ DCHECK_EQ(column_index_builders_.back().size(), num_columns);
+ DCHECK_EQ(offset_index_builders_.back().size(), num_columns);
+ }
+
+ ColumnIndexBuilder* GetColumnIndexBuilder(int32_t i) override {
+ CheckState(i);
+ std::unique_ptr<ColumnIndexBuilder>& builder =
column_index_builders_.back()[i];
+ if (builder == nullptr) {
+ builder = ColumnIndexBuilder::Make(schema_->Column(i));
+ }
+ return builder.get();
+ }
+
+ OffsetIndexBuilder* GetOffsetIndexBuilder(int32_t i) override {
+ CheckState(i);
+ std::unique_ptr<OffsetIndexBuilder>& builder =
offset_index_builders_.back()[i];
+ if (builder == nullptr) {
+ builder = OffsetIndexBuilder::Make();
+ }
+ return builder.get();
+ }
+
+ void Finish() override { finished_ = true; }
+
+ void WriteTo(::arrow::io::OutputStream* sink,
+ PageIndexLocation* location) const override {
+ if (!finished_) {
+ throw ParquetException("Cannot call WriteTo() to unfinished
PageIndexBuilder.");
+ }
+
+ location->column_index_location.clear();
+ location->offset_index_location.clear();
+
+ /// Serialize column index ordered by row group ordinal and then column
ordinal.
+ SerializeIndex(column_index_builders_, sink,
&location->column_index_location);
+
+ /// Serialize offset index ordered by row group ordinal and then column
ordinal.
+ SerializeIndex(offset_index_builders_, sink,
&location->offset_index_location);
+ }
+
+ private:
+ /// Make sure column ordinal is not out of bound and the builder is in good
state.
+ void CheckState(int32_t column_ordinal) const {
+ if (finished_) {
+ throw ParquetException("PageIndexBuilder is already finished.");
+ }
+ if (column_ordinal < 0 || column_ordinal >= schema_->num_columns()) {
+ throw ParquetException("Invalid column ordinal: ", column_ordinal);
+ }
+ if (offset_index_builders_.empty() || column_index_builders_.empty()) {
+ throw ParquetException("No row group appended to PageIndexBuilder.");
+ }
+ }
+
+ template <typename Builder>
+ void SerializeIndex(
+ const std::vector<std::vector<std::unique_ptr<Builder>>>&
page_index_builders,
+ ::arrow::io::OutputStream* sink,
+ std::map<size_t, std::vector<std::optional<IndexLocation>>>* location)
const {
+ const auto num_columns = static_cast<size_t>(schema_->num_columns());
+
+ /// Serialize the same kind of page index row group by row group.
+ for (size_t row_group = 0; row_group < page_index_builders.size();
++row_group) {
+ const auto& row_group_page_index_builders =
page_index_builders[row_group];
+ DCHECK_EQ(row_group_page_index_builders.size(), num_columns);
+
+ bool has_valid_index = false;
+ std::vector<std::optional<IndexLocation>> locations(num_columns,
std::nullopt);
+
+ /// In the same row group, serialize the same kind of page index column
by column.
+ for (size_t column = 0; column < num_columns; ++column) {
+ const auto& column_page_index_builder =
row_group_page_index_builders[column];
+ if (column_page_index_builder != nullptr) {
+ /// Try serializing the page index.
+ PARQUET_ASSIGN_OR_THROW(int64_t pos_before_write, sink->Tell());
+ column_page_index_builder->WriteTo(sink);
+ PARQUET_ASSIGN_OR_THROW(int64_t pos_after_write, sink->Tell());
+ int64_t len = pos_after_write - pos_before_write;
+
+ /// The page index is not serialized and skip reporting its location
+ if (len == 0) {
+ continue;
+ }
+
+ if (len > std::numeric_limits<int32_t>::max()) {
+ throw ParquetException("Page index size overflows to INT32_MAX");
+ }
+ locations[column] = {pos_before_write, static_cast<int32_t>(len)};
+ has_valid_index = true;
+ }
+ }
+
+ if (has_valid_index) {
+ location->emplace(row_group, std::move(locations));
+ }
+ }
+ }
+
+ const SchemaDescriptor* schema_;
+ std::vector<std::vector<std::unique_ptr<ColumnIndexBuilder>>>
column_index_builders_;
+ std::vector<std::vector<std::unique_ptr<OffsetIndexBuilder>>>
offset_index_builders_;
+ bool finished_ = false;
+};
+
} // namespace
RowGroupIndexReadRange PageIndexReader::DeterminePageIndexRangesInRowGroup(
@@ -531,4 +880,38 @@ std::shared_ptr<PageIndexReader> PageIndexReader::Make(
std::move(file_decryptor));
}
// Create a ColumnIndexBuilder specialized for the column's physical type, so
// that encoded min/max statistics can be decoded with the matching C type.
// UNDEFINED has no typed builder and yields nullptr.
std::unique_ptr<ColumnIndexBuilder> ColumnIndexBuilder::Make(
    const ColumnDescriptor* descr) {
  switch (descr->physical_type()) {
    case Type::BOOLEAN:
      return std::make_unique<ColumnIndexBuilderImpl<BooleanType>>(descr);
    case Type::INT32:
      return std::make_unique<ColumnIndexBuilderImpl<Int32Type>>(descr);
    case Type::INT64:
      return std::make_unique<ColumnIndexBuilderImpl<Int64Type>>(descr);
    case Type::INT96:
      return std::make_unique<ColumnIndexBuilderImpl<Int96Type>>(descr);
    case Type::FLOAT:
      return std::make_unique<ColumnIndexBuilderImpl<FloatType>>(descr);
    case Type::DOUBLE:
      return std::make_unique<ColumnIndexBuilderImpl<DoubleType>>(descr);
    case Type::BYTE_ARRAY:
      return std::make_unique<ColumnIndexBuilderImpl<ByteArrayType>>(descr);
    case Type::FIXED_LEN_BYTE_ARRAY:
      return std::make_unique<ColumnIndexBuilderImpl<FLBAType>>(descr);
    case Type::UNDEFINED:
      return nullptr;
  }
  // All enum values are handled above; reaching here means a corrupt type tag.
  ::arrow::Unreachable("Cannot make ColumnIndexBuilder of an unknown type");
  return nullptr;
}
+
// Create the default OffsetIndexBuilder implementation for one column chunk.
std::unique_ptr<OffsetIndexBuilder> OffsetIndexBuilder::Make() {
  return std::make_unique<OffsetIndexBuilderImpl>();
}
+
// Create a file-level PageIndexBuilder covering all columns of the schema.
// The schema pointer must outlive the returned builder.
std::unique_ptr<PageIndexBuilder> PageIndexBuilder::Make(
    const SchemaDescriptor* schema) {
  return std::make_unique<PageIndexBuilderImpl>(schema);
}
+
} // namespace parquet
diff --git a/cpp/src/parquet/page_index.h b/cpp/src/parquet/page_index.h
index bbd9d794c1..b6d27cfc41 100644
--- a/cpp/src/parquet/page_index.h
+++ b/cpp/src/parquet/page_index.h
@@ -26,8 +26,10 @@
namespace parquet {
class ColumnDescriptor;
+class EncodedStatistics;
class FileMetaData;
class InternalFileDecryptor;
+struct PageIndexLocation;
class ReaderProperties;
class RowGroupMetaData;
class RowGroupPageIndexReader;
@@ -250,4 +252,116 @@ class PARQUET_EXPORT PageIndexReader {
const RowGroupMetaData& row_group_metadata, const std::vector<int32_t>&
columns);
};
+/// \brief Interface for collecting column index of data pages in a column
chunk.
+class PARQUET_EXPORT ColumnIndexBuilder {
+ public:
+ /// \brief API convenience to create a ColumnIndexBuilder.
+ static std::unique_ptr<ColumnIndexBuilder> Make(const ColumnDescriptor*
descr);
+
+ virtual ~ColumnIndexBuilder() = default;
+
+ /// \brief Add statistics of a data page.
+ ///
+ /// If the ColumnIndexBuilder has seen any corrupted statistics, it will
+ /// not update statistics any more.
+ ///
+ /// \param stats Page statistics in the encoded form.
+ virtual void AddPage(const EncodedStatistics& stats) = 0;
+
+ /// \brief Complete the column index.
+ ///
+  /// Once called, AddPage() can no longer be called.
+  /// WriteTo() and Build() can only be called after Finish() has been called.
+ virtual void Finish() = 0;
+
+ /// \brief Serialize the column index thrift message.
+ ///
+ /// If the ColumnIndexBuilder has seen any corrupted statistics, it will
+ /// not write any data to the sink.
+ ///
+ /// \param[out] sink output stream to write the serialized message.
+ virtual void WriteTo(::arrow::io::OutputStream* sink) const = 0;
+
+ /// \brief Create a ColumnIndex directly.
+ ///
+ /// \return If the ColumnIndexBuilder has seen any corrupted statistics, it
simply
+ /// returns nullptr. Otherwise the column index is built and returned.
+ virtual std::unique_ptr<ColumnIndex> Build() const = 0;
+};
+
+/// \brief Interface for collecting offset index of data pages in a column
chunk.
+class PARQUET_EXPORT OffsetIndexBuilder {
+ public:
+ /// \brief API convenience to create a OffsetIndexBuilder.
+ static std::unique_ptr<OffsetIndexBuilder> Make();
+
+ virtual ~OffsetIndexBuilder() = default;
+
+ /// \brief Add page location of a data page.
+ virtual void AddPage(int64_t offset, int32_t compressed_page_size,
+ int64_t first_row_index) = 0;
+
+ /// \brief Add page location of a data page.
+ void AddPage(const PageLocation& page_location) {
+ AddPage(page_location.offset, page_location.compressed_page_size,
+ page_location.first_row_index);
+ }
+
+ /// \brief Complete the offset index.
+ ///
+ /// In the buffered row group mode, data pages are flushed into memory
+ /// sink and the OffsetIndexBuilder has only collected the relative offset
+ /// which requires adjustment once they are flushed to the file.
+ ///
+ /// \param final_position Final stream offset to add for page offset
adjustment.
+ virtual void Finish(int64_t final_position) = 0;
+
+ /// \brief Serialize the offset index thrift message.
+ ///
+ /// \param[out] sink output stream to write the serialized message.
+ virtual void WriteTo(::arrow::io::OutputStream* sink) const = 0;
+
+ /// \brief Create an OffsetIndex directly.
+ virtual std::unique_ptr<OffsetIndex> Build() const = 0;
+};
+
+/// \brief Interface for collecting page index of a parquet file.
+class PARQUET_EXPORT PageIndexBuilder {
+ public:
+ /// \brief API convenience to create a PageIndexBuilder.
+ static std::unique_ptr<PageIndexBuilder> Make(const SchemaDescriptor*
schema);
+
+ virtual ~PageIndexBuilder() = default;
+
+ /// \brief Start a new row group.
+ virtual void AppendRowGroup() = 0;
+
+ /// \brief Get the ColumnIndexBuilder from column ordinal.
+ ///
+ /// \param i Column ordinal.
+ /// \return ColumnIndexBuilder for the column and its memory ownership
belongs to
+ /// the PageIndexBuilder.
+ virtual ColumnIndexBuilder* GetColumnIndexBuilder(int32_t i) = 0;
+
+ /// \brief Get the OffsetIndexBuilder from column ordinal.
+ ///
+ /// \param i Column ordinal.
+ /// \return OffsetIndexBuilder for the column and its memory ownership
belongs to
+ /// the PageIndexBuilder.
+ virtual OffsetIndexBuilder* GetOffsetIndexBuilder(int32_t i) = 0;
+
+ /// \brief Complete the page index builder and no more write is allowed.
+ virtual void Finish() = 0;
+
+ /// \brief Serialize the page index thrift message.
+ ///
+ /// Only valid column indexes and offset indexes are serialized and their
locations
+ /// are set.
+ ///
+ /// \param[out] sink The output stream to write the page index.
+ /// \param[out] location The location of all page index to the start of sink.
+ virtual void WriteTo(::arrow::io::OutputStream* sink,
+ PageIndexLocation* location) const = 0;
+};
+
} // namespace parquet
diff --git a/cpp/src/parquet/page_index_test.cc
b/cpp/src/parquet/page_index_test.cc
index 46599960b8..5bfe38522a 100644
--- a/cpp/src/parquet/page_index_test.cc
+++ b/cpp/src/parquet/page_index_test.cc
@@ -18,9 +18,11 @@
#include "parquet/page_index.h"
#include <gtest/gtest.h>
+#include <memory>
#include "arrow/io/file.h"
#include "parquet/file_reader.h"
+#include "parquet/metadata.h"
#include "parquet/schema.h"
#include "parquet/test_util.h"
#include "parquet/thrift_internal.h"
@@ -416,4 +418,416 @@ TEST(PageIndex,
DeterminePageIndexRangesInRowGroupWithMissingPageIndex) {
-1);
}
+TEST(PageIndex, WriteOffsetIndex) {
+ /// Create offset index via the OffsetIndexBuilder interface.
+ auto builder = OffsetIndexBuilder::Make();
+ const size_t num_pages = 5;
+ const std::vector<int64_t> offsets = {100, 200, 300, 400, 500};
+ const std::vector<int32_t> page_sizes = {1024, 2048, 3072, 4096, 8192};
+ const std::vector<int64_t> first_row_indices = {0, 10000, 20000, 30000,
40000};
+ for (size_t i = 0; i < num_pages; ++i) {
+ builder->AddPage(offsets[i], page_sizes[i], first_row_indices[i]);
+ }
+ const int64_t final_position = 4096;
+ builder->Finish(final_position);
+
+ std::vector<std::unique_ptr<OffsetIndex>> offset_indexes;
+ /// 1st element is the offset index just built.
+ offset_indexes.emplace_back(builder->Build());
+ /// 2nd element is the offset index restored by serialize-then-deserialize
round trip.
+ auto sink = CreateOutputStream();
+ builder->WriteTo(sink.get());
+ PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
+ offset_indexes.emplace_back(OffsetIndex::Make(buffer->data(),
+
static_cast<uint32_t>(buffer->size()),
+ default_reader_properties()));
+
+ /// Verify the data of the offset index.
+ for (const auto& offset_index : offset_indexes) {
+ ASSERT_EQ(num_pages, offset_index->page_locations().size());
+ for (size_t i = 0; i < num_pages; ++i) {
+ const auto& page_location = offset_index->page_locations().at(i);
+ ASSERT_EQ(offsets[i] + final_position, page_location.offset);
+ ASSERT_EQ(page_sizes[i], page_location.compressed_page_size);
+ ASSERT_EQ(first_row_indices[i], page_location.first_row_index);
+ }
+ }
+}
+
+void TestWriteTypedColumnIndex(schema::NodePtr node,
+ const std::vector<EncodedStatistics>&
page_stats,
+ BoundaryOrder::type boundary_order, bool
has_null_counts) {
+ auto descr = std::make_unique<ColumnDescriptor>(node,
/*max_definition_level=*/1, 0);
+
+ auto builder = ColumnIndexBuilder::Make(descr.get());
+ for (const auto& stats : page_stats) {
+ builder->AddPage(stats);
+ }
+ ASSERT_NO_THROW(builder->Finish());
+
+ std::vector<std::unique_ptr<ColumnIndex>> column_indexes;
+ /// 1st element is the column index just built.
+ column_indexes.emplace_back(builder->Build());
+ /// 2nd element is the column index restored by serialize-then-deserialize
round trip.
+ auto sink = CreateOutputStream();
+ builder->WriteTo(sink.get());
+ PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
+ column_indexes.emplace_back(ColumnIndex::Make(*descr, buffer->data(),
+
static_cast<uint32_t>(buffer->size()),
+ default_reader_properties()));
+
+ /// Verify the data of the column index.
+ for (const auto& column_index : column_indexes) {
+ ASSERT_EQ(boundary_order, column_index->boundary_order());
+ ASSERT_EQ(has_null_counts, column_index->has_null_counts());
+ const size_t num_pages = column_index->null_pages().size();
+ for (size_t i = 0; i < num_pages; ++i) {
+ ASSERT_EQ(page_stats[i].all_null_value, column_index->null_pages()[i]);
+ ASSERT_EQ(page_stats[i].min(), column_index->encoded_min_values()[i]);
+ ASSERT_EQ(page_stats[i].max(), column_index->encoded_max_values()[i]);
+ if (has_null_counts) {
+ ASSERT_EQ(page_stats[i].null_count, column_index->null_counts()[i]);
+ }
+ }
+ }
+}
+
+TEST(PageIndex, WriteInt32ColumnIndex) {
+ auto encode = [=](int32_t value) {
+ return std::string(reinterpret_cast<const char*>(&value), sizeof(int32_t));
+ };
+
+ // Integer values in the ascending order.
+ std::vector<EncodedStatistics> page_stats(3);
+ page_stats.at(0).set_null_count(1).set_min(encode(1)).set_max(encode(2));
+ page_stats.at(1).set_null_count(2).set_min(encode(2)).set_max(encode(3));
+ page_stats.at(2).set_null_count(3).set_min(encode(3)).set_max(encode(4));
+
+ TestWriteTypedColumnIndex(schema::Int32("c1"), page_stats,
BoundaryOrder::Ascending,
+ /*has_null_counts=*/true);
+}
+
+TEST(PageIndex, WriteInt64ColumnIndex) {
+ auto encode = [=](int64_t value) {
+ return std::string(reinterpret_cast<const char*>(&value), sizeof(int64_t));
+ };
+
+ // Integer values in the descending order.
+ std::vector<EncodedStatistics> page_stats(3);
+ page_stats.at(0).set_null_count(4).set_min(encode(-1)).set_max(encode(-2));
+ page_stats.at(1).set_null_count(0).set_min(encode(-2)).set_max(encode(-3));
+ page_stats.at(2).set_null_count(4).set_min(encode(-3)).set_max(encode(-4));
+
+ TestWriteTypedColumnIndex(schema::Int64("c1"), page_stats,
BoundaryOrder::Descending,
+ /*has_null_counts=*/true);
+}
+
+TEST(PageIndex, WriteFloatColumnIndex) {
+ auto encode = [=](float value) {
+ return std::string(reinterpret_cast<const char*>(&value), sizeof(float));
+ };
+
+ // Float values with no specific order.
+ std::vector<EncodedStatistics> page_stats(3);
+
page_stats.at(0).set_null_count(0).set_min(encode(2.2F)).set_max(encode(4.4F));
+
page_stats.at(1).set_null_count(0).set_min(encode(1.1F)).set_max(encode(5.5F));
+
page_stats.at(2).set_null_count(0).set_min(encode(3.3F)).set_max(encode(6.6F));
+
+ TestWriteTypedColumnIndex(schema::Float("c1"), page_stats,
BoundaryOrder::Unordered,
+ /*has_null_counts=*/true);
+}
+
+TEST(PageIndex, WriteDoubleColumnIndex) {
+ auto encode = [=](double value) {
+ return std::string(reinterpret_cast<const char*>(&value), sizeof(double));
+ };
+
+ // Double values with no specific order and without null count.
+ std::vector<EncodedStatistics> page_stats(3);
+ page_stats.at(0).set_min(encode(1.2)).set_max(encode(4.4));
+ page_stats.at(1).set_min(encode(2.2)).set_max(encode(5.5));
+ page_stats.at(2).set_min(encode(3.3)).set_max(encode(-6.6));
+
+ TestWriteTypedColumnIndex(schema::Double("c1"), page_stats,
BoundaryOrder::Unordered,
+ /*has_null_counts=*/false);
+}
+
+TEST(PageIndex, WriteByteArrayColumnIndex) {
+ // Byte array values with identical min/max.
+ std::vector<EncodedStatistics> page_stats(3);
+ page_stats.at(0).set_min("bar").set_max("foo");
+ page_stats.at(1).set_min("bar").set_max("foo");
+ page_stats.at(2).set_min("bar").set_max("foo");
+
+ TestWriteTypedColumnIndex(schema::ByteArray("c1"), page_stats,
BoundaryOrder::Ascending,
+ /*has_null_counts=*/false);
+}
+
+TEST(PageIndex, WriteFLBAColumnIndex) {
+ // FLBA values in the ascending order with some null pages
+ std::vector<EncodedStatistics> page_stats(5);
+ page_stats.at(0).set_min("abc").set_max("ABC");
+ page_stats.at(1).all_null_value = true;
+ page_stats.at(2).set_min("foo").set_max("FOO");
+ page_stats.at(3).all_null_value = true;
+ page_stats.at(4).set_min("xyz").set_max("XYZ");
+
+ auto node =
+ schema::PrimitiveNode::Make("c1", Repetition::OPTIONAL,
Type::FIXED_LEN_BYTE_ARRAY,
+ ConvertedType::NONE, /*length=*/3);
+ TestWriteTypedColumnIndex(std::move(node), page_stats,
BoundaryOrder::Ascending,
+ /*has_null_counts=*/false);
+}
+
+TEST(PageIndex, WriteColumnIndexWithAllNullPages) {
+ // All values are null.
+ std::vector<EncodedStatistics> page_stats(3);
+ page_stats.at(0).set_null_count(100).all_null_value = true;
+ page_stats.at(1).set_null_count(100).all_null_value = true;
+ page_stats.at(2).set_null_count(100).all_null_value = true;
+
+ TestWriteTypedColumnIndex(schema::Int32("c1"), page_stats,
BoundaryOrder::Unordered,
+ /*has_null_counts=*/true);
+}
+
+TEST(PageIndex, WriteColumnIndexWithInvalidNullCounts) {
+ auto encode = [=](int32_t value) {
+ return std::string(reinterpret_cast<const char*>(&value), sizeof(int32_t));
+ };
+
+ // Some pages do not provide null_count
+ std::vector<EncodedStatistics> page_stats(3);
+ page_stats.at(0).set_min(encode(1)).set_max(encode(2)).set_null_count(0);
+ page_stats.at(1).set_min(encode(1)).set_max(encode(3));
+ page_stats.at(2).set_min(encode(2)).set_max(encode(3)).set_null_count(0);
+
+ TestWriteTypedColumnIndex(schema::Int32("c1"), page_stats,
BoundaryOrder::Ascending,
+ /*has_null_counts=*/false);
+}
+
+TEST(PageIndex, WriteColumnIndexWithCorruptedStats) {
+ auto encode = [=](int32_t value) {
+ return std::string(reinterpret_cast<const char*>(&value), sizeof(int32_t));
+ };
+
+ // 2nd page does not set anything
+ std::vector<EncodedStatistics> page_stats(3);
+ page_stats.at(0).set_min(encode(1)).set_max(encode(2));
+ page_stats.at(2).set_min(encode(3)).set_max(encode(4));
+
+ ColumnDescriptor descr(schema::Int32("c1"), /*max_definition_level=*/1, 0);
+ auto builder = ColumnIndexBuilder::Make(&descr);
+ for (const auto& stats : page_stats) {
+ builder->AddPage(stats);
+ }
+ ASSERT_NO_THROW(builder->Finish());
+ ASSERT_EQ(nullptr, builder->Build());
+
+ auto sink = CreateOutputStream();
+ builder->WriteTo(sink.get());
+ PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
+ EXPECT_EQ(0, buffer->size());
+}
+
+TEST(PageIndex, TestPageIndexBuilderWithZeroRowGroup) {
+ schema::NodeVector fields = {schema::Int32("c1"), schema::ByteArray("c2")};
+ schema::NodePtr root = schema::GroupNode::Make("schema",
Repetition::REPEATED, fields);
+ SchemaDescriptor schema;
+ schema.Init(root);
+
+ auto builder = PageIndexBuilder::Make(&schema);
+
+ // AppendRowGroup() is not called and expect throw.
+ ASSERT_THROW(builder->GetColumnIndexBuilder(0), ParquetException);
+ ASSERT_THROW(builder->GetOffsetIndexBuilder(0), ParquetException);
+
+ // Finish the builder without calling AppendRowGroup().
+ ASSERT_NO_THROW(builder->Finish());
+
+ // Verify WriteTo does not write anything.
+ auto sink = CreateOutputStream();
+ PageIndexLocation location;
+ builder->WriteTo(sink.get(), &location);
+ PARQUET_ASSIGN_OR_THROW(auto buffer, sink->Finish());
+ ASSERT_EQ(0, buffer->size());
+ ASSERT_TRUE(location.column_index_location.empty());
+ ASSERT_TRUE(location.offset_index_location.empty());
+}
+
+class PageIndexBuilderTest : public ::testing::Test {
+ public:
+ void WritePageIndexes(int num_row_groups, int num_columns,
+ const std::vector<std::vector<EncodedStatistics>>&
page_stats,
+ const std::vector<std::vector<PageLocation>>&
page_locations,
+ int final_position) {
+ auto builder = PageIndexBuilder::Make(&schema_);
+ for (int row_group = 0; row_group < num_row_groups; ++row_group) {
+ ASSERT_NO_THROW(builder->AppendRowGroup());
+
+ for (int column = 0; column < num_columns; ++column) {
+ if (static_cast<size_t>(column) < page_stats[row_group].size()) {
+ auto column_index_builder = builder->GetColumnIndexBuilder(column);
+
ASSERT_NO_THROW(column_index_builder->AddPage(page_stats[row_group][column]));
+ ASSERT_NO_THROW(column_index_builder->Finish());
+ }
+
+ if (static_cast<size_t>(column) < page_locations[row_group].size()) {
+ auto offset_index_builder = builder->GetOffsetIndexBuilder(column);
+ ASSERT_NO_THROW(
+
offset_index_builder->AddPage(page_locations[row_group][column]));
+ ASSERT_NO_THROW(offset_index_builder->Finish(final_position));
+ }
+ }
+ }
+ ASSERT_NO_THROW(builder->Finish());
+
+ auto sink = CreateOutputStream();
+ builder->WriteTo(sink.get(), &page_index_location_);
+ PARQUET_ASSIGN_OR_THROW(buffer_, sink->Finish());
+
+ ASSERT_EQ(static_cast<size_t>(num_row_groups),
+ page_index_location_.column_index_location.size());
+ ASSERT_EQ(static_cast<size_t>(num_row_groups),
+ page_index_location_.offset_index_location.size());
+ for (int row_group = 0; row_group < num_row_groups; ++row_group) {
+ ASSERT_EQ(static_cast<size_t>(num_columns),
+ page_index_location_.column_index_location[row_group].size());
+ ASSERT_EQ(static_cast<size_t>(num_columns),
+ page_index_location_.offset_index_location[row_group].size());
+ }
+ }
+
+ void CheckColumnIndex(int row_group, int column, const EncodedStatistics&
stats) {
+ auto column_index = ReadColumnIndex(row_group, column);
+ ASSERT_NE(nullptr, column_index);
+ ASSERT_EQ(size_t{1}, column_index->null_pages().size());
+ ASSERT_EQ(stats.all_null_value, column_index->null_pages()[0]);
+ ASSERT_EQ(stats.min(), column_index->encoded_min_values()[0]);
+ ASSERT_EQ(stats.max(), column_index->encoded_max_values()[0]);
+ ASSERT_EQ(stats.has_null_count, column_index->has_null_counts());
+ if (stats.has_null_count) {
+ ASSERT_EQ(stats.null_count, column_index->null_counts()[0]);
+ }
+ }
+
+ void CheckOffsetIndex(int row_group, int column, const PageLocation&
expected_location,
+ int64_t final_location) {
+ auto offset_index = ReadOffsetIndex(row_group, column);
+ ASSERT_NE(nullptr, offset_index);
+ ASSERT_EQ(size_t{1}, offset_index->page_locations().size());
+ const auto& location = offset_index->page_locations()[0];
+ ASSERT_EQ(expected_location.offset + final_location, location.offset);
+ ASSERT_EQ(expected_location.compressed_page_size,
location.compressed_page_size);
+ ASSERT_EQ(expected_location.first_row_index, location.first_row_index);
+ }
+
+ protected:
+ std::unique_ptr<ColumnIndex> ReadColumnIndex(int row_group, int column) {
+ auto location =
page_index_location_.column_index_location[row_group][column];
+ if (!location.has_value()) {
+ return nullptr;
+ }
+ auto properties = default_reader_properties();
+ return ColumnIndex::Make(*schema_.Column(column), buffer_->data() +
location->offset,
+ static_cast<uint32_t>(location->length),
properties);
+ }
+
+ std::unique_ptr<OffsetIndex> ReadOffsetIndex(int row_group, int column) {
+ auto location =
page_index_location_.offset_index_location[row_group][column];
+ if (!location.has_value()) {
+ return nullptr;
+ }
+ auto properties = default_reader_properties();
+ return OffsetIndex::Make(buffer_->data() + location->offset,
+ static_cast<uint32_t>(location->length),
properties);
+ }
+
+ SchemaDescriptor schema_;
+ std::shared_ptr<Buffer> buffer_;
+ PageIndexLocation page_index_location_;
+};
+
+TEST_F(PageIndexBuilderTest, SingleRowGroup) {
+ schema::NodePtr root = schema::GroupNode::Make(
+ "schema", Repetition::REPEATED,
+ {schema::ByteArray("c1"), schema::ByteArray("c2"),
schema::ByteArray("c3")});
+ schema_.Init(root);
+
+ // Prepare page stats and page locations for single row group.
+ // Note that the 3rd column does not have any stats and its page index is
disabled.
+ const int num_row_groups = 1;
+ const int num_columns = 3;
+ const std::vector<std::vector<EncodedStatistics>> page_stats = {
+ /*row_group_id=0*/
+ {/*column_id=0*/
EncodedStatistics().set_null_count(0).set_min("a").set_max("b"),
+ /*column_id=1*/
EncodedStatistics().set_null_count(0).set_min("A").set_max("B")}};
+ const std::vector<std::vector<PageLocation>> page_locations = {
+ /*row_group_id=0*/
+ {/*column_id=0*/ {/*offset=*/128, /*compressed_page_size=*/512,
+ /*first_row_index=*/0},
+ /*column_id=1*/ {/*offset=*/1024, /*compressed_page_size=*/512,
+ /*first_row_index=*/0}}};
+ const int64_t final_position = 200;
+
+ WritePageIndexes(num_row_groups, num_columns, page_stats, page_locations,
+ final_position);
+
+ // Verify that first two columns have good page indexes.
+ for (int column = 0; column < 2; ++column) {
+ CheckColumnIndex(/*row_group=*/0, column, page_stats[0][column]);
+ CheckOffsetIndex(/*row_group=*/0, column, page_locations[0][column],
final_position);
+ }
+
+ // Verify the 3rd column does not have page indexes.
+ ASSERT_EQ(nullptr, ReadColumnIndex(/*row_group=*/0, /*column=*/2));
+ ASSERT_EQ(nullptr, ReadOffsetIndex(/*row_group=*/0, /*column=*/2));
+}
+
+TEST_F(PageIndexBuilderTest, TwoRowGroups) {
+ schema::NodePtr root = schema::GroupNode::Make(
+ "schema", Repetition::REPEATED, {schema::ByteArray("c1"),
schema::ByteArray("c2")});
+ schema_.Init(root);
+
+ // Prepare page stats and page locations for two row groups.
+ // Note that the 1st column in the 2nd row group has corrupted stats.
+ const int num_row_groups = 2;
+ const int num_columns = 2;
+ const std::vector<std::vector<EncodedStatistics>> page_stats = {
+ /*row_group_id=0*/
+ {/*column_id=0*/ EncodedStatistics().set_min("a").set_max("b"),
+ /*column_id=1*/
EncodedStatistics().set_null_count(0).set_min("A").set_max("B")},
+ /*row_group_id=1*/
+ {/*column_id=0*/ EncodedStatistics() /* corrupted stats */,
+ /*column_id=1*/
EncodedStatistics().set_null_count(0).set_min("bar").set_max(
+ "foo")}};
+ const std::vector<std::vector<PageLocation>> page_locations = {
+ /*row_group_id=0*/
+ {/*column_id=0*/ {/*offset=*/128, /*compressed_page_size=*/512,
+ /*first_row_index=*/0},
+ /*column_id=1*/ {/*offset=*/1024, /*compressed_page_size=*/512,
+ /*first_row_index=*/0}},
+ /*row_group_id=1*/
+ {/*column_id=0*/ {/*offset=*/128, /*compressed_page_size=*/512,
+ /*first_row_index=*/0},
+ /*column_id=1*/ {/*offset=*/1024, /*compressed_page_size=*/512,
+ /*first_row_index=*/0}}};
+ const int64_t final_position = 200;
+
+ WritePageIndexes(num_row_groups, num_columns, page_stats, page_locations,
+ final_position);
+
+ // Verify that all columns have good column indexes except the 1st column in
+ // the 2nd row group.
+ CheckColumnIndex(/*row_group=*/0, /*column=*/0, page_stats[0][0]);
+ CheckColumnIndex(/*row_group=*/0, /*column=*/1, page_stats[0][1]);
+ CheckColumnIndex(/*row_group=*/1, /*column=*/1, page_stats[1][1]);
+ ASSERT_EQ(nullptr, ReadColumnIndex(/*row_group=*/1, /*column=*/0));
+
+ // Verify that two columns have good offset indexes.
+ CheckOffsetIndex(/*row_group=*/0, /*column=*/0, page_locations[0][0],
final_position);
+ CheckOffsetIndex(/*row_group=*/0, /*column=*/1, page_locations[0][1],
final_position);
+ CheckOffsetIndex(/*row_group=*/1, /*column=*/0, page_locations[1][0],
final_position);
+ CheckOffsetIndex(/*row_group=*/1, /*column=*/1, page_locations[1][1],
final_position);
+}
+
} // namespace parquet
diff --git a/cpp/src/parquet/properties.h b/cpp/src/parquet/properties.h
index 572194f4ee..d892788960 100644
--- a/cpp/src/parquet/properties.h
+++ b/cpp/src/parquet/properties.h
@@ -208,7 +208,8 @@ class PARQUET_EXPORT WriterProperties {
data_page_version_(ParquetDataPageVersion::V1),
created_by_(DEFAULT_CREATED_BY),
store_decimal_as_integer_(false),
- page_checksum_enabled_(false) {}
+ page_checksum_enabled_(false),
+ write_page_index_(false) {}
virtual ~Builder() {}
/// Specify the memory pool for the writer. Default default_memory_pool.
@@ -501,6 +502,28 @@ class PARQUET_EXPORT WriterProperties {
return this;
}
+ /// Enable writing page index.
+ ///
+ /// Page index contains statistics for data pages and can be used to skip
pages
+ /// when scanning data in ordered and unordered columns.
+ ///
+ /// Please check the link below for more details:
+ /// https://github.com/apache/parquet-format/blob/master/PageIndex.md
+ ///
+ /// Default disabled.
+ Builder* enable_write_page_index() {
+ write_page_index_ = true;
+ return this;
+ }
+
+ /// Disable writing page index.
+ ///
+ /// Default disabled.
+ Builder* disable_write_page_index() {
+ write_page_index_ = false;
+ return this;
+ }
+
/// \brief Build the WriterProperties with the builder parameters.
/// \return The WriterProperties defined by the builder.
std::shared_ptr<WriterProperties> build() {
@@ -526,7 +549,8 @@ class PARQUET_EXPORT WriterProperties {
pool_, dictionary_pagesize_limit_, write_batch_size_,
max_row_group_length_,
pagesize_, version_, created_by_, page_checksum_enabled_,
std::move(file_encryption_properties_), default_column_properties_,
- column_properties, data_page_version_, store_decimal_as_integer_));
+ column_properties, data_page_version_, store_decimal_as_integer_,
+ write_page_index_));
}
private:
@@ -540,6 +564,7 @@ class PARQUET_EXPORT WriterProperties {
std::string created_by_;
bool store_decimal_as_integer_;
bool page_checksum_enabled_;
+ bool write_page_index_;
std::shared_ptr<FileEncryptionProperties> file_encryption_properties_;
@@ -574,6 +599,8 @@ class PARQUET_EXPORT WriterProperties {
inline bool page_checksum_enabled() const { return page_checksum_enabled_; }
+ inline bool write_page_index() const { return write_page_index_; }
+
inline Encoding::type dictionary_index_encoding() const {
if (parquet_version_ == ParquetVersion::PARQUET_1_0) {
return Encoding::PLAIN_DICTIONARY;
@@ -642,7 +669,8 @@ class PARQUET_EXPORT WriterProperties {
std::shared_ptr<FileEncryptionProperties> file_encryption_properties,
const ColumnProperties& default_column_properties,
const std::unordered_map<std::string, ColumnProperties>&
column_properties,
- ParquetDataPageVersion data_page_version, bool
store_short_decimal_as_integer)
+ ParquetDataPageVersion data_page_version, bool
store_short_decimal_as_integer,
+ bool write_page_index)
: pool_(pool),
dictionary_pagesize_limit_(dictionary_pagesize_limit),
write_batch_size_(write_batch_size),
@@ -653,6 +681,7 @@ class PARQUET_EXPORT WriterProperties {
parquet_created_by_(created_by),
store_decimal_as_integer_(store_short_decimal_as_integer),
page_checksum_enabled_(page_write_checksum_enabled),
+ write_page_index_(write_page_index),
file_encryption_properties_(file_encryption_properties),
default_column_properties_(default_column_properties),
column_properties_(column_properties) {}
@@ -667,6 +696,7 @@ class PARQUET_EXPORT WriterProperties {
std::string parquet_created_by_;
bool store_decimal_as_integer_;
bool page_checksum_enabled_;
+ bool write_page_index_;
std::shared_ptr<FileEncryptionProperties> file_encryption_properties_;
diff --git a/cpp/src/parquet/statistics.cc b/cpp/src/parquet/statistics.cc
index d0130605cd..aa6df7e32a 100644
--- a/cpp/src/parquet/statistics.cc
+++ b/cpp/src/parquet/statistics.cc
@@ -610,6 +610,8 @@ class TypedStatisticsImpl : public TypedStatistics<DType> {
if (HasNullCount()) {
s.set_null_count(this->null_count());
}
+ // num_values_ is reliable and it means number of non-null values.
+ s.all_null_value = num_values_ == 0;
return s;
}
@@ -625,7 +627,7 @@ class TypedStatisticsImpl : public TypedStatistics<DType> {
T min_;
T max_;
::arrow::MemoryPool* pool_;
- int64_t num_values_ = 0;
+ int64_t num_values_ = 0; // # of non-null values.
EncodedStatistics statistics_;
std::shared_ptr<TypedComparator<DType>> comparator_;
std::shared_ptr<ResizableBuffer> min_buffer_, max_buffer_;
diff --git a/cpp/src/parquet/statistics.h b/cpp/src/parquet/statistics.h
index 71d9b662ba..3f168a938f 100644
--- a/cpp/src/parquet/statistics.h
+++ b/cpp/src/parquet/statistics.h
@@ -137,6 +137,12 @@ class PARQUET_EXPORT EncodedStatistics {
bool has_null_count = false;
bool has_distinct_count = false;
+ // When all values in the statistics are null, it is set to true.
+ // Otherwise, at least one value is not null, or we are not sure at all.
+ // Page index requires this information to decide whether a data page
+ // is a null page or not.
+ bool all_null_value = false;
+
// From parquet-mr
// Don't write stats larger than the max size rather than truncating. The
// rationale is that some engines may use the minimum value in the page as
diff --git a/cpp/src/parquet/thrift_internal.h
b/cpp/src/parquet/thrift_internal.h
index 9cc702dfcd..56e2a67c8a 100644
--- a/cpp/src/parquet/thrift_internal.h
+++ b/cpp/src/parquet/thrift_internal.h
@@ -295,6 +295,18 @@ static inline format::CompressionCodec::type
ToThrift(Compression::type type) {
}
}
+static inline format::BoundaryOrder::type ToThrift(BoundaryOrder::type type) {
+ switch (type) {
+ case BoundaryOrder::Unordered:
+ case BoundaryOrder::Ascending:
+ case BoundaryOrder::Descending:
+ return static_cast<format::BoundaryOrder::type>(type);
+ default:
+ DCHECK(false) << "Cannot reach here";
+ return format::BoundaryOrder::UNORDERED;
+ }
+}
+
static inline format::Statistics ToThrift(const EncodedStatistics& stats) {
format::Statistics statistics;
if (stats.has_min) {