[ https://issues.apache.org/jira/browse/SPARK-34644?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Gavrilescu Laurentiu updated SPARK-34644:
-----------------------------------------
Description:
*Applying a UDF followed by explode calls the UDF multiple times.* Using *persist* after applying the UDF mitigates the problem.

Consider the following code to reproduce it:
{code:java}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object Bug {
  def main(args: Array[String]) {
    val sparkSession: SparkSession = SparkSession.builder.master("local[4]").getOrCreate()
    val invocations = sparkSession.sparkContext.longAccumulator("invocations")

    def showTiming[T](body: => T): T = {
      val t0 = System.nanoTime()
      invocations.reset()
      val res = body
      val t1 = System.nanoTime()
      println(s"invocations=${invocations.value}, time=${(t1 - t0) / 1e9}")
      res
    }

    def expensive(n: Int) = {
      Thread.sleep(100)
      invocations.add(1)
      1
    }

    val expensiveUdf = udf((x: Int) => (1 to 10) map { _ => expensive(x) })
    val df = sparkSession.range(10).toDF()

    // explode directly on the UDF result: the UDF ends up evaluated
    // 3 times per row (300 calls to expensive for 10 rows)
    showTiming(df
      .select(explode(expensiveUdf(col("id"))))
      .select(sum(col("col")))
      .show())

    // persisting the UDF result first: the UDF is evaluated once per row
    showTiming(df.select(expensiveUdf(col("id")).as("values"))
      .persist()
      .select(explode(col("values")))
      .select(sum("col"))
      .show())
  }
}
{code}
=>
{code:java}
first: invocations=300, time=11.342076635
second: invocations=100, time=3.351682967
{code}
Beyond the wasted work, this can return wrong results if managed state is used inside the UDF, because the repeated invocations observe the state left behind by earlier ones.

Imagine the following scenario:
1. you have a dataframe with some string columns
2. you have an expensive function that computes a score from a string input
3. you want to get all the distinct values from all the columns together with their score; an executor-level cache holds the scores already computed, to minimize executions of the expensive function

Consider the following code to reproduce it:
{code:java}
import scala.collection.concurrent.TrieMap
import scala.collection.mutable.ArrayBuffer

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions._

case class RowWithStrings(c1: String, c2: String, c3: String)
case class ValueScore(value: String, score: Double)

object Bug {
  val columns: List[String] = List("c1", "c2", "c3")

  def score(input: String): Double = {
    // insert expensive function here
    input.toDouble
  }

  def main(args: Array[String]) {
    lazy val sparkSession: SparkSession = {
      val sparkSession = SparkSession.builder.master("local[4]")
        .getOrCreate()
      sparkSession
    }

    // some cache over the expensive operation
    val cache: TrieMap[String, Double] = TrieMap[String, Double]()

    // get scores for all columns in the row, skipping values already cached
    val body = (row: Row) => {
      val arr = ArrayBuffer[ValueScore]()
      columns foreach { column =>
        val value = row.getAs[String](column)
        if (!cache.contains(value)) {
          val computedScore = score(value)
          arr += ValueScore(value, computedScore)
          cache(value) = computedScore
        }
      }
      arr
    }
    val basicUdf = udf(body)

    val values = (1 to 5) map { idx =>
      // repeated values
      RowWithStrings(idx.toString, idx.toString, idx.toString)
    }

    import sparkSession.implicits._
    val df = values.toDF("c1", "c2", "c3").persist()
    val allCols = df.columns.map(col)

    df.select(basicUdf(struct(allCols: _*)).as("valuesScore"))
      .select(explode(col("valuesScore")))
      .distinct()
      .show()
  }
}
{code}
Because the UDF is invoked more than once per row, the invocation whose output is actually exploded finds every value already cached and returns an empty buffer, so explode produces no rows at all:
{code:java}
+---+
|col|
+---+
+---+
{code}
When adding persist before explode, the result is correct:
{code:java}
df.select(basicUdf(struct(allCols: _*)).as("valuesScore"))
  .persist()
  .select(explode(col("valuesScore")))
  .distinct()
  .show()
{code}
=>
{code:java}
+--------+
|     col|
+--------+
|{2, 2.0}|
|{4, 4.0}|
|{3, 3.0}|
|{5, 5.0}|
|{1, 1.0}|
+--------+
{code}
This is not reproducible using version 3.0.2.
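For the first reproducer, a possible mitigation besides persist is to mark the UDF as non-deterministic, since the optimizer must not duplicate non-deterministic expressions when it rewrites the plan around explode. This is a minimal sketch reusing the definitions from the reproducer above; whether it actually avoids the duplication on 3.1.1 is an assumption, not verified against this bug:
{code:java}
// Sketch of a possible workaround (assumption, not verified on 3.1.1):
// a non-deterministic UDF should not be duplicated by the optimizer, so
// the rewrite around explode should evaluate it only once per row.
val expensiveUdf = udf((x: Int) => (1 to 10) map { _ => expensive(x) })
  .asNondeterministic()

df.select(explode(expensiveUdf(col("id"))))
  .select(sum(col("col")))
  .show()
{code}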
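Independently of any Spark-side fix, the second reproducer can be made robust against repeated invocations by returning the cached score instead of skipping values that are already cached, so every invocation yields the same array. A minimal sketch of that rewrite (mine, not the reporter's code), reusing the cache, columns and score definitions from above:
{code:java}
// Sketch (my rewrite, not from the report): every invocation returns the
// full array, so evaluating the UDF more than once per row cannot change
// the result; getOrElseUpdate still computes each score only once.
val body = (row: Row) => {
  columns map { column =>
    val value = row.getAs[String](column)
    ValueScore(value, cache.getOrElseUpdate(value, score(value)))
  }
}
val basicUdf = udf(body)
{code}
This does not remove the duplicated work the issue is about; it only keeps the result correct while the UDF is evaluated multiple times.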
> UDF returning array followed by explode calls the UDF multiple times and
> could return wrong results
> ---------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-34644
>                 URL: https://issues.apache.org/jira/browse/SPARK-34644
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.1.1
>            Reporter: Gavrilescu Laurentiu
>            Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org