Repository: spark
Updated Branches:
  refs/heads/master c3e9a120e -> b1581ac28


[SPARK-9854] [SQL] RuleExecutor.timeMap should be thread-safe

`RuleExecutor.timeMap` is currently a non-thread-safe mutable HashMap; this can 
lead to infinite loops if multiple threads are concurrently modifying the map.  
I believe that this is responsible for some hangs that I've observed in 
HiveQuerySuite.

This patch addresses this by using a Guava `AtomicLongMap`.

Author: Josh Rosen <joshro...@databricks.com>

Closes #8120 from JoshRosen/rule-executor-time-map-fix.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b1581ac2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b1581ac2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b1581ac2

Branch: refs/heads/master
Commit: b1581ac28840a4d2209ef8bb5c9f8700b4c1b286
Parents: c3e9a12
Author: Josh Rosen <joshro...@databricks.com>
Authored: Tue Aug 11 22:46:59 2015 -0700
Committer: Josh Rosen <joshro...@databricks.com>
Committed: Tue Aug 11 22:46:59 2015 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/rules/RuleExecutor.scala      | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b1581ac2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
----------------------------------------------------------------------
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
index 8b82451..f80d2a9 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleExecutor.scala
@@ -17,22 +17,25 @@
 
 package org.apache.spark.sql.catalyst.rules
 
+import scala.collection.JavaConverters._
+
+import com.google.common.util.concurrent.AtomicLongMap
+
 import org.apache.spark.Logging
 import org.apache.spark.sql.catalyst.trees.TreeNode
 import org.apache.spark.sql.catalyst.util.sideBySide
 
-import scala.collection.mutable
-
 object RuleExecutor {
-  protected val timeMap = new mutable.HashMap[String, Long].withDefault(_ => 0)
+  protected val timeMap = AtomicLongMap.create[String]()
 
   /** Resets statistics about time spent running specific rules */
   def resetTime(): Unit = timeMap.clear()
 
   /** Dump statistics about time spent running specific rules. */
   def dumpTimeSpent(): String = {
-    val maxSize = timeMap.keys.map(_.toString.length).max
-    timeMap.toSeq.sortBy(_._2).reverseMap { case (k, v) =>
+    val map = timeMap.asMap().asScala
+    val maxSize = map.keys.map(_.toString.length).max
+    map.toSeq.sortBy(_._2).reverseMap { case (k, v) =>
       s"${k.padTo(maxSize, " ").mkString} $v"
     }.mkString("\n")
   }
@@ -79,7 +82,7 @@ abstract class RuleExecutor[TreeType <: TreeNode[_]] extends 
Logging {
             val startTime = System.nanoTime()
             val result = rule(plan)
             val runTime = System.nanoTime() - startTime
-            RuleExecutor.timeMap(rule.ruleName) = 
RuleExecutor.timeMap(rule.ruleName) + runTime
+            RuleExecutor.timeMap.addAndGet(rule.ruleName, runTime)
 
             if (!result.fastEquals(plan)) {
               logTrace(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to