spark git commit: [SPARK-17926][SQL][STREAMING] Added json for statuses

2016-10-21, yhuai
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 78458a7eb -> af2e6e0c9


[SPARK-17926][SQL][STREAMING] Added json for statuses

## What changes were proposed in this pull request?

`StreamingQueryStatus`, exposed through `StreamingQueryListener`, often needs to be 
recorded (similar to `SparkListener` events). This PR adds `.json` and 
`.prettyJson` to `StreamingQueryStatus`, `SourceStatus` and `SinkStatus`.
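As a rough illustration of why a compact, single-line form helps when recording statuses, here is a plain-Python sketch that appends each status as one JSON document per line and then replays the log. `FakeSinkStatus`, `record` and the JSON key names are hypothetical and are not the PySpark API. The Python diff in this commit only updates doctest output.

```python
import io
import json


class FakeSinkStatus:
    """Hypothetical stand-in for a sink status; not the PySpark class."""

    def __init__(self, description, offset_desc):
        self.description = description
        self.offset_desc = offset_desc

    def json_value(self):
        # Key names are assumptions chosen for illustration.
        return {"description": self.description, "offsetDesc": self.offset_desc}

    def json(self):
        # Compact form: no optional whitespace, so one status fits on one line.
        return json.dumps(self.json_value(), separators=(",", ":"))


def record(statuses, sink):
    # Append one compact JSON document per line (JSON Lines style).
    for status in statuses:
        sink.write(status.json() + "\n")


buf = io.StringIO()
record([FakeSinkStatus("MySink", "[#1, -]"), FakeSinkStatus("MySink", "[#2, -]")], buf)

# Replaying the log is just one json.loads per line.
replayed = [json.loads(line) for line in buf.getvalue().splitlines()]
print(replayed)
```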

## How was this patch tested?
New unit tests

Author: Tathagata Das 

Closes #15476 from tdas/SPARK-17926.

(cherry picked from commit 7a531e3054f8d4820216ed379433559f57f571b8)
Signed-off-by: Yin Huai 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/af2e6e0c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/af2e6e0c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/af2e6e0c

Branch: refs/heads/branch-2.0
Commit: af2e6e0c9c85c40bc505ed1183857a8fb60fbd72
Parents: 78458a7
Author: Tathagata Das 
Authored: Fri Oct 21 13:07:29 2016 -0700
Committer: Yin Huai 
Committed: Fri Oct 21 13:07:59 2016 -0700

--
 python/pyspark/sql/streaming.py                    |  11 +-
 .../apache/spark/sql/streaming/SinkStatus.scala    |  18 +++-
 .../spark/sql/streaming/SourceStatus.scala         |  23 +++-
 .../sql/streaming/StreamingQueryStatus.scala       |  55 +++---
 .../streaming/StreamingQueryStatusSuite.scala      | 105 +++
 5 files changed, 187 insertions(+), 25 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/af2e6e0c/python/pyspark/sql/streaming.py
--
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 0df63a7..cfe917b 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -205,8 +205,7 @@ class StreamingQueryStatus(object):
 Pretty string of this query status.
 
 >>> print(sqs)
-StreamingQueryStatus:
-Query name: query
+Status of query 'query'
 Query id: 1
 Status timestamp: 123
 Input rate: 15.5 rows/sec
@@ -220,7 +219,7 @@ class StreamingQueryStatus(object):
 numRows.input.total: 100
 triggerId: 5
 Source statuses [1 source]:
-Source 1:MySource1
+Source 1 - MySource1
 Available offset: #0
 Input rate: 15.5 rows/sec
 Processing rate: 23.5 rows/sec
@@ -228,7 +227,7 @@ class StreamingQueryStatus(object):
 numRows.input.source: 100
 latency.getOffset.source: 10
 latency.getBatch.source: 20
-Sink status: MySink
+Sink status - MySink
 Committed offsets: [#1, -]
 """
 return self._jsqs.toString()
@@ -366,7 +365,7 @@ class SourceStatus(object):
 Pretty string of this source status.
 
 >>> print(sqs.sourceStatuses[0])
-SourceStatus:MySource1
+Status of source MySource1
 Available offset: #0
 Input rate: 15.5 rows/sec
 Processing rate: 23.5 rows/sec
@@ -457,7 +456,7 @@ class SinkStatus(object):
 Pretty string of this source status.
 
 >>> print(sqs.sinkStatus)
-SinkStatus:MySink
+Status of sink MySink
 Committed offsets: [#1, -]
 """
 return self._jss.toString()
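The doctest changes above reflect a new `toString` header format ("Status of sink MySink" instead of "SinkStatus:MySink"). The `SinkStatus` Scala diff builds it as `"Status of sink " + indent(prettyString).trim`. Here is a minimal Python sketch of that composition. The 4-space indent width and the body text are assumptions; `indent` here is a hypothetical stand-in for `StreamingQueryStatus.indent`, whose definition is not shown in this diff.

```python
def indent(text, prefix="    "):
    # Hypothetical stand-in for StreamingQueryStatus.indent (width assumed):
    # prefix every line of the text.
    return "\n".join(prefix + line for line in text.split("\n"))


def sink_status_string(description, offset_desc):
    # Hypothetical body text; in Scala it comes from SinkStatus.prettyString.
    pretty_string = description + "\nCommitted offsets: " + offset_desc
    # Mirrors "Status of sink " + indent(prettyString).trim: strip() removes the
    # indentation in front of the first line (and any trailing whitespace), so
    # the description follows the header while later lines stay indented.
    return "Status of sink " + indent(pretty_string).strip()


print(sink_status_string("MySink", "[#1, -]"))
```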

http://git-wip-us.apache.org/repos/asf/spark/blob/af2e6e0c/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala
index c991166..ab19602 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala
@@ -17,6 +17,11 @@
 
 package org.apache.spark.sql.streaming
 
+import org.json4s._
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql.streaming.StreamingQueryStatus.indent
 
@@ -34,8 +39,19 @@ class SinkStatus private(
 val description: String,
 val offsetDesc: String) {
 
+  /** The compact JSON representation of this status. */
+  def json: String = compact(render(jsonValue))
+
+  /** The pretty (i.e. indented) JSON representation of this status. */
+  def prettyJson: String = pretty(render(jsonValue))
+
   override def toString: 
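In the Scala change, `json` and `prettyJson` are two renderings of a single json4s value: `compact(render(jsonValue))` and `pretty(render(jsonValue))`. The following is a rough Python analogue that uses the standard `json` module in place of json4s. The key names and the nesting of a sink status inside a query status are assumptions, because the diff is cut off before `jsonValue` is defined.

```python
import json


def sink_json_value(description, offset_desc):
    # Hypothetical key names, taken from the SinkStatus constructor fields.
    return {"description": description, "offsetDesc": offset_desc}


def query_json_value(name, sink_value):
    # Hypothetical nesting: the sink status embedded inside a query status.
    return {"name": name, "sinkStatus": sink_value}


def render_compact(value):
    # Analogue of json4s compact(render(jsonValue)): no optional whitespace.
    return json.dumps(value, separators=(",", ":"))


def render_pretty(value):
    # Analogue of json4s pretty(render(jsonValue)): indented, multi-line.
    return json.dumps(value, indent=2)


value = query_json_value("query", sink_json_value("MySink", "[#1, -]"))
compact = render_compact(value)
pretty = render_pretty(value)
print(compact)
print(pretty)
```

Deriving both strings from one value keeps the two forms from drifting apart. The compact form suits one-record-per-line logs, and the pretty form is easier to read by eye.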

spark git commit: [SPARK-17926][SQL][STREAMING] Added json for statuses

2016-10-21, yhuai
Repository: spark
Updated Branches:
  refs/heads/master e371040a0 -> 7a531e305


[SPARK-17926][SQL][STREAMING] Added json for statuses

## What changes were proposed in this pull request?

`StreamingQueryStatus`, exposed through `StreamingQueryListener`, often needs to be 
recorded (similar to `SparkListener` events). This PR adds `.json` and 
`.prettyJson` to `StreamingQueryStatus`, `SourceStatus` and `SinkStatus`.

## How was this patch tested?
New unit tests

Author: Tathagata Das 

Closes #15476 from tdas/SPARK-17926.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7a531e30
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7a531e30
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7a531e30

Branch: refs/heads/master
Commit: 7a531e3054f8d4820216ed379433559f57f571b8
Parents: e371040
Author: Tathagata Das 
Authored: Fri Oct 21 13:07:29 2016 -0700
Committer: Yin Huai 
Committed: Fri Oct 21 13:07:29 2016 -0700

--
 python/pyspark/sql/streaming.py                    |  11 +-
 .../apache/spark/sql/streaming/SinkStatus.scala    |  18 +++-
 .../spark/sql/streaming/SourceStatus.scala         |  23 +++-
 .../sql/streaming/StreamingQueryStatus.scala       |  55 +++---
 .../streaming/StreamingQueryStatusSuite.scala      | 105 +++
 5 files changed, 187 insertions(+), 25 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7a531e30/python/pyspark/sql/streaming.py
--
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index ce47bd1..35fc469 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -205,8 +205,7 @@ class StreamingQueryStatus(object):
 Pretty string of this query status.
 
 >>> print(sqs)
-StreamingQueryStatus:
-Query name: query
+Status of query 'query'
 Query id: 1
 Status timestamp: 123
 Input rate: 15.5 rows/sec
@@ -220,7 +219,7 @@ class StreamingQueryStatus(object):
 numRows.input.total: 100
 triggerId: 5
 Source statuses [1 source]:
-Source 1:MySource1
+Source 1 - MySource1
 Available offset: #0
 Input rate: 15.5 rows/sec
 Processing rate: 23.5 rows/sec
@@ -228,7 +227,7 @@ class StreamingQueryStatus(object):
 numRows.input.source: 100
 latency.getOffset.source: 10
 latency.getBatch.source: 20
-Sink status: MySink
+Sink status - MySink
 Committed offsets: [#1, -]
 """
 return self._jsqs.toString()
@@ -366,7 +365,7 @@ class SourceStatus(object):
 Pretty string of this source status.
 
 >>> print(sqs.sourceStatuses[0])
-SourceStatus:MySource1
+Status of source MySource1
 Available offset: #0
 Input rate: 15.5 rows/sec
 Processing rate: 23.5 rows/sec
@@ -457,7 +456,7 @@ class SinkStatus(object):
 Pretty string of this source status.
 
 >>> print(sqs.sinkStatus)
-SinkStatus:MySink
+Status of sink MySink
 Committed offsets: [#1, -]
 """
 return self._jss.toString()

http://git-wip-us.apache.org/repos/asf/spark/blob/7a531e30/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala
index c991166..ab19602 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala
@@ -17,6 +17,11 @@
 
 package org.apache.spark.sql.streaming
 
+import org.json4s._
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
 import org.apache.spark.annotation.Experimental
 import org.apache.spark.sql.streaming.StreamingQueryStatus.indent
 
@@ -34,8 +39,19 @@ class SinkStatus private(
 val description: String,
 val offsetDesc: String) {
 
+  /** The compact JSON representation of this status. */
+  def json: String = compact(render(jsonValue))
+
+  /** The pretty (i.e. indented) JSON representation of this status. */
+  def prettyJson: String = pretty(render(jsonValue))
+
   override def toString: String =
-"SinkStatus:" + indent(prettyString)
+"Status of sink " + indent(prettyString).trim
+
+  private[sql] def