This is an automated email from the ASF dual-hosted git repository. gengliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 915e9c67a95 [SPARK-41677][CORE][SQL][SS][UI] Add Protobuf serializer for StreamingQueryProgressWrapper 915e9c67a95 is described below commit 915e9c67a9581a1f66e70321879092d854c9fb3b Author: yangjie01 <yangji...@baidu.com> AuthorDate: Wed Jan 4 14:03:58 2023 -0800 [SPARK-41677][CORE][SQL][SS][UI] Add Protobuf serializer for StreamingQueryProgressWrapper ### What changes were proposed in this pull request? Add Protobuf serializer for `StreamingQueryProgressWrapper` ### Why are the changes needed? Support fast and compact serialization/deserialization for `StreamingQueryProgressWrapper` over RocksDB. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added a new unit test to `KVStoreProtobufSerializerSuite`. Closes #39357 from LuciferYang/SPARK-41677. Authored-by: yangjie01 <yangji...@baidu.com> Signed-off-by: Gengliang Wang <gengli...@apache.org> --- .../apache/spark/status/protobuf/store_types.proto | 51 +++++++ .../org.apache.spark.status.protobuf.ProtobufSerDe | 1 + .../org/apache/spark/sql/streaming/progress.scala | 8 +- .../ui/StreamingQueryStatusListener.scala | 2 +- .../protobuf/sql/SinkProgressSerializer.scala | 42 +++++ .../protobuf/sql/SourceProgressSerializer.scala | 65 ++++++++ .../sql/StateOperatorProgressSerializer.scala | 75 +++++++++ .../sql/StreamingQueryProgressSerializer.scala | 89 +++++++++++ .../StreamingQueryProgressWrapperSerializer.scala | 40 +++++ .../sql/KVStoreProtobufSerializerSuite.scala | 170 ++++++++++++++++++++- 10 files changed, 537 insertions(+), 6 deletions(-) diff --git a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto index 6ba1915dfa1..2a45b5da1d8 100644 --- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto +++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto @@ -685,3 +685,54 @@ message 
ExecutorPeakMetricsDistributions {
   repeated double quantiles = 1;
   repeated ExecutorMetrics executor_metrics = 2;
 }
+
+message StateOperatorProgress {
+  optional string operator_name = 1;
+  int64 num_rows_total = 2;
+  int64 num_rows_updated = 3;
+  int64 all_updates_time_ms = 4;
+  int64 num_rows_removed = 5;
+  int64 all_removals_time_ms = 6;
+  int64 commit_time_ms = 7;
+  int64 memory_used_bytes = 8;
+  int64 num_rows_dropped_by_watermark = 9;
+  int64 num_shuffle_partitions = 10;
+  int64 num_state_store_instances = 11;
+  map<string, int64> custom_metrics = 12;
+}
+
+message SourceProgress {
+  optional string description = 1;
+  optional string start_offset = 2;
+  optional string end_offset = 3;
+  optional string latest_offset = 4;
+  int64 num_input_rows = 5;
+  double input_rows_per_second = 6;
+  double processed_rows_per_second = 7;
+  map<string, string> metrics = 8;
+}
+
+message SinkProgress {
+  optional string description = 1;
+  int64 num_output_rows = 2;
+  map<string, string> metrics = 3;
+}
+
+message StreamingQueryProgress {
+  string id = 1;
+  string run_id = 2;
+  optional string name = 3;
+  optional string timestamp = 4;
+  int64 batch_id = 5;
+  int64 batch_duration = 6;
+  map<string, int64> duration_ms = 7;
+  map<string, string> event_time = 8;
+  repeated StateOperatorProgress state_operators = 9;
+  repeated SourceProgress sources = 10;
+  SinkProgress sink = 11;
+  map<string, string> observed_metrics = 12;
+}
+
+message StreamingQueryProgressWrapper {
+  StreamingQueryProgress progress = 1;
+}
diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
index 7beff87d7ec..e907d559349 100644
--- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
+++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
@@ -18,3 +18,4 @@
 org.apache.spark.status.protobuf.sql.SQLExecutionUIDataSerializer
 org.apache.spark.status.protobuf.sql.SparkPlanGraphWrapperSerializer
 org.apache.spark.status.protobuf.sql.StreamingQueryDataSerializer
+org.apache.spark.status.protobuf.sql.StreamingQueryProgressWrapperSerializer
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index 3d206e7780c..1b755ed70c6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.streaming.SinkProgress.DEFAULT_NUM_OUTPUT_ROWS
  * Information about updates made to stateful operators in a [[StreamingQuery]] during a trigger.
  */
 @Evolving
-class StateOperatorProgress private[sql](
+class StateOperatorProgress private[spark](
     val operatorName: String,
     val numRowsTotal: Long,
     val numRowsUpdated: Long,
@@ -125,7 +125,7 @@ class StateOperatorProgress private[sql](
  * @since 2.1.0
  */
 @Evolving
-class StreamingQueryProgress private[sql](
+class StreamingQueryProgress private[spark](
     val id: UUID,
     val runId: UUID,
     val name: String,
@@ -190,7 +190,7 @@ class StreamingQueryProgress private[sql](
  * @since 2.1.0
  */
 @Evolving
-class SourceProgress protected[sql](
+class SourceProgress protected[spark](
     val description: String,
     val startOffset: String,
     val endOffset: String,
@@ -236,7 +236,7 @@ class SourceProgress protected[sql](
  * @since 2.1.0
  */
 @Evolving
-class SinkProgress protected[sql](
+class SinkProgress protected[spark](
     val description: String,
     val numOutputRows: Long,
     val metrics: ju.Map[String, String] = Map[String, String]().asJava) extends Serializable {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
index 1bdc5e3f79a..c5ecdb6395a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
@@ -141,7 +141,7 @@ private[sql] case class StreamingQueryUIData(
   }
 }
 
-private[sql] class StreamingQueryProgressWrapper(val progress: StreamingQueryProgress) {
+private[spark] class StreamingQueryProgressWrapper(val progress: StreamingQueryProgress) {
   @JsonIgnore @KVIndex
   private val uniqueId: String = getUniqueId(progress.runId, progress.batchId, progress.timestamp)
 
diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SinkProgressSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SinkProgressSerializer.scala
new file mode 100644
index 00000000000..66f8e4942bf
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SinkProgressSerializer.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf.sql
+
+import org.apache.spark.sql.streaming.SinkProgress
+import org.apache.spark.status.protobuf.StoreTypes
+
+/**
+ * Converts [[SinkProgress]] to and from its protobuf form.
+ *
+ * Generated protobuf setters throw NullPointerException on null, so a null `description` is
+ * left unset when serializing and restored as null when deserializing.
+ */
+private[protobuf] object SinkProgressSerializer {
+
+  def serialize(sink: SinkProgress): StoreTypes.SinkProgress = {
+    val builder = StoreTypes.SinkProgress.newBuilder()
+    if (sink.description != null) builder.setDescription(sink.description)
+    builder.setNumOutputRows(sink.numOutputRows)
+    builder.putAllMetrics(sink.metrics)
+    builder.build()
+  }
+
+  def deserialize(sink: StoreTypes.SinkProgress): SinkProgress = {
+    new SinkProgress(
+      description = if (sink.hasDescription) sink.getDescription else null,
+      numOutputRows = sink.getNumOutputRows,
+      metrics = sink.getMetricsMap
+    )
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SourceProgressSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SourceProgressSerializer.scala
new file mode 100644
index 00000000000..4d53c7bbc7b
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SourceProgressSerializer.scala
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf.sql
+
+import java.util.{List => JList}
+
+import org.apache.spark.sql.streaming.SourceProgress
+import org.apache.spark.status.protobuf.StoreTypes
+
+/**
+ * Converts [[SourceProgress]] to and from its protobuf form.
+ *
+ * Generated protobuf setters throw NullPointerException on null, so null string fields are
+ * left unset when serializing and restored as null when deserializing.
+ */
+private[protobuf] object SourceProgressSerializer {
+
+  def serialize(source: SourceProgress): StoreTypes.SourceProgress = {
+    val builder = StoreTypes.SourceProgress.newBuilder()
+    if (source.description != null) builder.setDescription(source.description)
+    if (source.startOffset != null) builder.setStartOffset(source.startOffset)
+    if (source.endOffset != null) builder.setEndOffset(source.endOffset)
+    if (source.latestOffset != null) builder.setLatestOffset(source.latestOffset)
+    builder.setNumInputRows(source.numInputRows)
+    builder.setInputRowsPerSecond(source.inputRowsPerSecond)
+    builder.setProcessedRowsPerSecond(source.processedRowsPerSecond)
+    builder.putAllMetrics(source.metrics)
+    builder.build()
+  }
+
+  /** Deserializes every element of `sourceList`, preserving order. */
+  def deserializeToArray(sourceList: JList[StoreTypes.SourceProgress]): Array[SourceProgress] = {
+    val size = sourceList.size()
+    val result = new Array[SourceProgress](size)
+    var i = 0
+    while (i < size) {
+      result(i) = deserialize(sourceList.get(i))
+      i += 1
+    }
+    result
+  }
+
+  private def deserialize(source: StoreTypes.SourceProgress): SourceProgress = {
+    new SourceProgress(
+      description = if (source.hasDescription) source.getDescription else null,
+      startOffset = if (source.hasStartOffset) source.getStartOffset else null,
+      endOffset = if (source.hasEndOffset) source.getEndOffset else null,
+      latestOffset = if (source.hasLatestOffset) source.getLatestOffset else null,
+      numInputRows = source.getNumInputRows,
+      inputRowsPerSecond = source.getInputRowsPerSecond,
+      processedRowsPerSecond = source.getProcessedRowsPerSecond,
+      metrics = source.getMetricsMap
+    )
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StateOperatorProgressSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StateOperatorProgressSerializer.scala
new file mode 100644
index 00000000000..6831c044c82
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StateOperatorProgressSerializer.scala
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf.sql
+
+import java.util.{List => JList}
+
+import org.apache.spark.sql.streaming.StateOperatorProgress
+import org.apache.spark.status.protobuf.StoreTypes
+
+/**
+ * Converts [[StateOperatorProgress]] to and from its protobuf form.
+ *
+ * Generated protobuf setters throw NullPointerException on null, so a null `operatorName` is
+ * left unset when serializing and restored as null when deserializing.
+ */
+private[protobuf] object StateOperatorProgressSerializer {
+
+  def serialize(stateOperator: StateOperatorProgress): StoreTypes.StateOperatorProgress = {
+    val builder = StoreTypes.StateOperatorProgress.newBuilder()
+    if (stateOperator.operatorName != null) builder.setOperatorName(stateOperator.operatorName)
+    builder.setNumRowsTotal(stateOperator.numRowsTotal)
+    builder.setNumRowsUpdated(stateOperator.numRowsUpdated)
+    builder.setAllUpdatesTimeMs(stateOperator.allUpdatesTimeMs)
+    builder.setNumRowsRemoved(stateOperator.numRowsRemoved)
+    builder.setAllRemovalsTimeMs(stateOperator.allRemovalsTimeMs)
+    builder.setCommitTimeMs(stateOperator.commitTimeMs)
+    builder.setMemoryUsedBytes(stateOperator.memoryUsedBytes)
+    builder.setNumRowsDroppedByWatermark(stateOperator.numRowsDroppedByWatermark)
+    builder.setNumShufflePartitions(stateOperator.numShufflePartitions)
+    builder.setNumStateStoreInstances(stateOperator.numStateStoreInstances)
+    builder.putAllCustomMetrics(stateOperator.customMetrics)
+    builder.build()
+  }
+
+  /** Deserializes every element of `stateOperatorList`, preserving order. */
+  def deserializeToArray(
+      stateOperatorList: JList[StoreTypes.StateOperatorProgress]): Array[StateOperatorProgress] = {
+    val size = stateOperatorList.size()
+    val result = new Array[StateOperatorProgress](size)
+    var i = 0
+    while (i < size) {
+      result(i) = deserialize(stateOperatorList.get(i))
+      i += 1
+    }
+    result
+  }
+
+  private def deserialize(
+      stateOperator: StoreTypes.StateOperatorProgress): StateOperatorProgress = {
+    new StateOperatorProgress(
+      operatorName =
+        if (stateOperator.hasOperatorName) stateOperator.getOperatorName else null,
+      numRowsTotal = stateOperator.getNumRowsTotal,
+      numRowsUpdated = stateOperator.getNumRowsUpdated,
+      allUpdatesTimeMs = stateOperator.getAllUpdatesTimeMs,
+      numRowsRemoved = stateOperator.getNumRowsRemoved,
+      allRemovalsTimeMs = stateOperator.getAllRemovalsTimeMs,
+      commitTimeMs = stateOperator.getCommitTimeMs,
+      memoryUsedBytes = stateOperator.getMemoryUsedBytes,
+      numRowsDroppedByWatermark = stateOperator.getNumRowsDroppedByWatermark,
+      numShufflePartitions = stateOperator.getNumShufflePartitions,
+      numStateStoreInstances = stateOperator.getNumStateStoreInstances,
+      customMetrics = stateOperator.getCustomMetricsMap
+    )
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryProgressSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryProgressSerializer.scala
new file mode 100644
index 00000000000..32d62a18ca4
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryProgressSerializer.scala
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf.sql
+
+import java.util.{HashMap => JHashMap, Map => JMap, UUID}
+
+import com.fasterxml.jackson.databind.json.JsonMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.streaming.StreamingQueryProgress
+import org.apache.spark.status.protobuf.StoreTypes
+
+/**
+ * Converts [[StreamingQueryProgress]] to and from its protobuf form.
+ *
+ * Generated protobuf setters throw NullPointerException on null, so null `name` and `timestamp`
+ * values are left unset when serializing and restored as null when deserializing. Observed
+ * metrics rows are stored as JSON strings written and read back with Jackson.
+ */
+private[protobuf] object StreamingQueryProgressSerializer {
+
+  private val mapper: JsonMapper = JsonMapper.builder()
+    .addModule(DefaultScalaModule)
+    .build()
+
+  def serialize(progress: StreamingQueryProgress): StoreTypes.StreamingQueryProgress = {
+    val builder = StoreTypes.StreamingQueryProgress.newBuilder()
+    builder.setId(progress.id.toString)
+    builder.setRunId(progress.runId.toString)
+    if (progress.name != null) builder.setName(progress.name)
+    if (progress.timestamp != null) builder.setTimestamp(progress.timestamp)
+    builder.setBatchId(progress.batchId)
+    builder.setBatchDuration(progress.batchDuration)
+    builder.putAllDurationMs(progress.durationMs)
+    builder.putAllEventTime(progress.eventTime)
+    progress.stateOperators.foreach { s =>
+      builder.addStateOperators(StateOperatorProgressSerializer.serialize(s))
+    }
+    progress.sources.foreach(s => builder.addSources(SourceProgressSerializer.serialize(s)))
+    builder.setSink(SinkProgressSerializer.serialize(progress.sink))
+    progress.observedMetrics.forEach {
+      case (k, v) => builder.putObservedMetrics(k, mapper.writeValueAsString(v))
+    }
+    builder.build()
+  }
+
+  def deserialize(progress: StoreTypes.StreamingQueryProgress): StreamingQueryProgress = {
+    new StreamingQueryProgress(
+      id = UUID.fromString(progress.getId),
+      runId = UUID.fromString(progress.getRunId),
+      name = if (progress.hasName) progress.getName else null,
+      timestamp = if (progress.hasTimestamp) progress.getTimestamp else null,
+      batchId = progress.getBatchId,
+      batchDuration = progress.getBatchDuration,
+      durationMs = progress.getDurationMsMap,
+      eventTime = progress.getEventTimeMap,
+      stateOperators =
+        StateOperatorProgressSerializer.deserializeToArray(progress.getStateOperatorsList),
+      sources = SourceProgressSerializer.deserializeToArray(progress.getSourcesList),
+      sink = SinkProgressSerializer.deserialize(progress.getSink),
+      observedMetrics = convertToObservedMetrics(progress.getObservedMetricsMap)
+    )
+  }
+
+  /** Parses each JSON-encoded observed-metrics row back into a [[GenericRowWithSchema]]. */
+  private def convertToObservedMetrics(input: JMap[String, String]): JHashMap[String, Row] = {
+    val observedMetrics = new JHashMap[String, Row](input.size())
+    val classType = classOf[GenericRowWithSchema]
+    input.forEach {
+      case (k, v) =>
+        observedMetrics.put(k, mapper.readValue(v, classType))
+    }
+    observedMetrics
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryProgressWrapperSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryProgressWrapperSerializer.scala
new file mode 100644
index 00000000000..3846af26754
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryProgressWrapperSerializer.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf.sql
+
+import org.apache.spark.sql.streaming.ui.StreamingQueryProgressWrapper
+import org.apache.spark.status.protobuf.{ProtobufSerDe, StoreTypes}
+
+/**
+ * [[ProtobufSerDe]] for [[StreamingQueryProgressWrapper]]. It is registered in
+ * META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe.
+ */
+class StreamingQueryProgressWrapperSerializer extends ProtobufSerDe {
+
+  override val supportClass: Class[_] =
+    classOf[StreamingQueryProgressWrapper]
+
+  override def serialize(input: Any): Array[Byte] = {
+    val data = input.asInstanceOf[StreamingQueryProgressWrapper]
+    val builder = StoreTypes.StreamingQueryProgressWrapper.newBuilder()
+    builder.setProgress(StreamingQueryProgressSerializer.serialize(data.progress))
+    builder.build().toByteArray
+  }
+
+  override def deserialize(bytes: Array[Byte]): StreamingQueryProgressWrapper = {
+    val progressWrapper = StoreTypes.StreamingQueryProgressWrapper.parseFrom(bytes)
+    new StreamingQueryProgressWrapper(
+      StreamingQueryProgressSerializer.deserialize(progressWrapper.getProgress))
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
index 5f1cd812d97..a031eb69bf2 100644
--- a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
@@ -17,11 +17,18 @@
 
 package org.apache.spark.status.protobuf.sql
 
+import java.lang.{Long => JLong}
 import java.util.UUID
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
 import org.apache.spark.sql.execution.ui._
-import org.apache.spark.sql.streaming.ui.StreamingQueryData
+import org.apache.spark.sql.streaming.{SinkProgress, SourceProgress, StateOperatorProgress, StreamingQueryProgress}
+import org.apache.spark.sql.streaming.ui.{StreamingQueryData, StreamingQueryProgressWrapper}
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.status.api.v1.sql.SqlResourceSuite
 import org.apache.spark.status.protobuf.KVStoreProtobufSerializer
 
@@ -236,4 +243,216 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
     assert(result.startTimestamp == input.startTimestamp)
     assert(result.endTimestamp == input.endTimestamp)
   }
+
+  test("StreamingQueryProgressWrapper") {
+    // Generate input data
+    val stateOperatorProgress0 = new StateOperatorProgress(
+      operatorName = "op-0",
+      numRowsTotal = 1L,
+      numRowsUpdated = 2L,
+      allUpdatesTimeMs = 3L,
+      numRowsRemoved = 4L,
+      allRemovalsTimeMs = 5L,
+      commitTimeMs = 6L,
+      memoryUsedBytes = 7L,
+      numRowsDroppedByWatermark = 8L,
+      numShufflePartitions = 9L,
+      numStateStoreInstances = 10L,
+      customMetrics = Map(
+        "custom-metrics-00" -> JLong.valueOf("10"),
+        "custom-metrics-01" -> JLong.valueOf("11")).asJava
+    )
+
+    val stateOperatorProgress1 = new StateOperatorProgress(
+      operatorName = "op-1",
+      numRowsTotal = 11L,
+      numRowsUpdated = 12L,
+      allUpdatesTimeMs = 13L,
+      numRowsRemoved = 14L,
+      allRemovalsTimeMs = 15L,
+      commitTimeMs = 16L,
+      memoryUsedBytes = 17L,
+      numRowsDroppedByWatermark = 18L,
+      numShufflePartitions = 19L,
+      numStateStoreInstances = 20L,
+      customMetrics = Map(
+        "custom-metrics-10" -> JLong.valueOf("20"),
+        "custom-metrics-11" -> JLong.valueOf("21")).asJava
+    )
+
+    val source0 = new SourceProgress(
+      description = "description-0",
+      startOffset = "startOffset-0",
+      endOffset = "endOffset-0",
+      latestOffset = "latestOffset-0",
+      numInputRows = 10L,
+      inputRowsPerSecond = 11.0,
+      processedRowsPerSecond = 12.0,
+      metrics = Map(
+        "metrics-00" -> "10",
+        "metrics-01" -> "11").asJava
+    )
+
+    val source1 = new SourceProgress(
+      description = "description-1",
+      startOffset = "startOffset-1",
+      endOffset = "endOffset-1",
+      latestOffset = "latestOffset-1",
+      numInputRows = 20L,
+      inputRowsPerSecond = 21.0,
+      processedRowsPerSecond = 22.0,
+      metrics = Map(
+        "metrics-10" -> "20",
+        "metrics-11" -> "21").asJava
+    )
+
+    val sink = new SinkProgress(
+      description = "sink-0",
+      numOutputRows = 30,
+      metrics = Map(
+        "metrics-20" -> "30",
+        "metrics-21" -> "31").asJava
+    )
+
+    val schema1 = new StructType()
+      .add("c1", "long")
+      .add("c2", "double")
+    val schema2 = new StructType()
+      .add("rc", "long")
+      .add("min_q", "string")
+      .add("max_q", "string")
+
+    val observedMetrics = Map[String, Row](
+      "event1" -> new GenericRowWithSchema(Array(1L, 3.0d), schema1),
+      "event2" -> new GenericRowWithSchema(Array(1L, "hello", "world"), schema2)
+    ).asJava
+
+    val progress = new StreamingQueryProgress(
+      id = UUID.randomUUID(),
+      runId = UUID.randomUUID(),
+      name = "name-1",
+      timestamp = "2023-01-03T09:14:04.175Z",
+      batchId = 1L,
+      batchDuration = 2L,
+      durationMs = Map(
+        "duration-0" -> JLong.valueOf("10"),
+        "duration-1" -> JLong.valueOf("11")).asJava,
+      eventTime = Map(
+        "eventTime-0" -> "20",
+        "eventTime-1" -> "21").asJava,
+      stateOperators = Array(stateOperatorProgress0, stateOperatorProgress1),
+      sources = Array(source0, source1),
+      sink = sink,
+      observedMetrics = observedMetrics
+    )
+    val input = new StreamingQueryProgressWrapper(progress)
+
+    // Do serialization and deserialization
+    val bytes = serializer.serialize(input)
+    val result = serializer.deserialize(bytes, classOf[StreamingQueryProgressWrapper])
+
+    // Assertion results
+    checkStreamingQueryProgress(progress, result.progress)
+  }
+
+  test("StreamingQueryProgressWrapper with null string fields") {
+    // Protobuf setters reject null, so null strings must be left unset and come back as null.
+    val source = new SourceProgress(
+      description = null,
+      startOffset = null,
+      endOffset = null,
+      latestOffset = null,
+      numInputRows = 0L,
+      inputRowsPerSecond = 0.0,
+      processedRowsPerSecond = 0.0,
+      metrics = Map.empty[String, String].asJava
+    )
+
+    val sink = new SinkProgress(
+      description = null,
+      numOutputRows = 0L,
+      metrics = Map.empty[String, String].asJava
+    )
+
+    val progress = new StreamingQueryProgress(
+      id = UUID.randomUUID(),
+      runId = UUID.randomUUID(),
+      name = null,
+      timestamp = "2023-01-03T09:14:04.175Z",
+      batchId = 0L,
+      batchDuration = 0L,
+      durationMs = Map.empty[String, JLong].asJava,
+      eventTime = Map.empty[String, String].asJava,
+      stateOperators = Array.empty[StateOperatorProgress],
+      sources = Array(source),
+      sink = sink,
+      observedMetrics = Map.empty[String, Row].asJava
+    )
+    val input = new StreamingQueryProgressWrapper(progress)
+
+    // Do serialization and deserialization
+    val bytes = serializer.serialize(input)
+    val result = serializer.deserialize(bytes, classOf[StreamingQueryProgressWrapper])
+
+    // Assertion results
+    checkStreamingQueryProgress(progress, result.progress)
+    assert(result.progress.name == null)
+    assert(result.progress.sources.head.startOffset == null)
+  }
+
+  /** Asserts that `actual` carries the same values as `expected` after a round trip. */
+  private def checkStreamingQueryProgress(
+      expected: StreamingQueryProgress,
+      actual: StreamingQueryProgress): Unit = {
+    assert(expected.id == actual.id)
+    assert(expected.runId == actual.runId)
+    assert(expected.name == actual.name)
+    assert(expected.timestamp == actual.timestamp)
+    assert(expected.batchId == actual.batchId)
+    assert(expected.batchDuration == actual.batchDuration)
+    assert(expected.durationMs == actual.durationMs)
+    assert(expected.eventTime == actual.eventTime)
+
+    // `zip` silently drops unmatched elements, so compare the lengths first.
+    assert(expected.stateOperators.length == actual.stateOperators.length)
+    expected.stateOperators.zip(actual.stateOperators).foreach {
+      case (o1, o2) =>
+        assert(o1.operatorName == o2.operatorName)
+        assert(o1.numRowsTotal == o2.numRowsTotal)
+        assert(o1.numRowsUpdated == o2.numRowsUpdated)
+        assert(o1.allUpdatesTimeMs == o2.allUpdatesTimeMs)
+        assert(o1.numRowsRemoved == o2.numRowsRemoved)
+        assert(o1.allRemovalsTimeMs == o2.allRemovalsTimeMs)
+        assert(o1.commitTimeMs == o2.commitTimeMs)
+        assert(o1.memoryUsedBytes == o2.memoryUsedBytes)
+        assert(o1.numRowsDroppedByWatermark == o2.numRowsDroppedByWatermark)
+        assert(o1.numShufflePartitions == o2.numShufflePartitions)
+        assert(o1.numStateStoreInstances == o2.numStateStoreInstances)
+        assert(o1.customMetrics == o2.customMetrics)
+    }
+
+    assert(expected.sources.length == actual.sources.length)
+    expected.sources.zip(actual.sources).foreach {
+      case (s1, s2) =>
+        assert(s1.description == s2.description)
+        assert(s1.startOffset == s2.startOffset)
+        assert(s1.endOffset == s2.endOffset)
+        assert(s1.latestOffset == s2.latestOffset)
+        assert(s1.numInputRows == s2.numInputRows)
+        assert(s1.inputRowsPerSecond == s2.inputRowsPerSecond)
+        assert(s1.processedRowsPerSecond == s2.processedRowsPerSecond)
+        assert(s1.metrics == s2.metrics)
+    }
+
+    assert(expected.sink.description == actual.sink.description)
+    assert(expected.sink.numOutputRows == actual.sink.numOutputRows)
+    assert(expected.sink.metrics == actual.sink.metrics)
+
+    val actualObservedMetrics = actual.observedMetrics
+    assert(expected.observedMetrics.size() == actualObservedMetrics.size())
+    assert(expected.observedMetrics.keySet() == actualObservedMetrics.keySet())
+    expected.observedMetrics.entrySet().forEach { e =>
+      assert(e.getValue == actualObservedMetrics.get(e.getKey))
+    }
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org