This is an automated email from the ASF dual-hosted git repository.
tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git
The following commit(s) were added to refs/heads/master by this push:
new ac5073f27 Support reading PageIndex from parquet metadata, prepare for
skipping pages at reading (#1762)
ac5073f27 is described below
commit ac5073f27cc51473362161122c7661e8051543b9
Author: Yang Jiang <[email protected]>
AuthorDate: Wed Jun 1 04:43:37 2022 +0800
Support reading PageIndex from parquet metadata, prepare for skipping pages
at reading (#1762)
* Add read options for column index based filtering
* try to deserialize pageIndex from parquet
* unable read pageIndex from parquet and test
* fix fmt
* remove FixedLenByteIndex use ByteIndex
* Apply suggestions from code review
Co-authored-by: Raphael Taylor-Davies
<[email protected]>
* fix comments
* use enum instead of trait object
* fix and add comment
* use from_le_slice instead of from_ne_slice
* use builder option
* remove useless trait
* fix from_le_slice
Co-authored-by: Raphael Taylor-Davies
<[email protected]>
---
parquet/src/basic.rs | 2 +-
parquet/src/data_type.rs | 20 ++-
parquet/src/file/metadata.rs | 31 ++++-
parquet/src/file/mod.rs | 1 +
parquet/src/file/page_index/index.rs | 209 ++++++++++++++++++++++++++++
parquet/src/file/page_index/index_reader.rs | 167 ++++++++++++++++++++++
parquet/src/file/page_index/mod.rs | 19 +++
parquet/src/file/serialized_reader.rs | 119 ++++++++++++++--
parquet/src/util/bit_util.rs | 11 ++
9 files changed, 559 insertions(+), 20 deletions(-)
diff --git a/parquet/src/basic.rs b/parquet/src/basic.rs
index 7eff2156f..59a0fe07b 100644
--- a/parquet/src/basic.rs
+++ b/parquet/src/basic.rs
@@ -41,7 +41,7 @@ pub use parquet_format::{
/// control the on disk storage format.
/// For example INT16 is not included as a type since a good encoding of INT32
/// would handle this.
-#[derive(Debug, Clone, Copy, PartialEq)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Type {
BOOLEAN,
INT32,
diff --git a/parquet/src/data_type.rs b/parquet/src/data_type.rs
index c01fb1530..86ccefbd8 100644
--- a/parquet/src/data_type.rs
+++ b/parquet/src/data_type.rs
@@ -30,7 +30,7 @@ use crate::column::reader::{ColumnReader, ColumnReaderImpl};
use crate::column::writer::{ColumnWriter, ColumnWriterImpl};
use crate::errors::{ParquetError, Result};
use crate::util::{
- bit_util::{from_ne_slice, FromBytes},
+ bit_util::{from_le_slice, from_ne_slice, FromBytes},
memory::ByteBufferPtr,
};
@@ -1194,8 +1194,14 @@ make_type!(
impl FromBytes for Int96 {
type Buffer = [u8; 12];
- fn from_le_bytes(_bs: Self::Buffer) -> Self {
- unimplemented!()
+ fn from_le_bytes(bs: Self::Buffer) -> Self {
+ let mut i = Int96::new();
+ i.set_data(
+ from_le_slice(&bs[0..4]),
+ from_le_slice(&bs[4..8]),
+ from_le_slice(&bs[8..12]),
+ );
+ i
}
fn from_be_bytes(_bs: Self::Buffer) -> Self {
unimplemented!()
@@ -1215,8 +1221,8 @@ impl FromBytes for Int96 {
// appear to actual be converted directly from bytes
impl FromBytes for ByteArray {
type Buffer = [u8; 8];
- fn from_le_bytes(_bs: Self::Buffer) -> Self {
- unreachable!()
+ fn from_le_bytes(bs: Self::Buffer) -> Self {
+ ByteArray::from(bs.to_vec())
}
fn from_be_bytes(_bs: Self::Buffer) -> Self {
unreachable!()
@@ -1229,8 +1235,8 @@ impl FromBytes for ByteArray {
impl FromBytes for FixedLenByteArray {
type Buffer = [u8; 8];
- fn from_le_bytes(_bs: Self::Buffer) -> Self {
- unreachable!()
+ fn from_le_bytes(bs: Self::Buffer) -> Self {
+ Self(ByteArray::from(bs.to_vec()))
}
fn from_be_bytes(_bs: Self::Buffer) -> Self {
unreachable!()
diff --git a/parquet/src/file/metadata.rs b/parquet/src/file/metadata.rs
index 1d35d1963..2a385be51 100644
--- a/parquet/src/file/metadata.rs
+++ b/parquet/src/file/metadata.rs
@@ -35,11 +35,12 @@
use std::sync::Arc;
-use parquet_format::{ColumnChunk, ColumnMetaData, RowGroup};
+use parquet_format::{ColumnChunk, ColumnMetaData, PageLocation, RowGroup};
use crate::basic::{ColumnOrder, Compression, Encoding, Type};
use crate::errors::{ParquetError, Result};
use crate::file::page_encoding_stats::{self, PageEncodingStats};
+use crate::file::page_index::index::Index;
use crate::file::statistics::{self, Statistics};
use crate::schema::types::{
ColumnDescPtr, ColumnDescriptor, ColumnPath, SchemaDescPtr,
SchemaDescriptor,
@@ -51,6 +52,8 @@ use crate::schema::types::{
pub struct ParquetMetaData {
file_metadata: FileMetaData,
row_groups: Vec<RowGroupMetaData>,
+ page_indexes: Option<Vec<Index>>,
+ offset_indexes: Option<Vec<Vec<PageLocation>>>,
}
impl ParquetMetaData {
@@ -60,6 +63,22 @@ impl ParquetMetaData {
ParquetMetaData {
file_metadata,
row_groups,
+ page_indexes: None,
+ offset_indexes: None,
+ }
+ }
+
+ pub fn new_with_page_index(
+ file_metadata: FileMetaData,
+ row_groups: Vec<RowGroupMetaData>,
+ page_indexes: Option<Vec<Index>>,
+ offset_indexes: Option<Vec<Vec<PageLocation>>>,
+ ) -> Self {
+ ParquetMetaData {
+ file_metadata,
+ row_groups,
+ page_indexes,
+ offset_indexes,
}
}
@@ -83,6 +102,16 @@ impl ParquetMetaData {
pub fn row_groups(&self) -> &[RowGroupMetaData] {
&self.row_groups
}
+
+ /// Returns page indexes in this file.
+ pub fn page_indexes(&self) -> Option<&Vec<Index>> {
+ self.page_indexes.as_ref()
+ }
+
+ /// Returns offset indexes in this file.
+ pub fn offset_indexes(&self) -> Option<&Vec<Vec<PageLocation>>> {
+ self.offset_indexes.as_ref()
+ }
}
pub type KeyValue = parquet_format::KeyValue;
diff --git a/parquet/src/file/mod.rs b/parquet/src/file/mod.rs
index d293dc773..65e85d53a 100644
--- a/parquet/src/file/mod.rs
+++ b/parquet/src/file/mod.rs
@@ -98,6 +98,7 @@
pub mod footer;
pub mod metadata;
pub mod page_encoding_stats;
+pub mod page_index;
pub mod properties;
pub mod reader;
pub mod serialized_reader;
diff --git a/parquet/src/file/page_index/index.rs
b/parquet/src/file/page_index/index.rs
new file mode 100644
index 000000000..e97826c63
--- /dev/null
+++ b/parquet/src/file/page_index/index.rs
@@ -0,0 +1,209 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::basic::Type;
+use crate::data_type::private::ParquetValueType;
+use crate::data_type::Int96;
+use crate::errors::ParquetError;
+use crate::util::bit_util::from_le_slice;
+use parquet_format::{BoundaryOrder, ColumnIndex};
+use std::fmt::Debug;
+
+/// The statistics in one page
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct PageIndex<T> {
+ /// The minimum value, It is None when all values are null
+ pub min: Option<T>,
+ /// The maximum value, It is None when all values are null
+ pub max: Option<T>,
+ /// Null values in the page
+ pub null_count: Option<i64>,
+}
+
+impl<T> PageIndex<T> {
+ pub fn min(&self) -> Option<&T> {
+ self.min.as_ref()
+ }
+ pub fn max(&self) -> Option<&T> {
+ self.max.as_ref()
+ }
+ pub fn null_count(&self) -> Option<i64> {
+ self.null_count
+ }
+}
+
+#[derive(Debug, Clone, PartialEq)]
+pub enum Index {
+ BOOLEAN(BooleanIndex),
+ INT32(NativeIndex<i32>),
+ INT64(NativeIndex<i64>),
+ INT96(NativeIndex<Int96>),
+ FLOAT(NativeIndex<f32>),
+ DOUBLE(NativeIndex<f64>),
+ BYTE_ARRAY(ByteArrayIndex),
+ FIXED_LEN_BYTE_ARRAY(ByteArrayIndex),
+}
+
+/// An index of a column of [`Type`] physical representation
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct NativeIndex<T: ParquetValueType> {
+ /// The physical type
+ pub physical_type: Type,
+ /// The indexes, one item per page
+ pub indexes: Vec<PageIndex<T>>,
+ /// the order
+ pub boundary_order: BoundaryOrder,
+}
+
+impl<T: ParquetValueType> NativeIndex<T> {
+ /// Creates a new [`NativeIndex`]
+ pub(crate) fn try_new(
+ index: ColumnIndex,
+ physical_type: Type,
+ ) -> Result<Self, ParquetError> {
+ let len = index.min_values.len();
+
+ let null_counts = index
+ .null_counts
+ .map(|x| x.into_iter().map(Some).collect::<Vec<_>>())
+ .unwrap_or_else(|| vec![None; len]);
+
+ let indexes = index
+ .min_values
+ .iter()
+ .zip(index.max_values.into_iter())
+ .zip(index.null_pages.into_iter())
+ .zip(null_counts.into_iter())
+ .map(|(((min, max), is_null), null_count)| {
+ let (min, max) = if is_null {
+ (None, None)
+ } else {
+ let min = min.as_slice();
+ let max = max.as_slice();
+ (Some(from_le_slice::<T>(min)),
Some(from_le_slice::<T>(max)))
+ };
+ Ok(PageIndex {
+ min,
+ max,
+ null_count,
+ })
+ })
+ .collect::<Result<Vec<_>, ParquetError>>()?;
+
+ Ok(Self {
+ physical_type,
+ indexes,
+ boundary_order: index.boundary_order,
+ })
+ }
+}
+
+/// An index of a column of bytes type
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct ByteArrayIndex {
+ /// The physical type
+ pub physical_type: Type,
+ /// The indexes, one item per page
+ pub indexes: Vec<PageIndex<Vec<u8>>>,
+ pub boundary_order: BoundaryOrder,
+}
+
+impl ByteArrayIndex {
+ pub(crate) fn try_new(
+ index: ColumnIndex,
+ physical_type: Type,
+ ) -> Result<Self, ParquetError> {
+ let len = index.min_values.len();
+
+ let null_counts = index
+ .null_counts
+ .map(|x| x.into_iter().map(Some).collect::<Vec<_>>())
+ .unwrap_or_else(|| vec![None; len]);
+
+ let indexes = index
+ .min_values
+ .into_iter()
+ .zip(index.max_values.into_iter())
+ .zip(index.null_pages.into_iter())
+ .zip(null_counts.into_iter())
+ .map(|(((min, max), is_null), null_count)| {
+ let (min, max) = if is_null {
+ (None, None)
+ } else {
+ (Some(min), Some(max))
+ };
+ Ok(PageIndex {
+ min,
+ max,
+ null_count,
+ })
+ })
+ .collect::<Result<Vec<_>, ParquetError>>()?;
+
+ Ok(Self {
+ physical_type,
+ indexes,
+ boundary_order: index.boundary_order,
+ })
+ }
+}
+
+/// An index of a column of boolean physical type
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct BooleanIndex {
+ /// The indexes, one item per page
+ pub indexes: Vec<PageIndex<bool>>,
+ pub boundary_order: BoundaryOrder,
+}
+
+impl BooleanIndex {
+ pub(crate) fn try_new(index: ColumnIndex) -> Result<Self, ParquetError> {
+ let len = index.min_values.len();
+
+ let null_counts = index
+ .null_counts
+ .map(|x| x.into_iter().map(Some).collect::<Vec<_>>())
+ .unwrap_or_else(|| vec![None; len]);
+
+ let indexes = index
+ .min_values
+ .into_iter()
+ .zip(index.max_values.into_iter())
+ .zip(index.null_pages.into_iter())
+ .zip(null_counts.into_iter())
+ .map(|(((min, max), is_null), null_count)| {
+ let (min, max) = if is_null {
+ (None, None)
+ } else {
+ let min = min[0] != 0;
+ let max = max[0] == 1;
+ (Some(min), Some(max))
+ };
+ Ok(PageIndex {
+ min,
+ max,
+ null_count,
+ })
+ })
+ .collect::<Result<Vec<_>, ParquetError>>()?;
+
+ Ok(Self {
+ indexes,
+ boundary_order: index.boundary_order,
+ })
+ }
+}
diff --git a/parquet/src/file/page_index/index_reader.rs
b/parquet/src/file/page_index/index_reader.rs
new file mode 100644
index 000000000..841448090
--- /dev/null
+++ b/parquet/src/file/page_index/index_reader.rs
@@ -0,0 +1,167 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::basic::Type;
+use crate::data_type::Int96;
+use crate::errors::ParquetError;
+use crate::file::metadata::ColumnChunkMetaData;
+use crate::file::page_index::index::{BooleanIndex, ByteArrayIndex, Index,
NativeIndex};
+use crate::file::reader::ChunkReader;
+use parquet_format::{ColumnIndex, OffsetIndex, PageLocation};
+use std::io::{Cursor, Read};
+use thrift::protocol::TCompactInputProtocol;
+
+/// Reads all column indexes of one row group and converts them into [`Index`].
+/// If the format is not available, returns an empty vector.
+pub fn read_columns_indexes<R: ChunkReader>(
+ reader: &R,
+ chunks: &[ColumnChunkMetaData],
+) -> Result<Vec<Index>, ParquetError> {
+ let (offset, lengths) = get_index_offset_and_lengths(chunks)?;
+ let length = lengths.iter().sum::<usize>();
+
+ //read all needed data into the buffer
+ let mut reader = reader.get_read(offset, reader.len() as usize)?;
+ let mut data = vec![0; length];
+ reader.read_exact(&mut data)?;
+
+ let mut start = 0;
+ let data = lengths.into_iter().map(|length| {
+ let r = &data[start..start + length];
+ start += length;
+ r
+ });
+
+ chunks
+ .iter()
+ .zip(data)
+ .map(|(chunk, data)| {
+ let column_type = chunk.column_type();
+ deserialize_column_index(data, column_type)
+ })
+ .collect()
+}
+
+/// Reads the offset indexes (page locations) of all columns in one row group.
+/// If the format is not available, returns an empty vector.
+pub fn read_pages_locations<R: ChunkReader>(
+ reader: &R,
+ chunks: &[ColumnChunkMetaData],
+) -> Result<Vec<Vec<PageLocation>>, ParquetError> {
+ let (offset, total_length) = get_location_offset_and_total_length(chunks)?;
+
+ //read all needed data into the buffer
+ let mut reader = reader.get_read(offset, reader.len() as usize)?;
+ let mut data = vec![0; total_length];
+ reader.read_exact(&mut data)?;
+
+ let mut d = Cursor::new(data);
+ let mut result = vec![];
+
+ for _ in 0..chunks.len() {
+ let mut prot = TCompactInputProtocol::new(&mut d);
+ let offset = OffsetIndex::read_from_in_protocol(&mut prot)?;
+ result.push(offset.page_locations);
+ }
+ Ok(result)
+}
+
+//Get the file offset of every ColumnChunk's page_index
+//If any offset is invalid, return a zero offset with empty lengths.
+fn get_index_offset_and_lengths(
+ chunks: &[ColumnChunkMetaData],
+) -> Result<(u64, Vec<usize>), ParquetError> {
+ let first_col_metadata = if let Some(chunk) = chunks.first() {
+ chunk
+ } else {
+ return Ok((0, vec![]));
+ };
+
+ let offset: u64 = if let Some(offset) =
first_col_metadata.column_index_offset() {
+ offset.try_into().unwrap()
+ } else {
+ return Ok((0, vec![]));
+ };
+
+ let lengths = chunks
+ .iter()
+ .map(|x| x.column_index_length())
+ .map(|maybe_length| {
+ let index_length = maybe_length.ok_or_else(|| {
+ ParquetError::General(
+ "The column_index_length must exist if offset_index_offset
exists"
+ .to_string(),
+ )
+ })?;
+
+ Ok(index_length.try_into().unwrap())
+ })
+ .collect::<Result<Vec<_>, ParquetError>>()?;
+
+ Ok((offset, lengths))
+}
+
+//Get the file offset of the ColumnChunk's page locations
+//If any offset is invalid, return a zero offset with zero length.
+fn get_location_offset_and_total_length(
+ chunks: &[ColumnChunkMetaData],
+) -> Result<(u64, usize), ParquetError> {
+ let metadata = if let Some(chunk) = chunks.first() {
+ chunk
+ } else {
+ return Ok((0, 0));
+ };
+
+ let offset: u64 = if let Some(offset) = metadata.offset_index_offset() {
+ offset.try_into().unwrap()
+ } else {
+ return Ok((0, 0));
+ };
+
+ let total_length = chunks
+ .iter()
+ .map(|x| x.offset_index_length().unwrap())
+ .sum::<i32>() as usize;
+ Ok((offset, total_length))
+}
+
+fn deserialize_column_index(
+ data: &[u8],
+ column_type: Type,
+) -> Result<Index, ParquetError> {
+ let mut d = Cursor::new(data);
+ let mut prot = TCompactInputProtocol::new(&mut d);
+
+ let index = ColumnIndex::read_from_in_protocol(&mut prot)?;
+
+ let index = match column_type {
+ Type::BOOLEAN => Index::BOOLEAN(BooleanIndex::try_new(index)?),
+ Type::INT32 => Index::INT32(NativeIndex::<i32>::try_new(index,
column_type)?),
+ Type::INT64 => Index::INT64(NativeIndex::<i64>::try_new(index,
column_type)?),
+ Type::INT96 => Index::INT96(NativeIndex::<Int96>::try_new(index,
column_type)?),
+ Type::FLOAT => Index::FLOAT(NativeIndex::<f32>::try_new(index,
column_type)?),
+ Type::DOUBLE => Index::DOUBLE(NativeIndex::<f64>::try_new(index,
column_type)?),
+ Type::BYTE_ARRAY => {
+ Index::BYTE_ARRAY(ByteArrayIndex::try_new(index, column_type)?)
+ }
+ Type::FIXED_LEN_BYTE_ARRAY => {
+ Index::FIXED_LEN_BYTE_ARRAY(ByteArrayIndex::try_new(index,
column_type)?)
+ }
+ };
+
+ Ok(index)
+}
diff --git a/parquet/src/file/page_index/mod.rs
b/parquet/src/file/page_index/mod.rs
new file mode 100644
index 000000000..dcc1120fc
--- /dev/null
+++ b/parquet/src/file/page_index/mod.rs
@@ -0,0 +1,19 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+pub mod index;
+pub mod index_reader;
diff --git a/parquet/src/file/serialized_reader.rs
b/parquet/src/file/serialized_reader.rs
index 8059157aa..1dd374ef8 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -27,6 +27,7 @@ use crate::basic::{Compression, Encoding, Type};
use crate::column::page::{Page, PageReader};
use crate::compression::{create_codec, Codec};
use crate::errors::{ParquetError, Result};
+use crate::file::page_index::index_reader;
use crate::file::{footer, metadata::*, reader::*, statistics};
use crate::record::reader::RowIter;
use crate::record::Row;
@@ -132,12 +133,16 @@ pub struct SerializedFileReader<R: ChunkReader> {
/// they will be chained using 'AND' to filter the row groups.
pub struct ReadOptionsBuilder {
predicates: Vec<Box<dyn FnMut(&RowGroupMetaData, usize) -> bool>>,
+ enable_page_index: bool,
}
impl ReadOptionsBuilder {
/// New builder
pub fn new() -> Self {
- ReadOptionsBuilder { predicates: vec![] }
+ ReadOptionsBuilder {
+ predicates: vec![],
+ enable_page_index: false,
+ }
}
/// Add a predicate on row group metadata to the reading option,
@@ -162,10 +167,17 @@ impl ReadOptionsBuilder {
self
}
+ /// Enable reading the page index in the reading options.
+ pub fn with_page_index(mut self) -> Self {
+ self.enable_page_index = true;
+ self
+ }
+
/// Seal the builder and return the read options
pub fn build(self) -> ReadOptions {
ReadOptions {
predicates: self.predicates,
+ enable_page_index: self.enable_page_index,
}
}
}
@@ -176,6 +188,7 @@ impl ReadOptionsBuilder {
/// All predicates will be chained using 'AND' to filter the row groups.
pub struct ReadOptions {
predicates: Vec<Box<dyn FnMut(&RowGroupMetaData, usize) -> bool>>,
+ enable_page_index: bool,
}
impl<R: 'static + ChunkReader> SerializedFileReader<R> {
@@ -209,13 +222,33 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
}
}
- Ok(Self {
- chunk_reader: Arc::new(chunk_reader),
- metadata: ParquetMetaData::new(
- metadata.file_metadata().clone(),
- filtered_row_groups,
- ),
- })
+ if options.enable_page_index {
+ //TODO: for now the test data `data_index_bloom_encoding_stats.parquet`
only has one row group;
+ //support multiple row groups after creating multi-RG test data.
+ let cols = metadata.row_group(0);
+ let columns_indexes =
+ index_reader::read_columns_indexes(&chunk_reader,
cols.columns())?;
+ let pages_locations =
+ index_reader::read_pages_locations(&chunk_reader,
cols.columns())?;
+
+ Ok(Self {
+ chunk_reader: Arc::new(chunk_reader),
+ metadata: ParquetMetaData::new_with_page_index(
+ metadata.file_metadata().clone(),
+ filtered_row_groups,
+ Some(columns_indexes),
+ Some(pages_locations),
+ ),
+ })
+ } else {
+ Ok(Self {
+ chunk_reader: Arc::new(chunk_reader),
+ metadata: ParquetMetaData::new(
+ metadata.file_metadata().clone(),
+ filtered_row_groups,
+ ),
+ })
+ }
}
}
@@ -475,9 +508,11 @@ impl<T: Read + Send> PageReader for
SerializedPageReader<T> {
mod tests {
use super::*;
use crate::basic::{self, ColumnOrder};
+ use crate::file::page_index::index::Index;
use crate::record::RowAccessor;
use crate::schema::parser::parse_message_type;
use crate::util::test_common::{get_test_file, get_test_path};
+ use parquet_format::BoundaryOrder;
use std::sync::Arc;
#[test]
@@ -605,9 +640,9 @@ mod tests {
let file_metadata = metadata.file_metadata();
assert!(file_metadata.created_by().is_some());
assert_eq!(
- file_metadata.created_by().unwrap(),
- "impala version 1.3.0-INTERNAL (build
8a48ddb1eff84592b3fc06bc6f51ec120e1fffc9)"
- );
+ file_metadata.created_by().unwrap(),
+ "impala version 1.3.0-INTERNAL (build
8a48ddb1eff84592b3fc06bc6f51ec120e1fffc9)"
+ );
assert!(file_metadata.key_value_metadata().is_none());
assert_eq!(file_metadata.num_rows(), 8);
assert_eq!(file_metadata.version(), 1);
@@ -955,4 +990,66 @@ mod tests {
assert_eq!(metadata.num_row_groups(), 0);
Ok(())
}
+
+ #[test]
+ // Use java parquet-tools get below pageIndex info
+ // !```
+ // parquet-tools column-index ./data_index_bloom_encoding_stats.parquet
+ // row group 0:
+ // column index for column String:
+ // Boudary order: ASCENDING
+ // page-0 :
+ // null count min max
+ // 0 Hello today
+ //
+ // offset index for column String:
+ // page-0 :
+ // offset compressed size first row index
+ // 4 152 0
+ ///```
+ //
+ fn test_page_index_reader() {
+ let test_file =
get_test_file("data_index_bloom_encoding_stats.parquet");
+ let builder = ReadOptionsBuilder::new();
+ //enable read page index
+ let options = builder.with_page_index().build();
+ let reader_result = SerializedFileReader::new_with_options(test_file,
options);
+ let reader = reader_result.unwrap();
+
+ // Test contents in Parquet metadata
+ let metadata = reader.metadata();
+ assert_eq!(metadata.num_row_groups(), 1);
+
+ let page_indexes = metadata.page_indexes().unwrap();
+
+ // only one row group
+ assert_eq!(page_indexes.len(), 1);
+ let index = if let Index::BYTE_ARRAY(index) =
page_indexes.get(0).unwrap() {
+ index
+ } else {
+ unreachable!()
+ };
+
+ assert_eq!(index.boundary_order, BoundaryOrder::Ascending);
+ let index_in_pages = &index.indexes;
+
+ //only one page group
+ assert_eq!(index_in_pages.len(), 1);
+
+ let page0 = index_in_pages.get(0).unwrap();
+ let min = page0.min.as_ref().unwrap();
+ let max = page0.max.as_ref().unwrap();
+ assert_eq!("Hello", std::str::from_utf8(min.as_slice()).unwrap());
+ assert_eq!("today", std::str::from_utf8(max.as_slice()).unwrap());
+
+ let offset_indexes = metadata.offset_indexes().unwrap();
+ // only one row group
+ assert_eq!(offset_indexes.len(), 1);
+ let offset_index = offset_indexes.get(0).unwrap();
+ let page_offset = offset_index.get(0).unwrap();
+
+ assert_eq!(4, page_offset.offset);
+ assert_eq!(152, page_offset.compressed_page_size);
+ assert_eq!(0, page_offset.first_row_index);
+ }
}
diff --git a/parquet/src/util/bit_util.rs b/parquet/src/util/bit_util.rs
index 288c771b0..b535ee02a 100644
--- a/parquet/src/util/bit_util.rs
+++ b/parquet/src/util/bit_util.rs
@@ -32,6 +32,17 @@ pub fn from_ne_slice<T: FromBytes>(bs: &[u8]) -> T {
T::from_ne_bytes(b)
}
+#[inline]
+pub fn from_le_slice<T: FromBytes>(bs: &[u8]) -> T {
+ let mut b = T::Buffer::default();
+ {
+ let b = b.as_mut();
+ let bs = &bs[..b.len()];
+ b.copy_from_slice(bs);
+ }
+ T::from_le_bytes(b)
+}
+
pub trait FromBytes: Sized {
type Buffer: AsMut<[u8]> + Default;
fn from_le_bytes(bs: Self::Buffer) -> Self;