This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 55f64cfa839 [SPARK-41968][CORE][SQL] Refactor `ProtobufSerDe` to `ProtobufSerDe[T]`
55f64cfa839 is described below

commit 55f64cfa839a77b8fff7c1625281b84cce4c6807
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Wed Jan 11 22:21:21 2023 -0800

    [SPARK-41968][CORE][SQL] Refactor `ProtobufSerDe` to `ProtobufSerDe[T]`
    
    ### What changes were proposed in this pull request?
    This PR aims to refactor `ProtobufSerDe` into `ProtobufSerDe[T]`. The main changes to `ProtobufSerDe` are as follows:
    
    - Change the definition of `ProtobufSerDe` to `ProtobufSerDe[T]`
    - Remove the `supportClass` member from `ProtobufSerDe[T]` and use reflection in `KVStoreProtobufSerializer` to obtain the actual type of `T` as the `serializerMap` key (see the sketch after this list)
    - Change the input parameter type of the `serialize` function from `Any` to `T`
    - Change the return type of the `deserialize` function from `Any` to `T`
    
    All subclasses of `ProtobufSerDe[T]` are then refactored accordingly and the surrounding code is cleaned up in this PR; a before/after sketch of one subclass follows.
    
    ### Why are the changes needed?
    Refactor `ProtobufSerDe` and clean up the related code.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Pass GitHub Actions.
    
    Closes #39487 from LuciferYang/refactor-ProtobufSerDe.
    
    Authored-by: yangjie01 <yangji...@baidu.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../ApplicationEnvironmentInfoWrapperSerializer.scala | 10 +++-------
 .../protobuf/ApplicationInfoWrapperSerializer.scala   |  9 ++-------
 .../status/protobuf/CachedQuantileSerializer.scala    |  6 ++----
 .../ExecutorStageSummaryWrapperSerializer.scala       | 10 +++-------
 .../protobuf/ExecutorSummaryWrapperSerializer.scala   | 10 ++--------
 .../status/protobuf/JobDataWrapperSerializer.scala    |  9 ++-------
 .../status/protobuf/KVStoreProtobufSerializer.scala   | 15 +++++++++++----
 .../protobuf/ProcessSummaryWrapperSerializer.scala    | 10 ++--------
 .../apache/spark/status/protobuf/ProtobufSerDe.scala  | 19 +++++--------------
 .../protobuf/RDDOperationGraphWrapperSerializer.scala |  7 ++-----
 .../protobuf/RDDStorageInfoWrapperSerializer.scala    |  9 ++-------
 .../protobuf/ResourceProfileWrapperSerializer.scala   |  9 ++-------
 .../SpeculationStageSummaryWrapperSerializer.scala    | 10 +++-------
 .../status/protobuf/StageDataWrapperSerializer.scala  | 13 +++++--------
 .../status/protobuf/StreamBlockDataSerializer.scala   |  6 ++----
 .../status/protobuf/TaskDataWrapperSerializer.scala   |  9 ++-------
 .../protobuf/sql/SQLExecutionUIDataSerializer.scala   |  7 ++-----
 .../sql/SparkPlanGraphWrapperSerializer.scala         |  7 ++-----
 .../protobuf/sql/StreamingQueryDataSerializer.scala   |  9 +++------
 19 files changed, 57 insertions(+), 127 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala
index 33a18daacbc..b7cf01382e2 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala
@@ -23,14 +23,10 @@ import org.apache.spark.resource.{ExecutorResourceRequest, TaskResourceRequest}
 import org.apache.spark.status.ApplicationEnvironmentInfoWrapper
 import org.apache.spark.status.api.v1.{ApplicationEnvironmentInfo, ResourceProfileInfo, RuntimeInfo}
 
-class ApplicationEnvironmentInfoWrapperSerializer extends ProtobufSerDe {
+class ApplicationEnvironmentInfoWrapperSerializer
+  extends ProtobufSerDe[ApplicationEnvironmentInfoWrapper] {
 
-  override val supportClass: Class[_] = classOf[ApplicationEnvironmentInfoWrapper]
-
-  override def serialize(input: Any): Array[Byte] =
-    serialize(input.asInstanceOf[ApplicationEnvironmentInfoWrapper])
-
-  private def serialize(input: ApplicationEnvironmentInfoWrapper): Array[Byte] = {
+  override def serialize(input: ApplicationEnvironmentInfoWrapper): Array[Byte] = {
     val builder = StoreTypes.ApplicationEnvironmentInfoWrapper.newBuilder()
     builder.setInfo(serializeApplicationEnvironmentInfo(input.info))
     builder.build().toByteArray
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationInfoWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationInfoWrapperSerializer.scala
index 5a2accb7506..c56b5302cc1 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationInfoWrapperSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationInfoWrapperSerializer.scala
@@ -26,14 +26,9 @@ import org.apache.spark.status.api.v1.{ApplicationAttemptInfo, ApplicationInfo}
 import org.apache.spark.status.protobuf.Utils.getOptional
 
 
-class ApplicationInfoWrapperSerializer extends ProtobufSerDe {
+class ApplicationInfoWrapperSerializer extends ProtobufSerDe[ApplicationInfoWrapper] {
 
-  override val supportClass: Class[_] = classOf[ApplicationInfoWrapper]
-
-  override def serialize(input: Any): Array[Byte] =
-    serialize(input.asInstanceOf[ApplicationInfoWrapper])
-
-  private def serialize(j: ApplicationInfoWrapper): Array[Byte] = {
+  override def serialize(j: ApplicationInfoWrapper): Array[Byte] = {
     val jobData = serializeApplicationInfo(j.info)
     val builder = StoreTypes.ApplicationInfoWrapper.newBuilder()
     builder.setInfo(jobData)
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/CachedQuantileSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/CachedQuantileSerializer.scala
index 547cbd86b7a..08b8c8b0a98 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/CachedQuantileSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/CachedQuantileSerializer.scala
@@ -19,11 +19,9 @@ package org.apache.spark.status.protobuf
 
 import org.apache.spark.status.CachedQuantile
 
-class CachedQuantileSerializer extends ProtobufSerDe {
-  override val supportClass: Class[_] = classOf[CachedQuantile]
+class CachedQuantileSerializer extends ProtobufSerDe[CachedQuantile] {
 
-  override def serialize(input: Any): Array[Byte] = {
-    val data = input.asInstanceOf[CachedQuantile]
+  override def serialize(data: CachedQuantile): Array[Byte] = {
     val builder = StoreTypes.CachedQuantile.newBuilder()
       .setStageId(data.stageId.toLong)
       .setStageAttemptId(data.stageAttemptId)
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorStageSummaryWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorStageSummaryWrapperSerializer.scala
index 71de2fbc81f..4d9d045ed5e 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorStageSummaryWrapperSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorStageSummaryWrapperSerializer.scala
@@ -19,14 +19,10 @@ package org.apache.spark.status.protobuf
 
 import org.apache.spark.status.ExecutorStageSummaryWrapper
 
-class ExecutorStageSummaryWrapperSerializer extends ProtobufSerDe {
+class ExecutorStageSummaryWrapperSerializer
+  extends ProtobufSerDe[ExecutorStageSummaryWrapper] {
 
-  override val supportClass: Class[_] = classOf[ExecutorStageSummaryWrapper]
-
-  override def serialize(input: Any): Array[Byte] =
-    serialize(input.asInstanceOf[ExecutorStageSummaryWrapper])
-
-  private def serialize(input: ExecutorStageSummaryWrapper): Array[Byte] = {
+  override def serialize(input: ExecutorStageSummaryWrapper): Array[Byte] = {
     val info = ExecutorStageSummarySerializer.serialize(input.info)
     val builder = StoreTypes.ExecutorStageSummaryWrapper.newBuilder()
       .setStageId(input.stageId.toLong)
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala
index 03a810157d7..b008c98e562 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala
@@ -26,15 +26,9 @@ import org.apache.spark.status.ExecutorSummaryWrapper
 import org.apache.spark.status.api.v1.{ExecutorSummary, MemoryMetrics}
 import org.apache.spark.status.protobuf.Utils.getOptional
 
-class ExecutorSummaryWrapperSerializer extends ProtobufSerDe {
+class ExecutorSummaryWrapperSerializer extends ProtobufSerDe[ExecutorSummaryWrapper] {
 
-  override val supportClass: Class[_] = classOf[ExecutorSummaryWrapper]
-
-  override def serialize(input: Any): Array[Byte] = {
-    serialize(input.asInstanceOf[ExecutorSummaryWrapper])
-  }
-
-  def serialize(input: ExecutorSummaryWrapper): Array[Byte] = {
+  override def serialize(input: ExecutorSummaryWrapper): Array[Byte] = {
     val info = serializeExecutorSummary(input.info)
     val builder = StoreTypes.ExecutorSummaryWrapper.newBuilder()
       .setInfo(info)
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala
index e2e2a1a8d89..10e0f125f6c 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala
@@ -25,14 +25,9 @@ import org.apache.spark.status.JobDataWrapper
 import org.apache.spark.status.api.v1.JobData
 import org.apache.spark.status.protobuf.Utils.getOptional
 
-class JobDataWrapperSerializer extends ProtobufSerDe {
+class JobDataWrapperSerializer extends ProtobufSerDe[JobDataWrapper] {
 
-  override val supportClass: Class[_] = classOf[JobDataWrapper]
-
-  override def serialize(input: Any): Array[Byte] =
-    serialize(input.asInstanceOf[JobDataWrapper])
-
-  private def serialize(j: JobDataWrapper): Array[Byte] = {
+  override def serialize(j: JobDataWrapper): Array[Byte] = {
     val jobData = serializeJobData(j.info)
     val builder = StoreTypes.JobDataWrapper.newBuilder()
     builder.setInfo(jobData)
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializer.scala
index d915edf396c..e6bdfa17715 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializer.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.status.protobuf
 
+import java.lang.reflect.ParameterizedType
 import java.util.ServiceLoader
 
 import collection.JavaConverters._
@@ -40,10 +41,16 @@ private[spark] class KVStoreProtobufSerializer extends KVStoreScalaSerializer {
 
 private[spark] object KVStoreProtobufSerializer {
 
-  private[this] lazy val serializerMap: Map[Class[_], ProtobufSerDe] =
-    ServiceLoader.load(classOf[ProtobufSerDe])
-      .asScala.map(serDe => serDe.supportClass -> serDe).toMap
+  private[this] lazy val serializerMap: Map[Class[_], ProtobufSerDe[Any]] = {
+    def getGenericsType(klass: Class[_]): Class[_] = {
+      klass.getGenericInterfaces.head.asInstanceOf[ParameterizedType]
+        .getActualTypeArguments.head.asInstanceOf[Class[_]]
+    }
+    ServiceLoader.load(classOf[ProtobufSerDe[Any]]).asScala.map { serDe =>
+      getGenericsType(serDe.getClass) -> serDe
+    }.toMap
+  }
 
-  def getSerializer(klass: Class[_]): Option[ProtobufSerDe] =
+  def getSerializer(klass: Class[_]): Option[ProtobufSerDe[Any]] =
     serializerMap.get(klass)
 }
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/ProcessSummaryWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/ProcessSummaryWrapperSerializer.scala
index 6d8a8f863ad..a3d13ddd31f 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/ProcessSummaryWrapperSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/ProcessSummaryWrapperSerializer.scala
@@ -25,15 +25,9 @@ import org.apache.spark.status.ProcessSummaryWrapper
 import org.apache.spark.status.api.v1.ProcessSummary
 import org.apache.spark.status.protobuf.Utils.getOptional
 
-class ProcessSummaryWrapperSerializer extends ProtobufSerDe {
+class ProcessSummaryWrapperSerializer extends ProtobufSerDe[ProcessSummaryWrapper] {
 
-  override val supportClass: Class[_] = classOf[ProcessSummaryWrapper]
-
-  override def serialize(input: Any): Array[Byte] = {
-    serialize(input.asInstanceOf[ProcessSummaryWrapper])
-  }
-
-  def serialize(input: ProcessSummaryWrapper): Array[Byte] = {
+  override def serialize(input: ProcessSummaryWrapper): Array[Byte] = {
     val builder = StoreTypes.ProcessSummaryWrapper.newBuilder()
     builder.setInfo(serializeProcessSummary(input.info))
     builder.build().toByteArray
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/ProtobufSerDe.scala b/core/src/main/scala/org/apache/spark/status/protobuf/ProtobufSerDe.scala
index 5e0f6263f1a..d6eccb6307a 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/ProtobufSerDe.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/ProtobufSerDe.scala
@@ -26,29 +26,20 @@ import org.apache.spark.annotation.{DeveloperApi, Unstable}
  * register itself to `org.apache.spark.status.protobuf.ProtobufSerDe` so that
  * `KVStoreProtobufSerializer` can use `ServiceLoader` to load and use them.
  *
- * TODO: SPARK-41644 How to define `ProtobufSerDe` as `ProtobufSerDe[T]`
- *
  * @since 3.4.0
  */
 @DeveloperApi
 @Unstable
-trait ProtobufSerDe {
-
-  /**
-   * Specify the data types supported by the current `ProtobufSerDe`
-   */
-  val supportClass: Class[_]
+trait ProtobufSerDe[T] {
 
   /**
-   * Serialize the input data of the type corresponding to `supportClass`
-   * to `Array[Byte]`, since the current input parameter type is `Any`,
-   * the input type needs to be guaranteed from the code level.
+   * Serialize the input data of the type `T` to `Array[Byte]`.
    */
-  def serialize(input: Any): Array[Byte]
+  def serialize(input: T): Array[Byte]
 
   /**
    * Deserialize the input `Array[Byte]` to an object of the
-   * type corresponding to `supportClass`.
+   * type `T`.
    */
-  def deserialize(bytes: Array[Byte]): Any
+  def deserialize(bytes: Array[Byte]): T
 }
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala
index c0d86ede198..f822ed1889a 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala
@@ -24,12 +24,9 @@ import org.apache.spark.status.{RDDOperationClusterWrapper, RDDOperationGraphWra
 import org.apache.spark.status.protobuf.StoreTypes.{DeterministicLevel => GDeterministicLevel}
 import org.apache.spark.ui.scope.{RDDOperationEdge, RDDOperationNode}
 
-class RDDOperationGraphWrapperSerializer extends ProtobufSerDe {
+class RDDOperationGraphWrapperSerializer extends ProtobufSerDe[RDDOperationGraphWrapper] {
 
-  override val supportClass: Class[_] = classOf[RDDOperationGraphWrapper]
-
-  override def serialize(input: Any): Array[Byte] = {
-    val op = input.asInstanceOf[RDDOperationGraphWrapper]
+  override def serialize(op: RDDOperationGraphWrapper): Array[Byte] = {
     val builder = StoreTypes.RDDOperationGraphWrapper.newBuilder()
     builder.setStageId(op.stageId.toLong)
     op.edges.foreach { e =>
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/RDDStorageInfoWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/RDDStorageInfoWrapperSerializer.scala
index be6fe1f83cd..e59d363243e 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/RDDStorageInfoWrapperSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/RDDStorageInfoWrapperSerializer.scala
@@ -23,14 +23,9 @@ import org.apache.spark.status.RDDStorageInfoWrapper
 import org.apache.spark.status.api.v1.{RDDDataDistribution, RDDPartitionInfo, RDDStorageInfo}
 import org.apache.spark.status.protobuf.Utils.getOptional
 
-class RDDStorageInfoWrapperSerializer extends ProtobufSerDe {
+class RDDStorageInfoWrapperSerializer extends ProtobufSerDe[RDDStorageInfoWrapper] {
 
-  override val supportClass: Class[_] = classOf[RDDStorageInfoWrapper]
-
-  override def serialize(input: Any): Array[Byte] =
-    serialize(input.asInstanceOf[RDDStorageInfoWrapper])
-
-  private def serialize(input: RDDStorageInfoWrapper): Array[Byte] = {
+  override def serialize(input: RDDStorageInfoWrapper): Array[Byte] = {
     val builder = StoreTypes.RDDStorageInfoWrapper.newBuilder()
     builder.setInfo(serializeRDDStorageInfo(input.info))
     builder.build().toByteArray
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/ResourceProfileWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/ResourceProfileWrapperSerializer.scala
index a780ea5d7e9..d9d29cc8d88 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/ResourceProfileWrapperSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/ResourceProfileWrapperSerializer.scala
@@ -19,16 +19,11 @@ package org.apache.spark.status.protobuf
 
 import org.apache.spark.status.ResourceProfileWrapper
 
-class ResourceProfileWrapperSerializer extends ProtobufSerDe {
+class ResourceProfileWrapperSerializer extends ProtobufSerDe[ResourceProfileWrapper] {
 
   private val appEnvSerializer = new ApplicationEnvironmentInfoWrapperSerializer
 
-  override val supportClass: Class[_] = classOf[ResourceProfileWrapper]
-
-  override def serialize(input: Any): Array[Byte] =
-    serialize(input.asInstanceOf[ResourceProfileWrapper])
-
-  private def serialize(input: ResourceProfileWrapper): Array[Byte] = {
+  override def serialize(input: ResourceProfileWrapper): Array[Byte] = {
     val builder = StoreTypes.ResourceProfileWrapper.newBuilder()
     builder.setRpInfo(appEnvSerializer.serializeResourceProfileInfo(input.rpInfo))
     builder.build().toByteArray
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/SpeculationStageSummaryWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/SpeculationStageSummaryWrapperSerializer.scala
index 936e2dd4504..1b9a1ecfce6 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/SpeculationStageSummaryWrapperSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/SpeculationStageSummaryWrapperSerializer.scala
@@ -20,14 +20,10 @@ package org.apache.spark.status.protobuf
 import org.apache.spark.status.SpeculationStageSummaryWrapper
 import org.apache.spark.status.api.v1.SpeculationStageSummary
 
-class SpeculationStageSummaryWrapperSerializer extends ProtobufSerDe {
+class SpeculationStageSummaryWrapperSerializer
+  extends ProtobufSerDe[SpeculationStageSummaryWrapper] {
 
-  override val supportClass: Class[_] = classOf[SpeculationStageSummaryWrapper]
-
-  override def serialize(input: Any): Array[Byte] =
-    serialize(input.asInstanceOf[SpeculationStageSummaryWrapper])
-
-  private def serialize(s: SpeculationStageSummaryWrapper): Array[Byte] = {
+  override def serialize(s: SpeculationStageSummaryWrapper): Array[Byte] = {
     val summary = serializeSpeculationStageSummary(s.info)
     val builder = StoreTypes.SpeculationStageSummaryWrapper.newBuilder()
     builder.setStageId(s.stageId.toLong)
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/StageDataWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/StageDataWrapperSerializer.scala
index f4e5c8104a0..eda4422405e 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/StageDataWrapperSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/StageDataWrapperSerializer.scala
@@ -27,16 +27,13 @@ import org.apache.spark.status.api.v1.{ExecutorMetricsDistributions, ExecutorPea
 import org.apache.spark.status.protobuf.Utils.getOptional
 import org.apache.spark.util.Utils.weakIntern
 
-class StageDataWrapperSerializer extends ProtobufSerDe {
+class StageDataWrapperSerializer extends ProtobufSerDe[StageDataWrapper] {
 
-  override val supportClass: Class[_] = classOf[StageDataWrapper]
-
-  override def serialize(input: Any): Array[Byte] = {
-    val s = input.asInstanceOf[StageDataWrapper]
+  override def serialize(input: StageDataWrapper): Array[Byte] = {
     val builder = StoreTypes.StageDataWrapper.newBuilder()
-    builder.setInfo(serializeStageData(s.info))
-    s.jobIds.foreach(id => builder.addJobIds(id.toLong))
-    s.locality.foreach { entry =>
+    builder.setInfo(serializeStageData(input.info))
+    input.jobIds.foreach(id => builder.addJobIds(id.toLong))
+    input.locality.foreach { entry =>
       builder.putLocality(entry._1, entry._2)
     }
     builder.build().toByteArray
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/StreamBlockDataSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/StreamBlockDataSerializer.scala
index 71bb09d3118..f450bbbfd0c 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/StreamBlockDataSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/StreamBlockDataSerializer.scala
@@ -19,11 +19,9 @@ package org.apache.spark.status.protobuf
 
 import org.apache.spark.status.StreamBlockData
 
-class StreamBlockDataSerializer extends ProtobufSerDe {
-  override val supportClass: Class[_] = classOf[StreamBlockData]
+class StreamBlockDataSerializer extends ProtobufSerDe[StreamBlockData] {
 
-  override def serialize(input: Any): Array[Byte] = {
-    val data = input.asInstanceOf[StreamBlockData]
+  override def serialize(data: StreamBlockData): Array[Byte] = {
     val builder = StoreTypes.StreamBlockData.newBuilder()
       .setName(data.name)
       .setExecutorId(data.executorId)
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/TaskDataWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/TaskDataWrapperSerializer.scala
index 7ba4a32d806..3c947c79eab 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/TaskDataWrapperSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/TaskDataWrapperSerializer.scala
@@ -21,14 +21,9 @@ import org.apache.spark.status.TaskDataWrapper
 import org.apache.spark.status.protobuf.Utils.getOptional
 import org.apache.spark.util.Utils.weakIntern
 
-class TaskDataWrapperSerializer extends ProtobufSerDe {
+class TaskDataWrapperSerializer extends ProtobufSerDe[TaskDataWrapper] {
 
-  override val supportClass: Class[_] = classOf[TaskDataWrapper]
-
-  override def serialize(input: Any): Array[Byte] =
-    serialize(input.asInstanceOf[TaskDataWrapper])
-
-  private def serialize(input: TaskDataWrapper): Array[Byte] = {
+  override def serialize(input: TaskDataWrapper): Array[Byte] = {
     val builder = StoreTypes.TaskDataWrapper.newBuilder()
       .setTaskId(input.taskId)
       .setIndex(input.index)
diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala
index dcce69b803c..80a36e1b02b 100644
--- a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala
@@ -25,12 +25,9 @@ import org.apache.spark.sql.execution.ui.SQLExecutionUIData
 import org.apache.spark.status.protobuf.{JobExecutionStatusSerializer, ProtobufSerDe, StoreTypes}
 import org.apache.spark.status.protobuf.Utils.getOptional
 
-class SQLExecutionUIDataSerializer extends ProtobufSerDe {
+class SQLExecutionUIDataSerializer extends ProtobufSerDe[SQLExecutionUIData] {
 
-  override val supportClass: Class[_] = classOf[SQLExecutionUIData]
-
-  override def serialize(input: Any): Array[Byte] = {
-    val ui = input.asInstanceOf[SQLExecutionUIData]
+  override def serialize(ui: SQLExecutionUIData): Array[Byte] = {
     val builder = StoreTypes.SQLExecutionUIData.newBuilder()
     builder.setExecutionId(ui.executionId)
     builder.setRootExecutionId(ui.rootExecutionId)
diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala
index c68466489ce..1df82e3246a 100644
--- a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala
@@ -23,12 +23,9 @@ import org.apache.spark.sql.execution.ui.{SparkPlanGraphClusterWrapper, SparkPla
 import org.apache.spark.status.protobuf.ProtobufSerDe
 import org.apache.spark.status.protobuf.StoreTypes
 
-class SparkPlanGraphWrapperSerializer extends ProtobufSerDe {
+class SparkPlanGraphWrapperSerializer extends ProtobufSerDe[SparkPlanGraphWrapper] {
 
-  override val supportClass: Class[_] = classOf[SparkPlanGraphWrapper]
-
-  override def serialize(input: Any): Array[Byte] = {
-    val plan = input.asInstanceOf[SparkPlanGraphWrapper]
+  override def serialize(plan: SparkPlanGraphWrapper): Array[Byte] = {
     val builder = StoreTypes.SparkPlanGraphWrapper.newBuilder()
     builder.setExecutionId(plan.executionId)
     plan.nodes.foreach { node =>
diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryDataSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryDataSerializer.scala
index f05b186fea5..70f8bedf91b 100644
--- a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryDataSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/StreamingQueryDataSerializer.scala
@@ -23,12 +23,9 @@ import org.apache.spark.sql.streaming.ui.StreamingQueryData
 import org.apache.spark.status.protobuf.{ProtobufSerDe, StoreTypes}
 import org.apache.spark.status.protobuf.Utils.getOptional
 
-class StreamingQueryDataSerializer extends ProtobufSerDe {
+class StreamingQueryDataSerializer extends ProtobufSerDe[StreamingQueryData] {
 
-  override val supportClass: Class[_] = classOf[StreamingQueryData]
-
-  override def serialize(input: Any): Array[Byte] = {
-    val data = input.asInstanceOf[StreamingQueryData]
+  override def serialize(data: StreamingQueryData): Array[Byte] = {
     val builder = StoreTypes.StreamingQueryData.newBuilder()
       .setId(data.id.toString)
       .setRunId(data.runId)
@@ -40,7 +37,7 @@ class StreamingQueryDataSerializer extends ProtobufSerDe {
     builder.build().toByteArray
   }
 
-  override def deserialize(bytes: Array[Byte]): Any = {
+  override def deserialize(bytes: Array[Byte]): StreamingQueryData = {
     val data = StoreTypes.StreamingQueryData.parseFrom(bytes)
     val exception =
       getOptional(data.hasException, () => data.getException)

