This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch branch-3.4
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.4 by this push:
     new 1a9cacb4625 [SPARK-41677][CORE][SQL][SS][UI] Add Protobuf serializer for `StreamingQueryProgressWrapper`
1a9cacb4625 is described below

commit 1a9cacb4625d461773cc7167958164d56a1b9349
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Wed Jan 25 01:01:48 2023 -0800

    [SPARK-41677][CORE][SQL][SS][UI] Add Protobuf serializer for `StreamingQueryProgressWrapper`

    ### What changes were proposed in this pull request?
    Add a Protobuf serializer for `StreamingQueryProgressWrapper`.

    ### Why are the changes needed?
    Support fast and compact serialization/deserialization for `StreamingQueryProgressWrapper` over RocksDB.

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    - Added a new UT
    - Manually tested the sql module with `LIVE_UI_LOCAL_STORE_DIR`; all tests passed:

    ```
    build/mvn clean install -DskipTests -pl sql/core -am
    export LIVE_UI_LOCAL_STORE_DIR=/tmp/spark-ui
    build/mvn clean install -pl sql/core
    ```

    There are 4 existing test suites that use `StreamingQueryProgressWrapper`:
    - StateStoreCoordinatorSuite
    - StreamingQueryStatusListenerWithDiskStoreSuite
    - UISeleniumSuite
    - UISeleniumWithRocksDBBackendSuite

    Closes #39642 from LuciferYang/SPARK-41677-2.

    Lead-authored-by: yangjie01 <yangji...@baidu.com>
    Co-authored-by: YangJie <yangji...@baidu.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
    (cherry picked from commit 7b93415836057107b9e296fe79cfd67565874551)
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../apache/spark/status/protobuf/store_types.proto |  51 ++++
 .../org/apache/spark/status/protobuf/Utils.scala   |   8 +
 .../org.apache.spark.status.protobuf.ProtobufSerDe |   1 +
 .../org/apache/spark/sql/streaming/progress.scala  |   8 +-
 .../ui/StreamingQueryStatusListener.scala          |   2 +-
 .../protobuf/sql/SinkProgressSerializer.scala      |  44 ++++
 .../protobuf/sql/SourceProgressSerializer.scala    |  64 +++++
 .../sql/StateOperatorProgressSerializer.scala      |  76 ++++++
 .../sql/StreamingQueryProgressSerializer.scala     | 104 +++++++++
 .../StreamingQueryProgressWrapperSerializer.scala  |  36 +++
 .../sql/KVStoreProtobufSerializerSuite.scala       | 259 ++++++++++++++++++++-
 11 files changed, 647 insertions(+), 6 deletions(-)
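For reference, a minimal sketch of the round trip the new serializer enables, mirroring the new `KVStoreProtobufSerializerSuite` test below. The `StreamingQueryProgress` and `SinkProgress` constructors are private to Spark packages, so this only compiles inside `org.apache.spark`, and all field values here are placeholders:

```
import java.util.UUID

import org.apache.spark.sql.streaming.{SinkProgress, StreamingQueryProgress}
import org.apache.spark.sql.streaming.ui.StreamingQueryProgressWrapper
import org.apache.spark.status.protobuf.sql.StreamingQueryProgressWrapperSerializer

// A minimal progress object: nullable string/map fields may be null and are
// encoded as absent optional fields / omitted map entries.
val progress = new StreamingQueryProgress(
  id = UUID.randomUUID(), runId = UUID.randomUUID(),
  name = null, timestamp = null, batchId = 1L, batchDuration = 2L,
  durationMs = null, eventTime = null,
  stateOperators = Array.empty, sources = Array.empty,
  sink = new SinkProgress(description = "sink-0", numOutputRows = 0L, metrics = null),
  observedMetrics = null)

val serde = new StreamingQueryProgressWrapperSerializer
// Wrapper -> compact Protobuf bytes: what the live UI store writes to RocksDB.
val bytes: Array[Byte] = serde.serialize(new StreamingQueryProgressWrapper(progress))
// Bytes -> wrapper: null strings come back as null, null maps as empty maps.
val restored = serde.deserialize(bytes).progress
assert(restored.batchId == progress.batchId && restored.name == null)
```

In normal operation the serializer is not called directly; it is discovered via the `META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe` registration added in this patch and invoked through `KVStoreProtobufSerializer`.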
diff --git a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
index aacf49bd401..c4f64f27e4a 100644
--- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
+++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
@@ -765,3 +765,54 @@ message PoolData {
   optional string name = 1;
   repeated int64 stage_ids = 2;
 }
+
+message StateOperatorProgress {
+  optional string operator_name = 1;
+  int64 num_rows_total = 2;
+  int64 num_rows_updated = 3;
+  int64 all_updates_time_ms = 4;
+  int64 num_rows_removed = 5;
+  int64 all_removals_time_ms = 6;
+  int64 commit_time_ms = 7;
+  int64 memory_used_bytes = 8;
+  int64 num_rows_dropped_by_watermark = 9;
+  int64 num_shuffle_partitions = 10;
+  int64 num_state_store_instances = 11;
+  map<string, int64> custom_metrics = 12;
+}
+
+message SourceProgress {
+  optional string description = 1;
+  optional string start_offset = 2;
+  optional string end_offset = 3;
+  optional string latest_offset = 4;
+  int64 num_input_rows = 5;
+  double input_rows_per_second = 6;
+  double processed_rows_per_second = 7;
+  map<string, string> metrics = 8;
+}
+
+message SinkProgress {
+  optional string description = 1;
+  int64 num_output_rows = 2;
+  map<string, string> metrics = 3;
+}
+
+message StreamingQueryProgress {
+  optional string id = 1;
+  optional string run_id = 2;
+  optional string name = 3;
+  optional string timestamp = 4;
+  int64 batch_id = 5;
+  int64 batch_duration = 6;
+  map<string, int64> duration_ms = 7;
+  map<string, string> event_time = 8;
+  repeated StateOperatorProgress state_operators = 9;
+  repeated SourceProgress sources = 10;
+  SinkProgress sink = 11;
+  map<string, string> observed_metrics = 12;
+}
+
+message StreamingQueryProgressWrapper {
+  StreamingQueryProgress progress = 1;
+}
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/Utils.scala b/core/src/main/scala/org/apache/spark/status/protobuf/Utils.scala
index 809f9647d72..cef6df3f569 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/Utils.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.status.protobuf
 
+import java.util.{Map => JMap}
+
 object Utils {
   def getOptional[T](condition: Boolean, result: () => T): Option[T] = if (condition) {
     Some(result())
@@ -35,4 +37,10 @@ object Utils {
     } else {
       null
     }
+
+  def setJMapField[K, V](input: JMap[K, V], putAllFunc: JMap[K, V] => Any): Unit = {
+    if (input != null && !input.isEmpty) {
+      putAllFunc(input)
+    }
+  }
 }
diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
index 7beff87d7ec..e907d559349 100644
--- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
+++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
@@ -18,3 +18,4 @@
 org.apache.spark.status.protobuf.sql.SQLExecutionUIDataSerializer
 org.apache.spark.status.protobuf.sql.SparkPlanGraphWrapperSerializer
 org.apache.spark.status.protobuf.sql.StreamingQueryDataSerializer
+org.apache.spark.status.protobuf.sql.StreamingQueryProgressWrapperSerializer
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
index 3d206e7780c..1b755ed70c6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.streaming.SinkProgress.DEFAULT_NUM_OUTPUT_ROWS
  * Information about updates made to stateful operators in a [[StreamingQuery]] during a trigger.
  */
 @Evolving
-class StateOperatorProgress private[sql](
+class StateOperatorProgress private[spark](
     val operatorName: String,
     val numRowsTotal: Long,
     val numRowsUpdated: Long,
@@ -125,7 +125,7 @@ class StateOperatorProgress private[sql](
  * @since 2.1.0
  */
 @Evolving
-class StreamingQueryProgress private[sql](
+class StreamingQueryProgress private[spark](
     val id: UUID,
     val runId: UUID,
     val name: String,
@@ -190,7 +190,7 @@ class StreamingQueryProgress private[sql](
  * @since 2.1.0
  */
 @Evolving
-class SourceProgress protected[sql](
+class SourceProgress protected[spark](
     val description: String,
     val startOffset: String,
     val endOffset: String,
@@ -236,7 +236,7 @@ class SourceProgress protected[sql](
  * @since 2.1.0
  */
 @Evolving
-class SinkProgress protected[sql](
+class SinkProgress protected[spark](
     val description: String,
     val numOutputRows: Long,
     val metrics: ju.Map[String, String] = Map[String, String]().asJava) extends Serializable {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
index 1bdc5e3f79a..c5ecdb6395a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
@@ -141,7 +141,7 @@ private[sql] case class StreamingQueryUIData(
   }
 }
 
-private[sql] class StreamingQueryProgressWrapper(val progress: StreamingQueryProgress) {
+private[spark] class StreamingQueryProgressWrapper(val progress: StreamingQueryProgress) {
   @JsonIgnore @KVIndex
   private val uniqueId: String = getUniqueId(progress.runId, progress.batchId, progress.timestamp)
diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SinkProgressSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SinkProgressSerializer.scala
new file mode 100644
index 00000000000..eb68a487e2a
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SinkProgressSerializer.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf.sql
+
+import java.util.{HashMap => JHashMap}
+
+import org.apache.spark.sql.streaming.SinkProgress
+import org.apache.spark.status.protobuf.StoreTypes
+import org.apache.spark.status.protobuf.Utils.{getStringField, setStringField}
+
+private[protobuf] object SinkProgressSerializer {
+
+  def serialize(sink: SinkProgress): StoreTypes.SinkProgress = {
+    import org.apache.spark.status.protobuf.Utils.setJMapField
+    val builder = StoreTypes.SinkProgress.newBuilder()
+    setStringField(sink.description, builder.setDescription)
+    builder.setNumOutputRows(sink.numOutputRows)
+    setJMapField(sink.metrics, builder.putAllMetrics)
+    builder.build()
+  }
+
+  def deserialize(sink: StoreTypes.SinkProgress): SinkProgress = {
+    new SinkProgress(
+      description = getStringField(sink.hasDescription, () => sink.getDescription),
+      numOutputRows = sink.getNumOutputRows,
+      metrics = new JHashMap(sink.getMetricsMap)
+    )
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SourceProgressSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SourceProgressSerializer.scala
new file mode 100644
index 00000000000..9f3dd1af8f2
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SourceProgressSerializer.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf.sql
+
+import java.util.{HashMap => JHashMap, List => JList}
+
+import org.apache.spark.sql.streaming.SourceProgress
+import org.apache.spark.status.protobuf.StoreTypes
+import org.apache.spark.status.protobuf.Utils.{getStringField, setJMapField, setStringField}
+
+private[protobuf] object SourceProgressSerializer {
+
+  def serialize(source: SourceProgress): StoreTypes.SourceProgress = {
+    val builder = StoreTypes.SourceProgress.newBuilder()
+    setStringField(source.description, builder.setDescription)
+    setStringField(source.startOffset, builder.setStartOffset)
+    setStringField(source.endOffset, builder.setEndOffset)
+    setStringField(source.latestOffset, builder.setLatestOffset)
+    builder.setNumInputRows(source.numInputRows)
+    builder.setInputRowsPerSecond(source.inputRowsPerSecond)
+    builder.setProcessedRowsPerSecond(source.processedRowsPerSecond)
+    setJMapField(source.metrics, builder.putAllMetrics)
+    builder.build()
+  }
+
+  def deserializeToArray(sourceList: JList[StoreTypes.SourceProgress]): Array[SourceProgress] = {
+    val size = sourceList.size()
+    val result = new Array[SourceProgress](size)
+    var i = 0
+    while (i < size) {
+      result(i) = deserialize(sourceList.get(i))
+      i += 1
+    }
+    result
+  }
+
+  private def deserialize(source: StoreTypes.SourceProgress): SourceProgress = {
+    new SourceProgress(
+      description = getStringField(source.hasDescription, () => source.getDescription),
+      startOffset = getStringField(source.hasStartOffset, () => source.getStartOffset),
+      endOffset = getStringField(source.hasEndOffset, () => source.getEndOffset),
+      latestOffset = getStringField(source.hasLatestOffset, () => source.getLatestOffset),
+      numInputRows = source.getNumInputRows,
+      inputRowsPerSecond = source.getInputRowsPerSecond,
+      processedRowsPerSecond = source.getProcessedRowsPerSecond,
+      metrics = new JHashMap(source.getMetricsMap)
+    )
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StateOperatorProgressSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StateOperatorProgressSerializer.scala
new file mode 100644
index 00000000000..8b66e8e289b
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StateOperatorProgressSerializer.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf.sql
+
+import java.util.{HashMap => JHashMap, List => JList}
+
+import org.apache.spark.sql.streaming.StateOperatorProgress
+import org.apache.spark.status.protobuf.StoreTypes
+import org.apache.spark.status.protobuf.Utils.{getStringField, setStringField}
+
+object StateOperatorProgressSerializer {
+
+  def serialize(stateOperator: StateOperatorProgress): StoreTypes.StateOperatorProgress = {
+    import org.apache.spark.status.protobuf.Utils.setJMapField
+    val builder = StoreTypes.StateOperatorProgress.newBuilder()
+    setStringField(stateOperator.operatorName, builder.setOperatorName)
+    builder.setNumRowsTotal(stateOperator.numRowsTotal)
+    builder.setNumRowsUpdated(stateOperator.numRowsUpdated)
+    builder.setAllUpdatesTimeMs(stateOperator.allUpdatesTimeMs)
+    builder.setNumRowsRemoved(stateOperator.numRowsRemoved)
+    builder.setAllRemovalsTimeMs(stateOperator.allRemovalsTimeMs)
+    builder.setCommitTimeMs(stateOperator.commitTimeMs)
+    builder.setMemoryUsedBytes(stateOperator.memoryUsedBytes)
+    builder.setNumRowsDroppedByWatermark(stateOperator.numRowsDroppedByWatermark)
+    builder.setNumShufflePartitions(stateOperator.numShufflePartitions)
+    builder.setNumStateStoreInstances(stateOperator.numStateStoreInstances)
+    setJMapField(stateOperator.customMetrics, builder.putAllCustomMetrics)
+    builder.build()
+  }
+
+  def deserializeToArray(
+      stateOperatorList: JList[StoreTypes.StateOperatorProgress]): Array[StateOperatorProgress] = {
+    val size = stateOperatorList.size()
+    val result = new Array[StateOperatorProgress](size)
+    var i = 0
+    while (i < size) {
+      result(i) = deserialize(stateOperatorList.get(i))
+      i += 1
+    }
+    result
+  }
+
+  private def deserialize(
+      stateOperator: StoreTypes.StateOperatorProgress): StateOperatorProgress = {
+    new StateOperatorProgress(
+      operatorName =
+        getStringField(stateOperator.hasOperatorName, () => stateOperator.getOperatorName),
+      numRowsTotal = stateOperator.getNumRowsTotal,
+      numRowsUpdated = stateOperator.getNumRowsUpdated,
+      allUpdatesTimeMs = stateOperator.getAllUpdatesTimeMs,
+      numRowsRemoved = stateOperator.getNumRowsRemoved,
+      allRemovalsTimeMs = stateOperator.getAllRemovalsTimeMs,
+      commitTimeMs = stateOperator.getCommitTimeMs,
+      memoryUsedBytes = stateOperator.getMemoryUsedBytes,
+      numRowsDroppedByWatermark = stateOperator.getNumRowsDroppedByWatermark,
+      numShufflePartitions = stateOperator.getNumShufflePartitions,
+      numStateStoreInstances = stateOperator.getNumStateStoreInstances,
+      customMetrics = new JHashMap(stateOperator.getCustomMetricsMap)
+    )
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryProgressSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryProgressSerializer.scala
new file mode 100644
index 00000000000..fc0fd5fa477
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryProgressSerializer.scala
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf.sql
+
+import java.util.{HashMap => JHashMap, Map => JMap, UUID}
+
+import com.fasterxml.jackson.databind.json.JsonMapper
+import com.fasterxml.jackson.module.scala.DefaultScalaModule
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.streaming.StreamingQueryProgress
+import org.apache.spark.status.protobuf.StoreTypes
+import org.apache.spark.status.protobuf.Utils.{getStringField, setJMapField, setStringField}
+
+private[protobuf] object StreamingQueryProgressSerializer {
+
+  private val mapper: JsonMapper = JsonMapper.builder()
+    .addModule(DefaultScalaModule)
+    .build()
+
+  def serialize(process: StreamingQueryProgress): StoreTypes.StreamingQueryProgress = {
+    val builder = StoreTypes.StreamingQueryProgress.newBuilder()
+    if (process.id != null) {
+      builder.setId(process.id.toString)
+    }
+    if (process.runId != null) {
+      builder.setRunId(process.runId.toString)
+    }
+    setStringField(process.name, builder.setName)
+    setStringField(process.timestamp, builder.setTimestamp)
+    builder.setBatchId(process.batchId)
+    builder.setBatchDuration(process.batchDuration)
+    setJMapField(process.durationMs, builder.putAllDurationMs)
+    setJMapField(process.eventTime, builder.putAllEventTime)
+    process.stateOperators.foreach(
+      s => builder.addStateOperators(StateOperatorProgressSerializer.serialize(s)))
+    process.sources.foreach(
+      s => builder.addSources(SourceProgressSerializer.serialize(s))
+    )
+    builder.setSink(SinkProgressSerializer.serialize(process.sink))
+    setJMapField(process.observedMetrics, putAllObservedMetrics(builder, _))
+    builder.build()
+  }
+
+  def deserialize(process: StoreTypes.StreamingQueryProgress): StreamingQueryProgress = {
+    val id = if (process.hasId) {
+      UUID.fromString(process.getId)
+    } else null
+    val runId = if (process.hasRunId) {
+      UUID.fromString(process.getRunId)
+    } else null
+    new StreamingQueryProgress(
+      id = id,
+      runId = runId,
+      name = getStringField(process.hasName, () => process.getName),
+      timestamp = getStringField(process.hasTimestamp, () => process.getTimestamp),
+      batchId = process.getBatchId,
+      batchDuration = process.getBatchDuration,
+      durationMs = new JHashMap(process.getDurationMsMap),
+      eventTime = new JHashMap(process.getEventTimeMap),
+      stateOperators =
+        StateOperatorProgressSerializer.deserializeToArray(process.getStateOperatorsList),
+      sources = SourceProgressSerializer.deserializeToArray(process.getSourcesList),
+      sink = SinkProgressSerializer.deserialize(process.getSink),
+      observedMetrics = convertToObservedMetrics(process.getObservedMetricsMap)
+    )
+  }
+
+  private def putAllObservedMetrics(
+      builder: StoreTypes.StreamingQueryProgress.Builder,
+      observedMetrics: JMap[String, Row]): Unit = {
+    // Encode each Row as JSON so it can be carried as a string in protobuf; this
+    // is simpler than defining a message type corresponding to Row in protobuf.
+    observedMetrics.forEach {
+      case (k, v) => builder.putObservedMetrics(k, mapper.writeValueAsString(v))
+    }
+  }
+
+  private def convertToObservedMetrics(input: JMap[String, String]): JHashMap[String, Row] = {
+    val observedMetrics = new JHashMap[String, Row](input.size())
+    val classType = classOf[GenericRowWithSchema]
+    input.forEach {
+      case (k, v) =>
+        observedMetrics.put(k, mapper.readValue(v, classType))
+    }
+    observedMetrics
+  }
+}
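The JSON round trip that `putAllObservedMetrics` and `convertToObservedMetrics` rely on can be sketched in isolation; the schema and field values below are invented for illustration:

```
import com.fasterxml.jackson.databind.json.JsonMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule

import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types.StructType

val mapper = JsonMapper.builder().addModule(DefaultScalaModule).build()

// An observed-metrics entry: a row of values plus its schema.
val schema = new StructType().add("rc", "long").add("max_q", "string")
val row = new GenericRowWithSchema(Array(1L, "q99"), schema)

// Row -> JSON string, the value stored in the map<string, string> observed_metrics field.
val json = mapper.writeValueAsString(row)

// JSON string -> Row; targeting GenericRowWithSchema restores values and schema.
val restored = mapper.readValue(json, classOf[GenericRowWithSchema])
assert(restored == row)
```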
diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryProgressWrapperSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryProgressWrapperSerializer.scala
new file mode 100644
index 00000000000..21a0adc26da
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryProgressWrapperSerializer.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf.sql
+
+import org.apache.spark.sql.streaming.ui.StreamingQueryProgressWrapper
+import org.apache.spark.status.protobuf.{ProtobufSerDe, StoreTypes}
+
+class StreamingQueryProgressWrapperSerializer extends ProtobufSerDe[StreamingQueryProgressWrapper] {
+
+  override def serialize(data: StreamingQueryProgressWrapper): Array[Byte] = {
+    val builder = StoreTypes.StreamingQueryProgressWrapper.newBuilder()
+    builder.setProgress(StreamingQueryProgressSerializer.serialize(data.progress))
+    builder.build().toByteArray
+  }
+
+  override def deserialize(bytes: Array[Byte]): StreamingQueryProgressWrapper = {
+    val processWrapper = StoreTypes.StreamingQueryProgressWrapper.parseFrom(bytes)
+    new StreamingQueryProgressWrapper(
+      StreamingQueryProgressSerializer.deserialize(processWrapper.getProgress))
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
index 3c2d2533275..16f5897d2b8 100644
--- a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
@@ -17,11 +17,18 @@
 
 package org.apache.spark.status.protobuf.sql
 
+import java.lang.{Long => JLong}
 import java.util.UUID
 
+import scala.collection.JavaConverters._
+
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
 import org.apache.spark.sql.execution.ui._
-import org.apache.spark.sql.streaming.ui.StreamingQueryData
+import org.apache.spark.sql.streaming.{SinkProgress, SourceProgress, StateOperatorProgress, StreamingQueryProgress}
+import org.apache.spark.sql.streaming.ui.{StreamingQueryData, StreamingQueryProgressWrapper}
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.status.api.v1.sql.SqlResourceSuite
 import org.apache.spark.status.protobuf.KVStoreProtobufSerializer
@@ -271,4 +278,254 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
       assert(result.endTimestamp == input.endTimestamp)
     }
   }
+
+  test("StreamingQueryProgressWrapper") {
+    val normalInput = {
+      val stateOperatorProgress0 = new StateOperatorProgress(
+        operatorName = "op-0",
+        numRowsTotal = 1L,
+        numRowsUpdated = 2L,
+        allUpdatesTimeMs = 3L,
+        numRowsRemoved = 4L,
+        allRemovalsTimeMs = 5L,
+        commitTimeMs = 6L,
+        memoryUsedBytes = 7L,
+        numRowsDroppedByWatermark = 8L,
+        numShufflePartitions = 9L,
+        numStateStoreInstances = 10L,
+        customMetrics = Map(
+          "custom-metrics-00" -> JLong.valueOf("10"),
+          "custom-metrics-01" -> JLong.valueOf("11")).asJava
+      )
+      val stateOperatorProgress1 = new StateOperatorProgress(
+        operatorName = null,
+        numRowsTotal = 11L,
+        numRowsUpdated = 12L,
+        allUpdatesTimeMs = 13L,
+        numRowsRemoved = 14L,
+        allRemovalsTimeMs = 15L,
+        commitTimeMs = 16L,
+        memoryUsedBytes = 17L,
+        numRowsDroppedByWatermark = 18L,
+        numShufflePartitions = 19L,
+        numStateStoreInstances = 20L,
+        customMetrics = Map(
+          "custom-metrics-10" -> JLong.valueOf("20"),
+          "custom-metrics-11" -> JLong.valueOf("21")).asJava
+      )
+      val source0 = new SourceProgress(
+        description = "description-0",
+        startOffset = "startOffset-0",
+        endOffset = "endOffset-0",
+        latestOffset = "latestOffset-0",
+        numInputRows = 10L,
+        inputRowsPerSecond = 11.0,
+        processedRowsPerSecond = 12.0,
+        metrics = Map(
+          "metrics-00" -> "10",
+          "metrics-01" -> "11").asJava
+      )
+      val source1 = new SourceProgress(
+        description = "description-1",
+        startOffset = "startOffset-1",
+        endOffset = "endOffset-1",
+        latestOffset = "latestOffset-1",
+        numInputRows = 20L,
+        inputRowsPerSecond = 21.0,
+        processedRowsPerSecond = 22.0,
+        metrics = Map(
+          "metrics-10" -> "20",
+          "metrics-11" -> "21").asJava
+      )
+      val sink = new SinkProgress(
+        description = "sink-0",
+        numOutputRows = 30,
+        metrics = Map(
+          "metrics-20" -> "30",
+          "metrics-21" -> "31").asJava
+      )
+      val schema1 = new StructType()
+        .add("c1", "long")
+        .add("c2", "double")
+      val schema2 = new StructType()
+        .add("rc", "long")
+        .add("min_q", "string")
+        .add("max_q", "string")
+      val observedMetrics = Map[String, Row](
+        "event1" -> new GenericRowWithSchema(Array(1L, 3.0d), schema1),
+        "event2" -> new GenericRowWithSchema(Array(1L, "hello", "world"), schema2)
+      ).asJava
+      val progress = new StreamingQueryProgress(
+        id = UUID.randomUUID(),
+        runId = UUID.randomUUID(),
+        name = "name-1",
+        timestamp = "2023-01-03T09:14:04.175Z",
+        batchId = 1L,
+        batchDuration = 2L,
+        durationMs = Map(
+          "duration-0" -> JLong.valueOf("10"),
+          "duration-1" -> JLong.valueOf("11")).asJava,
+        eventTime = Map(
+          "eventTime-0" -> "20",
+          "eventTime-1" -> "21").asJava,
+        stateOperators = Array(stateOperatorProgress0, stateOperatorProgress1),
+        sources = Array(source0, source1),
+        sink = sink,
+        observedMetrics = observedMetrics
+      )
+      new StreamingQueryProgressWrapper(progress)
+    }
+
+    val withNullInput = {
+      val stateOperatorProgress0 = new StateOperatorProgress(
+        operatorName = null,
+        numRowsTotal = 1L,
+        numRowsUpdated = 2L,
+        allUpdatesTimeMs = 3L,
+        numRowsRemoved = 4L,
+        allRemovalsTimeMs = 5L,
+        commitTimeMs = 6L,
+        memoryUsedBytes = 7L,
+        numRowsDroppedByWatermark = 8L,
+        numShufflePartitions = 9L,
+        numStateStoreInstances = 10L,
+        customMetrics = null
+      )
+      val stateOperatorProgress1 = new StateOperatorProgress(
+        operatorName = null,
+        numRowsTotal = 11L,
+        numRowsUpdated = 12L,
+        allUpdatesTimeMs = 13L,
+        numRowsRemoved = 14L,
+        allRemovalsTimeMs = 15L,
+        commitTimeMs = 16L,
+        memoryUsedBytes = 17L,
+        numRowsDroppedByWatermark = 18L,
+        numShufflePartitions = 19L,
+        numStateStoreInstances = 20L,
+        customMetrics = null
+      )
+      val source0 = new SourceProgress(
+        description = null,
+        startOffset = null,
+        endOffset = null,
+        latestOffset = null,
+        numInputRows = 10L,
+        inputRowsPerSecond = 11.0,
+        processedRowsPerSecond = 12.0,
+        metrics = null
+      )
+      val source1 = new SourceProgress(
+        description = null,
+        startOffset = null,
+        endOffset = null,
+        latestOffset = null,
+        numInputRows = 10L,
+        inputRowsPerSecond = 11.0,
+        processedRowsPerSecond = 12.0,
+        metrics = null
+      )
+      val sink = new SinkProgress(
+        description = null,
+        numOutputRows = 30,
+        metrics = null
+      )
+      val progress = new StreamingQueryProgress(
+        id = null,
+        runId = null,
+        name = null,
+        timestamp = null,
+        batchId = 1L,
+        batchDuration = 2L,
+        durationMs = null,
+        eventTime = null,
+        stateOperators = Array(stateOperatorProgress0, stateOperatorProgress1),
+        sources = Array(source0, source1),
+        sink = sink,
+        observedMetrics = null
+      )
+      new StreamingQueryProgressWrapper(progress)
+    }
+
+    Seq((false, normalInput), (true, withNullInput)).foreach { case (hasNullValue, input) =>
+      // Do serialization and deserialization
+      val bytes = serializer.serialize(input)
+      val result = serializer.deserialize(bytes, classOf[StreamingQueryProgressWrapper])
+
+      // Assertion results
+      val progress = input.progress
+      val resultProcess = result.progress
+      assert(progress.id == resultProcess.id)
+      assert(progress.runId == resultProcess.runId)
+      assert(progress.name == resultProcess.name)
+      assert(progress.timestamp == resultProcess.timestamp)
+      assert(progress.batchId == resultProcess.batchId)
+      assert(progress.batchDuration == resultProcess.batchDuration)
+      if (hasNullValue) {
+        assert(resultProcess.durationMs.isEmpty)
+        assert(resultProcess.eventTime.isEmpty)
+      } else {
+        assert(progress.durationMs == resultProcess.durationMs)
+        assert(progress.eventTime == resultProcess.eventTime)
+      }
+
+      progress.stateOperators.zip(resultProcess.stateOperators).foreach {
+        case (o1, o2) =>
+          assert(o1.operatorName == o2.operatorName)
+          assert(o1.numRowsTotal == o2.numRowsTotal)
+          assert(o1.numRowsUpdated == o2.numRowsUpdated)
+          assert(o1.allUpdatesTimeMs == o2.allUpdatesTimeMs)
+          assert(o1.numRowsRemoved == o2.numRowsRemoved)
+          assert(o1.allRemovalsTimeMs == o2.allRemovalsTimeMs)
+          assert(o1.commitTimeMs == o2.commitTimeMs)
+          assert(o1.memoryUsedBytes == o2.memoryUsedBytes)
+          assert(o1.numRowsDroppedByWatermark == o2.numRowsDroppedByWatermark)
+          assert(o1.numShufflePartitions == o2.numShufflePartitions)
+          assert(o1.numStateStoreInstances == o2.numStateStoreInstances)
+          if (hasNullValue) {
+            assert(o2.customMetrics.isEmpty)
+          } else {
+            assert(o1.customMetrics == o2.customMetrics)
+          }
+      }
+
+      progress.sources.zip(resultProcess.sources).foreach {
+        case (s1, s2) =>
+          assert(s1.description == s2.description)
+          assert(s1.startOffset == s2.startOffset)
+          assert(s1.endOffset == s2.endOffset)
+          assert(s1.latestOffset == s2.latestOffset)
+          assert(s1.numInputRows == s2.numInputRows)
+          assert(s1.inputRowsPerSecond == s2.inputRowsPerSecond)
+          assert(s1.processedRowsPerSecond == s2.processedRowsPerSecond)
+          if (hasNullValue) {
+            assert(s2.metrics.isEmpty)
+          } else {
+            assert(s1.metrics == s2.metrics)
+          }
+      }
+
+      Seq(progress.sink).zip(Seq(resultProcess.sink)).foreach {
+        case (s1, s2) =>
+          assert(s1.description == s2.description)
+          assert(s1.numOutputRows == s2.numOutputRows)
+          if (hasNullValue) {
+            assert(s2.metrics.isEmpty)
+          } else {
+            assert(s1.metrics == s2.metrics)
+          }
+      }
+
+      val resultObservedMetrics = resultProcess.observedMetrics
+      if (hasNullValue) {
+        assert(resultObservedMetrics.isEmpty)
+      } else {
+        assert(progress.observedMetrics.size() == resultObservedMetrics.size())
+        assert(progress.observedMetrics.keySet() == resultObservedMetrics.keySet())
+        progress.observedMetrics.entrySet().forEach { e =>
+          assert(e.getValue == resultObservedMetrics.get(e.getKey))
+        }
+      }
+    }
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org