This is an automated email from the ASF dual-hosted git repository.

gengliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new c0769759f4f [SPARK-41432][UI][FOLLOWUP] Fix a bug in protobuf serializer of SparkPlanGraphNodeWrapper
c0769759f4f is described below

commit c0769759f4fd3cbce859cde790dcd1df568cfd0b
Author: Gengliang Wang <gengli...@apache.org>
AuthorDate: Tue Jan 10 06:48:51 2023 -0800

[SPARK-41432][UI][FOLLOWUP] Fix a bug in protobuf serializer of SparkPlanGraphNodeWrapper

### What changes were proposed in this pull request?

SparkPlanGraphNodeWrapper can only contain either a node or a cluster. In the current implementation, the deserialized wrapper ends up with both the node and cluster fields set to non-null values, which breaks the assertion in the method `toSparkPlanGraphNode`:

```scala
class SparkPlanGraphNodeWrapper(
    val node: SparkPlanGraphNode,
    val cluster: SparkPlanGraphClusterWrapper) {

  def toSparkPlanGraphNode(): SparkPlanGraphNode = {
    assert(node == null ^ cluster == null, "Exactly one of node, cluster values to be set.")
    if (node != null) node else cluster.toSparkPlanGraphCluster()
  }
}
```

This PR fixes the bug by using `oneof` in the protobuf definition of `SparkPlanGraphNodeWrapper`.

### Why are the changes needed?

Bug fix.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Updated the related UTs. Also ran SQLMetricsSuite with the RocksDB backend enabled.

Closes #39471 from gengliangwang/fixGraph.

Authored-by: Gengliang Wang <gengli...@apache.org>
Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../apache/spark/status/protobuf/store_types.proto |  6 +-
 .../sql/SparkPlanGraphWrapperSerializer.scala      | 24 ++++--
 .../sql/KVStoreProtobufSerializerSuite.scala       | 89 ++++++++--------
 3 files changed, 52 insertions(+), 67 deletions(-)
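For context, here is a minimal sketch (not part of the patch) of the generated-API behavior the fix relies on: once both fields live in `oneof wrapper`, the generated `StoreTypes.SparkPlanGraphNodeWrapper` keeps at most one branch set and exposes `hasNode`/`hasCluster`, while proto3 getters such as `getNode` return a default instance rather than null for an unset branch, which is how the old deserializer ended up with both a non-null node and a non-null cluster. The import path and the use of `getDefaultInstance` below are assumptions based on standard protobuf-java codegen.

```scala
// Sketch only: round-trips a cluster-only wrapper through the generated protobuf API.
import org.apache.spark.status.protobuf.StoreTypes

object OneofRoundTripSketch {
  def main(args: Array[String]): Unit = {
    val bytes = StoreTypes.SparkPlanGraphNodeWrapper.newBuilder()
      // Setting the cluster branch leaves the node branch unset, because both
      // fields now belong to the same `oneof wrapper`.
      .setCluster(StoreTypes.SparkPlanGraphClusterWrapper.getDefaultInstance())
      .build()
      .toByteArray

    val wrapper = StoreTypes.SparkPlanGraphNodeWrapper.parseFrom(bytes)
    // Exactly one branch is set, so the deserializer can branch on hasNode.
    assert(!wrapper.hasNode && wrapper.hasCluster)
    // Note: getNode would still return a default SparkPlanGraphNode here, never null.
    // The old deserializer read getNode and getCluster unconditionally, so the
    // rebuilt Scala wrapper had a non-null node AND cluster and tripped the XOR
    // assertion in toSparkPlanGraphNode().
  }
}
```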
diff --git a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
index e9aaad261f9..9001847e872 100644
--- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
+++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
@@ -436,8 +436,10 @@ message SparkPlanGraphClusterWrapper {
 }
 
 message SparkPlanGraphNodeWrapper {
-  SparkPlanGraphNode node = 1;
-  SparkPlanGraphClusterWrapper cluster = 2;
+  oneof wrapper {
+    SparkPlanGraphNode node = 1;
+    SparkPlanGraphClusterWrapper cluster = 2;
+  }
 }
 
 message SparkPlanGraphEdge {
diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala
index db63fd6afe2..c68466489ce 100644
--- a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala
@@ -53,19 +53,27 @@ class SparkPlanGraphWrapperSerializer extends ProtobufSerDe {
     StoreTypes.SparkPlanGraphNodeWrapper = {
 
     val builder = StoreTypes.SparkPlanGraphNodeWrapper.newBuilder()
-    Option(input.node).foreach(node => builder.setNode(serializeSparkPlanGraphNode(node)))
-    Option(input.cluster)
-      .foreach(cluster => builder.setCluster(serializeSparkPlanGraphClusterWrapper(cluster)))
+    if (input.node != null) {
+      builder.setNode(serializeSparkPlanGraphNode(input.node))
+    } else {
+      builder.setCluster(serializeSparkPlanGraphClusterWrapper(input.cluster))
+    }
     builder.build()
   }
 
   private def deserializeSparkPlanGraphNodeWrapper(input: StoreTypes.SparkPlanGraphNodeWrapper):
     SparkPlanGraphNodeWrapper = {
-
-    new SparkPlanGraphNodeWrapper(
-      node = deserializeSparkPlanGraphNode(input.getNode),
-      cluster = deserializeSparkPlanGraphClusterWrapper(input.getCluster)
-    )
+    if (input.hasNode) {
+      new SparkPlanGraphNodeWrapper(
+        node = deserializeSparkPlanGraphNode(input.getNode),
+        cluster = null
+      )
+    } else {
+      new SparkPlanGraphNodeWrapper(
+        node = null,
+        cluster = deserializeSparkPlanGraphClusterWrapper(input.getCluster)
+      )
+    }
   }
 
   private def serializeSparkPlanGraphEdge(edge: SparkPlanGraphEdge):
diff --git a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
index ddc693f1ee3..cfb5093611b 100644
--- a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala
@@ -114,24 +114,7 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
             )
           )
         ),
-        cluster = new SparkPlanGraphClusterWrapper(
-          id = 15,
-          name = "name_15",
-          desc = "desc_15",
-          nodes = Seq(),
-          metrics = Seq(
-            SQLPlanMetric(
-              name = "name_16",
-              accumulatorId = 16,
-              metricType = "metric_16"
-            ),
-            SQLPlanMetric(
-              name = "name_17",
-              accumulatorId = 17,
-              metricType = "metric_17"
-            )
-          )
-        )
+        cluster = null
       )),
       metrics = Seq(
         SQLPlanMetric(
@@ -147,23 +130,7 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
       )
     )
     val node = new SparkPlanGraphNodeWrapper(
-      node = new SparkPlanGraphNode(
-        id = 2,
-        name = "name_1",
-        desc = "desc_1",
-        metrics = Seq(
-          SQLPlanMetric(
-            name = "name_2",
-            accumulatorId = 3,
-            metricType = "metric_1"
-          ),
-          SQLPlanMetric(
-            name = "name_3",
-            accumulatorId = 4,
-            metricType = "metric_2"
-          )
-        )
-      ),
+      node = null,
       cluster = cluster
     )
     val input = new SparkPlanGraphWrapper(
@@ -181,29 +148,37 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
     assert(result.nodes.size == input.nodes.size)
 
     def compareNodes(n1: SparkPlanGraphNodeWrapper, n2: SparkPlanGraphNodeWrapper): Unit = {
-      assert(n1.node.id == n2.node.id)
-      assert(n1.node.name == n2.node.name)
-      assert(n1.node.desc == n2.node.desc)
-
-      assert(n1.node.metrics.size == n2.node.metrics.size)
-      n1.node.metrics.zip(n2.node.metrics).foreach { case (m1, m2) =>
-        assert(m1.name == m2.name)
-        assert(m1.accumulatorId == m2.accumulatorId)
-        assert(m1.metricType == m2.metricType)
-      }
-
-      assert(n1.cluster.id == n2.cluster.id)
-      assert(n1.cluster.name == n2.cluster.name)
-      assert(n1.cluster.desc == n2.cluster.desc)
-      assert(n1.cluster.nodes.size == n2.cluster.nodes.size)
-      n1.cluster.nodes.zip(n2.cluster.nodes).foreach { case (n3, n4) =>
-        compareNodes(n3, n4)
-      }
-      n1.cluster.metrics.zip(n2.cluster.metrics).foreach { case (m1, m2) =>
-        assert(m1.name == m2.name)
-        assert(m1.accumulatorId == m2.accumulatorId)
-        assert(m1.metricType == m2.metricType)
+      if (n1.node != null) {
+        assert(n2.node != null)
+        assert(n1.node.id == n2.node.id)
+        assert(n1.node.name == n2.node.name)
+        assert(n1.node.desc == n2.node.desc)
+
+        assert(n1.node.metrics.size == n2.node.metrics.size)
+        n1.node.metrics.zip(n2.node.metrics).foreach { case (m1, m2) =>
+          assert(m1.name == m2.name)
+          assert(m1.accumulatorId == m2.accumulatorId)
+          assert(m1.metricType == m2.metricType)
+        }
+      } else {
+        assert(n2.node == null)
+        assert(n1.cluster != null && n2.cluster != null)
+        assert(n1.cluster.id == n2.cluster.id)
+        assert(n1.cluster.name == n2.cluster.name)
+        assert(n1.cluster.desc == n2.cluster.desc)
+        assert(n1.cluster.nodes.size == n2.cluster.nodes.size)
+        n1.cluster.nodes.zip(n2.cluster.nodes).foreach { case (n3, n4) =>
+          compareNodes(n3, n4)
+        }
+        n1.cluster.metrics.zip(n2.cluster.metrics).foreach { case (m1, m2) =>
+          assert(m1.name == m2.name)
+          assert(m1.accumulatorId == m2.accumulatorId)
+          assert(m1.metricType == m2.metricType)
+        }
       }
+      val metrics = Map(6L -> "a", 7L -> "b", 13L -> "c", 14L -> "d")
+      assert(n1.toSparkPlanGraphNode().makeDotNode(metrics) ==
+        n2.toSparkPlanGraphNode().makeDotNode(metrics))
     }
 
     result.nodes.zip(input.nodes).foreach { case (n1, n2) =>

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org