[ https://issues.apache.org/jira/browse/SPARK-43157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Wenchen Fan resolved SPARK-43157.
---------------------------------
    Fix Version/s: 3.5.0
                   3.4.1
       Resolution: Fixed

Issue resolved by pull request 40812
[https://github.com/apache/spark/pull/40812]

> TreeNode tags can become corrupted and hang driver when the dataset is cached
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-43157
>                 URL: https://issues.apache.org/jira/browse/SPARK-43157
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.0, 3.5.0
>            Reporter: Rob Reeves
>            Assignee: Rob Reeves
>            Priority: Major
>             Fix For: 3.5.0, 3.4.1
>
> If a cached dataset is used by multiple other datasets that are materialized
> in separate threads, it can corrupt the TreeNode.tags map in any node of the
> cached plan, which hangs the driver forever. This happens because
> TreeNode.tags is not thread-safe. How this happens:
> # Multiple datasets that reference the same cached dataset are materialized
> at the same time in different threads.
> # AdaptiveSparkPlanExec.onUpdatePlan will call ExplainMode.fromString.
> # ExplainUtils uses the TreeNode.tags map to store the operator Id for every
> node in the plan. This is usually safe because the plan is cloned, but when
> the plan contains an InMemoryTableScanExec, the underlying
> InMemoryRelation.cachedPlan is not cloned, so multiple threads can set the
> operator Id on the same nodes concurrently.
> Making the TreeNode.tags field thread-safe does not solve this problem
> because there is still a correctness issue: the threads may overwrite each
> other's operator Ids, which can differ between plans.
> Example stack trace of the infinite loop:
> {code:scala}
> scala.collection.mutable.HashTable.resize(HashTable.scala:265)
> scala.collection.mutable.HashTable.addEntry0(HashTable.scala:158)
> scala.collection.mutable.HashTable.findOrAddEntry(HashTable.scala:170)
> scala.collection.mutable.HashTable.findOrAddEntry$(HashTable.scala:167)
> scala.collection.mutable.HashMap.findOrAddEntry(HashMap.scala:44)
> scala.collection.mutable.HashMap.put(HashMap.scala:126)
> scala.collection.mutable.HashMap.update(HashMap.scala:131)
> org.apache.spark.sql.catalyst.trees.TreeNode.setTagValue(TreeNode.scala:108)
> org.apache.spark.sql.execution.ExplainUtils$.setOpId$1(ExplainUtils.scala:134)
> …
> org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:175)
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.onUpdatePlan(AdaptiveSparkPlanExec.scala:662)
> {code}
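> The trace above bottoms out in scala.collection.mutable.HashTable.resize. As
> a standalone illustration (a minimal sketch, not taken from this ticket),
> racing unsynchronized put() calls on a scala.collection.mutable.HashMap can
> corrupt its internal table, after which a resize() can spin forever; the
> outcome is nondeterministic, so the snippet may also finish normally or
> throw:
> {code:scala}
> import scala.collection.mutable
>
> // Shared, unsynchronized map, analogous to TreeNode.tags.
> val tags = mutable.HashMap.empty[Int, Int]
>
> // Four writer threads hammer the same map without any locking.
> val writers = (1 to 4).map { t =>
>   new Thread(() => {
>     var i = 0
>     while (i < 100000) {
>       tags.put(t * 1000000 + i, i) // concurrent put() can corrupt the table
>       i += 1
>     }
>   })
> }
> writers.foreach(_.start())
> writers.foreach(_.join()) // may never return if a resize loops forever
> {code}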
> Example to show the cachedPlan object is not cloned:
> {code:scala}
> import org.apache.spark.sql.execution.SparkPlan
> import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
> import spark.implicits._
>
> // Depth-first search for the first InMemoryTableScanExec in a physical plan,
> // looking through both children and subqueries.
> def findCacheOperator(plan: SparkPlan): Option[InMemoryTableScanExec] = {
>   if (plan.isInstanceOf[InMemoryTableScanExec]) {
>     Some(plan.asInstanceOf[InMemoryTableScanExec])
>   } else if (plan.children.isEmpty && plan.subqueries.isEmpty) {
>     None
>   } else {
>     (plan.subqueries.flatMap(p => findCacheOperator(p)) ++
>       plan.children.flatMap(findCacheOperator)).headOption
>   }
> }
>
> val df = spark.range(10).filter($"id" < 100).cache()
> val df1 = df.limit(1)
> val df2 = df.limit(1)
>
> // Get the cache operator (InMemoryTableScanExec) in each plan
> val plan1 = findCacheOperator(df1.queryExecution.executedPlan).get
> val plan2 = findCacheOperator(df2.queryExecution.executedPlan).get
>
> // Check if the InMemoryTableScanExec references point to the same object
> println(plan1.eq(plan2))
> // returns false
>
> // Check if the InMemoryRelation references point to the same object
> println(plan1.relation.eq(plan2.relation))
> // returns false
>
> // Check if the cached SparkPlan references point to the same object
> println(plan1.relation.cachedPlan.eq(plan2.relation.cachedPlan))
> // returns true
>
> // This shows that the cloned plan2 still holds references into the original
> // plan1 via the shared cachedPlan.
> {code}
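> Since plan1.relation.cachedPlan and plan2.relation.cachedPlan are the same
> object, materializing df1 and df2 from separate threads can race on the
> shared nodes' tags. The following sketch (an illustration assumed from the
> description above, not taken verbatim from this ticket) shows the shape of
> such a trigger; the hang is nondeterministic and typically needs AQE to
> render explain strings concurrently, so it may take many runs or a larger
> plan to reproduce:
> {code:scala}
> import scala.concurrent.{Await, Future}
> import scala.concurrent.ExecutionContext.Implicits.global
> import scala.concurrent.duration._
>
> // Materialize both dependent datasets at the same time; each thread may walk
> // the shared InMemoryRelation.cachedPlan and call setTagValue on its nodes.
> val results = Seq(df1, df2).map(d => Future(d.collect()))
> Await.result(Future.sequence(results), 10.minutes)
> {code}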