Repository: spark Updated Branches: refs/heads/branch-2.0 78458a7eb -> af2e6e0c9
[SPARK-17926][SQL][STREAMING] Added json for statuses ## What changes were proposed in this pull request? StreamingQueryStatus exposed through StreamingQueryListener often needs to be recorded (similar to SparkListener events). This PR adds `.json` and `.prettyJson` to `StreamingQueryStatus`, `SourceStatus` and `SinkStatus`. ## How was this patch tested? New unit tests Author: Tathagata Das <tathagata.das1...@gmail.com> Closes #15476 from tdas/SPARK-17926. (cherry picked from commit 7a531e3054f8d4820216ed379433559f57f571b8) Signed-off-by: Yin Huai <yh...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/af2e6e0c Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/af2e6e0c Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/af2e6e0c Branch: refs/heads/branch-2.0 Commit: af2e6e0c9c85c40bc505ed1183857a8fb60fbd72 Parents: 78458a7 Author: Tathagata Das <tathagata.das1...@gmail.com> Authored: Fri Oct 21 13:07:29 2016 -0700 Committer: Yin Huai <yh...@databricks.com> Committed: Fri Oct 21 13:07:59 2016 -0700 ---------------------------------------------------------------------- python/pyspark/sql/streaming.py | 11 +- .../apache/spark/sql/streaming/SinkStatus.scala | 18 +++- .../spark/sql/streaming/SourceStatus.scala | 23 +++- .../sql/streaming/StreamingQueryStatus.scala | 55 +++++++--- .../streaming/StreamingQueryStatusSuite.scala | 105 +++++++++++++++++++ 5 files changed, 187 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/af2e6e0c/python/pyspark/sql/streaming.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index 0df63a7..cfe917b 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -205,8 +205,7 @@ class StreamingQueryStatus(object): Pretty string of this query status. >>> print(sqs) - StreamingQueryStatus: - Query name: query + Status of query 'query' Query id: 1 Status timestamp: 123 Input rate: 15.5 rows/sec @@ -220,7 +219,7 @@ class StreamingQueryStatus(object): numRows.input.total: 100 triggerId: 5 Source statuses [1 source]: - Source 1: MySource1 + Source 1 - MySource1 Available offset: #0 Input rate: 15.5 rows/sec Processing rate: 23.5 rows/sec @@ -228,7 +227,7 @@ class StreamingQueryStatus(object): numRows.input.source: 100 latency.getOffset.source: 10 latency.getBatch.source: 20 - Sink status: MySink + Sink status - MySink Committed offsets: [#1, -] """ return self._jsqs.toString() @@ -366,7 +365,7 @@ class SourceStatus(object): Pretty string of this source status. >>> print(sqs.sourceStatuses[0]) - SourceStatus: MySource1 + Status of source MySource1 Available offset: #0 Input rate: 15.5 rows/sec Processing rate: 23.5 rows/sec @@ -457,7 +456,7 @@ class SinkStatus(object): Pretty string of this source status. >>> print(sqs.sinkStatus) - SinkStatus: MySink + Status of sink MySink Committed offsets: [#1, -] """ return self._jss.toString() http://git-wip-us.apache.org/repos/asf/spark/blob/af2e6e0c/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala index c991166..ab19602 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SinkStatus.scala @@ -17,6 +17,11 @@ package org.apache.spark.sql.streaming +import org.json4s._ +import org.json4s.JsonAST.JValue +import org.json4s.JsonDSL._ +import org.json4s.jackson.JsonMethods._ + import org.apache.spark.annotation.Experimental import org.apache.spark.sql.streaming.StreamingQueryStatus.indent @@ -34,8 +39,19 @@ class SinkStatus private( val description: String, val offsetDesc: String) { + /** The compact JSON representation of this status. */ + def json: String = compact(render(jsonValue)) + + /** The pretty (i.e. indented) JSON representation of this status. */ + def prettyJson: String = pretty(render(jsonValue)) + override def toString: String = - "SinkStatus:" + indent(prettyString) + "Status of sink " + indent(prettyString).trim + + private[sql] def jsonValue: JValue = { + ("description" -> JString(description)) ~ + ("offsetDesc" -> JString(offsetDesc)) + } private[sql] def prettyString: String = { s"""$description http://git-wip-us.apache.org/repos/asf/spark/blob/af2e6e0c/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala index 6ace483..cfdf113 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/SourceStatus.scala @@ -21,8 +21,14 @@ import java.{util => ju} import scala.collection.JavaConverters._ +import org.json4s._ +import org.json4s.JsonAST.JValue +import org.json4s.JsonDSL._ +import org.json4s.jackson.JsonMethods._ + import org.apache.spark.annotation.Experimental import org.apache.spark.sql.streaming.StreamingQueryStatus.indent +import org.apache.spark.util.JsonProtocol /** * :: Experimental :: @@ -47,8 +53,22 @@ class SourceStatus private( val processingRate: Double, val triggerDetails: ju.Map[String, String]) { + /** The compact JSON representation of this status. */ + def json: String = compact(render(jsonValue)) + + /** The pretty (i.e. indented) JSON representation of this status. */ + def prettyJson: String = pretty(render(jsonValue)) + override def toString: String = - "SourceStatus:" + indent(prettyString) + "Status of source " + indent(prettyString).trim + + private[sql] def jsonValue: JValue = { + ("description" -> JString(description)) ~ + ("offsetDesc" -> JString(offsetDesc)) ~ + ("inputRate" -> JDouble(inputRate)) ~ + ("processingRate" -> JDouble(processingRate)) ~ + ("triggerDetails" -> JsonProtocol.mapToJson(triggerDetails.asScala)) + } private[sql] def prettyString: String = { val triggerDetailsLines = @@ -59,7 +79,6 @@ class SourceStatus private( |Processing rate: $processingRate rows/sec |Trigger details: |""".stripMargin + indent(triggerDetailsLines) - } } http://git-wip-us.apache.org/repos/asf/spark/blob/af2e6e0c/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala index 4768992..a50b0d9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryStatus.scala @@ -21,8 +21,14 @@ import java.{util => ju} import scala.collection.JavaConverters._ +import org.json4s._ +import org.json4s.JsonAST.JValue +import org.json4s.JsonDSL._ +import org.json4s.jackson.JsonMethods._ + import org.apache.spark.annotation.Experimental import org.apache.spark.sql.execution.streaming.{CompositeOffset, LongOffset} +import org.apache.spark.util.JsonProtocol /** * :: Experimental :: @@ -59,29 +65,46 @@ class StreamingQueryStatus private( import StreamingQueryStatus._ + /** The compact JSON representation of this status. */ + def json: String = compact(render(jsonValue)) + + /** The pretty (i.e. indented) JSON representation of this status. */ + def prettyJson: String = pretty(render(jsonValue)) + override def toString: String = { val sourceStatusLines = sourceStatuses.zipWithIndex.map { case (s, i) => - s"Source ${i + 1}:" + indent(s.prettyString) + s"Source ${i + 1} - " + indent(s.prettyString).trim } - val sinkStatusLines = sinkStatus.prettyString + val sinkStatusLines = sinkStatus.prettyString.trim val triggerDetailsLines = triggerDetails.asScala.map { case (k, v) => s"$k: $v" }.toSeq.sorted val numSources = sourceStatuses.length val numSourcesString = s"$numSources source" + { if (numSources > 1) "s" else "" } - val allLines = s""" - |Query name: $name - |Query id: $id - |Status timestamp: $timestamp - |Input rate: $inputRate rows/sec - |Processing rate $processingRate rows/sec - |Latency: ${latency.getOrElse("-")} ms - |Trigger details: - |${indent(triggerDetailsLines)} - |Source statuses [$numSourcesString]: - |${indent(sourceStatusLines)} - |Sink status: ${indent(sinkStatusLines)}""".stripMargin - - s"StreamingQueryStatus:${indent(allLines)}" + val allLines = + s"""|Query id: $id + |Status timestamp: $timestamp + |Input rate: $inputRate rows/sec + |Processing rate $processingRate rows/sec + |Latency: ${latency.getOrElse("-")} ms + |Trigger details: + |${indent(triggerDetailsLines)} + |Source statuses [$numSourcesString]: + |${indent(sourceStatusLines)} + |Sink status - ${indent(sinkStatusLines).trim}""".stripMargin + + s"Status of query '$name'\n${indent(allLines)}" + } + + private[sql] def jsonValue: JValue = { + ("name" -> JString(name)) ~ + ("id" -> JInt(id)) ~ + ("timestamp" -> JInt(timestamp)) ~ + ("inputRate" -> JDouble(inputRate)) ~ + ("processingRate" -> JDouble(processingRate)) ~ + ("latency" -> latency.map(JDouble).getOrElse(JNothing)) ~ + ("triggerDetails" -> JsonProtocol.mapToJson(triggerDetails.asScala)) + ("sourceStatuses" -> JArray(sourceStatuses.map(_.jsonValue).toList)) ~ + ("sinkStatus" -> sinkStatus.jsonValue) } } http://git-wip-us.apache.org/repos/asf/spark/blob/af2e6e0c/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala new file mode 100644 index 0000000..1a98cf2 --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusSuite.scala @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.streaming + +import org.apache.spark.SparkFunSuite + +class StreamingQueryStatusSuite extends SparkFunSuite { + test("toString") { + assert(StreamingQueryStatus.testStatus.sourceStatuses(0).toString === + """ + |Status of source MySource1 + | Available offset: #0 + | Input rate: 15.5 rows/sec + | Processing rate: 23.5 rows/sec + | Trigger details: + | numRows.input.source: 100 + | latency.getOffset.source: 10 + | latency.getBatch.source: 20 + """.stripMargin.trim, "SourceStatus.toString does not match") + + assert(StreamingQueryStatus.testStatus.sinkStatus.toString === + """ + |Status of sink MySink + | Committed offsets: [#1, -] + """.stripMargin.trim, "SinkStatus.toString does not match") + + assert(StreamingQueryStatus.testStatus.toString === + """ + |Status of query 'query' + | Query id: 1 + | Status timestamp: 123 + | Input rate: 15.5 rows/sec + | Processing rate 23.5 rows/sec + | Latency: 345.0 ms + | Trigger details: + | isDataPresentInTrigger: true + | isTriggerActive: true + | latency.getBatch.total: 20 + | latency.getOffset.total: 10 + | numRows.input.total: 100 + | triggerId: 5 + | Source statuses [1 source]: + | Source 1 - MySource1 + | Available offset: #0 + | Input rate: 15.5 rows/sec + | Processing rate: 23.5 rows/sec + | Trigger details: + | numRows.input.source: 100 + | latency.getOffset.source: 10 + | latency.getBatch.source: 20 + | Sink status - MySink + | Committed offsets: [#1, -] + """.stripMargin.trim, "StreamingQueryStatus.toString does not match") + + } + + test("json") { + assert(StreamingQueryStatus.testStatus.json === + """ + |{"sourceStatuses":[{"description":"MySource1","offsetDesc":"#0","inputRate":15.5, + |"processingRate":23.5,"triggerDetails":{"numRows.input.source":"100", + |"latency.getOffset.source":"10","latency.getBatch.source":"20"}}], + |"sinkStatus":{"description":"MySink","offsetDesc":"[#1, -]"}} + """.stripMargin.replace("\n", "").trim) + } + + test("prettyJson") { + assert( + StreamingQueryStatus.testStatus.prettyJson === + """ + |{ + | "sourceStatuses" : [ { + | "description" : "MySource1", + | "offsetDesc" : "#0", + | "inputRate" : 15.5, + | "processingRate" : 23.5, + | "triggerDetails" : { + | "numRows.input.source" : "100", + | "latency.getOffset.source" : "10", + | "latency.getBatch.source" : "20" + | } + | } ], + | "sinkStatus" : { + | "description" : "MySink", + | "offsetDesc" : "[#1, -]" + | } + |} + """.stripMargin.trim) + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org