This is an automated email from the ASF dual-hosted git repository. gengliang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new bb18703fdbf [SPARK-41429][UI] Protobuf serializer for RDDOperationGraphWrapper
bb18703fdbf is described below

commit bb18703fdbfbe4f7887abebd75beb37af662d0f3
Author: Sandeep Singh <sand...@techaddict.me>
AuthorDate: Thu Dec 29 16:14:07 2022 -0800

    [SPARK-41429][UI] Protobuf serializer for RDDOperationGraphWrapper

    ### What changes were proposed in this pull request?
    Add Protobuf serializer for RDDOperationGraphWrapper

    ### Why are the changes needed?
    Support fast and compact serialization/deserialization for RDDOperationGraphWrapper over RocksDB.

    ### Does this PR introduce any user-facing change?
    No

    ### How was this patch tested?
    New UT

    Closes #39110 from techaddict/SPARK-41429-RDDOperationGraphWrapper.

    Authored-by: Sandeep Singh <sand...@techaddict.me>
    Signed-off-by: Gengliang Wang <gengli...@apache.org>
---
 .../apache/spark/status/protobuf/store_types.proto |  35 ++++++
 .../org.apache.spark.status.protobuf.ProtobufSerDe |   1 +
 .../RDDOperationGraphWrapperSerializer.scala       | 120 +++++++++++++++++++++
 .../protobuf/KVStoreProtobufSerializerSuite.scala  |  84 +++++++++++++++
 4 files changed, 240 insertions(+)

diff --git a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
index 22e22eea1a2..e9150490746 100644
--- a/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
+++ b/core/src/main/protobuf/org/apache/spark/status/protobuf/store_types.proto
@@ -421,3 +421,38 @@ message SparkPlanGraphWrapper {
   repeated SparkPlanGraphNodeWrapper nodes = 2;
   repeated SparkPlanGraphEdge edges = 3;
 }
+
+message RDDOperationEdge {  // serialized form of ui.scope.RDDOperationEdge
+  int32 from_id = 1;
+  int32 to_id = 2;
+}
+
+message RDDOperationNode {  // serialized form of ui.scope.RDDOperationNode
+  enum DeterministicLevel {  // value names must equal Spark's DeterministicLevel names
+    UNSPECIFIED = 0;  // NOTE(review): likely rejected by DeterministicLevel.withName on read
+    DETERMINATE = 1;
+    UNORDERED = 2;
+    INDETERMINATE = 3;
+  }
+  int32 id = 1;
+  string name = 2;
+  bool cached = 3;
+  bool barrier = 4;
+  string callsite = 5;
+  DeterministicLevel output_deterministic_level = 6;  // mapped to/from Spark's enum by name
+}
+
+message RDDOperationClusterWrapper {
+  string id = 1;
+  string name = 2;
+  repeated RDDOperationNode child_nodes = 3;
+  repeated RDDOperationClusterWrapper child_clusters = 4;  // recursive nesting
+}
+
+message RDDOperationGraphWrapper {
+  int64 stage_id = 1;  // written via toLong, read back via toInt
+  repeated RDDOperationEdge edges = 2;
+  repeated RDDOperationEdge outgoing_edges = 3;
+  repeated RDDOperationEdge incoming_edges = 4;
+  RDDOperationClusterWrapper root_cluster = 5;
+}
diff --git a/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe b/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
index 39127e6a28c..4e39d9ecdc0 100644
--- a/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
+++ b/core/src/main/resources/META-INF/services/org.apache.spark.status.protobuf.ProtobufSerDe
@@ -27,3 +27,4 @@ org.apache.spark.status.protobuf.ResourceProfileWrapperSerializer
 org.apache.spark.status.protobuf.SpeculationStageSummaryWrapperSerializer
 org.apache.spark.status.protobuf.ExecutorSummaryWrapperSerializer
 org.apache.spark.status.protobuf.ProcessSummaryWrapperSerializer
+org.apache.spark.status.protobuf.RDDOperationGraphWrapperSerializer
diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala
new file mode 100644
index 00000000000..8975062082c
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.status.protobuf
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.rdd.DeterministicLevel
+import org.apache.spark.status.{RDDOperationClusterWrapper, RDDOperationGraphWrapper}
+import org.apache.spark.ui.scope.{RDDOperationEdge, RDDOperationNode}
+
+class RDDOperationGraphWrapperSerializer extends ProtobufSerDe {  // listed in META-INF/services
+
+  override val supportClass: Class[_] = classOf[RDDOperationGraphWrapper]  // handled type
+
+  override def serialize(input: Any): Array[Byte] = {  // wrapper -> StoreTypes proto bytes
+    val op = input.asInstanceOf[RDDOperationGraphWrapper]
+    val builder = StoreTypes.RDDOperationGraphWrapper.newBuilder()
+    builder.setStageId(op.stageId.toLong)  // proto field is int64
+    op.edges.foreach { e =>
+      builder.addEdges(serializeRDDOperationEdge(e))
+    }
+    op.outgoingEdges.foreach { e =>
+      builder.addOutgoingEdges(serializeRDDOperationEdge(e))
+    }
+    op.incomingEdges.foreach { e =>
+      builder.addIncomingEdges(serializeRDDOperationEdge(e))
+    }
+    builder.setRootCluster(serializeRDDOperationClusterWrapper(op.rootCluster))
+    builder.build().toByteArray
+  }
+
+  def deserialize(bytes: Array[Byte]): RDDOperationGraphWrapper = {  // inverse of serialize
+    val wrapper = StoreTypes.RDDOperationGraphWrapper.parseFrom(bytes)
+    new RDDOperationGraphWrapper(
+      stageId = wrapper.getStageId.toInt,  // narrows the int64 back
+      edges = wrapper.getEdgesList.asScala.map(deserializeRDDOperationEdge).toSeq,
+      outgoingEdges = wrapper.getOutgoingEdgesList.asScala.map(deserializeRDDOperationEdge).toSeq,
+      incomingEdges = wrapper.getIncomingEdgesList.asScala.map(deserializeRDDOperationEdge).toSeq,
+      rootCluster = deserializeRDDOperationClusterWrapper(wrapper.getRootCluster)
+    )
+  }
+
+  private def serializeRDDOperationClusterWrapper(op: RDDOperationClusterWrapper):  // recursive
+    StoreTypes.RDDOperationClusterWrapper = {
+    val builder = StoreTypes.RDDOperationClusterWrapper.newBuilder()
+    builder.setId(op.id)
+    builder.setName(op.name)  // NOTE(review): a null name would likely NPE here
+    op.childNodes.foreach { node =>
+      builder.addChildNodes(serializeRDDOperationNode(node))
+    }
+    op.childClusters.foreach { cluster =>
+      builder.addChildClusters(serializeRDDOperationClusterWrapper(cluster))
+    }
+    builder.build()
+  }
+
+  private def deserializeRDDOperationClusterWrapper(op: StoreTypes.RDDOperationClusterWrapper):
+    RDDOperationClusterWrapper = {
+    new RDDOperationClusterWrapper(
+      id = op.getId,
+      name = op.getName,
+      childNodes = op.getChildNodesList.asScala.map(deserializeRDDOperationNode).toSeq,
+      childClusters =
+        op.getChildClustersList.asScala.map(deserializeRDDOperationClusterWrapper).toSeq
+    )
+  }
+
+  private def serializeRDDOperationNode(node: RDDOperationNode): StoreTypes.RDDOperationNode = {
+    val outputDeterministicLevel = StoreTypes.RDDOperationNode.DeterministicLevel
+      .valueOf(node.outputDeterministicLevel.toString)  // relies on identical enum names
+    val builder = StoreTypes.RDDOperationNode.newBuilder()
+    builder.setId(node.id)
+    builder.setName(node.name)
+    builder.setCached(node.cached)
+    builder.setBarrier(node.barrier)
+    builder.setCallsite(node.callsite)
+    builder.setOutputDeterministicLevel(outputDeterministicLevel)
+    builder.build()
+  }
+
+  private def deserializeRDDOperationNode(node: StoreTypes.RDDOperationNode): RDDOperationNode = {
+    RDDOperationNode(
+      id = node.getId,
+      name = node.getName,
+      cached = node.getCached,
+      barrier = node.getBarrier,
+      callsite = node.getCallsite,
+      outputDeterministicLevel =  // NOTE(review): may throw if the stored level is UNSPECIFIED
+        DeterministicLevel.withName(node.getOutputDeterministicLevel.toString)  // name-based
+    )
+  }
+
+  private def serializeRDDOperationEdge(edge: RDDOperationEdge): StoreTypes.RDDOperationEdge = {
+    val builder = StoreTypes.RDDOperationEdge.newBuilder()
+    builder.setFromId(edge.fromId)
+    builder.setToId(edge.toId)
+    builder.build()
+  }
+
+  private def deserializeRDDOperationEdge(edge: StoreTypes.RDDOperationEdge): RDDOperationEdge = {
+    RDDOperationEdge(
+      fromId = edge.getFromId,
+      toId = edge.getToId)
+  }
+}
diff --git a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
index 5efe56b4449..dab9d9c071f 100644
--- a/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/status/protobuf/KVStoreProtobufSerializerSuite.scala
@@ -22,9 +22,11 @@ import java.util.Date
 import org.apache.spark.{JobExecutionStatus, SparkFunSuite}
 import org.apache.spark.executor.ExecutorMetrics
 import org.apache.spark.metrics.ExecutorMetricType
+import org.apache.spark.rdd.DeterministicLevel
 import org.apache.spark.resource.{ExecutorResourceRequest, ResourceInformation, TaskResourceRequest}
 import org.apache.spark.status._
 import org.apache.spark.status.api.v1._
+import org.apache.spark.ui.scope.{RDDOperationEdge, RDDOperationNode}
 
 class KVStoreProtobufSerializerSuite extends SparkFunSuite {
   private val serializer = new KVStoreProtobufSerializer()
@@ -773,4 +775,86 @@ class KVStoreProtobufSerializerSuite extends SparkFunSuite {
       assert(result.info.processLogs(k) == input.info.processLogs(k))
     }
   }
+
+  test("RDD Operation Graph") {  // round-trips a graph with one nested cluster
+    val input = new RDDOperationGraphWrapper(
+      stageId = 1,
+      edges = Seq(
+        RDDOperationEdge(fromId = 2, toId = 3)
+      ),
+      outgoingEdges = Seq(
+        RDDOperationEdge(fromId = 4, toId = 5),
+        RDDOperationEdge(fromId = 6, toId = 7)
+      ),
+      incomingEdges = Seq(
+        RDDOperationEdge(fromId = 8, toId = 9),
+        RDDOperationEdge(fromId = 10, toId = 11),
+        RDDOperationEdge(fromId = 12, toId = 13)
+      ),
+      rootCluster = new RDDOperationClusterWrapper(
+        id = "id_1",
+        name = "name1",
+        childNodes = Seq(
+          RDDOperationNode(
+            id = 14,
+            name = "name2",
+            cached = true,
+            barrier = false,
+            callsite = "callsite_1",
+            outputDeterministicLevel = DeterministicLevel.INDETERMINATE)),
+        childClusters = Seq(new RDDOperationClusterWrapper(
+          id = "id_1",  // NOTE(review): same id/name as root; a root/child swap would pass
+          name = "name1",
+          childNodes = Seq(
+            RDDOperationNode(
+              id = 15,
+              name = "name3",
+              cached = false,
+              barrier = true,
+              callsite = "callsite_2",
+              outputDeterministicLevel = DeterministicLevel.UNORDERED)),
+          childClusters = Seq.empty
+        ))
+      )
+    )
+    val bytes = serializer.serialize(input)
+    val result = serializer.deserialize(bytes, classOf[RDDOperationGraphWrapper])
+
+    assert(result.stageId == input.stageId)
+    assert(result.edges.size == input.edges.size)
+    result.edges.zip(input.edges).foreach { case (e1, e2) =>
+      assert(e1.fromId == e2.fromId)
+      assert(e1.toId == e2.toId)
+    }
+    assert(result.outgoingEdges.size == input.outgoingEdges.size)
+    result.outgoingEdges.zip(input.outgoingEdges).foreach { case (e1, e2) =>
+      assert(e1.fromId == e2.fromId)
+      assert(e1.toId == e2.toId)
+    }
+    assert(result.incomingEdges.size == input.incomingEdges.size)
+    result.incomingEdges.zip(input.incomingEdges).foreach { case (e1, e2) =>
+      assert(e1.fromId == e2.fromId)
+      assert(e1.toId == e2.toId)
+    }
+
+    def compareClusters(c1: RDDOperationClusterWrapper, c2: RDDOperationClusterWrapper): Unit = {
+      assert(c1.id == c2.id)
+      assert(c1.name == c2.name)
+      assert(c1.childNodes.size == c2.childNodes.size)
+      c1.childNodes.zip(c2.childNodes).foreach { case (n1, n2) =>
+        assert(n1.id == n2.id)
+        assert(n1.name == n2.name)
+        assert(n1.cached == n2.cached)
+        assert(n1.barrier == n2.barrier)
+        assert(n1.callsite == n2.callsite)
+        assert(n1.outputDeterministicLevel == n2.outputDeterministicLevel)
+      }
+      assert(c1.childClusters.size == c2.childClusters.size)
+      c1.childClusters.zip(c2.childClusters).foreach {  // recurse into nested clusters
+        case (_c1, _c2) => compareClusters(_c1, _c2)
+      }
+    }
+
+    compareClusters(result.rootCluster, input.rootCluster)
+  }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org