This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new c0769759f4f [SPARK-41432][UI][FOLLOWUP] Fix a bug in protobuf 
serializer of SparkPlanGraphNodeWrapper
c0769759f4f is described below

commit c0769759f4fd3cbce859cde790dcd1df568cfd0b
Author: Gengliang Wang <gengli...@apache.org>
AuthorDate: Tue Jan 10 06:48:51 2023 -0800

    [SPARK-41432][UI][FOLLOWUP] Fix a bug in protobuf serializer of 
SparkPlanGraphNodeWrapper
    
    ### What changes were proposed in this pull request?
    
    SparkPlanGraphNodeWrapper can only contain either a node or a cluster. In 
the current implementation, however, both the node and cluster fields are 
always set (non-null). This breaks the assertion in the method 
`toSparkPlanGraphNode`:
    ```scala
    class SparkPlanGraphNodeWrapper(
        val node: SparkPlanGraphNode,
        val cluster: SparkPlanGraphClusterWrapper) {
    
      def toSparkPlanGraphNode(): SparkPlanGraphNode = {
        assert(node == null ^ cluster == null, "Exactly one of node, cluster 
values to be set.")
        if (node != null) node else cluster.toSparkPlanGraphCluster()
      }
    
    }
    ```
    This PR fixes the bug by using `oneof` in the protobuf definition of 
`SparkPlanGraphNodeWrapper`.
    ### Why are the changes needed?
    
    Bug fix
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    ### How was this patch tested?
    
    Updated related UT. Also ran SQLMetricsSuite with the RocksDB backend enabled.
    
    Closes #39471 from gengliangwang/fixGraph.
    
    Authored-by: Gengliang Wang <gengli...@apache.org>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../apache/spark/status/protobuf/store_types.proto |  6 +-
 .../sql/SparkPlanGraphWrapperSerializer.scala      | 24 ++++--
 .../sql/KVStoreProtobufSerializerSuite.scala       | 89 ++++++++--------------
 3 files changed, 52 insertions(+), 67 deletions(-)

diff --git 
a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto 
b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
index e9aaad261f9..9001847e872 100644
--- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
+++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
@@ -436,8 +436,10 @@ message SparkPlanGraphClusterWrapper {
 }
 
 message SparkPlanGraphNodeWrapper {
-  SparkPlanGraphNode node = 1;
-  SparkPlanGraphClusterWrapper cluster = 2;
+  oneof wrapper {
+    SparkPlanGraphNode node = 1;
+    SparkPlanGraphClusterWrapper cluster = 2;
+  }
 }
 
 message SparkPlanGraphEdge {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala
 
b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala
index db63fd6afe2..c68466489ce 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala
@@ -53,19 +53,27 @@ class SparkPlanGraphWrapperSerializer extends ProtobufSerDe 
{
     StoreTypes.SparkPlanGraphNodeWrapper = {
 
     val builder = StoreTypes.SparkPlanGraphNodeWrapper.newBuilder()
-    Option(input.node).foreach(node => 
builder.setNode(serializeSparkPlanGraphNode(node)))
-    Option(input.cluster)
-      .foreach(cluster => 
builder.setCluster(serializeSparkPlanGraphClusterWrapper(cluster)))
+    if (input.node != null) {
+      builder.setNode(serializeSparkPlanGraphNode(input.node))
+    } else {
+      builder.setCluster(serializeSparkPlanGraphClusterWrapper(input.cluster))
+    }
     builder.build()
   }
 
   private def deserializeSparkPlanGraphNodeWrapper(input: 
StoreTypes.SparkPlanGraphNodeWrapper):
     SparkPlanGraphNodeWrapper = {
-
-    new SparkPlanGraphNodeWrapper(
-      node = deserializeSparkPlanGraphNode(input.getNode),
-      cluster = deserializeSparkPlanGraphClusterWrapper(input.getCluster)
-    )
+    if (input.hasNode) {
+      new SparkPlanGraphNodeWrapper(
+        node = deserializeSparkPlanGraphNode(input.getNode),
+        cluster = null
+      )
+    } else {
+      new SparkPlanGraphNodeWrapper(
+        node = null,
+        cluster = deserializeSparkPlanGraphClusterWrapper(input.getCluster)
+      )
+    }
   }
 
   private def serializeSparkPlanGraphEdge(edge: SparkPlanGraphEdge):
diff --git 
a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
index ddc693f1ee3..cfb5093611b 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
@@ -114,24 +114,7 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite 
{
             )
           )
         ),
-        cluster = new SparkPlanGraphClusterWrapper(
-          id = 15,
-          name = "name_15",
-          desc = "desc_15",
-          nodes = Seq(),
-          metrics = Seq(
-            SQLPlanMetric(
-              name = "name_16",
-              accumulatorId = 16,
-              metricType = "metric_16"
-            ),
-            SQLPlanMetric(
-              name = "name_17",
-              accumulatorId = 17,
-              metricType = "metric_17"
-            )
-          )
-        )
+        cluster = null
       )),
       metrics = Seq(
         SQLPlanMetric(
@@ -147,23 +130,7 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite 
{
       )
     )
     val node = new SparkPlanGraphNodeWrapper(
-      node = new SparkPlanGraphNode(
-        id = 2,
-        name = "name_1",
-        desc = "desc_1",
-        metrics = Seq(
-          SQLPlanMetric(
-            name = "name_2",
-            accumulatorId = 3,
-            metricType = "metric_1"
-          ),
-          SQLPlanMetric(
-            name = "name_3",
-            accumulatorId = 4,
-            metricType = "metric_2"
-          )
-        )
-      ),
+      node = null,
       cluster = cluster
     )
     val input = new SparkPlanGraphWrapper(
@@ -181,29 +148,37 @@ class KVStoreProtobufSerializerSuite extends 
SparkFunSuite {
     assert(result.nodes.size == input.nodes.size)
 
     def compareNodes(n1: SparkPlanGraphNodeWrapper, n2: 
SparkPlanGraphNodeWrapper): Unit = {
-      assert(n1.node.id == n2.node.id)
-      assert(n1.node.name == n2.node.name)
-      assert(n1.node.desc == n2.node.desc)
-
-      assert(n1.node.metrics.size == n2.node.metrics.size)
-      n1.node.metrics.zip(n2.node.metrics).foreach { case (m1, m2) =>
-        assert(m1.name == m2.name)
-        assert(m1.accumulatorId == m2.accumulatorId)
-        assert(m1.metricType == m2.metricType)
-      }
-
-      assert(n1.cluster.id == n2.cluster.id)
-      assert(n1.cluster.name == n2.cluster.name)
-      assert(n1.cluster.desc == n2.cluster.desc)
-      assert(n1.cluster.nodes.size == n2.cluster.nodes.size)
-      n1.cluster.nodes.zip(n2.cluster.nodes).foreach { case (n3, n4) =>
-        compareNodes(n3, n4)
-      }
-      n1.cluster.metrics.zip(n2.cluster.metrics).foreach { case (m1, m2) =>
-        assert(m1.name == m2.name)
-        assert(m1.accumulatorId == m2.accumulatorId)
-        assert(m1.metricType == m2.metricType)
+      if (n1.node != null) {
+        assert(n2.node != null)
+        assert(n1.node.id == n2.node.id)
+        assert(n1.node.name == n2.node.name)
+        assert(n1.node.desc == n2.node.desc)
+
+        assert(n1.node.metrics.size == n2.node.metrics.size)
+        n1.node.metrics.zip(n2.node.metrics).foreach { case (m1, m2) =>
+          assert(m1.name == m2.name)
+          assert(m1.accumulatorId == m2.accumulatorId)
+          assert(m1.metricType == m2.metricType)
+        }
+      } else {
+        assert(n2.node == null)
+        assert(n1.cluster != null && n2.cluster != null)
+        assert(n1.cluster.id == n2.cluster.id)
+        assert(n1.cluster.name == n2.cluster.name)
+        assert(n1.cluster.desc == n2.cluster.desc)
+        assert(n1.cluster.nodes.size == n2.cluster.nodes.size)
+        n1.cluster.nodes.zip(n2.cluster.nodes).foreach { case (n3, n4) =>
+          compareNodes(n3, n4)
+        }
+        n1.cluster.metrics.zip(n2.cluster.metrics).foreach { case (m1, m2) =>
+          assert(m1.name == m2.name)
+          assert(m1.accumulatorId == m2.accumulatorId)
+          assert(m1.metricType == m2.metricType)
+        }
       }
+      val metrics = Map(6L -> "a", 7L -> "b", 13L -> "c", 14L -> "d")
+      assert(n1.toSparkPlanGraphNode().makeDotNode(metrics) ==
+        n2.toSparkPlanGraphNode().makeDotNode(metrics))
     }
 
     result.nodes.zip(input.nodes).foreach { case (n1, n2) =>


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to