spark git commit: [SPARK-17926][SQL][STREAMING] Added json for statuses
Repository: spark Updated Branches: refs/heads/branch-2.0 78458a7eb -> af2e6e0c9 [SPARK-17926][SQL][STREAMING] Added json for statuses ## What changes were proposed in this pull request? StreamingQueryStatus exposed through StreamingQueryListener often needs to be recorded (similar to SparkListener events). This PR adds `.json` and `.prettyJson` to `StreamingQueryStatus`, `SourceStatus` and `SinkStatus`. ## How was this patch tested? New unit tests Author: Tathagata Das Closes #15476 from tdas/SPARK-17926. (cherry picked from commit 7a531e3054f8d4820216ed379433559f57f571b8) Signed-off-by: Yin Huai Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/af2e6e0c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/af2e6e0c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/af2e6e0c Branch: refs/heads/branch-2.0 Commit: af2e6e0c9c85c40bc505ed1183857a8fb60fbd72 Parents: 78458a7 Author: Tathagata Das Authored: Fri Oct 21 13:07:29 2016 -0700 Committer: Yin Huai Committed: Fri Oct 21 13:07:59 2016 -0700 -- python/pyspark/sql/streaming.py | 11 +- .../apache/spark/sql/streaming/SinkStatus.scala | 18 +++- .../spark/sql/streaming/SourceStatus.scala | 23 +++- .../sql/streaming/StreamingQueryStatus.scala| 55 +++--- .../streaming/StreamingQueryStatusSuite.scala | 105 +++ 5 files changed, 187 insertions(+), 25 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/af2e6e0c/python/pyspark/sql/streaming.py -- diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index 0df63a7..cfe917b 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -205,8 +205,7 @@ class StreamingQueryStatus(object): Pretty string of this query status. >>> print(sqs) -StreamingQueryStatus: -Query name: query +Status of query 'query' Query id: 1 Status timestamp: 123 Input rate: 15.5 rows/sec @@ -220,7 +219,7 @@ class StreamingQueryStatus(object): numRows.input.total: 100 triggerId: 5 Source statuses [1 source]: -Source 1:MySource1 +Source 1 - MySource1 Available offset: #0 Input rate: 15.5 rows/sec Processing rate: 23.5 rows/sec @@ -228,7 +227,7 @@ class StreamingQueryStatus(object): numRows.input.source: 100 latency.getOffset.source: 10 latency.getBatch.source: 20 -Sink status: MySink +Sink status - MySink Committed offsets: [#1, -] """ return self._jsqs.toString() @@ -366,7 +365,7 @@ class SourceStatus(object): Pretty string of this source status. >>> print(sqs.sourceStatuses[0]) -SourceStatus:MySource1 +Status of source MySource1 Available offset: #0 Input rate: 15.5 rows/sec Processing rate: 23.5 rows/sec @@ -457,7 +456,7 @@ class SinkStatus(object): Pretty string of this source status. >>> print(sqs.sinkStatus) -SinkStatus:MySink +Status of sink MySink Committed offsets: [#1, -] """ return self._jss.toString() http://git-wip-us.apache.org/repos/asf/spark/blob/af2e6e0c/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala index c991166..ab19602 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala @@ -17,6 +17,11 @@ package org.apache.spark.sql.streaming +import org.json4s._ +import org.json4s.JsonAST.JValue +import org.json4s.JsonDSL._ +import org.json4s.jackson.JsonMethods._ + import org.apache.spark.annotation.Experimental import org.apache.spark.sql.streaming.StreamingQueryStatus.indent @@ -34,8 +39,19 @@ class SinkStatus private( val description: String, val offsetDesc: String) { + /** The compact JSON representation of this status. */ + def json: String = compact(render(jsonValue)) + + /** The pretty (i.e. indented) JSON representation of this status. */ + def prettyJson: String = pretty(render(jsonValue)) + override def toString: String = -"SinkStatus:" + indent(prettyString) +"Status of sink " + indent(prettyString).trim + +
spark git commit: [SPARK-17926][SQL][STREAMING] Added json for statuses
Repository: spark Updated Branches: refs/heads/master e371040a0 -> 7a531e305 [SPARK-17926][SQL][STREAMING] Added json for statuses ## What changes were proposed in this pull request? StreamingQueryStatus exposed through StreamingQueryListener often needs to be recorded (similar to SparkListener events). This PR adds `.json` and `.prettyJson` to `StreamingQueryStatus`, `SourceStatus` and `SinkStatus`. ## How was this patch tested? New unit tests Author: Tathagata Das Closes #15476 from tdas/SPARK-17926. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7a531e30 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7a531e30 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7a531e30 Branch: refs/heads/master Commit: 7a531e3054f8d4820216ed379433559f57f571b8 Parents: e371040 Author: Tathagata Das Authored: Fri Oct 21 13:07:29 2016 -0700 Committer: Yin Huai Committed: Fri Oct 21 13:07:29 2016 -0700 -- python/pyspark/sql/streaming.py | 11 +- .../apache/spark/sql/streaming/SinkStatus.scala | 18 +++- .../spark/sql/streaming/SourceStatus.scala | 23 +++- .../sql/streaming/StreamingQueryStatus.scala| 55 +++--- .../streaming/StreamingQueryStatusSuite.scala | 105 +++ 5 files changed, 187 insertions(+), 25 deletions(-) -- http://git-wip-us.apache.org/repos/asf/spark/blob/7a531e30/python/pyspark/sql/streaming.py -- diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index ce47bd1..35fc469 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -205,8 +205,7 @@ class StreamingQueryStatus(object): Pretty string of this query status. >>> print(sqs) -StreamingQueryStatus: -Query name: query +Status of query 'query' Query id: 1 Status timestamp: 123 Input rate: 15.5 rows/sec @@ -220,7 +219,7 @@ class StreamingQueryStatus(object): numRows.input.total: 100 triggerId: 5 Source statuses [1 source]: -Source 1:MySource1 +Source 1 - MySource1 Available offset: #0 Input rate: 15.5 rows/sec Processing rate: 23.5 rows/sec @@ -228,7 +227,7 @@ class StreamingQueryStatus(object): numRows.input.source: 100 latency.getOffset.source: 10 latency.getBatch.source: 20 -Sink status: MySink +Sink status - MySink Committed offsets: [#1, -] """ return self._jsqs.toString() @@ -366,7 +365,7 @@ class SourceStatus(object): Pretty string of this source status. >>> print(sqs.sourceStatuses[0]) -SourceStatus:MySource1 +Status of source MySource1 Available offset: #0 Input rate: 15.5 rows/sec Processing rate: 23.5 rows/sec @@ -457,7 +456,7 @@ class SinkStatus(object): Pretty string of this source status. >>> print(sqs.sinkStatus) -SinkStatus:MySink +Status of sink MySink Committed offsets: [#1, -] """ return self._jss.toString() http://git-wip-us.apache.org/repos/asf/spark/blob/7a531e30/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala -- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala index c991166..ab19602 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala @@ -17,6 +17,11 @@ package org.apache.spark.sql.streaming +import org.json4s._ +import org.json4s.JsonAST.JValue +import org.json4s.JsonDSL._ +import org.json4s.jackson.JsonMethods._ + import org.apache.spark.annotation.Experimental import org.apache.spark.sql.streaming.StreamingQueryStatus.indent @@ -34,8 +39,19 @@ class SinkStatus private( val description: String, val offsetDesc: String) { + /** The compact JSON representation of this status. */ + def json: String = compact(render(jsonValue)) + + /** The pretty (i.e. indented) JSON representation of this status. */ + def prettyJson: String = pretty(render(jsonValue)) + override def toString: String = -"SinkStatus:" + indent(prettyString) +"Status of sink " + indent(prettyString).trim + + private[sql] def jsonValue: JValue = { +("description" -> JString(description)) ~ +("offsetDe