This is an automated email from the ASF dual-hosted git repository. yangjie01 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 29eb577cefa [SPARK-45701][SPARK-45684][SPARK-45692][CORE][SQL][SS][ML][K8S] Clean up the deprecated API usage related to `mutable.SetOps/c.SeqOps/Iterator/Iterable/IterableOps` 29eb577cefa is described below commit 29eb577cefabd954f3de2c284a692790d621d0ba Author: yangjie01 <yangji...@baidu.com> AuthorDate: Tue Oct 31 15:07:47 2023 +0800 [SPARK-45701][SPARK-45684][SPARK-45692][CORE][SQL][SS][ML][K8S] Clean up the deprecated API usage related to `mutable.SetOps/c.SeqOps/Iterator/Iterable/IterableOps` ### What changes were proposed in this pull request? This PR cleans up the deprecated API usage related to `SetOps`: - `--` -> `diff` - `-` -> `diff` - `+` -> `union` - `retain` -> `filterInPlace` The changes refer to the following Scala 2.13 definitions: ```scala @deprecated("Consider requiring an immutable Set", "2.13.0") def -- (that: IterableOnce[A]): C = { val toRemove = that.iterator.to(immutable.Set) fromSpecific(view.filterNot(toRemove)) } @deprecated("Consider requiring an immutable Set or fall back to Set.diff", "2.13.0") def - (elem: A): C = diff(Set(elem)) @deprecated("Consider requiring an immutable Set or fall back to Set.union", "2.13.0") def + (elem: A): C = fromSpecific(new View.Appended(this, elem)) @deprecated("Use filterInPlace instead", "2.13.0") @inline final def retain(p: A => Boolean): Unit = filterInPlace(p) ``` This PR also cleans up the deprecated API usage related to `SeqOps`: - `transform` -> `mapInPlace` - `reverseMap` -> `.reverseIterator.map(f).to(...)` - `union` -> `concat` The changes refer to the following Scala 2.13 definitions: ```scala @deprecated("Use `mapInPlace` on an `IndexedSeq` instead", "2.13.0") @`inline` final def transform(f: A => A): this.type = { var i = 0 val siz = size while (i < siz) { this(i) = f(this(i)); i += 1 } this } @deprecated("Use .reverseIterator.map(f).to(...) 
instead of .reverseMap(f)", "2.13.0") def reverseMap[B](f: A => B): CC[B] = iterableFactory.from(new View.Map(View.fromIteratorProvider(() => reverseIterator), f)) @deprecated("Use `concat` instead", "2.13.0") @inline final def union[B >: A](that: Seq[B]): CC[B] = concat(that) ``` This PR also cleans up the deprecated API usage related to `Iterator/Iterable/IterableOps`: - s.c.Iterable - `toIterable` -> `immutable.ArraySeq.unsafeWrapArray` when the receiver is an `Array`, or simply removed when the receiver is already a `Seq` ```scala @deprecated("toIterable is internal and will be made protected; its name is similar to `toList` or `toSeq`, but it doesn't copy non-immutable collections", "2.13.7") final def toIterable: this.type = this ``` - s.c.Iterator - `.seq` -> removed ```scala @deprecated("Iterator.seq always returns the iterator itself", "2.13.0") def seq: this.type = this ``` - s.c.IterableOps - `toTraversable` -> removed ```scala @deprecated("toTraversable is internal and will be made protected; its name is similar to `toList` or `toSeq`, but it doesn't copy non-immutable collections", "2.13.0") final def toTraversable: Traversable[A] = toIterable ``` ### Why are the changes needed? Clean up deprecated Scala API usage. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Pass GitHub Actions ### Was this patch authored or co-authored using generative AI tooling? No Closes #43575 from LuciferYang/SPARK-45701. 
Lead-authored-by: yangjie01 <yangji...@baidu.com> Co-authored-by: YangJie <yangji...@baidu.com> Signed-off-by: yangjie01 <yangji...@baidu.com> --- .../org/apache/spark/util/ClosureCleaner.scala | 4 +-- .../scala/org/apache/spark/sql/SparkSession.scala | 3 ++- .../sql/KeyValueGroupedDatasetE2ETestSuite.scala | 2 +- .../connect/client/GrpcExceptionConverter.scala | 3 ++- .../org/apache/spark/sql/connect/dsl/package.scala | 4 +-- .../sql/connect/planner/SparkConnectPlanner.scala | 26 +++++++++--------- .../spark/sql/connect/utils/ErrorUtils.scala | 31 +++++++++++----------- .../spark/storage/BlockReplicationPolicy.scala | 2 +- .../collection/ExternalAppendOnlyMapSuite.scala | 6 ++--- .../ml/classification/LogisticRegression.scala | 2 +- .../apache/spark/ml/feature/NormalizerSuite.scala | 4 +-- .../spark/ml/feature/StringIndexerSuite.scala | 4 +-- .../cluster/k8s/ExecutorPodsAllocator.scala | 2 +- .../cluster/k8s/ExecutorPodsLifecycleManager.scala | 2 +- .../sql/catalyst/expressions/AttributeSet.scala | 4 +-- .../catalyst/expressions/stringExpressions.scala | 2 +- .../spark/sql/catalyst/parser/AstBuilder.scala | 2 +- .../catalyst/rules/QueryExecutionMetering.scala | 2 +- .../apache/spark/sql/execution/command/ddl.scala | 2 +- .../spark/sql/execution/streaming/memory.scala | 2 +- .../apache/spark/sql/streaming/StreamSuite.scala | 2 +- 21 files changed, 57 insertions(+), 54 deletions(-) diff --git a/common/utils/src/main/scala/org/apache/spark/util/ClosureCleaner.scala b/common/utils/src/main/scala/org/apache/spark/util/ClosureCleaner.scala index ffa2f0e60b2..5ea3c9afa9c 100644 --- a/common/utils/src/main/scala/org/apache/spark/util/ClosureCleaner.scala +++ b/common/utils/src/main/scala/org/apache/spark/util/ClosureCleaner.scala @@ -95,13 +95,13 @@ private[spark] object ClosureCleaner extends Logging { if (cr != null) { val set = Set.empty[Class[_]] cr.accept(new InnerClosureFinder(set), 0) - for (cls <- set -- seen) { + for (cls <- set.diff(seen)) { seen += cls 
stack.push(cls) } } } - (seen - obj.getClass).toList + seen.diff(Set(obj.getClass)).toList } /** Initializes the accessed fields for outer classes and their super classes. */ diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala index 2d3e4205da9..969ac017ecb 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -21,6 +21,7 @@ import java.net.URI import java.util.concurrent.TimeUnit._ import java.util.concurrent.atomic.{AtomicLong, AtomicReference} +import scala.collection.immutable import scala.jdk.CollectionConverters._ import scala.reflect.runtime.universe.TypeTag @@ -247,7 +248,7 @@ class SparkSession private[sql] ( proto.SqlCommand .newBuilder() .setSql(sqlText) - .addAllPosArguments(args.map(lit(_).expr).toIterable.asJava))) + .addAllPosArguments(immutable.ArraySeq.unsafeWrapArray(args.map(lit(_).expr)).asJava))) val plan = proto.Plan.newBuilder().setCommand(cmd) // .toBuffer forces that the iterator is consumed and closed val responseSeq = client.execute(plan.build()).toBuffer.toSeq diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/KeyValueGroupedDatasetE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/KeyValueGroupedDatasetE2ETestSuite.scala index 98a947826e3..91516b0069b 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/KeyValueGroupedDatasetE2ETestSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/KeyValueGroupedDatasetE2ETestSuite.scala @@ -198,7 +198,7 @@ class KeyValueGroupedDatasetE2ETestSuite extends QueryTest with SQLHelper { .groupByKey(v => v / 2) val values = grouped .cogroup(otherGrouped) { (k, it, otherIt) => - Seq(it.toSeq.size + otherIt.seq.size) + 
Seq(it.toSeq.size + otherIt.size) } .collectAsList() diff --git a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala index f404cfd2e41..b2782442f4a 100644 --- a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala +++ b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/client/GrpcExceptionConverter.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.connect.client import java.time.DateTimeException +import scala.collection.immutable import scala.jdk.CollectionConverters._ import scala.reflect.ClassTag @@ -371,7 +372,7 @@ private[client] object GrpcExceptionConverter { FetchErrorDetailsResponse.Error .newBuilder() .setMessage(message) - .addAllErrorTypeHierarchy(classes.toIterable.asJava) + .addAllErrorTypeHierarchy(immutable.ArraySeq.unsafeWrapArray(classes).asJava) .build())) } } diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala index 24fa2324f66..5fd1a035385 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala @@ -520,7 +520,7 @@ package object dsl { .setProject( Project .newBuilder() - .addAllExpressions(exprs.toIterable.asJava) + .addAllExpressions(exprs.asJava) .build()) .build() } @@ -533,7 +533,7 @@ package object dsl { Project .newBuilder() .setInput(logicalPlan) - .addAllExpressions(exprs.toIterable.asJava) + .addAllExpressions(exprs.asJava) .build()) .build() } diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index 17c10e63301..ec57909ad14 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.connect.planner +import scala.collection.immutable import scala.collection.mutable import scala.jdk.CollectionConverters._ import scala.util.Try @@ -3183,9 +3184,9 @@ class SparkConnectPlanner( case StreamingQueryManagerCommand.CommandCase.ACTIVE => val active_queries = session.streams.active respBuilder.getActiveBuilder.addAllActiveQueries( - active_queries - .map(query => buildStreamingQueryInstance(query)) - .toIterable + immutable.ArraySeq + .unsafeWrapArray(active_queries + .map(query => buildStreamingQueryInstance(query))) .asJava) case StreamingQueryManagerCommand.CommandCase.GET_QUERY => @@ -3264,16 +3265,15 @@ class SparkConnectPlanner( .setGetResourcesCommandResult( proto.GetResourcesCommandResult .newBuilder() - .putAllResources( - session.sparkContext.resources.view - .mapValues(resource => - proto.ResourceInformation - .newBuilder() - .setName(resource.name) - .addAllAddresses(resource.addresses.toIterable.asJava) - .build()) - .toMap - .asJava) + .putAllResources(session.sparkContext.resources.view + .mapValues(resource => + proto.ResourceInformation + .newBuilder() + .setName(resource.name) + .addAllAddresses(immutable.ArraySeq.unsafeWrapArray(resource.addresses).asJava) + .build()) + .toMap + .asJava) .build()) .build()) } diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala index f49b81dda8d..741fa97f178 100644 --- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/utils/ErrorUtils.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.connect.utils import java.util.UUID import scala.annotation.tailrec +import scala.collection.immutable import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.jdk.CollectionConverters._ @@ -90,21 +91,21 @@ private[connect] object ErrorUtils extends Logging { if (serverStackTraceEnabled) { builder.addAllStackTrace( - currentError.getStackTrace - .map { stackTraceElement => - val stackTraceBuilder = FetchErrorDetailsResponse.StackTraceElement - .newBuilder() - .setDeclaringClass(stackTraceElement.getClassName) - .setMethodName(stackTraceElement.getMethodName) - .setLineNumber(stackTraceElement.getLineNumber) - - if (stackTraceElement.getFileName != null) { - stackTraceBuilder.setFileName(stackTraceElement.getFileName) - } - - stackTraceBuilder.build() - } - .toIterable + immutable.ArraySeq + .unsafeWrapArray(currentError.getStackTrace + .map { stackTraceElement => + val stackTraceBuilder = FetchErrorDetailsResponse.StackTraceElement + .newBuilder() + .setDeclaringClass(stackTraceElement.getClassName) + .setMethodName(stackTraceElement.getMethodName) + .setLineNumber(stackTraceElement.getLineNumber) + + if (stackTraceElement.getFileName != null) { + stackTraceBuilder.setFileName(stackTraceElement.getFileName) + } + + stackTraceBuilder.build() + }) .asJava) } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPolicy.scala b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPolicy.scala index 0bacc34cdfd..893b5605414 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockReplicationPolicy.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockReplicationPolicy.scala @@ -67,7 +67,7 @@ object BlockReplicationUtils { private def getSampleIds(n: Int, m: Int, 
r: Random): List[Int] = { val indices = (n - m + 1 to n).foldLeft(mutable.LinkedHashSet.empty[Int]) {case (set, i) => val t = r.nextInt(i) + 1 - if (set.contains(t)) set + i else set + t + if (set.contains(t)) set.union(Set(i)) else set.union(Set(t)) } indices.map(_ - 1).toList } diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala index fb1502848ca..59f6e3f2d35 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala @@ -453,7 +453,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite val first50Keys = for ( _ <- 0 until 50) yield { val (k, vs) = it.next() val sortedVs = vs.sorted - assert(sortedVs.seq == (0 until 10).map(10 * k + _)) + assert(sortedVs == (0 until 10).map(10 * k + _)) k } assert(map.numSpills == 0) @@ -474,7 +474,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite val next50Keys = for ( _ <- 0 until 50) yield { val (k, vs) = it.next() val sortedVs = vs.sorted - assert(sortedVs.seq == (0 until 10).map(10 * k + _)) + assert(sortedVs == (0 until 10).map(10 * k + _)) k } assert(!it.hasNext) @@ -506,7 +506,7 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite val keys = it.map{ case (k, vs) => val sortedVs = vs.sorted - assert(sortedVs.seq == (0 until 10).map(10 * k + _)) + assert(sortedVs == (0 until 10).map(10 * k + _)) k } .toList diff --git a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala index 0efa57e56f1..8b796a65f4f 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/classification/LogisticRegression.scala @@ -672,7 +672,7 @@ class LogisticRegression 
@Since("1.2.0") ( denseCoefficientMatrix.foreachActive { case (i, j, v) => centers(j) += v } - centers.transform(_ / numCoefficientSets) + centers.mapInPlace(_ / numCoefficientSets) denseCoefficientMatrix.foreachActive { case (i, j, v) => denseCoefficientMatrix.update(i, j, v - centers(j)) } diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/NormalizerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/NormalizerSuite.scala index 1c602cd7d9a..7fa2ad5ee41 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/NormalizerSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/feature/NormalizerSuite.scala @@ -76,7 +76,7 @@ class NormalizerSuite extends MLTest with DefaultReadWriteTest { test("Normalization with default parameter") { val normalizer = new Normalizer().setInputCol("features").setOutputCol("normalized") - val dataFrame: DataFrame = data.zip(l2Normalized).seq.toDF("features", "expected") + val dataFrame: DataFrame = data.zip(l2Normalized).toDF("features", "expected") testTransformer[(Vector, Vector)](dataFrame, normalizer, "features", "normalized", "expected") { case Row(features: Vector, normalized: Vector, expected: Vector) => @@ -102,7 +102,7 @@ class NormalizerSuite extends MLTest with DefaultReadWriteTest { } test("Normalization with setter") { - val dataFrame: DataFrame = data.zip(l1Normalized).seq.toDF("features", "expected") + val dataFrame: DataFrame = data.zip(l1Normalized).toDF("features", "expected") val normalizer = new Normalizer().setInputCol("features").setOutputCol("normalized").setP(1) testTransformer[(Vector, Vector)](dataFrame, normalizer, "features", "normalized", "expected") { diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala index c8247b9c8f3..99f12eab7d6 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala +++ 
b/mllib/src/test/scala/org/apache/spark/ml/feature/StringIndexerSuite.scala @@ -92,7 +92,7 @@ class StringIndexerSuite extends MLTest with DefaultReadWriteTest { val attr = Attribute.fromStructField(rows.head.schema("labelIndex")) .asInstanceOf[NominalAttribute] assert(attr.values.get === Array("a", "c", "b")) - assert(rows.seq === expected.collect().toSeq) + assert(rows === expected.collect().toSeq) } } @@ -139,7 +139,7 @@ class StringIndexerSuite extends MLTest with DefaultReadWriteTest { val attrSkip = Attribute.fromStructField(rows.head.schema("labelIndex")) .asInstanceOf[NominalAttribute] assert(attrSkip.values.get === Array("b", "a")) - assert(rows.seq === expectedSkip.collect().toSeq) + assert(rows === expectedSkip.collect().toSeq) } indexer.setHandleInvalid("keep") diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala index e678489e100..a4403fb96b2 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala @@ -277,7 +277,7 @@ class ExecutorPodsAllocator( case _ => false }.keySet - val newFailedExecutorIds = currentFailedExecutorIds -- failedExecutorIds + val newFailedExecutorIds = currentFailedExecutorIds.diff(failedExecutorIds) if (newFailedExecutorIds.nonEmpty) { logWarning(s"${newFailedExecutorIds.size} new failed executors.") newFailedExecutorIds.foreach { _ => failureTracker.registerExecutorFailure() } diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala index 
49bfde98bb8..46129f3c9ea 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsLifecycleManager.scala @@ -116,7 +116,7 @@ private[spark] class ExecutorPodsLifecycleManager( // This makes sure that we don't keep growing that set indefinitely, in case we end up missing // an update for some pod. if (inactivatedPods.nonEmpty && snapshots.nonEmpty) { - inactivatedPods.retain(snapshots.last.executorPods.contains(_)) + inactivatedPods.filterInPlace(snapshots.last.executorPods.contains(_)) } // Reconcile the case where Spark claims to know about an executor but the corresponding pod diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala index b059e4f8496..2628afd8923 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/AttributeSet.scala @@ -84,11 +84,11 @@ class AttributeSet private (private val baseSet: mutable.LinkedHashSet[Attribute /** Returns a new [[AttributeSet]] that contains `elem` in addition to the current elements. */ def +(elem: Attribute): AttributeSet = // scalastyle:ignore - new AttributeSet(baseSet + new AttributeEquals(elem)) + new AttributeSet(baseSet.union(Set(new AttributeEquals(elem)))) /** Returns a new [[AttributeSet]] that does not contain `elem`. */ def -(elem: Attribute): AttributeSet = - new AttributeSet(baseSet - new AttributeEquals(elem)) + new AttributeSet(baseSet.diff(Set(new AttributeEquals(elem)))) /** Returns an iterator containing all of the attributes in the set. 
*/ def iterator: Iterator[Attribute] = baseSet.map(_.a).iterator diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala index eff63bf0341..563223bb33b 100755 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala @@ -2573,7 +2573,7 @@ object Decode { default = search } } - CaseWhen(branches.seq.toSeq, default) + CaseWhen(branches.toSeq, default) } } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index d9d04db9ab0..6d70ad29f87 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -1835,7 +1835,7 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging { // Reverse the contexts to have them in the same sequence as in the SQL statement & turn them // into expressions. - val expressions = contexts.reverseMap(expression) + val expressions = contexts.reverseIterator.map(expression).to(ArrayBuffer) // Create a balanced tree. 
def reduceToExpressionTree(low: Int, high: Int): Expression = high - low match { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/QueryExecutionMetering.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/QueryExecutionMetering.scala index 4b7b0079f07..60d82d81df7 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/QueryExecutionMetering.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/QueryExecutionMetering.scala @@ -86,7 +86,7 @@ case class QueryExecutionMetering() { val colRunTime = "Effective Time / Total Time".padTo(len = 47, " ").mkString val colNumRuns = "Effective Runs / Total Runs".padTo(len = 47, " ").mkString - val ruleMetrics = map.toSeq.sortBy(_._2).reverseMap { case (name, time) => + val ruleMetrics = map.toSeq.sortBy(_._2).reverseIterator.map { case (name, time) => val timeEffectiveRun = timeEffectiveRunsMap.get(name) val numRuns = numRunsMap.get(name) val numEffectiveRun = numEffectiveRunsMap.get(name) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala index a30734abfa7..9dc6d7aab8b 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/ddl.scala @@ -707,7 +707,7 @@ case class RepairTableCommand( val partitionSpecsAndLocs: Seq[(TablePartitionSpec, Path)] = try { scanPartitions(spark, fs, pathFilter, root, Map(), table.partitionColumnNames, threshold, - spark.sessionState.conf.resolver, new ForkJoinTaskSupport(evalPool)).seq + spark.sessionState.conf.resolver, new ForkJoinTaskSupport(evalPool)) } finally { evalPool.shutdown() } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala index 826543fd565..2ff478ef98e 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/memory.scala @@ -69,7 +69,7 @@ abstract class MemoryStreamBase[A : Encoder](sqlContext: SQLContext) extends Spa } def addData(data: A*): OffsetV2 = { - addData(data.toTraversable) + addData(data) } def addData(data: IterableOnce[A]): OffsetV2 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index e54ce649736..2174e91cb44 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -174,7 +174,7 @@ class StreamSuite extends StreamTest { try { query.processAllAvailable() val outputDf = spark.read.parquet(outputDir.getAbsolutePath).as[Long] - checkDatasetUnorderly[Long](outputDf, (0L to 10L).union((0L to 10L)).toArray: _*) + checkDatasetUnorderly[Long](outputDf, (0L to 10L).concat(0L to 10L): _*) } finally { query.stop() } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org