This is an automated email from the ASF dual-hosted git repository.
agrove pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/datafusion-comet.git
The following commit(s) were added to refs/heads/main by this push:
new 07274e800 Support arrays_overlap function (#1312)
07274e800 is described below
commit 07274e8005d9042441bd27378bc381bf95dad4af
Author: Eren Avsarogullari <[email protected]>
AuthorDate: Tue Jan 28 15:53:10 2025 -0800
Support arrays_overlap function (#1312)
---
native/core/src/execution/planner.rs | 16 ++++++++++++++++
native/proto/src/proto/expr.proto | 1 +
.../org/apache/comet/serde/QueryPlanSerde.scala | 16 ++++++++++++++++
.../org/apache/comet/CometExpressionSuite.scala | 21 +++++++++++++++++++++
4 files changed, 54 insertions(+)
diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs
index 95926bfee..71ec5f334 100644
--- a/native/core/src/execution/planner.rs
+++ b/native/core/src/execution/planner.rs
@@ -65,6 +65,7 @@ use datafusion::{
prelude::SessionContext,
};
use datafusion_comet_spark_expr::{create_comet_physical_fun, create_negate_expr};
+use datafusion_functions_nested::array_has::array_has_any_udf;
use datafusion_functions_nested::concat::ArrayAppend;
use datafusion_functions_nested::remove::array_remove_all_udf;
use datafusion_functions_nested::set_ops::array_intersect_udf;
@@ -818,6 +819,21 @@ impl PhysicalPlanner {
));
Ok(array_join_expr)
}
+            ExprStruct::ArraysOverlap(expr) => {
+                let left_array_expr =
+                    self.create_expr(expr.left.as_ref().unwrap(), Arc::clone(&input_schema))?;
+                let right_array_expr =
+                    self.create_expr(expr.right.as_ref().unwrap(), Arc::clone(&input_schema))?;
+                let args = vec![Arc::clone(&left_array_expr), right_array_expr];
+                let datafusion_array_has_any = array_has_any_udf();
+                let array_has_any_expr = Arc::new(ScalarFunctionExpr::new(
+                    "array_has_any",
+                    datafusion_array_has_any,
+                    args,
+                    DataType::Boolean,
+                ));
+                Ok(array_has_any_expr)
+            }
expr => Err(ExecutionError::GeneralError(format!(
"Not implemented: {:?}",
expr
diff --git a/native/proto/src/proto/expr.proto b/native/proto/src/proto/expr.proto
index 83d6da7cb..fd928fd8a 100644
--- a/native/proto/src/proto/expr.proto
+++ b/native/proto/src/proto/expr.proto
@@ -88,6 +88,7 @@ message Expr {
BinaryExpr array_remove = 61;
BinaryExpr array_intersect = 62;
ArrayJoin array_join = 63;
+ BinaryExpr arrays_overlap = 64;
}
}
diff --git a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
index c3d7ac749..cb4fffc1a 100644
--- a/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
+++ b/spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -2428,6 +2428,22 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
withInfo(expr, "unsupported arguments for ArrayJoin", exprs: _*)
None
}
+      case ArraysOverlap(leftArrayExpr, rightArrayExpr) =>
+        if (CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.get()) {
+          createBinaryExpr(
+            expr,
+            leftArrayExpr,
+            rightArrayExpr,
+            inputs,
+            binding,
+            (builder, binaryExpr) => builder.setArraysOverlap(binaryExpr))
+        } else {
+          withInfo(
+            expr,
+            s"$expr is not supported yet. To enable all incompatible casts, set " +
+              s"${CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key}=true")
+          None
+        }
case _ =>
        withInfo(expr, s"${expr.prettyName} is not supported", expr.children: _*)
None
diff --git a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
index 99cf4bad4..f82101b3a 100644
--- a/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
+++ b/spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -2701,4 +2701,25 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
}
}
+
+  test("arrays_overlap") {
+    withSQLConf(CometConf.COMET_CAST_ALLOW_INCOMPATIBLE.key -> "true") {
+      Seq(true, false).foreach { dictionaryEnabled =>
+        withTempDir { dir =>
+          val path = new Path(dir.toURI.toString, "test.parquet")
+          makeParquetFileAllTypes(path, dictionaryEnabled, 10000)
+          spark.read.parquet(path.toString).createOrReplaceTempView("t1")
+          checkSparkAnswerAndOperator(sql(
+            "SELECT arrays_overlap(array(_2, _3, _4), array(_3, _4)) from t1 where _2 is not null"))
+          checkSparkAnswerAndOperator(sql(
+            "SELECT arrays_overlap(array('a', null, cast(_1 as string)), array('b', cast(_1 as string), cast(_2 as string))) from t1 where _1 is not null"))
+          checkSparkAnswerAndOperator(sql(
+            "SELECT arrays_overlap(array('a', null), array('b', null)) from t1 where _1 is not null"))
+          checkSparkAnswerAndOperator(spark.sql(
+            "SELECT arrays_overlap((CASE WHEN _2 =_3 THEN array(_6, _7) END), array(_6, _7)) FROM t1"));
+        }
+      }
+    }
+  }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]