alamb commented on code in PR #11196: URL: https://github.com/apache/datafusion/pull/11196#discussion_r1663091279
########## datafusion/physical-expr/src/equivalence/class.rs: ########## @@ -28,6 +28,61 @@ use crate::{ use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::JoinType; +#[derive(Debug, Clone)] +/// A structure representing a constant expression in a physical execution plan. +/// +/// The `ConstExpr` struct encapsulates an expression that is constant during the execution +/// of a query. Review Comment: ```suggestion /// of a query. For example, if a predicate like `A = 5` is applied earlier in the plan, `A` would /// be known to be constant ``` ########## datafusion/physical-expr/src/equivalence/properties.rs: ########## @@ -173,6 +174,12 @@ impl EquivalenceProperties { self.oeq_class.clear(); } + /// Removes constant expressions that may change across partitions. + /// This method should be used when different partitions are fused. Review Comment: What does a "fused" partition mean? That is not a term I have run into previously ########## datafusion/sqllogictest/test_files/joins.slt: ########## @@ -3188,18 +3191,16 @@ logical_plan physical_plan 01)SortPreservingMergeExec: [rn1@5 ASC NULLS LAST] 02)--SortMergeJoin: join_type=Inner, on=[(a@1, a@1)] -03)----SortExec: expr=[rn1@5 ASC NULLS LAST], preserve_partitioning=[true] -04)------CoalesceBatchesExec: target_batch_size=2 -05)--------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=2 -06)----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 -07)------------ProjectionExec: expr=[a0@0 as a0, a@1 as a, b@2 as b, c@3 as c, d@4 as d, ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@5 as rn1] -08)--------------BoundedWindowAggExec: wdw=[ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "ROW_NUMBER() ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }], mode=[Sorted] -09)----------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true -10)----SortExec: expr=[a@1 ASC], preserve_partitioning=[true] -11)------CoalesceBatchesExec: target_batch_size=2 -12)--------RepartitionExec: partitioning=Hash([a@1], 2), input_partitions=2 -13)----------RepartitionExec: partitioning=RoundRobinBatch(2), input_partitions=1 -14)------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a0, a, b, c, d], output_ordering=[a@1 ASC, b@2 ASC NULLS LAST, c@3 ASC NULLS LAST], has_header=true +03)----CoalesceBatchesExec: target_batch_size=2 Review Comment: Are these results due to changing the setting ```sql set datafusion.optimizer.prefer_existing_sort = true; ``` Or are they due to the changes in the sortedness calculations? The plan seems better to me (there is no sort) ########## datafusion/core/src/physical_optimizer/sanity_checker.rs: ########## @@ -0,0 +1,666 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +//http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied.
See the License for the +// specific language governing permissions and limitations +// under the License. + +//! The [SanityCheckPlan] rule ensures that a given plan can +//! accommodate its infinite sources, if there are any. It will reject +//! non-runnable query plans that use pipeline-breaking operators on +//! infinite input(s). In addition, it will check if all order and +//! distribution requirements of a plan are satisfied by its children. + +use std::sync::Arc; + +use crate::error::Result; +use crate::physical_optimizer::PhysicalOptimizerRule; +use crate::physical_plan::ExecutionPlan; + +use datafusion_common::config::{ConfigOptions, OptimizerOptions}; +use datafusion_common::plan_err; +use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; +use datafusion_physical_expr::intervals::utils::{check_support, is_datatype_supported}; +use datafusion_physical_plan::joins::SymmetricHashJoinExec; +use datafusion_physical_plan::{get_plan_string, ExecutionPlanProperties}; + +use itertools::izip; + +/// The SanityCheckPlan rule rejects the following query plans: +/// 1. Invalid plans containing nodes whose order and/or distribution requirements +/// are not satisfied by their children. +/// 2. Plans that use pipeline-breaking operators on infinite input(s), +/// it is impossible to execute such queries. Review Comment: ```suggestion /// it is impossible to execute such queries (they would never generate output or finish) ``` ########## datafusion/physical-expr/src/equivalence/class.rs: ########## @@ -28,6 +28,61 @@ use crate::{ use datafusion_common::tree_node::{Transformed, TransformedResult, TreeNode}; use datafusion_common::JoinType; +#[derive(Debug, Clone)] +/// A structure representing a constant expression in a physical execution plan. Review Comment: ```suggestion /// A structure representing an expression known to be constant in a physical execution plan.
``` ########## datafusion/physical-expr/src/equivalence/properties.rs: ########## @@ -781,24 +814,30 @@ impl EquivalenceProperties { /// # Returns /// /// Returns a `Vec<Arc<dyn PhysicalExpr>>` containing the projected constants. - fn projected_constants( - &self, - mapping: &ProjectionMapping, - ) -> Vec<Arc<dyn PhysicalExpr>> { + fn projected_constants(&self, mapping: &ProjectionMapping) -> Vec<ConstExpr> { // First, project existing constants. For example, assume that `a + b` // is known to be constant. If the projection were `a as a_new`, `b as b_new`, // then we would project constant `a + b` as `a_new + b_new`. let mut projected_constants = self .constants .iter() - .flat_map(|expr| self.eq_group.project_expr(mapping, expr)) + .flat_map(|const_expr| { + self.eq_group Review Comment: I wonder if you could add an API like `ConstExpr::map` that applies a function to the contained Expr. That way this could look like ```rust .flat_map(|expr| expr.map(|expr| self.eq_group.project_expr(mapping, expr))) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org For additional commands, e-mail: github-h...@datafusion.apache.org