This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new ac5073f27 Support reading PageIndex from parquet metadata, prepare for 
skipping pages at reading (#1762)
ac5073f27 is described below

commit ac5073f27cc51473362161122c7661e8051543b9
Author: Yang Jiang <[email protected]>
AuthorDate: Wed Jun 1 04:43:37 2022 +0800

    Support reading PageIndex from parquet metadata, prepare for skipping pages 
at reading (#1762)
    
    * Add read options for column index based filtering
    
    * try to deserialize pageIndex from parquet
    
    * unable read pageIndex from parquet and test
    
    * fix fmt
    
    * remove FixedLenByteIndex use ByteIndex
    
    * Apply suggestions from code review
    
    Co-authored-by: Raphael Taylor-Davies 
<[email protected]>
    
    * fix comments
    
    * use enum instead of trait object
    
    * fix and add comment
    
    * use from_le_slice instead of from_ne_slice
    
    * use builder option
    
    * remove useless trait
    
    * fix from_le_slice
    
    Co-authored-by: Raphael Taylor-Davies 
<[email protected]>
---
 parquet/src/basic.rs                        |   2 +-
 parquet/src/data_type.rs                    |  20 ++-
 parquet/src/file/metadata.rs                |  31 ++++-
 parquet/src/file/mod.rs                     |   1 +
 parquet/src/file/page_index/index.rs        | 209 ++++++++++++++++++++++++++++
 parquet/src/file/page_index/index_reader.rs | 167 ++++++++++++++++++++++
 parquet/src/file/page_index/mod.rs          |  19 +++
 parquet/src/file/serialized_reader.rs       | 119 ++++++++++++++--
 parquet/src/util/bit_util.rs                |  11 ++
 9 files changed, 559 insertions(+), 20 deletions(-)

diff --git a/parquet/src/basic.rs b/parquet/src/basic.rs
index 7eff2156f..59a0fe07b 100644
--- a/parquet/src/basic.rs
+++ b/parquet/src/basic.rs
@@ -41,7 +41,7 @@ pub use parquet_format::{
 /// control the on disk storage format.
 /// For example INT16 is not included as a type since a good encoding of INT32
 /// would handle this.
-#[derive(Debug, Clone, Copy, PartialEq)]
+#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
 pub enum Type {
     BOOLEAN,
     INT32,
diff --git a/parquet/src/data_type.rs b/parquet/src/data_type.rs
index c01fb1530..86ccefbd8 100644
--- a/parquet/src/data_type.rs
+++ b/parquet/src/data_type.rs
@@ -30,7 +30,7 @@ use crate::column::reader::{ColumnReader, ColumnReaderImpl};
 use crate::column::writer::{ColumnWriter, ColumnWriterImpl};
 use crate::errors::{ParquetError, Result};
 use crate::util::{
-    bit_util::{from_ne_slice, FromBytes},
+    bit_util::{from_le_slice, from_ne_slice, FromBytes},
     memory::ByteBufferPtr,
 };
 
@@ -1194,8 +1194,14 @@ make_type!(
 
 impl FromBytes for Int96 {
     type Buffer = [u8; 12];
-    fn from_le_bytes(_bs: Self::Buffer) -> Self {
-        unimplemented!()
+    fn from_le_bytes(bs: Self::Buffer) -> Self {
+        let mut i = Int96::new();
+        i.set_data(
+            from_le_slice(&bs[0..4]),
+            from_le_slice(&bs[4..8]),
+            from_le_slice(&bs[8..12]),
+        );
+        i
     }
     fn from_be_bytes(_bs: Self::Buffer) -> Self {
         unimplemented!()
@@ -1215,8 +1221,8 @@ impl FromBytes for Int96 {
 // appear to actual be converted directly from bytes
 impl FromBytes for ByteArray {
     type Buffer = [u8; 8];
-    fn from_le_bytes(_bs: Self::Buffer) -> Self {
-        unreachable!()
+    fn from_le_bytes(bs: Self::Buffer) -> Self {
+        ByteArray::from(bs.to_vec())
     }
     fn from_be_bytes(_bs: Self::Buffer) -> Self {
         unreachable!()
@@ -1229,8 +1235,8 @@ impl FromBytes for ByteArray {
 impl FromBytes for FixedLenByteArray {
     type Buffer = [u8; 8];
 
-    fn from_le_bytes(_bs: Self::Buffer) -> Self {
-        unreachable!()
+    fn from_le_bytes(bs: Self::Buffer) -> Self {
+        Self(ByteArray::from(bs.to_vec()))
     }
     fn from_be_bytes(_bs: Self::Buffer) -> Self {
         unreachable!()
diff --git a/parquet/src/file/metadata.rs b/parquet/src/file/metadata.rs
index 1d35d1963..2a385be51 100644
--- a/parquet/src/file/metadata.rs
+++ b/parquet/src/file/metadata.rs
@@ -35,11 +35,12 @@
 
 use std::sync::Arc;
 
-use parquet_format::{ColumnChunk, ColumnMetaData, RowGroup};
+use parquet_format::{ColumnChunk, ColumnMetaData, PageLocation, RowGroup};
 
 use crate::basic::{ColumnOrder, Compression, Encoding, Type};
 use crate::errors::{ParquetError, Result};
 use crate::file::page_encoding_stats::{self, PageEncodingStats};
+use crate::file::page_index::index::Index;
 use crate::file::statistics::{self, Statistics};
 use crate::schema::types::{
     ColumnDescPtr, ColumnDescriptor, ColumnPath, SchemaDescPtr, 
SchemaDescriptor,
@@ -51,6 +52,8 @@ use crate::schema::types::{
 pub struct ParquetMetaData {
     file_metadata: FileMetaData,
     row_groups: Vec<RowGroupMetaData>,
+    page_indexes: Option<Vec<Index>>,
+    offset_indexes: Option<Vec<Vec<PageLocation>>>,
 }
 
 impl ParquetMetaData {
@@ -60,6 +63,22 @@ impl ParquetMetaData {
         ParquetMetaData {
             file_metadata,
             row_groups,
+            page_indexes: None,
+            offset_indexes: None,
+        }
+    }
+
+    pub fn new_with_page_index(
+        file_metadata: FileMetaData,
+        row_groups: Vec<RowGroupMetaData>,
+        page_indexes: Option<Vec<Index>>,
+        offset_indexes: Option<Vec<Vec<PageLocation>>>,
+    ) -> Self {
+        ParquetMetaData {
+            file_metadata,
+            row_groups,
+            page_indexes,
+            offset_indexes,
         }
     }
 
@@ -83,6 +102,16 @@ impl ParquetMetaData {
     pub fn row_groups(&self) -> &[RowGroupMetaData] {
         &self.row_groups
     }
+
+    /// Returns page indexes in this file.
+    pub fn page_indexes(&self) -> Option<&Vec<Index>> {
+        self.page_indexes.as_ref()
+    }
+
+    /// Returns offset indexes in this file.
+    pub fn offset_indexes(&self) -> Option<&Vec<Vec<PageLocation>>> {
+        self.offset_indexes.as_ref()
+    }
 }
 
 pub type KeyValue = parquet_format::KeyValue;
diff --git a/parquet/src/file/mod.rs b/parquet/src/file/mod.rs
index d293dc773..65e85d53a 100644
--- a/parquet/src/file/mod.rs
+++ b/parquet/src/file/mod.rs
@@ -98,6 +98,7 @@
 pub mod footer;
 pub mod metadata;
 pub mod page_encoding_stats;
+pub mod page_index;
 pub mod properties;
 pub mod reader;
 pub mod serialized_reader;
diff --git a/parquet/src/file/page_index/index.rs 
b/parquet/src/file/page_index/index.rs
new file mode 100644
index 000000000..e97826c63
--- /dev/null
+++ b/parquet/src/file/page_index/index.rs
@@ -0,0 +1,209 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::basic::Type;
+use crate::data_type::private::ParquetValueType;
+use crate::data_type::Int96;
+use crate::errors::ParquetError;
+use crate::util::bit_util::from_le_slice;
+use parquet_format::{BoundaryOrder, ColumnIndex};
+use std::fmt::Debug;
+
+/// The statistics in one page
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct PageIndex<T> {
+    /// The minimum value, It is None when all values are null
+    pub min: Option<T>,
+    /// The maximum value, It is None when all values are null
+    pub max: Option<T>,
+    /// Null values in the page
+    pub null_count: Option<i64>,
+}
+
+impl<T> PageIndex<T> {
+    pub fn min(&self) -> Option<&T> {
+        self.min.as_ref()
+    }
+    pub fn max(&self) -> Option<&T> {
+        self.max.as_ref()
+    }
+    pub fn null_count(&self) -> Option<i64> {
+        self.null_count
+    }
+}
+
+#[derive(Debug, Clone, PartialEq)]
+pub enum Index {
+    BOOLEAN(BooleanIndex),
+    INT32(NativeIndex<i32>),
+    INT64(NativeIndex<i64>),
+    INT96(NativeIndex<Int96>),
+    FLOAT(NativeIndex<f32>),
+    DOUBLE(NativeIndex<f64>),
+    BYTE_ARRAY(ByteArrayIndex),
+    FIXED_LEN_BYTE_ARRAY(ByteArrayIndex),
+}
+
+/// An index of a column of [`Type`] physical representation
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct NativeIndex<T: ParquetValueType> {
+    /// The physical type
+    pub physical_type: Type,
+    /// The indexes, one item per page
+    pub indexes: Vec<PageIndex<T>>,
+    /// the order
+    pub boundary_order: BoundaryOrder,
+}
+
+impl<T: ParquetValueType> NativeIndex<T> {
+    /// Creates a new [`NativeIndex`]
+    pub(crate) fn try_new(
+        index: ColumnIndex,
+        physical_type: Type,
+    ) -> Result<Self, ParquetError> {
+        let len = index.min_values.len();
+
+        let null_counts = index
+            .null_counts
+            .map(|x| x.into_iter().map(Some).collect::<Vec<_>>())
+            .unwrap_or_else(|| vec![None; len]);
+
+        let indexes = index
+            .min_values
+            .iter()
+            .zip(index.max_values.into_iter())
+            .zip(index.null_pages.into_iter())
+            .zip(null_counts.into_iter())
+            .map(|(((min, max), is_null), null_count)| {
+                let (min, max) = if is_null {
+                    (None, None)
+                } else {
+                    let min = min.as_slice();
+                    let max = max.as_slice();
+                    (Some(from_le_slice::<T>(min)), 
Some(from_le_slice::<T>(max)))
+                };
+                Ok(PageIndex {
+                    min,
+                    max,
+                    null_count,
+                })
+            })
+            .collect::<Result<Vec<_>, ParquetError>>()?;
+
+        Ok(Self {
+            physical_type,
+            indexes,
+            boundary_order: index.boundary_order,
+        })
+    }
+}
+
+/// An index of a column of bytes type
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct ByteArrayIndex {
+    /// The physical type
+    pub physical_type: Type,
+    /// The indexes, one item per page
+    pub indexes: Vec<PageIndex<Vec<u8>>>,
+    pub boundary_order: BoundaryOrder,
+}
+
+impl ByteArrayIndex {
+    pub(crate) fn try_new(
+        index: ColumnIndex,
+        physical_type: Type,
+    ) -> Result<Self, ParquetError> {
+        let len = index.min_values.len();
+
+        let null_counts = index
+            .null_counts
+            .map(|x| x.into_iter().map(Some).collect::<Vec<_>>())
+            .unwrap_or_else(|| vec![None; len]);
+
+        let indexes = index
+            .min_values
+            .into_iter()
+            .zip(index.max_values.into_iter())
+            .zip(index.null_pages.into_iter())
+            .zip(null_counts.into_iter())
+            .map(|(((min, max), is_null), null_count)| {
+                let (min, max) = if is_null {
+                    (None, None)
+                } else {
+                    (Some(min), Some(max))
+                };
+                Ok(PageIndex {
+                    min,
+                    max,
+                    null_count,
+                })
+            })
+            .collect::<Result<Vec<_>, ParquetError>>()?;
+
+        Ok(Self {
+            physical_type,
+            indexes,
+            boundary_order: index.boundary_order,
+        })
+    }
+}
+
+/// An index of a column of boolean physical type
+#[derive(Debug, Clone, PartialEq, Eq, Hash)]
+pub struct BooleanIndex {
+    /// The indexes, one item per page
+    pub indexes: Vec<PageIndex<bool>>,
+    pub boundary_order: BoundaryOrder,
+}
+
+impl BooleanIndex {
+    pub(crate) fn try_new(index: ColumnIndex) -> Result<Self, ParquetError> {
+        let len = index.min_values.len();
+
+        let null_counts = index
+            .null_counts
+            .map(|x| x.into_iter().map(Some).collect::<Vec<_>>())
+            .unwrap_or_else(|| vec![None; len]);
+
+        let indexes = index
+            .min_values
+            .into_iter()
+            .zip(index.max_values.into_iter())
+            .zip(index.null_pages.into_iter())
+            .zip(null_counts.into_iter())
+            .map(|(((min, max), is_null), null_count)| {
+                let (min, max) = if is_null {
+                    (None, None)
+                } else {
+                    let min = min[0] != 0;
+                    let max = max[0] == 1;
+                    (Some(min), Some(max))
+                };
+                Ok(PageIndex {
+                    min,
+                    max,
+                    null_count,
+                })
+            })
+            .collect::<Result<Vec<_>, ParquetError>>()?;
+
+        Ok(Self {
+            indexes,
+            boundary_order: index.boundary_order,
+        })
+    }
+}
diff --git a/parquet/src/file/page_index/index_reader.rs 
b/parquet/src/file/page_index/index_reader.rs
new file mode 100644
index 000000000..841448090
--- /dev/null
+++ b/parquet/src/file/page_index/index_reader.rs
@@ -0,0 +1,167 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+use crate::basic::Type;
+use crate::data_type::Int96;
+use crate::errors::ParquetError;
+use crate::file::metadata::ColumnChunkMetaData;
+use crate::file::page_index::index::{BooleanIndex, ByteArrayIndex, Index, 
NativeIndex};
+use crate::file::reader::ChunkReader;
+use parquet_format::{ColumnIndex, OffsetIndex, PageLocation};
+use std::io::{Cursor, Read};
+use thrift::protocol::TCompactInputProtocol;
+
+/// Read all column indexes of one row group and convert them into [`Index`].
+/// If the format is not available, return an empty vector.
+pub fn read_columns_indexes<R: ChunkReader>(
+    reader: &R,
+    chunks: &[ColumnChunkMetaData],
+) -> Result<Vec<Index>, ParquetError> {
+    let (offset, lengths) = get_index_offset_and_lengths(chunks)?;
+    let length = lengths.iter().sum::<usize>();
+
+    // read all needed data into the buffer
+    let mut reader = reader.get_read(offset, reader.len() as usize)?;
+    let mut data = vec![0; length];
+    reader.read_exact(&mut data)?;
+
+    let mut start = 0;
+    let data = lengths.into_iter().map(|length| {
+        let r = &data[start..start + length];
+        start += length;
+        r
+    });
+
+    chunks
+        .iter()
+        .zip(data)
+        .map(|(chunk, data)| {
+            let column_type = chunk.column_type();
+            deserialize_column_index(data, column_type)
+        })
+        .collect()
+}
+
+/// Read all page locations (offset indexes) of one row group.
+/// If the format is not available, return an empty vector.
+pub fn read_pages_locations<R: ChunkReader>(
+    reader: &R,
+    chunks: &[ColumnChunkMetaData],
+) -> Result<Vec<Vec<PageLocation>>, ParquetError> {
+    let (offset, total_length) = get_location_offset_and_total_length(chunks)?;
+
+    // read all needed data into the buffer
+    let mut reader = reader.get_read(offset, reader.len() as usize)?;
+    let mut data = vec![0; total_length];
+    reader.read_exact(&mut data)?;
+
+    let mut d = Cursor::new(data);
+    let mut result = vec![];
+
+    for _ in 0..chunks.len() {
+        let mut prot = TCompactInputProtocol::new(&mut d);
+        let offset = OffsetIndex::read_from_in_protocol(&mut prot)?;
+        result.push(offset.page_locations);
+    }
+    Ok(result)
+}
+
+// Get the file offset of every ColumnChunk's page index.
+// If the offset is invalid, return a zero offset with empty lengths.
+fn get_index_offset_and_lengths(
+    chunks: &[ColumnChunkMetaData],
+) -> Result<(u64, Vec<usize>), ParquetError> {
+    let first_col_metadata = if let Some(chunk) = chunks.first() {
+        chunk
+    } else {
+        return Ok((0, vec![]));
+    };
+
+    let offset: u64 = if let Some(offset) = 
first_col_metadata.column_index_offset() {
+        offset.try_into().unwrap()
+    } else {
+        return Ok((0, vec![]));
+    };
+
+    let lengths = chunks
+        .iter()
+        .map(|x| x.column_index_length())
+        .map(|maybe_length| {
+            let index_length = maybe_length.ok_or_else(|| {
+                ParquetError::General(
+                    "The column_index_length must exist if offset_index_offset 
exists"
+                        .to_string(),
+                )
+            })?;
+
+            Ok(index_length.try_into().unwrap())
+        })
+        .collect::<Result<Vec<_>, ParquetError>>()?;
+
+    Ok((offset, lengths))
+}
+
+// Get the file offset of the ColumnChunks' page locations.
+// If the offset is invalid, return a zero offset with zero length.
+fn get_location_offset_and_total_length(
+    chunks: &[ColumnChunkMetaData],
+) -> Result<(u64, usize), ParquetError> {
+    let metadata = if let Some(chunk) = chunks.first() {
+        chunk
+    } else {
+        return Ok((0, 0));
+    };
+
+    let offset: u64 = if let Some(offset) = metadata.offset_index_offset() {
+        offset.try_into().unwrap()
+    } else {
+        return Ok((0, 0));
+    };
+
+    let total_length = chunks
+        .iter()
+        .map(|x| x.offset_index_length().unwrap())
+        .sum::<i32>() as usize;
+    Ok((offset, total_length))
+}
+
+fn deserialize_column_index(
+    data: &[u8],
+    column_type: Type,
+) -> Result<Index, ParquetError> {
+    let mut d = Cursor::new(data);
+    let mut prot = TCompactInputProtocol::new(&mut d);
+
+    let index = ColumnIndex::read_from_in_protocol(&mut prot)?;
+
+    let index = match column_type {
+        Type::BOOLEAN => Index::BOOLEAN(BooleanIndex::try_new(index)?),
+        Type::INT32 => Index::INT32(NativeIndex::<i32>::try_new(index, 
column_type)?),
+        Type::INT64 => Index::INT64(NativeIndex::<i64>::try_new(index, 
column_type)?),
+        Type::INT96 => Index::INT96(NativeIndex::<Int96>::try_new(index, 
column_type)?),
+        Type::FLOAT => Index::FLOAT(NativeIndex::<f32>::try_new(index, 
column_type)?),
+        Type::DOUBLE => Index::DOUBLE(NativeIndex::<f64>::try_new(index, 
column_type)?),
+        Type::BYTE_ARRAY => {
+            Index::BYTE_ARRAY(ByteArrayIndex::try_new(index, column_type)?)
+        }
+        Type::FIXED_LEN_BYTE_ARRAY => {
+            Index::FIXED_LEN_BYTE_ARRAY(ByteArrayIndex::try_new(index, 
column_type)?)
+        }
+    };
+
+    Ok(index)
+}
diff --git a/parquet/src/file/page_index/mod.rs 
b/parquet/src/file/page_index/mod.rs
new file mode 100644
index 000000000..dcc1120fc
--- /dev/null
+++ b/parquet/src/file/page_index/mod.rs
@@ -0,0 +1,19 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+pub mod index;
+pub mod index_reader;
diff --git a/parquet/src/file/serialized_reader.rs 
b/parquet/src/file/serialized_reader.rs
index 8059157aa..1dd374ef8 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -27,6 +27,7 @@ use crate::basic::{Compression, Encoding, Type};
 use crate::column::page::{Page, PageReader};
 use crate::compression::{create_codec, Codec};
 use crate::errors::{ParquetError, Result};
+use crate::file::page_index::index_reader;
 use crate::file::{footer, metadata::*, reader::*, statistics};
 use crate::record::reader::RowIter;
 use crate::record::Row;
@@ -132,12 +133,16 @@ pub struct SerializedFileReader<R: ChunkReader> {
 /// they will be chained using 'AND' to filter the row groups.
 pub struct ReadOptionsBuilder {
     predicates: Vec<Box<dyn FnMut(&RowGroupMetaData, usize) -> bool>>,
+    enable_page_index: bool,
 }
 
 impl ReadOptionsBuilder {
     /// New builder
     pub fn new() -> Self {
-        ReadOptionsBuilder { predicates: vec![] }
+        ReadOptionsBuilder {
+            predicates: vec![],
+            enable_page_index: false,
+        }
     }
 
     /// Add a predicate on row group metadata to the reading option,
@@ -162,10 +167,17 @@ impl ReadOptionsBuilder {
         self
     }
 
+    /// Enable reading the page index in the read options.
+    pub fn with_page_index(mut self) -> Self {
+        self.enable_page_index = true;
+        self
+    }
+
     /// Seal the builder and return the read options
     pub fn build(self) -> ReadOptions {
         ReadOptions {
             predicates: self.predicates,
+            enable_page_index: self.enable_page_index,
         }
     }
 }
@@ -176,6 +188,7 @@ impl ReadOptionsBuilder {
 /// All predicates will be chained using 'AND' to filter the row groups.
 pub struct ReadOptions {
     predicates: Vec<Box<dyn FnMut(&RowGroupMetaData, usize) -> bool>>,
+    enable_page_index: bool,
 }
 
 impl<R: 'static + ChunkReader> SerializedFileReader<R> {
@@ -209,13 +222,33 @@ impl<R: 'static + ChunkReader> SerializedFileReader<R> {
             }
         }
 
-        Ok(Self {
-            chunk_reader: Arc::new(chunk_reader),
-            metadata: ParquetMetaData::new(
-                metadata.file_metadata().clone(),
-                filtered_row_groups,
-            ),
-        })
+        if options.enable_page_index {
+            // TODO: for now the test data `data_index_bloom_encoding_stats.parquet`
+            // only has one row group; support multiple row groups after creating
+            // multi-RG test data.
+            let cols = metadata.row_group(0);
+            let columns_indexes =
+                index_reader::read_columns_indexes(&chunk_reader, 
cols.columns())?;
+            let pages_locations =
+                index_reader::read_pages_locations(&chunk_reader, 
cols.columns())?;
+
+            Ok(Self {
+                chunk_reader: Arc::new(chunk_reader),
+                metadata: ParquetMetaData::new_with_page_index(
+                    metadata.file_metadata().clone(),
+                    filtered_row_groups,
+                    Some(columns_indexes),
+                    Some(pages_locations),
+                ),
+            })
+        } else {
+            Ok(Self {
+                chunk_reader: Arc::new(chunk_reader),
+                metadata: ParquetMetaData::new(
+                    metadata.file_metadata().clone(),
+                    filtered_row_groups,
+                ),
+            })
+        }
     }
 }
 
@@ -475,9 +508,11 @@ impl<T: Read + Send> PageReader for 
SerializedPageReader<T> {
 mod tests {
     use super::*;
     use crate::basic::{self, ColumnOrder};
+    use crate::file::page_index::index::Index;
     use crate::record::RowAccessor;
     use crate::schema::parser::parse_message_type;
     use crate::util::test_common::{get_test_file, get_test_path};
+    use parquet_format::BoundaryOrder;
     use std::sync::Arc;
 
     #[test]
@@ -605,9 +640,9 @@ mod tests {
         let file_metadata = metadata.file_metadata();
         assert!(file_metadata.created_by().is_some());
         assert_eq!(
-      file_metadata.created_by().unwrap(),
-      "impala version 1.3.0-INTERNAL (build 
8a48ddb1eff84592b3fc06bc6f51ec120e1fffc9)"
-    );
+            file_metadata.created_by().unwrap(),
+            "impala version 1.3.0-INTERNAL (build 
8a48ddb1eff84592b3fc06bc6f51ec120e1fffc9)"
+        );
         assert!(file_metadata.key_value_metadata().is_none());
         assert_eq!(file_metadata.num_rows(), 8);
         assert_eq!(file_metadata.version(), 1);
@@ -955,4 +990,66 @@ mod tests {
         assert_eq!(metadata.num_row_groups(), 0);
         Ok(())
     }
+
+    #[test]
+    // Use java parquet-tools get below pageIndex info
+    // !```
+    // parquet-tools column-index ./data_index_bloom_encoding_stats.parquet
+    // row group 0:
+    // column index for column String:
+    // Boudary order: ASCENDING
+    // page-0  :
+    // null count                 min                                  max
+    // 0                          Hello                                today
+    //
+    // offset index for column String:
+    // page-0   :
+    // offset   compressed size       first row index
+    // 4               152                     0
+    ///```
+    //
+    fn test_page_index_reader() {
+        let test_file = 
get_test_file("data_index_bloom_encoding_stats.parquet");
+        let builder = ReadOptionsBuilder::new();
+        //enable read page index
+        let options = builder.with_page_index().build();
+        let reader_result = SerializedFileReader::new_with_options(test_file, 
options);
+        let reader = reader_result.unwrap();
+
+        // Test contents in Parquet metadata
+        let metadata = reader.metadata();
+        assert_eq!(metadata.num_row_groups(), 1);
+
+        let page_indexes = metadata.page_indexes().unwrap();
+
+        // only one row group
+        assert_eq!(page_indexes.len(), 1);
+        let index = if let Index::BYTE_ARRAY(index) = 
page_indexes.get(0).unwrap() {
+            index
+        } else {
+            unreachable!()
+        };
+
+        assert_eq!(index.boundary_order, BoundaryOrder::Ascending);
+        let index_in_pages = &index.indexes;
+
+        //only one page group
+        assert_eq!(index_in_pages.len(), 1);
+
+        let page0 = index_in_pages.get(0).unwrap();
+        let min = page0.min.as_ref().unwrap();
+        let max = page0.max.as_ref().unwrap();
+        assert_eq!("Hello", std::str::from_utf8(min.as_slice()).unwrap());
+        assert_eq!("today", std::str::from_utf8(max.as_slice()).unwrap());
+
+        let offset_indexes = metadata.offset_indexes().unwrap();
+        // only one row group
+        assert_eq!(offset_indexes.len(), 1);
+        let offset_index = offset_indexes.get(0).unwrap();
+        let page_offset = offset_index.get(0).unwrap();
+
+        assert_eq!(4, page_offset.offset);
+        assert_eq!(152, page_offset.compressed_page_size);
+        assert_eq!(0, page_offset.first_row_index);
+    }
 }
diff --git a/parquet/src/util/bit_util.rs b/parquet/src/util/bit_util.rs
index 288c771b0..b535ee02a 100644
--- a/parquet/src/util/bit_util.rs
+++ b/parquet/src/util/bit_util.rs
@@ -32,6 +32,17 @@ pub fn from_ne_slice<T: FromBytes>(bs: &[u8]) -> T {
     T::from_ne_bytes(b)
 }
 
+#[inline]
+pub fn from_le_slice<T: FromBytes>(bs: &[u8]) -> T {
+    let mut b = T::Buffer::default();
+    {
+        let b = b.as_mut();
+        let bs = &bs[..b.len()];
+        b.copy_from_slice(bs);
+    }
+    T::from_le_bytes(b)
+}
+
 pub trait FromBytes: Sized {
     type Buffer: AsMut<[u8]> + Default;
     fn from_le_bytes(bs: Self::Buffer) -> Self;

Reply via email to