This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 492356d1646 [SPARK-41768][CORE] Refactor the enum definitions to follow the code style
492356d1646 is described below

commit 492356d1646a5e7571dad7e3107a11f765ee810a
Author: panbingkun <pbk1...@gmail.com>
AuthorDate: Wed Jan 4 13:15:04 2023 -0800

    [SPARK-41768][CORE] Refactor the enum definitions to follow the code style
    
    ### What changes were proposed in this pull request?
    This PR refactors the enum definitions in the `UI protobuf serializer` to follow the code style.
    
    ### Why are the changes needed?
    Following the protobuf code style for enums:
    https://developers.google.com/protocol-buffers/docs/style#enums
    <img width="860" alt="image" src="https://user-images.githubusercontent.com/15246973/209946067-4c541101-be0d-49a6-9812-768ba98423a4.png">
    
    ### Does this PR introduce _any_ user-facing change?
    No.
    
    ### How was this patch tested?
    Passes GA.
    Existing UTs.
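
    For reference, the new mappings lend themselves to a round-trip check of this shape (an illustrative sketch, not the actual suite; it assumes code living in the `org.apache.spark.status.protobuf` package, since the serializers are package-private):

    ```scala
    import org.apache.spark.status.api.v1.StageStatus

    // Every StageStatus value should survive a round trip through its
    // protobuf counterpart under the new explicit match mapping.
    StageStatus.values().foreach { s =>
      assert(StageStatusSerializer.deserialize(StageStatusSerializer.serialize(s)) == s)
    }
    ```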
    
    Closes #39286 from panbingkun/SPARK-41768.
    
    Authored-by: panbingkun <pbk1...@gmail.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../apache/spark/status/protobuf/store_types.proto | 21 ++++++-----
 .../status/protobuf/JobDataWrapperSerializer.scala | 12 ++----
 .../protobuf/JobExecutionStatusSerializer.scala    | 43 ++++++++++++++++++++++
 .../RDDOperationGraphWrapperSerializer.scala       | 35 ++++++++++++++++--
 .../status/protobuf/StageStatusSerializer.scala    | 26 +++++++++----
 .../sql/SQLExecutionUIDataSerializer.scala         |  7 ++--
 6 files changed, 110 insertions(+), 34 deletions(-)

diff --git a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
index 38b82518ddd..6ba1915dfa1 100644
--- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
+++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
@@ -27,10 +27,10 @@ package org.apache.spark.status.protobuf;
 
 enum JobExecutionStatus {
   JOB_EXECUTION_STATUS_UNSPECIFIED = 0;
-  RUNNING = 1;
-  SUCCEEDED = 2;
-  FAILED = 3;
-  UNKNOWN = 4;
+  JOB_EXECUTION_STATUS_RUNNING = 1;
+  JOB_EXECUTION_STATUS_SUCCEEDED = 2;
+  JOB_EXECUTION_STATUS_FAILED = 3;
+  JOB_EXECUTION_STATUS_UNKNOWN = 4;
 }
 
 message JobData {
@@ -434,13 +434,14 @@ message RDDOperationEdge {
   int32 to_id = 2;
 }
 
+enum DeterministicLevel {
+  DETERMINISTIC_LEVEL_UNSPECIFIED = 0;
+  DETERMINISTIC_LEVEL_DETERMINATE = 1;
+  DETERMINISTIC_LEVEL_UNORDERED = 2;
+  DETERMINISTIC_LEVEL_INDETERMINATE = 3;
+}
+
 message RDDOperationNode {
-  enum DeterministicLevel {
-    UNSPECIFIED = 0;
-    DETERMINATE = 1;
-    UNORDERED = 2;
-    INDETERMINATE = 3;
-  }
   int32 id = 1;
   string name = 2;
   bool cached = 3;
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala
index 98ac2d643c9..e2e2a1a8d89 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala
@@ -17,10 +17,10 @@
 
 package org.apache.spark.status.protobuf
 
-import collection.JavaConverters._
 import java.util.Date
 
-import org.apache.spark.JobExecutionStatus
+import collection.JavaConverters._
+
 import org.apache.spark.status.JobDataWrapper
 import org.apache.spark.status.api.v1.JobData
 import org.apache.spark.status.protobuf.Utils.getOptional
@@ -55,7 +55,7 @@ class JobDataWrapperSerializer extends ProtobufSerDe {
     val jobDataBuilder = StoreTypes.JobData.newBuilder()
     jobDataBuilder.setJobId(jobData.jobId.toLong)
       .setName(jobData.name)
-      .setStatus(serializeJobExecutionStatus(jobData.status))
+      .setStatus(JobExecutionStatusSerializer.serialize(jobData.status))
       .setNumTasks(jobData.numTasks)
       .setNumActiveTasks(jobData.numActiveTasks)
       .setNumCompletedTasks(jobData.numCompletedTasks)
@@ -89,7 +89,7 @@ class JobDataWrapperSerializer extends ProtobufSerDe {
      getOptional(info.hasSubmissionTime, () => new Date(info.getSubmissionTime))
    val completionTime = getOptional(info.hasCompletionTime, () => new Date(info.getCompletionTime))
     val jobGroup = getOptional(info.hasJobGroup, info.getJobGroup)
-    val status = JobExecutionStatus.valueOf(info.getStatus.toString)
+    val status = JobExecutionStatusSerializer.deserialize(info.getStatus)
 
     new JobData(
       jobId = info.getJobId.toInt,
@@ -113,8 +113,4 @@ class JobDataWrapperSerializer extends ProtobufSerDe {
       numFailedStages = info.getNumFailedStages,
      killedTasksSummary = info.getKillTasksSummaryMap.asScala.mapValues(_.toInt).toMap)
   }
-
-  private def serializeJobExecutionStatus(j: JobExecutionStatus): StoreTypes.JobExecutionStatus = {
-    StoreTypes.JobExecutionStatus.valueOf(j.toString)
-  }
 }
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/JobExecutionStatusSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/JobExecutionStatusSerializer.scala
new file mode 100644
index 00000000000..fd07da61a9e
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/JobExecutionStatusSerializer.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf
+
+import org.apache.spark.JobExecutionStatus
+import org.apache.spark.status.protobuf.StoreTypes.{JobExecutionStatus => GJobExecutionStatus}
+
+private[protobuf] object JobExecutionStatusSerializer {
+
+  def serialize(input: JobExecutionStatus): GJobExecutionStatus = {
+    input match {
+      case JobExecutionStatus.RUNNING => GJobExecutionStatus.JOB_EXECUTION_STATUS_RUNNING
+      case JobExecutionStatus.SUCCEEDED => GJobExecutionStatus.JOB_EXECUTION_STATUS_SUCCEEDED
+      case JobExecutionStatus.FAILED => GJobExecutionStatus.JOB_EXECUTION_STATUS_FAILED
+      case JobExecutionStatus.UNKNOWN => GJobExecutionStatus.JOB_EXECUTION_STATUS_UNKNOWN
+    }
+  }
+
+  def deserialize(binary: GJobExecutionStatus): JobExecutionStatus = {
+    binary match {
+      case GJobExecutionStatus.JOB_EXECUTION_STATUS_RUNNING => JobExecutionStatus.RUNNING
+      case GJobExecutionStatus.JOB_EXECUTION_STATUS_SUCCEEDED => JobExecutionStatus.SUCCEEDED
+      case GJobExecutionStatus.JOB_EXECUTION_STATUS_FAILED => JobExecutionStatus.FAILED
+      case GJobExecutionStatus.JOB_EXECUTION_STATUS_UNKNOWN => JobExecutionStatus.UNKNOWN
+      case _ => null
+    }
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala
index 8975062082c..44622514ac9 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala
@@ -21,6 +21,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.rdd.DeterministicLevel
 import org.apache.spark.status.{RDDOperationClusterWrapper, RDDOperationGraphWrapper}
+import org.apache.spark.status.protobuf.StoreTypes.{DeterministicLevel => GDeterministicLevel}
 import org.apache.spark.ui.scope.{RDDOperationEdge, RDDOperationNode}
 
 class RDDOperationGraphWrapperSerializer extends ProtobufSerDe {
@@ -81,8 +82,8 @@ class RDDOperationGraphWrapperSerializer extends ProtobufSerDe {
   }
 
   private def serializeRDDOperationNode(node: RDDOperationNode): StoreTypes.RDDOperationNode = {
-    val outputDeterministicLevel = StoreTypes.RDDOperationNode.DeterministicLevel
-      .valueOf(node.outputDeterministicLevel.toString)
+    val outputDeterministicLevel = DeterministicLevelSerializer.serialize(
+      node.outputDeterministicLevel)
     val builder = StoreTypes.RDDOperationNode.newBuilder()
     builder.setId(node.id)
     builder.setName(node.name)
@@ -100,8 +101,8 @@ class RDDOperationGraphWrapperSerializer extends ProtobufSerDe {
       cached = node.getCached,
       barrier = node.getBarrier,
       callsite = node.getCallsite,
-      outputDeterministicLevel =
-        DeterministicLevel.withName(node.getOutputDeterministicLevel.toString)
+      outputDeterministicLevel = DeterministicLevelSerializer.deserialize(
+        node.getOutputDeterministicLevel)
     )
   }
 
@@ -118,3 +119,29 @@ class RDDOperationGraphWrapperSerializer extends ProtobufSerDe {
       toId = edge.getToId)
   }
 }
+
+private[protobuf] object DeterministicLevelSerializer {
+
+  def serialize(input: DeterministicLevel.Value): GDeterministicLevel = {
+    input match {
+      case DeterministicLevel.DETERMINATE =>
+        GDeterministicLevel.DETERMINISTIC_LEVEL_DETERMINATE
+      case DeterministicLevel.UNORDERED =>
+        GDeterministicLevel.DETERMINISTIC_LEVEL_UNORDERED
+      case DeterministicLevel.INDETERMINATE =>
+        GDeterministicLevel.DETERMINISTIC_LEVEL_INDETERMINATE
+    }
+  }
+
+  def deserialize(binary: GDeterministicLevel): DeterministicLevel.Value = {
+    binary match {
+      case GDeterministicLevel.DETERMINISTIC_LEVEL_DETERMINATE =>
+        DeterministicLevel.DETERMINATE
+      case GDeterministicLevel.DETERMINISTIC_LEVEL_UNORDERED =>
+        DeterministicLevel.UNORDERED
+      case GDeterministicLevel.DETERMINISTIC_LEVEL_INDETERMINATE =>
+        DeterministicLevel.INDETERMINATE
+      case _ => null
+    }
+  }
+}
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/StageStatusSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/StageStatusSerializer.scala
index 6014379bb1e..fbd874cf541 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/StageStatusSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/StageStatusSerializer.scala
@@ -17,19 +17,29 @@
 
 package org.apache.spark.status.protobuf
 
-import org.apache.commons.lang3.StringUtils
-
 import org.apache.spark.status.api.v1.StageStatus
+import org.apache.spark.status.protobuf.StoreTypes.{StageStatus => GStageStatus}
 
 private[protobuf] object StageStatusSerializer {
 
-  private def PREFIX = "STAGE_STATUS_"
-
-  def serialize(input: StageStatus): StoreTypes.StageStatus = {
-    StoreTypes.StageStatus.valueOf(PREFIX + input.toString)
+  def serialize(input: StageStatus): GStageStatus = {
+    input match {
+      case StageStatus.ACTIVE => GStageStatus.STAGE_STATUS_ACTIVE
+      case StageStatus.COMPLETE => GStageStatus.STAGE_STATUS_COMPLETE
+      case StageStatus.FAILED => GStageStatus.STAGE_STATUS_FAILED
+      case StageStatus.PENDING => GStageStatus.STAGE_STATUS_PENDING
+      case StageStatus.SKIPPED => GStageStatus.STAGE_STATUS_SKIPPED
+    }
   }
 
-  def deserialize(binary: StoreTypes.StageStatus): StageStatus = {
-    StageStatus.valueOf(StringUtils.removeStart(binary.toString, PREFIX))
+  def deserialize(binary: GStageStatus): StageStatus = {
+    binary match {
+      case GStageStatus.STAGE_STATUS_ACTIVE => StageStatus.ACTIVE
+      case GStageStatus.STAGE_STATUS_COMPLETE => StageStatus.COMPLETE
+      case GStageStatus.STAGE_STATUS_FAILED => StageStatus.FAILED
+      case GStageStatus.STAGE_STATUS_PENDING => StageStatus.PENDING
+      case GStageStatus.STAGE_STATUS_SKIPPED => StageStatus.SKIPPED
+      case _ => null
+    }
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala
index 77b6f8925cb..7a4a3e2a55d 100644
--- a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala
@@ -21,9 +21,8 @@ import java.util.Date
 
 import collection.JavaConverters._
 
-import org.apache.spark.JobExecutionStatus
 import org.apache.spark.sql.execution.ui.SQLExecutionUIData
-import org.apache.spark.status.protobuf.{ProtobufSerDe, StoreTypes}
+import org.apache.spark.status.protobuf.{JobExecutionStatusSerializer, ProtobufSerDe, StoreTypes}
 import org.apache.spark.status.protobuf.Utils.getOptional
 
 class SQLExecutionUIDataSerializer extends ProtobufSerDe {
@@ -46,7 +45,7 @@ class SQLExecutionUIDataSerializer extends ProtobufSerDe {
     ui.errorMessage.foreach(builder.setErrorMessage)
     ui.jobs.foreach {
       case (id, status) =>
-        builder.putJobs(id.toLong, StoreTypes.JobExecutionStatus.valueOf(status.toString))
+        builder.putJobs(id.toLong, JobExecutionStatusSerializer.serialize(status))
     }
     ui.stages.foreach(stageId => builder.addStages(stageId.toLong))
     val metricValues = ui.metricValues
@@ -66,7 +65,7 @@ class SQLExecutionUIDataSerializer extends ProtobufSerDe {
     val metrics =
      ui.getMetricsList.asScala.map(m => SQLPlanMetricSerializer.deserialize(m))
     val jobs = ui.getJobsMap.asScala.map {
-      case (jobId, status) => jobId.toInt -> JobExecutionStatus.valueOf(status.toString)
+      case (jobId, status) => jobId.toInt -> JobExecutionStatusSerializer.deserialize(status)
     }.toMap
     val metricValues = ui.getMetricValuesMap.asScala.map {
       case (k, v) => k.toLong -> v

