Repository: spark
Updated Branches:
  refs/heads/master 695dbc816 -> 0cfd6192f


[SPARK-15580][SQL] Add ContinuousQueryInfo to make ContinuousQueryListener events serializable

## What changes were proposed in this pull request?

This PR adds `ContinuousQueryInfo` to make `ContinuousQueryListener` events serializable, so that they can be written into the event log.
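
For illustration, a minimal listener consuming the now-serializable events could look like the sketch below (the callback signatures and `queryInfo` fields come from this patch; the `spark.streams.addListener` registration call is assumed):

```scala
import org.apache.spark.sql.streaming.ContinuousQueryListener
import org.apache.spark.sql.streaming.ContinuousQueryListener.{QueryProgress, QueryStarted, QueryTerminated}

class LoggingListener extends ContinuousQueryListener {
  override def onQueryStarted(queryStarted: QueryStarted): Unit =
    println(s"Started: ${queryStarted.queryInfo.name}")

  override def onQueryProgress(queryProgress: QueryProgress): Unit =
    // Offsets are now exposed as plain strings, so the whole event is JSON-friendly.
    println(s"Progress: ${queryProgress.queryInfo.sourceStatuses.map(_.offsetDesc).mkString(", ")}")

  override def onQueryTerminated(queryTerminated: QueryTerminated): Unit =
    queryTerminated.exception.foreach(msg => println(s"Failed: $msg"))
}

// Registration (assumed API): spark.streams.addListener(new LoggingListener)
```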

## How was this patch tested?

Jenkins unit tests.

Author: Shixiong Zhu <shixi...@databricks.com>

Closes #13335 from zsxwing/query-info.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0cfd6192
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0cfd6192
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0cfd6192

Branch: refs/heads/master
Commit: 0cfd6192f38932a26195a6a8dbbc637d67f5ec55
Parents: 695dbc8
Author: Shixiong Zhu <shixi...@databricks.com>
Authored: Tue Jun 7 16:40:03 2016 -0700
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Tue Jun 7 16:40:03 2016 -0700

----------------------------------------------------------------------
 .../streaming/ContinuousQueryListenerBus.scala  |  27 +---
 .../execution/streaming/StreamExecution.scala   |  21 ++-
 .../sql/streaming/ContinuousQueryInfo.scala     |  34 +++++
 .../sql/streaming/ContinuousQueryListener.scala |  34 +++--
 .../apache/spark/sql/streaming/SinkStatus.scala |   6 +-
 .../spark/sql/streaming/SourceStatus.scala      |   8 +-
 .../ContinuousQueryListenerSuite.scala          | 133 +++++++++++++++----
 .../sql/streaming/ContinuousQuerySuite.scala    |  16 +--
 8 files changed, 203 insertions(+), 76 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/0cfd6192/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousQueryListenerBus.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousQueryListenerBus.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousQueryListenerBus.scala
index 2a1be09..f50951f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousQueryListenerBus.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ContinuousQueryListenerBus.scala
@@ -22,15 +22,13 @@ import org.apache.spark.sql.streaming.ContinuousQueryListener
 import org.apache.spark.util.ListenerBus
 
 /**
- * A bus to forward events to [[ContinuousQueryListener]]s. This one will wrap received
- * [[ContinuousQueryListener.Event]]s as WrappedContinuousQueryListenerEvents and send them to the
- * Spark listener bus. It also registers itself with Spark listener bus, so that it can receive
- * WrappedContinuousQueryListenerEvents, unwrap them as ContinuousQueryListener.Events and
- * dispatch them to ContinuousQueryListener.
+ * A bus to forward events to [[ContinuousQueryListener]]s. This one will send received
+ * [[ContinuousQueryListener.Event]]s to the Spark listener bus. It also registers itself with
+ * Spark listener bus, so that it can receive [[ContinuousQueryListener.Event]]s and dispatch them
+ * to ContinuousQueryListener.
  */
 class ContinuousQueryListenerBus(sparkListenerBus: LiveListenerBus)
-  extends SparkListener
-    with ListenerBus[ContinuousQueryListener, ContinuousQueryListener.Event] {
+  extends SparkListener with ListenerBus[ContinuousQueryListener, ContinuousQueryListener.Event] {
 
   import ContinuousQueryListener._
 
@@ -45,13 +43,13 @@ class ContinuousQueryListenerBus(sparkListenerBus: LiveListenerBus)
       case s: QueryStarted =>
         postToAll(s)
       case _ =>
-        sparkListenerBus.post(new WrappedContinuousQueryListenerEvent(event))
+        sparkListenerBus.post(event)
     }
   }
 
   override def onOtherEvent(event: SparkListenerEvent): Unit = {
     event match {
-      case WrappedContinuousQueryListenerEvent(e) =>
+      case e: ContinuousQueryListener.Event =>
         postToAll(e)
       case _ =>
     }
@@ -71,15 +69,4 @@ class ContinuousQueryListenerBus(sparkListenerBus: LiveListenerBus)
     }
   }
 
-  /**
-   * Wrapper for StreamingListenerEvent as SparkListenerEvent so that it can be posted to Spark
-   * listener bus.
-   */
-  private case class WrappedContinuousQueryListenerEvent(
-      streamingListenerEvent: ContinuousQueryListener.Event)
-    extends SparkListenerEvent {
-
-    // Do not log streaming events in event log as history server does not support these events.
-    protected[spark] override def logEvent: Boolean = false
-  }
 }
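
With the wrapper class gone, streaming events travel the Spark listener bus as themselves. A self-contained miniature of the same dispatch pattern, using hypothetical Bus/Event names rather than Spark's actual classes:

    trait BusEvent                               // stands in for SparkListenerEvent
    trait StreamEvent extends BusEvent           // stands in for ContinuousQueryListener.Event

    class SharedBus {                            // stands in for LiveListenerBus
      private var handlers = List.empty[BusEvent => Unit]
      def subscribe(h: BusEvent => Unit): Unit = handlers ::= h
      def post(e: BusEvent): Unit = handlers.foreach(_(e))
    }

    class StreamBus(shared: SharedBus) {         // stands in for ContinuousQueryListenerBus
      private var listeners = List.empty[StreamEvent => Unit]
      shared.subscribe {
        case e: StreamEvent => listeners.foreach(_(e))   // mirrors onOtherEvent
        case _ =>                                        // ignore unrelated bus traffic
      }
      def addListener(l: StreamEvent => Unit): Unit = listeners ::= l
      def post(e: StreamEvent): Unit = shared.post(e)    // no wrapping step anymore
    }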

http://git-wip-us.apache.org/repos/asf/spark/blob/0cfd6192/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 16d38a2..d9800e4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -131,12 +131,13 @@ class StreamExecution(
   /** Returns current status of all the sources. */
   override def sourceStatuses: Array[SourceStatus] = {
     val localAvailableOffsets = availableOffsets
-    sources.map(s => new SourceStatus(s.toString, localAvailableOffsets.get(s))).toArray
+    sources.map(s =>
+      new SourceStatus(s.toString, localAvailableOffsets.get(s).map(_.toString))).toArray
   }
 
   /** Returns current status of the sink. */
   override def sinkStatus: SinkStatus =
-    new SinkStatus(sink.toString, committedOffsets.toCompositeOffset(sources))
+    new SinkStatus(sink.toString, committedOffsets.toCompositeOffset(sources).toString)
 
  /** Returns the [[ContinuousQueryException]] if the query was terminated by an exception. */
  override def exception: Option[ContinuousQueryException] = Option(streamDeathCause)
@@ -167,7 +168,7 @@ class StreamExecution(
      // Mark ACTIVE and then post the event. QueryStarted event is synchronously sent to listeners,
      // so must mark this as ACTIVE first.
      state = ACTIVE
-      postEvent(new QueryStarted(this)) // Assumption: Does not throw exception.
+      postEvent(new QueryStarted(this.toInfo)) // Assumption: Does not throw exception.
 
       // Unblock starting thread
       startLatch.countDown()
@@ -206,7 +207,10 @@ class StreamExecution(
     } finally {
       state = TERMINATED
       sparkSession.streams.notifyQueryTermination(StreamExecution.this)
-      postEvent(new QueryTerminated(this))
+      postEvent(new QueryTerminated(
+        this.toInfo,
+        exception.map(_.getMessage),
+        exception.map(_.getStackTrace.toSeq).getOrElse(Nil)))
       terminationLatch.countDown()
     }
   }
@@ -374,7 +378,7 @@ class StreamExecution(
     logInfo(s"Completed up to $availableOffsets in ${batchTime}ms")
     // Update committed offsets.
     committedOffsets ++= availableOffsets
-    postEvent(new QueryProgress(this))
+    postEvent(new QueryProgress(this.toInfo))
   }
 
   private def postEvent(event: ContinuousQueryListener.Event) {
@@ -484,6 +488,13 @@ class StreamExecution(
      """.stripMargin
   }
 
+  private def toInfo: ContinuousQueryInfo = {
+    new ContinuousQueryInfo(
+      this.name,
+      this.sourceStatuses,
+      this.sinkStatus)
+  }
+
   trait State
   case object INITIALIZED extends State
   case object ACTIVE extends State

http://git-wip-us.apache.org/repos/asf/spark/blob/0cfd6192/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryInfo.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryInfo.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryInfo.scala
new file mode 100644
index 0000000..57b718b
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryInfo.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * A class used to report information about the progress of a [[ContinuousQuery]].
+ *
+ * @param name The [[ContinuousQuery]] name.
+ * @param sourceStatuses The current statuses of the [[ContinuousQuery]]'s sources.
+ * @param sinkStatus The current status of the [[ContinuousQuery]]'s sink.
+ */
+@Experimental
+class ContinuousQueryInfo private[sql](
+  val name: String,
+  val sourceStatuses: Seq[SourceStatus],
+  val sinkStatus: SinkStatus)

http://git-wip-us.apache.org/repos/asf/spark/blob/0cfd6192/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryListener.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryListener.scala
index 6bdd513..dd31114 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ContinuousQueryListener.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.streaming
 
 import org.apache.spark.annotation.Experimental
+import org.apache.spark.scheduler.SparkListenerEvent
 
 /**
  * :: Experimental ::
@@ -70,26 +71,43 @@ abstract class ContinuousQueryListener {
 object ContinuousQueryListener {
 
   /**
-   * Base type of [[ContinuousQueryListener]] events.
+   * :: Experimental ::
+   * Base type of [[ContinuousQueryListener]] events
    * @since 2.0.0
    */
-  trait Event
+  @Experimental
+  trait Event extends SparkListenerEvent
 
   /**
-   * Event representing the start of a query.
+   * :: Experimental ::
+   * Event representing the start of a query
    * @since 2.0.0
    */
-  class QueryStarted private[sql](val query: ContinuousQuery) extends Event
+  @Experimental
+  class QueryStarted private[sql](val queryInfo: ContinuousQueryInfo) extends Event
 
   /**
-   * Event representing any progress updates in a query.
+   * :: Experimental ::
+   * Event representing any progress updates in a query
    * @since 2.0.0
    */
-  class QueryProgress private[sql](val query: ContinuousQuery) extends Event
+  @Experimental
+  class QueryProgress private[sql](val queryInfo: ContinuousQueryInfo) extends Event
 
   /**
-   * Event representing that termination of a query.
+   * :: Experimental ::
+   * Event representing the termination of a query
+   *
+   * @param queryInfo Information about the status of the query.
+   * @param exception The exception message of the [[ContinuousQuery]] if the query was terminated
+   *                  with an exception. Otherwise, it will be `None`.
+   * @param stackTrace The stack trace of the exception if the query was terminated with an
+   *                   exception. It will be empty if there was no error.
    * @since 2.0.0
    */
-  class QueryTerminated private[sql](val query: ContinuousQuery) extends Event
+  @Experimental
+  class QueryTerminated private[sql](
+      val queryInfo: ContinuousQueryInfo,
+      val exception: Option[String],
+      val stackTrace: Seq[StackTraceElement]) extends Event
 }
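
A hedged example of consuming the richer termination event (field names come from the class above; the report formatting is purely illustrative):

    import org.apache.spark.sql.streaming.ContinuousQueryListener.QueryTerminated

    def describeTermination(event: QueryTerminated): String = event.exception match {
      case Some(message) =>
        // stackTrace carries plain StackTraceElement values, so it survives serialization.
        val frames = event.stackTrace.map(f => s"  at $f").mkString("\n")
        s"Query '${event.queryInfo.name}' failed: $message\n$frames"
      case None =>
        s"Query '${event.queryInfo.name}' stopped normally."
    }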

http://git-wip-us.apache.org/repos/asf/spark/blob/0cfd6192/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala
index 79ddf01..de1efe9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala
@@ -18,17 +18,17 @@
 package org.apache.spark.sql.streaming
 
 import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.execution.streaming.{Offset, Sink}
+import org.apache.spark.sql.execution.streaming.Sink
 
 /**
  * :: Experimental ::
  * Status and metrics of a streaming [[Sink]].
  *
  * @param description Description of the source corresponding to this status
- * @param offset      Current offset up to which data has been written by the sink
+ * @param offsetDesc Description of the current offset up to which data has been written by the sink
  * @since 2.0.0
  */
 @Experimental
 class SinkStatus private[sql](
     val description: String,
-    val offset: Offset)
+    val offsetDesc: String)

http://git-wip-us.apache.org/repos/asf/spark/blob/0cfd6192/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala
index 8fccd5b..bd0c848 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala
@@ -18,17 +18,17 @@
 package org.apache.spark.sql.streaming
 
 import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.execution.streaming.{Offset, Source}
+import org.apache.spark.sql.execution.streaming.Source
 
 /**
  * :: Experimental ::
  * Status and metrics of a streaming [[Source]].
  *
- * @param description     Description of the source corresponding to this status
- * @param offset          Current offset of the source, if known
+ * @param description Description of the source corresponding to this status
+ * @param offsetDesc Description of the current [[Source]] offset if known
  * @since 2.0.0
  */
 @Experimental
 class SourceStatus private[sql] (
     val description: String,
-    val offset: Option[Offset])
+    val offsetDesc: Option[String])
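
Since both statuses now expose offsets as strings, callers no longer need the internal `Offset` types. A small sketch, assuming a running `ContinuousQuery` handle named `query`:

    // query: org.apache.spark.sql.streaming.ContinuousQuery, assumed already started
    query.sourceStatuses.foreach { s =>
      // offsetDesc is Option[String]; None means the source has not provided data yet.
      println(s"${s.description}: ${s.offsetDesc.getOrElse("<none>")}")
    }
    println(s"${query.sinkStatus.description} written up to ${query.sinkStatus.offsetDesc}")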

http://git-wip-us.apache.org/repos/asf/spark/blob/0cfd6192/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala
index cdd97da..9b59ab6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQueryListenerSuite.scala
@@ -26,7 +26,9 @@ import org.scalatest.concurrent.Eventually._
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.time.SpanSugar._
 
+import org.apache.spark.SparkException
 import org.apache.spark.sql.execution.streaming._
+import org.apache.spark.util.JsonProtocol
 
 
 class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter {
@@ -51,14 +53,13 @@ class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter {
         Assert("Incorrect query status in onQueryStarted") {
           val status = listener.startStatus
           assert(status != null)
-          assert(status.active == true)
           assert(status.sourceStatuses.size === 1)
           assert(status.sourceStatuses(0).description.contains("Memory"))
 
          // The source and sink offsets must be None as this must be called before the
          // batches have started
-          assert(status.sourceStatuses(0).offset === None)
-          assert(status.sinkStatus.offset === CompositeOffset(None :: Nil))
+          assert(status.sourceStatuses(0).offsetDesc === None)
+          assert(status.sinkStatus.offsetDesc === CompositeOffset(None :: Nil).toString)
 
           // No progress events or termination events
           assert(listener.progressStatuses.isEmpty)
@@ -73,9 +74,8 @@ class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter {
             assert(listener.progressStatuses.size === 1)
             val status = listener.progressStatuses.peek()
             assert(status != null)
-            assert(status.active == true)
-            assert(status.sourceStatuses(0).offset === Some(LongOffset(0)))
-            assert(status.sinkStatus.offset === CompositeOffset.fill(LongOffset(0)))
+            assert(status.sourceStatuses(0).offsetDesc === Some(LongOffset(0).toString))
+            assert(status.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(0)).toString)
 
             // No termination events
             assert(listener.terminationStatus === null)
@@ -86,10 +86,8 @@ class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter {
           eventually(Timeout(streamingTimeout)) {
             val status = listener.terminationStatus
             assert(status != null)
-
-            assert(status.active === false) // must be inactive by the time onQueryTerm is called
-            assert(status.sourceStatuses(0).offset === Some(LongOffset(0)))
-            assert(status.sinkStatus.offset === CompositeOffset.fill(LongOffset(0)))
+            assert(status.sourceStatuses(0).offsetDesc === Some(LongOffset(0).toString))
+            assert(status.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(0)).toString)
           }
           listener.checkAsyncErrors()
         }
@@ -141,6 +139,92 @@ class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter {
     }
   }
 
+  test("exception should be reported in QueryTerminated") {
+    val listener = new QueryStatusCollector
+    withListenerAdded(listener) {
+      val input = MemoryStream[Int]
+      testStream(input.toDS.map(_ / 0))(
+        StartStream(),
+        AddData(input, 1),
+        ExpectFailure[SparkException](),
+        Assert {
+          spark.sparkContext.listenerBus.waitUntilEmpty(10000)
+          assert(listener.terminationStatus !== null)
+          assert(listener.terminationException.isDefined)
+          assert(listener.terminationException.get.contains("java.lang.ArithmeticException"))
+          assert(listener.terminationStackTrace.nonEmpty)
+        }
+      )
+    }
+  }
+
+  test("QueryStarted serialization") {
+    val queryStartedInfo = new ContinuousQueryInfo(
+      "name",
+      Seq(new SourceStatus("source1", None), new SourceStatus("source2", None)),
+      new SinkStatus("sink", CompositeOffset(None :: None :: Nil).toString))
+    val queryStarted = new ContinuousQueryListener.QueryStarted(queryStartedInfo)
+    val json = JsonProtocol.sparkEventToJson(queryStarted)
+    val newQueryStarted = JsonProtocol.sparkEventFromJson(json)
+      .asInstanceOf[ContinuousQueryListener.QueryStarted]
+    assertContinuousQueryInfoEquals(queryStarted.queryInfo, newQueryStarted.queryInfo)
+  }
+
+  test("QueryProgress serialization") {
+    val queryProcessInfo = new ContinuousQueryInfo(
+      "name",
+      Seq(
+        new SourceStatus("source1", Some(LongOffset(0).toString)),
+        new SourceStatus("source2", Some(LongOffset(1).toString))),
+      new SinkStatus("sink", new CompositeOffset(Array(None, Some(LongOffset(1)))).toString))
+    val queryProcess = new ContinuousQueryListener.QueryProgress(queryProcessInfo)
+    val json = JsonProtocol.sparkEventToJson(queryProcess)
+    val newQueryProcess = JsonProtocol.sparkEventFromJson(json)
+      .asInstanceOf[ContinuousQueryListener.QueryProgress]
+    assertContinuousQueryInfoEquals(queryProcess.queryInfo, newQueryProcess.queryInfo)
+  }
+
+  test("QueryTerminated serialization") {
+    val queryTerminatedInfo = new ContinuousQueryInfo(
+      "name",
+      Seq(
+        new SourceStatus("source1", Some(LongOffset(0).toString)),
+        new SourceStatus("source2", Some(LongOffset(1).toString))),
+      new SinkStatus("sink", new CompositeOffset(Array(None, Some(LongOffset(1)))).toString))
+    val exception = new RuntimeException("exception")
+    val queryQueryTerminated = new ContinuousQueryListener.QueryTerminated(
+      queryTerminatedInfo,
+      Some(exception.getMessage),
+      exception.getStackTrace)
+    val json =
+      JsonProtocol.sparkEventToJson(queryQueryTerminated)
+    val newQueryTerminated = JsonProtocol.sparkEventFromJson(json)
+      .asInstanceOf[ContinuousQueryListener.QueryTerminated]
+    assertContinuousQueryInfoEquals(queryQueryTerminated.queryInfo, newQueryTerminated.queryInfo)
+    assert(queryQueryTerminated.exception === newQueryTerminated.exception)
+  }
+
+  private def assertContinuousQueryInfoEquals(
+      expected: ContinuousQueryInfo,
+      actual: ContinuousQueryInfo): Unit = {
+    assert(expected.name === actual.name)
+    assert(expected.sourceStatuses.size === actual.sourceStatuses.size)
+    expected.sourceStatuses.zip(actual.sourceStatuses).foreach {
+      case (expectedSource, actualSource) =>
+        assertSourceStatus(expectedSource, actualSource)
+    }
+    assertSinkStatus(expected.sinkStatus, actual.sinkStatus)
+  }
+
+  private def assertSourceStatus(expected: SourceStatus, actual: SourceStatus): Unit = {
+    assert(expected.description === actual.description)
+    assert(expected.offsetDesc === actual.offsetDesc)
+  }
+
+  private def assertSinkStatus(expected: SinkStatus, actual: SinkStatus): Unit = {
+    assert(expected.description === actual.description)
+    assert(expected.offsetDesc === actual.offsetDesc)
+  }
 
  private def withListenerAdded(listener: ContinuousQueryListener)(body: => Unit): Unit = {
     try {
@@ -164,9 +248,12 @@ class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter {
     // to catch errors in the async listener events
     @volatile private var asyncTestWaiter = new Waiter
 
-    @volatile var startStatus: QueryStatus = null
-    @volatile var terminationStatus: QueryStatus = null
-    val progressStatuses = new ConcurrentLinkedQueue[QueryStatus]
+    @volatile var startStatus: ContinuousQueryInfo = null
+    @volatile var terminationStatus: ContinuousQueryInfo = null
+    @volatile var terminationException: Option[String] = null
+    @volatile var terminationStackTrace: Seq[StackTraceElement] = null
+
+    val progressStatuses = new ConcurrentLinkedQueue[ContinuousQueryInfo]
 
     def reset(): Unit = {
       startStatus = null
@@ -182,35 +269,25 @@ class ContinuousQueryListenerSuite extends StreamTest with BeforeAndAfter {
 
     override def onQueryStarted(queryStarted: QueryStarted): Unit = {
       asyncTestWaiter {
-        startStatus = QueryStatus(queryStarted.query)
+        startStatus = queryStarted.queryInfo
       }
     }
 
     override def onQueryProgress(queryProgress: QueryProgress): Unit = {
       asyncTestWaiter {
         assert(startStatus != null, "onQueryProgress called before 
onQueryStarted")
-        progressStatuses.add(QueryStatus(queryProgress.query))
+        progressStatuses.add(queryProgress.queryInfo)
       }
     }
 
     override def onQueryTerminated(queryTerminated: QueryTerminated): Unit = {
       asyncTestWaiter {
         assert(startStatus != null, "onQueryTerminated called before 
onQueryStarted")
-        terminationStatus = QueryStatus(queryTerminated.query)
+        terminationStatus = queryTerminated.queryInfo
+        terminationException = queryTerminated.exception
+        terminationStackTrace = queryTerminated.stackTrace
       }
       asyncTestWaiter.dismiss()
     }
   }
-
-  case class QueryStatus(
-    active: Boolean,
-    exception: Option[Exception],
-    sourceStatuses: Array[SourceStatus],
-    sinkStatus: SinkStatus)
-
-  object QueryStatus {
-    def apply(query: ContinuousQuery): QueryStatus = {
      QueryStatus(query.isActive, query.exception, query.sourceStatuses, query.sinkStatus)
-    }
-  }
 }
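
Outside the suite, the same round trip looks like the sketch below. Note that the ContinuousQueryInfo, SourceStatus, and SinkStatus constructors are private[sql] and JsonProtocol is Spark-internal, so this only compiles from code inside the matching package; the object name and placeholder values are illustrative:

    package org.apache.spark.sql.streaming

    import org.apache.spark.util.JsonProtocol

    object RoundTripSketch {
      def main(args: Array[String]): Unit = {
        val info = new ContinuousQueryInfo(
          "demo",
          Seq(new SourceStatus("memory-source", None)),
          new SinkStatus("memory-sink", "-"))
        val event = new ContinuousQueryListener.QueryStarted(info)

        // Round-trip through the same JSON path the event log uses,
        // as exercised by the tests above.
        val json = JsonProtocol.sparkEventToJson(event)
        val restored = JsonProtocol.sparkEventFromJson(json)
          .asInstanceOf[ContinuousQueryListener.QueryStarted]
        assert(restored.queryInfo.name == "demo")
      }
    }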

http://git-wip-us.apache.org/repos/asf/spark/blob/0cfd6192/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala
index e4ca86d..5542405 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/ContinuousQuerySuite.scala
@@ -66,21 +66,21 @@ class ContinuousQuerySuite extends StreamTest {
     testStream(mapped)(
       AssertOnQuery(_.sourceStatuses.length === 1),
       AssertOnQuery(_.sourceStatuses(0).description.contains("Memory")),
-      AssertOnQuery(_.sourceStatuses(0).offset === None),
+      AssertOnQuery(_.sourceStatuses(0).offsetDesc === None),
       AssertOnQuery(_.sinkStatus.description.contains("Memory")),
-      AssertOnQuery(_.sinkStatus.offset === new CompositeOffset(None :: Nil)),
+      AssertOnQuery(_.sinkStatus.offsetDesc === new CompositeOffset(None :: Nil).toString),
       AddData(inputData, 1, 2),
       CheckAnswer(6, 3),
-      AssertOnQuery(_.sourceStatuses(0).offset === Some(LongOffset(0))),
-      AssertOnQuery(_.sinkStatus.offset === CompositeOffset.fill(LongOffset(0))),
+      AssertOnQuery(_.sourceStatuses(0).offsetDesc === Some(LongOffset(0).toString)),
+      AssertOnQuery(_.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(0)).toString),
       AddData(inputData, 1, 2),
       CheckAnswer(6, 3, 6, 3),
-      AssertOnQuery(_.sourceStatuses(0).offset === Some(LongOffset(1))),
-      AssertOnQuery(_.sinkStatus.offset === CompositeOffset.fill(LongOffset(1))),
+      AssertOnQuery(_.sourceStatuses(0).offsetDesc === Some(LongOffset(1).toString)),
+      AssertOnQuery(_.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(1)).toString),
       AddData(inputData, 0),
       ExpectFailure[SparkException],
-      AssertOnQuery(_.sourceStatuses(0).offset === Some(LongOffset(2))),
-      AssertOnQuery(_.sinkStatus.offset === CompositeOffset.fill(LongOffset(1)))
+      AssertOnQuery(_.sourceStatuses(0).offsetDesc === Some(LongOffset(2).toString)),
+      AssertOnQuery(_.sinkStatus.offsetDesc === CompositeOffset.fill(LongOffset(1)).toString)
     )
   }
 

