Ahmad Humayun created SPARK-51798:
-------------------------------------

             Summary: Non-deterministic property of UDF not respected when used in orderBy
                 Key: SPARK-51798
                 URL: https://issues.apache.org/jira/browse/SPARK-51798
             Project: Spark
          Issue Type: Bug
          Components: Optimizer
    Affects Versions: 3.5.0
         Environment: *Environment:*
 * Spark version: 3.5.0
 * Scala version: 2.13
 * Local mode / standalone
 * Java 11 Corretto
            Reporter: Ahmad Humayun


*Summary:*

Non-deterministic UDFs are evaluated multiple times per row when Spark optimizer rules are disabled, violating the expected semantics of {{.asNondeterministic()}}.

*Description:*

According to the Spark API, marking a UDF with {{.asNondeterministic()}} should instruct the optimizer to avoid redundant re-evaluations and to treat the result as unstable. However, when optimizer rules are excluded via {{SQLConf.OPTIMIZER_EXCLUDED_RULES}}, this behavior breaks: the UDF is evaluated multiple times for the same input row. This violates the expected semantics and causes problems for UDFs that depend on external state, such as querying a Redis-like client.

*Minimal Reproduction Example:*

{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, udf}
import org.apache.spark.sql.internal.SQLConf

// Emulate an external state manager whose value oscillates on every lookup
object DummyRedisClient {
  private var state: Int = 1
  private var increasing: Boolean = true

  def get(key: String): String = {
    val result = state.toString
    if (increasing) {
      if (state < 3) state += 1
      else { state -= 1; increasing = false }
    } else {
      if (state > 0) state -= 1
      else { state += 1; increasing = true }
    }
    result
  }
}

object Main {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .getOrCreate()

    // UDF that queries external state
    val myUdf = udf((input: String) => {
      DummyRedisClient.get("global_state").toInt
    }).asNondeterministic()

    // Disable all excludable optimizer rules
    val sparkOpt = spark.sessionState.optimizer
    val excludableRules = sparkOpt.defaultBatches
      .flatMap(_.rules.map(_.ruleName))
      .toSet -- sparkOpt.nonExcludableRules.toSet
    val excludedRules = excludableRules.mkString(",")
    SQLConf
      .get
      .setConfString(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, excludedRules)

    // Emulate a data source
    spark.range(1, 4)
      .repartition(1)
      .write
      .mode("overwrite")
      .parquet("/tmp/jira-issue")

    // Minimal query
    val a = spark.read.parquet("/tmp/jira-issue")
    val b = a.orderBy(myUdf(col("id")))
    b.show()
  }
}
{code}

*Expected Behavior:*

Each UDF call should be evaluated exactly once per input row, regardless of the optimizer rule configuration, since {{.asNondeterministic()}} has been set explicitly.

*Actual Behavior:*

When optimizer rules are disabled, the UDF is invoked multiple times per input row, violating the non-determinism contract and producing inconsistent or incorrect results for stateful UDFs.

*Impact:*

Breaks correctness for UDFs involving external state or side effects (e.g., querying Redis, logging, metrics). Also makes debugging unpredictable because of the duplicate calls.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
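As an addendum: the redundant evaluation claimed in the report can be made directly measurable by swapping the oscillating client for a call counter. The following is a hypothetical sketch ({{CountingClient}} and {{CountingDemo}} are not part of the original report); it returns a fixed payload and only records how many times the UDF body actually runs.

{code:scala}
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for DummyRedisClient: returns a fixed payload but
// counts every lookup, so redundant UDF evaluations become visible.
object CountingClient {
  private val calls = new AtomicInteger(0)

  def get(key: String): String = {
    // Thread-safe increment, since Spark may invoke the UDF from several task threads.
    calls.incrementAndGet()
    "1" // stable payload; only the call count matters here
  }

  def callCount: Int = calls.get()
}

object CountingDemo {
  def main(args: Array[String]): Unit = {
    // Simulate three rows passing through the UDF body exactly once each.
    (1 to 3).foreach(_ => CountingClient.get("global_state"))
    println(CountingClient.callCount) // prints 3
  }
}
{code}

Wiring {{CountingClient.get}} into {{myUdf}} from the repro and comparing {{CountingClient.callCount}} against the row count after {{b.show()}} shows whether the UDF ran more than once per row.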