[SPARK-18516][SQL] Split state and progress in streaming

This PR splits the status of a `StreamingQuery` into two separate APIs:
 - `status` - describes what the `StreamingQuery` is doing at this moment,
   including which phase of processing is currently active and whether data is
   available.
 - `recentProgresses` - an array of statistics about the most recent
   micro-batches that have executed.

Each progress update contains the following information (a usage sketch follows the example below):
```
{
  "id" : "2be8670a-fce1-4859-a530-748f29553bb6",
  "name" : "query-29",
  "timestamp" : 1479705392724,
  "inputRowsPerSecond" : 230.76923076923077,
  "processedRowsPerSecond" : 10.869565217391303,
  "durationMs" : {
    "triggerExecution" : 276,
    "queryPlanning" : 3,
    "getBatch" : 5,
    "getOffset" : 3,
    "addBatch" : 234,
    "walCommit" : 30
  },
  "currentWatermark" : 0,
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "KafkaSource[Subscribe[topic-14]]",
    "startOffset" : {
      "topic-14" : {
        "2" : 0,
        "4" : 1,
        "1" : 0,
        "3" : 0,
        "0" : 0
      }
    },
    "endOffset" : {
      "topic-14" : {
        "2" : 1,
        "4" : 2,
        "1" : 0,
        "3" : 0,
        "0" : 1
      }
    },
    "numRecords" : 3,
    "inputRowsPerSecond" : 230.76923076923077,
    "processedRowsPerSecond" : 10.869565217391303
  } ]
}
```
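
As a rough usage sketch (not part of this patch), the new APIs can be polled
directly from a `StreamingQuery` handle. The `spark` session is assumed to be a
Spark shell session against this branch; the input directory, memory sink, and
query name below are placeholders for illustration:

```
// Hypothetical shell session; /tmp/streaming-input is a placeholder directory
// of text files.
val query = spark.readStream
  .format("text")
  .load("/tmp/streaming-input")
  .writeStream
  .format("memory")
  .queryName("progress_demo")
  .start()

query.processAllAvailable()

// `status` describes what the query is doing at this moment.
println(query.status)

// `recentProgresses` holds statistics for the most recent micro-batches;
// `lastProgress` is the newest entry, rendered as the JSON shown above by `.json`.
query.recentProgresses.foreach(p => println(p.json))
println(query.lastProgress.numInputRows)
```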

Additionally, in order to make it possible to correlate progress updates across
restarts, we change the `id` field from an integer that is unique within the
JVM to a `UUID` that is globally unique.
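
A sketch of the effect (again assuming the `query` handle from the sketch
above): the same UUID tags the query and each of its progress updates. Note
that this patch does not yet restore the id from the checkpoint directory (see
the TODO in `StreamExecution` below).

```
// Sketch only; `query` is the handle from the previous example.
val queryId: java.util.UUID = query.id   // previously a per-JVM Long
assert(query.recentProgresses.forall(_.id == queryId))
```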

Author: Tathagata Das <tathagata.das1...@gmail.com>
Author: Michael Armbrust <mich...@databricks.com>

Closes #15954 from marmbrus/queryProgress.

(cherry picked from commit c3d08e2f29baeebe09bf4c059ace4336af9116b5)
Signed-off-by: Michael Armbrust <mich...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/28b57c8a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/28b57c8a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/28b57c8a

Branch: refs/heads/branch-2.1
Commit: 28b57c8a124fe55501c4ca4b91320851ace5d735
Parents: 045ae29
Author: Tathagata Das <tathagata.das1...@gmail.com>
Authored: Tue Nov 29 17:24:17 2016 -0800
Committer: Michael Armbrust <mich...@databricks.com>
Committed: Tue Nov 29 17:24:37 2016 -0800

----------------------------------------------------------------------
 .../spark/sql/kafka010/KafkaSourceSuite.scala   |   7 +-
 project/MimaExcludes.scala                      |  11 +
 python/pyspark/sql/streaming.py                 | 326 ++-----------------
 python/pyspark/sql/tests.py                     |  22 ++
 .../execution/streaming/MetricsReporter.scala   |  53 +++
 .../execution/streaming/ProgressReporter.scala  | 234 +++++++++++++
 .../execution/streaming/StreamExecution.scala   | 282 ++++------------
 .../sql/execution/streaming/StreamMetrics.scala | 243 --------------
 .../org/apache/spark/sql/internal/SQLConf.scala |   8 +
 .../apache/spark/sql/streaming/SinkStatus.scala |  66 ----
 .../spark/sql/streaming/SourceStatus.scala      |  95 ------
 .../spark/sql/streaming/StreamingQuery.scala    |  33 +-
 .../sql/streaming/StreamingQueryException.scala |   2 +-
 .../sql/streaming/StreamingQueryListener.scala  |  24 +-
 .../sql/streaming/StreamingQueryManager.scala   |  27 +-
 .../sql/streaming/StreamingQueryStatus.scala    | 151 +--------
 .../apache/spark/sql/streaming/progress.scala   | 193 +++++++++++
 .../streaming/StreamMetricsSuite.scala          | 213 ------------
 .../sql/streaming/FileStreamSourceSuite.scala   |  10 +-
 .../apache/spark/sql/streaming/StreamTest.scala |  73 +----
 .../streaming/StreamingQueryListenerSuite.scala | 267 ++++++++-------
 .../streaming/StreamingQueryManagerSuite.scala  |   2 +-
 .../streaming/StreamingQueryProgressSuite.scala |  98 ++++++
 .../streaming/StreamingQueryStatusSuite.scala   | 123 -------
 .../sql/streaming/StreamingQuerySuite.scala     | 260 ++++++++-------
 .../spark/sql/streaming/WatermarkSuite.scala    |  16 +-
 26 files changed, 1087 insertions(+), 1752 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/28b57c8a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
index e1af14f..2d6ccb2 100644
--- 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
@@ -442,12 +442,13 @@ class KafkaSourceSuite extends KafkaSourceTest {
 
     val mapped = kafka.map(kv => kv._2.toInt + 1)
     testStream(mapped)(
+      StartStream(trigger = ProcessingTime(1)),
       makeSureGetOffsetCalled,
       AddKafkaData(Set(topic), 1, 2, 3),
       CheckAnswer(2, 3, 4),
-      AssertOnLastQueryStatus { status =>
-        assert(status.triggerDetails.get("numRows.input.total").toInt > 0)
-        assert(status.sourceStatuses(0).processingRate > 0.0)
+      AssertOnQuery { query =>
+        val recordsRead = query.recentProgresses.map(_.numInputRows).sum
+        recordsRead == 3
       }
     )
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/28b57c8a/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 03c9fcc..9739164 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -78,6 +78,17 @@ object MimaExcludes {
       ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryTerminated"),
       ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQueryListener.onQueryTerminated"),
 
+      // [SPARK-18516][SQL] Split state and progress in streaming
+      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.SourceStatus"),
+      ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.SinkStatus"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.sinkStatus"),
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.sourceStatuses"),
+      ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.streaming.StreamingQuery.id"),
+      ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.lastProgress"),
+      ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.recentProgresses"),
+      ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.streaming.StreamingQuery.id"),
+      ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.sql.streaming.StreamingQueryManager.get"),
+
       // [SPARK-17338][SQL] add global temp view
       ProblemFilters.exclude[ReversedMissingMethodProblem]("org.apache.spark.sql.catalog.Catalog.dropGlobalTempView"),
       ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.sql.catalog.Catalog.dropTempView"),

http://git-wip-us.apache.org/repos/asf/spark/blob/28b57c8a/python/pyspark/sql/streaming.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 9c3a237..c420b0d 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -16,6 +16,8 @@
 #
 
 import sys
+import json
+
 if sys.version >= '3':
     intlike = int
     basestring = unicode = str
@@ -48,10 +50,9 @@ class StreamingQuery(object):
     @property
     @since(2.0)
     def id(self):
-        """The id of the streaming query. This id is unique across all queries 
that have been
-        started in the current process.
+        """The id of the streaming query.
         """
-        return self._jsq.id()
+        return self._jsq.id().toString()
 
     @property
     @since(2.0)
@@ -87,6 +88,24 @@ class StreamingQuery(object):
         else:
             return self._jsq.awaitTermination()
 
+    @property
+    @since(2.1)
+    def recentProgresses(self):
+        """Returns an array of the most recent [[StreamingQueryProgress]] 
updates for this query.
+        The number of progress updates retained for each stream is configured 
by Spark session
+        configuration `spark.sql.streaming.numRecentProgresses`.
+        """
+        return [json.loads(p.json()) for p in self._jsq.recentProgresses()]
+
+    @property
+    @since(2.1)
+    def lastProgress(self):
+        """
+        Returns the most recent :class:`StreamingQueryProgress` update of this 
streaming query.
+        :return: a map
+        """
+        return json.loads(self._jsq.lastProgress().json())
+
     @since(2.0)
     def processAllAvailable(self):
         """Blocks until all available data in the source has been processed 
and committed to the
@@ -149,8 +168,6 @@ class StreamingQueryManager(object):
         True
         >>> sq.stop()
         """
-        if not isinstance(id, intlike):
-            raise ValueError("The id for the query must be an integer. Got: 
%s" % id)
         return StreamingQuery(self._jsqm.get(id))
 
     @since(2.0)
@@ -191,303 +208,6 @@ class StreamingQueryManager(object):
         self._jsqm.resetTerminated()
 
 
-class StreamingQueryStatus(object):
-    """A class used to report information about the progress of a 
StreamingQuery.
-
-    .. note:: Experimental
-
-    .. versionadded:: 2.1
-    """
-
-    def __init__(self, jsqs):
-        self._jsqs = jsqs
-
-    def __str__(self):
-        """
-        Pretty string of this query status.
-
-        >>> print(sqs)
-        Status of query 'query'
-            Query id: 1
-            Status timestamp: 123
-            Input rate: 15.5 rows/sec
-            Processing rate 23.5 rows/sec
-            Latency: 345.0 ms
-            Trigger details:
-                batchId: 5
-                isDataPresentInTrigger: true
-                isTriggerActive: true
-                latency.getBatch.total: 20
-                latency.getOffset.total: 10
-                numRows.input.total: 100
-            Source statuses [1 source]:
-                Source 1 - MySource1
-                    Available offset: 0
-                    Input rate: 15.5 rows/sec
-                    Processing rate: 23.5 rows/sec
-                    Trigger details:
-                        numRows.input.source: 100
-                        latency.getOffset.source: 10
-                        latency.getBatch.source: 20
-            Sink status - MySink
-                Committed offsets: [1, -]
-        """
-        return self._jsqs.toString()
-
-    @property
-    @ignore_unicode_prefix
-    @since(2.1)
-    def name(self):
-        """
-        Name of the query. This name is unique across all active queries.
-
-        >>> sqs.name
-        u'query'
-        """
-        return self._jsqs.name()
-
-    @property
-    @since(2.1)
-    def id(self):
-        """
-        Id of the query. This id is unique across all queries that have been 
started in
-        the current process.
-
-        >>> int(sqs.id)
-        1
-        """
-        return self._jsqs.id()
-
-    @property
-    @since(2.1)
-    def timestamp(self):
-        """
-        Timestamp (ms) of when this query was generated.
-
-        >>> int(sqs.timestamp)
-        123
-        """
-        return self._jsqs.timestamp()
-
-    @property
-    @since(2.1)
-    def inputRate(self):
-        """
-        Current total rate (rows/sec) at which data is being generated by all 
the sources.
-
-        >>> sqs.inputRate
-        15.5
-        """
-        return self._jsqs.inputRate()
-
-    @property
-    @since(2.1)
-    def processingRate(self):
-        """
-        Current rate (rows/sec) at which the query is processing data from all 
the sources.
-
-        >>> sqs.processingRate
-        23.5
-        """
-        return self._jsqs.processingRate()
-
-    @property
-    @since(2.1)
-    def latency(self):
-        """
-        Current average latency between the data being available in source and 
the sink
-        writing the corresponding output.
-
-        >>> sqs.latency
-        345.0
-        """
-        if (self._jsqs.latency().nonEmpty()):
-            return self._jsqs.latency().get()
-        else:
-            return None
-
-    @property
-    @ignore_unicode_prefix
-    @since(2.1)
-    def sourceStatuses(self):
-        """
-        Current statuses of the sources as a list.
-
-        >>> len(sqs.sourceStatuses)
-        1
-        >>> sqs.sourceStatuses[0].description
-        u'MySource1'
-        """
-        return [SourceStatus(ss) for ss in self._jsqs.sourceStatuses()]
-
-    @property
-    @ignore_unicode_prefix
-    @since(2.1)
-    def sinkStatus(self):
-        """
-        Current status of the sink.
-
-        >>> sqs.sinkStatus.description
-        u'MySink'
-        """
-        return SinkStatus(self._jsqs.sinkStatus())
-
-    @property
-    @ignore_unicode_prefix
-    @since(2.1)
-    def triggerDetails(self):
-        """
-        Low-level details of the currently active trigger (e.g. number of rows 
processed
-        in trigger, latency of intermediate steps, etc.).
-
-        If no trigger is currently active, then it will have details of the 
last completed trigger.
-
-        >>> sqs.triggerDetails
-        {u'latency.getBatch.total': u'20', u'numRows.input.total': u'100',
-        u'isTriggerActive': u'true', u'batchId': u'5', 
u'latency.getOffset.total': u'10',
-        u'isDataPresentInTrigger': u'true'}
-        """
-        return self._jsqs.triggerDetails()
-
-
-class SourceStatus(object):
-    """
-    Status and metrics of a streaming Source.
-
-    .. note:: Experimental
-
-    .. versionadded:: 2.1
-    """
-
-    def __init__(self, jss):
-        self._jss = jss
-
-    def __str__(self):
-        """
-        Pretty string of this source status.
-
-        >>> print(sqs.sourceStatuses[0])
-        Status of source MySource1
-            Available offset: 0
-            Input rate: 15.5 rows/sec
-            Processing rate: 23.5 rows/sec
-            Trigger details:
-                numRows.input.source: 100
-                latency.getOffset.source: 10
-                latency.getBatch.source: 20
-        """
-        return self._jss.toString()
-
-    @property
-    @ignore_unicode_prefix
-    @since(2.1)
-    def description(self):
-        """
-        Description of the source corresponding to this status.
-
-        >>> sqs.sourceStatuses[0].description
-        u'MySource1'
-        """
-        return self._jss.description()
-
-    @property
-    @ignore_unicode_prefix
-    @since(2.1)
-    def offsetDesc(self):
-        """
-        Description of the current offset if known.
-
-        >>> sqs.sourceStatuses[0].offsetDesc
-        u'0'
-        """
-        return self._jss.offsetDesc()
-
-    @property
-    @since(2.1)
-    def inputRate(self):
-        """
-        Current rate (rows/sec) at which data is being generated by the source.
-
-        >>> sqs.sourceStatuses[0].inputRate
-        15.5
-        """
-        return self._jss.inputRate()
-
-    @property
-    @since(2.1)
-    def processingRate(self):
-        """
-        Current rate (rows/sec) at which the query is processing data from the 
source.
-
-        >>> sqs.sourceStatuses[0].processingRate
-        23.5
-        """
-        return self._jss.processingRate()
-
-    @property
-    @ignore_unicode_prefix
-    @since(2.1)
-    def triggerDetails(self):
-        """
-        Low-level details of the currently active trigger (e.g. number of rows 
processed
-        in trigger, latency of intermediate steps, etc.).
-
-        If no trigger is currently active, then it will have details of the 
last completed trigger.
-
-        >>> sqs.sourceStatuses[0].triggerDetails
-        {u'numRows.input.source': u'100', u'latency.getOffset.source': u'10',
-        u'latency.getBatch.source': u'20'}
-       """
-        return self._jss.triggerDetails()
-
-
-class SinkStatus(object):
-    """
-    Status and metrics of a streaming Sink.
-
-    .. note:: Experimental
-
-    .. versionadded:: 2.1
-    """
-
-    def __init__(self, jss):
-        self._jss = jss
-
-    def __str__(self):
-        """
-        Pretty string of this source status.
-
-        >>> print(sqs.sinkStatus)
-        Status of sink MySink
-            Committed offsets: [1, -]
-        """
-        return self._jss.toString()
-
-    @property
-    @ignore_unicode_prefix
-    @since(2.1)
-    def description(self):
-        """
-        Description of the source corresponding to this status.
-
-        >>> sqs.sinkStatus.description
-        u'MySink'
-        """
-        return self._jss.description()
-
-    @property
-    @ignore_unicode_prefix
-    @since(2.1)
-    def offsetDesc(self):
-        """
-        Description of the current offsets up to which data has been written 
by the sink.
-
-        >>> sqs.sinkStatus.offsetDesc
-        u'[1, -]'
-        """
-        return self._jss.offsetDesc()
-
-
 class Trigger(object):
     """Used to indicate how often results should be produced by a 
:class:`StreamingQuery`.
 
@@ -1053,8 +773,6 @@ def _test():
     globs['sdf_schema'] = StructType([StructField("data", StringType(), 
False)])
     globs['df'] = \
         
globs['spark'].readStream.format('text').load('python/test_support/sql/streaming')
-    globs['sqs'] = StreamingQueryStatus(
-        
spark.sparkContext._jvm.org.apache.spark.sql.streaming.StreamingQueryStatus.testStatus())
 
     (failure_count, test_count) = doctest.testmod(
         pyspark.sql.streaming, globs=globs,

http://git-wip-us.apache.org/repos/asf/spark/blob/28b57c8a/python/pyspark/sql/tests.py
----------------------------------------------------------------------
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 3d46b85..7151f95 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -1082,6 +1082,28 @@ class SQLTests(ReusedPySparkTestCase):
             q.stop()
             shutil.rmtree(tmpPath)
 
+    def test_stream_status_and_progress(self):
+        df = self.spark.readStream.format('text').load('python/test_support/sql/streaming')
+        for q in self.spark._wrapped.streams.active:
+            q.stop()
+        tmpPath = tempfile.mkdtemp()
+        shutil.rmtree(tmpPath)
+        self.assertTrue(df.isStreaming)
+        out = os.path.join(tmpPath, 'out')
+        chk = os.path.join(tmpPath, 'chk')
+        q = df.writeStream \
+            .start(path=out, format='parquet', queryName='this_query', checkpointLocation=chk)
+        try:
+            q.processAllAvailable()
+            lastProgress = q.lastProgress
+            recentProgresses = q.recentProgresses
+            self.assertEqual(lastProgress['name'], q.name)
+            self.assertEqual(lastProgress['id'], q.id)
+            self.assertTrue(any(p == lastProgress for p in recentProgresses))
+        finally:
+            q.stop()
+            shutil.rmtree(tmpPath)
+
     def test_stream_await_termination(self):
         df = 
self.spark.readStream.format('text').load('python/test_support/sql/streaming')
         for q in self.spark._wrapped.streams.active:

http://git-wip-us.apache.org/repos/asf/spark/blob/28b57c8a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala
new file mode 100644
index 0000000..5551d12
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetricsReporter.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.{util => ju}
+
+import scala.collection.mutable
+
+import com.codahale.metrics.{Gauge, MetricRegistry}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.metrics.source.{Source => CodahaleSource}
+import org.apache.spark.util.Clock
+
+/**
+ * Serves metrics from a [[org.apache.spark.sql.streaming.StreamingQuery]] to
+ * Codahale/DropWizard metrics
+ */
+class MetricsReporter(
+    stream: StreamExecution,
+    override val sourceName: String) extends CodahaleSource with Logging {
+
+  override val metricRegistry: MetricRegistry = new MetricRegistry
+
+  // Metric names should not have . in them, so that all the metrics of a query are identified
+  // together in Ganglia as a single metric group
+  registerGauge("inputRate-total", () => stream.lastProgress.inputRowsPerSecond)
+  registerGauge("processingRate-total", () => stream.lastProgress.processedRowsPerSecond)
+  registerGauge("latency", () => stream.lastProgress.durationMs.get("triggerExecution").longValue())
+
+  private def registerGauge[T](name: String, f: () => T)(implicit num: 
Numeric[T]): Unit = {
+    synchronized {
+      metricRegistry.register(name, new Gauge[T] {
+        override def getValue: T = f()
+      })
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/28b57c8a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
new file mode 100644
index 0000000..b7b6e19
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming
+
+import java.util.UUID
+
+import scala.collection.mutable
+import scala.collection.JavaConverters._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{DataFrame, SparkSession}
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.streaming._
+import org.apache.spark.util.Clock
+
+/**
+ * Responsible for continually reporting statistics about the amount of data processed as well
+ * as latency for a streaming query. This trait is designed to be mixed into the
+ * [[StreamExecution]], which is responsible for calling `startTrigger` and `finishTrigger`
+ * at the appropriate times. Additionally, the status can be updated with `updateStatusMessage` to
+ * allow reporting on the stream's current state (e.g. "Fetching more data").
+ */
+trait ProgressReporter extends Logging {
+
+  case class ExecutionStats(
+    inputRows: Map[Source, Long], stateOperators: Seq[StateOperatorProgress])
+
+  // Internal state of the stream, required for computing metrics.
+  protected def id: UUID
+  protected def name: String
+  protected def triggerClock: Clock
+  protected def logicalPlan: LogicalPlan
+  protected def lastExecution: QueryExecution
+  protected def newData: Map[Source, DataFrame]
+  protected def availableOffsets: StreamProgress
+  protected def committedOffsets: StreamProgress
+  protected def sources: Seq[Source]
+  protected def sink: Sink
+  protected def streamExecutionMetadata: StreamExecutionMetadata
+  protected def currentBatchId: Long
+  protected def sparkSession: SparkSession
+
+  // Local timestamps and counters.
+  private var currentTriggerStartTimestamp = -1L
+  private var currentTriggerEndTimestamp = -1L
+  // TODO: Restore this from the checkpoint when possible.
+  private var lastTriggerStartTimestamp = -1L
+  private val currentDurationsMs = new mutable.HashMap[String, Long]()
+
+  /** Flag that signals whether any error with input metrics has already been logged */
+  private var metricWarningLogged: Boolean = false
+
+  /** Holds the most recent query progress updates.  Accesses must lock on the queue itself. */
+  private val progressBuffer = new mutable.Queue[StreamingQueryProgress]()
+
+  @volatile
+  protected var currentStatus: StreamingQueryStatus =
+    StreamingQueryStatus(
+      message = "Initializing StreamExecution",
+      isDataAvailable = false,
+      isTriggerActive = false)
+
+  /** Returns the current status of the query. */
+  def status: StreamingQueryStatus = currentStatus
+
+  /** Returns an array containing the most recent query progress updates. */
+  def recentProgresses: Array[StreamingQueryProgress] = 
progressBuffer.synchronized {
+    progressBuffer.toArray
+  }
+
+  /** Returns the most recent query progress update. */
+  def lastProgress: StreamingQueryProgress = progressBuffer.synchronized {
+    progressBuffer.last
+  }
+
+  /** Begins recording statistics about query progress for a given trigger. */
+  protected def startTrigger(): Unit = {
+    logDebug("Starting Trigger Calculation")
+    lastTriggerStartTimestamp = currentTriggerStartTimestamp
+    currentTriggerStartTimestamp = triggerClock.getTimeMillis()
+    currentStatus = currentStatus.copy(isTriggerActive = true)
+    currentDurationsMs.clear()
+  }
+
+  /** Finalizes the query progress and adds it to list of recent status 
updates. */
+  protected def finishTrigger(hasNewData: Boolean): Unit = {
+    currentTriggerEndTimestamp = triggerClock.getTimeMillis()
+
+    val executionStats: ExecutionStats = if (!hasNewData) {
+      ExecutionStats(Map.empty, Seq.empty)
+    } else {
+      extractExecutionStats
+    }
+
+    val processingTimeSec =
+      (currentTriggerEndTimestamp - currentTriggerStartTimestamp).toDouble / 
1000
+
+    val inputTimeSec = if (lastTriggerStartTimestamp >= 0) {
+      (currentTriggerStartTimestamp - lastTriggerStartTimestamp).toDouble / 
1000
+    } else {
+      Double.NaN
+    }
+    logDebug(s"Execution stats: $executionStats")
+
+    val sourceProgress = sources.map { source =>
+      val numRecords = executionStats.inputRows.getOrElse(source, 0L)
+      new SourceProgress(
+        description = source.toString,
+        startOffset = committedOffsets.get(source).map(_.json).orNull,
+        endOffset = availableOffsets.get(source).map(_.json).orNull,
+        numInputRows = numRecords,
+        inputRowsPerSecond = numRecords / inputTimeSec,
+        processedRowsPerSecond = numRecords / processingTimeSec
+      )
+    }
+    val sinkProgress = new SinkProgress(sink.toString)
+
+    val newProgress = new StreamingQueryProgress(
+      id = id,
+      name = name,
+      timestamp = currentTriggerStartTimestamp,
+      batchId = currentBatchId,
+      durationMs = currentDurationsMs.toMap.mapValues(long2Long).asJava,
+      currentWatermark = streamExecutionMetadata.batchWatermarkMs,
+      stateOperators = executionStats.stateOperators.toArray,
+      sources = sourceProgress.toArray,
+      sink = sinkProgress)
+
+    progressBuffer.synchronized {
+      progressBuffer += newProgress
+      while (progressBuffer.length >= 
sparkSession.sqlContext.conf.streamingProgressRetention) {
+        progressBuffer.dequeue()
+      }
+    }
+
+    logInfo(s"Streaming query made progress: $newProgress")
+    currentStatus = currentStatus.copy(isTriggerActive = false)
+  }
+
+  /** Extracts statistics from the most recent query execution. */
+  private def extractExecutionStats: ExecutionStats = {
+    // We want to associate execution plan leaves to sources that generate 
them, so that we match
+    // the their metrics (e.g. numOutputRows) to the sources. To do this we do 
the following.
+    // Consider the translation from the streaming logical plan to the final 
executed plan.
+    //
+    //  streaming logical plan (with sources) <==> trigger's logical plan <==> 
executed plan
+    //
+    // 1. We keep track of streaming sources associated with each leaf in the 
trigger's logical plan
+    //    - Each logical plan leaf will be associated with a single streaming 
source.
+    //    - There can be multiple logical plan leaves associated with a 
streaming source.
+    //    - There can be leaves not associated with any streaming source, 
because they were
+    //      generated from a batch source (e.g. stream-batch joins)
+    //
+    // 2. Assuming that the executed plan has same number of leaves in the 
same order as that of
+    //    the trigger logical plan, we associate executed plan leaves with 
corresponding
+    //    streaming sources.
+    //
+    // 3. For each source, we sum the metrics of the associated execution plan 
leaves.
+    //
+    val logicalPlanLeafToSource = newData.flatMap { case (source, df) =>
+      df.logicalPlan.collectLeaves().map { leaf => leaf -> source }
+    }
+    val allLogicalPlanLeaves = lastExecution.logical.collectLeaves() // 
includes non-streaming
+    val allExecPlanLeaves = lastExecution.executedPlan.collectLeaves()
+    val numInputRows: Map[Source, Long] =
+      if (allLogicalPlanLeaves.size == allExecPlanLeaves.size) {
+        val execLeafToSource = 
allLogicalPlanLeaves.zip(allExecPlanLeaves).flatMap {
+          case (lp, ep) => logicalPlanLeafToSource.get(lp).map { source => ep 
-> source }
+        }
+        val sourceToNumInputRows = execLeafToSource.map { case (execLeaf, 
source) =>
+          val numRows = 
execLeaf.metrics.get("numOutputRows").map(_.value).getOrElse(0L)
+          source -> numRows
+        }
+        sourceToNumInputRows.groupBy(_._1).mapValues(_.map(_._2).sum) // sum 
up rows for each source
+      } else {
+        if (!metricWarningLogged) {
+          def toString[T](seq: Seq[T]): String = s"(size = ${seq.size}), 
${seq.mkString(", ")}"
+          logWarning(
+            "Could not report metrics as number leaves in trigger logical plan 
did not match that" +
+                s" of the execution plan:\n" +
+                s"logical plan leaves: ${toString(allLogicalPlanLeaves)}\n" +
+                s"execution plan leaves: ${toString(allExecPlanLeaves)}\n")
+          metricWarningLogged = true
+        }
+        Map.empty
+      }
+
+    // Extract statistics about stateful operators in the query plan.
+    val stateNodes = lastExecution.executedPlan.collect {
+      case p if p.isInstanceOf[StateStoreSaveExec] => p
+    }
+    val stateOperators = stateNodes.map { node =>
+      new StateOperatorProgress(
+        numRowsTotal = 
node.metrics.get("numTotalStateRows").map(_.value).getOrElse(0L),
+        numRowsUpdated = 
node.metrics.get("numUpdatedStateRows").map(_.value).getOrElse(0L))
+    }
+
+    ExecutionStats(numInputRows, stateOperators)
+  }
+
+  /** Records the duration of running `body` for the next query progress 
update. */
+  protected def reportTimeTaken[T](triggerDetailKey: String)(body: => T): T = {
+    val startTime = triggerClock.getTimeMillis()
+    val result = body
+    val endTime = triggerClock.getTimeMillis()
+    val timeTaken = math.max(endTime - startTime, 0)
+
+    val previousTime = currentDurationsMs.getOrElse(triggerDetailKey, 0L)
+    currentDurationsMs.put(triggerDetailKey, previousTime + timeTaken)
+    logDebug(s"$triggerDetailKey took $timeTaken ms")
+    result
+  }
+
+  /** Updates the message returned in `status`. */
+  protected def updateStatusMessage(message: String): Unit = {
+    currentStatus = currentStatus.copy(message = message)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/28b57c8a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index 21664d7..e4f31af 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -17,8 +17,8 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.util.UUID
 import java.util.concurrent.{CountDownLatch, TimeUnit}
-import java.util.concurrent.atomic.AtomicLong
 import java.util.concurrent.locks.ReentrantLock
 
 import scala.collection.mutable.ArrayBuffer
@@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, 
CurrentBatchTimestamp, CurrentDate, CurrentTimestamp}
 import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, LogicalPlan}
 import org.apache.spark.sql.catalyst.util._
-import org.apache.spark.sql.execution.{QueryExecution, SparkPlan}
+import org.apache.spark.sql.execution.QueryExecution
 import org.apache.spark.sql.execution.command.ExplainCommand
 import org.apache.spark.sql.streaming._
 import org.apache.spark.util.{Clock, UninterruptibleThread, Utils}
@@ -47,7 +47,6 @@ import org.apache.spark.util.{Clock, UninterruptibleThread, 
Utils}
  */
 class StreamExecution(
     override val sparkSession: SparkSession,
-    override val id: Long,
     override val name: String,
     checkpointRoot: String,
     val logicalPlan: LogicalPlan,
@@ -55,10 +54,12 @@ class StreamExecution(
     val trigger: Trigger,
     val triggerClock: Clock,
     val outputMode: OutputMode)
-  extends StreamingQuery with Logging {
+  extends StreamingQuery with ProgressReporter with Logging {
 
   import org.apache.spark.sql.streaming.StreamingQueryListener._
-  import StreamMetrics._
+
+  // TODO: restore this from the checkpoint directory.
+  override val id: UUID = UUID.randomUUID()
 
   private val pollingDelayMs = 
sparkSession.sessionState.conf.streamingPollingDelay
 
@@ -89,16 +90,16 @@ class StreamExecution(
    * once, since the field's value may change at any time.
    */
   @volatile
-  private var availableOffsets = new StreamProgress
+  protected var availableOffsets = new StreamProgress
 
   /** The current batchId or -1 if execution has not yet been initialized. */
-  private var currentBatchId: Long = -1
+  protected var currentBatchId: Long = -1
 
   /** Stream execution metadata */
-  private var streamExecutionMetadata = StreamExecutionMetadata()
+  protected var streamExecutionMetadata = StreamExecutionMetadata()
 
   /** All stream sources present in the query plan. */
-  private val sources =
+  protected val sources =
     logicalPlan.collect { case s: StreamingExecutionRelation => s.source }
 
   /** A list of unique sources in the query plan. */
@@ -113,7 +114,10 @@ class StreamExecution(
   private var state: State = INITIALIZED
 
   @volatile
-  var lastExecution: QueryExecution = null
+  var lastExecution: QueryExecution = _
+
+  /** Holds the most recent input data for each source. */
+  protected var newData: Map[Source, DataFrame] = _
 
   @volatile
   private var streamDeathCause: StreamingQueryException = null
@@ -121,16 +125,8 @@ class StreamExecution(
   /* Get the call site in the caller thread; will pass this into the micro 
batch thread */
   private val callSite = Utils.getCallSite()
 
-  /** Metrics for this query */
-  private val streamMetrics =
-    new StreamMetrics(uniqueSources.toSet, triggerClock, 
s"StructuredStreaming.$name")
-
-  @volatile
-  private var currentStatus: StreamingQueryStatus = null
-
-  /** Flag that signals whether any error with input metrics have already been 
logged */
-  @volatile
-  private var metricWarningLogged: Boolean = false
+  /** Used to report metrics to coda-hale. */
+  lazy val streamMetrics = new MetricsReporter(this, s"spark.streaming.$name")
 
   /**
    * The thread that runs the micro-batches of this stream. Note that this 
thread must be
@@ -158,15 +154,6 @@ class StreamExecution(
   /** Whether the query is currently active or not */
   override def isActive: Boolean = state == ACTIVE
 
-  /** Returns the current status of the query. */
-  override def status: StreamingQueryStatus = currentStatus
-
-  /** Returns current status of all the sources. */
-  override def sourceStatuses: Array[SourceStatus] = 
currentStatus.sourceStatuses.toArray
-
-  /** Returns current status of the sink. */
-  override def sinkStatus: SinkStatus = currentStatus.sinkStatus
-
   /** Returns the [[StreamingQueryException]] if the query was terminated by 
an exception. */
   override def exception: Option[StreamingQueryException] = 
Option(streamDeathCause)
 
@@ -200,8 +187,8 @@ class StreamExecution(
       if (sparkSession.sessionState.conf.streamingMetricsEnabled) {
         
sparkSession.sparkContext.env.metricsSystem.registerSource(streamMetrics)
       }
-      updateStatus()
-      postEvent(new QueryStartedEvent(currentStatus)) // Assumption: Does not 
throw exception.
+
+      postEvent(new QueryStartedEvent(id, name)) // Assumption: Does not throw 
exception.
 
       // Unblock starting thread
       startLatch.countDown()
@@ -210,40 +197,45 @@ class StreamExecution(
       SparkSession.setActiveSession(sparkSession)
 
       triggerExecutor.execute(() => {
-        streamMetrics.reportTriggerStarted(currentBatchId)
-        streamMetrics.reportTriggerDetail(STATUS_MESSAGE, "Finding new data 
from sources")
-        updateStatus()
-        val isTerminated = reportTimeTaken(TRIGGER_LATENCY) {
+        startTrigger()
+
+        val isTerminated =
           if (isActive) {
-            if (currentBatchId < 0) {
-              // We'll do this initialization only once
-              populateStartOffsets()
-              logDebug(s"Stream running from $committedOffsets to 
$availableOffsets")
-            } else {
-              constructNextBatch()
+            reportTimeTaken("triggerExecution") {
+              if (currentBatchId < 0) {
+                // We'll do this initialization only once
+                populateStartOffsets()
+                logDebug(s"Stream running from $committedOffsets to 
$availableOffsets")
+              } else {
+                constructNextBatch()
+              }
+              if (dataAvailable) {
+                currentStatus = currentStatus.copy(isDataAvailable = true)
+                updateStatusMessage("Processing new data")
+                runBatch()
+              }
             }
+
+            // Report trigger as finished and construct progress object.
+            finishTrigger(dataAvailable)
+            postEvent(new QueryProgressEvent(lastProgress))
+
             if (dataAvailable) {
-              streamMetrics.reportTriggerDetail(IS_DATA_PRESENT_IN_TRIGGER, 
true)
-              streamMetrics.reportTriggerDetail(STATUS_MESSAGE, "Processing 
new data")
-              updateStatus()
-              runBatch()
               // We'll increase currentBatchId after we complete processing 
current batch's data
               currentBatchId += 1
             } else {
-              streamMetrics.reportTriggerDetail(IS_DATA_PRESENT_IN_TRIGGER, 
false)
-              streamMetrics.reportTriggerDetail(STATUS_MESSAGE, "No new data")
-              updateStatus()
+              currentStatus = currentStatus.copy(isDataAvailable = false)
+              updateStatusMessage("Waiting for data to arrive")
               Thread.sleep(pollingDelayMs)
             }
             true
           } else {
             false
           }
-        }
-        // Update metrics and notify others
-        streamMetrics.reportTriggerFinished()
-        updateStatus()
-        postEvent(new QueryProgressEvent(currentStatus))
+
+        // Update committed offsets.
+        committedOffsets ++= availableOffsets
+        updateStatusMessage("Waiting for next trigger")
         isTerminated
       })
     } catch {
@@ -264,14 +256,12 @@ class StreamExecution(
       state = TERMINATED
 
       // Update metrics and status
-      streamMetrics.stop()
       sparkSession.sparkContext.env.metricsSystem.removeSource(streamMetrics)
-      updateStatus()
 
       // Notify others
       sparkSession.streams.notifyQueryTermination(StreamExecution.this)
       postEvent(
-        new QueryTerminatedEvent(currentStatus, 
exception.map(_.cause).map(Utils.exceptionString)))
+       new QueryTerminatedEvent(id, 
exception.map(_.cause).map(Utils.exceptionString)))
       terminationLatch.countDown()
     }
   }
@@ -328,14 +318,13 @@ class StreamExecution(
     val hasNewData = {
       awaitBatchLock.lock()
       try {
-        reportTimeTaken(GET_OFFSET_LATENCY) {
-          val latestOffsets: Map[Source, Option[Offset]] = uniqueSources.map { 
s =>
-            reportTimeTaken(s, SOURCE_GET_OFFSET_LATENCY) {
-              (s, s.getOffset)
-            }
-          }.toMap
-          availableOffsets ++= latestOffsets.filter { case (s, o) => 
o.nonEmpty }.mapValues(_.get)
-        }
+        val latestOffsets: Map[Source, Option[Offset]] = uniqueSources.map { s 
=>
+          updateStatusMessage(s"Getting offsets from $s")
+          reportTimeTaken("getOffset") {
+            (s, s.getOffset)
+          }
+        }.toMap
+        availableOffsets ++= latestOffsets.filter { case (s, o) => o.nonEmpty 
}.mapValues(_.get)
 
         if (dataAvailable) {
           true
@@ -350,8 +339,10 @@ class StreamExecution(
     if (hasNewData) {
       // Current batch timestamp in milliseconds
       streamExecutionMetadata.batchTimestampMs = triggerClock.getTimeMillis()
-      reportTimeTaken(OFFSET_WAL_WRITE_LATENCY) {
-        assert(offsetLog.add(currentBatchId,
+      updateStatusMessage("Writing offsets to log")
+      reportTimeTaken("walCommit") {
+        assert(offsetLog.add(
+          currentBatchId,
           availableOffsets.toOffsetSeq(sources, streamExecutionMetadata.json)),
           s"Concurrent update to the log. Multiple streaming jobs detected for 
$currentBatchId")
         logInfo(s"Committed offsets for batch $currentBatchId. " +
@@ -384,30 +375,24 @@ class StreamExecution(
         awaitBatchLock.unlock()
       }
     }
-    reportTimestamp(GET_OFFSET_TIMESTAMP)
   }
 
   /**
    * Processes any data available between `availableOffsets` and 
`committedOffsets`.
    */
   private def runBatch(): Unit = {
-    // TODO: Move this to IncrementalExecution.
-
     // Request unprocessed data from all sources.
-    val newData = reportTimeTaken(GET_BATCH_LATENCY) {
+    newData = reportTimeTaken("getBatch") {
       availableOffsets.flatMap {
         case (source, available)
           if committedOffsets.get(source).map(_ != available).getOrElse(true) 
=>
           val current = committedOffsets.get(source)
-          val batch = reportTimeTaken(source, SOURCE_GET_BATCH_LATENCY) {
-            source.getBatch(current, available)
-          }
+          val batch = source.getBatch(current, available)
           logDebug(s"Retrieving data from $source: $current -> $available")
           Some(source -> batch)
         case _ => None
       }
     }
-    reportTimestamp(GET_BATCH_TIMESTAMP)
 
     // A list of attributes that will need to be updated.
     var replacements = new ArrayBuffer[(Attribute, Attribute)]
@@ -438,7 +423,7 @@ class StreamExecution(
           cd.dataType)
     }
 
-    val executedPlan = reportTimeTaken(OPTIMIZER_LATENCY) {
+    val executedPlan = reportTimeTaken("queryPlanning") {
       lastExecution = new IncrementalExecution(
         sparkSession,
         triggerLogicalPlan,
@@ -451,11 +436,12 @@ class StreamExecution(
 
     val nextBatch =
       new Dataset(sparkSession, lastExecution, 
RowEncoder(lastExecution.analyzed.schema))
-    sink.addBatch(currentBatchId, nextBatch)
-    reportNumRows(executedPlan, triggerLogicalPlan, newData)
+
+    reportTimeTaken("addBatch") {
+      sink.addBatch(currentBatchId, nextBatch)
+    }
 
     // Update the eventTime watermark if we find one in the plan.
-    // TODO: Does this need to be an AttributeMap?
     lastExecution.executedPlan.collect {
       case e: EventTimeWatermarkExec =>
         logTrace(s"Maximum observed eventTime: ${e.maxEventTime.value}")
@@ -468,10 +454,6 @@ class StreamExecution(
         logTrace(s"Event time didn't move: $newWatermark < " +
           s"$streamExecutionMetadata.currentEventTimeWatermark")
       }
-
-      if (newWatermark != 0) {
-        streamMetrics.reportTriggerDetail(EVENT_TIME_WATERMARK, newWatermark)
-      }
     }
 
     awaitBatchLock.lock()
@@ -481,9 +463,6 @@ class StreamExecution(
     } finally {
       awaitBatchLock.unlock()
     }
-
-    // Update committed offsets.
-    committedOffsets ++= availableOffsets
   }
 
   private def postEvent(event: StreamingQueryListener.Event) {
@@ -616,145 +595,12 @@ class StreamExecution(
      """.stripMargin
   }
 
-  /**
-   * Report row metrics of the executed trigger
-   * @param triggerExecutionPlan Execution plan of the trigger
-   * @param triggerLogicalPlan Logical plan of the trigger, generated from the 
query logical plan
-   * @param sourceToDF Source to DataFrame returned by the source.getBatch in 
this trigger
-   */
-  private def reportNumRows(
-      triggerExecutionPlan: SparkPlan,
-      triggerLogicalPlan: LogicalPlan,
-      sourceToDF: Map[Source, DataFrame]): Unit = {
-    // We want to associate execution plan leaves to sources that generate 
them, so that we match
-    // the their metrics (e.g. numOutputRows) to the sources. To do this we do 
the following.
-    // Consider the translation from the streaming logical plan to the final 
executed plan.
-    //
-    //  streaming logical plan (with sources) <==> trigger's logical plan <==> 
executed plan
-    //
-    // 1. We keep track of streaming sources associated with each leaf in the 
trigger's logical plan
-    //    - Each logical plan leaf will be associated with a single streaming 
source.
-    //    - There can be multiple logical plan leaves associated with a 
streaming source.
-    //    - There can be leaves not associated with any streaming source, 
because they were
-    //      generated from a batch source (e.g. stream-batch joins)
-    //
-    // 2. Assuming that the executed plan has same number of leaves in the 
same order as that of
-    //    the trigger logical plan, we associate executed plan leaves with 
corresponding
-    //    streaming sources.
-    //
-    // 3. For each source, we sum the metrics of the associated execution plan 
leaves.
-    //
-    val logicalPlanLeafToSource = sourceToDF.flatMap { case (source, df) =>
-      df.logicalPlan.collectLeaves().map { leaf => leaf -> source }
-    }
-    val allLogicalPlanLeaves = triggerLogicalPlan.collectLeaves() // includes 
non-streaming sources
-    val allExecPlanLeaves = triggerExecutionPlan.collectLeaves()
-    val sourceToNumInputRows: Map[Source, Long] =
-      if (allLogicalPlanLeaves.size == allExecPlanLeaves.size) {
-        val execLeafToSource = 
allLogicalPlanLeaves.zip(allExecPlanLeaves).flatMap {
-          case (lp, ep) => logicalPlanLeafToSource.get(lp).map { source => ep 
-> source }
-        }
-        val sourceToNumInputRows = execLeafToSource.map { case (execLeaf, 
source) =>
-          val numRows = 
execLeaf.metrics.get("numOutputRows").map(_.value).getOrElse(0L)
-          source -> numRows
-        }
-        sourceToNumInputRows.groupBy(_._1).mapValues(_.map(_._2).sum) // sum 
up rows for each source
-      } else {
-        if (!metricWarningLogged) {
-          def toString[T](seq: Seq[T]): String = s"(size = ${seq.size}), 
${seq.mkString(", ")}"
-          logWarning(
-            "Could not report metrics as number leaves in trigger logical plan 
did not match that" +
-              s" of the execution plan:\n" +
-              s"logical plan leaves: ${toString(allLogicalPlanLeaves)}\n" +
-              s"execution plan leaves: ${toString(allExecPlanLeaves)}\n")
-          metricWarningLogged = true
-        }
-        Map.empty
-      }
-    val numOutputRows = 
triggerExecutionPlan.metrics.get("numOutputRows").map(_.value)
-    val stateNodes = triggerExecutionPlan.collect {
-      case p if p.isInstanceOf[StateStoreSaveExec] => p
-    }
-
-    streamMetrics.reportNumInputRows(sourceToNumInputRows)
-    stateNodes.zipWithIndex.foreach { case (s, i) =>
-      streamMetrics.reportTriggerDetail(
-        NUM_TOTAL_STATE_ROWS(i + 1),
-        s.metrics.get("numTotalStateRows").map(_.value).getOrElse(0L))
-      streamMetrics.reportTriggerDetail(
-        NUM_UPDATED_STATE_ROWS(i + 1),
-        s.metrics.get("numUpdatedStateRows").map(_.value).getOrElse(0L))
-    }
-    updateStatus()
-  }
-
-  private def reportTimeTaken[T](triggerDetailKey: String)(body: => T): T = {
-    val startTime = triggerClock.getTimeMillis()
-    val result = body
-    val endTime = triggerClock.getTimeMillis()
-    val timeTaken = math.max(endTime - startTime, 0)
-    streamMetrics.reportTriggerDetail(triggerDetailKey, timeTaken)
-    updateStatus()
-    if (triggerDetailKey == TRIGGER_LATENCY) {
-      logInfo(s"Completed up to $availableOffsets in $timeTaken ms")
-    }
-    result
-  }
-
-  private def reportTimeTaken[T](source: Source, triggerDetailKey: 
String)(body: => T): T = {
-    val startTime = triggerClock.getTimeMillis()
-    val result = body
-    val endTime = triggerClock.getTimeMillis()
-    streamMetrics.reportSourceTriggerDetail(
-      source, triggerDetailKey, math.max(endTime - startTime, 0))
-    updateStatus()
-    result
-  }
-
-  private def reportTimestamp(triggerDetailKey: String): Unit = {
-    streamMetrics.reportTriggerDetail(triggerDetailKey, 
triggerClock.getTimeMillis)
-    updateStatus()
-  }
-
-  private def updateStatus(): Unit = {
-    val localAvailableOffsets = availableOffsets
-    val sourceStatuses = sources.map { s =>
-      SourceStatus(
-        s.toString,
-        localAvailableOffsets.get(s).map(_.json).getOrElse("-"),
-        streamMetrics.currentSourceInputRate(s),
-        streamMetrics.currentSourceProcessingRate(s),
-        streamMetrics.currentSourceTriggerDetails(s))
-    }.toArray
-    val sinkStatus = SinkStatus(
-      sink.toString,
-      committedOffsets.toOffsetSeq(sources, 
streamExecutionMetadata.json).toString)
-
-    currentStatus =
-      StreamingQueryStatus(
-        name = name,
-        id = id,
-        timestamp = triggerClock.getTimeMillis(),
-        inputRate = streamMetrics.currentInputRate(),
-        processingRate = streamMetrics.currentProcessingRate(),
-        latency = streamMetrics.currentLatency(),
-        sourceStatuses = sourceStatuses,
-        sinkStatus = sinkStatus,
-        triggerDetails = streamMetrics.currentTriggerDetails())
-  }
-
   trait State
   case object INITIALIZED extends State
   case object ACTIVE extends State
   case object TERMINATED extends State
 }
 
-object StreamExecution {
-  private val _nextId = new AtomicLong(0)
-
-  def nextId: Long = _nextId.getAndIncrement()
-}
-
 /**
  * Contains metadata associated with a stream execution. This information is
  * persisted to the offset log via the OffsetSeq metadata field. Current

http://git-wip-us.apache.org/repos/asf/spark/blob/28b57c8a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala
deleted file mode 100644
index 942e6ed..0000000
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamMetrics.scala
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.streaming
-
-import java.{util => ju}
-
-import scala.collection.mutable
-
-import com.codahale.metrics.{Gauge, MetricRegistry}
-
-import org.apache.spark.internal.Logging
-import org.apache.spark.metrics.source.{Source => CodahaleSource}
-import org.apache.spark.util.Clock
-
-/**
- * Class that manages all the metrics related to a StreamingQuery. It does the 
following.
- * - Calculates metrics (rates, latencies, etc.) based on information reported 
by StreamExecution.
- * - Allows the current metric values to be queried
- * - Serves some of the metrics through Codahale/DropWizard metrics
- *
- * @param sources Unique set of sources in a query
- * @param triggerClock Clock used for triggering in StreamExecution
- * @param codahaleSourceName Root name for all the Codahale metrics
- */
-class StreamMetrics(sources: Set[Source], triggerClock: Clock, 
codahaleSourceName: String)
-  extends CodahaleSource with Logging {
-
-  import StreamMetrics._
-
-  // Trigger infos
-  private val triggerDetails = new mutable.HashMap[String, String]
-  private val sourceTriggerDetails = new mutable.HashMap[Source, 
mutable.HashMap[String, String]]
-
-  // Rate estimators for sources and sinks
-  private val inputRates = new mutable.HashMap[Source, RateCalculator]
-  private val processingRates = new mutable.HashMap[Source, RateCalculator]
-
-  // Number of input rows in the current trigger
-  private val numInputRows = new mutable.HashMap[Source, Long]
-  private var currentTriggerStartTimestamp: Long = -1
-  private var previousTriggerStartTimestamp: Long = -1
-  private var latency: Option[Double] = None
-
-  override val sourceName: String = codahaleSourceName
-  override val metricRegistry: MetricRegistry = new MetricRegistry
-
-  // =========== Initialization ===========
-
-  // Metric names should not have . in them, so that all the metrics of a 
query are identified
-  // together in Ganglia as a single metric group
-  registerGauge("inputRate-total", currentInputRate)
-  registerGauge("processingRate-total", () => currentProcessingRate)
-  registerGauge("latency", () => currentLatency().getOrElse(-1.0))
-
-  sources.foreach { s =>
-    inputRates.put(s, new RateCalculator)
-    processingRates.put(s, new RateCalculator)
-    sourceTriggerDetails.put(s, new mutable.HashMap[String, String])
-
-    registerGauge(s"inputRate-${s.toString}", () => currentSourceInputRate(s))
-    registerGauge(s"processingRate-${s.toString}", () => currentSourceProcessingRate(s))
-  }
-
-  // =========== Setter methods ===========
-
-  def reportTriggerStarted(batchId: Long): Unit = synchronized {
-    numInputRows.clear()
-    triggerDetails.clear()
-    sourceTriggerDetails.values.foreach(_.clear())
-
-    reportTriggerDetail(BATCH_ID, batchId)
-    sources.foreach(s => reportSourceTriggerDetail(s, BATCH_ID, batchId))
-    reportTriggerDetail(IS_TRIGGER_ACTIVE, true)
-    currentTriggerStartTimestamp = triggerClock.getTimeMillis()
-    reportTriggerDetail(START_TIMESTAMP, currentTriggerStartTimestamp)
-  }
-
-  def reportTriggerDetail[T](key: String, value: T): Unit = synchronized {
-    triggerDetails.put(key, value.toString)
-  }
-
-  def reportSourceTriggerDetail[T](source: Source, key: String, value: T): Unit = synchronized {
-    sourceTriggerDetails(source).put(key, value.toString)
-  }
-
-  def reportNumInputRows(inputRows: Map[Source, Long]): Unit = synchronized {
-    numInputRows ++= inputRows
-  }
-
-  def reportTriggerFinished(): Unit = synchronized {
-    require(currentTriggerStartTimestamp >= 0)
-    val currentTriggerFinishTimestamp = triggerClock.getTimeMillis()
-    reportTriggerDetail(FINISH_TIMESTAMP, currentTriggerFinishTimestamp)
-    triggerDetails.remove(STATUS_MESSAGE)
-    reportTriggerDetail(IS_TRIGGER_ACTIVE, false)
-
-    // Report number of rows
-    val totalNumInputRows = numInputRows.values.sum
-    reportTriggerDetail(NUM_INPUT_ROWS, totalNumInputRows)
-    numInputRows.foreach { case (s, r) =>
-      reportSourceTriggerDetail(s, NUM_SOURCE_INPUT_ROWS, r)
-    }
-
-    val currentTriggerDuration = currentTriggerFinishTimestamp - currentTriggerStartTimestamp
-    val previousInputIntervalOption = if (previousTriggerStartTimestamp >= 0) {
-      Some(currentTriggerStartTimestamp - previousTriggerStartTimestamp)
-    } else None
-
-    // Update input rate = num rows received by each source during the previous trigger interval
-    // Interval is measures as interval between start times of previous and current trigger.
-    //
-    // TODO: Instead of trigger start, we should use time when getOffset was called on each source
-    // as this may be different for each source if there are many sources in the query plan
-    // and getOffset is called serially on them.
-    if (previousInputIntervalOption.nonEmpty) {
-      sources.foreach { s =>
-        inputRates(s).update(numInputRows.getOrElse(s, 0), previousInputIntervalOption.get)
-      }
-    }
-
-    // Update processing rate = num rows processed for each source in current trigger duration
-    sources.foreach { s =>
-      processingRates(s).update(numInputRows.getOrElse(s, 0), currentTriggerDuration)
-    }
-
-    // Update latency = if data present, 0.5 * previous trigger interval + current trigger duration
-    if (previousInputIntervalOption.nonEmpty && totalNumInputRows > 0) {
-      latency = Some((previousInputIntervalOption.get.toDouble / 2) + currentTriggerDuration)
-    } else {
-      latency = None
-    }
-
-    previousTriggerStartTimestamp = currentTriggerStartTimestamp
-    currentTriggerStartTimestamp = -1
-  }
-
-  // =========== Getter methods ===========
-
-  def currentInputRate(): Double = synchronized {
-    // Since we are calculating source input rates using the same time interval for all sources
-    // it is fine to calculate total input rate as the sum of per source input rate.
-    inputRates.map(_._2.currentRate).sum
-  }
-
-  def currentSourceInputRate(source: Source): Double = synchronized {
-    inputRates(source).currentRate
-  }
-
-  def currentProcessingRate(): Double = synchronized {
-    // Since we are calculating source processing rates using the same time interval for all sources
-    // it is fine to calculate total processing rate as the sum of per source processing rate.
-    processingRates.map(_._2.currentRate).sum
-  }
-
-  def currentSourceProcessingRate(source: Source): Double = synchronized {
-    processingRates(source).currentRate
-  }
-
-  def currentLatency(): Option[Double] = synchronized { latency }
-
-  def currentTriggerDetails(): Map[String, String] = synchronized { triggerDetails.toMap }
-
-  def currentSourceTriggerDetails(source: Source): Map[String, String] = synchronized {
-    sourceTriggerDetails(source).toMap
-  }
-
-  // =========== Other methods ===========
-
-  private def registerGauge[T](name: String, f: () => T)(implicit num: Numeric[T]): Unit = {
-    synchronized {
-      metricRegistry.register(name, new Gauge[T] {
-        override def getValue: T = f()
-      })
-    }
-  }
-
-  def stop(): Unit = synchronized {
-    triggerDetails.clear()
-    inputRates.valuesIterator.foreach { _.stop() }
-    processingRates.valuesIterator.foreach { _.stop() }
-    latency = None
-  }
-}
-
-object StreamMetrics extends Logging {
-  /** Simple utility class to calculate rate while avoiding DivideByZero */
-  class RateCalculator {
-    @volatile private var rate: Option[Double] = None
-
-    def update(numRows: Long, timeGapMs: Long): Unit = {
-      if (timeGapMs > 0) {
-        rate = Some(numRows.toDouble * 1000 / timeGapMs)
-      } else {
-        rate = None
-        logDebug(s"Rate updates cannot with zero or negative time gap $timeGapMs")
-      }
-    }
-
-    def currentRate: Double = rate.getOrElse(0.0)
-
-    def stop(): Unit = { rate = None }
-  }
-
-
-  val BATCH_ID = "batchId"
-  val IS_TRIGGER_ACTIVE = "isTriggerActive"
-  val IS_DATA_PRESENT_IN_TRIGGER = "isDataPresentInTrigger"
-  val STATUS_MESSAGE = "statusMessage"
-  val EVENT_TIME_WATERMARK = "eventTimeWatermark"
-
-  val START_TIMESTAMP = "timestamp.triggerStart"
-  val GET_OFFSET_TIMESTAMP = "timestamp.afterGetOffset"
-  val GET_BATCH_TIMESTAMP = "timestamp.afterGetBatch"
-  val FINISH_TIMESTAMP = "timestamp.triggerFinish"
-
-  val GET_OFFSET_LATENCY = "latency.getOffset.total"
-  val GET_BATCH_LATENCY = "latency.getBatch.total"
-  val OFFSET_WAL_WRITE_LATENCY = "latency.offsetLogWrite"
-  val OPTIMIZER_LATENCY = "latency.optimizer"
-  val TRIGGER_LATENCY = "latency.fullTrigger"
-  val SOURCE_GET_OFFSET_LATENCY = "latency.getOffset.source"
-  val SOURCE_GET_BATCH_LATENCY = "latency.getBatch.source"
-
-  val NUM_INPUT_ROWS = "numRows.input.total"
-  val NUM_SOURCE_INPUT_ROWS = "numRows.input.source"
-  def NUM_TOTAL_STATE_ROWS(aggId: Int): String = s"numRows.state.aggregation$aggId.total"
-  def NUM_UPDATED_STATE_ROWS(aggId: Int): String = s"numRows.state.aggregation$aggId.updated"
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/28b57c8a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 5589805..21b26b8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -583,6 +583,12 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val STREAMING_PROGRESS_RETENTION =
+    SQLConfigBuilder("spark.sql.streaming.numRecentProgresses")
+      .doc("The number of progress updates to retain for a streaming query")
+      .intConf
+      .createWithDefault(100)
+
   val NDV_MAX_ERROR =
     SQLConfigBuilder("spark.sql.statistics.ndv.maxError")
       .internal()
@@ -654,6 +660,8 @@ private[sql] class SQLConf extends Serializable with CatalystConf with Logging {
 
   def streamingMetricsEnabled: Boolean = getConf(STREAMING_METRICS_ENABLED)
 
+  def streamingProgressRetention: Int = getConf(STREAMING_PROGRESS_RETENTION)
+
   def filesMaxPartitionBytes: Long = getConf(FILES_MAX_PARTITION_BYTES)
 
   def filesOpenCostInBytes: Long = getConf(FILES_OPEN_COST_IN_BYTES)
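
As a quick illustration of the new knob, here is a hedged sketch (not part of the patch) of setting and reading the retention limit; `spark` is assumed to be an existing SparkSession, and the default of 100 comes from the diff above.

```
// Illustrative sketch only: adjust how many recent progress updates each query keeps.
// Assumes `spark` is an existing SparkSession (e.g. the spark-shell binding).
spark.conf.set("spark.sql.streaming.numRecentProgresses", "50")

// Reading the conf back returns its string form; the default in this patch is "100".
println(spark.conf.get("spark.sql.streaming.numRecentProgresses"))
```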

http://git-wip-us.apache.org/repos/asf/spark/blob/28b57c8a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala
deleted file mode 100644
index ab19602..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming
-
-import org.json4s._
-import org.json4s.JsonAST.JValue
-import org.json4s.JsonDSL._
-import org.json4s.jackson.JsonMethods._
-
-import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.streaming.StreamingQueryStatus.indent
-
-/**
- * :: Experimental ::
- * Status and metrics of a streaming sink.
- *
- * @param description Description of the source corresponding to this status.
- * @param offsetDesc Description of the current offsets up to which data has been written
- *                   by the sink.
- * @since 2.0.0
- */
-@Experimental
-class SinkStatus private(
-    val description: String,
-    val offsetDesc: String) {
-
-  /** The compact JSON representation of this status. */
-  def json: String = compact(render(jsonValue))
-
-  /** The pretty (i.e. indented) JSON representation of this status. */
-  def prettyJson: String = pretty(render(jsonValue))
-
-  override def toString: String =
-    "Status of sink " + indent(prettyString).trim
-
-  private[sql] def jsonValue: JValue = {
-    ("description" -> JString(description)) ~
-    ("offsetDesc" -> JString(offsetDesc))
-  }
-
-  private[sql] def prettyString: String = {
-    s"""$description
-       |Committed offsets: $offsetDesc
-       |""".stripMargin
-  }
-}
-
-/** Companion object, primarily for creating SinkStatus instances internally */
-private[sql] object SinkStatus {
-  def apply(desc: String, offsetDesc: String): SinkStatus = new SinkStatus(desc, offsetDesc)
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/28b57c8a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala
deleted file mode 100644
index cfdf113..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming
-
-import java.{util => ju}
-
-import scala.collection.JavaConverters._
-
-import org.json4s._
-import org.json4s.JsonAST.JValue
-import org.json4s.JsonDSL._
-import org.json4s.jackson.JsonMethods._
-
-import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.streaming.StreamingQueryStatus.indent
-import org.apache.spark.util.JsonProtocol
-
-/**
- * :: Experimental ::
- * Status and metrics of a streaming Source.
- *
- * @param description Description of the source corresponding to this status.
- * @param offsetDesc Description of the current offset if known.
- * @param inputRate Current rate (rows/sec) at which data is being generated by the source.
- * @param processingRate Current rate (rows/sec) at which the query is processing data from
- *                       the source.
- * @param triggerDetails Low-level details of the currently active trigger (e.g. number of
- *                      rows processed in trigger, latency of intermediate steps, etc.).
- *                      If no trigger is active, then it will have details of the last completed
- *                      trigger.
- * @since 2.0.0
- */
-@Experimental
-class SourceStatus private(
-    val description: String,
-    val offsetDesc: String,
-    val inputRate: Double,
-    val processingRate: Double,
-    val triggerDetails: ju.Map[String, String]) {
-
-  /** The compact JSON representation of this status. */
-  def json: String = compact(render(jsonValue))
-
-  /** The pretty (i.e. indented) JSON representation of this status. */
-  def prettyJson: String = pretty(render(jsonValue))
-
-  override def toString: String =
-    "Status of source " + indent(prettyString).trim
-
-  private[sql] def jsonValue: JValue = {
-    ("description" -> JString(description)) ~
-    ("offsetDesc" -> JString(offsetDesc)) ~
-    ("inputRate" -> JDouble(inputRate)) ~
-    ("processingRate" -> JDouble(processingRate)) ~
-    ("triggerDetails" -> JsonProtocol.mapToJson(triggerDetails.asScala))
-  }
-
-  private[sql] def prettyString: String = {
-    val triggerDetailsLines =
-      triggerDetails.asScala.map { case (k, v) => s"$k: $v" }
-    s"""$description
-       |Available offset: $offsetDesc
-       |Input rate: $inputRate rows/sec
-       |Processing rate: $processingRate rows/sec
-       |Trigger details:
-       |""".stripMargin + indent(triggerDetailsLines)
-  }
-}
-
-/** Companion object, primarily for creating SourceStatus instances internally */
-private[sql] object SourceStatus {
-  def apply(
-      desc: String,
-      offsetDesc: String,
-      inputRate: Double,
-      processingRate: Double,
-      triggerDetails: Map[String, String]): SourceStatus = {
-    new SourceStatus(desc, offsetDesc, inputRate, processingRate, triggerDetails.asJava)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/28b57c8a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
index 374313f..8fc4e43 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.streaming
 
+import java.util.UUID
+
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql.SparkSession
 
@@ -33,25 +35,27 @@ trait StreamingQuery {
    * Returns the name of the query. This name is unique across all active queries. This can be
    * set in the `org.apache.spark.sql.streaming.DataStreamWriter` as
    * `dataframe.writeStream.queryName("query").start()`.
+   *
    * @since 2.0.0
    */
   def name: String
 
   /**
-   * Returns the unique id of this query. This id is automatically generated and is unique across
-   * all queries that have been started in the current process.
-   * @since 2.0.0
+   * Returns the unique id of this query.
+   * @since 2.1.0
    */
-  def id: Long
+  def id: UUID
 
   /**
    * Returns the `SparkSession` associated with `this`.
+   *
    * @since 2.0.0
    */
   def sparkSession: SparkSession
 
   /**
-   * Whether the query is currently active or not
+   * Returns `true` if this query is actively running.
+   *
    * @since 2.0.0
    */
   def isActive: Boolean
@@ -64,23 +68,26 @@ trait StreamingQuery {
 
   /**
    * Returns the current status of the query.
+   *
    * @since 2.0.2
    */
   def status: StreamingQueryStatus
 
   /**
-   * Returns current status of all the sources.
-   * @since 2.0.0
+   * Returns an array of the most recent [[StreamingQueryProgress]] updates for this query.
+   * The number of progress updates retained for each stream is configured by Spark session
+   * configuration `spark.sql.streaming.numRecentProgresses`.
+   *
+   * @since 2.1.0
    */
-  @deprecated("use status.sourceStatuses", "2.0.2")
-  def sourceStatuses: Array[SourceStatus]
+  def recentProgresses: Array[StreamingQueryProgress]
 
   /**
-   * Returns current status of the sink.
-   * @since 2.0.0
+   * Returns the most recent [[StreamingQueryProgress]] update of this streaming query.
+   *
+   * @since 2.1.0
    */
-  @deprecated("use status.sinkStatus", "2.0.2")
-  def sinkStatus: SinkStatus
+  def lastProgress: StreamingQueryProgress
 
   /**
    * Waits for the termination of `this` query, either by `query.stop()` or by an exception.
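
For orientation, the sketch below shows how the reworked `StreamingQuery` surface (`id: UUID`, `recentProgresses`, `lastProgress`) might be exercised from a driver; the socket source, console sink, query name, and host/port are illustrative placeholders, not part of this patch.

```
// Illustrative sketch only: exercising the new StreamingQuery members.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("progress-demo").getOrCreate()

val query = spark.readStream
  .format("socket")                 // placeholder source for the sketch
  .option("host", "localhost")
  .option("port", 9999)
  .load()
  .writeStream
  .format("console")                // placeholder sink for the sketch
  .queryName("progress-demo")
  .start()

// `id` is now a globally unique UUID instead of a per-JVM Long.
println(s"query id: ${query.id}")

// `lastProgress` may be null before the first trigger completes, hence the Option wrapper.
Option(query.lastProgress).foreach(p => println(p.json))

// `recentProgresses` holds up to spark.sql.streaming.numRecentProgresses entries.
query.recentProgresses.foreach(p => println(p.inputRowsPerSecond))
```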

http://git-wip-us.apache.org/repos/asf/spark/blob/28b57c8a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
index 0a58142..13f11ba 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryException.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.execution.streaming.{Offset, OffsetSeq, StreamExecut
  * :: Experimental ::
  * Exception that stopped a [[StreamingQuery]]. Use `cause` get the actual exception
  * that caused the failure.
- * @param query      Query that caused the exception
+ * @param query       Query that caused the exception
  * @param message     Message of this exception
  * @param cause       Internal cause of this exception
  * @param startOffset Starting offset (if known) of the range of data in which exception occurred

http://git-wip-us.apache.org/repos/asf/spark/blob/28b57c8a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
index 9e311fa..d9ee75c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryListener.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.streaming
 
+import java.util.UUID
+
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.scheduler.SparkListenerEvent
 
@@ -81,30 +83,28 @@ object StreamingQueryListener {
   /**
    * :: Experimental ::
    * Event representing the start of a query
-   * @since 2.0.0
+   * @since 2.1.0
    */
   @Experimental
-  class QueryStartedEvent private[sql](val queryStatus: StreamingQueryStatus) extends Event
+  class QueryStartedEvent private[sql](val id: UUID, val name: String) extends Event
 
   /**
    * :: Experimental ::
-   * Event representing any progress updates in a query
-   * @since 2.0.0
+   * Event representing any progress updates in a query.
+   * @since 2.1.0
    */
   @Experimental
-  class QueryProgressEvent private[sql](val queryStatus: StreamingQueryStatus) extends Event
+  class QueryProgressEvent private[sql](val progress: StreamingQueryProgress) extends Event
 
   /**
    * :: Experimental ::
-   * Event representing that termination of a query
+   * Event representing that termination of a query.
    *
-   * @param queryStatus Information about the status of the query.
-   * @param exception The exception message of the [[StreamingQuery]] if the query was terminated
+   * @param id The query id.
+   * @param exception The exception message of the query if the query was terminated
    *                  with an exception. Otherwise, it will be `None`.
-   * @since 2.0.0
+   * @since 2.1.0
    */
   @Experimental
-  class QueryTerminatedEvent private[sql](
-      val queryStatus: StreamingQueryStatus,
-      val exception: Option[String]) extends Event
+  class QueryTerminatedEvent private[sql](val id: UUID, val exception: 
Option[String]) extends Event
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/28b57c8a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 53968a8..c448468 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -17,6 +17,9 @@
 
 package org.apache.spark.sql.streaming
 
+import java.util.UUID
+import java.util.concurrent.atomic.AtomicLong
+
 import scala.collection.mutable
 
 import org.apache.hadoop.fs.Path
@@ -41,7 +44,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
   private[sql] val stateStoreCoordinator =
     StateStoreCoordinatorRef.forDriver(sparkSession.sparkContext.env)
   private val listenerBus = new StreamingQueryListenerBus(sparkSession.sparkContext.listenerBus)
-  private val activeQueries = new mutable.HashMap[Long, StreamingQuery]
+  private val activeQueries = new mutable.HashMap[UUID, StreamingQuery]
   private val activeQueriesLock = new Object
   private val awaitTerminationLock = new Object
 
@@ -59,13 +62,20 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
   /**
    * Returns the query if there is an active query with the given id, or null.
    *
-   * @since 2.0.0
+   * @since 2.1.0
    */
-  def get(id: Long): StreamingQuery = activeQueriesLock.synchronized {
+  def get(id: UUID): StreamingQuery = activeQueriesLock.synchronized {
     activeQueries.get(id).orNull
   }
 
   /**
+   * Returns the query if there is an active query with the given id, or null.
+   *
+   * @since 2.1.0
+   */
+  def get(id: String): StreamingQuery = get(UUID.fromString(id))
+
+  /**
    * Wait until any of the queries on the associated SQLContext has terminated since the
    * creation of the context, or since `resetTerminated()` was called. If any query was terminated
    * with an exception, then the exception will be thrown.
@@ -197,8 +207,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
       trigger: Trigger = ProcessingTime(0),
       triggerClock: Clock = new SystemClock()): StreamingQuery = {
     activeQueriesLock.synchronized {
-      val id = StreamExecution.nextId
-      val name = userSpecifiedName.getOrElse(s"query-$id")
+      val name = userSpecifiedName.getOrElse(s"query-${StreamingQueryManager.nextId}")
       if (activeQueries.values.exists(_.name == name)) {
         throw new IllegalArgumentException(
           s"Cannot start query with name $name as a query with that name is already active")
@@ -252,7 +261,6 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
       }
       val query = new StreamExecution(
         sparkSession,
-        id,
         name,
         checkpointLocation,
         logicalPlan,
@@ -261,7 +269,7 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
         triggerClock,
         outputMode)
       query.start()
-      activeQueries.put(id, query)
+      activeQueries.put(query.id, query)
       query
     }
   }
@@ -279,3 +287,8 @@ class StreamingQueryManager private[sql] (sparkSession: SparkSession) {
     }
   }
 }
+
+private object StreamingQueryManager {
+  private val _nextId = new AtomicLong(0)
+  private def nextId: Long = _nextId.getAndIncrement()
+}
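
For clarity, a small sketch of the UUID-keyed lookup added above; `spark` and an already-started `query` are assumed to exist and are not part of the patch.

```
// Illustrative sketch only: the manager can now be queried by UUID or by its string form.
val sameQuery = spark.streams.get(query.id)             // get(id: UUID)
val viaString = spark.streams.get(query.id.toString)    // get(id: String) parses the UUID

// Both calls resolve to the same active query, or return null if no query with that id is active.
assert(sameQuery eq viaString)
```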

http://git-wip-us.apache.org/repos/asf/spark/blob/28b57c8a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
index ba732ff..4c1a7ce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala
@@ -17,146 +17,17 @@
 
 package org.apache.spark.sql.streaming
 
-import java.{util => ju}
-
-import scala.collection.JavaConverters._
-
-import org.json4s._
-import org.json4s.JsonAST.JValue
-import org.json4s.JsonDSL._
-import org.json4s.jackson.JsonMethods._
-
-import org.apache.spark.annotation.Experimental
-import org.apache.spark.sql.execution.streaming.{LongOffset, OffsetSeq}
-import org.apache.spark.util.JsonProtocol
-
 /**
- * :: Experimental ::
- * A class used to report information about the progress of a [[StreamingQuery]].
+ * Reports information about the instantaneous status of a streaming query.
  *
- * @param name Name of the query. This name is unique across all active queries.
- * @param id Id of the query. This id is unique across
- *          all queries that have been started in the current process.
- * @param timestamp Timestamp (ms) of when this query was generated.
- * @param inputRate Current rate (rows/sec) at which data is being generated by all the sources.
- * @param processingRate Current rate (rows/sec) at which the query is processing data from
- *                       all the sources.
- * @param latency  Current average latency between the data being available in source and the sink
- *                   writing the corresponding output.
- * @param sourceStatuses Current statuses of the sources.
- * @param sinkStatus Current status of the sink.
- * @param triggerDetails Low-level details of the currently active trigger (e.g. number of
- *                      rows processed in trigger, latency of intermediate steps, etc.).
- *                      If no trigger is active, then it will have details of the last completed
- *                      trigger.
- * @since 2.0.0
+ * @param message A human readable description of what the stream is currently doing.
+ * @param isDataAvailable True when there is new data to be processed.
+ * @param isTriggerActive True when the trigger is actively firing, false when waiting for the
+ *                        next trigger time.
+ *
+ * @since 2.1.0
  */
-@Experimental
-class StreamingQueryStatus private(
-  val name: String,
-  val id: Long,
-  val timestamp: Long,
-  val inputRate: Double,
-  val processingRate: Double,
-  val latency: Option[Double],
-  val sourceStatuses: Array[SourceStatus],
-  val sinkStatus: SinkStatus,
-  val triggerDetails: ju.Map[String, String]) {
-
-  import StreamingQueryStatus._
-
-  /** The compact JSON representation of this status. */
-  def json: String = compact(render(jsonValue))
-
-  /** The pretty (i.e. indented) JSON representation of this status. */
-  def prettyJson: String = pretty(render(jsonValue))
-
-  override def toString: String = {
-    val sourceStatusLines = sourceStatuses.zipWithIndex.map { case (s, i) =>
-      s"Source ${i + 1} - " + indent(s.prettyString).trim
-    }
-    val sinkStatusLines = sinkStatus.prettyString.trim
-    val triggerDetailsLines = triggerDetails.asScala.map { case (k, v) => s"$k: $v" }.toSeq.sorted
-    val numSources = sourceStatuses.length
-    val numSourcesString = s"$numSources source" + { if (numSources > 1) "s" else "" }
-
-    val allLines =
-      s"""|Query id: $id
-          |Status timestamp: $timestamp
-          |Input rate: $inputRate rows/sec
-          |Processing rate $processingRate rows/sec
-          |Latency: ${latency.getOrElse("-")} ms
-          |Trigger details:
-          |${indent(triggerDetailsLines)}
-          |Source statuses [$numSourcesString]:
-          |${indent(sourceStatusLines)}
-          |Sink status - ${indent(sinkStatusLines).trim}""".stripMargin
-
-    s"Status of query '$name'\n${indent(allLines)}"
-  }
-
-  private[sql] def jsonValue: JValue = {
-    ("name" -> JString(name)) ~
-    ("id" -> JInt(id)) ~
-    ("timestamp" -> JInt(timestamp)) ~
-    ("inputRate" -> JDouble(inputRate)) ~
-    ("processingRate" -> JDouble(processingRate)) ~
-    ("latency" -> latency.map(JDouble).getOrElse(JNothing)) ~
-    ("triggerDetails" -> JsonProtocol.mapToJson(triggerDetails.asScala)) ~
-    ("sourceStatuses" -> JArray(sourceStatuses.map(_.jsonValue).toList)) ~
-    ("sinkStatus" -> sinkStatus.jsonValue)
-  }
-}
-
-/** Companion object, primarily for creating StreamingQueryInfo instances internally */
-private[sql] object StreamingQueryStatus {
-  def apply(
-      name: String,
-      id: Long,
-      timestamp: Long,
-      inputRate: Double,
-      processingRate: Double,
-      latency: Option[Double],
-      sourceStatuses: Array[SourceStatus],
-      sinkStatus: SinkStatus,
-      triggerDetails: Map[String, String]): StreamingQueryStatus = {
-    new StreamingQueryStatus(name, id, timestamp, inputRate, processingRate,
-      latency, sourceStatuses, sinkStatus, triggerDetails.asJava)
-  }
-
-  def indent(strings: Iterable[String]): String = strings.map(indent).mkString("\n")
-  def indent(string: String): String = string.split("\n").map("    " + _).mkString("\n")
-
-  /** Create an instance of status for python testing */
-  def testStatus(): StreamingQueryStatus = {
-    import org.apache.spark.sql.execution.streaming.StreamMetrics._
-    StreamingQueryStatus(
-      name = "query",
-      id = 1,
-      timestamp = 123,
-      inputRate = 15.5,
-      processingRate = 23.5,
-      latency = Some(345),
-      sourceStatuses = Array(
-        SourceStatus(
-          desc = "MySource1",
-          offsetDesc = LongOffset(0).json,
-          inputRate = 15.5,
-          processingRate = 23.5,
-          triggerDetails = Map(
-            NUM_SOURCE_INPUT_ROWS -> "100",
-            SOURCE_GET_OFFSET_LATENCY -> "10",
-            SOURCE_GET_BATCH_LATENCY -> "20"))),
-      sinkStatus = SinkStatus(
-        desc = "MySink",
-        offsetDesc = OffsetSeq(Some(LongOffset(1)) :: None :: Nil).toString),
-      triggerDetails = Map(
-        BATCH_ID -> "5",
-        IS_TRIGGER_ACTIVE -> "true",
-        IS_DATA_PRESENT_IN_TRIGGER -> "true",
-        GET_OFFSET_LATENCY -> "10",
-        GET_BATCH_LATENCY -> "20",
-        NUM_INPUT_ROWS -> "100"
-      ))
-  }
-}
+case class StreamingQueryStatus protected[sql](
+    message: String,
+    isDataAvailable: Boolean,
+    isTriggerActive: Boolean)
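
To round this off, a minimal sketch of polling the slimmed-down status object, assuming `query` is an active `StreamingQuery`; the printed message text is only an example.

```
// Illustrative sketch only: the instantaneous status now has just three fields.
val status = query.status
println(status.message)   // e.g. "Waiting for data to arrive"
println(s"data available: ${status.isDataAvailable}, trigger active: ${status.isTriggerActive}")
```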

