This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new d44e073f0cd  [SPARK-43128][CONNECT][SS] Make `recentProgress` and `lastProgress` return `StreamingQueryProgress` consistent with the native Scala API
d44e073f0cd is described below

commit d44e073f0cdaf16028a4854e79db200a4e39a6fe
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Wed May 17 16:27:03 2023 +0900

    [SPARK-43128][CONNECT][SS] Make `recentProgress` and `lastProgress` return `StreamingQueryProgress` consistent with the native Scala API

    ### What changes were proposed in this pull request?
    This PR adds support for making `recentProgress` and `lastProgress` in `RemoteStreamingQuery` return `StreamingQueryProgress` instances, consistent with the native Scala API.

    ### Why are the changes needed?
    Adds Spark Connect JVM client API coverage.

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    Added new checks to `StreamingQuerySuite`.

    Closes #40892 from LuciferYang/SPARK-43128.

    Lead-authored-by: yangjie01 <yangji...@baidu.com>
    Co-authored-by: YangJie <yangji...@baidu.com>
    Co-authored-by: Wenchen Fan <wenc...@databricks.com>
    Co-authored-by: Ruifeng Zheng <ruife...@apache.org>
    Co-authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../spark/sql/streaming/StreamingQuery.scala       |   4 +-
 .../org/apache/spark/sql/streaming/progress.scala  | 310 ++++++++++++++++++++-
 .../CheckConnectJvmClientCompatibility.scala       |   5 -
 .../streaming/StreamingQueryProgressSuite.scala    | 227 +++++++++++++++
 .../spark/sql/streaming/StreamingQuerySuite.scala  |  23 ++
 .../sql/connect/planner/SparkConnectPlanner.scala  |   5 +-
 .../org/apache/spark/sql/streaming/progress.scala  |  17 ++
 .../StreamingQueryStatusAndProgressSuite.scala     |  58 ++++
 8 files changed, 638 insertions(+), 11 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
index b57e6239b77..ceb096b9aff 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
@@ -205,14 +205,14 @@ class RemoteStreamingQuery(

   override def recentProgress: Array[StreamingQueryProgress] = {
     executeQueryCmd(_.setRecentProgress(true)).getRecentProgress.getRecentProgressJsonList.asScala
-      .map(json => new StreamingQueryProgress(json))
+      .map(StreamingQueryProgress.fromJson)
       .toArray
   }

   override def lastProgress: StreamingQueryProgress = {
     executeQueryCmd(
       _.setLastProgress(true)).getRecentProgress.getRecentProgressJsonList.asScala.headOption
-      .map(json => new StreamingQueryProgress(json))
+      .map(StreamingQueryProgress.fromJson)
       .orNull
   }
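With the two changes above, a Connect client can work with structured progress objects directly instead of parsing raw JSON. A minimal sketch of the user-facing effect; the endpoint, source, and query name are illustrative, and the exact session-builder call may differ across client versions:

```scala
import org.apache.spark.sql.SparkSession

object ProgressDemo extends App {
  // Hypothetical Connect endpoint; the builder call may vary by client version.
  val spark = SparkSession.builder().remote("sc://localhost").getOrCreate()

  val query = spark.readStream
    .format("rate") // illustrative source
    .load()
    .writeStream
    .format("memory")
    .queryName("progress_demo") // illustrative name
    .start()

  Thread.sleep(3000) // crude wait for at least one trigger; tests poll with Eventually instead

  // lastProgress now yields a StreamingQueryProgress (null before the first
  // completed trigger), matching the native Scala API rather than a JSON string.
  val progress = query.lastProgress
  if (progress != null) {
    println(s"batch=${progress.batchId} rows=${progress.numInputRows}")
    progress.sources.foreach(s => println(s.description))
  }

  query.stop()
}
```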
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index 974bcd64b29..593311efb9c 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -17,6 +17,312 @@

 package org.apache.spark.sql.streaming

-class StreamingQueryProgress private[sql] (val json: String) {
-  // TODO(SPARK-43128): (Implement full object by parsing from json).
+import java.{util => ju}
+import java.lang.{Long => JLong}
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize
+import com.fasterxml.jackson.module.scala.{ClassTagExtensions, DefaultScalaModule}
+import org.json4s._
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.annotation.Evolving
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.streaming.SafeJsonSerializer.{safeDoubleToJValue, safeMapToJValue}
+import org.apache.spark.sql.streaming.SinkProgress.DEFAULT_NUM_OUTPUT_ROWS
+
+/**
+ * Information about updates made to stateful operators in a [[StreamingQuery]] during a trigger.
+ */
+@Evolving
+class StateOperatorProgress private[spark] (
+    val operatorName: String,
+    val numRowsTotal: Long,
+    val numRowsUpdated: Long,
+    val allUpdatesTimeMs: Long,
+    val numRowsRemoved: Long,
+    val allRemovalsTimeMs: Long,
+    val commitTimeMs: Long,
+    val memoryUsedBytes: Long,
+    val numRowsDroppedByWatermark: Long,
+    val numShufflePartitions: Long,
+    val numStateStoreInstances: Long,
+    val customMetrics: ju.Map[String, JLong] = new ju.HashMap())
+    extends Serializable {
+
+  /** The compact JSON representation of this progress. */
+  def json: String = compact(render(jsonValue))
+
+  /** The pretty (i.e. indented) JSON representation of this progress. */
+  def prettyJson: String = pretty(render(jsonValue))
+
+  private[sql] def copy(
+      newNumRowsUpdated: Long,
+      newNumRowsDroppedByWatermark: Long): StateOperatorProgress =
+    new StateOperatorProgress(
+      operatorName = operatorName,
+      numRowsTotal = numRowsTotal,
+      numRowsUpdated = newNumRowsUpdated,
+      allUpdatesTimeMs = allUpdatesTimeMs,
+      numRowsRemoved = numRowsRemoved,
+      allRemovalsTimeMs = allRemovalsTimeMs,
+      commitTimeMs = commitTimeMs,
+      memoryUsedBytes = memoryUsedBytes,
+      numRowsDroppedByWatermark = newNumRowsDroppedByWatermark,
+      numShufflePartitions = numShufflePartitions,
+      numStateStoreInstances = numStateStoreInstances,
+      customMetrics = customMetrics)
+
+  private[sql] def jsonValue: JValue = {
+    ("operatorName" -> JString(operatorName)) ~
+      ("numRowsTotal" -> JInt(numRowsTotal)) ~
+      ("numRowsUpdated" -> JInt(numRowsUpdated)) ~
+      ("allUpdatesTimeMs" -> JInt(allUpdatesTimeMs)) ~
+      ("numRowsRemoved" -> JInt(numRowsRemoved)) ~
+      ("allRemovalsTimeMs" -> JInt(allRemovalsTimeMs)) ~
+      ("commitTimeMs" -> JInt(commitTimeMs)) ~
+      ("memoryUsedBytes" -> JInt(memoryUsedBytes)) ~
+      ("numRowsDroppedByWatermark" -> JInt(numRowsDroppedByWatermark)) ~
+      ("numShufflePartitions" -> JInt(numShufflePartitions)) ~
+      ("numStateStoreInstances" -> JInt(numStateStoreInstances)) ~
+      ("customMetrics" -> {
+        if (!customMetrics.isEmpty) {
+          val keys = customMetrics.keySet.asScala.toSeq.sorted
+          keys.map { k => k -> JInt(customMetrics.get(k).toLong): JObject }.reduce(_ ~ _)
+        } else {
+          JNothing
+        }
+      })
+  }
+
+  override def toString: String = prettyJson
+}
+
+/**
+ * Information about progress made in the execution of a [[StreamingQuery]] during a trigger.
+ * Each event relates to processing done for a single trigger of the streaming query. Events are
+ * emitted even when no new data is available to be processed.
+ *
+ * @param id
+ *   A unique query id that persists across restarts. See `StreamingQuery.id()`.
+ * @param runId
+ *   A query id that is unique for every start/restart. See `StreamingQuery.runId()`.
+ * @param name
+ *   User-specified name of the query, null if not specified.
+ * @param timestamp
+ *   Beginning time of the trigger in ISO8601 format, i.e. UTC timestamps.
+ * @param batchId
+ *   A unique id for the current batch of data being processed. Note that in the case of retries
+ *   after a failure, a given batchId may be executed more than once. Similarly, when there is no
+ *   data to be processed, the batchId will not be incremented.
+ * @param batchDuration
+ *   The processing duration of each batch.
+ * @param durationMs
+ *   The amount of time taken to perform various operations in milliseconds.
+ * @param eventTime
+ *   Statistics of event time seen in this batch. It may contain the following keys:
+ *   {{{
+ *   "max" -> "2016-12-05T20:54:20.827Z"  // maximum event time seen in this trigger
+ *   "min" -> "2016-12-05T20:54:20.827Z"  // minimum event time seen in this trigger
+ *   "avg" -> "2016-12-05T20:54:20.827Z"  // average event time seen in this trigger
+ *   "watermark" -> "2016-12-05T20:54:20.827Z"  // watermark used in this trigger
+ *   }}}
+ *   All timestamps are in ISO8601 format, i.e. UTC timestamps.
+ * @param stateOperators
+ *   Information about operators in the query that store state.
+ * @param sources
+ *   Detailed statistics on data being read from each of the streaming sources.
+ * @since 3.5.0
+ */
+@Evolving
+class StreamingQueryProgress private[spark] (
+    val id: UUID,
+    val runId: UUID,
+    val name: String,
+    val timestamp: String,
+    val batchId: Long,
+    val batchDuration: Long,
+    val durationMs: ju.Map[String, JLong],
+    val eventTime: ju.Map[String, String],
+    val stateOperators: Array[StateOperatorProgress],
+    val sources: Array[SourceProgress],
+    val sink: SinkProgress,
+    @JsonDeserialize(contentAs = classOf[GenericRowWithSchema])
+    val observedMetrics: ju.Map[String, Row])
+    extends Serializable {
+
+  /** The aggregate (across all sources) number of records processed in a trigger. */
+  def numInputRows: Long = sources.map(_.numInputRows).sum
+
+  /** The aggregate (across all sources) rate of data arriving. */
+  def inputRowsPerSecond: Double = sources.map(_.inputRowsPerSecond).sum
+
+  /** The aggregate (across all sources) rate at which Spark is processing data. */
+  def processedRowsPerSecond: Double = sources.map(_.processedRowsPerSecond).sum
+
+  /** The compact JSON representation of this progress. */
+  def json: String = compact(render(jsonValue))
+
+  /** The pretty (i.e. indented) JSON representation of this progress. */
+  def prettyJson: String = pretty(render(jsonValue))
+
+  override def toString: String = prettyJson
+
+  private[sql] def jsonValue: JValue = {
+    ("id" -> JString(id.toString)) ~
+      ("runId" -> JString(runId.toString)) ~
+      ("name" -> JString(name)) ~
+      ("timestamp" -> JString(timestamp)) ~
+      ("batchId" -> JInt(batchId)) ~
+      ("numInputRows" -> JInt(numInputRows)) ~
+      ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
+      ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~
+      ("durationMs" -> safeMapToJValue[JLong](durationMs, v => JInt(v.toLong))) ~
+      ("eventTime" -> safeMapToJValue[String](eventTime, s => JString(s))) ~
+      ("stateOperators" -> JArray(stateOperators.map(_.jsonValue).toList)) ~
+      ("sources" -> JArray(sources.map(_.jsonValue).toList)) ~
+      ("sink" -> sink.jsonValue) ~
+      ("observedMetrics" -> safeMapToJValue[Row](observedMetrics, row => row.jsonValue))
+  }
+}
+
+private[spark] object StreamingQueryProgress {
+  private val mapper = {
+    val ret = new ObjectMapper() with ClassTagExtensions
+    ret.registerModule(DefaultScalaModule)
+    ret.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+    ret
+  }
+
+  private[spark] def jsonString(progress: StreamingQueryProgress): String =
+    mapper.writeValueAsString(progress)
+
+  private[spark] def fromJson(json: String): StreamingQueryProgress =
+    mapper.readValue[StreamingQueryProgress](json)
+}
+
+/**
+ * Information about progress made for a source in the execution of a [[StreamingQuery]] during a
+ * trigger. See [[StreamingQueryProgress]] for more information.
+ *
+ * @param description
+ *   Description of the source.
+ * @param startOffset
+ *   The starting offset for data being read.
+ * @param endOffset
+ *   The ending offset for data being read.
+ * @param latestOffset
+ *   The latest offset from this source.
+ * @param numInputRows
+ *   The number of records read from this source.
+ * @param inputRowsPerSecond
+ *   The rate at which data is arriving from this source.
+ * @param processedRowsPerSecond
+ *   The rate at which data from this source is being processed by Spark.
+ * @since 3.5.0
+ */
+@Evolving
+class SourceProgress protected[spark] (
+    val description: String,
+    val startOffset: String,
+    val endOffset: String,
+    val latestOffset: String,
+    val numInputRows: Long,
+    val inputRowsPerSecond: Double,
+    val processedRowsPerSecond: Double,
+    val metrics: ju.Map[String, String] = Map[String, String]().asJava)
+    extends Serializable {
+
+  /** The compact JSON representation of this progress. */
+  def json: String = compact(render(jsonValue))
+
+  /** The pretty (i.e. indented) JSON representation of this progress. */
+  def prettyJson: String = pretty(render(jsonValue))
+
+  override def toString: String = prettyJson
+
+  private[sql] def jsonValue: JValue = {
+    ("description" -> JString(description)) ~
+      ("startOffset" -> tryParse(startOffset)) ~
+      ("endOffset" -> tryParse(endOffset)) ~
+      ("latestOffset" -> tryParse(latestOffset)) ~
+      ("numInputRows" -> JInt(numInputRows)) ~
+      ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
+      ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~
+      ("metrics" -> safeMapToJValue[String](metrics, s => JString(s)))
+  }
+
+  private def tryParse(json: String) = try {
+    parse(json)
+  } catch {
+    case NonFatal(e) => JString(json)
+  }
+}
+
+/**
+ * Information about progress made for a sink in the execution of a [[StreamingQuery]] during a
+ * trigger. See [[StreamingQueryProgress]] for more information.
+ *
+ * @param description
+ *   Description of the sink corresponding to this status.
+ * @param numOutputRows
+ *   Number of rows written to the sink or -1 for Continuous Mode (temporarily) or Sink V1 (until
+ *   decommissioned).
+ * @since 3.5.0
+ */
+@Evolving
+class SinkProgress protected[spark] (
+    val description: String,
+    val numOutputRows: Long,
+    val metrics: ju.Map[String, String] = Map[String, String]().asJava)
+    extends Serializable {
+
+  /** SinkProgress without custom metrics. */
+  protected[sql] def this(description: String) = {
+    this(description, DEFAULT_NUM_OUTPUT_ROWS)
+  }
+
+  /** The compact JSON representation of this progress. */
+  def json: String = compact(render(jsonValue))
+
+  /** The pretty (i.e. indented) JSON representation of this progress. */
+  def prettyJson: String = pretty(render(jsonValue))
+
+  override def toString: String = prettyJson
+
+  private[sql] def jsonValue: JValue = {
+    ("description" -> JString(description)) ~
+      ("numOutputRows" -> JInt(numOutputRows)) ~
+      ("metrics" -> safeMapToJValue[String](metrics, s => JString(s)))
+  }
+}
+
+private[sql] object SinkProgress {
+  val DEFAULT_NUM_OUTPUT_ROWS: Long = -1L
+
+  def apply(
+      description: String,
+      numOutputRows: Option[Long],
+      metrics: ju.Map[String, String] = Map[String, String]().asJava): SinkProgress =
+    new SinkProgress(description, numOutputRows.getOrElse(DEFAULT_NUM_OUTPUT_ROWS), metrics)
+}
+
+private object SafeJsonSerializer {
+  def safeDoubleToJValue(value: Double): JValue = {
+    if (value.isNaN || value.isInfinity) JNothing else JDouble(value)
+  }
+
+  /** Convert map to JValue while handling empty maps. Also, this sorts the keys. */
+  def safeMapToJValue[T](map: ju.Map[String, T], valueToJValue: T => JValue): JValue = {
+    if (map.isEmpty) return JNothing
+    val keys = map.asScala.keySet.toSeq.sorted
+    keys.map { k => k -> valueToJValue(map.get(k)): JObject }.reduce(_ ~ _)
+  }
 }
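As a reference point, here is a standalone sketch of the Jackson configuration that the `fromJson` helper above relies on; `Payload` is a hypothetical stand-in for `StreamingQueryProgress`:

```scala
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.{ClassTagExtensions, DefaultScalaModule}

// Hypothetical stand-in for StreamingQueryProgress.
case class Payload(name: String, batchId: Long)

object FromJsonSketch extends App {
  private val mapper = {
    val ret = new ObjectMapper() with ClassTagExtensions
    ret.registerModule(DefaultScalaModule) // Scala case-class (de)serialization
    // Tolerate fields the target type does not model instead of failing on them.
    ret.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
    ret
  }

  // "extra" is unknown to Payload and is silently dropped.
  val p = mapper.readValue[Payload]("""{"name":"q1","batchId":2,"extra":true}""")
  println(p) // Payload(q1,2)
}
```

Disabling `FAIL_ON_UNKNOWN_PROPERTIES` matters because the JSON produced on the server side can carry fields the constructor does not accept, and the client must skip them rather than error out.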
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
index 05139e766a8..f53709b07f9 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
@@ -230,11 +230,6 @@ object CheckConnectJvmClientCompatibility {
         "org.apache.spark.sql.streaming.DataStreamWriter.SOURCE*" // These are constant vals.
       ),

-      // StreamingQuery
-      ProblemFilters.exclude[Problem](
-        "org.apache.spark.sql.streaming.StreamingQueryProgress.*" // TODO(SPARK-43128)
-      ),
-
       // SQLImplicits
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SQLImplicits.this"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SQLImplicits.rddToDatasetHolder"),
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala
new file mode 100644
index 00000000000..a6a44c1bd71
--- /dev/null
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.connect.client.util.ConnectFunSuite
+import org.apache.spark.sql.types.StructType
+
+class StreamingQueryProgressSuite extends ConnectFunSuite {
+  test("test serde StreamingQueryProgress from json") {
+    val jsonStringFromServerSide =
+      s"""
+         |{
+         |  "id" : "aa624dba-d302-4429-b75b-7e82e44cfb11",
+         |  "runId" : "5b8c40d3-4a7c-425c-b333-838f6b9c18bb",
+         |  "name" : "myName",
+         |  "timestamp" : "2016-12-05T20:54:20.827Z",
+         |  "batchId" : 2,
+         |  "batchDuration" : 0,
+         |  "durationMs" : {
+         |    "total" : 0
+         |  },
+         |  "eventTime" : {
+         |    "min" : "2016-12-05T20:54:20.827Z",
+         |    "avg" : "2016-12-05T20:54:20.827Z",
+         |    "watermark" : "2016-12-05T20:54:20.827Z",
+         |    "max" : "2016-12-05T20:54:20.827Z"
+         |  },
+         |  "stateOperators" : [ {
+         |    "operatorName" : "op1",
+         |    "numRowsTotal" : 0,
+         |    "numRowsUpdated" : 1,
+         |    "allUpdatesTimeMs" : 1,
+         |    "numRowsRemoved" : 2,
+         |    "allRemovalsTimeMs" : 34,
+         |    "commitTimeMs" : 23,
+         |    "memoryUsedBytes" : 3,
+         |    "numRowsDroppedByWatermark" : 0,
+         |    "numShufflePartitions" : 2,
+         |    "numStateStoreInstances" : 2,
+         |    "customMetrics" : {
+         |      "stateOnCurrentVersionSizeBytes" : 2,
+         |      "loadedMapCacheHitCount" : 1,
+         |      "loadedMapCacheMissCount" : 0
+         |    }
+         |  } ],
+         |  "sources" : [ {
+         |    "description" : "source",
+         |    "startOffset" : "123",
+         |    "endOffset" : "456",
+         |    "latestOffset" : "789",
+         |    "numInputRows" : 678,
+         |    "inputRowsPerSecond" : 10.0,
+         |    "processedRowsPerSecond" : "Infinity",
+         |    "metrics" : { }
+         |  }, {
+         |    "description" : "source",
+         |    "startOffset" : "234",
+         |    "endOffset" : "567",
+         |    "latestOffset" : "890",
+         |    "numInputRows" : 789,
+         |    "inputRowsPerSecond" : 12.0,
+         |    "processedRowsPerSecond" : "Infinity",
+         |    "metrics" : { }
+         |  } ],
+         |  "sink" : {
+         |    "description" : "sink",
+         |    "numOutputRows" : -1,
+         |    "metrics" : { }
+         |  },
+         |  "observedMetrics" : {
+         |    "event1" : {
+         |      "values" : [ 1, 3.0 ],
+         |      "schema" : {
+         |        "type" : "struct",
+         |        "fields" : [ {
+         |          "name" : "c1",
+         |          "type" : "long",
+         |          "nullable" : true,
+         |          "metadata" : { }
+         |        }, {
+         |          "name" : "c2",
+         |          "type" : "double",
+         |          "nullable" : true,
+         |          "metadata" : { }
+         |        } ]
+         |      }
+         |    },
+         |    "event2" : {
+         |      "values" : [ 1, "hello", "world" ],
+         |      "schema" : {
+         |        "type" : "struct",
+         |        "fields" : [ {
+         |          "name" : "rc",
+         |          "type" : "long",
+         |          "nullable" : true,
+         |          "metadata" : { }
+         |        }, {
+         |          "name" : "min_q",
+         |          "type" : "string",
+         |          "nullable" : true,
+         |          "metadata" : { }
+         |        }, {
+         |          "name" : "max_q",
+         |          "type" : "string",
+         |          "nullable" : true,
+         |          "metadata" : { }
+         |        } ]
+         |      }
+         |    }
+         |  }
+         |}
+         |
+      """.stripMargin.trim
+
+    val result = StreamingQueryProgress.fromJson(jsonStringFromServerSide)
+    assert(result.id.toString === "aa624dba-d302-4429-b75b-7e82e44cfb11")
+    assert(result.runId.toString === "5b8c40d3-4a7c-425c-b333-838f6b9c18bb")
+    assert(result.numInputRows === 1467) // 678 + 789
+    assert(result.stateOperators.head.operatorName === "op1")
+    assert(result.sources.head.startOffset === "123")
+
+    // check observedMetrics
+    val schema1 = new StructType()
+      .add("c1", "long")
+      .add("c2", "double")
+    val schema2 = new StructType()
+      .add("rc", "long")
+      .add("min_q", "string")
+      .add("max_q", "string")
+    val observedMetrics = Map[String, Row](
+      "event1" -> new GenericRowWithSchema(Array(1L, 3.0d), schema1),
+      "event2" -> new GenericRowWithSchema(Array(1L, "hello", "world"), schema2)).asJava
+    assert(result.observedMetrics.size() == 2)
+    assert(result.observedMetrics == observedMetrics)
+
+    // check `.json`
+    val jsonString =
+      """
+        |{
+        |  "id" : "aa624dba-d302-4429-b75b-7e82e44cfb11",
+        |  "runId" : "5b8c40d3-4a7c-425c-b333-838f6b9c18bb",
+        |  "name" : "myName",
+        |  "timestamp" : "2016-12-05T20:54:20.827Z",
+        |  "batchId" : 2,
+        |  "numInputRows" : 1467,
+        |  "inputRowsPerSecond" : 22.0,
+        |  "durationMs" : {
+        |    "total" : 0
+        |  },
+        |  "eventTime" : {
+        |    "avg" : "2016-12-05T20:54:20.827Z",
+        |    "max" : "2016-12-05T20:54:20.827Z",
+        |    "min" : "2016-12-05T20:54:20.827Z",
+        |    "watermark" : "2016-12-05T20:54:20.827Z"
+        |  },
+        |  "stateOperators" : [ {
+        |    "operatorName" : "op1",
+        |    "numRowsTotal" : 0,
+        |    "numRowsUpdated" : 1,
+        |    "allUpdatesTimeMs" : 1,
+        |    "numRowsRemoved" : 2,
+        |    "allRemovalsTimeMs" : 34,
+        |    "commitTimeMs" : 23,
+        |    "memoryUsedBytes" : 3,
+        |    "numRowsDroppedByWatermark" : 0,
+        |    "numShufflePartitions" : 2,
+        |    "numStateStoreInstances" : 2,
+        |    "customMetrics" : {
+        |      "loadedMapCacheHitCount" : 1,
+        |      "loadedMapCacheMissCount" : 0,
+        |      "stateOnCurrentVersionSizeBytes" : 2
+        |    }
+        |  } ],
+        |  "sources" : [ {
+        |    "description" : "source",
+        |    "startOffset" : 123,
+        |    "endOffset" : 456,
+        |    "latestOffset" : 789,
+        |    "numInputRows" : 678,
+        |    "inputRowsPerSecond" : 10.0
+        |  }, {
+        |    "description" : "source",
+        |    "startOffset" : 234,
+        |    "endOffset" : 567,
+        |    "latestOffset" : 890,
+        |    "numInputRows" : 789,
+        |    "inputRowsPerSecond" : 12.0
+        |  } ],
+        |  "sink" : {
+        |    "description" : "sink",
+        |    "numOutputRows" : -1
+        |  },
+        |  "observedMetrics" : {
+        |    "event1" : {
+        |      "c1" : 1,
+        |      "c2" : 3.0
+        |    },
+        |    "event2" : {
+        |      "rc" : 1,
+        |      "min_q" : "hello",
+        |      "max_q" : "world"
+        |    }
+        |  }
+        |}
+        |""".stripMargin.trim
+    assert(result.prettyJson === jsonString)
+  }
+}
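The `observedMetrics` comparison in the test above works because `Row` equality is value-based. A small illustrative check (the schema and values here are made up):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types.StructType

object RowEqualitySketch extends App {
  val schema = new StructType().add("c1", "long").add("c2", "double")
  val deserialized: Row = new GenericRowWithSchema(Array(1L, 3.0d), schema)
  val expected: Row = new GenericRowWithSchema(Array(1L, 3.0d), schema)
  // Row equality compares values, not object identity, so rows rebuilt from
  // JSON can be asserted against locally constructed ones.
  assert(deserialized == expected)
  println("rows equal")
}
```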
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 4d7e21e57a5..571e718bc9d 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.streaming

 import java.util.concurrent.TimeUnit

+import scala.collection.JavaConverters._
+
 import org.scalatest.concurrent.Eventually.eventually
 import org.scalatest.concurrent.Futures.timeout
 import org.scalatest.time.SpanSugar._
@@ -70,6 +72,27 @@ class StreamingQuerySuite extends RemoteSparkSession with SQLHelper {
       assert(query.recentProgress.nonEmpty) // Query made progress.
     }

+    val lastProgress = query.lastProgress
+    assert(lastProgress != null)
+    assert(lastProgress.name == queryName)
+    assert(!lastProgress.durationMs.isEmpty)
+    assert(!lastProgress.eventTime.isEmpty)
+    assert(lastProgress.stateOperators.nonEmpty)
+    assert(
+      lastProgress.stateOperators.head.customMetrics.keySet().asScala == Set(
+        "loadedMapCacheHitCount",
+        "loadedMapCacheMissCount",
+        "stateOnCurrentVersionSizeBytes"))
+    assert(lastProgress.sources.nonEmpty)
+    assert(lastProgress.sink.description == "MemorySink")
+    assert(lastProgress.observedMetrics.isEmpty)
+
+    query.recentProgress.foreach { p =>
+      assert(p.id == lastProgress.id)
+      assert(p.runId == lastProgress.runId)
+      assert(p.name == lastProgress.name)
+    }
+
     query.explain() // Prints the plan to console.
     // Consider verifying explain output by capturing stdout similar to
     // test("Dataset explain") in ClientE2ETestSuite.
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 09ae7799b5e..307cf1d054d 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -69,7 +69,7 @@ import org.apache.spark.sql.execution.stat.StatFunctions
 import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
 import org.apache.spark.sql.expressions.ReduceAggregator
 import org.apache.spark.sql.internal.{CatalogImpl, TypedAggUtils}
-import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
+import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryProgress, Trigger}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.storage.CacheId
@@ -2474,7 +2474,8 @@ class SparkConnectPlanner(val session: SparkSession) {
         respBuilder.setRecentProgress(
           StreamingQueryCommandResult.RecentProgressResult
             .newBuilder()
-            .addAllRecentProgressJson(progressReports.map(_.json).asJava)
+            .addAllRecentProgressJson(
+              progressReports.map(StreamingQueryProgress.jsonString).asJava)
             .build())

       case StreamingQueryCommand.CommandCase.STOP =>
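The server-side switch from `_.json` to `StreamingQueryProgress.jsonString` is central to the patch: `json` is the curated json4s view, which flattens `observedMetrics` and drops non-finite rates, while `jsonString` is a plain Jackson dump of the constructor fields, which the client's `fromJson` can round-trip. A toy contrast of the two kinds of views; `MiniProgress` and `curatedJson` are hypothetical:

```scala
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

// Hypothetical miniature progress type; the real classes have many more fields.
case class MiniProgress(id: String, batchId: Long, timestamp: String)

object JsonViews extends App {
  // Curated, human-oriented view (in the spirit of StreamingQueryProgress.json):
  // it deliberately omits fields, so it cannot be deserialized back in full.
  def curatedJson(p: MiniProgress): String = s"""{"id":"${p.id}","batchId":${p.batchId}}"""

  val mapper = new ObjectMapper().registerModule(DefaultScalaModule)
  val p = MiniProgress("abc", 2L, "2016-12-05T20:54:20.827Z")

  println(curatedJson(p))               // {"id":"abc","batchId":2} -- lossy
  println(mapper.writeValueAsString(p)) // all three fields -- round-trippable
}
```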
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index 1b755ed70c6..b3927699756 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -24,7 +24,9 @@ import java.util.UUID
 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal

+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize
+import com.fasterxml.jackson.module.scala.{ClassTagExtensions, DefaultScalaModule}
 import org.json4s._
 import org.json4s.JsonAST.JValue
 import org.json4s.JsonDSL._
@@ -175,6 +177,21 @@ class StreamingQueryProgress private[spark](
   }
 }

+private[spark] object StreamingQueryProgress {
+  private[this] val mapper = {
+    val ret = new ObjectMapper() with ClassTagExtensions
+    ret.registerModule(DefaultScalaModule)
+    ret.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+    ret
+  }
+
+  private[spark] def jsonString(progress: StreamingQueryProgress): String =
+    mapper.writeValueAsString(progress)
+
+  private[spark] def fromJson(json: String): StreamingQueryProgress =
+    mapper.readValue[StreamingQueryProgress](json)
+}
+
 /**
  * Information about progress made for a source in the execution of a [[StreamingQuery]]
  * during a trigger. See [[StreamingQueryProgress]] for more information.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
index d9bb7714d7a..d016b334627 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
@@ -171,6 +171,64 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
     assert(testProgress2.toString === testProgress2.prettyJson)
   }

+  test("StreamingQueryProgress - jsonString and fromJson") {
+    Seq(testProgress1, testProgress2).foreach { input =>
+      val jsonString = StreamingQueryProgress.jsonString(input)
+      val result = StreamingQueryProgress.fromJson(jsonString)
+      assert(input.id == result.id)
+      assert(input.runId == result.runId)
+      assert(input.name == result.name)
+      assert(input.timestamp == result.timestamp)
+      assert(input.batchId == result.batchId)
+      assert(input.batchDuration == result.batchDuration)
+      assert(input.durationMs == result.durationMs)
+      assert(input.eventTime == result.eventTime)
+
+      input.stateOperators.zip(result.stateOperators).foreach { case (o1, o2) =>
+        assert(o1.operatorName == o2.operatorName)
+        assert(o1.numRowsTotal == o2.numRowsTotal)
+        assert(o1.numRowsUpdated == o2.numRowsUpdated)
+        assert(o1.allUpdatesTimeMs == o2.allUpdatesTimeMs)
+        assert(o1.numRowsRemoved == o2.numRowsRemoved)
+        assert(o1.allRemovalsTimeMs == o2.allRemovalsTimeMs)
+        assert(o1.commitTimeMs == o2.commitTimeMs)
+        assert(o1.memoryUsedBytes == o2.memoryUsedBytes)
+        assert(o1.numRowsDroppedByWatermark == o2.numRowsDroppedByWatermark)
+        assert(o1.numShufflePartitions == o2.numShufflePartitions)
+        assert(o1.numStateStoreInstances == o2.numStateStoreInstances)
+        assert(o1.customMetrics == o2.customMetrics)
+      }
+
+      input.sources.zip(result.sources).foreach { case (s1, s2) =>
+        assert(s1.description == s2.description)
+        assert(s1.startOffset == s2.startOffset)
+        assert(s1.endOffset == s2.endOffset)
+        assert(s1.latestOffset == s2.latestOffset)
+        assert(s1.numInputRows == s2.numInputRows)
+        if (s1.inputRowsPerSecond.isNaN) {
+          assert(s2.inputRowsPerSecond.isNaN)
+        } else {
+          assert(s1.inputRowsPerSecond == s2.inputRowsPerSecond)
+        }
+        assert(s1.processedRowsPerSecond == s2.processedRowsPerSecond)
+        assert(s1.metrics == s2.metrics)
+      }
+
+      Seq(input.sink).zip(Seq(result.sink)).foreach { case (s1, s2) =>
+        assert(s1.description == s2.description)
+        assert(s1.numOutputRows == s2.numOutputRows)
+        assert(s1.metrics == s2.metrics)
+      }
+
+      val resultObservedMetrics = result.observedMetrics
+      assert(input.observedMetrics.size() == resultObservedMetrics.size())
+      assert(input.observedMetrics.keySet() == resultObservedMetrics.keySet())
+      input.observedMetrics.entrySet().forEach { e =>
+        assert(e.getValue == resultObservedMetrics.get(e.getKey))
+      }
+    }
+  }
+
   test("StreamingQueryStatus - prettyJson") {
     val json = testStatus.prettyJson
     assertJson(

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org