This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new bfb3ffe  [SPARK-27682][CORE][GRAPHX][MLLIB] Replace use of collections and methods that will be removed in Scala 2.13 with work-alikes
bfb3ffe is described below

commit bfb3ffe9b33a403a1f3b6f5407d34a477ce62c85
Author: Sean Owen <sean.o...@databricks.com>
AuthorDate: Wed May 15 09:29:12 2019 -0500

    [SPARK-27682][CORE][GRAPHX][MLLIB] Replace use of collections and methods that will be removed in Scala 2.13 with work-alikes
    
    ## What changes were proposed in this pull request?
    
    This replaces use of collection classes like `MutableList` and `ArrayStack` with work-alikes that are available in 2.12, as they will be removed in 2.13. It also removes use of `.to[Collection]`, as its use was superfluous anyway. Removing `collection.breakOut` will have to wait until 2.13.
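    
    For illustration only, a hedged sketch (hypothetical `WorkAlikeSketch` object, not code from this patch) of two of the replacement patterns: a `ListBuffer` standing in for `ArrayStack`, and a `mutable.Map` built with `foreach` standing in for `collection.breakOut`:
    
        import scala.collection.mutable
    
        // Illustrative sketch only; names and values are made up for this example.
        object WorkAlikeSketch extends App {
          // ArrayStack work-alike: prepend plays the role of push, remove(0) of pop.
          val stack = new mutable.ListBuffer[Int]
          stack.prepend(1)
          stack.prepend(2)
          assert(stack.remove(0) == 2)   // LIFO order is preserved
    
          // breakOut work-alike: build the target mutable.Map directly rather than
          // mapping through an intermediate collection and converting it.
          val counts1 = Map("a" -> 1L, "b" -> 2L)
          val counts2 = Map("b" -> 3L)
          val merged = mutable.Map[String, Long]()
          (counts1.keySet ++ counts2.keySet).foreach { k =>
            merged.put(k, counts1.getOrElse(k, 0L) + counts2.getOrElse(k, 0L))
          }
          assert(merged("b") == 5L)
        }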
    
    ## How was this patch tested?
    
    Existing tests
    
    Closes #24586 from srowen/SPARK-27682.
    
    Authored-by: Sean Owen <sean.o...@databricks.com>
    Signed-off-by: Sean Owen <sean.o...@databricks.com>
---
 .../org/apache/spark/scheduler/DAGScheduler.scala  | 50 +++++++++++-----------
 .../spark/util/random/RandomSamplerSuite.scala     |  4 +-
 .../apache/spark/graphx/lib/LabelPropagation.scala | 10 +++--
 .../apache/spark/graphx/lib/ShortestPaths.scala    | 10 +++--
 .../apache/spark/graphx/lib/PageRankSuite.scala    |  2 +-
 .../apache/spark/ml/tree/impl/RandomForest.scala   | 22 +++++-----
 .../spark/ml/tree/impl/RandomForestSuite.scala     | 10 ++---
 .../org/apache/spark/mllib/feature/PCASuite.scala  |  4 +-
 .../integrationtest/KubernetesTestComponents.scala |  2 +-
 .../expressions/EquivalentExpressions.scala        |  6 +--
 .../org/apache/spark/sql/types/StructType.scala    | 11 +++--
 11 files changed, 71 insertions(+), 60 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 1d4972e..de57807 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -24,7 +24,7 @@ import java.util.concurrent.atomic.AtomicInteger
 
 import scala.annotation.tailrec
 import scala.collection.Map
-import scala.collection.mutable.{ArrayStack, HashMap, HashSet}
+import scala.collection.mutable.{HashMap, HashSet, ListBuffer}
 import scala.concurrent.duration._
 import scala.util.control.NonFatal
 
@@ -468,21 +468,21 @@ private[spark] class DAGScheduler(
 
   /** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */
   private def getMissingAncestorShuffleDependencies(
-      rdd: RDD[_]): ArrayStack[ShuffleDependency[_, _, _]] = {
-    val ancestors = new ArrayStack[ShuffleDependency[_, _, _]]
+      rdd: RDD[_]): ListBuffer[ShuffleDependency[_, _, _]] = {
+    val ancestors = new ListBuffer[ShuffleDependency[_, _, _]]
     val visited = new HashSet[RDD[_]]
     // We are manually maintaining a stack here to prevent StackOverflowError
     // caused by recursively visiting
-    val waitingForVisit = new ArrayStack[RDD[_]]
-    waitingForVisit.push(rdd)
+    val waitingForVisit = new ListBuffer[RDD[_]]
+    waitingForVisit += rdd
     while (waitingForVisit.nonEmpty) {
-      val toVisit = waitingForVisit.pop()
+      val toVisit = waitingForVisit.remove(0)
       if (!visited(toVisit)) {
         visited += toVisit
         getShuffleDependencies(toVisit).foreach { shuffleDep =>
           if (!shuffleIdToMapStage.contains(shuffleDep.shuffleId)) {
-            ancestors.push(shuffleDep)
-            waitingForVisit.push(shuffleDep.rdd)
+            ancestors.prepend(shuffleDep)
+            waitingForVisit.prepend(shuffleDep.rdd)
           } // Otherwise, the dependency and its ancestors have already been registered.
         }
       }
@@ -506,17 +506,17 @@ private[spark] class DAGScheduler(
       rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
     val parents = new HashSet[ShuffleDependency[_, _, _]]
     val visited = new HashSet[RDD[_]]
-    val waitingForVisit = new ArrayStack[RDD[_]]
-    waitingForVisit.push(rdd)
+    val waitingForVisit = new ListBuffer[RDD[_]]
+    waitingForVisit += rdd
     while (waitingForVisit.nonEmpty) {
-      val toVisit = waitingForVisit.pop()
+      val toVisit = waitingForVisit.remove(0)
       if (!visited(toVisit)) {
         visited += toVisit
         toVisit.dependencies.foreach {
           case shuffleDep: ShuffleDependency[_, _, _] =>
             parents += shuffleDep
           case dependency =>
-            waitingForVisit.push(dependency.rdd)
+            waitingForVisit.prepend(dependency.rdd)
         }
       }
     }
@@ -529,10 +529,10 @@ private[spark] class DAGScheduler(
    */
   private def traverseParentRDDsWithinStage(rdd: RDD[_], predicate: RDD[_] => Boolean): Boolean = {
     val visited = new HashSet[RDD[_]]
-    val waitingForVisit = new ArrayStack[RDD[_]]
-    waitingForVisit.push(rdd)
+    val waitingForVisit = new ListBuffer[RDD[_]]
+    waitingForVisit += rdd
     while (waitingForVisit.nonEmpty) {
-      val toVisit = waitingForVisit.pop()
+      val toVisit = waitingForVisit.remove(0)
       if (!visited(toVisit)) {
         if (!predicate(toVisit)) {
           return false
@@ -542,7 +542,7 @@ private[spark] class DAGScheduler(
           case _: ShuffleDependency[_, _, _] =>
             // Not within the same stage with current rdd, do nothing.
           case dependency =>
-            waitingForVisit.push(dependency.rdd)
+            waitingForVisit.prepend(dependency.rdd)
         }
       }
     }
@@ -554,7 +554,8 @@ private[spark] class DAGScheduler(
     val visited = new HashSet[RDD[_]]
     // We are manually maintaining a stack here to prevent StackOverflowError
     // caused by recursively visiting
-    val waitingForVisit = new ArrayStack[RDD[_]]
+    val waitingForVisit = new ListBuffer[RDD[_]]
+    waitingForVisit += stage.rdd
     def visit(rdd: RDD[_]) {
       if (!visited(rdd)) {
         visited += rdd
@@ -568,15 +569,14 @@ private[spark] class DAGScheduler(
                   missing += mapStage
                 }
               case narrowDep: NarrowDependency[_] =>
-                waitingForVisit.push(narrowDep.rdd)
+                waitingForVisit.prepend(narrowDep.rdd)
             }
           }
         }
       }
     }
-    waitingForVisit.push(stage.rdd)
     while (waitingForVisit.nonEmpty) {
-      visit(waitingForVisit.pop())
+      visit(waitingForVisit.remove(0))
     }
     missing.toList
   }
@@ -2000,7 +2000,8 @@ private[spark] class DAGScheduler(
     val visitedRdds = new HashSet[RDD[_]]
     // We are manually maintaining a stack here to prevent StackOverflowError
     // caused by recursively visiting
-    val waitingForVisit = new ArrayStack[RDD[_]]
+    val waitingForVisit = new ListBuffer[RDD[_]]
+    waitingForVisit += stage.rdd
     def visit(rdd: RDD[_]) {
       if (!visitedRdds(rdd)) {
         visitedRdds += rdd
@@ -2009,17 +2010,16 @@ private[spark] class DAGScheduler(
             case shufDep: ShuffleDependency[_, _, _] =>
              val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
               if (!mapStage.isAvailable) {
-                waitingForVisit.push(mapStage.rdd)
+                waitingForVisit.prepend(mapStage.rdd)
               }  // Otherwise there's no need to follow the dependency back
             case narrowDep: NarrowDependency[_] =>
-              waitingForVisit.push(narrowDep.rdd)
+              waitingForVisit.prepend(narrowDep.rdd)
           }
         }
       }
     }
-    waitingForVisit.push(stage.rdd)
     while (waitingForVisit.nonEmpty) {
-      visit(waitingForVisit.pop())
+      visit(waitingForVisit.remove(0))
     }
     visitedRdds.contains(target.rdd)
   }
diff --git a/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala b/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala
index c2e3830..fef514e 100644
--- a/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala
@@ -333,7 +333,7 @@ class RandomSamplerSuite extends SparkFunSuite with Matchers {
 
     // ArrayBuffer iterator (indexable type)
     d = medianKSD(
-      gaps(sampler.sample(Iterator.from(0).take(20*sampleSize).to[ArrayBuffer].iterator)),
+      gaps(sampler.sample(ArrayBuffer(Iterator.from(0).take(20*sampleSize).toSeq: _*).iterator)),
       gaps(sample(Iterator.from(0), 0.1)))
     d should be < D
 
@@ -557,7 +557,7 @@ class RandomSamplerSuite extends SparkFunSuite with Matchers {
 
     // ArrayBuffer iterator (indexable type)
     d = medianKSD(
-      gaps(sampler.sample(Iterator.from(0).take(20*sampleSize).to[ArrayBuffer].iterator)),
+      gaps(sampler.sample(ArrayBuffer(Iterator.from(0).take(20*sampleSize).toSeq: _*).iterator)),
       gaps(sampleWR(Iterator.from(0), 0.1)))
     d should be < D
 
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala
index 507d21d..7381f59 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/LabelPropagation.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.graphx.lib
 
+import scala.collection.{mutable, Map}
 import scala.reflect.ClassTag
 
 import org.apache.spark.graphx._
@@ -51,11 +52,14 @@ object LabelPropagation {
     }
     def mergeMessage(count1: Map[VertexId, Long], count2: Map[VertexId, Long])
       : Map[VertexId, Long] = {
-      (count1.keySet ++ count2.keySet).map { i =>
+      // Mimics the optimization of breakOut, not present in Scala 2.13, while working in 2.12
+      val map = mutable.Map[VertexId, Long]()
+      (count1.keySet ++ count2.keySet).foreach { i =>
         val count1Val = count1.getOrElse(i, 0L)
         val count2Val = count2.getOrElse(i, 0L)
-        i -> (count1Val + count2Val)
-      }(collection.breakOut)
+        map.put(i, count1Val + count2Val)
+      }
+      map
     }
     def vertexProgram(vid: VertexId, attr: Long, message: Map[VertexId, Long]): VertexId = {
       if (message.isEmpty) attr else message.maxBy(_._2)._1
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala
index 9edfe58..a410473 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/ShortestPaths.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.graphx.lib
 
+import scala.collection.{mutable, Map}
 import scala.reflect.ClassTag
 
 import org.apache.spark.graphx._
@@ -34,9 +35,12 @@ object ShortestPaths extends Serializable {
   private def incrementMap(spmap: SPMap): SPMap = spmap.map { case (v, d) => v -> (d + 1) }
 
   private def addMaps(spmap1: SPMap, spmap2: SPMap): SPMap = {
-    (spmap1.keySet ++ spmap2.keySet).map {
-      k => k -> math.min(spmap1.getOrElse(k, Int.MaxValue), spmap2.getOrElse(k, Int.MaxValue))
-    }(collection.breakOut)
+    // Mimics the optimization of breakOut, not present in Scala 2.13, while working in 2.12
+    val map = mutable.Map[VertexId, Int]()
+    (spmap1.keySet ++ spmap2.keySet).foreach { k =>
+      map.put(k, math.min(spmap1.getOrElse(k, Int.MaxValue), spmap2.getOrElse(k, Int.MaxValue)))
+    }
+    map
   }
 
   /**
diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
index 1e4c6c7..d8f1c49 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/PageRankSuite.scala
@@ -24,7 +24,7 @@ import org.apache.spark.graphx.util.GraphGenerators
 
 object GridPageRank {
   def apply(nRows: Int, nCols: Int, nIter: Int, resetProb: Double): Seq[(VertexId, Double)] = {
-    val inNbrs = Array.fill(nRows * nCols)(collection.mutable.MutableList.empty[Int])
+    val inNbrs = Array.fill(nRows * nCols)(collection.mutable.ArrayBuffer.empty[Int])
     val outDegree = Array.fill(nRows * nCols)(0)
     // Convert row column address into vertex ids (row major order)
     def sub2ind(r: Int, c: Int): Int = r * nCols + c
diff --git a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala
index b041dd4..7921823 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala
@@ -194,14 +194,16 @@ private[spark] object RandomForest extends Logging with Serializable {
       training the same tree in the next iteration.  This focus allows us to send fewer trees to
       workers on each iteration; see topNodesForGroup below.
      */
-    val nodeStack = new mutable.ArrayStack[(Int, LearningNode)]
+    val nodeStack = new mutable.ListBuffer[(Int, LearningNode)]
 
     val rng = new Random()
     rng.setSeed(seed)
 
     // Allocate and queue root nodes.
     val topNodes = Array.fill[LearningNode](numTrees)(LearningNode.emptyNode(nodeIndex = 1))
-    Range(0, numTrees).foreach(treeIndex => nodeStack.push((treeIndex, topNodes(treeIndex))))
+    for (treeIndex <- 0 until numTrees) {
+      nodeStack.prepend((treeIndex, topNodes(treeIndex)))
+    }
 
     timer.stop("init")
 
@@ -398,7 +400,7 @@ private[spark] object RandomForest extends Logging with Serializable {
       nodesForGroup: Map[Int, Array[LearningNode]],
       treeToNodeToIndexInfo: Map[Int, Map[Int, NodeIndexInfo]],
       splits: Array[Array[Split]],
-      nodeStack: mutable.ArrayStack[(Int, LearningNode)],
+      nodeStack: mutable.ListBuffer[(Int, LearningNode)],
       timer: TimeTracker = new TimeTracker,
       nodeIdCache: Option[NodeIdCache] = None): Unit = {
 
@@ -639,10 +641,10 @@ private[spark] object RandomForest extends Logging with Serializable {
 
           // enqueue left child and right child if they are not leaves
           if (!leftChildIsLeaf) {
-            nodeStack.push((treeIndex, node.leftChild.get))
+            nodeStack.prepend((treeIndex, node.leftChild.get))
           }
           if (!rightChildIsLeaf) {
-            nodeStack.push((treeIndex, node.rightChild.get))
+            nodeStack.prepend((treeIndex, node.rightChild.get))
           }
 
           logDebug("leftChildIndex = " + node.leftChild.get.id +
@@ -1042,8 +1044,8 @@ private[spark] object RandomForest extends Logging with Serializable {
       var partNumSamples = 0.0
       var unweightedNumSamples = 0.0
       featureSamples.foreach { case (sampleWeight, feature) =>
-        partValueCountMap(feature) = partValueCountMap.getOrElse(feature, 0.0) + sampleWeight;
-        partNumSamples += sampleWeight;
+        partValueCountMap(feature) = partValueCountMap.getOrElse(feature, 0.0) + sampleWeight
+        partNumSamples += sampleWeight
         unweightedNumSamples += 1.0
       }
 
@@ -1131,7 +1133,7 @@ private[spark] object RandomForest extends Logging with Serializable {
    *          The feature indices are None if not subsampling features.
    */
   private[tree] def selectNodesToSplit(
-      nodeStack: mutable.ArrayStack[(Int, LearningNode)],
+      nodeStack: mutable.ListBuffer[(Int, LearningNode)],
       maxMemoryUsage: Long,
       metadata: DecisionTreeMetadata,
       rng: Random): (Map[Int, Array[LearningNode]], Map[Int, Map[Int, NodeIndexInfo]]) = {
@@ -1146,7 +1148,7 @@ private[spark] object RandomForest extends Logging with Serializable {
     // so we allow one iteration if memUsage == 0.
     var groupDone = false
     while (nodeStack.nonEmpty && !groupDone) {
-      val (treeIndex, node) = nodeStack.top
+      val (treeIndex, node) = nodeStack.head
       // Choose subset of features for node (if subsampling).
       val featureSubset: Option[Array[Int]] = if (metadata.subsamplingFeatures) {
         Some(SamplingUtils.reservoirSampleAndCount(Range(0,
@@ -1157,7 +1159,7 @@ private[spark] object RandomForest extends Logging with Serializable {
       // Check if enough memory remains to add this node to the group.
       val nodeMemUsage = RandomForest.aggregateSizeForNode(metadata, featureSubset) * 8L
       if (memUsage + nodeMemUsage <= maxMemoryUsage || memUsage == 0) {
-        nodeStack.pop()
+        nodeStack.remove(0)
         mutableNodesForGroup.getOrElseUpdate(treeIndex, new mutable.ArrayBuffer[LearningNode]()) +=
           node
         mutableTreeToNodeToIndexInfo
diff --git a/mllib/src/test/scala/org/apache/spark/ml/tree/impl/RandomForestSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/tree/impl/RandomForestSuite.scala
index b89cc60..a63ab91 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/tree/impl/RandomForestSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/tree/impl/RandomForestSuite.scala
@@ -40,8 +40,6 @@ class RandomForestSuite extends SparkFunSuite with MLlibTestSparkContext {
 
   import RandomForestSuite.mapToVec
 
-  private val seed = 42
-
   /////////////////////////////////////////////////////////////////////////////
   // Tests for split calculation
   /////////////////////////////////////////////////////////////////////////////
@@ -350,7 +348,7 @@ class RandomForestSuite extends SparkFunSuite with MLlibTestSparkContext {
     val treeToNodeToIndexInfo = Map(0 -> Map(
       topNode.id -> new RandomForest.NodeIndexInfo(0, None)
     ))
-    val nodeStack = new mutable.ArrayStack[(Int, LearningNode)]
+    val nodeStack = new mutable.ListBuffer[(Int, LearningNode)]
     RandomForest.findBestSplits(baggedInput, metadata, Map(0 -> topNode),
       nodesForGroup, treeToNodeToIndexInfo, splits, nodeStack)
 
@@ -392,7 +390,7 @@ class RandomForestSuite extends SparkFunSuite with MLlibTestSparkContext {
     val treeToNodeToIndexInfo = Map(0 -> Map(
       topNode.id -> new RandomForest.NodeIndexInfo(0, None)
     ))
-    val nodeStack = new mutable.ArrayStack[(Int, LearningNode)]
+    val nodeStack = new mutable.ListBuffer[(Int, LearningNode)]
     RandomForest.findBestSplits(baggedInput, metadata, Map(0 -> topNode),
       nodesForGroup, treeToNodeToIndexInfo, splits, nodeStack)
 
@@ -505,11 +503,11 @@ class RandomForestSuite extends SparkFunSuite with MLlibTestSparkContext {
         val failString = s"Failed on test with:" +
           s"numTrees=$numTrees, featureSubsetStrategy=$featureSubsetStrategy," 
+
           s" numFeaturesPerNode=$numFeaturesPerNode, seed=$seed"
-        val nodeStack = new mutable.ArrayStack[(Int, LearningNode)]
+        val nodeStack = new mutable.ListBuffer[(Int, LearningNode)]
         val topNodes: Array[LearningNode] = new Array[LearningNode](numTrees)
         Range(0, numTrees).foreach { treeIndex =>
           topNodes(treeIndex) = LearningNode.emptyNode(nodeIndex = 1)
-          nodeStack.push((treeIndex, topNodes(treeIndex)))
+          nodeStack.prepend((treeIndex, topNodes(treeIndex)))
         }
         val rng = new scala.util.Random(seed = seed)
         val (nodesForGroup: Map[Int, Array[LearningNode]],
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala
index fe49162..e478f14 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/PCASuite.scala
@@ -57,8 +57,8 @@ class PCASuite extends SparkFunSuite with MLlibTestSparkContext {
 
   test("number of features more than 65535") {
     val data1 = sc.parallelize(Array(
-      Vectors.dense((1 to 100000).map(_ => 2.0).to[scala.Vector].toArray),
-      Vectors.dense((1 to 100000).map(_ => 0.0).to[scala.Vector].toArray)
+      Vectors.dense(Array.fill(100000)(2.0)),
+      Vectors.dense(Array.fill(100000)(0.0))
     ), 2)
 
     val pca = new PCA(2).fit(data1)
diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
index 4005945..50a7ef7 100644
--- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
+++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala
@@ -125,7 +125,7 @@ private[spark] object SparkAppLauncher extends Logging {
         appConf.toStringArray :+ appArguments.mainAppResource
 
     if (appArguments.appArgs.nonEmpty) {
-      commandLine ++= appArguments.appArgs.to[ArrayBuffer]
+      commandLine ++= appArguments.appArgs
     }
     logInfo(s"Launching a spark app with command line: 
${commandLine.mkString(" ")}")
     ProcessUtils.executeProcess(commandLine.toArray, timeoutSecs)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala
index 8d06804..72ff936 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala
@@ -41,7 +41,7 @@ class EquivalentExpressions {
   }
 
   // For each expression, the set of equivalent expressions.
-  private val equivalenceMap = mutable.HashMap.empty[Expr, mutable.MutableList[Expression]]
+  private val equivalenceMap = mutable.HashMap.empty[Expr, mutable.ArrayBuffer[Expression]]
 
   /**
    * Adds each expression to this data structure, grouping them with existing equivalent
@@ -56,7 +56,7 @@ class EquivalentExpressions {
         f.get += expr
         true
       } else {
-        equivalenceMap.put(e, mutable.MutableList(expr))
+        equivalenceMap.put(e, mutable.ArrayBuffer(expr))
         false
       }
     } else {
@@ -102,7 +102,7 @@ class EquivalentExpressions {
    * an empty collection if there are none.
    */
   def getEquivalentExprs(e: Expression): Seq[Expression] = {
-    equivalenceMap.getOrElse(Expr(e), mutable.MutableList())
+    equivalenceMap.getOrElse(Expr(e), Seq.empty)
   }
 
   /**
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
index fe324a4..edf8d2c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.types
 
-import scala.collection.mutable.ArrayBuffer
+import scala.collection.{mutable, Map}
 import scala.util.Try
 import scala.util.control.NonFatal
 
@@ -516,7 +516,7 @@ object StructType extends AbstractDataType {
           leftContainsNull || rightContainsNull)
 
       case (StructType(leftFields), StructType(rightFields)) =>
-        val newFields = ArrayBuffer.empty[StructField]
+        val newFields = mutable.ArrayBuffer.empty[StructField]
 
         val rightMapped = fieldsMap(rightFields)
         leftFields.foreach {
@@ -575,7 +575,10 @@ object StructType extends AbstractDataType {
     }
 
   private[sql] def fieldsMap(fields: Array[StructField]): Map[String, StructField] = {
-    import scala.collection.breakOut
-    fields.map(s => (s.name, s))(breakOut)
+    // Mimics the optimization of breakOut, not present in Scala 2.13, while working in 2.12
+    val map = mutable.Map[String, StructField]()
+    map.sizeHint(fields.length)
+    fields.foreach(s => map.put(s.name, s))
+    map
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
