This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 9a4804e0f3d [SPARK-42138][UI] Handle null string values in JobData/TaskDataWrapper/ExecutorStageSummaryWrapper
9a4804e0f3d is described below

commit 9a4804e0f3dc8029eab1d270b505a3e1e783d139
Author: Gengliang Wang <gengli...@apache.org>
AuthorDate: Fri Jan 20 22:35:14 2023 -0800

    [SPARK-42138][UI] Handle null string values in JobData/TaskDataWrapper/ExecutorStageSummaryWrapper

    ### What changes were proposed in this pull request?
    Similar to https://github.com/apache/spark/pull/39666, this PR handles null string values in JobData/TaskDataWrapper/ExecutorStageSummaryWrapper

    ### Why are the changes needed?
    Properly handles null string values in the protobuf serializer.

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    New UTs

    Closes #39680 from gengliangwang/handleMoreNull.

    Authored-by: Gengliang Wang <gengli...@apache.org>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../apache/spark/status/protobuf/store_types.proto |  12 +-
 .../ExecutorStageSummaryWrapperSerializer.scala    |   5 +-
 .../status/protobuf/JobDataWrapperSerializer.scala |   7 +-
 .../protobuf/TaskDataWrapperSerializer.scala       |  19 +-
 .../protobuf/KVStoreProtobufSerializerSuite.scala  | 340 +++++++++++----------
 5 files changed, 198 insertions(+), 185 deletions(-)
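For context, the serializer changes below lean on two small helpers in org.apache.spark.status.protobuf.Utils that were introduced by the earlier PR (https://github.com/apache/spark/pull/39666) and are therefore not part of this diff. A minimal sketch of what they presumably look like (the exact signatures in Spark may differ slightly):

    // Null-safe write: forward the value to the generated protobuf builder only
    // when it is non-null, so a missing string never becomes "" on the wire.
    def setStringField(input: String, f: String => Any): Unit = {
      if (input != null) {
        f(input)
      }
    }

    // Presence-based read: return the (possibly interned) string when the field
    // was set, and null when it was never set.
    def getStringField(condition: Boolean, result: () => String): String = {
      if (condition) result() else null
    }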
diff --git a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
index 96c78aa001d..b7afa93c04a 100644
--- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
+++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
@@ -41,7 +41,7 @@ enum JobExecutionStatus {
 message JobData {
   // All IDs are int64 for extendability, even when they are currently int32 in Spark.
   int64 job_id = 1;
-  string name = 2;
+  optional string name = 2;
   optional string description = 3;
   optional int64 submission_time = 4;
   optional int64 completion_time = 5;
@@ -83,10 +83,10 @@ message TaskDataWrapper {
   int64 launch_time = 5;
   int64 result_fetch_start = 6;
   int64 duration = 7;
-  string executor_id = 8;
-  string host = 9;
-  string status = 10;
-  string task_locality = 11;
+  optional string executor_id = 8;
+  optional string host = 9;
+  optional string status = 10;
+  optional string task_locality = 11;
   bool speculative = 12;
   repeated AccumulableInfo accumulator_updates = 13;
   optional string error_message = 14;
@@ -156,7 +156,7 @@ message ExecutorStageSummary {
 message ExecutorStageSummaryWrapper {
   int64 stage_id = 1;
   int32 stage_attempt_id = 2;
-  string executor_id = 3;
+  optional string executor_id = 3;
   ExecutorStageSummary info = 4;
 }
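The schema change above is what makes the Scala-side null handling possible: marking a proto3 string field as `optional` gives the generated Java classes explicit field presence (hasExecutorId(), hasName(), ...), so "never set" can be distinguished from the proto3 default empty string. A small illustration against the generated StoreTypes message (the field values here are made up for the example):

    import org.apache.spark.status.protobuf.StoreTypes

    // Field set: the presence flag is true and the value round-trips.
    val withId = StoreTypes.ExecutorStageSummaryWrapper.newBuilder()
      .setStageId(1L)
      .setStageAttemptId(0)
      .setExecutorId("exec-1")
      .build()
    assert(withId.hasExecutorId && withId.getExecutorId == "exec-1")

    // Field never set: with `optional`, hasExecutorId() stays false after a
    // serialize/parse round trip, which the deserializer can map back to null
    // instead of "".
    val withoutId = StoreTypes.ExecutorStageSummaryWrapper.newBuilder()
      .setStageId(1L)
      .setStageAttemptId(0)
      .build()
    val parsed = StoreTypes.ExecutorStageSummaryWrapper.parseFrom(withoutId.toByteArray)
    assert(!parsed.hasExecutorId)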
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorStageSummaryWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorStageSummaryWrapperSerializer.scala
index 2ffba9db755..8e41a857057 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorStageSummaryWrapperSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorStageSummaryWrapperSerializer.scala
@@ -18,6 +18,7 @@ package org.apache.spark.status.protobuf
 
 import org.apache.spark.status.ExecutorStageSummaryWrapper
+import org.apache.spark.status.protobuf.Utils.{getStringField, setStringField}
 import org.apache.spark.util.Utils.weakIntern
 
 class ExecutorStageSummaryWrapperSerializer
@@ -28,8 +29,8 @@ class ExecutorStageSummaryWrapperSerializer
     val builder = StoreTypes.ExecutorStageSummaryWrapper.newBuilder()
       .setStageId(input.stageId.toLong)
       .setStageAttemptId(input.stageAttemptId)
-      .setExecutorId(input.executorId)
       .setInfo(info)
+    setStringField(input.executorId, builder.setExecutorId)
     builder.build().toByteArray
   }
 
@@ -39,7 +40,7 @@ class ExecutorStageSummaryWrapperSerializer
     new ExecutorStageSummaryWrapper(
       stageId = binary.getStageId.toInt,
       stageAttemptId = binary.getStageAttemptId,
-      executorId = weakIntern(binary.getExecutorId),
+      executorId = getStringField(binary.hasExecutorId, () => weakIntern(binary.getExecutorId)),
       info = info)
   }
 }
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala
index 10e0f125f6c..55bb4e2549e 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala
@@ -23,7 +23,7 @@ import collection.JavaConverters._
 
 import org.apache.spark.status.JobDataWrapper
 import org.apache.spark.status.api.v1.JobData
-import org.apache.spark.status.protobuf.Utils.getOptional
+import org.apache.spark.status.protobuf.Utils.{getOptional, getStringField, setStringField}
 
 class JobDataWrapperSerializer extends ProtobufSerDe[JobDataWrapper] {
@@ -49,7 +49,6 @@ class JobDataWrapperSerializer extends ProtobufSerDe[JobDataWrapper] {
   private def serializeJobData(jobData: JobData): StoreTypes.JobData = {
     val jobDataBuilder = StoreTypes.JobData.newBuilder()
     jobDataBuilder.setJobId(jobData.jobId.toLong)
-      .setName(jobData.name)
       .setStatus(JobExecutionStatusSerializer.serialize(jobData.status))
       .setNumTasks(jobData.numTasks)
       .setNumActiveTasks(jobData.numActiveTasks)
@@ -62,7 +61,7 @@ class JobDataWrapperSerializer extends ProtobufSerDe[JobDataWrapper] {
       .setNumCompletedStages(jobData.numCompletedStages)
       .setNumSkippedStages(jobData.numSkippedStages)
       .setNumFailedStages(jobData.numFailedStages)
-
+    setStringField(jobData.name, jobDataBuilder.setName)
     jobData.description.foreach(jobDataBuilder.setDescription)
     jobData.submissionTime.foreach { d =>
       jobDataBuilder.setSubmissionTime(d.getTime)
@@ -88,7 +87,7 @@ class JobDataWrapperSerializer extends ProtobufSerDe[JobDataWrapper] {
 
     new JobData(
       jobId = info.getJobId.toInt,
-      name = info.getName,
+      name = getStringField(info.hasName, info.getName),
       description = description,
       submissionTime = submissionTime,
       completionTime = completionTime,
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/TaskDataWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/TaskDataWrapperSerializer.scala
index 3c947c79eab..298a1612212 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/TaskDataWrapperSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/TaskDataWrapperSerializer.scala
@@ -18,7 +18,7 @@ package org.apache.spark.status.protobuf
 
 import org.apache.spark.status.TaskDataWrapper
-import org.apache.spark.status.protobuf.Utils.getOptional
+import org.apache.spark.status.protobuf.Utils.{getOptional, getStringField, setStringField}
 import org.apache.spark.util.Utils.weakIntern
 
 class TaskDataWrapperSerializer extends ProtobufSerDe[TaskDataWrapper] {
@@ -32,10 +32,6 @@ class TaskDataWrapperSerializer extends ProtobufSerDe[TaskDataWrapper] {
       .setLaunchTime(input.launchTime)
       .setResultFetchStart(input.resultFetchStart)
       .setDuration(input.duration)
-      .setExecutorId(input.executorId)
-      .setHost(input.host)
-      .setStatus(input.status)
-      .setTaskLocality(input.taskLocality)
       .setSpeculative(input.speculative)
       .setHasMetrics(input.hasMetrics)
       .setExecutorDeserializeTime(input.executorDeserializeTime)
@@ -74,6 +70,10 @@ class TaskDataWrapperSerializer extends ProtobufSerDe[TaskDataWrapper] {
       .setShuffleRecordsWritten(input.shuffleRecordsWritten)
       .setStageId(input.stageId)
       .setStageAttemptId(input.stageAttemptId)
+    setStringField(input.executorId, builder.setExecutorId)
+    setStringField(input.host, builder.setHost)
+    setStringField(input.status, builder.setStatus)
+    setStringField(input.taskLocality, builder.setTaskLocality)
     input.errorMessage.foreach(builder.setErrorMessage)
     input.accumulatorUpdates.foreach { update =>
       builder.addAccumulatorUpdates(AccumulableInfoSerializer.serialize(update))
@@ -92,10 +92,11 @@ class TaskDataWrapperSerializer extends ProtobufSerDe[TaskDataWrapper] {
       launchTime = binary.getLaunchTime,
       resultFetchStart = binary.getResultFetchStart,
       duration = binary.getDuration,
-      executorId = weakIntern(binary.getExecutorId),
-      host = weakIntern(binary.getHost),
-      status = weakIntern(binary.getStatus),
-      taskLocality = weakIntern(binary.getTaskLocality),
+      executorId = getStringField(binary.hasExecutorId, () => weakIntern(binary.getExecutorId)),
+      host = getStringField(binary.hasHost, () => weakIntern(binary.getHost)),
+      status = getStringField(binary.hasStatus, () => weakIntern(binary.getStatus)),
+      taskLocality =
+        getStringField(binary.hasTaskLocality, () => weakIntern(binary.getTaskLocality)),
       speculative = binary.getSpeculative,
       accumulatorUpdates = accumulatorUpdates,
       errorMessage = getOptional(binary.hasErrorMessage, binary.getErrorMessage),
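One detail worth noting in the deserializers above: getStringField takes the value as a thunk (`() => weakIntern(...)`) rather than an already-computed string, so the generated getter and the interner are only touched when the field is actually present, and absent fields simply come back as null. A self-contained illustration of that behavior, assuming the getStringField sketch shown earlier and using fakeIntern as a stand-in for org.apache.spark.util.Utils.weakIntern:

    // Stand-in for weakIntern, instrumented just to count invocations.
    var internCalls = 0
    def fakeIntern(s: String): String = { internCalls += 1; s }

    def getStringField(condition: Boolean, result: () => String): String =
      if (condition) result() else null

    assert(getStringField(condition = true, result = () => fakeIntern("exec-1")) == "exec-1")
    assert(getStringField(condition = false, result = () => fakeIntern("ignored")) == null)
    assert(internCalls == 1)  // the thunk ran only for the field that was present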
diff --git a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
index 0d0d26410ed..774d7abc420 100644
--- a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
@@ -32,55 +32,60 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
   private val serializer = new KVStoreProtobufSerializer()
 
   test("Job data") {
-    val input = new JobDataWrapper(
-      new JobData(
-        jobId = 1,
-        name = "test",
-        description = Some("test description"),
-        submissionTime = Some(new Date(123456L)),
-        completionTime = Some(new Date(654321L)),
-        stageIds = Seq(1, 2, 3, 4),
-        jobGroup = Some("group"),
-        status = JobExecutionStatus.UNKNOWN,
-        numTasks = 2,
-        numActiveTasks = 3,
-        numCompletedTasks = 4,
-        numSkippedTasks = 5,
-        numFailedTasks = 6,
-        numKilledTasks = 7,
-        numCompletedIndices = 8,
-        numActiveStages = 9,
-        numCompletedStages = 10,
-        numSkippedStages = 11,
-        numFailedStages = 12,
-        killedTasksSummary = Map("a" -> 1, "b" -> 2)),
-      Set(1, 2),
-      Some(999)
-    )
+    Seq(
+      ("test", Some("test description"), Some("group")),
+      (null, None, None)
+    ).foreach { case (name, description, jobGroup) =>
+      val input = new JobDataWrapper(
+        new JobData(
+          jobId = 1,
+          name = name,
+          description = description,
+          submissionTime = Some(new Date(123456L)),
+          completionTime = Some(new Date(654321L)),
+          stageIds = Seq(1, 2, 3, 4),
+          jobGroup = jobGroup,
+          status = JobExecutionStatus.UNKNOWN,
+          numTasks = 2,
+          numActiveTasks = 3,
+          numCompletedTasks = 4,
+          numSkippedTasks = 5,
+          numFailedTasks = 6,
+          numKilledTasks = 7,
+          numCompletedIndices = 8,
+          numActiveStages = 9,
+          numCompletedStages = 10,
+          numSkippedStages = 11,
+          numFailedStages = 12,
+          killedTasksSummary = Map("a" -> 1, "b" -> 2)),
+        Set(1, 2),
+        Some(999)
+      )
 
-    val bytes = serializer.serialize(input)
-    val result = serializer.deserialize(bytes, classOf[JobDataWrapper])
-    assert(result.info.jobId == input.info.jobId)
-    assert(result.info.description == input.info.description)
-    assert(result.info.submissionTime == input.info.submissionTime)
-    assert(result.info.completionTime == input.info.completionTime)
-    assert(result.info.stageIds == input.info.stageIds)
-    assert(result.info.jobGroup == input.info.jobGroup)
-    assert(result.info.status == input.info.status)
-    assert(result.info.numTasks == input.info.numTasks)
-    assert(result.info.numActiveTasks == input.info.numActiveTasks)
-    assert(result.info.numCompletedTasks == input.info.numCompletedTasks)
-    assert(result.info.numSkippedTasks == input.info.numSkippedTasks)
-    assert(result.info.numFailedTasks == input.info.numFailedTasks)
-    assert(result.info.numKilledTasks == input.info.numKilledTasks)
-    assert(result.info.numCompletedIndices == input.info.numCompletedIndices)
-    assert(result.info.numActiveStages == input.info.numActiveStages)
-    assert(result.info.numCompletedStages == input.info.numCompletedStages)
-    assert(result.info.numSkippedStages == input.info.numSkippedStages)
-    assert(result.info.numFailedStages == input.info.numFailedStages)
-    assert(result.info.killedTasksSummary == input.info.killedTasksSummary)
-    assert(result.skippedStages == input.skippedStages)
-    assert(result.sqlExecutionId == input.sqlExecutionId)
+      val bytes = serializer.serialize(input)
+      val result = serializer.deserialize(bytes, classOf[JobDataWrapper])
+      assert(result.info.jobId == input.info.jobId)
+      assert(result.info.description == input.info.description)
+      assert(result.info.submissionTime == input.info.submissionTime)
+      assert(result.info.completionTime == input.info.completionTime)
+      assert(result.info.stageIds == input.info.stageIds)
+      assert(result.info.jobGroup == input.info.jobGroup)
+      assert(result.info.status == input.info.status)
+      assert(result.info.numTasks == input.info.numTasks)
+      assert(result.info.numActiveTasks == input.info.numActiveTasks)
+      assert(result.info.numCompletedTasks == input.info.numCompletedTasks)
+      assert(result.info.numSkippedTasks == input.info.numSkippedTasks)
+      assert(result.info.numFailedTasks == input.info.numFailedTasks)
+      assert(result.info.numKilledTasks == input.info.numKilledTasks)
+      assert(result.info.numCompletedIndices == input.info.numCompletedIndices)
+      assert(result.info.numActiveStages == input.info.numActiveStages)
+      assert(result.info.numCompletedStages == input.info.numCompletedStages)
+      assert(result.info.numSkippedStages == input.info.numSkippedStages)
+      assert(result.info.numFailedStages == input.info.numFailedStages)
+      assert(result.info.killedTasksSummary == input.info.killedTasksSummary)
+      assert(result.skippedStages == input.skippedStages)
+      assert(result.sqlExecutionId == input.sqlExecutionId)
+    }
   }
 
   test("Task Data") {
@@ -89,112 +94,117 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
       new AccumulableInfo(2L, "duration2", None, "value2"),
       new AccumulableInfo(-1L, null, None, null)
     )
-    val input = new TaskDataWrapper(
-      taskId = 1,
-      index = 2,
-      attempt = 3,
-      partitionId = 4,
-      launchTime = 5L,
-      resultFetchStart = 6L,
-      duration = 10000L,
-      executorId = "executor_id_1",
-      host = "host_name",
-      status = "SUCCESS",
-      taskLocality = "LOCAL",
-      speculative = true,
-      accumulatorUpdates = accumulatorUpdates,
-      errorMessage = Some("error"),
-      hasMetrics = true,
-      executorDeserializeTime = 7L,
-      executorDeserializeCpuTime = 8L,
-      executorRunTime = 9L,
-      executorCpuTime = 10L,
-      resultSize = 11L,
-      jvmGcTime = 12L,
-      resultSerializationTime = 13L,
-      memoryBytesSpilled = 14L,
-      diskBytesSpilled = 15L,
-      peakExecutionMemory = 16L,
-      inputBytesRead = 17L,
-      inputRecordsRead = 18L,
-      outputBytesWritten = 19L,
-      outputRecordsWritten = 20L,
-      shuffleRemoteBlocksFetched = 21L,
-      shuffleLocalBlocksFetched = 22L,
-      shuffleFetchWaitTime = 23L,
-      shuffleRemoteBytesRead = 24L,
-      shuffleRemoteBytesReadToDisk = 25L,
-      shuffleLocalBytesRead = 26L,
-      shuffleRecordsRead = 27L,
-      shuffleCorruptMergedBlockChunks = 28L,
-      shuffleMergedFetchFallbackCount = 29L,
-      shuffleMergedRemoteBlocksFetched = 30L,
-      shuffleMergedLocalBlocksFetched = 31L,
-      shuffleMergedRemoteChunksFetched = 32L,
-      shuffleMergedLocalChunksFetched = 33L,
-      shuffleMergedRemoteBytesRead = 34L,
-      shuffleMergedLocalBytesRead = 35L,
-      shuffleRemoteReqsDuration = 36L,
-      shuffleMergedRemoteReqDuration = 37L,
-      shuffleBytesWritten = 38L,
-      shuffleWriteTime = 39L,
-      shuffleRecordsWritten = 40L,
-      stageId = 41,
-      stageAttemptId = 42)
+    Seq(
+      ("executor_id_1", "host_name", "SUCCESS", "LOCAL"),
+      (null, null, null, null)
+    ).foreach { case (executorId, host, status, taskLocality) =>
+      val input = new TaskDataWrapper(
+        taskId = 1,
+        index = 2,
+        attempt = 3,
+        partitionId = 4,
+        launchTime = 5L,
+        resultFetchStart = 6L,
+        duration = 10000L,
+        executorId = executorId,
+        host = host,
+        status = status,
+        taskLocality = taskLocality,
+        speculative = true,
+        accumulatorUpdates = accumulatorUpdates,
+        errorMessage = Some("error"),
+        hasMetrics = true,
+        executorDeserializeTime = 7L,
+        executorDeserializeCpuTime = 8L,
+        executorRunTime = 9L,
+        executorCpuTime = 10L,
+        resultSize = 11L,
+        jvmGcTime = 12L,
+        resultSerializationTime = 13L,
+        memoryBytesSpilled = 14L,
+        diskBytesSpilled = 15L,
+        peakExecutionMemory = 16L,
+        inputBytesRead = 17L,
+        inputRecordsRead = 18L,
+        outputBytesWritten = 19L,
+        outputRecordsWritten = 20L,
+        shuffleRemoteBlocksFetched = 21L,
+        shuffleLocalBlocksFetched = 22L,
+        shuffleFetchWaitTime = 23L,
+        shuffleRemoteBytesRead = 24L,
+        shuffleRemoteBytesReadToDisk = 25L,
+        shuffleLocalBytesRead = 26L,
+        shuffleRecordsRead = 27L,
+        shuffleCorruptMergedBlockChunks = 28L,
+        shuffleMergedFetchFallbackCount = 29L,
+        shuffleMergedRemoteBlocksFetched = 30L,
+        shuffleMergedLocalBlocksFetched = 31L,
+        shuffleMergedRemoteChunksFetched = 32L,
+        shuffleMergedLocalChunksFetched = 33L,
+        shuffleMergedRemoteBytesRead = 34L,
+        shuffleMergedLocalBytesRead = 35L,
+        shuffleRemoteReqsDuration = 36L,
+        shuffleMergedRemoteReqDuration = 37L,
+        shuffleBytesWritten = 38L,
+        shuffleWriteTime = 39L,
+        shuffleRecordsWritten = 40L,
+        stageId = 41,
+        stageAttemptId = 42)
 
-    val bytes = serializer.serialize(input)
-    val result = serializer.deserialize(bytes, classOf[TaskDataWrapper])
-    checkAnswer(result.accumulatorUpdates, input.accumulatorUpdates)
-    assert(result.taskId == input.taskId)
-    assert(result.index == input.index)
-    assert(result.attempt == input.attempt)
-    assert(result.partitionId == input.partitionId)
-    assert(result.launchTime == input.launchTime)
-    assert(result.resultFetchStart == input.resultFetchStart)
-    assert(result.duration == input.duration)
-    assert(result.executorId == input.executorId)
-    assert(result.host == input.host)
-    assert(result.status == input.status)
-    assert(result.taskLocality == input.taskLocality)
-    assert(result.speculative == input.speculative)
-    assert(result.errorMessage == input.errorMessage)
-    assert(result.hasMetrics == input.hasMetrics)
-    assert(result.executorDeserializeTime == input.executorDeserializeTime)
-    assert(result.executorDeserializeCpuTime == input.executorDeserializeCpuTime)
-    assert(result.executorRunTime == input.executorRunTime)
-    assert(result.executorCpuTime == input.executorCpuTime)
-    assert(result.resultSize == input.resultSize)
-    assert(result.jvmGcTime == input.jvmGcTime)
-    assert(result.resultSerializationTime == input.resultSerializationTime)
-    assert(result.memoryBytesSpilled == input.memoryBytesSpilled)
-    assert(result.diskBytesSpilled == input.diskBytesSpilled)
-    assert(result.peakExecutionMemory == input.peakExecutionMemory)
-    assert(result.inputBytesRead == input.inputBytesRead)
-    assert(result.inputRecordsRead == input.inputRecordsRead)
-    assert(result.outputBytesWritten == input.outputBytesWritten)
-    assert(result.outputRecordsWritten == input.outputRecordsWritten)
-    assert(result.shuffleRemoteBlocksFetched == input.shuffleRemoteBlocksFetched)
-    assert(result.shuffleLocalBlocksFetched == input.shuffleLocalBlocksFetched)
-    assert(result.shuffleFetchWaitTime == input.shuffleFetchWaitTime)
-    assert(result.shuffleRemoteBytesRead == input.shuffleRemoteBytesRead)
-    assert(result.shuffleRemoteBytesReadToDisk == input.shuffleRemoteBytesReadToDisk)
-    assert(result.shuffleLocalBytesRead == input.shuffleLocalBytesRead)
-    assert(result.shuffleRecordsRead == input.shuffleRecordsRead)
-    assert(result.shuffleCorruptMergedBlockChunks == input.shuffleCorruptMergedBlockChunks)
-    assert(result.shuffleMergedFetchFallbackCount == input.shuffleMergedFetchFallbackCount)
-    assert(result.shuffleMergedRemoteBlocksFetched == input.shuffleMergedRemoteBlocksFetched)
-    assert(result.shuffleMergedLocalBlocksFetched == input.shuffleMergedLocalBlocksFetched)
-    assert(result.shuffleMergedRemoteChunksFetched == input.shuffleMergedRemoteChunksFetched)
-    assert(result.shuffleMergedLocalChunksFetched == input.shuffleMergedLocalChunksFetched)
-    assert(result.shuffleMergedRemoteBytesRead == input.shuffleMergedRemoteBytesRead)
-    assert(result.shuffleMergedLocalBytesRead == input.shuffleMergedLocalBytesRead)
-    assert(result.shuffleRemoteReqsDuration == input.shuffleRemoteReqsDuration)
-    assert(result.shuffleMergedRemoteReqDuration == input.shuffleMergedRemoteReqDuration)
-    assert(result.shuffleBytesWritten == input.shuffleBytesWritten)
-    assert(result.shuffleWriteTime == input.shuffleWriteTime)
-    assert(result.shuffleRecordsWritten == input.shuffleRecordsWritten)
-    assert(result.stageId == input.stageId)
-    assert(result.stageAttemptId == input.stageAttemptId)
+      val bytes = serializer.serialize(input)
+      val result = serializer.deserialize(bytes, classOf[TaskDataWrapper])
+      checkAnswer(result.accumulatorUpdates, input.accumulatorUpdates)
+      assert(result.taskId == input.taskId)
+      assert(result.index == input.index)
+      assert(result.attempt == input.attempt)
+      assert(result.partitionId == input.partitionId)
+      assert(result.launchTime == input.launchTime)
+      assert(result.resultFetchStart == input.resultFetchStart)
+      assert(result.duration == input.duration)
+      assert(result.executorId == input.executorId)
+      assert(result.host == input.host)
+      assert(result.status == input.status)
+      assert(result.taskLocality == input.taskLocality)
+      assert(result.speculative == input.speculative)
+      assert(result.errorMessage == input.errorMessage)
+      assert(result.hasMetrics == input.hasMetrics)
+      assert(result.executorDeserializeTime == input.executorDeserializeTime)
+      assert(result.executorDeserializeCpuTime == input.executorDeserializeCpuTime)
+      assert(result.executorRunTime == input.executorRunTime)
+      assert(result.executorCpuTime == input.executorCpuTime)
+      assert(result.resultSize == input.resultSize)
+      assert(result.jvmGcTime == input.jvmGcTime)
+      assert(result.resultSerializationTime == input.resultSerializationTime)
+      assert(result.memoryBytesSpilled == input.memoryBytesSpilled)
+      assert(result.diskBytesSpilled == input.diskBytesSpilled)
+      assert(result.peakExecutionMemory == input.peakExecutionMemory)
+      assert(result.inputBytesRead == input.inputBytesRead)
+      assert(result.inputRecordsRead == input.inputRecordsRead)
+      assert(result.outputBytesWritten == input.outputBytesWritten)
+      assert(result.outputRecordsWritten == input.outputRecordsWritten)
+      assert(result.shuffleRemoteBlocksFetched == input.shuffleRemoteBlocksFetched)
+      assert(result.shuffleLocalBlocksFetched == input.shuffleLocalBlocksFetched)
+      assert(result.shuffleFetchWaitTime == input.shuffleFetchWaitTime)
+      assert(result.shuffleRemoteBytesRead == input.shuffleRemoteBytesRead)
+      assert(result.shuffleRemoteBytesReadToDisk == input.shuffleRemoteBytesReadToDisk)
+      assert(result.shuffleLocalBytesRead == input.shuffleLocalBytesRead)
+      assert(result.shuffleRecordsRead == input.shuffleRecordsRead)
+      assert(result.shuffleCorruptMergedBlockChunks == input.shuffleCorruptMergedBlockChunks)
+      assert(result.shuffleMergedFetchFallbackCount == input.shuffleMergedFetchFallbackCount)
+      assert(result.shuffleMergedRemoteBlocksFetched == input.shuffleMergedRemoteBlocksFetched)
+      assert(result.shuffleMergedLocalBlocksFetched == input.shuffleMergedLocalBlocksFetched)
+      assert(result.shuffleMergedRemoteChunksFetched ==
+        input.shuffleMergedRemoteChunksFetched)
+      assert(result.shuffleMergedLocalChunksFetched == input.shuffleMergedLocalChunksFetched)
+      assert(result.shuffleMergedRemoteBytesRead == input.shuffleMergedRemoteBytesRead)
+      assert(result.shuffleMergedLocalBytesRead == input.shuffleMergedLocalBytesRead)
+      assert(result.shuffleRemoteReqsDuration == input.shuffleRemoteReqsDuration)
+      assert(result.shuffleMergedRemoteReqDuration == input.shuffleMergedRemoteReqDuration)
+      assert(result.shuffleBytesWritten == input.shuffleBytesWritten)
+      assert(result.shuffleWriteTime == input.shuffleWriteTime)
+      assert(result.shuffleRecordsWritten == input.shuffleRecordsWritten)
+      assert(result.stageId == input.stageId)
+      assert(result.stageAttemptId == input.stageAttemptId)
+    }
   }
 
   test("Executor Stage Summary") {
@@ -218,17 +228,19 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
       isBlacklistedForStage = true,
       peakMemoryMetrics = peakMemoryMetrics,
       isExcludedForStage = false)
-    val input = new ExecutorStageSummaryWrapper(
-      stageId = 1,
-      stageAttemptId = 2,
-      executorId = "executor_id_1",
-      info = info)
-    val bytes = serializer.serialize(input)
-    val result = serializer.deserialize(bytes, classOf[ExecutorStageSummaryWrapper])
-    assert(result.stageId == input.stageId)
-    assert(result.stageAttemptId == input.stageAttemptId)
-    assert(result.executorId == input.executorId)
-    checkAnswer(result.info, input.info)
+    Seq("executor_id_1", null).foreach { executorId =>
+      val input = new ExecutorStageSummaryWrapper(
+        stageId = 1,
+        stageAttemptId = 2,
+        executorId = executorId,
+        info = info)
+      val bytes = serializer.serialize(input)
+      val result = serializer.deserialize(bytes, classOf[ExecutorStageSummaryWrapper])
+      assert(result.stageId == input.stageId)
+      assert(result.stageAttemptId == input.stageAttemptId)
+      assert(result.executorId == input.executorId)
+      checkAnswer(result.info, input.info)
+    }
   }
 
   test("Application Environment Info") {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org