This is an automated email from the ASF dual-hosted git repository. gengliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 5574071aa62 [SPARK-41676][CORE][SQL][SS][UI] Protobuf serializer for `StreamingQueryData` 5574071aa62 is described below commit 5574071aa6202da108378c6c3bb9ce0c05197972 Author: yangjie01 <yangji...@baidu.com> AuthorDate: Sat Dec 31 15:36:37 2022 -0800 [SPARK-41676][CORE][SQL][SS][UI] Protobuf serializer for `StreamingQueryData` ### What changes were proposed in this pull request? Add a Protobuf serializer for `StreamingQueryData`. ### Why are the changes needed? Support fast and compact serialization/deserialization for `StreamingQueryData` over RocksDB. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added a new unit test (a `StreamingQueryData` serialize/deserialize round-trip case in `KVStoreProtobufSerializerSuite`). Closes #39233 from LuciferYang/SPARK-41676. Lead-authored-by: yangjie01 <yangji...@baidu.com> Co-authored-by: Gengliang Wang <gengli...@apache.org> Signed-off-by: Gengliang Wang <gengli...@apache.org> --- .../apache/spark/status/protobuf/store_types.proto | 10 ++++ .../org.apache.spark.status.protobuf.ProtobufSerDe | 1 + .../ui/StreamingQueryStatusListener.scala | 2 +- .../sql/StreamingQueryDataSerializer.scala | 59 ++++++++++++++++++++++ .../sql/KVStoreProtobufSerializerSuite.scala | 25 +++++++++ 5 files changed, 96 insertions(+), 1 deletion(-) diff --git a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto index ff687331a6a..38b82518ddd 100644 --- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto +++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto @@ -464,6 +464,16 @@ message RDDOperationGraphWrapper { RDDOperationClusterWrapper root_cluster = 5; } +message StreamingQueryData { + string name = 1; + string id = 2; + string run_id = 3; + bool is_active = 4; + optional string exception = 5; + int64 start_timestamp = 6; + optional int64 end_timestamp = 7; +} + message StageDataWrapper { StageData info
= 1; repeated int64 job_ids = 2; diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe index 3f0ae5470ce..7beff87d7ec 100644 --- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe +++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe @@ -17,3 +17,4 @@ org.apache.spark.status.protobuf.sql.SQLExecutionUIDataSerializer org.apache.spark.status.protobuf.sql.SparkPlanGraphWrapperSerializer +org.apache.spark.status.protobuf.sql.StreamingQueryDataSerializer diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala index 2e6102b01fa..1bdc5e3f79a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala @@ -115,7 +115,7 @@ private[sql] class StreamingQueryStatusListener( } } -private[sql] class StreamingQueryData( +private[spark] class StreamingQueryData( val name: String, val id: UUID, @KVIndexParam val runId: String, diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryDataSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryDataSerializer.scala new file mode 100644 index 00000000000..f63ea80c77e --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryDataSerializer.scala @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.status.protobuf.sql + +import java.util.UUID + +import org.apache.spark.sql.streaming.ui.StreamingQueryData +import org.apache.spark.status.protobuf.{ProtobufSerDe, StoreTypes} +import org.apache.spark.status.protobuf.Utils.getOptional + +class StreamingQueryDataSerializer extends ProtobufSerDe { + + override val supportClass: Class[_] = classOf[StreamingQueryData] + + override def serialize(input: Any): Array[Byte] = { + val data = input.asInstanceOf[StreamingQueryData] + val builder = StoreTypes.StreamingQueryData.newBuilder() + .setName(data.name) + .setId(data.id.toString) + .setRunId(data.runId) + .setIsActive(data.isActive) + data.exception.foreach(builder.setException) + builder.setStartTimestamp(data.startTimestamp) + data.endTimestamp.foreach(builder.setEndTimestamp) + builder.build().toByteArray + } + + override def deserialize(bytes: Array[Byte]): Any = { + val data = StoreTypes.StreamingQueryData.parseFrom(bytes) + val exception = + getOptional(data.hasException, () => data.getException) + val endTimestamp = + getOptional(data.hasEndTimestamp, () => data.getEndTimestamp) + new StreamingQueryData( + name = data.getName, + id = UUID.fromString(data.getId), + runId = data.getRunId, + isActive = data.getIsActive, + exception = exception, + startTimestamp = data.getStartTimestamp, + endTimestamp = endTimestamp + ) + } +} diff --git 
a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala index 90d04c3f013..5f1cd812d97 100644 --- a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala @@ -17,8 +17,11 @@ package org.apache.spark.status.protobuf.sql +import java.util.UUID + import org.apache.spark.SparkFunSuite import org.apache.spark.sql.execution.ui._ +import org.apache.spark.sql.streaming.ui.StreamingQueryData import org.apache.spark.status.api.v1.sql.SqlResourceSuite import org.apache.spark.status.protobuf.KVStoreProtobufSerializer @@ -211,4 +214,26 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite { assert(e1.toId == e2.toId) } } + + test("StreamingQueryData") { + val id = UUID.randomUUID() + val input = new StreamingQueryData( + name = "some-query", + id = id, + runId = s"run-id-$id", + isActive = false, + exception = Some("Some Exception"), + startTimestamp = 1L, + endTimestamp = Some(2L) + ) + val bytes = serializer.serialize(input) + val result = serializer.deserialize(bytes, classOf[StreamingQueryData]) + assert(result.name == input.name) + assert(result.id == input.id) + assert(result.runId == input.runId) + assert(result.isActive == input.isActive) + assert(result.exception == input.exception) + assert(result.startTimestamp == input.startTimestamp) + assert(result.endTimestamp == input.endTimestamp) + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org