This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 73b0c71fe33 [SPARK-42153][UI] Handle null string values in 
PairStrings/RDDOperationNode/RDDOperationClusterWrapper
73b0c71fe33 is described below

commit 73b0c71fe33704dd640a6008342358da67f50390
Author: Gengliang Wang <gengli...@apache.org>
AuthorDate: Sat Jan 21 20:09:26 2023 -0800

    [SPARK-42153][UI] Handle null string values in 
PairStrings/RDDOperationNode/RDDOperationClusterWrapper
    
    ### What changes were proposed in this pull request?
    Similar to #39666, this PR handles null string values in 
PairStrings/RDDOperationNode/RDDOperationClusterWrapper
    
    ### Why are the changes needed?
    Properly handles null string values in the protobuf serializer.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    New unit tests (UTs)
    
    Closes #39696 from gengliangwang/moreNull3.
    
    Authored-by: Gengliang Wang <gengli...@apache.org>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../apache/spark/status/protobuf/store_types.proto | 12 ++---
 ...plicationEnvironmentInfoWrapperSerializer.scala |  7 +--
 .../RDDOperationGraphWrapperSerializer.scala       | 17 ++++---
 .../protobuf/KVStoreProtobufSerializerSuite.scala  | 57 +++++++++++++++-------
 4 files changed, 59 insertions(+), 34 deletions(-)

diff --git 
a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto 
b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
index 49076790321..155e73de056 100644
--- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
+++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
@@ -185,8 +185,8 @@ message RuntimeInfo {
 }
 
 message PairStrings {
-  string value1 = 1;
-  string value2 = 2;
+  optional string value1 = 1;
+  optional string value2 = 2;
 }
 
 message ApplicationEnvironmentInfo {
@@ -472,16 +472,16 @@ enum DeterministicLevel {
 
 message RDDOperationNode {
   int32 id = 1;
-  string name = 2;
+  optional string name = 2;
   bool cached = 3;
   bool barrier = 4;
-  string callsite = 5;
+  optional string callsite = 5;
   DeterministicLevel output_deterministic_level = 6;
 }
 
 message RDDOperationClusterWrapper {
-  string id = 1;
-  string name = 2;
+  optional string id = 1;
+  optional string name = 2;
   repeated RDDOperationNode child_nodes = 3;
   repeated RDDOperationClusterWrapper child_clusters = 4;
 }
diff --git 
a/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala
 
b/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala
index fbbc55387b8..63c8387a8db 100644
--- 
a/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala
+++ 
b/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala
@@ -81,7 +81,8 @@ class ApplicationEnvironmentInfoWrapperSerializer
       scalaVersion = getStringField(rt.hasScalaVersion, () => 
rt.getScalaVersion)
     )
     val pairSSToTuple = (pair: StoreTypes.PairStrings) => {
-      (pair.getValue1, pair.getValue2)
+      (getStringField(pair.hasValue1, pair.getValue1),
+        getStringField(pair.hasValue2, pair.getValue2))
     }
     new ApplicationEnvironmentInfo(
       runtime = runtime,
@@ -97,8 +98,8 @@ class ApplicationEnvironmentInfoWrapperSerializer
 
   private def serializePairStrings(pair: (String, String)): 
StoreTypes.PairStrings = {
     val builder = StoreTypes.PairStrings.newBuilder()
-    builder.setValue1(pair._1)
-    builder.setValue2(pair._2)
+    setStringField(pair._1, builder.setValue1)
+    setStringField(pair._2, builder.setValue2)
     builder.build()
   }
 
diff --git 
a/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala
 
b/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala
index f822ed1889a..3187b255d4c 100644
--- 
a/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala
+++ 
b/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala
@@ -22,6 +22,7 @@ import scala.collection.JavaConverters._
 import org.apache.spark.rdd.DeterministicLevel
 import org.apache.spark.status.{RDDOperationClusterWrapper, 
RDDOperationGraphWrapper}
 import org.apache.spark.status.protobuf.StoreTypes.{DeterministicLevel => 
GDeterministicLevel}
+import org.apache.spark.status.protobuf.Utils.{getStringField, setStringField}
 import org.apache.spark.ui.scope.{RDDOperationEdge, RDDOperationNode}
 
 class RDDOperationGraphWrapperSerializer extends 
ProtobufSerDe[RDDOperationGraphWrapper] {
@@ -56,8 +57,8 @@ class RDDOperationGraphWrapperSerializer extends 
ProtobufSerDe[RDDOperationGraph
   private def serializeRDDOperationClusterWrapper(op: 
RDDOperationClusterWrapper):
     StoreTypes.RDDOperationClusterWrapper = {
     val builder = StoreTypes.RDDOperationClusterWrapper.newBuilder()
-    builder.setId(op.id)
-    builder.setName(op.name)
+    setStringField(op.id, builder.setId)
+    setStringField(op.name, builder.setName)
     op.childNodes.foreach { node =>
       builder.addChildNodes(serializeRDDOperationNode(node))
     }
@@ -70,8 +71,8 @@ class RDDOperationGraphWrapperSerializer extends 
ProtobufSerDe[RDDOperationGraph
   private def deserializeRDDOperationClusterWrapper(op: 
StoreTypes.RDDOperationClusterWrapper):
     RDDOperationClusterWrapper = {
     new RDDOperationClusterWrapper(
-      id = op.getId,
-      name = op.getName,
+      id = getStringField(op.hasId, op.getId),
+      name = getStringField(op.hasName, op.getName),
       childNodes = 
op.getChildNodesList.asScala.map(deserializeRDDOperationNode),
       childClusters =
         
op.getChildClustersList.asScala.map(deserializeRDDOperationClusterWrapper)
@@ -83,10 +84,10 @@ class RDDOperationGraphWrapperSerializer extends 
ProtobufSerDe[RDDOperationGraph
       node.outputDeterministicLevel)
     val builder = StoreTypes.RDDOperationNode.newBuilder()
     builder.setId(node.id)
-    builder.setName(node.name)
+    setStringField(node.name, builder.setName)
+    setStringField(node.callsite, builder.setCallsite)
     builder.setCached(node.cached)
     builder.setBarrier(node.barrier)
-    builder.setCallsite(node.callsite)
     builder.setOutputDeterministicLevel(outputDeterministicLevel)
     builder.build()
   }
@@ -94,10 +95,10 @@ class RDDOperationGraphWrapperSerializer extends 
ProtobufSerDe[RDDOperationGraph
   private def deserializeRDDOperationNode(node: StoreTypes.RDDOperationNode): 
RDDOperationNode = {
     RDDOperationNode(
       id = node.getId,
-      name = node.getName,
+      name = getStringField(node.hasName, node.getName),
       cached = node.getCached,
       barrier = node.getBarrier,
-      callsite = node.getCallsite,
+      callsite = getStringField(node.hasCallsite, node.getCallsite),
       outputDeterministicLevel = DeterministicLevelSerializer.deserialize(
         node.getOutputDeterministicLevel)
     )
diff --git 
a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
 
b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
index 14dd2cd601d..d4c79adf2ec 100644
--- 
a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
@@ -259,9 +259,11 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite 
{
           javaVersion = javaVersion,
           javaHome = javaHome,
           scalaVersion = scalaVersion),
-        sparkProperties = Seq(("spark.conf.1", "1"), ("spark.conf.2", "2")),
-        hadoopProperties = Seq(("hadoop.conf.conf1", "1"), ("hadoop.conf2", 
"val2")),
-        systemProperties = Seq(("sys.prop.1", "value1"), ("sys.prop.2", 
"value2")),
+        sparkProperties = Seq(("spark.conf.1", "1"), ("spark.conf.2", "2"), 
(null, null)),
+        hadoopProperties =
+          Seq(("hadoop.conf.conf1", "1"), ("hadoop.conf2", "val2"), (null, 
"val3")),
+        systemProperties =
+          Seq(("sys.prop.1", "value1"), ("sys.prop.2", "value2"), 
("sys.prop.3", null)),
         metricsProperties = Seq(("metric.1", "klass1"), ("metric2", "klass2")),
         classpathEntries = Seq(("/jar1", "System"), ("/jar2", "User")),
         resourceProfiles = Seq(new ResourceProfileInfo(
@@ -875,20 +877,41 @@ class KVStoreProtobufSerializerSuite extends 
SparkFunSuite {
             cached = true,
             barrier = false,
             callsite = "callsite_1",
-            outputDeterministicLevel = DeterministicLevel.INDETERMINATE)),
-        childClusters = Seq(new RDDOperationClusterWrapper(
-          id = "id_1",
-          name = "name1",
-          childNodes = Seq(
-            RDDOperationNode(
-              id = 15,
-              name = "name3",
-              cached = false,
-              barrier = true,
-              callsite = "callsite_2",
-              outputDeterministicLevel = DeterministicLevel.UNORDERED)),
-          childClusters = Seq.empty
-        ))
+            outputDeterministicLevel = DeterministicLevel.INDETERMINATE),
+          RDDOperationNode(
+            id = 20,
+            name = null,
+            cached = true,
+            barrier = false,
+            callsite = null,
+            outputDeterministicLevel = DeterministicLevel.DETERMINATE)),
+        childClusters = Seq(
+          new RDDOperationClusterWrapper(
+            id = "id_1",
+            name = "name1",
+            childNodes = Seq(
+              RDDOperationNode(
+                id = 15,
+                name = "name3",
+                cached = false,
+                barrier = true,
+                callsite = "callsite_2",
+                outputDeterministicLevel = DeterministicLevel.UNORDERED)),
+            childClusters = Seq.empty
+          ),
+          new RDDOperationClusterWrapper(
+            id = null,
+            name = null,
+            childNodes = Seq(
+              RDDOperationNode(
+                id = 21,
+                name = null,
+                cached = false,
+                barrier = true,
+                callsite = null,
+                outputDeterministicLevel = DeterministicLevel.UNORDERED)),
+            childClusters = Seq.empty
+          ))
       )
     )
     val bytes = serializer.serialize(input)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to