This is an automated email from the ASF dual-hosted git repository. gengliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 73b0c71fe33 [SPARK-42153][UI] Handle null string values in PairStrings/RDDOperationNode/RDDOperationClusterWrapper 73b0c71fe33 is described below commit 73b0c71fe33704dd640a6008342358da67f50390 Author: Gengliang Wang <gengli...@apache.org> AuthorDate: Sat Jan 21 20:09:26 2023 -0800 [SPARK-42153][UI] Handle null string values in PairStrings/RDDOperationNode/RDDOperationClusterWrapper ### What changes were proposed in this pull request? Similar to #39666, this PR handles null string values in PairStrings/RDDOperationNode/RDDOperationClusterWrapper ### Why are the changes needed? Properly handles null string values in the protobuf serializer. ### Does this PR introduce any user-facing change? No ### How was this patch tested? New UTs Closes #39696 from gengliangwang/moreNull3. Authored-by: Gengliang Wang <gengli...@apache.org> Signed-off-by: Gengliang Wang <gengli...@apache.org> --- .../apache/spark/status/protobuf/store_types.proto | 12 ++--- ...plicationEnvironmentInfoWrapperSerializer.scala | 7 +-- .../RDDOperationGraphWrapperSerializer.scala | 17 ++++--- .../protobuf/KVStoreProtobufSerializerSuite.scala | 57 +++++++++++++++------- 4 files changed, 59 insertions(+), 34 deletions(-) diff --git a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto index 49076790321..155e73de056 100644 --- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto +++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto @@ -185,8 +185,8 @@ message RuntimeInfo { } message PairStrings { - string value1 = 1; - string value2 = 2; + optional string value1 = 1; + optional string value2 = 2; } message ApplicationEnvironmentInfo { @@ -472,16 +472,16 @@ enum DeterministicLevel { message RDDOperationNode { int32 id = 1; - string name = 2; + optional string name = 2; bool cached = 3; bool barrier = 4; - string callsite = 5; + optional string callsite = 5; DeterministicLevel output_deterministic_level = 6; } message RDDOperationClusterWrapper { - string id = 1; - string name = 2; + optional string id = 1; + optional string name = 2; repeated RDDOperationNode child_nodes = 3; repeated RDDOperationClusterWrapper child_clusters = 4; } diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala index fbbc55387b8..63c8387a8db 100644 --- a/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala +++ b/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala @@ -81,7 +81,8 @@ class ApplicationEnvironmentInfoWrapperSerializer scalaVersion = getStringField(rt.hasScalaVersion, () => rt.getScalaVersion) ) val pairSSToTuple = (pair: StoreTypes.PairStrings) => { - (pair.getValue1, pair.getValue2) + (getStringField(pair.hasValue1, pair.getValue1), + getStringField(pair.hasValue2, pair.getValue2)) } new ApplicationEnvironmentInfo( runtime = runtime, @@ -97,8 +98,8 @@ class ApplicationEnvironmentInfoWrapperSerializer private def serializePairStrings(pair: (String, String)): StoreTypes.PairStrings = { val builder = StoreTypes.PairStrings.newBuilder() - builder.setValue1(pair._1) - builder.setValue2(pair._2) + setStringField(pair._1, builder.setValue1) + setStringField(pair._2, builder.setValue2) builder.build() } diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala index f822ed1889a..3187b255d4c 100644 --- a/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala +++ b/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala @@ -22,6 +22,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.rdd.DeterministicLevel import org.apache.spark.status.{RDDOperationClusterWrapper, RDDOperationGraphWrapper} import org.apache.spark.status.protobuf.StoreTypes.{DeterministicLevel => GDeterministicLevel} +import org.apache.spark.status.protobuf.Utils.{getStringField, setStringField} import org.apache.spark.ui.scope.{RDDOperationEdge, RDDOperationNode} class RDDOperationGraphWrapperSerializer extends ProtobufSerDe[RDDOperationGraphWrapper] { @@ -56,8 +57,8 @@ class RDDOperationGraphWrapperSerializer extends ProtobufSerDe[RDDOperationGraph private def serializeRDDOperationClusterWrapper(op: RDDOperationClusterWrapper): StoreTypes.RDDOperationClusterWrapper = { val builder = StoreTypes.RDDOperationClusterWrapper.newBuilder() - builder.setId(op.id) - builder.setName(op.name) + setStringField(op.id, builder.setId) + setStringField(op.name, builder.setName) op.childNodes.foreach { node => builder.addChildNodes(serializeRDDOperationNode(node)) } @@ -70,8 +71,8 @@ class RDDOperationGraphWrapperSerializer extends ProtobufSerDe[RDDOperationGraph private def deserializeRDDOperationClusterWrapper(op: StoreTypes.RDDOperationClusterWrapper): RDDOperationClusterWrapper = { new RDDOperationClusterWrapper( - id = op.getId, - name = op.getName, + id = getStringField(op.hasId, op.getId), + name = getStringField(op.hasName, op.getName), childNodes = op.getChildNodesList.asScala.map(deserializeRDDOperationNode), childClusters = op.getChildClustersList.asScala.map(deserializeRDDOperationClusterWrapper) @@ -83,10 +84,10 @@ class RDDOperationGraphWrapperSerializer extends ProtobufSerDe[RDDOperationGraph node.outputDeterministicLevel) val builder = StoreTypes.RDDOperationNode.newBuilder() builder.setId(node.id) - builder.setName(node.name) + setStringField(node.name, builder.setName) + setStringField(node.callsite, builder.setCallsite) builder.setCached(node.cached) builder.setBarrier(node.barrier) - builder.setCallsite(node.callsite) builder.setOutputDeterministicLevel(outputDeterministicLevel) builder.build() } @@ -94,10 +95,10 @@ class RDDOperationGraphWrapperSerializer extends ProtobufSerDe[RDDOperationGraph private def deserializeRDDOperationNode(node: StoreTypes.RDDOperationNode): RDDOperationNode = { RDDOperationNode( id = node.getId, - name = node.getName, + name = getStringField(node.hasName, node.getName), cached = node.getCached, barrier = node.getBarrier, - callsite = node.getCallsite, + callsite = getStringField(node.hasCallsite, node.getCallsite), outputDeterministicLevel = DeterministicLevelSerializer.deserialize( node.getOutputDeterministicLevel) ) diff --git a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala index 14dd2cd601d..d4c79adf2ec 100644 --- a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala @@ -259,9 +259,11 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite { javaVersion = javaVersion, javaHome = javaHome, scalaVersion = scalaVersion), - sparkProperties = Seq(("spark.conf.1", "1"), ("spark.conf.2", "2")), - hadoopProperties = Seq(("hadoop.conf.conf1", "1"), ("hadoop.conf2", "val2")), - systemProperties = Seq(("sys.prop.1", "value1"), ("sys.prop.2", "value2")), + sparkProperties = Seq(("spark.conf.1", "1"), ("spark.conf.2", "2"), (null, null)), + hadoopProperties = + Seq(("hadoop.conf.conf1", "1"), ("hadoop.conf2", "val2"), (null, "val3")), + systemProperties = + Seq(("sys.prop.1", "value1"), ("sys.prop.2", "value2"), ("sys.prop.3", null)), metricsProperties = Seq(("metric.1", "klass1"), ("metric2", "klass2")), classpathEntries = Seq(("/jar1", "System"), ("/jar2", "User")), resourceProfiles = Seq(new ResourceProfileInfo( @@ -875,20 +877,41 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite { cached = true, barrier = false, callsite = "callsite_1", - outputDeterministicLevel = DeterministicLevel.INDETERMINATE)), - childClusters = Seq(new RDDOperationClusterWrapper( - id = "id_1", - name = "name1", - childNodes = Seq( - RDDOperationNode( - id = 15, - name = "name3", - cached = false, - barrier = true, - callsite = "callsite_2", - outputDeterministicLevel = DeterministicLevel.UNORDERED)), - childClusters = Seq.empty - )) + outputDeterministicLevel = DeterministicLevel.INDETERMINATE), + RDDOperationNode( + id = 20, + name = null, + cached = true, + barrier = false, + callsite = null, + outputDeterministicLevel = DeterministicLevel.DETERMINATE)), + childClusters = Seq( + new RDDOperationClusterWrapper( + id = "id_1", + name = "name1", + childNodes = Seq( + RDDOperationNode( + id = 15, + name = "name3", + cached = false, + barrier = true, + callsite = "callsite_2", + outputDeterministicLevel = DeterministicLevel.UNORDERED)), + childClusters = Seq.empty + ), + new RDDOperationClusterWrapper( + id = null, + name = null, + childNodes = Seq( + RDDOperationNode( + id = 21, + name = null, + cached = false, + barrier = true, + callsite = null, + outputDeterministicLevel = DeterministicLevel.UNORDERED)), + childClusters = Seq.empty + )) ) ) val bytes = serializer.serialize(input) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org