This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 6f96a27c965 [SPARK-41890][CORE][SQL][UI] Reduce `toSeq` in `RDDOperationGraphWrapperSerializer`/`SparkPlanGraphWrapperSerializer` for Scala 2.13 6f96a27c965 is described below commit 6f96a27c9653c6028e39051a0288cf661ea4b971 Author: yangjie01 <yangji...@baidu.com> AuthorDate: Fri Jan 6 08:18:33 2023 -0600 [SPARK-41890][CORE][SQL][UI] Reduce `toSeq` in `RDDOperationGraphWrapperSerializer`/`SparkPlanGraphWrapperSerializer` for Scala 2.13 ### What changes were proposed in this pull request? Similar to SPARK-41709, this PR avoids `toSeq` in `RDDOperationGraphWrapperSerializer` and `SparkPlanGraphWrapperSerializer` so that there is no performance difference between Scala 2.13 and Scala 2.12 when creating UI objects from protobuf objects. The `Seq` fields in the related `ui` classes are also explicitly declared as `collection.Seq` in this PR. ### Why are the changes needed? Avoid collection conversion when creating UI objects from protobuf objects for Scala 2.13. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Pass GitHub Actions Closes #39399 from LuciferYang/SPARK-41709-FOLLOWUP. 
Authored-by: yangjie01 <yangji...@baidu.com> Signed-off-by: Sean Owen <sro...@gmail.com> --- .../protobuf/RDDOperationGraphWrapperSerializer.scala | 10 +++++----- .../src/main/scala/org/apache/spark/status/storeTypes.scala | 10 +++++----- .../scala/org/apache/spark/ui/scope/RDDOperationGraph.scala | 6 +++--- project/MimaExcludes.scala | 13 ++++++++++++- .../spark/sql/execution/ui/SQLAppStatusListener.scala | 3 ++- .../apache/spark/sql/execution/ui/SQLAppStatusStore.scala | 8 ++++---- .../org/apache/spark/sql/execution/ui/SparkPlanGraph.scala | 9 +++++---- .../org/apache/spark/status/api/v1/sql/SqlResource.scala | 7 ++++--- .../main/scala/org/apache/spark/status/api/v1/sql/api.scala | 6 +++--- .../protobuf/sql/SparkPlanGraphWrapperSerializer.scala | 10 +++++----- .../apache/spark/status/api/v1/sql/SqlResourceSuite.scala | 2 +- 11 files changed, 49 insertions(+), 35 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala index 44622514ac9..c0d86ede198 100644 --- a/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala +++ b/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala @@ -49,9 +49,9 @@ class RDDOperationGraphWrapperSerializer extends ProtobufSerDe { val wrapper = StoreTypes.RDDOperationGraphWrapper.parseFrom(bytes) new RDDOperationGraphWrapper( stageId = wrapper.getStageId.toInt, - edges = wrapper.getEdgesList.asScala.map(deserializeRDDOperationEdge).toSeq, - outgoingEdges = wrapper.getOutgoingEdgesList.asScala.map(deserializeRDDOperationEdge).toSeq, - incomingEdges = wrapper.getIncomingEdgesList.asScala.map(deserializeRDDOperationEdge).toSeq, + edges = wrapper.getEdgesList.asScala.map(deserializeRDDOperationEdge), + outgoingEdges = wrapper.getOutgoingEdgesList.asScala.map(deserializeRDDOperationEdge), + incomingEdges = 
wrapper.getIncomingEdgesList.asScala.map(deserializeRDDOperationEdge), rootCluster = deserializeRDDOperationClusterWrapper(wrapper.getRootCluster) ) } @@ -75,9 +75,9 @@ class RDDOperationGraphWrapperSerializer extends ProtobufSerDe { new RDDOperationClusterWrapper( id = op.getId, name = op.getName, - childNodes = op.getChildNodesList.asScala.map(deserializeRDDOperationNode).toSeq, + childNodes = op.getChildNodesList.asScala.map(deserializeRDDOperationNode), childClusters = - op.getChildClustersList.asScala.map(deserializeRDDOperationClusterWrapper).toSeq + op.getChildClustersList.asScala.map(deserializeRDDOperationClusterWrapper) ) } diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala index 08bc4c89b47..b53455207a0 100644 --- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala +++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala @@ -490,8 +490,8 @@ private[spark] class StreamBlockData( private[spark] class RDDOperationClusterWrapper( val id: String, val name: String, - val childNodes: Seq[RDDOperationNode], - val childClusters: Seq[RDDOperationClusterWrapper]) { + val childNodes: collection.Seq[RDDOperationNode], + val childClusters: collection.Seq[RDDOperationClusterWrapper]) { def toRDDOperationCluster(): RDDOperationCluster = { val isBarrier = childNodes.exists(_.barrier) @@ -508,9 +508,9 @@ private[spark] class RDDOperationClusterWrapper( private[spark] class RDDOperationGraphWrapper( @KVIndexParam val stageId: Int, - val edges: Seq[RDDOperationEdge], - val outgoingEdges: Seq[RDDOperationEdge], - val incomingEdges: Seq[RDDOperationEdge], + val edges: collection.Seq[RDDOperationEdge], + val outgoingEdges: collection.Seq[RDDOperationEdge], + val incomingEdges: collection.Seq[RDDOperationEdge], val rootCluster: RDDOperationClusterWrapper) { def toRDDOperationGraph(): RDDOperationGraph = { diff --git 
a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala index e1f7609dfc7..533749f4b96 100644 --- a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala +++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala @@ -38,9 +38,9 @@ import org.apache.spark.storage.StorageLevel * the graph from nodes that belong to adjacent graphs. */ private[spark] case class RDDOperationGraph( - edges: Seq[RDDOperationEdge], - outgoingEdges: Seq[RDDOperationEdge], - incomingEdges: Seq[RDDOperationEdge], + edges: collection.Seq[RDDOperationEdge], + outgoingEdges: collection.Seq[RDDOperationEdge], + incomingEdges: collection.Seq[RDDOperationEdge], rootCluster: RDDOperationCluster) /** A node in an RDDOperationGraph. This represents an RDD. */ diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 43d0986e9ea..3bb8deb2561 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -158,7 +158,18 @@ object MimaExcludes { // [SPARK-41423][CORE] Protobuf serializer for StageDataWrapper for Scala 2.13 ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.status.api.v1.StageData.rddIds"), ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.status.api.v1.StageData.accumulatorUpdates"), - ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.status.api.v1.StageData.this") + ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.status.api.v1.StageData.this"), + + // [SPARK-41890][CORE][SQL][UI] Reduce `toSeq` in `RDDOperationGraphWrapperSerializer`/`SparkPlanGraphWrapperSerializer` for Scala 2.13 + ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.status.api.v1.sql.ExecutionData.nodes"), + ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.status.api.v1.sql.ExecutionData.edges"), + 
ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.status.api.v1.sql.ExecutionData.this"), + ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.status.api.v1.sql.Node.apply"), + ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.status.api.v1.sql.Node.metrics"), + ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.status.api.v1.sql.Node.copy"), + ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.status.api.v1.sql.Node.copy$default$4"), + ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.status.api.v1.sql.Node.this"), + ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.status.api.v1.sql.Node.apply") ) // Defulat exclude rules diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala index 99718b63353..64c652542ff 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala @@ -322,7 +322,8 @@ class SQLAppStatusListener( } } - private def toStoredNodes(nodes: Seq[SparkPlanGraphNode]): Seq[SparkPlanGraphNodeWrapper] = { + private def toStoredNodes( + nodes: collection.Seq[SparkPlanGraphNode]): collection.Seq[SparkPlanGraphNodeWrapper] = { nodes.map { case cluster: SparkPlanGraphCluster => val storedCluster = new SparkPlanGraphClusterWrapper( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala index 25cb8c2f043..46827aaa1d9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala @@ -109,8 +109,8 @@ class SQLExecutionUIData( class 
SparkPlanGraphWrapper( @KVIndexParam val executionId: Long, - val nodes: Seq[SparkPlanGraphNodeWrapper], - val edges: Seq[SparkPlanGraphEdge]) { + val nodes: collection.Seq[SparkPlanGraphNodeWrapper], + val edges: collection.Seq[SparkPlanGraphEdge]) { def toSparkPlanGraph(): SparkPlanGraph = { SparkPlanGraph(nodes.map(_.toSparkPlanGraphNode()), edges) @@ -122,8 +122,8 @@ class SparkPlanGraphClusterWrapper( val id: Long, val name: String, val desc: String, - val nodes: Seq[SparkPlanGraphNodeWrapper], - val metrics: Seq[SQLPlanMetric]) { + val nodes: collection.Seq[SparkPlanGraphNodeWrapper], + val metrics: collection.Seq[SQLPlanMetric]) { def toSparkPlanGraphCluster(): SparkPlanGraphCluster = { new SparkPlanGraphCluster(id, name, desc, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala index 3b011301421..6163e26e49c 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala @@ -33,7 +33,8 @@ import org.apache.spark.sql.execution.{SparkPlanInfo, WholeStageCodegenExec} * SparkPlan tree, and each edge represents a parent-child relationship between two nodes. */ case class SparkPlanGraph( - nodes: Seq[SparkPlanGraphNode], edges: Seq[SparkPlanGraphEdge]) { + nodes: collection.Seq[SparkPlanGraphNode], + edges: collection.Seq[SparkPlanGraphEdge]) { def makeDotFile(metrics: Map[Long, String]): String = { val dotFile = new StringBuilder @@ -47,7 +48,7 @@ case class SparkPlanGraph( /** * All the SparkPlanGraphNodes, including those inside of WholeStageCodegen. 
*/ - val allNodes: Seq[SparkPlanGraphNode] = { + val allNodes: collection.Seq[SparkPlanGraphNode] = { nodes.flatMap { case cluster: SparkPlanGraphCluster => cluster.nodes :+ cluster case node => Seq(node) @@ -157,7 +158,7 @@ class SparkPlanGraphNode( val id: Long, val name: String, val desc: String, - val metrics: Seq[SQLPlanMetric]) { + val metrics: collection.Seq[SQLPlanMetric]) { def makeDotNode(metricsValue: Map[Long, String]): String = { val builder = new mutable.StringBuilder("<b>" + name + "</b>") @@ -198,7 +199,7 @@ class SparkPlanGraphCluster( name: String, desc: String, val nodes: mutable.ArrayBuffer[SparkPlanGraphNode], - metrics: Seq[SQLPlanMetric]) + metrics: collection.Seq[SQLPlanMetric]) extends SparkPlanGraphNode(id, name, desc, metrics) { override def makeDotNode(metricsValue: Map[Long, String]): String = { diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala index 4dd96e5ae25..a6fb07284b6 100644 --- a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala +++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala @@ -115,8 +115,8 @@ private[v1] class SqlResource extends BaseAppResource { edges) } - private def printableMetrics(allNodes: Seq[SparkPlanGraphNode], - metricValues: Map[Long, String]): Seq[Node] = { + private def printableMetrics(allNodes: collection.Seq[SparkPlanGraphNode], + metricValues: Map[Long, String]): collection.Seq[Node] = { def getMetric(metricValues: Map[Long, String], accumulatorId: Long, metricName: String): Option[Metric] = { @@ -138,7 +138,8 @@ private[v1] class SqlResource extends BaseAppResource { nodes.sortBy(_.nodeId).reverse } - private def getNodeIdAndWSCGIdMap(allNodes: Seq[SparkPlanGraphNode]): Map[Long, Option[Long]] = { + private def getNodeIdAndWSCGIdMap( + allNodes: collection.Seq[SparkPlanGraphNode]): Map[Long, Option[Long]] = { val wscgNodes = 
allNodes.filter(_.name.trim.startsWith(WHOLE_STAGE_CODEGEN)) val nodeIdAndWSCGIdMap: Map[Long, Option[Long]] = wscgNodes.flatMap { _ match { diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala index 0ddf66718bc..c0f5c9c27ec 100644 --- a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala +++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala @@ -30,13 +30,13 @@ class ExecutionData private[spark] ( val runningJobIds: Seq[Int], val successJobIds: Seq[Int], val failedJobIds: Seq[Int], - val nodes: Seq[Node], - val edges: Seq[SparkPlanGraphEdge]) + val nodes: collection.Seq[Node], + val edges: collection.Seq[SparkPlanGraphEdge]) case class Node private[spark]( nodeId: Long, nodeName: String, wholeStageCodegenId: Option[Long] = None, - metrics: Seq[Metric]) + metrics: collection.Seq[Metric]) case class Metric private[spark] (name: String, value: String) diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala index a8f715564fc..db63fd6afe2 100644 --- a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala +++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala @@ -44,8 +44,8 @@ class SparkPlanGraphWrapperSerializer extends ProtobufSerDe { val wrapper = StoreTypes.SparkPlanGraphWrapper.parseFrom(bytes) new SparkPlanGraphWrapper( executionId = wrapper.getExecutionId, - nodes = wrapper.getNodesList.asScala.map(deserializeSparkPlanGraphNodeWrapper).toSeq, - edges = wrapper.getEdgesList.asScala.map(deserializeSparkPlanGraphEdge).toSeq + nodes = wrapper.getNodesList.asScala.map(deserializeSparkPlanGraphNodeWrapper), + edges = wrapper.getEdgesList.asScala.map(deserializeSparkPlanGraphEdge) ) } @@ 
-102,7 +102,7 @@ class SparkPlanGraphWrapperSerializer extends ProtobufSerDe { id = node.getId, name = node.getName, desc = node.getDesc, - metrics = node.getMetricsList.asScala.map(SQLPlanMetricSerializer.deserialize).toSeq + metrics = node.getMetricsList.asScala.map(SQLPlanMetricSerializer.deserialize) ) } @@ -128,8 +128,8 @@ class SparkPlanGraphWrapperSerializer extends ProtobufSerDe { id = cluster.getId, name = cluster.getName, desc = cluster.getDesc, - nodes = cluster.getNodesList.asScala.map(deserializeSparkPlanGraphNodeWrapper).toSeq, - metrics = cluster.getMetricsList.asScala.map(SQLPlanMetricSerializer.deserialize).toSeq + nodes = cluster.getNodesList.asScala.map(deserializeSparkPlanGraphNodeWrapper), + metrics = cluster.getMetricsList.asScala.map(SQLPlanMetricSerializer.deserialize) ) } } diff --git a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala index 336779988fc..f6fd1fc42ce 100644 --- a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala @@ -56,7 +56,7 @@ object SqlResourceSuite { val edges: Seq[SparkPlanGraphEdge] = Seq(SparkPlanGraphEdge(3, 2)) - val nodesWhenCodegenIsOff: Seq[SparkPlanGraphNode] = + val nodesWhenCodegenIsOff: collection.Seq[SparkPlanGraphNode] = SparkPlanGraph(nodes, edges).allNodes.filterNot(_.name == WHOLE_STAGE_CODEGEN_1) val metrics: Seq[SQLPlanMetric] = { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org