alamb commented on code in PR #2022:
URL: https://github.com/apache/arrow-rs/pull/2022#discussion_r916303534


##########
parquet/src/arrow/schema.rs:
##########
@@ -152,7 +152,10 @@ pub(crate) fn add_encoded_arrow_schema_to_metadata(
         value: Some(encoded),
     };
 
-    let mut meta = props.key_value_metadata.clone().unwrap_or_default();
+    let meta = props

Review Comment:
   Does this change have any effect (other than to make the code more 
readable)? 



##########
parquet/src/column/writer.rs:
##########
@@ -302,61 +308,52 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
         // Find out the minimal length to prevent index out of bound errors.
         let mut min_len = values.len();
         if let Some(levels) = def_levels {
-            min_len = cmp::min(min_len, levels.len());
+            min_len = min_len.min(levels.len());
         }
         if let Some(levels) = rep_levels {
-            min_len = cmp::min(min_len, levels.len());
+            min_len = min_len.min(levels.len());
         }
 
         // Find out number of batches to process.
         let write_batch_size = self.props.write_batch_size();
         let num_batches = min_len / write_batch_size;
 
-        // Process pre-calculated statistics
-        match (min, max) {
-            (Some(min), Some(max)) => {
-                if self
-                    .min_column_value
-                    .as_ref()
-                    .map_or(true, |v| self.compare_greater(v, min))
-                {
-                    self.min_column_value = Some(min.clone());
+        if self.statistics_enabled == EnabledStatistics::Chunk {

Review Comment:
   In fact, now that I re-read this, shouldn't this also be set if we are 
computing page level statistics? 
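   
   Something like this would cover both levels (a sketch -- it assumes the 
enum also has a `Page` variant, per the discussion below):
   
   ```rust
   // Sketch: accumulate chunk-level min/max whenever statistics are enabled
   // at chunk granularity or finer (the exact variant set is an assumption)
   if matches!(
       self.statistics_enabled,
       EnabledStatistics::Chunk | EnabledStatistics::Page
   ) {
       // ... existing min/max handling ...
   }
   ```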



##########
parquet/src/file/properties.rs:
##########
@@ -394,23 +376,33 @@ impl WriterPropertiesBuilder {
 
     /// Sets flag to enable/disable statistics for a column.
     /// Takes precedence over globally defined settings.
-    pub fn set_column_statistics_enabled(mut self, col: ColumnPath, value: bool) -> Self {
-        self.get_mut_props(col).set_statistics_enabled(value);
-        self
-    }
-
-    /// Sets max size for statistics for a column.
-    /// Takes precedence over globally defined settings.
-    pub fn set_column_max_statistics_size(
+    pub fn set_column_statistics_enabled(
         mut self,
         col: ColumnPath,
-        value: usize,
+        value: EnabledStatistics,
     ) -> Self {
-        self.get_mut_props(col).set_max_statistics_size(value);
+        self.get_mut_props(col).set_statistics_enabled(value);
         self
     }
 }
 
+/// Controls the level of statistics to be computed by the writer
+#[derive(Debug, Clone, Copy, Eq, PartialEq)]
+pub enum EnabledStatistics {

Review Comment:
   It would be good to clarify here whether `Page` also computes `Chunk` level 
statistics.
   
   If `Page` does not also include `Chunk`, that seems strange to me (I would 
expect enabling the finer-grained stats to also produce the coarser-grained 
ones). 
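   
   For example, doc comments along these lines would make the hierarchy 
explicit (a sketch -- the `None` and `Page` variant names, and the "Page 
includes Chunk" behaviour, are assumptions since the variants are elided from 
this hunk):
   
   ```rust
   /// Controls the level of statistics to be computed by the writer
   #[derive(Debug, Clone, Copy, Eq, PartialEq)]
   pub enum EnabledStatistics {
       /// Compute no statistics
       None,
       /// Compute chunk-level statistics, but not page-level
       Chunk,
       /// Compute both page-level and chunk-level statistics
       Page,
   }
   ```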
   
   



##########
parquet/src/column/writer.rs:
##########
@@ -1754,6 +1753,56 @@ mod tests {
         }
     }
 
+    #[test]
+    fn test_mixed_precomputed_statistics() {
+        let mut buf = Vec::with_capacity(100);
+        let mut write = TrackedWrite::new(&mut buf);
+        let page_writer = Box::new(SerializedPageWriter::new(&mut write));
+        let props = Arc::new(WriterProperties::builder().build());
+        let mut writer = get_test_column_writer::<Int32Type>(page_writer, 0, 0, props);
+
+        writer.write_batch(&[1, 2, 3, 4], None, None).unwrap();
+        writer
+            .write_batch_with_statistics(
+                &[5, 6, 7],
+                None,
+                None,
+                Some(&5),
+                Some(&7),
+                Some(0),
+                Some(3),
+            )
+            .unwrap();
+
+        let (_, _, metadata, _, _) = writer.close().unwrap();
+
+        let stats = metadata.statistics().unwrap();
+        assert_eq!(stats.min_bytes(), 1_i32.to_le_bytes());
+        assert_eq!(stats.max_bytes(), 7_i32.to_le_bytes());
+        assert_eq!(stats.null_count(), 0);
+        assert!(stats.distinct_count().is_none());
+
+        let reader = SerializedPageReader::new(
+            std::io::Cursor::new(buf),
+            7,
+            Compression::UNCOMPRESSED,
+            Type::INT32,
+        )
+        .unwrap();
+
+        let pages = reader.collect::<Result<Vec<_>>>().unwrap();
+        assert_eq!(pages.len(), 2);
+
+        assert_eq!(pages[0].page_type(), PageType::DICTIONARY_PAGE);
+        assert_eq!(pages[1].page_type(), PageType::DATA_PAGE);
+
+        let page_statistics = pages[1].statistics().unwrap();
+        assert_eq!(page_statistics.min_bytes(), 1_i32.to_le_bytes());

Review Comment:
   👍 



##########
parquet/src/column/writer.rs:
##########
@@ -302,61 +308,52 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
         // Find out the minimal length to prevent index out of bound errors.
         let mut min_len = values.len();
         if let Some(levels) = def_levels {
-            min_len = cmp::min(min_len, levels.len());
+            min_len = min_len.min(levels.len());
         }
         if let Some(levels) = rep_levels {
-            min_len = cmp::min(min_len, levels.len());
+            min_len = min_len.min(levels.len());
         }
 
         // Find out number of batches to process.
         let write_batch_size = self.props.write_batch_size();
         let num_batches = min_len / write_batch_size;
 
-        // Process pre-calculated statistics
-        match (min, max) {
-            (Some(min), Some(max)) => {
-                if self
-                    .min_column_value
-                    .as_ref()
-                    .map_or(true, |v| self.compare_greater(v, min))
-                {
-                    self.min_column_value = Some(min.clone());
+        if self.statistics_enabled == EnabledStatistics::Chunk {

Review Comment:
   I would personally find `match self.statistics_enabled` ... easier to 
validate that the other variants of `EnabledStatistics` do not apply.
   
   Or perhaps a function `EnabledStatistics::compute_chunk_statistics()`.
   
   Basically I am thinking about "what if someone adds a new variant" -- it 
would be nice if the compiler told them where to update.
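   
   A sketch of the helper idea (the `None` and `Page` variants are 
assumptions, and whether `Page` should imply chunk statistics is the open 
question above):
   
   ```rust
   impl EnabledStatistics {
       /// Should chunk-level statistics be accumulated?
       fn compute_chunk_statistics(&self) -> bool {
           // An exhaustive match (no `_` arm) means the compiler will flag
           // this spot when someone adds a new variant
           match self {
               EnabledStatistics::Chunk | EnabledStatistics::Page => true,
               EnabledStatistics::None => false,
           }
       }
   }
   ```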



##########
parquet/src/column/writer.rs:
##########
@@ -393,9 +389,11 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
         self.write_batch_internal(values, def_levels, rep_levels, None, None, 
None, None)
     }
 
-    /// Writer may optionally provide pre-calculated statistics for this 
batch, in which case we do
-    /// not calculate page level statistics as this will defeat the purpose of 
speeding up the write
-    /// process with pre-calculated statistics.
+    /// Writer may optionally provide pre-calculated statistics for this batch
+    ///
+    /// Note: Unless disabled by using [`WriterProperties`] to set
+    /// enabled statistics to [`EnabledStatistics::Chunk`], this will still compute

Review Comment:
   This comment almost reads as the opposite of what I think the code is doing 
(using the passed-in pre-calculated statistics, or computing them if none are 
passed in).
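   
   Perhaps something like this (a suggested rewording based on my reading, not 
the PR's current text):
   
   ```rust
   /// If pre-calculated statistics are provided they are used as-is;
   /// otherwise, unless statistics are disabled via [`WriterProperties`],
   /// they are computed from the batch itself.
   ```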



##########
parquet/src/file/properties.rs:
##########
@@ -463,15 +453,10 @@ impl ColumnProperties {
     }
 
     /// Sets whether or not statistics are enabled for this column.
-    fn set_statistics_enabled(&mut self, enabled: bool) {
+    fn set_statistics_enabled(&mut self, enabled: EnabledStatistics) {
         self.statistics_enabled = Some(enabled);
     }
 
-    /// Sets max size for statistics for this column.

Review Comment:
   I don't see any mention of `max_statistics_size` in this PR description or 
the linked tickets. 
   
   I think it is a reasonable feature (to keep statistics from bloating 
parquet files). Is the issue that it was ignored? If so, perhaps we can leave 
the API in with a `warn` or `deprecate` noting that it is ignored, and file a 
ticket to properly support it?
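   
   For example (a sketch -- the note text is illustrative and the signature is 
copied from the removed method):
   
   ```rust
   /// Sets max size for statistics for a column.
   /// Takes precedence over globally defined settings.
   #[deprecated(note = "max_statistics_size is currently ignored by the writer")]
   pub fn set_column_max_statistics_size(self, _col: ColumnPath, _value: usize) -> Self {
       // Intentionally a no-op until the size limit is supported again
       self
   }
   ```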



##########
parquet/src/column/writer.rs:
##########
@@ -1029,59 +1025,58 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
         }
     }
 
-    #[allow(clippy::eq_op)]
     fn update_page_min_max(&mut self, val: &T::T) {
+        Self::update_min(&self.descr, val, &mut self.min_page_value);
+        Self::update_max(&self.descr, val, &mut self.max_page_value);
+    }
+
+    fn update_column_min_max(&mut self) {
+        let min = self.min_page_value.as_ref().unwrap();
+        Self::update_min(&self.descr, min, &mut self.min_column_value);
+
+        let max = self.max_page_value.as_ref().unwrap();
+        Self::update_max(&self.descr, max, &mut self.max_column_value);
+    }
+
+    fn update_min(descr: &ColumnDescriptor, val: &T::T, min: &mut Option<T::T>) {
+        Self::update_stat(val, min, |cur| Self::compare_greater(descr, cur, val))
+    }
+
+    fn update_max(descr: &ColumnDescriptor, val: &T::T, max: &mut Option<T::T>) {
+        Self::update_stat(val, max, |cur| Self::compare_greater(descr, val, cur))
+    }
+
+    /// Perform a conditional update of `cur`, skipping any NaN values
+    ///
+    /// If `cur` is `None`, sets `cur` to `Some(val)`, otherwise calls `should_update` with
+    /// the value of `cur`, and updates `cur` to `Some(val)` if it returns `true`

Review Comment:
   cc @crepererum as I think you added the initial `NaN` handling a while ago
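   
   For concreteness, a self-contained sketch of the described update logic on 
plain `f64` (the real code is generic over `T::T` and compares via 
`compare_greater`):
   
   ```rust
   /// Conditionally update a running statistic, skipping NaN values
   fn update_stat(val: f64, cur: &mut Option<f64>, should_update: impl Fn(f64) -> bool) {
       // NaN compares false against everything, so it must never become
       // (or poison) the running min/max
       if val.is_nan() {
           return;
       }
       let update = match *cur {
           None => true,
           Some(current) => should_update(current),
       };
       if update {
           *cur = Some(val);
       }
   }
   
   fn main() {
       let mut min: Option<f64> = None;
       for v in [3.0, f64::NAN, 1.0, 2.0] {
           // update the running minimum when the current value is greater
           update_stat(v, &mut min, |current| current > v);
       }
       assert_eq!(min, Some(1.0)); // the NaN was skipped
   }
   ```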



##########
parquet/src/file/properties.rs:
##########
@@ -394,23 +376,33 @@ impl WriterPropertiesBuilder {
 
     /// Sets flag to enable/disable statistics for a column.
     /// Takes precedence over globally defined settings.
-    pub fn set_column_statistics_enabled(mut self, col: ColumnPath, value: bool) -> Self {
-        self.get_mut_props(col).set_statistics_enabled(value);
-        self
-    }
-
-    /// Sets max size for statistics for a column.
-    /// Takes precedence over globally defined settings.
-    pub fn set_column_max_statistics_size(
+    pub fn set_column_statistics_enabled(
         mut self,
         col: ColumnPath,
-        value: usize,
+        value: EnabledStatistics,
     ) -> Self {
-        self.get_mut_props(col).set_max_statistics_size(value);
+        self.get_mut_props(col).set_statistics_enabled(value);
         self
     }
 }
 
+/// Controls the level of statistics to be computed by the writer
+#[derive(Debug, Clone, Copy, Eq, PartialEq)]
+pub enum EnabledStatistics {

Review Comment:
   I actually like this API -- for some use cases, page level statistics are 
likely to have a higher overhead in storage and writing than benefit (e.g. 
large string columns in small row groups).



##########
parquet/src/column/writer.rs:
##########
@@ -302,61 +308,52 @@ impl<'a, T: DataType> ColumnWriterImpl<'a, T> {
         // Find out the minimal length to prevent index out of bound errors.
         let mut min_len = values.len();
         if let Some(levels) = def_levels {
-            min_len = cmp::min(min_len, levels.len());
+            min_len = min_len.min(levels.len());
         }
         if let Some(levels) = rep_levels {
-            min_len = cmp::min(min_len, levels.len());
+            min_len = min_len.min(levels.len());
         }
 
         // Find out number of batches to process.
         let write_batch_size = self.props.write_batch_size();
         let num_batches = min_len / write_batch_size;
 
-        // Process pre-calculated statistics
-        match (min, max) {
-            (Some(min), Some(max)) => {
-                if self
-                    .min_column_value
-                    .as_ref()
-                    .map_or(true, |v| self.compare_greater(v, min))
-                {
-                    self.min_column_value = Some(min.clone());
+        if self.statistics_enabled == EnabledStatistics::Chunk {
+            match (min, max) {
+                (Some(min), Some(max)) => {
+                    Self::update_min(&self.descr, min, &mut self.min_column_value);
+                    Self::update_max(&self.descr, max, &mut self.max_column_value);
                 }
-                if self
-                    .max_column_value
-                    .as_ref()
-                    .map_or(true, |v| self.compare_greater(max, v))
-                {
-                    self.max_column_value = Some(max.clone());
+                (None, Some(_)) | (Some(_), None) => {
+                    panic!("min/max should be both set or both None")
                 }
-            }
-            (None, Some(_)) | (Some(_), None) => {
-                panic!("min/max should be both set or both None")
-            }
-            (None, None) => {}
+                (None, None) => {
+                    for val in values {
+                        Self::update_min(&self.descr, val, &mut self.min_column_value);

Review Comment:
   Is this the fix for https://github.com/apache/arrow-rs/issues/2015?


