This is an automated email from the ASF dual-hosted git repository.

kazuyukitanimura pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git


The following commit(s) were added to refs/heads/main by this push:
     new 26b406dc4 Feat: support array_compact function (#1321)
26b406dc4 is described below

commit 26b406dc4efa0ed037bb07a7862563c2ff9cdba8
Author: Kazantsev Maksim <[email protected]>
AuthorDate: Thu Mar 6 04:27:28 2025 +0400

    Feat: support array_compact function (#1321)
    
    ## Which issue does this PR close?
    
    Related to Epic: https://github.com/apache/datafusion-comet/issues/1042
    array_compact: SELECT array_compact(array(1, 2, 3, null)) => array(1, 2, 3)
    DataFusion's array_remove_all, called with a NULL element, has the same
    behavior as Spark's array_compact function.
    Spark: https://docs.databricks.com/en/sql/language-manual/functions/array_compact.html
    DataFusion: https://datafusion.apache.org/user-guide/sql/scalar_functions.html#array-remove-all
    
    ## Rationale for this change
    
    Defined under Epic: https://github.com/apache/datafusion-comet/issues/1042
    
    ## What changes are included in this PR?
    
    QueryPlanSerde.scala: Spark plans array_compact(arr) as
    filter(arr, x -> isnotnull(x)); a pattern-matching case for that ArrayFilter
    shape has been added and delegates to CometArrayCompact.
    arrays.scala: CometArrayCompact serializes the expression as DataFusion's
    array_remove_all scalar function, called with the array and a typed null literal.
    CometArrayExpressionSuite.scala: A new UT has been added for the array_compact
    function.
    
    ## How are these changes tested?
    
    A new UT has been added.
---
 .../org/apache/comet/serde/QueryPlanSerde.scala    |  2 ++
 .../main/scala/org/apache/comet/serde/arrays.scala | 31 +++++++++++++++++++---
 .../apache/comet/CometArrayExpressionSuite.scala   | 20 ++++++++++++++
 3 files changed, 50 insertions(+), 3 deletions(-)

diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala 
b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index 381a75b65..8757105ec 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -1992,6 +1992,12 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
       case _: ArrayIntersect => convert(CometArrayIntersect)
       case _: ArrayJoin => convert(CometArrayJoin)
       case _: ArraysOverlap => convert(CometArraysOverlap)
+      // array_compact(arr) reaches this point as filter(arr, x -> isnotnull(x)). Require the
+      // predicate to test the lambda's own single argument: a body such as
+      // `isnotnull(other_col)` also has an IsNotNull root but is not array_compact.
+      case ArrayFilter(_, LambdaFunction(IsNotNull(v: NamedLambdaVariable), Seq(p), _))
+          if v.exprId == p.exprId =>
+        convert(CometArrayCompact)
       case _ =>
         withInfo(expr, s"${expr.prettyName} is not supported", expr.children: _*)
         None
diff --git a/spark/src/main/scala/org/apache/comet/serde/arrays.scala 
b/spark/src/main/scala/org/apache/comet/serde/arrays.scala
index db1679f22..1ada81b97 100644
--- a/spark/src/main/scala/org/apache/comet/serde/arrays.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/arrays.scala
@@ -19,11 +19,11 @@
 
 package org.apache.comet.serde
 
-import org.apache.spark.sql.catalyst.expressions.{ArrayJoin, ArrayRemove, 
Attribute, Expression}
-import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, 
DecimalType, StructType}
+import org.apache.spark.sql.catalyst.expressions.{ArrayJoin, ArrayRemove, 
Attribute, Expression, Literal}
+import org.apache.spark.sql.types._
 
 import org.apache.comet.CometSparkSessionExtensions.withInfo
-import org.apache.comet.serde.QueryPlanSerde.{createBinaryExpr, exprToProto}
+import org.apache.comet.serde.QueryPlanSerde.{createBinaryExpr, exprToProto, 
scalarExprToProtoWithReturnType}
 import org.apache.comet.shims.CometExprShim
 
 object CometArrayRemove extends CometExpressionSerde with CometExprShim {
@@ -126,6 +126,43 @@ object CometArraysOverlap extends CometExpressionSerde with IncompatExpr {
   }
 }
 
+/**
+ * Serde for Spark's `array_compact`, which drops every null element of an array.
+ *
+ * Spark plans `array_compact(arr)` as `filter(arr, x -> isnotnull(x))`, so `expr` is that
+ * `ArrayFilter` and its first child is the input array. The call is serialized as DataFusion's
+ * `array_remove_all(arr, null)` scalar function.
+ */
+object CometArrayCompact extends CometExpressionSerde with IncompatExpr {
+  override def convert(
+      expr: Expression,
+      inputs: Seq[Attribute],
+      binding: Boolean): Option[ExprOuterClass.Expr] = {
+    val child = expr.children.head
+    child.dataType match {
+      case arrayType @ ArrayType(elementType, _) =>
+        val arrayExprProto = exprToProto(child, inputs, binding)
+        // Typed null literal; array_remove_all is relied on to remove every element matching
+        // it, i.e. every null element.
+        val nullLiteralProto = exprToProto(Literal(null, elementType), Seq.empty)
+        // ArrayFilter's result type is its input type, so reuse it (keeping `containsNull`)
+        // instead of building a new ArrayType that always declares nullable elements.
+        val arrayCompactScalarExpr = scalarExprToProtoWithReturnType(
+          "array_remove_all",
+          arrayType,
+          arrayExprProto,
+          nullLiteralProto)
+        if (arrayCompactScalarExpr.isEmpty) {
+          withInfo(expr, "unsupported arguments for ArrayCompact", expr.children: _*)
+        }
+        arrayCompactScalarExpr
+      case other =>
+        withInfo(expr, s"array_compact expects an array input but got $other", child)
+        None
+    }
+  }
+}
+
 object CometArrayJoin extends CometExpressionSerde with IncompatExpr {
   override def convert(
       expr: Expression,
diff --git 
a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala 
b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
index df1fccb69..8850f2133 100644
--- a/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometArrayExpressionSuite.scala
@@ -292,4 +292,26 @@ class CometArrayExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelp
     }
   }
 
+  test("array_compact") {
+    // array_compact is only available from Spark 3.4 onwards.
+    assume(isSpark34Plus)
+    withSQLConf(CometConf.COMET_EXPR_ALLOW_INCOMPATIBLE.key -> "true") {
+      Seq(true, false).foreach { dictionaryEnabled =>
+        withTempDir { dir =>
+          val path = new Path(dir.toURI.toString, "test.parquet")
+          makeParquetFileAllTypes(path, dictionaryEnabled = dictionaryEnabled, n = 10000)
+          spark.read.parquet(path.toString).createOrReplaceTempView("t1")
+
+          // All-null, null-free and mixed input arrays; each result is compared with Spark's
+          // and the plan is checked to run with Comet operators.
+          val queries = Seq(
+            "SELECT array_compact(array(_2)) FROM t1 WHERE _2 IS NULL",
+            "SELECT array_compact(array(_2)) FROM t1 WHERE _2 IS NOT NULL",
+            "SELECT array_compact(array(_2, _3, null)) FROM t1 WHERE _2 IS NOT NULL")
+          queries.foreach(query => checkSparkAnswerAndOperator(sql(query)))
+        }
+      }
+    }
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to