Repository: spark
Updated Branches:
  refs/heads/master a36653c5b -> 85c5424d4


[SPARK-18144][SQL] logging StreamingQueryListener$QueryStartedEvent

## What changes were proposed in this pull request?

This PR fixes a bug where the QueryStartedEvent is not logged.

The postToAll() call in the original code actually invokes 
StreamingQueryListenerBus.postToAll(), which has no listener at all. Instead, 
we should post via both sparkListenerBus.post(s) and this.postToAll(s), so that 
the local listeners as well as the listeners registered in LiveListenerBus are triggered.

zsxwing
## How was this patch tested?

The following screenshot shows that the QueryStartedEvent is now logged correctly:

![image](https://cloud.githubusercontent.com/assets/678008/19821553/007a7d28-9d2d-11e6-9f13-49851559cdaa.png)

Author: CodingCat <zhunans...@gmail.com>

Closes #15675 from CodingCat/SPARK-18144.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/85c5424d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/85c5424d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/85c5424d

Branch: refs/heads/master
Commit: 85c5424d466f4a5765c825e0e2ab30da97611285
Parents: a36653c
Author: CodingCat <zhunans...@gmail.com>
Authored: Tue Nov 1 23:39:53 2016 -0700
Committer: Shixiong Zhu <shixi...@databricks.com>
Committed: Tue Nov 1 23:39:53 2016 -0700

----------------------------------------------------------------------
 .../execution/streaming/StreamingQueryListenerBus.scala   | 10 +++++++++-
 .../apache/spark/sql/streaming/StreamingQuerySuite.scala  |  7 ++++++-
 2 files changed, 15 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/85c5424d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
index fc2190d..22e4c63 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
@@ -41,6 +41,8 @@ class StreamingQueryListenerBus(sparkListenerBus: 
LiveListenerBus)
   def post(event: StreamingQueryListener.Event) {
     event match {
       case s: QueryStartedEvent =>
+        sparkListenerBus.post(s)
+        // post to local listeners to trigger callbacks
         postToAll(s)
       case _ =>
         sparkListenerBus.post(event)
@@ -50,7 +52,13 @@ class StreamingQueryListenerBus(sparkListenerBus: 
LiveListenerBus)
   override def onOtherEvent(event: SparkListenerEvent): Unit = {
     event match {
       case e: StreamingQueryListener.Event =>
-        postToAll(e)
+        // SPARK-18144: we broadcast QueryStartedEvent to all listeners 
attached to this bus
+        // synchronously and the ones attached to LiveListenerBus 
asynchronously. Therefore,
+        // we need to ignore QueryStartedEvent if this method is called within 
SparkListenerBus
+        // thread
+        if (!LiveListenerBus.withinListenerThread.value || 
!e.isInstanceOf[QueryStartedEvent]) {
+          postToAll(e)
+        }
       case _ =>
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/85c5424d/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 464c443..31b7fe0 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -290,7 +290,10 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter with Logging {
     // A StreamingQueryListener that gets the query status after the first 
completed trigger
     val listener = new StreamingQueryListener {
       @volatile var firstStatus: StreamingQueryStatus = null
-      override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = { }
+      @volatile var queryStartedEvent = 0
+      override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
+        queryStartedEvent += 1
+      }
       override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
        if (firstStatus == null) firstStatus = queryProgress.queryStatus
       }
@@ -303,6 +306,8 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter with Logging {
       q.processAllAvailable()
       eventually(timeout(streamingTimeout)) {
         assert(listener.firstStatus != null)
+        // test if QueryStartedEvent callback is called for only once
+        assert(listener.queryStartedEvent === 1)
       }
       listener.firstStatus
     } finally {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to