This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 915e9c67a95 [SPARK-41677][CORE][SQL][SS][UI] Add Protobuf serializer 
for StreamingQueryProgressWrapper
915e9c67a95 is described below

commit 915e9c67a9581a1f66e70321879092d854c9fb3b
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Wed Jan 4 14:03:58 2023 -0800

    [SPARK-41677][CORE][SQL][SS][UI] Add Protobuf serializer for 
StreamingQueryProgressWrapper
    
    ### What changes were proposed in this pull request?
    Add a Protobuf serializer for `StreamingQueryProgressWrapper`.
    
    ### Why are the changes needed?
    Support fast and compact serialization/deserialization of `StreamingQueryProgressWrapper` over RocksDB.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Added a new unit test for `StreamingQueryProgressWrapper` to `KVStoreProtobufSerializerSuite`.
    
    Closes #39357 from LuciferYang/SPARK-41677.
    
    Authored-by: yangjie01 <yangji...@baidu.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../apache/spark/status/protobuf/store_types.proto |  51 +++++++
 .../org.apache.spark.status.protobuf.ProtobufSerDe |   1 +
 .../org/apache/spark/sql/streaming/progress.scala  |   8 +-
 .../ui/StreamingQueryStatusListener.scala          |   2 +-
 .../protobuf/sql/SinkProgressSerializer.scala      |  42 +++++
 .../protobuf/sql/SourceProgressSerializer.scala    |  65 ++++++++
 .../sql/StateOperatorProgressSerializer.scala      |  75 +++++++++
 .../sql/StreamingQueryProgressSerializer.scala     |  89 +++++++++++
 .../StreamingQueryProgressWrapperSerializer.scala  |  40 +++++
 .../sql/KVStoreProtobufSerializerSuite.scala       | 170 ++++++++++++++++++++-
 10 files changed, 537 insertions(+), 6 deletions(-)

diff --git 
a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto 
b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
index 6ba1915dfa1..2a45b5da1d8 100644
--- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
+++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
@@ -685,3 +685,54 @@ message ExecutorPeakMetricsDistributions {
   repeated double quantiles = 1;
   repeated ExecutorMetrics executor_metrics = 2;
 }
+
+message StateOperatorProgress {  // Stored form of org.apache.spark.sql.streaming.StateOperatorProgress.
+  string operator_name = 1;
+  int64 num_rows_total = 2;
+  int64 num_rows_updated = 3;
+  int64 all_updates_time_ms = 4;  // Presumably milliseconds, per the _ms suffix.
+  int64 num_rows_removed = 5;
+  int64 all_removals_time_ms = 6;  // Presumably milliseconds, per the _ms suffix.
+  int64 commit_time_ms = 7;  // Presumably milliseconds, per the _ms suffix.
+  int64 memory_used_bytes = 8;
+  int64 num_rows_dropped_by_watermark = 9;
+  int64 num_shuffle_partitions = 10;
+  int64 num_state_store_instances = 11;
+  map<string, int64> custom_metrics = 12;  // Copied entry by entry from customMetrics.
+}
+
+message SourceProgress {  // Stored form of org.apache.spark.sql.streaming.SourceProgress.
+  string description = 1;
+  string start_offset = 2;  // Offset string copied verbatim by the serializer.
+  string end_offset = 3;  // Offset string copied verbatim by the serializer.
+  string latest_offset = 4;  // Offset string copied verbatim by the serializer.
+  int64 num_input_rows = 5;
+  double input_rows_per_second = 6;
+  double processed_rows_per_second = 7;
+  map<string, string> metrics = 8;  // Copied entry by entry from SourceProgress.metrics.
+}
+
+message SinkProgress {  // Stored form of org.apache.spark.sql.streaming.SinkProgress.
+  string description = 1;
+  int64 num_output_rows = 2;
+  map<string, string> metrics = 3;  // Copied entry by entry from SinkProgress.metrics.
+}
+
+message StreamingQueryProgress {  // Stored form of org.apache.spark.sql.streaming.StreamingQueryProgress.
+  string id = 1;  // UUID written with toString and parsed back with UUID.fromString.
+  string run_id = 2;  // UUID written with toString and parsed back with UUID.fromString.
+  string name = 3;
+  string timestamp = 4;  // Copied verbatim from the progress object's timestamp string.
+  int64 batch_id = 5;
+  int64 batch_duration = 6;
+  map<string, int64> duration_ms = 7;
+  map<string, string> event_time = 8;
+  repeated StateOperatorProgress state_operators = 9;
+  repeated SourceProgress sources = 10;
+  SinkProgress sink = 11;
+  map<string, string> observed_metrics = 12;  // Each value is a Row rendered to a JSON string by the serializer.
+}
+
+message StreamingQueryProgressWrapper {  // Stored form of the StreamingQueryProgressWrapper UI class.
+  StreamingQueryProgress progress = 1;  // The single progress object held by the wrapper.
+}
diff --git 
a/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
 
b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
index 7beff87d7ec..e907d559349 100644
--- 
a/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
+++ 
b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
@@ -18,3 +18,4 @@
 org.apache.spark.status.protobuf.sql.SQLExecutionUIDataSerializer
 org.apache.spark.status.protobuf.sql.SparkPlanGraphWrapperSerializer
 org.apache.spark.status.protobuf.sql.StreamingQueryDataSerializer
+org.apache.spark.status.protobuf.sql.StreamingQueryProgressWrapperSerializer
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index 3d206e7780c..1b755ed70c6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -40,7 +40,7 @@ import 
org.apache.spark.sql.streaming.SinkProgress.DEFAULT_NUM_OUTPUT_ROWS
  * Information about updates made to stateful operators in a 
[[StreamingQuery]] during a trigger.
  */
 @Evolving
-class StateOperatorProgress private[sql](
+class StateOperatorProgress private[spark](
     val operatorName: String,
     val numRowsTotal: Long,
     val numRowsUpdated: Long,
@@ -125,7 +125,7 @@ class StateOperatorProgress private[sql](
  * @since 2.1.0
  */
 @Evolving
-class StreamingQueryProgress private[sql](
+class StreamingQueryProgress private[spark](
   val id: UUID,
   val runId: UUID,
   val name: String,
@@ -190,7 +190,7 @@ class StreamingQueryProgress private[sql](
  * @since 2.1.0
  */
 @Evolving
-class SourceProgress protected[sql](
+class SourceProgress protected[spark](
   val description: String,
   val startOffset: String,
   val endOffset: String,
@@ -236,7 +236,7 @@ class SourceProgress protected[sql](
  * @since 2.1.0
  */
 @Evolving
-class SinkProgress protected[sql](
+class SinkProgress protected[spark](
     val description: String,
     val numOutputRows: Long,
     val metrics: ju.Map[String, String] = Map[String, String]().asJava) 
extends Serializable {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
index 1bdc5e3f79a..c5ecdb6395a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
@@ -141,7 +141,7 @@ private[sql] case class StreamingQueryUIData(
   }
 }
 
-private[sql] class StreamingQueryProgressWrapper(val progress: 
StreamingQueryProgress) {
+private[spark] class StreamingQueryProgressWrapper(val progress: 
StreamingQueryProgress) {
   @JsonIgnore @KVIndex
   private val uniqueId: String = getUniqueId(progress.runId, progress.batchId, 
progress.timestamp)
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SinkProgressSerializer.scala
 
b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SinkProgressSerializer.scala
new file mode 100644
index 00000000000..66f8e4942bf
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SinkProgressSerializer.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf.sql
+
+import org.apache.spark.sql.streaming.SinkProgress
+import org.apache.spark.status.protobuf.StoreTypes
+
+/**
+ * Converts [[SinkProgress]] instances to and from [[StoreTypes.SinkProgress]] messages.
+ */
+private[protobuf] object SinkProgressSerializer {
+
+  /** Copies the description, output row count and every metrics entry of `sink` into a message. */
+  def serialize(sink: SinkProgress): StoreTypes.SinkProgress = {
+    val sinkBuilder = StoreTypes.SinkProgress.newBuilder()
+      .setDescription(sink.description)
+      .setNumOutputRows(sink.numOutputRows)
+    sink.metrics.forEach((key, value) => sinkBuilder.putMetrics(key, value))
+    sinkBuilder.build()
+  }
+
+  /** Rebuilds a [[SinkProgress]] from the fields of the stored message. */
+  def deserialize(sink: StoreTypes.SinkProgress): SinkProgress = {
+    val description = sink.getDescription
+    val numOutputRows = sink.getNumOutputRows
+    val metrics = sink.getMetricsMap
+    new SinkProgress(description, numOutputRows, metrics)
+  }
+}
diff --git 
a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SourceProgressSerializer.scala
 
b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SourceProgressSerializer.scala
new file mode 100644
index 00000000000..4d53c7bbc7b
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SourceProgressSerializer.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf.sql
+
+import java.util.{List => JList}
+
+import org.apache.spark.sql.streaming.SourceProgress
+import org.apache.spark.status.protobuf.StoreTypes
+
+/**
+ * Converts [[SourceProgress]] instances to and from [[StoreTypes.SourceProgress]] messages.
+ */
+private[protobuf] object SourceProgressSerializer {
+
+  /**
+   * Builds the protobuf message for `source`.
+   *
+   * Generated protobuf setters throw a `NullPointerException` when handed `null`, and the
+   * string fields of a source (in particular its offsets) may be `null`, so each string field
+   * is only set when a value is present. A field left unset is read back as an empty string
+   * on deserialization.
+   */
+  def serialize(source: SourceProgress): StoreTypes.SourceProgress = {
+    val builder = StoreTypes.SourceProgress.newBuilder()
+    Option(source.description).foreach(v => builder.setDescription(v))
+    Option(source.startOffset).foreach(v => builder.setStartOffset(v))
+    Option(source.endOffset).foreach(v => builder.setEndOffset(v))
+    Option(source.latestOffset).foreach(v => builder.setLatestOffset(v))
+    builder.setNumInputRows(source.numInputRows)
+    builder.setInputRowsPerSecond(source.inputRowsPerSecond)
+    builder.setProcessedRowsPerSecond(source.processedRowsPerSecond)
+    source.metrics.forEach {
+      case (k, v) => builder.putMetrics(k, v)
+    }
+    builder.build()
+  }
+
+  /**
+   * Converts every message of `sourceList` into a [[SourceProgress]], preserving order.
+   * An empty list yields an empty array.
+   */
+  def deserializeToArray(sourceList: JList[StoreTypes.SourceProgress]): Array[SourceProgress] = {
+    val size = sourceList.size()
+    val result = new Array[SourceProgress](size)
+    var i = 0
+    while (i < size) {
+      result(i) = deserialize(sourceList.get(i))
+      i += 1
+    }
+    result
+  }
+
+  /** Rebuilds one [[SourceProgress]] from the fields of a stored message. */
+  private def deserialize(source: StoreTypes.SourceProgress): SourceProgress = {
+    new SourceProgress(
+      description = source.getDescription,
+      startOffset = source.getStartOffset,
+      endOffset = source.getEndOffset,
+      latestOffset = source.getLatestOffset,
+      numInputRows = source.getNumInputRows,
+      inputRowsPerSecond = source.getInputRowsPerSecond,
+      processedRowsPerSecond = source.getProcessedRowsPerSecond,
+      metrics = source.getMetricsMap
+    )
+  }
+}
diff --git 
a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StateOperatorProgressSerializer.scala
 
b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StateOperatorProgressSerializer.scala
new file mode 100644
index 00000000000..6831c044c82
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StateOperatorProgressSerializer.scala
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf.sql
+
+import java.util.{List => JList}
+
+import org.apache.spark.sql.streaming.StateOperatorProgress
+import org.apache.spark.status.protobuf.StoreTypes
+
+/**
+ * Converts [[StateOperatorProgress]] instances to and from
+ * [[StoreTypes.StateOperatorProgress]] messages.
+ *
+ * Scoped to the protobuf package like the other progress serializer helpers.
+ */
+private[protobuf] object StateOperatorProgressSerializer {
+
+  /** Copies every field and every custom metric of `stateOperator` into a message. */
+  def serialize(stateOperator: StateOperatorProgress): StoreTypes.StateOperatorProgress = {
+    val builder = StoreTypes.StateOperatorProgress.newBuilder()
+    builder.setOperatorName(stateOperator.operatorName)
+    builder.setNumRowsTotal(stateOperator.numRowsTotal)
+    builder.setNumRowsUpdated(stateOperator.numRowsUpdated)
+    builder.setAllUpdatesTimeMs(stateOperator.allUpdatesTimeMs)
+    builder.setNumRowsRemoved(stateOperator.numRowsRemoved)
+    builder.setAllRemovalsTimeMs(stateOperator.allRemovalsTimeMs)
+    builder.setCommitTimeMs(stateOperator.commitTimeMs)
+    builder.setMemoryUsedBytes(stateOperator.memoryUsedBytes)
+    builder.setNumRowsDroppedByWatermark(stateOperator.numRowsDroppedByWatermark)
+    builder.setNumShufflePartitions(stateOperator.numShufflePartitions)
+    builder.setNumStateStoreInstances(stateOperator.numStateStoreInstances)
+    stateOperator.customMetrics.forEach {
+      case (k, v) => builder.putCustomMetrics(k, v)
+    }
+    builder.build()
+  }
+
+  /**
+   * Converts every message of `stateOperatorList` into a [[StateOperatorProgress]],
+   * preserving order. An empty list yields an empty array.
+   */
+  def deserializeToArray(
+      stateOperatorList: JList[StoreTypes.StateOperatorProgress]): Array[StateOperatorProgress] = {
+    val size = stateOperatorList.size()
+    val result = new Array[StateOperatorProgress](size)
+    var i = 0
+    while (i < size) {
+      result(i) = deserialize(stateOperatorList.get(i))
+      i += 1
+    }
+    result
+  }
+
+  /** Rebuilds one [[StateOperatorProgress]] from the fields of a stored message. */
+  private def deserialize(
+      stateOperator: StoreTypes.StateOperatorProgress): StateOperatorProgress = {
+    new StateOperatorProgress(
+      operatorName = stateOperator.getOperatorName,
+      numRowsTotal = stateOperator.getNumRowsTotal,
+      numRowsUpdated = stateOperator.getNumRowsUpdated,
+      allUpdatesTimeMs = stateOperator.getAllUpdatesTimeMs,
+      numRowsRemoved = stateOperator.getNumRowsRemoved,
+      allRemovalsTimeMs = stateOperator.getAllRemovalsTimeMs,
+      commitTimeMs = stateOperator.getCommitTimeMs,
+      memoryUsedBytes = stateOperator.getMemoryUsedBytes,
+      numRowsDroppedByWatermark = stateOperator.getNumRowsDroppedByWatermark,
+      numShufflePartitions = stateOperator.getNumShufflePartitions,
+      numStateStoreInstances = stateOperator.getNumStateStoreInstances,
+      customMetrics = stateOperator.getCustomMetricsMap
+    )
+  }
+}
diff --git 
a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryProgressSerializer.scala
 
b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryProgressSerializer.scala
new file mode 100644
index 00000000000..32d62a18ca4
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryProgressSerializer.scala
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf.sql
+
+import java.util.{HashMap => JHashMap, Map => JMap, UUID}
+
+import com.fasterxml.jackson.databind.json.JsonMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.streaming.StreamingQueryProgress
+import org.apache.spark.status.protobuf.StoreTypes
+
+/**
+ * Converts [[StreamingQueryProgress]] instances to and from
+ * [[StoreTypes.StreamingQueryProgress]] messages.
+ */
+private[protobuf] object StreamingQueryProgressSerializer {
+
+  // JSON mapper used to store each observed-metrics Row as a string and to read it back.
+  private val mapper: JsonMapper = JsonMapper.builder()
+    .addModule(DefaultScalaModule)
+    .build()
+
+  /**
+   * Builds the protobuf message for `process`.
+   *
+   * The query name is `null` when the user did not name the query, and generated protobuf
+   * setters throw a `NullPointerException` on `null`, so the name is only set when present.
+   * An unset name is read back as an empty string by [[deserialize]].
+   */
+  def serialize(process: StreamingQueryProgress): StoreTypes.StreamingQueryProgress = {
+    val builder = StoreTypes.StreamingQueryProgress.newBuilder()
+    builder.setId(process.id.toString)
+    builder.setRunId(process.runId.toString)
+    Option(process.name).foreach(n => builder.setName(n))
+    builder.setTimestamp(process.timestamp)
+    builder.setBatchId(process.batchId)
+    builder.setBatchDuration(process.batchDuration)
+    process.durationMs.forEach {
+      case (k, v) => builder.putDurationMs(k, v)
+    }
+    process.eventTime.forEach {
+      case (k, v) => builder.putEventTime(k, v)
+    }
+    process.stateOperators.foreach(
+      s => builder.addStateOperators(StateOperatorProgressSerializer.serialize(s)))
+    process.sources.foreach(
+      s => builder.addSources(SourceProgressSerializer.serialize(s))
+    )
+    builder.setSink(SinkProgressSerializer.serialize(process.sink))
+    // Rows have no protobuf counterpart here, so each one is stored as its JSON rendering.
+    process.observedMetrics.forEach {
+      case (k, v) => builder.putObservedMetrics(k, mapper.writeValueAsString(v))
+    }
+    builder.build()
+  }
+
+  /**
+   * Rebuilds a [[StreamingQueryProgress]] from a stored message. The two ids are parsed with
+   * `UUID.fromString`; nested operators, sources and the sink go through their own helpers.
+   */
+  def deserialize(process: StoreTypes.StreamingQueryProgress): StreamingQueryProgress = {
+    new StreamingQueryProgress(
+      id = UUID.fromString(process.getId),
+      runId = UUID.fromString(process.getRunId),
+      name = process.getName,
+      timestamp = process.getTimestamp,
+      batchId = process.getBatchId,
+      batchDuration = process.getBatchDuration,
+      durationMs = process.getDurationMsMap,
+      eventTime = process.getEventTimeMap,
+      stateOperators =
+        StateOperatorProgressSerializer.deserializeToArray(process.getStateOperatorsList),
+      sources = SourceProgressSerializer.deserializeToArray(process.getSourcesList),
+      sink = SinkProgressSerializer.deserialize(process.getSink),
+      observedMetrics = convertToObservedMetrics(process.getObservedMetricsMap)
+    )
+  }
+
+  /** Parses every stored JSON string back into a Row, keeping the original keys. */
+  private def convertToObservedMetrics(input: JMap[String, String]): JHashMap[String, Row] = {
+    val observedMetrics = new JHashMap[String, Row](input.size())
+    val classType = classOf[GenericRowWithSchema]
+    input.forEach {
+      case (k, v) =>
+        observedMetrics.put(k, mapper.readValue(v, classType))
+    }
+    observedMetrics
+  }
+}
diff --git 
a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryProgressWrapperSerializer.scala
 
b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryProgressWrapperSerializer.scala
new file mode 100644
index 00000000000..3846af26754
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryProgressWrapperSerializer.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf.sql
+
+import org.apache.spark.sql.streaming.ui.StreamingQueryProgressWrapper
+import org.apache.spark.status.protobuf.{ProtobufSerDe, StoreTypes}
+
+/**
+ * [[ProtobufSerDe]] implementation for [[StreamingQueryProgressWrapper]]; the wrapped
+ * progress object is handled by [[StreamingQueryProgressSerializer]].
+ */
+class StreamingQueryProgressWrapperSerializer extends ProtobufSerDe {
+
+  override val supportClass: Class[_] = classOf[StreamingQueryProgressWrapper]
+
+  /** Encodes the wrapper's progress object into the bytes of a wrapper message. */
+  override def serialize(input: Any): Array[Byte] = {
+    val wrapper = input.asInstanceOf[StreamingQueryProgressWrapper]
+    StoreTypes.StreamingQueryProgressWrapper.newBuilder()
+      .setProgress(StreamingQueryProgressSerializer.serialize(wrapper.progress))
+      .build()
+      .toByteArray
+  }
+
+  /** Parses a wrapper message from `bytes` and rebuilds the wrapper around its progress. */
+  override def deserialize(bytes: Array[Byte]): StreamingQueryProgressWrapper = {
+    val parsed = StoreTypes.StreamingQueryProgressWrapper.parseFrom(bytes)
+    val progress = StreamingQueryProgressSerializer.deserialize(parsed.getProgress)
+    new StreamingQueryProgressWrapper(progress)
+  }
+}
diff --git 
a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
index 5f1cd812d97..a031eb69bf2 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
@@ -17,11 +17,18 @@
 
 package org.apache.spark.status.protobuf.sql
 
+import java.lang.{Long => JLong}
 import java.util.UUID
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
 import org.apache.spark.sql.execution.ui._
-import org.apache.spark.sql.streaming.ui.StreamingQueryData
+import org.apache.spark.sql.streaming.{SinkProgress, SourceProgress, 
StateOperatorProgress, StreamingQueryProgress}
+import org.apache.spark.sql.streaming.ui.{StreamingQueryData, 
StreamingQueryProgressWrapper}
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.status.api.v1.sql.SqlResourceSuite
 import org.apache.spark.status.protobuf.KVStoreProtobufSerializer
 
@@ -236,4 +243,165 @@ class KVStoreProtobufSerializerSuite extends 
SparkFunSuite {
     assert(result.startTimestamp == input.startTimestamp)
     assert(result.endTimestamp == input.endTimestamp)
   }
+
+  // Round-trips a fully populated StreamingQueryProgressWrapper through the serializer and
+  // compares every field of the result with the input.
+  test("StreamingQueryProgressWrapper") {
+    // Generate input data
+    val stateOperatorProgress0 = new StateOperatorProgress(
+      operatorName = "op-0",
+      numRowsTotal = 1L,
+      numRowsUpdated = 2L,
+      allUpdatesTimeMs = 3L,
+      numRowsRemoved = 4L,
+      allRemovalsTimeMs = 5L,
+      commitTimeMs = 6L,
+      memoryUsedBytes = 7L,
+      numRowsDroppedByWatermark = 8L,
+      numShufflePartitions = 9L,
+      numStateStoreInstances = 10L,
+      customMetrics = Map(
+        "custom-metrics-00" -> JLong.valueOf("10"),
+        "custom-metrics-01" -> JLong.valueOf("11")).asJava
+    )
+
+    val stateOperatorProgress1 = new StateOperatorProgress(
+      operatorName = "op-1",
+      numRowsTotal = 11L,
+      numRowsUpdated = 12L,
+      allUpdatesTimeMs = 13L,
+      numRowsRemoved = 14L,
+      allRemovalsTimeMs = 15L,
+      commitTimeMs = 16L,
+      memoryUsedBytes = 17L,
+      numRowsDroppedByWatermark = 18L,
+      numShufflePartitions = 19L,
+      numStateStoreInstances = 20L,
+      customMetrics = Map(
+        "custom-metrics-10" -> JLong.valueOf("20"),
+        "custom-metrics-11" -> JLong.valueOf("21")).asJava
+    )
+
+    val source0 = new SourceProgress(
+      description = "description-0",
+      startOffset = "startOffset-0",
+      endOffset = "endOffset-0",
+      latestOffset = "latestOffset-0",
+      numInputRows = 10L,
+      inputRowsPerSecond = 11.0,
+      processedRowsPerSecond = 12.0,
+      metrics = Map(
+        "metrics-00" -> "10",
+        "metrics-01" -> "11").asJava
+    )
+
+    val source1 = new SourceProgress(
+      description = "description-1",
+      startOffset = "startOffset-1",
+      endOffset = "endOffset-1",
+      latestOffset = "latestOffset-1",
+      numInputRows = 20L,
+      inputRowsPerSecond = 21.0,
+      processedRowsPerSecond = 22.0,
+      metrics = Map(
+        "metrics-10" -> "20",
+        "metrics-11" -> "21").asJava
+    )
+
+    val sink = new SinkProgress(
+      description = "sink-0",
+      numOutputRows = 30,
+      metrics = Map(
+        "metrics-20" -> "30",
+        "metrics-21" -> "31").asJava
+    )
+
+    val schema1 = new StructType()
+      .add("c1", "long")
+      .add("c2", "double")
+    val schema2 = new StructType()
+      .add("rc", "long")
+      .add("min_q", "string")
+      .add("max_q", "string")
+
+    val observedMetrics = Map[String, Row](
+      "event1" -> new GenericRowWithSchema(Array(1L, 3.0d), schema1),
+      "event2" -> new GenericRowWithSchema(Array(1L, "hello", "world"), schema2)
+    ).asJava
+
+    val progress = new StreamingQueryProgress(
+      id = UUID.randomUUID(),
+      runId = UUID.randomUUID(),
+      name = "name-1",
+      timestamp = "2023-01-03T09:14:04.175Z",
+      batchId = 1L,
+      batchDuration = 2L,
+      durationMs = Map(
+        "duration-0" -> JLong.valueOf("10"),
+        "duration-1" -> JLong.valueOf("11")).asJava,
+      eventTime = Map(
+        "eventTime-0" -> "20",
+        "eventTime-1" -> "21").asJava,
+      stateOperators = Array(stateOperatorProgress0, stateOperatorProgress1),
+      sources = Array(source0, source1),
+      sink = sink,
+      observedMetrics = observedMetrics
+    )
+    val input = new StreamingQueryProgressWrapper(progress)
+
+    // Do serialization and deserialization
+    val bytes = serializer.serialize(input)
+    val result = serializer.deserialize(bytes, classOf[StreamingQueryProgressWrapper])
+
+    // Assertion results
+    val resultProcess = result.progress
+    assert(progress.id == resultProcess.id)
+    assert(progress.runId == resultProcess.runId)
+    assert(progress.name == resultProcess.name)
+    assert(progress.timestamp == resultProcess.timestamp)
+    assert(progress.batchId == resultProcess.batchId)
+    assert(progress.batchDuration == resultProcess.batchDuration)
+    assert(progress.durationMs == resultProcess.durationMs)
+    assert(progress.eventTime == resultProcess.eventTime)
+
+    // zip truncates to the shorter side, so check the lengths first; otherwise a result
+    // with missing elements would pass the per-element comparison below.
+    assert(progress.stateOperators.length == resultProcess.stateOperators.length)
+    progress.stateOperators.zip(resultProcess.stateOperators).foreach {
+      case (o1, o2) =>
+        assert(o1.operatorName == o2.operatorName)
+        assert(o1.numRowsTotal == o2.numRowsTotal)
+        assert(o1.numRowsUpdated == o2.numRowsUpdated)
+        assert(o1.allUpdatesTimeMs == o2.allUpdatesTimeMs)
+        assert(o1.numRowsRemoved == o2.numRowsRemoved)
+        assert(o1.allRemovalsTimeMs == o2.allRemovalsTimeMs)
+        assert(o1.commitTimeMs == o2.commitTimeMs)
+        assert(o1.memoryUsedBytes == o2.memoryUsedBytes)
+        assert(o1.numRowsDroppedByWatermark == o2.numRowsDroppedByWatermark)
+        assert(o1.numShufflePartitions == o2.numShufflePartitions)
+        assert(o1.numStateStoreInstances == o2.numStateStoreInstances)
+        assert(o1.customMetrics == o2.customMetrics)
+    }
+
+    assert(progress.sources.length == resultProcess.sources.length)
+    progress.sources.zip(resultProcess.sources).foreach {
+      case (s1, s2) =>
+        assert(s1.description == s2.description)
+        assert(s1.startOffset == s2.startOffset)
+        assert(s1.endOffset == s2.endOffset)
+        assert(s1.latestOffset == s2.latestOffset)
+        assert(s1.numInputRows == s2.numInputRows)
+        assert(s1.inputRowsPerSecond == s2.inputRowsPerSecond)
+        assert(s1.processedRowsPerSecond == s2.processedRowsPerSecond)
+        assert(s1.metrics == s2.metrics)
+    }
+
+    // There is exactly one sink, so compare it directly.
+    assert(progress.sink.description == resultProcess.sink.description)
+    assert(progress.sink.numOutputRows == resultProcess.sink.numOutputRows)
+    assert(progress.sink.metrics == resultProcess.sink.metrics)
+
+    val resultObservedMetrics = resultProcess.observedMetrics
+    assert(progress.observedMetrics.size() == resultObservedMetrics.size())
+    assert(progress.observedMetrics.keySet() == resultObservedMetrics.keySet())
+    progress.observedMetrics.entrySet().forEach { e =>
+      assert(e.getValue == resultObservedMetrics.get(e.getKey))
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to