This is an automated email from the ASF dual-hosted git repository. gengliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 3a3bc77f3de Revert "[SPARK-41677][CORE][SQL][SS][UI] Add Protobuf serializer for StreamingQueryProgressWrapper" 3a3bc77f3de is described below commit 3a3bc77f3dea368ca0b434a3f8a9629b5d69a5ca Author: Gengliang Wang <gengli...@apache.org> AuthorDate: Thu Jan 5 20:28:55 2023 -0800 Revert "[SPARK-41677][CORE][SQL][SS][UI] Add Protobuf serializer for StreamingQueryProgressWrapper" ### What changes were proposed in this pull request? This reverts commit 915e9c67a9581a1f66e70321879092d854c9fb3b. ### Why are the changes needed? When running end-to-end tests, there are 5 NPE errors from string fields: - SourceProgress.latestOffset - SourceProgress.endOffset - SourceProgress.startOffset - StreamingQueryData.name - StreamingQueryProgress.name After fixing them, there is the following error: ``` java.lang.UnsupportedOperationException at java.base/java.util.Collections$UnmodifiableMap.remove(Collections.java:1460) at org.apache.spark.sql.streaming.ui.StreamingQueryStatisticsPage.$anonfun$generateStatTable$27(StreamingQueryStatisticsPage.scala:401) ``` The deserialized map `StreamingQueryProgress.durationMs` needs to be mutable. Given that StreamingQueryProgressWrapper contains nullable fields and a mutable map, I suggest using the default JSON serializer for this class. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? GA tests Closes #39416 from gengliangwang/revertSS. 
Authored-by: Gengliang Wang <gengli...@apache.org> Signed-off-by: Gengliang Wang <gengli...@apache.org> --- .../apache/spark/status/protobuf/store_types.proto | 51 ------- .../org.apache.spark.status.protobuf.ProtobufSerDe | 1 - .../org/apache/spark/sql/streaming/progress.scala | 8 +- .../ui/StreamingQueryStatusListener.scala | 2 +- .../protobuf/sql/SinkProgressSerializer.scala | 42 ----- .../protobuf/sql/SourceProgressSerializer.scala | 65 -------- .../sql/StateOperatorProgressSerializer.scala | 75 --------- .../sql/StreamingQueryProgressSerializer.scala | 89 ----------- .../StreamingQueryProgressWrapperSerializer.scala | 40 ----- .../sql/KVStoreProtobufSerializerSuite.scala | 170 +-------------------- 10 files changed, 6 insertions(+), 537 deletions(-) diff --git a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto index 1c3e5bfc49a..499fda34174 100644 --- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto +++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto @@ -686,54 +686,3 @@ message ExecutorPeakMetricsDistributions { repeated double quantiles = 1; repeated ExecutorMetrics executor_metrics = 2; } - -message StateOperatorProgress { - string operator_name = 1; - int64 num_rows_total = 2; - int64 num_rows_updated = 3; - int64 all_updates_time_ms = 4; - int64 num_rows_removed = 5; - int64 all_removals_time_ms = 6; - int64 commit_time_ms = 7; - int64 memory_used_bytes = 8; - int64 num_rows_dropped_by_watermark = 9; - int64 num_shuffle_partitions = 10; - int64 num_state_store_instances = 11; - map<string, int64> custom_metrics = 12; -} - -message SourceProgress { - string description = 1; - string start_offset = 2; - string end_offset = 3; - string latest_offset = 4; - int64 num_input_rows = 5; - double input_rows_per_second = 6; - double processed_rows_per_second = 7; - map<string, string> metrics = 8; -} - -message 
SinkProgress { - string description = 1; - int64 num_output_rows = 2; - map<string, string> metrics = 3; -} - -message StreamingQueryProgress { - string id = 1; - string run_id = 2; - string name = 3; - string timestamp = 4; - int64 batch_id = 5; - int64 batch_duration = 6; - map<string, int64> duration_ms = 7; - map<string, string> event_time = 8; - repeated StateOperatorProgress state_operators = 9; - repeated SourceProgress sources = 10; - SinkProgress sink = 11; - map<string, string> observed_metrics = 12; -} - -message StreamingQueryProgressWrapper { - StreamingQueryProgress progress = 1; -} diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe index e907d559349..7beff87d7ec 100644 --- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe +++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe @@ -18,4 +18,3 @@ org.apache.spark.status.protobuf.sql.SQLExecutionUIDataSerializer org.apache.spark.status.protobuf.sql.SparkPlanGraphWrapperSerializer org.apache.spark.status.protobuf.sql.StreamingQueryDataSerializer -org.apache.spark.status.protobuf.sql.StreamingQueryProgressWrapperSerializer diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala index 1b755ed70c6..3d206e7780c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala @@ -40,7 +40,7 @@ import org.apache.spark.sql.streaming.SinkProgress.DEFAULT_NUM_OUTPUT_ROWS * Information about updates made to stateful operators in a [[StreamingQuery]] during a trigger. 
*/ @Evolving -class StateOperatorProgress private[spark]( +class StateOperatorProgress private[sql]( val operatorName: String, val numRowsTotal: Long, val numRowsUpdated: Long, @@ -125,7 +125,7 @@ class StateOperatorProgress private[spark]( * @since 2.1.0 */ @Evolving -class StreamingQueryProgress private[spark]( +class StreamingQueryProgress private[sql]( val id: UUID, val runId: UUID, val name: String, @@ -190,7 +190,7 @@ class StreamingQueryProgress private[spark]( * @since 2.1.0 */ @Evolving -class SourceProgress protected[spark]( +class SourceProgress protected[sql]( val description: String, val startOffset: String, val endOffset: String, @@ -236,7 +236,7 @@ class SourceProgress protected[spark]( * @since 2.1.0 */ @Evolving -class SinkProgress protected[spark]( +class SinkProgress protected[sql]( val description: String, val numOutputRows: Long, val metrics: ju.Map[String, String] = Map[String, String]().asJava) extends Serializable { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala index c5ecdb6395a..1bdc5e3f79a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala @@ -141,7 +141,7 @@ private[sql] case class StreamingQueryUIData( } } -private[spark] class StreamingQueryProgressWrapper(val progress: StreamingQueryProgress) { +private[sql] class StreamingQueryProgressWrapper(val progress: StreamingQueryProgress) { @JsonIgnore @KVIndex private val uniqueId: String = getUniqueId(progress.runId, progress.batchId, progress.timestamp) diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SinkProgressSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SinkProgressSerializer.scala deleted file mode 100644 index 
66f8e4942bf..00000000000 --- a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SinkProgressSerializer.scala +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.status.protobuf.sql - -import org.apache.spark.sql.streaming.SinkProgress -import org.apache.spark.status.protobuf.StoreTypes - -private[protobuf] object SinkProgressSerializer { - - def serialize(sink: SinkProgress): StoreTypes.SinkProgress = { - val builder = StoreTypes.SinkProgress.newBuilder() - builder.setDescription(sink.description) - builder.setNumOutputRows(sink.numOutputRows) - sink.metrics.forEach { - case (k, v) => builder.putMetrics(k, v) - } - builder.build() - } - - def deserialize(sink: StoreTypes.SinkProgress): SinkProgress = { - new SinkProgress( - description = sink.getDescription, - numOutputRows = sink.getNumOutputRows, - metrics = sink.getMetricsMap - ) - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SourceProgressSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SourceProgressSerializer.scala deleted file mode 100644 index 4d53c7bbc7b..00000000000 --- 
a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SourceProgressSerializer.scala +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.status.protobuf.sql - -import java.util.{List => JList} - -import org.apache.spark.sql.streaming.SourceProgress -import org.apache.spark.status.protobuf.StoreTypes - -private[protobuf] object SourceProgressSerializer { - - def serialize(source: SourceProgress): StoreTypes.SourceProgress = { - val builder = StoreTypes.SourceProgress.newBuilder() - builder.setDescription(source.description) - builder.setStartOffset(source.startOffset) - builder.setEndOffset(source.endOffset) - builder.setLatestOffset(source.latestOffset) - builder.setNumInputRows(source.numInputRows) - builder.setInputRowsPerSecond(source.inputRowsPerSecond) - builder.setProcessedRowsPerSecond(source.processedRowsPerSecond) - source.metrics.forEach { - case (k, v) => builder.putMetrics(k, v) - } - builder.build() - } - - def deserializeToArray(sourceList: JList[StoreTypes.SourceProgress]): Array[SourceProgress] = { - val size = sourceList.size() - val result = new Array[SourceProgress](size) - var i = 0 - while (i < size) { - result(i) = 
deserialize(sourceList.get(i)) - i += 1 - } - result - } - - private def deserialize(source: StoreTypes.SourceProgress): SourceProgress = { - new SourceProgress( - description = source.getDescription, - startOffset = source.getStartOffset, - endOffset = source.getEndOffset, - latestOffset = source.getLatestOffset, - numInputRows = source.getNumInputRows, - inputRowsPerSecond = source.getInputRowsPerSecond, - processedRowsPerSecond = source.getProcessedRowsPerSecond, - metrics = source.getMetricsMap - ) - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StateOperatorProgressSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StateOperatorProgressSerializer.scala deleted file mode 100644 index 6831c044c82..00000000000 --- a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StateOperatorProgressSerializer.scala +++ /dev/null @@ -1,75 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.status.protobuf.sql - -import java.util.{List => JList} - -import org.apache.spark.sql.streaming.StateOperatorProgress -import org.apache.spark.status.protobuf.StoreTypes - -object StateOperatorProgressSerializer { - - def serialize(stateOperator: StateOperatorProgress): StoreTypes.StateOperatorProgress = { - val builder = StoreTypes.StateOperatorProgress.newBuilder() - builder.setOperatorName(stateOperator.operatorName) - builder.setNumRowsTotal(stateOperator.numRowsTotal) - builder.setNumRowsUpdated(stateOperator.numRowsUpdated) - builder.setAllUpdatesTimeMs(stateOperator.allUpdatesTimeMs) - builder.setNumRowsRemoved(stateOperator.numRowsRemoved) - builder.setAllRemovalsTimeMs(stateOperator.allRemovalsTimeMs) - builder.setCommitTimeMs(stateOperator.commitTimeMs) - builder.setMemoryUsedBytes(stateOperator.memoryUsedBytes) - builder.setNumRowsDroppedByWatermark(stateOperator.numRowsDroppedByWatermark) - builder.setNumShufflePartitions(stateOperator.numShufflePartitions) - builder.setNumStateStoreInstances(stateOperator.numStateStoreInstances) - stateOperator.customMetrics.forEach { - case (k, v) => builder.putCustomMetrics(k, v) - } - builder.build() - } - - def deserializeToArray( - stateOperatorList: JList[StoreTypes.StateOperatorProgress]): Array[StateOperatorProgress] = { - val size = stateOperatorList.size() - val result = new Array[StateOperatorProgress](size) - var i = 0 - while (i < size) { - result(i) = deserialize(stateOperatorList.get(i)) - i += 1 - } - result - } - - private def deserialize( - stateOperator: StoreTypes.StateOperatorProgress): StateOperatorProgress = { - new StateOperatorProgress( - operatorName = stateOperator.getOperatorName, - numRowsTotal = stateOperator.getNumRowsTotal, - numRowsUpdated = stateOperator.getNumRowsUpdated, - allUpdatesTimeMs = stateOperator.getAllUpdatesTimeMs, - numRowsRemoved = stateOperator.getNumRowsRemoved, - allRemovalsTimeMs = stateOperator.getAllRemovalsTimeMs, - commitTimeMs = 
stateOperator.getCommitTimeMs, - memoryUsedBytes = stateOperator.getMemoryUsedBytes, - numRowsDroppedByWatermark = stateOperator.getNumRowsDroppedByWatermark, - numShufflePartitions = stateOperator.getNumShufflePartitions, - numStateStoreInstances = stateOperator.getNumStateStoreInstances, - customMetrics = stateOperator.getCustomMetricsMap - ) - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryProgressSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryProgressSerializer.scala deleted file mode 100644 index 32d62a18ca4..00000000000 --- a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryProgressSerializer.scala +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.status.protobuf.sql - -import java.util.{HashMap => JHashMap, Map => JMap, UUID} - -import com.fasterxml.jackson.databind.json.JsonMapper -import com.fasterxml.jackson.module.scala.DefaultScalaModule - -import org.apache.spark.sql.Row -import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema -import org.apache.spark.sql.streaming.StreamingQueryProgress -import org.apache.spark.status.protobuf.StoreTypes - -private[protobuf] object StreamingQueryProgressSerializer { - - private val mapper: JsonMapper = JsonMapper.builder() - .addModule(DefaultScalaModule) - .build() - - def serialize(process: StreamingQueryProgress): StoreTypes.StreamingQueryProgress = { - val builder = StoreTypes.StreamingQueryProgress.newBuilder() - builder.setId(process.id.toString) - builder.setRunId(process.runId.toString) - builder.setName(process.name) - builder.setTimestamp(process.timestamp) - builder.setBatchId(process.batchId) - builder.setBatchDuration(process.batchDuration) - process.durationMs.forEach { - case (k, v) => builder.putDurationMs(k, v) - } - process.eventTime.forEach { - case (k, v) => builder.putEventTime(k, v) - } - process.stateOperators.foreach( - s => builder.addStateOperators(StateOperatorProgressSerializer.serialize(s))) - process.sources.foreach( - s => builder.addSources(SourceProgressSerializer.serialize(s)) - ) - builder.setSink(SinkProgressSerializer.serialize(process.sink)) - process.observedMetrics.forEach { - case (k, v) => builder.putObservedMetrics(k, mapper.writeValueAsString(v)) - } - builder.build() - } - - def deserialize(process: StoreTypes.StreamingQueryProgress): StreamingQueryProgress = { - new StreamingQueryProgress( - id = UUID.fromString(process.getId), - runId = UUID.fromString(process.getRunId), - name = process.getName, - timestamp = process.getTimestamp, - batchId = process.getBatchId, - batchDuration = process.getBatchDuration, - durationMs = process.getDurationMsMap, - eventTime = 
process.getEventTimeMap, - stateOperators = - StateOperatorProgressSerializer.deserializeToArray(process.getStateOperatorsList), - sources = SourceProgressSerializer.deserializeToArray(process.getSourcesList), - sink = SinkProgressSerializer.deserialize(process.getSink), - observedMetrics = convertToObservedMetrics(process.getObservedMetricsMap) - ) - } - - private def convertToObservedMetrics(input: JMap[String, String]): JHashMap[String, Row] = { - val observedMetrics = new JHashMap[String, Row](input.size()) - val classType = classOf[GenericRowWithSchema] - input.forEach { - case (k, v) => - observedMetrics.put(k, mapper.readValue(v, classType)) - } - observedMetrics - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryProgressWrapperSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryProgressWrapperSerializer.scala deleted file mode 100644 index 3846af26754..00000000000 --- a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryProgressWrapperSerializer.scala +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.spark.status.protobuf.sql - -import org.apache.spark.sql.streaming.ui.StreamingQueryProgressWrapper -import org.apache.spark.status.protobuf.{ProtobufSerDe, StoreTypes} - -class StreamingQueryProgressWrapperSerializer extends ProtobufSerDe { - - override val supportClass: Class[_] = - classOf[StreamingQueryProgressWrapper] - - override def serialize(input: Any): Array[Byte] = { - val data = input.asInstanceOf[StreamingQueryProgressWrapper] - val builder = StoreTypes.StreamingQueryProgressWrapper.newBuilder() - builder.setProgress(StreamingQueryProgressSerializer.serialize(data.progress)) - builder.build().toByteArray - } - - override def deserialize(bytes: Array[Byte]): StreamingQueryProgressWrapper = { - val processWrapper = StoreTypes.StreamingQueryProgressWrapper.parseFrom(bytes) - new StreamingQueryProgressWrapper( - StreamingQueryProgressSerializer.deserialize(processWrapper.getProgress)) - } -} diff --git a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala index b590f6dd42c..f7b783ef3ca 100644 --- a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala @@ -17,18 +17,11 @@ package org.apache.spark.status.protobuf.sql -import java.lang.{Long => JLong} import java.util.UUID -import scala.collection.JavaConverters._ - import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.Row -import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema import org.apache.spark.sql.execution.ui._ -import org.apache.spark.sql.streaming.{SinkProgress, SourceProgress, StateOperatorProgress, StreamingQueryProgress} -import org.apache.spark.sql.streaming.ui.{StreamingQueryData, StreamingQueryProgressWrapper} -import 
org.apache.spark.sql.types.StructType +import org.apache.spark.sql.streaming.ui.StreamingQueryData import org.apache.spark.status.api.v1.sql.SqlResourceSuite import org.apache.spark.status.protobuf.KVStoreProtobufSerializer @@ -243,165 +236,4 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite { assert(result.startTimestamp == input.startTimestamp) assert(result.endTimestamp == input.endTimestamp) } - - test("StreamingQueryProgressWrapper") { - // Generate input data - val stateOperatorProgress0 = new StateOperatorProgress( - operatorName = "op-0", - numRowsTotal = 1L, - numRowsUpdated = 2L, - allUpdatesTimeMs = 3L, - numRowsRemoved = 4L, - allRemovalsTimeMs = 5L, - commitTimeMs = 6L, - memoryUsedBytes = 7L, - numRowsDroppedByWatermark = 8L, - numShufflePartitions = 9L, - numStateStoreInstances = 10L, - customMetrics = Map( - "custom-metrics-00" -> JLong.valueOf("10"), - "custom-metrics-01" -> JLong.valueOf("11")).asJava - ) - - val stateOperatorProgress1 = new StateOperatorProgress( - operatorName = "op-1", - numRowsTotal = 11L, - numRowsUpdated = 12L, - allUpdatesTimeMs = 13L, - numRowsRemoved = 14L, - allRemovalsTimeMs = 15L, - commitTimeMs = 16L, - memoryUsedBytes = 17L, - numRowsDroppedByWatermark = 18L, - numShufflePartitions = 19L, - numStateStoreInstances = 20L, - customMetrics = Map( - "custom-metrics-10" -> JLong.valueOf("20"), - "custom-metrics-11" -> JLong.valueOf("21")).asJava - ) - - val source0 = new SourceProgress( - description = "description-0", - startOffset = "startOffset-0", - endOffset = "endOffset-0", - latestOffset = "latestOffset-0", - numInputRows = 10L, - inputRowsPerSecond = 11.0, - processedRowsPerSecond = 12.0, - metrics = Map( - "metrics-00" -> "10", - "metrics-01" -> "11").asJava - ) - - val source1 = new SourceProgress( - description = "description-", - startOffset = "startOffset-1", - endOffset = "endOffset-1", - latestOffset = "latestOffset-1", - numInputRows = 20L, - inputRowsPerSecond = 21.0, - processedRowsPerSecond = 
22.0, - metrics = Map( - "metrics-10" -> "20", - "metrics-11" -> "21").asJava - ) - - val sink = new SinkProgress( - description = "sink-0", - numOutputRows = 30, - metrics = Map( - "metrics-20" -> "30", - "metrics-21" -> "31").asJava - ) - - val schema1 = new StructType() - .add("c1", "long") - .add("c2", "double") - val schema2 = new StructType() - .add("rc", "long") - .add("min_q", "string") - .add("max_q", "string") - - val observedMetrics = Map[String, Row]( - "event1" -> new GenericRowWithSchema(Array(1L, 3.0d), schema1), - "event2" -> new GenericRowWithSchema(Array(1L, "hello", "world"), schema2) - ).asJava - - val progress = new StreamingQueryProgress( - id = UUID.randomUUID(), - runId = UUID.randomUUID(), - name = "name-1", - timestamp = "2023-01-03T09:14:04.175Z", - batchId = 1L, - batchDuration = 2L, - durationMs = Map( - "duration-0" -> JLong.valueOf("10"), - "duration-1" -> JLong.valueOf("11")).asJava, - eventTime = Map( - "eventTime-0" -> "20", - "eventTime-1" -> "21").asJava, - stateOperators = Array(stateOperatorProgress0, stateOperatorProgress1), - sources = Array(source0, source1), - sink = sink, - observedMetrics = observedMetrics - ) - val input = new StreamingQueryProgressWrapper(progress) - - // Do serialization and deserialization - val bytes = serializer.serialize(input) - val result = serializer.deserialize(bytes, classOf[StreamingQueryProgressWrapper]) - - // Assertion results - val resultProcess = result.progress - assert(progress.id == resultProcess.id) - assert(progress.runId == resultProcess.runId) - assert(progress.name == resultProcess.name) - assert(progress.timestamp == resultProcess.timestamp) - assert(progress.batchId == resultProcess.batchId) - assert(progress.batchDuration == resultProcess.batchDuration) - assert(progress.durationMs == resultProcess.durationMs) - assert(progress.eventTime == resultProcess.eventTime) - - progress.stateOperators.zip(resultProcess.stateOperators).foreach { - case (o1, o2) => - 
assert(o1.operatorName == o2.operatorName) - assert(o1.numRowsTotal == o2.numRowsTotal) - assert(o1.numRowsUpdated == o2.numRowsUpdated) - assert(o1.allUpdatesTimeMs == o2.allUpdatesTimeMs) - assert(o1.numRowsRemoved == o2.numRowsRemoved) - assert(o1.allRemovalsTimeMs == o2.allRemovalsTimeMs) - assert(o1.commitTimeMs == o2.commitTimeMs) - assert(o1.memoryUsedBytes == o2.memoryUsedBytes) - assert(o1.numRowsDroppedByWatermark == o2.numRowsDroppedByWatermark) - assert(o1.numShufflePartitions == o2.numShufflePartitions) - assert(o1.numStateStoreInstances == o2.numStateStoreInstances) - assert(o1.customMetrics == o2.customMetrics) - } - - progress.sources.zip(resultProcess.sources).foreach { - case (s1, s2) => - assert(s1.description == s2.description) - assert(s1.startOffset == s2.startOffset) - assert(s1.endOffset == s2.endOffset) - assert(s1.latestOffset == s2.latestOffset) - assert(s1.numInputRows == s2.numInputRows) - assert(s1.inputRowsPerSecond == s2.inputRowsPerSecond) - assert(s1.processedRowsPerSecond == s2.processedRowsPerSecond) - assert(s1.metrics == s2.metrics) - } - - Seq(progress.sink).zip(Seq(resultProcess.sink)).foreach { - case (s1, s2) => - assert(s1.description == s2.description) - assert(s1.numOutputRows == s2.numOutputRows) - assert(s1.metrics == s2.metrics) - } - - val resultObservedMetrics = resultProcess.observedMetrics - assert(progress.observedMetrics.size() == resultObservedMetrics.size()) - assert(progress.observedMetrics.keySet() == resultObservedMetrics.keySet()) - progress.observedMetrics.entrySet().forEach { e => - assert(e.getValue == resultObservedMetrics.get(e.getKey)) - } - } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org