This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 4ab394f6aff [SPARK-42143][UI] Handle null string values in RDDStorageInfo/RDDDataDistribution/RDDPartitionInfo
4ab394f6aff is described below

commit 4ab394f6affbbdb463e08be2283df63a11d16b03
Author: Gengliang Wang <gengli...@apache.org>
AuthorDate: Sat Jan 21 13:29:51 2023 -0800

    [SPARK-42143][UI] Handle null string values in RDDStorageInfo/RDDDataDistribution/RDDPartitionInfo
    
    ### What changes were proposed in this pull request?
    
    Similar to https://github.com/apache/spark/pull/39666, this PR handles null string values in RDDStorageInfo/RDDDataDistribution/RDDPartitionInfo.
    
    ### Why are the changes needed?
    
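    To properly handle null string values in the protobuf serializer. Generated protobuf Java builders throw a NullPointerException when a string setter receives null. These fields are therefore now declared `optional` and go through the `setStringField`/`getStringField` helpers, so a null value survives a serialize/deserialize round trip.
    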
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
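    New unit tests in KVStoreProtobufSerializerSuite covering null `address` (RDDDataDistribution), null `blockName`/`storageLevel` (RDDPartitionInfo), and null `name` (RDDStorageInfo).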
    
    Closes #39686 from gengliangwang/fixMoreNull2.
    
    Authored-by: Gengliang Wang <gengli...@apache.org>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../apache/spark/status/protobuf/store_types.proto |  8 +++---
 .../protobuf/RDDStorageInfoWrapperSerializer.scala | 19 +++++++------
 .../protobuf/KVStoreProtobufSerializerSuite.scala  | 32 ++++++++++++++++++++--
 3 files changed, 44 insertions(+), 15 deletions(-)

diff --git 
a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto 
b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
index 32b775ac90f..598800da9f5 100644
--- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
+++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
@@ -241,7 +241,7 @@ message StreamBlockData {
 }
 
 message RDDDataDistribution {
-  string address = 1;
+  optional string address = 1;
   int64 memory_used = 2;
   int64 memory_remaining = 3;
   int64 disk_used = 4;
@@ -252,8 +252,8 @@ message RDDDataDistribution {
 }
 
 message RDDPartitionInfo {
-  string block_name = 1;
-  string storage_level = 2;
+  optional string block_name = 1;
+  optional string storage_level = 2;
   int64 memory_used = 3;
   int64 disk_used = 4;
   repeated string executors = 5;
@@ -261,7 +261,7 @@ message RDDPartitionInfo {
 
 message RDDStorageInfo {
   int32 id = 1;
-  string name = 2;
+  optional string name = 2;
   int32 num_partitions = 3;
   int32 num_cached_partitions = 4;
   string storage_level = 5;
diff --git 
a/core/src/main/scala/org/apache/spark/status/protobuf/RDDStorageInfoWrapperSerializer.scala
 
b/core/src/main/scala/org/apache/spark/status/protobuf/RDDStorageInfoWrapperSerializer.scala
index e59d363243e..06dbb485000 100644
--- 
a/core/src/main/scala/org/apache/spark/status/protobuf/RDDStorageInfoWrapperSerializer.scala
+++ 
b/core/src/main/scala/org/apache/spark/status/protobuf/RDDStorageInfoWrapperSerializer.scala
@@ -21,7 +21,8 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.status.RDDStorageInfoWrapper
 import org.apache.spark.status.api.v1.{RDDDataDistribution, RDDPartitionInfo, 
RDDStorageInfo}
-import org.apache.spark.status.protobuf.Utils.getOptional
+import org.apache.spark.status.protobuf.Utils.{getOptional, getStringField, 
setStringField}
+import org.apache.spark.util.Utils.weakIntern
 
 class RDDStorageInfoWrapperSerializer extends 
ProtobufSerDe[RDDStorageInfoWrapper] {
 
@@ -41,7 +42,7 @@ class RDDStorageInfoWrapperSerializer extends 
ProtobufSerDe[RDDStorageInfoWrappe
   private def serializeRDDStorageInfo(info: RDDStorageInfo): 
StoreTypes.RDDStorageInfo = {
     val builder = StoreTypes.RDDStorageInfo.newBuilder()
     builder.setId(info.id)
-    builder.setName(info.name)
+    setStringField(info.name, builder.setName)
     builder.setNumPartitions(info.numPartitions)
     builder.setNumCachedPartitions(info.numCachedPartitions)
     builder.setStorageLevel(info.storageLevel)
@@ -51,7 +52,7 @@ class RDDStorageInfoWrapperSerializer extends 
ProtobufSerDe[RDDStorageInfoWrappe
     if (info.dataDistribution.isDefined) {
       info.dataDistribution.get.foreach { dd =>
         val dataDistributionBuilder = 
StoreTypes.RDDDataDistribution.newBuilder()
-        dataDistributionBuilder.setAddress(dd.address)
+        setStringField(dd.address, dataDistributionBuilder.setAddress)
         dataDistributionBuilder.setMemoryUsed(dd.memoryUsed)
         dataDistributionBuilder.setMemoryRemaining(dd.memoryRemaining)
         dataDistributionBuilder.setDiskUsed(dd.diskUsed)
@@ -66,8 +67,8 @@ class RDDStorageInfoWrapperSerializer extends 
ProtobufSerDe[RDDStorageInfoWrappe
     if (info.partitions.isDefined) {
       info.partitions.get.foreach { p =>
         val partitionsBuilder = StoreTypes.RDDPartitionInfo.newBuilder()
-        partitionsBuilder.setBlockName(p.blockName)
-        partitionsBuilder.setStorageLevel(p.storageLevel)
+        setStringField(p.blockName, partitionsBuilder.setBlockName)
+        setStringField(p.storageLevel, partitionsBuilder.setStorageLevel)
         partitionsBuilder.setMemoryUsed(p.memoryUsed)
         partitionsBuilder.setDiskUsed(p.diskUsed)
         p.executors.foreach(partitionsBuilder.addExecutors)
@@ -81,7 +82,7 @@ class RDDStorageInfoWrapperSerializer extends 
ProtobufSerDe[RDDStorageInfoWrappe
   private def deserializeRDDStorageInfo(info: StoreTypes.RDDStorageInfo): 
RDDStorageInfo = {
     new RDDStorageInfo(
       id = info.getId,
-      name = info.getName,
+      name = getStringField(info.hasName, info.getName),
       numPartitions = info.getNumPartitions,
       numCachedPartitions = info.getNumCachedPartitions,
       storageLevel = info.getStorageLevel,
@@ -102,7 +103,7 @@ class RDDStorageInfoWrapperSerializer extends 
ProtobufSerDe[RDDStorageInfoWrappe
     RDDDataDistribution = {
 
     new RDDDataDistribution(
-      address = info.getAddress,
+      address = getStringField(info.hasAddress, info.getAddress),
       memoryUsed = info.getMemoryUsed,
       memoryRemaining = info.getMemoryRemaining,
       diskUsed = info.getDiskUsed,
@@ -117,8 +118,8 @@ class RDDStorageInfoWrapperSerializer extends 
ProtobufSerDe[RDDStorageInfoWrappe
 
   private def deserializeRDDPartitionInfo(info: StoreTypes.RDDPartitionInfo): 
RDDPartitionInfo = {
     new RDDPartitionInfo(
-      blockName = info.getBlockName,
-      storageLevel = info.getStorageLevel,
+      blockName = getStringField(info.hasBlockName, info.getBlockName),
+      storageLevel = getStringField(info.hasStorageLevel, () => 
weakIntern(info.getStorageLevel)),
       memoryUsed = info.getMemoryUsed,
       diskUsed = info.getDiskUsed,
       executors = info.getExecutorsList.asScala
diff --git 
a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
 
b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
index e51b2f5d012..e9dc9e00f67 100644
--- 
a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
@@ -386,7 +386,16 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite 
{
         onHeapMemoryUsed = Some(101),
         offHeapMemoryUsed = Some(102),
         onHeapMemoryRemaining = Some(103),
-        offHeapMemoryRemaining = Some(104))
+        offHeapMemoryRemaining = Some(104)),
+      new RDDDataDistribution(
+        address = null,
+        memoryUsed = 60,
+        memoryRemaining = 80,
+        diskUsed = 1000,
+        onHeapMemoryUsed = Some(1010),
+        offHeapMemoryUsed = Some(1020),
+        onHeapMemoryRemaining = Some(1030),
+        offHeapMemoryRemaining = Some(1040))
     )
     val rddPartitionInfo = Seq(
       new RDDPartitionInfo(
@@ -394,7 +403,13 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite 
{
         storageLevel = "IN_MEM",
         memoryUsed = 105,
         diskUsed = 106,
-        executors = Seq("exec_0", "exec_1"))
+        executors = Seq("exec_0", "exec_1")),
+      new RDDPartitionInfo(
+        blockName = null,
+        storageLevel = null,
+        memoryUsed = 105,
+        diskUsed = 106,
+        executors = Seq("exec_2", "exec_3"))
     )
     val inputs = Seq(
       new RDDStorageInfoWrapper(
@@ -422,6 +437,19 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite 
{
           dataDistribution = None,
           partitions = Some(Seq.empty)
         )
+      ),
+      new RDDStorageInfoWrapper(
+        info = new RDDStorageInfo(
+          id = 3,
+          name = null,
+          numPartitions = 8,
+          numCachedPartitions = 5,
+          storageLevel = "IN_MEMORY",
+          memoryUsed = 100,
+          diskUsed = 2560,
+          dataDistribution = None,
+          partitions = Some(Seq.empty)
+        )
       )
     )
     inputs.foreach { input =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to