This is an automated email from the ASF dual-hosted git repository. gengliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 984cfa162da [SPARK-42139][CORE][SQL] Handle null string values in SQLExecutionUIData/SparkPlanGraphWrapper/SQLPlanMetric 984cfa162da is described below commit 984cfa162da99a19dd169ae5ecc4b568e11fe4b1 Author: yangjie01 <yangji...@baidu.com> AuthorDate: Sun Jan 22 13:48:41 2023 -0800 [SPARK-42139][CORE][SQL] Handle null string values in SQLExecutionUIData/SparkPlanGraphWrapper/SQLPlanMetric ### What changes were proposed in this pull request? Similar to #39666, this PR handles null string values in SQLExecutionUIData/SparkPlanGraphWrapper/SQLPlanMetric ### Why are the changes needed? Properly handles null string values in the protobuf serializer. ### Does this PR introduce any user-facing change? No ### How was this patch tested? New UTs Closes #39682 from LuciferYang/SPARK-42139. Authored-by: yangjie01 <yangji...@baidu.com> Signed-off-by: Gengliang Wang <gengli...@apache.org> --- .../apache/spark/status/protobuf/store_types.proto | 18 ++-- .../sql/SQLExecutionUIDataSerializer.scala | 15 +-- .../protobuf/sql/SQLPlanMetricSerializer.scala | 18 ++-- .../sql/SparkPlanGraphWrapperSerializer.scala | 18 ++-- .../sql/KVStoreProtobufSerializerSuite.scala | 118 ++++++++++++++------- 5 files changed, 121 insertions(+), 66 deletions(-) diff --git a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto index 155e73de056..ab6861057c9 100644 --- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto +++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto @@ -403,17 +403,17 @@ message ExecutorSummaryWrapper { } message SQLPlanMetric { - string name = 1; + optional string name = 1; int64 accumulator_id = 2; - string metric_type = 3; + optional string metric_type = 3; } message SQLExecutionUIData { int64 execution_id = 1; int64 root_execution_id = 2; - string description = 
3; - string details = 4; - string physical_plan_description = 5; + optional string description = 3; + optional string details = 4; + optional string physical_plan_description = 5; map<string, string> modified_configs = 6; repeated SQLPlanMetric metrics = 7; int64 submission_time = 8; @@ -427,15 +427,15 @@ message SQLExecutionUIData { message SparkPlanGraphNode { int64 id = 1; - string name = 2; - string desc = 3; + optional string name = 2; + optional string desc = 3; repeated SQLPlanMetric metrics = 4; } message SparkPlanGraphClusterWrapper { int64 id = 1; - string name = 2; - string desc = 3; + optional string name = 2; + optional string desc = 3; repeated SparkPlanGraphNodeWrapper nodes = 4; repeated SQLPlanMetric metrics = 5; } diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala index dc76ab9a4e9..f0cdca985b7 100644 --- a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLExecutionUIDataSerializer.scala @@ -23,7 +23,7 @@ import collection.JavaConverters._ import org.apache.spark.sql.execution.ui.SQLExecutionUIData import org.apache.spark.status.protobuf.{JobExecutionStatusSerializer, ProtobufSerDe, StoreTypes} -import org.apache.spark.status.protobuf.Utils.getOptional +import org.apache.spark.status.protobuf.Utils._ class SQLExecutionUIDataSerializer extends ProtobufSerDe[SQLExecutionUIData] { @@ -31,9 +31,9 @@ class SQLExecutionUIDataSerializer extends ProtobufSerDe[SQLExecutionUIData] { val builder = StoreTypes.SQLExecutionUIData.newBuilder() builder.setExecutionId(ui.executionId) builder.setRootExecutionId(ui.rootExecutionId) - Option(ui.description).foreach(builder.setDescription) - Option(ui.details).foreach(builder.setDetails) - 
Option(ui.physicalPlanDescription).foreach(builder.setPhysicalPlanDescription) + setStringField(ui.description, builder.setDescription) + setStringField(ui.details, builder.setDetails) + setStringField(ui.physicalPlanDescription, builder.setPhysicalPlanDescription) if (ui.modifiedConfigs != null) { ui.modifiedConfigs.foreach { case (k, v) => builder.putModifiedConfigs(k, v) @@ -81,9 +81,10 @@ class SQLExecutionUIDataSerializer extends ProtobufSerDe[SQLExecutionUIData] { new SQLExecutionUIData( executionId = ui.getExecutionId, rootExecutionId = ui.getRootExecutionId, - description = ui.getDescription, - details = ui.getDetails, - physicalPlanDescription = ui.getPhysicalPlanDescription, + description = getStringField(ui.hasDescription, () => ui.getDescription), + details = getStringField(ui.hasDetails, () => ui.getDetails), + physicalPlanDescription = + getStringField(ui.hasPhysicalPlanDescription, () => ui.getPhysicalPlanDescription), modifiedConfigs = ui.getModifiedConfigsMap.asScala.toMap, metrics = metrics, submissionTime = ui.getSubmissionTime, diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLPlanMetricSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLPlanMetricSerializer.scala index 8886bba2f92..88ba51c52b4 100644 --- a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLPlanMetricSerializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SQLPlanMetricSerializer.scala @@ -19,18 +19,24 @@ package org.apache.spark.status.protobuf.sql import org.apache.spark.sql.execution.ui.SQLPlanMetric import org.apache.spark.status.protobuf.StoreTypes +import org.apache.spark.status.protobuf.Utils._ +import org.apache.spark.util.Utils.weakIntern object SQLPlanMetricSerializer { def serialize(metric: SQLPlanMetric): StoreTypes.SQLPlanMetric = { - StoreTypes.SQLPlanMetric.newBuilder() - .setName(metric.name) - .setAccumulatorId(metric.accumulatorId) - 
.setMetricType(metric.metricType) - .build() + val builder = StoreTypes.SQLPlanMetric.newBuilder() + setStringField(metric.name, builder.setName) + builder.setAccumulatorId(metric.accumulatorId) + setStringField(metric.metricType, builder.setMetricType) + builder.build() } def deserialize(metrics: StoreTypes.SQLPlanMetric): SQLPlanMetric = { - SQLPlanMetric(metrics.getName, metrics.getAccumulatorId, metrics.getMetricType) + SQLPlanMetric( + name = getStringField(metrics.hasName, () => weakIntern(metrics.getName)), + accumulatorId = metrics.getAccumulatorId, + metricType = getStringField(metrics.hasMetricType, () => weakIntern(metrics.getMetricType)) + ) } } diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala index 1df82e3246a..bff5c0d7619 100644 --- a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala @@ -22,6 +22,8 @@ import collection.JavaConverters._ import org.apache.spark.sql.execution.ui.{SparkPlanGraphClusterWrapper, SparkPlanGraphEdge, SparkPlanGraphNode, SparkPlanGraphNodeWrapper, SparkPlanGraphWrapper} import org.apache.spark.status.protobuf.ProtobufSerDe import org.apache.spark.status.protobuf.StoreTypes +import org.apache.spark.status.protobuf.Utils.{getStringField, setStringField} +import org.apache.spark.util.Utils.weakIntern class SparkPlanGraphWrapperSerializer extends ProtobufSerDe[SparkPlanGraphWrapper] { @@ -92,8 +94,8 @@ class SparkPlanGraphWrapperSerializer extends ProtobufSerDe[SparkPlanGraphWrappe StoreTypes.SparkPlanGraphNode = { val builder = StoreTypes.SparkPlanGraphNode.newBuilder() builder.setId(node.id) - builder.setName(node.name) - builder.setDesc(node.desc) + setStringField(node.name, builder.setName) + 
setStringField(node.desc, builder.setDesc) node.metrics.foreach { metric => builder.addMetrics(SQLPlanMetricSerializer.serialize(metric)) } @@ -105,8 +107,8 @@ class SparkPlanGraphWrapperSerializer extends ProtobufSerDe[SparkPlanGraphWrappe new SparkPlanGraphNode( id = node.getId, - name = node.getName, - desc = node.getDesc, + name = getStringField(node.hasName, () => weakIntern(node.getName)), + desc = getStringField(node.hasDesc, () => node.getDesc), metrics = node.getMetricsList.asScala.map(SQLPlanMetricSerializer.deserialize) ) } @@ -115,8 +117,8 @@ class SparkPlanGraphWrapperSerializer extends ProtobufSerDe[SparkPlanGraphWrappe StoreTypes.SparkPlanGraphClusterWrapper = { val builder = StoreTypes.SparkPlanGraphClusterWrapper.newBuilder() builder.setId(cluster.id) - builder.setName(cluster.name) - builder.setDesc(cluster.desc) + setStringField(cluster.name, builder.setName) + setStringField(cluster.desc, builder.setDesc) cluster.nodes.foreach { node => builder.addNodes(serializeSparkPlanGraphNodeWrapper(node)) } @@ -131,8 +133,8 @@ class SparkPlanGraphWrapperSerializer extends ProtobufSerDe[SparkPlanGraphWrappe new SparkPlanGraphClusterWrapper( id = cluster.getId, - name = cluster.getName, - desc = cluster.getDesc, + name = getStringField(cluster.hasName, () => weakIntern(cluster.getName)), + desc = getStringField(cluster.hasDesc, () => cluster.getDesc), nodes = cluster.getNodesList.asScala.map(deserializeSparkPlanGraphNodeWrapper), metrics = cluster.getMetricsList.asScala.map(SQLPlanMetricSerializer.deserialize) ) diff --git a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala index 41f185900ad..c220ca1c96f 100644 --- a/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/status/protobuf/sql/KVStoreProtobufSerializerSuite.scala @@ -30,22 +30,39 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite { private val serializer = new KVStoreProtobufSerializer() test("SQLExecutionUIData") { - val input = SqlResourceSuite.sqlExecutionUIData - val bytes = serializer.serialize(input) - val result = serializer.deserialize(bytes, classOf[SQLExecutionUIData]) - assert(result.executionId == input.executionId) - assert(result.rootExecutionId == input.rootExecutionId) - assert(result.description == input.description) - assert(result.details == input.details) - assert(result.physicalPlanDescription == input.physicalPlanDescription) - assert(result.modifiedConfigs == input.modifiedConfigs) - assert(result.metrics == input.metrics) - assert(result.submissionTime == input.submissionTime) - assert(result.completionTime == input.completionTime) - assert(result.errorMessage == input.errorMessage) - assert(result.jobs == input.jobs) - assert(result.stages == input.stages) - assert(result.metricValues == input.metricValues) + val normal = SqlResourceSuite.sqlExecutionUIData + val withNull = new SQLExecutionUIData( + executionId = normal.executionId, + rootExecutionId = normal.rootExecutionId, + description = null, + details = null, + physicalPlanDescription = null, + modifiedConfigs = normal.modifiedConfigs, + metrics = Seq(SQLPlanMetric(null, 0, null)), + submissionTime = normal.submissionTime, + completionTime = normal.completionTime, + errorMessage = normal.errorMessage, + jobs = normal.jobs, + stages = normal.stages, + metricValues = normal.metricValues + ) + Seq(normal, withNull).foreach { input => + val bytes = serializer.serialize(input) + val result = serializer.deserialize(bytes, classOf[SQLExecutionUIData]) + assert(result.executionId == input.executionId) + assert(result.rootExecutionId == input.rootExecutionId) + assert(result.description == input.description) + assert(result.details == 
input.details) + assert(result.physicalPlanDescription == input.physicalPlanDescription) + assert(result.modifiedConfigs == input.modifiedConfigs) + assert(result.metrics == input.metrics) + assert(result.submissionTime == input.submissionTime) + assert(result.completionTime == input.completionTime) + assert(result.errorMessage == input.errorMessage) + assert(result.jobs == input.jobs) + assert(result.stages == input.stages) + assert(result.metricValues == input.metricValues) + } } test("SQLExecutionUIData with metricValues is empty map and null") { @@ -93,30 +110,59 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite { } test("Spark Plan Graph") { + val node0: SparkPlanGraphNodeWrapper = new SparkPlanGraphNodeWrapper( + node = new SparkPlanGraphNode( + id = 12, + name = "name_12", + desc = "desc_12", + metrics = Seq( + SQLPlanMetric( + name = "name_13", + accumulatorId = 13, + metricType = "metric_13" + ), + SQLPlanMetric( + name = "name_14", + accumulatorId = 14, + metricType = "metric_14" + ) + ) + ), + cluster = null + ) + + val node1: SparkPlanGraphNodeWrapper = new SparkPlanGraphNodeWrapper( + node = new SparkPlanGraphNode( + id = 13, + name = null, + desc = null, + metrics = Seq( + SQLPlanMetric( + name = null, + accumulatorId = 13, + metricType = null + ) + ) + ), + cluster = null + ) + + val node2: SparkPlanGraphNodeWrapper = new SparkPlanGraphNodeWrapper( + node = null, + cluster = new SparkPlanGraphClusterWrapper( + id = 6, + name = null, + desc = null, + nodes = Seq.empty, + metrics = Seq.empty + ) + ) + val cluster = new SparkPlanGraphClusterWrapper( id = 5, name = "name_5", desc = "desc_5", - nodes = Seq(new SparkPlanGraphNodeWrapper( - node = new SparkPlanGraphNode( - id = 12, - name = "name_12", - desc = "desc_12", - metrics = Seq( - SQLPlanMetric( - name = "name_13", - accumulatorId = 13, - metricType = "metric_13" - ), - SQLPlanMetric( - name = "name_14", - accumulatorId = 14, - metricType = "metric_14" - ) - ) - ), - cluster = null - 
)), + nodes = Seq(node0, node1, node2), metrics = Seq( SQLPlanMetric( name = "name_6", --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org For additional commands, e-mail: commits-help@spark.apache.org