vbarua commented on code in PR #13931:
URL: https://github.com/apache/datafusion/pull/13931#discussion_r1898690260


##########
datafusion/substrait/src/logical_plan/producer.rs:
##########
@@ -101,14 +105,330 @@ use substrait::{
     version,
 };
 
-use super::state::SubstraitPlanningState;
+/// This trait is used to produce Substrait plans, converting them from 
DataFusion Logical Plans.
+/// It can be implemented by users to allow for custom handling of relations, 
expressions, etc.
+///
+/// Combined with the [crate::logical_plan::consumer::SubstraitConsumer] this 
allows for fully
+/// customizable Substrait serde.
+///
+/// # Example Usage
+///
+/// ```
+/// # use std::sync::Arc;
+/// # use substrait::proto::{Expression, Rel};
+/// # use substrait::proto::rel::RelType;
+/// # use datafusion::common::DFSchemaRef;
+/// # use datafusion::error::Result;
+/// # use datafusion::execution::SessionState;
+/// # use datafusion::logical_expr::{Between, Extension, Projection};
+/// # use datafusion_substrait::extensions::Extensions;
+/// # use datafusion_substrait::logical_plan::producer::{from_projection, 
SubstraitProducer};
+///
+/// struct CustomSubstraitProducer {
+///     extensions: Extensions,
+///     state: Arc<SessionState>,
+/// }
+///
+/// impl SubstraitProducer for CustomSubstraitProducer {
+///
+///     fn register_function(&mut self, signature: String) -> u32 {
+///        self.extensions.register_function(signature)
+///     }
+///
+///     fn get_extensions(self) -> Extensions {
+///         self.extensions
+///     }
+///
+///     // You can set additional metadata on the Rels you produce
+///     fn consume_projection(&mut self, plan: &Projection) -> 
Result<Box<Rel>> {
+///         let mut rel = from_projection(self, plan)?;
+///         match rel.rel_type {
+///             Some(RelType::Project(mut project)) => {
+///                 let mut project = project.clone();
+///                 // set common metadata or advanced extension
+///                 project.common = None;
+///                 project.advanced_extension = None;
+///                 Ok(Box::new(Rel {
+///                     rel_type: Some(RelType::Project(project)),
+///                 }))
+///             }
+///             rel_type => Ok(Box::new(Rel { rel_type })),
+///        }
+///     }
+///
+///     // You can tweak how you convert expressions for your target system
+///     fn consume_between(&mut self, between: &Between, schema: &DFSchemaRef) 
-> Result<Expression> {
+///        // add your own encoding for Between
+///        todo!()
+///    }
+///
+///     // You can fully control how you convert UserDefinedLogicalNodes into 
Substrait
+///     fn consume_extension(&mut self, _plan: &Extension) -> Result<Box<Rel>> 
{
+///         // implement your own serializer into Substrait
+///        todo!()
+///    }
+/// }
+/// ```
+pub trait SubstraitProducer: Send + Sync + Sized {
+    /// Within a Substrait plan, functions are referenced using function 
anchors that are stored at
+    /// the top level of the [Plan] within
+    /// 
[ExtensionFunction](substrait::proto::extensions::simple_extension_declaration::ExtensionFunction)
+    /// messages.
+    ///
+    /// When given a function signature, this method should return the 
existing anchor for it if
+    /// there is one. Otherwise, it should generate a new anchor.
+    fn register_function(&mut self, signature: String) -> u32;
+
+    /// Consume the producer to generate the [Extensions] for the Substrait 
plan based on the
+    /// functions that have been registered
+    fn get_extensions(self) -> Extensions;
+
+    // Logical Plan Methods
+    // There is one method per LogicalPlan to allow for easy overriding of 
producer behaviour.
+    // These methods have default implementations calling the common handler 
code, to allow for users
+    // to re-use common handling logic.
+
+    fn consume_plan(&mut self, plan: &LogicalPlan) -> Result<Box<Rel>> {
+        to_substrait_rel(self, plan)
+    }

Review Comment:
   Even though this is the SubstraitProducer, I used `consume` as the verb for 
the API as it consumes DataFusion and produces Substrait.
   
   I thought about using `produce_plan`, `produce_projection`, etc. but found 
that pattern a little weird reading-wise.
   
   For example, does `produce_between` create a Substrait Between expression 
(which does not exist), or does it convert a Between expression into a 
Substrait equivalent? Because DataFusion relations and expressions don't map 
1-1 with Substrait, I found it easier to think of this as consuming DataFusion. 
Just my 2 cents.



##########
datafusion/substrait/src/logical_plan/producer.rs:
##########
@@ -450,220 +799,176 @@ pub fn to_substrait_rel(
                 }))),
             }))
         }
-        LogicalPlan::Join(join) => {
-            let left = to_substrait_rel(join.left.as_ref(), state, 
extensions)?;
-            let right = to_substrait_rel(join.right.as_ref(), state, 
extensions)?;
-            let join_type = to_substrait_jointype(join.join_type);
-            // we only support basic joins so return an error for anything not 
yet supported
-            match join.join_constraint {
-                JoinConstraint::On => {}
-                JoinConstraint::Using => {
-                    return not_impl_err!("join constraint: `using`")
-                }
-            }
-            // parse filter if exists
-            let in_join_schema = join.left.schema().join(join.right.schema())?;
-            let join_filter = match &join.filter {
-                Some(filter) => Some(to_substrait_rex(
-                    state,
-                    filter,
-                    &Arc::new(in_join_schema),
-                    0,
-                    extensions,
-                )?),
-                None => None,
-            };
+        Distinct::On(_) => not_impl_err!("Cannot convert Distinct::On"),
+    }
+}
 
-            // map the left and right columns to binary expressions in the 
form `l = r`
-            // build a single expression for the ON condition, such as `l.a = 
r.a AND l.b = r.b`
-            let eq_op = if join.null_equals_null {
-                Operator::IsNotDistinctFrom
-            } else {
-                Operator::Eq
-            };
-            let join_on = to_substrait_join_expr(
-                state,
-                &join.on,
-                eq_op,
-                join.left.schema(),
-                join.right.schema(),
-                extensions,
-            )?;
-
-            // create conjunction between `join_on` and `join_filter` to embed 
all join conditions,
-            // whether equal or non-equal in a single expression
-            let join_expr = match &join_on {
-                Some(on_expr) => match &join_filter {
-                    Some(filter) => Some(Box::new(make_binary_op_scalar_func(
-                        on_expr,
-                        filter,
-                        Operator::And,
-                        extensions,
-                    ))),
-                    None => join_on.map(Box::new), // the join expression will 
only contain `join_on` if filter doesn't exist
-                },
-                None => match &join_filter {
-                    Some(_) => join_filter.map(Box::new), // the join 
expression will only contain `join_filter` if the `on` condition doesn't exist
-                    None => None,
-                },
-            };
+pub fn from_join(producer: &mut impl SubstraitProducer, join: &Join) -> 
Result<Box<Rel>> {
+    let left = producer.consume_plan(join.left.as_ref())?;
+    let right = producer.consume_plan(join.right.as_ref())?;
+    let join_type = to_substrait_jointype(join.join_type);
+    // we only support basic joins so return an error for anything not yet 
supported
+    match join.join_constraint {
+        JoinConstraint::On => {}
+        JoinConstraint::Using => return not_impl_err!("join constraint: 
`using`"),
+    }
+    let in_join_schema = 
Arc::new(join.left.schema().join(join.right.schema())?);
 
-            Ok(Box::new(Rel {
-                rel_type: Some(RelType::Join(Box::new(JoinRel {
-                    common: None,
-                    left: Some(left),
-                    right: Some(right),
-                    r#type: join_type as i32,
-                    expression: join_expr,
-                    post_join_filter: None,
-                    advanced_extension: None,
-                }))),
-            }))
-        }
-        LogicalPlan::SubqueryAlias(alias) => {
-            // Do nothing if encounters SubqueryAlias
-            // since there is no corresponding relation type in Substrait
-            to_substrait_rel(alias.input.as_ref(), state, extensions)
-        }
-        LogicalPlan::Union(union) => {
-            let input_rels = union
-                .inputs
-                .iter()
-                .map(|input| to_substrait_rel(input.as_ref(), state, 
extensions))
-                .collect::<Result<Vec<_>>>()?
-                .into_iter()
-                .map(|ptr| *ptr)
-                .collect();
-            Ok(Box::new(Rel {
-                rel_type: Some(RelType::Set(SetRel {
-                    common: None,
-                    inputs: input_rels,
-                    op: set_rel::SetOp::UnionAll as i32, // UNION DISTINCT 
gets translated to AGGREGATION + UNION ALL
-                    advanced_extension: None,
-                })),
-            }))
-        }
-        LogicalPlan::Window(window) => {
-            let input = to_substrait_rel(window.input.as_ref(), state, 
extensions)?;
+    // convert filter if present
+    let join_filter = match &join.filter {
+        Some(filter) => Some(to_substrait_rex(producer, filter, 
&in_join_schema)?),
+        None => None,
+    };
 
-            // create a field reference for each input field
-            let mut expressions = (0..window.input.schema().fields().len())
-                .map(substrait_field_ref)
-                .collect::<Result<Vec<_>>>()?;
+    // map the left and right columns to binary expressions in the form `l = r`
+    // build a single expression for the ON condition, such as `l.a = r.a AND 
l.b = r.b`
+    let eq_op = if join.null_equals_null {
+        Operator::IsNotDistinctFrom
+    } else {
+        Operator::Eq
+    };
+    let join_on = to_substrait_join_expr(producer, &join.on, eq_op, 
&in_join_schema)?;
+
+    // create conjunction between `join_on` and `join_filter` to embed all 
join conditions,
+    // whether equal or non-equal in a single expression
+    let join_expr = match &join_on {
+        Some(on_expr) => match &join_filter {
+            Some(filter) => Some(Box::new(make_binary_op_scalar_func(
+                producer,
+                on_expr,
+                filter,
+                Operator::And,
+            ))),
+            None => join_on.map(Box::new), // the join expression will only 
contain `join_on` if filter doesn't exist
+        },
+        None => match &join_filter {
+            Some(_) => join_filter.map(Box::new), // the join expression will 
only contain `join_filter` if the `on` condition doesn't exist
+            None => None,
+        },
+    };
 
-            // process and add each window function expression
-            for expr in &window.window_expr {
-                expressions.push(to_substrait_rex(
-                    state,
-                    expr,
-                    window.input.schema(),
-                    0,
-                    extensions,
-                )?);
-            }
+    Ok(Box::new(Rel {
+        rel_type: Some(RelType::Join(Box::new(JoinRel {
+            common: None,
+            left: Some(left),
+            right: Some(right),
+            r#type: join_type as i32,
+            expression: join_expr,
+            post_join_filter: None,
+            advanced_extension: None,
+        }))),
+    }))
+}
 
-            let emit_kind = create_project_remapping(
-                expressions.len(),
-                window.input.schema().fields().len(),
-            );
-            let common = RelCommon {
-                emit_kind: Some(emit_kind),
-                hint: None,
-                advanced_extension: None,
-            };
-            let project_rel = Box::new(ProjectRel {
-                common: Some(common),
-                input: Some(input),
-                expressions,
-                advanced_extension: None,
-            });
+pub fn from_subquery_alias(
+    producer: &mut impl SubstraitProducer,
+    alias: &SubqueryAlias,
+) -> Result<Box<Rel>> {
+    // Do nothing if encounters SubqueryAlias
+    // since there is no corresponding relation type in Substrait
+    producer.consume_plan(alias.input.as_ref())
+}
 
-            Ok(Box::new(Rel {
-                rel_type: Some(RelType::Project(project_rel)),
-            }))
+pub fn from_union(
+    producer: &mut impl SubstraitProducer,
+    union: &Union,
+) -> Result<Box<Rel>> {
+    let input_rels = union
+        .inputs
+        .iter()
+        .map(|input| producer.consume_plan(input.as_ref()))
+        .collect::<Result<Vec<_>>>()?
+        .into_iter()
+        .map(|ptr| *ptr)
+        .collect();
+    Ok(Box::new(Rel {
+        rel_type: Some(RelType::Set(SetRel {
+            common: None,
+            inputs: input_rels,
+            op: set_rel::SetOp::UnionAll as i32, // UNION DISTINCT gets 
translated to AGGREGATION + UNION ALL
+            advanced_extension: None,
+        })),
+    }))
+}
+
+pub fn from_window(
+    producer: &mut impl SubstraitProducer,
+    window: &Window,
+) -> Result<Box<Rel>> {
+    let input = producer.consume_plan(window.input.as_ref())?;
+
+    // create a field reference for each input field
+    let mut expressions = (0..window.input.schema().fields().len())
+        .map(substrait_field_ref)
+        .collect::<Result<Vec<_>>>()?;
+
+    // process and add each window function expression
+    for expr in &window.window_expr {
+        expressions.push(producer.consume_expr(expr, window.input.schema())?);
+    }
+
+    let emit_kind =
+        create_project_remapping(expressions.len(), 
window.input.schema().fields().len());
+    let common = RelCommon {
+        emit_kind: Some(emit_kind),
+        hint: None,
+        advanced_extension: None,
+    };
+    let project_rel = Box::new(ProjectRel {
+        common: Some(common),
+        input: Some(input),
+        expressions,
+        advanced_extension: None,
+    });
+
+    Ok(Box::new(Rel {
+        rel_type: Some(RelType::Project(project_rel)),
+    }))
+}
+
+pub fn from_repartition(
+    producer: &mut impl SubstraitProducer,
+    repartition: &Repartition,
+) -> Result<Box<Rel>> {
+    let input = producer.consume_plan(repartition.input.as_ref())?;
+    let partition_count = match repartition.partitioning_scheme {
+        Partitioning::RoundRobinBatch(num) => num,
+        Partitioning::Hash(_, num) => num,
+        Partitioning::DistributeBy(_) => {
+            return not_impl_err!(
+                "Physical plan does not support DistributeBy partitioning"
+            )
         }
-        LogicalPlan::Repartition(repartition) => {
-            let input = to_substrait_rel(repartition.input.as_ref(), state, 
extensions)?;
-            let partition_count = match repartition.partitioning_scheme {
-                Partitioning::RoundRobinBatch(num) => num,
-                Partitioning::Hash(_, num) => num,
-                Partitioning::DistributeBy(_) => {
-                    return not_impl_err!(
-                        "Physical plan does not support DistributeBy 
partitioning"
-                    )
-                }
-            };
-            // ref: 
https://substrait.io/relations/physical_relations/#exchange-types
-            let exchange_kind = match &repartition.partitioning_scheme {
-                Partitioning::RoundRobinBatch(_) => {
-                    ExchangeKind::RoundRobin(RoundRobin::default())
-                }
-                Partitioning::Hash(exprs, _) => {
-                    let fields = exprs
-                        .iter()
-                        .map(|e| {
-                            try_to_substrait_field_reference(
-                                e,
-                                repartition.input.schema(),
-                            )
-                        })
-                        .collect::<Result<Vec<_>>>()?;
-                    ExchangeKind::ScatterByFields(ScatterFields { fields })
-                }
-                Partitioning::DistributeBy(_) => {
-                    return not_impl_err!(
-                        "Physical plan does not support DistributeBy 
partitioning"
-                    )
-                }
-            };
-            let exchange_rel = ExchangeRel {
-                common: None,
-                input: Some(input),
-                exchange_kind: Some(exchange_kind),
-                advanced_extension: None,
-                partition_count: partition_count as i32,
-                targets: vec![],
-            };
-            Ok(Box::new(Rel {
-                rel_type: Some(RelType::Exchange(Box::new(exchange_rel))),
-            }))
+    };
+    // ref: https://substrait.io/relations/physical_relations/#exchange-types
+    let exchange_kind = match &repartition.partitioning_scheme {
+        Partitioning::RoundRobinBatch(_) => {
+            ExchangeKind::RoundRobin(RoundRobin::default())
         }
-        LogicalPlan::Extension(extension_plan) => {
-            let extension_bytes = state
-                .serializer_registry()
-                .serialize_logical_plan(extension_plan.node.as_ref())?;
-            let detail = ProtoAny {
-                type_url: extension_plan.node.name().to_string(),
-                value: extension_bytes.into(),
-            };
-            let mut inputs_rel = extension_plan
-                .node
-                .inputs()
-                .into_iter()
-                .map(|plan| to_substrait_rel(plan, state, extensions))
+        Partitioning::Hash(exprs, _) => {
+            let fields = exprs
+                .iter()
+                .map(|e| try_to_substrait_field_reference(e, 
repartition.input.schema()))
                 .collect::<Result<Vec<_>>>()?;
-            let rel_type = match inputs_rel.len() {
-                0 => RelType::ExtensionLeaf(ExtensionLeafRel {
-                    common: None,
-                    detail: Some(detail),
-                }),
-                1 => RelType::ExtensionSingle(Box::new(ExtensionSingleRel {
-                    common: None,
-                    detail: Some(detail),
-                    input: Some(inputs_rel.pop().unwrap()),
-                })),
-                _ => RelType::ExtensionMulti(ExtensionMultiRel {
-                    common: None,
-                    detail: Some(detail),
-                    inputs: inputs_rel.into_iter().map(|r| *r).collect(),
-                }),
-            };
-            Ok(Box::new(Rel {
-                rel_type: Some(rel_type),
-            }))

Review Comment:
   Moved up into the `DefaultSubstraitProducer`



##########
datafusion/substrait/src/logical_plan/producer.rs:
##########
@@ -185,257 +501,290 @@ pub fn to_substrait_extended_expr(
     }))
 }
 
-/// Convert DataFusion LogicalPlan to Substrait Rel
-#[allow(deprecated)]
 pub fn to_substrait_rel(
+    producer: &mut impl SubstraitProducer,
     plan: &LogicalPlan,
-    state: &dyn SubstraitPlanningState,
-    extensions: &mut Extensions,
 ) -> Result<Box<Rel>> {
     match plan {
-        LogicalPlan::TableScan(scan) => {
-            let projection = scan.projection.as_ref().map(|p| {
-                p.iter()
-                    .map(|i| StructItem {
-                        field: *i as i32,
-                        child: None,
-                    })
-                    .collect()
-            });
+        LogicalPlan::Projection(plan) => producer.consume_projection(plan),
+        LogicalPlan::Filter(plan) => producer.consume_filter(plan),
+        LogicalPlan::Window(plan) => producer.consume_window(plan),
+        LogicalPlan::Aggregate(plan) => producer.consume_aggregate(plan),
+        LogicalPlan::Sort(plan) => producer.consume_sort(plan),
+        LogicalPlan::Join(plan) => producer.consume_join(plan),
+        LogicalPlan::Repartition(plan) => producer.consume_repartition(plan),
+        LogicalPlan::Union(plan) => producer.consume_union(plan),
+        LogicalPlan::TableScan(plan) => producer.consume_table_scan(plan),
+        LogicalPlan::EmptyRelation(plan) => 
producer.consume_empty_relation(plan),
+        LogicalPlan::SubqueryAlias(plan) => 
producer.consume_subquery_alias(plan),
+        LogicalPlan::Limit(plan) => producer.consume_limit(plan),
+        LogicalPlan::Values(plan) => producer.consume_values(plan),
+        LogicalPlan::Distinct(plan) => producer.consume_distinct(plan),
+        LogicalPlan::Extension(plan) => producer.consume_extension(plan),
+        _ => not_impl_err!("Unsupported plan type: {plan:?}")?,
+    }
+}
 
-            let projection = projection.map(|struct_items| MaskExpression {
-                select: Some(StructSelect { struct_items }),
-                maintain_singular_struct: false,
-            });
+pub fn from_table_scan(
+    _producer: &mut impl SubstraitProducer,

Review Comment:
   This currently isn't used. However, in the future we're likely going to want 
to use this producer when converting the DataFusion schema into Substrait, 
especially after the logical type work lands and we can potentially add 
user-defined logical types.



##########
datafusion/substrait/src/logical_plan/producer.rs:
##########
@@ -101,14 +105,330 @@ use substrait::{
     version,
 };
 
-use super::state::SubstraitPlanningState;
+/// This trait is used to produce Substrait plans, converting them from 
DataFusion Logical Plans.
+/// It can be implemented by users to allow for custom handling of relations, 
expressions, etc.
+///
+/// Combined with the [crate::logical_plan::consumer::SubstraitConsumer] this 
allows for fully
+/// customizable Substrait serde.
+///
+/// # Example Usage
+///
+/// ```
+/// # use std::sync::Arc;
+/// # use substrait::proto::{Expression, Rel};
+/// # use substrait::proto::rel::RelType;
+/// # use datafusion::common::DFSchemaRef;
+/// # use datafusion::error::Result;
+/// # use datafusion::execution::SessionState;
+/// # use datafusion::logical_expr::{Between, Extension, Projection};
+/// # use datafusion_substrait::extensions::Extensions;
+/// # use datafusion_substrait::logical_plan::producer::{from_projection, 
SubstraitProducer};
+///
+/// struct CustomSubstraitProducer {
+///     extensions: Extensions,
+///     state: Arc<SessionState>,
+/// }
+///
+/// impl SubstraitProducer for CustomSubstraitProducer {
+///
+///     fn register_function(&mut self, signature: String) -> u32 {
+///        self.extensions.register_function(signature)
+///     }
+///
+///     fn get_extensions(self) -> Extensions {
+///         self.extensions
+///     }
+///
+///     // You can set additional metadata on the Rels you produce
+///     fn consume_projection(&mut self, plan: &Projection) -> 
Result<Box<Rel>> {
+///         let mut rel = from_projection(self, plan)?;
+///         match rel.rel_type {
+///             Some(RelType::Project(mut project)) => {
+///                 let mut project = project.clone();
+///                 // set common metadata or advanced extension
+///                 project.common = None;
+///                 project.advanced_extension = None;
+///                 Ok(Box::new(Rel {
+///                     rel_type: Some(RelType::Project(project)),
+///                 }))
+///             }
+///             rel_type => Ok(Box::new(Rel { rel_type })),
+///        }
+///     }
+///
+///     // You can tweak how you convert expressions for your target system
+///     fn consume_between(&mut self, between: &Between, schema: &DFSchemaRef) 
-> Result<Expression> {
+///        // add your own encoding for Between
+///        todo!()
+///    }
+///
+///     // You can fully control how you convert UserDefinedLogicalNodes into 
Substrait
+///     fn consume_extension(&mut self, _plan: &Extension) -> Result<Box<Rel>> 
{
+///         // implement your own serializer into Substrait
+///        todo!()
+///    }
+/// }
+/// ```
+pub trait SubstraitProducer: Send + Sync + Sized {
+    /// Within a Substrait plan, functions are referenced using function 
anchors that are stored at
+    /// the top level of the [Plan] within
+    /// 
[ExtensionFunction](substrait::proto::extensions::simple_extension_declaration::ExtensionFunction)
+    /// messages.
+    ///
+    /// When given a function signature, this method should return the 
existing anchor for it if
+    /// there is one. Otherwise, it should generate a new anchor.
+    fn register_function(&mut self, signature: String) -> u32;
+
+    /// Consume the producer to generate the [Extensions] for the Substrait 
plan based on the
+    /// functions that have been registered
+    fn get_extensions(self) -> Extensions;
+
+    // Logical Plan Methods
+    // There is one method per LogicalPlan to allow for easy overriding of 
producer behaviour.
+    // These methods have default implementations calling the common handler 
code, to allow for users
+    // to re-use common handling logic.
+
+    fn consume_plan(&mut self, plan: &LogicalPlan) -> Result<Box<Rel>> {
+        to_substrait_rel(self, plan)
+    }
+
+    fn consume_projection(&mut self, plan: &Projection) -> Result<Box<Rel>> {
+        from_projection(self, plan)
+    }
+
+    fn consume_filter(&mut self, plan: &Filter) -> Result<Box<Rel>> {
+        from_filter(self, plan)
+    }
+
+    fn consume_window(&mut self, plan: &Window) -> Result<Box<Rel>> {
+        from_window(self, plan)
+    }
+
+    fn consume_aggregate(&mut self, plan: &Aggregate) -> Result<Box<Rel>> {
+        from_aggregate(self, plan)
+    }
+
+    fn consume_sort(&mut self, plan: &Sort) -> Result<Box<Rel>> {
+        from_sort(self, plan)
+    }
+
+    fn consume_join(&mut self, plan: &Join) -> Result<Box<Rel>> {
+        from_join(self, plan)
+    }
+
+    fn consume_repartition(&mut self, plan: &Repartition) -> Result<Box<Rel>> {
+        from_repartition(self, plan)
+    }
+
+    fn consume_union(&mut self, plan: &Union) -> Result<Box<Rel>> {
+        from_union(self, plan)
+    }
+
+    fn consume_table_scan(&mut self, plan: &TableScan) -> Result<Box<Rel>> {
+        from_table_scan(self, plan)
+    }
+
+    fn consume_empty_relation(&mut self, plan: &EmptyRelation) -> 
Result<Box<Rel>> {
+        from_empty_relation(plan)
+    }
+
+    fn consume_subquery_alias(&mut self, plan: &SubqueryAlias) -> 
Result<Box<Rel>> {
+        from_subquery_alias(self, plan)
+    }
+
+    fn consume_limit(&mut self, plan: &Limit) -> Result<Box<Rel>> {
+        from_limit(self, plan)
+    }
+
+    fn consume_values(&mut self, plan: &Values) -> Result<Box<Rel>> {
+        from_values(self, plan)
+    }
+
+    fn consume_distinct(&mut self, plan: &Distinct) -> Result<Box<Rel>> {
+        from_distinct(self, plan)
+    }
+
+    fn consume_extension(&mut self, _plan: &Extension) -> Result<Box<Rel>> {
+        substrait_err!("Specify handling for LogicalPlan::Extension by 
implementing the SubstraitProducer trait")
+    }
+
+    // Expression Methods
+    // There is one method per DataFusion Expr to allow for easy overriding of 
producer behaviour
+    // These methods have default implementations calling the common handler 
code, to allow for users
+    // to re-use common handling logic.
+
+    fn consume_expr(&mut self, expr: &Expr, schema: &DFSchemaRef) -> 
Result<Expression> {
+        to_substrait_rex(self, expr, schema)
+    }
+
+    fn consume_alias(
+        &mut self,
+        alias: &Alias,
+        schema: &DFSchemaRef,
+    ) -> Result<Expression> {
+        from_alias(self, alias, schema)
+    }
+
+    fn consume_column(
+        &mut self,
+        column: &Column,
+        schema: &DFSchemaRef,
+    ) -> Result<Expression> {
+        from_column(self, column, schema)
+    }
+
+    fn consume_literal(&mut self, value: &ScalarValue) -> Result<Expression> {
+        from_literal(self, value)
+    }
+
+    fn consume_binary_expr(
+        &mut self,
+        expr: &BinaryExpr,
+        schema: &DFSchemaRef,
+    ) -> Result<Expression> {
+        from_binary_expr(self, expr, schema)
+    }
+
+    fn consume_like(&mut self, like: &Like, schema: &DFSchemaRef) -> 
Result<Expression> {
+        from_like(self, like, schema)
+    }
+
+    /// For handling Not, IsNotNull, IsNull, IsTrue, IsFalse, IsUnknown, 
IsNotTrue, IsNotFalse, IsNotUnknown, Negative
+    fn consume_unary_expr(
+        &mut self,
+        expr: &Expr,
+        schema: &DFSchemaRef,
+    ) -> Result<Expression> {
+        from_unary_expr(self, expr, schema)
+    }
+
+    fn consume_between(
+        &mut self,
+        between: &Between,
+        schema: &DFSchemaRef,
+    ) -> Result<Expression> {
+        from_between(self, between, schema)
+    }
+
+    fn consume_case(&mut self, case: &Case, schema: &DFSchemaRef) -> 
Result<Expression> {
+        from_case(self, case, schema)
+    }
+
+    fn consume_cast(&mut self, cast: &Cast, schema: &DFSchemaRef) -> 
Result<Expression> {
+        from_cast(self, cast, schema)
+    }
+
+    fn consume_try_cast(
+        &mut self,
+        cast: &TryCast,
+        schema: &DFSchemaRef,
+    ) -> Result<Expression> {
+        from_try_cast(self, cast, schema)
+    }
+
+    fn consume_scalar_function(
+        &mut self,
+        scalar_fn: &expr::ScalarFunction,
+        schema: &DFSchemaRef,
+    ) -> Result<Expression> {
+        from_scalar_function(self, scalar_fn, schema)
+    }
+
+    fn consume_aggregate_function(
+        &mut self,
+        agg_fn: &expr::AggregateFunction,
+        schema: &DFSchemaRef,
+    ) -> Result<Measure> {
+        from_aggregate_function(self, agg_fn, schema)
+    }
+
+    fn consume_window_function(
+        &mut self,
+        window_fn: &WindowFunction,
+        schema: &DFSchemaRef,
+    ) -> Result<Expression> {
+        from_window_function(self, window_fn, schema)
+    }
+
+    fn consume_in_list(
+        &mut self,
+        in_list: &InList,
+        schema: &DFSchemaRef,
+    ) -> Result<Expression> {
+        from_in_list(self, in_list, schema)
+    }
+
+    fn consume_in_subquery(
+        &mut self,
+        in_subquery: &InSubquery,
+        schema: &DFSchemaRef,
+    ) -> Result<Expression> {
+        from_in_subquery(self, in_subquery, schema)
+    }
+}
+
+struct DefaultSubstraitProducer<'a> {
+    extensions: Extensions,
+    state: &'a SessionState,
+}
+
+impl<'a> DefaultSubstraitProducer<'a> {
+    pub fn new(state: &'a SessionState) -> Self {
+        DefaultSubstraitProducer {
+            extensions: Extensions::default(),
+            state,
+        }
+    }
+}
+
+impl SubstraitProducer for DefaultSubstraitProducer<'_> {
+    fn register_function(&mut self, fn_name: String) -> u32 {
+        self.extensions.register_function(fn_name)
+    }
+
+    fn get_extensions(self) -> Extensions {
+        self.extensions
+    }
+
+    fn consume_extension(&mut self, plan: &Extension) -> Result<Box<Rel>> {

Review Comment:
   The following was copied from the existing code for handling 
LogicalPlan::Extension nodes found later on.



##########
datafusion/substrait/src/logical_plan/producer.rs:
##########
@@ -101,14 +105,330 @@ use substrait::{
     version,
 };
 
-use super::state::SubstraitPlanningState;
+/// This trait is used to produce Substrait plans, converting them from 
DataFusion Logical Plans.
+/// It can be implemented by users to allow for custom handling of relations, 
expressions, etc.
+///
+/// Combined with the [crate::logical_plan::consumer::SubstraitConsumer] this 
allows for fully
+/// customizable Substrait serde.
+///
+/// # Example Usage
+///
+/// ```
+/// # use std::sync::Arc;
+/// # use substrait::proto::{Expression, Rel};
+/// # use substrait::proto::rel::RelType;
+/// # use datafusion::common::DFSchemaRef;
+/// # use datafusion::error::Result;
+/// # use datafusion::execution::SessionState;
+/// # use datafusion::logical_expr::{Between, Extension, Projection};
+/// # use datafusion_substrait::extensions::Extensions;
+/// # use datafusion_substrait::logical_plan::producer::{from_projection, 
SubstraitProducer};
+///
+/// struct CustomSubstraitProducer {
+///     extensions: Extensions,
+///     state: Arc<SessionState>,
+/// }
+///
+/// impl SubstraitProducer for CustomSubstraitProducer {
+///
+///     fn register_function(&mut self, signature: String) -> u32 {
+///        self.extensions.register_function(signature)
+///     }
+///
+///     fn get_extensions(self) -> Extensions {
+///         self.extensions
+///     }
+///
+///     // You can set additional metadata on the Rels you produce
+///     fn consume_projection(&mut self, plan: &Projection) -> 
Result<Box<Rel>> {
+///         let mut rel = from_projection(self, plan)?;
+///         match rel.rel_type {
+///             Some(RelType::Project(mut project)) => {
+///                 let mut project = project.clone();
+///                 // set common metadata or advanced extension
+///                 project.common = None;
+///                 project.advanced_extension = None;
+///                 Ok(Box::new(Rel {
+///                     rel_type: Some(RelType::Project(project)),
+///                 }))
+///             }
+///             rel_type => Ok(Box::new(Rel { rel_type })),
+///        }
+///     }
+///
+///     // You can tweak how you convert expressions for your target system
+///     fn consume_between(&mut self, between: &Between, schema: &DFSchemaRef) 
-> Result<Expression> {
+///        // add your own encoding for Between
+///        todo!()
+///    }
+///
+///     // You can fully control how you convert UserDefinedLogicalNodes into 
Substrait
+///     fn consume_extension(&mut self, _plan: &Extension) -> Result<Box<Rel>> 
{
+///         // implement your own serializer into Substrait
+///        todo!()
+///    }
+/// }
+/// ```
+pub trait SubstraitProducer: Send + Sync + Sized {
+    /// Within a Substrait plan, functions are referenced using function 
anchors that are stored at
+    /// the top level of the [Plan] within
+    /// 
[ExtensionFunction](substrait::proto::extensions::simple_extension_declaration::ExtensionFunction)
+    /// messages.
+    ///
+    /// When given a function signature, this method should return the 
existing anchor for it if
+    /// there is one. Otherwise, it should generate a new anchor.
+    fn register_function(&mut self, signature: String) -> u32;
+
+    /// Consume the producer to generate the [Extensions] for the Substrait 
plan based on the
+    /// functions that have been registered
+    fn get_extensions(self) -> Extensions;
+
+    // Logical Plan Methods
+    // There is one method per LogicalPlan to allow for easy overriding of 
producer behaviour.
+    // These methods have default implementations calling the common handler 
code, to allow for users
+    // to re-use common handling logic.
+
+    fn consume_plan(&mut self, plan: &LogicalPlan) -> Result<Box<Rel>> {
+        to_substrait_rel(self, plan)
+    }
+
+    fn consume_projection(&mut self, plan: &Projection) -> Result<Box<Rel>> {
+        from_projection(self, plan)
+    }
+
+    fn consume_filter(&mut self, plan: &Filter) -> Result<Box<Rel>> {
+        from_filter(self, plan)
+    }
+
+    fn consume_window(&mut self, plan: &Window) -> Result<Box<Rel>> {
+        from_window(self, plan)
+    }
+
+    fn consume_aggregate(&mut self, plan: &Aggregate) -> Result<Box<Rel>> {
+        from_aggregate(self, plan)
+    }
+
+    fn consume_sort(&mut self, plan: &Sort) -> Result<Box<Rel>> {
+        from_sort(self, plan)
+    }
+
+    fn consume_join(&mut self, plan: &Join) -> Result<Box<Rel>> {
+        from_join(self, plan)
+    }
+
+    fn consume_repartition(&mut self, plan: &Repartition) -> Result<Box<Rel>> {
+        from_repartition(self, plan)
+    }
+
+    fn consume_union(&mut self, plan: &Union) -> Result<Box<Rel>> {
+        from_union(self, plan)
+    }
+
+    fn consume_table_scan(&mut self, plan: &TableScan) -> Result<Box<Rel>> {
+        from_table_scan(self, plan)
+    }
+
+    fn consume_empty_relation(&mut self, plan: &EmptyRelation) -> 
Result<Box<Rel>> {
+        from_empty_relation(plan)
+    }
+
+    fn consume_subquery_alias(&mut self, plan: &SubqueryAlias) -> 
Result<Box<Rel>> {
+        from_subquery_alias(self, plan)
+    }
+
+    fn consume_limit(&mut self, plan: &Limit) -> Result<Box<Rel>> {
+        from_limit(self, plan)
+    }
+
+    fn consume_values(&mut self, plan: &Values) -> Result<Box<Rel>> {
+        from_values(self, plan)
+    }
+
+    fn consume_distinct(&mut self, plan: &Distinct) -> Result<Box<Rel>> {
+        from_distinct(self, plan)
+    }
+
+    fn consume_extension(&mut self, _plan: &Extension) -> Result<Box<Rel>> {
+        substrait_err!("Specify handling for LogicalPlan::Extension by 
implementing the SubstraitProducer trait")
+    }
+
+    // Expression Methods
+    // There is one method per DataFusion Expr to allow for easy overriding of 
producer behaviour
+    // These methods have default implementations calling the common handler 
code, to allow for users
+    // to re-use common handling logic.
+
+    fn consume_expr(&mut self, expr: &Expr, schema: &DFSchemaRef) -> 
Result<Expression> {
+        to_substrait_rex(self, expr, schema)
+    }
+
+    fn consume_alias(
+        &mut self,
+        alias: &Alias,
+        schema: &DFSchemaRef,
+    ) -> Result<Expression> {
+        from_alias(self, alias, schema)
+    }
+
+    fn consume_column(
+        &mut self,
+        column: &Column,
+        schema: &DFSchemaRef,
+    ) -> Result<Expression> {
+        from_column(self, column, schema)
+    }
+
+    fn consume_literal(&mut self, value: &ScalarValue) -> Result<Expression> {
+        from_literal(self, value)
+    }
+
+    fn consume_binary_expr(
+        &mut self,
+        expr: &BinaryExpr,
+        schema: &DFSchemaRef,
+    ) -> Result<Expression> {
+        from_binary_expr(self, expr, schema)
+    }
+
+    fn consume_like(&mut self, like: &Like, schema: &DFSchemaRef) -> 
Result<Expression> {
+        from_like(self, like, schema)
+    }
+
+    /// For handling Not, IsNotNull, IsNull, IsTrue, IsFalse, IsUnknown, 
IsNotTrue, IsNotFalse, IsNotUnknown, Negative
+    fn consume_unary_expr(
+        &mut self,
+        expr: &Expr,
+        schema: &DFSchemaRef,
+    ) -> Result<Expression> {
+        from_unary_expr(self, expr, schema)
+    }
+
+    fn consume_between(
+        &mut self,
+        between: &Between,
+        schema: &DFSchemaRef,
+    ) -> Result<Expression> {
+        from_between(self, between, schema)
+    }
+
+    fn consume_case(&mut self, case: &Case, schema: &DFSchemaRef) -> 
Result<Expression> {
+        from_case(self, case, schema)
+    }
+
+    fn consume_cast(&mut self, cast: &Cast, schema: &DFSchemaRef) -> 
Result<Expression> {
+        from_cast(self, cast, schema)
+    }
+
+    fn consume_try_cast(
+        &mut self,
+        cast: &TryCast,
+        schema: &DFSchemaRef,
+    ) -> Result<Expression> {
+        from_try_cast(self, cast, schema)
+    }
+
+    fn consume_scalar_function(
+        &mut self,
+        scalar_fn: &expr::ScalarFunction,
+        schema: &DFSchemaRef,
+    ) -> Result<Expression> {
+        from_scalar_function(self, scalar_fn, schema)
+    }
+
+    fn consume_aggregate_function(
+        &mut self,
+        agg_fn: &expr::AggregateFunction,
+        schema: &DFSchemaRef,
+    ) -> Result<Measure> {
+        from_aggregate_function(self, agg_fn, schema)
+    }
+
+    fn consume_window_function(
+        &mut self,
+        window_fn: &WindowFunction,
+        schema: &DFSchemaRef,
+    ) -> Result<Expression> {
+        from_window_function(self, window_fn, schema)
+    }
+
+    fn consume_in_list(
+        &mut self,
+        in_list: &InList,
+        schema: &DFSchemaRef,
+    ) -> Result<Expression> {
+        from_in_list(self, in_list, schema)
+    }
+
+    fn consume_in_subquery(
+        &mut self,
+        in_subquery: &InSubquery,
+        schema: &DFSchemaRef,
+    ) -> Result<Expression> {
+        from_in_subquery(self, in_subquery, schema)
+    }
+}
+
+struct DefaultSubstraitProducer<'a> {
+    extensions: Extensions,
+    state: &'a SessionState,
+}
+
+impl<'a> DefaultSubstraitProducer<'a> {
+    pub fn new(state: &'a SessionState) -> Self {
+        DefaultSubstraitProducer {
+            extensions: Extensions::default(),
+            state,
+        }
+    }
+}
+
+impl SubstraitProducer for DefaultSubstraitProducer<'_> {
+    fn register_function(&mut self, fn_name: String) -> u32 {
+        self.extensions.register_function(fn_name)
+    }
+
+    fn get_extensions(self) -> Extensions {
+        self.extensions
+    }
+
+    fn consume_extension(&mut self, plan: &Extension) -> Result<Box<Rel>> {
+        let extension_bytes = self
+            .state
+            .serializer_registry()
+            .serialize_logical_plan(plan.node.as_ref())?;
+        let detail = ProtoAny {
+            type_url: plan.node.name().to_string(),
+            value: extension_bytes.into(),
+        };
+        let mut inputs_rel = plan
+            .node
+            .inputs()
+            .into_iter()
+            .map(|plan| self.consume_plan(plan))
+            .collect::<Result<Vec<_>>>()?;
+        let rel_type = match inputs_rel.len() {
+            0 => RelType::ExtensionLeaf(ExtensionLeafRel {
+                common: None,
+                detail: Some(detail),
+            }),
+            1 => RelType::ExtensionSingle(Box::new(ExtensionSingleRel {
+                common: None,
+                detail: Some(detail),
+                input: Some(inputs_rel.pop().unwrap()),
+            })),
+            _ => RelType::ExtensionMulti(ExtensionMultiRel {
+                common: None,
+                detail: Some(detail),
+                inputs: inputs_rel.into_iter().map(|r| *r).collect(),
+            }),
+        };
+        Ok(Box::new(Rel {
+            rel_type: Some(rel_type),
+        }))
+    }
+}
 
 /// Convert DataFusion LogicalPlan to Substrait Plan
-pub fn to_substrait_plan(
-    plan: &LogicalPlan,
-    state: &dyn SubstraitPlanningState,
-) -> Result<Box<Plan>> {
-    let mut extensions = Extensions::default();
+pub fn to_substrait_plan(plan: &LogicalPlan, state: &SessionState) -> 
Result<Box<Plan>> {

Review Comment:
   The public API stays mostly the same: it now takes a `&SessionState` instead 
of a `&dyn SubstraitPlanningState`, a change most users shouldn't notice.



##########
datafusion/substrait/src/logical_plan/producer.rs:
##########
@@ -450,220 +799,176 @@ pub fn to_substrait_rel(
                 }))),
             }))
         }
-        LogicalPlan::Join(join) => {
-            let left = to_substrait_rel(join.left.as_ref(), state, 
extensions)?;
-            let right = to_substrait_rel(join.right.as_ref(), state, 
extensions)?;
-            let join_type = to_substrait_jointype(join.join_type);
-            // we only support basic joins so return an error for anything not 
yet supported
-            match join.join_constraint {
-                JoinConstraint::On => {}
-                JoinConstraint::Using => {
-                    return not_impl_err!("join constraint: `using`")
-                }
-            }
-            // parse filter if exists
-            let in_join_schema = join.left.schema().join(join.right.schema())?;
-            let join_filter = match &join.filter {
-                Some(filter) => Some(to_substrait_rex(
-                    state,
-                    filter,
-                    &Arc::new(in_join_schema),
-                    0,
-                    extensions,
-                )?),
-                None => None,
-            };
+        Distinct::On(_) => not_impl_err!("Cannot convert Distinct::On"),
+    }
+}
 
-            // map the left and right columns to binary expressions in the 
form `l = r`
-            // build a single expression for the ON condition, such as `l.a = 
r.a AND l.b = r.b`
-            let eq_op = if join.null_equals_null {
-                Operator::IsNotDistinctFrom
-            } else {
-                Operator::Eq
-            };
-            let join_on = to_substrait_join_expr(
-                state,
-                &join.on,
-                eq_op,
-                join.left.schema(),
-                join.right.schema(),
-                extensions,
-            )?;
-
-            // create conjunction between `join_on` and `join_filter` to embed 
all join conditions,
-            // whether equal or non-equal in a single expression
-            let join_expr = match &join_on {
-                Some(on_expr) => match &join_filter {
-                    Some(filter) => Some(Box::new(make_binary_op_scalar_func(
-                        on_expr,
-                        filter,
-                        Operator::And,
-                        extensions,
-                    ))),
-                    None => join_on.map(Box::new), // the join expression will 
only contain `join_on` if filter doesn't exist
-                },
-                None => match &join_filter {
-                    Some(_) => join_filter.map(Box::new), // the join 
expression will only contain `join_filter` if the `on` condition doesn't exist
-                    None => None,
-                },
-            };
+pub fn from_join(producer: &mut impl SubstraitProducer, join: &Join) -> 
Result<Box<Rel>> {
+    let left = producer.consume_plan(join.left.as_ref())?;
+    let right = producer.consume_plan(join.right.as_ref())?;
+    let join_type = to_substrait_jointype(join.join_type);
+    // we only support basic joins so return an error for anything not yet 
supported
+    match join.join_constraint {
+        JoinConstraint::On => {}
+        JoinConstraint::Using => return not_impl_err!("join constraint: 
`using`"),
+    }
+    let in_join_schema = 
Arc::new(join.left.schema().join(join.right.schema())?);
 
-            Ok(Box::new(Rel {
-                rel_type: Some(RelType::Join(Box::new(JoinRel {
-                    common: None,
-                    left: Some(left),
-                    right: Some(right),
-                    r#type: join_type as i32,
-                    expression: join_expr,
-                    post_join_filter: None,
-                    advanced_extension: None,
-                }))),
-            }))
-        }
-        LogicalPlan::SubqueryAlias(alias) => {
-            // Do nothing if encounters SubqueryAlias
-            // since there is no corresponding relation type in Substrait
-            to_substrait_rel(alias.input.as_ref(), state, extensions)
-        }
-        LogicalPlan::Union(union) => {
-            let input_rels = union
-                .inputs
-                .iter()
-                .map(|input| to_substrait_rel(input.as_ref(), state, 
extensions))
-                .collect::<Result<Vec<_>>>()?
-                .into_iter()
-                .map(|ptr| *ptr)
-                .collect();
-            Ok(Box::new(Rel {
-                rel_type: Some(RelType::Set(SetRel {
-                    common: None,
-                    inputs: input_rels,
-                    op: set_rel::SetOp::UnionAll as i32, // UNION DISTINCT 
gets translated to AGGREGATION + UNION ALL
-                    advanced_extension: None,
-                })),
-            }))
-        }
-        LogicalPlan::Window(window) => {
-            let input = to_substrait_rel(window.input.as_ref(), state, 
extensions)?;
+    // convert filter if present
+    let join_filter = match &join.filter {
+        Some(filter) => Some(to_substrait_rex(producer, filter, 
&in_join_schema)?),
+        None => None,
+    };
 
-            // create a field reference for each input field
-            let mut expressions = (0..window.input.schema().fields().len())
-                .map(substrait_field_ref)
-                .collect::<Result<Vec<_>>>()?;
+    // map the left and right columns to binary expressions in the form `l = r`
+    // build a single expression for the ON condition, such as `l.a = r.a AND 
l.b = r.b`
+    let eq_op = if join.null_equals_null {
+        Operator::IsNotDistinctFrom
+    } else {
+        Operator::Eq
+    };
+    let join_on = to_substrait_join_expr(producer, &join.on, eq_op, 
&in_join_schema)?;

Review Comment:
   The code here has changed slightly. `to_substrait_join_expr` now takes the 
output schema of the join which makes it easier to process the join condition. 
More details below.



##########
datafusion/substrait/src/logical_plan/producer.rs:
##########
@@ -730,32 +1035,23 @@ fn to_substrait_named_struct(schema: &DFSchemaRef) -> 
Result<NamedStruct> {
 }
 
 fn to_substrait_join_expr(
-    state: &dyn SubstraitPlanningState,
+    producer: &mut impl SubstraitProducer,
     join_conditions: &Vec<(Expr, Expr)>,
     eq_op: Operator,
-    left_schema: &DFSchemaRef,
-    right_schema: &DFSchemaRef,
-    extensions: &mut Extensions,
+    join_schema: &DFSchemaRef,
 ) -> Result<Option<Expression>> {
     // Only support AND conjunction for each binary expression in join 
conditions
     let mut exprs: Vec<Expression> = vec![];
     for (left, right) in join_conditions {
-        // Parse left
-        let l = to_substrait_rex(state, left, left_schema, 0, extensions)?;
-        // Parse right
-        let r = to_substrait_rex(
-            state,
-            right,
-            right_schema,
-            left_schema.fields().len(), // offset to return the correct index
-            extensions,
-        )?;
+        let l = producer.consume_expr(left, join_schema)?;
+        let r = producer.consume_expr(right, join_schema)?;

Review Comment:
   We no longer need to track the column offset explicitly.
   
   The column offset code was added as part of 
https://github.com/apache/datafusion/pull/6135 to handle queries like
   ```sql
   SELECT d1.b, d2.c
   FROM data d1
   JOIN data d2 ON d1.b = d2.e
   ```
   which caused issues because the left and right inputs both had the same name. 
This could potentially cause column name references in DataFusion to be converted 
incorrectly into Substrait column indices in some cases. Additionally, there 
were issues with duplicate schema errors.
   
   However, the introduction and usage of 
https://github.com/apache/datafusion/blob/a08dc0af8f798afe73a2fdf170ab36e72ad7e782/datafusion/substrait/src/logical_plan/consumer.rs#L1772-L1777,
 which was needed because different tables might have columns with the same 
name, now means that we can concatenate the left and right schemas of a join 
without issues. Then if the DataFusion column references are unambiguous, we 
can simply look up the columns in the join schema to get the correct index.
   
   This was the only place where the column offset was used. Removing this here 
allowed me to remove the `col_ref_offset` argument from a number of functions, 
which IMO simplifies the API substantially.
   
   For further verification, a test has been added for this in 
`roundtrip_logical_plan.rs`.
   



##########
datafusion/substrait/src/logical_plan/producer.rs:
##########
@@ -999,449 +1305,421 @@ pub fn make_binary_op_scalar_func(
 ///
 /// # Arguments
 ///
-/// * `expr` - DataFusion expression to be parse into a Substrait expression
-/// * `schema` - DataFusion input schema for looking up field qualifiers
-/// * `col_ref_offset` - Offset for calculating Substrait field reference 
indices.
-///                     This should only be set by caller with more than one 
input relations i.e. Join.
-///                     Substrait expects one set of indices when joining two 
relations.
-///                     Let's say `left` and `right` have `m` and `n` columns, 
respectively. The `right`
-///                     relation will have column indices from `0` to `n-1`, 
however, Substrait will expect
-///                     the `right` indices to be offset by the `left`. This 
means Substrait will expect to
-///                     evaluate the join condition expression on indices [0 
.. n-1, n .. n+m-1]. For example:
-///                     ```SELECT *
-///                        FROM t1
-///                        JOIN t2
-///                        ON t1.c1 = t2.c0;```
-///                     where t1 consists of columns [c0, c1, c2], and t2 = 
columns [c0, c1]
-///                     the join condition should become
-///                     `col_ref(1) = col_ref(3 + 0)`
-///                     , where `3` is the number of `left` columns 
(`col_ref_offset`) and `0` is the index
-///                     of the join key column from `right`
-/// * `extensions` - Substrait extension info. Contains registered function 
information
-#[allow(deprecated)]
+/// * `expr` - DataFusion expression to convert into a Substrait expression
+/// * `schema` - DataFusion input schema for looking up columns
 pub fn to_substrait_rex(
-    state: &dyn SubstraitPlanningState,
+    producer: &mut impl SubstraitProducer,
     expr: &Expr,
     schema: &DFSchemaRef,
-    col_ref_offset: usize,
-    extensions: &mut Extensions,
 ) -> Result<Expression> {
     match expr {
-        Expr::InList(InList {
-            expr,
-            list,
-            negated,
-        }) => {
-            let substrait_list = list
-                .iter()
-                .map(|x| to_substrait_rex(state, x, schema, col_ref_offset, 
extensions))
-                .collect::<Result<Vec<Expression>>>()?;
-            let substrait_expr =
-                to_substrait_rex(state, expr, schema, col_ref_offset, 
extensions)?;
-
-            let substrait_or_list = Expression {
-                rex_type: Some(RexType::SingularOrList(Box::new(SingularOrList 
{
-                    value: Some(Box::new(substrait_expr)),
-                    options: substrait_list,
-                }))),
-            };
-
-            if *negated {
-                let function_anchor = 
extensions.register_function("not".to_string());
-
-                Ok(Expression {
-                    rex_type: Some(RexType::ScalarFunction(ScalarFunction {
-                        function_reference: function_anchor,
-                        arguments: vec![FunctionArgument {
-                            arg_type: Some(ArgType::Value(substrait_or_list)),
-                        }],
-                        output_type: None,
-                        args: vec![],
-                        options: vec![],
-                    })),
-                })
-            } else {
-                Ok(substrait_or_list)
-            }
+        Expr::Alias(expr) => producer.consume_alias(expr, schema),
+        Expr::Column(expr) => producer.consume_column(expr, schema),
+        Expr::Literal(expr) => producer.consume_literal(expr),
+        Expr::BinaryExpr(expr) => producer.consume_binary_expr(expr, schema),
+        Expr::Like(expr) => producer.consume_like(expr, schema),
+        Expr::SimilarTo(_) => not_impl_err!("SimilarTo is not supported"),
+        Expr::Not(_) => producer.consume_unary_expr(expr, schema),
+        Expr::IsNotNull(_) => producer.consume_unary_expr(expr, schema),
+        Expr::IsNull(_) => producer.consume_unary_expr(expr, schema),
+        Expr::IsTrue(_) => producer.consume_unary_expr(expr, schema),
+        Expr::IsFalse(_) => producer.consume_unary_expr(expr, schema),
+        Expr::IsUnknown(_) => producer.consume_unary_expr(expr, schema),
+        Expr::IsNotTrue(_) => producer.consume_unary_expr(expr, schema),
+        Expr::IsNotFalse(_) => producer.consume_unary_expr(expr, schema),
+        Expr::IsNotUnknown(_) => producer.consume_unary_expr(expr, schema),
+        Expr::Negative(_) => producer.consume_unary_expr(expr, schema),
+        Expr::Between(expr) => producer.consume_between(expr, schema),
+        Expr::Case(expr) => producer.consume_case(expr, schema),
+        Expr::Cast(expr) => producer.consume_cast(expr, schema),
+        Expr::TryCast(expr) => producer.consume_try_cast(expr, schema),
+        Expr::ScalarFunction(expr) => producer.consume_scalar_function(expr, 
schema),
+        Expr::AggregateFunction(_) => {
+            internal_err!(
+                "AggregateFunction should only be encountered as part of a 
LogicalPlan::Aggregate"
+            )
         }
-        Expr::ScalarFunction(fun) => {
-            let mut arguments: Vec<FunctionArgument> = vec![];
-            for arg in &fun.args {
-                arguments.push(FunctionArgument {
-                    arg_type: Some(ArgType::Value(to_substrait_rex(
-                        state,
-                        arg,
-                        schema,
-                        col_ref_offset,
-                        extensions,
-                    )?)),
-                });
-            }
+        Expr::WindowFunction(expr) => producer.consume_window_function(expr, 
schema),
+        Expr::InList(expr) => producer.consume_in_list(expr, schema),
+        Expr::InSubquery(expr) => producer.consume_in_subquery(expr, schema),
+        _ => not_impl_err!("Cannot convert {expr:?} to Substrait"),
+    }
+}
 
-            let function_anchor = 
extensions.register_function(fun.name().to_string());
-            Ok(Expression {
-                rex_type: Some(RexType::ScalarFunction(ScalarFunction {
-                    function_reference: function_anchor,
-                    arguments,
-                    output_type: None,
-                    args: vec![],
-                    options: vec![],
-                })),
-            })
-        }
-        Expr::Between(Between {
-            expr,
-            negated,
-            low,
-            high,
-        }) => {
-            if *negated {
-                // `expr NOT BETWEEN low AND high` can be translated into 
(expr < low OR high < expr)
-                let substrait_expr =
-                    to_substrait_rex(state, expr, schema, col_ref_offset, 
extensions)?;
-                let substrait_low =
-                    to_substrait_rex(state, low, schema, col_ref_offset, 
extensions)?;
-                let substrait_high =
-                    to_substrait_rex(state, high, schema, col_ref_offset, 
extensions)?;
-
-                let l_expr = make_binary_op_scalar_func(
-                    &substrait_expr,
-                    &substrait_low,
-                    Operator::Lt,
-                    extensions,
-                );
-                let r_expr = make_binary_op_scalar_func(
-                    &substrait_high,
-                    &substrait_expr,
-                    Operator::Lt,
-                    extensions,
-                );
+pub fn from_in_list(
+    producer: &mut impl SubstraitProducer,
+    in_list: &InList,
+    schema: &DFSchemaRef,
+) -> Result<Expression> {
+    let InList {
+        expr,
+        list,
+        negated,
+    } = in_list;
+    let substrait_list = list
+        .iter()
+        .map(|x| producer.consume_expr(x, schema))
+        .collect::<Result<Vec<Expression>>>()?;
+    let substrait_expr = producer.consume_expr(expr, schema)?;
+
+    let substrait_or_list = Expression {
+        rex_type: Some(RexType::SingularOrList(Box::new(SingularOrList {
+            value: Some(Box::new(substrait_expr)),
+            options: substrait_list,
+        }))),
+    };
 
-                Ok(make_binary_op_scalar_func(
-                    &l_expr,
-                    &r_expr,
-                    Operator::Or,
-                    extensions,
-                ))
-            } else {
-                // `expr BETWEEN low AND high` can be translated into (low <= 
expr AND expr <= high)
-                let substrait_expr =
-                    to_substrait_rex(state, expr, schema, col_ref_offset, 
extensions)?;
-                let substrait_low =
-                    to_substrait_rex(state, low, schema, col_ref_offset, 
extensions)?;
-                let substrait_high =
-                    to_substrait_rex(state, high, schema, col_ref_offset, 
extensions)?;
-
-                let l_expr = make_binary_op_scalar_func(
-                    &substrait_low,
-                    &substrait_expr,
-                    Operator::LtEq,
-                    extensions,
-                );
-                let r_expr = make_binary_op_scalar_func(
-                    &substrait_expr,
-                    &substrait_high,
-                    Operator::LtEq,
-                    extensions,
-                );
+    if *negated {
+        let function_anchor = producer.register_function("not".to_string());
 
-                Ok(make_binary_op_scalar_func(
-                    &l_expr,
-                    &r_expr,
-                    Operator::And,
-                    extensions,
-                ))
-            }
-        }
-        Expr::Column(col) => {
-            let index = schema.index_of_column(col)?;
-            substrait_field_ref(index + col_ref_offset)
-        }
-        Expr::BinaryExpr(BinaryExpr { left, op, right }) => {
-            let l = to_substrait_rex(state, left, schema, col_ref_offset, 
extensions)?;
-            let r = to_substrait_rex(state, right, schema, col_ref_offset, 
extensions)?;
+        #[allow(deprecated)]
+        Ok(Expression {
+            rex_type: Some(RexType::ScalarFunction(ScalarFunction {
+                function_reference: function_anchor,
+                arguments: vec![FunctionArgument {
+                    arg_type: Some(ArgType::Value(substrait_or_list)),
+                }],
+                output_type: None,
+                args: vec![],
+                options: vec![],
+            })),
+        })
+    } else {
+        Ok(substrait_or_list)
+    }
+}
 
-            Ok(make_binary_op_scalar_func(&l, &r, *op, extensions))
-        }
-        Expr::Case(Case {
-            expr,
-            when_then_expr,
-            else_expr,
-        }) => {
-            let mut ifs: Vec<IfClause> = vec![];
-            // Parse base
-            if let Some(e) = expr {
-                // Base expression exists
-                ifs.push(IfClause {
-                    r#if: Some(to_substrait_rex(
-                        state,
-                        e,
-                        schema,
-                        col_ref_offset,
-                        extensions,
-                    )?),
-                    then: None,
-                });
-            }
-            // Parse `when`s
-            for (r#if, then) in when_then_expr {
-                ifs.push(IfClause {
-                    r#if: Some(to_substrait_rex(
-                        state,
-                        r#if,
-                        schema,
-                        col_ref_offset,
-                        extensions,
-                    )?),
-                    then: Some(to_substrait_rex(
-                        state,
-                        then,
-                        schema,
-                        col_ref_offset,
-                        extensions,
-                    )?),
-                });
-            }
+pub fn from_scalar_function(
+    producer: &mut impl SubstraitProducer,
+    fun: &expr::ScalarFunction,
+    schema: &DFSchemaRef,
+) -> Result<Expression> {
+    let mut arguments: Vec<FunctionArgument> = vec![];
+    for arg in &fun.args {
+        arguments.push(FunctionArgument {
+            arg_type: Some(ArgType::Value(to_substrait_rex(producer, arg, 
schema)?)),
+        });
+    }
 
-            // Parse outer `else`
-            let r#else: Option<Box<Expression>> = match else_expr {
-                Some(e) => Some(Box::new(to_substrait_rex(
-                    state,
-                    e,
-                    schema,
-                    col_ref_offset,
-                    extensions,
-                )?)),
-                None => None,
-            };
+    let function_anchor = producer.register_function(fun.name().to_string());
+    #[allow(deprecated)]
+    Ok(Expression {
+        rex_type: Some(RexType::ScalarFunction(ScalarFunction {
+            function_reference: function_anchor,
+            arguments,
+            output_type: None,
+            options: vec![],
+            args: vec![],
+        })),
+    })
+}
 
-            Ok(Expression {
-                rex_type: Some(RexType::IfThen(Box::new(IfThen { ifs, r#else 
}))),
-            })
-        }
-        Expr::Cast(Cast { expr, data_type }) => Ok(Expression {
-            rex_type: Some(RexType::Cast(Box::new(
-                substrait::proto::expression::Cast {
-                    r#type: Some(to_substrait_type(data_type, true)?),
-                    input: Some(Box::new(to_substrait_rex(
-                        state,
-                        expr,
-                        schema,
-                        col_ref_offset,
-                        extensions,
-                    )?)),
-                    failure_behavior: FailureBehavior::ThrowException.into(),
-                },
-            ))),
-        }),
-        Expr::TryCast(TryCast { expr, data_type }) => Ok(Expression {
-            rex_type: Some(RexType::Cast(Box::new(
-                substrait::proto::expression::Cast {
-                    r#type: Some(to_substrait_type(data_type, true)?),
-                    input: Some(Box::new(to_substrait_rex(
-                        state,
-                        expr,
-                        schema,
-                        col_ref_offset,
-                        extensions,
-                    )?)),
-                    failure_behavior: FailureBehavior::ReturnNull.into(),
-                },
-            ))),
-        }),
-        Expr::Literal(value) => to_substrait_literal_expr(value, extensions),
-        Expr::Alias(Alias { expr, .. }) => {
-            to_substrait_rex(state, expr, schema, col_ref_offset, extensions)
-        }
-        Expr::WindowFunction(WindowFunction {
-            fun,
-            args,
-            partition_by,
-            order_by,
-            window_frame,
-            null_treatment: _,
-        }) => {
-            // function reference
-            let function_anchor = 
extensions.register_function(fun.to_string());
-            // arguments
-            let mut arguments: Vec<FunctionArgument> = vec![];
-            for arg in args {
-                arguments.push(FunctionArgument {
-                    arg_type: Some(ArgType::Value(to_substrait_rex(
-                        state,
-                        arg,
-                        schema,
-                        col_ref_offset,
-                        extensions,
-                    )?)),
-                });
-            }
-            // partition by expressions
-            let partition_by = partition_by
-                .iter()
-                .map(|e| to_substrait_rex(state, e, schema, col_ref_offset, 
extensions))
-                .collect::<Result<Vec<_>>>()?;
-            // order by expressions
-            let order_by = order_by
-                .iter()
-                .map(|e| substrait_sort_field(state, e, schema, extensions))
-                .collect::<Result<Vec<_>>>()?;
-            // window frame
-            let bounds = to_substrait_bounds(window_frame)?;
-            let bound_type = to_substrait_bound_type(window_frame)?;
-            Ok(make_substrait_window_function(
-                function_anchor,
-                arguments,
-                partition_by,
-                order_by,
-                bounds,
-                bound_type,
-            ))
-        }
-        Expr::Like(Like {
-            negated,
-            expr,
-            pattern,
-            escape_char,
-            case_insensitive,
-        }) => make_substrait_like_expr(
-            state,
-            *case_insensitive,
-            *negated,
-            expr,
-            pattern,
-            *escape_char,
-            schema,
-            col_ref_offset,
-            extensions,
-        ),
-        Expr::InSubquery(InSubquery {
-            expr,
-            subquery,
-            negated,
-        }) => {
-            let substrait_expr =
-                to_substrait_rex(state, expr, schema, col_ref_offset, 
extensions)?;
-
-            let subquery_plan =
-                to_substrait_rel(subquery.subquery.as_ref(), state, 
extensions)?;
-
-            let substrait_subquery = Expression {
-                rex_type: Some(RexType::Subquery(Box::new(Subquery {
-                    subquery_type: Some(
-                        
substrait::proto::expression::subquery::SubqueryType::InPredicate(
-                            Box::new(InPredicate {
-                                needles: (vec![substrait_expr]),
-                                haystack: Some(subquery_plan),
-                            }),
-                        ),
+pub fn from_between(
+    producer: &mut impl SubstraitProducer,
+    between: &Between,
+    schema: &DFSchemaRef,
+) -> Result<Expression> {
+    let Between {
+        expr,
+        negated,
+        low,
+        high,
+    } = between;
+    if *negated {
+        // `expr NOT BETWEEN low AND high` can be translated into (expr < low 
OR high < expr)
+        let substrait_expr = producer.consume_expr(expr.as_ref(), schema)?;
+        let substrait_low = producer.consume_expr(low.as_ref(), schema)?;
+        let substrait_high = producer.consume_expr(high.as_ref(), schema)?;
+
+        let l_expr = make_binary_op_scalar_func(
+            producer,
+            &substrait_expr,
+            &substrait_low,
+            Operator::Lt,
+        );
+        let r_expr = make_binary_op_scalar_func(
+            producer,
+            &substrait_high,
+            &substrait_expr,
+            Operator::Lt,
+        );
+
+        Ok(make_binary_op_scalar_func(
+            producer,
+            &l_expr,
+            &r_expr,
+            Operator::Or,
+        ))
+    } else {
+        // `expr BETWEEN low AND high` can be translated into (low <= expr AND 
expr <= high)
+        let substrait_expr = producer.consume_expr(expr.as_ref(), schema)?;
+        let substrait_low = producer.consume_expr(low.as_ref(), schema)?;
+        let substrait_high = producer.consume_expr(high.as_ref(), schema)?;
+
+        let l_expr = make_binary_op_scalar_func(
+            producer,
+            &substrait_low,
+            &substrait_expr,
+            Operator::LtEq,
+        );
+        let r_expr = make_binary_op_scalar_func(
+            producer,
+            &substrait_expr,
+            &substrait_high,
+            Operator::LtEq,
+        );
+
+        Ok(make_binary_op_scalar_func(
+            producer,
+            &l_expr,
+            &r_expr,
+            Operator::And,
+        ))
+    }
+}
+pub fn from_column(
+    _producer: &impl SubstraitProducer,
+    col: &Column,
+    schema: &DFSchemaRef,
+) -> Result<Expression> {
+    let index = schema.index_of_column(col)?;
+    substrait_field_ref(index)
+}
+
+pub fn from_binary_expr(
+    producer: &mut impl SubstraitProducer,
+    expr: &BinaryExpr,
+    schema: &DFSchemaRef,
+) -> Result<Expression> {
+    let BinaryExpr { left, op, right } = expr;
+    let l = producer.consume_expr(left, schema)?;
+    let r = producer.consume_expr(right, schema)?;
+    Ok(make_binary_op_scalar_func(producer, &l, &r, *op))
+}
+pub fn from_case(
+    producer: &mut impl SubstraitProducer,
+    case: &Case,
+    schema: &DFSchemaRef,
+) -> Result<Expression> {
+    let Case {
+        expr,
+        when_then_expr,
+        else_expr,
+    } = case;
+    let mut ifs: Vec<IfClause> = vec![];
+    // Parse base
+    if let Some(e) = expr {
+        // Base expression exists
+        ifs.push(IfClause {
+            r#if: Some(producer.consume_expr(e, schema)?),
+            then: None,
+        });
+    }
+    // Parse `when`s
+    for (r#if, then) in when_then_expr {
+        ifs.push(IfClause {
+            r#if: Some(producer.consume_expr(r#if, schema)?),
+            then: Some(producer.consume_expr(then, schema)?),
+        });
+    }
+
+    // Parse outer `else`
+    let r#else: Option<Box<Expression>> = match else_expr {
+        Some(e) => Some(Box::new(to_substrait_rex(producer, e, schema)?)),
+        None => None,
+    };
+
+    Ok(Expression {
+        rex_type: Some(RexType::IfThen(Box::new(IfThen { ifs, r#else }))),
+    })
+}
+
+pub fn from_cast(
+    producer: &mut impl SubstraitProducer,
+    cast: &Cast,
+    schema: &DFSchemaRef,
+) -> Result<Expression> {
+    let Cast { expr, data_type } = cast;
+    Ok(Expression {
+        rex_type: Some(RexType::Cast(Box::new(
+            substrait::proto::expression::Cast {
+                r#type: Some(to_substrait_type(data_type, true)?),
+                input: Some(Box::new(to_substrait_rex(producer, expr, 
schema)?)),
+                failure_behavior: FailureBehavior::ThrowException.into(),
+            },
+        ))),
+    })
+}
+
+pub fn from_try_cast(
+    producer: &mut impl SubstraitProducer,
+    cast: &TryCast,
+    schema: &DFSchemaRef,
+) -> Result<Expression> {
+    let TryCast { expr, data_type } = cast;
+    Ok(Expression {
+        rex_type: Some(RexType::Cast(Box::new(
+            substrait::proto::expression::Cast {
+                r#type: Some(to_substrait_type(data_type, true)?),
+                input: Some(Box::new(to_substrait_rex(producer, expr, 
schema)?)),
+                failure_behavior: FailureBehavior::ReturnNull.into(),
+            },
+        ))),
+    })
+}
+
+pub fn from_literal(
+    producer: &mut impl SubstraitProducer,
+    value: &ScalarValue,
+) -> Result<Expression> {
+    to_substrait_literal_expr(producer, value)
+}
+
+pub fn from_alias(
+    producer: &mut impl SubstraitProducer,
+    alias: &Alias,
+    schema: &DFSchemaRef,
+) -> Result<Expression> {
+    producer.consume_expr(alias.expr.as_ref(), schema)
+}
+
+pub fn from_window_function(
+    producer: &mut impl SubstraitProducer,
+    window_fn: &WindowFunction,
+    schema: &DFSchemaRef,
+) -> Result<Expression> {
+    let WindowFunction {
+        fun,
+        args,
+        partition_by,
+        order_by,
+        window_frame,
+        null_treatment: _,
+    } = window_fn;
+    // function reference
+    let function_anchor = producer.register_function(fun.to_string());
+    // arguments
+    let mut arguments: Vec<FunctionArgument> = vec![];
+    for arg in args {
+        arguments.push(FunctionArgument {
+            arg_type: Some(ArgType::Value(to_substrait_rex(producer, arg, 
schema)?)),
+        });
+    }
+    // partition by expressions
+    let partition_by = partition_by
+        .iter()
+        .map(|e| producer.consume_expr(e, schema))
+        .collect::<Result<Vec<_>>>()?;
+    // order by expressions
+    let order_by = order_by
+        .iter()
+        .map(|e| substrait_sort_field(producer, e, schema))
+        .collect::<Result<Vec<_>>>()?;
+    // window frame
+    let bounds = to_substrait_bounds(window_frame)?;
+    let bound_type = to_substrait_bound_type(window_frame)?;
+    Ok(make_substrait_window_function(
+        function_anchor,
+        arguments,
+        partition_by,
+        order_by,
+        bounds,
+        bound_type,
+    ))
+}
+
+pub fn from_like(
+    producer: &mut impl SubstraitProducer,
+    like: &Like,
+    schema: &DFSchemaRef,
+) -> Result<Expression> {
+    let Like {
+        negated,
+        expr,
+        pattern,
+        escape_char,
+        case_insensitive,
+    } = like;
+    make_substrait_like_expr(
+        producer,
+        *case_insensitive,
+        *negated,
+        expr,
+        pattern,
+        *escape_char,
+        schema,
+    )
+}
+
+pub fn from_in_subquery(
+    producer: &mut impl SubstraitProducer,
+    subquery: &InSubquery,
+    schema: &DFSchemaRef,
+) -> Result<Expression> {
+    let InSubquery {
+        expr,
+        subquery,
+        negated,
+    } = subquery;
+    let substrait_expr = producer.consume_expr(expr, schema)?;
+
+    let subquery_plan = producer.consume_plan(subquery.subquery.as_ref())?;
+
+    let substrait_subquery = Expression {
+        rex_type: Some(RexType::Subquery(Box::new(
+            substrait::proto::expression::Subquery {
+                subquery_type: Some(
+                    
substrait::proto::expression::subquery::SubqueryType::InPredicate(
+                        Box::new(InPredicate {
+                            needles: (vec![substrait_expr]),
+                            haystack: Some(subquery_plan),
+                        }),
                     ),
-                }))),
-            };
-            if *negated {
-                let function_anchor = 
extensions.register_function("not".to_string());
-
-                Ok(Expression {
-                    rex_type: Some(RexType::ScalarFunction(ScalarFunction {
-                        function_reference: function_anchor,
-                        arguments: vec![FunctionArgument {
-                            arg_type: Some(ArgType::Value(substrait_subquery)),
-                        }],
-                        output_type: None,
-                        args: vec![],
-                        options: vec![],
-                    })),
-                })
-            } else {
-                Ok(substrait_subquery)
-            }
-        }
-        Expr::Not(arg) => to_substrait_unary_scalar_fn(
-            state,
-            "not",
-            arg,
-            schema,
-            col_ref_offset,
-            extensions,
-        ),
-        Expr::IsNull(arg) => to_substrait_unary_scalar_fn(
-            state,
-            "is_null",
-            arg,
-            schema,
-            col_ref_offset,
-            extensions,
-        ),
-        Expr::IsNotNull(arg) => to_substrait_unary_scalar_fn(
-            state,
-            "is_not_null",
-            arg,
-            schema,
-            col_ref_offset,
-            extensions,
-        ),
-        Expr::IsTrue(arg) => to_substrait_unary_scalar_fn(
-            state,
-            "is_true",
-            arg,
-            schema,
-            col_ref_offset,
-            extensions,
-        ),
-        Expr::IsFalse(arg) => to_substrait_unary_scalar_fn(
-            state,
-            "is_false",
-            arg,
-            schema,
-            col_ref_offset,
-            extensions,
-        ),
-        Expr::IsUnknown(arg) => to_substrait_unary_scalar_fn(
-            state,
-            "is_unknown",
-            arg,
-            schema,
-            col_ref_offset,
-            extensions,
-        ),
-        Expr::IsNotTrue(arg) => to_substrait_unary_scalar_fn(
-            state,
-            "is_not_true",
-            arg,
-            schema,
-            col_ref_offset,
-            extensions,
-        ),
-        Expr::IsNotFalse(arg) => to_substrait_unary_scalar_fn(
-            state,
-            "is_not_false",
-            arg,
-            schema,
-            col_ref_offset,
-            extensions,
-        ),
-        Expr::IsNotUnknown(arg) => to_substrait_unary_scalar_fn(
-            state,
-            "is_not_unknown",
-            arg,
-            schema,
-            col_ref_offset,
-            extensions,
-        ),
-        Expr::Negative(arg) => to_substrait_unary_scalar_fn(
-            state,
-            "negate",
-            arg,
-            schema,
-            col_ref_offset,
-            extensions,
-        ),
-        _ => {
-            not_impl_err!("Unsupported expression: {expr:?}")
-        }
+                ),
+            },
+        ))),
+    };
+    if *negated {
+        let function_anchor = producer.register_function("not".to_string());
+
+        #[allow(deprecated)]
+        Ok(Expression {
+            rex_type: Some(RexType::ScalarFunction(ScalarFunction {
+                function_reference: function_anchor,
+                arguments: vec![FunctionArgument {
+                    arg_type: Some(ArgType::Value(substrait_subquery)),
+                }],
+                output_type: None,
+                args: vec![],
+                options: vec![],
+            })),
+        })
+    } else {
+        Ok(substrait_subquery)
     }
 }
 
+pub fn from_unary_expr(
+    producer: &mut impl SubstraitProducer,
+    expr: &Expr,
+    schema: &DFSchemaRef,
+) -> Result<Expression> {
+    let (fn_name, arg) = match expr {
+        Expr::Not(arg) => ("not", arg),
+        Expr::IsNull(arg) => ("is_null", arg),
+        Expr::IsNotNull(arg) => ("is_not_null", arg),
+        Expr::IsTrue(arg) => ("is_true", arg),
+        Expr::IsFalse(arg) => ("is_false", arg),
+        Expr::IsUnknown(arg) => ("is_unknown", arg),
+        Expr::IsNotTrue(arg) => ("is_not_true", arg),
+        Expr::IsNotFalse(arg) => ("is_not_false", arg),
+        Expr::IsNotUnknown(arg) => ("is_not_unknown", arg),
+        Expr::Negative(arg) => ("negate", arg),
+        expr => not_impl_err!("Unsupported expression: {expr:?}")?,
+    };
+    to_substrait_unary_scalar_fn(producer, fn_name, arg, schema)

Review Comment:
   Consolidated the handling of unary expressions such as Not, IsNull, 
IsNotNull, etc. into a single function for improved readability.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: github-unsubscr...@datafusion.apache.org
For additional commands, e-mail: github-h...@datafusion.apache.org

Reply via email to