alamb commented on code in PR #8254:
URL: https://github.com/apache/arrow-datafusion/pull/8254#discussion_r1397564462


##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -256,167 +253,52 @@ impl FileFormat for ParquetFormat {
     }
 }
 
-fn summarize_min_max(
-    max_values: &mut [Option<MaxAccumulator>],
-    min_values: &mut [Option<MinAccumulator>],
-    fields: &Fields,
-    i: usize,
+/// Convert the statistics for a RowGroup ([`ParquetStatistics`]) to a
+/// [`ColumnStatistics`].
+fn column_chunk_statisics_to_column_statistics(
     stat: &ParquetStatistics,
-) {
-    match stat {
-        ParquetStatistics::Boolean(s) => {
-            if let DataType::Boolean = fields[i].data_type() {
-                if s.has_min_max_set() {
-                    if let Some(max_value) = &mut max_values[i] {
-                        match 
max_value.update_batch(&[Arc::new(BooleanArray::from(
-                            vec![Some(*s.max())],
-                        ))]) {
-                            Ok(_) => {}
-                            Err(_) => {
-                                max_values[i] = None;
-                            }
-                        }
-                    }
-                    if let Some(min_value) = &mut min_values[i] {
-                        match 
min_value.update_batch(&[Arc::new(BooleanArray::from(
-                            vec![Some(*s.min())],
-                        ))]) {
-                            Ok(_) => {}
-                            Err(_) => {
-                                min_values[i] = None;
-                            }
-                        }
-                    }
-                    return;
-                }
-            }
-            max_values[i] = None;
-            min_values[i] = None;
-        }
-        ParquetStatistics::Int32(s) => {
-            if let DataType::Int32 = fields[i].data_type() {
-                if s.has_min_max_set() {
-                    if let Some(max_value) = &mut max_values[i] {
-                        match 
max_value.update_batch(&[Arc::new(Int32Array::from_value(
-                            *s.max(),
-                            1,
-                        ))]) {
-                            Ok(_) => {}
-                            Err(_) => {
-                                max_values[i] = None;
-                            }
-                        }
-                    }
-                    if let Some(min_value) = &mut min_values[i] {
-                        match 
min_value.update_batch(&[Arc::new(Int32Array::from_value(
-                            *s.min(),
-                            1,
-                        ))]) {
-                            Ok(_) => {}
-                            Err(_) => {
-                                min_values[i] = None;
-                            }
-                        }
-                    }
-                    return;
-                }
-            }
-            max_values[i] = None;
-            min_values[i] = None;
-        }
-        ParquetStatistics::Int64(s) => {
-            if let DataType::Int64 = fields[i].data_type() {
-                if s.has_min_max_set() {
-                    if let Some(max_value) = &mut max_values[i] {
-                        match 
max_value.update_batch(&[Arc::new(Int64Array::from_value(
-                            *s.max(),
-                            1,
-                        ))]) {
-                            Ok(_) => {}
-                            Err(_) => {
-                                max_values[i] = None;
-                            }
-                        }
-                    }
-                    if let Some(min_value) = &mut min_values[i] {
-                        match 
min_value.update_batch(&[Arc::new(Int64Array::from_value(
-                            *s.min(),
-                            1,
-                        ))]) {
-                            Ok(_) => {}
-                            Err(_) => {
-                                min_values[i] = None;
-                            }
-                        }
-                    }
-                    return;
-                }
-            }
-            max_values[i] = None;
-            min_values[i] = None;
-        }
-        ParquetStatistics::Float(s) => {
-            if let DataType::Float32 = fields[i].data_type() {
-                if s.has_min_max_set() {
-                    if let Some(max_value) = &mut max_values[i] {
-                        match 
max_value.update_batch(&[Arc::new(Float32Array::from(
-                            vec![Some(*s.max())],
-                        ))]) {
-                            Ok(_) => {}
-                            Err(_) => {
-                                max_values[i] = None;
-                            }
-                        }
-                    }
-                    if let Some(min_value) = &mut min_values[i] {
-                        match 
min_value.update_batch(&[Arc::new(Float32Array::from(
-                            vec![Some(*s.min())],
-                        ))]) {
-                            Ok(_) => {}
-                            Err(_) => {
-                                min_values[i] = None;
-                            }
-                        }
-                    }
-                    return;
-                }
-            }
-            max_values[i] = None;
-            min_values[i] = None;
-        }
-        ParquetStatistics::Double(s) => {
-            if let DataType::Float64 = fields[i].data_type() {
-                if s.has_min_max_set() {
-                    if let Some(max_value) = &mut max_values[i] {
-                        match 
max_value.update_batch(&[Arc::new(Float64Array::from(
-                            vec![Some(*s.max())],
-                        ))]) {
-                            Ok(_) => {}
-                            Err(_) => {
-                                max_values[i] = None;
-                            }
-                        }
-                    }
-                    if let Some(min_value) = &mut min_values[i] {
-                        match 
min_value.update_batch(&[Arc::new(Float64Array::from(
-                            vec![Some(*s.min())],
-                        ))]) {
-                            Ok(_) => {}
-                            Err(_) => {
-                                min_values[i] = None;
-                            }
-                        }
-                    }
-                    return;
-                }
-            }
-            max_values[i] = None;
-            min_values[i] = None;
-        }
-        _ => {
-            max_values[i] = None;
-            min_values[i] = None;
+) -> ColumnStatistics {
+    let (min_value, max_value) = if stat.has_min_max_set() {
+        match stat {
+            ParquetStatistics::Boolean(s) => (
+                Some(ScalarValue::Boolean(Some(*s.min()))),

Review Comment:
   This may look like a regression in performance, but I think it will actually 
perform better (as the old code is creating a single row array just to call an 
accumulator method)



##########
datafusion/core/src/datasource/statistics.rs:
##########
@@ -57,83 +44,36 @@ pub async fn get_statistics_with_limit(
         let (file, file_stats) = first_file?;
         result_files.push(file);
 
-        // First file, we set them directly from the file statistics.
-        num_rows = file_stats.num_rows;
-        total_byte_size = file_stats.total_byte_size;
-        for (index, file_column) in 
file_stats.column_statistics.into_iter().enumerate() {
-            null_counts[index] = file_column.null_count;
-            max_values[index] = file_column.max_value;
-            min_values[index] = file_column.min_value;
-        }
+        stats_agg.update(&file_stats, &file_schema)?;
 
         // If the number of rows exceeds the limit, we can stop processing
         // files. This only applies when we know the number of rows. It also
         // currently ignores tables that have no statistics regarding the
         // number of rows.
-        let conservative_num_rows = match num_rows {
-            Precision::Exact(nr) => nr,
+        let conservative_num_rows = match stats_agg.num_rows() {
+            Precision::Exact(nr) => *nr,
             _ => usize::MIN,
         };
-        if conservative_num_rows <= limit.unwrap_or(usize::MAX) {
+
+        if conservative_num_rows <= limit {
             while let Some(current) = all_files.next().await {
                 let (file, file_stats) = current?;
                 result_files.push(file);
-
-                // We accumulate the number of rows, total byte size and null
-                // counts across all the files in question. If any file does 
not
-                // provide any information or provides an inexact value, we 
demote
-                // the statistic precision to inexact.
-                num_rows = add_row_stats(file_stats.num_rows, num_rows);
-
-                total_byte_size =
-                    add_row_stats(file_stats.total_byte_size, total_byte_size);
-
-                (null_counts, max_values, min_values) = multiunzip(
-                    izip!(
-                        file_stats.column_statistics.into_iter(),
-                        null_counts.into_iter(),
-                        max_values.into_iter(),
-                        min_values.into_iter()
-                    )
-                    .map(
-                        |(
-                            ColumnStatistics {
-                                null_count: file_nc,
-                                max_value: file_max,
-                                min_value: file_min,
-                                distinct_count: _,
-                            },
-                            null_count,
-                            max_value,
-                            min_value,
-                        )| {
-                            (
-                                add_row_stats(file_nc, null_count),
-                                set_max_if_greater(file_max, max_value),
-                                set_min_if_lesser(file_min, min_value),
-                            )
-                        },
-                    ),
-                );
+                stats_agg.update(&file_stats, &file_schema)?;
 
                 // If the number of rows exceeds the limit, we can stop 
processing
-                // files. This only applies when we know the number of rows. 
It also
-                // currently ignores tables that have no statistics regarding 
the
-                // number of rows.
-                if num_rows.get_value().unwrap_or(&usize::MIN)

Review Comment:
   I think this check may be incorrect in the sense that even if the file 
statistics are not precise, it may stop reading files early



##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -256,167 +253,52 @@ impl FileFormat for ParquetFormat {
     }
 }
 
-fn summarize_min_max(
-    max_values: &mut [Option<MaxAccumulator>],
-    min_values: &mut [Option<MinAccumulator>],
-    fields: &Fields,
-    i: usize,
+/// Convert the statistics for a RowGroup ([`ParquetStatistics`]) to a
+/// [`ColumnStatistics`].
+fn column_chunk_statisics_to_column_statistics(
     stat: &ParquetStatistics,
-) {
-    match stat {
-        ParquetStatistics::Boolean(s) => {
-            if let DataType::Boolean = fields[i].data_type() {
-                if s.has_min_max_set() {
-                    if let Some(max_value) = &mut max_values[i] {
-                        match 
max_value.update_batch(&[Arc::new(BooleanArray::from(
-                            vec![Some(*s.max())],
-                        ))]) {
-                            Ok(_) => {}
-                            Err(_) => {
-                                max_values[i] = None;
-                            }
-                        }
-                    }
-                    if let Some(min_value) = &mut min_values[i] {
-                        match 
min_value.update_batch(&[Arc::new(BooleanArray::from(
-                            vec![Some(*s.min())],
-                        ))]) {
-                            Ok(_) => {}
-                            Err(_) => {
-                                min_values[i] = None;
-                            }
-                        }
-                    }
-                    return;
-                }
-            }
-            max_values[i] = None;
-            min_values[i] = None;
-        }
-        ParquetStatistics::Int32(s) => {
-            if let DataType::Int32 = fields[i].data_type() {
-                if s.has_min_max_set() {
-                    if let Some(max_value) = &mut max_values[i] {
-                        match 
max_value.update_batch(&[Arc::new(Int32Array::from_value(
-                            *s.max(),
-                            1,
-                        ))]) {
-                            Ok(_) => {}
-                            Err(_) => {
-                                max_values[i] = None;
-                            }
-                        }
-                    }
-                    if let Some(min_value) = &mut min_values[i] {
-                        match 
min_value.update_batch(&[Arc::new(Int32Array::from_value(
-                            *s.min(),
-                            1,
-                        ))]) {
-                            Ok(_) => {}
-                            Err(_) => {
-                                min_values[i] = None;
-                            }
-                        }
-                    }
-                    return;
-                }
-            }
-            max_values[i] = None;
-            min_values[i] = None;
-        }
-        ParquetStatistics::Int64(s) => {
-            if let DataType::Int64 = fields[i].data_type() {
-                if s.has_min_max_set() {
-                    if let Some(max_value) = &mut max_values[i] {
-                        match 
max_value.update_batch(&[Arc::new(Int64Array::from_value(
-                            *s.max(),
-                            1,
-                        ))]) {
-                            Ok(_) => {}
-                            Err(_) => {
-                                max_values[i] = None;
-                            }
-                        }
-                    }
-                    if let Some(min_value) = &mut min_values[i] {
-                        match 
min_value.update_batch(&[Arc::new(Int64Array::from_value(
-                            *s.min(),
-                            1,
-                        ))]) {
-                            Ok(_) => {}
-                            Err(_) => {
-                                min_values[i] = None;
-                            }
-                        }
-                    }
-                    return;
-                }
-            }
-            max_values[i] = None;
-            min_values[i] = None;
-        }
-        ParquetStatistics::Float(s) => {
-            if let DataType::Float32 = fields[i].data_type() {
-                if s.has_min_max_set() {
-                    if let Some(max_value) = &mut max_values[i] {
-                        match 
max_value.update_batch(&[Arc::new(Float32Array::from(
-                            vec![Some(*s.max())],
-                        ))]) {
-                            Ok(_) => {}
-                            Err(_) => {
-                                max_values[i] = None;
-                            }
-                        }
-                    }
-                    if let Some(min_value) = &mut min_values[i] {
-                        match 
min_value.update_batch(&[Arc::new(Float32Array::from(
-                            vec![Some(*s.min())],
-                        ))]) {
-                            Ok(_) => {}
-                            Err(_) => {
-                                min_values[i] = None;
-                            }
-                        }
-                    }
-                    return;
-                }
-            }
-            max_values[i] = None;
-            min_values[i] = None;
-        }
-        ParquetStatistics::Double(s) => {
-            if let DataType::Float64 = fields[i].data_type() {
-                if s.has_min_max_set() {
-                    if let Some(max_value) = &mut max_values[i] {
-                        match 
max_value.update_batch(&[Arc::new(Float64Array::from(
-                            vec![Some(*s.max())],
-                        ))]) {
-                            Ok(_) => {}
-                            Err(_) => {
-                                max_values[i] = None;
-                            }
-                        }
-                    }
-                    if let Some(min_value) = &mut min_values[i] {
-                        match 
min_value.update_batch(&[Arc::new(Float64Array::from(
-                            vec![Some(*s.min())],
-                        ))]) {
-                            Ok(_) => {}
-                            Err(_) => {
-                                min_values[i] = None;
-                            }
-                        }
-                    }
-                    return;
-                }
-            }
-            max_values[i] = None;
-            min_values[i] = None;
-        }
-        _ => {
-            max_values[i] = None;
-            min_values[i] = None;
+) -> ColumnStatistics {
+    let (min_value, max_value) = if stat.has_min_max_set() {
+        match stat {
+            ParquetStatistics::Boolean(s) => (
+                Some(ScalarValue::Boolean(Some(*s.min()))),
+                Some(ScalarValue::Boolean(Some(*s.max()))),
+            ),
+            ParquetStatistics::Int32(s) => (
+                Some(ScalarValue::Int32(Some(*s.min()))),
+                Some(ScalarValue::Int32(Some(*s.max()))),
+            ),
+            ParquetStatistics::Int64(s) => (
+                Some(ScalarValue::Int64(Some(*s.min()))),
+                Some(ScalarValue::Int64(Some(*s.max()))),
+            ),
+            ParquetStatistics::Float(s) => (
+                Some(ScalarValue::Float32(Some(*s.min()))),
+                Some(ScalarValue::Float32(Some(*s.max()))),
+            ),
+            ParquetStatistics::Double(s) => (
+                Some(ScalarValue::Float64(Some(*s.min()))),
+                Some(ScalarValue::Float64(Some(*s.max()))),
+            ),
+            // TODO: file ticket to support fetching byte array (aka string) 
metadata
+            ParquetStatistics::ByteArray(_) => (None, None),

Review Comment:
   I think this new structure also makes it clear DataFusion currently ignores 
statistics for string columns



##########
datafusion/core/src/datasource/statistics.rs:
##########
@@ -16,39 +16,26 @@
 // under the License.
 
 use super::listing::PartitionedFile;
-use crate::arrow::datatypes::{Schema, SchemaRef};
+use crate::arrow::datatypes::SchemaRef;
 use crate::error::Result;
-use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
-use crate::physical_plan::{Accumulator, ColumnStatistics, Statistics};
+use crate::physical_plan::Statistics;
 
-use datafusion_common::stats::Precision;
-use datafusion_common::ScalarValue;
+use datafusion_common::stats::{Precision, StatisticsAggregator};
 
 use futures::{Stream, StreamExt};
-use itertools::izip;
-use itertools::multiunzip;
 
-/// Get all files as well as the file level summary statistics (no statistic 
for partition columns).
-/// If the optional `limit` is provided, includes only sufficient files.
-/// Needed to read up to `limit` number of rows.
+/// Get all files as well as the file level summary statistics (no statistic 
for
+/// partition columns). If the optional `limit` is provided, includes only
+/// sufficient files needed to read up to `limit` number of rows.
 pub async fn get_statistics_with_limit(
     all_files: impl Stream<Item = Result<(PartitionedFile, Statistics)>>,
     file_schema: SchemaRef,
     limit: Option<usize>,
 ) -> Result<(Vec<PartitionedFile>, Statistics)> {
+    let limit = limit.unwrap_or(usize::MAX);
+
     let mut result_files = vec![];
-    // These statistics can be calculated as long as at least one file provides

Review Comment:
   This is basically a second (different) way to aggregate statistics



##########
datafusion/core/src/datasource/statistics.rs:
##########
@@ -143,117 +83,3 @@ pub async fn get_statistics_with_limit(
 
     Ok((result_files, statistics))
 }
-
-pub(crate) fn create_max_min_accs(

Review Comment:
   This is all handled in the StatisticsAccumulator now



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to