This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
     new bd3235fd2 chore: Remove code for unpacking dictionaries prior to FilterExec (#2659)
bd3235fd2 is described below
commit bd3235fd27b559b6aa97e9c0b9687f4143757dcf
Author: Andy Grove <[email protected]>
AuthorDate: Wed Oct 29 10:42:55 2025 -0600
chore: Remove code for unpacking dictionaries prior to FilterExec (#2659)
---
 native/core/src/execution/planner.rs                  | 16 ++++------------
 native/proto/src/proto/operator.proto                 |  2 --
 .../scala/org/apache/comet/serde/QueryPlanSerde.scala | 10 ----------
3 files changed, 4 insertions(+), 24 deletions(-)
diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index da56e01bb..ca6e2084e 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -1091,17 +1091,10 @@ impl PhysicalPlanner {
                 let predicate =
                     self.create_expr(filter.predicate.as_ref().unwrap(), child.schema())?;

-                let filter: Arc<dyn ExecutionPlan> = if filter.wrap_child_in_copy_exec {
-                    Arc::new(FilterExec::try_new(
-                        predicate,
-                        Self::wrap_in_copy_exec(Arc::clone(&child.native_plan)),
-                    )?)
-                } else {
-                    Arc::new(FilterExec::try_new(
-                        predicate,
-                        Arc::clone(&child.native_plan),
-                    )?)
-                };
+                let filter: Arc<dyn ExecutionPlan> = Arc::new(FilterExec::try_new(
+                    predicate,
+                    Arc::clone(&child.native_plan),
+                )?);

                 Ok((
                     scans,
@@ -3208,7 +3201,6 @@ mod tests {
             children: vec![child_op],
             op_struct: Some(OpStruct::Filter(spark_operator::Filter {
                 predicate: Some(expr),
-                wrap_child_in_copy_exec: false,
             })),
         }
     }
diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto
index a243ab6b0..3306ad574 100644
--- a/native/proto/src/proto/operator.proto
+++ b/native/proto/src/proto/operator.proto
@@ -113,8 +113,6 @@ message Projection {

 message Filter {
   spark.spark_expression.Expr predicate = 1;
-  // Some expressions don't support dictionary arrays, so may need to wrap the child in a CopyExec
-  bool wrap_child_in_copy_exec = 3;
 }

 message Sort {
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 8361f1e95..6dd454826 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -1067,19 +1067,9 @@ object QueryPlanSerde extends Logging with CometExprShim {
         val cond = exprToProto(condition, child.output)

         if (cond.isDefined && childOp.nonEmpty) {
-          // Some native expressions do not support operating on dictionary-encoded arrays, so
-          // wrap the child in a CopyExec to unpack dictionaries first.
-          def wrapChildInCopyExec(condition: Expression): Boolean = {
-            condition.exists(expr => {
-              expr.isInstanceOf[StartsWith] || expr.isInstanceOf[EndsWith] || expr
-                .isInstanceOf[Contains]
-            })
-          }
-
           val filterBuilder = OperatorOuterClass.Filter
             .newBuilder()
             .setPredicate(cond.get)
-            .setWrapChildInCopyExec(wrapChildInCopyExec(condition))
           Some(builder.setFilter(filterBuilder).build())
         } else {
           withInfo(op, condition, child)
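
For illustration, a minimal sketch of the filter construction that remains after this change, assuming the DataFusion FilterExec, PhysicalExpr, and ExecutionPlan types referenced in the diff above; the standalone build_filter helper is hypothetical and stands in for the surrounding planner code. The point is simply that the predicate is now applied to the child plan directly, with no CopyExec wrapping step:

    use std::sync::Arc;

    use datafusion::error::Result;
    use datafusion::physical_expr::PhysicalExpr;
    use datafusion::physical_plan::filter::FilterExec;
    use datafusion::physical_plan::ExecutionPlan;

    // Hypothetical helper mirroring the "+" lines in planner.rs above: the
    // predicate is applied to the child plan as-is, instead of first wrapping
    // the child in a CopyExec when the predicate contains certain expressions.
    fn build_filter(
        predicate: Arc<dyn PhysicalExpr>,
        child: Arc<dyn ExecutionPlan>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(Arc::new(FilterExec::try_new(predicate, child)?))
    }

Per the comments removed by this patch, the CopyExec wrapping existed only because StartsWith, EndsWith, and Contains previously could not operate on dictionary-encoded arrays, which is why dropping the proto flag also removes the wrapChildInCopyExec check on the Scala side.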
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]