This is an automated email from the ASF dual-hosted git repository.
kazuyukitanimura pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 26b406dc4 Feat: support array_compact function (#1321)
26b406dc4 is described below
commit 26b406dc4efa0ed037bb07a7862563c2ff9cdba8
Author: Kazantsev Maksim <[email protected]>
AuthorDate: Thu Mar 6 04:27:28 2025 +0400
Feat: support array_compact function (#1321)
## Which issue does this PR close?
Related to Epic: https://github.com/apache/datafusion-comet/issues/1042
array_compact: SELECT array_compact(array(1, 2, 3, null)) => array(1, 2, 3)
DataFusion's array_compact has the same behavior as Spark's array_compact
function
Spark:
https://docs.databricks.com/en/sql/language-manual/functions/array_compact.html
DataFusion:
https://datafusion.apache.org/user-guide/sql/scalar_functions.html#array-remove-all
## Rationale for this change
Defined under Epic: https://github.com/apache/datafusion-comet/issues/1042
## What changes are included in this PR?
planner.rs: Maps Spark's array_compact function to DataFusion's
array_remove_all_udf physical expression
expr.proto: an array_compact message has been added,
QueryPlanSerde.scala: an array_compact pattern-matching case has been added,
CometExpressionSuite.scala: A new UT has been added for the array_compact
function.
## How are these changes tested?
A new UT has been added.
---
.../org/apache/comet/serde/QueryPlanSerde.scala | 2 ++
.../main/scala/org/apache/comet/serde/arrays.scala | 31 +++++++++++++++++++---
.../apache/comet/CometArrayExpressionSuite.scala | 20 ++++++++++++++
3 files changed, 50 insertions(+), 3 deletions(-)
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 381a75b65..8757105ec 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -1992,6 +1992,8 @@ object QueryPlanSerde extends Logging with
ShimQueryPlanSerde with CometExprShim
case _: ArrayIntersect => convert(CometArrayIntersect)
case _: ArrayJoin => convert(CometArrayJoin)
case _: ArraysOverlap => convert(CometArraysOverlap)
+ case _ @ArrayFilter(_, func) if
func.children.head.isInstanceOf[IsNotNull] =>
+ convert(CometArrayCompact)
case _ =>
withInfo(expr, s"${expr.prettyName} is not supported", expr.children:
_*)
None
diff --git a/spark/src/main/scala/org/apache/comet/serde/arrays.scala
b/spark/src/main/scala/org/apache/comet/serde/arrays.scala
index db1679f22..1ada81b97 100644
--- a/spark/src/main/scala/org/apache/comet/serde/arrays.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/arrays.scala
@@ -19,11 +19,11 @@
package org.apache.comet.serde
-import org.apache.spark.sql.catalyst.expressions.{ArrayJoin, ArrayRemove,
Attribute, Expression}
-import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes,
DecimalType, StructType}
+import org.apache.spark.sql.catalyst.expressions.{ArrayJoin, ArrayRemove,
Attribute, Expression, Literal}
+import org.apache.spark.sql.types._
import org.apache.comet.CometSparkSessionExtensions.withInfo
-import org.apache.comet.serde.QueryPlanSerde.{createBinaryExpr, exprToProto}
+import org.apache.comet.serde.QueryPlanSerde.{createBinaryExpr, exprToProto,
scalarExprToProtoWithReturnType}
import org.apache.comet.shims.CometExprShim
object CometArrayRemove extends CometExpressionSerde with CometExprShim {
@@ -126,6 +126,31 @@ object CometArraysOverlap extends CometExpressionSerde
with IncompatExpr {
}
}
+object CometArrayCompact extends CometExpressionSerde with IncompatExpr {
+ override def convert(
+ expr: Expression,
+ inputs: Seq[Attribute],
+ binding: Boolean): Option[ExprOuterClass.Expr] = {
+ val child = expr.children.head
+ val elementType = child.dataType.asInstanceOf[ArrayType].elementType
+
+ val arrayExprProto = exprToProto(child, inputs, binding)
+ val nullLiteralProto = exprToProto(Literal(null, elementType), Seq.empty)
+
+ val arrayCompactScalarExpr = scalarExprToProtoWithReturnType(
+ "array_remove_all",
+ ArrayType(elementType = elementType),
+ arrayExprProto,
+ nullLiteralProto)
+ arrayCompactScalarExpr match {
+ case None =>
+ withInfo(expr, "unsupported arguments for ArrayCompact",
expr.children: _*)
+ None
+ case expr => expr
+ }
+ }
+}
+
object CometArrayJoin extends CometExpressionSerde with IncompatExpr {
override def convert(
expr: Expression,
diff --git
a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
index df1fccb69..8850f2133 100644
--- a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
@@ -292,4 +292,24 @@ class CometArrayExpressionSuite extends CometTestBase with
AdaptiveSparkPlanHelp
}
}
+ test("array_compact") {
+ assume(isSpark34Plus)
+ withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") {
+ Seq(true, false).foreach { dictionaryEnabled =>
+ withTempDir { dir =>
+ val path = new Path(dir.toURI.toString, "test.parquet")
+ makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled,
n = 10000)
+ spark.read.parquet(path.toString).createOrReplaceTempView("t1")
+
+ checkSparkAnswerAndOperator(
+ sql("SELECT array_compact(array(_2)) FROM t1 WHERE _2 IS NULL"))
+ checkSparkAnswerAndOperator(
+ sql("SELECT array_compact(array(_2)) FROM t1 WHERE _2 IS NOT
NULL"))
+ checkSparkAnswerAndOperator(
+ sql("SELECT array_compact(array(_2, _3, null)) FROM t1 WHERE _2 IS
NOT NULL"))
+ }
+ }
+ }
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]