Repository: spark
Updated Branches:
  refs/heads/master 5e31db70b -> 3cc2c053b


[SPARK-11540][SQL] API audit for QueryExecutionListener.

Author: Reynold Xin <r...@databricks.com>

Closes #9509 from rxin/SPARK-11540.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3cc2c053
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3cc2c053
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3cc2c053

Branch: refs/heads/master
Commit: 3cc2c053b5d68c747a30bd58cf388b87b1922f13
Parents: 5e31db7
Author: Reynold Xin <r...@databricks.com>
Authored: Thu Nov 5 18:12:54 2015 -0800
Committer: Reynold Xin <r...@databricks.com>
Committed: Thu Nov 5 18:12:54 2015 -0800

----------------------------------------------------------------------
 .../spark/sql/execution/QueryExecution.scala    |  30 +++---
 .../spark/sql/util/QueryExecutionListener.scala | 101 ++++++++++---------
 2 files changed, 72 insertions(+), 59 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3cc2c053/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index fc91745..c2142d0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution
 
+import com.google.common.annotations.VisibleForTesting
+
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.catalyst.InternalRow
@@ -25,31 +27,33 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 /**
  * The primary workflow for executing relational queries using Spark.  Designed to allow easy
  * access to the intermediate phases of query execution for developers.
+ *
+ * While this is not a public class, we should avoid changing the function names for the sake of
+ * changing them, because a lot of developers use the feature for debugging.
  */
 class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) {
-  val analyzer = sqlContext.analyzer
-  val optimizer = sqlContext.optimizer
-  val planner = sqlContext.planner
-  val cacheManager = sqlContext.cacheManager
-  val prepareForExecution = sqlContext.prepareForExecution
 
-  def assertAnalyzed(): Unit = analyzer.checkAnalysis(analyzed)
+  @VisibleForTesting
+  def assertAnalyzed(): Unit = sqlContext.analyzer.checkAnalysis(analyzed)
+
+  lazy val analyzed: LogicalPlan = sqlContext.analyzer.execute(logical)
 
-  lazy val analyzed: LogicalPlan = analyzer.execute(logical)
   lazy val withCachedData: LogicalPlan = {
     assertAnalyzed()
-    cacheManager.useCachedData(analyzed)
+    sqlContext.cacheManager.useCachedData(analyzed)
   }
-  lazy val optimizedPlan: LogicalPlan = optimizer.execute(withCachedData)
+
+  lazy val optimizedPlan: LogicalPlan = sqlContext.optimizer.execute(withCachedData)
 
   // TODO: Don't just pick the first one...
   lazy val sparkPlan: SparkPlan = {
     SparkPlan.currentContext.set(sqlContext)
-    planner.plan(optimizedPlan).next()
+    sqlContext.planner.plan(optimizedPlan).next()
   }
+
   // executedPlan should not be used to initialize any SparkPlan. It should be
   // only used for execution.
-  lazy val executedPlan: SparkPlan = prepareForExecution.execute(sparkPlan)
+  lazy val executedPlan: SparkPlan = sqlContext.prepareForExecution.execute(sparkPlan)
 
   /** Internal version of the RDD. Avoids copies and has no schema */
   lazy val toRdd: RDD[InternalRow] = executedPlan.execute()
@@ -57,11 +61,11 @@ class QueryExecution(val sqlContext: SQLContext, val logical: LogicalPlan) {
   protected def stringOrError[A](f: => A): String =
     try f.toString catch { case e: Throwable => e.toString }
 
-  def simpleString: String =
+  def simpleString: String = {
     s"""== Physical Plan ==
        |${stringOrError(executedPlan)}
       """.stripMargin.trim
-
+  }
 
   override def toString: String = {
     def output =

http://git-wip-us.apache.org/repos/asf/spark/blob/3cc2c053/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala
index 909a8ab..ac432e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/util/QueryExecutionListener.scala
@@ -19,36 +19,38 @@ package org.apache.spark.sql.util
 
 import java.util.concurrent.locks.ReentrantReadWriteLock
 import scala.collection.mutable.ListBuffer
+import scala.util.control.NonFatal
 
-import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.Logging
+import org.apache.spark.annotation.{DeveloperApi, Experimental}
 import org.apache.spark.sql.execution.QueryExecution
 
 
 /**
+ * :: Experimental ::
  * The interface of query execution listener that can be used to analyze execution metrics.
  *
- * Note that implementations should guarantee thread-safety as they will be used in a non
- * thread-safe way.
+ * Note that implementations should guarantee thread-safety as they can be invoked by
+ * multiple different threads.
  */
 @Experimental
 trait QueryExecutionListener {
 
   /**
    * A callback function that will be called when a query executed successfully.
-   * Implementations should guarantee thread-safe.
+   * Note that this can be invoked by multiple different threads.
    *
-   * @param funcName the name of the action that triggered this query.
+   * @param funcName name of the action that triggered this query.
    * @param qe the QueryExecution object that carries detail information like logical plan,
    *           physical plan, etc.
-   * @param duration the execution time for this query in nanoseconds.
+   * @param durationNs the execution time for this query in nanoseconds.
    */
   @DeveloperApi
-  def onSuccess(funcName: String, qe: QueryExecution, duration: Long)
+  def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit
 
   /**
    * A callback function that will be called when a query execution failed.
-   * Implementations should guarantee thread-safe.
+   * Note that this can be invoked by multiple different threads.
    *
    * @param funcName the name of the action that triggered this query.
    * @param qe the QueryExecution object that carries detail information like logical plan,
@@ -56,34 +58,20 @@ trait QueryExecutionListener {
    * @param exception the exception that failed this query.
    */
   @DeveloperApi
-  def onFailure(funcName: String, qe: QueryExecution, exception: Exception)
+  def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit
 }
 
-@Experimental
-class ExecutionListenerManager extends Logging {
-  private[this] val listeners = ListBuffer.empty[QueryExecutionListener]
-  private[this] val lock = new ReentrantReadWriteLock()
-
-  /** Acquires a read lock on the cache for the duration of `f`. */
-  private def readLock[A](f: => A): A = {
-    val rl = lock.readLock()
-    rl.lock()
-    try f finally {
-      rl.unlock()
-    }
-  }
 
-  /** Acquires a write lock on the cache for the duration of `f`. */
-  private def writeLock[A](f: => A): A = {
-    val wl = lock.writeLock()
-    wl.lock()
-    try f finally {
-      wl.unlock()
-    }
-  }
+/**
+ * :: Experimental ::
+ *
+ * Manager for [[QueryExecutionListener]]. See [[org.apache.spark.sql.SQLContext.listenerManager]].
+ */
+@Experimental
+class ExecutionListenerManager private[sql] () extends Logging {
 
   /**
-   * Registers the specified QueryExecutionListener.
+   * Registers the specified [[QueryExecutionListener]].
    */
   @DeveloperApi
   def register(listener: QueryExecutionListener): Unit = writeLock {
@@ -91,7 +79,7 @@ class ExecutionListenerManager extends Logging {
   }
 
   /**
-   * Unregisters the specified QueryExecutionListener.
+   * Unregisters the specified [[QueryExecutionListener]].
    */
   @DeveloperApi
   def unregister(listener: QueryExecutionListener): Unit = writeLock {
@@ -99,38 +87,59 @@ class ExecutionListenerManager extends Logging {
   }
 
   /**
-   * clears out all registered QueryExecutionListeners.
+   * Removes all the registered [[QueryExecutionListener]].
    */
   @DeveloperApi
   def clear(): Unit = writeLock {
     listeners.clear()
   }
 
-  private[sql] def onSuccess(
-      funcName: String,
-      qe: QueryExecution,
-      duration: Long): Unit = readLock {
-    withErrorHandling { listener =>
-      listener.onSuccess(funcName, qe, duration)
+  private[sql] def onSuccess(funcName: String, qe: QueryExecution, duration: Long): Unit = {
+    readLock {
+      withErrorHandling { listener =>
+        listener.onSuccess(funcName, qe, duration)
+      }
     }
   }
 
-  private[sql] def onFailure(
-      funcName: String,
-      qe: QueryExecution,
-      exception: Exception): Unit = readLock {
-    withErrorHandling { listener =>
-      listener.onFailure(funcName, qe, exception)
+  private[sql] def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit = {
+    readLock {
+      withErrorHandling { listener =>
+        listener.onFailure(funcName, qe, exception)
+      }
     }
   }
 
+  private[this] val listeners = ListBuffer.empty[QueryExecutionListener]
+
+  /** A lock to prevent updating the list of listeners while we are traversing through them. */
+  private[this] val lock = new ReentrantReadWriteLock()
+
   private def withErrorHandling(f: QueryExecutionListener => Unit): Unit = {
     for (listener <- listeners) {
       try {
         f(listener)
       } catch {
-        case e: Exception => logWarning("error executing query execution listener", e)
+        case NonFatal(e) => logWarning("Error executing query execution listener", e)
       }
     }
   }
+
+  /** Acquires a read lock on the cache for the duration of `f`. */
+  private def readLock[A](f: => A): A = {
+    val rl = lock.readLock()
+    rl.lock()
+    try f finally {
+      rl.unlock()
+    }
+  }
+
+  /** Acquires a write lock on the cache for the duration of `f`. */
+  private def writeLock[A](f: => A): A = {
+    val wl = lock.writeLock()
+    wl.lock()
+    try f finally {
+      wl.unlock()
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to