This is an automated email from the ASF dual-hosted git repository.

tustvold pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow-rs.git


The following commit(s) were added to refs/heads/master by this push:
     new 63ab69e5b Push ChunkReader into SerializedPageReader (#2463) (#2464)
63ab69e5b is described below

commit 63ab69e5bf5b97a7ff9ae58a6f20f6065a1ab932
Author: Raphael Taylor-Davies <[email protected]>
AuthorDate: Wed Aug 17 10:01:33 2022 +0100

    Push ChunkReader into SerializedPageReader (#2463) (#2464)
---
 parquet/src/column/writer/mod.rs      |  46 ++--
 parquet/src/file/reader.rs            |   2 +-
 parquet/src/file/serialized_reader.rs | 383 +++++++++++++++++-----------------
 parquet/src/file/writer.rs            |  25 ++-
 parquet/src/util/mod.rs               |   1 -
 parquet/src/util/page_util.rs         |  96 ---------
 6 files changed, 238 insertions(+), 315 deletions(-)

diff --git a/parquet/src/column/writer/mod.rs b/parquet/src/column/writer/mod.rs
index 669cacee6..c7518c89e 100644
--- a/parquet/src/column/writer/mod.rs
+++ b/parquet/src/column/writer/mod.rs
@@ -1082,6 +1082,7 @@ fn compare_greater_byte_array_decimals(a: &[u8], b: 
&[u8]) -> bool {
 
 #[cfg(test)]
 mod tests {
+    use bytes::Bytes;
     use parquet_format::BoundaryOrder;
     use rand::distributions::uniform::SampleUniform;
     use std::sync::Arc;
@@ -1096,7 +1097,7 @@ mod tests {
         writer::SerializedPageWriter,
     };
     use crate::schema::types::{ColumnDescriptor, ColumnPath, Type as 
SchemaType};
-    use crate::util::{io::FileSource, 
test_common::rand_gen::random_numbers_range};
+    use crate::util::test_common::rand_gen::random_numbers_range;
 
     use super::*;
 
@@ -1645,7 +1646,7 @@ mod tests {
             )
             .unwrap();
 
-        let (_, _, metadata, _, _) = writer.close().unwrap();
+        let (_, rows_written, metadata, _, _) = writer.close().unwrap();
 
         let stats = metadata.statistics().unwrap();
         assert_eq!(stats.min_bytes(), 1_i32.to_le_bytes());
@@ -1654,10 +1655,10 @@ mod tests {
         assert!(stats.distinct_count().is_none());
 
         let reader = SerializedPageReader::new(
-            std::io::Cursor::new(buf),
-            7,
-            Compression::UNCOMPRESSED,
-            Type::INT32,
+            Arc::new(Bytes::from(buf)),
+            &metadata,
+            rows_written as usize,
+            None,
         )
         .unwrap();
 
@@ -1690,14 +1691,14 @@ mod tests {
             .write_batch(&[1, 2, 3, 4], Some(&[1, 0, 0, 1, 1, 1]), None)
             .unwrap();
 
-        let (_, _, metadata, _, _) = writer.close().unwrap();
+        let (_, rows_written, metadata, _, _) = writer.close().unwrap();
         assert!(metadata.statistics().is_none());
 
         let reader = SerializedPageReader::new(
-            std::io::Cursor::new(buf),
-            6,
-            Compression::UNCOMPRESSED,
-            Type::INT32,
+            Arc::new(Bytes::from(buf)),
+            &metadata,
+            rows_written as usize,
+            None,
         )
         .unwrap();
 
@@ -1818,16 +1819,15 @@ mod tests {
         let data = &[1, 2, 3, 4, 5, 6, 7, 8, 9, 10];
         let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 
0, props);
         writer.write_batch(data, None, None).unwrap();
-        let (bytes_written, _, _, _, _) = writer.close().unwrap();
+        let (_, rows_written, metadata, _, _) = writer.close().unwrap();
 
         // Read pages and check the sequence
-        let source = FileSource::new(&file, 0, bytes_written as usize);
         let mut page_reader = Box::new(
             SerializedPageReader::new(
-                source,
-                data.len() as i64,
-                Compression::UNCOMPRESSED,
-                Int32Type::get_physical_type(),
+                Arc::new(file),
+                &metadata,
+                rows_written as usize,
+                None,
             )
             .unwrap(),
         );
@@ -2201,16 +2201,14 @@ mod tests {
 
         let values_written = writer.write_batch(values, def_levels, 
rep_levels).unwrap();
         assert_eq!(values_written, values.len());
-        let (bytes_written, rows_written, column_metadata, _, _) =
-            writer.close().unwrap();
+        let (_, rows_written, column_metadata, _, _) = writer.close().unwrap();
 
-        let source = FileSource::new(&file, 0, bytes_written as usize);
         let page_reader = Box::new(
             SerializedPageReader::new(
-                source,
-                column_metadata.num_values(),
-                column_metadata.compression(),
-                T::get_physical_type(),
+                Arc::new(file),
+                &column_metadata,
+                rows_written as usize,
+                None,
             )
             .unwrap(),
         );
diff --git a/parquet/src/file/reader.rs b/parquet/src/file/reader.rs
index d75227365..2d7c6c5e4 100644
--- a/parquet/src/file/reader.rs
+++ b/parquet/src/file/reader.rs
@@ -45,7 +45,7 @@ pub trait Length {
 /// For an object store reader, each read can be mapped to a range request.
 pub trait ChunkReader: Length + Send + Sync {
     type T: Read + Send;
-    /// get a serialy readeable slice of the current reader
+    /// Get a serially readable slice of the current reader
     /// This should fail if the slice exceeds the current bounds
     fn get_read(&self, start: u64, length: usize) -> Result<Self::T>;
 }
diff --git a/parquet/src/file/serialized_reader.rs 
b/parquet/src/file/serialized_reader.rs
index e8ef025ad..1a6a9026d 100644
--- a/parquet/src/file/serialized_reader.rs
+++ b/parquet/src/file/serialized_reader.rs
@@ -18,14 +18,15 @@
 //! Contains implementations of the reader traits FileReader, RowGroupReader 
and PageReader
 //! Also contains implementations of the ChunkReader for files (with 
buffering) and byte arrays (RAM)
 
-use bytes::{Buf, Bytes};
 use std::collections::VecDeque;
+use std::io::Cursor;
 use std::{convert::TryFrom, fs::File, io::Read, path::Path, sync::Arc};
 
+use bytes::{Buf, Bytes};
 use parquet_format::{PageHeader, PageLocation, PageType};
 use thrift::protocol::TCompactInputProtocol;
 
-use crate::basic::{Compression, Encoding, Type};
+use crate::basic::{Encoding, Type};
 use crate::column::page::{Page, PageMetadata, PageReader};
 use crate::compression::{create_codec, Codec};
 use crate::errors::{ParquetError, Result};
@@ -34,9 +35,7 @@ use crate::file::{footer, metadata::*, reader::*, statistics};
 use crate::record::reader::RowIter;
 use crate::record::Row;
 use crate::schema::types::Type as SchemaType;
-use crate::util::page_util::{calculate_row_count, get_pages_readable_slices};
 use crate::util::{io::TryClone, memory::ByteBufferPtr};
-
 // export `SliceableCursor` and `FileSource` publically so clients can
 // re-use the logic in their own ParquetFileWriter wrappers
 pub use crate::util::io::FileSource;
@@ -335,33 +334,19 @@ impl<'a, R: 'static + ChunkReader> RowGroupReader for 
SerializedRowGroupReader<'
     // TODO: fix PARQUET-816
     fn get_column_page_reader(&self, i: usize) -> Result<Box<dyn PageReader>> {
         let col = self.metadata.column(i);
-        let (col_start, col_length) = col.byte_range();
-        let page_reader = if let Some(offset_index) = 
self.metadata.page_offset_index() {
-            let col_chunk_offset_index = &offset_index[i];
-            let (page_bufs, has_dict) = get_pages_readable_slices(
-                col_chunk_offset_index,
-                col_start,
-                self.chunk_reader.clone(),
-            )?;
-            SerializedPageReader::new_with_page_offsets(
-                col.num_values(),
-                col.compression(),
-                col.column_descr().physical_type(),
-                col_chunk_offset_index.clone(),
-                has_dict,
-                page_bufs,
-            )?
-        } else {
-            let file_chunk =
-                self.chunk_reader.get_read(col_start, col_length as usize)?;
-            SerializedPageReader::new(
-                file_chunk,
-                col.num_values(),
-                col.compression(),
-                col.column_descr().physical_type(),
-            )?
-        };
-        Ok(Box::new(page_reader))
+
+        let page_locations = self
+            .metadata
+            .page_offset_index()
+            .as_ref()
+            .map(|x| x[i].clone());
+
+        Ok(Box::new(SerializedPageReader::new(
+            Arc::clone(&self.chunk_reader),
+            col,
+            self.metadata.num_rows() as usize,
+            page_locations,
+        )?))
     }
 
     fn get_row_iter(&self, projection: Option<SchemaType>) -> Result<RowIter> {
@@ -376,6 +361,30 @@ pub(crate) fn read_page_header<T: Read>(input: &mut T) -> 
Result<PageHeader> {
     Ok(page_header)
 }
 
+/// Reads a [`PageHeader`] from the provided [`Read`] returning the number of 
bytes read
+fn read_page_header_len<T: Read>(input: &mut T) -> Result<(usize, PageHeader)> 
{
+    /// A wrapper around a [`std::io::Read`] that keeps track of the bytes read
+    struct TrackedRead<R> {
+        inner: R,
+        bytes_read: usize,
+    }
+
+    impl<R: Read> Read for TrackedRead<R> {
+        fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
+            let v = self.inner.read(buf)?;
+            self.bytes_read += v;
+            Ok(v)
+        }
+    }
+
+    let mut tracked = TrackedRead {
+        inner: input,
+        bytes_read: 0,
+    };
+    let header = read_page_header(&mut tracked)?;
+    Ok((tracked.bytes_read, header))
+}
+
 /// Decodes a [`Page`] from the provided `buffer`
 pub(crate) fn decode_page(
     page_header: PageHeader,
@@ -471,83 +480,85 @@ pub(crate) fn decode_page(
     Ok(result)
 }
 
-enum SerializedPages<T: Read> {
-    /// Read entire chunk
-    Chunk { buf: T },
-    /// Read operate pages which can skip.
+enum SerializedPageReaderState {
+    Values {
+        /// The current byte offset in the reader
+        offset: usize,
+
+        /// The length of the chunk in bytes
+        remaining_bytes: usize,
+    },
     Pages {
-        offset_index: Vec<PageLocation>,
-        seen_num_data_pages: usize,
-        has_dictionary_page_to_read: bool,
-        page_bufs: VecDeque<T>,
+        /// Remaining page locations
+        page_locations: VecDeque<PageLocation>,
+        /// Remaining dictionary location if any
+        dictionary_page: Option<PageLocation>,
+        /// The total number of rows in this column chunk
+        total_rows: usize,
     },
 }
 
 /// A serialized implementation for Parquet [`PageReader`].
-pub struct SerializedPageReader<T: Read> {
-    // The file source buffer which references exactly the bytes for the 
column trunk
-    // to be read by this page reader.
-    buf: SerializedPages<T>,
+pub struct SerializedPageReader<R: ChunkReader> {
+    /// The chunk reader
+    reader: Arc<R>,
 
-    // The compression codec for this column chunk. Only set for non-PLAIN 
codec.
+    /// The compression codec for this column chunk. Only set for non-PLAIN 
codec.
     decompressor: Option<Box<dyn Codec>>,
 
-    // The number of values we have seen so far.
-    seen_num_values: i64,
-
-    // The number of total values in this column chunk.
-    total_num_values: i64,
-
-    // Column chunk type.
+    /// Column chunk type.
     physical_type: Type,
+
+    state: SerializedPageReaderState,
 }
 
-impl<T: Read> SerializedPageReader<T> {
-    /// Creates a new serialized page reader from file source.
+impl<R: ChunkReader> SerializedPageReader<R> {
+    /// Creates a new serialized page reader from a chunk reader and metadata
     pub fn new(
-        buf: T,
-        total_num_values: i64,
-        compression: Compression,
-        physical_type: Type,
+        reader: Arc<R>,
+        meta: &ColumnChunkMetaData,
+        total_rows: usize,
+        page_locations: Option<Vec<PageLocation>>,
     ) -> Result<Self> {
-        let decompressor = create_codec(compression)?;
-        let result = Self {
-            buf: SerializedPages::Chunk { buf },
-            total_num_values,
-            seen_num_values: 0,
-            decompressor,
-            physical_type,
-        };
-        Ok(result)
-    }
+        let decompressor = create_codec(meta.compression())?;
+        let (start, len) = meta.byte_range();
+
+        let state = match page_locations {
+            Some(locations) => {
+                let dictionary_page = match locations.first() {
+                    Some(dict_offset) if dict_offset.offset as u64 != start => 
{
+                        Some(PageLocation {
+                            offset: start as i64,
+                            compressed_page_size: (dict_offset.offset as u64 - 
start)
+                                as i32,
+                            first_row_index: 0,
+                        })
+                    }
+                    _ => None,
+                };
 
-    /// Creates a new serialized page reader from file source.
-    pub fn new_with_page_offsets(
-        total_num_values: i64,
-        compression: Compression,
-        physical_type: Type,
-        offset_index: Vec<PageLocation>,
-        has_dictionary_page_to_read: bool,
-        page_bufs: VecDeque<T>,
-    ) -> Result<Self> {
-        let decompressor = create_codec(compression)?;
-        let result = Self {
-            buf: SerializedPages::Pages {
-                offset_index,
-                seen_num_data_pages: 0,
-                has_dictionary_page_to_read,
-                page_bufs,
+                SerializedPageReaderState::Pages {
+                    page_locations: locations.into(),
+                    dictionary_page,
+                    total_rows,
+                }
+            }
+            None => SerializedPageReaderState::Values {
+                offset: start as usize,
+                remaining_bytes: len as usize,
             },
-            total_num_values,
-            seen_num_values: 0,
-            decompressor,
-            physical_type,
         };
-        Ok(result)
+
+        Ok(Self {
+            reader,
+            decompressor,
+            state,
+            physical_type: meta.column_type(),
+        })
     }
 }
 
-impl<T: Read + Send> Iterator for SerializedPageReader<T> {
+impl<R: ChunkReader> Iterator for SerializedPageReader<R> {
     type Item = Result<Page>;
 
     fn next(&mut self) -> Option<Self::Item> {
@@ -555,133 +566,126 @@ impl<T: Read + Send> Iterator for 
SerializedPageReader<T> {
     }
 }
 
-impl<T: Read + Send> PageReader for SerializedPageReader<T> {
+impl<R: ChunkReader> PageReader for SerializedPageReader<R> {
     fn get_next_page(&mut self) -> Result<Option<Page>> {
-        let mut cursor;
-        let mut dictionary_cursor;
-        while self.seen_num_values < self.total_num_values {
-            match &mut self.buf {
-                SerializedPages::Chunk { buf } => {
-                    cursor = buf;
-                }
-                SerializedPages::Pages {
-                    offset_index,
-                    seen_num_data_pages,
-                    has_dictionary_page_to_read,
-                    page_bufs,
+        loop {
+            let page = match &mut self.state {
+                SerializedPageReaderState::Values {
+                    offset,
+                    remaining_bytes: remaining,
+                    ..
                 } => {
-                    if offset_index.len() <= *seen_num_data_pages {
+                    if *remaining == 0 {
                         return Ok(None);
-                    } else if *seen_num_data_pages == 0 && 
*has_dictionary_page_to_read {
-                        dictionary_cursor = page_bufs.pop_front().unwrap();
-                        cursor = &mut dictionary_cursor;
-                    } else {
-                        cursor = 
page_bufs.get_mut(*seen_num_data_pages).unwrap();
                     }
-                }
-            }
 
-            let page_header = read_page_header(cursor)?;
+                    let mut read = self.reader.get_read(*offset as u64, 
*remaining)?;
 
-            let to_read = page_header.compressed_page_size as usize;
-            let mut buffer = Vec::with_capacity(to_read);
-            let read = cursor.take(to_read as u64).read_to_end(&mut buffer)?;
+                    let (header_len, header) = read_page_header_len(&mut 
read)?;
+                    let data_len = header.compressed_page_size as usize;
+                    *offset += header_len + data_len;
+                    *remaining -= header_len + data_len;
 
-            if read != to_read {
-                return Err(eof_err!(
-                    "Expected to read {} bytes of page, read only {}",
-                    to_read,
-                    read
-                ));
-            }
+                    if header.type_ == PageType::IndexPage {
+                        continue;
+                    }
+
+                    let mut buffer = Vec::with_capacity(data_len);
+                    let read = read.take(data_len as u64).read_to_end(&mut 
buffer)?;
 
-            let buffer = ByteBufferPtr::new(buffer);
-            let result = match page_header.type_ {
-                PageType::DataPage | PageType::DataPageV2 => {
-                    let decoded = decode_page(
-                        page_header,
-                        buffer,
+                    if read != data_len {
+                        return Err(eof_err!(
+                            "Expected to read {} bytes of page, read only {}",
+                            data_len,
+                            read
+                        ));
+                    }
+
+                    decode_page(
+                        header,
+                        ByteBufferPtr::new(buffer),
                         self.physical_type,
                         self.decompressor.as_mut(),
-                    )?;
-                    self.seen_num_values += decoded.num_values() as i64;
-                    if let SerializedPages::Pages {
-                        seen_num_data_pages,
-                        ..
-                    } = &mut self.buf
-                    {
-                        *seen_num_data_pages += 1;
-                    }
-                    decoded
+                    )?
                 }
-                PageType::DictionaryPage => {
-                    if let SerializedPages::Pages {
-                        has_dictionary_page_to_read,
-                        ..
-                    } = &mut self.buf
+                SerializedPageReaderState::Pages {
+                    page_locations,
+                    dictionary_page,
+                    ..
+                } => {
+                    let front = match dictionary_page
+                        .take()
+                        .or_else(|| page_locations.pop_front())
                     {
-                        *has_dictionary_page_to_read = false;
+                        Some(front) => front,
+                        None => return Ok(None),
+                    };
+
+                    let page_len = front.compressed_page_size as usize;
+
+                    // TODO: Add ChunkReader get_bytes to potentially avoid 
copy
+                    let mut buffer = Vec::with_capacity(page_len);
+                    let read = self
+                        .reader
+                        .get_read(front.offset as u64, page_len)?
+                        .read_to_end(&mut buffer)?;
+
+                    if read != page_len {
+                        return Err(eof_err!(
+                            "Expected to read {} bytes of page, read only {}",
+                            page_len,
+                            read
+                        ));
                     }
+
+                    let mut cursor = Cursor::new(buffer);
+                    let header = read_page_header(&mut cursor)?;
+                    let offset = cursor.position();
+
+                    let bytes = Bytes::from(cursor.into_inner()).slice(offset 
as usize..);
                     decode_page(
-                        page_header,
-                        buffer,
+                        header,
+                        bytes.into(),
                         self.physical_type,
                         self.decompressor.as_mut(),
                     )?
                 }
-                _ => {
-                    // For unknown page type (e.g., INDEX_PAGE), skip and read 
next.
-                    continue;
-                }
             };
-            return Ok(Some(result));
-        }
 
-        // We are at the end of this column chunk and no more page left. 
Return None.
-        Ok(None)
+            return Ok(Some(page));
+        }
     }
 
     fn peek_next_page(&mut self) -> Result<Option<PageMetadata>> {
-        match &mut self.buf {
-            SerializedPages::Chunk { .. } => { Err(general_err!("Must set 
page_offset_index when using peek_next_page in SerializedPageReader.")) }
-            SerializedPages::Pages { offset_index, seen_num_data_pages, 
has_dictionary_page_to_read, .. } => {
-                if *seen_num_data_pages >= offset_index.len() {
-                    Ok(None)
-                } else if *seen_num_data_pages == 0 && 
*has_dictionary_page_to_read {
-                    // Will set `has_dictionary_page_to_read` false in 
`get_next_page`,
-                    // assume dictionary page must be read and cannot be 
skipped.
-                    Ok(Some(PageMetadata {
-                        num_rows: usize::MIN,
-                        is_dict: true,
+        match &self.state {
+            SerializedPageReaderState::Values {..} => Err(general_err!("Must 
set page_offset_index when using peek_next_page in SerializedPageReader.")),
+            SerializedPageReaderState::Pages { page_locations, 
dictionary_page, total_rows } => {
+                if dictionary_page.is_some() {
+                    Ok(Some(PageMetadata{
+                        num_rows: 0,
+                        is_dict: true
                     }))
-                } else {
-                    let row_count = calculate_row_count(
-                        offset_index,
-                        *seen_num_data_pages,
-                        self.total_num_values,
-                    )?;
-                    Ok(Some(PageMetadata {
-                        num_rows: row_count,
-                        is_dict: false,
+                } else if let Some(page) = page_locations.front() {
+                    let next_rows = page_locations.get(1).map(|x| 
x.first_row_index as usize).unwrap_or(*total_rows);
+
+                    Ok(Some(PageMetadata{
+                        num_rows: next_rows - page.first_row_index as usize,
+                        is_dict: false
                     }))
+                } else {
+                    Ok(None)
                 }
             }
         }
     }
 
     fn skip_next_page(&mut self) -> Result<()> {
-        match &mut self.buf {
-            SerializedPages::Chunk { .. } => { Err(general_err!("Must set 
page_offset_index when using skip_next_page in SerializedPageReader.")) }
-            SerializedPages::Pages { offset_index, seen_num_data_pages, .. } 
=> {
-                if offset_index.len() <= *seen_num_data_pages {
-                    Err(general_err!(
-                    "seen_num_data_pages is out of bound in 
SerializedPageReader."
-                ))
-                } else {
-                    *seen_num_data_pages += 1;
-                    // Notice: maybe need 'self.seen_num_values += xxx', for 
now we can not get skip values in skip_next_page.
-                    Ok(())
-                }
+        match &mut self.state {
+            SerializedPageReaderState::Values {..} =>{ Err(general_err!("Must 
set page_offset_index when using skip_next_page in SerializedPageReader.")) },
+            SerializedPageReaderState::Pages { page_locations, .. } => {
+                page_locations.pop_front();
+
+                Ok(())
             }
         }
     }
@@ -689,7 +693,10 @@ impl<T: Read + Send> PageReader for 
SerializedPageReader<T> {
 
 #[cfg(test)]
 mod tests {
-    use super::*;
+    use std::sync::Arc;
+
+    use parquet_format::BoundaryOrder;
+
     use crate::basic::{self, ColumnOrder};
     use crate::data_type::private::ParquetValueType;
     use crate::file::page_index::index::{ByteArrayIndex, Index, NativeIndex};
@@ -697,8 +704,8 @@ mod tests {
     use crate::schema::parser::parse_message_type;
     use crate::util::bit_util::from_le_slice;
     use crate::util::test_common::file_util::{get_test_file, get_test_path};
-    use parquet_format::BoundaryOrder;
-    use std::sync::Arc;
+
+    use super::*;
 
     #[test]
     fn test_cursor_and_file_has_the_same_behaviour() {
diff --git a/parquet/src/file/writer.rs b/parquet/src/file/writer.rs
index 863ccf854..d3d1f8809 100644
--- a/parquet/src/file/writer.rs
+++ b/parquet/src/file/writer.rs
@@ -663,7 +663,7 @@ mod tests {
     use super::*;
 
     use bytes::Bytes;
-    use std::{fs::File, io::Cursor};
+    use std::fs::File;
 
     use crate::basic::{Compression, Encoding, LogicalType, Repetition, Type};
     use crate::column::page::PageReader;
@@ -675,6 +675,7 @@ mod tests {
         statistics::{from_thrift, to_thrift, Statistics},
     };
     use crate::record::RowAccessor;
+    use crate::schema::types::{ColumnDescriptor, ColumnPath};
     use crate::util::memory::ByteBufferPtr;
 
     #[test]
@@ -1062,11 +1063,25 @@ mod tests {
             page_writer.close().unwrap();
         }
         {
+            let reader = bytes::Bytes::from(buffer);
+
+            let t = types::Type::primitive_type_builder("t", physical_type)
+                .build()
+                .unwrap();
+
+            let desc = ColumnDescriptor::new(Arc::new(t), 0, 0, 
ColumnPath::new(vec![]));
+            let meta = ColumnChunkMetaData::builder(Arc::new(desc))
+                .set_compression(codec)
+                .set_total_compressed_size(reader.len() as i64)
+                .set_num_values(total_num_values)
+                .build()
+                .unwrap();
+
             let mut page_reader = SerializedPageReader::new(
-                Cursor::new(&buffer),
-                total_num_values,
-                codec,
-                physical_type,
+                Arc::new(reader),
+                &meta,
+                total_num_values as usize,
+                None,
             )
             .unwrap();
 
diff --git a/parquet/src/util/mod.rs b/parquet/src/util/mod.rs
index d8ad739db..5f4302394 100644
--- a/parquet/src/util/mod.rs
+++ b/parquet/src/util/mod.rs
@@ -21,7 +21,6 @@ pub mod memory;
 pub mod bit_util;
 mod bit_pack;
 pub(crate) mod interner;
-pub(crate) mod page_util;
 #[cfg(any(test, feature = "test_common"))]
 pub(crate) mod test_common;
 
diff --git a/parquet/src/util/page_util.rs b/parquet/src/util/page_util.rs
deleted file mode 100644
index 7716b7116..000000000
--- a/parquet/src/util/page_util.rs
+++ /dev/null
@@ -1,96 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-use std::collections::VecDeque;
-use std::io::Read;
-use std::sync::Arc;
-use crate::errors::Result;
-use parquet_format::PageLocation;
-use crate::file::reader::ChunkReader;
-
-/// Use column chunk's offset index to get the `page_num` page row count.
-pub(crate) fn calculate_row_count(indexes: &[PageLocation], page_num: usize, 
total_row_count: i64) -> Result<usize> {
-    if page_num == indexes.len() - 1 {
-        // first_row_index start with 0, so no need to plus one additional.
-        Ok((total_row_count - indexes[page_num].first_row_index) as usize)
-    } else {
-        Ok((indexes[page_num + 1].first_row_index - 
indexes[page_num].first_row_index) as usize)
-    }
-}
-
-/// Use column chunk's offset index to get each page serially readable slice
-/// and a flag indicates whether having one dictionary page in this column 
chunk.
-pub(crate) fn get_pages_readable_slices<T: Read + Send, R: 
ChunkReader<T=T>>(col_chunk_offset_index: &[PageLocation], col_start: u64, 
chunk_reader: Arc<R>) -> Result<(VecDeque<T>, bool)> {
-    let first_data_page_offset = col_chunk_offset_index[0].offset as u64;
-    let has_dictionary_page = first_data_page_offset != col_start;
-    let mut page_readers = 
VecDeque::with_capacity(col_chunk_offset_index.len() + 1);
-
-    if has_dictionary_page {
-        let length = (first_data_page_offset - col_start) as usize;
-        let reader: T = chunk_reader.get_read(col_start, length)?;
-        page_readers.push_back(reader);
-    }
-
-    for index in col_chunk_offset_index {
-        let start = index.offset as u64;
-        let length = index.compressed_page_size as usize;
-        let reader: T = chunk_reader.get_read(start, length)?;
-        page_readers.push_back(reader)
-    }
-    Ok((page_readers, has_dictionary_page))
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    /**
-     parquet-tools meta  ./test.parquet got:
-
-                file schema: test_schema
-        
--------------------------------------------------------------------------------
-        leaf:        REQUIRED INT64 R:0 D:
-
-            row group 1: RC:256 TS:2216 OFFSET:4
-        
--------------------------------------------------------------------------------
-        leaf:         INT64 UNCOMPRESSED DO:0 FPO:4 SZ:2216/2216/1.00 VC:256 
ENC:PLAIN,RLE ST:[min: 0, max: 255, num_nulls not defined
-
-    parquet-tools column-index -c leaf ./test.parquet got:
-
-            offset index for column leaf:
-                              offset   compressed size       first row index
-        page-0                         4               554                     0
-        page-1                       558               554                    
64
-        page-2                      1112               554                   
128
-        page-3                      1666               554                   
192
-
-    **/
-    #[test]
-    fn test_calculate_row_count() {
-        let total_row_count = 256;
-        let mut  indexes = vec![];
-        indexes.push(PageLocation::new(4, 554, 0));
-        indexes.push(PageLocation::new(558, 554, 64));
-        indexes.push(PageLocation::new(1112, 554, 128));
-        indexes.push(PageLocation::new(1666, 554, 192));
-        for i in 0..4 {
-            // each page should has 64 rows.
-            assert_eq!(64, calculate_row_count(indexes.as_slice(), i, 
total_row_count).unwrap());
-        }
-
-    }
-}

Reply via email to