[ https://issues.apache.org/jira/browse/SPARK-37290?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kaya Kupferschmidt updated SPARK-37290:
---------------------------------------
    Description: 
We are experiencing exponential growth of processing time for some DataFrame 
queries that include non-deterministic functions. I was able to create a small 
example program, which can be pasted into the Spark shell to reproduce the issue:
{code:scala}
val adselect_raw = spark.createDataFrame(Seq(("imp-1",1),("imp-2",2)))
    .cache()
val adselect = adselect_raw.select(
        expr("uuid()").alias("userUuid"),
        expr("_1").alias("impressionUuid"),
        expr("_1").alias("accessDateTime"),
        expr("_1").alias("publisher"),
        expr("_1").alias("site"),
        expr("_1").alias("placement"),
        expr("_1").alias("advertiser"),
        expr("_1").alias("campaign"),
        expr("_1").alias("lineItem"),
        expr("_1").alias("creative"),
        expr("_1").alias("browserLanguage"),
        expr("_1").alias("geoLocode"),
        expr("_1").alias("osFamily"),
        expr("_1").alias("osName"),
        expr("_1").alias("browserName"),
        expr("_1").alias("referrerDomain"),
        expr("_1").alias("placementIabCategory"),
        expr("_1").alias("placementDeviceGroup"),
        expr("_1").alias("placementDevice"),
        expr("_1").alias("placementVideoType"),
        expr("_1").alias("placementSection"),
        expr("_1").alias("placementPlayer"),
        expr("_1").alias("demandType"),
        expr("_1").alias("techCosts"),
        expr("_1").alias("mediaCosts"),
        expr("_1").alias("directSPrice"),
        expr("_1").alias("network"),
        expr("_1").alias("deviceSetting"),
        expr("_1").alias("placementGroup"),
        expr("_1").alias("postalCode"),
        expr("_1").alias("householdId")
    )

val adcount_raw = spark.createDataFrame(Seq(("imp-1", 1), ("imp-2", 2)))
val adcount = adcount_raw.select(
        expr("_1").alias("impressionUuid"),
        expr("_2").alias("accessDateTime")
    )

val result = adselect.join(adcount, Seq("impressionUuid"))
result.explain()
{code}
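To make the slowdown visible directly in the shell, the explain call can be 
wrapped in {{spark.time}} (a small measurement sketch, not part of the original 
report); comparing a run with the {{uuid()}} column against one without it shows 
the difference immediately:
{code:scala}
// Sketch: spark.time prints the wall-clock time of the wrapped expression, so the
// planning cost of the query above can be measured directly in the Spark shell.
spark.time(result.explain())
{code}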
Further reducing the program (for example, by removing the join or the cache) 
made the problem disappear.

The problem occurs at planning time, and debugging led me to the function 
{{UnaryNode.getAllValidConstraints}}, where the local variable 
{{allConstraints}} grew to an apparently exponential number of entries for the 
non-deterministic function {{uuid()}} from the code example above. Every time 
another column of the large select is processed in the {{foreach}} loop of 
{{UnaryNode.getAllValidConstraints}}, the number of entries for the {{uuid()}} 
column in the ExpressionSet appears to double:
{code:scala}
trait UnaryNode extends LogicalPlan with UnaryLike[LogicalPlan] {
  override def getAllValidConstraints(projectList: Seq[NamedExpression]): ExpressionSet = {
    var allConstraints = child.constraints
    projectList.foreach {
      case a @ Alias(l: Literal, _) =>
        allConstraints += EqualNullSafe(a.toAttribute, l)
      case a @ Alias(e, _) =>
        // KK: Since the ExpressionSet handles each non-deterministic expression as a
        // separate entry, every "uuid()" entry in allConstraints is re-added over and
        // over again in each iteration, thereby doubling the list every time.
        allConstraints ++= allConstraints.map(_ transform {
          case expr: Expression if expr.semanticEquals(e) =>
            a.toAttribute
        })
        allConstraints += EqualNullSafe(e, a.toAttribute)
      case _ => // Don't change.
    }

    allConstraints
  }
}
{code}
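The mechanism can be illustrated outside of Spark with a simplified, 
self-contained model (an illustration only, not Spark's actual ExpressionSet): 
deterministic entries are de-duplicated, while non-deterministic entries are 
appended unconditionally, so re-adding the mapped set on every loop iteration 
doubles them:
{code:scala}
import scala.collection.mutable

// Simplified stand-in for an expression set: deterministic constraints are
// de-duplicated by their text, non-deterministic constraints are appended blindly.
final case class Constraint(text: String, deterministic: Boolean)

final class NaiveConstraintSet {
  private val deduped = mutable.Set.empty[String]
  private val appended = mutable.ArrayBuffer.empty[Constraint]

  def add(c: Constraint): Unit =
    if (c.deterministic) deduped += c.text else appended += c

  def addAll(cs: Iterable[Constraint]): Unit = cs.foreach(add)

  def all: Seq[Constraint] =
    deduped.toSeq.map(t => Constraint(t, deterministic = true)) ++ appended.toSeq

  def size: Int = deduped.size + appended.size
}

val constraints = new NaiveConstraintSet
constraints.add(Constraint("isnotnull(_1)", deterministic = true))
constraints.add(Constraint("userUuid <=> uuid()", deterministic = false))

// One iteration per projected column processed after the uuid() alias, mirroring
// "allConstraints ++= allConstraints.map(...)" where the transform finds nothing
// to rewrite and hands back each constraint unchanged.
for (i <- 1 to 10) {
  constraints.addAll(constraints.all)
  println(s"after column $i: ${constraints.size} constraints")
}
// The deterministic constraint stays de-duplicated, but the number of uuid()
// entries doubles every iteration (2, 4, 8, ...), i.e. 2^n growth with the number
// of columns that follow uuid() in the projection.
{code}
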
As a workaround, we moved the {{uuid()}} column to the end of the select list in 
our code, which solved the issue: by the time the {{uuid()}} column is reached, 
all other columns have already been processed in the {{foreach}} loop, so its 
constraint entry is never duplicated.
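
For illustration, an abbreviated sketch of that workaround (a hypothetical column 
subset, not our actual job): all deterministic projections come first and the 
non-deterministic {{uuid()}} alias is placed last, so the doubling branch of the 
loop never runs after it:
{code:scala}
// Hypothetical, abbreviated version of the workaround: deterministic columns first,
// the non-deterministic uuid() column last. By the time uuid() is processed in
// getAllValidConstraints, no further projections follow, so its constraint entry
// is never duplicated.
val adselect_workaround = adselect_raw.select(
    expr("_1").alias("impressionUuid"),
    expr("_1").alias("publisher"),
    expr("_1").alias("site"),
    // ... the remaining deterministic columns of the real query go here ...
    expr("uuid()").alias("userUuid")
)
adselect_workaround.join(adcount, Seq("impressionUuid")).explain()
{code}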

> Exponential planning time in case of non-deterministic function
> ---------------------------------------------------------------
>
>                 Key: SPARK-37290
>                 URL: https://issues.apache.org/jira/browse/SPARK-37290
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.1.2
>            Reporter: Kaya Kupferschmidt
>            Priority: Major
>