This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a4804e0f3d [SPARK-42138][UI] Handle null string values in JobData/TaskDataWrapper/ExecutorStageSummaryWrapper
9a4804e0f3d is described below

commit 9a4804e0f3dc8029eab1d270b505a3e1e783d139
Author: Gengliang Wang <gengli...@apache.org>
AuthorDate: Fri Jan 20 22:35:14 2023 -0800

    [SPARK-42138][UI] Handle null string values in JobData/TaskDataWrapper/ExecutorStageSummaryWrapper
    
    ### What changes were proposed in this pull request?
    
    Similar to https://github.com/apache/spark/pull/39666, this PR handles null string values in JobData/TaskDataWrapper/ExecutorStageSummaryWrapper.
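
    The string fields involved are declared `optional` in store_types.proto, so the generated messages carry an explicit presence flag for them. A hedged illustration of that behavior, assuming the standard protobuf-java codegen conventions (this snippet is not part of the diff):

        // With `optional string name = 2;`, the generated StoreTypes.JobData
        // can distinguish an unset name from an empty string.
        val job = StoreTypes.JobData.newBuilder()
          .setJobId(1L)
          .build()                 // name is never set
        assert(!job.hasName)       // presence flag is false for an unset optional field
        assert(job.getName == "")  // the getter still returns the proto3 default ""
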
    ### Why are the changes needed?
    
    Properly handle null string values in the protobuf serializer: the generated protobuf setters do not accept null, so writing these string fields unconditionally would fail whenever the value is null.
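
    For reference, a minimal sketch of the `setStringField`/`getStringField` helpers in `org.apache.spark.status.protobuf.Utils` that the serializers below rely on (Utils.scala is not part of this diff, so the bodies here are only inferred from the call sites):

        // Assumed shape of the helpers, matching calls such as
        //   setStringField(input.executorId, builder.setExecutorId)
        //   getStringField(binary.hasExecutorId, () => weakIntern(binary.getExecutorId))
        private[protobuf] object Utils {
          def setStringField(input: String, f: String => Any): Unit = {
            // Skip the setter for null values, leaving the optional field unset.
            if (input != null) {
              f(input)
            }
          }

          def getStringField(condition: Boolean, result: () => String): String = {
            // Map "field not present" back to null on deserialization.
            if (condition) result() else null
          }
        }
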
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    New unit tests in KVStoreProtobufSerializerSuite covering null values for the affected string fields.
    
    Closes #39680 from gengliangwang/handleMoreNull.
    
    Authored-by: Gengliang Wang <gengli...@apache.org>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../apache/spark/status/protobuf/store_types.proto |  12 +-
 .../ExecutorStageSummaryWrapperSerializer.scala    |   5 +-
 .../status/protobuf/JobDataWrapperSerializer.scala |   7 +-
 .../protobuf/TaskDataWrapperSerializer.scala       |  19 +-
 .../protobuf/KVStoreProtobufSerializerSuite.scala  | 340 +++++++++++----------
 5 files changed, 198 insertions(+), 185 deletions(-)

diff --git a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
index 96c78aa001d..b7afa93c04a 100644
--- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
+++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
@@ -41,7 +41,7 @@ enum JobExecutionStatus {
 message JobData {
   // All IDs are int64 for extendability, even when they are currently int32 in Spark.
   int64 job_id = 1;
-  string name = 2;
+  optional string name = 2;
   optional string description = 3;
   optional int64 submission_time = 4;
   optional int64 completion_time = 5;
@@ -83,10 +83,10 @@ message TaskDataWrapper {
   int64 launch_time = 5;
   int64 result_fetch_start = 6;
   int64 duration = 7;
-  string executor_id = 8;
-  string host = 9;
-  string status = 10;
-  string task_locality = 11;
+  optional string executor_id = 8;
+  optional string host = 9;
+  optional string status = 10;
+  optional string task_locality = 11;
   bool speculative = 12;
   repeated AccumulableInfo accumulator_updates = 13;
   optional string error_message = 14;
@@ -156,7 +156,7 @@ message ExecutorStageSummary {
 message ExecutorStageSummaryWrapper {
   int64 stage_id = 1;
   int32 stage_attempt_id = 2;
-  string executor_id = 3;
+  optional string executor_id = 3;
   ExecutorStageSummary info = 4;
 }
 
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorStageSummaryWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorStageSummaryWrapperSerializer.scala
index 2ffba9db755..8e41a857057 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorStageSummaryWrapperSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorStageSummaryWrapperSerializer.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.status.protobuf
 
 import org.apache.spark.status.ExecutorStageSummaryWrapper
+import org.apache.spark.status.protobuf.Utils.{getStringField, setStringField}
 import org.apache.spark.util.Utils.weakIntern
 
 class ExecutorStageSummaryWrapperSerializer
@@ -28,8 +29,8 @@ class ExecutorStageSummaryWrapperSerializer
     val builder = StoreTypes.ExecutorStageSummaryWrapper.newBuilder()
       .setStageId(input.stageId.toLong)
       .setStageAttemptId(input.stageAttemptId)
-      .setExecutorId(input.executorId)
       .setInfo(info)
+    setStringField(input.executorId, builder.setExecutorId)
     builder.build().toByteArray
   }
 
@@ -39,7 +40,7 @@ class ExecutorStageSummaryWrapperSerializer
     new ExecutorStageSummaryWrapper(
       stageId = binary.getStageId.toInt,
       stageAttemptId = binary.getStageAttemptId,
-      executorId = weakIntern(binary.getExecutorId),
+      executorId = getStringField(binary.hasExecutorId, () => weakIntern(binary.getExecutorId)),
       info = info)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala
index 10e0f125f6c..55bb4e2549e 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala
@@ -23,7 +23,7 @@ import collection.JavaConverters._
 
 import org.apache.spark.status.JobDataWrapper
 import org.apache.spark.status.api.v1.JobData
-import org.apache.spark.status.protobuf.Utils.getOptional
+import org.apache.spark.status.protobuf.Utils.{getOptional, getStringField, setStringField}
 
 class JobDataWrapperSerializer extends ProtobufSerDe[JobDataWrapper] {
 
@@ -49,7 +49,6 @@ class JobDataWrapperSerializer extends ProtobufSerDe[JobDataWrapper] {
   private def serializeJobData(jobData: JobData): StoreTypes.JobData = {
     val jobDataBuilder = StoreTypes.JobData.newBuilder()
     jobDataBuilder.setJobId(jobData.jobId.toLong)
-      .setName(jobData.name)
       .setStatus(JobExecutionStatusSerializer.serialize(jobData.status))
       .setNumTasks(jobData.numTasks)
       .setNumActiveTasks(jobData.numActiveTasks)
@@ -62,7 +61,7 @@ class JobDataWrapperSerializer extends ProtobufSerDe[JobDataWrapper] {
       .setNumCompletedStages(jobData.numCompletedStages)
       .setNumSkippedStages(jobData.numSkippedStages)
       .setNumFailedStages(jobData.numFailedStages)
-
+    setStringField(jobData.name, jobDataBuilder.setName)
     jobData.description.foreach(jobDataBuilder.setDescription)
     jobData.submissionTime.foreach { d =>
       jobDataBuilder.setSubmissionTime(d.getTime)
@@ -88,7 +87,7 @@ class JobDataWrapperSerializer extends ProtobufSerDe[JobDataWrapper] {
 
     new JobData(
       jobId = info.getJobId.toInt,
-      name = info.getName,
+      name = getStringField(info.hasName, info.getName),
       description = description,
       submissionTime = submissionTime,
       completionTime = completionTime,
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/TaskDataWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/TaskDataWrapperSerializer.scala
index 3c947c79eab..298a1612212 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/TaskDataWrapperSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/TaskDataWrapperSerializer.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.status.protobuf
 
 import org.apache.spark.status.TaskDataWrapper
-import org.apache.spark.status.protobuf.Utils.getOptional
+import org.apache.spark.status.protobuf.Utils.{getOptional, getStringField, setStringField}
 import org.apache.spark.util.Utils.weakIntern
 
 class TaskDataWrapperSerializer extends ProtobufSerDe[TaskDataWrapper] {
@@ -32,10 +32,6 @@ class TaskDataWrapperSerializer extends ProtobufSerDe[TaskDataWrapper] {
       .setLaunchTime(input.launchTime)
       .setResultFetchStart(input.resultFetchStart)
       .setDuration(input.duration)
-      .setExecutorId(input.executorId)
-      .setHost(input.host)
-      .setStatus(input.status)
-      .setTaskLocality(input.taskLocality)
       .setSpeculative(input.speculative)
       .setHasMetrics(input.hasMetrics)
       .setExecutorDeserializeTime(input.executorDeserializeTime)
@@ -74,6 +70,10 @@ class TaskDataWrapperSerializer extends ProtobufSerDe[TaskDataWrapper] {
       .setShuffleRecordsWritten(input.shuffleRecordsWritten)
       .setStageId(input.stageId)
       .setStageAttemptId(input.stageAttemptId)
+    setStringField(input.executorId, builder.setExecutorId)
+    setStringField(input.host, builder.setHost)
+    setStringField(input.status, builder.setStatus)
+    setStringField(input.taskLocality, builder.setTaskLocality)
     input.errorMessage.foreach(builder.setErrorMessage)
     input.accumulatorUpdates.foreach { update =>
       builder.addAccumulatorUpdates(AccumulableInfoSerializer.serialize(update))
@@ -92,10 +92,11 @@ class TaskDataWrapperSerializer extends ProtobufSerDe[TaskDataWrapper] {
       launchTime = binary.getLaunchTime,
       resultFetchStart = binary.getResultFetchStart,
       duration = binary.getDuration,
-      executorId = weakIntern(binary.getExecutorId),
-      host = weakIntern(binary.getHost),
-      status = weakIntern(binary.getStatus),
-      taskLocality = weakIntern(binary.getTaskLocality),
+      executorId = getStringField(binary.hasExecutorId, () => weakIntern(binary.getExecutorId)),
+      host = getStringField(binary.hasHost, () => weakIntern(binary.getHost)),
+      status = getStringField(binary.hasStatus, () => weakIntern(binary.getStatus)),
+      taskLocality =
+        getStringField(binary.hasTaskLocality, () => weakIntern(binary.getTaskLocality)),
       speculative = binary.getSpeculative,
       accumulatorUpdates = accumulatorUpdates,
       errorMessage = getOptional(binary.hasErrorMessage, binary.getErrorMessage),
diff --git a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
index 0d0d26410ed..774d7abc420 100644
--- a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
@@ -32,55 +32,60 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
   private val serializer = new KVStoreProtobufSerializer()
 
   test("Job data") {
-    val input = new JobDataWrapper(
-      new JobData(
-        jobId = 1,
-        name = "test",
-        description = Some("test description"),
-        submissionTime = Some(new Date(123456L)),
-        completionTime = Some(new Date(654321L)),
-        stageIds = Seq(1, 2, 3, 4),
-        jobGroup = Some("group"),
-        status = JobExecutionStatus.UNKNOWN,
-        numTasks = 2,
-        numActiveTasks = 3,
-        numCompletedTasks = 4,
-        numSkippedTasks = 5,
-        numFailedTasks = 6,
-        numKilledTasks = 7,
-        numCompletedIndices = 8,
-        numActiveStages = 9,
-        numCompletedStages = 10,
-        numSkippedStages = 11,
-        numFailedStages = 12,
-        killedTasksSummary = Map("a" -> 1, "b" -> 2)),
-      Set(1, 2),
-      Some(999)
-    )
+    Seq(
+      ("test", Some("test description"), Some("group")),
+      (null, None, None)
+    ).foreach { case (name, description, jobGroup) =>
+      val input = new JobDataWrapper(
+        new JobData(
+          jobId = 1,
+          name = name,
+          description = description,
+          submissionTime = Some(new Date(123456L)),
+          completionTime = Some(new Date(654321L)),
+          stageIds = Seq(1, 2, 3, 4),
+          jobGroup = jobGroup,
+          status = JobExecutionStatus.UNKNOWN,
+          numTasks = 2,
+          numActiveTasks = 3,
+          numCompletedTasks = 4,
+          numSkippedTasks = 5,
+          numFailedTasks = 6,
+          numKilledTasks = 7,
+          numCompletedIndices = 8,
+          numActiveStages = 9,
+          numCompletedStages = 10,
+          numSkippedStages = 11,
+          numFailedStages = 12,
+          killedTasksSummary = Map("a" -> 1, "b" -> 2)),
+        Set(1, 2),
+        Some(999)
+      )
 
-    val bytes = serializer.serialize(input)
-    val result = serializer.deserialize(bytes, classOf[JobDataWrapper])
-    assert(result.info.jobId == input.info.jobId)
-    assert(result.info.description == input.info.description)
-    assert(result.info.submissionTime == input.info.submissionTime)
-    assert(result.info.completionTime == input.info.completionTime)
-    assert(result.info.stageIds == input.info.stageIds)
-    assert(result.info.jobGroup == input.info.jobGroup)
-    assert(result.info.status == input.info.status)
-    assert(result.info.numTasks == input.info.numTasks)
-    assert(result.info.numActiveTasks == input.info.numActiveTasks)
-    assert(result.info.numCompletedTasks == input.info.numCompletedTasks)
-    assert(result.info.numSkippedTasks == input.info.numSkippedTasks)
-    assert(result.info.numFailedTasks == input.info.numFailedTasks)
-    assert(result.info.numKilledTasks == input.info.numKilledTasks)
-    assert(result.info.numCompletedIndices == input.info.numCompletedIndices)
-    assert(result.info.numActiveStages == input.info.numActiveStages)
-    assert(result.info.numCompletedStages == input.info.numCompletedStages)
-    assert(result.info.numSkippedStages == input.info.numSkippedStages)
-    assert(result.info.numFailedStages == input.info.numFailedStages)
-    assert(result.info.killedTasksSummary == input.info.killedTasksSummary)
-    assert(result.skippedStages == input.skippedStages)
-    assert(result.sqlExecutionId == input.sqlExecutionId)
+      val bytes = serializer.serialize(input)
+      val result = serializer.deserialize(bytes, classOf[JobDataWrapper])
+      assert(result.info.jobId == input.info.jobId)
+      assert(result.info.description == input.info.description)
+      assert(result.info.submissionTime == input.info.submissionTime)
+      assert(result.info.completionTime == input.info.completionTime)
+      assert(result.info.stageIds == input.info.stageIds)
+      assert(result.info.jobGroup == input.info.jobGroup)
+      assert(result.info.status == input.info.status)
+      assert(result.info.numTasks == input.info.numTasks)
+      assert(result.info.numActiveTasks == input.info.numActiveTasks)
+      assert(result.info.numCompletedTasks == input.info.numCompletedTasks)
+      assert(result.info.numSkippedTasks == input.info.numSkippedTasks)
+      assert(result.info.numFailedTasks == input.info.numFailedTasks)
+      assert(result.info.numKilledTasks == input.info.numKilledTasks)
+      assert(result.info.numCompletedIndices == input.info.numCompletedIndices)
+      assert(result.info.numActiveStages == input.info.numActiveStages)
+      assert(result.info.numCompletedStages == input.info.numCompletedStages)
+      assert(result.info.numSkippedStages == input.info.numSkippedStages)
+      assert(result.info.numFailedStages == input.info.numFailedStages)
+      assert(result.info.killedTasksSummary == input.info.killedTasksSummary)
+      assert(result.skippedStages == input.skippedStages)
+      assert(result.sqlExecutionId == input.sqlExecutionId)
+    }
   }
 
   test("Task Data") {
@@ -89,112 +94,117 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
       new AccumulableInfo(2L, "duration2", None, "value2"),
       new AccumulableInfo(-1L, null, None, null)
     )
-    val input = new TaskDataWrapper(
-      taskId = 1,
-      index = 2,
-      attempt = 3,
-      partitionId = 4,
-      launchTime = 5L,
-      resultFetchStart = 6L,
-      duration = 10000L,
-      executorId = "executor_id_1",
-      host = "host_name",
-      status = "SUCCESS",
-      taskLocality = "LOCAL",
-      speculative = true,
-      accumulatorUpdates = accumulatorUpdates,
-      errorMessage = Some("error"),
-      hasMetrics = true,
-      executorDeserializeTime = 7L,
-      executorDeserializeCpuTime = 8L,
-      executorRunTime = 9L,
-      executorCpuTime = 10L,
-      resultSize = 11L,
-      jvmGcTime = 12L,
-      resultSerializationTime = 13L,
-      memoryBytesSpilled = 14L,
-      diskBytesSpilled = 15L,
-      peakExecutionMemory = 16L,
-      inputBytesRead = 17L,
-      inputRecordsRead = 18L,
-      outputBytesWritten = 19L,
-      outputRecordsWritten = 20L,
-      shuffleRemoteBlocksFetched = 21L,
-      shuffleLocalBlocksFetched = 22L,
-      shuffleFetchWaitTime = 23L,
-      shuffleRemoteBytesRead = 24L,
-      shuffleRemoteBytesReadToDisk = 25L,
-      shuffleLocalBytesRead = 26L,
-      shuffleRecordsRead = 27L,
-      shuffleCorruptMergedBlockChunks = 28L,
-      shuffleMergedFetchFallbackCount = 29L,
-      shuffleMergedRemoteBlocksFetched = 30L,
-      shuffleMergedLocalBlocksFetched = 31L,
-      shuffleMergedRemoteChunksFetched = 32L,
-      shuffleMergedLocalChunksFetched = 33L,
-      shuffleMergedRemoteBytesRead = 34L,
-      shuffleMergedLocalBytesRead = 35L,
-      shuffleRemoteReqsDuration = 36L,
-      shuffleMergedRemoteReqDuration = 37L,
-      shuffleBytesWritten = 38L,
-      shuffleWriteTime = 39L,
-      shuffleRecordsWritten = 40L,
-      stageId = 41,
-      stageAttemptId = 42)
+    Seq(
+      ("executor_id_1", "host_name", "SUCCESS", "LOCAL"),
+      (null, null, null, null)
+    ).foreach { case (executorId, host, status, taskLocality) =>
+      val input = new TaskDataWrapper(
+        taskId = 1,
+        index = 2,
+        attempt = 3,
+        partitionId = 4,
+        launchTime = 5L,
+        resultFetchStart = 6L,
+        duration = 10000L,
+        executorId = executorId,
+        host = host,
+        status = status,
+        taskLocality = taskLocality,
+        speculative = true,
+        accumulatorUpdates = accumulatorUpdates,
+        errorMessage = Some("error"),
+        hasMetrics = true,
+        executorDeserializeTime = 7L,
+        executorDeserializeCpuTime = 8L,
+        executorRunTime = 9L,
+        executorCpuTime = 10L,
+        resultSize = 11L,
+        jvmGcTime = 12L,
+        resultSerializationTime = 13L,
+        memoryBytesSpilled = 14L,
+        diskBytesSpilled = 15L,
+        peakExecutionMemory = 16L,
+        inputBytesRead = 17L,
+        inputRecordsRead = 18L,
+        outputBytesWritten = 19L,
+        outputRecordsWritten = 20L,
+        shuffleRemoteBlocksFetched = 21L,
+        shuffleLocalBlocksFetched = 22L,
+        shuffleFetchWaitTime = 23L,
+        shuffleRemoteBytesRead = 24L,
+        shuffleRemoteBytesReadToDisk = 25L,
+        shuffleLocalBytesRead = 26L,
+        shuffleRecordsRead = 27L,
+        shuffleCorruptMergedBlockChunks = 28L,
+        shuffleMergedFetchFallbackCount = 29L,
+        shuffleMergedRemoteBlocksFetched = 30L,
+        shuffleMergedLocalBlocksFetched = 31L,
+        shuffleMergedRemoteChunksFetched = 32L,
+        shuffleMergedLocalChunksFetched = 33L,
+        shuffleMergedRemoteBytesRead = 34L,
+        shuffleMergedLocalBytesRead = 35L,
+        shuffleRemoteReqsDuration = 36L,
+        shuffleMergedRemoteReqDuration = 37L,
+        shuffleBytesWritten = 38L,
+        shuffleWriteTime = 39L,
+        shuffleRecordsWritten = 40L,
+        stageId = 41,
+        stageAttemptId = 42)
 
-    val bytes = serializer.serialize(input)
-    val result = serializer.deserialize(bytes, classOf[TaskDataWrapper])
-    checkAnswer(result.accumulatorUpdates, input.accumulatorUpdates)
-    assert(result.taskId == input.taskId)
-    assert(result.index == input.index)
-    assert(result.attempt == input.attempt)
-    assert(result.partitionId == input.partitionId)
-    assert(result.launchTime == input.launchTime)
-    assert(result.resultFetchStart == input.resultFetchStart)
-    assert(result.duration == input.duration)
-    assert(result.executorId == input.executorId)
-    assert(result.host == input.host)
-    assert(result.status == input.status)
-    assert(result.taskLocality == input.taskLocality)
-    assert(result.speculative == input.speculative)
-    assert(result.errorMessage == input.errorMessage)
-    assert(result.hasMetrics == input.hasMetrics)
-    assert(result.executorDeserializeTime == input.executorDeserializeTime)
-    assert(result.executorDeserializeCpuTime == input.executorDeserializeCpuTime)
-    assert(result.executorRunTime == input.executorRunTime)
-    assert(result.executorCpuTime == input.executorCpuTime)
-    assert(result.resultSize == input.resultSize)
-    assert(result.jvmGcTime == input.jvmGcTime)
-    assert(result.resultSerializationTime == input.resultSerializationTime)
-    assert(result.memoryBytesSpilled == input.memoryBytesSpilled)
-    assert(result.diskBytesSpilled == input.diskBytesSpilled)
-    assert(result.peakExecutionMemory == input.peakExecutionMemory)
-    assert(result.inputBytesRead == input.inputBytesRead)
-    assert(result.inputRecordsRead == input.inputRecordsRead)
-    assert(result.outputBytesWritten == input.outputBytesWritten)
-    assert(result.outputRecordsWritten == input.outputRecordsWritten)
-    assert(result.shuffleRemoteBlocksFetched == input.shuffleRemoteBlocksFetched)
-    assert(result.shuffleLocalBlocksFetched == input.shuffleLocalBlocksFetched)
-    assert(result.shuffleFetchWaitTime == input.shuffleFetchWaitTime)
-    assert(result.shuffleRemoteBytesRead == input.shuffleRemoteBytesRead)
-    assert(result.shuffleRemoteBytesReadToDisk == input.shuffleRemoteBytesReadToDisk)
-    assert(result.shuffleLocalBytesRead == input.shuffleLocalBytesRead)
-    assert(result.shuffleRecordsRead == input.shuffleRecordsRead)
-    assert(result.shuffleCorruptMergedBlockChunks == input.shuffleCorruptMergedBlockChunks)
-    assert(result.shuffleMergedFetchFallbackCount == input.shuffleMergedFetchFallbackCount)
-    assert(result.shuffleMergedRemoteBlocksFetched == input.shuffleMergedRemoteBlocksFetched)
-    assert(result.shuffleMergedLocalBlocksFetched == input.shuffleMergedLocalBlocksFetched)
-    assert(result.shuffleMergedRemoteChunksFetched == input.shuffleMergedRemoteChunksFetched)
-    assert(result.shuffleMergedLocalChunksFetched == input.shuffleMergedLocalChunksFetched)
-    assert(result.shuffleMergedRemoteBytesRead == input.shuffleMergedRemoteBytesRead)
-    assert(result.shuffleMergedLocalBytesRead == input.shuffleMergedLocalBytesRead)
-    assert(result.shuffleRemoteReqsDuration == input.shuffleRemoteReqsDuration)
-    assert(result.shuffleMergedRemoteReqDuration == input.shuffleMergedRemoteReqDuration)
-    assert(result.shuffleBytesWritten == input.shuffleBytesWritten)
-    assert(result.shuffleWriteTime == input.shuffleWriteTime)
-    assert(result.shuffleRecordsWritten == input.shuffleRecordsWritten)
-    assert(result.stageId == input.stageId)
-    assert(result.stageAttemptId == input.stageAttemptId)
+      val bytes = serializer.serialize(input)
+      val result = serializer.deserialize(bytes, classOf[TaskDataWrapper])
+      checkAnswer(result.accumulatorUpdates, input.accumulatorUpdates)
+      assert(result.taskId == input.taskId)
+      assert(result.index == input.index)
+      assert(result.attempt == input.attempt)
+      assert(result.partitionId == input.partitionId)
+      assert(result.launchTime == input.launchTime)
+      assert(result.resultFetchStart == input.resultFetchStart)
+      assert(result.duration == input.duration)
+      assert(result.executorId == input.executorId)
+      assert(result.host == input.host)
+      assert(result.status == input.status)
+      assert(result.taskLocality == input.taskLocality)
+      assert(result.speculative == input.speculative)
+      assert(result.errorMessage == input.errorMessage)
+      assert(result.hasMetrics == input.hasMetrics)
+      assert(result.executorDeserializeTime == input.executorDeserializeTime)
+      assert(result.executorDeserializeCpuTime == input.executorDeserializeCpuTime)
+      assert(result.executorRunTime == input.executorRunTime)
+      assert(result.executorCpuTime == input.executorCpuTime)
+      assert(result.resultSize == input.resultSize)
+      assert(result.jvmGcTime == input.jvmGcTime)
+      assert(result.resultSerializationTime == input.resultSerializationTime)
+      assert(result.memoryBytesSpilled == input.memoryBytesSpilled)
+      assert(result.diskBytesSpilled == input.diskBytesSpilled)
+      assert(result.peakExecutionMemory == input.peakExecutionMemory)
+      assert(result.inputBytesRead == input.inputBytesRead)
+      assert(result.inputRecordsRead == input.inputRecordsRead)
+      assert(result.outputBytesWritten == input.outputBytesWritten)
+      assert(result.outputRecordsWritten == input.outputRecordsWritten)
+      assert(result.shuffleRemoteBlocksFetched == input.shuffleRemoteBlocksFetched)
+      assert(result.shuffleLocalBlocksFetched == input.shuffleLocalBlocksFetched)
+      assert(result.shuffleFetchWaitTime == input.shuffleFetchWaitTime)
+      assert(result.shuffleRemoteBytesRead == input.shuffleRemoteBytesRead)
+      assert(result.shuffleRemoteBytesReadToDisk == input.shuffleRemoteBytesReadToDisk)
+      assert(result.shuffleLocalBytesRead == input.shuffleLocalBytesRead)
+      assert(result.shuffleRecordsRead == input.shuffleRecordsRead)
+      assert(result.shuffleCorruptMergedBlockChunks == input.shuffleCorruptMergedBlockChunks)
+      assert(result.shuffleMergedFetchFallbackCount == input.shuffleMergedFetchFallbackCount)
+      assert(result.shuffleMergedRemoteBlocksFetched == input.shuffleMergedRemoteBlocksFetched)
+      assert(result.shuffleMergedLocalBlocksFetched == input.shuffleMergedLocalBlocksFetched)
+      assert(result.shuffleMergedRemoteChunksFetched == input.shuffleMergedRemoteChunksFetched)
+      assert(result.shuffleMergedLocalChunksFetched == input.shuffleMergedLocalChunksFetched)
+      assert(result.shuffleMergedRemoteBytesRead == input.shuffleMergedRemoteBytesRead)
+      assert(result.shuffleMergedLocalBytesRead == input.shuffleMergedLocalBytesRead)
+      assert(result.shuffleRemoteReqsDuration == input.shuffleRemoteReqsDuration)
+      assert(result.shuffleMergedRemoteReqDuration == input.shuffleMergedRemoteReqDuration)
+      assert(result.shuffleBytesWritten == input.shuffleBytesWritten)
+      assert(result.shuffleWriteTime == input.shuffleWriteTime)
+      assert(result.shuffleRecordsWritten == input.shuffleRecordsWritten)
+      assert(result.stageId == input.stageId)
+      assert(result.stageAttemptId == input.stageAttemptId)
+    }
   }
 
   test("Executor Stage Summary") {
@@ -218,17 +228,19 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
       isBlacklistedForStage = true,
       peakMemoryMetrics = peakMemoryMetrics,
       isExcludedForStage = false)
-    val input = new ExecutorStageSummaryWrapper(
-      stageId = 1,
-      stageAttemptId = 2,
-      executorId = "executor_id_1",
-      info = info)
-    val bytes = serializer.serialize(input)
-    val result = serializer.deserialize(bytes, classOf[ExecutorStageSummaryWrapper])
-    assert(result.stageId == input.stageId)
-    assert(result.stageAttemptId == input.stageAttemptId)
-    assert(result.executorId == input.executorId)
-    checkAnswer(result.info, input.info)
+    Seq("executor_id_1", null).foreach { executorId =>
+      val input = new ExecutorStageSummaryWrapper(
+        stageId = 1,
+        stageAttemptId = 2,
+        executorId = executorId,
+        info = info)
+      val bytes = serializer.serialize(input)
+      val result = serializer.deserialize(bytes, classOf[ExecutorStageSummaryWrapper])
+      assert(result.stageId == input.stageId)
+      assert(result.stageAttemptId == input.stageAttemptId)
+      assert(result.executorId == input.executorId)
+      checkAnswer(result.info, input.info)
+    }
   }
 
   test("Application Environment Info") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
