This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d44e073f0cd [SPARK-43128][CONNECT][SS] Make `recentProgress` and `lastProgress` return `StreamingQueryProgress` consistent with the native Scala API
d44e073f0cd is described below

commit d44e073f0cdaf16028a4854e79db200a4e39a6fe
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Wed May 17 16:27:03 2023 +0900

    [SPARK-43128][CONNECT][SS] Make `recentProgress` and `lastProgress` return `StreamingQueryProgress` consistent with the native Scala API
    
    ### What changes were proposed in this pull request?
    This PR adds support for making `recentProgress` and `lastProgress` in `RemoteStreamingQuery` return `StreamingQueryProgress` instances, consistent with the native Scala API.
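    For illustration, a minimal sketch (not part of the patch itself) of how client code can consume the typed progress objects, assuming `query` is a streaming query already started through the Connect JVM client:

    ```scala
    // `query` is assumed to be a RemoteStreamingQuery obtained from a Spark Connect
    // session (e.g. started via writeStream...start()); setup details are omitted here.
    val last = query.lastProgress // a StreamingQueryProgress, or null if no progress yet
    if (last != null) {
      println(s"batch=${last.batchId}, rows=${last.numInputRows}")
      last.stateOperators.foreach(op => println(op.operatorName))
    }
    // All retained progress reports, same typed API as the native Scala client.
    query.recentProgress.foreach(p => println(p.prettyJson))
    ```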
    
    ### Why are the changes needed?
    Adds Spark Connect JVM client API coverage.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Add a new check to `StreamingQuerySuite`.
    
    Closes #40892 from LuciferYang/SPARK-43128.
    
    Lead-authored-by: yangjie01 <yangji...@baidu.com>
    Co-authored-by: YangJie <yangji...@baidu.com>
    Co-authored-by: Wenchen Fan <wenc...@databricks.com>
    Co-authored-by: Ruifeng Zheng <ruife...@apache.org>
    Co-authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../spark/sql/streaming/StreamingQuery.scala       |   4 +-
 .../org/apache/spark/sql/streaming/progress.scala  | 310 ++++++++++++++++++++-
 .../CheckConnectJvmClientCompatibility.scala       |   5 -
 .../streaming/StreamingQueryProgressSuite.scala    | 227 +++++++++++++++
 .../spark/sql/streaming/StreamingQuerySuite.scala  |  23 ++
 .../sql/connect/planner/SparkConnectPlanner.scala  |   5 +-
 .../org/apache/spark/sql/streaming/progress.scala  |  17 ++
 .../StreamingQueryStatusAndProgressSuite.scala     |  58 ++++
 8 files changed, 638 insertions(+), 11 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
index b57e6239b77..ceb096b9aff 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/StreamingQuery.scala
@@ -205,14 +205,14 @@ class RemoteStreamingQuery(
 
   override def recentProgress: Array[StreamingQueryProgress] = {
     executeQueryCmd(_.setRecentProgress(true)).getRecentProgress.getRecentProgressJsonList.asScala
-      .map(json => new StreamingQueryProgress(json))
+      .map(StreamingQueryProgress.fromJson)
       .toArray
   }
 
   override def lastProgress: StreamingQueryProgress = {
     executeQueryCmd(
       _.setLastProgress(true)).getRecentProgress.getRecentProgressJsonList.asScala.headOption
-      .map(json => new StreamingQueryProgress(json))
+      .map(StreamingQueryProgress.fromJson)
       .orNull
   }
 
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index 974bcd64b29..593311efb9c 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -17,6 +17,312 @@
 
 package org.apache.spark.sql.streaming
 
-class StreamingQueryProgress private[sql] (val json: String) {
-  // TODO(SPARK-43128): (Implement full object by parsing from json).
+import java.{util => ju}
+import java.lang.{Long => JLong}
+import java.util.UUID
+
+import scala.collection.JavaConverters._
+import scala.util.control.NonFatal
+
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize
+import com.fasterxml.jackson.module.scala.{ClassTagExtensions, DefaultScalaModule}
+import org.json4s._
+import org.json4s.JsonAST.JValue
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.annotation.Evolving
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.streaming.SafeJsonSerializer.{safeDoubleToJValue, safeMapToJValue}
+import org.apache.spark.sql.streaming.SinkProgress.DEFAULT_NUM_OUTPUT_ROWS
+
+/**
+ * Information about updates made to stateful operators in a [[StreamingQuery]] during a trigger.
+ */
+@Evolving
+class StateOperatorProgress private[spark] (
+    val operatorName: String,
+    val numRowsTotal: Long,
+    val numRowsUpdated: Long,
+    val allUpdatesTimeMs: Long,
+    val numRowsRemoved: Long,
+    val allRemovalsTimeMs: Long,
+    val commitTimeMs: Long,
+    val memoryUsedBytes: Long,
+    val numRowsDroppedByWatermark: Long,
+    val numShufflePartitions: Long,
+    val numStateStoreInstances: Long,
+    val customMetrics: ju.Map[String, JLong] = new ju.HashMap())
+    extends Serializable {
+
+  /** The compact JSON representation of this progress. */
+  def json: String = compact(render(jsonValue))
+
+  /** The pretty (i.e. indented) JSON representation of this progress. */
+  def prettyJson: String = pretty(render(jsonValue))
+
+  private[sql] def copy(
+      newNumRowsUpdated: Long,
+      newNumRowsDroppedByWatermark: Long): StateOperatorProgress =
+    new StateOperatorProgress(
+      operatorName = operatorName,
+      numRowsTotal = numRowsTotal,
+      numRowsUpdated = newNumRowsUpdated,
+      allUpdatesTimeMs = allUpdatesTimeMs,
+      numRowsRemoved = numRowsRemoved,
+      allRemovalsTimeMs = allRemovalsTimeMs,
+      commitTimeMs = commitTimeMs,
+      memoryUsedBytes = memoryUsedBytes,
+      numRowsDroppedByWatermark = newNumRowsDroppedByWatermark,
+      numShufflePartitions = numShufflePartitions,
+      numStateStoreInstances = numStateStoreInstances,
+      customMetrics = customMetrics)
+
+  private[sql] def jsonValue: JValue = {
+    ("operatorName" -> JString(operatorName)) ~
+      ("numRowsTotal" -> JInt(numRowsTotal)) ~
+      ("numRowsUpdated" -> JInt(numRowsUpdated)) ~
+      ("allUpdatesTimeMs" -> JInt(allUpdatesTimeMs)) ~
+      ("numRowsRemoved" -> JInt(numRowsRemoved)) ~
+      ("allRemovalsTimeMs" -> JInt(allRemovalsTimeMs)) ~
+      ("commitTimeMs" -> JInt(commitTimeMs)) ~
+      ("memoryUsedBytes" -> JInt(memoryUsedBytes)) ~
+      ("numRowsDroppedByWatermark" -> JInt(numRowsDroppedByWatermark)) ~
+      ("numShufflePartitions" -> JInt(numShufflePartitions)) ~
+      ("numStateStoreInstances" -> JInt(numStateStoreInstances)) ~
+      ("customMetrics" -> {
+        if (!customMetrics.isEmpty) {
+          val keys = customMetrics.keySet.asScala.toSeq.sorted
+          keys.map { k => k -> JInt(customMetrics.get(k).toLong): JObject }.reduce(_ ~ _)
+        } else {
+          JNothing
+        }
+      })
+  }
+
+  override def toString: String = prettyJson
+}
+
+/**
+ * Information about progress made in the execution of a [[StreamingQuery]] during a trigger. Each
+ * event relates to processing done for a single trigger of the streaming query. Events are
+ * emitted even when no new data is available to be processed.
+ *
+ * @param id
+ *   A unique query id that persists across restarts. See `StreamingQuery.id()`.
+ * @param runId
+ *   A query id that is unique for every start/restart. See `StreamingQuery.runId()`.
+ * @param name
+ *   User-specified name of the query, null if not specified.
+ * @param timestamp
+ *   Beginning time of the trigger in ISO8601 format, i.e. UTC timestamps.
+ * @param batchId
+ *   A unique id for the current batch of data being processed. Note that in the case of retries
+ *   after a failure, a given batchId may be executed more than once. Similarly, when there is no
+ *   data to be processed, the batchId will not be incremented.
+ * @param batchDuration
+ *   The process duration of each batch.
+ * @param durationMs
+ *   The amount of time taken to perform various operations in milliseconds.
+ * @param eventTime
+ *   Statistics of event time seen in this batch. It may contain the following keys:
+ *   {{{
+ *                   "max" -> "2016-12-05T20:54:20.827Z"  // maximum event time seen in this trigger
+ *                   "min" -> "2016-12-05T20:54:20.827Z"  // minimum event time seen in this trigger
+ *                   "avg" -> "2016-12-05T20:54:20.827Z"  // average event time seen in this trigger
+ *                   "watermark" -> "2016-12-05T20:54:20.827Z"  // watermark used in this trigger
+ *   }}}
+ *   All timestamps are in ISO8601 format, i.e. UTC timestamps.
+ * @param stateOperators
+ *   Information about operators in the query that store state.
+ * @param sources
+ *   detailed statistics on data being read from each of the streaming sources.
+ * @since 3.5.0
+ */
+@Evolving
+class StreamingQueryProgress private[spark] (
+    val id: UUID,
+    val runId: UUID,
+    val name: String,
+    val timestamp: String,
+    val batchId: Long,
+    val batchDuration: Long,
+    val durationMs: ju.Map[String, JLong],
+    val eventTime: ju.Map[String, String],
+    val stateOperators: Array[StateOperatorProgress],
+    val sources: Array[SourceProgress],
+    val sink: SinkProgress,
+    @JsonDeserialize(contentAs = classOf[GenericRowWithSchema])
+    val observedMetrics: ju.Map[String, Row])
+    extends Serializable {
+
+  /** The aggregate (across all sources) number of records processed in a trigger. */
+  def numInputRows: Long = sources.map(_.numInputRows).sum
+
+  /** The aggregate (across all sources) rate of data arriving. */
+  def inputRowsPerSecond: Double = sources.map(_.inputRowsPerSecond).sum
+
+  /** The aggregate (across all sources) rate at which Spark is processing data. */
+  def processedRowsPerSecond: Double = sources.map(_.processedRowsPerSecond).sum
+
+  /** The compact JSON representation of this progress. */
+  def json: String = compact(render(jsonValue))
+
+  /** The pretty (i.e. indented) JSON representation of this progress. */
+  def prettyJson: String = pretty(render(jsonValue))
+
+  override def toString: String = prettyJson
+
+  private[sql] def jsonValue: JValue = {
+    ("id" -> JString(id.toString)) ~
+      ("runId" -> JString(runId.toString)) ~
+      ("name" -> JString(name)) ~
+      ("timestamp" -> JString(timestamp)) ~
+      ("batchId" -> JInt(batchId)) ~
+      ("numInputRows" -> JInt(numInputRows)) ~
+      ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
+      ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) 
~
+      ("durationMs" -> safeMapToJValue[JLong](durationMs, v => 
JInt(v.toLong))) ~
+      ("eventTime" -> safeMapToJValue[String](eventTime, s => JString(s))) ~
+      ("stateOperators" -> JArray(stateOperators.map(_.jsonValue).toList)) ~
+      ("sources" -> JArray(sources.map(_.jsonValue).toList)) ~
+      ("sink" -> sink.jsonValue) ~
+      ("observedMetrics" -> safeMapToJValue[Row](observedMetrics, row => 
row.jsonValue))
+  }
+}
+
+private[spark] object StreamingQueryProgress {
+  private val mapper = {
+    val ret = new ObjectMapper() with ClassTagExtensions
+    ret.registerModule(DefaultScalaModule)
+    ret.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+    ret
+  }
+
+  private[spark] def jsonString(progress: StreamingQueryProgress): String =
+    mapper.writeValueAsString(progress)
+
+  private[spark] def fromJson(json: String): StreamingQueryProgress =
+    mapper.readValue[StreamingQueryProgress](json)
+}
+
+/**
+ * Information about progress made for a source in the execution of a [[StreamingQuery]] during a
+ * trigger. See [[StreamingQueryProgress]] for more information.
+ *
+ * @param description
+ *   Description of the source.
+ * @param startOffset
+ *   The starting offset for data being read.
+ * @param endOffset
+ *   The ending offset for data being read.
+ * @param latestOffset
+ *   The latest offset from this source.
+ * @param numInputRows
+ *   The number of records read from this source.
+ * @param inputRowsPerSecond
+ *   The rate at which data is arriving from this source.
+ * @param processedRowsPerSecond
+ *   The rate at which data from this source is being processed by Spark.
+ * @since 3.5.0
+ */
+@Evolving
+class SourceProgress protected[spark] (
+    val description: String,
+    val startOffset: String,
+    val endOffset: String,
+    val latestOffset: String,
+    val numInputRows: Long,
+    val inputRowsPerSecond: Double,
+    val processedRowsPerSecond: Double,
+    val metrics: ju.Map[String, String] = Map[String, String]().asJava)
+    extends Serializable {
+
+  /** The compact JSON representation of this progress. */
+  def json: String = compact(render(jsonValue))
+
+  /** The pretty (i.e. indented) JSON representation of this progress. */
+  def prettyJson: String = pretty(render(jsonValue))
+
+  override def toString: String = prettyJson
+
+  private[sql] def jsonValue: JValue = {
+    ("description" -> JString(description)) ~
+      ("startOffset" -> tryParse(startOffset)) ~
+      ("endOffset" -> tryParse(endOffset)) ~
+      ("latestOffset" -> tryParse(latestOffset)) ~
+      ("numInputRows" -> JInt(numInputRows)) ~
+      ("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
+      ("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) 
~
+      ("metrics" -> safeMapToJValue[String](metrics, s => JString(s)))
+  }
+
+  private def tryParse(json: String) = try {
+    parse(json)
+  } catch {
+    case NonFatal(e) => JString(json)
+  }
+}
+
+/**
+ * Information about progress made for a sink in the execution of a [[StreamingQuery]] during a
+ * trigger. See [[StreamingQueryProgress]] for more information.
+ *
+ * @param description
+ *   Description of the source corresponding to this status.
+ * @param numOutputRows
+ *   Number of rows written to the sink or -1 for Continuous Mode (temporarily) or Sink V1 (until
+ *   decommissioned).
+ * @since 3.5.0
+ */
+@Evolving
+class SinkProgress protected[spark] (
+    val description: String,
+    val numOutputRows: Long,
+    val metrics: ju.Map[String, String] = Map[String, String]().asJava)
+    extends Serializable {
+
+  /** SinkProgress without custom metrics. */
+  protected[sql] def this(description: String) = {
+    this(description, DEFAULT_NUM_OUTPUT_ROWS)
+  }
+
+  /** The compact JSON representation of this progress. */
+  def json: String = compact(render(jsonValue))
+
+  /** The pretty (i.e. indented) JSON representation of this progress. */
+  def prettyJson: String = pretty(render(jsonValue))
+
+  override def toString: String = prettyJson
+
+  private[sql] def jsonValue: JValue = {
+    ("description" -> JString(description)) ~
+      ("numOutputRows" -> JInt(numOutputRows)) ~
+      ("metrics" -> safeMapToJValue[String](metrics, s => JString(s)))
+  }
+}
+
+private[sql] object SinkProgress {
+  val DEFAULT_NUM_OUTPUT_ROWS: Long = -1L
+
+  def apply(
+      description: String,
+      numOutputRows: Option[Long],
+      metrics: ju.Map[String, String] = Map[String, String]().asJava): SinkProgress =
+    new SinkProgress(description, numOutputRows.getOrElse(DEFAULT_NUM_OUTPUT_ROWS), metrics)
+}
+
+private object SafeJsonSerializer {
+  def safeDoubleToJValue(value: Double): JValue = {
+    if (value.isNaN || value.isInfinity) JNothing else JDouble(value)
+  }
+
+  /** Convert map to JValue while handling empty maps. Also, this sorts the keys. */
+  def safeMapToJValue[T](map: ju.Map[String, T], valueToJValue: T => JValue): JValue = {
+    if (map.isEmpty) return JNothing
+    val keys = map.asScala.keySet.toSeq.sorted
+    keys.map { k => k -> valueToJValue(map.get(k)): JObject }.reduce(_ ~ _)
+  }
 }
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
index 05139e766a8..f53709b07f9 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
@@ -230,11 +230,6 @@ object CheckConnectJvmClientCompatibility {
         "org.apache.spark.sql.streaming.DataStreamWriter.SOURCE*" // These are 
constant vals.
       ),
 
-      // StreamingQuery
-      ProblemFilters.exclude[Problem](
-        "org.apache.spark.sql.streaming.StreamingQueryProgress.*" // TODO(SPARK-43128)
-      ),
-
       // SQLImplicits
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SQLImplicits.this"),
       ProblemFilters.exclude[Problem]("org.apache.spark.sql.SQLImplicits.rddToDatasetHolder"),
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala
new file mode 100644
index 00000000000..a6a44c1bd71
--- /dev/null
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryProgressSuite.scala
@@ -0,0 +1,227 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.streaming
+
+import scala.jdk.CollectionConverters._
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.connect.client.util.ConnectFunSuite
+import org.apache.spark.sql.types.StructType
+
+class StreamingQueryProgressSuite extends ConnectFunSuite {
+  test("test seder StreamingQueryProgress from json") {
+    val jsonStringFromServerSide =
+      s"""
+         |{
+         |  "id" : "aa624dba-d302-4429-b75b-7e82e44cfb11",
+         |  "runId" : "5b8c40d3-4a7c-425c-b333-838f6b9c18bb",
+         |  "name" : "myName",
+         |  "timestamp" : "2016-12-05T20:54:20.827Z",
+         |  "batchId" : 2,
+         |  "batchDuration" : 0,
+         |  "durationMs" : {
+         |    "total" : 0
+         |  },
+         |  "eventTime" : {
+         |    "min" : "2016-12-05T20:54:20.827Z",
+         |    "avg" : "2016-12-05T20:54:20.827Z",
+         |    "watermark" : "2016-12-05T20:54:20.827Z",
+         |    "max" : "2016-12-05T20:54:20.827Z"
+         |  },
+         |  "stateOperators" : [ {
+         |    "operatorName" : "op1",
+         |    "numRowsTotal" : 0,
+         |    "numRowsUpdated" : 1,
+         |    "allUpdatesTimeMs" : 1,
+         |    "numRowsRemoved" : 2,
+         |    "allRemovalsTimeMs" : 34,
+         |    "commitTimeMs" : 23,
+         |    "memoryUsedBytes" : 3,
+         |    "numRowsDroppedByWatermark" : 0,
+         |    "numShufflePartitions" : 2,
+         |    "numStateStoreInstances" : 2,
+         |    "customMetrics" : {
+         |      "stateOnCurrentVersionSizeBytes" : 2,
+         |      "loadedMapCacheHitCount" : 1,
+         |      "loadedMapCacheMissCount" : 0
+         |    }
+         |  } ],
+         |  "sources" : [ {
+         |    "description" : "source",
+         |    "startOffset" : "123",
+         |    "endOffset" : "456",
+         |    "latestOffset" : "789",
+         |    "numInputRows" : 678,
+         |    "inputRowsPerSecond" : 10.0,
+         |    "processedRowsPerSecond" : "Infinity",
+         |    "metrics" : { }
+         |  }, {
+         |    "description" : "source",
+         |    "startOffset" : "234",
+         |    "endOffset" : "567",
+         |    "latestOffset" : "890",
+         |    "numInputRows" : 789,
+         |    "inputRowsPerSecond" : 12.0,
+         |    "processedRowsPerSecond" : "Infinity",
+         |    "metrics" : { }
+         |  } ],
+         |  "sink" : {
+         |    "description" : "sink",
+         |    "numOutputRows" : -1,
+         |    "metrics" : { }
+         |  },
+         |  "observedMetrics" : {
+         |    "event1" : {
+         |      "values" : [ 1, 3.0 ],
+         |      "schema" : {
+         |        "type" : "struct",
+         |        "fields" : [ {
+         |          "name" : "c1",
+         |          "type" : "long",
+         |          "nullable" : true,
+         |          "metadata" : { }
+         |        }, {
+         |          "name" : "c2",
+         |          "type" : "double",
+         |          "nullable" : true,
+         |          "metadata" : { }
+         |        } ]
+         |      }
+         |    },
+         |    "event2" : {
+         |      "values" : [ 1, "hello", "world" ],
+         |      "schema" : {
+         |        "type" : "struct",
+         |        "fields" : [ {
+         |          "name" : "rc",
+         |          "type" : "long",
+         |          "nullable" : true,
+         |          "metadata" : { }
+         |        }, {
+         |          "name" : "min_q",
+         |          "type" : "string",
+         |          "nullable" : true,
+         |          "metadata" : { }
+         |        }, {
+         |          "name" : "max_q",
+         |          "type" : "string",
+         |          "nullable" : true,
+         |          "metadata" : { }
+         |        } ]
+         |      }
+         |    }
+         |  }
+         |}
+         |
+      """.stripMargin.trim
+
+    val result = StreamingQueryProgress.fromJson(jsonStringFromServerSide)
+    assert(result.id.toString === "aa624dba-d302-4429-b75b-7e82e44cfb11")
+    assert(result.runId.toString === "5b8c40d3-4a7c-425c-b333-838f6b9c18bb")
+    assert(result.numInputRows === 1467) // 678 + 789
+    assert(result.stateOperators.head.operatorName === "op1")
+    assert(result.sources.head.startOffset === "123")
+
+    // check observedMetrics
+    val schema1 = new StructType()
+      .add("c1", "long")
+      .add("c2", "double")
+    val schema2 = new StructType()
+      .add("rc", "long")
+      .add("min_q", "string")
+      .add("max_q", "string")
+    val observedMetrics = Map[String, Row](
+      "event1" -> new GenericRowWithSchema(Array(1L, 3.0d), schema1),
+      "event2" -> new GenericRowWithSchema(Array(1L, "hello", "world"), 
schema2)).asJava
+    assert(result.observedMetrics.size() == 2)
+    assert(result.observedMetrics == observedMetrics)
+
+    // check `.json`
+    val jsonString =
+      """
+        |{
+        |  "id" : "aa624dba-d302-4429-b75b-7e82e44cfb11",
+        |  "runId" : "5b8c40d3-4a7c-425c-b333-838f6b9c18bb",
+        |  "name" : "myName",
+        |  "timestamp" : "2016-12-05T20:54:20.827Z",
+        |  "batchId" : 2,
+        |  "numInputRows" : 1467,
+        |  "inputRowsPerSecond" : 22.0,
+        |  "durationMs" : {
+        |    "total" : 0
+        |  },
+        |  "eventTime" : {
+        |    "avg" : "2016-12-05T20:54:20.827Z",
+        |    "max" : "2016-12-05T20:54:20.827Z",
+        |    "min" : "2016-12-05T20:54:20.827Z",
+        |    "watermark" : "2016-12-05T20:54:20.827Z"
+        |  },
+        |  "stateOperators" : [ {
+        |    "operatorName" : "op1",
+        |    "numRowsTotal" : 0,
+        |    "numRowsUpdated" : 1,
+        |    "allUpdatesTimeMs" : 1,
+        |    "numRowsRemoved" : 2,
+        |    "allRemovalsTimeMs" : 34,
+        |    "commitTimeMs" : 23,
+        |    "memoryUsedBytes" : 3,
+        |    "numRowsDroppedByWatermark" : 0,
+        |    "numShufflePartitions" : 2,
+        |    "numStateStoreInstances" : 2,
+        |    "customMetrics" : {
+        |      "loadedMapCacheHitCount" : 1,
+        |      "loadedMapCacheMissCount" : 0,
+        |      "stateOnCurrentVersionSizeBytes" : 2
+        |    }
+        |  } ],
+        |  "sources" : [ {
+        |    "description" : "source",
+        |    "startOffset" : 123,
+        |    "endOffset" : 456,
+        |    "latestOffset" : 789,
+        |    "numInputRows" : 678,
+        |    "inputRowsPerSecond" : 10.0
+        |  }, {
+        |    "description" : "source",
+        |    "startOffset" : 234,
+        |    "endOffset" : 567,
+        |    "latestOffset" : 890,
+        |    "numInputRows" : 789,
+        |    "inputRowsPerSecond" : 12.0
+        |  } ],
+        |  "sink" : {
+        |    "description" : "sink",
+        |    "numOutputRows" : -1
+        |  },
+        |  "observedMetrics" : {
+        |    "event1" : {
+        |      "c1" : 1,
+        |      "c2" : 3.0
+        |    },
+        |    "event2" : {
+        |      "rc" : 1,
+        |      "min_q" : "hello",
+        |      "max_q" : "world"
+        |    }
+        |  }
+        |}
+        |""".stripMargin.trim
+    assert(result.prettyJson === jsonString)
+  }
+}
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 4d7e21e57a5..571e718bc9d 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -19,6 +19,8 @@ package org.apache.spark.sql.streaming
 
 import java.util.concurrent.TimeUnit
 
+import scala.collection.JavaConverters._
+
 import org.scalatest.concurrent.Eventually.eventually
 import org.scalatest.concurrent.Futures.timeout
 import org.scalatest.time.SpanSugar._
@@ -70,6 +72,27 @@ class StreamingQuerySuite extends RemoteSparkSession with SQLHelper {
           assert(query.recentProgress.nonEmpty) // Query made progress.
         }
 
+        val lastProgress = query.lastProgress
+        assert(lastProgress != null)
+        assert(lastProgress.name == queryName)
+        assert(!lastProgress.durationMs.isEmpty)
+        assert(!lastProgress.eventTime.isEmpty)
+        assert(lastProgress.stateOperators.nonEmpty)
+        assert(
+          lastProgress.stateOperators.head.customMetrics.keySet().asScala == Set(
+            "loadedMapCacheHitCount",
+            "loadedMapCacheMissCount",
+            "stateOnCurrentVersionSizeBytes"))
+        assert(lastProgress.sources.nonEmpty)
+        assert(lastProgress.sink.description == "MemorySink")
+        assert(lastProgress.observedMetrics.isEmpty)
+
+        query.recentProgress.foreach { p =>
+          assert(p.id == lastProgress.id)
+          assert(p.runId == lastProgress.runId)
+          assert(p.name == lastProgress.name)
+        }
+
         query.explain() // Prints the plan to console.
         // Consider verifying explain output by capturing stdout similar to
         // test("Dataset explain") in ClientE2ETestSuite.
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 09ae7799b5e..307cf1d054d 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -69,7 +69,7 @@ import org.apache.spark.sql.execution.stat.StatFunctions
 import org.apache.spark.sql.execution.streaming.StreamingQueryWrapper
 import org.apache.spark.sql.expressions.ReduceAggregator
 import org.apache.spark.sql.internal.{CatalogImpl, TypedAggUtils}
-import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}
+import org.apache.spark.sql.streaming.{StreamingQuery, StreamingQueryProgress, Trigger}
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.storage.CacheId
@@ -2474,7 +2474,8 @@ class SparkConnectPlanner(val session: SparkSession) {
         respBuilder.setRecentProgress(
           StreamingQueryCommandResult.RecentProgressResult
             .newBuilder()
-            .addAllRecentProgressJson(progressReports.map(_.json).asJava)
+            .addAllRecentProgressJson(
+              progressReports.map(StreamingQueryProgress.jsonString).asJava)
             .build())
 
       case StreamingQueryCommand.CommandCase.STOP =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index 1b755ed70c6..b3927699756 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -24,7 +24,9 @@ import java.util.UUID
 import scala.collection.JavaConverters._
 import scala.util.control.NonFatal
 
+import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
 import com.fasterxml.jackson.databind.annotation.JsonDeserialize
+import com.fasterxml.jackson.module.scala.{ClassTagExtensions, DefaultScalaModule}
 import org.json4s._
 import org.json4s.JsonAST.JValue
 import org.json4s.JsonDSL._
@@ -175,6 +177,21 @@ class StreamingQueryProgress private[spark](
   }
 }
 
+private[spark] object StreamingQueryProgress {
+  private[this] val mapper = {
+    val ret = new ObjectMapper() with ClassTagExtensions
+    ret.registerModule(DefaultScalaModule)
+    ret.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
+    ret
+  }
+
+  private[spark] def jsonString(progress: StreamingQueryProgress): String =
+    mapper.writeValueAsString(progress)
+
+  private[spark] def fromJson(json: String): StreamingQueryProgress =
+    mapper.readValue[StreamingQueryProgress](json)
+}
+
 /**
  * Information about progress made for a source in the execution of a [[StreamingQuery]]
  * during a trigger. See [[StreamingQueryProgress]] for more information.
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
index d9bb7714d7a..d016b334627 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
@@ -171,6 +171,64 @@ class StreamingQueryStatusAndProgressSuite extends StreamTest with Eventually {
     assert(testProgress2.toString === testProgress2.prettyJson)
   }
 
+  test("StreamingQueryProgress - jsonString and fromJson") {
+    Seq(testProgress1, testProgress2).foreach { input =>
+      val jsonString = StreamingQueryProgress.jsonString(input)
+      val result = StreamingQueryProgress.fromJson(jsonString)
+      assert(input.id == result.id)
+      assert(input.runId == result.runId)
+      assert(input.name == result.name)
+      assert(input.timestamp == result.timestamp)
+      assert(input.batchId == result.batchId)
+      assert(input.batchDuration == result.batchDuration)
+      assert(input.durationMs == result.durationMs)
+      assert(input.eventTime == result.eventTime)
+
+      input.stateOperators.zip(result.stateOperators).foreach { case (o1, o2) =>
+        assert(o1.operatorName == o2.operatorName)
+        assert(o1.numRowsTotal == o2.numRowsTotal)
+        assert(o1.numRowsUpdated == o2.numRowsUpdated)
+        assert(o1.allUpdatesTimeMs == o2.allUpdatesTimeMs)
+        assert(o1.numRowsRemoved == o2.numRowsRemoved)
+        assert(o1.allRemovalsTimeMs == o2.allRemovalsTimeMs)
+        assert(o1.commitTimeMs == o2.commitTimeMs)
+        assert(o1.memoryUsedBytes == o2.memoryUsedBytes)
+        assert(o1.numRowsDroppedByWatermark == o2.numRowsDroppedByWatermark)
+        assert(o1.numShufflePartitions == o2.numShufflePartitions)
+        assert(o1.numStateStoreInstances == o2.numStateStoreInstances)
+        assert(o1.customMetrics == o2.customMetrics)
+      }
+
+      input.sources.zip(result.sources).foreach { case (s1, s2) =>
+        assert(s1.description == s2.description)
+        assert(s1.startOffset == s2.startOffset)
+        assert(s1.endOffset == s2.endOffset)
+        assert(s1.latestOffset == s2.latestOffset)
+        assert(s1.numInputRows == s2.numInputRows)
+        if (s1.inputRowsPerSecond.isNaN) {
+          assert(s2.inputRowsPerSecond.isNaN)
+        } else {
+          assert(s1.inputRowsPerSecond == s2.inputRowsPerSecond)
+        }
+        assert(s1.processedRowsPerSecond == s2.processedRowsPerSecond)
+        assert(s1.metrics == s2.metrics)
+      }
+
+      Seq(input.sink).zip(Seq(result.sink)).foreach { case (s1, s2) =>
+        assert(s1.description == s2.description)
+        assert(s1.numOutputRows == s2.numOutputRows)
+        assert(s1.metrics == s2.metrics)
+      }
+
+      val resultObservedMetrics = result.observedMetrics
+      assert(input.observedMetrics.size() == resultObservedMetrics.size())
+      assert(input.observedMetrics.keySet() == resultObservedMetrics.keySet())
+      input.observedMetrics.entrySet().forEach { e =>
+        assert(e.getValue == resultObservedMetrics.get(e.getKey))
+      }
+    }
+  }
+
   test("StreamingQueryStatus - prettyJson") {
     val json = testStatus.prettyJson
     assertJson(

