Repository: spark
Updated Branches:
  refs/heads/master 9a02f6821 -> c3d08e2f2


http://git-wip-us.apache.org/repos/asf/spark/blob/c3d08e2f/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
new file mode 100644
index 0000000..7129fa4
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.{util => ju}
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+import org.json4s._
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.annotation.Experimental
+
+/**
+ * :: Experimental ::
+ * Information about updates made to stateful operators in a [[StreamingQuery]] during a trigger.
+ */
+@Experimental
+class StateOperatorProgress private[sql](
+    val numRowsTotal: Long,
+    val numRowsUpdated: Long) {
+  private[sql] def jsonValue: JValue = {
+    ("numRowsTotal" -> JInt(numRowsTotal)) ~
+    ("numRowsUpdated" -> JInt(numRowsUpdated))
+  }
+}
+
+/**
+ * :: Experimental ::
+ * Information about progress made in the execution of a [[StreamingQuery]] during
+ * a trigger. Each event relates to processing done for a single trigger of the streaming
+ * query. Events are emitted even when no new data is available to be processed.
+ *
+ * @param id A unique id of the query.
+ * @param name Name of the query. This name is unique across all active queries.
+ * @param timestamp Timestamp (ms) of the beginning of the trigger.
+ * @param batchId A unique id for the current batch of data being processed.  Note that in the
+ *                case of retries after a failure a given batchId may be executed more than once.
+ *                Similarly, when there is no data to be processed, the batchId will not be
+ *                incremented.
+ * @param durationMs The amount of time taken to perform various operations in milliseconds.
+ * @param currentWatermark The current event time watermark in milliseconds
+ * @param stateOperators Information about operators in the query that store state.
+ * @param sources detailed statistics on data being read from each of the streaming sources.
+ * @since 2.1.0
+ */
+@Experimental
+class StreamingQueryProgress private[sql](
+  val id: UUID,
+  val name: String,
+  val timestamp: Long,
+  val batchId: Long,
+  val durationMs: ju.Map[String, java.lang.Long],
+  val currentWatermark: Long,
+  val stateOperators: Array[StateOperatorProgress],
+  val sources: Array[SourceProgress],
+  val sink: SinkProgress) {
+
+  /** The aggregate (across all sources) number of records processed in a trigger. */
+  def numInputRows: Long = sources.map(_.numInputRows).sum
+
+  /** The aggregate (across all sources) rate of data arriving. */
+  def inputRowsPerSecond: Double = sources.map(_.inputRowsPerSecond).sum
+
+  /** The aggregate (across all sources) rate at which Spark is processing data. */
+  def processedRowsPerSecond: Double = sources.map(_.processedRowsPerSecond).sum
+
+  /** The compact JSON representation of this status. */
+  def json: String = compact(render(jsonValue))
+
+  /** The pretty (i.e. indented) JSON representation of this status. */
+  def prettyJson: String = pretty(render(jsonValue))
+
+  override def toString: String = prettyJson
+
+  private[sql] def jsonValue: JValue = {
+    def safeDoubleToJValue(value: Double): JValue = {
+      if (value.isNaN || value.isInfinity) JNothing else JDouble(value)
+    }
+
+    ("id" -> JString(id.toString)) ~
+    ("name" -> JString(name)) ~
+    ("timestamp" -> JInt(timestamp)) ~
+    ("numInputRows" -> JInt(numInputRows)) ~
+    ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
+    ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~
+    ("durationMs" -> durationMs
+        .asScala
+        .map { case (k, v) => k -> JInt(v.toLong): JObject }
+        .reduce(_ ~ _)) ~
+    ("currentWatermark" -> JInt(currentWatermark)) ~
+    ("stateOperators" -> JArray(stateOperators.map(_.jsonValue).toList)) ~
+    ("sources" -> JArray(sources.map(_.jsonValue).toList)) ~
+    ("sink" -> sink.jsonValue)
+
+  }
+}
+
+/**
+ * :: Experimental ::
+ * Information about progress made for a source in the execution of a [[StreamingQuery]]
+ * during a trigger. See [[StreamingQueryProgress]] for more information.
+ *
+ * @param description            Description of the source.
+ * @param startOffset            The starting offset for data being read.
+ * @param endOffset              The ending offset for data being read.
+ * @param numInputRows           The number of records read from this source.
+ * @param inputRowsPerSecond     The rate at which data is arriving from this source.
+ * @param processedRowsPerSecond The rate at which data from this source is being processed by
+ *                               Spark.
+ * @since 2.1.0
+ */
+@Experimental
+class SourceProgress protected[sql](
+  val description: String,
+  val startOffset: String,
+  val endOffset: String,
+  val numInputRows: Long,
+  val inputRowsPerSecond: Double,
+  val processedRowsPerSecond: Double) {
+
+  /** The compact JSON representation of this progress. */
+  def json: String = compact(render(jsonValue))
+
+  /** The pretty (i.e. indented) JSON representation of this progress. */
+  def prettyJson: String = pretty(render(jsonValue))
+
+  override def toString: String = prettyJson
+
+  private[sql] def jsonValue: JValue = {
+    def safeDoubleToJValue(value: Double): JValue = {
+      if (value.isNaN || value.isInfinity) JNothing else JDouble(value)
+    }
+
+    ("description" -> JString(description)) ~
+      ("startOffset" -> tryParse(startOffset)) ~
+      ("endOffset" -> tryParse(endOffset)) ~
+      ("numInputRows" -> JInt(numInputRows)) ~
+      ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
+      ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond))
+  }
+
+  private def tryParse(json: String) = try {
+    parse(json)
+  } catch {
+    case NonFatal(e) => JString(json)
+  }
+}
+
+/**
+ * :: Experimental ::
+ * Information about progress made for a sink in the execution of a [[StreamingQuery]]
+ * during a trigger. See [[StreamingQueryProgress]] for more information.
+ *
+ * @param description Description of the sink corresponding to this status.
+ * @since 2.1.0
+ */
+@Experimental
+class SinkProgress protected[sql](
+    val description: String) {
+
+  /** The compact JSON representation of this status. */
+  def json: String = compact(render(jsonValue))
+
+  /** The pretty (i.e. indented) JSON representation of this status. */
+  def prettyJson: String = pretty(render(jsonValue))
+
+  override def toString: String = prettyJson
+
+  private[sql] def jsonValue: JValue = {
+    ("description" -> JString(description))
+  }
+}
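
As an aside (not part of this patch), the jsonValue implementations above lean on a json4s behavior worth noting: fields whose value renders to JNothing are dropped from the output object, which is how non-finite rates (NaN, Infinity) are kept out of the JSON, as the prettyJson test below also relies on. A minimal standalone sketch of that pattern, with an illustrative object name:

  import org.json4s._
  import org.json4s.JsonDSL._
  import org.json4s.jackson.JsonMethods._

  object ProgressJsonSketch {
    // Same helper shape as in StreamingQueryProgress/SourceProgress above.
    def safeDoubleToJValue(value: Double): JValue =
      if (value.isNaN || value.isInfinity) JNothing else JDouble(value)

    def main(args: Array[String]): Unit = {
      val jsonValue: JValue =
        ("numInputRows" -> JInt(678)) ~
        ("inputRowsPerSecond" -> safeDoubleToJValue(10.0)) ~
        ("processedRowsPerSecond" -> safeDoubleToJValue(Double.PositiveInfinity))

      // The non-finite rate renders to JNothing and is omitted from the output:
      // {"numInputRows":678,"inputRowsPerSecond":10.0}
      println(compact(render(jsonValue)))
    }
  }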

http://git-wip-us.apache.org/repos/asf/spark/blob/c3d08e2f/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetricsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetricsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetricsSuite.scala
deleted file mode 100644
index 38c4ece..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/StreamMetricsSuite.scala
+++ /dev/null
@@ -1,213 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.streaming
-
-import org.scalactic.TolerantNumerics
-
-import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.types.{StructField, StructType}
-import org.apache.spark.util.ManualClock
-
-class StreamMetricsSuite extends SparkFunSuite {
-  import StreamMetrics._
-
-  // To make === between double tolerate inexact values
-  implicit val doubleEquality = TolerantNumerics.tolerantDoubleEquality(0.01)
-
-  test("rates, latencies, trigger details - basic life cycle") {
-    val sm = newStreamMetrics(source)
-    assert(sm.currentInputRate() === 0.0)
-    assert(sm.currentProcessingRate() === 0.0)
-    assert(sm.currentSourceInputRate(source) === 0.0)
-    assert(sm.currentSourceProcessingRate(source) === 0.0)
-    assert(sm.currentLatency() === None)
-    assert(sm.currentTriggerDetails().isEmpty)
-
-    // When trigger started, the rates should not change, but should return
-    // reported trigger details
-    sm.reportTriggerStarted(1)
-    sm.reportTriggerDetail("key", "value")
-    sm.reportSourceTriggerDetail(source, "key2", "value2")
-    assert(sm.currentInputRate() === 0.0)
-    assert(sm.currentProcessingRate() === 0.0)
-    assert(sm.currentSourceInputRate(source) === 0.0)
-    assert(sm.currentSourceProcessingRate(source) === 0.0)
-    assert(sm.currentLatency() === None)
-    assert(sm.currentTriggerDetails() ===
-      Map(BATCH_ID -> "1", IS_TRIGGER_ACTIVE -> "true",
-        START_TIMESTAMP -> "0", "key" -> "value"))
-    assert(sm.currentSourceTriggerDetails(source) ===
-      Map(BATCH_ID -> "1", "key2" -> "value2"))
-
-    // Finishing the trigger should calculate the rates, except input rate which needs
-    // to have another trigger interval
-    sm.reportNumInputRows(Map(source -> 100L)) // 100 input rows, 10 output rows
-    clock.advance(1000)
-    sm.reportTriggerFinished()
-    assert(sm.currentInputRate() === 0.0)
-    assert(sm.currentProcessingRate() === 100.0)  // 100 input rows processed in 1 sec
-    assert(sm.currentSourceInputRate(source) === 0.0)
-    assert(sm.currentSourceProcessingRate(source) === 100.0)
-    assert(sm.currentLatency() === None)
-    assert(sm.currentTriggerDetails() ===
-      Map(BATCH_ID -> "1", IS_TRIGGER_ACTIVE -> "false",
-        START_TIMESTAMP -> "0", FINISH_TIMESTAMP -> "1000",
-        NUM_INPUT_ROWS -> "100", "key" -> "value"))
-    assert(sm.currentSourceTriggerDetails(source) ===
-      Map(BATCH_ID -> "1", NUM_SOURCE_INPUT_ROWS -> "100", "key2" -> "value2"))
-
-    // After another trigger starts, the rates and latencies should not change until
-    // new rows are reported
-    clock.advance(1000)
-    sm.reportTriggerStarted(2)
-    assert(sm.currentInputRate() === 0.0)
-    assert(sm.currentProcessingRate() === 100.0)
-    assert(sm.currentSourceInputRate(source) === 0.0)
-    assert(sm.currentSourceProcessingRate(source) === 100.0)
-    assert(sm.currentLatency() === None)
-
-    // Reporting new rows should update the rates and latencies
-    sm.reportNumInputRows(Map(source -> 200L))     // 200 input rows
-    clock.advance(500)
-    sm.reportTriggerFinished()
-    assert(sm.currentInputRate() === 100.0)      // 200 input rows generated in 2 seconds b/w starts
-    assert(sm.currentProcessingRate() === 400.0) // 200 output rows processed in 0.5 sec
-    assert(sm.currentSourceInputRate(source) === 100.0)
-    assert(sm.currentSourceProcessingRate(source) === 400.0)
-    assert(sm.currentLatency().get === 1500.0)       // 2000 ms / 2 + 500 ms
-
-    // Rates should be set to 0 after stop
-    sm.stop()
-    assert(sm.currentInputRate() === 0.0)
-    assert(sm.currentProcessingRate() === 0.0)
-    assert(sm.currentSourceInputRate(source) === 0.0)
-    assert(sm.currentSourceProcessingRate(source) === 0.0)
-    assert(sm.currentLatency() === None)
-    assert(sm.currentTriggerDetails().isEmpty)
-  }
-
-  test("rates and latencies - after trigger with no data") {
-    val sm = newStreamMetrics(source)
-    // Trigger 1 with data
-    sm.reportTriggerStarted(1)
-    sm.reportNumInputRows(Map(source -> 100L)) // 100 input rows
-    clock.advance(1000)
-    sm.reportTriggerFinished()
-
-    // Trigger 2 with data
-    clock.advance(1000)
-    sm.reportTriggerStarted(2)
-    sm.reportNumInputRows(Map(source -> 200L)) // 200 input rows
-    clock.advance(500)
-    sm.reportTriggerFinished()
-
-    // Make sure that all rates are set
-    require(sm.currentInputRate() === 100.0) // 200 input rows generated in 2 seconds b/w starts
-    require(sm.currentProcessingRate() === 400.0) // 200 output rows processed in 0.5 sec
-    require(sm.currentSourceInputRate(source) === 100.0)
-    require(sm.currentSourceProcessingRate(source) === 400.0)
-    require(sm.currentLatency().get === 1500.0) // 2000 ms / 2 + 500 ms
-
-    // Trigger 3 with data
-    clock.advance(500)
-    sm.reportTriggerStarted(3)
-    clock.advance(500)
-    sm.reportTriggerFinished()
-
-    // Rates are set to zero and latency is set to None
-    assert(sm.currentInputRate() === 0.0)
-    assert(sm.currentProcessingRate() === 0.0)
-    assert(sm.currentSourceInputRate(source) === 0.0)
-    assert(sm.currentSourceProcessingRate(source) === 0.0)
-    assert(sm.currentLatency() === None)
-    sm.stop()
-  }
-
-  test("rates - after trigger with multiple sources, and one source having no info") {
-    val source1 = TestSource(1)
-    val source2 = TestSource(2)
-    val sm = newStreamMetrics(source1, source2)
-    // Trigger 1 with data
-    sm.reportTriggerStarted(1)
-    sm.reportNumInputRows(Map(source1 -> 100L, source2 -> 100L))
-    clock.advance(1000)
-    sm.reportTriggerFinished()
-
-    // Trigger 2 with data
-    clock.advance(1000)
-    sm.reportTriggerStarted(2)
-    sm.reportNumInputRows(Map(source1 -> 200L, source2 -> 200L))
-    clock.advance(500)
-    sm.reportTriggerFinished()
-
-    // Make sure that all rates are set
-    assert(sm.currentInputRate() === 200.0) // 200*2 input rows generated in 2 seconds b/w starts
-    assert(sm.currentProcessingRate() === 800.0) // 200*2 output rows processed in 0.5 sec
-    assert(sm.currentSourceInputRate(source1) === 100.0)
-    assert(sm.currentSourceInputRate(source2) === 100.0)
-    assert(sm.currentSourceProcessingRate(source1) === 400.0)
-    assert(sm.currentSourceProcessingRate(source2) === 400.0)
-
-    // Trigger 3 with data
-    clock.advance(500)
-    sm.reportTriggerStarted(3)
-    clock.advance(500)
-    sm.reportNumInputRows(Map(source1 -> 200L))
-    sm.reportTriggerFinished()
-
-    // Rates are set to zero and latency is set to None
-    assert(sm.currentInputRate() === 200.0)
-    assert(sm.currentProcessingRate() === 400.0)
-    assert(sm.currentSourceInputRate(source1) === 200.0)
-    assert(sm.currentSourceInputRate(source2) === 0.0)
-    assert(sm.currentSourceProcessingRate(source1) === 400.0)
-    assert(sm.currentSourceProcessingRate(source2) === 0.0)
-    sm.stop()
-  }
-
-  test("registered Codahale metrics") {
-    import scala.collection.JavaConverters._
-    val sm = newStreamMetrics(source)
-    val gaugeNames = sm.metricRegistry.getGauges().keySet().asScala
-
-    // so that all metrics are considered as a single metric group in Ganglia
-    assert(!gaugeNames.exists(_.contains(".")))
-    assert(gaugeNames === Set(
-      "inputRate-total",
-      "inputRate-source0",
-      "processingRate-total",
-      "processingRate-source0",
-      "latency"))
-  }
-
-  private def newStreamMetrics(sources: Source*): StreamMetrics = {
-    new StreamMetrics(sources.toSet, clock, "test")
-  }
-
-  private val clock = new ManualClock()
-  private val source = TestSource(0)
-
-  case class TestSource(id: Int) extends Source {
-    override def schema: StructType = StructType(Array.empty[StructField])
-    override def getOffset: Option[Offset] = Some(new LongOffset(0))
-    override def getBatch(start: Option[Offset], end: Offset): DataFrame = { null }
-    override def stop() {}
-    override def toString(): String = s"source$id"
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/c3d08e2f/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index bad6642..8256c63 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -1006,9 +1006,13 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       testStream(input)(
         AddTextFileData("100", src, tmp),
         CheckAnswer("100"),
-        AssertOnLastQueryStatus { status =>
-          assert(status.triggerDetails.get("numRows.input.total") === "1")
-          assert(status.sourceStatuses(0).processingRate > 0.0)
+        AssertOnQuery { query =>
+          val actualProgress = query.recentProgresses
+              .find(_.numInputRows > 0)
+              .getOrElse(sys.error("Could not find records with data."))
+          assert(actualProgress.numInputRows === 1)
+          assert(actualProgress.sources(0).processedRowsPerSecond > 0.0)
+          true
         }
       )
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/c3d08e2f/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
index a6b2d4b..a2629f7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamTest.scala
@@ -28,7 +28,6 @@ import scala.util.control.NonFatal
 
 import org.scalatest.Assertions
 import org.scalatest.concurrent.{Eventually, Timeouts}
-import org.scalatest.concurrent.AsyncAssertions.Waiter
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.exceptions.TestFailedDueToTimeoutException
@@ -202,10 +201,7 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
     }
   }
 
-  case class AssertOnLastQueryStatus(condition: StreamingQueryStatus => Unit)
-    extends StreamAction
-
-  class StreamManualClock(time: Long = 0L) extends ManualClock(time) {
+  class StreamManualClock(time: Long = 0L) extends ManualClock(time) with Serializable {
     private var waitStartTime: Option[Long] = None
 
     override def waitTillTime(targetTime: Long): Long = synchronized {
@@ -325,10 +321,8 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
 
     val testThread = Thread.currentThread()
    val metadataRoot = Utils.createTempDir(namePrefix = "streaming.metadata").getCanonicalPath
-    val statusCollector = new QueryStatusCollector
     var manualClockExpectedTime = -1L
     try {
-      spark.streams.addListener(statusCollector)
       startedTest.foreach { action =>
         logInfo(s"Processing test stream action: $action")
         action match {
@@ -375,10 +369,12 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
                   s"can not advance clock of type ${currentStream.triggerClock.getClass}")
            val clock = currentStream.triggerClock.asInstanceOf[StreamManualClock]
             assert(manualClockExpectedTime >= 0)
+
            // Make sure we don't advance ManualClock too early. See SPARK-16002.
            eventually("StreamManualClock has not yet entered the waiting state") {
               assert(clock.isStreamWaitingAt(manualClockExpectedTime))
             }
+
             clock.advance(timeToAdd)
             manualClockExpectedTime += timeToAdd
             verify(clock.getTimeMillis() === manualClockExpectedTime,
@@ -447,13 +443,6 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
             val streamToAssert = Option(currentStream).getOrElse(lastStream)
             verify({ a.run(); true }, s"Assert failed: ${a.message}")
 
-          case a: AssertOnLastQueryStatus =>
-            Eventually.eventually(timeout(streamingTimeout)) {
-              require(statusCollector.lastTriggerStatus.nonEmpty)
-            }
-            val status = statusCollector.lastTriggerStatus.get
-            verify({ a.condition(status); true }, "Assert on last query status failed")
-
           case a: AddData =>
             try {
              // Add data and get the source where it was added, and the expected offset of the
@@ -528,7 +517,6 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
       if (currentStream != null && currentStream.microBatchThread.isAlive) {
         currentStream.stop()
       }
-      spark.streams.removeListener(statusCollector)
 
       // Rollback prev configuration values
       resetConfValues.foreach {
@@ -614,7 +602,6 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
     testStream(ds)(actions: _*)
   }
 
-
   object AwaitTerminationTester {
 
     trait ExpectedBehavior
@@ -668,58 +655,4 @@ trait StreamTest extends QueryTest with SharedSQLContext with Timeouts {
       }
     }
   }
-
-
-  class QueryStatusCollector extends StreamingQueryListener {
-    // to catch errors in the async listener events
-    @volatile private var asyncTestWaiter = new Waiter
-
-    @volatile var startStatus: StreamingQueryStatus = null
-    @volatile var terminationStatus: StreamingQueryStatus = null
-    @volatile var terminationException: Option[String] = null
-
-    private val progressStatuses = new mutable.ArrayBuffer[StreamingQueryStatus]
-
-    /** Get the info of the last trigger that processed data */
-    def lastTriggerStatus: Option[StreamingQueryStatus] = synchronized {
-      progressStatuses.filter { i =>
-        i.triggerDetails.get("isTriggerActive").toBoolean == false &&
-          i.triggerDetails.get("isDataPresentInTrigger").toBoolean == true
-      }.lastOption
-    }
-
-    def reset(): Unit = {
-      startStatus = null
-      terminationStatus = null
-      progressStatuses.clear()
-      asyncTestWaiter = new Waiter
-    }
-
-    def checkAsyncErrors(): Unit = {
-      asyncTestWaiter.await(timeout(10 seconds))
-    }
-
-
-    override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
-      asyncTestWaiter {
-        startStatus = queryStarted.queryStatus
-      }
-    }
-
-    override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
-      asyncTestWaiter {
-        assert(startStatus != null, "onQueryProgress called before onQueryStarted")
-        synchronized { progressStatuses += queryProgress.queryStatus }
-      }
-    }
-
-    override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
-      asyncTestWaiter {
-        assert(startStatus != null, "onQueryTerminated called before onQueryStarted")
-        terminationStatus = queryTerminated.queryStatus
-        terminationException = queryTerminated.exception
-      }
-      asyncTestWaiter.dismiss()
-    }
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/c3d08e2f/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index 98f3bec..c68f953 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -17,24 +17,26 @@
 
 package org.apache.spark.sql.streaming
 
+import java.util.UUID
+
 import scala.collection.mutable
 
 import org.scalactic.TolerantNumerics
+import org.scalatest.concurrent.AsyncAssertions.Waiter
+import org.scalatest.concurrent.Eventually._
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.BeforeAndAfter
 import org.scalatest.PrivateMethodTester._
 
 import org.apache.spark.SparkException
 import org.apache.spark.scheduler._
-import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.functions._
-import org.apache.spark.util.{JsonProtocol, ManualClock}
-
+import org.apache.spark.sql.streaming.StreamingQueryListener._
+import org.apache.spark.util.JsonProtocol
 
 class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
 
   import testImplicits._
-  import StreamingQueryListenerSuite._
 
   // To make === between double tolerate inexact values
   implicit val doubleEquality = TolerantNumerics.tolerantDoubleEquality(0.01)
@@ -46,86 +48,86 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
     // Make sure we don't leak any events to the next test
   }
 
-  test("single listener, check trigger statuses") {
-    import StreamingQueryListenerSuite._
-    clock = new StreamManualClock
-
-    /** Custom MemoryStream that waits for manual clock to reach a time */
-    val inputData = new MemoryStream[Int](0, sqlContext) {
-      // Wait for manual clock to be 100 first time there is data
-      override def getOffset: Option[Offset] = {
-        val offset = super.getOffset
-        if (offset.nonEmpty) {
-          clock.waitTillTime(100)
+  testQuietly("single listener, check trigger events are generated correctly") {
+    val clock = new StreamManualClock
+    val inputData = new MemoryStream[Int](0, sqlContext)
+    val df = inputData.toDS().as[Long].map { 10 / _ }
+    val listener = new EventCollector
+    try {
+      // No events until started
+      spark.streams.addListener(listener)
+      assert(listener.startEvent === null)
+      assert(listener.progressEvents.isEmpty)
+      assert(listener.terminationEvent === null)
+
+      testStream(df, OutputMode.Append)(
+
+        // Start event generated when query started
+        StartStream(ProcessingTime(100), triggerClock = clock),
+        AssertOnQuery { query =>
+          assert(listener.startEvent !== null)
+          assert(listener.startEvent.id === query.id)
+          assert(listener.startEvent.name === query.name)
+          assert(listener.progressEvents.isEmpty)
+          assert(listener.terminationEvent === null)
+          true
+        },
+
+        // Progress event generated when data processed
+        AddData(inputData, 1, 2),
+        AdvanceManualClock(100),
+        CheckAnswer(10, 5),
+        AssertOnQuery { query =>
+          assert(listener.progressEvents.nonEmpty)
+          assert(listener.progressEvents.last.json === query.lastProgress.json)
+          assert(listener.terminationEvent === null)
+          true
+        },
+
+        // Termination event generated when stopped cleanly
+        StopStream,
+        AssertOnQuery { query =>
+          eventually(Timeout(streamingTimeout)) {
+            assert(listener.terminationEvent !== null)
+            assert(listener.terminationEvent.id === query.id)
+            assert(listener.terminationEvent.exception === None)
+          }
+          listener.checkAsyncErrors()
+          listener.reset()
+          true
+        },
+
+        // Termination event generated with exception message when stopped with error
+        StartStream(ProcessingTime(100), triggerClock = clock),
+        AddData(inputData, 0),
+        AdvanceManualClock(100),
+        ExpectFailure[SparkException],
+        AssertOnQuery { query =>
+          assert(listener.terminationEvent !== null)
+          assert(listener.terminationEvent.id === query.id)
+          assert(listener.terminationEvent.exception.nonEmpty)
+          listener.checkAsyncErrors()
+          true
         }
-        offset
-      }
-
-      // Wait for manual clock to be 300 first time there is data
-      override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
-        clock.waitTillTime(300)
-        super.getBatch(start, end)
-      }
-    }
-
-    // This is to make sure thatquery waits for manual clock to be 600 first time there is data
-    val mapped = inputData.toDS().agg(count("*")).as[Long].coalesce(1).map { x =>
-      clock.waitTillTime(600)
-      x
+      )
+    } finally {
+      spark.streams.removeListener(listener)
     }
-
-    testStream(mapped, OutputMode.Complete)(
-      StartStream(triggerClock = clock),
-      AddData(inputData, 1, 2),
-      AdvanceManualClock(100),  // unblock getOffset, will block on getBatch
-      AdvanceManualClock(200),  // unblock getBatch, will block on computation
-      AdvanceManualClock(300),  // unblock computation
-      AssertOnQuery { _ => clock.getTimeMillis() === 600 },
-      AssertOnLastQueryStatus { status: StreamingQueryStatus =>
-        // Check the correctness of the trigger info of the last completed batch reported by
-        // onQueryProgress
-        assert(status.triggerDetails.containsKey("batchId"))
-        assert(status.triggerDetails.get("isTriggerActive") === "false")
-        assert(status.triggerDetails.get("isDataPresentInTrigger") === "true")
-
-        assert(status.triggerDetails.get("timestamp.triggerStart") === "0")
-        assert(status.triggerDetails.get("timestamp.afterGetOffset") === "100")
-        assert(status.triggerDetails.get("timestamp.afterGetBatch") === "300")
-        assert(status.triggerDetails.get("timestamp.triggerFinish") === "600")
-
-        assert(status.triggerDetails.get("latency.getOffset.total") === "100")
-        assert(status.triggerDetails.get("latency.getBatch.total") === "200")
-        assert(status.triggerDetails.get("latency.optimizer") === "0")
-        assert(status.triggerDetails.get("latency.offsetLogWrite") === "0")
-        assert(status.triggerDetails.get("latency.fullTrigger") === "600")
-
-        assert(status.triggerDetails.get("numRows.input.total") === "2")
-        assert(status.triggerDetails.get("numRows.state.aggregation1.total") === "1")
-        assert(status.triggerDetails.get("numRows.state.aggregation1.updated") === "1")
-
-        assert(status.sourceStatuses.length === 1)
-        assert(status.sourceStatuses(0).triggerDetails.containsKey("batchId"))
-        assert(status.sourceStatuses(0).triggerDetails.get("latency.getOffset.source") === "100")
-        assert(status.sourceStatuses(0).triggerDetails.get("latency.getBatch.source") === "200")
-        assert(status.sourceStatuses(0).triggerDetails.get("numRows.input.source") === "2")
-      },
-      CheckAnswer(2)
-    )
   }
 
   test("adding and removing listener") {
-    def isListenerActive(listener: QueryStatusCollector): Boolean = {
+    def isListenerActive(listener: EventCollector): Boolean = {
       listener.reset()
       testStream(MemoryStream[Int].toDS)(
         StartStream(),
         StopStream
       )
-      listener.startStatus != null
+      listener.startEvent != null
     }
 
     try {
-      val listener1 = new QueryStatusCollector
-      val listener2 = new QueryStatusCollector
+      val listener1 = new EventCollector
+      val listener2 = new EventCollector
 
       spark.streams.addListener(listener1)
       assert(isListenerActive(listener1) === true)
@@ -142,14 +144,14 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
   }
 
   test("event ordering") {
-    val listener = new QueryStatusCollector
+    val listener = new EventCollector
     withListenerAdded(listener) {
       for (i <- 1 to 100) {
         listener.reset()
-        require(listener.startStatus === null)
+        require(listener.startEvent === null)
         testStream(MemoryStream[Int].toDS)(
           StartStream(),
-          Assert(listener.startStatus !== null, "onQueryStarted not called before query returned"),
+          Assert(listener.startEvent !== null, "onQueryStarted not called before query returned"),
           StopStream,
           Assert { listener.checkAsyncErrors() }
         )
@@ -158,7 +160,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
   }
 
   testQuietly("exception should be reported in QueryTerminated") {
-    val listener = new QueryStatusCollector
+    val listener = new EventCollector
     withListenerAdded(listener) {
       val input = MemoryStream[Int]
       testStream(input.toDS.map(_ / 0))(
@@ -167,49 +169,46 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
         ExpectFailure[SparkException](),
         Assert {
           spark.sparkContext.listenerBus.waitUntilEmpty(10000)
-          assert(listener.terminationStatus !== null)
-          assert(listener.terminationException.isDefined)
+          assert(listener.terminationEvent !== null)
+          assert(listener.terminationEvent.exception.nonEmpty)
           // Make sure that the exception message reported through listener
           // contains the actual exception and relevant stack trace
-          assert(!listener.terminationException.get.contains("StreamingQueryException"))
-          assert(listener.terminationException.get.contains("java.lang.ArithmeticException"))
-          assert(listener.terminationException.get.contains("StreamingQueryListenerSuite"))
+          assert(!listener.terminationEvent.exception.get.contains("StreamingQueryException"))
+          assert(listener.terminationEvent.exception.get.contains("java.lang.ArithmeticException"))
+          assert(listener.terminationEvent.exception.get.contains("StreamingQueryListenerSuite"))
         }
       )
     }
   }
 
-  test("QueryStarted serialization") {
-    val queryStarted = new StreamingQueryListener.QueryStartedEvent(StreamingQueryStatus.testStatus)
+  test("QueryStartedEvent serialization") {
+    val queryStarted = new StreamingQueryListener.QueryStartedEvent(UUID.randomUUID(), "name")
     val json = JsonProtocol.sparkEventToJson(queryStarted)
     val newQueryStarted = JsonProtocol.sparkEventFromJson(json)
       .asInstanceOf[StreamingQueryListener.QueryStartedEvent]
-    assertStreamingQueryInfoEquals(queryStarted.queryStatus, newQueryStarted.queryStatus)
   }
 
-  test("QueryProgress serialization") {
-    val queryProcess = new StreamingQueryListener.QueryProgressEvent(
-      StreamingQueryStatus.testStatus)
-    val json = JsonProtocol.sparkEventToJson(queryProcess)
-    val newQueryProcess = JsonProtocol.sparkEventFromJson(json)
+  test("QueryProgressEvent serialization") {
+    val event = new StreamingQueryListener.QueryProgressEvent(
+      StreamingQueryProgressSuite.testProgress)
+    val json = JsonProtocol.sparkEventToJson(event)
+    val newEvent = JsonProtocol.sparkEventFromJson(json)
       .asInstanceOf[StreamingQueryListener.QueryProgressEvent]
-    assertStreamingQueryInfoEquals(queryProcess.queryStatus, newQueryProcess.queryStatus)
+    assert(event.progress.json === newEvent.progress.json)
   }
 
-  test("QueryTerminated serialization") {
+  test("QueryTerminatedEvent serialization") {
     val exception = new RuntimeException("exception")
     val queryQueryTerminated = new StreamingQueryListener.QueryTerminatedEvent(
-      StreamingQueryStatus.testStatus,
-      Some(exception.getMessage))
-    val json =
-      JsonProtocol.sparkEventToJson(queryQueryTerminated)
+      UUID.randomUUID, Some(exception.getMessage))
+    val json = JsonProtocol.sparkEventToJson(queryQueryTerminated)
     val newQueryTerminated = JsonProtocol.sparkEventFromJson(json)
       .asInstanceOf[StreamingQueryListener.QueryTerminatedEvent]
-    assertStreamingQueryInfoEquals(queryQueryTerminated.queryStatus, newQueryTerminated.queryStatus)
+    assert(queryQueryTerminated.id === newQueryTerminated.id)
     assert(queryQueryTerminated.exception === newQueryTerminated.exception)
   }
 
-  test("ReplayListenerBus should ignore broken event jsons generated in 2.0.0") {
+  testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2.0.0") {
     // query-event-logs-version-2.0.0.txt has all types of events generated by
     // Structured Streaming in Spark 2.0.0.
    // SparkListenerApplicationEnd is the only valid event and it's the last event. We use it
@@ -217,7 +216,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
    testReplayListenerBusWithBorkenEventJsons("query-event-logs-version-2.0.0.txt")
   }
 
-  test("ReplayListenerBus should ignore broken event jsons generated in 2.0.1") {
+  testQuietly("ReplayListenerBus should ignore broken event jsons generated in 2.0.1") {
    // query-event-logs-version-2.0.1.txt has all types of events generated by
    // Structured Streaming in Spark 2.0.1.
    // SparkListenerApplicationEnd is the only valid event and it's the last event. We use it
@@ -248,28 +247,6 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
     }
   }
 
-  private def assertStreamingQueryInfoEquals(
-      expected: StreamingQueryStatus,
-      actual: StreamingQueryStatus): Unit = {
-    assert(expected.name === actual.name)
-    assert(expected.sourceStatuses.size === actual.sourceStatuses.size)
-    expected.sourceStatuses.zip(actual.sourceStatuses).foreach {
-      case (expectedSource, actualSource) =>
-        assertSourceStatus(expectedSource, actualSource)
-    }
-    assertSinkStatus(expected.sinkStatus, actual.sinkStatus)
-  }
-
-  private def assertSourceStatus(expected: SourceStatus, actual: SourceStatus): Unit = {
-    assert(expected.description === actual.description)
-    assert(expected.offsetDesc === actual.offsetDesc)
-  }
-
-  private def assertSinkStatus(expected: SinkStatus, actual: SinkStatus): Unit = {
-    assert(expected.description === actual.description)
-    assert(expected.offsetDesc === actual.offsetDesc)
-  }
-
   private def withListenerAdded(listener: StreamingQueryListener)(body: => Unit): Unit = {
     try {
       failAfter(streamingTimeout) {
@@ -287,9 +264,51 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
     val listenerBus = spark.streams invokePrivate listenerBusMethod()
     listenerBus.listeners.toArray.map(_.asInstanceOf[StreamingQueryListener])
   }
-}
 
-object StreamingQueryListenerSuite {
-  // Singleton reference to clock that does not get serialized in task closures
-  @volatile var clock: ManualClock = null
+  /** Collects events from the StreamingQueryListener for testing */
+  class EventCollector extends StreamingQueryListener {
+    // to catch errors in the async listener events
+    @volatile private var asyncTestWaiter = new Waiter
+
+    @volatile var startEvent: QueryStartedEvent = null
+    @volatile var terminationEvent: QueryTerminatedEvent = null
+
+    private val _progressEvents = new mutable.Queue[StreamingQueryProgress]
+
+    def progressEvents: Seq[StreamingQueryProgress] = _progressEvents.synchronized {
+      _progressEvents.filter(_.numInputRows > 0)
+    }
+
+    def reset(): Unit = {
+      startEvent = null
+      terminationEvent = null
+      _progressEvents.clear()
+      asyncTestWaiter = new Waiter
+    }
+
+    def checkAsyncErrors(): Unit = {
+      asyncTestWaiter.await(timeout(streamingTimeout))
+    }
+
+    override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
+      asyncTestWaiter {
+        startEvent = queryStarted
+      }
+    }
+
+    override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
+      asyncTestWaiter {
+        assert(startEvent != null, "onQueryProgress called before onQueryStarted")
+        _progressEvents.synchronized { _progressEvents += queryProgress.progress }
+      }
+    }
+
+    override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
+      asyncTestWaiter {
+        assert(startEvent != null, "onQueryTerminated called before onQueryStarted")
+        terminationEvent = queryTerminated
+      }
+      asyncTestWaiter.dismiss()
+    }
+  }
 }
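
For context, the callbacks EventCollector overrides are the public StreamingQueryListener hooks this suite exercises. A hedged, standalone sketch of registering such a listener outside the test harness; the SparkSession setup, object name, and println bodies are illustrative assumptions, not part of this patch:

  import org.apache.spark.sql.SparkSession
  import org.apache.spark.sql.streaming.StreamingQueryListener
  import org.apache.spark.sql.streaming.StreamingQueryListener._

  object ListenerSketch {
    def main(args: Array[String]): Unit = {
      val spark = SparkSession.builder().master("local[2]").appName("listener-sketch").getOrCreate()

      val listener = new StreamingQueryListener {
        override def onQueryStarted(event: QueryStartedEvent): Unit =
          println(s"started: id=${event.id} name=${event.name}")

        override def onQueryProgress(event: QueryProgressEvent): Unit =
          // event.progress is the StreamingQueryProgress added in progress.scala above.
          println(event.progress.json)

        override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
          println(s"terminated: id=${event.id} exception=${event.exception}")
      }

      spark.streams.addListener(listener)
      // ... start and run streaming queries here; events arrive asynchronously ...
      spark.streams.removeListener(listener)
      spark.stop()
    }
  }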

http://git-wip-us.apache.org/repos/asf/spark/blob/c3d08e2f/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
index 41ffd56..268b8ff 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryManagerSuite.scala
@@ -62,7 +62,7 @@ class StreamingQueryManagerSuite extends StreamTest with BeforeAndAfter {
       assert(spark.streams.get(q1.id).eq(q1))
       assert(spark.streams.get(q2.id).eq(q2))
       assert(spark.streams.get(q3.id).eq(q3))
-      assert(spark.streams.get(-1) === null) // non-existent id
+      assert(spark.streams.get(java.util.UUID.randomUUID()) === null) // non-existent id
       q1.stop()
 
       assert(spark.streams.active.toSet === Set(q2, q3))

http://git-wip-us.apache.org/repos/asf/spark/blob/c3d08e2f/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala
new file mode 100644
index 0000000..45d29f6
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+
+import org.json4s._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.streaming.StreamingQueryProgressSuite._
+
+
+class StreamingQueryProgressSuite extends SparkFunSuite {
+
+  test("prettyJson") {
+    val json = testProgress.prettyJson
+    assert(json ===
+      s"""
+        |{
+        |  "id" : "${testProgress.id.toString}",
+        |  "name" : "name",
+        |  "timestamp" : 1,
+        |  "numInputRows" : 678,
+        |  "inputRowsPerSecond" : 10.0,
+        |  "durationMs" : {
+        |    "total" : 0
+        |  },
+        |  "currentWatermark" : 3,
+        |  "stateOperators" : [ {
+        |    "numRowsTotal" : 0,
+        |    "numRowsUpdated" : 1
+        |  } ],
+        |  "sources" : [ {
+        |    "description" : "source",
+        |    "startOffset" : 123,
+        |    "endOffset" : 456,
+        |    "numInputRows" : 678,
+        |    "inputRowsPerSecond" : 10.0
+        |  } ],
+        |  "sink" : {
+        |    "description" : "sink"
+        |  }
+        |}
+      """.stripMargin.trim)
+    assert(compact(parse(json)) === testProgress.json)
+
+  }
+
+  test("json") {
+    assert(compact(parse(testProgress.json)) === testProgress.json)
+  }
+
+  test("toString") {
+    assert(testProgress.toString === testProgress.prettyJson)
+  }
+}
+
+object StreamingQueryProgressSuite {
+  val testProgress = new StreamingQueryProgress(
+    id = UUID.randomUUID(),
+    name = "name",
+    timestamp = 1L,
+    batchId = 2L,
+    durationMs = Map("total" -> 0L).mapValues(long2Long).asJava,
+    currentWatermark = 3L,
+    stateOperators = Array(new StateOperatorProgress(numRowsTotal = 0, numRowsUpdated = 1)),
+    sources = Array(
+      new SourceProgress(
+        description = "source",
+        startOffset = "123",
+        endOffset = "456",
+        numInputRows = 678,
+        inputRowsPerSecond = 10.0,
+        processedRowsPerSecond = Double.PositiveInfinity  // should not be present in the json
+      )
+    ),
+    sink = new SinkProgress("sink")
+  )
+}
+
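
One detail in testProgress worth calling out: the StreamingQueryProgress constructor takes durationMs as a ju.Map[String, java.lang.Long], so a Scala Map[String, Long] must be boxed before .asJava, which is what the mapValues(long2Long) above does. A small standalone sketch of just that conversion (an aside, not part of this patch; the object name and sample keys are illustrative):

  import java.{util => ju}

  import scala.collection.JavaConverters._

  object DurationMsSketch {
    def main(args: Array[String]): Unit = {
      val durations: Map[String, Long] = Map("getOffset" -> 5L, "getBatch" -> 20L, "total" -> 60L)

      // long2Long (from Predef) boxes scala.Long to java.lang.Long; a plain .asJava on
      // Map[String, Long] would not satisfy ju.Map[String, java.lang.Long].
      val javaDurations: ju.Map[String, java.lang.Long] = durations.mapValues(long2Long).asJava

      println(javaDurations) // e.g. {getOffset=5, getBatch=20, total=60}
    }
  }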

http://git-wip-us.apache.org/repos/asf/spark/blob/c3d08e2f/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala
deleted file mode 100644
index 50a7d92..0000000
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming
-
-import org.apache.spark.SparkFunSuite
-
-class StreamingQueryStatusSuite extends SparkFunSuite {
-  test("toString") {
-    assert(StreamingQueryStatus.testStatus.sourceStatuses(0).toString ===
-      """
-        |Status of source MySource1
-        |    Available offset: 0
-        |    Input rate: 15.5 rows/sec
-        |    Processing rate: 23.5 rows/sec
-        |    Trigger details:
-        |        numRows.input.source: 100
-        |        latency.getOffset.source: 10
-        |        latency.getBatch.source: 20
-      """.stripMargin.trim, "SourceStatus.toString does not match")
-
-    assert(StreamingQueryStatus.testStatus.sinkStatus.toString ===
-      """
-        |Status of sink MySink
-        |    Committed offsets: [1, -]
-      """.stripMargin.trim, "SinkStatus.toString does not match")
-
-    assert(StreamingQueryStatus.testStatus.toString ===
-      """
-        |Status of query 'query'
-        |    Query id: 1
-        |    Status timestamp: 123
-        |    Input rate: 15.5 rows/sec
-        |    Processing rate 23.5 rows/sec
-        |    Latency: 345.0 ms
-        |    Trigger details:
-        |        batchId: 5
-        |        isDataPresentInTrigger: true
-        |        isTriggerActive: true
-        |        latency.getBatch.total: 20
-        |        latency.getOffset.total: 10
-        |        numRows.input.total: 100
-        |    Source statuses [1 source]:
-        |        Source 1 - MySource1
-        |            Available offset: 0
-        |            Input rate: 15.5 rows/sec
-        |            Processing rate: 23.5 rows/sec
-        |            Trigger details:
-        |                numRows.input.source: 100
-        |                latency.getOffset.source: 10
-        |                latency.getBatch.source: 20
-        |    Sink status - MySink
-        |        Committed offsets: [1, -]
-      """.stripMargin.trim, "StreamingQueryStatus.toString does not match")
-
-  }
-
-  test("json") {
-    assert(StreamingQueryStatus.testStatus.json ===
-      """
-        |{"name":"query","id":1,"timestamp":123,"inputRate":15.5,"processingRate":23.5,
-        |"latency":345.0,"triggerDetails":{"latency.getBatch.total":"20",
-        |"numRows.input.total":"100","isTriggerActive":"true","batchId":"5",
-        |"latency.getOffset.total":"10","isDataPresentInTrigger":"true"},
-        |"sourceStatuses":[{"description":"MySource1","offsetDesc":"0","inputRate":15.5,
-        |"processingRate":23.5,"triggerDetails":{"numRows.input.source":"100",
-        |"latency.getOffset.source":"10","latency.getBatch.source":"20"}}],
-        |"sinkStatus":{"description":"MySink","offsetDesc":"[1, -]"}}
-      """.stripMargin.replace("\n", "").trim)
-  }
-
-  test("prettyJson") {
-    assert(
-      StreamingQueryStatus.testStatus.prettyJson ===
-        """
-          |{
-          |  "name" : "query",
-          |  "id" : 1,
-          |  "timestamp" : 123,
-          |  "inputRate" : 15.5,
-          |  "processingRate" : 23.5,
-          |  "latency" : 345.0,
-          |  "triggerDetails" : {
-          |    "latency.getBatch.total" : "20",
-          |    "numRows.input.total" : "100",
-          |    "isTriggerActive" : "true",
-          |    "batchId" : "5",
-          |    "latency.getOffset.total" : "10",
-          |    "isDataPresentInTrigger" : "true"
-          |  },
-          |  "sourceStatuses" : [ {
-          |    "description" : "MySource1",
-          |    "offsetDesc" : "0",
-          |    "inputRate" : 15.5,
-          |    "processingRate" : 23.5,
-          |    "triggerDetails" : {
-          |      "numRows.input.source" : "100",
-          |      "latency.getOffset.source" : "10",
-          |      "latency.getBatch.source" : "20"
-          |    }
-          |  } ],
-          |  "sinkStatus" : {
-          |    "description" : "MySink",
-          |    "offsetDesc" : "[1, -]"
-          |  }
-          |}
-        """.stripMargin.trim)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/c3d08e2f/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 8ecb33c..4f3b4a2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -20,14 +20,15 @@ package org.apache.spark.sql.streaming
 import org.scalactic.TolerantNumerics
 import org.scalatest.concurrent.Eventually._
 import org.scalatest.BeforeAndAfter
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.DataFrame
-import org.apache.spark.sql.streaming.StreamingQueryListener._
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.SparkException
 import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.util.Utils
+import org.apache.spark.sql.functions._
+import org.apache.spark.util.{ManualClock, Utils}
 
 
 class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
@@ -109,85 +110,139 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
     )
   }
 
-  testQuietly("query statuses") {
-    val inputData = MemoryStream[Int]
-    val mapped = inputData.toDS().map(6 / _)
-    testStream(mapped)(
-      AssertOnQuery(q => q.status.name === q.name),
-      AssertOnQuery(q => q.status.id === q.id),
-      AssertOnQuery(_.status.timestamp <= System.currentTimeMillis),
-      AssertOnQuery(_.status.inputRate === 0.0),
-      AssertOnQuery(_.status.processingRate === 0.0),
-      AssertOnQuery(_.status.sourceStatuses.length === 1),
-      AssertOnQuery(_.status.sourceStatuses(0).description.contains("Memory")),
-      AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === "-"),
-      AssertOnQuery(_.status.sourceStatuses(0).inputRate === 0.0),
-      AssertOnQuery(_.status.sourceStatuses(0).processingRate === 0.0),
-      AssertOnQuery(_.status.sinkStatus.description.contains("Memory")),
-      AssertOnQuery(_.status.sinkStatus.offsetDesc === OffsetSeq(None :: Nil).toString),
-      AssertOnQuery(_.sourceStatuses(0).description.contains("Memory")),
-      AssertOnQuery(_.sourceStatuses(0).offsetDesc === "-"),
-      AssertOnQuery(_.sourceStatuses(0).inputRate === 0.0),
-      AssertOnQuery(_.sourceStatuses(0).processingRate === 0.0),
-      AssertOnQuery(_.sinkStatus.description.contains("Memory")),
-      AssertOnQuery(_.sinkStatus.offsetDesc === new OffsetSeq(None :: Nil).toString),
+  testQuietly("query statuses and progresses") {
+    import StreamingQuerySuite._
+    clock = new StreamManualClock
+
+    /** Custom MemoryStream that waits for manual clock to reach a time */
+    val inputData = new MemoryStream[Int](0, sqlContext) {
+      // Wait for manual clock to be 300 first time there is data
+      override def getOffset: Option[Offset] = {
+        val offset = super.getOffset
+        if (offset.nonEmpty) {
+          clock.waitTillTime(300)
+        }
+        offset
+      }
 
-      AddData(inputData, 1, 2),
-      CheckAnswer(6, 3),
-      AssertOnQuery(_.status.timestamp <= System.currentTimeMillis),
-      AssertOnQuery(_.status.inputRate >= 0.0),
-      AssertOnQuery(_.status.processingRate >= 0.0),
-      AssertOnQuery(_.status.sourceStatuses.length === 1),
-      AssertOnQuery(_.status.sourceStatuses(0).description.contains("Memory")),
-      AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === LongOffset(0).json),
-      AssertOnQuery(_.status.sourceStatuses(0).inputRate >= 0.0),
-      AssertOnQuery(_.status.sourceStatuses(0).processingRate >= 0.0),
-      AssertOnQuery(_.status.sinkStatus.description.contains("Memory")),
-      AssertOnQuery(_.status.sinkStatus.offsetDesc ===
-        OffsetSeq.fill(LongOffset(0)).toString),
-      AssertOnQuery(_.sourceStatuses(0).offsetDesc === LongOffset(0).json),
-      AssertOnQuery(_.sourceStatuses(0).inputRate >= 0.0),
-      AssertOnQuery(_.sourceStatuses(0).processingRate >= 0.0),
-      AssertOnQuery(_.sinkStatus.offsetDesc === OffsetSeq.fill(LongOffset(0)).toString),
+      // Wait for manual clock to be 600 first time there is data
+      override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
+        clock.waitTillTime(600)
+        super.getBatch(start, end)
+      }
+    }
 
-      AddData(inputData, 1, 2),
-      CheckAnswer(6, 3, 6, 3),
-      AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === LongOffset(1).json),
-      AssertOnQuery(_.status.sinkStatus.offsetDesc ===
-        OffsetSeq.fill(LongOffset(1)).toString),
-      AssertOnQuery(_.sourceStatuses(0).offsetDesc === LongOffset(1).json),
-      AssertOnQuery(_.sinkStatus.offsetDesc === OffsetSeq.fill(LongOffset(1)).toString),
+    // This is to make sure that the query waits for the manual clock to reach 1100 the first time there is data
+    val mapped = inputData.toDS().agg(count("*")).as[Long].coalesce(1).map { x =>
+      clock.waitTillTime(1100)
+      x
+    }
 
-      StopStream,
-      AssertOnQuery(_.status.inputRate === 0.0),
-      AssertOnQuery(_.status.processingRate === 0.0),
-      AssertOnQuery(_.status.sourceStatuses.length === 1),
-      AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === LongOffset(1).json),
-      AssertOnQuery(_.status.sourceStatuses(0).inputRate === 0.0),
-      AssertOnQuery(_.status.sourceStatuses(0).processingRate === 0.0),
-      AssertOnQuery(_.status.sinkStatus.offsetDesc ===
-        OffsetSeq.fill(LongOffset(1)).toString),
-      AssertOnQuery(_.sourceStatuses(0).offsetDesc === LongOffset(1).json),
-      AssertOnQuery(_.sourceStatuses(0).inputRate === 0.0),
-      AssertOnQuery(_.sourceStatuses(0).processingRate === 0.0),
-      AssertOnQuery(_.sinkStatus.offsetDesc === OffsetSeq.fill(LongOffset(1)).toString),
-      AssertOnQuery(_.status.triggerDetails.isEmpty),
+    case class AssertStreamExecThreadToWaitForClock()
+      extends AssertOnQuery(q => {
+        eventually(Timeout(streamingTimeout)) {
+          if (q.exception.isEmpty) {
+            assert(clock.asInstanceOf[StreamManualClock].isStreamWaitingAt(clock.getTimeMillis))
+          }
+        }
+        if (q.exception.isDefined) {
+          throw q.exception.get
+        }
+        true
+      }, "")
+
+    testStream(mapped, OutputMode.Complete)(
+      StartStream(ProcessingTime(100), triggerClock = clock),
+      AssertStreamExecThreadToWaitForClock(),
+      AssertOnQuery(_.status.isDataAvailable === false),
+      AssertOnQuery(_.status.isTriggerActive === false),
+      // TODO: test status.message before trigger has started
+      // AssertOnQuery(_.lastProgress === null)  // there is an empty trigger as soon as started
+      AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0),
+
+      // Test status while offset is being fetched
+      AddData(inputData, 1, 2),
+      AdvanceManualClock(100), // time = 100 to start new trigger, will block on getOffset
+      AssertStreamExecThreadToWaitForClock(),
+      AssertOnQuery(_.status.isDataAvailable === false),
+      AssertOnQuery(_.status.isTriggerActive === true),
+      AssertOnQuery(_.status.message.toLowerCase.contains("getting offsets from")),
+      AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0),
+
+      // Test status while batch is being fetched
+      AdvanceManualClock(200), // time = 300 to unblock getOffset, will block on getBatch
+      AssertStreamExecThreadToWaitForClock(),
+      AssertOnQuery(_.status.isDataAvailable === true),
+      AssertOnQuery(_.status.isTriggerActive === true),
+      AssertOnQuery(_.status.message === "Processing new data"),
+      AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0),
+
+      // Test status while batch is being processed
+      AdvanceManualClock(300), // time = 600 to unblock getBatch, will block in Spark job
+      AssertOnQuery(_.status.isDataAvailable === true),
+      AssertOnQuery(_.status.isTriggerActive === true),
+      AssertOnQuery(_.status.message === "Processing new data"),
+      AssertOnQuery(_.recentProgresses.count(_.numInputRows > 0) === 0),
+
+      // Test status after batch processing has completed
+      AdvanceManualClock(500), // time = 1100 to unblock job
+      AssertOnQuery { _ => clock.getTimeMillis() === 1100 },
+      CheckAnswer(2),
+      AssertOnQuery(_.status.isDataAvailable === true),
+      AssertOnQuery(_.status.isTriggerActive === false),
+      AssertOnQuery(_.status.message === "Waiting for next trigger"),
+      AssertOnQuery { query =>
+        assert(query.lastProgress != null)
+        assert(query.recentProgresses.exists(_.numInputRows > 0))
+        assert(query.recentProgresses.last.eq(query.lastProgress))
+
+        val progress = query.lastProgress
+        assert(progress.id === query.id)
+        assert(progress.name === query.name)
+        assert(progress.batchId === 0)
+        assert(progress.timestamp === 100)
+        assert(progress.numInputRows === 2)
+        assert(progress.processedRowsPerSecond === 2.0)
+
+        assert(progress.durationMs.get("getOffset") === 200)
+        assert(progress.durationMs.get("getBatch") === 300)
+        assert(progress.durationMs.get("queryPlanning") === 0)
+        assert(progress.durationMs.get("walCommit") === 0)
+        assert(progress.durationMs.get("triggerExecution") === 1000)
+
+        assert(progress.sources.length === 1)
+        assert(progress.sources(0).description contains "MemoryStream")
+        assert(progress.sources(0).startOffset === null)
+        assert(progress.sources(0).endOffset !== null)
+        assert(progress.sources(0).processedRowsPerSecond === 2.0)
+
+        assert(progress.stateOperators.length === 1)
+        assert(progress.stateOperators(0).numRowsUpdated === 1)
+        assert(progress.stateOperators(0).numRowsTotal === 1)
+
+        assert(progress.sink.description contains "MemorySink")
+        true
+      },
 
-      StartStream(),
-      AddData(inputData, 0),
-      ExpectFailure[SparkException],
-      AssertOnQuery(_.status.inputRate === 0.0),
-      AssertOnQuery(_.status.processingRate === 0.0),
-      AssertOnQuery(_.status.sourceStatuses.length === 1),
-      AssertOnQuery(_.status.sourceStatuses(0).offsetDesc === LongOffset(2).json),
-      AssertOnQuery(_.status.sourceStatuses(0).inputRate === 0.0),
-      AssertOnQuery(_.status.sourceStatuses(0).processingRate === 0.0),
-      AssertOnQuery(_.status.sinkStatus.offsetDesc ===
-        OffsetSeq.fill(LongOffset(1)).toString),
-      AssertOnQuery(_.sourceStatuses(0).offsetDesc === LongOffset(2).json),
-      AssertOnQuery(_.sourceStatuses(0).inputRate === 0.0),
-      AssertOnQuery(_.sourceStatuses(0).processingRate === 0.0),
-      AssertOnQuery(_.sinkStatus.offsetDesc === OffsetSeq.fill(LongOffset(1)).toString)
+      AddData(inputData, 1, 2),
+      AdvanceManualClock(100), // allow another trigger
+      CheckAnswer(4),
+      AssertOnQuery(_.status.isDataAvailable === true),
+      AssertOnQuery(_.status.isTriggerActive === false),
+      AssertOnQuery(_.status.message === "Waiting for next trigger"),
+      AssertOnQuery { query =>
+        assert(query.recentProgresses.last.eq(query.lastProgress))
+        assert(query.lastProgress.batchId === 1)
+        assert(query.lastProgress.sources(0).inputRowsPerSecond === 1.818)
+        true
+      },
+
+      // Test status after data is not available for a trigger
+      AdvanceManualClock(100), // allow another trigger
+      AssertStreamExecThreadToWaitForClock(),
+      AssertOnQuery(_.status.isDataAvailable === false),
+      AssertOnQuery(_.status.isTriggerActive === false),
+      AssertOnQuery(_.status.message === "Waiting for next trigger")
     )
   }
 
@@ -196,7 +251,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
 
     /** Whether metrics of a query is registered for reporting */
     def isMetricsRegistered(query: StreamingQuery): Boolean = {
-      val sourceName = s"StructuredStreaming.${query.name}"
+      val sourceName = s"spark.streaming.${query.name}"
       val sources = spark.sparkContext.env.metricsSystem.getSourcesByName(sourceName)
       require(sources.size <= 1)
       sources.nonEmpty
@@ -229,23 +284,23 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
 
     // Trigger input has 10 rows, static input has 2 rows,
     // therefore after the first trigger, the calculated input rows should be 10
-    val status = getFirstTriggerStatus(streamingInputDF.join(staticInputDF, "value"))
-    assert(status.triggerDetails.get("numRows.input.total") === "10")
-    assert(status.sourceStatuses.size === 1)
-    assert(status.sourceStatuses(0).triggerDetails.get("numRows.input.source") === "10")
+    val progress = getFirstProgress(streamingInputDF.join(staticInputDF, "value"))
+    assert(progress.numInputRows === 10)
+    assert(progress.sources.size === 1)
+    assert(progress.sources(0).numInputRows === 10)
   }
 
-  test("input row calculation with trigger DF having multiple leaves") {
+  test("input row calculation with trigger input DF having multiple leaves") {
     val streamingTriggerDF =
       spark.createDataset(1 to 5).toDF.union(spark.createDataset(6 to 10).toDF)
     require(streamingTriggerDF.logicalPlan.collectLeaves().size > 1)
     val streamingInputDF = createSingleTriggerStreamingDF(streamingTriggerDF)
 
     // After the first trigger, the calculated input rows should be 10
-    val status = getFirstTriggerStatus(streamingInputDF)
-    assert(status.triggerDetails.get("numRows.input.total") === "10")
-    assert(status.sourceStatuses.size === 1)
-    assert(status.sourceStatuses(0).triggerDetails.get("numRows.input.source") === "10")
+    val progress = getFirstProgress(streamingInputDF)
+    assert(progress.numInputRows === 10)
+    assert(progress.sources.size === 1)
+    assert(progress.sources(0).numInputRows === 10)
   }
 
   testQuietly("StreamExecution metadata garbage collection") {
@@ -285,34 +340,14 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
     StreamingExecutionRelation(source)
   }
 
-  /** Returns the query status at the end of the first trigger of streaming DF */
-  private def getFirstTriggerStatus(streamingDF: DataFrame): StreamingQueryStatus = {
-    // A StreamingQueryListener that gets the query status after the first completed trigger
-    val listener = new StreamingQueryListener {
-      @volatile var firstStatus: StreamingQueryStatus = null
-      @volatile var queryStartedEvent = 0
-      override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
-        queryStartedEvent += 1
-      }
-      override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
-       if (firstStatus == null) firstStatus = queryProgress.queryStatus
-      }
-      override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = { }
-    }
-
+  /** Returns the query progress at the end of the first trigger of streaming DF */
+  private def getFirstProgress(streamingDF: DataFrame): StreamingQueryProgress = {
     try {
-      spark.streams.addListener(listener)
       val q = streamingDF.writeStream.format("memory").queryName("test").start()
       q.processAllAvailable()
-      eventually(timeout(streamingTimeout)) {
-        assert(listener.firstStatus != null)
-        // test if QueryStartedEvent callback is called for only once
-        assert(listener.queryStartedEvent === 1)
-      }
-      listener.firstStatus
+      q.recentProgresses.head
     } finally {
       spark.streams.active.map(_.stop())
-      spark.streams.removeListener(listener)
     }
   }
 
@@ -369,3 +404,8 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
     }
   }
 }
+
+object StreamingQuerySuite {
+  // Singleton reference to clock that does not get serialized in task closures
+  var clock: ManualClock = null
+}
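
(For reference, not part of the patch: a minimal sketch of how the status/progress
API exercised by the test above might be consumed from user code. Only accessors the
suite asserts on are used; the SparkSession `spark`, the streaming DataFrame
`streamingDF`, and the query name "monitored" are assumed for illustration.)

    // Minimal sketch, assuming a SparkSession `spark` and a streaming DataFrame `streamingDF`.
    val query = streamingDF.writeStream
      .format("memory")
      .queryName("monitored")
      .outputMode("append")   // an aggregation would need Complete, as in the test above
      .start()

    // StreamingQueryStatus: what the query is doing right now
    val status = query.status
    println(s"dataAvailable=${status.isDataAvailable} triggerActive=${status.isTriggerActive}: ${status.message}")

    // StreamingQueryProgress: metrics of the most recently completed trigger
    // (lastProgress is null until the first trigger finishes)
    Option(query.lastProgress).foreach { p =>
      val trigMs = p.durationMs.get("triggerExecution")
      println(s"batch ${p.batchId}: ${p.numInputRows} input rows, $trigMs ms for the trigger")
    }

    // recentProgresses retains the last few progress updates for inspection
    query.recentProgresses.foreach(p => println(p.numInputRows))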

http://git-wip-us.apache.org/repos/asf/spark/blob/c3d08e2f/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala
index 3e9488c..12f3c3e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/WatermarkSuite.scala
@@ -51,6 +51,7 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
 
 
   test("watermark metric") {
+
     val inputData = MemoryStream[Int]
 
     val windowedAggregation = inputData.toDF()
@@ -62,16 +63,19 @@ class WatermarkSuite extends StreamTest with BeforeAndAfter with Logging {
 
     testStream(windowedAggregation)(
       AddData(inputData, 15),
-      AssertOnLastQueryStatus { status =>
-        status.triggerDetails.get(StreamMetrics.EVENT_TIME_WATERMARK) === "5000"
+      CheckAnswer(),
+      AssertOnQuery { query =>
+        query.lastProgress.currentWatermark === 5000
       },
       AddData(inputData, 15),
-      AssertOnLastQueryStatus { status =>
-        status.triggerDetails.get(StreamMetrics.EVENT_TIME_WATERMARK) === "5000"
+      CheckAnswer(),
+      AssertOnQuery { query =>
+        query.lastProgress.currentWatermark === 5000
       },
       AddData(inputData, 25),
-      AssertOnLastQueryStatus { status =>
-        status.triggerDetails.get(StreamMetrics.EVENT_TIME_WATERMARK) === "15000"
+      CheckAnswer(),
+      AssertOnQuery { query =>
+        query.lastProgress.currentWatermark === 15000
       }
     )
   }
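
(The expected values above, a watermark of 5000 ms after seeing event time 15 and
15000 ms after event time 25, are consistent with a 10 second watermark delay. The
aggregation itself is defined earlier in the suite and is not shown in this hunk;
the following is only a sketch, with the column name and delay assumed for
illustration.)

    // Sketch only: assumes spark.implicits._ in scope, an "eventTime" column, and a
    // 10 second delay; the suite's real windowedAggregation is defined above this hunk.
    import org.apache.spark.sql.functions.window
    val windowedAggregation = inputData.toDF()
      .withColumn("eventTime", $"value".cast("timestamp"))
      .withWatermark("eventTime", "10 seconds")
      .groupBy(window($"eventTime", "5 seconds"))
      .count()
    // Max event time 15 s minus the 10 s delay gives a 5 s watermark, which is what
    // the assertion lastProgress.currentWatermark === 5000 checks (in milliseconds).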


