Repository: spark Updated Branches: refs/heads/master f18b905f6 -> 84f1b25f3
[SPARK-21462][SS] Added batchId to StreamingQueryProgress.json ## What changes were proposed in this pull request? - Added batchId to the output of StreamingQueryProgress.json, as it was missing from the generated JSON. - Also removed the recently added numPartitions field from StateOperatorProgress, because this value does not change over the course of a query run and can be obtained in other ways (it was simply the session's configured number of shuffle partitions). ## How was this patch tested? Updated unit tests Author: Tathagata Das <tathagata.das1...@gmail.com> Closes #18675 from tdas/SPARK-21462. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/84f1b25f Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/84f1b25f Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/84f1b25f Branch: refs/heads/master Commit: 84f1b25f316a42ce4d3b69a3e136d0db41c9aec2 Parents: f18b905 Author: Tathagata Das <tathagata.das1...@gmail.com> Authored: Tue Jul 18 16:29:45 2017 -0700 Committer: Tathagata Das <tathagata.das1...@gmail.com> Committed: Tue Jul 18 16:29:45 2017 -0700 ---------------------------------------------------------------------- .../sql/execution/streaming/statefulOperators.scala | 3 +-- .../scala/org/apache/spark/sql/streaming/progress.scala | 9 ++++----- .../StreamingQueryStatusAndProgressSuite.scala | 12 ++++++------ 3 files changed, 11 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/84f1b25f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala index 3ca7f4b..6addab6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala +++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala @@ -87,8 +87,7 @@ trait StateStoreWriter extends StatefulOperator { self: SparkPlan => new StateOperatorProgress( numRowsTotal = longMetric("numTotalStateRows").value, numRowsUpdated = longMetric("numUpdatedStateRows").value, - memoryUsedBytes = longMetric("stateMemory").value, - numPartitions = this.sqlContext.conf.numShufflePartitions) + memoryUsedBytes = longMetric("stateMemory").value) } /** Records the duration of running `body` for the next query progress update. */ http://git-wip-us.apache.org/repos/asf/spark/blob/84f1b25f/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala index 81a2387..3000c42 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala @@ -38,8 +38,7 @@ import org.apache.spark.annotation.InterfaceStability class StateOperatorProgress private[sql]( val numRowsTotal: Long, val numRowsUpdated: Long, - val memoryUsedBytes: Long, - val numPartitions: Long + val memoryUsedBytes: Long ) extends Serializable { /** The compact JSON representation of this progress. 
*/ @@ -49,13 +48,12 @@ class StateOperatorProgress private[sql]( def prettyJson: String = pretty(render(jsonValue)) private[sql] def copy(newNumRowsUpdated: Long): StateOperatorProgress = - new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes, numPartitions) + new StateOperatorProgress(numRowsTotal, newNumRowsUpdated, memoryUsedBytes) private[sql] def jsonValue: JValue = { ("numRowsTotal" -> JInt(numRowsTotal)) ~ ("numRowsUpdated" -> JInt(numRowsUpdated)) ~ - ("memoryUsedBytes" -> JInt(memoryUsedBytes)) ~ - ("numPartitions" -> JInt(numPartitions)) + ("memoryUsedBytes" -> JInt(memoryUsedBytes)) } } @@ -131,6 +129,7 @@ class StreamingQueryProgress private[sql]( ("runId" -> JString(runId.toString)) ~ ("name" -> JString(name)) ~ ("timestamp" -> JString(timestamp)) ~ + ("batchId" -> JInt(batchId)) ~ ("numInputRows" -> JInt(numInputRows)) ~ ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~ ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~ http://git-wip-us.apache.org/repos/asf/spark/blob/84f1b25f/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala index d3cafac..79bb827 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala @@ -43,6 +43,7 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually { | "runId" : "${testProgress1.runId.toString}", | "name" : "myName", | "timestamp" : "2016-12-05T20:54:20.827Z", + | "batchId" : 2, | "numInputRows" : 678, | "inputRowsPerSecond" : 10.0, | "durationMs" : { @@ -57,8 
+58,7 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually { | "stateOperators" : [ { | "numRowsTotal" : 0, | "numRowsUpdated" : 1, - | "memoryUsedBytes" : 2, - | "numPartitions" : 4 + | "memoryUsedBytes" : 2 | } ], | "sources" : [ { | "description" : "source", @@ -83,6 +83,7 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually { | "runId" : "${testProgress2.runId.toString}", | "name" : null, | "timestamp" : "2016-12-05T20:54:20.827Z", + | "batchId" : 2, | "numInputRows" : 678, | "durationMs" : { | "total" : 0 @@ -90,8 +91,7 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually { | "stateOperators" : [ { | "numRowsTotal" : 0, | "numRowsUpdated" : 1, - | "memoryUsedBytes" : 2, - | "numPartitions" : 4 + | "memoryUsedBytes" : 2 | } ], | "sources" : [ { | "description" : "source", @@ -230,7 +230,7 @@ object StreamingQueryStatusAndProgressSuite { "avg" -> "2016-12-05T20:54:20.827Z", "watermark" -> "2016-12-05T20:54:20.827Z").asJava), stateOperators = Array(new StateOperatorProgress( - numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 2, numPartitions = 4)), + numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 2)), sources = Array( new SourceProgress( description = "source", @@ -254,7 +254,7 @@ object StreamingQueryStatusAndProgressSuite { // empty maps should be handled correctly eventTime = new java.util.HashMap(Map.empty[String, String].asJava), stateOperators = Array(new StateOperatorProgress( - numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 2, numPartitions = 4)), + numRowsTotal = 0, numRowsUpdated = 1, memoryUsedBytes = 2)), sources = Array( new SourceProgress( description = "source", --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org