alamb commented on code in PR #11196:
URL: https://github.com/apache/datafusion/pull/11196#discussion_r1663091279


##########
datafusion/physical-expr/src/equivalence/class.rs:
##########
@@ -28,6 +28,61 @@ use crate::{
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_common::JoinType;
 
+#[derive(Debug, Clone)]
+/// A structure representing a constant expression in a physical execution 
plan.
+///
+/// The `ConstExpr` struct encapsulates an expression that is constant during 
the execution
+/// of a query.

Review Comment:
   ```suggestion
   /// of a query. For example, if a predicate like `A = 5` is applied earlier in
   /// the plan, `A` would be known to be constant.
   ```



##########
datafusion/physical-expr/src/equivalence/properties.rs:
##########
@@ -173,6 +174,12 @@ impl EquivalenceProperties {
         self.oeq_class.clear();
     }
 
+    /// Removes constant expressions that may change across partitions.
+    /// This method should be used when different partitions are fused.

Review Comment:
   What does a "fused" partition mean? That is not a term I have run into
   previously.



##########
datafusion/sqllogictest/test_files/joins.slt:
##########
@@ -3188,18 +3191,16 @@ logical_plan
 physical_plan
 01)SortPreservingMergeExec: [rn1@5 ASC NULLS LAST]
 02)--SortMergeJoin: join_type=Inner, on=[(a@1, a@1)]
-03)----SortExec: expr=[rn1@5 ASC NULLS LAST], preserve_partitioning=[true]
-04)------CoalesceBatchesExec: target_batch_size=2
-05)--------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=2
-06)----------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
-07)------------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, 
d@4 as d, ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED 
FOLLOWING@5 as rn1]
-08)--------------BoundedWindowAggExec: wdw=[ROW_NUMBER() ROWS BETWEEN 
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "ROW_NUMBER() 
ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, 
nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: 
WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: 
Following(UInt64(NULL)), is_causal: false }], mode=[Sorted]
-09)----------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, 
b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], 
has_header=true
-10)----SortExec: expr=[a@1 ASC], preserve_partitioning=[true]
-11)------CoalesceBatchesExec: target_batch_size=2
-12)--------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=2
-13)----------RepartitionExec: partitioning=RoundRobinBatch(2), 
input_partitions=1
-14)------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, 
b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], 
has_header=true
+03)----CoalesceBatchesExec: target_batch_size=2

Review Comment:
   Are these results due to changing the setting
   
   ```sql
   set datafusion.optimizer.prefer_existing_sort = true;
   ```
   
   Or to the changes in the sortedness calculations?
   
   The plan seems better to me (there is no sort)



##########
datafusion/core/src/physical_optimizer/sanity_checker.rs:
##########
@@ -0,0 +1,666 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! The [SanityCheckPlan] rule ensures that a given plan can
+//! accommodate its infinite sources, if there are any. It will reject
+//! non-runnable query plans that use pipeline-breaking operators on
+//! infinite input(s). In addition, it will check if all order and
+//! distribution requirements of a plan are satisfied by its children.
+
+use std::sync::Arc;
+
+use crate::error::Result;
+use crate::physical_optimizer::PhysicalOptimizerRule;
+use crate::physical_plan::ExecutionPlan;
+
+use datafusion_common::config::{ConfigOptions, OptimizerOptions};
+use datafusion_common::plan_err;
+use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
+use datafusion_physical_expr::intervals::utils::{check_support, 
is_datatype_supported};
+use datafusion_physical_plan::joins::SymmetricHashJoinExec;
+use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties};
+
+use itertools::izip;
+
+/// The SanityCheckPlan rule rejects the following query plans:
+/// 1. Invalid plans containing nodes whose order and/or distribution 
requirements
+///    are not satisfied by their children.
+/// 2. Plans that use pipeline-breaking operators on infinite input(s),
+///    it is impossible to execute such queries.

Review Comment:
   ```suggestion
   ///    it is impossible to execute such queries (they would never generate
   ///    output or finish).
   ```



##########
datafusion/physical-expr/src/equivalence/class.rs:
##########
@@ -28,6 +28,61 @@ use crate::{
 use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode};
 use datafusion_common::JoinType;
 
+#[derive(Debug, Clone)]
+/// A structure representing a constant expression in a physical execution 
plan.

Review Comment:
   ```suggestion
   /// A structure representing an expression known to be constant in a physical
   /// execution plan.
   ```



##########
datafusion/physical-expr/src/equivalence/properties.rs:
##########
@@ -781,24 +814,30 @@ impl EquivalenceProperties {
     /// # Returns
     ///
     /// Returns a `Vec<Arc<dyn PhysicalExpr>>` containing the projected 
constants.
-    fn projected_constants(
-        &self,
-        mapping: &ProjectionMapping,
-    ) -> Vec<Arc<dyn PhysicalExpr>> {
+    fn projected_constants(&self, mapping: &ProjectionMapping) -> 
Vec<ConstExpr> {
         // First, project existing constants. For example, assume that `a + b`
         // is known to be constant. If the projection were `a as a_new`, `b as 
b_new`,
         // then we would project constant `a + b` as `a_new + b_new`.
         let mut projected_constants = self
             .constants
             .iter()
-            .flat_map(|expr| self.eq_group.project_expr(mapping, expr))
+            .flat_map(|const_expr| {
+                self.eq_group

Review Comment:
   I wonder if you could add an API like `ConstExpr::map` that applies a
   function to the contained expression. That way this could look like
   
   ```rust
               .flat_map(|expr| expr.map(|expr| self.eq_group.project_expr(mapping, expr)))
   ```
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to