This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 6f96a27c965 [SPARK-41890][CORE][SQL][UI] Reduce `toSeq` in `RDDOperationGraphWrapperSerializer`/`SparkPlanGraphWrapperSerializer` for Scala 2.13
6f96a27c965 is described below

commit 6f96a27c9653c6028e39051a0288cf661ea4b971
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Fri Jan 6 08:18:33 2023 -0600

    [SPARK-41890][CORE][SQL][UI] Reduce `toSeq` in `RDDOperationGraphWrapperSerializer`/`SparkPlanGraphWrapperSerializer` for Scala 2.13
    
    ### What changes were proposed in this pull request?
    Similar to SPARK-41709, this PR avoids `toSeq` in `RDDOperationGraphWrapperSerializer` and `SparkPlanGraphWrapperSerializer` so that creating UI objects from protobuf objects performs the same on Scala 2.13 as on Scala 2.12. The `Seq` fields of the related UI classes are also explicitly declared as `collection.Seq` in this PR.
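    
    As a rough illustration, the pattern applied throughout this PR looks like the sketch below (the class names are made up for the sketch; they are not the actual Spark classes): the `mutable.Buffer` produced by `asScala.map(...)` is passed straight into a constructor parameter typed as `collection.Seq`, instead of being copied with `.toSeq` first.
    
    ```scala
    // Minimal sketch of the pattern, with hypothetical classes.
    import scala.collection.JavaConverters._
    
    object DeserializeSketch {
      case class NodeUi(name: String)
    
      // Constructor parameter widened from Seq to collection.Seq so the mutable.Buffer
      // produced by asScala.map(...) can be passed through without a copy.
      class GraphUi(val nodes: collection.Seq[NodeUi])
    
      def deserialize(protoNames: java.util.List[String]): GraphUi = {
        // asScala wraps the Java list in a mutable.Buffer; on Scala 2.13 a trailing
        // .toSeq here would copy it into an immutable Seq, which this PR avoids.
        new GraphUi(protoNames.asScala.map(n => NodeUi(n)))
      }
    }
    ```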
    
    ### Why are the changes needed?
    Avoid collection conversions when creating UI objects from protobuf objects on Scala 2.13.
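    
    The conversion only costs something on Scala 2.13, where `scala.Seq` is `scala.collection.immutable.Seq`; on Scala 2.12 it is `scala.collection.Seq`, so `.toSeq` on a mutable buffer did not copy. A purely illustrative sketch of the difference:
    
    ```scala
    import scala.collection.mutable
    
    object SeqCopySketch {
      val buf: mutable.Buffer[Int] = mutable.Buffer(1, 2, 3)
    
      // Scala 2.12: scala.Seq is collection.Seq, so buf.toSeq can return buf without copying.
      // Scala 2.13: scala.Seq is collection.immutable.Seq, so buf.toSeq must copy the elements.
      val copied: Seq[Int] = buf.toSeq
    
      // Typing the target as collection.Seq, as this PR does, avoids the copy on both versions.
      val shared: collection.Seq[Int] = buf
    }
    ```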
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Pass GitHub Actions
    
    Closes #39399 from LuciferYang/SPARK-41709-FOLLOWUP.
    
    Authored-by: yangjie01 <yangji...@baidu.com>
    Signed-off-by: Sean Owen <sro...@gmail.com>
---
 .../protobuf/RDDOperationGraphWrapperSerializer.scala       | 10 +++++-----
 .../src/main/scala/org/apache/spark/status/storeTypes.scala | 10 +++++-----
 .../scala/org/apache/spark/ui/scope/RDDOperationGraph.scala |  6 +++---
 project/MimaExcludes.scala                                  | 13 ++++++++++++-
 .../spark/sql/execution/ui/SQLAppStatusListener.scala       |  3 ++-
 .../apache/spark/sql/execution/ui/SQLAppStatusStore.scala   |  8 ++++----
 .../org/apache/spark/sql/execution/ui/SparkPlanGraph.scala  |  9 +++++----
 .../org/apache/spark/status/api/v1/sql/SqlResource.scala    |  7 ++++---
 .../main/scala/org/apache/spark/status/api/v1/sql/api.scala |  6 +++---
 .../protobuf/sql/SparkPlanGraphWrapperSerializer.scala      | 10 +++++-----
 .../apache/spark/status/api/v1/sql/SqlResourceSuite.scala   |  2 +-
 11 files changed, 49 insertions(+), 35 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala
index 44622514ac9..c0d86ede198 100644
--- a/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/status/protobuf/RDDOperationGraphWrapperSerializer.scala
@@ -49,9 +49,9 @@ class RDDOperationGraphWrapperSerializer extends ProtobufSerDe {
     val wrapper = StoreTypes.RDDOperationGraphWrapper.parseFrom(bytes)
     new RDDOperationGraphWrapper(
       stageId = wrapper.getStageId.toInt,
-      edges = wrapper.getEdgesList.asScala.map(deserializeRDDOperationEdge).toSeq,
-      outgoingEdges = wrapper.getOutgoingEdgesList.asScala.map(deserializeRDDOperationEdge).toSeq,
-      incomingEdges = wrapper.getIncomingEdgesList.asScala.map(deserializeRDDOperationEdge).toSeq,
+      edges = wrapper.getEdgesList.asScala.map(deserializeRDDOperationEdge),
+      outgoingEdges = wrapper.getOutgoingEdgesList.asScala.map(deserializeRDDOperationEdge),
+      incomingEdges = wrapper.getIncomingEdgesList.asScala.map(deserializeRDDOperationEdge),
       rootCluster = deserializeRDDOperationClusterWrapper(wrapper.getRootCluster)
     )
   }
@@ -75,9 +75,9 @@ class RDDOperationGraphWrapperSerializer extends ProtobufSerDe {
     new RDDOperationClusterWrapper(
       id = op.getId,
       name = op.getName,
-      childNodes = op.getChildNodesList.asScala.map(deserializeRDDOperationNode).toSeq,
+      childNodes = op.getChildNodesList.asScala.map(deserializeRDDOperationNode),
       childClusters =
-        op.getChildClustersList.asScala.map(deserializeRDDOperationClusterWrapper).toSeq
+        op.getChildClustersList.asScala.map(deserializeRDDOperationClusterWrapper)
     )
   }
 
diff --git a/core/src/main/scala/org/apache/spark/status/storeTypes.scala b/core/src/main/scala/org/apache/spark/status/storeTypes.scala
index 08bc4c89b47..b53455207a0 100644
--- a/core/src/main/scala/org/apache/spark/status/storeTypes.scala
+++ b/core/src/main/scala/org/apache/spark/status/storeTypes.scala
@@ -490,8 +490,8 @@ private[spark] class StreamBlockData(
 private[spark] class RDDOperationClusterWrapper(
     val id: String,
     val name: String,
-    val childNodes: Seq[RDDOperationNode],
-    val childClusters: Seq[RDDOperationClusterWrapper]) {
+    val childNodes: collection.Seq[RDDOperationNode],
+    val childClusters: collection.Seq[RDDOperationClusterWrapper]) {
 
   def toRDDOperationCluster(): RDDOperationCluster = {
     val isBarrier = childNodes.exists(_.barrier)
@@ -508,9 +508,9 @@ private[spark] class RDDOperationClusterWrapper(
 
 private[spark] class RDDOperationGraphWrapper(
     @KVIndexParam val stageId: Int,
-    val edges: Seq[RDDOperationEdge],
-    val outgoingEdges: Seq[RDDOperationEdge],
-    val incomingEdges: Seq[RDDOperationEdge],
+    val edges: collection.Seq[RDDOperationEdge],
+    val outgoingEdges: collection.Seq[RDDOperationEdge],
+    val incomingEdges: collection.Seq[RDDOperationEdge],
     val rootCluster: RDDOperationClusterWrapper) {
 
   def toRDDOperationGraph(): RDDOperationGraph = {
diff --git a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
index e1f7609dfc7..533749f4b96 100644
--- a/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
+++ b/core/src/main/scala/org/apache/spark/ui/scope/RDDOperationGraph.scala
@@ -38,9 +38,9 @@ import org.apache.spark.storage.StorageLevel
  * the graph from nodes that belong to adjacent graphs.
  */
 private[spark] case class RDDOperationGraph(
-    edges: Seq[RDDOperationEdge],
-    outgoingEdges: Seq[RDDOperationEdge],
-    incomingEdges: Seq[RDDOperationEdge],
+    edges: collection.Seq[RDDOperationEdge],
+    outgoingEdges: collection.Seq[RDDOperationEdge],
+    incomingEdges: collection.Seq[RDDOperationEdge],
     rootCluster: RDDOperationCluster)
 
 /** A node in an RDDOperationGraph. This represents an RDD. */
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 43d0986e9ea..3bb8deb2561 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -158,7 +158,18 @@ object MimaExcludes {
     // [SPARK-41423][CORE] Protobuf serializer for StageDataWrapper for Scala 2.13
     ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.status.api.v1.StageData.rddIds"),
     ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.status.api.v1.StageData.accumulatorUpdates"),
-    ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.status.api.v1.StageData.this")
+    ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.status.api.v1.StageData.this"),
+
+    // [SPARK-41890][CORE][SQL][UI] Reduce `toSeq` in `RDDOperationGraphWrapperSerializer`/`SparkPlanGraphWrapperSerializer` for Scala 2.13
+    ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.status.api.v1.sql.ExecutionData.nodes"),
+    ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.status.api.v1.sql.ExecutionData.edges"),
+    ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.status.api.v1.sql.ExecutionData.this"),
+    ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.status.api.v1.sql.Node.apply"),
+    ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.status.api.v1.sql.Node.metrics"),
+    ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.status.api.v1.sql.Node.copy"),
+    ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.status.api.v1.sql.Node.copy$default$4"),
+    ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.status.api.v1.sql.Node.this"),
+    ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.status.api.v1.sql.Node.apply")
   )
 
   // Defulat exclude rules
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
index 99718b63353..64c652542ff 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListener.scala
@@ -322,7 +322,8 @@ class SQLAppStatusListener(
     }
   }
 
-  private def toStoredNodes(nodes: Seq[SparkPlanGraphNode]): Seq[SparkPlanGraphNodeWrapper] = {
+  private def toStoredNodes(
+      nodes: collection.Seq[SparkPlanGraphNode]): collection.Seq[SparkPlanGraphNodeWrapper] = {
     nodes.map {
       case cluster: SparkPlanGraphCluster =>
         val storedCluster = new SparkPlanGraphClusterWrapper(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
index 25cb8c2f043..46827aaa1d9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SQLAppStatusStore.scala
@@ -109,8 +109,8 @@ class SQLExecutionUIData(
 
 class SparkPlanGraphWrapper(
     @KVIndexParam val executionId: Long,
-    val nodes: Seq[SparkPlanGraphNodeWrapper],
-    val edges: Seq[SparkPlanGraphEdge]) {
+    val nodes: collection.Seq[SparkPlanGraphNodeWrapper],
+    val edges: collection.Seq[SparkPlanGraphEdge]) {
 
   def toSparkPlanGraph(): SparkPlanGraph = {
     SparkPlanGraph(nodes.map(_.toSparkPlanGraphNode()), edges)
@@ -122,8 +122,8 @@ class SparkPlanGraphClusterWrapper(
     val id: Long,
     val name: String,
     val desc: String,
-    val nodes: Seq[SparkPlanGraphNodeWrapper],
-    val metrics: Seq[SQLPlanMetric]) {
+    val nodes: collection.Seq[SparkPlanGraphNodeWrapper],
+    val metrics: collection.Seq[SQLPlanMetric]) {
 
   def toSparkPlanGraphCluster(): SparkPlanGraphCluster = {
     new SparkPlanGraphCluster(id, name, desc,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
index 3b011301421..6163e26e49c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/SparkPlanGraph.scala
@@ -33,7 +33,8 @@ import org.apache.spark.sql.execution.{SparkPlanInfo, WholeStageCodegenExec}
  * SparkPlan tree, and each edge represents a parent-child relationship between two nodes.
  */
 case class SparkPlanGraph(
-    nodes: Seq[SparkPlanGraphNode], edges: Seq[SparkPlanGraphEdge]) {
+    nodes: collection.Seq[SparkPlanGraphNode],
+    edges: collection.Seq[SparkPlanGraphEdge]) {
 
   def makeDotFile(metrics: Map[Long, String]): String = {
     val dotFile = new StringBuilder
@@ -47,7 +48,7 @@ case class SparkPlanGraph(
   /**
    * All the SparkPlanGraphNodes, including those inside of WholeStageCodegen.
    */
-  val allNodes: Seq[SparkPlanGraphNode] = {
+  val allNodes: collection.Seq[SparkPlanGraphNode] = {
     nodes.flatMap {
       case cluster: SparkPlanGraphCluster => cluster.nodes :+ cluster
       case node => Seq(node)
@@ -157,7 +158,7 @@ class SparkPlanGraphNode(
     val id: Long,
     val name: String,
     val desc: String,
-    val metrics: Seq[SQLPlanMetric]) {
+    val metrics: collection.Seq[SQLPlanMetric]) {
 
   def makeDotNode(metricsValue: Map[Long, String]): String = {
     val builder = new mutable.StringBuilder("<b>" + name + "</b>")
@@ -198,7 +199,7 @@ class SparkPlanGraphCluster(
     name: String,
     desc: String,
     val nodes: mutable.ArrayBuffer[SparkPlanGraphNode],
-    metrics: Seq[SQLPlanMetric])
+    metrics: collection.Seq[SQLPlanMetric])
   extends SparkPlanGraphNode(id, name, desc, metrics) {
 
   override def makeDotNode(metricsValue: Map[Long, String]): String = {
diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala
index 4dd96e5ae25..a6fb07284b6 100644
--- a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/SqlResource.scala
@@ -115,8 +115,8 @@ private[v1] class SqlResource extends BaseAppResource {
       edges)
   }
 
-  private def printableMetrics(allNodes: Seq[SparkPlanGraphNode],
-    metricValues: Map[Long, String]): Seq[Node] = {
+  private def printableMetrics(allNodes: collection.Seq[SparkPlanGraphNode],
+    metricValues: Map[Long, String]): collection.Seq[Node] = {
 
     def getMetric(metricValues: Map[Long, String], accumulatorId: Long,
       metricName: String): Option[Metric] = {
@@ -138,7 +138,8 @@ private[v1] class SqlResource extends BaseAppResource {
     nodes.sortBy(_.nodeId).reverse
   }
 
-  private def getNodeIdAndWSCGIdMap(allNodes: Seq[SparkPlanGraphNode]): Map[Long, Option[Long]] = {
+  private def getNodeIdAndWSCGIdMap(
+      allNodes: collection.Seq[SparkPlanGraphNode]): Map[Long, Option[Long]] = {
     val wscgNodes = allNodes.filter(_.name.trim.startsWith(WHOLE_STAGE_CODEGEN))
     val nodeIdAndWSCGIdMap: Map[Long, Option[Long]] = wscgNodes.flatMap {
       _ match {
diff --git a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala
index 0ddf66718bc..c0f5c9c27ec 100644
--- a/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala
+++ b/sql/core/src/main/scala/org/apache/spark/status/api/v1/sql/api.scala
@@ -30,13 +30,13 @@ class ExecutionData private[spark] (
     val runningJobIds: Seq[Int],
     val successJobIds: Seq[Int],
     val failedJobIds: Seq[Int],
-    val nodes: Seq[Node],
-    val edges: Seq[SparkPlanGraphEdge])
+    val nodes: collection.Seq[Node],
+    val edges: collection.Seq[SparkPlanGraphEdge])
 
 case class Node private[spark](
     nodeId: Long,
     nodeName: String,
     wholeStageCodegenId: Option[Long] = None,
-    metrics: Seq[Metric])
+    metrics: collection.Seq[Metric])
 
 case class Metric private[spark] (name: String, value: String)
diff --git a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala
index a8f715564fc..db63fd6afe2 100644
--- a/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala
+++ b/sql/core/src/main/scala/org/apache/spark/status/protobuf/sql/SparkPlanGraphWrapperSerializer.scala
@@ -44,8 +44,8 @@ class SparkPlanGraphWrapperSerializer extends ProtobufSerDe {
     val wrapper = StoreTypes.SparkPlanGraphWrapper.parseFrom(bytes)
     new SparkPlanGraphWrapper(
       executionId = wrapper.getExecutionId,
-      nodes = wrapper.getNodesList.asScala.map(deserializeSparkPlanGraphNodeWrapper).toSeq,
-      edges = wrapper.getEdgesList.asScala.map(deserializeSparkPlanGraphEdge).toSeq
+      nodes = wrapper.getNodesList.asScala.map(deserializeSparkPlanGraphNodeWrapper),
+      edges = wrapper.getEdgesList.asScala.map(deserializeSparkPlanGraphEdge)
     )
   }
 
@@ -102,7 +102,7 @@ class SparkPlanGraphWrapperSerializer extends ProtobufSerDe {
       id = node.getId,
       name = node.getName,
       desc = node.getDesc,
-      metrics = node.getMetricsList.asScala.map(SQLPlanMetricSerializer.deserialize).toSeq
+      metrics = node.getMetricsList.asScala.map(SQLPlanMetricSerializer.deserialize)
     )
   }
 
@@ -128,8 +128,8 @@ class SparkPlanGraphWrapperSerializer extends ProtobufSerDe {
       id = cluster.getId,
       name = cluster.getName,
       desc = cluster.getDesc,
-      nodes = cluster.getNodesList.asScala.map(deserializeSparkPlanGraphNodeWrapper).toSeq,
-      metrics = cluster.getMetricsList.asScala.map(SQLPlanMetricSerializer.deserialize).toSeq
+      nodes = cluster.getNodesList.asScala.map(deserializeSparkPlanGraphNodeWrapper),
+      metrics = cluster.getMetricsList.asScala.map(SQLPlanMetricSerializer.deserialize)
     )
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala
index 336779988fc..f6fd1fc42ce 100644
--- a/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/status/api/v1/sql/SqlResourceSuite.scala
@@ -56,7 +56,7 @@ object SqlResourceSuite {
 
   val edges: Seq[SparkPlanGraphEdge] = Seq(SparkPlanGraphEdge(3, 2))
 
-  val nodesWhenCodegenIsOff: Seq[SparkPlanGraphNode] =
+  val nodesWhenCodegenIsOff: collection.Seq[SparkPlanGraphNode] =
     SparkPlanGraph(nodes, edges).allNodes.filterNot(_.name == WHOLE_STAGE_CODEGEN_1)
 
   val metrics: Seq[SQLPlanMetric] = {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
