[ https://issues.apache.org/jira/browse/SPARK-44897?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Wenchen Fan resolved SPARK-44897.
---------------------------------
    Fix Version/s: 3.5.0
       Resolution: Fixed

Issue resolved by pull request 42587
[https://github.com/apache/spark/pull/42587]

> Local Property Propagation to Subquery Broadcast Exec
> -----------------------------------------------------
>
>                 Key: SPARK-44897
>                 URL: https://issues.apache.org/jira/browse/SPARK-44897
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.4.0
>            Reporter: Michael Chen
>            Assignee: Michael Chen
>            Priority: Major
>             Fix For: 3.5.0
>
> https://issues.apache.org/jira/browse/SPARK-32748 was opened to address this issue and then, I believe, mistakenly reverted. The claim behind the revert was that propagating local properties to the dynamic pruning thread in SubqueryBroadcastExec is unnecessary, because the broadcast threads will propagate them anyway. However, when the dynamic pruning thread is the first to initialize the broadcast relation future, the local properties are not propagated correctly: the values the broadcast threads pick up at that point are already the wrong ones.
> I do not have a good way of reproducing this consistently, because the SubqueryBroadcastExec is generally not the first to initialize the broadcast relation future. However, after adding a Thread.sleep(1) to the doPrepare method of SubqueryBroadcastExec, the following test always fails:
> {code:java}
> withSQLConf(StaticSQLConf.SUBQUERY_BROADCAST_MAX_THREAD_THRESHOLD.key -> "1") {
>   withTable("a", "b") {
>     val confKey = "spark.sql.y"
>     val confValue1 = UUID.randomUUID().toString()
>     val confValue2 = UUID.randomUUID().toString()
>     Seq((confValue1, "1")).toDF("key", "value")
>       .write
>       .format("parquet")
>       .partitionBy("key")
>       .mode("overwrite")
>       .saveAsTable("a")
>     val df1 = spark.table("a")
>
>     def generateBroadcastDataFrame(confKey: String, confValue: String): Dataset[String] = {
>       val df = spark.range(1).mapPartitions { _ =>
>         Iterator(TaskContext.get.getLocalProperty(confKey))
>       }.filter($"value".contains(confValue)).as("c")
>       df.hint("broadcast")
>     }
>
>     // set local property and assert
>     val df2 = generateBroadcastDataFrame(confKey, confValue1)
>     spark.sparkContext.setLocalProperty(confKey, confValue1)
>     val checkDF = df1.join(df2).where($"a.key" === $"c.value").select($"a.key", $"c.value")
>     val checks = checkDF.collect()
>     assert(checks.forall(_.toSeq == Seq(confValue1, confValue1)))
>
>     // change local property and re-assert
>     Seq((confValue2, "1")).toDF("key", "value")
>       .write
>       .format("parquet")
>       .partitionBy("key")
>       .mode("overwrite")
>       .saveAsTable("b")
>     val df3 = spark.table("b")
>     val df4 = generateBroadcastDataFrame(confKey, confValue2)
>     spark.sparkContext.setLocalProperty(confKey, confValue2)
>     val checks2DF = df3.join(df4).where($"b.key" === $"c.value").select($"b.key", $"c.value")
>     val checks2 = checks2DF.collect()
>     assert(checks2.forall(_.toSeq == Seq(confValue2, confValue2)))
>     assert(checks2.nonEmpty)
>   }
> } {code}
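> For illustration, here is a minimal, Spark-independent sketch of the underlying race (the object and value names are made up for the example). SparkContext's local properties are backed by an InheritableThreadLocal, and a pooled thread copies inheritable thread-locals only once, when the thread is created, so whichever caller happens to create the pool thread determines what later tasks observe unless the values are captured at submission time:
> {code:java}
> import java.util.concurrent.Executors
> import scala.concurrent.{Await, ExecutionContext, Future}
> import scala.concurrent.duration.Duration
>
> object LocalPropertyRace extends App {
>   // Stand-in for SparkContext's InheritableThreadLocal-backed local properties.
>   val prop = new InheritableThreadLocal[String]
>
>   // Single shared worker thread, cf. SUBQUERY_BROADCAST_MAX_THREAD_THRESHOLD -> "1".
>   val pool = Executors.newFixedThreadPool(1)
>   implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(pool)
>
>   prop.set("query-1")
>   // The first submission creates the pool thread, which inherits "query-1".
>   Await.result(Future(()), Duration.Inf)
>
>   // The driver thread moves on to a second query.
>   prop.set("query-2")
>
>   // Buggy: read the property lazily on the (reused) pool thread -> stale "query-1".
>   val stale = Await.result(Future(prop.get()), Duration.Inf)
>
>   // Fixed: capture on the submitting thread and hand the value to the task.
>   val captured = prop.get()
>   val fresh = Await.result(Future(captured), Duration.Inf)
>
>   println(s"stale = $stale, fresh = $fresh") // stale = query-1, fresh = query-2
>   pool.shutdown()
> }
> {code}
> In the Spark setting the "wrong creator" is the dynamic pruning thread: when it is the first to initialize the broadcast relation future, the broadcast threads come up without the right local properties, which is why the properties need to be captured and propagated to the dynamic pruning thread explicitly.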