This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 88fc48f5e7e [SPARK-41431][CORE][SQL][UI] Protobuf serializer for `SQLExecutionUIData`
88fc48f5e7e is described below

commit 88fc48f5e7e907c25d082a7b35231744ccef2c7e
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Fri Dec 23 15:53:40 2022 -0800

    [SPARK-41431][CORE][SQL][UI] Protobuf serializer for `SQLExecutionUIData`
    
    ### What changes were proposed in this pull request?
    Add Protobuf serializer for `SQLExecutionUIData`
    
    ### Why are the changes needed?
    Support fast and compact serialization/deserialization for `SQLExecutionUIData` over RocksDB.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Add new UT
    
    Closes #39139 from LuciferYang/SPARK-41431.
    
    Authored-by: yangjie01 <yangji...@baidu.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../apache/spark/status/protobuf/store_types.proto | 21 +++++
 sql/core/pom.xml                                   |  5 ++
 .../org.apache.spark.status.protobuf.ProtobufSerDe | 18 +++++
 .../sql/SQLExecutionUIDataSerializer.scala         | 90 ++++++++++++++++++++++
 .../protobuf/sql/SQLPlanMetricSerializer.scala     | 36 +++++++++
 .../sql/KVStoreProtobufSerializerSuite.scala       | 88 +++++++++++++++++++++
 6 files changed, 258 insertions(+)
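
A minimal round-trip sketch of what this patch enables, mirroring the new test
suite below (it reuses the same sample fixture, SqlResourceSuite.sqlExecutionUIData;
treat it as illustrative rather than a definitive API walkthrough):

    import org.apache.spark.sql.execution.ui.SQLExecutionUIData
    import org.apache.spark.status.api.v1.sql.SqlResourceSuite
    import org.apache.spark.status.protobuf.KVStoreProtobufSerializer

    val serializer = new KVStoreProtobufSerializer()
    val input = SqlResourceSuite.sqlExecutionUIData
    // serialize() dispatches to the new SQLExecutionUIDataSerializer via ProtobufSerDe.
    val bytes: Array[Byte] = serializer.serialize(input)
    val result = serializer.deserialize(bytes, classOf[SQLExecutionUIData])
    assert(result.executionId == input.executionId)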

diff --git a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
index 7cf5c2921cb..cb0dea540bd 100644
--- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
+++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
@@ -355,3 +355,24 @@ message ExecutorSummary {
 message ExecutorSummaryWrapper {
   ExecutorSummary info = 1;
 }
+
+message SQLPlanMetric {
+  string name = 1;
+  int64 accumulator_id = 2;
+  string metric_type = 3;
+}
+
+message SQLExecutionUIData {
+  int64 execution_id = 1;
+  string description = 2;
+  string details = 3;
+  string physical_plan_description = 4;
+  map<string, string> modified_configs = 5;
+  repeated SQLPlanMetric metrics = 6;
+  int64 submission_time = 7;
+  optional int64 completion_time = 8;
+  optional string error_message = 9;
+  map<int64, JobExecutionStatus> jobs = 10;
+  repeated int64 stages = 11;
+  map<int64, string> metric_values = 12;
+}
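
The generated Java classes for these messages are nested in the StoreTypes outer
class, as the serializer later in this patch shows. A hedged sketch of building
and parsing one message at the protobuf level (field values are made up):

    import org.apache.spark.status.protobuf.StoreTypes

    // proto3 scalar fields carry no presence bit: unset fields read back as
    // defaults (0, "", empty map). Only the `optional` fields get hasX accessors.
    val proto = StoreTypes.SQLExecutionUIData.newBuilder()
      .setExecutionId(42L)
      .setDescription("sample query")
      .setSubmissionTime(System.currentTimeMillis())
      .build()
    val parsed = StoreTypes.SQLExecutionUIData.parseFrom(proto.toByteArray)
    assert(!parsed.hasCompletionTime)
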
diff --git a/sql/core/pom.xml b/sql/core/pom.xml
index cfcf7455ad0..71c57f8a7f7 100644
--- a/sql/core/pom.xml
+++ b/sql/core/pom.xml
@@ -147,6 +147,11 @@
       <groupId>org.apache.xbean</groupId>
       <artifactId>xbean-asm9-shaded</artifactId>
     </dependency>
+    <dependency>
+      <groupId>com.google.protobuf</groupId>
+      <artifactId>protobuf-java</artifactId>
+      <version>${protobuf.version}</version>
+    </dependency>
     <dependency>
       <groupId>org.scalacheck</groupId>
       <artifactId>scalacheck_${scala.binary.version}</artifactId>
diff --git a/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
new file mode 100644
index 00000000000..de5f2c2d05c
--- /dev/null
+++ b/sql/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
@@ -0,0 +1,18 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.spark.status.protobuf.sql.SQLExecutionUIDataSerializer
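
This registration file follows the standard java.util.ServiceLoader convention:
each non-comment line names one ProtobufSerDe implementation to instantiate
reflectively. A sketch of the discovery side, assuming KVStoreProtobufSerializer
loads registrations this way:

    import java.util.ServiceLoader
    import scala.collection.JavaConverters._
    import org.apache.spark.status.protobuf.ProtobufSerDe

    // Collect every registered serializer and index it by the UI store class
    // it declares support for.
    val serdes = ServiceLoader.load(classOf[ProtobufSerDe]).asScala.toSeq
    val byClass = serdes.map(s => s.supportClass -> s).toMap
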
diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala
new file mode 100644
index 00000000000..8dc28517ff0
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf.sql
+
+import java.util.Date
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.JobExecutionStatus
+import org.apache.spark.sql.execution.ui.SQLExecutionUIData
+import org.apache.spark.status.protobuf.{ProtobufSerDe, StoreTypes}
+import org.apache.spark.status.protobuf.Utils.getOptional
+
+class SQLExecutionUIDataSerializer extends ProtobufSerDe {
+
+  override val supportClass: Class[_] = classOf[SQLExecutionUIData]
+
+  override def serialize(input: Any): Array[Byte] = {
+    val ui = input.asInstanceOf[SQLExecutionUIData]
+    val builder = StoreTypes.SQLExecutionUIData.newBuilder()
+    builder.setExecutionId(ui.executionId)
+    builder.setDescription(ui.description)
+    builder.setDetails(ui.details)
+    builder.setPhysicalPlanDescription(ui.physicalPlanDescription)
+    ui.modifiedConfigs.foreach {
+      case (k, v) => builder.putModifiedConfigs(k, v)
+    }
+    ui.metrics.foreach(m => builder.addMetrics(SQLPlanMetricSerializer.serialize(m)))
+    builder.setSubmissionTime(ui.submissionTime)
+    ui.completionTime.foreach(ct => builder.setCompletionTime(ct.getTime))
+    ui.errorMessage.foreach(builder.setErrorMessage)
+    ui.jobs.foreach {
+      case (id, status) =>
+        builder.putJobs(id.toLong, StoreTypes.JobExecutionStatus.valueOf(status.toString))
+    }
+    ui.stages.foreach(stageId => builder.addStages(stageId.toLong))
+    val metricValues = ui.metricValues
+    if (metricValues != null) {
+      metricValues.foreach {
+        case (k, v) => builder.putMetricValues(k, v)
+      }
+    }
+    builder.build().toByteArray
+  }
+
+  override def deserialize(bytes: Array[Byte]): SQLExecutionUIData = {
+    val ui = StoreTypes.SQLExecutionUIData.parseFrom(bytes)
+    val completionTime =
+      getOptional(ui.hasCompletionTime, () => new Date(ui.getCompletionTime))
+    val errorMessage = getOptional(ui.hasErrorMessage, () => ui.getErrorMessage)
+    val metrics =
+      ui.getMetricsList.asScala.map(m => SQLPlanMetricSerializer.deserialize(m)).toSeq
+    val jobs = ui.getJobsMap.asScala.map {
+      case (jobId, status) => jobId.toInt -> JobExecutionStatus.valueOf(status.toString)
+    }.toMap
+    val metricValues = ui.getMetricValuesMap.asScala.map {
+      case (k, v) => k.toLong -> v
+    }.toMap
+
+    new SQLExecutionUIData(
+      executionId = ui.getExecutionId,
+      description = ui.getDescription,
+      details = ui.getDetails,
+      physicalPlanDescription = ui.getPhysicalPlanDescription,
+      modifiedConfigs = ui.getModifiedConfigsMap.asScala.toMap,
+      metrics = metrics,
+      submissionTime = ui.getSubmissionTime,
+      completionTime = completionTime,
+      errorMessage = errorMessage,
+      jobs = jobs,
+      stages = ui.getStagesList.asScala.map(_.toInt).toSet,
+      metricValues = metricValues
+    )
+  }
+}
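
One design point in the serializer above: metricValues may be null on the live
execution path, while a protobuf map field has no null state, only absent
entries. Null and Map.empty therefore encode to identical bytes and both
deserialize to an empty map, which the new test suite pins down. A small sketch
at the generated-message level:

    import org.apache.spark.status.protobuf.StoreTypes

    // A builder that never calls putMetricValues encodes the same bytes whether
    // the Scala-side input was null or an empty map.
    val msg = StoreTypes.SQLExecutionUIData.newBuilder().setExecutionId(1L).build()
    assert(msg.getMetricValuesMap.isEmpty)
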
diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLPlanMetricSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLPlanMetricSerializer.scala
new file mode 100644
index 00000000000..8886bba2f92
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLPlanMetricSerializer.scala
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf.sql
+
+import org.apache.spark.sql.execution.ui.SQLPlanMetric
+import org.apache.spark.status.protobuf.StoreTypes
+
+object SQLPlanMetricSerializer {
+
+  def serialize(metric: SQLPlanMetric): StoreTypes.SQLPlanMetric = {
+    StoreTypes.SQLPlanMetric.newBuilder()
+      .setName(metric.name)
+      .setAccumulatorId(metric.accumulatorId)
+      .setMetricType(metric.metricType)
+      .build()
+  }
+
+  def deserialize(metrics: StoreTypes.SQLPlanMetric): SQLPlanMetric = {
+    SQLPlanMetric(metrics.getName, metrics.getAccumulatorId, metrics.getMetricType)
+  }
+}
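
SQLPlanMetric is a small case class, so the round trip here is direct. An
illustrative sketch (the field values are invented):

    import org.apache.spark.sql.execution.ui.SQLPlanMetric

    val metric = SQLPlanMetric("number of output rows", 7L, "sum")
    val proto = SQLPlanMetricSerializer.serialize(metric)
    // Case-class equality makes the round-trip assertion meaningful.
    assert(SQLPlanMetricSerializer.deserialize(proto) == metric)
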
diff --git a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
new file mode 100644
index 00000000000..9d6a938c3fe
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf.sql
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.execution.ui.SQLExecutionUIData
+import org.apache.spark.status.api.v1.sql.SqlResourceSuite
+import org.apache.spark.status.protobuf.KVStoreProtobufSerializer
+
+class KVStoreProtobufSerializerSuite extends SparkFunSuite {
+
+  private val serializer = new KVStoreProtobufSerializer()
+
+  test("SQLExecutionUIData") {
+    val input = SqlResourceSuite.sqlExecutionUIData
+    val bytes = serializer.serialize(input)
+    val result = serializer.deserialize(bytes, classOf[SQLExecutionUIData])
+    assert(result.executionId == input.executionId)
+    assert(result.description == input.description)
+    assert(result.details == input.details)
+    assert(result.physicalPlanDescription == input.physicalPlanDescription)
+    assert(result.modifiedConfigs == input.modifiedConfigs)
+    assert(result.metrics == input.metrics)
+    assert(result.submissionTime == input.submissionTime)
+    assert(result.completionTime == input.completionTime)
+    assert(result.errorMessage == input.errorMessage)
+    assert(result.jobs == input.jobs)
+    assert(result.stages == input.stages)
+    assert(result.metricValues == input.metricValues)
+  }
+
+  test("SQLExecutionUIData with metricValues is empty map and null") {
+    val templateData = SqlResourceSuite.sqlExecutionUIData
+
+    val input1 = new SQLExecutionUIData(
+      executionId = templateData.executionId,
+      description = templateData.description,
+      details = templateData.details,
+      physicalPlanDescription = templateData.physicalPlanDescription,
+      modifiedConfigs = templateData.modifiedConfigs,
+      metrics = templateData.metrics,
+      submissionTime = templateData.submissionTime,
+      completionTime = templateData.completionTime,
+      errorMessage = templateData.errorMessage,
+      jobs = templateData.jobs,
+      stages = templateData.stages,
+      metricValues = Map.empty
+    )
+    val bytes1 = serializer.serialize(input1)
+    val result1 = serializer.deserialize(bytes1, classOf[SQLExecutionUIData])
+    // input1.metricValues is an empty map; result1.metricValues should be an empty map.
+    assert(result1.metricValues.isEmpty)
+
+    val input2 = new SQLExecutionUIData(
+      executionId = templateData.executionId,
+      description = templateData.description,
+      details = templateData.details,
+      physicalPlanDescription = templateData.physicalPlanDescription,
+      modifiedConfigs = templateData.modifiedConfigs,
+      metrics = templateData.metrics,
+      submissionTime = templateData.submissionTime,
+      completionTime = templateData.completionTime,
+      errorMessage = templateData.errorMessage,
+      jobs = templateData.jobs,
+      stages = templateData.stages,
+      metricValues = null
+    )
+    val bytes2 = serializer.serialize(input2)
+    val result2 = serializer.deserialize(bytes2, classOf[SQLExecutionUIData])
+    // input2.metricValues is null; result2.metricValues should still be an empty map.
+    assert(result2.metricValues.isEmpty)
+  }
+}
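
For end-to-end use, this serializer is exercised when the History Server's
KVStore is configured for the protobuf backend instead of the default JSON one.
The config keys below come from the surrounding SPARK-41053 umbrella rather
than this patch, so treat them as assumptions:

    import org.apache.spark.SparkConf

    // Hypothetical setup: RocksDB-backed disk store with protobuf encoding.
    val conf = new SparkConf()
      .set("spark.history.store.hybridStore.diskBackend", "ROCKSDB")
      .set("spark.history.store.serializer", "PROTOBUF")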


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
