This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 290d09d1c88 [SPARK-41423][CORE] Protobuf serializer for StageDataWrapper
290d09d1c88 is described below

commit 290d09d1c883ece2ecc2f20133c8e8fab3a0c7c4
Author: panbingkun <pbk1...@gmail.com>
AuthorDate: Fri Dec 30 13:49:17 2022 -0800

    [SPARK-41423][CORE] Protobuf serializer for StageDataWrapper
    
    ### What changes were proposed in this pull request?
    Add Protobuf serializer for StageDataWrapper.
    
    ### Why are the changes needed?
    Support fast and compact serialization/deserialization for StageDataWrapper over RocksDB.
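    A minimal usage sketch (illustrative only; it assumes an already-populated
    StageDataWrapper instance named wrapper and uses the serializer added in this patch):

        // round-trip the live status object through compact Protobuf bytes
        val serde = new StageDataWrapperSerializer()
        val bytes: Array[Byte] = serde.serialize(wrapper)    // bytes written to the RocksDB-backed store
        val restored = serde.deserialize(bytes)               // StageDataWrapper rebuilt from those bytes
        assert(restored.jobIds == wrapper.jobIds && restored.locality == wrapper.locality)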
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    New UT.
    
    Closes #39192 from panbingkun/SPARK-41423.
    
    Authored-by: panbingkun <pbk1...@gmail.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../apache/spark/status/protobuf/store_types.proto | 211 +++++++
 .../org.apache.spark.status.protobuf.ProtobufSerDe |   1 +
 .../scala/org/apache/spark/status/api/v1/api.scala |   4 +-
 .../protobuf/AccumulableInfoSerializer.scala       |  49 ++
 ....scala => ExecutorStageSummarySerializer.scala} |  34 +-
 .../ExecutorStageSummaryWrapperSerializer.scala    |  56 +-
 .../protobuf/StageDataWrapperSerializer.scala      | 616 +++++++++++++++++++++
 .../status/protobuf/StageStatusSerializer.scala    |  35 ++
 .../protobuf/TaskDataWrapperSerializer.scala       |  23 +-
 .../protobuf/KVStoreProtobufSerializerSuite.scala  | 610 ++++++++++++++++++--
 10 files changed, 1494 insertions(+), 145 deletions(-)

diff --git a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
index bba3a494083..ff687331a6a 100644
--- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
+++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
@@ -463,3 +463,214 @@ message RDDOperationGraphWrapper {
   repeated RDDOperationEdge incoming_edges = 4;
   RDDOperationClusterWrapper root_cluster = 5;
 }
+
+message StageDataWrapper {
+  StageData info = 1;
+  repeated int64 job_ids = 2;
+  map<string, int64> locality = 3;
+}
+
+message TaskData {
+  int64 task_id = 1;
+  int32 index = 2;
+  int32 attempt = 3;
+  int32 partition_id = 4;
+  int64 launch_time = 5;
+  optional int64 result_fetch_start = 6;
+  optional int64 duration = 7;
+  string executor_id = 8;
+  string host = 9;
+  string status = 10;
+  string task_locality = 11;
+  bool speculative = 12;
+  repeated AccumulableInfo accumulator_updates = 13;
+  optional string error_message = 14;
+  optional TaskMetrics task_metrics = 15;
+  map<string, string> executor_logs = 16;
+  int64 scheduler_delay = 17;
+  int64 getting_result_time = 18;
+}
+
+enum StageStatus {
+  STAGE_STATUS_UNSPECIFIED = 0;
+  STAGE_STATUS_ACTIVE = 1;
+  STAGE_STATUS_COMPLETE = 2;
+  STAGE_STATUS_FAILED = 3;
+  STAGE_STATUS_PENDING = 4;
+  STAGE_STATUS_SKIPPED = 5;
+}
+
+message StageData {
+  StageStatus status = 1;
+  int64 stage_id = 2;
+  int32 attempt_id = 3;
+  int32 num_tasks = 4;
+  int32 num_active_tasks = 5;
+  int32 num_complete_tasks = 6;
+  int32 num_failed_tasks = 7;
+  int32 num_killed_tasks = 8;
+  int32 num_completed_indices = 9;
+
+  optional int64 submission_time = 10;
+  optional int64 first_task_launched_time = 11;
+  optional int64 completion_time = 12;
+  optional string failure_reason = 13;
+
+  int64 executor_deserialize_time = 14;
+  int64 executor_deserialize_cpu_time = 15;
+  int64 executor_run_time = 16;
+  int64 executor_cpu_time = 17;
+  int64 result_size = 18;
+  int64 jvm_gc_time = 19;
+  int64 result_serialization_time = 20;
+  int64 memory_bytes_spilled = 21;
+  int64 disk_bytes_spilled = 22;
+  int64 peak_execution_memory = 23;
+  int64 input_bytes = 24;
+  int64 input_records = 25;
+  int64 output_bytes = 26;
+  int64 output_records = 27;
+  int64 shuffle_remote_blocks_fetched = 28;
+  int64 shuffle_local_blocks_fetched = 29;
+  int64 shuffle_fetch_wait_time = 30;
+  int64 shuffle_remote_bytes_read = 31;
+  int64 shuffle_remote_bytes_read_to_disk = 32;
+  int64 shuffle_local_bytes_read = 33;
+  int64 shuffle_read_bytes = 34;
+  int64 shuffle_read_records = 35;
+  int64 shuffle_write_bytes = 36;
+  int64 shuffle_write_time = 37;
+  int64 shuffle_write_records = 38;
+
+  string name = 39;
+  optional string description = 40;
+  string details = 41;
+  string scheduling_pool = 42;
+
+  repeated int64 rdd_ids = 43;
+  repeated AccumulableInfo accumulator_updates = 44;
+  map<int64, TaskData> tasks = 45;
+  map<string, ExecutorStageSummary> executor_summary = 46;
+  optional SpeculationStageSummary speculation_summary = 47;
+  map<string, int32> killed_tasks_summary = 48;
+  int32 resource_profile_id = 49;
+  optional ExecutorMetrics peak_executor_metrics = 50;
+  optional TaskMetricDistributions task_metrics_distributions = 51;
+  optional ExecutorMetricsDistributions executor_metrics_distributions = 52;
+}
+
+message TaskMetrics {
+  int64 executor_deserialize_time = 1;
+  int64 executor_deserialize_cpu_time = 2;
+  int64 executor_run_time = 3;
+  int64 executor_cpu_time = 4;
+  int64 result_size = 5;
+  int64 jvm_gc_time = 6;
+  int64 result_serialization_time = 7;
+  int64 memory_bytes_spilled = 8;
+  int64 disk_bytes_spilled = 9;
+  int64 peak_execution_memory = 10;
+  InputMetrics input_metrics = 11;
+  OutputMetrics output_metrics = 12;
+  ShuffleReadMetrics shuffle_read_metrics = 13;
+  ShuffleWriteMetrics shuffle_write_metrics = 14;
+}
+
+message InputMetrics {
+  int64 bytes_read = 1;
+  int64 records_read = 2;
+}
+
+message OutputMetrics {
+  int64 bytes_written = 1;
+  int64 records_written = 2;
+}
+
+message ShuffleReadMetrics {
+  int64 remote_blocks_fetched = 1;
+  int64 local_blocks_fetched = 2;
+  int64 fetch_wait_time = 3;
+  int64 remote_bytes_read = 4;
+  int64 remote_bytes_read_to_disk = 5;
+  int64 local_bytes_read = 6;
+  int64 records_read = 7;
+}
+
+message ShuffleWriteMetrics {
+  int64 bytes_written = 1;
+  int64 write_time = 2;
+  int64 records_written = 3;
+}
+
+message TaskMetricDistributions {
+  repeated double quantiles = 1;
+  repeated double duration = 2;
+  repeated double executor_deserialize_time = 3;
+  repeated double executor_deserialize_cpu_time = 4;
+  repeated double executor_run_time = 5;
+  repeated double executor_cpu_time = 6;
+  repeated double result_size = 7;
+  repeated double jvm_gc_time = 8;
+  repeated double result_serialization_time = 9;
+  repeated double getting_result_time = 10;
+  repeated double scheduler_delay = 11;
+  repeated double peak_execution_memory = 12;
+  repeated double memory_bytes_spilled = 13;
+  repeated double disk_bytes_spilled = 14;
+  InputMetricDistributions input_metrics = 15;
+  OutputMetricDistributions output_metrics = 16;
+  ShuffleReadMetricDistributions shuffle_read_metrics = 17;
+  ShuffleWriteMetricDistributions shuffle_write_metrics = 18;
+}
+
+message InputMetricDistributions {
+  repeated double bytes_read = 1;
+  repeated double records_read = 2;
+}
+
+message OutputMetricDistributions {
+  repeated double bytes_written = 1;
+  repeated double records_written = 2;
+}
+
+message ShuffleReadMetricDistributions {
+  repeated double read_bytes = 1;
+  repeated double read_records = 2;
+  repeated double remote_blocks_fetched = 3;
+  repeated double local_blocks_fetched = 4;
+  repeated double fetch_wait_time = 5;
+  repeated double remote_bytes_read = 6;
+  repeated double remote_bytes_read_to_disk = 7;
+  repeated double total_blocks_fetched = 8;
+}
+
+message ShuffleWriteMetricDistributions {
+  repeated double write_bytes = 1;
+  repeated double write_records = 2;
+  repeated double write_time = 3;
+}
+
+message ExecutorMetricsDistributions {
+  repeated double quantiles = 1;
+
+  repeated double task_time = 2;
+  repeated double failed_tasks = 3;
+  repeated double succeeded_tasks = 4;
+  repeated double killed_tasks = 5;
+  repeated double input_bytes = 6;
+  repeated double input_records = 7;
+  repeated double output_bytes = 8;
+  repeated double output_records = 9;
+  repeated double shuffle_read = 10;
+  repeated double shuffle_read_records = 11;
+  repeated double shuffle_write = 12;
+  repeated double shuffle_write_records = 13;
+  repeated double memory_bytes_spilled = 14;
+  repeated double disk_bytes_spilled = 15;
+  ExecutorPeakMetricsDistributions peak_memory_metrics = 16;
+}
+
+message ExecutorPeakMetricsDistributions {
+  repeated double quantiles = 1;
+  repeated ExecutorMetrics executor_metrics = 2;
+}
diff --git a/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe b/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
index 4e39d9ecdc0..5619f2651d3 100644
--- a/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
+++ b/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
@@ -28,3 +28,4 @@ org.apache.spark.status.protobuf.SpeculationStageSummaryWrapperSerializer
 org.apache.spark.status.protobuf.ExecutorSummaryWrapperSerializer
 org.apache.spark.status.protobuf.ProcessSummaryWrapperSerializer
 org.apache.spark.status.protobuf.RDDOperationGraphWrapperSerializer
+org.apache.spark.status.protobuf.StageDataWrapperSerializer
diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
index 025943f628b..a58b0b808b1 100644
--- a/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
+++ b/core/src/main/scala/org/apache/spark/status/api/v1/api.scala
@@ -291,8 +291,8 @@ class StageData private[spark](
     val details: String,
     val schedulingPool: String,
 
-    val rddIds: Seq[Int],
-    val accumulatorUpdates: Seq[AccumulableInfo],
+    val rddIds: collection.Seq[Int],
+    val accumulatorUpdates: collection.Seq[AccumulableInfo],
     val tasks: Option[Map[Long, TaskData]],
     val executorSummary: Option[Map[String, ExecutorStageSummary]],
     val speculationSummary: Option[SpeculationStageSummary],
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/AccumulableInfoSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/AccumulableInfoSerializer.scala
new file mode 100644
index 00000000000..8d5046923e9
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/AccumulableInfoSerializer.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf
+
+import java.util.{List => JList}
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.spark.status.api.v1.AccumulableInfo
+import org.apache.spark.status.protobuf.Utils.getOptional
+
+private[protobuf] object AccumulableInfoSerializer {
+
+  def serialize(input: AccumulableInfo): StoreTypes.AccumulableInfo = {
+    val builder = StoreTypes.AccumulableInfo.newBuilder()
+      .setId(input.id)
+      .setName(input.name)
+      .setValue(input.value)
+    input.update.foreach(builder.setUpdate)
+    builder.build()
+  }
+
+  def deserialize(updates: JList[StoreTypes.AccumulableInfo]): ArrayBuffer[AccumulableInfo] = {
+    val accumulatorUpdates = new ArrayBuffer[AccumulableInfo](updates.size())
+    updates.forEach { update =>
+      accumulatorUpdates.append(new AccumulableInfo(
+        id = update.getId,
+        name = update.getName,
+        update = getOptional(update.hasUpdate, update.getUpdate),
+        value = update.getValue))
+    }
+    accumulatorUpdates
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorStageSummaryWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorStageSummarySerializer.scala
similarity index 68%
copy from core/src/main/scala/org/apache/spark/status/protobuf/ExecutorStageSummaryWrapperSerializer.scala
copy to core/src/main/scala/org/apache/spark/status/protobuf/ExecutorStageSummarySerializer.scala
index 21296e95e51..c304ed8e5e1 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorStageSummaryWrapperSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorStageSummarySerializer.scala
@@ -17,39 +17,12 @@
 
 package org.apache.spark.status.protobuf
 
-import org.apache.spark.status.ExecutorStageSummaryWrapper
 import org.apache.spark.status.api.v1.ExecutorStageSummary
 import org.apache.spark.status.protobuf.Utils.getOptional
 
-class ExecutorStageSummaryWrapperSerializer extends ProtobufSerDe {
+private[protobuf] object ExecutorStageSummarySerializer {
 
-  override val supportClass: Class[_] = classOf[ExecutorStageSummaryWrapper]
-
-  override def serialize(input: Any): Array[Byte] =
-    serialize(input.asInstanceOf[ExecutorStageSummaryWrapper])
-
-  private def serialize(input: ExecutorStageSummaryWrapper): Array[Byte] = {
-    val info = serializeExecutorStageSummary(input.info)
-    val builder = StoreTypes.ExecutorStageSummaryWrapper.newBuilder()
-      .setStageId(input.stageId.toLong)
-      .setStageAttemptId(input.stageAttemptId)
-      .setExecutorId(input.executorId)
-      .setInfo(info)
-    builder.build().toByteArray
-  }
-
-  def deserialize(bytes: Array[Byte]): ExecutorStageSummaryWrapper = {
-    val binary = StoreTypes.ExecutorStageSummaryWrapper.parseFrom(bytes)
-    val info = deserializeExecutorStageSummary(binary.getInfo)
-    new ExecutorStageSummaryWrapper(
-      stageId = binary.getStageId.toInt,
-      stageAttemptId = binary.getStageAttemptId,
-      executorId = binary.getExecutorId,
-      info = info)
-  }
-
-  private def serializeExecutorStageSummary(
-      input: ExecutorStageSummary): StoreTypes.ExecutorStageSummary = {
+  def serialize(input: ExecutorStageSummary): StoreTypes.ExecutorStageSummary = {
     val builder = StoreTypes.ExecutorStageSummary.newBuilder()
       .setTaskTime(input.taskTime)
       .setFailedTasks(input.failedTasks)
@@ -73,8 +46,7 @@ class ExecutorStageSummaryWrapperSerializer extends ProtobufSerDe {
     builder.build()
   }
 
-  def deserializeExecutorStageSummary(
-      binary: StoreTypes.ExecutorStageSummary): ExecutorStageSummary = {
+  def deserialize(binary: StoreTypes.ExecutorStageSummary): ExecutorStageSummary = {
     val peakMemoryMetrics =
       getOptional(binary.hasPeakMemoryMetrics,
         () => ExecutorMetricsSerializer.deserialize(binary.getPeakMemoryMetrics))
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorStageSummaryWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorStageSummaryWrapperSerializer.scala
index 21296e95e51..71de2fbc81f 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorStageSummaryWrapperSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorStageSummaryWrapperSerializer.scala
@@ -18,8 +18,6 @@
 package org.apache.spark.status.protobuf
 
 import org.apache.spark.status.ExecutorStageSummaryWrapper
-import org.apache.spark.status.api.v1.ExecutorStageSummary
-import org.apache.spark.status.protobuf.Utils.getOptional
 
 class ExecutorStageSummaryWrapperSerializer extends ProtobufSerDe {
 
@@ -29,7 +27,7 @@ class ExecutorStageSummaryWrapperSerializer extends ProtobufSerDe {
     serialize(input.asInstanceOf[ExecutorStageSummaryWrapper])
 
   private def serialize(input: ExecutorStageSummaryWrapper): Array[Byte] = {
-    val info = serializeExecutorStageSummary(input.info)
+    val info = ExecutorStageSummarySerializer.serialize(input.info)
     val builder = StoreTypes.ExecutorStageSummaryWrapper.newBuilder()
       .setStageId(input.stageId.toLong)
       .setStageAttemptId(input.stageAttemptId)
@@ -40,61 +38,11 @@ class ExecutorStageSummaryWrapperSerializer extends ProtobufSerDe {
 
   def deserialize(bytes: Array[Byte]): ExecutorStageSummaryWrapper = {
     val binary = StoreTypes.ExecutorStageSummaryWrapper.parseFrom(bytes)
-    val info = deserializeExecutorStageSummary(binary.getInfo)
+    val info = ExecutorStageSummarySerializer.deserialize(binary.getInfo)
     new ExecutorStageSummaryWrapper(
       stageId = binary.getStageId.toInt,
       stageAttemptId = binary.getStageAttemptId,
       executorId = binary.getExecutorId,
       info = info)
   }
-
-  private def serializeExecutorStageSummary(
-      input: ExecutorStageSummary): StoreTypes.ExecutorStageSummary = {
-    val builder = StoreTypes.ExecutorStageSummary.newBuilder()
-      .setTaskTime(input.taskTime)
-      .setFailedTasks(input.failedTasks)
-      .setSucceededTasks(input.succeededTasks)
-      .setKilledTasks(input.killedTasks)
-      .setInputBytes(input.inputBytes)
-      .setInputRecords(input.inputRecords)
-      .setOutputBytes(input.outputBytes)
-      .setOutputRecords(input.outputRecords)
-      .setShuffleRead(input.shuffleRead)
-      .setShuffleReadRecords(input.shuffleReadRecords)
-      .setShuffleWrite(input.shuffleWrite)
-      .setShuffleWriteRecords(input.shuffleWriteRecords)
-      .setMemoryBytesSpilled(input.memoryBytesSpilled)
-      .setDiskBytesSpilled(input.diskBytesSpilled)
-      .setIsBlacklistedForStage(input.isBlacklistedForStage)
-      .setIsExcludedForStage(input.isExcludedForStage)
-    input.peakMemoryMetrics.map { m =>
-      builder.setPeakMemoryMetrics(ExecutorMetricsSerializer.serialize(m))
-    }
-    builder.build()
-  }
-
-  def deserializeExecutorStageSummary(
-      binary: StoreTypes.ExecutorStageSummary): ExecutorStageSummary = {
-    val peakMemoryMetrics =
-      getOptional(binary.hasPeakMemoryMetrics,
-        () => ExecutorMetricsSerializer.deserialize(binary.getPeakMemoryMetrics))
-    new ExecutorStageSummary(
-      taskTime = binary.getTaskTime,
-      failedTasks = binary.getFailedTasks,
-      succeededTasks = binary.getSucceededTasks,
-      killedTasks = binary.getKilledTasks,
-      inputBytes = binary.getInputBytes,
-      inputRecords = binary.getInputRecords,
-      outputBytes = binary.getOutputBytes,
-      outputRecords = binary.getOutputRecords,
-      shuffleRead = binary.getShuffleRead,
-      shuffleReadRecords = binary.getShuffleReadRecords,
-      shuffleWrite = binary.getShuffleWrite,
-      shuffleWriteRecords = binary.getShuffleWriteRecords,
-      memoryBytesSpilled = binary.getMemoryBytesSpilled,
-      diskBytesSpilled = binary.getDiskBytesSpilled,
-      isBlacklistedForStage = binary.getIsBlacklistedForStage,
-      peakMemoryMetrics = peakMemoryMetrics,
-      isExcludedForStage = binary.getIsExcludedForStage)
-  }
 }
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/StageDataWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/StageDataWrapperSerializer.scala
new file mode 100644
index 00000000000..3ab0245898d
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/StageDataWrapperSerializer.scala
@@ -0,0 +1,616 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf
+
+import java.util.Date
+
+import collection.JavaConverters._
+import org.apache.commons.collections4.MapUtils
+
+import org.apache.spark.status.StageDataWrapper
+import org.apache.spark.status.api.v1.{ExecutorMetricsDistributions, ExecutorPeakMetricsDistributions, InputMetricDistributions, InputMetrics, OutputMetricDistributions, OutputMetrics, ShuffleReadMetricDistributions, ShuffleReadMetrics, ShuffleWriteMetricDistributions, ShuffleWriteMetrics, SpeculationStageSummary, StageData, TaskData, TaskMetricDistributions, TaskMetrics}
+import org.apache.spark.status.protobuf.Utils.getOptional
+import org.apache.spark.util.Utils.weakIntern
+
+class StageDataWrapperSerializer extends ProtobufSerDe {
+
+  override val supportClass: Class[_] = classOf[StageDataWrapper]
+
+  override def serialize(input: Any): Array[Byte] = {
+    val s = input.asInstanceOf[StageDataWrapper]
+    val builder = StoreTypes.StageDataWrapper.newBuilder()
+    builder.setInfo(serializeStageData(s.info))
+    s.jobIds.foreach(id => builder.addJobIds(id.toLong))
+    s.locality.foreach { entry =>
+      builder.putLocality(entry._1, entry._2)
+    }
+    builder.build().toByteArray
+  }
+
+  private def serializeStageData(stageData: StageData): StoreTypes.StageData = {
+    val stageDataBuilder = StoreTypes.StageData.newBuilder()
+    stageDataBuilder
+      .setStatus(StageStatusSerializer.serialize(stageData.status))
+      .setStageId(stageData.stageId.toLong)
+      .setAttemptId(stageData.attemptId)
+      .setNumTasks(stageData.numTasks)
+      .setNumActiveTasks(stageData.numActiveTasks)
+      .setNumCompleteTasks(stageData.numCompleteTasks)
+      .setNumFailedTasks(stageData.numFailedTasks)
+      .setNumKilledTasks(stageData.numKilledTasks)
+      .setNumCompletedIndices(stageData.numCompletedIndices)
+      .setExecutorDeserializeTime(stageData.executorDeserializeTime)
+      .setExecutorDeserializeCpuTime(stageData.executorDeserializeCpuTime)
+      .setExecutorRunTime(stageData.executorRunTime)
+      .setExecutorCpuTime(stageData.executorCpuTime)
+      .setResultSize(stageData.resultSize)
+      .setJvmGcTime(stageData.jvmGcTime)
+      .setResultSerializationTime(stageData.resultSerializationTime)
+      .setMemoryBytesSpilled(stageData.memoryBytesSpilled)
+      .setDiskBytesSpilled(stageData.diskBytesSpilled)
+      .setPeakExecutionMemory(stageData.peakExecutionMemory)
+      .setInputBytes(stageData.inputBytes)
+      .setInputRecords(stageData.inputRecords)
+      .setOutputBytes(stageData.outputBytes)
+      .setOutputRecords(stageData.outputRecords)
+      .setShuffleRemoteBlocksFetched(stageData.shuffleRemoteBlocksFetched)
+      .setShuffleLocalBlocksFetched(stageData.shuffleLocalBlocksFetched)
+      .setShuffleFetchWaitTime(stageData.shuffleFetchWaitTime)
+      .setShuffleRemoteBytesRead(stageData.shuffleRemoteBytesRead)
+      .setShuffleRemoteBytesReadToDisk(stageData.shuffleRemoteBytesReadToDisk)
+      .setShuffleLocalBytesRead(stageData.shuffleLocalBytesRead)
+      .setShuffleReadBytes(stageData.shuffleReadBytes)
+      .setShuffleReadRecords(stageData.shuffleReadRecords)
+      .setShuffleWriteBytes(stageData.shuffleWriteBytes)
+      .setShuffleWriteTime(stageData.shuffleWriteTime)
+      .setShuffleWriteRecords(stageData.shuffleWriteRecords)
+      .setName(stageData.name)
+      .setDetails(stageData.details)
+      .setSchedulingPool(stageData.schedulingPool)
+      .setResourceProfileId(stageData.resourceProfileId)
+    stageData.submissionTime.foreach { d =>
+      stageDataBuilder.setSubmissionTime(d.getTime)
+    }
+    stageData.firstTaskLaunchedTime.foreach { d =>
+      stageDataBuilder.setFirstTaskLaunchedTime(d.getTime)
+    }
+    stageData.completionTime.foreach { d =>
+      stageDataBuilder.setCompletionTime(d.getTime)
+    }
+    stageData.failureReason.foreach { fr =>
+      stageDataBuilder.setFailureReason(fr)
+    }
+    stageData.description.foreach { d =>
+      stageDataBuilder.setDescription(d)
+    }
+    stageData.rddIds.foreach(id => stageDataBuilder.addRddIds(id.toLong))
+    stageData.accumulatorUpdates.foreach { update =>
+      stageDataBuilder.addAccumulatorUpdates(
+        AccumulableInfoSerializer.serialize(update))
+    }
+    stageData.tasks.foreach { t =>
+      t.foreach { entry =>
+        stageDataBuilder.putTasks(entry._1, serializeTaskData(entry._2))
+      }
+    }
+    stageData.executorSummary.foreach { es =>
+      es.foreach { entry =>
+        stageDataBuilder.putExecutorSummary(entry._1,
+          ExecutorStageSummarySerializer.serialize(entry._2))
+      }
+    }
+    stageData.speculationSummary.foreach { ss =>
+      stageDataBuilder.setSpeculationSummary(serializeSpeculationStageSummary(ss))
+    }
+    stageData.killedTasksSummary.foreach { entry =>
+      stageDataBuilder.putKilledTasksSummary(entry._1, entry._2)
+    }
+    stageData.peakExecutorMetrics.foreach { pem =>
+      stageDataBuilder.setPeakExecutorMetrics(ExecutorMetricsSerializer.serialize(pem))
+    }
+    stageData.taskMetricsDistributions.foreach { tmd =>
+      stageDataBuilder.setTaskMetricsDistributions(serializeTaskMetricDistributions(tmd))
+    }
+    stageData.executorMetricsDistributions.foreach { emd =>
+      stageDataBuilder.setExecutorMetricsDistributions(serializeExecutorMetricsDistributions(emd))
+    }
+    stageDataBuilder.build()
+  }
+
+  private def serializeTaskData(t: TaskData): StoreTypes.TaskData = {
+    val taskDataBuilder = StoreTypes.TaskData.newBuilder()
+    taskDataBuilder
+      .setTaskId(t.taskId)
+      .setIndex(t.index)
+      .setAttempt(t.attempt)
+      .setPartitionId(t.partitionId)
+      .setLaunchTime(t.launchTime.getTime)
+      .setExecutorId(t.executorId)
+      .setHost(t.host)
+      .setStatus(t.status)
+      .setTaskLocality(t.taskLocality)
+      .setSpeculative(t.speculative)
+      .setSchedulerDelay(t.schedulerDelay)
+      .setGettingResultTime(t.gettingResultTime)
+    t.resultFetchStart.foreach { rfs =>
+      taskDataBuilder.setResultFetchStart(rfs.getTime)
+    }
+    t.duration.foreach { d =>
+      taskDataBuilder.setDuration(d)
+    }
+    t.accumulatorUpdates.foreach { update =>
+      taskDataBuilder.addAccumulatorUpdates(
+        AccumulableInfoSerializer.serialize(update))
+    }
+    t.errorMessage.foreach { em =>
+      taskDataBuilder.setErrorMessage(em)
+    }
+    t.taskMetrics.foreach { tm =>
+      taskDataBuilder.setTaskMetrics(serializeTaskMetrics(tm))
+    }
+    t.executorLogs.foreach { entry =>
+      taskDataBuilder.putExecutorLogs(entry._1, entry._2)
+    }
+    taskDataBuilder.build()
+  }
+
+  private def serializeTaskMetrics(tm: TaskMetrics): StoreTypes.TaskMetrics = {
+    val taskMetricsBuilder = StoreTypes.TaskMetrics.newBuilder()
+    taskMetricsBuilder
+      .setExecutorDeserializeTime(tm.executorDeserializeTime)
+      .setExecutorDeserializeCpuTime(tm.executorDeserializeCpuTime)
+      .setExecutorRunTime(tm.executorRunTime)
+      .setExecutorCpuTime(tm.executorCpuTime)
+      .setResultSize(tm.resultSize)
+      .setJvmGcTime(tm.jvmGcTime)
+      .setResultSerializationTime(tm.resultSerializationTime)
+      .setMemoryBytesSpilled(tm.memoryBytesSpilled)
+      .setDiskBytesSpilled(tm.diskBytesSpilled)
+      .setPeakExecutionMemory(tm.peakExecutionMemory)
+      .setInputMetrics(serializeInputMetrics(tm.inputMetrics))
+      .setOutputMetrics(serializeOutputMetrics(tm.outputMetrics))
+      .setShuffleReadMetrics(serializeShuffleReadMetrics(tm.shuffleReadMetrics))
+      .setShuffleWriteMetrics(serializeShuffleWriteMetrics(tm.shuffleWriteMetrics))
+    taskMetricsBuilder.build()
+  }
+
+  private def serializeInputMetrics(im: InputMetrics): StoreTypes.InputMetrics = {
+    StoreTypes.InputMetrics.newBuilder()
+      .setBytesRead(im.bytesRead)
+      .setRecordsRead(im.recordsRead)
+      .build()
+  }
+
+  private def serializeOutputMetrics(om: OutputMetrics): StoreTypes.OutputMetrics = {
+    StoreTypes.OutputMetrics.newBuilder()
+      .setBytesWritten(om.bytesWritten)
+      .setRecordsWritten(om.recordsWritten)
+      .build()
+  }
+
+  private def serializeShuffleReadMetrics(
+      srm: ShuffleReadMetrics): StoreTypes.ShuffleReadMetrics = {
+    StoreTypes.ShuffleReadMetrics.newBuilder()
+      .setRemoteBlocksFetched(srm.remoteBlocksFetched)
+      .setLocalBlocksFetched(srm.localBlocksFetched)
+      .setFetchWaitTime(srm.fetchWaitTime)
+      .setRemoteBytesRead(srm.remoteBytesRead)
+      .setRemoteBytesReadToDisk(srm.remoteBytesReadToDisk)
+      .setLocalBytesRead(srm.localBytesRead)
+      .setRecordsRead(srm.recordsRead)
+      .build()
+  }
+
+  private def serializeShuffleWriteMetrics(
+      swm: ShuffleWriteMetrics): StoreTypes.ShuffleWriteMetrics = {
+    StoreTypes.ShuffleWriteMetrics.newBuilder()
+      .setBytesWritten(swm.bytesWritten)
+      .setWriteTime(swm.writeTime)
+      .setRecordsWritten(swm.recordsWritten)
+      .build()
+  }
+
+  private def serializeSpeculationStageSummary(
+      sss: SpeculationStageSummary): StoreTypes.SpeculationStageSummary = {
+    StoreTypes.SpeculationStageSummary.newBuilder()
+      .setNumTasks(sss.numTasks)
+      .setNumActiveTasks(sss.numActiveTasks)
+      .setNumCompletedTasks(sss.numCompletedTasks)
+      .setNumFailedTasks(sss.numFailedTasks)
+      .setNumKilledTasks(sss.numKilledTasks)
+      .build()
+  }
+
+  private def serializeTaskMetricDistributions(
+      tmd: TaskMetricDistributions): StoreTypes.TaskMetricDistributions = {
+    val builder = StoreTypes.TaskMetricDistributions.newBuilder()
+    tmd.quantiles.foreach(q => builder.addQuantiles(q))
+    tmd.duration.foreach(d => builder.addDuration(d))
+    tmd.executorDeserializeTime.foreach(edt => builder.addExecutorDeserializeTime(edt))
+    tmd.executorDeserializeCpuTime.foreach(edct => builder.addExecutorDeserializeCpuTime(edct))
+    tmd.executorRunTime.foreach(ert => builder.addExecutorRunTime(ert))
+    tmd.executorCpuTime.foreach(ect => builder.addExecutorCpuTime(ect))
+    tmd.resultSize.foreach(rs => builder.addResultSize(rs))
+    tmd.jvmGcTime.foreach(jgt => builder.addJvmGcTime(jgt))
+    tmd.resultSerializationTime.foreach(rst => builder.addResultSerializationTime(rst))
+    tmd.gettingResultTime.foreach(grt => builder.addGettingResultTime(grt))
+    tmd.schedulerDelay.foreach(sd => builder.addSchedulerDelay(sd))
+    tmd.peakExecutionMemory.foreach(pem => builder.addPeakExecutionMemory(pem))
+    tmd.memoryBytesSpilled.foreach(mbs => builder.addMemoryBytesSpilled(mbs))
+    tmd.diskBytesSpilled.foreach(dbs => builder.addDiskBytesSpilled(dbs))
+    builder
+      .setInputMetrics(serializeInputMetricDistributions(tmd.inputMetrics))
+      .setOutputMetrics(serializeOutputMetricDistributions(tmd.outputMetrics))
+      .setShuffleReadMetrics(serializeShuffleReadMetricDistributions(tmd.shuffleReadMetrics))
+      .setShuffleWriteMetrics(serializeShuffleWriteMetricDistributions(tmd.shuffleWriteMetrics))
+      .build()
+  }
+
+  private def serializeInputMetricDistributions(
+      imd: InputMetricDistributions): StoreTypes.InputMetricDistributions = {
+    val builder = StoreTypes.InputMetricDistributions.newBuilder()
+    imd.bytesRead.foreach(br => builder.addBytesRead(br))
+    imd.recordsRead.foreach(rr => builder.addRecordsRead(rr))
+    builder.build()
+  }
+
+  private def serializeOutputMetricDistributions(
+      omd: OutputMetricDistributions): StoreTypes.OutputMetricDistributions = {
+    val builder = StoreTypes.OutputMetricDistributions.newBuilder()
+    omd.bytesWritten.foreach(bw => builder.addBytesWritten(bw))
+    omd.recordsWritten.foreach(rw => builder.addRecordsWritten(rw))
+    builder.build()
+  }
+
+  private def serializeShuffleReadMetricDistributions(
+      srmd: ShuffleReadMetricDistributions): StoreTypes.ShuffleReadMetricDistributions = {
+    val builder = StoreTypes.ShuffleReadMetricDistributions.newBuilder()
+    srmd.readBytes.foreach(rb => builder.addReadBytes(rb))
+    srmd.readRecords.foreach(rr => builder.addReadRecords(rr))
+    srmd.remoteBlocksFetched.foreach(rbf => builder.addRemoteBlocksFetched(rbf))
+    srmd.localBlocksFetched.foreach(lbf => builder.addLocalBlocksFetched(lbf))
+    srmd.fetchWaitTime.foreach(fwt => builder.addFetchWaitTime(fwt))
+    srmd.remoteBytesRead.foreach(rbr => builder.addRemoteBytesRead(rbr))
+    srmd.remoteBytesReadToDisk.foreach(rbrtd => builder.addRemoteBytesReadToDisk(rbrtd))
+    srmd.totalBlocksFetched.foreach(tbf => builder.addTotalBlocksFetched(tbf))
+    builder.build()
+  }
+
+  private def serializeShuffleWriteMetricDistributions(
+      swmd: ShuffleWriteMetricDistributions): StoreTypes.ShuffleWriteMetricDistributions = {
+    val builder = StoreTypes.ShuffleWriteMetricDistributions.newBuilder()
+    swmd.writeBytes.foreach(wb => builder.addWriteBytes(wb))
+    swmd.writeRecords.foreach(wr => builder.addWriteRecords(wr))
+    swmd.writeTime.foreach(wt => builder.addWriteTime(wt))
+    builder.build()
+  }
+
+  private def serializeExecutorMetricsDistributions(
+      emd: ExecutorMetricsDistributions): StoreTypes.ExecutorMetricsDistributions = {
+    val builder = StoreTypes.ExecutorMetricsDistributions.newBuilder()
+    emd.quantiles.foreach(q => builder.addQuantiles(q))
+    emd.taskTime.foreach(tt => builder.addTaskTime(tt))
+    emd.failedTasks.foreach(ft => builder.addFailedTasks(ft))
+    emd.succeededTasks.foreach(st => builder.addSucceededTasks(st))
+    emd.killedTasks.foreach(kt => builder.addKilledTasks(kt))
+    emd.inputBytes.foreach(ib => builder.addInputBytes(ib))
+    emd.inputRecords.foreach(ir => builder.addInputRecords(ir))
+    emd.outputBytes.foreach(ob => builder.addOutputBytes(ob))
+    emd.outputRecords.foreach(or => builder.addOutputRecords(or))
+    emd.shuffleRead.foreach(sr => builder.addShuffleRead(sr))
+    emd.shuffleReadRecords.foreach(srr => builder.addShuffleReadRecords(srr))
+    emd.shuffleWrite.foreach(sw => builder.addShuffleWrite(sw))
+    emd.shuffleWriteRecords.foreach(swr => builder.addShuffleWriteRecords(swr))
+    emd.memoryBytesSpilled.foreach(mbs => builder.addMemoryBytesSpilled(mbs))
+    emd.diskBytesSpilled.foreach(dbs => builder.addDiskBytesSpilled(dbs))
+    builder.setPeakMemoryMetrics(serializeExecutorPeakMetricsDistributions(emd.peakMemoryMetrics))
+    builder.build()
+  }
+
+  private def serializeExecutorPeakMetricsDistributions(
+      epmd: ExecutorPeakMetricsDistributions): StoreTypes.ExecutorPeakMetricsDistributions = {
+    val builder = StoreTypes.ExecutorPeakMetricsDistributions.newBuilder()
+    epmd.quantiles.foreach(q => builder.addQuantiles(q))
+    epmd.executorMetrics.foreach(em => builder.addExecutorMetrics(
+      ExecutorMetricsSerializer.serialize(em)))
+    builder.build()
+  }
+
+  override def deserialize(bytes: Array[Byte]): StageDataWrapper = {
+    val binary = StoreTypes.StageDataWrapper.parseFrom(bytes)
+    val info = deserializeStageData(binary.getInfo)
+    new StageDataWrapper(
+      info = info,
+      jobIds = binary.getJobIdsList.asScala.map(_.toInt).toSet,
+      locality = binary.getLocalityMap.asScala.mapValues(_.toLong).toMap
+    )
+  }
+
+  private def deserializeStageData(binary: StoreTypes.StageData): StageData = {
+    val status = StageStatusSerializer.deserialize(binary.getStatus)
+    val submissionTime =
+      getOptional(binary.hasSubmissionTime, () => new Date(binary.getSubmissionTime))
+    val firstTaskLaunchedTime =
+      getOptional(binary.hasFirstTaskLaunchedTime, () => new Date(binary.getFirstTaskLaunchedTime))
+    val completionTime =
+      getOptional(binary.hasCompletionTime, () => new Date(binary.getCompletionTime))
+    val failureReason =
+      getOptional(binary.hasFailureReason, () => weakIntern(binary.getFailureReason))
+    val description =
+      getOptional(binary.hasDescription, () => weakIntern(binary.getDescription))
+    val accumulatorUpdates = AccumulableInfoSerializer.deserialize(binary.getAccumulatorUpdatesList)
+    val tasks = if (MapUtils.isNotEmpty(binary.getTasksMap)) {
+      Some(binary.getTasksMap.asScala.map(
+        entry => (entry._1.toLong, deserializeTaskData(entry._2))).toMap)
+    } else None
+    val executorSummary = if (MapUtils.isNotEmpty(binary.getExecutorSummaryMap)) {
+      Some(binary.getExecutorSummaryMap.asScala.mapValues(
+        ExecutorStageSummarySerializer.deserialize).toMap)
+    } else None
+    val speculationSummary =
+      getOptional(binary.hasSpeculationSummary,
+        () => deserializeSpeculationStageSummary(binary.getSpeculationSummary))
+    val peakExecutorMetrics =
+      getOptional(binary.hasPeakExecutorMetrics,
+        () => ExecutorMetricsSerializer.deserialize(binary.getPeakExecutorMetrics))
+    val taskMetricsDistributions =
+      getOptional(binary.hasTaskMetricsDistributions,
+        () => deserializeTaskMetricDistributions(binary.getTaskMetricsDistributions))
+    val executorMetricsDistributions =
+      getOptional(binary.hasExecutorMetricsDistributions,
+        () => deserializeExecutorMetricsDistributions(binary.getExecutorMetricsDistributions))
+    new StageData(
+      status = status,
+      stageId = binary.getStageId.toInt,
+      attemptId = binary.getAttemptId,
+      numTasks = binary.getNumTasks,
+      numActiveTasks = binary.getNumActiveTasks,
+      numCompleteTasks = binary.getNumCompleteTasks,
+      numFailedTasks = binary.getNumFailedTasks,
+      numKilledTasks = binary.getNumKilledTasks,
+      numCompletedIndices = binary.getNumCompletedIndices,
+      submissionTime = submissionTime,
+      firstTaskLaunchedTime = firstTaskLaunchedTime,
+      completionTime = completionTime,
+      failureReason = failureReason,
+      executorDeserializeTime = binary.getExecutorDeserializeTime,
+      executorDeserializeCpuTime = binary.getExecutorDeserializeCpuTime,
+      executorRunTime = binary.getExecutorRunTime,
+      executorCpuTime = binary.getExecutorCpuTime,
+      resultSize = binary.getResultSize,
+      jvmGcTime = binary.getJvmGcTime,
+      resultSerializationTime = binary.getResultSerializationTime,
+      memoryBytesSpilled = binary.getMemoryBytesSpilled,
+      diskBytesSpilled = binary.getDiskBytesSpilled,
+      peakExecutionMemory = binary.getPeakExecutionMemory,
+      inputBytes = binary.getInputBytes,
+      inputRecords = binary.getInputRecords,
+      outputBytes = binary.getOutputBytes,
+      outputRecords = binary.getOutputRecords,
+      shuffleRemoteBlocksFetched = binary.getShuffleRemoteBlocksFetched,
+      shuffleLocalBlocksFetched = binary.getShuffleLocalBlocksFetched,
+      shuffleFetchWaitTime = binary.getShuffleFetchWaitTime,
+      shuffleRemoteBytesRead = binary.getShuffleRemoteBytesRead,
+      shuffleRemoteBytesReadToDisk = binary.getShuffleRemoteBytesReadToDisk,
+      shuffleLocalBytesRead = binary.getShuffleLocalBytesRead,
+      shuffleReadBytes = binary.getShuffleReadBytes,
+      shuffleReadRecords = binary.getShuffleReadRecords,
+      shuffleWriteBytes = binary.getShuffleWriteBytes,
+      shuffleWriteTime = binary.getShuffleWriteTime,
+      shuffleWriteRecords = binary.getShuffleWriteRecords,
+      name = weakIntern(binary.getName),
+      description = description,
+      details = weakIntern(binary.getDetails),
+      schedulingPool = weakIntern(binary.getSchedulingPool),
+      rddIds = binary.getRddIdsList.asScala.map(_.toInt),
+      accumulatorUpdates = accumulatorUpdates,
+      tasks = tasks,
+      executorSummary = executorSummary,
+      speculationSummary = speculationSummary,
+      killedTasksSummary = binary.getKilledTasksSummaryMap.asScala.mapValues(_.toInt).toMap,
+      resourceProfileId = binary.getResourceProfileId,
+      peakExecutorMetrics = peakExecutorMetrics,
+      taskMetricsDistributions = taskMetricsDistributions,
+      executorMetricsDistributions = executorMetricsDistributions
+    )
+  }
+
+  private def deserializeSpeculationStageSummary(
+      binary: StoreTypes.SpeculationStageSummary): SpeculationStageSummary = {
+    new SpeculationStageSummary(
+      binary.getNumTasks,
+      binary.getNumActiveTasks,
+      binary.getNumCompletedTasks,
+      binary.getNumFailedTasks,
+      binary.getNumKilledTasks
+    )
+  }
+
+  private def deserializeTaskMetricDistributions(
+      binary: StoreTypes.TaskMetricDistributions): TaskMetricDistributions = {
+    new TaskMetricDistributions(
+      quantiles = binary.getQuantilesList.asScala.map(_.toDouble).toIndexedSeq,
+      duration = binary.getDurationList.asScala.map(_.toDouble).toIndexedSeq,
+      executorDeserializeTime =
+        binary.getExecutorDeserializeTimeList.asScala.map(_.toDouble).toIndexedSeq,
+      executorDeserializeCpuTime =
+        binary.getExecutorDeserializeCpuTimeList.asScala.map(_.toDouble).toIndexedSeq,
+      executorRunTime = binary.getExecutorRunTimeList.asScala.map(_.toDouble).toIndexedSeq,
+      executorCpuTime = binary.getExecutorCpuTimeList.asScala.map(_.toDouble).toIndexedSeq,
+      resultSize = binary.getResultSizeList.asScala.map(_.toDouble).toIndexedSeq,
+      jvmGcTime = binary.getJvmGcTimeList.asScala.map(_.toDouble).toIndexedSeq,
+      resultSerializationTime =
+        binary.getResultSerializationTimeList.asScala.map(_.toDouble).toIndexedSeq,
+      gettingResultTime = binary.getGettingResultTimeList.asScala.map(_.toDouble).toIndexedSeq,
+      schedulerDelay = binary.getSchedulerDelayList.asScala.map(_.toDouble).toIndexedSeq,
+      peakExecutionMemory = binary.getPeakExecutionMemoryList.asScala.map(_.toDouble).toIndexedSeq,
+      memoryBytesSpilled = binary.getMemoryBytesSpilledList.asScala.map(_.toDouble).toIndexedSeq,
+      diskBytesSpilled = binary.getDiskBytesSpilledList.asScala.map(_.toDouble).toIndexedSeq,
+      inputMetrics = deserializeInputMetricDistributions(binary.getInputMetrics),
+      outputMetrics = deserializeOutputMetricDistributions(binary.getOutputMetrics),
+      shuffleReadMetrics = deserializeShuffleReadMetricDistributions(binary.getShuffleReadMetrics),
+      shuffleWriteMetrics =
+        deserializeShuffleWriteMetricDistributions(binary.getShuffleWriteMetrics)
+    )
+  }
+
+  private def deserializeInputMetricDistributions(
+      binary: StoreTypes.InputMetricDistributions): InputMetricDistributions = {
+    new InputMetricDistributions(
+      bytesRead = binary.getBytesReadList.asScala.map(_.toDouble).toIndexedSeq,
+      recordsRead = binary.getRecordsReadList.asScala.map(_.toDouble).toIndexedSeq
+    )
+  }
+
+  private def deserializeOutputMetricDistributions(
+      binary: StoreTypes.OutputMetricDistributions): OutputMetricDistributions = {
+    new OutputMetricDistributions(
+      bytesWritten = binary.getBytesWrittenList.asScala.map(_.toDouble).toIndexedSeq,
+      recordsWritten = binary.getRecordsWrittenList.asScala.map(_.toDouble).toIndexedSeq
+    )
+  }
+
+  private def deserializeShuffleReadMetricDistributions(
+      binary: StoreTypes.ShuffleReadMetricDistributions): ShuffleReadMetricDistributions = {
+    new ShuffleReadMetricDistributions(
+      readBytes = binary.getReadBytesList.asScala.map(_.toDouble).toIndexedSeq,
+      readRecords = binary.getReadRecordsList.asScala.map(_.toDouble).toIndexedSeq,
+      remoteBlocksFetched = binary.getRemoteBlocksFetchedList.asScala.map(_.toDouble).toIndexedSeq,
+      localBlocksFetched = binary.getLocalBlocksFetchedList.asScala.map(_.toDouble).toIndexedSeq,
+      fetchWaitTime = binary.getFetchWaitTimeList.asScala.map(_.toDouble).toIndexedSeq,
+      remoteBytesRead = binary.getRemoteBytesReadList.asScala.map(_.toDouble).toIndexedSeq,
+      remoteBytesReadToDisk =
+        binary.getRemoteBytesReadToDiskList.asScala.map(_.toDouble).toIndexedSeq,
+      totalBlocksFetched = binary.getTotalBlocksFetchedList.asScala.map(_.toDouble).toIndexedSeq
+    )
+  }
+
+  private def deserializeShuffleWriteMetricDistributions(
+      binary: StoreTypes.ShuffleWriteMetricDistributions): ShuffleWriteMetricDistributions = {
+    new ShuffleWriteMetricDistributions(
+      writeBytes = binary.getWriteBytesList.asScala.map(_.toDouble).toIndexedSeq,
+      writeRecords = binary.getWriteRecordsList.asScala.map(_.toDouble).toIndexedSeq,
+      writeTime = binary.getWriteTimeList.asScala.map(_.toDouble).toIndexedSeq
+    )
+  }
+
+  private def deserializeExecutorMetricsDistributions(
+      binary: StoreTypes.ExecutorMetricsDistributions): ExecutorMetricsDistributions = {
+    new ExecutorMetricsDistributions(
+      quantiles = binary.getQuantilesList.asScala.map(_.toDouble).toIndexedSeq,
+      taskTime = binary.getTaskTimeList.asScala.map(_.toDouble).toIndexedSeq,
+      failedTasks = binary.getFailedTasksList.asScala.map(_.toDouble).toIndexedSeq,
+      succeededTasks = binary.getSucceededTasksList.asScala.map(_.toDouble).toIndexedSeq,
+      killedTasks = binary.getKilledTasksList.asScala.map(_.toDouble).toIndexedSeq,
+      inputBytes = binary.getInputBytesList.asScala.map(_.toDouble).toIndexedSeq,
+      inputRecords = binary.getInputRecordsList.asScala.map(_.toDouble).toIndexedSeq,
+      outputBytes = binary.getOutputBytesList.asScala.map(_.toDouble).toIndexedSeq,
+      outputRecords = binary.getOutputRecordsList.asScala.map(_.toDouble).toIndexedSeq,
+      shuffleRead = binary.getShuffleReadList.asScala.map(_.toDouble).toIndexedSeq,
+      shuffleReadRecords = binary.getShuffleReadRecordsList.asScala.map(_.toDouble).toIndexedSeq,
+      shuffleWrite = binary.getShuffleWriteList.asScala.map(_.toDouble).toIndexedSeq,
+      shuffleWriteRecords = binary.getShuffleWriteRecordsList.asScala.map(_.toDouble).toIndexedSeq,
+      memoryBytesSpilled = binary.getMemoryBytesSpilledList.asScala.map(_.toDouble).toIndexedSeq,
+      diskBytesSpilled = binary.getDiskBytesSpilledList.asScala.map(_.toDouble).toIndexedSeq,
+      peakMemoryMetrics = deserializeExecutorPeakMetricsDistributions(binary.getPeakMemoryMetrics)
+    )
+  }
+
+  private def deserializeExecutorPeakMetricsDistributions(
+      binary: StoreTypes.ExecutorPeakMetricsDistributions): ExecutorPeakMetricsDistributions = {
+    new ExecutorPeakMetricsDistributions(
+      quantiles = binary.getQuantilesList.asScala.map(_.toDouble).toIndexedSeq,
+      executorMetrics = binary.getExecutorMetricsList.asScala.map(
+        ExecutorMetricsSerializer.deserialize).toIndexedSeq
+    )
+  }
+
+  private def deserializeTaskData(binary: StoreTypes.TaskData): TaskData = {
+    val resultFetchStart = getOptional(binary.hasResultFetchStart,
+      () => new Date(binary.getResultFetchStart))
+    val duration = getOptional(binary.hasDuration, () => binary.getDuration)
+    val accumulatorUpdates = AccumulableInfoSerializer.deserialize(binary.getAccumulatorUpdatesList)
+    val taskMetrics = getOptional(binary.hasTaskMetrics,
+      () => deserializeTaskMetrics(binary.getTaskMetrics))
+    new TaskData(
+      taskId = binary.getTaskId,
+      index = binary.getIndex,
+      attempt = binary.getAttempt,
+      partitionId = binary.getPartitionId,
+      launchTime = new Date(binary.getLaunchTime),
+      resultFetchStart = resultFetchStart,
+      duration = duration,
+      executorId = weakIntern(binary.getExecutorId),
+      host = weakIntern(binary.getHost),
+      status = weakIntern(binary.getStatus),
+      taskLocality = weakIntern(binary.getTaskLocality),
+      speculative = binary.getSpeculative,
+      accumulatorUpdates = accumulatorUpdates,
+      errorMessage = getOptional(binary.hasErrorMessage, () => weakIntern(binary.getErrorMessage)),
+      taskMetrics = taskMetrics,
+      executorLogs = binary.getExecutorLogsMap.asScala.toMap,
+      schedulerDelay = binary.getSchedulerDelay,
+      gettingResultTime = binary.getGettingResultTime)
+  }
+
+  private def deserializeTaskMetrics(binary: StoreTypes.TaskMetrics): TaskMetrics = {
+    new TaskMetrics(
+      binary.getExecutorDeserializeTime,
+      binary.getExecutorDeserializeCpuTime,
+      binary.getExecutorRunTime,
+      binary.getExecutorCpuTime,
+      binary.getResultSize,
+      binary.getJvmGcTime,
+      binary.getResultSerializationTime,
+      binary.getMemoryBytesSpilled,
+      binary.getDiskBytesSpilled,
+      binary.getPeakExecutionMemory,
+      deserializeInputMetrics(binary.getInputMetrics),
+      deserializeOutputMetrics(binary.getOutputMetrics),
+      deserializeShuffleReadMetrics(binary.getShuffleReadMetrics),
+      deserializeShuffleWriteMetrics(binary.getShuffleWriteMetrics))
+  }
+
+  private def deserializeInputMetrics(binary: StoreTypes.InputMetrics): InputMetrics = {
+    new InputMetrics(binary.getBytesRead, binary.getRecordsRead)
+  }
+
+  private def deserializeOutputMetrics(binary: StoreTypes.OutputMetrics): OutputMetrics = {
+    new OutputMetrics(binary.getBytesWritten, binary.getRecordsWritten)
+  }
+
+  private def deserializeShuffleReadMetrics(
+      binary: StoreTypes.ShuffleReadMetrics): ShuffleReadMetrics = {
+    new ShuffleReadMetrics(
+      binary.getRemoteBlocksFetched,
+      binary.getLocalBlocksFetched,
+      binary.getFetchWaitTime,
+      binary.getRemoteBytesRead,
+      binary.getRemoteBytesReadToDisk,
+      binary.getLocalBytesRead,
+      binary.getRecordsRead)
+  }
+
+  private def deserializeShuffleWriteMetrics(
+      binary: StoreTypes.ShuffleWriteMetrics): ShuffleWriteMetrics = {
+    new ShuffleWriteMetrics(
+      binary.getBytesWritten,
+      binary.getWriteTime,
+      binary.getRecordsWritten)
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/StageStatusSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/StageStatusSerializer.scala
new file mode 100644
index 00000000000..6014379bb1e
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/StageStatusSerializer.scala
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf
+
+import org.apache.commons.lang3.StringUtils
+
+import org.apache.spark.status.api.v1.StageStatus
+
+private[protobuf] object StageStatusSerializer {
+
+  private def PREFIX = "STAGE_STATUS_"
+
+  def serialize(input: StageStatus): StoreTypes.StageStatus = {
+    StoreTypes.StageStatus.valueOf(PREFIX + input.toString)
+  }
+
+  def deserialize(binary: StoreTypes.StageStatus): StageStatus = {
+    StageStatus.valueOf(StringUtils.removeStart(binary.toString, PREFIX))
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/TaskDataWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/TaskDataWrapperSerializer.scala
index 80f258aead6..155a0348398 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/TaskDataWrapperSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/TaskDataWrapperSerializer.scala
@@ -17,10 +17,7 @@
 
 package org.apache.spark.status.protobuf
 
-import scala.collection.mutable.ArrayBuffer
-
 import org.apache.spark.status.TaskDataWrapper
-import org.apache.spark.status.api.v1.AccumulableInfo
 import org.apache.spark.status.protobuf.Utils.getOptional
 import org.apache.spark.util.Utils.weakIntern
 
@@ -74,21 +71,14 @@ class TaskDataWrapperSerializer extends ProtobufSerDe {
       .setStageAttemptId(input.stageAttemptId)
     input.errorMessage.foreach(builder.setErrorMessage)
     input.accumulatorUpdates.foreach { update =>
-      builder.addAccumulatorUpdates(serializeAccumulableInfo(update))
+      builder.addAccumulatorUpdates(AccumulableInfoSerializer.serialize(update))
     }
     builder.build().toByteArray
   }
 
   def deserialize(bytes: Array[Byte]): TaskDataWrapper = {
     val binary = StoreTypes.TaskDataWrapper.parseFrom(bytes)
-    val accumulatorUpdates = new ArrayBuffer[AccumulableInfo]()
-    binary.getAccumulatorUpdatesList.forEach { update =>
-      accumulatorUpdates.append(new AccumulableInfo(
-        id = update.getId,
-        name = update.getName,
-        update = getOptional(update.hasUpdate, update.getUpdate),
-        value = update.getValue))
-    }
+    val accumulatorUpdates = AccumulableInfoSerializer.deserialize(binary.getAccumulatorUpdatesList)
     new TaskDataWrapper(
       taskId = binary.getTaskId,
       index = binary.getIndex,
@@ -133,13 +123,4 @@ class TaskDataWrapperSerializer extends ProtobufSerDe {
       stageAttemptId = binary.getStageAttemptId
     )
   }
-
-  def serializeAccumulableInfo(input: AccumulableInfo): StoreTypes.AccumulableInfo = {
-    val builder = StoreTypes.AccumulableInfo.newBuilder()
-      .setId(input.id)
-      .setName(input.name)
-      .setValue(input.value)
-    input.update.foreach(builder.setUpdate)
-    builder.build()
-  }
 }
diff --git a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
index dab9d9c071f..a9edae711b4 100644
--- a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
@@ -133,13 +133,7 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
 
     val bytes = serializer.serialize(input)
     val result = serializer.deserialize(bytes, classOf[TaskDataWrapper])
-    assert(result.accumulatorUpdates.length == input.accumulatorUpdates.length)
-    result.accumulatorUpdates.zip(input.accumulatorUpdates).foreach { case (a1, a2) =>
-      assert(a1.id == a2.id)
-      assert(a1.name == a2.name)
-      assert(a1.update.getOrElse("") == a2.update.getOrElse(""))
-      assert(a1.update == a2.update)
-    }
+    checkAnswer(result.accumulatorUpdates, input.accumulatorUpdates)
     assert(result.taskId == input.taskId)
     assert(result.index == input.index)
     assert(result.attempt == input.attempt)
@@ -213,27 +207,7 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
     assert(result.stageId == input.stageId)
     assert(result.stageAttemptId == input.stageAttemptId)
     assert(result.executorId == input.executorId)
-    assert(result.info.taskTime == input.info.taskTime)
-    assert(result.info.failedTasks == input.info.failedTasks)
-    assert(result.info.succeededTasks == input.info.succeededTasks)
-    assert(result.info.killedTasks == input.info.killedTasks)
-    assert(result.info.inputBytes == input.info.inputBytes)
-    assert(result.info.inputRecords == input.info.inputRecords)
-    assert(result.info.outputBytes == input.info.outputBytes)
-    assert(result.info.outputRecords == input.info.outputRecords)
-    assert(result.info.shuffleRead == input.info.shuffleRead)
-    assert(result.info.shuffleReadRecords == input.info.shuffleReadRecords)
-    assert(result.info.shuffleWrite == input.info.shuffleWrite)
-    assert(result.info.shuffleWriteRecords == input.info.shuffleWriteRecords)
-    assert(result.info.memoryBytesSpilled == input.info.memoryBytesSpilled)
-    assert(result.info.diskBytesSpilled == input.info.diskBytesSpilled)
-    assert(result.info.isBlacklistedForStage == input.info.isBlacklistedForStage)
-    assert(result.info.isExcludedForStage == input.info.isExcludedForStage)
-    assert(result.info.peakMemoryMetrics.isDefined)
-    ExecutorMetricType.metricToOffset.foreach { case (name, index) =>
-      result.info.peakMemoryMetrics.get.getMetricValue(name) ==
-        input.info.peakMemoryMetrics.get.getMetricValue(name)
-    }
+    checkAnswer(result.info, input.info)
   }
 
   test("Application Environment Info") {
@@ -613,11 +587,7 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
     val result = serializer.deserialize(bytes, classOf[SpeculationStageSummaryWrapper])
     assert(result.stageId == input.stageId)
     assert(result.stageAttemptId == input.stageAttemptId)
-    assert(result.info.numTasks == input.info.numTasks)
-    assert(result.info.numActiveTasks == input.info.numActiveTasks)
-    assert(result.info.numCompletedTasks == input.info.numCompletedTasks)
-    assert(result.info.numFailedTasks == input.info.numFailedTasks)
-    assert(result.info.numKilledTasks == input.info.numKilledTasks)
+    checkAnswer(result.info, input.info)
   }
 
   test("Executor Summary") {
@@ -718,10 +688,7 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
 
     assert(result.info.peakMemoryMetrics.isDefined == input.info.peakMemoryMetrics.isDefined)
     if (result.info.peakMemoryMetrics.isDefined && input.info.peakMemoryMetrics.isDefined) {
-      ExecutorMetricType.metricToOffset.foreach { case (name, index) =>
-        result.info.peakMemoryMetrics.get.getMetricValue(name) ==
-          input.info.peakMemoryMetrics.get.getMetricValue(name)
-      }
+      checkAnswer(result.info.peakMemoryMetrics.get, input.info.peakMemoryMetrics.get)
     }
 
     assert(result.info.attributes.size == input.info.attributes.size)
@@ -857,4 +824,573 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
 
     compareClusters(result.rootCluster, input.rootCluster)
   }
+
+  test("Stage Data") {
+    val accumulatorUpdates = Seq(
+      new AccumulableInfo(1L, "duration", Some("update"), "value1"),
+      new AccumulableInfo(2L, "duration2", None, "value2")
+    )
+    val inputMetrics = new InputMetrics(
+      bytesRead = 1L,
+      recordsRead = 2L)
+    val outputMetrics = new OutputMetrics(
+      bytesWritten = 1L,
+      recordsWritten = 2L
+    )
+    val shuffleReadMetrics = new ShuffleReadMetrics(
+      remoteBlocksFetched = 1L,
+      localBlocksFetched = 2L,
+      fetchWaitTime = 3L,
+      remoteBytesRead = 4L,
+      remoteBytesReadToDisk = 5L,
+      localBytesRead = 6L,
+      recordsRead = 7L
+    )
+    val shuffleWriteMetrics = new ShuffleWriteMetrics(
+      bytesWritten = 1L,
+      writeTime = 2L,
+      recordsWritten = 3L
+    )
+    val taskMetrics = new TaskMetrics(
+      executorDeserializeTime = 1L,
+      executorDeserializeCpuTime = 2L,
+      executorRunTime = 3L,
+      executorCpuTime = 4L,
+      resultSize = 5L,
+      jvmGcTime = 6L,
+      resultSerializationTime = 7L,
+      memoryBytesSpilled = 8L,
+      diskBytesSpilled = 9L,
+      peakExecutionMemory = 10L,
+      inputMetrics = inputMetrics,
+      outputMetrics = outputMetrics,
+      shuffleReadMetrics = shuffleReadMetrics,
+      shuffleWriteMetrics = shuffleWriteMetrics
+    )
+    val taskData1 = new TaskData(
+      taskId = 1L,
+      index = 2,
+      attempt = 3,
+      partitionId = 4,
+      launchTime = new Date(123456L),
+      resultFetchStart = Some(new Date(223456L)),
+      duration = Some(10000L),
+      executorId = "executor_id_1",
+      host = "host_name_1",
+      status = "SUCCESS",
+      taskLocality = "LOCAL",
+      speculative = true,
+      accumulatorUpdates = accumulatorUpdates,
+      errorMessage = Some("error_1"),
+      taskMetrics = Some(taskMetrics),
+      executorLogs = Map("executor_id_1" -> "executor_log_1"),
+      schedulerDelay = 5L,
+      gettingResultTime = 6L
+    )
+    val taskData2 = new TaskData(
+      taskId = 11L,
+      index = 12,
+      attempt = 13,
+      partitionId = 14,
+      launchTime = new Date(1123456L),
+      resultFetchStart = Some(new Date(1223456L)),
+      duration = Some(110000L),
+      executorId = "executor_id_2",
+      host = "host_name_2",
+      status = "SUCCESS",
+      taskLocality = "LOCAL",
+      speculative = false,
+      accumulatorUpdates = accumulatorUpdates,
+      errorMessage = Some("error_2"),
+      taskMetrics = Some(taskMetrics),
+      executorLogs = Map("executor_id_2" -> "executor_log_2"),
+      schedulerDelay = 15L,
+      gettingResultTime = 16L
+    )
+    val tasks = Some(
+      Map(1L -> taskData1, 2L -> taskData2)
+    )
+    val peakMemoryMetrics =
+      Some(new ExecutorMetrics(Array(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 1024L)))
+    val executorStageSummary1 = new ExecutorStageSummary(
+      taskTime = 1L,
+      failedTasks = 2,
+      succeededTasks = 3,
+      killedTasks = 4,
+      inputBytes = 5L,
+      inputRecords = 6L,
+      outputBytes = 7L,
+      outputRecords = 8L,
+      shuffleRead = 9L,
+      shuffleReadRecords = 10L,
+      shuffleWrite = 11L,
+      shuffleWriteRecords = 12L,
+      memoryBytesSpilled = 13L,
+      diskBytesSpilled = 14L,
+      isBlacklistedForStage = true,
+      peakMemoryMetrics = peakMemoryMetrics,
+      isExcludedForStage = false)
+    val executorStageSummary2 = new ExecutorStageSummary(
+      taskTime = 11L,
+      failedTasks = 12,
+      succeededTasks = 13,
+      killedTasks = 14,
+      inputBytes = 15L,
+      inputRecords = 16L,
+      outputBytes = 17L,
+      outputRecords = 18L,
+      shuffleRead = 19L,
+      shuffleReadRecords = 110L,
+      shuffleWrite = 111L,
+      shuffleWriteRecords = 112L,
+      memoryBytesSpilled = 113L,
+      diskBytesSpilled = 114L,
+      isBlacklistedForStage = false,
+      peakMemoryMetrics = peakMemoryMetrics,
+      isExcludedForStage = true)
+    val executorSummary = Some(
+      Map("executor_id_1" -> executorStageSummary1, "executor_id_2" -> 
executorStageSummary2)
+    )
+    val speculationStageSummary = new SpeculationStageSummary(
+      numTasks = 3,
+      numActiveTasks = 4,
+      numCompletedTasks = 5,
+      numFailedTasks = 6,
+      numKilledTasks = 7
+    )
+    val inputMetricDistributions = new InputMetricDistributions(
+      bytesRead = IndexedSeq(1.001D, 2.001D),
+      recordsRead = IndexedSeq(3.001D, 4.001D)
+    )
+    val outputMetricDistributions = new OutputMetricDistributions(
+      bytesWritten = IndexedSeq(1.001D, 2.001D),
+      recordsWritten = IndexedSeq(3.001D, 4.001D)
+    )
+    val shuffleReadMetricDistributions = new ShuffleReadMetricDistributions(
+      readBytes = IndexedSeq(1.001D, 2.001D),
+      readRecords = IndexedSeq(3.001D, 4.001D),
+      remoteBlocksFetched = IndexedSeq(5.001D, 6.001D),
+      localBlocksFetched = IndexedSeq(7.001D, 8.001D),
+      fetchWaitTime = IndexedSeq(9.001D, 10.001D),
+      remoteBytesRead = IndexedSeq(11.001D, 12.001D),
+      remoteBytesReadToDisk = IndexedSeq(13.001D, 14.001D),
+      totalBlocksFetched = IndexedSeq(15.001D, 16.001D)
+    )
+    val shuffleWriteMetricDistributions = new ShuffleWriteMetricDistributions(
+      writeBytes = IndexedSeq(1.001D, 2.001D),
+      writeRecords = IndexedSeq(3.001D, 4.001D),
+      writeTime = IndexedSeq(5.001D, 6.001D)
+    )
+    val taskMetricDistributions = new TaskMetricDistributions(
+      quantiles = IndexedSeq(1.001D, 2.001D),
+      duration = IndexedSeq(3.001D, 4.001D),
+      executorDeserializeTime = IndexedSeq(5.001D, 6.001D),
+      executorDeserializeCpuTime = IndexedSeq(7.001D, 8.001D),
+      executorRunTime = IndexedSeq(9.001D, 10.001D),
+      executorCpuTime = IndexedSeq(11.001D, 12.001D),
+      resultSize = IndexedSeq(13.001D, 14.001D),
+      jvmGcTime = IndexedSeq(15.001D, 16.001D),
+      resultSerializationTime = IndexedSeq(17.001D, 18.001D),
+      gettingResultTime = IndexedSeq(19.001D, 20.001D),
+      schedulerDelay = IndexedSeq(21.001D, 22.001D),
+      peakExecutionMemory = IndexedSeq(23.001D, 24.001D),
+      memoryBytesSpilled = IndexedSeq(25.001D, 26.001D),
+      diskBytesSpilled = IndexedSeq(27.001D, 28.001D),
+      inputMetrics = inputMetricDistributions,
+      outputMetrics = outputMetricDistributions,
+      shuffleReadMetrics = shuffleReadMetricDistributions,
+      shuffleWriteMetrics = shuffleWriteMetricDistributions
+    )
+    val executorPeakMetricsDistributions = new ExecutorPeakMetricsDistributions(
+      quantiles = IndexedSeq(1.001D, 2.001D),
+      executorMetrics = IndexedSeq(
+        new ExecutorMetrics(Array(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 1024L)))
+    )
+    val executorMetricsDistributions = new ExecutorMetricsDistributions(
+      quantiles = IndexedSeq(1.001D, 2.001D),
+      taskTime = IndexedSeq(3.001D, 4.001D),
+      failedTasks = IndexedSeq(5.001D, 6.001D),
+      succeededTasks = IndexedSeq(7.001D, 8.001D),
+      killedTasks = IndexedSeq(9.001D, 10.001D),
+      inputBytes = IndexedSeq(11.001D, 12.001D),
+      inputRecords = IndexedSeq(13.001D, 14.001D),
+      outputBytes = IndexedSeq(15.001D, 16.001D),
+      outputRecords = IndexedSeq(17.001D, 18.001D),
+      shuffleRead = IndexedSeq(19.001D, 20.001D),
+      shuffleReadRecords = IndexedSeq(21.001D, 22.001D),
+      shuffleWrite = IndexedSeq(23.001D, 24.001D),
+      shuffleWriteRecords = IndexedSeq(25.001D, 24.001D),
+      memoryBytesSpilled = IndexedSeq(27.001D, 28.001D),
+      diskBytesSpilled = IndexedSeq(29.001D, 30.001D),
+      peakMemoryMetrics = executorPeakMetricsDistributions
+    )
+    val info = new StageData(
+      status = StageStatus.COMPLETE,
+      stageId = 1,
+      attemptId = 2,
+      numTasks = 3,
+      numActiveTasks = 4,
+      numCompleteTasks = 5,
+      numFailedTasks = 6,
+      numKilledTasks = 7,
+      numCompletedIndices = 8,
+      submissionTime = Some(new Date(123456L)),
+      firstTaskLaunchedTime = Some(new Date(234567L)),
+      completionTime = Some(new Date(654321L)),
+      failureReason = Some("failure reason"),
+      executorDeserializeTime = 9L,
+      executorDeserializeCpuTime = 10L,
+      executorRunTime = 11L,
+      executorCpuTime = 12L,
+      resultSize = 13L,
+      jvmGcTime = 14L,
+      resultSerializationTime = 15L,
+      memoryBytesSpilled = 16L,
+      diskBytesSpilled = 17L,
+      peakExecutionMemory = 18L,
+      inputBytes = 19L,
+      inputRecords = 20L,
+      outputBytes = 21L,
+      outputRecords = 22L,
+      shuffleRemoteBlocksFetched = 23L,
+      shuffleLocalBlocksFetched = 24L,
+      shuffleFetchWaitTime = 25L,
+      shuffleRemoteBytesRead = 26L,
+      shuffleRemoteBytesReadToDisk = 27L,
+      shuffleLocalBytesRead = 28L,
+      shuffleReadBytes = 29L,
+      shuffleReadRecords = 30L,
+      shuffleWriteBytes = 31L,
+      shuffleWriteTime = 32L,
+      shuffleWriteRecords = 33L,
+      name = "name",
+      description = Some("test description"),
+      details = "test details",
+      schedulingPool = "test scheduling pool",
+      rddIds = Seq(1, 2, 3, 4, 5, 6),
+      accumulatorUpdates = accumulatorUpdates,
+      tasks = tasks,
+      executorSummary = executorSummary,
+      speculationSummary = Some(speculationStageSummary),
+      killedTasksSummary = Map("task_1" -> 1),
+      resourceProfileId = 34,
+      peakExecutorMetrics = peakMemoryMetrics,
+      taskMetricsDistributions = Some(taskMetricDistributions),
+      executorMetricsDistributions = Some(executorMetricsDistributions)
+    )
+    val input = new StageDataWrapper(
+      info = info,
+      jobIds = Set(1, 2, 3, 4),
+      locality = Map(
+        "PROCESS_LOCAL" -> 1L,
+        "NODE_LOCAL" -> 2L
+      )
+    )
+
+    val bytes = serializer.serialize(input)
+    val result = serializer.deserialize(bytes, classOf[StageDataWrapper])
+
+    assert(result.jobIds == input.jobIds)
+    assert(result.locality == input.locality)
+
+    assert(result.info.status == input.info.status)
+    assert(result.info.stageId == input.info.stageId)
+    assert(result.info.attemptId == input.info.attemptId)
+    assert(result.info.numTasks == input.info.numTasks)
+    assert(result.info.numActiveTasks == input.info.numActiveTasks)
+    assert(result.info.numCompleteTasks == input.info.numCompleteTasks)
+    assert(result.info.numFailedTasks == input.info.numFailedTasks)
+    assert(result.info.numKilledTasks == input.info.numKilledTasks)
+    assert(result.info.numCompletedIndices == input.info.numCompletedIndices)
+
+    assert(result.info.submissionTime == input.info.submissionTime)
+    assert(result.info.firstTaskLaunchedTime == input.info.firstTaskLaunchedTime)
+    assert(result.info.completionTime == input.info.completionTime)
+    assert(result.info.failureReason == input.info.failureReason)
+
+    assert(result.info.executorDeserializeTime == input.info.executorDeserializeTime)
+    assert(result.info.executorDeserializeCpuTime == input.info.executorDeserializeCpuTime)
+    assert(result.info.executorRunTime == input.info.executorRunTime)
+    assert(result.info.executorCpuTime == input.info.executorCpuTime)
+    assert(result.info.resultSize == input.info.resultSize)
+    assert(result.info.jvmGcTime == input.info.jvmGcTime)
+    assert(result.info.resultSerializationTime == input.info.resultSerializationTime)
+    assert(result.info.memoryBytesSpilled == input.info.memoryBytesSpilled)
+    assert(result.info.diskBytesSpilled == input.info.diskBytesSpilled)
+    assert(result.info.peakExecutionMemory == input.info.peakExecutionMemory)
+    assert(result.info.inputBytes == input.info.inputBytes)
+    assert(result.info.inputRecords == input.info.inputRecords)
+    assert(result.info.outputBytes == input.info.outputBytes)
+    assert(result.info.outputRecords == input.info.outputRecords)
+    assert(result.info.shuffleRemoteBlocksFetched == input.info.shuffleRemoteBlocksFetched)
+    assert(result.info.shuffleLocalBlocksFetched == input.info.shuffleLocalBlocksFetched)
+    assert(result.info.shuffleFetchWaitTime == input.info.shuffleFetchWaitTime)
+    assert(result.info.shuffleRemoteBytesRead == input.info.shuffleRemoteBytesRead)
+    assert(result.info.shuffleRemoteBytesReadToDisk == input.info.shuffleRemoteBytesReadToDisk)
+    assert(result.info.shuffleLocalBytesRead == input.info.shuffleLocalBytesRead)
+    assert(result.info.shuffleReadBytes == input.info.shuffleReadBytes)
+    assert(result.info.shuffleReadRecords == input.info.shuffleReadRecords)
+    assert(result.info.shuffleWriteBytes == input.info.shuffleWriteBytes)
+    assert(result.info.shuffleWriteTime == input.info.shuffleWriteTime)
+    assert(result.info.shuffleWriteRecords == input.info.shuffleWriteRecords)
+
+    assert(result.info.name == input.info.name)
+    assert(result.info.description == input.info.description)
+    assert(result.info.details == input.info.details)
+    assert(result.info.schedulingPool == input.info.schedulingPool)
+
+    assert(result.info.rddIds == input.info.rddIds)
+    checkAnswer(result.info.accumulatorUpdates, input.info.accumulatorUpdates)
+
+    assert(result.info.tasks.isDefined == input.info.tasks.isDefined)
+    if (result.info.tasks.isDefined && input.info.tasks.isDefined) {
+      checkIdTask(result.info.tasks.get, input.info.tasks.get)
+    }
+
+    assert(result.info.executorSummary.isDefined == input.info.executorSummary.isDefined)
+    if (result.info.executorSummary.isDefined && input.info.executorSummary.isDefined) {
+      checkAnswer(result.info.executorSummary.get, input.info.executorSummary.get)
+    }
+
+    assert(result.info.speculationSummary.isDefined == input.info.speculationSummary.isDefined)
+    if (result.info.speculationSummary.isDefined && input.info.speculationSummary.isDefined) {
+      checkAnswer(result.info.speculationSummary.get, input.info.speculationSummary.get)
+    }
+    assert(result.info.killedTasksSummary == input.info.killedTasksSummary)
+    assert(result.info.resourceProfileId == input.info.resourceProfileId)
+    assert(result.info.peakExecutorMetrics.isDefined == input.info.peakExecutorMetrics.isDefined)
+    if (result.info.peakExecutorMetrics.isDefined && input.info.peakExecutorMetrics.isDefined) {
+      checkAnswer(result.info.peakExecutorMetrics.get, input.info.peakExecutorMetrics.get)
+    }
+    assert(result.info.taskMetricsDistributions.isDefined ==
+      input.info.taskMetricsDistributions.isDefined)
+    if (result.info.taskMetricsDistributions.isDefined &&
+      input.info.taskMetricsDistributions.isDefined) {
+      checkAnswer(result.info.taskMetricsDistributions.get, input.info.taskMetricsDistributions.get)
+    }
+    assert(result.info.executorMetricsDistributions.isDefined ==
+      input.info.executorMetricsDistributions.isDefined)
+    if (result.info.executorMetricsDistributions.isDefined &&
+      input.info.executorMetricsDistributions.isDefined) {
+      checkAnswer(result.info.executorMetricsDistributions.get,
+        input.info.executorMetricsDistributions.get)
+    }
+  }
+
+  private def checkAnswer(result: TaskMetrics, expected: TaskMetrics): Unit = {
+    assert(result.executorDeserializeTime == expected.executorDeserializeTime)
+    assert(result.executorDeserializeCpuTime == expected.executorDeserializeCpuTime)
+    assert(result.executorRunTime == expected.executorRunTime)
+    assert(result.executorCpuTime == expected.executorCpuTime)
+    assert(result.resultSize == expected.resultSize)
+    assert(result.jvmGcTime == expected.jvmGcTime)
+    assert(result.resultSerializationTime == expected.resultSerializationTime)
+    assert(result.memoryBytesSpilled == expected.memoryBytesSpilled)
+    assert(result.diskBytesSpilled == expected.diskBytesSpilled)
+    assert(result.peakExecutionMemory == expected.peakExecutionMemory)
+    checkAnswer(result.inputMetrics, expected.inputMetrics)
+    checkAnswer(result.outputMetrics, expected.outputMetrics)
+    checkAnswer(result.shuffleReadMetrics, expected.shuffleReadMetrics)
+    checkAnswer(result.shuffleWriteMetrics, expected.shuffleWriteMetrics)
+  }
+
+  private def checkAnswer(result: InputMetrics, expected: InputMetrics): Unit = {
+    assert(result.bytesRead == expected.bytesRead)
+    assert(result.recordsRead == expected.recordsRead)
+  }
+
+  private def checkAnswer(result: OutputMetrics, expected: OutputMetrics): Unit = {
+    assert(result.bytesWritten == expected.bytesWritten)
+    assert(result.recordsWritten == expected.recordsWritten)
+  }
+
+  private def checkAnswer(result: ShuffleReadMetrics, expected: ShuffleReadMetrics): Unit = {
+    assert(result.remoteBlocksFetched == expected.remoteBlocksFetched)
+    assert(result.localBlocksFetched == expected.localBlocksFetched)
+    assert(result.fetchWaitTime == expected.fetchWaitTime)
+    assert(result.remoteBytesRead == expected.remoteBytesRead)
+    assert(result.remoteBytesReadToDisk == expected.remoteBytesReadToDisk)
+    assert(result.localBytesRead == expected.localBytesRead)
+    assert(result.recordsRead == expected.recordsRead)
+  }
+
+  private def checkAnswer(result: ShuffleWriteMetrics, expected: ShuffleWriteMetrics): Unit = {
+    assert(result.bytesWritten == expected.bytesWritten)
+    assert(result.writeTime == expected.writeTime)
+    assert(result.recordsWritten == expected.recordsWritten)
+  }
+
+  private def checkAnswer(result: collection.Seq[AccumulableInfo],
+      expected: collection.Seq[AccumulableInfo]): Unit = {
+    assert(result.length == expected.length)
+    result.zip(expected).foreach { case (a1, a2) =>
+      assert(a1.id == a2.id)
+      assert(a1.name == a2.name)
+      assert(a1.update.getOrElse("") == a2.update.getOrElse(""))
+      assert(a1.update == a2.update)
+    }
+  }
+
+  private def checkIdTask(result: Map[Long, TaskData], expected: Map[Long, TaskData]): Unit = {
+    assert(result.size == expected.size)
+    assert(result.keys.size == expected.keys.size)
+    result.keysIterator.foreach { k =>
+      assert(expected.contains(k))
+      checkAnswer(result(k), expected(k))
+    }
+  }
+
+  private def checkAnswer(result: TaskData, expected: TaskData): Unit = {
+    assert(result.taskId == expected.taskId)
+    assert(result.index == expected.index)
+    assert(result.attempt == expected.attempt)
+    assert(result.partitionId == expected.partitionId)
+    assert(result.launchTime == expected.launchTime)
+    assert(result.resultFetchStart == expected.resultFetchStart)
+    assert(result.duration == expected.duration)
+    assert(result.executorId == expected.executorId)
+    assert(result.host == expected.host)
+    assert(result.status == expected.status)
+    assert(result.taskLocality == expected.taskLocality)
+    assert(result.speculative == expected.speculative)
+    checkAnswer(result.accumulatorUpdates, expected.accumulatorUpdates)
+    assert(result.errorMessage == expected.errorMessage)
+    assert(result.taskMetrics.isDefined == expected.taskMetrics.isDefined)
+    if (result.taskMetrics.isDefined && expected.taskMetrics.isDefined) {
+      checkAnswer(result.taskMetrics.get, expected.taskMetrics.get)
+    }
+  }
+
+  private def checkAnswer(result: Map[String, ExecutorStageSummary],
+      expected: Map[String, ExecutorStageSummary]): Unit = {
+    assert(result.size == expected.size)
+    assert(result.keys.size == expected.keys.size)
+    result.keysIterator.foreach { k =>
+      assert(expected.contains(k))
+      checkAnswer(result(k), expected(k))
+    }
+  }
+
+  private def checkAnswer(result: ExecutorStageSummary,
+      expected: ExecutorStageSummary): Unit = {
+    assert(result.taskTime == expected.taskTime)
+    assert(result.failedTasks == expected.failedTasks)
+    assert(result.succeededTasks == expected.succeededTasks)
+    assert(result.killedTasks == expected.killedTasks)
+    assert(result.inputBytes == expected.inputBytes)
+    assert(result.inputRecords == expected.inputRecords)
+    assert(result.outputBytes == expected.outputBytes)
+    assert(result.outputRecords == expected.outputRecords)
+    assert(result.shuffleRead == expected.shuffleRead)
+    assert(result.shuffleReadRecords == expected.shuffleReadRecords)
+    assert(result.shuffleWrite == expected.shuffleWrite)
+    assert(result.shuffleWriteRecords == expected.shuffleWriteRecords)
+    assert(result.memoryBytesSpilled == expected.memoryBytesSpilled)
+    assert(result.diskBytesSpilled == expected.diskBytesSpilled)
+    assert(result.isBlacklistedForStage == expected.isBlacklistedForStage)
+    assert(result.isExcludedForStage == expected.isExcludedForStage)
+    assert(result.peakMemoryMetrics.isDefined == expected.peakMemoryMetrics.isDefined)
+    if (result.peakMemoryMetrics.isDefined && expected.peakMemoryMetrics.isDefined) {
+      checkAnswer(result.peakMemoryMetrics.get, expected.peakMemoryMetrics.get)
+    }
+  }
+
+  private def checkAnswer(result: SpeculationStageSummary,
+      expected: SpeculationStageSummary): Unit = {
+    assert(result.numTasks == expected.numTasks)
+    assert(result.numActiveTasks == expected.numActiveTasks)
+    assert(result.numCompletedTasks == expected.numCompletedTasks)
+    assert(result.numFailedTasks == expected.numFailedTasks)
+    assert(result.numKilledTasks == expected.numKilledTasks)
+  }
+
+  private def checkAnswer(result: ExecutorMetrics, expected: ExecutorMetrics): Unit = {
+    ExecutorMetricType.metricToOffset.foreach { case (name, _) =>
+      assert(result.getMetricValue(name) == expected.getMetricValue(name))
+    }
+  }
+
+  private def checkAnswer(result: TaskMetricDistributions,
+      expected: TaskMetricDistributions): Unit = {
+    assert(result.quantiles == expected.quantiles)
+    assert(result.duration == expected.duration)
+    assert(result.executorDeserializeTime == expected.executorDeserializeTime)
+    assert(result.executorDeserializeCpuTime == expected.executorDeserializeCpuTime)
+    assert(result.executorRunTime == expected.executorRunTime)
+    assert(result.executorCpuTime == expected.executorCpuTime)
+    assert(result.resultSize == expected.resultSize)
+    assert(result.jvmGcTime == expected.jvmGcTime)
+    assert(result.resultSerializationTime == expected.resultSerializationTime)
+    assert(result.gettingResultTime == expected.gettingResultTime)
+    assert(result.schedulerDelay == expected.schedulerDelay)
+    assert(result.peakExecutionMemory == expected.peakExecutionMemory)
+    assert(result.memoryBytesSpilled == expected.memoryBytesSpilled)
+    assert(result.diskBytesSpilled == expected.diskBytesSpilled)
+
+    checkAnswer(result.inputMetrics, expected.inputMetrics)
+    checkAnswer(result.outputMetrics, expected.outputMetrics)
+    checkAnswer(result.shuffleReadMetrics, expected.shuffleReadMetrics)
+    checkAnswer(result.shuffleWriteMetrics, expected.shuffleWriteMetrics)
+  }
+
+  private def checkAnswer(result: InputMetricDistributions,
+      expected: InputMetricDistributions): Unit = {
+    assert(result.bytesRead == expected.bytesRead)
+    assert(result.recordsRead == expected.recordsRead)
+  }
+
+  private def checkAnswer(result: OutputMetricDistributions,
+      expected: OutputMetricDistributions): Unit = {
+    assert(result.bytesWritten == expected.bytesWritten)
+    assert(result.recordsWritten == expected.recordsWritten)
+  }
+
+  private def checkAnswer(result: ShuffleReadMetricDistributions,
+      expected: ShuffleReadMetricDistributions): Unit = {
+    assert(result.readBytes == expected.readBytes)
+    assert(result.readRecords == expected.readRecords)
+    assert(result.remoteBlocksFetched == expected.remoteBlocksFetched)
+    assert(result.localBlocksFetched == expected.localBlocksFetched)
+    assert(result.fetchWaitTime == expected.fetchWaitTime)
+    assert(result.remoteBytesRead == expected.remoteBytesRead)
+    assert(result.remoteBytesReadToDisk == expected.remoteBytesReadToDisk)
+    assert(result.totalBlocksFetched == expected.totalBlocksFetched)
+  }
+
+  private def checkAnswer(result: ShuffleWriteMetricDistributions,
+      expected: ShuffleWriteMetricDistributions): Unit = {
+    assert(result.writeBytes == expected.writeBytes)
+    assert(result.writeRecords == expected.writeRecords)
+    assert(result.writeTime == expected.writeTime)
+  }
+
+  private def checkAnswer(result: ExecutorMetricsDistributions,
+      expected: ExecutorMetricsDistributions): Unit = {
+    assert(result.quantiles == expected.quantiles)
+
+    assert(result.taskTime == expected.taskTime)
+    assert(result.failedTasks == expected.failedTasks)
+    assert(result.succeededTasks == expected.succeededTasks)
+    assert(result.killedTasks == expected.killedTasks)
+    assert(result.inputBytes == expected.inputBytes)
+    assert(result.inputRecords == expected.inputRecords)
+    assert(result.outputBytes == expected.outputBytes)
+    assert(result.outputRecords == expected.outputRecords)
+    assert(result.shuffleRead == expected.shuffleRead)
+    assert(result.shuffleReadRecords == expected.shuffleReadRecords)
+    assert(result.shuffleWrite == expected.shuffleWrite)
+    assert(result.shuffleWriteRecords == expected.shuffleWriteRecords)
+    assert(result.memoryBytesSpilled == expected.memoryBytesSpilled)
+    assert(result.diskBytesSpilled == expected.diskBytesSpilled)
+    checkAnswer(result.peakMemoryMetrics, expected.peakMemoryMetrics)
+  }
+
+  private def checkAnswer(result: ExecutorPeakMetricsDistributions,
+      expected: ExecutorPeakMetricsDistributions): Unit = {
+    assert(result.quantiles == expected.quantiles)
+    assert(result.executorMetrics.size == expected.executorMetrics.size)
+    result.executorMetrics.zip(expected.executorMetrics).foreach { case (a1, a2) =>
+      checkAnswer(a1, a2)
+    }
+  }
 }
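
For reference, a minimal sketch (not part of the patch) of the round trip the new "Stage Data" test exercises. It assumes, as in the suite above, that serializer is the KVStore protobuf serializer used by KVStoreProtobufSerializerSuite and that input is a fully populated StageDataWrapper like the one constructed in the test:

    // Hypothetical usage sketch; names mirror the "Stage Data" test body above.
    val bytes: Array[Byte] = serializer.serialize(input)                   // StageDataWrapper -> protobuf bytes
    val result = serializer.deserialize(bytes, classOf[StageDataWrapper])  // protobuf bytes -> StageDataWrapper
    assert(result.jobIds == input.jobIds)                                  // wrapper-level fields survive the round trip
    assert(result.locality == input.locality)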


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
