This is an automated email from the ASF dual-hosted git repository.
etseidl pushed a commit to branch gh5854_thrift_remodel
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/gh5854_thrift_remodel by this
push:
new b0cc25486a [thrift-remodel] Write Parquet page indexes (#8427)
b0cc25486a is described below
commit b0cc25486aeee21d886b99d7e677abb0f18ed2bb
Author: Ed Seidl <[email protected]>
AuthorDate: Thu Sep 25 08:57:38 2025 -0700
[thrift-remodel] Write Parquet page indexes (#8427)
# Which issue does this PR close?
**Note: this targets a feature branch, not main**
- Closes #5854.
# Rationale for this change
Continues the remodel by implementing writing of the page index
structures.
# What changes are included in this PR?
This PR removes the old `parquet::file::page_index::Index` enum and
replaces it with the new `ColumnIndexMetaData` struct.
# Are these changes tested?
Covered by existing tests
# Are there any user-facing changes?
Yes.
---
parquet/src/arrow/arrow_reader/mod.rs | 2 +-
parquet/src/arrow/arrow_reader/selection.rs | 2 +-
parquet/src/column/writer/mod.rs | 76 +++--
parquet/src/file/metadata/memory.rs | 29 --
parquet/src/file/metadata/mod.rs | 164 +++-------
parquet/src/file/metadata/writer.rs | 96 +++---
parquet/src/file/page_index/column_index.rs | 332 +++++++++++++++-----
parquet/src/file/page_index/index.rs | 455 ----------------------------
parquet/src/file/page_index/index_reader.rs | 28 +-
parquet/src/file/page_index/mod.rs | 1 -
parquet/src/file/page_index/offset_index.rs | 8 -
parquet/src/file/properties.rs | 2 +-
parquet/src/file/statistics.rs | 9 +-
parquet/src/file/writer.rs | 49 +--
14 files changed, 415 insertions(+), 838 deletions(-)
diff --git a/parquet/src/arrow/arrow_reader/mod.rs
b/parquet/src/arrow/arrow_reader/mod.rs
index 44e0441ac9..ff221656a3 100644
--- a/parquet/src/arrow/arrow_reader/mod.rs
+++ b/parquet/src/arrow/arrow_reader/mod.rs
@@ -261,7 +261,7 @@ impl<T> ArrowReaderBuilder<T> {
/// Skip 1100 (skip the remaining 900 rows in row group 2 and the
first 200 rows in row group 3)
/// ```
///
- /// [`Index`]: crate::file::page_index::index::Index
+ /// [`Index`]: crate::file::page_index::column_index::ColumnIndexMetaData
pub fn with_row_selection(self, selection: RowSelection) -> Self {
Self {
selection: Some(selection),
diff --git a/parquet/src/arrow/arrow_reader/selection.rs
b/parquet/src/arrow/arrow_reader/selection.rs
index 9f06dc184b..21ed97b8bd 100644
--- a/parquet/src/arrow/arrow_reader/selection.rs
+++ b/parquet/src/arrow/arrow_reader/selection.rs
@@ -97,7 +97,7 @@ impl RowSelector {
/// * It contains no [`RowSelector`] of 0 rows
/// * Consecutive [`RowSelector`]s alternate skipping or selecting rows
///
-/// [`PageIndex`]: crate::file::page_index::index::PageIndex
+/// [`PageIndex`]: crate::file::page_index::column_index::ColumnIndexMetaData
#[derive(Debug, Clone, Default, Eq, PartialEq)]
pub struct RowSelection {
selectors: Vec<RowSelector>,
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index acfe0ce3d0..3f516462f2 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -21,7 +21,7 @@ use bytes::Bytes;
use half::f16;
use crate::bloom_filter::Sbbf;
-use crate::file::page_index::index::Index;
+use crate::file::page_index::column_index::ColumnIndexMetaData;
use crate::file::page_index::offset_index::OffsetIndexMetaData;
use std::collections::{BTreeSet, VecDeque};
use std::str;
@@ -192,7 +192,7 @@ pub struct ColumnCloseResult {
/// Optional bloom filter for this column
pub bloom_filter: Option<Sbbf>,
/// Optional column index, for filtering
- pub column_index: Option<Index>,
+ pub column_index: Option<ColumnIndexMetaData>,
/// Optional offset index, identifying page locations
pub offset_index: Option<OffsetIndexMetaData>,
}
@@ -2959,28 +2959,22 @@ mod tests {
assert!(r.column_index.is_some());
let col_idx = r.column_index.unwrap();
let col_idx = match col_idx {
- Index::INT32(col_idx) => col_idx,
+ ColumnIndexMetaData::INT32(col_idx) => col_idx,
_ => panic!("wrong stats type"),
};
// null_pages should be true for page 0
- assert!(col_idx.indexes[0].is_null_page());
+ assert!(col_idx.is_null_page(0));
// min and max should be empty byte arrays
- assert!(col_idx.indexes[0].min().is_none());
- assert!(col_idx.indexes[0].max().is_none());
+ assert!(col_idx.min_value(0).is_none());
+ assert!(col_idx.max_value(0).is_none());
// null_counts should be defined and be 4 for page 0
- assert!(col_idx.indexes[0].null_count().is_some());
- assert_eq!(col_idx.indexes[0].null_count().unwrap(), 4);
+ assert!(col_idx.null_count(0).is_some());
+ assert_eq!(col_idx.null_count(0), Some(4));
// there is no repetition so rep histogram should be absent
- assert!(col_idx.indexes[0].repetition_level_histogram().is_none());
+ assert!(col_idx.repetition_level_histogram(0).is_none());
// definition_level_histogram should be present and should be 0:4, 1:0
- assert!(col_idx.indexes[0].definition_level_histogram().is_some());
- assert_eq!(
- col_idx.indexes[0]
- .definition_level_histogram()
- .unwrap()
- .values(),
- &[4, 0]
- );
+ assert!(col_idx.definition_level_histogram(0).is_some());
+ assert_eq!(col_idx.definition_level_histogram(0).unwrap(), &[4, 0]);
}
#[test]
@@ -3004,15 +2998,15 @@ mod tests {
// column index
let column_index = match column_index {
- Index::INT32(column_index) => column_index,
+ ColumnIndexMetaData::INT32(column_index) => column_index,
_ => panic!("wrong stats type"),
};
- assert_eq!(2, column_index.indexes.len());
+ assert_eq!(2, column_index.num_pages());
assert_eq!(2, offset_index.page_locations.len());
assert_eq!(BoundaryOrder::UNORDERED, column_index.boundary_order);
for idx in 0..2 {
- assert!(!column_index.indexes[idx].is_null_page());
- assert_eq!(0,
*column_index.indexes[idx].null_count.as_ref().unwrap());
+ assert!(!column_index.is_null_page(idx));
+ assert_eq!(0, column_index.null_count(0).unwrap());
}
if let Some(stats) = r.metadata.statistics() {
@@ -3022,8 +3016,8 @@ mod tests {
// first page is [1,2,3,4]
// second page is [-5,2,4,8]
// note that we don't increment here, as this is a non
BinaryArray type.
- assert_eq!(stats.min_opt(), column_index.indexes[1].min());
- assert_eq!(stats.max_opt(), column_index.indexes[1].max());
+ assert_eq!(stats.min_opt(), column_index.min_value(1));
+ assert_eq!(stats.max_opt(), column_index.max_value(1));
} else {
panic!("expecting Statistics::Int32");
}
@@ -3064,25 +3058,25 @@ mod tests {
let offset_index = r.offset_index.unwrap();
let column_index = match column_index {
- Index::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
+ ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) =>
column_index,
_ => panic!("wrong stats type"),
};
assert_eq!(3, r.rows_written);
// column index
- assert_eq!(1, column_index.indexes.len());
+ assert_eq!(1, column_index.num_pages());
assert_eq!(1, offset_index.page_locations.len());
assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
- assert!(!column_index.indexes[0].is_null_page());
- assert_eq!(Some(0), column_index.indexes[0].null_count());
+ assert!(!column_index.is_null_page(0));
+ assert_eq!(Some(0), column_index.null_count(0));
if let Some(stats) = r.metadata.statistics() {
assert_eq!(stats.null_count_opt(), Some(0));
assert_eq!(stats.distinct_count_opt(), None);
if let Statistics::FixedLenByteArray(stats) = stats {
- let column_index_min_value =
column_index.indexes[0].min_bytes().unwrap();
- let column_index_max_value =
column_index.indexes[0].max_bytes().unwrap();
+ let column_index_min_value =
column_index.min_value(0).unwrap();
+ let column_index_max_value =
column_index.max_value(0).unwrap();
// Column index stats are truncated, while the column chunk's
aren't.
assert_ne!(stats.min_bytes_opt().unwrap(),
column_index_min_value);
@@ -3135,25 +3129,25 @@ mod tests {
let offset_index = r.offset_index.unwrap();
let column_index = match column_index {
- Index::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
+ ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) =>
column_index,
_ => panic!("wrong stats type"),
};
assert_eq!(1, r.rows_written);
// column index
- assert_eq!(1, column_index.indexes.len());
+ assert_eq!(1, column_index.num_pages());
assert_eq!(1, offset_index.page_locations.len());
assert_eq!(BoundaryOrder::ASCENDING, column_index.boundary_order);
- assert!(!column_index.indexes[0].is_null_page());
- assert_eq!(Some(0), column_index.indexes[0].null_count());
+ assert!(!column_index.is_null_page(0));
+ assert_eq!(Some(0), column_index.null_count(0));
if let Some(stats) = r.metadata.statistics() {
assert_eq!(stats.null_count_opt(), Some(0));
assert_eq!(stats.distinct_count_opt(), None);
if let Statistics::FixedLenByteArray(_stats) = stats {
- let column_index_min_value =
column_index.indexes[0].min_bytes().unwrap();
- let column_index_max_value =
column_index.indexes[0].max_bytes().unwrap();
+ let column_index_min_value =
column_index.min_value(0).unwrap();
+ let column_index_max_value =
column_index.max_value(0).unwrap();
assert_eq!(column_index_min_value.len(), 1);
assert_eq!(column_index_max_value.len(), 1);
@@ -3190,11 +3184,11 @@ mod tests {
// ensure bytes weren't truncated for column index
let column_index = r.column_index.unwrap();
let column_index = match column_index {
- Index::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
+ ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) =>
column_index,
_ => panic!("wrong stats type"),
};
- let column_index_min_bytes =
column_index.indexes[0].min_bytes().unwrap();
- let column_index_max_bytes =
column_index.indexes[0].min_bytes().unwrap();
+ let column_index_min_bytes = column_index.min_value(0).unwrap();
+ let column_index_max_bytes = column_index.max_value(0).unwrap();
assert_eq!(expected_value, column_index_min_bytes);
assert_eq!(expected_value, column_index_max_bytes);
@@ -3233,11 +3227,11 @@ mod tests {
// ensure bytes weren't truncated for column index
let column_index = r.column_index.unwrap();
let column_index = match column_index {
- Index::FIXED_LEN_BYTE_ARRAY(column_index) => column_index,
+ ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) =>
column_index,
_ => panic!("wrong stats type"),
};
- let column_index_min_bytes =
column_index.indexes[0].min_bytes().unwrap();
- let column_index_max_bytes =
column_index.indexes[0].min_bytes().unwrap();
+ let column_index_min_bytes = column_index.min_value(0).unwrap();
+ let column_index_max_bytes = column_index.max_value(0).unwrap();
assert_eq!(expected_value, column_index_min_bytes);
assert_eq!(expected_value, column_index_max_bytes);
diff --git a/parquet/src/file/metadata/memory.rs
b/parquet/src/file/metadata/memory.rs
index 69eee3c299..19122a1b55 100644
--- a/parquet/src/file/metadata/memory.rs
+++ b/parquet/src/file/metadata/memory.rs
@@ -27,7 +27,6 @@ use crate::file::page_encoding_stats::PageEncodingStats;
use crate::file::page_index::column_index::{
ByteArrayColumnIndex, ColumnIndex, ColumnIndexMetaData,
PrimitiveColumnIndex,
};
-use crate::file::page_index::index::{Index, NativeIndex, PageIndex};
use crate::file::page_index::offset_index::{OffsetIndexMetaData, PageLocation};
use crate::file::statistics::{Statistics, ValueStatistics};
use std::sync::Arc;
@@ -199,34 +198,6 @@ impl HeapSize for ByteArrayColumnIndex {
}
}
-impl HeapSize for Index {
- fn heap_size(&self) -> usize {
- match self {
- Index::NONE => 0,
- Index::BOOLEAN(native_index) => native_index.heap_size(),
- Index::INT32(native_index) => native_index.heap_size(),
- Index::INT64(native_index) => native_index.heap_size(),
- Index::INT96(native_index) => native_index.heap_size(),
- Index::FLOAT(native_index) => native_index.heap_size(),
- Index::DOUBLE(native_index) => native_index.heap_size(),
- Index::BYTE_ARRAY(native_index) => native_index.heap_size(),
- Index::FIXED_LEN_BYTE_ARRAY(native_index) =>
native_index.heap_size(),
- }
- }
-}
-
-impl<T: ParquetValueType> HeapSize for NativeIndex<T> {
- fn heap_size(&self) -> usize {
- self.indexes.heap_size() + self.boundary_order.heap_size()
- }
-}
-
-impl<T: ParquetValueType> HeapSize for PageIndex<T> {
- fn heap_size(&self) -> usize {
- self.min.heap_size() + self.max.heap_size() +
self.null_count.heap_size()
- }
-}
-
impl<T: ParquetValueType> HeapSize for ValueStatistics<T> {
fn heap_size(&self) -> usize {
self.min_opt().map(T::heap_size).unwrap_or(0)
diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs
index 95e9a48b46..caf001e5fa 100644
--- a/parquet/src/file/metadata/mod.rs
+++ b/parquet/src/file/metadata/mod.rs
@@ -101,19 +101,18 @@ use crate::encryption::decrypt::FileDecryptor;
#[cfg(feature = "encryption")]
use crate::file::column_crypto_metadata::{self, ColumnCryptoMetaData};
pub(crate) use crate::file::metadata::memory::HeapSize;
+use crate::file::page_index::column_index::{ByteArrayColumnIndex,
PrimitiveColumnIndex};
+use crate::file::statistics::{self, Statistics};
use crate::file::{
page_encoding_stats::{self, PageEncodingStats},
page_index::{column_index::ColumnIndexMetaData,
offset_index::PageLocation},
};
-use crate::file::{
- page_index::index::PageIndex,
- statistics::{self, Statistics},
-};
use crate::format::ColumnCryptoMetaData as TColumnCryptoMetaData;
use crate::schema::types::{
ColumnDescPtr, ColumnDescriptor, ColumnPath, SchemaDescPtr,
SchemaDescriptor,
Type as SchemaType,
};
+use crate::thrift_struct;
use crate::{
basic::BoundaryOrder,
errors::{ParquetError, Result},
@@ -128,10 +127,6 @@ use crate::{
use crate::{
data_type::private::ParquetValueType,
file::page_index::offset_index::OffsetIndexMetaData,
};
-use crate::{
- file::page_index::index::{Index, NativeIndex},
- thrift_struct,
-};
pub use push_decoder::ParquetMetaDataPushDecoder;
pub use reader::{FooterTail, PageIndexPolicy, ParquetMetaDataReader};
@@ -145,18 +140,18 @@ pub(crate) use writer::ThriftMetadataWriter;
///
/// This structure is an in-memory representation of multiple [`ColumnIndex`]
/// structures in a parquet file footer, as described in the Parquet [PageIndex
-/// documentation]. Each [`Index`] holds statistics about all the pages in a
+/// documentation]. Each [`ColumnIndex`] holds statistics about all the pages
in a
/// particular column chunk.
///
/// `column_index[row_group_number][column_number]` holds the
-/// [`Index`] corresponding to column `column_number` of row group
+/// [`ColumnIndex`] corresponding to column `column_number` of row group
/// `row_group_number`.
///
-/// For example `column_index[2][3]` holds the [`Index`] for the fourth
+/// For example `column_index[2][3]` holds the [`ColumnIndex`] for the fourth
/// column in the third row group of the parquet file.
///
/// [PageIndex documentation]:
https://github.com/apache/parquet-format/blob/master/PageIndex.md
-/// [`ColumnIndex`]: crate::format::ColumnIndex
+/// [`ColumnIndex`]: crate::file::page_index::column_index::ColumnIndexMetaData
pub type ParquetColumnIndex = Vec<Vec<ColumnIndexMetaData>>;
/// [`OffsetIndexMetaData`] for each data page of each row group of each column
@@ -1632,135 +1627,74 @@ impl ColumnIndexBuilder {
/// Build and get the column index
///
/// Note: callers should check [`Self::valid`] before calling this method
- pub fn build(self) -> Result<Index> {
+ pub fn build(self) -> Result<ColumnIndexMetaData> {
Ok(match self.column_type {
Type::BOOLEAN => {
- let (indexes, boundary_order) = self.build_page_index()?;
- Index::BOOLEAN(NativeIndex {
- indexes,
- boundary_order,
- })
+ let index = self.build_page_index()?;
+ ColumnIndexMetaData::BOOLEAN(index)
}
Type::INT32 => {
- let (indexes, boundary_order) = self.build_page_index()?;
- Index::INT32(NativeIndex {
- indexes,
- boundary_order,
- })
+ let index = self.build_page_index()?;
+ ColumnIndexMetaData::INT32(index)
}
Type::INT64 => {
- let (indexes, boundary_order) = self.build_page_index()?;
- Index::INT64(NativeIndex {
- indexes,
- boundary_order,
- })
+ let index = self.build_page_index()?;
+ ColumnIndexMetaData::INT64(index)
}
Type::INT96 => {
- let (indexes, boundary_order) = self.build_page_index()?;
- Index::INT96(NativeIndex {
- indexes,
- boundary_order,
- })
+ let index = self.build_page_index()?;
+ ColumnIndexMetaData::INT96(index)
}
Type::FLOAT => {
- let (indexes, boundary_order) = self.build_page_index()?;
- Index::FLOAT(NativeIndex {
- indexes,
- boundary_order,
- })
+ let index = self.build_page_index()?;
+ ColumnIndexMetaData::FLOAT(index)
}
Type::DOUBLE => {
- let (indexes, boundary_order) = self.build_page_index()?;
- Index::DOUBLE(NativeIndex {
- indexes,
- boundary_order,
- })
+ let index = self.build_page_index()?;
+ ColumnIndexMetaData::DOUBLE(index)
}
Type::BYTE_ARRAY => {
- let (indexes, boundary_order) = self.build_page_index()?;
- Index::BYTE_ARRAY(NativeIndex {
- indexes,
- boundary_order,
- })
+ let index = self.build_byte_array_index()?;
+ ColumnIndexMetaData::BYTE_ARRAY(index)
}
Type::FIXED_LEN_BYTE_ARRAY => {
- let (indexes, boundary_order) = self.build_page_index()?;
- Index::FIXED_LEN_BYTE_ARRAY(NativeIndex {
- indexes,
- boundary_order,
- })
+ let index = self.build_byte_array_index()?;
+ ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(index)
}
})
}
- fn build_page_index<T>(self) -> Result<(Vec<PageIndex<T>>, BoundaryOrder)>
+ fn build_page_index<T>(self) -> Result<PrimitiveColumnIndex<T>>
where
T: ParquetValueType,
{
- let len = self.min_values.len();
-
- let null_counts = self
- .null_counts
- .iter()
- .map(|x| Some(*x))
- .collect::<Vec<_>>();
-
- // histograms are a 1D array encoding a 2D num_pages X num_levels
matrix.
- let to_page_histograms = |opt_hist: Option<Vec<i64>>| {
- if let Some(hist) = opt_hist {
- // TODO: should we assert (hist.len() % len) == 0?
- let num_levels = hist.len() / len;
- let mut res = Vec::with_capacity(len);
- for i in 0..len {
- let page_idx = i * num_levels;
- let page_hist = hist[page_idx..page_idx +
num_levels].to_vec();
- res.push(Some(LevelHistogram::from(page_hist)));
- }
- res
- } else {
- vec![None; len]
- }
- };
+ let min_values: Vec<&[u8]> = self.min_values.iter().map(|v|
v.as_slice()).collect();
+ let max_values: Vec<&[u8]> = self.max_values.iter().map(|v|
v.as_slice()).collect();
- let rep_hists: Vec<Option<LevelHistogram>> =
- to_page_histograms(self.repetition_level_histograms);
- let def_hists: Vec<Option<LevelHistogram>> =
- to_page_histograms(self.definition_level_histograms);
+ PrimitiveColumnIndex::try_new(
+ self.null_pages,
+ self.boundary_order,
+ Some(self.null_counts),
+ self.repetition_level_histograms,
+ self.definition_level_histograms,
+ min_values,
+ max_values,
+ )
+ }
- let indexes = self
- .min_values
- .iter()
- .zip(self.max_values.iter())
- .zip(self.null_pages.into_iter())
- .zip(null_counts.into_iter())
- .zip(rep_hists.into_iter())
- .zip(def_hists.into_iter())
- .map(
- |(
- ((((min, max), is_null), null_count),
repetition_level_histogram),
- definition_level_histogram,
- )| {
- let (min, max) = if is_null {
- (None, None)
- } else {
- (
- Some(T::try_from_le_slice(min)?),
- Some(T::try_from_le_slice(max)?),
- )
- };
- Ok(PageIndex {
- min,
- max,
- null_count,
- repetition_level_histogram,
- definition_level_histogram,
- })
- },
- )
- .collect::<Result<Vec<_>, ParquetError>>()?;
+ fn build_byte_array_index(self) -> Result<ByteArrayColumnIndex> {
+ let min_values: Vec<&[u8]> = self.min_values.iter().map(|v|
v.as_slice()).collect();
+ let max_values: Vec<&[u8]> = self.max_values.iter().map(|v|
v.as_slice()).collect();
- let boundary_order = self.boundary_order;
- Ok((indexes, boundary_order))
+ ByteArrayColumnIndex::try_new(
+ self.null_pages,
+ self.boundary_order,
+ Some(self.null_counts),
+ self.repetition_level_histograms,
+ self.definition_level_histograms,
+ min_values,
+ max_values,
+ )
}
}
diff --git a/parquet/src/file/metadata/writer.rs
b/parquet/src/file/metadata/writer.rs
index 404bcf5dba..a09a703ade 100644
--- a/parquet/src/file/metadata/writer.rs
+++ b/parquet/src/file/metadata/writer.rs
@@ -24,8 +24,6 @@ use crate::encryption::{
};
#[cfg(feature = "encryption")]
use crate::errors::ParquetError;
-use crate::file::metadata::{KeyValue, ParquetMetaData};
-use crate::file::writer::{get_file_magic, TrackedWrite};
use crate::format::EncryptionAlgorithm;
#[cfg(feature = "encryption")]
use crate::format::{AesGcmV1, ColumnCryptoMetaData};
@@ -33,6 +31,17 @@ use crate::schema::types;
use crate::schema::types::{SchemaDescPtr, SchemaDescriptor, TypePtr};
use crate::thrift::TSerializable;
use crate::{errors::Result,
file::page_index::column_index::ColumnIndexMetaData};
+use crate::{
+ file::writer::{get_file_magic, TrackedWrite},
+ parquet_thrift::WriteThrift,
+};
+use crate::{
+ file::{
+ metadata::{KeyValue, ParquetMetaData},
+ page_index::offset_index::OffsetIndexMetaData,
+ },
+ parquet_thrift::ThriftCompactOutputProtocol,
+};
use std::io::Write;
use std::sync::Arc;
use thrift::protocol::TCompactOutputProtocol;
@@ -45,8 +54,8 @@ pub(crate) struct ThriftMetadataWriter<'a, W: Write> {
schema: &'a TypePtr,
schema_descr: &'a SchemaDescPtr,
row_groups: Vec<crate::format::RowGroup>,
- column_indexes: Option<&'a [Vec<Option<crate::format::ColumnIndex>>]>,
- offset_indexes: Option<&'a [Vec<Option<crate::format::OffsetIndex>>]>,
+ column_indexes: Option<&'a [Vec<Option<ColumnIndexMetaData>>]>,
+ offset_indexes: Option<&'a [Vec<Option<OffsetIndexMetaData>>]>,
key_value_metadata: Option<Vec<KeyValue>>,
created_by: Option<String>,
object_writer: MetadataObjectWriter,
@@ -61,7 +70,7 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
/// of the serialized offset indexes.
fn write_offset_indexes(
&mut self,
- offset_indexes: &[Vec<Option<crate::format::OffsetIndex>>],
+ offset_indexes: &[Vec<Option<OffsetIndexMetaData>>],
) -> Result<()> {
// iter row group
// iter each column
@@ -94,7 +103,7 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
/// of the serialized column indexes.
fn write_column_indexes(
&mut self,
- column_indexes: &[Vec<Option<crate::format::ColumnIndex>>],
+ column_indexes: &[Vec<Option<ColumnIndexMetaData>>],
) -> Result<()> {
// iter row group
// iter each column
@@ -214,7 +223,7 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
pub fn with_column_indexes(
mut self,
- column_indexes: &'a [Vec<Option<crate::format::ColumnIndex>>],
+ column_indexes: &'a [Vec<Option<ColumnIndexMetaData>>],
) -> Self {
self.column_indexes = Some(column_indexes);
self
@@ -222,7 +231,7 @@ impl<'a, W: Write> ThriftMetadataWriter<'a, W> {
pub fn with_offset_indexes(
mut self,
- offset_indexes: &'a [Vec<Option<crate::format::OffsetIndex>>],
+ offset_indexes: &'a [Vec<Option<OffsetIndexMetaData>>],
) -> Self {
self.offset_indexes = Some(offset_indexes);
self
@@ -382,40 +391,14 @@ impl<'a, W: Write> ParquetMetaDataWriter<'a, W> {
Ok(())
}
- fn convert_column_indexes(&self) ->
Vec<Vec<Option<crate::format::ColumnIndex>>> {
+ fn convert_column_indexes(&self) -> Vec<Vec<Option<ColumnIndexMetaData>>> {
if let Some(row_group_column_indexes) = self.metadata.column_index() {
(0..self.metadata.row_groups().len())
.map(|rg_idx| {
let column_indexes = &row_group_column_indexes[rg_idx];
column_indexes
.iter()
- .map(|column_index| match column_index {
- ColumnIndexMetaData::NONE => None,
- ColumnIndexMetaData::BOOLEAN(column_index) => {
- Some(column_index.to_thrift())
- }
- ColumnIndexMetaData::BYTE_ARRAY(column_index) => {
- Some(column_index.to_thrift())
- }
- ColumnIndexMetaData::DOUBLE(column_index) => {
- Some(column_index.to_thrift())
- }
-
ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(column_index) => {
- Some(column_index.to_thrift())
- }
- ColumnIndexMetaData::FLOAT(column_index) => {
- Some(column_index.to_thrift())
- }
- ColumnIndexMetaData::INT32(column_index) => {
- Some(column_index.to_thrift())
- }
- ColumnIndexMetaData::INT64(column_index) => {
- Some(column_index.to_thrift())
- }
- ColumnIndexMetaData::INT96(column_index) => {
- Some(column_index.to_thrift())
- }
- })
+ .map(|column_index| Some(column_index.clone()))
.collect()
})
.collect()
@@ -429,14 +412,14 @@ impl<'a, W: Write> ParquetMetaDataWriter<'a, W> {
}
}
- fn convert_offset_index(&self) ->
Vec<Vec<Option<crate::format::OffsetIndex>>> {
+ fn convert_offset_index(&self) -> Vec<Vec<Option<OffsetIndexMetaData>>> {
if let Some(row_group_offset_indexes) = self.metadata.offset_index() {
(0..self.metadata.row_groups().len())
.map(|rg_idx| {
let offset_indexes = &row_group_offset_indexes[rg_idx];
offset_indexes
.iter()
- .map(|offset_index| Some(offset_index.to_thrift()))
+ .map(|offset_index| Some(offset_index.clone()))
.collect()
})
.collect()
@@ -464,6 +447,13 @@ impl MetadataObjectWriter {
object.write_to_out_protocol(&mut protocol)?;
Ok(())
}
+
+ #[inline]
+ fn write_thrift_object(object: &impl WriteThrift, sink: impl Write) ->
Result<()> {
+ let mut protocol = ThriftCompactOutputProtocol::new(sink);
+ object.write_thrift(&mut protocol)?;
+ Ok(())
+ }
}
/// Implementations of [`MetadataObjectWriter`] methods for when encryption is
disabled
@@ -481,25 +471,25 @@ impl MetadataObjectWriter {
/// Write a column [`OffsetIndex`] in Thrift format
fn write_offset_index(
&self,
- offset_index: &crate::format::OffsetIndex,
+ offset_index: &OffsetIndexMetaData,
_column_chunk: &crate::format::ColumnChunk,
_row_group_idx: usize,
_column_idx: usize,
sink: impl Write,
) -> Result<()> {
- Self::write_object(offset_index, sink)
+ Self::write_thrift_object(offset_index, sink)
}
/// Write a column [`ColumnIndex`] in Thrift format
fn write_column_index(
&self,
- column_index: &crate::format::ColumnIndex,
+ column_index: &ColumnIndexMetaData,
_column_chunk: &crate::format::ColumnChunk,
_row_group_idx: usize,
_column_idx: usize,
sink: impl Write,
) -> Result<()> {
- Self::write_object(column_index, sink)
+ Self::write_thrift_object(column_index, sink)
}
/// No-op implementation of row-group metadata encryption
@@ -568,14 +558,14 @@ impl MetadataObjectWriter {
/// [`OffsetIndex`]:
https://github.com/apache/parquet-format/blob/master/PageIndex.md
fn write_offset_index(
&self,
- offset_index: &crate::format::OffsetIndex,
+ offset_index: &OffsetIndexMetaData,
column_chunk: &crate::format::ColumnChunk,
row_group_idx: usize,
column_idx: usize,
sink: impl Write,
) -> Result<()> {
match &self.file_encryptor {
- Some(file_encryptor) => Self::write_object_with_encryption(
+ Some(file_encryptor) => Self::write_thrift_object_with_encryption(
offset_index,
sink,
file_encryptor,
@@ -584,7 +574,7 @@ impl MetadataObjectWriter {
row_group_idx,
column_idx,
),
- None => Self::write_object(offset_index, sink),
+ None => Self::write_thrift_object(offset_index, sink),
}
}
@@ -593,14 +583,14 @@ impl MetadataObjectWriter {
/// [`ColumnIndex`]:
https://github.com/apache/parquet-format/blob/master/PageIndex.md
fn write_column_index(
&self,
- column_index: &crate::format::ColumnIndex,
+ column_index: &ColumnIndexMetaData,
column_chunk: &crate::format::ColumnChunk,
row_group_idx: usize,
column_idx: usize,
sink: impl Write,
) -> Result<()> {
match &self.file_encryptor {
- Some(file_encryptor) => Self::write_object_with_encryption(
+ Some(file_encryptor) => Self::write_thrift_object_with_encryption(
column_index,
sink,
file_encryptor,
@@ -609,7 +599,7 @@ impl MetadataObjectWriter {
row_group_idx,
column_idx,
),
- None => Self::write_object(column_index, sink),
+ None => Self::write_thrift_object(column_index, sink),
}
}
@@ -642,8 +632,8 @@ impl MetadataObjectWriter {
)
}
- fn write_object_with_encryption(
- object: &impl TSerializable,
+ fn write_thrift_object_with_encryption(
+ object: &impl WriteThrift,
mut sink: impl Write,
file_encryptor: &FileEncryptor,
column_metadata: &crate::format::ColumnChunk,
@@ -671,6 +661,8 @@ impl MetadataObjectWriter {
};
if file_encryptor.is_column_encrypted(column_path) {
+ use crate::encryption::encrypt::encrypt_thrift_object;
+
let aad = create_module_aad(
file_encryptor.file_aad(),
module_type,
@@ -679,9 +671,9 @@ impl MetadataObjectWriter {
None,
)?;
let mut encryptor =
file_encryptor.get_column_encryptor(column_path)?;
- encrypt_object(object, &mut encryptor, &mut sink, &aad)
+ encrypt_thrift_object(object, &mut encryptor, &mut sink, &aad)
} else {
- Self::write_object(object, sink)
+ Self::write_thrift_object(object, sink)
}
}
diff --git a/parquet/src/file/page_index/column_index.rs
b/parquet/src/file/page_index/column_index.rs
index 2d43c93b2e..a0893cc9ea 100644
--- a/parquet/src/file/page_index/column_index.rs
+++ b/parquet/src/file/page_index/column_index.rs
@@ -22,7 +22,10 @@
use crate::{
data_type::{ByteArray, FixedLenByteArray},
- errors::Result,
+ errors::{ParquetError, Result},
+ parquet_thrift::{
+ ElementType, FieldType, ThriftCompactOutputProtocol, WriteThrift,
WriteThriftField,
+ },
};
use std::ops::Deref;
@@ -92,18 +95,26 @@ pub struct PrimitiveColumnIndex<T> {
}
impl<T: ParquetValueType> PrimitiveColumnIndex<T> {
- pub(super) fn try_new(index: ThriftColumnIndex) -> Result<Self> {
- let len = index.null_pages.len();
+ pub(crate) fn try_new(
+ null_pages: Vec<bool>,
+ boundary_order: BoundaryOrder,
+ null_counts: Option<Vec<i64>>,
+ repetition_level_histograms: Option<Vec<i64>>,
+ definition_level_histograms: Option<Vec<i64>>,
+ min_bytes: Vec<&[u8]>,
+ max_bytes: Vec<&[u8]>,
+ ) -> Result<Self> {
+ let len = null_pages.len();
let mut min_values = Vec::with_capacity(len);
let mut max_values = Vec::with_capacity(len);
- for (i, is_null) in index.null_pages.iter().enumerate().take(len) {
+ for (i, is_null) in null_pages.iter().enumerate().take(len) {
if !is_null {
- let min = index.min_values[i];
+ let min = min_bytes[i];
min_values.push(T::try_from_le_slice(min)?);
- let max = index.max_values[i];
+ let max = max_bytes[i];
max_values.push(T::try_from_le_slice(max)?);
} else {
// need placeholders
@@ -114,43 +125,26 @@ impl<T: ParquetValueType> PrimitiveColumnIndex<T> {
Ok(Self {
column_index: ColumnIndex {
- null_pages: index.null_pages,
- boundary_order: index.boundary_order,
- null_counts: index.null_counts,
- repetition_level_histograms: index.repetition_level_histograms,
- definition_level_histograms: index.definition_level_histograms,
+ null_pages,
+ boundary_order,
+ null_counts,
+ repetition_level_histograms,
+ definition_level_histograms,
},
min_values,
max_values,
})
}
- pub(crate) fn to_thrift(&self) -> crate::format::ColumnIndex {
- let min_values = self
- .min_values
- .iter()
- .map(|x| x.as_bytes().to_vec())
- .collect::<Vec<_>>();
-
- let max_values = self
- .max_values
- .iter()
- .map(|x| x.as_bytes().to_vec())
- .collect::<Vec<_>>();
-
- let null_counts = self.null_counts.clone();
- let repetition_level_histograms =
self.repetition_level_histograms.clone();
- let definition_level_histograms =
self.definition_level_histograms.clone();
- let null_pages = self.null_pages.clone();
-
- crate::format::ColumnIndex::new(
- null_pages,
- min_values,
- max_values,
- self.boundary_order.into(),
- null_counts,
- repetition_level_histograms,
- definition_level_histograms,
+ pub(super) fn try_from_thrift(index: ThriftColumnIndex) -> Result<Self> {
+ Self::try_new(
+ index.null_pages,
+ index.boundary_order,
+ index.null_counts,
+ index.repetition_level_histograms,
+ index.definition_level_histograms,
+ index.min_values,
+ index.max_values,
)
}
}
@@ -229,6 +223,53 @@ impl<T> Deref for PrimitiveColumnIndex<T> {
}
}
+impl<T: ParquetValueType> WriteThrift for PrimitiveColumnIndex<T> {
+ const ELEMENT_TYPE: ElementType = ElementType::Struct;
+ fn write_thrift<W: std::io::Write>(
+ &self,
+ writer: &mut ThriftCompactOutputProtocol<W>,
+ ) -> Result<()> {
+ self.null_pages.write_thrift_field(writer, 1, 0)?;
+
+ // need to handle min/max manually
+ let len = self.null_pages.len();
+ writer.write_field_begin(FieldType::List, 2, 1)?;
+ writer.write_list_begin(ElementType::Binary, len)?;
+ for i in 0..len {
+ let min = self.min_value(i).map(|m| m.as_bytes()).unwrap_or(&[]);
+ min.write_thrift(writer)?;
+ }
+ writer.write_field_begin(FieldType::List, 3, 2)?;
+ writer.write_list_begin(ElementType::Binary, len)?;
+ for i in 0..len {
+ let max = self.max_value(i).map(|m| m.as_bytes()).unwrap_or(&[]);
+ max.write_thrift(writer)?;
+ }
+ let mut last_field_id = self.boundary_order.write_thrift_field(writer,
4, 3)?;
+ if self.null_counts.is_some() {
+ last_field_id =
+ self.null_counts
+ .as_ref()
+ .unwrap()
+ .write_thrift_field(writer, 5, last_field_id)?;
+ }
+ if self.repetition_level_histograms.is_some() {
+ last_field_id = self
+ .repetition_level_histograms
+ .as_ref()
+ .unwrap()
+ .write_thrift_field(writer, 6, last_field_id)?;
+ }
+ if self.definition_level_histograms.is_some() {
+ self.definition_level_histograms
+ .as_ref()
+ .unwrap()
+ .write_thrift_field(writer, 7, last_field_id)?;
+ }
+ writer.write_struct_end()
+ }
+}
+
/// Column index for byte arrays (fixed length and variable)
#[derive(Debug, Clone, PartialEq)]
pub struct ByteArrayColumnIndex {
@@ -241,11 +282,19 @@ pub struct ByteArrayColumnIndex {
}
impl ByteArrayColumnIndex {
- pub(super) fn try_new(index: ThriftColumnIndex) -> Result<Self> {
- let len = index.null_pages.len();
-
- let min_len = index.min_values.iter().map(|&v| v.len()).sum();
- let max_len = index.max_values.iter().map(|&v| v.len()).sum();
+ pub(crate) fn try_new(
+ null_pages: Vec<bool>,
+ boundary_order: BoundaryOrder,
+ null_counts: Option<Vec<i64>>,
+ repetition_level_histograms: Option<Vec<i64>>,
+ definition_level_histograms: Option<Vec<i64>>,
+ min_values: Vec<&[u8]>,
+ max_values: Vec<&[u8]>,
+ ) -> Result<Self> {
+ let len = null_pages.len();
+
+ let min_len = min_values.iter().map(|&v| v.len()).sum();
+ let max_len = max_values.iter().map(|&v| v.len()).sum();
let mut min_bytes = vec![0u8; min_len];
let mut max_bytes = vec![0u8; max_len];
@@ -255,15 +304,15 @@ impl ByteArrayColumnIndex {
let mut min_pos = 0;
let mut max_pos = 0;
- for (i, is_null) in index.null_pages.iter().enumerate().take(len) {
+ for (i, is_null) in null_pages.iter().enumerate().take(len) {
if !is_null {
- let min = index.min_values[i];
+ let min = min_values[i];
let dst = &mut min_bytes[min_pos..min_pos + min.len()];
dst.copy_from_slice(min);
min_offsets[i] = min_pos;
min_pos += min.len();
- let max = index.max_values[i];
+ let max = max_values[i];
let dst = &mut max_bytes[max_pos..max_pos + max.len()];
dst.copy_from_slice(max);
max_offsets[i] = max_pos;
@@ -279,13 +328,12 @@ impl ByteArrayColumnIndex {
Ok(Self {
column_index: ColumnIndex {
- null_pages: index.null_pages,
- boundary_order: index.boundary_order,
- null_counts: index.null_counts,
- repetition_level_histograms: index.repetition_level_histograms,
- definition_level_histograms: index.definition_level_histograms,
+ null_pages,
+ boundary_order,
+ null_counts,
+ repetition_level_histograms,
+ definition_level_histograms,
},
-
min_bytes,
min_offsets,
max_bytes,
@@ -293,6 +341,18 @@ impl ByteArrayColumnIndex {
})
}
+ pub(super) fn try_from_thrift(index: ThriftColumnIndex) -> Result<Self> {
+ Self::try_new(
+ index.null_pages,
+ index.boundary_order,
+ index.null_counts,
+ index.repetition_level_histograms,
+ index.definition_level_histograms,
+ index.min_values,
+ index.max_values,
+ )
+ }
+
/// Returns the min value for the page indexed by `idx`
///
/// It is `None` when all values are null
@@ -344,33 +404,6 @@ impl ByteArrayColumnIndex {
}
})
}
-
- pub(crate) fn to_thrift(&self) -> crate::format::ColumnIndex {
- let mut min_values = Vec::with_capacity(self.num_pages() as usize);
- for i in 0..self.num_pages() as usize {
- min_values.push(self.min_value(i).unwrap_or(&[]).to_owned());
- }
-
- let mut max_values = Vec::with_capacity(self.num_pages() as usize);
- for i in 0..self.num_pages() as usize {
- max_values.push(self.max_value(i).unwrap_or(&[]).to_owned());
- }
-
- let null_counts = self.null_counts.clone();
- let repetition_level_histograms =
self.repetition_level_histograms.clone();
- let definition_level_histograms =
self.definition_level_histograms.clone();
- let null_pages = self.null_pages.clone();
-
- crate::format::ColumnIndex::new(
- null_pages,
- min_values,
- max_values,
- self.boundary_order.into(),
- null_counts,
- repetition_level_histograms,
- definition_level_histograms,
- )
- }
}
impl Deref for ByteArrayColumnIndex {
@@ -381,6 +414,53 @@ impl Deref for ByteArrayColumnIndex {
}
}
+impl WriteThrift for ByteArrayColumnIndex {
+ const ELEMENT_TYPE: ElementType = ElementType::Struct;
+ fn write_thrift<W: std::io::Write>(
+ &self,
+ writer: &mut ThriftCompactOutputProtocol<W>,
+ ) -> Result<()> {
+ self.null_pages.write_thrift_field(writer, 1, 0)?;
+
+ // need to handle min/max manually
+ let len = self.null_pages.len();
+ writer.write_field_begin(FieldType::List, 2, 1)?;
+ writer.write_list_begin(ElementType::Binary, len)?;
+ for i in 0..len {
+ let min = self.min_value(i).unwrap_or(&[]);
+ min.write_thrift(writer)?;
+ }
+ writer.write_field_begin(FieldType::List, 3, 2)?;
+ writer.write_list_begin(ElementType::Binary, len)?;
+ for i in 0..len {
+ let max = self.max_value(i).unwrap_or(&[]);
+ max.write_thrift(writer)?;
+ }
+ let mut last_field_id = self.boundary_order.write_thrift_field(writer,
4, 3)?;
+ if self.null_counts.is_some() {
+ last_field_id =
+ self.null_counts
+ .as_ref()
+ .unwrap()
+ .write_thrift_field(writer, 5, last_field_id)?;
+ }
+ if self.repetition_level_histograms.is_some() {
+ last_field_id = self
+ .repetition_level_histograms
+ .as_ref()
+ .unwrap()
+ .write_thrift_field(writer, 6, last_field_id)?;
+ }
+ if self.definition_level_histograms.is_some() {
+ self.definition_level_histograms
+ .as_ref()
+ .unwrap()
+ .write_thrift_field(writer, 7, last_field_id)?;
+ }
+ writer.write_struct_end()
+ }
+}
+
// Macro to generate getter functions for ColumnIndexMetaData.
macro_rules! colidx_enum_func {
($self:ident, $func:ident, $arg:ident) => {{
@@ -567,3 +647,99 @@ column_index_iters!(ByteArray, BYTE_ARRAY, |v| v
.map(|v| ByteArray::from(v.to_owned())));
column_index_iters!(FixedLenByteArray, FIXED_LEN_BYTE_ARRAY, |v| v
.map(|v| FixedLenByteArray::from(v.to_owned())));
+
+impl WriteThrift for ColumnIndexMetaData {
+ const ELEMENT_TYPE: ElementType = ElementType::Struct;
+
+ fn write_thrift<W: std::io::Write>(
+ &self,
+ writer: &mut ThriftCompactOutputProtocol<W>,
+ ) -> Result<()> {
+ match self {
+ ColumnIndexMetaData::BOOLEAN(index) => index.write_thrift(writer),
+ ColumnIndexMetaData::INT32(index) => index.write_thrift(writer),
+ ColumnIndexMetaData::INT64(index) => index.write_thrift(writer),
+ ColumnIndexMetaData::INT96(index) => index.write_thrift(writer),
+ ColumnIndexMetaData::FLOAT(index) => index.write_thrift(writer),
+ ColumnIndexMetaData::DOUBLE(index) => index.write_thrift(writer),
+ ColumnIndexMetaData::BYTE_ARRAY(index) =>
index.write_thrift(writer),
+ ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(index) =>
index.write_thrift(writer),
+ _ => Err(general_err!("Cannot serialize NONE index")),
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn test_page_index_min_max_null() {
+ let column_index = PrimitiveColumnIndex {
+ column_index: ColumnIndex {
+ null_pages: vec![false],
+ boundary_order: BoundaryOrder::ASCENDING,
+ null_counts: Some(vec![0]),
+ repetition_level_histograms: Some(vec![1, 2]),
+ definition_level_histograms: Some(vec![1, 2, 3]),
+ },
+ min_values: vec![-123],
+ max_values: vec![234],
+ };
+
+ assert_eq!(column_index.min_value(0), Some(&-123));
+ assert_eq!(column_index.max_value(0), Some(&234));
+ assert_eq!(column_index.null_count(0), Some(0));
+ assert_eq!(column_index.repetition_level_histogram(0).unwrap(), &[1,
2]);
+ assert_eq!(
+ column_index.definition_level_histogram(0).unwrap(),
+ &[1, 2, 3]
+ );
+ }
+
+ #[test]
+ fn test_page_index_min_max_null_none() {
+ let column_index: PrimitiveColumnIndex<i32> =
PrimitiveColumnIndex::<i32> {
+ column_index: ColumnIndex {
+ null_pages: vec![true],
+ boundary_order: BoundaryOrder::ASCENDING,
+ null_counts: Some(vec![1]),
+ repetition_level_histograms: None,
+ definition_level_histograms: Some(vec![1, 0]),
+ },
+ min_values: vec![Default::default()],
+ max_values: vec![Default::default()],
+ };
+
+ assert_eq!(column_index.min_value(0), None);
+ assert_eq!(column_index.max_value(0), None);
+ assert_eq!(column_index.null_count(0), Some(1));
+ assert_eq!(column_index.repetition_level_histogram(0), None);
+ assert_eq!(column_index.definition_level_histogram(0).unwrap(), &[1,
0]);
+ }
+
+ #[test]
+ fn test_invalid_column_index() {
+ let column_index = ThriftColumnIndex {
+ null_pages: vec![true, false],
+ min_values: vec![
+ &[],
+ &[], // this shouldn't be empty as null_pages[1] is false
+ ],
+ max_values: vec![
+ &[],
+ &[], // this shouldn't be empty as null_pages[1] is false
+ ],
+ null_counts: None,
+ repetition_level_histograms: None,
+ definition_level_histograms: None,
+ boundary_order: BoundaryOrder::UNORDERED,
+ };
+
+ let err =
PrimitiveColumnIndex::<i32>::try_from_thrift(column_index).unwrap_err();
+ assert_eq!(
+ err.to_string(),
+ "Parquet error: error converting value, expected 4 bytes got 0"
+ );
+ }
+}
diff --git a/parquet/src/file/page_index/index.rs
b/parquet/src/file/page_index/index.rs
deleted file mode 100644
index 861dc0c3b0..0000000000
--- a/parquet/src/file/page_index/index.rs
+++ /dev/null
@@ -1,455 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-//! [`Index`] structures holding decoded [`ColumnIndex`] information
-//!
-//! [`ColumnIndex`]: crate::format::ColumnIndex
-
-use crate::basic::{BoundaryOrder, Type};
-use crate::data_type::private::ParquetValueType;
-use crate::data_type::{AsBytes, ByteArray, FixedLenByteArray, Int96};
-use crate::errors::ParquetError;
-use crate::file::metadata::LevelHistogram;
-use crate::file::page_index::index_reader::ThriftColumnIndex;
-use std::fmt::Debug;
-
-/// Typed statistics for one data page
-///
-/// See [`NativeIndex`] for more details
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
-pub struct PageIndex<T> {
- /// The minimum value, It is None when all values are null
- pub min: Option<T>,
- /// The maximum value, It is None when all values are null
- pub max: Option<T>,
- /// Null values in the page
- pub null_count: Option<i64>,
- /// Repetition level histogram for the page
- ///
- /// `repetition_level_histogram[i]` is a count of how many values are at
repetition level `i`.
- /// For example, `repetition_level_histogram[0]` indicates how many rows
the page contains.
- pub repetition_level_histogram: Option<LevelHistogram>,
- /// Definition level histogram for the page
- ///
- /// `definition_level_histogram[i]` is a count of how many values are at
definition level `i`.
- /// For example, `definition_level_histogram[max_definition_level]`
indicates how many
- /// non-null values are present in the page.
- pub definition_level_histogram: Option<LevelHistogram>,
-}
-
-impl<T> PageIndex<T> {
- /// Returns the minimum value in the page
- ///
- /// It is `None` when all values are null
- pub fn min(&self) -> Option<&T> {
- self.min.as_ref()
- }
-
- /// Returns the maximum value in the page
- ///
- /// It is `None` when all values are null
- pub fn max(&self) -> Option<&T> {
- self.max.as_ref()
- }
-
- /// Returns the number of null values in the page
- pub fn null_count(&self) -> Option<i64> {
- self.null_count
- }
-
- /// Returns the repetition level histogram for the page
- pub fn repetition_level_histogram(&self) -> Option<&LevelHistogram> {
- self.repetition_level_histogram.as_ref()
- }
-
- /// Returns the definition level histogram for the page
- pub fn definition_level_histogram(&self) -> Option<&LevelHistogram> {
- self.definition_level_histogram.as_ref()
- }
-
- /// Returns whether this is an all null page
- pub fn is_null_page(&self) -> bool {
- self.min.is_none()
- }
-}
-
-impl<T> PageIndex<T>
-where
- T: AsBytes,
-{
- /// Returns the minimum value in the page as bytes
- ///
- /// It is `None` when all values are null
- pub fn max_bytes(&self) -> Option<&[u8]> {
- self.max.as_ref().map(|x| x.as_bytes())
- }
-
- /// Returns the maximum value in the page as bytes
- ///
- /// It is `None` when all values are null
- pub fn min_bytes(&self) -> Option<&[u8]> {
- self.min.as_ref().map(|x| x.as_bytes())
- }
-}
-
-#[derive(Debug, Clone, PartialEq)]
-#[allow(non_camel_case_types)]
-/// Statistics for data pages in a column chunk.
-///
-/// See [`NativeIndex`] for more information
-pub enum Index {
- /// Sometimes reading page index from parquet file
- /// will only return pageLocations without min_max index,
- /// `NONE` represents this lack of index information
- NONE,
- /// Boolean type index
- BOOLEAN(NativeIndex<bool>),
- /// 32-bit integer type index
- INT32(NativeIndex<i32>),
- /// 64-bit integer type index
- INT64(NativeIndex<i64>),
- /// 96-bit integer type (timestamp) index
- INT96(NativeIndex<Int96>),
- /// 32-bit floating point type index
- FLOAT(NativeIndex<f32>),
- /// 64-bit floating point type index
- DOUBLE(NativeIndex<f64>),
- /// Byte array type index
- BYTE_ARRAY(NativeIndex<ByteArray>),
- /// Fixed length byte array type index
- FIXED_LEN_BYTE_ARRAY(NativeIndex<FixedLenByteArray>),
-}
-
-impl Index {
- /// Return min/max elements inside ColumnIndex are ordered or not.
- pub fn is_sorted(&self) -> bool {
- // 0:UNORDERED, 1:ASCENDING ,2:DESCENDING,
- if let Some(order) = self.get_boundary_order() {
- order != BoundaryOrder::UNORDERED
- } else {
- false
- }
- }
-
- /// Get boundary_order of this page index.
- pub fn get_boundary_order(&self) -> Option<BoundaryOrder> {
- match self {
- Index::NONE => None,
- Index::BOOLEAN(index) => Some(index.boundary_order),
- Index::INT32(index) => Some(index.boundary_order),
- Index::INT64(index) => Some(index.boundary_order),
- Index::INT96(index) => Some(index.boundary_order),
- Index::FLOAT(index) => Some(index.boundary_order),
- Index::DOUBLE(index) => Some(index.boundary_order),
- Index::BYTE_ARRAY(index) => Some(index.boundary_order),
- Index::FIXED_LEN_BYTE_ARRAY(index) => Some(index.boundary_order),
- }
- }
-}
-
-/// Strongly typed statistics for data pages in a column chunk.
-///
-/// This structure is a natively typed, in memory representation of the
-/// [`ColumnIndex`] structure in a parquet file footer, as described in the
-/// Parquet [PageIndex documentation]. The statistics stored in this structure
-/// can be used by query engines to skip decoding pages while reading parquet
-/// data.
-///
-/// # Differences with Row Group Level Statistics
-///
-/// One significant difference between `NativeIndex` and row group level
-/// [`Statistics`] is that page level statistics may not store actual column
-/// values as min and max (e.g. they may store truncated strings to save space)
-///
-/// [PageIndex documentation]:
https://github.com/apache/parquet-format/blob/master/PageIndex.md
-/// [`Statistics`]: crate::file::statistics::Statistics
-/// [`ColumnIndex`]: crate::format::ColumnIndex
-#[derive(Debug, Clone, PartialEq, Eq, Hash)]
-pub struct NativeIndex<T: ParquetValueType> {
- /// The actual column indexes, one item per page
- pub indexes: Vec<PageIndex<T>>,
- /// If the min/max elements are ordered, and if so in which
- /// direction. See [source] for details.
- ///
- /// [source]:
https://github.com/apache/parquet-format/blob/bfc549b93e6927cb1fc425466e4084f76edc6d22/src/main/thrift/parquet.thrift#L959-L964
- pub boundary_order: BoundaryOrder,
-}
-
-impl<T: ParquetValueType> NativeIndex<T> {
- /// The physical data type of the column
- pub const PHYSICAL_TYPE: Type = T::PHYSICAL_TYPE;
-
- /// Creates a new [`NativeIndex`]
- #[allow(dead_code)]
- pub(crate) fn try_new(index: crate::format::ColumnIndex) -> Result<Self,
ParquetError> {
- let len = index.min_values.len();
-
- let null_counts = index
- .null_counts
- .map(|x| x.into_iter().map(Some).collect::<Vec<_>>())
- .unwrap_or_else(|| vec![None; len]);
-
- // histograms are a 1D array encoding a 2D num_pages X num_levels
matrix.
- let to_page_histograms = |opt_hist: Option<Vec<i64>>| {
- if let Some(hist) = opt_hist {
- // TODO: should we assert (hist.len() % len) == 0?
- let num_levels = hist.len() / len;
- let mut res = Vec::with_capacity(len);
- for i in 0..len {
- let page_idx = i * num_levels;
- let page_hist = hist[page_idx..page_idx +
num_levels].to_vec();
- res.push(Some(LevelHistogram::from(page_hist)));
- }
- res
- } else {
- vec![None; len]
- }
- };
-
- let rep_hists: Vec<Option<LevelHistogram>> =
- to_page_histograms(index.repetition_level_histograms);
- let def_hists: Vec<Option<LevelHistogram>> =
- to_page_histograms(index.definition_level_histograms);
-
- let indexes = index
- .min_values
- .iter()
- .zip(index.max_values.iter())
- .zip(index.null_pages.into_iter())
- .zip(null_counts.into_iter())
- .zip(rep_hists.into_iter())
- .zip(def_hists.into_iter())
- .map(
- |(
- ((((min, max), is_null), null_count),
repetition_level_histogram),
- definition_level_histogram,
- )| {
- let (min, max) = if is_null {
- (None, None)
- } else {
- (
- Some(T::try_from_le_slice(min)?),
- Some(T::try_from_le_slice(max)?),
- )
- };
- Ok(PageIndex {
- min,
- max,
- null_count,
- repetition_level_histogram,
- definition_level_histogram,
- })
- },
- )
- .collect::<Result<Vec<_>, ParquetError>>()?;
-
- let boundary_order = index.boundary_order.try_into()?;
- Ok(Self {
- indexes,
- boundary_order,
- })
- }
-
- pub(crate) fn to_thrift(&self) -> crate::format::ColumnIndex {
- let min_values = self
- .indexes
- .iter()
- .map(|x| x.min_bytes().unwrap_or(&[]).to_vec())
- .collect::<Vec<_>>();
-
- let max_values = self
- .indexes
- .iter()
- .map(|x| x.max_bytes().unwrap_or(&[]).to_vec())
- .collect::<Vec<_>>();
-
- let null_counts = self
- .indexes
- .iter()
- .map(|x| x.null_count())
- .collect::<Option<Vec<_>>>();
-
- // Concatenate page histograms into a single Option<Vec>
- let repetition_level_histograms = self
- .indexes
- .iter()
- .map(|x| x.repetition_level_histogram().map(|v| v.values()))
- .collect::<Option<Vec<&[i64]>>>()
- .map(|hists| hists.concat());
-
- let definition_level_histograms = self
- .indexes
- .iter()
- .map(|x| x.definition_level_histogram().map(|v| v.values()))
- .collect::<Option<Vec<&[i64]>>>()
- .map(|hists| hists.concat());
-
- crate::format::ColumnIndex::new(
- self.indexes.iter().map(|x| x.min().is_none()).collect(),
- min_values,
- max_values,
- self.boundary_order.into(),
- null_counts,
- repetition_level_histograms,
- definition_level_histograms,
- )
- }
-
- /// Creates a new [`NativeIndex`]
- #[allow(dead_code)]
- pub(super) fn try_new_local(index: ThriftColumnIndex) -> Result<Self,
ParquetError> {
- let len = index.min_values.len();
-
- // turn Option<Vec<i64>> into Vec<Option<i64>>
- let null_counts = index
- .null_counts
- .map(|x| x.into_iter().map(Some).collect::<Vec<_>>())
- .unwrap_or_else(|| vec![None; len]);
-
- // histograms are a 1D array encoding a 2D num_pages X num_levels
matrix.
- let to_page_histograms = |opt_hist: Option<Vec<i64>>| {
- if let Some(hist) = opt_hist {
- // TODO: should we assert (hist.len() % len) == 0?
- let num_levels = hist.len() / len;
- let mut res = Vec::with_capacity(len);
- for i in 0..len {
- let page_idx = i * num_levels;
- let page_hist = hist[page_idx..page_idx +
num_levels].to_vec();
- res.push(Some(LevelHistogram::from(page_hist)));
- }
- res
- } else {
- vec![None; len]
- }
- };
-
- // turn Option<Vec<i64>> into Vec<Option<i64>>
- let rep_hists: Vec<Option<LevelHistogram>> =
- to_page_histograms(index.repetition_level_histograms);
- let def_hists: Vec<Option<LevelHistogram>> =
- to_page_histograms(index.definition_level_histograms);
-
- // start assembling Vec<PageIndex>
- let mut indexes: Vec<PageIndex<T>> = Vec::with_capacity(len);
- let mut rep_iter = rep_hists.into_iter();
- let mut def_iter = def_hists.into_iter();
-
- // this used to zip together the other iters, but that was quite a bit
- // slower than this approach.
- for (i, null_count) in null_counts.into_iter().enumerate().take(len) {
- let is_null = index.null_pages[i];
- let min = if is_null {
- None
- } else {
- Some(T::try_from_le_slice(index.min_values[i])?)
- };
- let max = if is_null {
- None
- } else {
- Some(T::try_from_le_slice(index.max_values[i])?)
- };
-
- indexes.push(PageIndex {
- min,
- max,
- null_count,
- repetition_level_histogram: rep_iter.next().unwrap_or(None),
- definition_level_histogram: def_iter.next().unwrap_or(None),
- })
- }
-
- let boundary_order = index.boundary_order;
- Ok(Self {
- indexes,
- boundary_order,
- })
- }
-}
-
-#[cfg(test)]
-mod tests {
- use super::*;
-
- #[test]
- fn test_page_index_min_max_null() {
- let page_index = PageIndex {
- min: Some(-123),
- max: Some(234),
- null_count: Some(0),
- repetition_level_histogram: Some(LevelHistogram::from(vec![1, 2])),
- definition_level_histogram: Some(LevelHistogram::from(vec![1, 2,
3])),
- };
-
- assert_eq!(page_index.min().unwrap(), &-123);
- assert_eq!(page_index.max().unwrap(), &234);
- assert_eq!(page_index.min_bytes().unwrap(), (-123).as_bytes());
- assert_eq!(page_index.max_bytes().unwrap(), 234.as_bytes());
- assert_eq!(page_index.null_count().unwrap(), 0);
- assert_eq!(
- page_index.repetition_level_histogram().unwrap().values(),
- &vec![1, 2]
- );
- assert_eq!(
- page_index.definition_level_histogram().unwrap().values(),
- &vec![1, 2, 3]
- );
- }
-
- #[test]
- fn test_page_index_min_max_null_none() {
- let page_index: PageIndex<i32> = PageIndex {
- min: None,
- max: None,
- null_count: None,
- repetition_level_histogram: None,
- definition_level_histogram: None,
- };
-
- assert_eq!(page_index.min(), None);
- assert_eq!(page_index.max(), None);
- assert_eq!(page_index.min_bytes(), None);
- assert_eq!(page_index.max_bytes(), None);
- assert_eq!(page_index.null_count(), None);
- assert_eq!(page_index.repetition_level_histogram(), None);
- assert_eq!(page_index.definition_level_histogram(), None);
- }
-
- #[test]
- fn test_invalid_column_index() {
- let column_index = crate::format::ColumnIndex {
- null_pages: vec![true, false],
- min_values: vec![
- vec![],
- vec![], // this shouldn't be empty as null_pages[1] is false
- ],
- max_values: vec![
- vec![],
- vec![], // this shouldn't be empty as null_pages[1] is false
- ],
- null_counts: None,
- repetition_level_histograms: None,
- definition_level_histograms: None,
- boundary_order: crate::format::BoundaryOrder::UNORDERED,
- };
-
- let err = NativeIndex::<i32>::try_new(column_index).unwrap_err();
- assert_eq!(
- err.to_string(),
- "Parquet error: error converting value, expected 4 bytes got 0"
- );
- }
-}
diff --git a/parquet/src/file/page_index/index_reader.rs
b/parquet/src/file/page_index/index_reader.rs
index 3db597954e..fbf97ad92c 100644
--- a/parquet/src/file/page_index/index_reader.rs
+++ b/parquet/src/file/page_index/index_reader.rs
@@ -171,16 +171,28 @@ pub(crate) fn decode_column_index(
let index = match column_type {
Type::BOOLEAN => {
-
ColumnIndexMetaData::BOOLEAN(PrimitiveColumnIndex::<bool>::try_new(index)?)
+
ColumnIndexMetaData::BOOLEAN(PrimitiveColumnIndex::<bool>::try_from_thrift(index)?)
+ }
+ Type::INT32 => {
+
ColumnIndexMetaData::INT32(PrimitiveColumnIndex::<i32>::try_from_thrift(index)?)
+ }
+ Type::INT64 => {
+
ColumnIndexMetaData::INT64(PrimitiveColumnIndex::<i64>::try_from_thrift(index)?)
+ }
+ Type::INT96 => {
+
ColumnIndexMetaData::INT96(PrimitiveColumnIndex::<Int96>::try_from_thrift(index)?)
+ }
+ Type::FLOAT => {
+
ColumnIndexMetaData::FLOAT(PrimitiveColumnIndex::<f32>::try_from_thrift(index)?)
+ }
+ Type::DOUBLE => {
+
ColumnIndexMetaData::DOUBLE(PrimitiveColumnIndex::<f64>::try_from_thrift(index)?)
+ }
+ Type::BYTE_ARRAY => {
+
ColumnIndexMetaData::BYTE_ARRAY(ByteArrayColumnIndex::try_from_thrift(index)?)
}
- Type::INT32 =>
ColumnIndexMetaData::INT32(PrimitiveColumnIndex::<i32>::try_new(index)?),
- Type::INT64 =>
ColumnIndexMetaData::INT64(PrimitiveColumnIndex::<i64>::try_new(index)?),
- Type::INT96 =>
ColumnIndexMetaData::INT96(PrimitiveColumnIndex::<Int96>::try_new(index)?),
- Type::FLOAT =>
ColumnIndexMetaData::FLOAT(PrimitiveColumnIndex::<f32>::try_new(index)?),
- Type::DOUBLE =>
ColumnIndexMetaData::DOUBLE(PrimitiveColumnIndex::<f64>::try_new(index)?),
- Type::BYTE_ARRAY =>
ColumnIndexMetaData::BYTE_ARRAY(ByteArrayColumnIndex::try_new(index)?),
Type::FIXED_LEN_BYTE_ARRAY => {
-
ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(ByteArrayColumnIndex::try_new(index)?)
+
ColumnIndexMetaData::FIXED_LEN_BYTE_ARRAY(ByteArrayColumnIndex::try_from_thrift(index)?)
}
};
diff --git a/parquet/src/file/page_index/mod.rs
b/parquet/src/file/page_index/mod.rs
index ff70e2eca5..71b8290d5d 100644
--- a/parquet/src/file/page_index/mod.rs
+++ b/parquet/src/file/page_index/mod.rs
@@ -20,6 +20,5 @@
//! [Column Index]:
https://github.com/apache/parquet-format/blob/master/PageIndex.md
pub mod column_index;
-pub mod index;
pub mod index_reader;
pub mod offset_index;
diff --git a/parquet/src/file/page_index/offset_index.rs
b/parquet/src/file/page_index/offset_index.rs
index 2153b8ed30..30b58ce0ac 100644
--- a/parquet/src/file/page_index/offset_index.rs
+++ b/parquet/src/file/page_index/offset_index.rs
@@ -102,14 +102,6 @@ impl OffsetIndexMetaData {
self.unencoded_byte_array_data_bytes.as_ref()
}
- pub(crate) fn to_thrift(&self) -> crate::format::OffsetIndex {
- let page_locations = self.page_locations.iter().map(|loc|
loc.into()).collect();
- crate::format::OffsetIndex::new(
- page_locations,
- self.unencoded_byte_array_data_bytes.clone(),
- )
- }
-
// Fast-path read of offset index. This works because we expect all field
deltas to be 1,
// and there's no nesting beyond PageLocation, so no need to save the last
field id. Like
// read_page_locations(), this will fail if absolute field id's are used.
diff --git a/parquet/src/file/properties.rs b/parquet/src/file/properties.rs
index b6003dc4d9..a76db64656 100644
--- a/parquet/src/file/properties.rs
+++ b/parquet/src/file/properties.rs
@@ -639,7 +639,7 @@ impl WriterPropertiesBuilder {
/// * If `Some`, must be greater than 0, otherwise will panic
/// * If `None`, there's no effective limit.
///
- /// [`Index`]: crate::file::page_index::index::Index
+ /// [`Index`]: crate::file::page_index::column_index::ColumnIndexMetaData
pub fn set_column_index_truncate_length(mut self, max_length:
Option<usize>) -> Self {
if let Some(value) = max_length {
assert!(value > 0, "Cannot have a 0 column index truncate length.
If you wish to disable min/max value truncation, set it to `None`.");
diff --git a/parquet/src/file/statistics.rs b/parquet/src/file/statistics.rs
index e51f445b7e..38c0d1ff06 100644
--- a/parquet/src/file/statistics.rs
+++ b/parquet/src/file/statistics.rs
@@ -518,15 +518,14 @@ pub(crate) fn page_stats_to_thrift(stats:
Option<&Statistics>) -> Option<PageSta
/// Strongly typed statistics for a column chunk within a row group.
///
-/// This structure is a natively typed, in memory representation of the
-/// [`Statistics`] structure in a parquet file footer. The statistics stored in
+/// This structure is a natively typed, in memory representation of the thrift
+/// `Statistics` structure in a Parquet file footer. The statistics stored in
/// this structure can be used by query engines to skip decoding pages while
/// reading parquet data.
///
-/// Page level statistics are stored separately, in [NativeIndex].
+/// Page level statistics are stored separately, in [ColumnIndexMetaData].
///
-/// [`Statistics`]: crate::format::Statistics
-/// [NativeIndex]: crate::file::page_index::index::NativeIndex
+/// [ColumnIndexMetaData]:
crate::file::page_index::column_index::ColumnIndexMetaData
#[derive(Debug, Clone, PartialEq)]
pub enum Statistics {
/// Statistics for Boolean column
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index c8fba9bc5c..d6f742c137 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -19,7 +19,7 @@
use crate::bloom_filter::Sbbf;
use crate::file::metadata::thrift_gen::PageHeader;
-use crate::file::page_index::index::Index;
+use crate::file::page_index::column_index::ColumnIndexMetaData;
use crate::file::page_index::offset_index::OffsetIndexMetaData;
use crate::parquet_thrift::{ThriftCompactOutputProtocol, WriteThrift};
use std::fmt::Debug;
@@ -127,7 +127,7 @@ pub type OnCloseRowGroup<'a, W> = Box<
&'a mut TrackedWrite<W>,
RowGroupMetaData,
Vec<Option<Sbbf>>,
- Vec<Option<Index>>,
+ Vec<Option<ColumnIndexMetaData>>,
Vec<Option<OffsetIndexMetaData>>,
) -> Result<()>
+ 'a
@@ -160,7 +160,7 @@ pub struct SerializedFileWriter<W: Write> {
props: WriterPropertiesPtr,
row_groups: Vec<RowGroupMetaData>,
bloom_filters: Vec<Vec<Option<Sbbf>>>,
- column_indexes: Vec<Vec<Option<Index>>>,
+ column_indexes: Vec<Vec<Option<ColumnIndexMetaData>>>,
offset_indexes: Vec<Vec<Option<OffsetIndexMetaData>>>,
row_group_index: usize,
// kv_metadatas will be appended to `props` when `write_metadata`
@@ -347,9 +347,6 @@ impl<W: Write + Send> SerializedFileWriter<W> {
.map(|v| v.to_thrift())
.collect::<Vec<_>>();
- let column_indexes = self.convert_column_indexes();
- let offset_indexes = self.convert_offset_index();
-
let mut encoder = ThriftMetadataWriter::new(
&mut self.buf,
&self.schema,
@@ -368,45 +365,11 @@ impl<W: Write + Send> SerializedFileWriter<W> {
encoder = encoder.with_key_value_metadata(key_value_metadata)
}
- encoder = encoder.with_column_indexes(&column_indexes);
- encoder = encoder.with_offset_indexes(&offset_indexes);
+ encoder = encoder.with_column_indexes(&self.column_indexes);
+ encoder = encoder.with_offset_indexes(&self.offset_indexes);
encoder.finish()
}
- fn convert_column_indexes(&self) ->
Vec<Vec<Option<crate::format::ColumnIndex>>> {
- self.column_indexes
- .iter()
- .map(|cis| {
- cis.iter()
- .map(|ci| {
- ci.as_ref().map(|column_index| match column_index {
- Index::NONE => panic!("trying to serialize missing
column index"),
- Index::BOOLEAN(column_index) =>
column_index.to_thrift(),
- Index::BYTE_ARRAY(column_index) =>
column_index.to_thrift(),
- Index::DOUBLE(column_index) =>
column_index.to_thrift(),
- Index::FIXED_LEN_BYTE_ARRAY(column_index) =>
column_index.to_thrift(),
- Index::FLOAT(column_index) =>
column_index.to_thrift(),
- Index::INT32(column_index) =>
column_index.to_thrift(),
- Index::INT64(column_index) =>
column_index.to_thrift(),
- Index::INT96(column_index) =>
column_index.to_thrift(),
- })
- })
- .collect()
- })
- .collect()
- }
-
- fn convert_offset_index(&self) ->
Vec<Vec<Option<crate::format::OffsetIndex>>> {
- self.offset_indexes
- .iter()
- .map(|ois| {
- ois.iter()
- .map(|oi| oi.as_ref().map(|offset_index|
offset_index.to_thrift()))
- .collect()
- })
- .collect()
- }
-
#[inline]
fn assert_previous_writer_closed(&self) -> Result<()> {
if self.finished {
@@ -546,7 +509,7 @@ pub struct SerializedRowGroupWriter<'a, W: Write> {
row_group_metadata: Option<RowGroupMetaDataPtr>,
column_chunks: Vec<ColumnChunkMetaData>,
bloom_filters: Vec<Option<Sbbf>>,
- column_indexes: Vec<Option<Index>>,
+ column_indexes: Vec<Option<ColumnIndexMetaData>>,
offset_indexes: Vec<Option<OffsetIndexMetaData>>,
row_group_index: i16,
file_offset: i64,