This is an automated email from the ASF dual-hosted git repository.
alamb pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new b06ffceaa Add support for level histograms added in PARQUET-2261 to `ParquetMetaData` (#6105)
b06ffceaa is described below
commit b06ffceaab2b59edc098d86f75b2a5125a8352ee
Author: Ed Seidl <[email protected]>
AuthorDate: Fri Jul 26 10:37:30 2024 -0700
Add support for level histograms added in PARQUET-2261 to `ParquetMetaData` (#6105)
* bump `tonic` to 0.12 and `prost` to 0.13 for `arrow-flight` (#6041)
* bump `tonic` to 0.12 and `prost` to 0.13 for `arrow-flight`
Signed-off-by: Bugen Zhao <[email protected]>
* fix example tests
Signed-off-by: Bugen Zhao <[email protected]>
---------
Signed-off-by: Bugen Zhao <[email protected]>
* Remove `impl<T: AsRef<[u8]>> From<T> for Buffer` that easily accidentally copies data (#6043)
* deprecate auto copy, ask explicit reference
* update comments
* make cargo doc happy
* Make display of interval types more pretty (#6006)
* improve display for interval.
* update test in pretty, and fix display problem.
* tmp
* fix tests in arrow-cast.
* fix tests in pretty.
* fix style.
* Update snafu (#5930)
* Update Parquet thrift generated structures (#6045)
* update to latest thrift (as of 11 Jul 2024) from parquet-format
* pass None for optional size statistics
* escape HTML tags
* don't need to escape brackets in arrays
* Revert "Revert "Write Bloom filters between row groups instead of the end (#…" (#5933)
This reverts commit 22e0b4432c9838f2536284015271d3de9a165135.
* Revert "Update snafu (#5930)" (#6069)
This reverts commit 756b1fb26d1702f36f446faf9bb40a4869c3e840.
* Update pyo3 requirement from 0.21.1 to 0.22.1 (fixed) (#6075)
* Update pyo3 requirement from 0.21.1 to 0.22.1
Updates the requirements on [pyo3](https://github.com/pyo3/pyo3) to permit
the latest version.
- [Release notes](https://github.com/pyo3/pyo3/releases)
- [Changelog](https://github.com/PyO3/pyo3/blob/main/CHANGELOG.md)
- [Commits](https://github.com/pyo3/pyo3/compare/v0.21.1...v0.22.1)
---
updated-dependencies:
- dependency-name: pyo3
dependency-type: direct:production
...
Signed-off-by: dependabot[bot] <[email protected]>
* refactor: remove deprecated `FromPyArrow::from_pyarrow`
"GIL Refs" are being phased out.
* chore: update `pyo3` in integration tests
---------
Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
* remove repeated codes to make the codes more concise. (#6080)
* Add `unencoded_byte_array_data_bytes` to `ParquetMetaData` (#6068)
* update to latest thrift (as of 11 Jul 2024) from parquet-format
* pass None for optional size statistics
* escape HTML tags
* don't need to escape brackets in arrays
* add support for unencoded_byte_array_data_bytes
* add comments
* change sig of ColumnMetrics::update_variable_length_bytes()
* rename ParquetOffsetIndex to OffsetSizeIndex
* rename some functions
* suggestion from review
Co-authored-by: Andrew Lamb <[email protected]>
* add Default trait to ColumnMetrics as suggested in review
* rename OffsetSizeIndex to OffsetIndexMetaData
---------
Co-authored-by: Andrew Lamb <[email protected]>
* deprecate read_page_locations
* add level histograms to metadata
* add to_thrift() to OffsetIndexMetaData
* Update pyo3 requirement from 0.21.1 to 0.22.2 (#6085)
Updates the requirements on [pyo3](https://github.com/pyo3/pyo3) to permit
the latest version.
- [Release notes](https://github.com/pyo3/pyo3/releases)
- [Changelog](https://github.com/PyO3/pyo3/blob/v0.22.2/CHANGELOG.md)
- [Commits](https://github.com/pyo3/pyo3/compare/v0.21.1...v0.22.2)
---
updated-dependencies:
- dependency-name: pyo3
dependency-type: direct:production
...
Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
* Deprecate read_page_locations() and simplify offset index in `ParquetMetaData` (#6095)
* deprecate read_page_locations
* add to_thrift() to OffsetIndexMetaData
* move valid test into ColumnIndexBuilder::append_histograms
* move update_histogram() inside ColumnMetrics
* Update parquet/src/column/writer/mod.rs
Co-authored-by: Ed Seidl <[email protected]>
* Implement LevelHistograms as a struct
* formatting
* fix error in docs
---------
Signed-off-by: Bugen Zhao <[email protected]>
Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: Bugen Zhao <[email protected]>
Co-authored-by: Xiangpeng Hao <[email protected]>
Co-authored-by: kamille <[email protected]>
Co-authored-by: Jesse <[email protected]>
Co-authored-by: Andrew Lamb <[email protected]>
Co-authored-by: Marco Neumann <[email protected]>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
---
parquet/src/column/writer/mod.rs | 137 ++++++++++++++++++++--
parquet/src/file/metadata/memory.rs | 2 +
parquet/src/file/metadata/mod.rs | 217 +++++++++++++++++++++++++++++++++--
parquet/src/file/page_index/index.rs | 87 +++++++++++---
parquet/src/file/writer.rs | 144 ++++++++++++++++++++++-
5 files changed, 550 insertions(+), 37 deletions(-)
diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 2c0c957d8..54d8fd3cc 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -33,7 +33,7 @@ use crate::data_type::private::ParquetValueType;
use crate::data_type::*;
use crate::encodings::levels::LevelEncoder;
use crate::errors::{ParquetError, Result};
-use crate::file::metadata::{ColumnIndexBuilder, OffsetIndexBuilder};
+use crate::file::metadata::{ColumnIndexBuilder, LevelHistogram, OffsetIndexBuilder};
use crate::file::properties::EnabledStatistics;
use crate::file::statistics::{Statistics, ValueStatistics};
use crate::file::{
@@ -189,6 +189,54 @@ struct PageMetrics {
num_buffered_values: u32,
num_buffered_rows: u32,
num_page_nulls: u64,
+ repetition_level_histogram: Option<LevelHistogram>,
+ definition_level_histogram: Option<LevelHistogram>,
+}
+
+impl PageMetrics {
+ fn new() -> Self {
+ Default::default()
+ }
+
+ /// Initialize the repetition level histogram
+ fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
+ self.repetition_level_histogram = LevelHistogram::try_new(max_level);
+ self
+ }
+
+ /// Initialize the definition level histogram
+ fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
+ self.definition_level_histogram = LevelHistogram::try_new(max_level);
+ self
+ }
+
+ /// Resets the state of this `PageMetrics` to the initial state.
+ /// If histograms have been initialized their contents will be reset to zero.
+ fn new_page(&mut self) {
+ self.num_buffered_values = 0;
+ self.num_buffered_rows = 0;
+ self.num_page_nulls = 0;
+ self.repetition_level_histogram
+ .as_mut()
+ .map(LevelHistogram::reset);
+ self.definition_level_histogram
+ .as_mut()
+ .map(LevelHistogram::reset);
+ }
+
+ /// Updates histogram values using provided repetition levels
+ fn update_repetition_level_histogram(&mut self, levels: &[i16]) {
+ if let Some(ref mut rep_hist) = self.repetition_level_histogram {
+ rep_hist.update_from_levels(levels);
+ }
+ }
+
+ /// Updates histogram values using provided definition levels
+ fn update_definition_level_histogram(&mut self, levels: &[i16]) {
+ if let Some(ref mut def_hist) = self.definition_level_histogram {
+ def_hist.update_from_levels(levels);
+ }
+ }
}
// Metrics per column writer
@@ -206,6 +254,8 @@ struct ColumnMetrics<T: Default> {
num_column_nulls: u64,
column_distinct_count: Option<u64>,
variable_length_bytes: Option<i64>,
+ repetition_level_histogram: Option<LevelHistogram>,
+ definition_level_histogram: Option<LevelHistogram>,
}
impl<T: Default> ColumnMetrics<T> {
@@ -213,6 +263,41 @@ impl<T: Default> ColumnMetrics<T> {
Default::default()
}
+ /// Initialize the repetition level histogram
+ fn with_repetition_level_histogram(mut self, max_level: i16) -> Self {
+ self.repetition_level_histogram = LevelHistogram::try_new(max_level);
+ self
+ }
+
+ /// Initialize the definition level histogram
+ fn with_definition_level_histogram(mut self, max_level: i16) -> Self {
+ self.definition_level_histogram = LevelHistogram::try_new(max_level);
+ self
+ }
+
+ /// Sum `page_histogram` into `chunk_histogram`
+ fn update_histogram(
+ chunk_histogram: &mut Option<LevelHistogram>,
+ page_histogram: &Option<LevelHistogram>,
+ ) {
+ if let (Some(page_hist), Some(chunk_hist)) = (page_histogram, chunk_histogram) {
+ chunk_hist.add(page_hist);
+ }
+ }
+
+ /// Sum the provided PageMetrics histograms into the chunk histograms. Does nothing if
+ /// page histograms are not initialized.
+ fn update_from_page_metrics(&mut self, page_metrics: &PageMetrics) {
+ ColumnMetrics::<T>::update_histogram(
+ &mut self.definition_level_histogram,
+ &page_metrics.definition_level_histogram,
+ );
+ ColumnMetrics::<T>::update_histogram(
+ &mut self.repetition_level_histogram,
+ &page_metrics.repetition_level_histogram,
+ );
+ }
+
/// Sum the provided page variable_length_bytes into the chunk variable_length_bytes
fn update_variable_length_bytes(&mut self, variable_length_bytes: Option<i64>) {
if let Some(var_bytes) = variable_length_bytes {
@@ -275,6 +360,19 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
// Used for level information
encodings.insert(Encoding::RLE);
+ let mut page_metrics = PageMetrics::new();
+ let mut column_metrics = ColumnMetrics::<E::T>::new();
+
+ // Initialize level histograms if collecting page or chunk statistics
+ if statistics_enabled != EnabledStatistics::None {
+ page_metrics = page_metrics
+ .with_repetition_level_histogram(descr.max_rep_level())
+ .with_definition_level_histogram(descr.max_def_level());
+ column_metrics = column_metrics
+ .with_repetition_level_histogram(descr.max_rep_level())
+ .with_definition_level_histogram(descr.max_def_level())
+ }
+
// Disable column_index_builder if not collecting page statistics.
let mut column_index_builder = ColumnIndexBuilder::new();
if statistics_enabled != EnabledStatistics::Page {
@@ -292,12 +390,8 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
def_levels_sink: vec![],
rep_levels_sink: vec![],
data_pages: VecDeque::new(),
- page_metrics: PageMetrics {
- num_buffered_values: 0,
- num_buffered_rows: 0,
- num_page_nulls: 0,
- },
- column_metrics: ColumnMetrics::<E::T>::new(),
+ page_metrics,
+ column_metrics,
column_index_builder,
offset_index_builder: OffsetIndexBuilder::new(),
encodings,
@@ -547,6 +641,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
}
}
+ // Update histogram
+ self.page_metrics.update_definition_level_histogram(levels);
+
self.def_levels_sink.extend_from_slice(levels);
values_to_write
} else {
@@ -575,6 +672,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
self.page_metrics.num_buffered_rows += (level == 0) as u32
}
+ // Update histogram
+ self.page_metrics.update_repetition_level_histogram(levels);
+
self.rep_levels_sink.extend_from_slice(levels);
} else {
// Each value is exactly one row.
@@ -718,7 +818,14 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
}
}
}
- // update the offset index
+
+ // Append page histograms to the `ColumnIndex` histograms
+ self.column_index_builder.append_histograms(
+ &self.page_metrics.repetition_level_histogram,
+ &self.page_metrics.definition_level_histogram,
+ );
+
+ // Update the offset index
self.offset_index_builder
.append_row_count(self.page_metrics.num_buffered_rows as i64);
@@ -804,7 +911,9 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
values_data.variable_length_bytes,
);
- // Update variable_length_bytes in column_metrics
+ // Update histograms and variable_length_bytes in column_metrics
+ self.column_metrics
+ .update_from_page_metrics(&self.page_metrics);
self.column_metrics
.update_variable_length_bytes(values_data.variable_length_bytes);
@@ -911,7 +1020,7 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
// Reset state.
self.rep_levels_sink.clear();
self.def_levels_sink.clear();
- self.page_metrics = PageMetrics::default();
+ self.page_metrics.new_page();
Ok(())
}
@@ -1019,7 +1128,13 @@ impl<'a, E: ColumnValueEncoder> GenericColumnWriter<'a, E> {
builder = builder
.set_statistics(statistics)
- .set_unencoded_byte_array_data_bytes(self.column_metrics.variable_length_bytes);
+ .set_unencoded_byte_array_data_bytes(self.column_metrics.variable_length_bytes)
+ .set_repetition_level_histogram(
+ self.column_metrics.repetition_level_histogram.take(),
+ )
+ .set_definition_level_histogram(
+ self.column_metrics.definition_level_histogram.take(),
+ );
}
let metadata = builder.build()?;
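In the column-writer changes above, each page keeps its own level histograms (reset by `PageMetrics::new_page`), which are appended to the column index via `ColumnIndexBuilder::append_histograms` and summed into the column-chunk histograms via `ColumnMetrics::update_from_page_metrics`. The following is a minimal standalone sketch of that page-to-chunk flow, assuming plain `Vec<i64>` histograms; `HistogramAccumulator`, `write_levels`, and `flush_page` are hypothetical names for illustration, not part of the `parquet` crate.

```rust
/// Hypothetical, simplified model of the per-page and per-chunk level
/// histograms kept by the column writer. Not part of the `parquet` crate.
struct HistogramAccumulator {
    max_level: i16,
    /// Histogram for the page currently being buffered.
    page: Vec<i64>,
    /// Sum of all flushed page histograms (the column-chunk histogram).
    chunk: Vec<i64>,
    /// Concatenation of every flushed page histogram, in page order
    /// (the layout stored in the column index).
    concatenated: Vec<i64>,
}

impl HistogramAccumulator {
    /// Returns `None` when `max_level <= 0`: no histogram is kept then.
    fn try_new(max_level: i16) -> Option<Self> {
        if max_level <= 0 {
            return None;
        }
        let len = max_level as usize + 1;
        Some(Self {
            max_level,
            page: vec![0; len],
            chunk: vec![0; len],
            concatenated: Vec::new(),
        })
    }

    /// Counts each level into the current page histogram.
    ///
    /// Panics if a level is negative or greater than `max_level`.
    fn write_levels(&mut self, levels: &[i16]) {
        for &level in levels {
            assert!((0..=self.max_level).contains(&level), "level {level} out of range");
            self.page[level as usize] += 1;
        }
    }

    /// Finishes the current page: appends its histogram to the column-index
    /// buffer, adds it into the chunk histogram, then zeroes it for the next page.
    fn flush_page(&mut self) {
        self.concatenated.extend_from_slice(&self.page);
        for (dst, src) in self.chunk.iter_mut().zip(&self.page) {
            *dst += *src;
        }
        self.page.fill(0);
    }
}

fn main() {
    // A column with max repetition level 1, written as two pages.
    let mut acc = HistogramAccumulator::try_new(1).expect("max_level > 0");
    acc.write_levels(&[0, 1, 0]); // page 1: 2 new rows, 1 continuation
    acc.flush_page();
    acc.write_levels(&[0, 1, 1, 1]); // page 2: 1 new row, 3 continuations
    acc.flush_page();

    assert_eq!(acc.chunk, vec![3, 4]);
    assert_eq!(acc.concatenated, vec![2, 1, 1, 3]);
    println!("chunk = {:?}, column index = {:?}", acc.chunk, acc.concatenated);
}
```

Resetting the page histogram in place (rather than reallocating it) mirrors the diff's switch from `PageMetrics::default()` to `new_page()`, which keeps the histograms initialized across pages.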
diff --git a/parquet/src/file/metadata/memory.rs b/parquet/src/file/metadata/memory.rs
index 0b6d1f0d1..bb822b4cc 100644
--- a/parquet/src/file/metadata/memory.rs
+++ b/parquet/src/file/metadata/memory.rs
@@ -99,6 +99,8 @@ impl HeapSize for ColumnChunkMetaData {
+ self.statistics.heap_size()
+ self.encoding_stats.heap_size()
+ self.unencoded_byte_array_data_bytes.heap_size()
+ + self.repetition_level_histogram.heap_size()
+ + self.definition_level_histogram.heap_size()
}
}
diff --git a/parquet/src/file/metadata/mod.rs b/parquet/src/file/metadata/mod.rs
index 52206e66a..cd3555de8 100644
--- a/parquet/src/file/metadata/mod.rs
+++ b/parquet/src/file/metadata/mod.rs
@@ -557,6 +557,114 @@ pub struct ColumnChunkMetaData {
column_index_offset: Option<i64>,
column_index_length: Option<i32>,
unencoded_byte_array_data_bytes: Option<i64>,
+ repetition_level_histogram: Option<LevelHistogram>,
+ definition_level_histogram: Option<LevelHistogram>,
+}
+
+/// Histograms for repetition and definition levels.
+///
+/// Each histogram is a vector of length `max_level + 1`. The value at index `i` is the number of
+/// values at level `i`.
+///
+/// For example, `vec[0]` is the number of values with level 0, `vec[1]` is the
+/// number of values with level 1, and so on.
+///
+#[derive(Debug, Clone, PartialEq)]
+pub struct LevelHistogram {
+ inner: Vec<i64>,
+}
+
+impl LevelHistogram {
+ /// Creates a new level histogram.
+ ///
+ /// Length will be `max_level + 1`.
+ ///
+ /// Returns `None` when `max_level == 0` (because histograms are not necessary in this case)
+ pub fn try_new(max_level: i16) -> Option<Self> {
+ if max_level > 0 {
+ Some(Self {
+ inner: vec![0; max_level as usize + 1],
+ })
+ } else {
+ None
+ }
+ }
+ /// Returns a reference to the histogram's values.
+ pub fn values(&self) -> &[i64] {
+ &self.inner
+ }
+
+ /// Returns the inner vector, consuming `self`
+ pub fn into_inner(self) -> Vec<i64> {
+ self.inner
+ }
+
+ /// Returns the histogram value at the given index.
+ ///
+ /// The value of `i` is the number of values with level `i`. For example,
+ /// `get(1)` returns the number of values with level 1.
+ ///
+ /// Returns `None` if the index is out of bounds.
+ pub fn get(&self, index: usize) -> Option<i64> {
+ self.inner.get(index).copied()
+ }
+
+ /// Adds the values from the other histogram to this histogram
+ ///
+ /// # Panics
+ /// If the histograms have different lengths
+ pub fn add(&mut self, other: &Self) {
+ assert_eq!(self.len(), other.len());
+ for (dst, src) in self.inner.iter_mut().zip(other.inner.iter()) {
+ *dst += src;
+ }
+ }
+
+ /// Returns the length of the histogram
+ pub fn len(&self) -> usize {
+ self.inner.len()
+ }
+
+ /// Returns `true` if the histogram is empty
+ pub fn is_empty(&self) -> bool {
+ self.inner.is_empty()
+ }
+
+ /// Sets the values of all histogram levels to 0.
+ pub fn reset(&mut self) {
+ for value in self.inner.iter_mut() {
+ *value = 0;
+ }
+ }
+
+ /// Updates histogram values using the provided levels
+ ///
+ /// # Panics
+ /// If any of the levels is greater than the `max_level` passed to
+ /// [`Self::try_new`] (i.e. is not a valid index into the histogram)
+ pub fn update_from_levels(&mut self, levels: &[i16]) {
+ for &level in levels {
+ self.inner[level as usize] += 1;
+ }
+ }
+}
+
+impl From<Vec<i64>> for LevelHistogram {
+ fn from(inner: Vec<i64>) -> Self {
+ Self { inner }
+ }
+}
+
+impl From<LevelHistogram> for Vec<i64> {
+ fn from(value: LevelHistogram) -> Self {
+ value.into_inner()
+ }
+}
+
+impl HeapSize for LevelHistogram {
+ fn heap_size(&self) -> usize {
+ self.inner.heap_size()
+ }
}
/// Represents common operations for a column chunk.
@@ -717,6 +825,24 @@ impl ColumnChunkMetaData {
self.unencoded_byte_array_data_bytes
}
+ /// Returns the repetition level histogram.
+ ///
+ /// The returned value `vec[i]` is how many values are at repetition level `i`. For example,
+ /// `vec[0]` indicates how many rows the column chunk contains.
+ /// This field may not be set by older writers.
+ pub fn repetition_level_histogram(&self) -> Option<&LevelHistogram> {
+ self.repetition_level_histogram.as_ref()
+ }
+
+ /// Returns the definition level histogram.
+ ///
+ /// The returned value `vec[i]` is how many values are at definition level `i`. For example,
+ /// `vec[max_definition_level]` indicates how many non-null values are present in the column chunk.
+ /// This field may not be set by older writers.
+ pub fn definition_level_histogram(&self) -> Option<&LevelHistogram> {
+ self.definition_level_histogram.as_ref()
+ }
+
/// Method to convert from Thrift.
pub fn from_thrift(column_descr: ColumnDescPtr, cc: ColumnChunk) -> Result<Self> {
if cc.meta_data.is_none() {
@@ -754,13 +880,23 @@ impl ColumnChunkMetaData {
let offset_index_length = cc.offset_index_length;
let column_index_offset = cc.column_index_offset;
let column_index_length = cc.column_index_length;
- let unencoded_byte_array_data_bytes = if let Some(size_stats) = col_metadata.size_statistics
- {
- size_stats.unencoded_byte_array_data_bytes
+ let (
+ unencoded_byte_array_data_bytes,
+ repetition_level_histogram,
+ definition_level_histogram,
+ ) = if let Some(size_stats) = col_metadata.size_statistics {
+ (
+ size_stats.unencoded_byte_array_data_bytes,
+ size_stats.repetition_level_histogram,
+ size_stats.definition_level_histogram,
+ )
} else {
- None
+ (None, None, None)
};
+ let repetition_level_histogram = repetition_level_histogram.map(LevelHistogram::from);
+ let definition_level_histogram = definition_level_histogram.map(LevelHistogram::from);
+
let result = ColumnChunkMetaData {
column_descr,
encodings,
@@ -782,6 +918,8 @@ impl ColumnChunkMetaData {
column_index_offset,
column_index_length,
unencoded_byte_array_data_bytes,
+ repetition_level_histogram,
+ definition_level_histogram,
};
Ok(result)
}
@@ -805,11 +943,24 @@ impl ColumnChunkMetaData {
/// Method to convert to Thrift `ColumnMetaData`
pub fn to_column_metadata_thrift(&self) -> ColumnMetaData {
- let size_statistics = if self.unencoded_byte_array_data_bytes.is_some() {
+ let size_statistics = if self.unencoded_byte_array_data_bytes.is_some()
+ || self.repetition_level_histogram.is_some()
+ || self.definition_level_histogram.is_some()
+ {
+ let repetition_level_histogram = self
+ .repetition_level_histogram
+ .as_ref()
+ .map(|hist| hist.clone().into_inner());
+
+ let definition_level_histogram = self
+ .definition_level_histogram
+ .as_ref()
+ .map(|hist| hist.clone().into_inner());
+
Some(SizeStatistics {
unencoded_byte_array_data_bytes: self.unencoded_byte_array_data_bytes,
- repetition_level_histogram: None,
- definition_level_histogram: None,
+ repetition_level_histogram,
+ definition_level_histogram,
})
} else {
None
@@ -871,6 +1022,8 @@ impl ColumnChunkMetaDataBuilder {
column_index_offset: None,
column_index_length: None,
unencoded_byte_array_data_bytes: None,
+ repetition_level_histogram: None,
+ definition_level_histogram: None,
})
}
@@ -988,6 +1141,18 @@ impl ColumnChunkMetaDataBuilder {
self
}
+ /// Sets optional repetition level histogram
+ pub fn set_repetition_level_histogram(mut self, value: Option<LevelHistogram>) -> Self {
+ self.0.repetition_level_histogram = value;
+ self
+ }
+
+ /// Sets optional definition level histogram
+ pub fn set_definition_level_histogram(mut self, value: Option<LevelHistogram>) -> Self {
+ self.0.definition_level_histogram = value;
+ self
+ }
+
/// Builds column chunk metadata.
pub fn build(self) -> Result<ColumnChunkMetaData> {
Ok(self.0)
@@ -1003,6 +1168,10 @@ pub struct ColumnIndexBuilder {
max_values: Vec<Vec<u8>>,
null_counts: Vec<i64>,
boundary_order: BoundaryOrder,
+ /// contains the concatenation of the histograms of all pages
+ repetition_level_histograms: Option<Vec<i64>>,
+ /// contains the concatenation of the histograms of all pages
+ definition_level_histograms: Option<Vec<i64>>,
/// Is the information in the builder valid?
///
/// Set to `false` if any entry in the page doesn't have statistics for
@@ -1027,6 +1196,8 @@ impl ColumnIndexBuilder {
max_values: Vec::new(),
null_counts: Vec::new(),
boundary_order: BoundaryOrder::UNORDERED,
+ repetition_level_histograms: None,
+ definition_level_histograms: None,
valid: true,
}
}
@@ -1045,6 +1216,28 @@ impl ColumnIndexBuilder {
self.null_counts.push(null_count);
}
+ /// Append the given page-level histograms to the [`ColumnIndex`] histograms.
+ /// Does nothing if the `ColumnIndexBuilder` is not in the `valid` state.
+ pub fn append_histograms(
+ &mut self,
+ repetition_level_histogram: &Option<LevelHistogram>,
+ definition_level_histogram: &Option<LevelHistogram>,
+ ) {
+ if !self.valid {
+ return;
+ }
+ if let Some(ref rep_lvl_hist) = repetition_level_histogram {
+ let hist = self.repetition_level_histograms.get_or_insert(Vec::new());
+ hist.reserve(rep_lvl_hist.len());
+ hist.extend(rep_lvl_hist.values());
+ }
+ if let Some(ref def_lvl_hist) = definition_level_histogram {
+ let hist = self.definition_level_histograms.get_or_insert(Vec::new());
+ hist.reserve(def_lvl_hist.len());
+ hist.extend(def_lvl_hist.values());
+ }
+ }
+
pub fn set_boundary_order(&mut self, boundary_order: BoundaryOrder) {
self.boundary_order = boundary_order;
}
@@ -1069,8 +1262,8 @@ impl ColumnIndexBuilder {
self.max_values,
self.boundary_order,
self.null_counts,
- None,
- None,
+ self.repetition_level_histograms,
+ self.definition_level_histograms,
)
}
}
@@ -1286,6 +1479,8 @@ mod tests {
.set_column_index_offset(Some(8000))
.set_column_index_length(Some(25))
.set_unencoded_byte_array_data_bytes(Some(2000))
+ .set_repetition_level_histogram(Some(LevelHistogram::from(vec![100, 100])))
+ .set_definition_level_histogram(Some(LevelHistogram::from(vec![0, 200])))
.build()
.unwrap();
@@ -1397,7 +1592,7 @@ mod tests {
let row_group_meta_with_stats = vec![row_group_meta_with_stats];
let parquet_meta = ParquetMetaData::new(file_metadata.clone(), row_group_meta_with_stats);
- let base_expected_size = 2088;
+ let base_expected_size = 2280;
assert_eq!(parquet_meta.memory_size(), base_expected_size);
@@ -1425,7 +1620,7 @@ mod tests {
]]),
);
- let bigger_expected_size = 2400;
+ let bigger_expected_size = 2784;
// more set fields means more memory usage
assert!(bigger_expected_size > base_expected_size);
assert_eq!(parquet_meta.memory_size(), bigger_expected_size);
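With these metadata changes, `to_column_metadata_thrift` writes a `SizeStatistics` struct whenever any of its three optional fields (unencoded byte-array bytes, repetition-level histogram, definition-level histogram) is set, and `from_thrift` maps a missing struct to three `None`s. Below is a standalone sketch of that rule. `SizeStatisticsSketch`, `to_size_statistics`, and `from_size_statistics` are hypothetical mirrors for illustration only; they are not the generated Thrift `SizeStatistics` type or the crate's API.

```rust
/// Hypothetical mirror of the Thrift `SizeStatistics` fields touched by this
/// commit; not the generated `parquet::format::SizeStatistics` type.
#[derive(Debug, Clone, PartialEq)]
struct SizeStatisticsSketch {
    unencoded_byte_array_data_bytes: Option<i64>,
    repetition_level_histogram: Option<Vec<i64>>,
    definition_level_histogram: Option<Vec<i64>>,
}

/// Writing: emit the struct only if at least one field is present.
fn to_size_statistics(
    unencoded_bytes: Option<i64>,
    rep_hist: Option<&[i64]>,
    def_hist: Option<&[i64]>,
) -> Option<SizeStatisticsSketch> {
    if unencoded_bytes.is_none() && rep_hist.is_none() && def_hist.is_none() {
        return None;
    }
    Some(SizeStatisticsSketch {
        unencoded_byte_array_data_bytes: unencoded_bytes,
        repetition_level_histogram: rep_hist.map(|h| h.to_vec()),
        definition_level_histogram: def_hist.map(|h| h.to_vec()),
    })
}

/// Reading: a missing struct yields `(None, None, None)`.
fn from_size_statistics(
    stats: Option<SizeStatisticsSketch>,
) -> (Option<i64>, Option<Vec<i64>>, Option<Vec<i64>>) {
    match stats {
        Some(s) => (
            s.unencoded_byte_array_data_bytes,
            s.repetition_level_histogram,
            s.definition_level_histogram,
        ),
        None => (None, None, None),
    }
}

fn main() {
    // A flat optional column: only a definition-level histogram is present.
    let written = to_size_statistics(None, None, Some(&[3_i64, 7][..]));
    let (bytes, rep, def) = from_size_statistics(written);
    assert_eq!(bytes, None);
    assert_eq!(rep, None);
    assert_eq!(def, Some(vec![3, 7]));

    // Nothing set: no size statistics are written at all.
    assert_eq!(to_size_statistics(None, None, None), None);
}
```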
diff --git a/parquet/src/file/page_index/index.rs b/parquet/src/file/page_index/index.rs
index 7eba94904..68412572b 100644
--- a/parquet/src/file/page_index/index.rs
+++ b/parquet/src/file/page_index/index.rs
@@ -36,6 +36,17 @@ pub struct PageIndex<T> {
pub max: Option<T>,
/// Null values in the page
pub null_count: Option<i64>,
+ /// Repetition level histogram for the page
+ ///
+ /// `repetition_level_histogram[i]` is a count of how many values are at repetition level `i`.
+ /// For example, `repetition_level_histogram[0]` indicates how many rows the page contains.
+ pub repetition_level_histogram: Option<Vec<i64>>,
+ /// Definition level histogram for the page
+ ///
+ /// `definition_level_histogram[i]` is a count of how many values are at definition level `i`.
+ /// For example, `definition_level_histogram[max_definition_level]` indicates how many
+ /// non-null values are present in the page.
+ pub definition_level_histogram: Option<Vec<i64>>,
}
impl<T> PageIndex<T> {
@@ -48,6 +59,12 @@ impl<T> PageIndex<T> {
pub fn null_count(&self) -> Option<i64> {
self.null_count
}
+ pub fn repetition_level_histogram(&self) -> Option<&Vec<i64>> {
+ self.repetition_level_histogram.as_ref()
+ }
+ pub fn definition_level_histogram(&self) -> Option<&Vec<i64>> {
+ self.definition_level_histogram.as_ref()
+ }
}
impl<T> PageIndex<T>
@@ -149,26 +166,57 @@ impl<T: ParquetValueType> NativeIndex<T> {
.map(|x| x.into_iter().map(Some).collect::<Vec<_>>())
.unwrap_or_else(|| vec![None; len]);
+ // histograms are a 1D array encoding a 2D num_pages X num_levels matrix.
+ let to_page_histograms = |opt_hist: Option<Vec<i64>>| {
+ if let Some(hist) = opt_hist {
+ // TODO: should we assert (hist.len() % len) == 0?
+ let num_levels = hist.len() / len;
+ let mut res = Vec::with_capacity(len);
+ for i in 0..len {
+ let page_idx = i * num_levels;
+ let page_hist = hist[page_idx..page_idx + num_levels].to_vec();
+ res.push(Some(page_hist));
+ }
+ res
+ } else {
+ vec![None; len]
+ }
+ };
+
+ let rep_hists: Vec<Option<Vec<i64>>> =
+ to_page_histograms(index.repetition_level_histograms);
+ let def_hists: Vec<Option<Vec<i64>>> =
+ to_page_histograms(index.definition_level_histograms);
+
let indexes = index
.min_values
.iter()
.zip(index.max_values.into_iter())
.zip(index.null_pages.into_iter())
.zip(null_counts.into_iter())
- .map(|(((min, max), is_null), null_count)| {
- let (min, max) = if is_null {
- (None, None)
- } else {
- let min = min.as_slice();
- let max = max.as_slice();
- (Some(from_le_slice::<T>(min)), Some(from_le_slice::<T>(max)))
- };
- Ok(PageIndex {
- min,
- max,
- null_count,
- })
- })
+ .zip(rep_hists.into_iter())
+ .zip(def_hists.into_iter())
+ .map(
+ |(
+ ((((min, max), is_null), null_count), repetition_level_histogram),
+ definition_level_histogram,
+ )| {
+ let (min, max) = if is_null {
+ (None, None)
+ } else {
+ let min = min.as_slice();
+ let max = max.as_slice();
+ (Some(from_le_slice::<T>(min)), Some(from_le_slice::<T>(max)))
+ };
+ Ok(PageIndex {
+ min,
+ max,
+ null_count,
+ repetition_level_histogram,
+ definition_level_histogram,
+ })
+ },
+ )
.collect::<Result<Vec<_>, ParquetError>>()?;
Ok(Self {
@@ -188,6 +236,8 @@ mod tests {
min: Some(-123),
max: Some(234),
null_count: Some(0),
+ repetition_level_histogram: Some(vec![1, 2]),
+ definition_level_histogram: Some(vec![1, 2, 3]),
};
assert_eq!(page_index.min().unwrap(), &-123);
@@ -195,6 +245,11 @@ mod tests {
assert_eq!(page_index.min_bytes().unwrap(), (-123).as_bytes());
assert_eq!(page_index.max_bytes().unwrap(), 234.as_bytes());
assert_eq!(page_index.null_count().unwrap(), 0);
+ assert_eq!(page_index.repetition_level_histogram(), Some(&vec![1, 2]));
+ assert_eq!(
+ page_index.definition_level_histogram(),
+ Some(&vec![1, 2, 3])
+ );
}
#[test]
@@ -203,6 +258,8 @@ mod tests {
min: None,
max: None,
null_count: None,
+ repetition_level_histogram: None,
+ definition_level_histogram: None,
};
assert_eq!(page_index.min(), None);
@@ -210,5 +267,7 @@ mod tests {
assert_eq!(page_index.min_bytes(), None);
assert_eq!(page_index.max_bytes(), None);
assert_eq!(page_index.null_count(), None);
+ assert_eq!(page_index.repetition_level_histogram(), None);
+ assert_eq!(page_index.definition_level_histogram(), None);
}
}
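In `NativeIndex::try_new` above, the column index stores all page histograms in one flat buffer (a row-major `num_pages x num_levels` matrix), and the `to_page_histograms` closure slices it back into one histogram per page, leaving a TODO about asserting that the length divides evenly. The following is a hedged standalone sketch of the same split using `chunks_exact`. It returns `None` instead of panicking when the length is not a multiple of the page count (the check the TODO asks about) or when a non-empty buffer comes with zero pages, a case where the diff's `hist.len() / len` would divide by zero. `split_page_histograms` is a hypothetical helper, not a `parquet` crate function.

```rust
/// Splits a flattened `num_pages x num_levels` histogram (the layout used by
/// the column index) into one histogram per page.
///
/// Returns `None` if `flat.len()` is not a multiple of `num_pages`.
fn split_page_histograms(flat: &[i64], num_pages: usize) -> Option<Vec<Vec<i64>>> {
    if num_pages == 0 {
        // No pages: only an empty buffer is consistent.
        return if flat.is_empty() { Some(Vec::new()) } else { None };
    }
    if flat.len() % num_pages != 0 {
        return None;
    }
    let num_levels = flat.len() / num_pages;
    if num_levels == 0 {
        // `chunks_exact(0)` would panic; every page gets an empty histogram.
        return Some(vec![Vec::new(); num_pages]);
    }
    // Row-major: page `i` occupies `flat[i * num_levels..(i + 1) * num_levels]`.
    Some(flat.chunks_exact(num_levels).map(|page| page.to_vec()).collect())
}

fn main() {
    // Two pages of a column with max level 2 (three levels per page).
    let flat: [i64; 6] = [1, 2, 3, 4, 5, 6];
    let pages = split_page_histograms(&flat, 2).unwrap();
    assert_eq!(pages, vec![vec![1, 2, 3], vec![4, 5, 6]]);

    // Five entries cannot be split evenly across two pages.
    assert!(split_page_histograms(&flat[..5], 2).is_none());
    println!("{pages:?}");
}
```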
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index c44a7e669..f2e8f74a3 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -663,6 +663,12 @@ impl<'a, W: Write + Send> SerializedRowGroupWriter<'a, W> {
.set_dictionary_page_offset(src_dictionary_offset.map(map_offset))
.set_unencoded_byte_array_data_bytes(metadata.unencoded_byte_array_data_bytes());
+ if let Some(rep_hist) = metadata.repetition_level_histogram() {
+ builder = builder.set_repetition_level_histogram(Some(rep_hist.clone()))
+ }
+ if let Some(def_hist) = metadata.definition_level_histogram() {
+ builder = builder.set_definition_level_histogram(Some(def_hist.clone()))
+ }
if let Some(statistics) = metadata.statistics() {
builder = builder.set_statistics(statistics.clone())
}
@@ -1889,6 +1895,12 @@ mod tests {
assert_eq!(file_metadata.row_groups[0].columns.len(), 1);
assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
+ let check_def_hist = |def_hist: &[i64]| {
+ assert_eq!(def_hist.len(), 2);
+ assert_eq!(def_hist[0], 3);
+ assert_eq!(def_hist[1], 7);
+ };
+
assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
let meta_data = file_metadata.row_groups[0].columns[0]
.meta_data
@@ -1898,12 +1910,13 @@ mod tests {
let size_stats = meta_data.size_statistics.as_ref().unwrap();
assert!(size_stats.repetition_level_histogram.is_none());
- assert!(size_stats.definition_level_histogram.is_none());
+ assert!(size_stats.definition_level_histogram.is_some());
assert!(size_stats.unencoded_byte_array_data_bytes.is_some());
assert_eq!(
unenc_size,
size_stats.unencoded_byte_array_data_bytes.unwrap()
);
+ check_def_hist(size_stats.definition_level_histogram.as_ref().unwrap());
// check that the read metadata is also correct
let options = ReadOptionsBuilder::new().with_page_index().build();
@@ -1915,12 +1928,31 @@ mod tests {
let rowgroup = reader.get_row_group(0).unwrap();
assert_eq!(rowgroup.num_columns(), 1);
let column = rowgroup.metadata().column(0);
+ assert!(column.definition_level_histogram().is_some());
+ assert!(column.repetition_level_histogram().is_none());
assert!(column.unencoded_byte_array_data_bytes().is_some());
+ check_def_hist(column.definition_level_histogram().unwrap().values());
assert_eq!(
unenc_size,
column.unencoded_byte_array_data_bytes().unwrap()
);
+ // check histogram in column index as well
+ assert!(reader.metadata().column_index().is_some());
+ let column_index = reader.metadata().column_index().unwrap();
+ assert_eq!(column_index.len(), 1);
+ assert_eq!(column_index[0].len(), 1);
+ let col_idx = if let Index::BYTE_ARRAY(index) = &column_index[0][0] {
+ assert_eq!(index.indexes.len(), 1);
+ &index.indexes[0]
+ } else {
+ unreachable!()
+ };
+
+ assert!(col_idx.repetition_level_histogram().is_none());
+ assert!(col_idx.definition_level_histogram().is_some());
+ check_def_hist(col_idx.definition_level_histogram().unwrap());
+
assert!(reader.metadata().offset_index().is_some());
let offset_index = reader.metadata().offset_index().unwrap();
assert_eq!(offset_index.len(), 1);
@@ -1933,4 +1965,114 @@ mod tests {
assert_eq!(page_sizes.len(), 1);
assert_eq!(page_sizes[0], unenc_size);
}
+
+ #[test]
+ fn test_size_statistics_with_repetition_and_nulls() {
+ let message_type = "
+ message test_schema {
+ OPTIONAL group i32_list (LIST) {
+ REPEATED group list {
+ OPTIONAL INT32 element;
+ }
+ }
+ }
+ ";
+ // column is:
+ // row 0: [1, 2]
+ // row 1: NULL
+ // row 2: [4, NULL]
+ // row 3: []
+ // row 4: [7, 8, 9, 10]
+ let schema = Arc::new(parse_message_type(message_type).unwrap());
+ let data = [1, 2, 4, 7, 8, 9, 10];
+ let def_levels = [3, 3, 0, 3, 2, 1, 3, 3, 3, 3];
+ let rep_levels = [0, 1, 0, 0, 1, 0, 0, 1, 1, 1];
+ let file = tempfile::tempfile().unwrap();
+ let props = Arc::new(
+ WriterProperties::builder()
+ .set_statistics_enabled(EnabledStatistics::Page)
+ .build(),
+ );
+ let mut writer = SerializedFileWriter::new(&file, schema, props).unwrap();
+ let mut row_group_writer = writer.next_row_group().unwrap();
+
+ let mut col_writer = row_group_writer.next_column().unwrap().unwrap();
+ col_writer
+ .typed::<Int32Type>()
+ .write_batch(&data, Some(&def_levels), Some(&rep_levels))
+ .unwrap();
+ col_writer.close().unwrap();
+ row_group_writer.close().unwrap();
+ let file_metadata = writer.close().unwrap();
+
+ assert_eq!(file_metadata.row_groups.len(), 1);
+ assert_eq!(file_metadata.row_groups[0].columns.len(), 1);
+ assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
+
+ let check_def_hist = |def_hist: &[i64]| {
+ assert_eq!(def_hist.len(), 4);
+ assert_eq!(def_hist[0], 1);
+ assert_eq!(def_hist[1], 1);
+ assert_eq!(def_hist[2], 1);
+ assert_eq!(def_hist[3], 7);
+ };
+
+ let check_rep_hist = |rep_hist: &[i64]| {
+ assert_eq!(rep_hist.len(), 2);
+ assert_eq!(rep_hist[0], 5);
+ assert_eq!(rep_hist[1], 5);
+ };
+
+ // check that histograms are set properly in the write and read metadata
+ // also check that unencoded_byte_array_data_bytes is not set
+ assert!(file_metadata.row_groups[0].columns[0].meta_data.is_some());
+ let meta_data = file_metadata.row_groups[0].columns[0]
+ .meta_data
+ .as_ref()
+ .unwrap();
+ assert!(meta_data.size_statistics.is_some());
+ let size_stats = meta_data.size_statistics.as_ref().unwrap();
+ assert!(size_stats.repetition_level_histogram.is_some());
+ assert!(size_stats.definition_level_histogram.is_some());
+ assert!(size_stats.unencoded_byte_array_data_bytes.is_none());
+ check_def_hist(size_stats.definition_level_histogram.as_ref().unwrap());
+ check_rep_hist(size_stats.repetition_level_histogram.as_ref().unwrap());
+
+ // check that the read metadata is also correct
+ let options = ReadOptionsBuilder::new().with_page_index().build();
+ let reader = SerializedFileReader::new_with_options(file, options).unwrap();
+
+ let rfile_metadata = reader.metadata().file_metadata();
+ assert_eq!(rfile_metadata.num_rows(), file_metadata.num_rows);
+ assert_eq!(reader.num_row_groups(), 1);
+ let rowgroup = reader.get_row_group(0).unwrap();
+ assert_eq!(rowgroup.num_columns(), 1);
+ let column = rowgroup.metadata().column(0);
+ assert!(column.definition_level_histogram().is_some());
+ assert!(column.repetition_level_histogram().is_some());
+ assert!(column.unencoded_byte_array_data_bytes().is_none());
+ check_def_hist(column.definition_level_histogram().unwrap().values());
+ check_rep_hist(column.repetition_level_histogram().unwrap().values());
+
+ // check histogram in column index as well
+ assert!(reader.metadata().column_index().is_some());
+ let column_index = reader.metadata().column_index().unwrap();
+ assert_eq!(column_index.len(), 1);
+ assert_eq!(column_index[0].len(), 1);
+ let col_idx = if let Index::INT32(index) = &column_index[0][0] {
+ assert_eq!(index.indexes.len(), 1);
+ &index.indexes[0]
+ } else {
+ unreachable!()
+ };
+
+ check_def_hist(col_idx.definition_level_histogram().unwrap());
+ check_rep_hist(col_idx.repetition_level_histogram().unwrap());
+
+ assert!(reader.metadata().offset_index().is_some());
+ let offset_index = reader.metadata().offset_index().unwrap();
+ assert_eq!(offset_index.len(), 1);
+ assert_eq!(offset_index[0].len(), 1);
+ assert!(offset_index[0][0].unencoded_byte_array_data_bytes.is_none());
+ }
}
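The expected values in `test_size_statistics_with_repetition_and_nulls` can be derived by hand. For the `OPTIONAL group i32_list (LIST) { REPEATED group list { OPTIONAL INT32 element } }` schema the maximum definition level is 3: level 0 is a null list, 1 an empty list, 2 a null element, and 3 a present value. The maximum repetition level is 1: level 0 starts a new row and 1 continues the current list. The sketch below simply counts levels; `level_histogram` is a hypothetical helper, not the crate's `LevelHistogram::update_from_levels`, although it follows the same counting rule.

```rust
/// Counts how many entries of `levels` fall at each level `0..=max_level`.
/// Assumes `max_level >= 0`; panics (index out of bounds) if a level exceeds it.
fn level_histogram(levels: &[i16], max_level: i16) -> Vec<i64> {
    let mut hist = vec![0_i64; max_level as usize + 1];
    for &level in levels {
        hist[level as usize] += 1;
    }
    hist
}

fn main() {
    // Levels from the test, encoding [[1, 2], NULL, [4, NULL], [], [7, 8, 9, 10]].
    let def_levels = [3, 3, 0, 3, 2, 1, 3, 3, 3, 3];
    let rep_levels = [0, 1, 0, 0, 1, 0, 0, 1, 1, 1];

    let def_hist = level_histogram(&def_levels, 3);
    let rep_hist = level_histogram(&rep_levels, 1);

    // def_hist[3] counts the 7 non-null values; rep_hist[0] counts the 5 rows.
    assert_eq!(def_hist, vec![1, 1, 1, 7]);
    assert_eq!(rep_hist, vec![5, 5]);
    println!("def = {def_hist:?}, rep = {rep_hist:?}");
}
```

Note that `def_hist[3]` equals the number of written values (`data.len() == 7`) and `rep_hist[0]` equals the number of rows, which is what the doc comments on `repetition_level_histogram()` and `definition_level_histogram()` describe.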