This is an automated email from the ASF dual-hosted git repository.
yuanzhou pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-gluten.git
The following commit(s) were added to refs/heads/main by this push:
new 52b717abda [VL] Add GlutenExtractPythonUDFsSuite (#9877)
52b717abda is described below
commit 52b717abdab284b70a042fef0293da4ff909404c
Author: Ankita Victor <[email protected]>
AuthorDate: Thu Jun 12 21:40:24 2025 +0530
[VL] Add GlutenExtractPythonUDFsSuite (#9877)
Add GlutenExtractPythonUDFsSuite for all Spark versions.
---
.../gluten/utils/velox/VeloxTestSettings.scala | 8 +
.../python/GlutenExtractPythonUDFsSuite.scala | 167 +++++++++++++++++++++
.../gluten/utils/velox/VeloxTestSettings.scala | 8 +
.../python/GlutenExtractPythonUDFsSuite.scala | 167 +++++++++++++++++++++
.../gluten/utils/velox/VeloxTestSettings.scala | 8 +
.../python/GlutenExtractPythonUDFsSuite.scala | 167 +++++++++++++++++++++
.../gluten/utils/velox/VeloxTestSettings.scala | 8 +
.../python/GlutenExtractPythonUDFsSuite.scala | 167 +++++++++++++++++++++
8 files changed, 700 insertions(+)
diff --git a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index c1d8b242ef..a277da3dba 100644
--- a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -823,6 +823,14 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("Nested Python UDF: push down deterministic FilterExec
predicates")
.exclude("Python UDF: no push down on non-deterministic")
.exclude("Python UDF: push down on deterministic predicates after the
first non-deterministic")
+ enableSuite[GlutenExtractPythonUDFsSuite]
+ // Replaced with test that check for native operations
+ .exclude("Python UDF should not break column pruning/filter pushdown --
Parquet V1")
+ .exclude("Chained Scalar Pandas UDFs should be combined to a single
physical node")
+ .exclude("Mixed Batched Python UDFs and Pandas UDF should be separate
physical node")
+ .exclude("Independent Batched Python UDFs and Scalar Pandas UDFs should be
combined separately")
+ .exclude("Dependent Batched Python UDFs and Scalar Pandas UDFs should not
be combined")
+ .exclude("Python UDF should not break column pruning/filter pushdown --
Parquet V2")
override def getSQLQueryTestSettings: SQLQueryTestSettings =
VeloxSQLQueryTestSettings
}
diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/python/GlutenExtractPythonUDFsSuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/python/GlutenExtractPythonUDFsSuite.scala
new file mode 100644
index 0000000000..1cd34bbf78
--- /dev/null
+++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/execution/python/GlutenExtractPythonUDFsSuite.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.python
+
+import org.apache.gluten.execution.{BatchScanExecTransformer, FileSourceScanExecTransformer}
+
+import org.apache.spark.sql.GlutenSQLTestsBaseTrait
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.internal.SQLConf
+
+class GlutenExtractPythonUDFsSuite extends ExtractPythonUDFsSuite with GlutenSQLTestsBaseTrait {
+
+ import testImplicits._
+
+ def collectBatchExec(plan: SparkPlan): Seq[BatchEvalPythonExec] = plan.collect {
+ case b: BatchEvalPythonExec => b
+ }
+
+ def collectColumnarArrowExec(plan: SparkPlan): Seq[EvalPythonExec] = plan.collect {
+ // To check for ColumnarArrowEvalPythonExec
+ case b: EvalPythonExec
+ if !b.isInstanceOf[ArrowEvalPythonExec] && !b.isInstanceOf[BatchEvalPythonExec] =>
+ b
+ }
+
+ testGluten("Chained Scalar Pandas UDFs should be combined to a single
physical node") {
+ val df = Seq(("Hello", 4)).toDF("a", "b")
+ val df2 = df
+ .withColumn("c", scalarPandasUDF(col("a")))
+ .withColumn("d", scalarPandasUDF(col("c")))
+ val arrowEvalNodes =
collectColumnarArrowExec(df2.queryExecution.executedPlan)
+ assert(arrowEvalNodes.size == 1)
+ }
+
+ testGluten("Mixed Batched Python UDFs and Pandas UDF should be separate
physical node") {
+ val df = Seq(("Hello", 4)).toDF("a", "b")
+ val df2 = df
+ .withColumn("c", batchedPythonUDF(col("a")))
+ .withColumn("d", scalarPandasUDF(col("b")))
+
+ val pythonEvalNodes = collectBatchExec(df2.queryExecution.executedPlan)
+ val arrowEvalNodes =
collectColumnarArrowExec(df2.queryExecution.executedPlan)
+ assert(pythonEvalNodes.size == 1)
+ assert(arrowEvalNodes.size == 1)
+ }
+
+ testGluten(
+ "Independent Batched Python UDFs and Scalar Pandas UDFs should be combined
separately") {
+ val df = Seq(("Hello", 4)).toDF("a", "b")
+ val df2 = df
+ .withColumn("c1", batchedPythonUDF(col("a")))
+ .withColumn("c2", batchedPythonUDF(col("c1")))
+ .withColumn("d1", scalarPandasUDF(col("a")))
+ .withColumn("d2", scalarPandasUDF(col("d1")))
+
+ val pythonEvalNodes = collectBatchExec(df2.queryExecution.executedPlan)
+ val arrowEvalNodes =
collectColumnarArrowExec(df2.queryExecution.executedPlan)
+ assert(pythonEvalNodes.size == 1)
+ assert(arrowEvalNodes.size == 1)
+ }
+
+ testGluten("Dependent Batched Python UDFs and Scalar Pandas UDFs should not
be combined") {
+ val df = Seq(("Hello", 4)).toDF("a", "b")
+ val df2 = df
+ .withColumn("c1", batchedPythonUDF(col("a")))
+ .withColumn("d1", scalarPandasUDF(col("c1")))
+ .withColumn("c2", batchedPythonUDF(col("d1")))
+ .withColumn("d2", scalarPandasUDF(col("c2")))
+
+ val pythonEvalNodes = collectBatchExec(df2.queryExecution.executedPlan)
+ val arrowEvalNodes =
collectColumnarArrowExec(df2.queryExecution.executedPlan)
+ assert(pythonEvalNodes.size == 2)
+ assert(arrowEvalNodes.size == 2)
+ }
+
+ testGluten("Python UDF should not break column pruning/filter pushdown --
Parquet V2") {
+ withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") {
+ withTempPath {
+ f =>
+ spark.range(10).select($"id".as("a"),
$"id".as("b")).write.parquet(f.getCanonicalPath)
+ val df = spark.read.parquet(f.getCanonicalPath)
+
+ withClue("column pruning") {
+ val query = df.filter(batchedPythonUDF($"a")).select($"a")
+
+ val pythonEvalNodes =
collectBatchExec(query.queryExecution.executedPlan)
+ assert(pythonEvalNodes.length == 1)
+
+ val scanNodes = query.queryExecution.executedPlan.collect {
+ case scan: BatchScanExecTransformer => scan
+ }
+ assert(scanNodes.length == 1)
+ assert(scanNodes.head.output.map(_.name) == Seq("a"))
+ }
+
+ withClue("filter pushdown") {
+ val query = df.filter($"a" > 1 && batchedPythonUDF($"a"))
+ val pythonEvalNodes =
collectBatchExec(query.queryExecution.executedPlan)
+ assert(pythonEvalNodes.length == 1)
+
+ val scanNodes = query.queryExecution.executedPlan.collect {
+ case scan: BatchScanExecTransformer => scan
+ }
+ assert(scanNodes.length == 1)
+ // $"a" is not null and $"a" > 1
+ val filters =
scanNodes.head.scan.asInstanceOf[ParquetScan].pushedFilters
+ assert(filters.length == 2)
+ assert(filters.flatMap(_.references).distinct === Array("a"))
+ }
+ }
+ }
+ }
+
+ testGluten("Python UDF should not break column pruning/filter pushdown --
Parquet V1") {
+ withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
+ withTempPath {
+ f =>
+ spark.range(10).select($"id".as("a"),
$"id".as("b")).write.parquet(f.getCanonicalPath)
+ val df = spark.read.parquet(f.getCanonicalPath)
+
+ withClue("column pruning") {
+ val query = df.filter(batchedPythonUDF($"a")).select($"a")
+
+ val pythonEvalNodes =
collectBatchExec(query.queryExecution.executedPlan)
+ assert(pythonEvalNodes.length == 1)
+
+ val scanNodes = query.queryExecution.executedPlan.collect {
+ case scan: FileSourceScanExecTransformer => scan
+ }
+ assert(scanNodes.length == 1)
+ assert(scanNodes.head.output.map(_.name) == Seq("a"))
+ }
+
+ withClue("filter pushdown") {
+ val query = df.filter($"a" > 1 && batchedPythonUDF($"a"))
+ val pythonEvalNodes =
collectBatchExec(query.queryExecution.executedPlan)
+ assert(pythonEvalNodes.length == 1)
+
+ val scanNodes = query.queryExecution.executedPlan.collect {
+ case scan: FileSourceScanExecTransformer => scan
+ }
+ assert(scanNodes.length == 1)
+ // $"a" is not null and $"a" > 1
+ assert(scanNodes.head.dataFilters.length == 2)
+ assert(
+
scanNodes.head.dataFilters.flatMap(_.references.map(_.name)).distinct ==
Seq("a"))
+ }
+ }
+ }
+ }
+}
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 5d3f772966..718682cfcc 100644
--- a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -872,6 +872,14 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("Nested Python UDF: push down deterministic FilterExec
predicates")
.exclude("Python UDF: no push down on non-deterministic")
.exclude("Python UDF: push down on deterministic predicates after the
first non-deterministic")
+ enableSuite[GlutenExtractPythonUDFsSuite]
+ // Replaced with test that check for native operations
+ .exclude("Python UDF should not break column pruning/filter pushdown --
Parquet V1")
+ .exclude("Chained Scalar Pandas UDFs should be combined to a single
physical node")
+ .exclude("Mixed Batched Python UDFs and Pandas UDF should be separate
physical node")
+ .exclude("Independent Batched Python UDFs and Scalar Pandas UDFs should be
combined separately")
+ .exclude("Dependent Batched Python UDFs and Scalar Pandas UDFs should not
be combined")
+ .exclude("Python UDF should not break column pruning/filter pushdown --
Parquet V2")
override def getSQLQueryTestSettings: SQLQueryTestSettings =
VeloxSQLQueryTestSettings
}
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/python/GlutenExtractPythonUDFsSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/python/GlutenExtractPythonUDFsSuite.scala
new file mode 100644
index 0000000000..1cd34bbf78
--- /dev/null
+++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/execution/python/GlutenExtractPythonUDFsSuite.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.python
+
+import org.apache.gluten.execution.{BatchScanExecTransformer, FileSourceScanExecTransformer}
+
+import org.apache.spark.sql.GlutenSQLTestsBaseTrait
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.internal.SQLConf
+
+class GlutenExtractPythonUDFsSuite extends ExtractPythonUDFsSuite with GlutenSQLTestsBaseTrait {
+
+ import testImplicits._
+
+ def collectBatchExec(plan: SparkPlan): Seq[BatchEvalPythonExec] = plan.collect {
+ case b: BatchEvalPythonExec => b
+ }
+
+ def collectColumnarArrowExec(plan: SparkPlan): Seq[EvalPythonExec] = plan.collect {
+ // To check for ColumnarArrowEvalPythonExec
+ case b: EvalPythonExec
+ if !b.isInstanceOf[ArrowEvalPythonExec] && !b.isInstanceOf[BatchEvalPythonExec] =>
+ b
+ }
+
+ testGluten("Chained Scalar Pandas UDFs should be combined to a single
physical node") {
+ val df = Seq(("Hello", 4)).toDF("a", "b")
+ val df2 = df
+ .withColumn("c", scalarPandasUDF(col("a")))
+ .withColumn("d", scalarPandasUDF(col("c")))
+ val arrowEvalNodes =
collectColumnarArrowExec(df2.queryExecution.executedPlan)
+ assert(arrowEvalNodes.size == 1)
+ }
+
+ testGluten("Mixed Batched Python UDFs and Pandas UDF should be separate
physical node") {
+ val df = Seq(("Hello", 4)).toDF("a", "b")
+ val df2 = df
+ .withColumn("c", batchedPythonUDF(col("a")))
+ .withColumn("d", scalarPandasUDF(col("b")))
+
+ val pythonEvalNodes = collectBatchExec(df2.queryExecution.executedPlan)
+ val arrowEvalNodes =
collectColumnarArrowExec(df2.queryExecution.executedPlan)
+ assert(pythonEvalNodes.size == 1)
+ assert(arrowEvalNodes.size == 1)
+ }
+
+ testGluten(
+ "Independent Batched Python UDFs and Scalar Pandas UDFs should be combined
separately") {
+ val df = Seq(("Hello", 4)).toDF("a", "b")
+ val df2 = df
+ .withColumn("c1", batchedPythonUDF(col("a")))
+ .withColumn("c2", batchedPythonUDF(col("c1")))
+ .withColumn("d1", scalarPandasUDF(col("a")))
+ .withColumn("d2", scalarPandasUDF(col("d1")))
+
+ val pythonEvalNodes = collectBatchExec(df2.queryExecution.executedPlan)
+ val arrowEvalNodes =
collectColumnarArrowExec(df2.queryExecution.executedPlan)
+ assert(pythonEvalNodes.size == 1)
+ assert(arrowEvalNodes.size == 1)
+ }
+
+ testGluten("Dependent Batched Python UDFs and Scalar Pandas UDFs should not
be combined") {
+ val df = Seq(("Hello", 4)).toDF("a", "b")
+ val df2 = df
+ .withColumn("c1", batchedPythonUDF(col("a")))
+ .withColumn("d1", scalarPandasUDF(col("c1")))
+ .withColumn("c2", batchedPythonUDF(col("d1")))
+ .withColumn("d2", scalarPandasUDF(col("c2")))
+
+ val pythonEvalNodes = collectBatchExec(df2.queryExecution.executedPlan)
+ val arrowEvalNodes =
collectColumnarArrowExec(df2.queryExecution.executedPlan)
+ assert(pythonEvalNodes.size == 2)
+ assert(arrowEvalNodes.size == 2)
+ }
+
+ testGluten("Python UDF should not break column pruning/filter pushdown --
Parquet V2") {
+ withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") {
+ withTempPath {
+ f =>
+ spark.range(10).select($"id".as("a"),
$"id".as("b")).write.parquet(f.getCanonicalPath)
+ val df = spark.read.parquet(f.getCanonicalPath)
+
+ withClue("column pruning") {
+ val query = df.filter(batchedPythonUDF($"a")).select($"a")
+
+ val pythonEvalNodes =
collectBatchExec(query.queryExecution.executedPlan)
+ assert(pythonEvalNodes.length == 1)
+
+ val scanNodes = query.queryExecution.executedPlan.collect {
+ case scan: BatchScanExecTransformer => scan
+ }
+ assert(scanNodes.length == 1)
+ assert(scanNodes.head.output.map(_.name) == Seq("a"))
+ }
+
+ withClue("filter pushdown") {
+ val query = df.filter($"a" > 1 && batchedPythonUDF($"a"))
+ val pythonEvalNodes =
collectBatchExec(query.queryExecution.executedPlan)
+ assert(pythonEvalNodes.length == 1)
+
+ val scanNodes = query.queryExecution.executedPlan.collect {
+ case scan: BatchScanExecTransformer => scan
+ }
+ assert(scanNodes.length == 1)
+ // $"a" is not null and $"a" > 1
+ val filters =
scanNodes.head.scan.asInstanceOf[ParquetScan].pushedFilters
+ assert(filters.length == 2)
+ assert(filters.flatMap(_.references).distinct === Array("a"))
+ }
+ }
+ }
+ }
+
+ testGluten("Python UDF should not break column pruning/filter pushdown --
Parquet V1") {
+ withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
+ withTempPath {
+ f =>
+ spark.range(10).select($"id".as("a"),
$"id".as("b")).write.parquet(f.getCanonicalPath)
+ val df = spark.read.parquet(f.getCanonicalPath)
+
+ withClue("column pruning") {
+ val query = df.filter(batchedPythonUDF($"a")).select($"a")
+
+ val pythonEvalNodes =
collectBatchExec(query.queryExecution.executedPlan)
+ assert(pythonEvalNodes.length == 1)
+
+ val scanNodes = query.queryExecution.executedPlan.collect {
+ case scan: FileSourceScanExecTransformer => scan
+ }
+ assert(scanNodes.length == 1)
+ assert(scanNodes.head.output.map(_.name) == Seq("a"))
+ }
+
+ withClue("filter pushdown") {
+ val query = df.filter($"a" > 1 && batchedPythonUDF($"a"))
+ val pythonEvalNodes =
collectBatchExec(query.queryExecution.executedPlan)
+ assert(pythonEvalNodes.length == 1)
+
+ val scanNodes = query.queryExecution.executedPlan.collect {
+ case scan: FileSourceScanExecTransformer => scan
+ }
+ assert(scanNodes.length == 1)
+ // $"a" is not null and $"a" > 1
+ assert(scanNodes.head.dataFilters.length == 2)
+ assert(
+
scanNodes.head.dataFilters.flatMap(_.references.map(_.name)).distinct ==
Seq("a"))
+ }
+ }
+ }
+ }
+}
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index e470e22ad3..77cbd2ad08 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -909,6 +909,14 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("Nested Python UDF: push down deterministic FilterExec
predicates")
.exclude("Python UDF: no push down on non-deterministic")
.exclude("Python UDF: push down on deterministic predicates after the
first non-deterministic")
+ enableSuite[GlutenExtractPythonUDFsSuite]
+ // Replaced with test that check for native operations
+ .exclude("Python UDF should not break column pruning/filter pushdown --
Parquet V1")
+ .exclude("Chained Scalar Pandas UDFs should be combined to a single
physical node")
+ .exclude("Mixed Batched Python UDFs and Pandas UDF should be separate
physical node")
+ .exclude("Independent Batched Python UDFs and Scalar Pandas UDFs should be
combined separately")
+ .exclude("Dependent Batched Python UDFs and Scalar Pandas UDFs should not
be combined")
+ .exclude("Python UDF should not break column pruning/filter pushdown --
Parquet V2")
override def getSQLQueryTestSettings: SQLQueryTestSettings =
VeloxSQLQueryTestSettings
}
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/python/GlutenExtractPythonUDFsSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/python/GlutenExtractPythonUDFsSuite.scala
new file mode 100644
index 0000000000..1cd34bbf78
--- /dev/null
+++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/execution/python/GlutenExtractPythonUDFsSuite.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.python
+
+import org.apache.gluten.execution.{BatchScanExecTransformer, FileSourceScanExecTransformer}
+
+import org.apache.spark.sql.GlutenSQLTestsBaseTrait
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.internal.SQLConf
+
+class GlutenExtractPythonUDFsSuite extends ExtractPythonUDFsSuite with GlutenSQLTestsBaseTrait {
+
+ import testImplicits._
+
+ def collectBatchExec(plan: SparkPlan): Seq[BatchEvalPythonExec] = plan.collect {
+ case b: BatchEvalPythonExec => b
+ }
+
+ def collectColumnarArrowExec(plan: SparkPlan): Seq[EvalPythonExec] = plan.collect {
+ // To check for ColumnarArrowEvalPythonExec
+ case b: EvalPythonExec
+ if !b.isInstanceOf[ArrowEvalPythonExec] && !b.isInstanceOf[BatchEvalPythonExec] =>
+ b
+ }
+
+ testGluten("Chained Scalar Pandas UDFs should be combined to a single
physical node") {
+ val df = Seq(("Hello", 4)).toDF("a", "b")
+ val df2 = df
+ .withColumn("c", scalarPandasUDF(col("a")))
+ .withColumn("d", scalarPandasUDF(col("c")))
+ val arrowEvalNodes =
collectColumnarArrowExec(df2.queryExecution.executedPlan)
+ assert(arrowEvalNodes.size == 1)
+ }
+
+ testGluten("Mixed Batched Python UDFs and Pandas UDF should be separate
physical node") {
+ val df = Seq(("Hello", 4)).toDF("a", "b")
+ val df2 = df
+ .withColumn("c", batchedPythonUDF(col("a")))
+ .withColumn("d", scalarPandasUDF(col("b")))
+
+ val pythonEvalNodes = collectBatchExec(df2.queryExecution.executedPlan)
+ val arrowEvalNodes =
collectColumnarArrowExec(df2.queryExecution.executedPlan)
+ assert(pythonEvalNodes.size == 1)
+ assert(arrowEvalNodes.size == 1)
+ }
+
+ testGluten(
+ "Independent Batched Python UDFs and Scalar Pandas UDFs should be combined
separately") {
+ val df = Seq(("Hello", 4)).toDF("a", "b")
+ val df2 = df
+ .withColumn("c1", batchedPythonUDF(col("a")))
+ .withColumn("c2", batchedPythonUDF(col("c1")))
+ .withColumn("d1", scalarPandasUDF(col("a")))
+ .withColumn("d2", scalarPandasUDF(col("d1")))
+
+ val pythonEvalNodes = collectBatchExec(df2.queryExecution.executedPlan)
+ val arrowEvalNodes =
collectColumnarArrowExec(df2.queryExecution.executedPlan)
+ assert(pythonEvalNodes.size == 1)
+ assert(arrowEvalNodes.size == 1)
+ }
+
+ testGluten("Dependent Batched Python UDFs and Scalar Pandas UDFs should not
be combined") {
+ val df = Seq(("Hello", 4)).toDF("a", "b")
+ val df2 = df
+ .withColumn("c1", batchedPythonUDF(col("a")))
+ .withColumn("d1", scalarPandasUDF(col("c1")))
+ .withColumn("c2", batchedPythonUDF(col("d1")))
+ .withColumn("d2", scalarPandasUDF(col("c2")))
+
+ val pythonEvalNodes = collectBatchExec(df2.queryExecution.executedPlan)
+ val arrowEvalNodes =
collectColumnarArrowExec(df2.queryExecution.executedPlan)
+ assert(pythonEvalNodes.size == 2)
+ assert(arrowEvalNodes.size == 2)
+ }
+
+ testGluten("Python UDF should not break column pruning/filter pushdown --
Parquet V2") {
+ withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") {
+ withTempPath {
+ f =>
+ spark.range(10).select($"id".as("a"),
$"id".as("b")).write.parquet(f.getCanonicalPath)
+ val df = spark.read.parquet(f.getCanonicalPath)
+
+ withClue("column pruning") {
+ val query = df.filter(batchedPythonUDF($"a")).select($"a")
+
+ val pythonEvalNodes =
collectBatchExec(query.queryExecution.executedPlan)
+ assert(pythonEvalNodes.length == 1)
+
+ val scanNodes = query.queryExecution.executedPlan.collect {
+ case scan: BatchScanExecTransformer => scan
+ }
+ assert(scanNodes.length == 1)
+ assert(scanNodes.head.output.map(_.name) == Seq("a"))
+ }
+
+ withClue("filter pushdown") {
+ val query = df.filter($"a" > 1 && batchedPythonUDF($"a"))
+ val pythonEvalNodes =
collectBatchExec(query.queryExecution.executedPlan)
+ assert(pythonEvalNodes.length == 1)
+
+ val scanNodes = query.queryExecution.executedPlan.collect {
+ case scan: BatchScanExecTransformer => scan
+ }
+ assert(scanNodes.length == 1)
+ // $"a" is not null and $"a" > 1
+ val filters =
scanNodes.head.scan.asInstanceOf[ParquetScan].pushedFilters
+ assert(filters.length == 2)
+ assert(filters.flatMap(_.references).distinct === Array("a"))
+ }
+ }
+ }
+ }
+
+ testGluten("Python UDF should not break column pruning/filter pushdown --
Parquet V1") {
+ withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
+ withTempPath {
+ f =>
+ spark.range(10).select($"id".as("a"),
$"id".as("b")).write.parquet(f.getCanonicalPath)
+ val df = spark.read.parquet(f.getCanonicalPath)
+
+ withClue("column pruning") {
+ val query = df.filter(batchedPythonUDF($"a")).select($"a")
+
+ val pythonEvalNodes =
collectBatchExec(query.queryExecution.executedPlan)
+ assert(pythonEvalNodes.length == 1)
+
+ val scanNodes = query.queryExecution.executedPlan.collect {
+ case scan: FileSourceScanExecTransformer => scan
+ }
+ assert(scanNodes.length == 1)
+ assert(scanNodes.head.output.map(_.name) == Seq("a"))
+ }
+
+ withClue("filter pushdown") {
+ val query = df.filter($"a" > 1 && batchedPythonUDF($"a"))
+ val pythonEvalNodes =
collectBatchExec(query.queryExecution.executedPlan)
+ assert(pythonEvalNodes.length == 1)
+
+ val scanNodes = query.queryExecution.executedPlan.collect {
+ case scan: FileSourceScanExecTransformer => scan
+ }
+ assert(scanNodes.length == 1)
+ // $"a" is not null and $"a" > 1
+ assert(scanNodes.head.dataFilters.length == 2)
+ assert(
+
scanNodes.head.dataFilters.flatMap(_.references.map(_.name)).distinct ==
Seq("a"))
+ }
+ }
+ }
+ }
+}
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 089c6f4c16..512d4b6bb9 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -949,6 +949,14 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("Nested Python UDF: push down deterministic FilterExec
predicates")
.exclude("Python UDF: no push down on non-deterministic")
.exclude("Python UDF: push down on deterministic predicates after the
first non-deterministic")
+ enableSuite[GlutenExtractPythonUDFsSuite]
+ // Replaced with test that check for native operations
+ .exclude("Python UDF should not break column pruning/filter pushdown --
Parquet V1")
+ .exclude("Chained Scalar Pandas UDFs should be combined to a single
physical node")
+ .exclude("Mixed Batched Python UDFs and Pandas UDF should be separate
physical node")
+ .exclude("Independent Batched Python UDFs and Scalar Pandas UDFs should be
combined separately")
+ .exclude("Dependent Batched Python UDFs and Scalar Pandas UDFs should not
be combined")
+ .exclude("Python UDF should not break column pruning/filter pushdown --
Parquet V2")
override def getSQLQueryTestSettings: SQLQueryTestSettings =
VeloxSQLQueryTestSettings
}
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/python/GlutenExtractPythonUDFsSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/python/GlutenExtractPythonUDFsSuite.scala
new file mode 100644
index 0000000000..1cd34bbf78
--- /dev/null
+++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/execution/python/GlutenExtractPythonUDFsSuite.scala
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.python
+
+import org.apache.gluten.execution.{BatchScanExecTransformer, FileSourceScanExecTransformer}
+
+import org.apache.spark.sql.GlutenSQLTestsBaseTrait
+import org.apache.spark.sql.execution.SparkPlan
+import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetScan
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.internal.SQLConf
+
+class GlutenExtractPythonUDFsSuite extends ExtractPythonUDFsSuite with GlutenSQLTestsBaseTrait {
+
+ import testImplicits._
+
+ def collectBatchExec(plan: SparkPlan): Seq[BatchEvalPythonExec] = plan.collect {
+ case b: BatchEvalPythonExec => b
+ }
+
+ def collectColumnarArrowExec(plan: SparkPlan): Seq[EvalPythonExec] = plan.collect {
+ // To check for ColumnarArrowEvalPythonExec
+ case b: EvalPythonExec
+ if !b.isInstanceOf[ArrowEvalPythonExec] && !b.isInstanceOf[BatchEvalPythonExec] =>
+ b
+ }
+
+ testGluten("Chained Scalar Pandas UDFs should be combined to a single
physical node") {
+ val df = Seq(("Hello", 4)).toDF("a", "b")
+ val df2 = df
+ .withColumn("c", scalarPandasUDF(col("a")))
+ .withColumn("d", scalarPandasUDF(col("c")))
+ val arrowEvalNodes =
collectColumnarArrowExec(df2.queryExecution.executedPlan)
+ assert(arrowEvalNodes.size == 1)
+ }
+
+ testGluten("Mixed Batched Python UDFs and Pandas UDF should be separate
physical node") {
+ val df = Seq(("Hello", 4)).toDF("a", "b")
+ val df2 = df
+ .withColumn("c", batchedPythonUDF(col("a")))
+ .withColumn("d", scalarPandasUDF(col("b")))
+
+ val pythonEvalNodes = collectBatchExec(df2.queryExecution.executedPlan)
+ val arrowEvalNodes =
collectColumnarArrowExec(df2.queryExecution.executedPlan)
+ assert(pythonEvalNodes.size == 1)
+ assert(arrowEvalNodes.size == 1)
+ }
+
+ testGluten(
+ "Independent Batched Python UDFs and Scalar Pandas UDFs should be combined
separately") {
+ val df = Seq(("Hello", 4)).toDF("a", "b")
+ val df2 = df
+ .withColumn("c1", batchedPythonUDF(col("a")))
+ .withColumn("c2", batchedPythonUDF(col("c1")))
+ .withColumn("d1", scalarPandasUDF(col("a")))
+ .withColumn("d2", scalarPandasUDF(col("d1")))
+
+ val pythonEvalNodes = collectBatchExec(df2.queryExecution.executedPlan)
+ val arrowEvalNodes =
collectColumnarArrowExec(df2.queryExecution.executedPlan)
+ assert(pythonEvalNodes.size == 1)
+ assert(arrowEvalNodes.size == 1)
+ }
+
+ testGluten("Dependent Batched Python UDFs and Scalar Pandas UDFs should not
be combined") {
+ val df = Seq(("Hello", 4)).toDF("a", "b")
+ val df2 = df
+ .withColumn("c1", batchedPythonUDF(col("a")))
+ .withColumn("d1", scalarPandasUDF(col("c1")))
+ .withColumn("c2", batchedPythonUDF(col("d1")))
+ .withColumn("d2", scalarPandasUDF(col("c2")))
+
+ val pythonEvalNodes = collectBatchExec(df2.queryExecution.executedPlan)
+ val arrowEvalNodes =
collectColumnarArrowExec(df2.queryExecution.executedPlan)
+ assert(pythonEvalNodes.size == 2)
+ assert(arrowEvalNodes.size == 2)
+ }
+
+ testGluten("Python UDF should not break column pruning/filter pushdown --
Parquet V2") {
+ withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "") {
+ withTempPath {
+ f =>
+ spark.range(10).select($"id".as("a"),
$"id".as("b")).write.parquet(f.getCanonicalPath)
+ val df = spark.read.parquet(f.getCanonicalPath)
+
+ withClue("column pruning") {
+ val query = df.filter(batchedPythonUDF($"a")).select($"a")
+
+ val pythonEvalNodes =
collectBatchExec(query.queryExecution.executedPlan)
+ assert(pythonEvalNodes.length == 1)
+
+ val scanNodes = query.queryExecution.executedPlan.collect {
+ case scan: BatchScanExecTransformer => scan
+ }
+ assert(scanNodes.length == 1)
+ assert(scanNodes.head.output.map(_.name) == Seq("a"))
+ }
+
+ withClue("filter pushdown") {
+ val query = df.filter($"a" > 1 && batchedPythonUDF($"a"))
+ val pythonEvalNodes =
collectBatchExec(query.queryExecution.executedPlan)
+ assert(pythonEvalNodes.length == 1)
+
+ val scanNodes = query.queryExecution.executedPlan.collect {
+ case scan: BatchScanExecTransformer => scan
+ }
+ assert(scanNodes.length == 1)
+ // $"a" is not null and $"a" > 1
+ val filters =
scanNodes.head.scan.asInstanceOf[ParquetScan].pushedFilters
+ assert(filters.length == 2)
+ assert(filters.flatMap(_.references).distinct === Array("a"))
+ }
+ }
+ }
+ }
+
+ testGluten("Python UDF should not break column pruning/filter pushdown --
Parquet V1") {
+ withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") {
+ withTempPath {
+ f =>
+ spark.range(10).select($"id".as("a"),
$"id".as("b")).write.parquet(f.getCanonicalPath)
+ val df = spark.read.parquet(f.getCanonicalPath)
+
+ withClue("column pruning") {
+ val query = df.filter(batchedPythonUDF($"a")).select($"a")
+
+ val pythonEvalNodes =
collectBatchExec(query.queryExecution.executedPlan)
+ assert(pythonEvalNodes.length == 1)
+
+ val scanNodes = query.queryExecution.executedPlan.collect {
+ case scan: FileSourceScanExecTransformer => scan
+ }
+ assert(scanNodes.length == 1)
+ assert(scanNodes.head.output.map(_.name) == Seq("a"))
+ }
+
+ withClue("filter pushdown") {
+ val query = df.filter($"a" > 1 && batchedPythonUDF($"a"))
+ val pythonEvalNodes =
collectBatchExec(query.queryExecution.executedPlan)
+ assert(pythonEvalNodes.length == 1)
+
+ val scanNodes = query.queryExecution.executedPlan.collect {
+ case scan: FileSourceScanExecTransformer => scan
+ }
+ assert(scanNodes.length == 1)
+ // $"a" is not null and $"a" > 1
+ assert(scanNodes.head.dataFilters.length == 2)
+ assert(
+
scanNodes.head.dataFilters.flatMap(_.references.map(_.name)).distinct ==
Seq("a"))
+ }
+ }
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]