[ 
https://issues.apache.org/jira/browse/SPARK-43157?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Wenchen Fan reassigned SPARK-43157:
-----------------------------------

    Assignee: Rob Reeves

> TreeNode tags can become corrupted and hang driver when the dataset is cached
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-43157
>                 URL: https://issues.apache.org/jira/browse/SPARK-43157
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.0, 3.5.0
>            Reporter: Rob Reeves
>            Assignee: Rob Reeves
>            Priority: Major
>
> If a cached dataset is used by multiple other datasets materialized in 
> separate threads it can corrupt the TreeNode.tags map in any of the cached 
> plan nodes. This will hang the driver forever. This happens because 
> TreeNode.tags is not thread-safe. How this happens:
>  # Multiple datasets are materialized at the same time in different threads 
> that reference the same cached dataset
>  # AdaptiveSparkPlanExec.onUpdatePlan will call ExplainMode.fromString
>  # ExplainUtils uses the TreeNode.tags map to store the operator Id for every 
> node in the plan. This is usually okay because the plan is cloned. When there 
> is an InMemoryScanExec the InMemoryRelation.cachedPlan is not cloned so 
> multiple threads can set the operator Id.
> Making the TreeNode.tags field thread-safe does not solve this problem 
> because there is still a correctness issue. The threads may be overwriting 
> each other's operator Ids, which could be different.
> Example stack trace of the infinite loop:
> {code:scala}
> scala.collection.mutable.HashTable.resize(HashTable.scala:265)
> scala.collection.mutable.HashTable.addEntry0(HashTable.scala:158)
> scala.collection.mutable.HashTable.findOrAddEntry(HashTable.scala:170)
> scala.collection.mutable.HashTable.findOrAddEntry$(HashTable.scala:167)
> scala.collection.mutable.HashMap.findOrAddEntry(HashMap.scala:44)
> scala.collection.mutable.HashMap.put(HashMap.scala:126)
> scala.collection.mutable.HashMap.update(HashMap.scala:131)
> org.apache.spark.sql.catalyst.trees.TreeNode.setTagValue(TreeNode.scala:108)
> org.apache.spark.sql.execution.ExplainUtils$.setOpId$1(ExplainUtils.scala:134)
> …
> org.apache.spark.sql.execution.QueryExecution.explainString(QueryExecution.scala:175)
> org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.onUpdatePlan(AdaptiveSparkPlanExec.scala:662){code}
> Example to show the cachedPlan object is not cloned:
> {code:java}
> import org.apache.spark.sql.execution.SparkPlan
> import org.apache.spark.sql.execution.columnar.InMemoryTableScanExec
> import spark.implicits._
> def findCacheOperator(plan: SparkPlan): Option[InMemoryTableScanExec] = {
>   if (plan.isInstanceOf[InMemoryTableScanExec]) {
>     Some(plan.asInstanceOf[InMemoryTableScanExec])
>   } else if (plan.children.isEmpty && plan.subqueries.isEmpty) {
>     None
>   } else {
>     (plan.subqueries.flatMap(p => findCacheOperator(p)) ++
>       plan.children.flatMap(findCacheOperator)).headOption
>   }
> }
> val df = spark.range(10).filter($"id" < 100).cache()
> val df1 = df.limit(1)
> val df2 = df.limit(1)
> // Get the cache operator (InMemoryTableScanExec) in each plan
> val plan1 = findCacheOperator(df1.queryExecution.executedPlan).get
> val plan2 = findCacheOperator(df2.queryExecution.executedPlan).get
> // Check if InMemoryTableScanExec references point to the same object
> println(plan1.eq(plan2))
> // returns false// Check if InMemoryRelation references point to the same 
> object
> println(plan1.relation.eq(plan2.relation))
> // returns false
> // Check if the cached SparkPlan references point to the same object
> println(plan1.relation.cachedPlan.eq(plan2.relation.cachedPlan))
> // returns true
> // This shows that the cloned plan2 still has references to the original 
> plan1 {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to