alamb commented on code in PR #8254:
URL: https://github.com/apache/arrow-datafusion/pull/8254#discussion_r1397564462
##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -256,167 +253,52 @@ impl FileFormat for ParquetFormat {
}
}
-fn summarize_min_max(
- max_values: &mut [Option<MaxAccumulator>],
- min_values: &mut [Option<MinAccumulator>],
- fields: &Fields,
- i: usize,
+/// Convert the statistics for a RowGroup ([`ParquetStatistics`]) to a
+/// [`ColumnStatistics`].
+fn column_chunk_statisics_to_column_statistics(
stat: &ParquetStatistics,
-) {
- match stat {
- ParquetStatistics::Boolean(s) => {
- if let DataType::Boolean = fields[i].data_type() {
- if s.has_min_max_set() {
- if let Some(max_value) = &mut max_values[i] {
- match max_value.update_batch(&[Arc::new(BooleanArray::from(
- vec![Some(*s.max())],
- ))]) {
- Ok(_) => {}
- Err(_) => {
- max_values[i] = None;
- }
- }
- }
- if let Some(min_value) = &mut min_values[i] {
- match min_value.update_batch(&[Arc::new(BooleanArray::from(
- vec![Some(*s.min())],
- ))]) {
- Ok(_) => {}
- Err(_) => {
- min_values[i] = None;
- }
- }
- }
- return;
- }
- }
- max_values[i] = None;
- min_values[i] = None;
- }
- ParquetStatistics::Int32(s) => {
- if let DataType::Int32 = fields[i].data_type() {
- if s.has_min_max_set() {
- if let Some(max_value) = &mut max_values[i] {
- match max_value.update_batch(&[Arc::new(Int32Array::from_value(
- *s.max(),
- 1,
- ))]) {
- Ok(_) => {}
- Err(_) => {
- max_values[i] = None;
- }
- }
- }
- if let Some(min_value) = &mut min_values[i] {
- match min_value.update_batch(&[Arc::new(Int32Array::from_value(
- *s.min(),
- 1,
- ))]) {
- Ok(_) => {}
- Err(_) => {
- min_values[i] = None;
- }
- }
- }
- return;
- }
- }
- max_values[i] = None;
- min_values[i] = None;
- }
- ParquetStatistics::Int64(s) => {
- if let DataType::Int64 = fields[i].data_type() {
- if s.has_min_max_set() {
- if let Some(max_value) = &mut max_values[i] {
- match max_value.update_batch(&[Arc::new(Int64Array::from_value(
- *s.max(),
- 1,
- ))]) {
- Ok(_) => {}
- Err(_) => {
- max_values[i] = None;
- }
- }
- }
- if let Some(min_value) = &mut min_values[i] {
- match min_value.update_batch(&[Arc::new(Int64Array::from_value(
- *s.min(),
- 1,
- ))]) {
- Ok(_) => {}
- Err(_) => {
- min_values[i] = None;
- }
- }
- }
- return;
- }
- }
- max_values[i] = None;
- min_values[i] = None;
- }
- ParquetStatistics::Float(s) => {
- if let DataType::Float32 = fields[i].data_type() {
- if s.has_min_max_set() {
- if let Some(max_value) = &mut max_values[i] {
- match max_value.update_batch(&[Arc::new(Float32Array::from(
- vec![Some(*s.max())],
- ))]) {
- Ok(_) => {}
- Err(_) => {
- max_values[i] = None;
- }
- }
- }
- if let Some(min_value) = &mut min_values[i] {
- match min_value.update_batch(&[Arc::new(Float32Array::from(
- vec![Some(*s.min())],
- ))]) {
- Ok(_) => {}
- Err(_) => {
- min_values[i] = None;
- }
- }
- }
- return;
- }
- }
- max_values[i] = None;
- min_values[i] = None;
- }
- ParquetStatistics::Double(s) => {
- if let DataType::Float64 = fields[i].data_type() {
- if s.has_min_max_set() {
- if let Some(max_value) = &mut max_values[i] {
- match max_value.update_batch(&[Arc::new(Float64Array::from(
- vec![Some(*s.max())],
- ))]) {
- Ok(_) => {}
- Err(_) => {
- max_values[i] = None;
- }
- }
- }
- if let Some(min_value) = &mut min_values[i] {
- match min_value.update_batch(&[Arc::new(Float64Array::from(
- vec![Some(*s.min())],
- ))]) {
- Ok(_) => {}
- Err(_) => {
- min_values[i] = None;
- }
- }
- }
- return;
- }
- }
- max_values[i] = None;
- min_values[i] = None;
- }
- _ => {
- max_values[i] = None;
- min_values[i] = None;
+) -> ColumnStatistics {
+ let (min_value, max_value) = if stat.has_min_max_set() {
+ match stat {
+ ParquetStatistics::Boolean(s) => (
+ Some(ScalarValue::Boolean(Some(*s.min()))),
Review Comment:
This may look like a performance regression, but I think it will actually
perform better: the old code builds a single-row array just to call an
accumulator method.
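
For anyone comparing the two shapes, here is a minimal runnable sketch of the
difference (using the `MinAccumulator`/`Accumulator` paths imported in this
file; the `i32` value is a stand-in for `*s.min()` from the row group
statistics):

```rust
use std::sync::Arc;

use datafusion::arrow::array::Int32Array;
use datafusion::arrow::datatypes::DataType;
use datafusion::error::Result;
use datafusion::physical_plan::expressions::MinAccumulator;
use datafusion::physical_plan::Accumulator;
use datafusion::scalar::ScalarValue;

fn main() -> Result<()> {
    let stat_min: i32 = 42; // stand-in for `*s.min()` from the row group stats

    // Old shape: allocate a one-element array just to feed an accumulator.
    let mut acc = MinAccumulator::try_new(&DataType::Int32)?;
    acc.update_batch(&[Arc::new(Int32Array::from_value(stat_min, 1))])?;
    let via_accumulator = acc.evaluate()?;

    // New shape: construct the ScalarValue directly, no array or accumulator.
    let direct = ScalarValue::Int32(Some(stat_min));

    assert_eq!(via_accumulator, direct);
    Ok(())
}
```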
##########
datafusion/core/src/datasource/statistics.rs:
##########
@@ -57,83 +44,36 @@ pub async fn get_statistics_with_limit(
let (file, file_stats) = first_file?;
result_files.push(file);
- // First file, we set them directly from the file statistics.
- num_rows = file_stats.num_rows;
- total_byte_size = file_stats.total_byte_size;
- for (index, file_column) in file_stats.column_statistics.into_iter().enumerate() {
- null_counts[index] = file_column.null_count;
- max_values[index] = file_column.max_value;
- min_values[index] = file_column.min_value;
- }
+ stats_agg.update(&file_stats, &file_schema)?;
// If the number of rows exceeds the limit, we can stop processing
// files. This only applies when we know the number of rows. It also
// currently ignores tables that have no statistics regarding the
// number of rows.
- let conservative_num_rows = match num_rows {
- Precision::Exact(nr) => nr,
+ let conservative_num_rows = match stats_agg.num_rows() {
+ Precision::Exact(nr) => *nr,
_ => usize::MIN,
};
- if conservative_num_rows <= limit.unwrap_or(usize::MAX) {
+
+ if conservative_num_rows <= limit {
while let Some(current) = all_files.next().await {
let (file, file_stats) = current?;
result_files.push(file);
-
- // We accumulate the number of rows, total byte size and null
- // counts across all the files in question. If any file does not
- // provide any information or provides an inexact value, we demote
- // the statistic precision to inexact.
- num_rows = add_row_stats(file_stats.num_rows, num_rows);
-
- total_byte_size =
- add_row_stats(file_stats.total_byte_size, total_byte_size);
-
- (null_counts, max_values, min_values) = multiunzip(
- izip!(
- file_stats.column_statistics.into_iter(),
- null_counts.into_iter(),
- max_values.into_iter(),
- min_values.into_iter()
- )
- .map(
- |(
- ColumnStatistics {
- null_count: file_nc,
- max_value: file_max,
- min_value: file_min,
- distinct_count: _,
- },
- null_count,
- max_value,
- min_value,
- )| {
- (
- add_row_stats(file_nc, null_count),
- set_max_if_greater(file_max, max_value),
- set_min_if_lesser(file_min, min_value),
- )
- },
- ),
- );
+ stats_agg.update(&file_stats, &file_schema)?;
// If the number of rows exceeds the limit, we can stop processing
- // files. This only applies when we know the number of rows. It also
- // currently ignores tables that have no statistics regarding the
- // number of rows.
- if num_rows.get_value().unwrap_or(&usize::MIN)
Review Comment:
I think this check may be incorrect: even when the file statistics are not
precise, it may stop reading files early.
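
In case it helps, a hedged sketch of the more conservative check this comment
is suggesting (`Precision` is the same enum used in the diff above; the
function name is made up for illustration):

```rust
use datafusion_common::stats::Precision;

/// Only report that `limit` is satisfied when the row count is exact; an
/// inexact or absent count may undercount, so it is not safe to stop early.
fn limit_reached(num_rows: &Precision<usize>, limit: usize) -> bool {
    match num_rows {
        Precision::Exact(nr) => *nr > limit,
        // Inexact or Absent: keep reading files
        _ => false,
    }
}
```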
##########
datafusion/core/src/datasource/file_format/parquet.rs:
##########
@@ -256,167 +253,52 @@ impl FileFormat for ParquetFormat {
}
}
-fn summarize_min_max(
- max_values: &mut [Option<MaxAccumulator>],
- min_values: &mut [Option<MinAccumulator>],
- fields: &Fields,
- i: usize,
+/// Convert the statistics for a RowGroup ([`ParquetStatistics`]) to a
+/// [`ColumnStatistics`].
+fn column_chunk_statisics_to_column_statistics(
stat: &ParquetStatistics,
-) {
- match stat {
- ParquetStatistics::Boolean(s) => {
- if let DataType::Boolean = fields[i].data_type() {
- if s.has_min_max_set() {
- if let Some(max_value) = &mut max_values[i] {
- match max_value.update_batch(&[Arc::new(BooleanArray::from(
- vec![Some(*s.max())],
- ))]) {
- Ok(_) => {}
- Err(_) => {
- max_values[i] = None;
- }
- }
- }
- if let Some(min_value) = &mut min_values[i] {
- match min_value.update_batch(&[Arc::new(BooleanArray::from(
- vec![Some(*s.min())],
- ))]) {
- Ok(_) => {}
- Err(_) => {
- min_values[i] = None;
- }
- }
- }
- return;
- }
- }
- max_values[i] = None;
- min_values[i] = None;
- }
- ParquetStatistics::Int32(s) => {
- if let DataType::Int32 = fields[i].data_type() {
- if s.has_min_max_set() {
- if let Some(max_value) = &mut max_values[i] {
- match max_value.update_batch(&[Arc::new(Int32Array::from_value(
- *s.max(),
- 1,
- ))]) {
- Ok(_) => {}
- Err(_) => {
- max_values[i] = None;
- }
- }
- }
- if let Some(min_value) = &mut min_values[i] {
- match min_value.update_batch(&[Arc::new(Int32Array::from_value(
- *s.min(),
- 1,
- ))]) {
- Ok(_) => {}
- Err(_) => {
- min_values[i] = None;
- }
- }
- }
- return;
- }
- }
- max_values[i] = None;
- min_values[i] = None;
- }
- ParquetStatistics::Int64(s) => {
- if let DataType::Int64 = fields[i].data_type() {
- if s.has_min_max_set() {
- if let Some(max_value) = &mut max_values[i] {
- match max_value.update_batch(&[Arc::new(Int64Array::from_value(
- *s.max(),
- 1,
- ))]) {
- Ok(_) => {}
- Err(_) => {
- max_values[i] = None;
- }
- }
- }
- if let Some(min_value) = &mut min_values[i] {
- match min_value.update_batch(&[Arc::new(Int64Array::from_value(
- *s.min(),
- 1,
- ))]) {
- Ok(_) => {}
- Err(_) => {
- min_values[i] = None;
- }
- }
- }
- return;
- }
- }
- max_values[i] = None;
- min_values[i] = None;
- }
- ParquetStatistics::Float(s) => {
- if let DataType::Float32 = fields[i].data_type() {
- if s.has_min_max_set() {
- if let Some(max_value) = &mut max_values[i] {
- match max_value.update_batch(&[Arc::new(Float32Array::from(
- vec![Some(*s.max())],
- ))]) {
- Ok(_) => {}
- Err(_) => {
- max_values[i] = None;
- }
- }
- }
- if let Some(min_value) = &mut min_values[i] {
- match min_value.update_batch(&[Arc::new(Float32Array::from(
- vec![Some(*s.min())],
- ))]) {
- Ok(_) => {}
- Err(_) => {
- min_values[i] = None;
- }
- }
- }
- return;
- }
- }
- max_values[i] = None;
- min_values[i] = None;
- }
- ParquetStatistics::Double(s) => {
- if let DataType::Float64 = fields[i].data_type() {
- if s.has_min_max_set() {
- if let Some(max_value) = &mut max_values[i] {
- match max_value.update_batch(&[Arc::new(Float64Array::from(
- vec![Some(*s.max())],
- ))]) {
- Ok(_) => {}
- Err(_) => {
- max_values[i] = None;
- }
- }
- }
- if let Some(min_value) = &mut min_values[i] {
- match min_value.update_batch(&[Arc::new(Float64Array::from(
- vec![Some(*s.min())],
- ))]) {
- Ok(_) => {}
- Err(_) => {
- min_values[i] = None;
- }
- }
- }
- return;
- }
- }
- max_values[i] = None;
- min_values[i] = None;
- }
- _ => {
- max_values[i] = None;
- min_values[i] = None;
+) -> ColumnStatistics {
+ let (min_value, max_value) = if stat.has_min_max_set() {
+ match stat {
+ ParquetStatistics::Boolean(s) => (
+ Some(ScalarValue::Boolean(Some(*s.min()))),
+ Some(ScalarValue::Boolean(Some(*s.max()))),
+ ),
+ ParquetStatistics::Int32(s) => (
+ Some(ScalarValue::Int32(Some(*s.min()))),
+ Some(ScalarValue::Int32(Some(*s.max()))),
+ ),
+ ParquetStatistics::Int64(s) => (
+ Some(ScalarValue::Int64(Some(*s.min()))),
+ Some(ScalarValue::Int64(Some(*s.max()))),
+ ),
+ ParquetStatistics::Float(s) => (
+ Some(ScalarValue::Float32(Some(*s.min()))),
+ Some(ScalarValue::Float32(Some(*s.max()))),
+ ),
+ ParquetStatistics::Double(s) => (
+ Some(ScalarValue::Float64(Some(*s.min()))),
+ Some(ScalarValue::Float64(Some(*s.max()))),
+ ),
+ // TODO: file ticket to support fetching byte array (aka string) metadata
+ ParquetStatistics::ByteArray(_) => (None, None),
Review Comment:
I think this new structure also makes it clear that DataFusion currently
ignores statistics for string columns.
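
To make the follow-up concrete, a hypothetical helper (not part of this PR)
showing one way byte array statistics could be converted for `Utf8` columns;
`byte_array_stat_to_utf8` is a made-up name, and `ByteArray::as_utf8` errors
on non-UTF-8 data:

```rust
use datafusion::scalar::ScalarValue;
use parquet::data_type::ByteArray;

/// Hypothetical helper, not in this PR: convert a byte array statistic
/// (e.g. `s.min()` / `s.max()`) to a Utf8 scalar, returning None when the
/// bytes are not valid UTF-8 (as for true binary columns).
fn byte_array_stat_to_utf8(value: &ByteArray) -> Option<ScalarValue> {
    value
        .as_utf8()
        .ok()
        .map(|s| ScalarValue::Utf8(Some(s.to_string())))
}
```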
##########
datafusion/core/src/datasource/statistics.rs:
##########
@@ -16,39 +16,26 @@
// under the License.
use super::listing::PartitionedFile;
-use crate::arrow::datatypes::{Schema, SchemaRef};
+use crate::arrow::datatypes::SchemaRef;
use crate::error::Result;
-use crate::physical_plan::expressions::{MaxAccumulator, MinAccumulator};
-use crate::physical_plan::{Accumulator, ColumnStatistics, Statistics};
+use crate::physical_plan::Statistics;
-use datafusion_common::stats::Precision;
-use datafusion_common::ScalarValue;
+use datafusion_common::stats::{Precision, StatisticsAggregator};
use futures::{Stream, StreamExt};
-use itertools::izip;
-use itertools::multiunzip;
-/// Get all files as well as the file level summary statistics (no statistic for partition columns).
-/// If the optional `limit` is provided, includes only sufficient files.
-/// Needed to read up to `limit` number of rows.
+/// Get all files as well as the file level summary statistics (no statistic for
+/// partition columns). If the optional `limit` is provided, includes only
+/// sufficient files needed to read up to `limit` number of rows.
pub async fn get_statistics_with_limit(
all_files: impl Stream<Item = Result<(PartitionedFile, Statistics)>>,
file_schema: SchemaRef,
limit: Option<usize>,
) -> Result<(Vec<PartitionedFile>, Statistics)> {
+ let limit = limit.unwrap_or(usize::MAX);
+
let mut result_files = vec![];
- // These statistics can be calculated as long as at least one file provides
Review Comment:
This is basically a second (different) way to aggregate statistics
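
As a usage sketch of the aggregator pattern this hunk switches to (only
`update` and `num_rows` appear in the diff; the `new` and `build` method
names below are assumptions):

```rust
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::error::Result;
use datafusion::physical_plan::Statistics;
use datafusion_common::stats::StatisticsAggregator;

/// Merge per-file statistics into a single table-level summary. Each
/// `update` folds in num_rows, total_byte_size, null counts, and min/max
/// values, demoting precision to Inexact where inputs disagree or are absent.
fn merge_file_stats(
    file_schema: SchemaRef,
    per_file: &[Statistics],
) -> Result<Statistics> {
    // `new` is assumed; the diff only shows `update` and `num_rows`.
    let mut stats_agg = StatisticsAggregator::new(&file_schema);
    for file_stats in per_file {
        stats_agg.update(file_stats, &file_schema)?;
    }
    // `build` is likewise assumed for extracting the merged Statistics.
    Ok(stats_agg.build())
}
```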
##########
datafusion/core/src/datasource/statistics.rs:
##########
@@ -143,117 +83,3 @@ pub async fn get_statistics_with_limit(
Ok((result_files, statistics))
}
-
-pub(crate) fn create_max_min_accs(
Review Comment:
This is all handled by the StatisticsAggregator now.
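
For context, a sketch of the removed helper's shape, reconstructed from its
name and the accumulator imports removed above (the body is truncated in this
hunk, so treat this as an approximation):

```rust
use datafusion::arrow::datatypes::Schema;
use datafusion::physical_plan::expressions::{MaxAccumulator, MinAccumulator};

/// Approximate reconstruction: one optional min/max accumulator per schema
/// field, None for data types without min/max support.
pub(crate) fn create_max_min_accs(
    schema: &Schema,
) -> (Vec<Option<MaxAccumulator>>, Vec<Option<MinAccumulator>>) {
    let max_values = schema
        .fields()
        .iter()
        .map(|field| MaxAccumulator::try_new(field.data_type()).ok())
        .collect();
    let min_values = schema
        .fields()
        .iter()
        .map(|field| MinAccumulator::try_new(field.data_type()).ok())
        .collect();
    (max_values, min_values)
}
```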
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]