tustvold commented on code in PR #2526:
URL: https://github.com/apache/arrow-rs/pull/2526#discussion_r950295767
##########
object_store/src/local.rs:
##########
@@ -1068,6 +1068,7 @@ mod tests {
integration.head(&path).await.unwrap();
}
+ #[ignore]
Review Comment:
:smile:
##########
parquet/src/arrow/async_reader.rs:
##########
@@ -218,6 +296,36 @@ impl<T: AsyncFileReader + Send + 'static>
ArrowReaderBuilder<AsyncReader<T>> {
Self::new_builder(AsyncReader(input), metadata, Default::default())
}
+ pub async fn new_with_index(mut input: T) -> Result<Self> {
Review Comment:
I think it would be more consistent to have `new_with_options` accepting
`ArrowReaderOptions`. This already has a field on it for the page index
##########
parquet/src/arrow/async_reader.rs:
##########
@@ -139,6 +143,80 @@ pub trait AsyncFileReader: Send {
/// allowing fine-grained control over how metadata is sourced, in
particular allowing
/// for caching, pre-fetching, catalog metadata, etc...
fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>>;
+
+ /// Provides asynchronous access to the the page index for each column
chunk in a
+ /// row group. Will panic if `row_group_idx` is greater than or equal to
`num_row_groups`
+ fn get_column_indexes(
+ &mut self,
+ metadata: Arc<ParquetMetaData>,
+ row_group_idx: usize,
+ ) -> BoxFuture<'_, Result<Vec<Index>>> {
+ async move {
+ let chunks = metadata.row_group(row_group_idx).columns();
+
+ let (offset, lengths) =
index_reader::get_index_offset_and_lengths(chunks)?;
+ let length = lengths.iter().sum::<usize>();
+
+ if length == 0 {
+ return Ok(vec![Index::NONE; chunks.len()]);
+ }
+
+ //read all need data into buffer
+ let data = self
+ .get_bytes(offset as usize..offset as usize + length)
Review Comment:
This will perform separate get_bytes requests to fetch the page index and
column index information for each column chunk. This is likely not a good idea,
especially since this will be performed serially.
Ideally we would identify the ranges of all the index information, and then
call get_byte_ranges, this will allow coalescing proximate requests to
ObjectStore, paralell fetch, etc...
##########
parquet/src/file/page_index/index_reader.rs:
##########
@@ -64,6 +68,10 @@ pub fn read_pages_locations<R: ChunkReader>(
) -> Result<Vec<Vec<PageLocation>>, ParquetError> {
let (offset, total_length) = get_location_offset_and_total_length(chunks)?;
+ if total_length == 0 {
Review Comment:
I think this might fix https://github.com/apache/arrow-rs/issues/2434
##########
parquet/src/arrow/async_reader.rs:
##########
@@ -218,6 +296,36 @@ impl<T: AsyncFileReader + Send + 'static>
ArrowReaderBuilder<AsyncReader<T>> {
Self::new_builder(AsyncReader(input), metadata, Default::default())
}
+ pub async fn new_with_index(mut input: T) -> Result<Self> {
+ let metadata = input.get_metadata().await?;
+
+ let mut row_groups = metadata.row_groups().to_vec();
+
+ let mut columns_indexes = vec![];
+ let mut offset_indexes = vec![];
+
+ for (idx, rg) in row_groups.iter_mut().enumerate() {
+ let column_index = input.get_column_indexes(metadata.clone(),
idx).await?;
+
+ columns_indexes.push(column_index);
+ if let Some(offset_index) =
+ input.get_page_locations(metadata.clone(), idx).await?
+ {
+ rg.set_page_offset(offset_index.clone());
+ offset_indexes.push(offset_index);
+ }
+ }
+
+ let metadata = Arc::new(ParquetMetaData::new_with_page_index(
Review Comment:
Not part of this PR, but I still feel something is off with the way the
index information is located on ParquetMetadata...
##########
parquet/src/arrow/arrow_reader/selection.rs:
##########
@@ -162,7 +162,12 @@ impl RowSelection {
current_selector = selectors.next();
}
} else {
- break;
+ if !(selector.skip || current_page_included) {
Review Comment:
What are the implications of this change?
##########
parquet/src/arrow/async_reader.rs:
##########
@@ -218,6 +296,36 @@ impl<T: AsyncFileReader + Send + 'static>
ArrowReaderBuilder<AsyncReader<T>> {
Self::new_builder(AsyncReader(input), metadata, Default::default())
}
+ pub async fn new_with_index(mut input: T) -> Result<Self> {
+ let metadata = input.get_metadata().await?;
+
+ let mut row_groups = metadata.row_groups().to_vec();
+
+ let mut columns_indexes = vec![];
+ let mut offset_indexes = vec![];
+
+ for (idx, rg) in row_groups.iter_mut().enumerate() {
+ let column_index = input.get_column_indexes(metadata.clone(),
idx).await?;
Review Comment:
We should check if the column index has already been fetched to the
metadata, and not fetch it again if it is already present
##########
parquet/src/arrow/async_reader.rs:
##########
@@ -139,6 +143,80 @@ pub trait AsyncFileReader: Send {
/// allowing fine-grained control over how metadata is sourced, in
particular allowing
/// for caching, pre-fetching, catalog metadata, etc...
fn get_metadata(&mut self) -> BoxFuture<'_, Result<Arc<ParquetMetaData>>>;
+
+ /// Provides asynchronous access to the the page index for each column
chunk in a
+ /// row group. Will panic if `row_group_idx` is greater than or equal to
`num_row_groups`
+ fn get_column_indexes(
Review Comment:
I'm not sure of the value of exposing these on `AsyncFileReader`, and not
just handling the logic internally. Ultimately if the implementer wants to
override the way the index is fetched, they can just return ParquetMetadata
from `get_metadata` with the index information already loaded.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]