[ https://issues.apache.org/jira/browse/SPARK-34644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gavrilescu Laurentiu updated SPARK-34644:
-----------------------------------------
    Description: 
*Applying a UDF that returns an array, followed by explode, calls the UDF multiple times.*
 Using *persist* after applying the UDF mitigates the problem.
 Consider the following code to reproduce it:
{code:java}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object Bug {

  def main(args: Array[String]) {
    val sparkSession: SparkSession = SparkSession.builder.master("local[4]").getOrCreate()

    // counts actual executions of the expensive function
    val invocations = sparkSession.sparkContext.longAccumulator("invocations")
    def showTiming[T](body: => T): T = {
      val t0 = System.nanoTime()
      invocations.reset()
      val res = body
      val t1 = System.nanoTime()
      println(s"invocations=${invocations.value}, time=${(t1 - t0) / 1e9}")
      res
    }

    // simulates an expensive computation: ~100 ms per call
    def expensive(n: Int) = {
      Thread.sleep(100)
      invocations.add(1)
      1
    }
    val expensiveUdf = udf((x: Int) => (1 to 10) map { _ => expensive(x) })

    val df = sparkSession.range(10).toDF()

    // first: UDF followed directly by explode
    showTiming(df
      .select(explode(expensiveUdf(col("id"))))
      .select(sum(col("col")))
      .show())

    // second: persist the UDF output before exploding it
    showTiming(df.select(expensiveUdf(col("id")).as("values"))
      .persist()
      .select(explode(col("values")))
      .select(sum("col"))
      .show())
  }
}
{code}
Output:
{code:java}
first:  invocations=300, time=11.342076635
second: invocations=100, time=3.351682967{code}
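To see where the extra invocations come from, one can inspect the physical plan (a minimal sketch, reusing the df and expensiveUdf defined above; the comments describe the expected shape of the plan, not verified output):
{code:java}
// Sketch: print the physical plan of the problematic query.
// If the UDF call shows up inside the Generate node produced for explode,
// rather than in a separate Project, it can be re-evaluated per generated row.
df.select(explode(expensiveUdf(col("id"))))
  .select(sum(col("col")))
  .explain()
{code}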
This behavior is undesirable and can return wrong results if mutable state is managed inside the UDF.
 Imagine the following scenario:

1. you have a dataframe with some string columns
 2. you have an expensive function that computes a score for a given string input
 3. you want to get all the distinct values from all the columns together with their scores - an executor-level cache holds the scores already computed, to minimize executions of the expensive function

Consider the following code to reproduce it:
{code:java}
import scala.collection.concurrent.TrieMap
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions._

case class RowWithStrings(c1: String, c2: String, c3: String)
case class ValueScore(value: String, score: Double)

object Bug {

  val columns: List[String] = List("c1", "c2", "c3")

  def score(input: String): Double = {
    // insert expensive function here
    input.toDouble
  }

  def main(args: Array[String]) {
    lazy val sparkSession: SparkSession = {
      val sparkSession = SparkSession.builder.master("local[4]")
        .getOrCreate()

      sparkSession
    }

    // executor-level cache over the expensive operation
    val cache: TrieMap[String, Double] = TrieMap[String, Double]()

    // get scores for all columns in the row; values already present in the
    // cache are skipped, so a repeated invocation returns an empty buffer
    val body = (row: Row) => {
      val arr = ArrayBuffer[ValueScore]()
      columns foreach {
        column =>
          val value = row.getAs[String](column)
          if (!cache.contains(value)) {
            val computedScore = score(value)
            arr += ValueScore(value, computedScore)
            cache(value) = computedScore
          }
      }
      arr
    }

    val basicUdf = udf(body)

    val values = (1 to 5) map {
      idx =>
        // repeated values
        RowWithStrings(idx.toString, idx.toString, idx.toString)
    }

    import sparkSession.implicits._
    val df = values.toDF("c1", "c2", "c3").persist()
    val allCols = df.columns.map(col)

    df.select(basicUdf(struct(allCols: _*)).as("valuesScore"))
      .select(explode(col("valuesScore")))
      .distinct()
      .show()
  }
}
{code}
This shows:
{code:java}
+---+
|col|
+---+
+---+
{code}
The result is empty because the UDF ends up being invoked more than once per row: on a later invocation the values are already in the cache, so the UDF returns an empty buffer and explode produces no rows. When adding *persist* before the explode, the result is correct:
{code:java}
df.select(basicUdf(struct(allCols: _*)).as("valuesScore"))
  .persist()
  .select(explode(col("valuesScore")))
  .distinct()
  .show()
{code}
Output:
{code:java}
+--------+
|     col|
+--------+
|{2, 2.0}|
|{4, 4.0}|
|{3, 3.0}|
|{5, 5.0}|
|{1, 1.0}|
+--------+
{code}
Persisting materializes the UDF output before the explode, so the UDF runs only once per row. This issue is not reproducible on version 3.0.2.
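As a side note, not part of the original report and untested against this exact reproduction: marking the UDF as non-deterministic via asNondeterministic() is a commonly suggested way to keep the optimizer from duplicating or collapsing UDF calls, so it may serve as an alternative workaround to *persist*:
{code:java}
// Hypothetical workaround sketch: a non-deterministic UDF should not be
// duplicated by the optimizer, so it may be evaluated only once per row.
// asNondeterministic() is part of the public UserDefinedFunction API.
val expensiveUdfOnce = udf((x: Int) => (1 to 10) map { _ => expensive(x) })
  .asNondeterministic()

df.select(explode(expensiveUdfOnce(col("id"))))
  .select(sum(col("col")))
  .show()
{code}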



> UDF returning array followed by explode calls the UDF multiple times and 
> could return wrong results
> ---------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-34644
>                 URL: https://issues.apache.org/jira/browse/SPARK-34644
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.1.1
>            Reporter: Gavrilescu Laurentiu
>            Priority: Major


