This is an automated email from the ASF dual-hosted git repository.

alamb pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 0c4e4a1795 Minor Fix for Logical and Physical Expr Conversions (#11142)
0c4e4a1795 is described below

commit 0c4e4a179558bd7f1425d96f286415c9aa9174ca
Author: Berkay Şahin <[email protected]>
AuthorDate: Thu Jun 27 21:25:36 2024 +0300

    Minor Fix for Logical and Physical Expr Conversions (#11142)
    
    * Minor
    
    * Update planner.rs
---
 .../src/datasource/physical_plan/parquet/mod.rs    | 14 +++---------
 .../datasource/physical_plan/parquet/row_filter.rs | 25 +++++++---------------
 .../datasource/physical_plan/parquet/row_groups.rs | 18 ++++++----------
 datafusion/core/src/physical_optimizer/pruning.rs  | 16 +++++---------
 .../physical-expr-common/src/aggregate/mod.rs      |  3 +++
 datafusion/physical-expr/src/planner.rs            | 22 ++++++++++++-------
 datafusion/physical-expr/src/utils/guarantee.rs    | 16 +++++---------
 7 files changed, 45 insertions(+), 69 deletions(-)

diff --git a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
index 959e50fac8..ea7faac08c 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/mod.rs
@@ -796,17 +796,15 @@ mod tests {
         ArrayRef, Date64Array, Int32Array, Int64Array, Int8Array, StringArray,
         StructArray,
     };
-
     use arrow::datatypes::{Field, Schema, SchemaBuilder};
     use arrow::record_batch::RecordBatch;
     use arrow_schema::Fields;
-    use datafusion_common::{assert_contains, FileType, GetExt, ScalarValue, ToDFSchema};
-    use datafusion_expr::execution_props::ExecutionProps;
+    use datafusion_common::{assert_contains, FileType, GetExt, ScalarValue};
     use datafusion_expr::{col, lit, when, Expr};
-    use datafusion_physical_expr::create_physical_expr;
+    use datafusion_physical_expr::planner::logical2physical;
+    use datafusion_physical_plan::ExecutionPlanProperties;
 
     use chrono::{TimeZone, Utc};
-    use datafusion_physical_plan::ExecutionPlanProperties;
     use futures::StreamExt;
     use object_store::local::LocalFileSystem;
     use object_store::path::Path;
@@ -2061,12 +2059,6 @@ mod tests {
         Ok(())
     }
 
-    fn logical2physical(expr: &Expr, schema: &Schema) -> Arc<dyn PhysicalExpr> {
-        let df_schema = schema.clone().to_dfschema().unwrap();
-        let execution_props = ExecutionProps::new();
-        create_physical_expr(expr, &df_schema, &execution_props).unwrap()
-    }
-
     #[tokio::test]
     async fn test_struct_filter_parquet() -> Result<()> {
         let tmp_dir = TempDir::new()?;
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs
index 18c6c51d28..f9cce5f783 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/row_filter.rs
@@ -410,23 +410,20 @@ pub fn build_row_filter(
 
 #[cfg(test)]
 mod test {
-    use arrow::datatypes::Field;
-    use arrow_schema::TimeUnit::Nanosecond;
-    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
-    use parquet::arrow::parquet_to_arrow_schema;
-    use parquet::file::reader::{FileReader, SerializedFileReader};
-    use rand::prelude::*;
-
+    use super::*;
     use crate::datasource::schema_adapter::DefaultSchemaAdapterFactory;
     use crate::datasource::schema_adapter::SchemaAdapterFactory;
 
-    use datafusion_common::ToDFSchema;
-    use datafusion_expr::execution_props::ExecutionProps;
+    use arrow::datatypes::Field;
+    use arrow_schema::TimeUnit::Nanosecond;
     use datafusion_expr::{cast, col, lit, Expr};
-    use datafusion_physical_expr::create_physical_expr;
+    use datafusion_physical_expr::planner::logical2physical;
     use datafusion_physical_plan::metrics::{Count, Time};
 
-    use super::*;
+    use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder;
+    use parquet::arrow::parquet_to_arrow_schema;
+    use parquet::file::reader::{FileReader, SerializedFileReader};
+    use rand::prelude::*;
 
     // We should ignore predicate that read non-primitive columns
     #[test]
@@ -590,10 +587,4 @@ mod test {
             assert_eq!(projection, remapped)
         }
     }
-
-    fn logical2physical(expr: &Expr, schema: &Schema) -> Arc<dyn PhysicalExpr> {
-        let df_schema = schema.clone().to_dfschema().unwrap();
-        let execution_props = ExecutionProps::new();
-        create_physical_expr(expr, &df_schema, &execution_props).unwrap()
-    }
 }
diff --git a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
index da8b793a5c..9bc7980574 100644
--- a/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
+++ b/datafusion/core/src/datasource/physical_plan/parquet/row_groups.rs
@@ -404,15 +404,19 @@ impl<'a> PruningStatistics for RowGroupPruningStatistics<'a> {
 
 #[cfg(test)]
 mod tests {
+    use std::ops::Rem;
+    use std::sync::Arc;
+
     use super::*;
     use crate::datasource::physical_plan::parquet::reader::ParquetFileReader;
     use crate::physical_plan::metrics::ExecutionPlanMetricsSet;
+
     use arrow::datatypes::DataType::Decimal128;
     use arrow::datatypes::{DataType, Field};
-    use datafusion_common::{Result, ToDFSchema};
-    use datafusion_expr::execution_props::ExecutionProps;
+    use datafusion_common::Result;
     use datafusion_expr::{cast, col, lit, Expr};
-    use datafusion_physical_expr::{create_physical_expr, PhysicalExpr};
+    use datafusion_physical_expr::planner::logical2physical;
+
     use parquet::arrow::arrow_to_parquet_schema;
     use parquet::arrow::async_reader::ParquetObjectReader;
     use parquet::basic::LogicalType;
@@ -422,8 +426,6 @@ mod tests {
         basic::Type as PhysicalType, file::statistics::Statistics as ParquetStatistics,
         schema::types::SchemaDescPtr,
     };
-    use std::ops::Rem;
-    use std::sync::Arc;
 
     struct PrimitiveTypeField {
         name: &'static str,
@@ -1111,12 +1113,6 @@ mod tests {
         ParquetFileMetrics::new(0, "file.parquet", &metrics)
     }
 
-    fn logical2physical(expr: &Expr, schema: &Schema) -> Arc<dyn PhysicalExpr> {
-        let df_schema = schema.clone().to_dfschema().unwrap();
-        let execution_props = ExecutionProps::new();
-        create_physical_expr(expr, &df_schema, &execution_props).unwrap()
-    }
-
     #[tokio::test]
     async fn test_row_group_bloom_filter_pruning_predicate_simple_expr() {
         BloomFilterTest::new_data_index_bloom_encoding_stats()
diff --git a/datafusion/core/src/physical_optimizer/pruning.rs b/datafusion/core/src/physical_optimizer/pruning.rs
index 7051dd9978..e8f2f34abd 100644
--- a/datafusion/core/src/physical_optimizer/pruning.rs
+++ b/datafusion/core/src/physical_optimizer/pruning.rs
@@ -1555,22 +1555,22 @@ pub(crate) enum StatisticsType {
 
 #[cfg(test)]
 mod tests {
+    use std::collections::HashMap;
+    use std::ops::{Not, Rem};
+
     use super::*;
     use crate::assert_batches_eq;
     use crate::logical_expr::{col, lit};
+
     use arrow::array::Decimal128Array;
     use arrow::{
         array::{BinaryArray, Int32Array, Int64Array, StringArray},
         datatypes::TimeUnit,
     };
     use arrow_array::UInt64Array;
-    use datafusion_common::ToDFSchema;
-    use datafusion_expr::execution_props::ExecutionProps;
     use datafusion_expr::expr::InList;
     use datafusion_expr::{cast, is_null, try_cast, Expr};
-    use datafusion_physical_expr::create_physical_expr;
-    use std::collections::HashMap;
-    use std::ops::{Not, Rem};
+    use datafusion_physical_expr::planner::logical2physical;
 
     #[derive(Debug, Default)]
     /// Mock statistic provider for tests
@@ -3876,10 +3876,4 @@ mod tests {
         let expr = logical2physical(expr, schema);
         build_predicate_expression(&expr, schema, required_columns)
     }
-
-    fn logical2physical(expr: &Expr, schema: &Schema) -> Arc<dyn PhysicalExpr> {
-        let df_schema = schema.clone().to_dfschema().unwrap();
-        let execution_props = ExecutionProps::new();
-        create_physical_expr(expr, &df_schema, &execution_props).unwrap()
-    }
 }
diff --git a/datafusion/physical-expr-common/src/aggregate/mod.rs b/datafusion/physical-expr-common/src/aggregate/mod.rs
index 432267e045..336e28b4d2 100644
--- a/datafusion/physical-expr-common/src/aggregate/mod.rs
+++ b/datafusion/physical-expr-common/src/aggregate/mod.rs
@@ -211,6 +211,9 @@ pub trait AggregateExpr: Send + Sync + Debug + PartialEq<dyn Any> {
     /// Rewrites [`AggregateExpr`], with new expressions given. The argument should be consistent
     /// with the return value of the [`AggregateExpr::all_expressions`] method.
     /// Returns `Some(Arc<dyn AggregateExpr>)` if re-write is supported, otherwise returns `None`.
+    /// TODO: This method only rewrites the [`PhysicalExpr`]s and does not handle [`Expr`]s.
+    /// This can cause silent bugs and should be fixed in the future (possibly with physical-to-logical
+    /// conversions).
     fn with_new_expressions(
         &self,
         _args: Vec<Arc<dyn PhysicalExpr>>,
diff --git a/datafusion/physical-expr/src/planner.rs b/datafusion/physical-expr/src/planner.rs
index 29b9069c04..8fe99cdca5 100644
--- a/datafusion/physical-expr/src/planner.rs
+++ b/datafusion/physical-expr/src/planner.rs
@@ -17,10 +17,15 @@
 
 use std::sync::Arc;
 
-use arrow::datatypes::Schema;
+use crate::scalar_function;
+use crate::{
+    expressions::{self, binary, like, Column, Literal},
+    PhysicalExpr,
+};
 
+use arrow::datatypes::Schema;
 use datafusion_common::{
-    exec_err, not_impl_err, plan_err, DFSchema, Result, ScalarValue,
+    exec_err, not_impl_err, plan_err, DFSchema, Result, ScalarValue, ToDFSchema,
 };
 use datafusion_expr::execution_props::ExecutionProps;
 use datafusion_expr::expr::{Alias, Cast, InList, ScalarFunction};
@@ -28,12 +33,6 @@ use datafusion_expr::var_provider::is_system_variables;
 use datafusion_expr::var_provider::VarType;
 use datafusion_expr::{binary_expr, Between, BinaryExpr, Expr, Like, Operator, TryCast};
 
-use crate::scalar_function;
-use crate::{
-    expressions::{self, binary, like, Column, Literal},
-    PhysicalExpr,
-};
-
 /// [PhysicalExpr] evaluate DataFusion expressions such as `A + 1`, or `CAST(c1
 /// AS int)`.
 ///
@@ -358,6 +357,13 @@ where
         .collect::<Result<Vec<_>>>()
 }
 
+/// Convert a logical expression to a physical expression (without any simplification, etc)
+pub fn logical2physical(expr: &Expr, schema: &Schema) -> Arc<dyn PhysicalExpr> {
+    let df_schema = schema.clone().to_dfschema().unwrap();
+    let execution_props = ExecutionProps::new();
+    create_physical_expr(expr, &df_schema, &execution_props).unwrap()
+}
+
 #[cfg(test)]
 mod tests {
     use arrow_array::{ArrayRef, BooleanArray, RecordBatch, StringArray};
diff --git a/datafusion/physical-expr/src/utils/guarantee.rs b/datafusion/physical-expr/src/utils/guarantee.rs
index deaff54538..070034116f 100644
--- a/datafusion/physical-expr/src/utils/guarantee.rs
+++ b/datafusion/physical-expr/src/utils/guarantee.rs
@@ -419,15 +419,16 @@ impl<'a> ColOpLit<'a> {
 
 #[cfg(test)]
 mod test {
+    use std::sync::OnceLock;
+
     use super::*;
-    use crate::create_physical_expr;
+    use crate::planner::logical2physical;
+
     use arrow_schema::{DataType, Field, Schema, SchemaRef};
-    use datafusion_common::ToDFSchema;
-    use datafusion_expr::execution_props::ExecutionProps;
     use datafusion_expr::expr_fn::*;
     use datafusion_expr::{lit, Expr};
+
     use itertools::Itertools;
-    use std::sync::OnceLock;
 
     #[test]
     fn test_literal() {
@@ -867,13 +868,6 @@ mod test {
         LiteralGuarantee::try_new(column, Guarantee::NotIn, literals.iter()).unwrap()
     }
 
-    /// Convert a logical expression to a physical expression (without any simplification, etc)
-    fn logical2physical(expr: &Expr, schema: &Schema) -> Arc<dyn PhysicalExpr> {
-        let df_schema = schema.clone().to_dfschema().unwrap();
-        let execution_props = ExecutionProps::new();
-        create_physical_expr(expr, &df_schema, &execution_props).unwrap()
-    }
-
     // Schema for testing
     fn schema() -> SchemaRef {
         SCHEMA


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to