[ https://issues.apache.org/jira/browse/SPARK-42115?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon updated SPARK-42115:
---------------------------------
Description:
{code}
from pyspark.sql.functions import udf

# Write a small Parquet dataset to reproduce against.
spark.range(10).write.mode("overwrite").parquet("/tmp/abc")

# Identity UDF; it exists only to put a BatchEvalPython node into the plan.
@udf(returnType='string')
def my_udf(arg):
    return arg

df = spark.read.parquet("/tmp/abc")
df.limit(10).withColumn("prediction", my_udf(df["id"])).explain()
{code}
Take the code above as an example: Python UDFs are executed asynchronously, so pushing the limit below the UDF evaluation benefits performance. In Spark 3.4.0 the limit is no longer pushed down:
{code}
== Physical Plan ==
CollectLimit 10
+- *(2) Project [id#3L, pythonUDF0#10 AS prediction#6]
   +- BatchEvalPython [my_udf(id#3L)#5], [pythonUDF0#10]
      +- *(1) ColumnarToRow
         +- FileScan parquet [id#3L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/abc], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
{code}
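Under this plan, BatchEvalPython sits below CollectLimit, so rows are sent to the Python worker before the limit discards them. As a minimal sketch (not part of the original report) of how to observe this, one can count UDF invocations with an accumulator; the larger dataset, the path /tmp/abc_large, and the names udf_calls/counting_udf are illustrative:
{code}
from pyspark.sql.functions import udf

# Use a larger, multi-partition input so the effect is visible; with only the
# 10 rows written above, both plans evaluate the UDF 10 times anyway.
spark.range(100000).repartition(8).write.mode("overwrite").parquet("/tmp/abc_large")

# Accumulator counting how many rows are handed to the Python worker.
udf_calls = spark.sparkContext.accumulator(0)

@udf(returnType='string')
def counting_udf(arg):
    udf_calls.add(1)  # one increment per UDF invocation
    return str(arg)

df = spark.read.parquet("/tmp/abc_large")
df.limit(10).withColumn("prediction", counting_udf(df["id"])).collect()

# With the limit planned below BatchEvalPython (as in 3.3.1), this should stay
# near 10; with the limit above it, whole batches of rows per partition can
# reach the worker first, so the count can be far larger.
print(udf_calls.value)
{code}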
This is a regression from Spark 3.3.1, where the limit was planned below BatchEvalPython:
{code}
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [id#3L, pythonUDF0#10 AS prediction#6]
   +- BatchEvalPython [my_udf(id#3L)#5], [pythonUDF0#10]
      +- GlobalLimit 10
         +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=30]
            +- LocalLimit 10
               +- FileScan parquet [id#3L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/abc], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint>
{code}
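Until the pushdown is restored, one possible workaround (a sketch, not taken from this report) is to materialize the limited rows before applying the UDF, so the UDF only ever sees those rows:
{code}
df = spark.read.parquet("/tmp/abc")

# localCheckpoint() is eager by default: it computes the 10-row limit and
# truncates the plan, so the UDF below is applied to those 10 rows only.
limited = df.limit(10).localCheckpoint()
limited.withColumn("prediction", my_udf(limited["id"])).explain()
{code}
This trades extra materialization (and executor storage for the checkpointed rows) for not running the UDF over the full input.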
> Push down limit through Python UDFs
> -----------------------------------
>
> Key: SPARK-42115
> URL: https://issues.apache.org/jira/browse/SPARK-42115
> Project: Spark
> Issue Type: Bug
> Components: PySpark, SQL
> Affects Versions: 3.4.0
> Reporter: Hyukjin Kwon
> Priority: Major
>