comphead commented on code in PR #2836:
URL: https://github.com/apache/datafusion-comet/pull/2836#discussion_r2583156059
##########
native/core/src/execution/planner.rs:
##########
@@ -1528,6 +1530,107 @@ impl PhysicalPlanner {
Arc::new(SparkPlan::new(spark_plan.plan_id, expand,
vec![child])),
))
}
+ OpStruct::Explode(explode) => {
+ assert_eq!(children.len(), 1);
+ let (scans, child) = self.create_plan(&children[0], inputs,
partition_count)?;
+
+ // Create the expression for the array to explode
+ let child_expr = if let Some(child_expr) = &explode.child {
+ self.create_expr(child_expr, child.schema())?
+ } else {
+ return Err(ExecutionError::GeneralError(
+ "Explode operator requires a child
expression".to_string(),
+ ));
+ };
+
+ // Create projection expressions for other columns
+ let projections: Vec<Arc<dyn PhysicalExpr>> = explode
+ .project_list
+ .iter()
+ .map(|expr| self.create_expr(expr, child.schema()))
+ .collect::<Result<Vec<_>, _>>()?;
+
+ // For UnnestExec, we need to add a projection to put the
columns in the right order:
+ // 1. First add all projection columns
+ // 2. Then add the array column to be exploded
+ // Then UnnestExec will unnest the last column
+
+ let mut project_exprs: Vec<(Arc<dyn PhysicalExpr>, String)> =
projections
+ .iter()
+ .enumerate()
+ .map(|(idx, expr)| (Arc::clone(expr),
format!("col_{idx}")))
+ .collect();
+
+ // Add the array column as the last column
+ let array_col_name = format!("col_{}", projections.len());
+ project_exprs.push((Arc::clone(&child_expr),
array_col_name.clone()));
+
+ // Create a projection to arrange columns as needed
+ let project_exec = Arc::new(ProjectionExec::try_new(
+ project_exprs,
+ Arc::clone(&child.native_plan),
+ )?);
+
+ // Get the input schema from the projection
+ let project_schema = project_exec.schema();
+
+ // Build the output schema for UnnestExec
+ // The output schema replaces the list column with its element
type
+ let mut output_fields: Vec<Field> = Vec::new();
+
+ // Add all projection columns (non-array columns)
+ for i in 0..projections.len() {
+ output_fields.push(project_schema.field(i).clone());
+ }
+
+ // Add the unnested array element field
+ // Extract the element type from the list/array type
+ let array_field = project_schema.field(projections.len());
+ let element_type = match array_field.data_type() {
+ DataType::List(field) => field.data_type().clone(),
+ dt => {
+ return Err(ExecutionError::GeneralError(format!(
+ "Expected List type for explode, got {:?}",
+ dt
+ )))
+ }
+ };
+
+ // The output column has the same name as the input array
column
+ // but with the element type instead of the list type
+ output_fields.push(Field::new(
+ array_field.name(),
+ element_type,
+ true, // Element is nullable after unnesting
Review Comment:
👍
##########
native/core/src/execution/planner.rs:
##########
@@ -1528,6 +1530,107 @@ impl PhysicalPlanner {
Arc::new(SparkPlan::new(spark_plan.plan_id, expand,
vec![child])),
))
}
+ OpStruct::Explode(explode) => {
+ assert_eq!(children.len(), 1);
+ let (scans, child) = self.create_plan(&children[0], inputs,
partition_count)?;
+
+ // Create the expression for the array to explode
+ let child_expr = if let Some(child_expr) = &explode.child {
+ self.create_expr(child_expr, child.schema())?
+ } else {
+ return Err(ExecutionError::GeneralError(
+ "Explode operator requires a child
expression".to_string(),
+ ));
+ };
+
+ // Create projection expressions for other columns
Review Comment:
other columns? 🤔
##########
native/core/src/execution/planner.rs:
##########
@@ -1528,6 +1530,107 @@ impl PhysicalPlanner {
Arc::new(SparkPlan::new(spark_plan.plan_id, expand,
vec![child])),
))
}
+ OpStruct::Explode(explode) => {
+ assert_eq!(children.len(), 1);
+ let (scans, child) = self.create_plan(&children[0], inputs,
partition_count)?;
+
+ // Create the expression for the array to explode
+ let child_expr = if let Some(child_expr) = &explode.child {
+ self.create_expr(child_expr, child.schema())?
+ } else {
+ return Err(ExecutionError::GeneralError(
+ "Explode operator requires a child
expression".to_string(),
+ ));
+ };
+
+ // Create projection expressions for other columns
+ let projections: Vec<Arc<dyn PhysicalExpr>> = explode
+ .project_list
+ .iter()
+ .map(|expr| self.create_expr(expr, child.schema()))
+ .collect::<Result<Vec<_>, _>>()?;
+
+ // For UnnestExec, we need to add a projection to put the
columns in the right order:
+ // 1. First add all projection columns
+ // 2. Then add the array column to be exploded
+ // Then UnnestExec will unnest the last column
+
+ let mut project_exprs: Vec<(Arc<dyn PhysicalExpr>, String)> =
projections
+ .iter()
+ .enumerate()
+ .map(|(idx, expr)| (Arc::clone(expr),
format!("col_{idx}")))
+ .collect();
+
+ // Add the array column as the last column
+ let array_col_name = format!("col_{}", projections.len());
Review Comment:
Can this name cause a conflict or other issue if the original dataset already has `col_*` columns?
##########
spark/src/test/resources/tpcds-micro-benchmarks/explode.sql:
##########
@@ -0,0 +1,4 @@
+SELECT i_item_sk, explode(array(i_brand_id, i_class_id, i_category_id,
i_manufact_id, i_manager_id))
Review Comment:
Should we also have a benchmark query for `explode_outer`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]