This is an automated email from the ASF dual-hosted git repository.

ozankabak pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/arrow-datafusion.git


The following commit(s) were added to refs/heads/main by this push:
     new 8f9d6e3496 Add PRIMARY KEY Aggregate support to dataframe API (#8356)
8f9d6e3496 is described below

commit 8f9d6e349627e7eaf818942aa039441bc2bd61a8
Author: Mustafa Akur <106137913+mustafasr...@users.noreply.github.com>
AuthorDate: Fri Dec 8 16:40:56 2023 +0300

    Add PRIMARY KEY Aggregate support to dataframe API (#8356)
    
    * Aggregate rewrite for dataframe API.
    
    * Simplifications
    
    * Minor changes
    
    * Minor changes
    
    * Add new test
    
    * Add new tests
    
    * Minor changes
    
    * Add rule, for aggregate simplification
    
    * Simplifications
    
    * Simplifications
    
    * Simplifications
    
    * Minor changes
    
    * Simplifications
    
    * Add new test condition
    
    * Tmp
    
    * Push requirement below aggregate
    
    * Add join and subqeury alias
    
    * Add cross join support
    
    * Minor changes
    
    * Add logical plan repartition support
    
    * Add union support
    
    * Add table scan
    
    * Add limit
    
    * Minor changes, buggy
    
    * Add new tests, fix existing bugs
    
    * change concat type array_concat
    
    * Resolve some of the bugs
    
    * Comment out a rule
    
    * All tests pass, when single distinct is closed
    
    * Fix aggregate bug
    
    * Change analyze and explain implementations
    
    * All tests pass
    
    * Resolve linter errors
    
    * Simplifications, remove unnecessary codes
    
    * Comment out tests
    
    * Remove pushdown projection
    
    * Pushdown empty projections
    
    * Fix failing tests
    
    * Simplifications
    
    * Update comments, simplifications
    
    * Remove eliminate projection rule, Add method for group expr len aggregate
    
    * Simplifications, subquery support
    
    * Update comments, add unnest support, simplifications
    
    * Remove eliminate projection pass
    
    * Change name
    
    * Minor changes
    
    * Minor changes
    
    * Add comments
    
    * Fix failing test
    
    * Minor simplifications
    
    * update
    
    * Minor
    
    * Remove ordering
    
    * Minor changes
    
    * add merge projections
    
    * Add comments, resolve linter errors
    
    * Minor changes
    
    * Minor changes
    
    * Minor changes
    
    * Minor changes
    
    * Minor changes
    
    * Minor changes
    
    * Minor changes
    
    * Minor changes
    
    * Review Part 1
    
    * Review Part 2
    
    * Fix quadratic search, Change trim_expr impl
    
    * Review Part 3
    
    * Address reviews
    
    * Minor changes
    
    * Review Part 4
    
    * Add case expr support
    
    * Review Part 5
    
    * Review Part 6
    
    * Finishing touch: Improve comments
    
    ---------
    
    Co-authored-by: berkaysynnada <berkay.sa...@synnada.ai>
    Co-authored-by: Mehmet Ozan Kabak <ozanka...@gmail.com>
---
 datafusion/common/src/dfschema.rs                  |  13 +-
 datafusion/common/src/functional_dependencies.rs   | 117 ++-
 datafusion/common/src/lib.rs                       |   5 +-
 datafusion/core/src/dataframe/mod.rs               | 346 +++++++-
 datafusion/core/tests/dataframe/mod.rs             |  85 ++
 datafusion/expr/src/logical_plan/builder.rs        |  33 +-
 datafusion/expr/src/logical_plan/plan.rs           |  56 +-
 datafusion/expr/src/type_coercion/binary.rs        |   2 +-
 datafusion/expr/src/utils.rs                       |  34 +-
 datafusion/optimizer/src/optimize_projections.rs   | 958 ++++++++++++---------
 datafusion/optimizer/src/optimizer.rs              |  20 +-
 .../optimizer/tests/optimizer_integration.rs       |  10 +-
 datafusion/sql/src/select.rs                       |  76 +-
 datafusion/sqllogictest/test_files/groupby.slt     | 205 ++++-
 14 files changed, 1349 insertions(+), 611 deletions(-)

diff --git a/datafusion/common/src/dfschema.rs 
b/datafusion/common/src/dfschema.rs
index 9819ae795b..e06f947ad5 100644
--- a/datafusion/common/src/dfschema.rs
+++ b/datafusion/common/src/dfschema.rs
@@ -199,9 +199,16 @@ impl DFSchema {
     pub fn with_functional_dependencies(
         mut self,
         functional_dependencies: FunctionalDependencies,
-    ) -> Self {
-        self.functional_dependencies = functional_dependencies;
-        self
+    ) -> Result<Self> {
+        if functional_dependencies.is_valid(self.fields.len()) {
+            self.functional_dependencies = functional_dependencies;
+            Ok(self)
+        } else {
+            _plan_err!(
+                "Invalid functional dependency: {:?}",
+                functional_dependencies
+            )
+        }
     }
 
     /// Create a new schema that contains the fields from this schema followed 
by the fields
diff --git a/datafusion/common/src/functional_dependencies.rs 
b/datafusion/common/src/functional_dependencies.rs
index 4587677e77..1cb1751d71 100644
--- a/datafusion/common/src/functional_dependencies.rs
+++ b/datafusion/common/src/functional_dependencies.rs
@@ -24,6 +24,7 @@ use std::ops::Deref;
 use std::vec::IntoIter;
 
 use crate::error::_plan_err;
+use crate::utils::{merge_and_order_indices, set_difference};
 use crate::{DFSchema, DFSchemaRef, DataFusionError, JoinType, Result};
 
 use sqlparser::ast::TableConstraint;
@@ -271,6 +272,29 @@ impl FunctionalDependencies {
         self.deps.extend(other.deps);
     }
 
+    /// Sanity checks if functional dependencies are valid. For example, if
+    /// there are 10 fields, we cannot receive any index further than 9.
+    pub fn is_valid(&self, n_field: usize) -> bool {
+        self.deps.iter().all(
+            |FunctionalDependence {
+                 source_indices,
+                 target_indices,
+                 ..
+             }| {
+                source_indices
+                    .iter()
+                    .max()
+                    .map(|&max_index| max_index < n_field)
+                    .unwrap_or(true)
+                    && target_indices
+                        .iter()
+                        .max()
+                        .map(|&max_index| max_index < n_field)
+                        .unwrap_or(true)
+            },
+        )
+    }
+
     /// Adds the `offset` value to `source_indices` and `target_indices` for
     /// each functional dependency.
     pub fn add_offset(&mut self, offset: usize) {
@@ -442,44 +466,56 @@ pub fn aggregate_functional_dependencies(
     } in &func_dependencies.deps
     {
         // Keep source indices in a `HashSet` to prevent duplicate entries:
-        let mut new_source_indices = HashSet::new();
+        let mut new_source_indices = vec![];
+        let mut new_source_field_names = vec![];
         let source_field_names = source_indices
             .iter()
             .map(|&idx| aggr_input_fields[idx].qualified_name())
             .collect::<Vec<_>>();
+
         for (idx, group_by_expr_name) in 
group_by_expr_names.iter().enumerate() {
             // When one of the input determinant expressions matches with
             // the GROUP BY expression, add the index of the GROUP BY
             // expression as a new determinant key:
             if source_field_names.contains(group_by_expr_name) {
-                new_source_indices.insert(idx);
+                new_source_indices.push(idx);
+                new_source_field_names.push(group_by_expr_name.clone());
             }
         }
+        let existing_target_indices =
+            get_target_functional_dependencies(aggr_input_schema, 
group_by_expr_names);
+        let new_target_indices = get_target_functional_dependencies(
+            aggr_input_schema,
+            &new_source_field_names,
+        );
+        let mode = if existing_target_indices == new_target_indices
+            && new_target_indices.is_some()
+        {
+            // If dependency covers all GROUP BY expressions, mode will be 
`Single`:
+            Dependency::Single
+        } else {
+            // Otherwise, existing mode is preserved:
+            *mode
+        };
         // All of the composite indices occur in the GROUP BY expression:
         if new_source_indices.len() == source_indices.len() {
             aggregate_func_dependencies.push(
                 FunctionalDependence::new(
-                    new_source_indices.into_iter().collect(),
+                    new_source_indices,
                     target_indices.clone(),
                     *nullable,
                 )
-                // input uniqueness stays the same when GROUP BY matches with 
input functional dependence determinants
-                .with_mode(*mode),
+                .with_mode(mode),
             );
         }
     }
+
     // If we have a single GROUP BY key, we can guarantee uniqueness after
     // aggregation:
     if group_by_expr_names.len() == 1 {
         // If `source_indices` contain 0, delete this functional dependency
         // as it will be added anyway with mode `Dependency::Single`:
-        if let Some(idx) = aggregate_func_dependencies
-            .iter()
-            .position(|item| item.source_indices.contains(&0))
-        {
-            // Delete the functional dependency that contains zeroth idx:
-            aggregate_func_dependencies.remove(idx);
-        }
+        aggregate_func_dependencies.retain(|item| 
!item.source_indices.contains(&0));
         // Add a new functional dependency associated with the whole table:
         aggregate_func_dependencies.push(
             // Use nullable property of the group by expression
@@ -527,8 +563,61 @@ pub fn get_target_functional_dependencies(
             combined_target_indices.extend(target_indices.iter());
         }
     }
-    (!combined_target_indices.is_empty())
-        .then_some(combined_target_indices.iter().cloned().collect::<Vec<_>>())
+    (!combined_target_indices.is_empty()).then_some({
+        let mut result = 
combined_target_indices.into_iter().collect::<Vec<_>>();
+        result.sort();
+        result
+    })
+}
+
+/// Returns indices for the minimal subset of GROUP BY expressions that are
+/// functionally equivalent to the original set of GROUP BY expressions.
+pub fn get_required_group_by_exprs_indices(
+    schema: &DFSchema,
+    group_by_expr_names: &[String],
+) -> Option<Vec<usize>> {
+    let dependencies = schema.functional_dependencies();
+    let field_names = schema
+        .fields()
+        .iter()
+        .map(|item| item.qualified_name())
+        .collect::<Vec<_>>();
+    let mut groupby_expr_indices = group_by_expr_names
+        .iter()
+        .map(|group_by_expr_name| {
+            field_names
+                .iter()
+                .position(|field_name| field_name == group_by_expr_name)
+        })
+        .collect::<Option<Vec<_>>>()?;
+
+    groupby_expr_indices.sort();
+    for FunctionalDependence {
+        source_indices,
+        target_indices,
+        ..
+    } in &dependencies.deps
+    {
+        if source_indices
+            .iter()
+            .all(|source_idx| groupby_expr_indices.contains(source_idx))
+        {
+            // If all source indices are among GROUP BY expression indices, we
+            // can remove target indices from GROUP BY expression indices and
+            // use source indices instead.
+            groupby_expr_indices = set_difference(&groupby_expr_indices, 
target_indices);
+            groupby_expr_indices =
+                merge_and_order_indices(groupby_expr_indices, source_indices);
+        }
+    }
+    groupby_expr_indices
+        .iter()
+        .map(|idx| {
+            group_by_expr_names
+                .iter()
+                .position(|name| &field_names[*idx] == name)
+        })
+        .collect()
 }
 
 /// Updates entries inside the `entries` vector with their corresponding
diff --git a/datafusion/common/src/lib.rs b/datafusion/common/src/lib.rs
index 6df89624fc..ed547782e4 100644
--- a/datafusion/common/src/lib.rs
+++ b/datafusion/common/src/lib.rs
@@ -56,8 +56,9 @@ pub use file_options::file_type::{
 };
 pub use file_options::FileTypeWriterOptions;
 pub use functional_dependencies::{
-    aggregate_functional_dependencies, get_target_functional_dependencies, 
Constraint,
-    Constraints, Dependency, FunctionalDependence, FunctionalDependencies,
+    aggregate_functional_dependencies, get_required_group_by_exprs_indices,
+    get_target_functional_dependencies, Constraint, Constraints, Dependency,
+    FunctionalDependence, FunctionalDependencies,
 };
 pub use join_type::{JoinConstraint, JoinSide, JoinType};
 pub use param_value::ParamValues;
diff --git a/datafusion/core/src/dataframe/mod.rs 
b/datafusion/core/src/dataframe/mod.rs
index 52b5157b73..c40dd522a4 100644
--- a/datafusion/core/src/dataframe/mod.rs
+++ b/datafusion/core/src/dataframe/mod.rs
@@ -23,44 +23,43 @@ mod parquet;
 use std::any::Any;
 use std::sync::Arc;
 
+use crate::arrow::datatypes::{Schema, SchemaRef};
+use crate::arrow::record_batch::RecordBatch;
+use crate::arrow::util::pretty;
+use crate::datasource::{provider_as_source, MemTable, TableProvider};
+use crate::error::Result;
+use crate::execution::{
+    context::{SessionState, TaskContext},
+    FunctionRegistry,
+};
+use crate::logical_expr::utils::find_window_exprs;
+use crate::logical_expr::{
+    col, Expr, JoinType, LogicalPlan, LogicalPlanBuilder, Partitioning, 
TableType,
+};
+use crate::physical_plan::{
+    collect, collect_partitioned, execute_stream, execute_stream_partitioned,
+    ExecutionPlan, SendableRecordBatchStream,
+};
+use crate::prelude::SessionContext;
+
 use arrow::array::{Array, ArrayRef, Int64Array, StringArray};
 use arrow::compute::{cast, concat};
 use arrow::csv::WriterBuilder;
 use arrow::datatypes::{DataType, Field};
-use async_trait::async_trait;
 use datafusion_common::file_options::csv_writer::CsvWriterOptions;
 use datafusion_common::file_options::json_writer::JsonWriterOptions;
 use datafusion_common::parsers::CompressionTypeVariant;
 use datafusion_common::{
-    DataFusionError, FileType, FileTypeWriterOptions, ParamValues, SchemaError,
-    UnnestOptions,
+    Column, DFSchema, DataFusionError, FileType, FileTypeWriterOptions, 
ParamValues,
+    SchemaError, UnnestOptions,
 };
 use datafusion_expr::dml::CopyOptions;
-
-use datafusion_common::{Column, DFSchema};
 use datafusion_expr::{
     avg, count, is_null, max, median, min, stddev, utils::COUNT_STAR_EXPANSION,
     TableProviderFilterPushDown, UNNAMED_TABLE,
 };
 
-use crate::arrow::datatypes::Schema;
-use crate::arrow::datatypes::SchemaRef;
-use crate::arrow::record_batch::RecordBatch;
-use crate::arrow::util::pretty;
-use crate::datasource::{provider_as_source, MemTable, TableProvider};
-use crate::error::Result;
-use crate::execution::{
-    context::{SessionState, TaskContext},
-    FunctionRegistry,
-};
-use crate::logical_expr::{
-    col, utils::find_window_exprs, Expr, JoinType, LogicalPlan, 
LogicalPlanBuilder,
-    Partitioning, TableType,
-};
-use crate::physical_plan::SendableRecordBatchStream;
-use crate::physical_plan::{collect, collect_partitioned};
-use crate::physical_plan::{execute_stream, execute_stream_partitioned, 
ExecutionPlan};
-use crate::prelude::SessionContext;
+use async_trait::async_trait;
 
 /// Contains options that control how data is
 /// written out from a DataFrame
@@ -1343,24 +1342,43 @@ impl TableProvider for DataFrameTableProvider {
 mod tests {
     use std::vec;
 
-    use arrow::array::Int32Array;
-    use arrow::datatypes::DataType;
+    use super::*;
+    use crate::execution::context::SessionConfig;
+    use crate::physical_plan::{ColumnarValue, Partitioning, PhysicalExpr};
+    use crate::test_util::{register_aggregate_csv, test_table, 
test_table_with_name};
+    use crate::{assert_batches_sorted_eq, execution::context::SessionContext};
 
+    use arrow::array::{self, Int32Array};
+    use arrow::datatypes::DataType;
+    use datafusion_common::{Constraint, Constraints, ScalarValue};
     use datafusion_expr::{
         avg, cast, count, count_distinct, create_udf, expr, lit, max, min, sum,
-        BuiltInWindowFunction, ScalarFunctionImplementation, Volatility, 
WindowFrame,
-        WindowFunction,
+        BinaryExpr, BuiltInWindowFunction, Operator, 
ScalarFunctionImplementation,
+        Volatility, WindowFrame, WindowFunction,
     };
     use datafusion_physical_expr::expressions::Column;
-
-    use crate::execution::context::SessionConfig;
-    use crate::physical_plan::ColumnarValue;
-    use crate::physical_plan::Partitioning;
-    use crate::physical_plan::PhysicalExpr;
-    use crate::test_util::{register_aggregate_csv, test_table, 
test_table_with_name};
-    use crate::{assert_batches_sorted_eq, execution::context::SessionContext};
-
-    use super::*;
+    use datafusion_physical_plan::get_plan_string;
+
+    pub fn table_with_constraints() -> Arc<dyn TableProvider> {
+        let dual_schema = Arc::new(Schema::new(vec![
+            Field::new("id", DataType::Int32, false),
+            Field::new("name", DataType::Utf8, false),
+        ]));
+        let batch = RecordBatch::try_new(
+            dual_schema.clone(),
+            vec![
+                Arc::new(array::Int32Array::from(vec![1])),
+                Arc::new(array::StringArray::from(vec!["a"])),
+            ],
+        )
+        .unwrap();
+        let provider = MemTable::try_new(dual_schema, vec![vec![batch]])
+            .unwrap()
+            
.with_constraints(Constraints::new_unverified(vec![Constraint::PrimaryKey(
+                vec![0],
+            )]));
+        Arc::new(provider)
+    }
 
     async fn assert_logical_expr_schema_eq_physical_expr_schema(
         df: DataFrame,
@@ -1557,6 +1575,262 @@ mod tests {
         Ok(())
     }
 
+    #[tokio::test]
+    async fn test_aggregate_with_pk() -> Result<()> {
+        // create the dataframe
+        let config = SessionConfig::new().with_target_partitions(1);
+        let ctx = SessionContext::new_with_config(config);
+
+        let table1 = table_with_constraints();
+        let df = ctx.read_table(table1)?;
+        let col_id = Expr::Column(datafusion_common::Column {
+            relation: None,
+            name: "id".to_string(),
+        });
+        let col_name = Expr::Column(datafusion_common::Column {
+            relation: None,
+            name: "name".to_string(),
+        });
+
+        // group by contains id column
+        let group_expr = vec![col_id.clone()];
+        let aggr_expr = vec![];
+        let df = df.aggregate(group_expr, aggr_expr)?;
+
+        // expr list contains id, name
+        let expr_list = vec![col_id, col_name];
+        let df = df.select(expr_list)?;
+        let physical_plan = df.clone().create_physical_plan().await?;
+        let expected = vec![
+            "AggregateExec: mode=Single, gby=[id@0 as id, name@1 as name], 
aggr=[]",
+            "  MemoryExec: partitions=1, partition_sizes=[1]",
+        ];
+        // Get string representation of the plan
+        let actual = get_plan_string(&physical_plan);
+        assert_eq!(
+            expected, actual,
+            "\n**Optimized Plan 
Mismatch\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+        );
+        // Since id and name are functionally dependant, we can use name among 
expression
+        // even if it is not part of the group by expression.
+        let df_results = collect(physical_plan, ctx.task_ctx()).await?;
+
+        #[rustfmt::skip]
+        assert_batches_sorted_eq!(
+            ["+----+------+",
+             "| id | name |",
+             "+----+------+",
+             "| 1  | a    |",
+             "+----+------+",],
+            &df_results
+        );
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_aggregate_with_pk2() -> Result<()> {
+        // create the dataframe
+        let config = SessionConfig::new().with_target_partitions(1);
+        let ctx = SessionContext::new_with_config(config);
+
+        let table1 = table_with_constraints();
+        let df = ctx.read_table(table1)?;
+        let col_id = Expr::Column(datafusion_common::Column {
+            relation: None,
+            name: "id".to_string(),
+        });
+        let col_name = Expr::Column(datafusion_common::Column {
+            relation: None,
+            name: "name".to_string(),
+        });
+
+        // group by contains id column
+        let group_expr = vec![col_id.clone()];
+        let aggr_expr = vec![];
+        let df = df.aggregate(group_expr, aggr_expr)?;
+
+        let condition1 = Expr::BinaryExpr(BinaryExpr::new(
+            Box::new(col_id.clone()),
+            Operator::Eq,
+            Box::new(Expr::Literal(ScalarValue::Int32(Some(1)))),
+        ));
+        let condition2 = Expr::BinaryExpr(BinaryExpr::new(
+            Box::new(col_name),
+            Operator::Eq,
+            Box::new(Expr::Literal(ScalarValue::Utf8(Some("a".to_string())))),
+        ));
+        // Predicate refers to id, and name fields
+        let predicate = Expr::BinaryExpr(BinaryExpr::new(
+            Box::new(condition1),
+            Operator::And,
+            Box::new(condition2),
+        ));
+        let df = df.filter(predicate)?;
+        let physical_plan = df.clone().create_physical_plan().await?;
+
+        let expected = vec![
+            "CoalesceBatchesExec: target_batch_size=8192",
+            "  FilterExec: id@0 = 1 AND name@1 = a",
+            "    AggregateExec: mode=Single, gby=[id@0 as id, name@1 as name], 
aggr=[]",
+            "      MemoryExec: partitions=1, partition_sizes=[1]",
+        ];
+        // Get string representation of the plan
+        let actual = get_plan_string(&physical_plan);
+        assert_eq!(
+            expected, actual,
+            "\n**Optimized Plan 
Mismatch\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+        );
+
+        // Since id and name are functionally dependant, we can use name among 
expression
+        // even if it is not part of the group by expression.
+        let df_results = collect(physical_plan, ctx.task_ctx()).await?;
+
+        #[rustfmt::skip]
+        assert_batches_sorted_eq!(
+            ["+----+------+",
+             "| id | name |",
+             "+----+------+",
+             "| 1  | a    |",
+             "+----+------+",],
+            &df_results
+        );
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_aggregate_with_pk3() -> Result<()> {
+        // create the dataframe
+        let config = SessionConfig::new().with_target_partitions(1);
+        let ctx = SessionContext::new_with_config(config);
+
+        let table1 = table_with_constraints();
+        let df = ctx.read_table(table1)?;
+        let col_id = Expr::Column(datafusion_common::Column {
+            relation: None,
+            name: "id".to_string(),
+        });
+        let col_name = Expr::Column(datafusion_common::Column {
+            relation: None,
+            name: "name".to_string(),
+        });
+
+        // group by contains id column
+        let group_expr = vec![col_id.clone()];
+        let aggr_expr = vec![];
+        // group by id,
+        let df = df.aggregate(group_expr, aggr_expr)?;
+
+        let condition1 = Expr::BinaryExpr(BinaryExpr::new(
+            Box::new(col_id.clone()),
+            Operator::Eq,
+            Box::new(Expr::Literal(ScalarValue::Int32(Some(1)))),
+        ));
+        // Predicate refers to id field
+        let predicate = condition1;
+        // id=0
+        let df = df.filter(predicate)?;
+        // Select expression refers to id, and name columns.
+        // id, name
+        let df = df.select(vec![col_id.clone(), col_name.clone()])?;
+        let physical_plan = df.clone().create_physical_plan().await?;
+
+        let expected = vec![
+            "CoalesceBatchesExec: target_batch_size=8192",
+            "  FilterExec: id@0 = 1",
+            "    AggregateExec: mode=Single, gby=[id@0 as id, name@1 as name], 
aggr=[]",
+            "      MemoryExec: partitions=1, partition_sizes=[1]",
+        ];
+        // Get string representation of the plan
+        let actual = get_plan_string(&physical_plan);
+        assert_eq!(
+            expected, actual,
+            "\n**Optimized Plan 
Mismatch\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+        );
+
+        // Since id and name are functionally dependant, we can use name among 
expression
+        // even if it is not part of the group by expression.
+        let df_results = collect(physical_plan, ctx.task_ctx()).await?;
+
+        #[rustfmt::skip]
+        assert_batches_sorted_eq!(
+            ["+----+------+",
+             "| id | name |",
+             "+----+------+",
+             "| 1  | a    |",
+             "+----+------+",],
+            &df_results
+        );
+
+        Ok(())
+    }
+
+    #[tokio::test]
+    async fn test_aggregate_with_pk4() -> Result<()> {
+        // create the dataframe
+        let config = SessionConfig::new().with_target_partitions(1);
+        let ctx = SessionContext::new_with_config(config);
+
+        let table1 = table_with_constraints();
+        let df = ctx.read_table(table1)?;
+        let col_id = Expr::Column(datafusion_common::Column {
+            relation: None,
+            name: "id".to_string(),
+        });
+
+        // group by contains id column
+        let group_expr = vec![col_id.clone()];
+        let aggr_expr = vec![];
+        // group by id,
+        let df = df.aggregate(group_expr, aggr_expr)?;
+
+        let condition1 = Expr::BinaryExpr(BinaryExpr::new(
+            Box::new(col_id.clone()),
+            Operator::Eq,
+            Box::new(Expr::Literal(ScalarValue::Int32(Some(1)))),
+        ));
+        // Predicate refers to id field
+        let predicate = condition1;
+        // id=1
+        let df = df.filter(predicate)?;
+        // Select expression refers to id column.
+        // id
+        let df = df.select(vec![col_id.clone()])?;
+        let physical_plan = df.clone().create_physical_plan().await?;
+
+        // In this case aggregate shouldn't be expanded, since these
+        // columns are not used.
+        let expected = vec![
+            "CoalesceBatchesExec: target_batch_size=8192",
+            "  FilterExec: id@0 = 1",
+            "    AggregateExec: mode=Single, gby=[id@0 as id], aggr=[]",
+            "      MemoryExec: partitions=1, partition_sizes=[1]",
+        ];
+        // Get string representation of the plan
+        let actual = get_plan_string(&physical_plan);
+        assert_eq!(
+            expected, actual,
+            "\n**Optimized Plan 
Mismatch\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+        );
+
+        // Since id and name are functionally dependant, we can use name among 
expression
+        // even if it is not part of the group by expression.
+        let df_results = collect(physical_plan, ctx.task_ctx()).await?;
+
+        #[rustfmt::skip]
+        assert_batches_sorted_eq!(
+            [    "+----+",
+                "| id |",
+                "+----+",
+                "| 1  |",
+                "+----+",],
+            &df_results
+        );
+
+        Ok(())
+    }
+
     #[tokio::test]
     async fn test_distinct() -> Result<()> {
         let t = test_table().await?;
diff --git a/datafusion/core/tests/dataframe/mod.rs 
b/datafusion/core/tests/dataframe/mod.rs
index 10f4574020..c6b8e0e01b 100644
--- a/datafusion/core/tests/dataframe/mod.rs
+++ b/datafusion/core/tests/dataframe/mod.rs
@@ -1323,6 +1323,91 @@ async fn unnest_array_agg() -> Result<()> {
     Ok(())
 }
 
+#[tokio::test]
+async fn unnest_with_redundant_columns() -> Result<()> {
+    let mut shape_id_builder = UInt32Builder::new();
+    let mut tag_id_builder = UInt32Builder::new();
+
+    for shape_id in 1..=3 {
+        for tag_id in 1..=3 {
+            shape_id_builder.append_value(shape_id as u32);
+            tag_id_builder.append_value((shape_id * 10 + tag_id) as u32);
+        }
+    }
+
+    let batch = RecordBatch::try_from_iter(vec![
+        ("shape_id", Arc::new(shape_id_builder.finish()) as ArrayRef),
+        ("tag_id", Arc::new(tag_id_builder.finish()) as ArrayRef),
+    ])?;
+
+    let ctx = SessionContext::new();
+    ctx.register_batch("shapes", batch)?;
+    let df = ctx.table("shapes").await?;
+
+    let results = df.clone().collect().await?;
+    let expected = vec![
+        "+----------+--------+",
+        "| shape_id | tag_id |",
+        "+----------+--------+",
+        "| 1        | 11     |",
+        "| 1        | 12     |",
+        "| 1        | 13     |",
+        "| 2        | 21     |",
+        "| 2        | 22     |",
+        "| 2        | 23     |",
+        "| 3        | 31     |",
+        "| 3        | 32     |",
+        "| 3        | 33     |",
+        "+----------+--------+",
+    ];
+    assert_batches_sorted_eq!(expected, &results);
+
+    // Doing an `array_agg` by `shape_id` produces:
+    let df = df
+        .clone()
+        .aggregate(
+            vec![col("shape_id")],
+            vec![array_agg(col("shape_id")).alias("shape_id2")],
+        )?
+        .unnest_column("shape_id2")?
+        .select(vec![col("shape_id")])?;
+
+    let optimized_plan = df.clone().into_optimized_plan()?;
+    let expected = vec![
+        "Projection: shapes.shape_id [shape_id:UInt32]",
+        "  Unnest: shape_id2 [shape_id:UInt32, shape_id2:UInt32;N]",
+        "    Aggregate: groupBy=[[shapes.shape_id]], 
aggr=[[ARRAY_AGG(shapes.shape_id) AS shape_id2]] [shape_id:UInt32, 
shape_id2:List(Field { name: \"item\", data_type: UInt32, nullable: true, 
dict_id: 0, dict_is_ordered: false, metadata: {} });N]",
+        "      TableScan: shapes projection=[shape_id] [shape_id:UInt32]",
+    ];
+
+    let formatted = optimized_plan.display_indent_schema().to_string();
+    let actual: Vec<&str> = formatted.trim().lines().collect();
+    assert_eq!(
+        expected, actual,
+        "\n\nexpected:\n\n{expected:#?}\nactual:\n\n{actual:#?}\n\n"
+    );
+
+    let results = df.collect().await?;
+    let expected = [
+        "+----------+",
+        "| shape_id |",
+        "+----------+",
+        "| 1        |",
+        "| 1        |",
+        "| 1        |",
+        "| 2        |",
+        "| 2        |",
+        "| 2        |",
+        "| 3        |",
+        "| 3        |",
+        "| 3        |",
+        "+----------+",
+    ];
+    assert_batches_sorted_eq!(expected, &results);
+
+    Ok(())
+}
+
 async fn create_test_table(name: &str) -> Result<DataFrame> {
     let schema = Arc::new(Schema::new(vec![
         Field::new("a", DataType::Utf8, false),
diff --git a/datafusion/expr/src/logical_plan/builder.rs 
b/datafusion/expr/src/logical_plan/builder.rs
index c4ff9fe954..be2c45b901 100644
--- a/datafusion/expr/src/logical_plan/builder.rs
+++ b/datafusion/expr/src/logical_plan/builder.rs
@@ -50,9 +50,9 @@ use crate::{
 use arrow::datatypes::{DataType, Schema, SchemaRef};
 use datafusion_common::display::ToStringifiedPlan;
 use datafusion_common::{
-    plan_datafusion_err, plan_err, Column, DFField, DFSchema, DFSchemaRef,
-    DataFusionError, FileType, OwnedTableReference, Result, ScalarValue, 
TableReference,
-    ToDFSchema, UnnestOptions,
+    get_target_functional_dependencies, plan_datafusion_err, plan_err, Column, 
DFField,
+    DFSchema, DFSchemaRef, DataFusionError, FileType, OwnedTableReference, 
Result,
+    ScalarValue, TableReference, ToDFSchema, UnnestOptions,
 };
 
 /// Default table name for unnamed table
@@ -904,8 +904,27 @@ impl LogicalPlanBuilder {
         group_expr: impl IntoIterator<Item = impl Into<Expr>>,
         aggr_expr: impl IntoIterator<Item = impl Into<Expr>>,
     ) -> Result<Self> {
-        let group_expr = normalize_cols(group_expr, &self.plan)?;
+        let mut group_expr = normalize_cols(group_expr, &self.plan)?;
         let aggr_expr = normalize_cols(aggr_expr, &self.plan)?;
+
+        // Rewrite groupby exprs according to functional dependencies
+        let group_by_expr_names = group_expr
+            .iter()
+            .map(|group_by_expr| group_by_expr.display_name())
+            .collect::<Result<Vec<_>>>()?;
+        let schema = self.plan.schema();
+        if let Some(target_indices) =
+            get_target_functional_dependencies(schema, &group_by_expr_names)
+        {
+            for idx in target_indices {
+                let field = schema.field(idx);
+                let expr =
+                    Expr::Column(Column::new(field.qualifier().cloned(), 
field.name()));
+                if !group_expr.contains(&expr) {
+                    group_expr.push(expr);
+                }
+            }
+        }
         Aggregate::try_new(Arc::new(self.plan), group_expr, aggr_expr)
             .map(LogicalPlan::Aggregate)
             .map(Self::from)
@@ -1166,8 +1185,8 @@ pub fn build_join_schema(
     );
     let mut metadata = left.metadata().clone();
     metadata.extend(right.metadata().clone());
-    DFSchema::new_with_metadata(fields, metadata)
-        .map(|schema| schema.with_functional_dependencies(func_dependencies))
+    let schema = DFSchema::new_with_metadata(fields, metadata)?;
+    schema.with_functional_dependencies(func_dependencies)
 }
 
 /// Errors if one or more expressions have equal names.
@@ -1491,7 +1510,7 @@ pub fn unnest_with_options(
     let df_schema = DFSchema::new_with_metadata(fields, metadata)?;
     // We can use the existing functional dependencies:
     let deps = input_schema.functional_dependencies().clone();
-    let schema = Arc::new(df_schema.with_functional_dependencies(deps));
+    let schema = Arc::new(df_schema.with_functional_dependencies(deps)?);
 
     Ok(LogicalPlan::Unnest(Unnest {
         input: Arc::new(input),
diff --git a/datafusion/expr/src/logical_plan/plan.rs 
b/datafusion/expr/src/logical_plan/plan.rs
index d85e0b5b0a..dfd4fbf65d 100644
--- a/datafusion/expr/src/logical_plan/plan.rs
+++ b/datafusion/expr/src/logical_plan/plan.rs
@@ -946,7 +946,7 @@ impl LogicalPlan {
                     // We can use the existing functional dependencies as is:
                     .with_functional_dependencies(
                         input.schema().functional_dependencies().clone(),
-                    ),
+                    )?,
                 );
 
                 Ok(LogicalPlan::Unnest(Unnest {
@@ -1834,8 +1834,9 @@ pub fn projection_schema(input: &LogicalPlan, exprs: 
&[Expr]) -> Result<Arc<DFSc
         exprlist_to_fields(exprs, input)?,
         input.schema().metadata().clone(),
     )?;
-    schema = schema
-        
.with_functional_dependencies(calc_func_dependencies_for_project(exprs, 
input)?);
+    schema = 
schema.with_functional_dependencies(calc_func_dependencies_for_project(
+        exprs, input,
+    )?)?;
     Ok(Arc::new(schema))
 }
 
@@ -1864,7 +1865,7 @@ impl SubqueryAlias {
         let func_dependencies = 
plan.schema().functional_dependencies().clone();
         let schema = DFSchemaRef::new(
             DFSchema::try_from_qualified_schema(&alias, &schema)?
-                .with_functional_dependencies(func_dependencies),
+                .with_functional_dependencies(func_dependencies)?,
         );
         Ok(SubqueryAlias {
             input: Arc::new(plan),
@@ -2017,7 +2018,7 @@ impl Window {
             window_expr,
             schema: Arc::new(
                 DFSchema::new_with_metadata(window_fields, metadata)?
-                    .with_functional_dependencies(window_func_dependencies),
+                    .with_functional_dependencies(window_func_dependencies)?,
             ),
         })
     }
@@ -2087,7 +2088,7 @@ impl TableScan {
             .map(|p| {
                 let projected_func_dependencies =
                     func_dependencies.project_functional_dependencies(p, 
p.len());
-                DFSchema::new_with_metadata(
+                let df_schema = DFSchema::new_with_metadata(
                     p.iter()
                         .map(|i| {
                             DFField::from_qualified(
@@ -2097,15 +2098,13 @@ impl TableScan {
                         })
                         .collect(),
                     schema.metadata().clone(),
-                )
-                .map(|df_schema| {
-                    
df_schema.with_functional_dependencies(projected_func_dependencies)
-                })
+                )?;
+                
df_schema.with_functional_dependencies(projected_func_dependencies)
             })
             .unwrap_or_else(|| {
-                DFSchema::try_from_qualified_schema(table_name.clone(), 
&schema).map(
-                    |df_schema| 
df_schema.with_functional_dependencies(func_dependencies),
-                )
+                let df_schema =
+                    DFSchema::try_from_qualified_schema(table_name.clone(), 
&schema)?;
+                df_schema.with_functional_dependencies(func_dependencies)
             })?;
         let projected_schema = Arc::new(projected_schema);
         Ok(Self {
@@ -2417,7 +2416,7 @@ impl Aggregate {
             calc_func_dependencies_for_aggregate(&group_expr, &input, 
&schema)?;
         let new_schema = schema.as_ref().clone();
         let schema = Arc::new(
-            
new_schema.with_functional_dependencies(aggregate_func_dependencies),
+            
new_schema.with_functional_dependencies(aggregate_func_dependencies)?,
         );
         Ok(Self {
             input,
@@ -2627,17 +2626,19 @@ pub struct Unnest {
 
 #[cfg(test)]
 mod tests {
+    use std::collections::HashMap;
+    use std::sync::Arc;
+
     use super::*;
     use crate::builder::LogicalTableSource;
     use crate::logical_plan::table_scan;
     use crate::{col, count, exists, in_subquery, lit, placeholder, 
GroupingSet};
+
     use arrow::datatypes::{DataType, Field, Schema};
     use datafusion_common::tree_node::TreeNodeVisitor;
     use datafusion_common::{
         not_impl_err, Constraint, DFSchema, ScalarValue, TableReference,
     };
-    use std::collections::HashMap;
-    use std::sync::Arc;
 
     fn employee_schema() -> Schema {
         Schema::new(vec![
@@ -3164,15 +3165,20 @@ digraph {
         )
         .unwrap();
         assert!(!filter.is_scalar());
-        let unique_schema =
-            Arc::new(schema.as_ref().clone().with_functional_dependencies(
-                FunctionalDependencies::new_from_constraints(
-                    Some(&Constraints::new_unverified(vec![Constraint::Unique(
-                        vec![0],
-                    )])),
-                    1,
-                ),
-            ));
+        let unique_schema = Arc::new(
+            schema
+                .as_ref()
+                .clone()
+                .with_functional_dependencies(
+                    FunctionalDependencies::new_from_constraints(
+                        
Some(&Constraints::new_unverified(vec![Constraint::Unique(
+                            vec![0],
+                        )])),
+                        1,
+                    ),
+                )
+                .unwrap(),
+        );
         let scan = Arc::new(LogicalPlan::TableScan(TableScan {
             table_name: TableReference::bare("tab"),
             source,
diff --git a/datafusion/expr/src/type_coercion/binary.rs 
b/datafusion/expr/src/type_coercion/binary.rs
index 1027e97d06..dd94491987 100644
--- a/datafusion/expr/src/type_coercion/binary.rs
+++ b/datafusion/expr/src/type_coercion/binary.rs
@@ -116,7 +116,7 @@ fn signature(lhs: &DataType, op: &Operator, rhs: &DataType) 
-> Result<Signature>
             })
         }
         AtArrow | ArrowAt => {
-            // ArrowAt and AtArrow check for whether one array ic contained in 
another.
+            // ArrowAt and AtArrow check for whether one array is contained in 
another.
             // The result type is boolean. Signature::comparison defines this 
signature.
             // Operation has nothing to do with comparison
             array_coercion(lhs, rhs).map(Signature::comparison).ok_or_else(|| {
diff --git a/datafusion/expr/src/utils.rs b/datafusion/expr/src/utils.rs
index c30c734fcf..abdd7f5f57 100644
--- a/datafusion/expr/src/utils.rs
+++ b/datafusion/expr/src/utils.rs
@@ -17,6 +17,10 @@
 
 //! Expression utilities
 
+use std::cmp::Ordering;
+use std::collections::HashSet;
+use std::sync::Arc;
+
 use crate::expr::{Alias, Sort, WindowFunction};
 use crate::expr_rewriter::strip_outer_reference;
 use crate::logical_plan::Aggregate;
@@ -25,16 +29,15 @@ use crate::{
     and, BinaryExpr, Cast, Expr, ExprSchemable, Filter, GroupingSet, 
LogicalPlan,
     Operator, TryCast,
 };
+
 use arrow::datatypes::{DataType, TimeUnit};
 use datafusion_common::tree_node::{TreeNode, VisitRecursion};
 use datafusion_common::{
     internal_err, plan_datafusion_err, plan_err, Column, DFField, DFSchema, 
DFSchemaRef,
     DataFusionError, Result, ScalarValue, TableReference,
 };
+
 use sqlparser::ast::{ExceptSelectItem, ExcludeSelectItem, 
WildcardAdditionalOptions};
-use std::cmp::Ordering;
-use std::collections::HashSet;
-use std::sync::Arc;
 
 ///  The value to which `COUNT(*)` is expanded to in
 ///  `COUNT(<constant>)` expressions
@@ -433,7 +436,7 @@ pub fn expand_qualified_wildcard(
     let qualified_schema =
         DFSchema::new_with_metadata(qualified_fields, 
schema.metadata().clone())?
             // We can use the functional dependencies as is, since it only 
stores indices:
-            
.with_functional_dependencies(schema.functional_dependencies().clone());
+            
.with_functional_dependencies(schema.functional_dependencies().clone())?;
     let excluded_columns = if let Some(WildcardAdditionalOptions {
         opt_exclude,
         opt_except,
@@ -730,11 +733,7 @@ fn agg_cols(agg: &Aggregate) -> Vec<Column> {
         .collect()
 }
 
-fn exprlist_to_fields_aggregate(
-    exprs: &[Expr],
-    plan: &LogicalPlan,
-    agg: &Aggregate,
-) -> Result<Vec<DFField>> {
+fn exprlist_to_fields_aggregate(exprs: &[Expr], agg: &Aggregate) -> 
Result<Vec<DFField>> {
     let agg_cols = agg_cols(agg);
     let mut fields = vec![];
     for expr in exprs {
@@ -743,7 +742,7 @@ fn exprlist_to_fields_aggregate(
                 // resolve against schema of input to aggregate
                 fields.push(expr.to_field(agg.input.schema())?);
             }
-            _ => fields.push(expr.to_field(plan.schema())?),
+            _ => fields.push(expr.to_field(&agg.schema)?),
         }
     }
     Ok(fields)
@@ -760,15 +759,7 @@ pub fn exprlist_to_fields<'a>(
     // `GROUPING(person.state)` so in order to resolve `person.state` in this 
case we need to
     // look at the input to the aggregate instead.
     let fields = match plan {
-        LogicalPlan::Aggregate(agg) => {
-            Some(exprlist_to_fields_aggregate(&exprs, plan, agg))
-        }
-        LogicalPlan::Window(window) => match window.input.as_ref() {
-            LogicalPlan::Aggregate(agg) => {
-                Some(exprlist_to_fields_aggregate(&exprs, plan, agg))
-            }
-            _ => None,
-        },
+        LogicalPlan::Aggregate(agg) => 
Some(exprlist_to_fields_aggregate(&exprs, agg)),
         _ => None,
     };
     if let Some(fields) = fields {
@@ -1240,10 +1231,9 @@ pub fn merge_schema(inputs: Vec<&LogicalPlan>) -> 
DFSchema {
 #[cfg(test)]
 mod tests {
     use super::*;
-    use crate::expr_vec_fmt;
     use crate::{
-        col, cube, expr, grouping_set, lit, rollup, AggregateFunction, 
WindowFrame,
-        WindowFunction,
+        col, cube, expr, expr_vec_fmt, grouping_set, lit, rollup, 
AggregateFunction,
+        WindowFrame, WindowFunction,
     };
 
     #[test]
diff --git a/datafusion/optimizer/src/optimize_projections.rs 
b/datafusion/optimizer/src/optimize_projections.rs
index 8bee295154..7ae9f7edf5 100644
--- a/datafusion/optimizer/src/optimize_projections.rs
+++ b/datafusion/optimizer/src/optimize_projections.rs
@@ -15,33 +15,42 @@
 // specific language governing permissions and limitations
 // under the License.
 
-//! Optimizer rule to prune unnecessary Columns from the intermediate schemas 
inside the [LogicalPlan].
-//! This rule
-//! - Removes unnecessary columns that are not showed at the output, and that 
are not used during computation.
-//! - Adds projection to decrease table column size before operators that 
benefits from less memory at its input.
-//! - Removes unnecessary [LogicalPlan::Projection] from the [LogicalPlan].
+//! Optimizer rule to prune unnecessary columns from intermediate schemas
+//! inside the [`LogicalPlan`]. This rule:
+//! - Removes unnecessary columns that do not appear at the output and/or are
+//!   not used during any computation step.
+//! - Adds projections to decrease table column size before operators that
+//!   benefit from a smaller memory footprint at its input.
+//! - Removes unnecessary [`LogicalPlan::Projection`]s from the 
[`LogicalPlan`].
+
+use std::collections::HashSet;
+use std::sync::Arc;
+
 use crate::optimizer::ApplyOrder;
-use datafusion_common::{Column, DFSchema, DFSchemaRef, JoinType, Result};
-use datafusion_expr::expr::{Alias, ScalarFunction};
+use crate::{OptimizerConfig, OptimizerRule};
+
+use arrow::datatypes::SchemaRef;
+use datafusion_common::{
+    get_required_group_by_exprs_indices, Column, DFSchema, DFSchemaRef, 
JoinType, Result,
+};
+use datafusion_expr::expr::{Alias, ScalarFunction, ScalarFunctionDefinition};
 use datafusion_expr::{
     logical_plan::LogicalPlan, projection_schema, Aggregate, BinaryExpr, Cast, 
Distinct,
-    Expr, Projection, ScalarFunctionDefinition, TableScan, Window,
+    Expr, GroupingSet, Projection, TableScan, Window,
 };
+
 use hashbrown::HashMap;
 use itertools::{izip, Itertools};
-use std::collections::HashSet;
-use std::sync::Arc;
-
-use crate::{OptimizerConfig, OptimizerRule};
 
-/// A rule for optimizing logical plans by removing unused Columns/Fields.
+/// A rule for optimizing logical plans by removing unused columns/fields.
 ///
-/// `OptimizeProjections` is an optimizer rule that identifies and eliminates 
columns from a logical plan
-/// that are not used in any downstream operations. This can improve query 
performance and reduce unnecessary
-/// data processing.
+/// `OptimizeProjections` is an optimizer rule that identifies and eliminates
+/// columns from a logical plan that are not used by downstream operations.
+/// This can improve query performance and reduce unnecessary data processing.
 ///
-/// The rule analyzes the input logical plan, determines the necessary column 
indices, and then removes any
-/// unnecessary columns. Additionally, it eliminates any unnecessary 
projections in the plan.
+/// The rule analyzes the input logical plan, determines the necessary column
+/// indices, and then removes any unnecessary columns. It also removes any
+/// unnecessary projections from the plan tree.
 #[derive(Default)]
 pub struct OptimizeProjections {}
 
@@ -58,8 +67,8 @@ impl OptimizerRule for OptimizeProjections {
         plan: &LogicalPlan,
         config: &dyn OptimizerConfig,
     ) -> Result<Option<LogicalPlan>> {
-        // All of the fields at the output are necessary.
-        let indices = require_all_indices(plan);
+        // All output fields are necessary:
+        let indices = (0..plan.schema().fields().len()).collect::<Vec<_>>();
         optimize_projections(plan, config, &indices)
     }
 
@@ -72,30 +81,35 @@ impl OptimizerRule for OptimizeProjections {
     }
 }
 
-/// Removes unnecessary columns (e.g Columns that are not referred at the 
output schema and
-/// Columns that are not used during any computation, expression evaluation) 
from the logical plan and its inputs.
+/// Removes unnecessary columns (e.g. columns that do not appear in the output
+/// schema and/or are not used during any computation step such as expression
+/// evaluation) from the logical plan and its inputs.
 ///
-/// # Arguments
+/// # Parameters
 ///
-/// - `plan`: A reference to the input `LogicalPlan` to be optimized.
-/// - `_config`: A reference to the optimizer configuration (not currently 
used).
-/// - `indices`: A slice of column indices that represent the necessary column 
indices for downstream operations.
+/// - `plan`: A reference to the input `LogicalPlan` to optimize.
+/// - `config`: A reference to the optimizer configuration.
+/// - `indices`: A slice of column indices that represent the necessary column
+///   indices for downstream operations.
 ///
 /// # Returns
 ///
-/// - `Ok(Some(LogicalPlan))`: An optimized `LogicalPlan` with unnecessary 
columns removed.
-/// - `Ok(None)`: If the optimization process results in a logical plan that 
doesn't require further propagation.
-/// - `Err(error)`: If an error occurs during the optimization process.
+/// A `Result` object with the following semantics:
+///
+/// - `Ok(Some(LogicalPlan))`: An optimized `LogicalPlan` without unnecessary
+///   columns.
+/// - `Ok(None)`: Signal that the given logical plan did not require any 
change.
+/// - `Err(error)`: An error occured during the optimization process.
 fn optimize_projections(
     plan: &LogicalPlan,
-    _config: &dyn OptimizerConfig,
+    config: &dyn OptimizerConfig,
     indices: &[usize],
 ) -> Result<Option<LogicalPlan>> {
     // `child_required_indices` stores
     // - indices of the columns required for each child
     // - a flag indicating whether putting a projection above children is 
beneficial for the parent.
     // As an example LogicalPlan::Filter benefits from small tables. Hence for 
filter child this flag would be `true`.
-    let child_required_indices: Option<Vec<(Vec<usize>, bool)>> = match plan {
+    let child_required_indices: Vec<(Vec<usize>, bool)> = match plan {
         LogicalPlan::Sort(_)
         | LogicalPlan::Filter(_)
         | LogicalPlan::Repartition(_)
@@ -103,36 +117,32 @@ fn optimize_projections(
         | LogicalPlan::Union(_)
         | LogicalPlan::SubqueryAlias(_)
         | LogicalPlan::Distinct(Distinct::On(_)) => {
-            // Re-route required indices from the parent + column indices 
referred by expressions in the plan
-            // to the child.
-            // All of these operators benefits from small tables at their 
inputs. Hence projection_beneficial flag is `true`.
+            // Pass index requirements from the parent as well as column 
indices
+            // that appear in this plan's expressions to its child. All these
+            // operators benefit from "small" inputs, so the 
projection_beneficial
+            // flag is `true`.
             let exprs = plan.expressions();
-            let child_req_indices = plan
-                .inputs()
+            plan.inputs()
                 .into_iter()
                 .map(|input| {
-                    let required_indices =
-                        get_all_required_indices(indices, input, 
exprs.iter())?;
-                    Ok((required_indices, true))
+                    get_all_required_indices(indices, input, exprs.iter())
+                        .map(|idxs| (idxs, true))
                 })
-                .collect::<Result<Vec<_>>>()?;
-            Some(child_req_indices)
+                .collect::<Result<_>>()?
         }
         LogicalPlan::Limit(_) | LogicalPlan::Prepare(_) => {
-            // Re-route required indices from the parent + column indices 
referred by expressions in the plan
-            // to the child.
-            // Limit, Prepare doesn't benefit from small column numbers. Hence 
projection_beneficial flag is `false`.
+            // Pass index requirements from the parent as well as column 
indices
+            // that appear in this plan's expressions to its child. These 
operators
+            // do not benefit from "small" inputs, so the projection_beneficial
+            // flag is `false`.
             let exprs = plan.expressions();
-            let child_req_indices = plan
-                .inputs()
+            plan.inputs()
                 .into_iter()
                 .map(|input| {
-                    let required_indices =
-                        get_all_required_indices(indices, input, 
exprs.iter())?;
-                    Ok((required_indices, false))
+                    get_all_required_indices(indices, input, exprs.iter())
+                        .map(|idxs| (idxs, false))
                 })
-                .collect::<Result<Vec<_>>>()?;
-            Some(child_req_indices)
+                .collect::<Result<_>>()?
         }
         LogicalPlan::Copy(_)
         | LogicalPlan::Ddl(_)
@@ -141,81 +151,99 @@ fn optimize_projections(
         | LogicalPlan::Analyze(_)
         | LogicalPlan::Subquery(_)
         | LogicalPlan::Distinct(Distinct::All(_)) => {
-            // Require all of the fields of the Dml, Ddl, Copy, Explain, 
Analyze, Subquery, Distinct::All input(s).
-            // Their child plan can be treated as final plan. Otherwise 
expected schema may not match.
-            // TODO: For some subquery variants we may not need to require all 
indices for its input.
-            // such as Exists<SubQuery>.
-            let child_requirements = plan
-                .inputs()
+            // These plans require all their fields, and their children should
+            // be treated as final plans -- otherwise, we may have schema a
+            // mismatch.
+            // TODO: For some subquery variants (e.g. a subquery arising from 
an
+            //       EXISTS expression), we may not need to require all 
indices.
+            plan.inputs()
                 .iter()
-                .map(|input| {
-                    // Require all of the fields for each input.
-                    // No projection since all of the fields at the child is 
required
-                    (require_all_indices(input), false)
-                })
-                .collect::<Vec<_>>();
-            Some(child_requirements)
+                .map(|input| 
((0..input.schema().fields().len()).collect_vec(), false))
+                .collect::<Vec<_>>()
         }
         LogicalPlan::EmptyRelation(_)
         | LogicalPlan::Statement(_)
         | LogicalPlan::Values(_)
         | LogicalPlan::Extension(_)
         | LogicalPlan::DescribeTable(_) => {
-            // EmptyRelation, Values, DescribeTable, Statement has no inputs 
stop iteration
-
-            // TODO: Add support for extension
-            // It is not known how to direct requirements to children for 
LogicalPlan::Extension.
-            // Safest behaviour is to stop propagation.
-            None
+            // These operators have no inputs, so stop the optimization 
process.
+            // TODO: Add support for `LogicalPlan::Extension`.
+            return Ok(None);
         }
         LogicalPlan::Projection(proj) => {
             return if let Some(proj) = merge_consecutive_projections(proj)? {
-                rewrite_projection_given_requirements(&proj, _config, indices)?
-                    .map(|res| Ok(Some(res)))
-                    // Even if projection cannot be optimized, return merged 
version
-                    .unwrap_or_else(|| Ok(Some(LogicalPlan::Projection(proj))))
+                Ok(Some(
+                    rewrite_projection_given_requirements(&proj, config, 
indices)?
+                        // Even if we cannot optimize the projection, merge if 
possible:
+                        .unwrap_or_else(|| LogicalPlan::Projection(proj)),
+                ))
             } else {
-                rewrite_projection_given_requirements(proj, _config, indices)
+                rewrite_projection_given_requirements(proj, config, indices)
             };
         }
         LogicalPlan::Aggregate(aggregate) => {
-            // Split parent requirements to group by and aggregate sections
-            let group_expr_len = aggregate.group_expr_len()?;
-            let (_group_by_reqs, mut aggregate_reqs): (Vec<usize>, Vec<usize>) 
=
-                indices.iter().partition(|&&idx| idx < group_expr_len);
-            // Offset aggregate indices so that they point to valid indices at 
the `aggregate.aggr_expr`
-            aggregate_reqs
-                .iter_mut()
-                .for_each(|idx| *idx -= group_expr_len);
-
-            // Group by expressions are same
-            let new_group_bys = aggregate.group_expr.clone();
-
-            // Only use absolutely necessary aggregate expressions required by 
parent.
+            // Split parent requirements to GROUP BY and aggregate sections:
+            let n_group_exprs = aggregate.group_expr_len()?;
+            let (group_by_reqs, mut aggregate_reqs): (Vec<usize>, Vec<usize>) =
+                indices.iter().partition(|&&idx| idx < n_group_exprs);
+            // Offset aggregate indices so that they point to valid indices at
+            // `aggregate.aggr_expr`:
+            for idx in aggregate_reqs.iter_mut() {
+                *idx -= n_group_exprs;
+            }
+
+            // Get absolutely necessary GROUP BY fields:
+            let group_by_expr_existing = aggregate
+                .group_expr
+                .iter()
+                .map(|group_by_expr| group_by_expr.display_name())
+                .collect::<Result<Vec<_>>>()?;
+            let new_group_bys = if let Some(simplest_groupby_indices) =
+                get_required_group_by_exprs_indices(
+                    aggregate.input.schema(),
+                    &group_by_expr_existing,
+                ) {
+                // Some of the fields in the GROUP BY may be required by the
+                // parent even if these fields are unnecessary in terms of
+                // functional dependency.
+                let required_indices =
+                    merge_slices(&simplest_groupby_indices, &group_by_reqs);
+                get_at_indices(&aggregate.group_expr, &required_indices)
+            } else {
+                aggregate.group_expr.clone()
+            };
+
+            // Only use the absolutely necessary aggregate expressions required
+            // by the parent:
             let mut new_aggr_expr = get_at_indices(&aggregate.aggr_expr, 
&aggregate_reqs);
             let all_exprs_iter = 
new_group_bys.iter().chain(new_aggr_expr.iter());
-            let necessary_indices =
-                indices_referred_by_exprs(&aggregate.input, all_exprs_iter)?;
+            let schema = aggregate.input.schema();
+            let necessary_indices = indices_referred_by_exprs(schema, 
all_exprs_iter)?;
 
             let aggregate_input = if let Some(input) =
-                optimize_projections(&aggregate.input, _config, 
&necessary_indices)?
+                optimize_projections(&aggregate.input, config, 
&necessary_indices)?
             {
                 input
             } else {
                 aggregate.input.as_ref().clone()
             };
 
-            // Simplify input of the aggregation by adding a projection so 
that its input only contains
-            // absolutely necessary columns for the aggregate expressions. 
Please no that we use aggregate.input.schema()
-            // because necessary_indices refers to fields in this schema.
-            let necessary_exprs =
-                get_required_exprs(aggregate.input.schema(), 
&necessary_indices);
-            let (aggregate_input, _is_added) =
-                add_projection_on_top_if_helpful(aggregate_input, 
necessary_exprs, true)?;
-
-            // Aggregate always needs at least one aggregate expression.
-            // With a nested count we don't require any column as input, but 
still need to create a correct aggregate
-            // The aggregate may be optimized out later (select count(*) from 
(select count(*) from [...]) always returns 1
+            // Simplify the input of the aggregation by adding a projection so
+            // that its input only contains absolutely necessary columns for
+            // the aggregate expressions. Note that necessary_indices refer to
+            // fields in `aggregate.input.schema()`.
+            let necessary_exprs = get_required_exprs(schema, 
&necessary_indices);
+            let (aggregate_input, _) =
+                add_projection_on_top_if_helpful(aggregate_input, 
necessary_exprs)?;
+
+            // Aggregations always need at least one aggregate expression.
+            // With a nested count, we don't require any column as input, but
+            // still need to create a correct aggregate, which may be optimized
+            // out later. As an example, consider the following query:
+            //
+            // SELECT COUNT(*) FROM (SELECT COUNT(*) FROM [...])
+            //
+            // which always returns 1.
             if new_aggr_expr.is_empty()
                 && new_group_bys.is_empty()
                 && !aggregate.aggr_expr.is_empty()
@@ -223,7 +251,8 @@ fn optimize_projections(
                 new_aggr_expr = vec![aggregate.aggr_expr[0].clone()];
             }
 
-            // Create new aggregate plan with updated input, and absolutely 
necessary fields.
+            // Create a new aggregate plan with the updated input and only the
+            // absolutely necessary fields:
             return Aggregate::try_new(
                 Arc::new(aggregate_input),
                 new_group_bys,
@@ -232,43 +261,48 @@ fn optimize_projections(
             .map(|aggregate| Some(LogicalPlan::Aggregate(aggregate)));
         }
         LogicalPlan::Window(window) => {
-            // Split parent requirements to child and window expression 
sections.
+            // Split parent requirements to child and window expression 
sections:
             let n_input_fields = window.input.schema().fields().len();
             let (child_reqs, mut window_reqs): (Vec<usize>, Vec<usize>) =
                 indices.iter().partition(|&&idx| idx < n_input_fields);
-            // Offset window expr indices so that they point to valid indices 
at the `window.window_expr`
-            window_reqs
-                .iter_mut()
-                .for_each(|idx| *idx -= n_input_fields);
+            // Offset window expression indices so that they point to valid
+            // indices at `window.window_expr`:
+            for idx in window_reqs.iter_mut() {
+                *idx -= n_input_fields;
+            }
 
-            // Only use window expressions that are absolutely necessary by 
parent requirements.
+            // Only use window expressions that are absolutely necessary 
according
+            // to parent requirements:
             let new_window_expr = get_at_indices(&window.window_expr, 
&window_reqs);
 
-            // All of the required column indices at the input of the window 
by parent, and window expression requirements.
+            // Get all the required column indices at the input, either by the
+            // parent or window expression requirements.
             let required_indices = get_all_required_indices(
                 &child_reqs,
                 &window.input,
                 new_window_expr.iter(),
             )?;
             let window_child = if let Some(new_window_child) =
-                optimize_projections(&window.input, _config, 
&required_indices)?
+                optimize_projections(&window.input, config, &required_indices)?
             {
                 new_window_child
             } else {
                 window.input.as_ref().clone()
             };
-            // When no window expression is necessary, just use window input. 
(Remove window operator)
+
             return if new_window_expr.is_empty() {
+                // When no window expression is necessary, use the input 
directly:
                 Ok(Some(window_child))
             } else {
                 // Calculate required expressions at the input of the window.
-                // Please note that we use `old_child`, because 
`required_indices` refers to `old_child`.
+                // Please note that we use `old_child`, because 
`required_indices`
+                // refers to `old_child`.
                 let required_exprs =
                     get_required_exprs(window.input.schema(), 
&required_indices);
-                let (window_child, _is_added) =
-                    add_projection_on_top_if_helpful(window_child, 
required_exprs, true)?;
-                let window = Window::try_new(new_window_expr, 
Arc::new(window_child))?;
-                Ok(Some(LogicalPlan::Window(window)))
+                let (window_child, _) =
+                    add_projection_on_top_if_helpful(window_child, 
required_exprs)?;
+                Window::try_new(new_window_expr, Arc::new(window_child))
+                    .map(|window| Some(LogicalPlan::Window(window)))
             };
         }
         LogicalPlan::Join(join) => {
@@ -280,136 +314,137 @@ fn optimize_projections(
                 get_all_required_indices(&left_req_indices, &join.left, 
exprs.iter())?;
             let right_indices =
                 get_all_required_indices(&right_req_indices, &join.right, 
exprs.iter())?;
-            // Join benefits from small columns numbers at its input 
(decreases memory usage)
-            // Hence each child benefits from projection.
-            Some(vec![(left_indices, true), (right_indices, true)])
+            // Joins benefit from "small" input tables (lower memory usage).
+            // Therefore, each child benefits from projection:
+            vec![(left_indices, true), (right_indices, true)]
         }
         LogicalPlan::CrossJoin(cross_join) => {
             let left_len = cross_join.left.schema().fields().len();
             let (left_child_indices, right_child_indices) =
                 split_join_requirements(left_len, indices, &JoinType::Inner);
-            // Join benefits from small columns numbers at its input 
(decreases memory usage)
-            // Hence each child benefits from projection.
-            Some(vec![
-                (left_child_indices, true),
-                (right_child_indices, true),
-            ])
+            // Joins benefit from "small" input tables (lower memory usage).
+            // Therefore, each child benefits from projection:
+            vec![(left_child_indices, true), (right_child_indices, true)]
         }
         LogicalPlan::TableScan(table_scan) => {
-            let projection_fields = table_scan.projected_schema.fields();
             let schema = table_scan.source.schema();
-            // We expect to find all of the required indices of the projected 
schema fields.
-            // among original schema. If at least one of them cannot be found. 
Use all of the fields in the file.
-            // (No projection at the source)
-            let projection = indices
-                .iter()
-                .map(|&idx| {
-                    schema.fields().iter().position(|field_source| {
-                        projection_fields[idx].field() == field_source
-                    })
-                })
-                .collect::<Option<Vec<_>>>();
+            // Get indices referred to in the original (schema with all fields)
+            // given projected indices.
+            let projection = with_indices(&table_scan.projection, schema, 
|map| {
+                indices.iter().map(|&idx| map[idx]).collect()
+            });
 
-            return Ok(Some(LogicalPlan::TableScan(TableScan::try_new(
+            return TableScan::try_new(
                 table_scan.table_name.clone(),
                 table_scan.source.clone(),
-                projection,
+                Some(projection),
                 table_scan.filters.clone(),
                 table_scan.fetch,
-            )?)));
+            )
+            .map(|table| Some(LogicalPlan::TableScan(table)));
         }
     };
 
-    let child_required_indices =
-        if let Some(child_required_indices) = child_required_indices {
-            child_required_indices
-        } else {
-            // Stop iteration, cannot propagate requirement down below this 
operator.
-            return Ok(None);
-        };
-
     let new_inputs = izip!(child_required_indices, plan.inputs().into_iter())
         .map(|((required_indices, projection_beneficial), child)| {
-            let (input, mut is_changed) = if let Some(new_input) =
-                optimize_projections(child, _config, &required_indices)?
+            let (input, is_changed) = if let Some(new_input) =
+                optimize_projections(child, config, &required_indices)?
             {
                 (new_input, true)
             } else {
                 (child.clone(), false)
             };
             let project_exprs = get_required_exprs(child.schema(), 
&required_indices);
-            let (input, is_projection_added) = 
add_projection_on_top_if_helpful(
-                input,
-                project_exprs,
-                projection_beneficial,
-            )?;
-            is_changed |= is_projection_added;
-            Ok(is_changed.then_some(input))
+            let (input, proj_added) = if projection_beneficial {
+                add_projection_on_top_if_helpful(input, project_exprs)?
+            } else {
+                (input, false)
+            };
+            Ok((is_changed || proj_added).then_some(input))
         })
-        .collect::<Result<Vec<Option<_>>>>()?;
-    // All of the children are same in this case, no need to change plan
+        .collect::<Result<Vec<_>>>()?;
     if new_inputs.iter().all(|child| child.is_none()) {
+        // All children are the same in this case, no need to change the plan:
         Ok(None)
     } else {
-        // At least one of the children is changed.
+        // At least one of the children is changed:
         let new_inputs = izip!(new_inputs, plan.inputs())
-            // If new_input is `None`, this means child is not changed. Hence 
use `old_child` during construction.
+            // If new_input is `None`, this means child is not changed, so use
+            // `old_child` during construction:
             .map(|(new_input, old_child)| new_input.unwrap_or_else(|| 
old_child.clone()))
             .collect::<Vec<_>>();
-        let res = plan.with_new_inputs(&new_inputs)?;
-        Ok(Some(res))
+        plan.with_new_inputs(&new_inputs).map(Some)
     }
 }
 
-/// Merge Consecutive Projections
+/// This function applies the given function `f` to the projection indices
+/// `proj_indices` if they exist. Otherwise, applies `f` to a default set
+/// of indices according to `schema`.
+fn with_indices<F>(
+    proj_indices: &Option<Vec<usize>>,
+    schema: SchemaRef,
+    mut f: F,
+) -> Vec<usize>
+where
+    F: FnMut(&[usize]) -> Vec<usize>,
+{
+    match proj_indices {
+        Some(indices) => f(indices.as_slice()),
+        None => {
+            let range: Vec<usize> = (0..schema.fields.len()).collect();
+            f(range.as_slice())
+        }
+    }
+}
+
+/// Merges consecutive projections.
 ///
 /// Given a projection `proj`, this function attempts to merge it with a 
previous
-/// projection if it exists and if the merging is beneficial. Merging is 
considered
-/// beneficial when expressions in the current projection are non-trivial and 
referred to
-/// more than once in its input fields. This can act as a caching mechanism 
for non-trivial
-/// computations.
+/// projection if it exists and if merging is beneficial. Merging is considered
+/// beneficial when expressions in the current projection are non-trivial and
+/// appear more than once in its input fields. This can act as a caching 
mechanism
+/// for non-trivial computations.
 ///
-/// # Arguments
+/// # Parameters
 ///
 /// * `proj` - A reference to the `Projection` to be merged.
 ///
 /// # Returns
 ///
-/// A `Result` containing an `Option` of the merged `Projection`. If merging 
is not beneficial
-/// it returns `Ok(None)`.
+/// A `Result` object with the following semantics:
+///
+/// - `Ok(Some(Projection))`: Merge was beneficial and successful. Contains the
+///   merged projection.
+/// - `Ok(None)`: Signals that merge is not beneficial (and has not taken 
place).
+/// - `Err(error)`: An error occured during the function call.
 fn merge_consecutive_projections(proj: &Projection) -> 
Result<Option<Projection>> {
-    let prev_projection = if let LogicalPlan::Projection(prev) = 
proj.input.as_ref() {
-        prev
-    } else {
+    let LogicalPlan::Projection(prev_projection) = proj.input.as_ref() else {
         return Ok(None);
     };
 
-    // Count usages (referral counts) of each projection expression in its 
input fields
-    let column_referral_map: HashMap<Column, usize> = proj
-        .expr
-        .iter()
-        .flat_map(|expr| expr.to_columns())
-        .fold(HashMap::new(), |mut map, cols| {
-            cols.into_iter()
-                .for_each(|col| *map.entry(col).or_default() += 1);
-            map
-        });
-
-    // Merging these projections is not beneficial, e.g
-    // If an expression is not trivial and it is referred more than 1, 
consecutive projections will be
-    // beneficial as caching mechanism for non-trivial computations.
-    // See discussion in: 
https://github.com/apache/arrow-datafusion/issues/8296
-    if column_referral_map.iter().any(|(col, usage)| {
-        *usage > 1
+    // Count usages (referrals) of each projection expression in its input 
fields:
+    let mut column_referral_map = HashMap::<Column, usize>::new();
+    for columns in proj.expr.iter().flat_map(|expr| expr.to_columns()) {
+        for col in columns.into_iter() {
+            *column_referral_map.entry(col.clone()).or_default() += 1;
+        }
+    }
+
+    // If an expression is non-trivial and appears more than once, consecutive
+    // projections will benefit from a compute-once approach. For details, see:
+    // https://github.com/apache/arrow-datafusion/issues/8296
+    if column_referral_map.into_iter().any(|(col, usage)| {
+        usage > 1
             && !is_expr_trivial(
                 &prev_projection.expr
-                    [prev_projection.schema.index_of_column(col).unwrap()],
+                    [prev_projection.schema.index_of_column(&col).unwrap()],
             )
     }) {
         return Ok(None);
     }
 
-    // If all of the expression of the top projection can be rewritten. 
Rewrite expressions and create a new projection
+    // If all the expression of the top projection can be rewritten, do so and
+    // create a new projection:
     let new_exprs = proj
         .expr
         .iter()
@@ -429,183 +464,252 @@ fn merge_consecutive_projections(proj: &Projection) -> 
Result<Option<Projection>
     }
 }
 
-/// Trim Expression
-///
-/// Trim the given expression by removing any unnecessary layers of 
abstraction.
+/// Trim the given expression by removing any unnecessary layers of aliasing.
 /// If the expression is an alias, the function returns the underlying 
expression.
-/// Otherwise, it returns the original expression unchanged.
-///
-/// # Arguments
+/// Otherwise, it returns the given expression as is.
 ///
-/// * `expr` - The input expression to be trimmed.
+/// Without trimming, we can end up with unnecessary indirections inside 
expressions
+/// during projection merges.
 ///
-/// # Returns
-///
-/// The trimmed expression. If the input is an alias, the underlying 
expression is returned.
-///
-/// Without trimming, during projection merge we can end up unnecessary 
indirections inside the expressions.
 /// Consider:
 ///
-/// Projection (a1 + b1 as sum1)
-/// --Projection (a as a1, b as b1)
-/// ----Source (a, b)
+/// ```text
+/// Projection(a1 + b1 as sum1)
+/// --Projection(a as a1, b as b1)
+/// ----Source(a, b)
+/// ```
 ///
-/// After merge we want to produce
+/// After merge, we want to produce:
 ///
-/// Projection (a + b as sum1)
+/// ```text
+/// Projection(a + b as sum1)
 /// --Source(a, b)
+/// ```
 ///
-/// Without trimming we would end up
+/// Without trimming, we would end up with:
 ///
-/// Projection (a as a1 + b as b1 as sum1)
+/// ```text
+/// Projection((a as a1 + b as b1) as sum1)
 /// --Source(a, b)
+/// ```
 fn trim_expr(expr: Expr) -> Expr {
     match expr {
-        Expr::Alias(alias) => *alias.expr,
+        Expr::Alias(alias) => trim_expr(*alias.expr),
         _ => expr,
     }
 }
 
-// Check whether expression is trivial (e.g it doesn't include computation.)
+// Check whether `expr` is trivial; i.e. it doesn't imply any computation.
 fn is_expr_trivial(expr: &Expr) -> bool {
     matches!(expr, Expr::Column(_) | Expr::Literal(_))
 }
 
-// Exit early when None is seen.
+// Exit early when there is no rewrite to do.
 macro_rules! rewrite_expr_with_check {
     ($expr:expr, $input:expr) => {
-        if let Some(val) = rewrite_expr($expr, $input)? {
-            val
+        if let Some(value) = rewrite_expr($expr, $input)? {
+            value
         } else {
             return Ok(None);
         }
     };
 }
 
-// Rewrites expression using its input projection (Merges consecutive 
projection expressions).
-/// Rewrites an projections expression using its input projection
-/// (Helper during merging consecutive projection expressions).
+/// Rewrites a projection expression using the projection before it (i.e. its 
input)
+/// This is a subroutine to the `merge_consecutive_projections` function.
 ///
-/// # Arguments
+/// # Parameters
 ///
-/// * `expr` - A reference to the expression to be rewritten.
-/// * `input` - A reference to the input (itself a projection) of the 
projection expression.
+/// * `expr` - A reference to the expression to rewrite.
+/// * `input` - A reference to the input of the projection expression (itself
+///   a projection).
 ///
 /// # Returns
 ///
-/// A `Result` containing an `Option` of the rewritten expression. If the 
rewrite is successful,
-/// it returns `Ok(Some)` with the modified expression. If the expression 
cannot be rewritten
-/// it returns `Ok(None)`.
+/// A `Result` object with the following semantics:
+///
+/// - `Ok(Some(Expr))`: Rewrite was successful. Contains the rewritten result.
+/// - `Ok(None)`: Signals that `expr` can not be rewritten.
+/// - `Err(error)`: An error occured during the function call.
 fn rewrite_expr(expr: &Expr, input: &Projection) -> Result<Option<Expr>> {
-    Ok(match expr {
+    let result = match expr {
         Expr::Column(col) => {
-            // Find index of column
+            // Find index of column:
             let idx = input.schema.index_of_column(col)?;
-            Some(input.expr[idx].clone())
+            input.expr[idx].clone()
         }
-        Expr::BinaryExpr(binary) => {
-            let lhs = trim_expr(rewrite_expr_with_check!(&binary.left, input));
-            let rhs = trim_expr(rewrite_expr_with_check!(&binary.right, 
input));
-            Some(Expr::BinaryExpr(BinaryExpr::new(
-                Box::new(lhs),
-                binary.op,
-                Box::new(rhs),
-            )))
-        }
-        Expr::Alias(alias) => {
-            let new_expr = trim_expr(rewrite_expr_with_check!(&alias.expr, 
input));
-            Some(Expr::Alias(Alias::new(
-                new_expr,
-                alias.relation.clone(),
-                alias.name.clone(),
-            )))
-        }
-        Expr::Literal(_val) => Some(expr.clone()),
+        Expr::BinaryExpr(binary) => Expr::BinaryExpr(BinaryExpr::new(
+            Box::new(trim_expr(rewrite_expr_with_check!(&binary.left, input))),
+            binary.op,
+            Box::new(trim_expr(rewrite_expr_with_check!(&binary.right, 
input))),
+        )),
+        Expr::Alias(alias) => Expr::Alias(Alias::new(
+            trim_expr(rewrite_expr_with_check!(&alias.expr, input)),
+            alias.relation.clone(),
+            alias.name.clone(),
+        )),
+        Expr::Literal(_) => expr.clone(),
         Expr::Cast(cast) => {
             let new_expr = rewrite_expr_with_check!(&cast.expr, input);
-            Some(Expr::Cast(Cast::new(
-                Box::new(new_expr),
-                cast.data_type.clone(),
-            )))
+            Expr::Cast(Cast::new(Box::new(new_expr), cast.data_type.clone()))
         }
         Expr::ScalarFunction(scalar_fn) => {
-            let fun = if let ScalarFunctionDefinition::BuiltIn(fun) = 
scalar_fn.func_def {
-                fun
-            } else {
+            // TODO: Support UDFs.
+            let ScalarFunctionDefinition::BuiltIn(fun) = scalar_fn.func_def 
else {
                 return Ok(None);
             };
-            scalar_fn
+            return Ok(scalar_fn
                 .args
                 .iter()
                 .map(|expr| rewrite_expr(expr, input))
-                .collect::<Result<Option<Vec<_>>>>()?
-                .map(|new_args| Expr::ScalarFunction(ScalarFunction::new(fun, 
new_args)))
+                .collect::<Result<Option<_>>>()?
+                .map(|new_args| {
+                    Expr::ScalarFunction(ScalarFunction::new(fun, new_args))
+                }));
         }
-        _ => {
-            // Unsupported type to merge in consecutive projections
-            None
-        }
-    })
+        // Unsupported type for consecutive projection merge analysis.
+        _ => return Ok(None),
+    };
+    Ok(Some(result))
 }
 
-/// Retrieves a set of outer-referenced columns from an expression.
-/// Please note that `expr.to_columns()` API doesn't return these columns.
+/// Retrieves a set of outer-referenced columns by the given expression, 
`expr`.
+/// Note that the `Expr::to_columns()` function doesn't return these columns.
 ///
-/// # Arguments
+/// # Parameters
 ///
-/// * `expr` - The expression to be analyzed for outer-referenced columns.
+/// * `expr` - The expression to analyze for outer-referenced columns.
 ///
 /// # Returns
 ///
-/// A `HashSet<Column>` containing columns that are referenced by the 
expression.
-fn outer_columns(expr: &Expr) -> HashSet<Column> {
+/// If the function can safely infer all outer-referenced columns, returns a
+/// `Some(HashSet<Column>)` containing these columns. Otherwise, returns 
`None`.
+fn outer_columns(expr: &Expr) -> Option<HashSet<Column>> {
     let mut columns = HashSet::new();
-    outer_columns_helper(expr, &mut columns);
-    columns
+    outer_columns_helper(expr, &mut columns).then_some(columns)
 }
 
-/// Helper function to accumulate outer-referenced columns referred by the 
`expr`.
+/// A recursive subroutine that accumulates outer-referenced columns by the
+/// given expression, `expr`.
 ///
-/// # Arguments
+/// # Parameters
 ///
-/// * `expr` - The expression to be analyzed for outer-referenced columns.
-/// * `columns` - A mutable reference to a `HashSet<Column>` where the 
detected columns are collected.
-fn outer_columns_helper(expr: &Expr, columns: &mut HashSet<Column>) {
+/// * `expr` - The expression to analyze for outer-referenced columns.
+/// * `columns` - A mutable reference to a `HashSet<Column>` where detected
+///   columns are collected.
+///
+/// Returns `true` if it can safely collect all outer-referenced columns.
+/// Otherwise, returns `false`.
+fn outer_columns_helper(expr: &Expr, columns: &mut HashSet<Column>) -> bool {
     match expr {
         Expr::OuterReferenceColumn(_, col) => {
             columns.insert(col.clone());
+            true
         }
         Expr::BinaryExpr(binary_expr) => {
-            outer_columns_helper(&binary_expr.left, columns);
-            outer_columns_helper(&binary_expr.right, columns);
+            outer_columns_helper(&binary_expr.left, columns)
+                && outer_columns_helper(&binary_expr.right, columns)
         }
         Expr::ScalarSubquery(subquery) => {
-            for expr in &subquery.outer_ref_columns {
-                outer_columns_helper(expr, columns);
-            }
+            let exprs = subquery.outer_ref_columns.iter();
+            outer_columns_helper_multi(exprs, columns)
         }
         Expr::Exists(exists) => {
-            for expr in &exists.subquery.outer_ref_columns {
-                outer_columns_helper(expr, columns);
+            let exprs = exists.subquery.outer_ref_columns.iter();
+            outer_columns_helper_multi(exprs, columns)
+        }
+        Expr::Alias(alias) => outer_columns_helper(&alias.expr, columns),
+        Expr::InSubquery(insubquery) => {
+            let exprs = insubquery.subquery.outer_ref_columns.iter();
+            outer_columns_helper_multi(exprs, columns)
+        }
+        Expr::IsNotNull(expr) | Expr::IsNull(expr) => 
outer_columns_helper(expr, columns),
+        Expr::Cast(cast) => outer_columns_helper(&cast.expr, columns),
+        Expr::Sort(sort) => outer_columns_helper(&sort.expr, columns),
+        Expr::AggregateFunction(aggregate_fn) => {
+            outer_columns_helper_multi(aggregate_fn.args.iter(), columns)
+                && aggregate_fn
+                    .order_by
+                    .as_ref()
+                    .map_or(true, |obs| outer_columns_helper_multi(obs.iter(), 
columns))
+                && aggregate_fn
+                    .filter
+                    .as_ref()
+                    .map_or(true, |filter| outer_columns_helper(filter, 
columns))
+        }
+        Expr::WindowFunction(window_fn) => {
+            outer_columns_helper_multi(window_fn.args.iter(), columns)
+                && outer_columns_helper_multi(window_fn.order_by.iter(), 
columns)
+                && outer_columns_helper_multi(window_fn.partition_by.iter(), 
columns)
+        }
+        Expr::GroupingSet(groupingset) => match groupingset {
+            GroupingSet::GroupingSets(multi_exprs) => multi_exprs
+                .iter()
+                .all(|e| outer_columns_helper_multi(e.iter(), columns)),
+            GroupingSet::Cube(exprs) | GroupingSet::Rollup(exprs) => {
+                outer_columns_helper_multi(exprs.iter(), columns)
             }
+        },
+        Expr::ScalarFunction(scalar_fn) => {
+            outer_columns_helper_multi(scalar_fn.args.iter(), columns)
         }
-        Expr::Alias(alias) => {
-            outer_columns_helper(&alias.expr, columns);
+        Expr::Like(like) => {
+            outer_columns_helper(&like.expr, columns)
+                && outer_columns_helper(&like.pattern, columns)
         }
-        _ => {}
+        Expr::InList(in_list) => {
+            outer_columns_helper(&in_list.expr, columns)
+                && outer_columns_helper_multi(in_list.list.iter(), columns)
+        }
+        Expr::Case(case) => {
+            let when_then_exprs = case
+                .when_then_expr
+                .iter()
+                .flat_map(|(first, second)| [first.as_ref(), second.as_ref()]);
+            outer_columns_helper_multi(when_then_exprs, columns)
+                && case
+                    .expr
+                    .as_ref()
+                    .map_or(true, |expr| outer_columns_helper(expr, columns))
+                && case
+                    .else_expr
+                    .as_ref()
+                    .map_or(true, |expr| outer_columns_helper(expr, columns))
+        }
+        Expr::Column(_) | Expr::Literal(_) | Expr::Wildcard { .. } => true,
+        _ => false,
     }
 }
 
-/// Generates the required expressions(Column) that resides at `indices` of 
the `input_schema`.
+/// A recursive subroutine that accumulates outer-referenced columns by the
+/// given expressions (`exprs`).
+///
+/// # Parameters
+///
+/// * `exprs` - The expressions to analyze for outer-referenced columns.
+/// * `columns` - A mutable reference to a `HashSet<Column>` where detected
+///   columns are collected.
+///
+/// Returns `true` if it can safely collect all outer-referenced columns.
+/// Otherwise, returns `false`.
+fn outer_columns_helper_multi<'a>(
+    mut exprs: impl Iterator<Item = &'a Expr>,
+    columns: &mut HashSet<Column>,
+) -> bool {
+    exprs.all(|e| outer_columns_helper(e, columns))
+}
+
+/// Generates the required expressions (columns) that reside at `indices` of
+/// the given `input_schema`.
 ///
 /// # Arguments
 ///
 /// * `input_schema` - A reference to the input schema.
-/// * `indices` - A slice of `usize` indices specifying which columns are 
required.
+/// * `indices` - A slice of `usize` indices specifying required columns.
 ///
 /// # Returns
 ///
-/// A vector of `Expr::Column` expressions, that sits at `indices` of the 
`input_schema`.
+/// A vector of `Expr::Column` expressions residing at `indices` of the 
`input_schema`.
 fn get_required_exprs(input_schema: &Arc<DFSchema>, indices: &[usize]) -> 
Vec<Expr> {
     let fields = input_schema.fields();
     indices
@@ -614,58 +718,70 @@ fn get_required_exprs(input_schema: &Arc<DFSchema>, 
indices: &[usize]) -> Vec<Ex
         .collect()
 }
 
-/// Get indices of the necessary fields referred by all of the `exprs` among 
input LogicalPlan.
+/// Get indices of the fields referred to by any expression in `exprs` within
+/// the given schema (`input_schema`).
 ///
 /// # Arguments
 ///
-/// * `input`: The input logical plan to analyze for index requirements.
-/// * `exprs`: An iterator of expressions for which we want to find necessary 
field indices at the input.
+/// * `input_schema`: The input schema to analyze for index requirements.
+/// * `exprs`: An iterator of expressions for which we want to find necessary
+///   field indices.
 ///
 /// # Returns
 ///
-/// A [Result] object that contains the required field indices for the `input` 
operator, to be able to calculate
-/// successfully all of the `exprs`.
-fn indices_referred_by_exprs<'a, I: Iterator<Item = &'a Expr>>(
-    input: &LogicalPlan,
-    exprs: I,
+/// A [`Result`] object containing the indices of all required fields in
+/// `input_schema` to calculate all `exprs` successfully.
+fn indices_referred_by_exprs<'a>(
+    input_schema: &DFSchemaRef,
+    exprs: impl Iterator<Item = &'a Expr>,
 ) -> Result<Vec<usize>> {
-    let new_indices = exprs
-        .flat_map(|expr| indices_referred_by_expr(input.schema(), expr))
+    let indices = exprs
+        .map(|expr| indices_referred_by_expr(input_schema, expr))
+        .collect::<Result<Vec<_>>>()?;
+    Ok(indices
+        .into_iter()
         .flatten()
-        // Make sure no duplicate entries exists and indices are ordered.
+        // Make sure no duplicate entries exist and indices are ordered:
         .sorted()
         .dedup()
-        .collect::<Vec<_>>();
-    Ok(new_indices)
+        .collect())
 }
 
-/// Get indices of the necessary fields referred by the `expr` among input 
schema.
+/// Get indices of the fields referred to by the given expression `expr` within
+/// the given schema (`input_schema`).
 ///
-/// # Arguments
+/// # Parameters
 ///
-/// * `input_schema`: The input schema to search for indices referred by expr.
-/// * `expr`: An expression for which we want to find necessary field indices 
at the input schema.
+/// * `input_schema`: The input schema to analyze for index requirements.
+/// * `expr`: An expression for which we want to find necessary field indices.
 ///
 /// # Returns
 ///
-/// A [Result] object that contains the required field indices of the 
`input_schema`, to be able to calculate
-/// the `expr` successfully.
+/// A [`Result`] object containing the indices of all required fields in
+/// `input_schema` to calculate `expr` successfully.
 fn indices_referred_by_expr(
     input_schema: &DFSchemaRef,
     expr: &Expr,
 ) -> Result<Vec<usize>> {
     let mut cols = expr.to_columns()?;
-    // Get outer referenced columns (expr.to_columns() doesn't return these 
columns).
-    cols.extend(outer_columns(expr));
-    cols.iter()
-        .filter(|&col| input_schema.has_column(col))
-        .map(|col| input_schema.index_of_column(col))
-        .collect::<Result<Vec<_>>>()
+    // Get outer-referenced columns:
+    if let Some(outer_cols) = outer_columns(expr) {
+        cols.extend(outer_cols);
+    } else {
+        // Expression is not known to contain outer columns or not. Hence, do
+        // not assume anything and require all the schema indices at the input:
+        return Ok((0..input_schema.fields().len()).collect());
+    }
+    Ok(cols
+        .iter()
+        .flat_map(|col| input_schema.index_of_column(col))
+        .collect())
 }
 
-/// Get all required indices for the input (indices required by parent + 
indices referred by `exprs`)
+/// Gets all required indices for the input; i.e. those required by the parent
+/// and those referred to by `exprs`.
 ///
-/// # Arguments
+/// # Parameters
 ///
 /// * `parent_required_indices` - A slice of indices required by the parent 
plan.
 /// * `input` - The input logical plan to analyze for index requirements.
@@ -673,30 +789,28 @@ fn indices_referred_by_expr(
 ///
 /// # Returns
 ///
-/// A `Result` containing a vector of `usize` indices containing all required 
indices.
-fn get_all_required_indices<'a, I: Iterator<Item = &'a Expr>>(
+/// A `Result` containing a vector of `usize` indices containing all the 
required
+/// indices.
+fn get_all_required_indices<'a>(
     parent_required_indices: &[usize],
     input: &LogicalPlan,
-    exprs: I,
+    exprs: impl Iterator<Item = &'a Expr>,
 ) -> Result<Vec<usize>> {
-    let referred_indices = indices_referred_by_exprs(input, exprs)?;
-    Ok(merge_vectors(parent_required_indices, &referred_indices))
+    indices_referred_by_exprs(input.schema(), exprs)
+        .map(|indices| merge_slices(parent_required_indices, &indices))
 }
 
-/// Retrieves a list of expressions at specified indices from a slice of 
expressions.
+/// Retrieves the expressions at specified indices within the given slice. 
Ignores
+/// any invalid indices.
 ///
-/// This function takes a slice of expressions `exprs` and a slice of `usize` 
indices `indices`.
-/// It returns a new vector containing the expressions from `exprs` that 
correspond to the provided indices (with bound check).
+/// # Parameters
 ///
-/// # Arguments
-///
-/// * `exprs` - A slice of expressions from which expressions are to be 
retrieved.
-/// * `indices` - A slice of `usize` indices specifying the positions of the 
expressions to be retrieved.
+/// * `exprs` - A slice of expressions to index into.
+/// * `indices` - A slice of indices specifying the positions of expressions 
sought.
 ///
 /// # Returns
 ///
-/// A vector of expressions that correspond to the specified indices. If any 
index is out of bounds,
-/// the associated expression is skipped in the result.
+/// A vector of expressions corresponding to specified indices.
 fn get_at_indices(exprs: &[Expr], indices: &[usize]) -> Vec<Expr> {
     indices
         .iter()
@@ -705,158 +819,148 @@ fn get_at_indices(exprs: &[Expr], indices: &[usize]) -> 
Vec<Expr> {
         .collect()
 }
 
-/// Merges two slices of `usize` values into a single vector with sorted 
(ascending) and deduplicated elements.
-///
-/// # Arguments
-///
-/// * `lhs` - The first slice of `usize` values to be merged.
-/// * `rhs` - The second slice of `usize` values to be merged.
-///
-/// # Returns
-///
-/// A vector of `usize` values containing the merged, sorted, and deduplicated 
elements from `lhs` and `rhs`.
-/// As an example merge of [3, 2, 4] and [3, 6, 1] will produce [1, 2, 3, 6]
-fn merge_vectors(lhs: &[usize], rhs: &[usize]) -> Vec<usize> {
-    let mut merged = lhs.to_vec();
-    merged.extend(rhs);
-    // Make sure to run sort before dedup.
-    // Dedup removes consecutive same entries
-    // If sort is run before it, all duplicates are removed.
-    merged.sort();
-    merged.dedup();
-    merged
+/// Merges two slices into a single vector with sorted (ascending) and
+/// deduplicated elements. For example, merging `[3, 2, 4]` and `[3, 6, 1]`
+/// will produce `[1, 2, 3, 6]`.
+fn merge_slices<T: Clone + Ord>(left: &[T], right: &[T]) -> Vec<T> {
+    // Make sure to sort before deduping, which removes the duplicates:
+    left.iter()
+        .cloned()
+        .chain(right.iter().cloned())
+        .sorted()
+        .dedup()
+        .collect()
 }
 
-/// Splits requirement indices for a join into left and right children based 
on the join type.
+/// Splits requirement indices for a join into left and right children based on
+/// the join type.
 ///
-/// This function takes the length of the left child, a slice of requirement 
indices, and the type
-/// of join (e.g., INNER, LEFT, RIGHT, etc.) as arguments. Depending on the 
join type, it divides
-/// the requirement indices into those that apply to the left child and those 
that apply to the right child.
+/// This function takes the length of the left child, a slice of requirement
+/// indices, and the type of join (e.g. `INNER`, `LEFT`, `RIGHT`) as arguments.
+/// Depending on the join type, it divides the requirement indices into those
+/// that apply to the left child and those that apply to the right child.
 ///
-/// - For INNER, LEFT, RIGHT, and FULL joins, the requirements are split 
between left and right children.
-///   The right child indices are adjusted to point to valid positions in the 
right child by subtracting
-///   the length of the left child.
+/// - For `INNER`, `LEFT`, `RIGHT` and `FULL` joins, the requirements are split
+///   between left and right children. The right child indices are adjusted to
+///   point to valid positions within the right child by subtracting the length
+///   of the left child.
 ///
-/// - For LEFT ANTI, LEFT SEMI, RIGHT SEMI, and RIGHT ANTI joins, all 
requirements are re-routed to either
-///   the left child or the right child directly, depending on the join type.
+/// - For `LEFT ANTI`, `LEFT SEMI`, `RIGHT SEMI` and `RIGHT ANTI` joins, all
+///   requirements are re-routed to either the left child or the right child
+///   directly, depending on the join type.
 ///
-/// # Arguments
+/// # Parameters
 ///
 /// * `left_len` - The length of the left child.
 /// * `indices` - A slice of requirement indices.
-/// * `join_type` - The type of join (e.g., INNER, LEFT, RIGHT, etc.).
+/// * `join_type` - The type of join (e.g. `INNER`, `LEFT`, `RIGHT`).
 ///
 /// # Returns
 ///
-/// A tuple containing two vectors of `usize` indices: the first vector 
represents the requirements for
-/// the left child, and the second vector represents the requirements for the 
right child. The indices
-/// are appropriately split and adjusted based on the join type.
+/// A tuple containing two vectors of `usize` indices: The first vector 
represents
+/// the requirements for the left child, and the second vector represents the
+/// requirements for the right child. The indices are appropriately split and
+/// adjusted based on the join type.
 fn split_join_requirements(
     left_len: usize,
     indices: &[usize],
     join_type: &JoinType,
 ) -> (Vec<usize>, Vec<usize>) {
     match join_type {
-        // In these cases requirements split to left and right child.
+        // In these cases requirements are split between left/right children:
         JoinType::Inner | JoinType::Left | JoinType::Right | JoinType::Full => 
{
-            let (left_child_reqs, mut right_child_reqs): (Vec<usize>, 
Vec<usize>) =
+            let (left_reqs, mut right_reqs): (Vec<usize>, Vec<usize>) =
                 indices.iter().partition(|&&idx| idx < left_len);
-            // Decrease right side index by `left_len` so that they point to 
valid positions in the right child.
-            right_child_reqs.iter_mut().for_each(|idx| *idx -= left_len);
-            (left_child_reqs, right_child_reqs)
+            // Decrease right side indices by `left_len` so that they point to 
valid
+            // positions within the right child:
+            for idx in right_reqs.iter_mut() {
+                *idx -= left_len;
+            }
+            (left_reqs, right_reqs)
         }
         // All requirements can be re-routed to left child directly.
         JoinType::LeftAnti | JoinType::LeftSemi => (indices.to_vec(), vec![]),
-        // All requirements can be re-routed to right side directly. (No need 
to change index, join schema is right child schema.)
+        // All requirements can be re-routed to right side directly.
+        // No need to change index, join schema is right child schema.
         JoinType::RightSemi | JoinType::RightAnti => (vec![], 
indices.to_vec()),
     }
 }
 
-/// Adds a projection on top of a logical plan if it is beneficial and reduces 
the number of columns for the parent operator.
+/// Adds a projection on top of a logical plan if doing so reduces the number
+/// of columns for the parent operator.
 ///
-/// This function takes a `LogicalPlan`, a list of projection expressions, and 
a flag indicating whether
-/// the projection is beneficial. If the projection is beneficial and reduces 
the number of columns in
-/// the plan, a new `LogicalPlan` with the projection is created and returned, 
along with a `true` flag.
-/// If the projection is unnecessary or doesn't reduce the number of columns, 
the original plan is returned
-/// with a `false` flag.
+/// This function takes a `LogicalPlan` and a list of projection expressions.
+/// If the projection is beneficial (it reduces the number of columns in the
+/// plan) a new `LogicalPlan` with the projection is created and returned, 
along
+/// with a `true` flag. If the projection doesn't reduce the number of columns,
+/// the original plan is returned with a `false` flag.
 ///
-/// # Arguments
+/// # Parameters
 ///
 /// * `plan` - The input `LogicalPlan` to potentially add a projection to.
 /// * `project_exprs` - A list of expressions for the projection.
-/// * `projection_beneficial` - A flag indicating whether the projection is 
beneficial.
 ///
 /// # Returns
 ///
-/// A `Result` containing a tuple with two values: the resulting `LogicalPlan` 
(with or without
-/// the added projection) and a `bool` flag indicating whether the projection 
was added (`true`) or not (`false`).
+/// A `Result` containing a tuple with two values: The resulting `LogicalPlan`
+/// (with or without the added projection) and a `bool` flag indicating if a
+/// projection was added (`true`) or not (`false`).
 fn add_projection_on_top_if_helpful(
     plan: LogicalPlan,
     project_exprs: Vec<Expr>,
-    projection_beneficial: bool,
 ) -> Result<(LogicalPlan, bool)> {
-    // Make sure projection decreases table column size, otherwise it is 
unnecessary.
-    if !projection_beneficial || project_exprs.len() >= 
plan.schema().fields().len() {
+    // Make sure projection decreases the number of columns, otherwise it is 
unnecessary.
+    if project_exprs.len() >= plan.schema().fields().len() {
         Ok((plan, false))
     } else {
-        let new_plan = Projection::try_new(project_exprs, Arc::new(plan))
-            .map(LogicalPlan::Projection)?;
-        Ok((new_plan, true))
+        Projection::try_new(project_exprs, Arc::new(plan))
+            .map(|proj| (LogicalPlan::Projection(proj), true))
     }
 }
 
-/// Collects and returns a vector of all indices of the fields in the schema 
of a logical plan.
+/// Rewrite the given projection according to the fields required by its
+/// ancestors.
 ///
-/// # Arguments
+/// # Parameters
 ///
-/// * `plan` - A reference to the `LogicalPlan` for which indices are required.
+/// * `proj` - A reference to the original projection to rewrite.
+/// * `config` - A reference to the optimizer configuration.
+/// * `indices` - A slice of indices representing the columns required by the
+///   ancestors of the given projection.
 ///
 /// # Returns
 ///
-/// A vector of `usize` indices representing all fields in the schema of the 
provided logical plan.
-fn require_all_indices(plan: &LogicalPlan) -> Vec<usize> {
-    (0..plan.schema().fields().len()).collect()
-}
-
-/// Rewrite Projection Given Required fields by its parent(s).
-///
-/// # Arguments
-///
-/// * `proj` - A reference to the original projection to be rewritten.
-/// * `_config` - A reference to the optimizer configuration (unused in the 
function).
-/// * `indices` - A slice of indices representing the required columns by the 
parent(s) of projection.
-///
-/// # Returns
+/// A `Result` object with the following semantics:
 ///
-/// A `Result` containing an `Option` of the rewritten logical plan. If the
-/// rewrite is successful, it returns `Some` with the optimized logical plan.
-/// If the logical plan remains unchanged it returns `Ok(None)`.
+/// - `Ok(Some(LogicalPlan))`: Contains the rewritten projection
+/// - `Ok(None)`: No rewrite necessary.
+/// - `Err(error)`: An error occured during the function call.
 fn rewrite_projection_given_requirements(
     proj: &Projection,
-    _config: &dyn OptimizerConfig,
+    config: &dyn OptimizerConfig,
     indices: &[usize],
 ) -> Result<Option<LogicalPlan>> {
     let exprs_used = get_at_indices(&proj.expr, indices);
-    let required_indices = indices_referred_by_exprs(&proj.input, 
exprs_used.iter())?;
+    let required_indices =
+        indices_referred_by_exprs(proj.input.schema(), exprs_used.iter())?;
     return if let Some(input) =
-        optimize_projections(&proj.input, _config, &required_indices)?
+        optimize_projections(&proj.input, config, &required_indices)?
     {
         if &projection_schema(&input, &exprs_used)? == input.schema() {
             Ok(Some(input))
         } else {
-            let new_proj = Projection::try_new(exprs_used, Arc::new(input))?;
-            let new_proj = LogicalPlan::Projection(new_proj);
-            Ok(Some(new_proj))
+            Projection::try_new(exprs_used, Arc::new(input))
+                .map(|proj| Some(LogicalPlan::Projection(proj)))
         }
     } else if exprs_used.len() < proj.expr.len() {
-        // Projection expression used is different than the existing projection
-        // In this case, even if child doesn't change we should update 
projection to use less columns.
+        // Projection expression used is different than the existing 
projection.
+        // In this case, even if the child doesn't change, we should update the
+        // projection to use fewer columns:
         if &projection_schema(&proj.input, &exprs_used)? == 
proj.input.schema() {
             Ok(Some(proj.input.as_ref().clone()))
         } else {
-            let new_proj = Projection::try_new(exprs_used, 
proj.input.clone())?;
-            let new_proj = LogicalPlan::Projection(new_proj);
-            Ok(Some(new_proj))
+            Projection::try_new(exprs_used, proj.input.clone())
+                .map(|proj| Some(LogicalPlan::Projection(proj)))
         }
     } else {
         // Projection doesn't change.
@@ -866,16 +970,16 @@ fn rewrite_projection_given_requirements(
 
 #[cfg(test)]
 mod tests {
+    use std::sync::Arc;
+
     use crate::optimize_projections::OptimizeProjections;
+    use crate::test::{assert_optimized_plan_eq, test_table_scan};
     use arrow::datatypes::{DataType, Field, Schema};
     use datafusion_common::{Result, TableReference};
     use datafusion_expr::{
         binary_expr, col, count, lit, 
logical_plan::builder::LogicalPlanBuilder,
         table_scan, Expr, LogicalPlan, Operator,
     };
-    use std::sync::Arc;
-
-    use crate::test::*;
 
     fn assert_optimized_plan_equal(plan: &LogicalPlan, expected: &str) -> 
Result<()> {
         assert_optimized_plan_eq(Arc::new(OptimizeProjections::new()), plan, 
expected)
@@ -920,6 +1024,20 @@ mod tests {
         \n  TableScan: test projection=[a]";
         assert_optimized_plan_equal(&plan, expected)
     }
+
+    #[test]
+    fn merge_nested_alias() -> Result<()> {
+        let table_scan = test_table_scan()?;
+        let plan = LogicalPlanBuilder::from(table_scan)
+            .project(vec![col("a").alias("alias1").alias("alias2")])?
+            .project(vec![col("alias2").alias("alias")])?
+            .build()?;
+
+        let expected = "Projection: test.a AS alias\
+        \n  TableScan: test projection=[a]";
+        assert_optimized_plan_equal(&plan, expected)
+    }
+
     #[test]
     fn test_nested_count() -> Result<()> {
         let schema = Schema::new(vec![Field::new("foo", DataType::Int32, 
false)]);
diff --git a/datafusion/optimizer/src/optimizer.rs 
b/datafusion/optimizer/src/optimizer.rs
index 7af46ed70a..0dc34cb809 100644
--- a/datafusion/optimizer/src/optimizer.rs
+++ b/datafusion/optimizer/src/optimizer.rs
@@ -17,6 +17,10 @@
 
 //! Query optimizer traits
 
+use std::collections::HashSet;
+use std::sync::Arc;
+use std::time::Instant;
+
 use crate::common_subexpr_eliminate::CommonSubexprEliminate;
 use crate::decorrelate_predicate_subquery::DecorrelatePredicateSubquery;
 use crate::eliminate_cross_join::EliminateCrossJoin;
@@ -41,15 +45,14 @@ use crate::simplify_expressions::SimplifyExpressions;
 use crate::single_distinct_to_groupby::SingleDistinctToGroupBy;
 use crate::unwrap_cast_in_comparison::UnwrapCastInComparison;
 use crate::utils::log_plan;
-use chrono::{DateTime, Utc};
+
 use datafusion_common::alias::AliasGenerator;
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::{DataFusionError, Result};
-use datafusion_expr::LogicalPlan;
+use datafusion_expr::logical_plan::LogicalPlan;
+
+use chrono::{DateTime, Utc};
 use log::{debug, warn};
-use std::collections::HashSet;
-use std::sync::Arc;
-use std::time::Instant;
 
 /// `OptimizerRule` transforms one [`LogicalPlan`] into another which
 /// computes the same results, but in a potentially more efficient
@@ -447,17 +450,18 @@ pub(crate) fn assert_schema_is_the_same(
 
 #[cfg(test)]
 mod tests {
+    use std::sync::{Arc, Mutex};
+
+    use super::ApplyOrder;
     use crate::optimizer::Optimizer;
     use crate::test::test_table_scan;
     use crate::{OptimizerConfig, OptimizerContext, OptimizerRule};
+
     use datafusion_common::{
         plan_err, DFField, DFSchema, DFSchemaRef, DataFusionError, Result,
     };
     use datafusion_expr::logical_plan::EmptyRelation;
     use datafusion_expr::{col, lit, LogicalPlan, LogicalPlanBuilder, 
Projection};
-    use std::sync::{Arc, Mutex};
-
-    use super::ApplyOrder;
 
     #[test]
     fn skip_failing_rule() {
diff --git a/datafusion/optimizer/tests/optimizer_integration.rs 
b/datafusion/optimizer/tests/optimizer_integration.rs
index 4172881c0a..d857c6154e 100644
--- a/datafusion/optimizer/tests/optimizer_integration.rs
+++ b/datafusion/optimizer/tests/optimizer_integration.rs
@@ -15,8 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
+use std::any::Any;
+use std::collections::HashMap;
+use std::sync::Arc;
+
 use arrow::datatypes::{DataType, Field, Schema, SchemaRef, TimeUnit};
-use chrono::{DateTime, NaiveDateTime, Utc};
 use datafusion_common::config::ConfigOptions;
 use datafusion_common::{plan_err, DataFusionError, Result};
 use datafusion_expr::{AggregateUDF, LogicalPlan, ScalarUDF, TableSource, 
WindowUDF};
@@ -28,9 +31,8 @@ use datafusion_sql::sqlparser::ast::Statement;
 use datafusion_sql::sqlparser::dialect::GenericDialect;
 use datafusion_sql::sqlparser::parser::Parser;
 use datafusion_sql::TableReference;
-use std::any::Any;
-use std::collections::HashMap;
-use std::sync::Arc;
+
+use chrono::{DateTime, NaiveDateTime, Utc};
 
 #[cfg(test)]
 #[ctor::ctor]
diff --git a/datafusion/sql/src/select.rs b/datafusion/sql/src/select.rs
index 15f720d756..a0819e4aaf 100644
--- a/datafusion/sql/src/select.rs
+++ b/datafusion/sql/src/select.rs
@@ -25,10 +25,7 @@ use crate::utils::{
 };
 
 use datafusion_common::Column;
-use datafusion_common::{
-    get_target_functional_dependencies, not_impl_err, plan_err, DFSchemaRef,
-    DataFusionError, Result,
-};
+use datafusion_common::{not_impl_err, plan_err, DataFusionError, Result};
 use datafusion_expr::expr::Alias;
 use datafusion_expr::expr_rewriter::{
     normalize_col, normalize_col_with_schemas_and_ambiguity_check,
@@ -534,14 +531,17 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         group_by_exprs: &[Expr],
         aggr_exprs: &[Expr],
     ) -> Result<(LogicalPlan, Vec<Expr>, Option<Expr>)> {
-        let group_by_exprs =
-            get_updated_group_by_exprs(group_by_exprs, select_exprs, 
input.schema())?;
-
         // create the aggregate plan
         let plan = LogicalPlanBuilder::from(input.clone())
-            .aggregate(group_by_exprs.clone(), aggr_exprs.to_vec())?
+            .aggregate(group_by_exprs.to_vec(), aggr_exprs.to_vec())?
             .build()?;
 
+        let group_by_exprs = if let LogicalPlan::Aggregate(agg) = &plan {
+            &agg.group_expr
+        } else {
+            unreachable!();
+        };
+
         // in this next section of code we are re-writing the projection to 
refer to columns
         // output by the aggregate plan. For example, if the projection 
contains the expression
         // `SUM(a)` then we replace that with a reference to a column `SUM(a)` 
produced by
@@ -550,7 +550,7 @@ impl<'a, S: ContextProvider> SqlToRel<'a, S> {
         // combine the original grouping and aggregate expressions into one 
list (note that
         // we do not add the "having" expression since that is not part of the 
projection)
         let mut aggr_projection_exprs = vec![];
-        for expr in &group_by_exprs {
+        for expr in group_by_exprs {
             match expr {
                 Expr::GroupingSet(GroupingSet::Rollup(exprs)) => {
                     aggr_projection_exprs.extend_from_slice(exprs)
@@ -659,61 +659,3 @@ fn match_window_definitions(
     }
     Ok(())
 }
-
-/// Update group by exprs, according to functional dependencies
-/// The query below
-///
-/// SELECT sn, amount
-/// FROM sales_global
-/// GROUP BY sn
-///
-/// cannot be calculated, because it has a column(`amount`) which is not
-/// part of group by expression.
-/// However, if we know that, `sn` is determinant of `amount`. We can
-/// safely, determine value of `amount` for each distinct `sn`. For these cases
-/// we rewrite the query above as
-///
-/// SELECT sn, amount
-/// FROM sales_global
-/// GROUP BY sn, amount
-///
-/// Both queries, are functionally same. \[Because, (`sn`, `amount`) and (`sn`)
-/// defines the identical groups. \]
-/// This function updates group by expressions such that select expressions 
that are
-/// not in group by expression, are added to the group by expressions if they 
are dependent
-/// of the sub-set of group by expressions.
-fn get_updated_group_by_exprs(
-    group_by_exprs: &[Expr],
-    select_exprs: &[Expr],
-    schema: &DFSchemaRef,
-) -> Result<Vec<Expr>> {
-    let mut new_group_by_exprs = group_by_exprs.to_vec();
-    let fields = schema.fields();
-    let group_by_expr_names = group_by_exprs
-        .iter()
-        .map(|group_by_expr| group_by_expr.display_name())
-        .collect::<Result<Vec<_>>>()?;
-    // Get targets that can be used in a select, even if they do not occur in 
aggregation:
-    if let Some(target_indices) =
-        get_target_functional_dependencies(schema, &group_by_expr_names)
-    {
-        // Calculate dependent fields names with determinant GROUP BY 
expression:
-        let associated_field_names = target_indices
-            .iter()
-            .map(|idx| fields[*idx].qualified_name())
-            .collect::<Vec<_>>();
-        // Expand GROUP BY expressions with select expressions: If a GROUP
-        // BY expression is a determinant key, we can use its dependent
-        // columns in select statements also.
-        for expr in select_exprs {
-            let expr_name = format!("{}", expr);
-            if !new_group_by_exprs.contains(expr)
-                && associated_field_names.contains(&expr_name)
-            {
-                new_group_by_exprs.push(expr.clone());
-            }
-        }
-    }
-
-    Ok(new_group_by_exprs)
-}
diff --git a/datafusion/sqllogictest/test_files/groupby.slt 
b/datafusion/sqllogictest/test_files/groupby.slt
index 1d6d7dc671..5248ac8c85 100644
--- a/datafusion/sqllogictest/test_files/groupby.slt
+++ b/datafusion/sqllogictest/test_files/groupby.slt
@@ -3211,6 +3211,21 @@ SELECT s.sn, s.amount, 2*s.sn
 3 200 6
 4 100 8
 
+# we should be able to re-write group by expression
+# using functional dependencies for complex expressions also.
+# In this case, we use 2*s.amount instead of s.amount.
+query IRI
+SELECT s.sn, 2*s.amount, 2*s.sn
+  FROM sales_global_with_pk AS s
+  GROUP BY sn
+  ORDER BY sn
+----
+0 60 0
+1 100 2
+2 150 4
+3 400 6
+4 200 8
+
 query IRI
 SELECT s.sn, s.amount, 2*s.sn
   FROM sales_global_with_pk_alternate AS s
@@ -3364,7 +3379,7 @@ SELECT column1, COUNT(*) as column2 FROM (VALUES (['a', 
'b'], 1), (['c', 'd', 'e
 
 
 # primary key should be aware from which columns it is associated
-statement error DataFusion error: Error during planning: Projection references 
non-aggregate values: Expression r.sn could not be resolved from available 
columns: l.sn, SUM\(l.amount\)
+statement error DataFusion error: Error during planning: Projection references 
non-aggregate values: Expression r.sn could not be resolved from available 
columns: l.sn, l.zip_code, l.country, l.ts, l.currency, l.amount, 
SUM\(l.amount\)
 SELECT l.sn, r.sn, SUM(l.amount), r.amount
   FROM sales_global_with_pk AS l
   JOIN sales_global_with_pk AS r
@@ -3456,7 +3471,7 @@ ORDER BY r.sn
 4 100 2022-01-03T10:00:00
 
 # after join, new window expressions shouldn't be associated with primary keys
-statement error DataFusion error: Error during planning: Projection references 
non-aggregate values: Expression rn1 could not be resolved from available 
columns: r.sn, SUM\(r.amount\)
+statement error DataFusion error: Error during planning: Projection references 
non-aggregate values: Expression rn1 could not be resolved from available 
columns: r.sn, r.ts, r.amount, SUM\(r.amount\)
 SELECT r.sn, SUM(r.amount), rn1
 FROM
   (SELECT r.ts, r.sn, r.amount,
@@ -3784,6 +3799,192 @@ AggregateExec: mode=FinalPartitioned, gby=[c@0 as c, 
b@1 as b], aggr=[SUM(multip
 ----------RepartitionExec: partitioning=RoundRobinBatch(8), input_partitions=1
 ------------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, 
d], output_ordering=[c@1 ASC NULLS LAST], has_header=true
 
+statement ok
+set datafusion.execution.target_partitions = 1;
+
+query TT
+EXPLAIN SELECT c, sum1
+  FROM
+    (SELECT c, b, a, SUM(d) as sum1
+    FROM multiple_ordered_table_with_pk
+    GROUP BY c)
+GROUP BY c;
+----
+logical_plan
+Aggregate: groupBy=[[multiple_ordered_table_with_pk.c, sum1]], aggr=[[]]
+--Projection: multiple_ordered_table_with_pk.c, 
SUM(multiple_ordered_table_with_pk.d) AS sum1
+----Aggregate: groupBy=[[multiple_ordered_table_with_pk.c]], 
aggr=[[SUM(CAST(multiple_ordered_table_with_pk.d AS Int64))]]
+------TableScan: multiple_ordered_table_with_pk projection=[c, d]
+physical_plan
+AggregateExec: mode=Single, gby=[c@0 as c, sum1@1 as sum1], aggr=[], 
ordering_mode=PartiallySorted([0])
+--ProjectionExec: expr=[c@0 as c, SUM(multiple_ordered_table_with_pk.d)@1 as 
sum1]
+----AggregateExec: mode=Single, gby=[c@0 as c], 
aggr=[SUM(multiple_ordered_table_with_pk.d)], ordering_mode=Sorted
+------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c, d], 
output_ordering=[c@0 ASC NULLS LAST], has_header=true
+
+query TT
+EXPLAIN SELECT c, sum1, SUM(b) OVER() as sumb
+  FROM
+    (SELECT c, b, a, SUM(d) as sum1
+    FROM multiple_ordered_table_with_pk
+    GROUP BY c);
+----
+logical_plan
+Projection: multiple_ordered_table_with_pk.c, sum1, 
SUM(multiple_ordered_table_with_pk.b) ROWS BETWEEN UNBOUNDED PRECEDING AND 
UNBOUNDED FOLLOWING AS sumb
+--WindowAggr: windowExpr=[[SUM(CAST(multiple_ordered_table_with_pk.b AS 
Int64)) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
+----Projection: multiple_ordered_table_with_pk.c, 
multiple_ordered_table_with_pk.b, SUM(multiple_ordered_table_with_pk.d) AS sum1
+------Aggregate: groupBy=[[multiple_ordered_table_with_pk.c, 
multiple_ordered_table_with_pk.b]], 
aggr=[[SUM(CAST(multiple_ordered_table_with_pk.d AS Int64))]]
+--------TableScan: multiple_ordered_table_with_pk projection=[b, c, d]
+physical_plan
+ProjectionExec: expr=[c@0 as c, sum1@2 as sum1, 
SUM(multiple_ordered_table_with_pk.b) ROWS BETWEEN UNBOUNDED PRECEDING AND 
UNBOUNDED FOLLOWING@3 as sumb]
+--WindowAggExec: wdw=[SUM(multiple_ordered_table_with_pk.b) ROWS BETWEEN 
UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: 
"SUM(multiple_ordered_table_with_pk.b) ROWS BETWEEN UNBOUNDED PRECEDING AND 
UNBOUNDED FOLLOWING", data_type: Int64, nullable: true, dict_id: 0, 
dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, 
start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }]
+----ProjectionExec: expr=[c@0 as c, b@1 as b, 
SUM(multiple_ordered_table_with_pk.d)@2 as sum1]
+------AggregateExec: mode=Single, gby=[c@1 as c, b@0 as b], 
aggr=[SUM(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0])
+--------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, 
d], output_ordering=[c@1 ASC NULLS LAST], has_header=true
+
+query TT
+EXPLAIN SELECT lhs.c, rhs.c, lhs.sum1, rhs.sum1
+  FROM
+    (SELECT c, b, a, SUM(d) as sum1
+    FROM multiple_ordered_table_with_pk
+    GROUP BY c) as lhs
+  JOIN
+    (SELECT c, b, a, SUM(d) as sum1
+    FROM multiple_ordered_table_with_pk
+    GROUP BY c) as rhs
+  ON lhs.b=rhs.b;
+----
+logical_plan
+Projection: lhs.c, rhs.c, lhs.sum1, rhs.sum1
+--Inner Join: lhs.b = rhs.b
+----SubqueryAlias: lhs
+------Projection: multiple_ordered_table_with_pk.c, 
multiple_ordered_table_with_pk.b, SUM(multiple_ordered_table_with_pk.d) AS sum1
+--------Aggregate: groupBy=[[multiple_ordered_table_with_pk.c, 
multiple_ordered_table_with_pk.b]], 
aggr=[[SUM(CAST(multiple_ordered_table_with_pk.d AS Int64))]]
+----------TableScan: multiple_ordered_table_with_pk projection=[b, c, d]
+----SubqueryAlias: rhs
+------Projection: multiple_ordered_table_with_pk.c, 
multiple_ordered_table_with_pk.b, SUM(multiple_ordered_table_with_pk.d) AS sum1
+--------Aggregate: groupBy=[[multiple_ordered_table_with_pk.c, 
multiple_ordered_table_with_pk.b]], 
aggr=[[SUM(CAST(multiple_ordered_table_with_pk.d AS Int64))]]
+----------TableScan: multiple_ordered_table_with_pk projection=[b, c, d]
+physical_plan
+ProjectionExec: expr=[c@0 as c, c@3 as c, sum1@2 as sum1, sum1@5 as sum1]
+--CoalesceBatchesExec: target_batch_size=2
+----HashJoinExec: mode=CollectLeft, join_type=Inner, on=[(b@1, b@1)]
+------ProjectionExec: expr=[c@0 as c, b@1 as b, 
SUM(multiple_ordered_table_with_pk.d)@2 as sum1]
+--------AggregateExec: mode=Single, gby=[c@1 as c, b@0 as b], 
aggr=[SUM(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0])
+----------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, 
d], output_ordering=[c@1 ASC NULLS LAST], has_header=true
+------ProjectionExec: expr=[c@0 as c, b@1 as b, 
SUM(multiple_ordered_table_with_pk.d)@2 as sum1]
+--------AggregateExec: mode=Single, gby=[c@1 as c, b@0 as b], 
aggr=[SUM(multiple_ordered_table_with_pk.d)], ordering_mode=PartiallySorted([0])
+----------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[b, c, 
d], output_ordering=[c@1 ASC NULLS LAST], has_header=true
+
+query TT
+EXPLAIN SELECT lhs.c, rhs.c, lhs.sum1, rhs.sum1
+  FROM
+    (SELECT c, b, a, SUM(d) as sum1
+    FROM multiple_ordered_table_with_pk
+    GROUP BY c) as lhs
+  CROSS JOIN
+    (SELECT c, b, a, SUM(d) as sum1
+    FROM multiple_ordered_table_with_pk
+    GROUP BY c) as rhs;
+----
+logical_plan
+Projection: lhs.c, rhs.c, lhs.sum1, rhs.sum1
+--CrossJoin:
+----SubqueryAlias: lhs
+------Projection: multiple_ordered_table_with_pk.c, 
SUM(multiple_ordered_table_with_pk.d) AS sum1
+--------Aggregate: groupBy=[[multiple_ordered_table_with_pk.c]], 
aggr=[[SUM(CAST(multiple_ordered_table_with_pk.d AS Int64))]]
+----------TableScan: multiple_ordered_table_with_pk projection=[c, d]
+----SubqueryAlias: rhs
+------Projection: multiple_ordered_table_with_pk.c, 
SUM(multiple_ordered_table_with_pk.d) AS sum1
+--------Aggregate: groupBy=[[multiple_ordered_table_with_pk.c]], 
aggr=[[SUM(CAST(multiple_ordered_table_with_pk.d AS Int64))]]
+----------TableScan: multiple_ordered_table_with_pk projection=[c, d]
+physical_plan
+ProjectionExec: expr=[c@0 as c, c@2 as c, sum1@1 as sum1, sum1@3 as sum1]
+--CrossJoinExec
+----ProjectionExec: expr=[c@0 as c, SUM(multiple_ordered_table_with_pk.d)@1 as 
sum1]
+------AggregateExec: mode=Single, gby=[c@0 as c], 
aggr=[SUM(multiple_ordered_table_with_pk.d)], ordering_mode=Sorted
+--------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c, d], 
output_ordering=[c@0 ASC NULLS LAST], has_header=true
+----ProjectionExec: expr=[c@0 as c, SUM(multiple_ordered_table_with_pk.d)@1 as 
sum1]
+------AggregateExec: mode=Single, gby=[c@0 as c], 
aggr=[SUM(multiple_ordered_table_with_pk.d)], ordering_mode=Sorted
+--------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[c, d], 
output_ordering=[c@0 ASC NULLS LAST], has_header=true
+
+# we do not generate physical plan for Repartition yet (e.g Distribute By 
queries).
+query TT
+EXPLAIN SELECT a, b, sum1
+FROM (SELECT c, b, a, SUM(d) as sum1
+   FROM multiple_ordered_table_with_pk
+   GROUP BY c)
+DISTRIBUTE BY a
+----
+logical_plan
+Repartition: DistributeBy(a)
+--Projection: multiple_ordered_table_with_pk.a, 
multiple_ordered_table_with_pk.b, SUM(multiple_ordered_table_with_pk.d) AS sum1
+----Aggregate: groupBy=[[multiple_ordered_table_with_pk.c, 
multiple_ordered_table_with_pk.a, multiple_ordered_table_with_pk.b]], 
aggr=[[SUM(CAST(multiple_ordered_table_with_pk.d AS Int64))]]
+------TableScan: multiple_ordered_table_with_pk projection=[a, b, c, d]
+
+# union with aggregate
+query TT
+EXPLAIN SELECT c, a, SUM(d) as sum1
+ FROM multiple_ordered_table_with_pk
+ GROUP BY c
+UNION ALL
+ SELECT c, a, SUM(d) as sum1
+ FROM multiple_ordered_table_with_pk
+ GROUP BY c
+----
+logical_plan
+Union
+--Projection: multiple_ordered_table_with_pk.c, 
multiple_ordered_table_with_pk.a, SUM(multiple_ordered_table_with_pk.d) AS sum1
+----Aggregate: groupBy=[[multiple_ordered_table_with_pk.c, 
multiple_ordered_table_with_pk.a]], 
aggr=[[SUM(CAST(multiple_ordered_table_with_pk.d AS Int64))]]
+------TableScan: multiple_ordered_table_with_pk projection=[a, c, d]
+--Projection: multiple_ordered_table_with_pk.c, 
multiple_ordered_table_with_pk.a, SUM(multiple_ordered_table_with_pk.d) AS sum1
+----Aggregate: groupBy=[[multiple_ordered_table_with_pk.c, 
multiple_ordered_table_with_pk.a]], 
aggr=[[SUM(CAST(multiple_ordered_table_with_pk.d AS Int64))]]
+------TableScan: multiple_ordered_table_with_pk projection=[a, c, d]
+physical_plan
+UnionExec
+--ProjectionExec: expr=[c@0 as c, a@1 as a, 
SUM(multiple_ordered_table_with_pk.d)@2 as sum1]
+----AggregateExec: mode=Single, gby=[c@1 as c, a@0 as a], 
aggr=[SUM(multiple_ordered_table_with_pk.d)], ordering_mode=Sorted
+------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c, 
d], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], 
has_header=true
+--ProjectionExec: expr=[c@0 as c, a@1 as a, 
SUM(multiple_ordered_table_with_pk.d)@2 as sum1]
+----AggregateExec: mode=Single, gby=[c@1 as c, a@0 as a], 
aggr=[SUM(multiple_ordered_table_with_pk.d)], ordering_mode=Sorted
+------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c, 
d], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], 
has_header=true
+
+# table scan should be simplified.
+query TT
+EXPLAIN SELECT c, a, SUM(d) as sum1
+ FROM multiple_ordered_table_with_pk
+ GROUP BY c
+----
+logical_plan
+Projection: multiple_ordered_table_with_pk.c, 
multiple_ordered_table_with_pk.a, SUM(multiple_ordered_table_with_pk.d) AS sum1
+--Aggregate: groupBy=[[multiple_ordered_table_with_pk.c, 
multiple_ordered_table_with_pk.a]], 
aggr=[[SUM(CAST(multiple_ordered_table_with_pk.d AS Int64))]]
+----TableScan: multiple_ordered_table_with_pk projection=[a, c, d]
+physical_plan
+ProjectionExec: expr=[c@0 as c, a@1 as a, 
SUM(multiple_ordered_table_with_pk.d)@2 as sum1]
+--AggregateExec: mode=Single, gby=[c@1 as c, a@0 as a], 
aggr=[SUM(multiple_ordered_table_with_pk.d)], ordering_mode=Sorted
+----CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c, 
d], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], 
has_header=true
+
+# limit should be simplified
+query TT
+EXPLAIN SELECT *
+ FROM (SELECT c, a, SUM(d) as sum1
+   FROM multiple_ordered_table_with_pk
+   GROUP BY c
+   LIMIT 5)
+----
+logical_plan
+Projection: multiple_ordered_table_with_pk.c, 
multiple_ordered_table_with_pk.a, SUM(multiple_ordered_table_with_pk.d) AS sum1
+--Limit: skip=0, fetch=5
+----Aggregate: groupBy=[[multiple_ordered_table_with_pk.c, 
multiple_ordered_table_with_pk.a]], 
aggr=[[SUM(CAST(multiple_ordered_table_with_pk.d AS Int64))]]
+------TableScan: multiple_ordered_table_with_pk projection=[a, c, d]
+physical_plan
+ProjectionExec: expr=[c@0 as c, a@1 as a, 
SUM(multiple_ordered_table_with_pk.d)@2 as sum1]
+--GlobalLimitExec: skip=0, fetch=5
+----AggregateExec: mode=Single, gby=[c@1 as c, a@0 as a], 
aggr=[SUM(multiple_ordered_table_with_pk.d)], ordering_mode=Sorted
+------CsvExec: file_groups={1 group: 
[[WORKSPACE_ROOT/datafusion/core/tests/data/window_2.csv]]}, projection=[a, c, 
d], output_orderings=[[a@0 ASC NULLS LAST], [c@1 ASC NULLS LAST]], 
has_header=true
+
+statement ok
+set datafusion.execution.target_partitions = 8;
+
 # Tests for single distinct to group by optimization rule
 statement ok
 CREATE TABLE t(x int) AS VALUES (1), (2), (1);

Reply via email to