This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 984cfa162da [SPARK-42139][CORE][SQL] Handle null string values in SQLExecutionUIData/SparkPlanGraphWrapper/SQLPlanMetric
984cfa162da is described below

commit 984cfa162da99a19dd169ae5ecc4b568e11fe4b1
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Sun Jan 22 13:48:41 2023 -0800

    [SPARK-42139][CORE][SQL] Handle null string values in SQLExecutionUIData/SparkPlanGraphWrapper/SQLPlanMetric
    
    ### What changes were proposed in this pull request?
    Similar to #39666, this PR handles null string values in SQLExecutionUIData/SparkPlanGraphWrapper/SQLPlanMetric.
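    
    The nullable fields are routed through the `setStringField` / `getStringField` helpers in `org.apache.spark.status.protobuf.Utils`. As a rough sketch of their behavior, inferred from the call sites in this diff rather than copied from `Utils.scala`:
    
    ```scala
    // Sketch only: signatures inferred from usage such as
    //   setStringField(ui.description, builder.setDescription)
    //   getStringField(ui.hasDescription, () => ui.getDescription)
    
    // Skip the protobuf setter entirely when the value is null,
    // since generated proto3 setters reject null input.
    def setStringField(input: String, f: String => Any): Unit = {
      if (input != null) f(input)
    }
    
    // Restore null when the optional field carries no value.
    def getStringField(condition: Boolean, result: () => String): String = {
      if (condition) result() else null
    }
    ```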
    
    ### Why are the changes needed?
    To properly handle null string values in the protobuf serializer. The generated proto3 setters reject null input, and a plain `string` field cannot distinguish "never set" from the empty string, so the nullable fields are declared `optional` and accessed through null-aware helpers.
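    
    A minimal sketch of the presence semantics this relies on, using the generated `StoreTypes.SQLPlanMetric` builder; the `hasXxx` accessors are standard protobuf-java codegen for `optional` fields:
    
    ```scala
    val builder = StoreTypes.SQLPlanMetric.newBuilder()
    builder.getName           // ""   : unset proto3 strings default to empty
    builder.hasName           // false: the optional field was never set
    builder.setName("count")
    builder.hasName           // true : presence is now tracked
    // builder.setName(null)  // throws NullPointerException, hence setStringField
    ```
    
    Without `optional`, no `hasName` accessor is generated and a skipped-on-null field deserializes as "" rather than null, which is exactly the round-trip mismatch the new UTs cover.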
    
    ### Does this PR introduce any user-facing change?
    No
    
    ### How was this patch tested?
    New UTs
    
    Closes #39682 from LuciferYang/SPARK-42139.
    
    Authored-by: yangjie01 <yangji...@baidu.com>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../apache/spark/status/protobuf/store_types.proto |  18 ++--
 .../sql/SQLExecutionUIDataSerializer.scala         |  15 +--
 .../protobuf/sql/SQLPlanMetricSerializer.scala     |  18 ++--
 .../sql/SparkPlanGraphWrapperSerializer.scala      |  18 ++--
 .../sql/KVStoreProtobufSerializerSuite.scala       | 118 ++++++++++++++-------
 5 files changed, 121 insertions(+), 66 deletions(-)

diff --git a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
index 155e73de056..ab6861057c9 100644
--- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
+++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
@@ -403,17 +403,17 @@ message ExecutorSummaryWrapper {
 }
 
 message SQLPlanMetric {
-  string name = 1;
+  optional string name = 1;
   int64 accumulator_id = 2;
-  string metric_type = 3;
+  optional string metric_type = 3;
 }
 
 message SQLExecutionUIData {
   int64 execution_id = 1;
   int64 root_execution_id = 2;
-  string description = 3;
-  string details = 4;
-  string physical_plan_description = 5;
+  optional string description = 3;
+  optional string details = 4;
+  optional string physical_plan_description = 5;
   map<string, string> modified_configs = 6;
   repeated SQLPlanMetric metrics = 7;
   int64 submission_time = 8;
@@ -427,15 +427,15 @@ message SQLExecutionUIData {
 
 message SparkPlanGraphNode {
   int64 id = 1;
-  string name = 2;
-  string desc = 3;
+  optional string name = 2;
+  optional string desc = 3;
   repeated SQLPlanMetric metrics = 4;
 }
 
 message SparkPlanGraphClusterWrapper {
   int64 id = 1;
-  string name = 2;
-  string desc = 3;
+  optional string name = 2;
+  optional string desc = 3;
   repeated SparkPlanGraphNodeWrapper nodes = 4;
   repeated SQLPlanMetric metrics = 5;
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala
index dc76ab9a4e9..f0cdca985b7 100644
--- a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala
@@ -23,7 +23,7 @@ import collection.JavaConverters._
 
 import org.apache.spark.sql.execution.ui.SQLExecutionUIData
 import org.apache.spark.status.protobuf.{JobExecutionStatusSerializer, ProtobufSerDe, StoreTypes}
-import org.apache.spark.status.protobuf.Utils.getOptional
+import org.apache.spark.status.protobuf.Utils._
 
 class SQLExecutionUIDataSerializer extends ProtobufSerDe[SQLExecutionUIData] {
 
@@ -31,9 +31,9 @@ class SQLExecutionUIDataSerializer extends ProtobufSerDe[SQLExecutionUIData] {
     val builder = StoreTypes.SQLExecutionUIData.newBuilder()
     builder.setExecutionId(ui.executionId)
     builder.setRootExecutionId(ui.rootExecutionId)
-    Option(ui.description).foreach(builder.setDescription)
-    Option(ui.details).foreach(builder.setDetails)
-    Option(ui.physicalPlanDescription).foreach(builder.setPhysicalPlanDescription)
+    setStringField(ui.description, builder.setDescription)
+    setStringField(ui.details, builder.setDetails)
+    setStringField(ui.physicalPlanDescription, builder.setPhysicalPlanDescription)
     if (ui.modifiedConfigs != null) {
       ui.modifiedConfigs.foreach {
         case (k, v) => builder.putModifiedConfigs(k, v)
@@ -81,9 +81,10 @@ class SQLExecutionUIDataSerializer extends ProtobufSerDe[SQLExecutionUIData] {
     new SQLExecutionUIData(
       executionId = ui.getExecutionId,
       rootExecutionId = ui.getRootExecutionId,
-      description = ui.getDescription,
-      details = ui.getDetails,
-      physicalPlanDescription = ui.getPhysicalPlanDescription,
+      description = getStringField(ui.hasDescription, () => ui.getDescription),
+      details = getStringField(ui.hasDetails, () => ui.getDetails),
+      physicalPlanDescription =
+        getStringField(ui.hasPhysicalPlanDescription, () => ui.getPhysicalPlanDescription),
       modifiedConfigs = ui.getModifiedConfigsMap.asScala.toMap,
       metrics = metrics,
       submissionTime = ui.getSubmissionTime,
diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLPlanMetricSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLPlanMetricSerializer.scala
index 8886bba2f92..88ba51c52b4 100644
--- a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLPlanMetricSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLPlanMetricSerializer.scala
@@ -19,18 +19,24 @@ package org.apache.spark.status.protobuf.sql
 
 import org.apache.spark.sql.execution.ui.SQLPlanMetric
 import org.apache.spark.status.protobuf.StoreTypes
+import org.apache.spark.status.protobuf.Utils._
+import org.apache.spark.util.Utils.weakIntern
 
 object SQLPlanMetricSerializer {
 
   def serialize(metric: SQLPlanMetric): StoreTypes.SQLPlanMetric = {
-    StoreTypes.SQLPlanMetric.newBuilder()
-      .setName(metric.name)
-      .setAccumulatorId(metric.accumulatorId)
-      .setMetricType(metric.metricType)
-      .build()
+    val builder = StoreTypes.SQLPlanMetric.newBuilder()
+    setStringField(metric.name, builder.setName)
+    builder.setAccumulatorId(metric.accumulatorId)
+    setStringField(metric.metricType, builder.setMetricType)
+    builder.build()
   }
 
   def deserialize(metrics: StoreTypes.SQLPlanMetric): SQLPlanMetric = {
-    SQLPlanMetric(metrics.getName, metrics.getAccumulatorId, metrics.getMetricType)
+    SQLPlanMetric(
+      name = getStringField(metrics.hasName, () => weakIntern(metrics.getName)),
+      accumulatorId = metrics.getAccumulatorId,
+      metricType = getStringField(metrics.hasMetricType, () => weakIntern(metrics.getMetricType))
+    )
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala
index 1df82e3246a..bff5c0d7619 100644
--- a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala
@@ -22,6 +22,8 @@ import collection.JavaConverters._
 import org.apache.spark.sql.execution.ui.{SparkPlanGraphClusterWrapper, SparkPlanGraphEdge, SparkPlanGraphNode, SparkPlanGraphNodeWrapper, SparkPlanGraphWrapper}
 import org.apache.spark.status.protobuf.ProtobufSerDe
 import org.apache.spark.status.protobuf.StoreTypes
+import org.apache.spark.status.protobuf.Utils.{getStringField, setStringField}
+import org.apache.spark.util.Utils.weakIntern
 
 class SparkPlanGraphWrapperSerializer extends ProtobufSerDe[SparkPlanGraphWrapper] {
 
@@ -92,8 +94,8 @@ class SparkPlanGraphWrapperSerializer extends ProtobufSerDe[SparkPlanGraphWrappe
     StoreTypes.SparkPlanGraphNode = {
     val builder = StoreTypes.SparkPlanGraphNode.newBuilder()
     builder.setId(node.id)
-    builder.setName(node.name)
-    builder.setDesc(node.desc)
+    setStringField(node.name, builder.setName)
+    setStringField(node.desc, builder.setDesc)
     node.metrics.foreach { metric =>
       builder.addMetrics(SQLPlanMetricSerializer.serialize(metric))
     }
@@ -105,8 +107,8 @@ class SparkPlanGraphWrapperSerializer extends ProtobufSerDe[SparkPlanGraphWrappe
 
     new SparkPlanGraphNode(
       id = node.getId,
-      name = node.getName,
-      desc = node.getDesc,
+      name = getStringField(node.hasName, () => weakIntern(node.getName)),
+      desc = getStringField(node.hasDesc, () => node.getDesc),
       metrics = node.getMetricsList.asScala.map(SQLPlanMetricSerializer.deserialize)
     )
   }
@@ -115,8 +117,8 @@ class SparkPlanGraphWrapperSerializer extends ProtobufSerDe[SparkPlanGraphWrappe
     StoreTypes.SparkPlanGraphClusterWrapper = {
     val builder = StoreTypes.SparkPlanGraphClusterWrapper.newBuilder()
     builder.setId(cluster.id)
-    builder.setName(cluster.name)
-    builder.setDesc(cluster.desc)
+    setStringField(cluster.name, builder.setName)
+    setStringField(cluster.desc, builder.setDesc)
     cluster.nodes.foreach { node =>
       builder.addNodes(serializeSparkPlanGraphNodeWrapper(node))
     }
@@ -131,8 +133,8 @@ class SparkPlanGraphWrapperSerializer extends ProtobufSerDe[SparkPlanGraphWrappe
 
     new SparkPlanGraphClusterWrapper(
       id = cluster.getId,
-      name = cluster.getName,
-      desc = cluster.getDesc,
+      name = getStringField(cluster.hasName, () => weakIntern(cluster.getName)),
+      desc = getStringField(cluster.hasDesc, () => cluster.getDesc),
       nodes = cluster.getNodesList.asScala.map(deserializeSparkPlanGraphNodeWrapper),
       metrics = cluster.getMetricsList.asScala.map(SQLPlanMetricSerializer.deserialize)
     )
diff --git a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
index 41f185900ad..c220ca1c96f 100644
--- a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
@@ -30,22 +30,39 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
   private val serializer = new KVStoreProtobufSerializer()
 
   test("SQLExecutionUIData") {
-    val input = SqlResourceSuite.sqlExecutionUIData
-    val bytes = serializer.serialize(input)
-    val result = serializer.deserialize(bytes, classOf[SQLExecutionUIData])
-    assert(result.executionId == input.executionId)
-    assert(result.rootExecutionId == input.rootExecutionId)
-    assert(result.description == input.description)
-    assert(result.details == input.details)
-    assert(result.physicalPlanDescription == input.physicalPlanDescription)
-    assert(result.modifiedConfigs == input.modifiedConfigs)
-    assert(result.metrics == input.metrics)
-    assert(result.submissionTime == input.submissionTime)
-    assert(result.completionTime == input.completionTime)
-    assert(result.errorMessage == input.errorMessage)
-    assert(result.jobs == input.jobs)
-    assert(result.stages == input.stages)
-    assert(result.metricValues == input.metricValues)
+    val normal = SqlResourceSuite.sqlExecutionUIData
+    val withNull = new SQLExecutionUIData(
+      executionId = normal.executionId,
+      rootExecutionId = normal.rootExecutionId,
+      description = null,
+      details = null,
+      physicalPlanDescription = null,
+      modifiedConfigs = normal.modifiedConfigs,
+      metrics = Seq(SQLPlanMetric(null, 0, null)),
+      submissionTime = normal.submissionTime,
+      completionTime = normal.completionTime,
+      errorMessage = normal.errorMessage,
+      jobs = normal.jobs,
+      stages = normal.stages,
+      metricValues = normal.metricValues
+    )
+    Seq(normal, withNull).foreach { input =>
+      val bytes = serializer.serialize(input)
+      val result = serializer.deserialize(bytes, classOf[SQLExecutionUIData])
+      assert(result.executionId == input.executionId)
+      assert(result.rootExecutionId == input.rootExecutionId)
+      assert(result.description == input.description)
+      assert(result.details == input.details)
+      assert(result.physicalPlanDescription == input.physicalPlanDescription)
+      assert(result.modifiedConfigs == input.modifiedConfigs)
+      assert(result.metrics == input.metrics)
+      assert(result.submissionTime == input.submissionTime)
+      assert(result.completionTime == input.completionTime)
+      assert(result.errorMessage == input.errorMessage)
+      assert(result.jobs == input.jobs)
+      assert(result.stages == input.stages)
+      assert(result.metricValues == input.metricValues)
+    }
   }
 
   test("SQLExecutionUIData with metricValues is empty map and null") {
@@ -93,30 +110,59 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
   }
 
   test("Spark Plan Graph") {
+    val node0: SparkPlanGraphNodeWrapper = new SparkPlanGraphNodeWrapper(
+      node = new SparkPlanGraphNode(
+        id = 12,
+        name = "name_12",
+        desc = "desc_12",
+        metrics = Seq(
+          SQLPlanMetric(
+            name = "name_13",
+            accumulatorId = 13,
+            metricType = "metric_13"
+          ),
+          SQLPlanMetric(
+            name = "name_14",
+            accumulatorId = 14,
+            metricType = "metric_14"
+          )
+        )
+      ),
+      cluster = null
+    )
+
+    val node1: SparkPlanGraphNodeWrapper = new SparkPlanGraphNodeWrapper(
+      node = new SparkPlanGraphNode(
+        id = 13,
+        name = null,
+        desc = null,
+        metrics = Seq(
+          SQLPlanMetric(
+            name = null,
+            accumulatorId = 13,
+            metricType = null
+          )
+        )
+      ),
+      cluster = null
+    )
+
+    val node2: SparkPlanGraphNodeWrapper = new SparkPlanGraphNodeWrapper(
+      node = null,
+      cluster = new SparkPlanGraphClusterWrapper(
+        id = 6,
+        name = null,
+        desc = null,
+        nodes = Seq.empty,
+        metrics = Seq.empty
+      )
+    )
+
     val cluster = new SparkPlanGraphClusterWrapper(
       id = 5,
       name = "name_5",
       desc = "desc_5",
-      nodes = Seq(new SparkPlanGraphNodeWrapper(
-        node = new SparkPlanGraphNode(
-          id = 12,
-          name = "name_12",
-          desc = "desc_12",
-          metrics = Seq(
-            SQLPlanMetric(
-              name = "name_13",
-              accumulatorId = 13,
-              metricType = "metric_13"
-            ),
-            SQLPlanMetric(
-              name = "name_14",
-              accumulatorId = 14,
-              metricType = "metric_14"
-            )
-          )
-        ),
-        cluster = null
-      )),
+      nodes = Seq(node0, node1, node2),
       metrics = Seq(
         SQLPlanMetric(
           name = "name_6",

