alamb commented on code in PR #2022:
URL: https://github.com/apache/arrow-rs/pull/2022#discussion_r916303534

##########
parquet/src/arrow/schema.rs:
##########
@@ -152,7 +152,10 @@ pub(crate) fn add_encoded_arrow_schema_to_metadata(
         value: Some(encoded),
     };
 
-    let mut meta = props.key_value_metadata.clone().unwrap_or_default();
+    let meta = props

Review Comment:
Does this change have any effect (other than to make the code more readable)?

##########
parquet/src/column/writer.rs:
##########
@@ -302,61 +308,52 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
         // Find out the minimal length to prevent index out of bound errors.
         let mut min_len = values.len();
         if let Some(levels) = def_levels {
-            min_len = cmp::min(min_len, levels.len());
+            min_len = min_len.min(levels.len());
         }
         if let Some(levels) = rep_levels {
-            min_len = cmp::min(min_len, levels.len());
+            min_len = min_len.min(levels.len());
         }
 
         // Find out number of batches to process.
         let write_batch_size = self.props.write_batch_size();
         let num_batches = min_len / write_batch_size;
 
-        // Process pre-calculated statistics
-        match (min, max) {
-            (Some(min), Some(max)) => {
-                if self
-                    .min_column_value
-                    .as_ref()
-                    .map_or(true, |v| self.compare_greater(v, min))
-                {
-                    self.min_column_value = Some(min.clone());
+        if self.statistics_enabled == EnabledStatistics::Chunk {

Review Comment:
In fact, now that I re-read this, shouldn't this also be set if we are computing page level statistics?

##########
parquet/src/file/properties.rs:
##########
@@ -394,23 +376,33 @@ impl WriterPropertiesBuilder {
 
     /// Sets flag to enable/disable statistics for a column.
     /// Takes precedence over globally defined settings.
-    pub fn set_column_statistics_enabled(mut self, col: ColumnPath, value: bool) -> Self {
-        self.get_mut_props(col).set_statistics_enabled(value);
-        self
-    }
-
-    /// Sets max size for statistics for a column.
-    /// Takes precedence over globally defined settings.
-    pub fn set_column_max_statistics_size(
+    pub fn set_column_statistics_enabled(
         mut self,
         col: ColumnPath,
-        value: usize,
+        value: EnabledStatistics,
     ) -> Self {
-        self.get_mut_props(col).set_max_statistics_size(value);
+        self.get_mut_props(col).set_statistics_enabled(value);
         self
     }
 }
 
+/// Controls the level of statistics to be computed by the writer
+#[derive(Debug, Clone, Copy, Eq, PartialEq)]
+pub enum EnabledStatistics {

Review Comment:
It would be good to clarify here whether `Page` also computes `Chunk` level statistics. If `Page` does not also include `Chunk`, that seems strange to me (as I would expect computing finer grained stats to also produce the coarser grained ones).
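To make the ask concrete, a minimal sketch of the variant-level docs that would resolve the ambiguity. The `None`/`Chunk`/`Page` variant names are taken from the rest of this diff, but the claim that `Page` subsumes `Chunk` is my expectation, not something this hunk confirms:

```rust
/// Controls the level of statistics to be computed by the writer
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub enum EnabledStatistics {
    /// Compute no statistics
    None,
    /// Compute chunk-level statistics, but not page-level
    Chunk,
    /// Compute both page-level and chunk-level statistics
    Page,
}
```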
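Relatedly, on the schema.rs hunk at the top of this message: the diff is truncated at `let meta = props`, but if the new code borrows the metadata in place (for example via `Option::get_or_insert_with`, which is an assumption on my part about the untruncated change), the practical difference is avoiding a clone of the existing key/value pairs. A self-contained sketch with illustrative types, not the real `WriterProperties`:

```rust
#[derive(Default)]
struct Props {
    key_value_metadata: Option<Vec<(String, String)>>,
}

fn main() {
    let mut props = Props::default();

    // Before: clone the existing metadata (or a default), push, then store it back.
    let mut meta = props.key_value_metadata.clone().unwrap_or_default();
    meta.push(("old".into(), "style".into()));
    props.key_value_metadata = Some(meta);

    // After: borrow the Vec in place, creating it only if absent -- no clone.
    let meta = props.key_value_metadata.get_or_insert_with(Default::default);
    meta.push(("new".into(), "style".into()));

    assert_eq!(props.key_value_metadata.unwrap().len(), 2);
}
```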
##########
parquet/src/column/writer.rs:
##########
@@ -1754,6 +1753,56 @@ mod tests {
         }
     }
 
+    #[test]
+    fn test_mixed_precomputed_statistics() {
+        let mut buf = Vec::with_capacity(100);
+        let mut write = TrackedWrite::new(&mut buf);
+        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
+        let props = Arc::new(WriterProperties::builder().build());
+        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
+
+        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
+        writer
+            .write_batch_with_statistics(
+                &[5, 6, 7],
+                None,
+                None,
+                Some(&5),
+                Some(&7),
+                Some(0),
+                Some(3),
+            )
+            .unwrap();
+
+        let (_, _, metadata, _, _) = writer.close().unwrap();
+
+        let stats = metadata.statistics().unwrap();
+        assert_eq!(stats.min_bytes(), 1_i32.to_le_bytes());
+        assert_eq!(stats.max_bytes(), 7_i32.to_le_bytes());
+        assert_eq!(stats.null_count(), 0);
+        assert!(stats.distinct_count().is_none());
+
+        let reader = SerializedPageReader::new(
+            std::io::Cursor::new(buf),
+            7,
+            Compression::UNCOMPRESSED,
+            Type::INT32,
+        )
+        .unwrap();
+
+        let pages = reader.collect::<Result<Vec<_>>>().unwrap();
+        assert_eq!(pages.len(), 2);
+
+        assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
+        assert_eq!(pages[1].page_type(), PageType::DATA_PAGE);
+
+        let page_statistics = pages[1].statistics().unwrap();
+        assert_eq!(page_statistics.min_bytes(), 1_i32.to_le_bytes());

Review Comment:
👍

##########
parquet/src/column/writer.rs:
##########
@@ -302,61 +308,52 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
         // Find out the minimal length to prevent index out of bound errors.
         let mut min_len = values.len();
         if let Some(levels) = def_levels {
-            min_len = cmp::min(min_len, levels.len());
+            min_len = min_len.min(levels.len());
        }
         if let Some(levels) = rep_levels {
-            min_len = cmp::min(min_len, levels.len());
+            min_len = min_len.min(levels.len());
         }
 
         // Find out number of batches to process.
         let write_batch_size = self.props.write_batch_size();
         let num_batches = min_len / write_batch_size;
 
-        // Process pre-calculated statistics
-        match (min, max) {
-            (Some(min), Some(max)) => {
-                if self
-                    .min_column_value
-                    .as_ref()
-                    .map_or(true, |v| self.compare_greater(v, min))
-                {
-                    self.min_column_value = Some(min.clone());
+        if self.statistics_enabled == EnabledStatistics::Chunk {

Review Comment:
I would personally find `match self.statistics_enabled` ... easier to validate that other variants of `EnabledStatistics` did not apply. Or perhaps a function `EnabledStatistics::compute_chunk_statistics()`. Basically I am thinking about "what if someone adds a new variant" -- it would be nice if the compiler told them where to update.
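A sketch of what that exhaustive `match` could look like, also folding in the earlier question about page-level statistics. The variant names are assumed from the properties.rs hunk, and treating `Page` as implying chunk statistics is my expectation rather than the PR's confirmed behavior:

```rust
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
enum EnabledStatistics {
    None,
    Chunk,
    Page,
}

/// Exhaustive match: adding a new variant later becomes a compile error
/// here, instead of silently falling through an `== Chunk` comparison.
fn compute_chunk_statistics(stats: EnabledStatistics) -> bool {
    match stats {
        EnabledStatistics::None => false,
        // Page-level stats presumably imply chunk-level stats as well
        EnabledStatistics::Chunk | EnabledStatistics::Page => true,
    }
}

fn main() {
    assert!(!compute_chunk_statistics(EnabledStatistics::None));
    assert!(compute_chunk_statistics(EnabledStatistics::Chunk));
    assert!(compute_chunk_statistics(EnabledStatistics::Page));
}
```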
##########
parquet/src/column/writer.rs:
##########
@@ -393,9 +389,11 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
         self.write_batch_internal(values, def_levels, rep_levels, None, None, None, None)
     }
 
-    /// Writer may optionally provide pre-calculated statistics for this batch, in which case we do
-    /// not calculate page level statistics as this will defeat the purpose of speeding up the write
-    /// process with pre-calculated statistics.
+    /// Writer may optionally provide pre-calculated statistics for this batch
+    ///
+    /// Note: Unless disabled using by using [`WriterProperties`] to set
+    /// enabled statistics to [`EnabledStatistics::Chunk`], this will still compute

Review Comment:
This comment almost reads as the opposite of what I think the code is doing (it uses the passed-in pre-calculated statistics, or computes them if none are passed in).

##########
parquet/src/file/properties.rs:
##########
@@ -463,15 +453,10 @@ impl ColumnProperties {
     }
 
     /// Sets whether or not statistics are enabled for this column.
-    fn set_statistics_enabled(&mut self, enabled: bool) {
+    fn set_statistics_enabled(&mut self, enabled: EnabledStatistics) {
         self.statistics_enabled = Some(enabled);
     }
 
-    /// Sets max size for statistics for this column.

Review Comment:
I don't see any mention of `max_statistics_size` in this PR description or the linked tickets. I think it is a reasonable feature (to limit statistics bloating parquet files). Is the issue that it was ignored? If so, perhaps we can leave the API in with a `warn` or `deprecate` noting that it is ignored, and file a ticket to properly support it?

##########
parquet/src/column/writer.rs:
##########
@@ -1029,59 +1025,58 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
         }
     }
 
-    #[allow(clippy::eq_op)]
     fn update_page_min_max(&mut self, val: &T::T) {
+        Self::update_min(&self.descr, val, &mut self.min_page_value);
+        Self::update_max(&self.descr, val, &mut self.max_page_value);
+    }
+
+    fn update_column_min_max(&mut self) {
+        let min = self.min_page_value.as_ref().unwrap();
+        Self::update_min(&self.descr, min, &mut self.min_column_value);
+
+        let max = self.max_page_value.as_ref().unwrap();
+        Self::update_max(&self.descr, max, &mut self.max_column_value);
+    }
+
+    fn update_min(descr: &ColumnDescriptor, val: &T::T, min: &mut Option<T::T>) {
+        Self::update_stat(val, min, |cur| Self::compare_greater(descr, cur, val))
+    }
+
+    fn update_max(descr: &ColumnDescriptor, val: &T::T, max: &mut Option<T::T>) {
+        Self::update_stat(val, max, |cur| Self::compare_greater(descr, val, cur))
+    }
+
+    /// Perform a conditional update of `cur`, skipping any NaN values
+    ///
+    /// If `cur` is `None`, sets `cur` to `Some(val)`, otherwise calls `should_update` with
+    /// the value of `cur`, and updates `cur` to `Some(val)` if it returns `true`

Review Comment:
cc @crepererum as I think you added the initial `NaN` handling a while ago

##########
parquet/src/file/properties.rs:
##########
@@ -394,23 +376,33 @@ impl WriterPropertiesBuilder {
 
     /// Sets flag to enable/disable statistics for a column.
     /// Takes precedence over globally defined settings.
-    pub fn set_column_statistics_enabled(mut self, col: ColumnPath, value: bool) -> Self {
-        self.get_mut_props(col).set_statistics_enabled(value);
-        self
-    }
-
-    /// Sets max size for statistics for a column.
-    /// Takes precedence over globally defined settings.
-    pub fn set_column_max_statistics_size(
+    pub fn set_column_statistics_enabled(
         mut self,
         col: ColumnPath,
-        value: usize,
+        value: EnabledStatistics,
    ) -> Self {
-        self.get_mut_props(col).set_max_statistics_size(value);
+        self.get_mut_props(col).set_statistics_enabled(value);
         self
     }
 }
 
+/// Controls the level of statistics to be computed by the writer
+#[derive(Debug, Clone, Copy, Eq, PartialEq)]
+pub enum EnabledStatistics {

Review Comment:
I actually like this API -- as for some use cases page level statistics are likely to have a higher overhead in storage and writing than benefit (e.g. large string columns for small row groups).
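Returning to the `max_statistics_size` comment above, a sketch of the deprecate-and-ignore option. The builder shape here is illustrative, not the actual `WriterPropertiesBuilder`:

```rust
pub struct Builder;

impl Builder {
    /// Sets max size for statistics (illustrative stand-in for the real setter).
    /// Deprecated rather than removed, so existing callers keep compiling and
    /// get a warning pointing at a follow-on ticket.
    #[deprecated(note = "currently ignored by the writer; see the follow-on ticket")]
    pub fn set_max_statistics_size(self, _value: usize) -> Self {
        // The value is intentionally ignored until proper support lands.
        self
    }
}

fn main() {
    #[allow(deprecated)]
    let _builder = Builder.set_max_statistics_size(4096);
}
```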
##########
parquet/src/column/writer.rs:
##########
@@ -302,61 +308,52 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
         // Find out the minimal length to prevent index out of bound errors.
         let mut min_len = values.len();
         if let Some(levels) = def_levels {
-            min_len = cmp::min(min_len, levels.len());
+            min_len = min_len.min(levels.len());
         }
         if let Some(levels) = rep_levels {
-            min_len = cmp::min(min_len, levels.len());
+            min_len = min_len.min(levels.len());
         }
 
         // Find out number of batches to process.
         let write_batch_size = self.props.write_batch_size();
         let num_batches = min_len / write_batch_size;
 
-        // Process pre-calculated statistics
-        match (min, max) {
-            (Some(min), Some(max)) => {
-                if self
-                    .min_column_value
-                    .as_ref()
-                    .map_or(true, |v| self.compare_greater(v, min))
-                {
-                    self.min_column_value = Some(min.clone());
+        if self.statistics_enabled == EnabledStatistics::Chunk {
+            match (min, max) {
+                (Some(min), Some(max)) => {
+                    Self::update_min(&self.descr, min, &mut self.min_column_value);
+                    Self::update_max(&self.descr, max, &mut self.max_column_value);
                 }
-                if self
-                    .max_column_value
-                    .as_ref()
-                    .map_or(true, |v| self.compare_greater(max, v))
-                {
-                    self.max_column_value = Some(max.clone());
+                (None, Some(_)) | (Some(_), None) => {
+                    panic!("min/max should be both set or both None")
                 }
-            }
-            (None, Some(_)) | (Some(_), None) => {
-                panic!("min/max should be both set or both None")
-            }
-            (None, None) => {}
+                (None, None) => {
+                    for val in values {
+                        Self::update_min(&self.descr, val, &mut self.min_column_value);

Review Comment:
Is this the fix for https://github.com/apache/arrow-rs/issues/2015?
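On the `NaN` handling cc'd above: a self-contained sketch of the skip-NaN conditional update described by the `update_stat` doc comment in that hunk, with `f64` standing in for `T::T` (the real code is generic and threads a `ColumnDescriptor` through `compare_greater`):

```rust
/// Conditionally update `cur`, skipping NaN values, mirroring the
/// doc comment on `update_stat` quoted above.
fn update_stat(val: f64, cur: &mut Option<f64>, should_update: impl Fn(f64) -> bool) {
    if val.is_nan() {
        return; // NaN never becomes a min or max
    }
    match *cur {
        None => *cur = Some(val),
        Some(c) if should_update(c) => *cur = Some(val),
        _ => {}
    }
}

fn main() {
    let mut min = None;
    let mut max = None;
    for v in [3.0, f64::NAN, -1.0, 2.5] {
        update_stat(v, &mut min, |cur| cur > v); // replace min when current > val
        update_stat(v, &mut max, |cur| v > cur); // replace max when val > current
    }
    assert_eq!(min, Some(-1.0));
    assert_eq!(max, Some(3.0));
}
```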