[ https://issues.apache.org/jira/browse/SPARK-37290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kaya Kupferschmidt updated SPARK-37290:
---------------------------------------
    Description: 
We are experiencing exponential growth of query planning time for some DataFrame queries that include non-deterministic functions. The following small example program can be pasted into the Spark shell to reproduce the issue:
{code:scala}
val adselect_raw = spark.createDataFrame(Seq(("imp-1", 1), ("imp-2", 2)))
    .cache()
val adselect = adselect_raw.select(
        expr("uuid()").alias("userUuid"),
        expr("_1").alias("impressionUuid"),
        expr("_1").alias("accessDateTime"),
        expr("_1").alias("publisher"),
        expr("_1").alias("site"),
        expr("_1").alias("placement"),
        expr("_1").alias("advertiser"),
        expr("_1").alias("campaign"),
        expr("_1").alias("lineItem"),
        expr("_1").alias("creative"),
        expr("_1").alias("browserLanguage"),
        expr("_1").alias("geoLocode"),
        expr("_1").alias("osFamily"),
        expr("_1").alias("osName"),
        expr("_1").alias("browserName"),
        expr("_1").alias("referrerDomain"),
        expr("_1").alias("placementIabCategory"),
        expr("_1").alias("placementDeviceGroup"),
        expr("_1").alias("placementDevice"),
        expr("_1").alias("placementVideoType"),
        expr("_1").alias("placementSection"),
        expr("_1").alias("placementPlayer"),
        expr("_1").alias("demandType"),
        expr("_1").alias("techCosts"),
        expr("_1").alias("mediaCosts"),
        expr("_1").alias("directSPrice"),
        expr("_1").alias("network"),
        expr("_1").alias("deviceSetting"),
        expr("_1").alias("placementGroup"),
        expr("_1").alias("postalCode"),
        expr("_1").alias("householdId")
    )

val adcount_raw = spark.createDataFrame(Seq(("imp-1", 1), ("imp-2", 2)))
val adcount = adcount_raw.select(
        expr("_1").alias("impressionUuid"),
        expr("_2").alias("accessDateTime")
    )

val result = adselect.join(adcount, Seq("impressionUuid"))
result.explain()
{code}
Reducing the program any further (for example, by removing the join or the cache) no longer reproduced the problem.

The problem occurs at planning time. Debugging led me to {{UnaryNode.getAllValidConstraints}}, where the ExpressionSet {{allConstraints}} grew with an apparently exponential number of entries for the non-deterministic function {{uuid()}} in the code example above. Every time another column from the large select is processed in the {{foreach}} loop of {{UnaryNode.getAllValidConstraints}}, the number of entries for the {{uuid()}} column in the ExpressionSet appears to double.
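
For illustration, the growth can be observed by timing {{explain()}} while increasing the number of duplicated columns. This is only a rough sketch (the helper {{timePlanning}} and the generated column names are made up for this illustration, not taken from our code):
{code:scala}
import org.apache.spark.sql.functions.expr

// Rough sketch of a measurement: build the same query shape with uuid() first,
// followed by n copies of the "_1" column, and time how long explain() (i.e. planning) takes.
def timePlanning(n: Int): Long = {
  val left_raw = spark.createDataFrame(Seq(("imp-1", 1), ("imp-2", 2))).cache()
  val cols = expr("uuid()").alias("userUuid") +:
    (1 to n).map(i => expr("_1").alias(s"col_$i"))
  val left = left_raw.select(cols: _*)
  val right = spark.createDataFrame(Seq(("imp-1", 1), ("imp-2", 2)))
    .select(expr("_1").alias("col_1"), expr("_2").alias("accessDateTime"))

  val start = System.nanoTime()
  left.join(right, Seq("col_1")).explain()   // planning happens here
  (System.nanoTime() - start) / 1000000      // elapsed milliseconds
}

// On affected versions the measured time grows steeply with the number of columns.
Seq(5, 10, 15, 20).foreach(n => println(s"$n columns: ${timePlanning(n)} ms"))
{code}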

As a workaround, we moved the {{uuid()}} column to the end of the select list, which solved the issue, since all other columns have already been processed by the time the {{foreach}} loop reaches it.
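
For clarity, the reordered select looks roughly like this (abbreviated; only the position of the {{uuid()}} column changes compared to the example above):
{code:scala}
val adselect = adselect_raw.select(
        expr("_1").alias("impressionUuid"),
        expr("_1").alias("accessDateTime"),
        // ... the remaining "_1" columns from the example above ...
        expr("_1").alias("householdId"),
        // uuid() comes last, so no further columns are processed after it
        // in the foreach loop of UnaryNode.getAllValidConstraints
        expr("uuid()").alias("userUuid")
    )
{code}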


> Exponential planning time in case of non-deterministic function
> ---------------------------------------------------------------
>
>                 Key: SPARK-37290
>                 URL: https://issues.apache.org/jira/browse/SPARK-37290
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.1.2
>            Reporter: Kaya Kupferschmidt
>            Priority: Major
>


