This is an automated email from the ASF dual-hosted git repository.

yangjie01 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 29eb577cefa [SPARK-45701][SPARK-45684][SPARK-45692][CORE][SQL][SS][ML][K8S] Clean up the deprecated API usage related to `mutable.SetOps/c.SeqOps/Iterator/Iterable/IterableOps`
29eb577cefa is described below

commit 29eb577cefabd954f3de2c284a692790d621d0ba
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Tue Oct 31 15:07:47 2023 +0800

    [SPARK-45701][SPARK-45684][SPARK-45692][CORE][SQL][SS][ML][K8S] Clean up the deprecated API usage related to `mutable.SetOps/c.SeqOps/Iterator/Iterable/IterableOps`
    
    ### What changes were proposed in this pull request?
    This PR cleans up the deprecated API usage related to `mutable.SetOps`:
    
    - `--` -> `diff`
    - `-` -> `diff`
    - `+` -> `union`
    - `retain` -> `filterInPlace`
    
    The changes refer to the following deprecated definitions:
    
    ```scala
      @deprecated("Consider requiring an immutable Set", "2.13.0")
      def -- (that: IterableOnce[A]): C = {
        val toRemove = that.iterator.to(immutable.Set)
        fromSpecific(view.filterNot(toRemove))
      }

      @deprecated("Consider requiring an immutable Set or fall back to Set.diff", "2.13.0")
      def - (elem: A): C = diff(Set(elem))

      @deprecated("Consider requiring an immutable Set or fall back to Set.union", "2.13.0")
      def + (elem: A): C = fromSpecific(new View.Appended(this, elem))

      @deprecated("Use filterInPlace instead", "2.13.0")
      @inline final def retain(p: A => Boolean): Unit = filterInPlace(p)
    ```
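
    As an illustration only (not code from this patch), a minimal sketch of what this `mutable.SetOps` migration looks like at a call site; the object and value names below are hypothetical:

    ```scala
    import scala.collection.mutable

    object MutableSetOpsMigrationSketch {
      def main(args: Array[String]): Unit = {
        val seen = mutable.Set(1, 2, 3, 4)

        // Previously: seen -- Set(2, 3), seen - 4, seen + 5, seen.retain(_ > 1)
        val withoutSome = seen.diff(Set(2, 3))   // `--`    -> diff
        val withoutFour = seen.diff(Set(4))      // `-`     -> diff
        val withFive    = seen.union(Set(5))     // `+`     -> union
        seen.filterInPlace(_ > 1)                // retain  -> filterInPlace (mutates `seen`)

        println((withoutSome, withoutFour, withFive, seen))
      }
    }
    ```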
    
    This PR also cleans up deprecated API usage related to `SeqOps`:
    
    - `transform` -> `mapInPlace`
    - `reverseMap` -> `.reverseIterator.map(f).to(...)`
    - `union` -> `concat`
    
    The changes refer to the following deprecated definitions:
    
    ```scala
      @deprecated("Use `mapInPlace` on an `IndexedSeq` instead", "2.13.0")
      @inline final def transform(f: A => A): this.type = {
        var i = 0
        val siz = size
        while (i < siz) { this(i) = f(this(i)); i += 1 }
        this
      }

      @deprecated("Use .reverseIterator.map(f).to(...) instead of .reverseMap(f)", "2.13.0")
      def reverseMap[B](f: A => B): CC[B] = iterableFactory.from(new View.Map(View.fromIteratorProvider(() => reverseIterator), f))

      @deprecated("Use `concat` instead", "2.13.0")
      @inline final def union[B >: A](that: Seq[B]): CC[B] = concat(that)
    ```
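
    As an illustration only (not code from this patch), a minimal sketch of the `SeqOps` migration on an `ArrayBuffer`; the names below are hypothetical:

    ```scala
    import scala.collection.mutable.ArrayBuffer

    object SeqOpsMigrationSketch {
      def main(args: Array[String]): Unit = {
        val buffer = ArrayBuffer(1, 2, 3)

        buffer.mapInPlace(_ * 10)                  // transform  -> mapInPlace (mutates `buffer`)
        val reversed =                             // reverseMap -> reverseIterator.map(f).to(...)
          buffer.reverseIterator.map(_.toString).to(ArrayBuffer)
        val combined = buffer.concat(Seq(40, 50))  // union      -> concat

        println((buffer, reversed, combined))
      }
    }
    ```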
    
    This PR also cleans up deprecated API usage related to `Iterator/Iterable/IterableOps`. The changes refer to the following deprecated definitions:

    - s.c.Iterable

    - `toIterable` -> `immutable.ArraySeq.unsafeWrapArray`
    
    ```scala
      @deprecated("toIterable is internal and will be made protected; its name is similar to `toList` or `toSeq`, but it doesn't copy non-immutable collections", "2.13.7")
      final def toIterable: this.type = this
    ```
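
    As an illustration only (not code from this patch), a sketch of the pattern used here to replace `toIterable` on an array before an `.asJava` conversion; the array contents below are hypothetical:

    ```scala
    import scala.collection.immutable
    import scala.jdk.CollectionConverters._

    object ToIterableMigrationSketch {
      def main(args: Array[String]): Unit = {
        val classes: Array[String] = Array("java.lang.Exception", "java.lang.Throwable")

        // Previously: classes.toIterable.asJava
        // Now: wrap the array without copying, then convert to a java.util.List
        val javaList: java.util.List[String] =
          immutable.ArraySeq.unsafeWrapArray(classes).asJava

        println(javaList)
      }
    }
    ```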
    
    - s.c.Iterator
    
    - `.seq` -> removed
    
    ```scala
      deprecated("Iterator.seq always returns the iterator itself", "2.13.0")
      def seq: this.type = this
    ```
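
    As an illustration only, a minimal sketch of dropping the no-op `.seq` call on an iterator; the values below are hypothetical:

    ```scala
    object IteratorSeqMigrationSketch {
      def main(args: Array[String]): Unit = {
        val it = Iterator(1, 2, 3)

        // Previously: it.seq.size -- `seq` simply returned the iterator itself
        println(it.size) // consumes the iterator, exactly as before
      }
    }
    ```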
    
    - s.c.IterableOps
    
    - `toTraversable` -> removed
    
    ```scala
      @deprecated("toTraversable is internal and will be made protected; its name is similar to `toList` or `toSeq`, but it doesn't copy non-immutable collections", "2.13.0")
      final def toTraversable: Traversable[A] = toIterable
    ```
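
    As an illustration only, a sketch of the same pattern as the `MemoryStreamBase.addData` change in this patch: a `Seq` is already an `IterableOnce`, so the `.toTraversable` call can simply be dropped. The helper below is hypothetical:

    ```scala
    object ToTraversableMigrationSketch {
      // Hypothetical helper that accepts any IterableOnce, like addData in this patch.
      def addData[A](data: IterableOnce[A]): Unit =
        println(s"received ${data.iterator.size} elements")

      def main(args: Array[String]): Unit = {
        val values = Seq(1, 2, 3)

        // Previously: addData(values.toTraversable)
        addData(values)
      }
    }
    ```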
    
    ### Why are the changes needed?
    Clean up deprecated Scala API usage
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Pass GitHub Actions
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #43575 from LuciferYang/SPARK-45701.
    
    Lead-authored-by: yangjie01 <yangji...@baidu.com>
    Co-authored-by: YangJie <yangji...@baidu.com>
    Signed-off-by: yangjie01 <yangji...@baidu.com>
---
 .../org/apache/spark/util/ClosureCleaner.scala     |  4 +--
 .../scala/org/apache/spark/sql/SparkSession.scala  |  3 ++-
 .../sql/KeyValueGroupedDatasetE2ETestSuite.scala   |  2 +-
 .../connect/client/GrpcExceptionConverter.scala    |  3 ++-
 .../org/apache/spark/sql/connect/dsl/package.scala |  4 +--
 .../sql/connect/planner/SparkConnectPlanner.scala  | 26 +++++++++---------
 .../spark/sql/connect/utils/ErrorUtils.scala       | 31 +++++++++++-----------
 .../spark/storage/BlockReplicationPolicy.scala     |  2 +-
 .../collection/ExternalAppendOnlyMapSuite.scala    |  6 ++---
 .../ml/classification/LogisticRegression.scala     |  2 +-
 .../apache/spark/ml/feature/NormalizerSuite.scala  |  4 +--
 .../spark/ml/feature/StringIndexerSuite.scala      |  4 +--
 .../cluster/k8s/ExecutorPodsAllocator.scala        |  2 +-
 .../cluster/k8s/ExecutorPodsLifecycleManager.scala |  2 +-
 .../sql/catalyst/expressions/AttributeSet.scala    |  4 +--
 .../catalyst/expressions/stringExpressions.scala   |  2 +-
 .../spark/sql/catalyst/parser/AstBuilder.scala     |  2 +-
 .../catalyst/rules/QueryExecutionMetering.scala    |  2 +-
 .../apache/spark/sql/execution/command/ddl.scala   |  2 +-
 .../spark/sql/execution/streaming/memory.scala     |  2 +-
 .../apache/spark/sql/streaming/StreamSuite.scala   |  2 +-
 21 files changed, 57 insertions(+), 54 deletions(-)

diff --git a/common/utils/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/common/utils/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
index ffa2f0e60b2..5ea3c9afa9c 100644
--- a/common/utils/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
+++ b/common/utils/src/main/scala/org/apache/spark/util/ClosureCleaner.scala
@@ -95,13 +95,13 @@ private[spark] object ClosureCleaner extends Logging {
       if (cr != null) {
         val set = Set.empty[Class[_]]
         cr.accept(new InnerClosureFinder(set), 0)
-        for (cls <- set -- seen) {
+        for (cls <- set.diff(seen)) {
           seen += cls
           stack.push(cls)
         }
       }
     }
-    (seen - obj.getClass).toList
+    seen.diff(Set(obj.getClass)).toList
   }
 
   /** Initializes the accessed fields for outer classes and their super classes. */
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 2d3e4205da9..969ac017ecb 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -21,6 +21,7 @@ import java.net.URI
 import java.util.concurrent.TimeUnit._
 import java.util.concurrent.atomic.{AtomicLong, AtomicReference}
 
+import scala.collection.immutable
 import scala.jdk.CollectionConverters._
 import scala.reflect.runtime.universe.TypeTag
 
@@ -247,7 +248,7 @@ class SparkSession private[sql] (
         proto.SqlCommand
           .newBuilder()
           .setSql(sqlText)
-          .addAllPosArguments(args.map(lit(_).expr).toIterable.asJava)))
+          .addAllPosArguments(immutable.ArraySeq.unsafeWrapArray(args.map(lit(_).expr)).asJava)))
     val plan = proto.Plan.newBuilder().setCommand(cmd)
     // .toBuffer forces that the iterator is consumed and closed
     val responseSeq = client.execute(plan.build()).toBuffer.toSeq
diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/KeyValueGroupedDatasetE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/KeyValueGroupedDatasetE2ETestSuite.scala
index 98a947826e3..91516b0069b 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/KeyValueGroupedDatasetE2ETestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/KeyValueGroupedDatasetE2ETestSuite.scala
@@ -198,7 +198,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper {
       .groupByKey(v => v / 2)
     val values = grouped
       .cogroup(otherGrouped) { (k, it, otherIt) =>
-        Seq(it.toSeq.size + otherIt.seq.size)
+        Seq(it.toSeq.size + otherIt.size)
       }
       .collectAsList()
 
diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
index f404cfd2e41..b2782442f4a 100644
--- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
+++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.connect.client
 
 import java.time.DateTimeException
 
+import scala.collection.immutable
 import scala.jdk.CollectionConverters._
 import scala.reflect.ClassTag
 
@@ -371,7 +372,7 @@ private[client] object GrpcExceptionConverter {
         FetchErrorDetailsResponse.Error
           .newBuilder()
           .setMessage(message)
-          .addAllErrorTypeHierarchy(classes.toIterable.asJava)
+          .addAllErrorTypeHierarchy(immutable.ArraySeq.unsafeWrapArray(classes).asJava)
           .build()))
   }
 }
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
index 24fa2324f66..5fd1a035385 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
@@ -520,7 +520,7 @@ package object dsl {
         .setProject(
           Project
             .newBuilder()
-            .addAllExpressions(exprs.toIterable.asJava)
+            .addAllExpressions(exprs.asJava)
             .build())
         .build()
     }
@@ -533,7 +533,7 @@ package object dsl {
             Project
               .newBuilder()
               .setInput(logicalPlan)
-              .addAllExpressions(exprs.toIterable.asJava)
+              .addAllExpressions(exprs.asJava)
               .build())
           .build()
       }
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 17c10e63301..ec57909ad14 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.connect.planner
 
+import scala.collection.immutable
 import scala.collection.mutable
 import scala.jdk.CollectionConverters._
 import scala.util.Try
@@ -3183,9 +3184,9 @@ class SparkConnectPlanner(
       case StreamingQueryManagerCommand.CommandCase.ACTIVE =>
         val active_queries = session.streams.active
         respBuilder.getActiveBuilder.addAllActiveQueries(
-          active_queries
-            .map(query => buildStreamingQueryInstance(query))
-            .toIterable
+          immutable.ArraySeq
+            .unsafeWrapArray(active_queries
+              .map(query => buildStreamingQueryInstance(query)))
             .asJava)
 
       case StreamingQueryManagerCommand.CommandCase.GET_QUERY =>
@@ -3264,16 +3265,15 @@ class SparkConnectPlanner(
         .setGetResourcesCommandResult(
           proto.GetResourcesCommandResult
             .newBuilder()
-            .putAllResources(
-              session.sparkContext.resources.view
-                .mapValues(resource =>
-                  proto.ResourceInformation
-                    .newBuilder()
-                    .setName(resource.name)
-                    .addAllAddresses(resource.addresses.toIterable.asJava)
-                    .build())
-                .toMap
-                .asJava)
+            .putAllResources(session.sparkContext.resources.view
+              .mapValues(resource =>
+                proto.ResourceInformation
+                  .newBuilder()
+                  .setName(resource.name)
+                  .addAllAddresses(immutable.ArraySeq.unsafeWrapArray(resource.addresses).asJava)
+                  .build())
+              .toMap
+              .asJava)
             .build())
         .build())
   }
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala
index f49b81dda8d..741fa97f178 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql.connect.utils
 import java.util.UUID
 
 import scala.annotation.tailrec
+import scala.collection.immutable
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.jdk.CollectionConverters._
@@ -90,21 +91,21 @@ private[connect] object ErrorUtils extends Logging {
 
       if (serverStackTraceEnabled) {
         builder.addAllStackTrace(
-          currentError.getStackTrace
-            .map { stackTraceElement =>
-              val stackTraceBuilder = FetchErrorDetailsResponse.StackTraceElement
-                .newBuilder()
-                .setDeclaringClass(stackTraceElement.getClassName)
-                .setMethodName(stackTraceElement.getMethodName)
-                .setLineNumber(stackTraceElement.getLineNumber)
-
-              if (stackTraceElement.getFileName != null) {
-                stackTraceBuilder.setFileName(stackTraceElement.getFileName)
-              }
-
-              stackTraceBuilder.build()
-            }
-            .toIterable
+          immutable.ArraySeq
+            .unsafeWrapArray(currentError.getStackTrace
+              .map { stackTraceElement =>
+                val stackTraceBuilder = FetchErrorDetailsResponse.StackTraceElement
+                  .newBuilder()
+                  .setDeclaringClass(stackTraceElement.getClassName)
+                  .setMethodName(stackTraceElement.getMethodName)
+                  .setLineNumber(stackTraceElement.getLineNumber)
+
+                if (stackTraceElement.getFileName != null) {
+                  stackTraceBuilder.setFileName(stackTraceElement.getFileName)
+                }
+
+                stackTraceBuilder.build()
+              })
             .asJava)
       }
 
diff --git a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPolicy.scala b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPolicy.scala
index 0bacc34cdfd..893b5605414 100644
--- a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPolicy.scala
+++ b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPolicy.scala
@@ -67,7 +67,7 @@ object BlockReplicationUtils {
   private def getSampleIds(n: Int, m: Int, r: Random): List[Int] = {
     val indices = (n - m + 1 to n).foldLeft(mutable.LinkedHashSet.empty[Int]) {case (set, i) =>
       val t = r.nextInt(i) + 1
-      if (set.contains(t)) set + i else set + t
+      if (set.contains(t)) set.union(Set(i)) else set.union(Set(t))
     }
     indices.map(_ - 1).toList
   }
diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
index fb1502848ca..59f6e3f2d35 100644
--- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala
@@ -453,7 +453,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite
     val first50Keys = for ( _ <- 0 until 50) yield {
       val (k, vs) = it.next()
       val sortedVs = vs.sorted
-      assert(sortedVs.seq == (0 until 10).map(10 * k + _))
+      assert(sortedVs == (0 until 10).map(10 * k + _))
       k
     }
     assert(map.numSpills == 0)
@@ -474,7 +474,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite
     val next50Keys = for ( _ <- 0 until 50) yield {
       val (k, vs) = it.next()
       val sortedVs = vs.sorted
-      assert(sortedVs.seq == (0 until 10).map(10 * k + _))
+      assert(sortedVs == (0 until 10).map(10 * k + _))
       k
     }
     assert(!it.hasNext)
@@ -506,7 +506,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite
     val keys = it.map{
       case (k, vs) =>
         val sortedVs = vs.sorted
-        assert(sortedVs.seq == (0 until 10).map(10 * k + _))
+        assert(sortedVs == (0 until 10).map(10 * k + _))
         k
     }
     .toList
diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
index 0efa57e56f1..8b796a65f4f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala
@@ -672,7 +672,7 @@ class LogisticRegression @Since("1.2.0") (
       denseCoefficientMatrix.foreachActive { case (i, j, v) =>
         centers(j) += v
       }
-      centers.transform(_ / numCoefficientSets)
+      centers.mapInPlace(_ / numCoefficientSets)
       denseCoefficientMatrix.foreachActive { case (i, j, v) =>
         denseCoefficientMatrix.update(i, j, v - centers(j))
       }
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/NormalizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/NormalizerSuite.scala
index 1c602cd7d9a..7fa2ad5ee41 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/NormalizerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/NormalizerSuite.scala
@@ -76,7 +76,7 @@ class NormalizerSuite extends MLTest with DefaultReadWriteTest {
 
   test("Normalization with default parameter") {
     val normalizer = new Normalizer().setInputCol("features").setOutputCol("normalized")
-    val dataFrame: DataFrame = data.zip(l2Normalized).seq.toDF("features", "expected")
+    val dataFrame: DataFrame = data.zip(l2Normalized).toDF("features", "expected")
 
     testTransformer[(Vector, Vector)](dataFrame, normalizer, "features", "normalized", "expected") {
       case Row(features: Vector, normalized: Vector, expected: Vector) =>
@@ -102,7 +102,7 @@ class NormalizerSuite extends MLTest with DefaultReadWriteTest {
   }
 
   test("Normalization with setter") {
-    val dataFrame: DataFrame = data.zip(l1Normalized).seq.toDF("features", "expected")
+    val dataFrame: DataFrame = data.zip(l1Normalized).toDF("features", "expected")
     val normalizer = new Normalizer().setInputCol("features").setOutputCol("normalized").setP(1)
 
     testTransformer[(Vector, Vector)](dataFrame, normalizer, "features", "normalized", "expected") {
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala
index c8247b9c8f3..99f12eab7d6 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala
@@ -92,7 +92,7 @@ class StringIndexerSuite extends MLTest with DefaultReadWriteTest {
       val attr = Attribute.fromStructField(rows.head.schema("labelIndex"))
         .asInstanceOf[NominalAttribute]
       assert(attr.values.get === Array("a", "c", "b"))
-      assert(rows.seq === expected.collect().toSeq)
+      assert(rows === expected.collect().toSeq)
     }
   }
 
@@ -139,7 +139,7 @@ class StringIndexerSuite extends MLTest with DefaultReadWriteTest {
       val attrSkip = Attribute.fromStructField(rows.head.schema("labelIndex"))
         .asInstanceOf[NominalAttribute]
       assert(attrSkip.values.get === Array("b", "a"))
-      assert(rows.seq === expectedSkip.collect().toSeq)
+      assert(rows === expectedSkip.collect().toSeq)
     }
 
     indexer.setHandleInvalid("keep")
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index e678489e100..a4403fb96b2 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -277,7 +277,7 @@ class ExecutorPodsAllocator(
         case _ => false
       }.keySet
 
-      val newFailedExecutorIds = currentFailedExecutorIds -- failedExecutorIds
+      val newFailedExecutorIds = currentFailedExecutorIds.diff(failedExecutorIds)
       if (newFailedExecutorIds.nonEmpty) {
         logWarning(s"${newFailedExecutorIds.size} new failed executors.")
         newFailedExecutorIds.foreach { _ => failureTracker.registerExecutorFailure() }
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
index 49bfde98bb8..46129f3c9ea 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala
@@ -116,7 +116,7 @@ private[spark] class ExecutorPodsLifecycleManager(
     // This makes sure that we don't keep growing that set indefinitely, in case we end up missing
     // an update for some pod.
     if (inactivatedPods.nonEmpty && snapshots.nonEmpty) {
-      inactivatedPods.retain(snapshots.last.executorPods.contains(_))
+      inactivatedPods.filterInPlace(snapshots.last.executorPods.contains(_))
     }
 
     // Reconcile the case where Spark claims to know about an executor but the corresponding pod
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala
index b059e4f8496..2628afd8923 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala
@@ -84,11 +84,11 @@ class AttributeSet private (private val baseSet: mutable.LinkedHashSet[Attribute
 
   /** Returns a new [[AttributeSet]] that contains `elem` in addition to the current elements. */
   def +(elem: Attribute): AttributeSet =  // scalastyle:ignore
-    new AttributeSet(baseSet + new AttributeEquals(elem))
+    new AttributeSet(baseSet.union(Set(new AttributeEquals(elem))))
 
   /** Returns a new [[AttributeSet]] that does not contain `elem`. */
   def -(elem: Attribute): AttributeSet =
-    new AttributeSet(baseSet - new AttributeEquals(elem))
+    new AttributeSet(baseSet.diff(Set(new AttributeEquals(elem))))
 
   /** Returns an iterator containing all of the attributes in the set. */
   def iterator: Iterator[Attribute] = baseSet.map(_.a).iterator
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
index eff63bf0341..563223bb33b 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
@@ -2573,7 +2573,7 @@ object Decode {
             default = search
           }
         }
-        CaseWhen(branches.seq.toSeq, default)
+        CaseWhen(branches.toSeq, default)
     }
   }
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index d9d04db9ab0..6d70ad29f87 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -1835,7 +1835,7 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging {
 
     // Reverse the contexts to have them in the same sequence as in the SQL statement & turn them
     // into expressions.
-    val expressions = contexts.reverseMap(expression)
+    val expressions = contexts.reverseIterator.map(expression).to(ArrayBuffer)
 
     // Create a balanced tree.
     def reduceToExpressionTree(low: Int, high: Int): Expression = high - low match {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/QueryExecutionMetering.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/QueryExecutionMetering.scala
index 4b7b0079f07..60d82d81df7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/QueryExecutionMetering.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/QueryExecutionMetering.scala
@@ -86,7 +86,7 @@ case class QueryExecutionMetering() {
     val colRunTime = "Effective Time / Total Time".padTo(len = 47, " 
").mkString
     val colNumRuns = "Effective Runs / Total Runs".padTo(len = 47, " 
").mkString
 
-    val ruleMetrics = map.toSeq.sortBy(_._2).reverseMap { case (name, time) =>
+    val ruleMetrics = map.toSeq.sortBy(_._2).reverseIterator.map { case (name, 
time) =>
       val timeEffectiveRun = timeEffectiveRunsMap.get(name)
       val numRuns = numRunsMap.get(name)
       val numEffectiveRun = numEffectiveRunsMap.get(name)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
index a30734abfa7..9dc6d7aab8b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala
@@ -707,7 +707,7 @@ case class RepairTableCommand(
       val partitionSpecsAndLocs: Seq[(TablePartitionSpec, Path)] =
         try {
           scanPartitions(spark, fs, pathFilter, root, Map(), table.partitionColumnNames, threshold,
-            spark.sessionState.conf.resolver, new ForkJoinTaskSupport(evalPool)).seq
+            spark.sessionState.conf.resolver, new ForkJoinTaskSupport(evalPool))
         } finally {
           evalPool.shutdown()
         }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
index 826543fd565..2ff478ef98e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala
@@ -69,7 +69,7 @@ abstract class MemoryStreamBase[A : Encoder](sqlContext: SQLContext) extends Spa
   }
 
   def addData(data: A*): OffsetV2 = {
-    addData(data.toTraversable)
+    addData(data)
   }
 
   def addData(data: IterableOnce[A]): OffsetV2
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index e54ce649736..2174e91cb44 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -174,7 +174,7 @@ class StreamSuite extends StreamTest {
         try {
           query.processAllAvailable()
           val outputDf = spark.read.parquet(outputDir.getAbsolutePath).as[Long]
-          checkDatasetUnorderly[Long](outputDf, (0L to 10L).union((0L to 10L)).toArray: _*)
+          checkDatasetUnorderly[Long](outputDf, (0L to 10L).concat(0L to 10L): _*)
         } finally {
           query.stop()
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
