This is an automated email from the ASF dual-hosted git repository. gengliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 492356d1646 [SPARK-41768][CORE] Refactor the definition of enum to follow with the code style 492356d1646 is described below commit 492356d1646a5e7571dad7e3107a11f765ee810a Author: panbingkun <pbk1...@gmail.com> AuthorDate: Wed Jan 4 13:15:04 2023 -0800 [SPARK-41768][CORE] Refactor the definition of enum to follow with the code style ### What changes were proposed in this pull request? This PR refactors the enum definitions used by the UI protobuf serializers so that they follow the Protocol Buffers code style (enum values are prefixed with the enum type name). ### Why are the changes needed? To follow the Protocol Buffers style guide for enums: https://developers.google.com/protocol-buffers/docs/style#enums <img width="860" alt="image" src="https://user-images.githubusercontent.com/15246973/209946067-4c541101-be0d-49a6-9812-768ba98423a4.png"> ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass GitHub Actions (GA) and the existing unit tests (UT). Closes #39286 from panbingkun/SPARK-41768. Authored-by: panbingkun <pbk1...@gmail.com> Signed-off-by: Gengliang Wang <gengli...@apache.org> --- .../apache/spark/status/protobuf/store_types.proto | 21 ++++++----- .../status/protobuf/JobDataWrapperSerializer.scala | 12 ++---- .../protobuf/JobExecutionStatusSerializer.scala | 43 ++++++++++++++++++++++ .../RDDOperationGraphWrapperSerializer.scala | 35 ++++++++++++++++-- .../status/protobuf/StageStatusSerializer.scala | 26 +++++++++---- .../sql/SQLExecutionUIDataSerializer.scala | 7 ++-- 6 files changed, 110 insertions(+), 34 deletions(-) diff --git a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto index 38b82518ddd..6ba1915dfa1 100644 --- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto +++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto @@ -27,10 +27,10 @@ package org.apache.spark.status.protobuf; enum JobExecutionStatus { 
JOB_EXECUTION_STATUS_UNSPECIFIED = 0; - RUNNING = 1; - SUCCEEDED = 2; - FAILED = 3; - UNKNOWN = 4; + JOB_EXECUTION_STATUS_RUNNING = 1; + JOB_EXECUTION_STATUS_SUCCEEDED = 2; + JOB_EXECUTION_STATUS_FAILED = 3; + JOB_EXECUTION_STATUS_UNKNOWN = 4; } message JobData { @@ -434,13 +434,14 @@ message RDDOperationEdge { int32 to_id = 2; } +enum DeterministicLevel { + DETERMINISTIC_LEVEL_UNSPECIFIED = 0; + DETERMINISTIC_LEVEL_DETERMINATE = 1; + DETERMINISTIC_LEVEL_UNORDERED = 2; + DETERMINISTIC_LEVEL_INDETERMINATE = 3; +} + message RDDOperationNode { - enum DeterministicLevel { - UNSPECIFIED = 0; - DETERMINATE = 1; - UNORDERED = 2; - INDETERMINATE = 3; - } int32 id = 1; string name = 2; bool cached = 3; diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala index 98ac2d643c9..e2e2a1a8d89 100644 --- a/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala +++ b/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala @@ -17,10 +17,10 @@ package org.apache.spark.status.protobuf -import collection.JavaConverters._ import java.util.Date -import org.apache.spark.JobExecutionStatus +import collection.JavaConverters._ + import org.apache.spark.status.JobDataWrapper import org.apache.spark.status.api.v1.JobData import org.apache.spark.status.protobuf.Utils.getOptional @@ -55,7 +55,7 @@ class JobDataWrapperSerializer extends ProtobufSerDe { val jobDataBuilder = StoreTypes.JobData.newBuilder() jobDataBuilder.setJobId(jobData.jobId.toLong) .setName(jobData.name) - .setStatus(serializeJobExecutionStatus(jobData.status)) + .setStatus(JobExecutionStatusSerializer.serialize(jobData.status)) .setNumTasks(jobData.numTasks) .setNumActiveTasks(jobData.numActiveTasks) .setNumCompletedTasks(jobData.numCompletedTasks) @@ -89,7 +89,7 @@ class JobDataWrapperSerializer extends ProtobufSerDe { 
getOptional(info.hasSubmissionTime, () => new Date(info.getSubmissionTime)) val completionTime = getOptional(info.hasCompletionTime, () => new Date(info.getCompletionTime)) val jobGroup = getOptional(info.hasJobGroup, info.getJobGroup) - val status = JobExecutionStatus.valueOf(info.getStatus.toString) + val status = JobExecutionStatusSerializer.deserialize(info.getStatus) new JobData( jobId = info.getJobId.toInt, @@ -113,8 +113,4 @@ class JobDataWrapperSerializer extends ProtobufSerDe { numFailedStages = info.getNumFailedStages, killedTasksSummary = info.getKillTasksSummaryMap.asScala.mapValues(_.toInt).toMap) } - - private def serializeJobExecutionStatus(j: JobExecutionStatus): StoreTypes.JobExecutionStatus = { - StoreTypes.JobExecutionStatus.valueOf(j.toString) - } } diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/JobExecutionStatusSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/JobExecutionStatusSerializer.scala new file mode 100644 index 00000000000..fd07da61a9e --- /dev/null +++ b/core/src/main/scala/org/apache/spark/status/protobuf/JobExecutionStatusSerializer.scala @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.status.protobuf + +import org.apache.spark.JobExecutionStatus +import org.apache.spark.status.protobuf.StoreTypes.{JobExecutionStatus => GJobExecutionStatus} + +private[protobuf] object JobExecutionStatusSerializer { + + def serialize(input: JobExecutionStatus): GJobExecutionStatus = { + input match { + case JobExecutionStatus.RUNNING => GJobExecutionStatus.JOB_EXECUTION_STATUS_RUNNING + case JobExecutionStatus.SUCCEEDED => GJobExecutionStatus.JOB_EXECUTION_STATUS_SUCCEEDED + case JobExecutionStatus.FAILED => GJobExecutionStatus.JOB_EXECUTION_STATUS_FAILED + case JobExecutionStatus.UNKNOWN => GJobExecutionStatus.JOB_EXECUTION_STATUS_UNKNOWN + } + } + + def deserialize(binary: GJobExecutionStatus): JobExecutionStatus = { + binary match { + case GJobExecutionStatus.JOB_EXECUTION_STATUS_RUNNING => JobExecutionStatus.RUNNING + case GJobExecutionStatus.JOB_EXECUTION_STATUS_SUCCEEDED => JobExecutionStatus.SUCCEEDED + case GJobExecutionStatus.JOB_EXECUTION_STATUS_FAILED => JobExecutionStatus.FAILED + case GJobExecutionStatus.JOB_EXECUTION_STATUS_UNKNOWN => JobExecutionStatus.UNKNOWN + case _ => null + } + } +} diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala index 8975062082c..44622514ac9 100644 --- a/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala +++ b/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala @@ -21,6 +21,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.rdd.DeterministicLevel import org.apache.spark.status.{RDDOperationClusterWrapper, RDDOperationGraphWrapper} +import org.apache.spark.status.protobuf.StoreTypes.{DeterministicLevel => GDeterministicLevel} import org.apache.spark.ui.scope.{RDDOperationEdge, RDDOperationNode} class RDDOperationGraphWrapperSerializer 
extends ProtobufSerDe { @@ -81,8 +82,8 @@ class RDDOperationGraphWrapperSerializer extends ProtobufSerDe { } private def serializeRDDOperationNode(node: RDDOperationNode): StoreTypes.RDDOperationNode = { - val outputDeterministicLevel = StoreTypes.RDDOperationNode.DeterministicLevel - .valueOf(node.outputDeterministicLevel.toString) + val outputDeterministicLevel = DeterministicLevelSerializer.serialize( + node.outputDeterministicLevel) val builder = StoreTypes.RDDOperationNode.newBuilder() builder.setId(node.id) builder.setName(node.name) @@ -100,8 +101,8 @@ class RDDOperationGraphWrapperSerializer extends ProtobufSerDe { cached = node.getCached, barrier = node.getBarrier, callsite = node.getCallsite, - outputDeterministicLevel = - DeterministicLevel.withName(node.getOutputDeterministicLevel.toString) + outputDeterministicLevel = DeterministicLevelSerializer.deserialize( + node.getOutputDeterministicLevel) ) } @@ -118,3 +119,29 @@ class RDDOperationGraphWrapperSerializer extends ProtobufSerDe { toId = edge.getToId) } } + +private[protobuf] object DeterministicLevelSerializer { + + def serialize(input: DeterministicLevel.Value): GDeterministicLevel = { + input match { + case DeterministicLevel.DETERMINATE => + GDeterministicLevel.DETERMINISTIC_LEVEL_DETERMINATE + case DeterministicLevel.UNORDERED => + GDeterministicLevel.DETERMINISTIC_LEVEL_UNORDERED + case DeterministicLevel.INDETERMINATE => + GDeterministicLevel.DETERMINISTIC_LEVEL_INDETERMINATE + } + } + + def deserialize(binary: GDeterministicLevel): DeterministicLevel.Value = { + binary match { + case GDeterministicLevel.DETERMINISTIC_LEVEL_DETERMINATE => + DeterministicLevel.DETERMINATE + case GDeterministicLevel.DETERMINISTIC_LEVEL_UNORDERED => + DeterministicLevel.UNORDERED + case GDeterministicLevel.DETERMINISTIC_LEVEL_INDETERMINATE => + DeterministicLevel.INDETERMINATE + case _ => null + } + } +} diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/StageStatusSerializer.scala 
b/core/src/main/scala/org/apache/spark/status/protobuf/StageStatusSerializer.scala index 6014379bb1e..fbd874cf541 100644 --- a/core/src/main/scala/org/apache/spark/status/protobuf/StageStatusSerializer.scala +++ b/core/src/main/scala/org/apache/spark/status/protobuf/StageStatusSerializer.scala @@ -17,19 +17,29 @@ package org.apache.spark.status.protobuf -import org.apache.commons.lang3.StringUtils - import org.apache.spark.status.api.v1.StageStatus +import org.apache.spark.status.protobuf.StoreTypes.{StageStatus => GStageStatus} private[protobuf] object StageStatusSerializer { - private def PREFIX = "STAGE_STATUS_" - - def serialize(input: StageStatus): StoreTypes.StageStatus = { - StoreTypes.StageStatus.valueOf(PREFIX + input.toString) + def serialize(input: StageStatus): GStageStatus = { + input match { + case StageStatus.ACTIVE => GStageStatus.STAGE_STATUS_ACTIVE + case StageStatus.COMPLETE => GStageStatus.STAGE_STATUS_COMPLETE + case StageStatus.FAILED => GStageStatus.STAGE_STATUS_FAILED + case StageStatus.PENDING => GStageStatus.STAGE_STATUS_PENDING + case StageStatus.SKIPPED => GStageStatus.STAGE_STATUS_SKIPPED + } } - def deserialize(binary: StoreTypes.StageStatus): StageStatus = { - StageStatus.valueOf(StringUtils.removeStart(binary.toString, PREFIX)) + def deserialize(binary: GStageStatus): StageStatus = { + binary match { + case GStageStatus.STAGE_STATUS_ACTIVE => StageStatus.ACTIVE + case GStageStatus.STAGE_STATUS_COMPLETE => StageStatus.COMPLETE + case GStageStatus.STAGE_STATUS_FAILED => StageStatus.FAILED + case GStageStatus.STAGE_STATUS_PENDING => StageStatus.PENDING + case GStageStatus.STAGE_STATUS_SKIPPED => StageStatus.SKIPPED + case _ => null + } } } diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala index 77b6f8925cb..7a4a3e2a55d 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala @@ -21,9 +21,8 @@ import java.util.Date import collection.JavaConverters._ -import org.apache.spark.JobExecutionStatus import org.apache.spark.sql.execution.ui.SQLExecutionUIData -import org.apache.spark.status.protobuf.{ProtobufSerDe, StoreTypes} +import org.apache.spark.status.protobuf.{JobExecutionStatusSerializer, ProtobufSerDe, StoreTypes} import org.apache.spark.status.protobuf.Utils.getOptional class SQLExecutionUIDataSerializer extends ProtobufSerDe { @@ -46,7 +45,7 @@ class SQLExecutionUIDataSerializer extends ProtobufSerDe { ui.errorMessage.foreach(builder.setErrorMessage) ui.jobs.foreach { case (id, status) => - builder.putJobs(id.toLong, StoreTypes.JobExecutionStatus.valueOf(status.toString)) + builder.putJobs(id.toLong, JobExecutionStatusSerializer.serialize(status)) } ui.stages.foreach(stageId => builder.addStages(stageId.toLong)) val metricValues = ui.metricValues @@ -66,7 +65,7 @@ class SQLExecutionUIDataSerializer extends ProtobufSerDe { val metrics = ui.getMetricsList.asScala.map(m => SQLPlanMetricSerializer.deserialize(m)) val jobs = ui.getJobsMap.asScala.map { - case (jobId, status) => jobId.toInt -> JobExecutionStatus.valueOf(status.toString) + case (jobId, status) => jobId.toInt -> JobExecutionStatusSerializer.deserialize(status) }.toMap val metricValues = ui.getMetricValuesMap.asScala.map { case (k, v) => k.toLong -> v --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org