Ahmad Humayun created SPARK-51798:
-------------------------------------

             Summary: Non-deterministic property of UDF not respected when used in orderBy
                 Key: SPARK-51798
                 URL: https://issues.apache.org/jira/browse/SPARK-51798
             Project: Spark
          Issue Type: Bug
          Components: Optimizer
    Affects Versions: 3.5.0
         Environment: 
 * Spark version: 3.5.0
 * Scala version: 2.13
 * Local mode / standalone
 * Java 11 (Corretto)
            Reporter: Ahmad Humayun


*Summary:*
Non-deterministic UDFs are evaluated multiple times per row when Spark optimizer rules are disabled, violating the expected semantics of {{.asNondeterministic()}}.

*Description:*
According to the Spark API, marking a UDF with {{.asNondeterministic()}} should instruct the optimizer to treat its result as unstable and avoid redundant re-evaluations. However, when optimizer rules are excluded via {{SQLConf.OPTIMIZER_EXCLUDED_RULES}} (the {{spark.sql.optimizer.excludedRules}} config key), this guarantee is broken: the UDF is evaluated multiple times for the same input row.

This violates the expected semantics and causes problems for UDFs that depend on external state, such as querying a Redis-like client.
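For context, the exclusion can also be set as a plain SQL config (a sketch; the rule name below is only illustrative, while the reproduction further down excludes *all* excludable rules programmatically):

{code:none}
# spark.sql.optimizer.excludedRules takes a comma-separated list of rule names,
# e.g. in spark-defaults.conf or via --conf on spark-submit
spark.sql.optimizer.excludedRules  org.apache.spark.sql.catalyst.optimizer.ConstantFolding
{code}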

*Minimal Reproduction Example:*

 
{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.internal.SQLConf

// Emulate an external state manager
object DummyRedisClient {
  private var state: Int = 1
  private var increasing: Boolean = true

  def get(key: String): String = {
    val result = state.toString
    if (increasing) {
      if (state < 3) state += 1 else { state -= 1; increasing = false }
    } else {
      if (state > 0) state -= 1 else { state += 1; increasing = true }
    }
    result
  }
}

object Main {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .getOrCreate()

    // UDF that queries external state, explicitly marked non-deterministic
    val myUdf = udf((input: Long) => {
      DummyRedisClient.get("global_state").toInt
    }).asNondeterministic()

    // Disable all excludable optimizer rules
    val sparkOpt = spark.sessionState.optimizer
    val excludableRules = sparkOpt.defaultBatches
      .flatMap(_.rules.map(_.ruleName))
      .toSet -- sparkOpt.nonExcludableRules.toSet
    val excludedRules = excludableRules.mkString(",")
    SQLConf.get
      .setConfString(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, excludedRules)

    // Emulate a data source
    spark.range(1, 4)
      .repartition(1)
      .write
      .mode("overwrite")
      .parquet("/tmp/jira-issue")

    // Minimal query: order by the non-deterministic UDF
    val a = spark.read.parquet("/tmp/jira-issue")
    val b = a.orderBy(myUdf(col("id")))
    b.show()

    spark.stop()
  }
}{code}
*Expected Behavior:*
The UDF should be evaluated exactly once per input row, regardless of optimizer rule configuration, since {{.asNondeterministic()}} has been explicitly set.

*Actual Behavior:*
When optimizer rules are disabled, the UDF is invoked multiple times per input 
row, violating the non-determinism contract and producing inconsistent or 
incorrect results for stateful UDFs.

*Impact:*
Breaks correctness for UDFs involving external state or side effects (e.g., 
querying Redis, logging, metrics). Also makes debugging unpredictable due to 
duplicate calls.

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
