This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5574071aa62 [SPARK-41676][CORE][SQL][SS][UI] Protobuf serializer for `StreamingQueryData`
5574071aa62 is described below

commit 5574071aa6202da108378c6c3bb9ce0c05197972
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Sat Dec 31 15:36:37 2022 -0800

    [SPARK-41676][CORE][SQL][SS][UI] Protobuf serializer for `StreamingQueryData`
    
    ### What changes were proposed in this pull request?
    Add Protobuf serializer for `StreamingQueryData`
    
    ### Why are the changes needed?
    Support fast and compact serialization/deserialization for `StreamingQueryData` over RocksDB.
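
    For context, here is a minimal sketch of the round trip this patch enables. It is an illustration only: it mirrors the new unit test below, and assumes the serializer is discovered through the `ProtobufSerDe` service registration added in this patch.

        import java.util.UUID

        import org.apache.spark.sql.streaming.ui.StreamingQueryData
        import org.apache.spark.status.protobuf.KVStoreProtobufSerializer

        val serializer = new KVStoreProtobufSerializer()
        val query = new StreamingQueryData(
          name = "query",
          id = UUID.randomUUID(),
          runId = UUID.randomUUID().toString,
          isActive = true,
          exception = None,
          startTimestamp = System.currentTimeMillis(),
          endTimestamp = None)
        // Serialize to compact protobuf bytes, suitable for a RocksDB-backed KVStore.
        val bytes: Array[Byte] = serializer.serialize(query)
        // Deserialize back into the original type.
        val restored = serializer.deserialize(bytes, classOf[StreamingQueryData])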
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Add a new unit test to `KVStoreProtobufSerializerSuite`.
    
    Closes #39233 from LuciferYang/SPARK-41676.
    
    Lead-authored-by: yangjie01 <yangji...@baidu.com>
    Co-authored-by: Gengliang Wang <gengli...@apache.org>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../apache/spark/status/protobuf/store_types.proto | 10 ++++
 .../org.apache.spark.status.protobuf.ProtobufSerDe |  1 +
 .../ui/StreamingQueryStatusListener.scala          |  2 +-
 .../sql/StreamingQueryDataSerializer.scala         | 59 ++++++++++++++++++++++
 .../sql/KVStoreProtobufSerializerSuite.scala       | 25 +++++++++
 5 files changed, 96 insertions(+), 1 deletion(-)

diff --git a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
index ff687331a6a..38b82518ddd 100644
--- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
+++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
@@ -464,6 +464,16 @@ message RDDOperationGraphWrapper {
   RDDOperationClusterWrapper root_cluster = 5;
 }
 
+message StreamingQueryData {
+  string name = 1;
+  string id = 2;
+  string run_id = 3;
+  bool is_active = 4;
+  optional string exception = 5;
+  int64 start_timestamp = 6;
+  optional int64 end_timestamp = 7;
+}
+
 message StageDataWrapper {
   StageData info = 1;
   repeated int64 job_ids = 2;
diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
index 3f0ae5470ce..7beff87d7ec 100644
--- a/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
+++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
@@ -17,3 +17,4 @@
 
 org.apache.spark.status.protobuf.sql.SQLExecutionUIDataSerializer
 org.apache.spark.status.protobuf.sql.SparkPlanGraphWrapperSerializer
+org.apache.spark.status.protobuf.sql.StreamingQueryDataSerializer
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
index 2e6102b01fa..1bdc5e3f79a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ui/StreamingQueryStatusListener.scala
@@ -115,7 +115,7 @@ private[sql] class StreamingQueryStatusListener(
   }
 }
 
-private[sql] class StreamingQueryData(
+private[spark] class StreamingQueryData(
     val name: String,
     val id: UUID,
     @KVIndexParam val runId: String,
diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryDataSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryDataSerializer.scala
new file mode 100644
index 00000000000..f63ea80c77e
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryDataSerializer.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf.sql
+
+import java.util.UUID
+
+import org.apache.spark.sql.streaming.ui.StreamingQueryData
+import org.apache.spark.status.protobuf.{ProtobufSerDe, StoreTypes}
+import org.apache.spark.status.protobuf.Utils.getOptional
+
+class StreamingQueryDataSerializer extends ProtobufSerDe {
+
+  override val supportClass: Class[_] = classOf[StreamingQueryData]
+
+  override def serialize(input: Any): Array[Byte] = {
+    val data = input.asInstanceOf[StreamingQueryData]
+    val builder = StoreTypes.StreamingQueryData.newBuilder()
+      .setName(data.name)
+      .setId(data.id.toString)
+      .setRunId(data.runId)
+      .setIsActive(data.isActive)
+    data.exception.foreach(builder.setException)
+    builder.setStartTimestamp(data.startTimestamp)
+    data.endTimestamp.foreach(builder.setEndTimestamp)
+    builder.build().toByteArray
+  }
+
+  override def deserialize(bytes: Array[Byte]): Any = {
+    val data = StoreTypes.StreamingQueryData.parseFrom(bytes)
+    val exception =
+      getOptional(data.hasException, () => data.getException)
+    val endTimestamp =
+      getOptional(data.hasEndTimestamp, () => data.getEndTimestamp)
+    new StreamingQueryData(
+      name = data.getName,
+      id = UUID.fromString(data.getId),
+      runId = data.getRunId,
+      isActive = data.getIsActive,
+      exception = exception,
+      startTimestamp = data.getStartTimestamp,
+      endTimestamp = endTimestamp
+    )
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
index 90d04c3f013..5f1cd812d97 100644
--- a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
@@ -17,8 +17,11 @@
 
 package org.apache.spark.status.protobuf.sql
 
+import java.util.UUID
+
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.execution.ui._
+import org.apache.spark.sql.streaming.ui.StreamingQueryData
 import org.apache.spark.status.api.v1.sql.SqlResourceSuite
 import org.apache.spark.status.protobuf.KVStoreProtobufSerializer
 
@@ -211,4 +214,26 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
       assert(e1.toId == e2.toId)
     }
   }
+
+  test("StreamingQueryData") {
+    val id = UUID.randomUUID()
+    val input = new StreamingQueryData(
+      name = "some-query",
+      id = id,
+      runId = s"run-id-$id",
+      isActive = false,
+      exception = Some("Some Exception"),
+      startTimestamp = 1L,
+      endTimestamp = Some(2L)
+    )
+    val bytes = serializer.serialize(input)
+    val result = serializer.deserialize(bytes, classOf[StreamingQueryData])
+    assert(result.name == input.name)
+    assert(result.id == input.id)
+    assert(result.runId == input.runId)
+    assert(result.isActive == input.isActive)
+    assert(result.exception == input.exception)
+    assert(result.startTimestamp == input.startTimestamp)
+    assert(result.endTimestamp == input.endTimestamp)
+  }
 }

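A note on the `optional` fields in the new `StreamingQueryData` message above: proto3 `optional` generates `hasException`/`hasEndTimestamp` accessors alongside the getters, and the deserializer maps them back to Scala `Option` through the existing `Utils.getOptional` helper. As a hedged sketch, that helper presumably has roughly this shape (an assumption based on the call sites in the diff, not the actual Spark source):

    def getOptional[T](condition: Boolean, result: () => T): Option[T] =
      if (condition) Some(result()) else None

This is why an absent `exception` or `end_timestamp` round-trips as `None` rather than as a protobuf default value.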
