This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 93fb7157b4 Extract drive-by fixes from PR 12135 for easier reviewing 
(#12240)
93fb7157b4 is described below

commit 93fb7157b454f00319d45ce01e1ba2bdebb52b48
Author: June <[email protected]>
AuthorDate: Mon Sep 2 06:10:17 2024 -0600

    Extract drive-by fixes from PR 12135 for easier reviewing (#12240)
    
    * Extract drive-by fixes from PR 12135 for easier reviewing
    
    * Add a few more cfgs to silence warnings with different feature sets
    
    * fmt
---
 datafusion/common/src/hash_utils.rs                |  2 +
 datafusion/core/src/datasource/listing/helpers.rs  | 16 ++--
 datafusion/core/src/datasource/listing/table.rs    |  7 +-
 .../src/datasource/physical_plan/parquet/mod.rs    | 13 +--
 .../datasource/physical_plan/parquet/row_filter.rs | 95 ++++++++--------------
 datafusion/core/src/datasource/statistics.rs       | 26 ++++--
 .../core/src/execution/session_state_defaults.rs   |  3 +
 datafusion/core/src/physical_optimizer/pruning.rs  |  4 +
 datafusion/physical-expr/src/expressions/binary.rs | 20 +++--
 datafusion/physical-plan/src/execution_plan.rs     |  2 +-
 10 files changed, 93 insertions(+), 95 deletions(-)

diff --git a/datafusion/common/src/hash_utils.rs 
b/datafusion/common/src/hash_utils.rs
index f3d2a0a4f9..72cfeafd0b 100644
--- a/datafusion/common/src/hash_utils.rs
+++ b/datafusion/common/src/hash_utils.rs
@@ -245,6 +245,8 @@ fn hash_struct_array(
     Ok(())
 }
 
+// only adding this `cfg` b/c this function is only used with this `cfg`
+#[cfg(not(feature = "force_hash_collisions"))]
 fn hash_map_array(
     array: &MapArray,
     random_state: &RandomState,
diff --git a/datafusion/core/src/datasource/listing/helpers.rs 
b/datafusion/core/src/datasource/listing/helpers.rs
index dbeaf5dfcc..33a16237e1 100644
--- a/datafusion/core/src/datasource/listing/helpers.rs
+++ b/datafusion/core/src/datasource/listing/helpers.rs
@@ -51,12 +51,12 @@ use object_store::{ObjectMeta, ObjectStore};
 /// - the table provider can filter the table partition values with this 
expression
 /// - the expression can be marked as `TableProviderFilterPushDown::Exact` 
once this filtering
 ///   was performed
-pub fn expr_applicable_for_cols(col_names: &[String], expr: &Expr) -> bool {
+pub fn expr_applicable_for_cols(col_names: &[&str], expr: &Expr) -> bool {
     let mut is_applicable = true;
     expr.apply(|expr| {
         match expr {
             Expr::Column(Column { ref name, .. }) => {
-                is_applicable &= col_names.contains(name);
+                is_applicable &= col_names.contains(&name.as_str());
                 if is_applicable {
                     Ok(TreeNodeRecursion::Jump)
                 } else {
@@ -745,27 +745,27 @@ mod tests {
     #[test]
     fn test_expr_applicable_for_cols() {
         assert!(expr_applicable_for_cols(
-            &[String::from("c1")],
+            &["c1"],
             &Expr::eq(col("c1"), lit("value"))
         ));
         assert!(!expr_applicable_for_cols(
-            &[String::from("c1")],
+            &["c1"],
             &Expr::eq(col("c2"), lit("value"))
         ));
         assert!(!expr_applicable_for_cols(
-            &[String::from("c1")],
+            &["c1"],
             &Expr::eq(col("c1"), col("c2"))
         ));
         assert!(expr_applicable_for_cols(
-            &[String::from("c1"), String::from("c2")],
+            &["c1", "c2"],
             &Expr::eq(col("c1"), col("c2"))
         ));
         assert!(expr_applicable_for_cols(
-            &[String::from("c1"), String::from("c2")],
+            &["c1", "c2"],
             &(Expr::eq(col("c1"), col("c2").alias("c2_alias"))).not()
         ));
         assert!(expr_applicable_for_cols(
-            &[String::from("c1"), String::from("c2")],
+            &["c1", "c2"],
             &(case(col("c1"))
                 .when(lit("v1"), lit(true))
                 .otherwise(lit(false))
diff --git a/datafusion/core/src/datasource/listing/table.rs 
b/datafusion/core/src/datasource/listing/table.rs
index 35286612a8..9246226d43 100644
--- a/datafusion/core/src/datasource/listing/table.rs
+++ b/datafusion/core/src/datasource/listing/table.rs
@@ -826,7 +826,7 @@ impl TableProvider for ListingTable {
         &self,
         filters: &[&Expr],
     ) -> Result<Vec<TableProviderFilterPushDown>> {
-        let support: Vec<_> = filters
+        Ok(filters
             .iter()
             .map(|filter| {
                 if expr_applicable_for_cols(
@@ -834,7 +834,7 @@ impl TableProvider for ListingTable {
                         .options
                         .table_partition_cols
                         .iter()
-                        .map(|x| x.0.clone())
+                        .map(|x| x.0.as_str())
                         .collect::<Vec<_>>(),
                     filter,
                 ) {
@@ -846,8 +846,7 @@ impl TableProvider for ListingTable {
                     TableProviderFilterPushDown::Inexact
                 }
             })
-            .collect();
-        Ok(support)
+            .collect())
     }
 
     fn get_table_definition(&self) -> Option<&str> {
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs 
b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 85d6f8db23..b2f86db742 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -685,10 +685,12 @@ impl ExecutionPlan for ParquetExec {
         partition_index: usize,
         ctx: Arc<TaskContext>,
     ) -> Result<SendableRecordBatchStream> {
-        let projection = match 
self.base_config.file_column_projection_indices() {
-            Some(proj) => proj,
-            None => (0..self.base_config.file_schema.fields().len()).collect(),
-        };
+        let projection = self
+            .base_config
+            .file_column_projection_indices()
+            .unwrap_or_else(|| {
+                (0..self.base_config.file_schema.fields().len()).collect()
+            });
 
         let parquet_file_reader_factory = self
             .parquet_file_reader_factory
@@ -698,8 +700,7 @@ impl ExecutionPlan for ParquetExec {
                 ctx.runtime_env()
                     .object_store(&self.base_config.object_store_url)
                     .map(|store| {
-                        Arc::new(DefaultParquetFileReaderFactory::new(store))
-                            as Arc<dyn ParquetFileReaderFactory>
+                        Arc::new(DefaultParquetFileReaderFactory::new(store)) 
as _
                     })
             })?;
 
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs 
b/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs
index 23fdadc2cd..59d23fd68c 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs
@@ -59,6 +59,7 @@
 //!    the unsorted predicates. Within each partition, predicates are
 //!    still be sorted by size.
 
+use std::cmp::Ordering;
 use std::collections::BTreeSet;
 use std::sync::Arc;
 
@@ -129,7 +130,7 @@ impl DatafusionArrowPredicate {
         // on the order they appear in the file
         let projection = match candidate.projection.len() {
             0 | 1 => vec![],
-            _ => remap_projection(&candidate.projection),
+            2.. => remap_projection(&candidate.projection),
         };
 
         Ok(Self {
@@ -151,32 +152,31 @@ impl ArrowPredicate for DatafusionArrowPredicate {
         &self.projection_mask
     }
 
-    fn evaluate(&mut self, batch: RecordBatch) -> ArrowResult<BooleanArray> {
-        let batch = match self.projection.is_empty() {
-            true => batch,
-            false => batch.project(&self.projection)?,
+    fn evaluate(&mut self, mut batch: RecordBatch) -> 
ArrowResult<BooleanArray> {
+        if !self.projection.is_empty() {
+            batch = batch.project(&self.projection)?;
         };
 
         let batch = self.schema_mapping.map_partial_batch(batch)?;
 
         // scoped timer updates on drop
         let mut timer = self.time.timer();
-        match self
-            .physical_expr
+
+        self.physical_expr
             .evaluate(&batch)
             .and_then(|v| v.into_array(batch.num_rows()))
-        {
-            Ok(array) => {
+            .and_then(|array| {
                 let bool_arr = as_boolean_array(&array)?.clone();
                 let num_filtered = bool_arr.len() - bool_arr.true_count();
                 self.rows_filtered.add(num_filtered);
                 timer.stop();
                 Ok(bool_arr)
-            }
-            Err(e) => Err(ArrowError::ComputeError(format!(
-                "Error evaluating filter predicate: {e:?}"
-            ))),
-        }
+            })
+            .map_err(|e| {
+                ArrowError::ComputeError(format!(
+                    "Error evaluating filter predicate: {e:?}"
+                ))
+            })
     }
 }
 
@@ -453,62 +453,33 @@ pub fn build_row_filter(
 
     // no candidates
     if candidates.is_empty() {
-        Ok(None)
-    } else if reorder_predicates {
-        // attempt to reorder the predicates by size and whether they are 
sorted
-        candidates.sort_by_key(|c| c.required_bytes);
-
-        let (indexed_candidates, other_candidates): (Vec<_>, Vec<_>) =
-            candidates.into_iter().partition(|c| c.can_use_index);
-
-        let mut filters: Vec<Box<dyn ArrowPredicate>> = vec![];
-
-        for candidate in indexed_candidates {
-            let filter = DatafusionArrowPredicate::try_new(
-                candidate,
-                file_schema,
-                metadata,
-                rows_filtered.clone(),
-                time.clone(),
-                Arc::clone(&schema_mapping),
-            )?;
-
-            filters.push(Box::new(filter));
-        }
-
-        for candidate in other_candidates {
-            let filter = DatafusionArrowPredicate::try_new(
-                candidate,
-                file_schema,
-                metadata,
-                rows_filtered.clone(),
-                time.clone(),
-                Arc::clone(&schema_mapping),
-            )?;
+        return Ok(None);
+    }
 
-            filters.push(Box::new(filter));
-        }
+    if reorder_predicates {
+        candidates.sort_unstable_by(|c1, c2| {
+            match c1.can_use_index.cmp(&c2.can_use_index) {
+                Ordering::Equal => c1.required_bytes.cmp(&c2.required_bytes),
+                ord => ord,
+            }
+        });
+    }
 
-        Ok(Some(RowFilter::new(filters)))
-    } else {
-        // otherwise evaluate the predicates in the order the appeared in the
-        // original expressions
-        let mut filters: Vec<Box<dyn ArrowPredicate>> = vec![];
-        for candidate in candidates {
-            let filter = DatafusionArrowPredicate::try_new(
+    candidates
+        .into_iter()
+        .map(|candidate| {
+            DatafusionArrowPredicate::try_new(
                 candidate,
                 file_schema,
                 metadata,
                 rows_filtered.clone(),
                 time.clone(),
                 Arc::clone(&schema_mapping),
-            )?;
-
-            filters.push(Box::new(filter));
-        }
-
-        Ok(Some(RowFilter::new(filters)))
-    }
+            )
+            .map(|pred| Box::new(pred) as _)
+        })
+        .collect::<Result<Vec<_>, _>>()
+        .map(|filters| Some(RowFilter::new(filters)))
 }
 
 #[cfg(test)]
diff --git a/datafusion/core/src/datasource/statistics.rs 
b/datafusion/core/src/datasource/statistics.rs
index 6f89657def..201bbfd5c0 100644
--- a/datafusion/core/src/datasource/statistics.rs
+++ b/datafusion/core/src/datasource/statistics.rs
@@ -18,16 +18,21 @@
 use std::mem;
 use std::sync::Arc;
 
-use arrow_schema::DataType;
 use futures::{Stream, StreamExt};
 
 use datafusion_common::stats::Precision;
 use datafusion_common::ScalarValue;
 
-use crate::arrow::datatypes::{Schema, SchemaRef};
+use crate::arrow::datatypes::SchemaRef;
 use crate::error::Result;
-use crate::functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};
-use crate::physical_plan::{Accumulator, ColumnStatistics, Statistics};
+use crate::physical_plan::{ColumnStatistics, Statistics};
+
+#[cfg(feature = "parquet")]
+use crate::{
+    arrow::datatypes::Schema,
+    functions_aggregate::min_max::{MaxAccumulator, MinAccumulator},
+    physical_plan::Accumulator,
+};
 
 use super::listing::PartitionedFile;
 
@@ -144,6 +149,8 @@ pub async fn get_statistics_with_limit(
     Ok((result_files, statistics))
 }
 
+// only adding this cfg b/c this is the only feature it's used with currently
+#[cfg(feature = "parquet")]
 pub(crate) fn create_max_min_accs(
     schema: &Schema,
 ) -> (Vec<Option<MaxAccumulator>>, Vec<Option<MinAccumulator>>) {
@@ -175,6 +182,8 @@ fn add_row_stats(
     }
 }
 
+// only adding this cfg b/c this is the only feature it's used with currently
+#[cfg(feature = "parquet")]
 pub(crate) fn get_col_stats(
     schema: &Schema,
     null_counts: Vec<Precision<usize>>,
@@ -205,8 +214,13 @@ pub(crate) fn get_col_stats(
 // (aka non Dictionary) output. We need to adjust the output data type to 
reflect this.
 // The reason min/max aggregate produces unpacked output because there is only 
one
 // min/max value per group; there is no needs to keep them Dictionary encode
-fn min_max_aggregate_data_type(input_type: &DataType) -> &DataType {
-    if let DataType::Dictionary(_, value_type) = input_type {
+//
+// only adding this cfg b/c this is the only feature it's used with currently
+#[cfg(feature = "parquet")]
+fn min_max_aggregate_data_type(
+    input_type: &arrow_schema::DataType,
+) -> &arrow_schema::DataType {
+    if let arrow_schema::DataType::Dictionary(_, value_type) = input_type {
         value_type.as_ref()
     } else {
         input_type
diff --git a/datafusion/core/src/execution/session_state_defaults.rs 
b/datafusion/core/src/execution/session_state_defaults.rs
index bc7e194cae..b5370efa0a 100644
--- a/datafusion/core/src/execution/session_state_defaults.rs
+++ b/datafusion/core/src/execution/session_state_defaults.rs
@@ -100,7 +100,9 @@ impl SessionStateDefaults {
 
     /// returns the list of default [`ScalarUDF']'s
     pub fn default_scalar_functions() -> Vec<Arc<ScalarUDF>> {
+        #[cfg_attr(not(feature = "nested_expressions"), allow(unused_mut))]
         let mut functions: Vec<Arc<ScalarUDF>> = 
functions::all_default_functions();
+
         #[cfg(feature = "nested_expressions")]
         functions.append(&mut 
functions_nested::all_default_nested_functions());
 
@@ -144,6 +146,7 @@ impl SessionStateDefaults {
     }
 
     /// registers all the builtin array functions
+    #[cfg_attr(not(feature = "nested_expressions"), allow(unused_variables))]
     pub fn register_array_functions(state: &mut SessionState) {
         // register crate of array expressions (if enabled)
         #[cfg(feature = "nested_expressions")]
diff --git a/datafusion/core/src/physical_optimizer/pruning.rs 
b/datafusion/core/src/physical_optimizer/pruning.rs
index a16abc607e..9bc2bb1d1d 100644
--- a/datafusion/core/src/physical_optimizer/pruning.rs
+++ b/datafusion/core/src/physical_optimizer/pruning.rs
@@ -615,6 +615,8 @@ impl PruningPredicate {
         is_always_true(&self.predicate_expr) && 
self.literal_guarantees.is_empty()
     }
 
+    // this is only used by `parquet` feature right now
+    #[allow(dead_code)]
     pub(crate) fn required_columns(&self) -> &RequiredColumns {
         &self.required_columns
     }
@@ -746,6 +748,8 @@ impl RequiredColumns {
     /// * `a > 5 OR a < 10` returns `Some(a)`
     /// * `a > 5 OR b < 10` returns `None`
     /// * `true` returns None
+    #[allow(dead_code)]
+    // this fn is only used by `parquet` feature right now, thus the 
`allow(dead_code)`
     pub(crate) fn single_column(&self) -> Option<&phys_expr::Column> {
         if self.columns.windows(2).all(|w| {
             // check if all columns are the same (ignoring statistics and 
field)
diff --git a/datafusion/physical-expr/src/expressions/binary.rs 
b/datafusion/physical-expr/src/expressions/binary.rs
index 2680a7930f..08c133d719 100644
--- a/datafusion/physical-expr/src/expressions/binary.rs
+++ b/datafusion/physical-expr/src/expressions/binary.rs
@@ -33,6 +33,7 @@ use arrow::compute::kernels::comparison::{
 use arrow::compute::kernels::concat_elements::concat_elements_utf8;
 use arrow::compute::{cast, ilike, like, nilike, nlike};
 use arrow::datatypes::*;
+use arrow_schema::ArrowError;
 use datafusion_common::cast::as_boolean_array;
 use datafusion_common::{internal_err, Result, ScalarValue};
 use datafusion_expr::interval_arithmetic::{apply_operator, Interval};
@@ -133,12 +134,15 @@ impl std::fmt::Display for BinaryExpr {
 }
 
 /// Invoke a boolean kernel on a pair of arrays
-macro_rules! boolean_op {
-    ($LEFT:expr, $RIGHT:expr, $OP:ident) => {{
-        let ll = as_boolean_array($LEFT).expect("boolean_op failed to downcast 
array");
-        let rr = as_boolean_array($RIGHT).expect("boolean_op failed to 
downcast array");
-        Ok(Arc::new($OP(&ll, &rr)?))
-    }};
+#[inline]
+fn boolean_op(
+    left: &dyn Array,
+    right: &dyn Array,
+    op: impl FnOnce(&BooleanArray, &BooleanArray) -> Result<BooleanArray, 
ArrowError>,
+) -> Result<Arc<(dyn Array + 'static)>, ArrowError> {
+    let ll = as_boolean_array(left).expect("boolean_op failed to downcast left 
array");
+    let rr = as_boolean_array(right).expect("boolean_op failed to downcast 
right array");
+    op(ll, rr).map(|t| Arc::new(t) as _)
 }
 
 macro_rules! binary_string_array_flag_op {
@@ -596,7 +600,7 @@ impl BinaryExpr {
             | NotLikeMatch | NotILikeMatch => unreachable!(),
             And => {
                 if left_data_type == &DataType::Boolean {
-                    boolean_op!(&left, &right, and_kleene)
+                    Ok(boolean_op(&left, &right, and_kleene)?)
                 } else {
                     internal_err!(
                         "Cannot evaluate binary expression {:?} with types 
{:?} and {:?}",
@@ -608,7 +612,7 @@ impl BinaryExpr {
             }
             Or => {
                 if left_data_type == &DataType::Boolean {
-                    boolean_op!(&left, &right, or_kleene)
+                    Ok(boolean_op(&left, &right, or_kleene)?)
                 } else {
                     internal_err!(
                         "Cannot evaluate binary expression {:?} with types 
{:?} and {:?}",
diff --git a/datafusion/physical-plan/src/execution_plan.rs 
b/datafusion/physical-plan/src/execution_plan.rs
index 53ae59f707..f584542faf 100644
--- a/datafusion/physical-plan/src/execution_plan.rs
+++ b/datafusion/physical-plan/src/execution_plan.rs
@@ -718,7 +718,7 @@ pub fn execute_stream(
     match plan.output_partitioning().partition_count() {
         0 => Ok(Box::pin(EmptyRecordBatchStream::new(plan.schema()))),
         1 => plan.execute(0, context),
-        _ => {
+        2.. => {
             // merge into a single partition
             let plan = CoalescePartitionsExec::new(Arc::clone(&plan));
             // CoalescePartitionsExec must produce a single partition


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to