This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 89ca8b6065e 
[SPARK-45605][CORE][SQL][SS][CONNECT][MLLIB][GRAPHX][DSTREAM][PROTOBUF][EXAMPLES]
 Replace `s.c.MapOps.mapValues` with `s.c.MapOps.view.mapValues`
89ca8b6065e is described below

commit 89ca8b6065e9f690a492c778262080741d50d94d
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Sun Oct 29 09:19:30 2023 -0500

    
[SPARK-45605][CORE][SQL][SS][CONNECT][MLLIB][GRAPHX][DSTREAM][PROTOBUF][EXAMPLES]
 Replace `s.c.MapOps.mapValues` with `s.c.MapOps.view.mapValues`
    
    ### What changes were proposed in this pull request?
    This PR replaces `s.c.MapOps.mapValues` with `s.c.MapOps.view.mapValues`
because `s.c.MapOps.mapValues` has been marked as deprecated since Scala 2.13.0:
    
    
https://github.com/scala/scala/blob/bf45e199e96383b96a6955520d7d2524c78e6e12/src/library/scala/collection/Map.scala#L256-L262
    
    ```scala
      @deprecated("Use .view.mapValues(f). A future version will include a 
strict version of this method (for now, .view.mapValues(f).toMap).", "2.13.0")
      def mapValues[W](f: V => W): MapView[K, W] = new MapView.MapValues(this, 
f)
    ```
    
    ### Why are the changes needed?
    Cleanup deprecated API usage.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    - Pass GitHub Actions
    - Packaged the client, manually tested 
`DFSReadWriteTest/MiniReadWriteTest/PowerIterationClusteringExample`.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #43448 from LuciferYang/SPARK-45605.
    
    Lead-authored-by: yangjie01 <yangji...@baidu.com>
    Co-authored-by: YangJie <yangji...@baidu.com>
    Signed-off-by: Sean Owen <sro...@gmail.com>
---
 .../spark/util/sketch/CountMinSketchSuite.scala    |  2 +-
 .../org/apache/spark/sql/avro/AvroUtils.scala      |  1 +
 .../scala/org/apache/spark/sql/SparkSession.scala  |  2 +-
 .../spark/sql/ClientDataFrameStatSuite.scala       |  2 +-
 .../org/apache/spark/sql/connect/dsl/package.scala |  2 +-
 .../sql/connect/planner/SparkConnectPlanner.scala  | 13 ++++++----
 .../sql/kafka010/KafkaMicroBatchSourceSuite.scala  |  3 ++-
 .../apache/spark/sql/kafka010/KafkaTestUtils.scala |  2 +-
 .../streaming/kafka010/ConsumerStrategy.scala      |  6 ++---
 .../kafka010/DirectKafkaInputDStream.scala         |  2 +-
 .../kafka010/DirectKafkaStreamSuite.scala          |  2 +-
 .../spark/streaming/kafka010/KafkaTestUtils.scala  |  2 +-
 .../spark/streaming/kinesis/KinesisTestUtils.scala |  2 +-
 .../kinesis/KPLBasedKinesisTestUtils.scala         |  2 +-
 .../kinesis/KinesisBackedBlockRDDSuite.scala       |  4 +--
 .../spark/sql/protobuf/utils/ProtobufUtils.scala   |  1 +
 .../org/apache/spark/api/java/JavaPairRDD.scala    |  4 +--
 .../apache/spark/api/java/JavaSparkContext.scala   |  2 +-
 .../spark/api/python/PythonWorkerFactory.scala     |  2 +-
 .../apache/spark/scheduler/InputFormatInfo.scala   |  2 +-
 .../apache/spark/scheduler/TaskSchedulerImpl.scala |  2 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala    |  2 +-
 ...plicationEnvironmentInfoWrapperSerializer.scala |  5 ++--
 .../ExecutorSummaryWrapperSerializer.scala         |  3 ++-
 .../status/protobuf/JobDataWrapperSerializer.scala |  2 +-
 .../protobuf/StageDataWrapperSerializer.scala      |  6 ++---
 .../org/apache/spark/SparkThrowableSuite.scala     |  2 +-
 .../apache/spark/rdd/PairRDDFunctionsSuite.scala   |  2 +-
 .../test/scala/org/apache/spark/rdd/RDDSuite.scala |  1 +
 .../scheduler/ExecutorResourceInfoSuite.scala      |  1 +
 .../BlockManagerDecommissionIntegrationSuite.scala |  2 +-
 .../storage/ShuffleBlockFetcherIteratorSuite.scala |  2 +-
 .../util/collection/ExternalSorterSuite.scala      |  2 +-
 .../apache/spark/examples/DFSReadWriteTest.scala   |  1 +
 .../apache/spark/examples/MiniReadWriteTest.scala  |  1 +
 .../mllib/PowerIterationClusteringExample.scala    |  2 +-
 .../spark/graphx/lib/ShortestPathsSuite.scala      |  2 +-
 .../spark/ml/evaluation/ClusteringMetrics.scala    |  1 +
 .../apache/spark/ml/feature/VectorIndexer.scala    |  2 +-
 .../org/apache/spark/ml/feature/Word2Vec.scala     |  2 +-
 .../apache/spark/ml/tree/impl/RandomForest.scala   |  4 +--
 .../spark/mllib/clustering/BisectingKMeans.scala   |  2 +-
 .../mllib/linalg/distributed/BlockMatrix.scala     |  4 +--
 .../apache/spark/mllib/stat/test/ChiSqTest.scala   |  1 +
 .../apache/spark/ml/recommendation/ALSSuite.scala  |  8 +++---
 .../apache/spark/mllib/feature/Word2VecSuite.scala | 12 ++++-----
 .../org/apache/spark/sql/types/Metadata.scala      |  2 +-
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  3 ++-
 .../catalyst/catalog/ExternalCatalogUtils.scala    |  2 +-
 .../sql/catalyst/catalog/SessionCatalog.scala      |  2 +-
 .../spark/sql/catalyst/expressions/package.scala   |  2 +-
 .../catalyst/optimizer/NestedColumnAliasing.scala  |  2 +-
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  1 +
 .../spark/sql/catalyst/parser/AstBuilder.scala     |  2 +-
 .../apache/spark/sql/catalyst/trees/TreeNode.scala |  4 +--
 .../spark/sql/catalyst/util/ToNumberParser.scala   |  2 +-
 .../spark/sql/errors/QueryCompilationErrors.scala  |  1 +
 .../scala/org/apache/spark/sql/SparkSession.scala  |  2 +-
 .../command/AnalyzePartitionCommand.scala          |  2 +-
 .../execution/datasources/PartitioningUtils.scala  |  2 +-
 .../execution/datasources/orc/OrcFiltersBase.scala |  1 +
 .../datasources/parquet/ParquetFilters.scala       |  1 +
 .../execution/exchange/EnsureRequirements.scala    |  1 +
 .../sql/execution/streaming/ProgressReporter.scala | 12 ++++-----
 .../sql/execution/streaming/state/RocksDB.scala    |  4 +--
 .../execution/streaming/statefulOperators.scala    |  2 +-
 .../spark/sql/execution/ui/AllExecutionsPage.scala |  6 ++---
 .../org/apache/spark/sql/DataFrameStatSuite.scala  |  2 +-
 .../org/apache/spark/sql/SQLQueryTestSuite.scala   |  2 +-
 .../test/scala/org/apache/spark/sql/UDFSuite.scala |  2 +-
 .../spark/sql/execution/SQLViewTestSuite.scala     |  2 +-
 .../command/AlterTableDropPartitionSuiteBase.scala |  2 +-
 .../datasources/parquet/ParquetIOSuite.scala       |  2 +-
 .../sql/execution/metric/SQLMetricsTestUtils.scala |  4 +--
 .../execution/ui/SQLAppStatusListenerSuite.scala   | 30 +++++++++++-----------
 .../StreamingQueryStatusAndProgressSuite.scala     |  6 ++---
 .../sql/streaming/continuous/ContinuousSuite.scala |  2 +-
 .../spark/sql/hive/HiveExternalCatalog.scala       |  2 +-
 .../api/java/JavaStreamingListenerWrapper.scala    |  4 +--
 .../scheduler/ReceiverSchedulingPolicy.scala       |  2 +-
 .../streaming/scheduler/ReceiverTracker.scala      |  2 +-
 .../apache/spark/streaming/ui/BatchUIData.scala    |  2 +-
 .../ui/StreamingJobProgressListener.scala          |  5 ++--
 83 files changed, 140 insertions(+), 119 deletions(-)

diff --git 
a/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala
 
b/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala
index 087dae26047..689452caa32 100644
--- 
a/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala
+++ 
b/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala
@@ -56,7 +56,7 @@ class CountMinSketchSuite extends AnyFunSuite { // 
scalastyle:ignore funsuite
 
       val exactFreq = {
         val sampledItems = sampledItemIndices.map(allItems)
-        sampledItems.groupBy(identity).mapValues(_.length.toLong)
+        sampledItems.groupBy(identity).view.mapValues(_.length.toLong)
       }
 
       val sketch = CountMinSketch.create(epsOfTotalCount, confidence, seed)
diff --git 
a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala 
b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
index 6a1655a91c9..e738f541ca7 100644
--- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
+++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
@@ -242,6 +242,7 @@ private[sql] object AvroUtils extends Logging {
     private[this] val avroFieldArray = avroSchema.getFields.asScala.toArray
     private[this] val fieldMap = avroSchema.getFields.asScala
       .groupBy(_.name.toLowerCase(Locale.ROOT))
+      .view
       .mapValues(_.toSeq) // toSeq needed for scala 2.13
 
     /** The fields which have matching equivalents in both Avro and Catalyst 
schemas. */
diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 5e640bea570..78daaa5d3f5 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -306,7 +306,7 @@ class SparkSession private[sql] (
           proto.SqlCommand
             .newBuilder()
             .setSql(sqlText)
-            
.putAllNamedArguments(args.asScala.mapValues(lit(_).expr).toMap.asJava)))
+            
.putAllNamedArguments(args.asScala.view.mapValues(lit(_).expr).toMap.asJava)))
       val plan = proto.Plan.newBuilder().setCommand(cmd)
       // .toBuffer forces that the iterator is consumed and closed
       val responseSeq = client.execute(plan.build()).toBuffer.toSeq
diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientDataFrameStatSuite.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientDataFrameStatSuite.scala
index 069d8ec502f..747ca45d10d 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientDataFrameStatSuite.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientDataFrameStatSuite.scala
@@ -120,7 +120,7 @@ class ClientDataFrameStatSuite extends RemoteSparkSession {
     val columnNames = crosstab.schema.fieldNames
     assert(columnNames(0) === "a_b")
     // reduce by key
-    val expected = data.map(t => (t, 1)).groupBy(_._1).mapValues(_.length)
+    val expected = data.map(t => (t, 1)).groupBy(_._1).view.mapValues(_.length)
     val rows = crosstab.collect()
     rows.foreach { row =>
       val i = row.getString(0).toInt
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
index 7c41491ba06..24fa2324f66 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
@@ -361,7 +361,7 @@ package object dsl {
       }
 
       def fillValueMap(valueMap: Map[String, Any]): Relation = {
-        val (cols, values) = valueMap.mapValues(toLiteralProto).toSeq.unzip
+        val (cols, values) = 
valueMap.view.mapValues(toLiteralProto).toSeq.unzip
         Relation
           .newBuilder()
           .setFillNa(
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index f5d83b81999..17c10e63301 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -287,11 +287,11 @@ class SparkConnectPlanner(
     if (!namedArguments.isEmpty) {
       NameParameterizedQuery(
         parsedPlan,
-        namedArguments.asScala.mapValues(transformExpression).toMap)
+        namedArguments.asScala.view.mapValues(transformExpression).toMap)
     } else if (!posArguments.isEmpty) {
       PosParameterizedQuery(parsedPlan, 
posArguments.asScala.map(transformExpression).toSeq)
     } else if (!args.isEmpty) {
-      NameParameterizedQuery(parsedPlan, 
args.asScala.mapValues(transformLiteral).toMap)
+      NameParameterizedQuery(parsedPlan, 
args.asScala.view.mapValues(transformLiteral).toMap)
     } else if (!posArgs.isEmpty) {
       PosParameterizedQuery(parsedPlan, 
posArgs.asScala.map(transformLiteral).toSeq)
     } else {
@@ -2518,7 +2518,7 @@ class SparkConnectPlanner(
     val df = if (!namedArguments.isEmpty) {
       session.sql(
         getSqlCommand.getSql,
-        namedArguments.asScala.mapValues(e => 
Column(transformExpression(e))).toMap,
+        namedArguments.asScala.view.mapValues(e => 
Column(transformExpression(e))).toMap,
         tracker)
     } else if (!posArguments.isEmpty) {
       session.sql(
@@ -2526,7 +2526,10 @@ class SparkConnectPlanner(
         posArguments.asScala.map(e => Column(transformExpression(e))).toArray,
         tracker)
     } else if (!args.isEmpty) {
-      session.sql(getSqlCommand.getSql, 
args.asScala.mapValues(transformLiteral).toMap, tracker)
+      session.sql(
+        getSqlCommand.getSql,
+        args.asScala.view.mapValues(transformLiteral).toMap,
+        tracker)
     } else if (!posArgs.isEmpty) {
       session.sql(getSqlCommand.getSql, 
posArgs.asScala.map(transformLiteral).toArray, tracker)
     } else {
@@ -3262,7 +3265,7 @@ class SparkConnectPlanner(
           proto.GetResourcesCommandResult
             .newBuilder()
             .putAllResources(
-              session.sparkContext.resources
+              session.sparkContext.resources.view
                 .mapValues(resource =>
                   proto.ResourceInformation
                     .newBuilder()
diff --git 
a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
 
b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 4e8da137a47..54e05ff95c9 100644
--- 
a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ 
b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -2294,7 +2294,8 @@ abstract class KafkaSourceSuiteBase extends 
KafkaSourceTest {
       Execute { q =>
         // wait to reach the last offset in every partition
         q.awaitOffset(0,
-          KafkaSourceOffset(partitionOffsets.mapValues(_ => 3L).toMap), 
streamingTimeout.toMillis)
+          KafkaSourceOffset(partitionOffsets.view.mapValues(_ => 3L).toMap),
+          streamingTimeout.toMillis)
       },
       CheckAnswer(-20, -21, -22, 0, 1, 2, 11, 12, 22),
       StopStream,
diff --git 
a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
 
b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index c54afc6290b..1624f7320bb 100644
--- 
a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ 
b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -377,7 +377,7 @@ class KafkaTestUtils(
   }
 
   def getAllTopicsAndPartitionSize(): Seq[(String, Int)] = {
-    
zkClient.getPartitionsForTopics(zkClient.getAllTopicsInCluster()).mapValues(_.size).toSeq
+    
zkClient.getPartitionsForTopics(zkClient.getAllTopicsInCluster()).view.mapValues(_.size).toSeq
   }
 
   /** Create a Kafka topic and wait until it is propagated to the whole 
cluster */
diff --git 
a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
 
b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
index b96a2597f5d..1dd66675b91 100644
--- 
a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
+++ 
b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
@@ -241,7 +241,7 @@ object ConsumerStrategies {
     new Subscribe[K, V](
       new ju.ArrayList(topics.asJavaCollection),
       new ju.HashMap[String, Object](kafkaParams.asJava),
-      new ju.HashMap[TopicPartition, 
jl.Long](offsets.mapValues(jl.Long.valueOf).toMap.asJava))
+      new ju.HashMap[TopicPartition, 
jl.Long](offsets.view.mapValues(jl.Long.valueOf).toMap.asJava))
   }
 
   /**
@@ -320,7 +320,7 @@ object ConsumerStrategies {
     new SubscribePattern[K, V](
       pattern,
       new ju.HashMap[String, Object](kafkaParams.asJava),
-      new ju.HashMap[TopicPartition, 
jl.Long](offsets.mapValues(jl.Long.valueOf).toMap.asJava))
+      new ju.HashMap[TopicPartition, 
jl.Long](offsets.view.mapValues(jl.Long.valueOf).toMap.asJava))
   }
 
   /**
@@ -404,7 +404,7 @@ object ConsumerStrategies {
     new Assign[K, V](
       new ju.ArrayList(topicPartitions.asJavaCollection),
       new ju.HashMap[String, Object](kafkaParams.asJava),
-      new ju.HashMap[TopicPartition, 
jl.Long](offsets.mapValues(jl.Long.valueOf).toMap.asJava))
+      new ju.HashMap[TopicPartition, 
jl.Long](offsets.view.mapValues(jl.Long.valueOf).toMap.asJava))
   }
 
   /**
diff --git 
a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
 
b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
index d8037269961..f5967a74ad3 100644
--- 
a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
+++ 
b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
@@ -71,7 +71,7 @@ private[spark] class DirectKafkaInputDStream[K, V](
   def consumer(): Consumer[K, V] = this.synchronized {
     if (null == kc) {
       kc = consumerStrategy.onStart(
-        currentOffsets.mapValues(l => java.lang.Long.valueOf(l)).toMap.asJava)
+        currentOffsets.view.mapValues(l => 
java.lang.Long.valueOf(l)).toMap.asJava)
     }
     kc
   }
diff --git 
a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
 
b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
index 7b2cac4a68b..faf114108fa 100644
--- 
a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
+++ 
b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
@@ -726,7 +726,7 @@ class DirectKafkaStreamSuite
   /** Get the generated offset ranges from the DirectKafkaStream */
   private def getOffsetRanges[K, V](
       kafkaStream: DStream[ConsumerRecord[K, V]]): Seq[(Time, 
Array[OffsetRange])] = {
-    kafkaStream.generatedRDDs.mapValues { rdd =>
+    kafkaStream.generatedRDDs.view.mapValues { rdd =>
       rdd.asInstanceOf[HasOffsetRanges].offsetRanges
     }.toSeq.sortBy { _._1 }
   }
diff --git 
a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
 
b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
index 6a9ef52e990..66c253172e7 100644
--- 
a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
+++ 
b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
@@ -206,7 +206,7 @@ private[kafka010] class KafkaTestUtils extends Logging {
 
   /** Java-friendly function for sending messages to the Kafka broker */
   def sendMessages(topic: String, messageToFreq: JMap[String, JInt]): Unit = {
-    sendMessages(topic, 
Map(messageToFreq.asScala.mapValues(_.intValue()).toSeq: _*))
+    sendMessages(topic, 
Map(messageToFreq.asScala.view.mapValues(_.intValue()).toSeq: _*))
   }
 
   /** Send the messages to the Kafka broker */
diff --git 
a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
 
b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
index 50dfd50aa23..10d24df7b61 100644
--- 
a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
+++ 
b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
@@ -289,6 +289,6 @@ private[kinesis] class SimpleDataGenerator(
       sentSeqNumbers += ((num, seqNumber))
     }
 
-    shardIdToSeqNumbers.mapValues(_.toSeq).toMap
+    shardIdToSeqNumbers.view.mapValues(_.toSeq).toMap
   }
 }
diff --git 
a/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
 
b/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
index f95bee1d98f..2799965a8fc 100644
--- 
a/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
+++ 
b/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
@@ -72,6 +72,6 @@ private[kinesis] class KPLDataGenerator(regionName: String) 
extends KinesisDataG
       Futures.addCallback(future, kinesisCallBack, 
ThreadUtils.sameThreadExecutorService())
     }
     producer.flushSync()
-    shardIdToSeqNumbers.mapValues(_.toSeq).toMap
+    shardIdToSeqNumbers.view.mapValues(_.toSeq).toMap
   }
 }
diff --git 
a/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
 
b/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
index 4b3b7454b86..c9d1498a5a4 100644
--- 
a/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
+++ 
b/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
@@ -47,8 +47,8 @@ abstract class KinesisBackedBlockRDDTests(aggregateTestData: 
Boolean)
       require(shardIdToDataAndSeqNumbers.size > 1, "Need data to be sent to 
multiple shards")
 
       shardIds = shardIdToDataAndSeqNumbers.keySet.toSeq
-      shardIdToData = shardIdToDataAndSeqNumbers.mapValues(_.map(_._1)).toMap
-      shardIdToSeqNumbers = 
shardIdToDataAndSeqNumbers.mapValues(_.map(_._2)).toMap
+      shardIdToData = 
shardIdToDataAndSeqNumbers.view.mapValues(_.map(_._1)).toMap
+      shardIdToSeqNumbers = 
shardIdToDataAndSeqNumbers.view.mapValues(_.map(_._2)).toMap
       shardIdToRange = shardIdToSeqNumbers.map { case (shardId, seqNumbers) =>
         val seqNumRange = SequenceNumberRange(
           testUtils.streamName, shardId, seqNumbers.head, seqNumbers.last, 
seqNumbers.size)
diff --git 
a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala
 
b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala
index 2388668f66a..6c18a8863af 100644
--- 
a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala
+++ 
b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala
@@ -75,6 +75,7 @@ private[sql] object ProtobufUtils extends Logging {
     private[this] val protoFieldArray = descriptor.getFields.asScala.toArray
     private[this] val fieldMap = descriptor.getFields.asScala
       .groupBy(_.getName.toLowerCase(Locale.ROOT))
+      .view
       .mapValues(_.toSeq) // toSeq needed for scala 2.13
 
     /** The fields which have matching equivalents in both Protobuf and 
Catalyst schemas. */
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala 
b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index 7d11e4b157a..aa8e1b1520e 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -147,7 +147,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
       seed: Long): JavaPairRDD[K, V] =
     new JavaPairRDD[K, V](rdd.sampleByKey(
       withReplacement,
-      fractions.asScala.mapValues(_.toDouble).toMap, // map to Scala Double; 
toMap to serialize
+      fractions.asScala.view.mapValues(_.toDouble).toMap, // map to Scala 
Double; toMap to serialize
       seed))
 
   /**
@@ -179,7 +179,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
       seed: Long): JavaPairRDD[K, V] =
     new JavaPairRDD[K, V](rdd.sampleByKeyExact(
       withReplacement,
-      fractions.asScala.mapValues(_.toDouble).toMap, // map to Scala Double; 
toMap to serialize
+      fractions.asScala.view.mapValues(_.toDouble).toMap, // map to Scala 
Double; toMap to serialize
       seed))
 
   /**
diff --git 
a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala 
b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 6ead36971a9..c016910ed76 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -782,7 +782,7 @@ class JavaSparkContext(val sc: SparkContext) extends 
Closeable {
    * @note This does not necessarily mean the caching or computation was 
successful.
    */
   def getPersistentRDDs: JMap[java.lang.Integer, JavaRDD[_]] = {
-    sc.getPersistentRDDs.mapValues(s => JavaRDD.fromRDD(s)).toMap
+    sc.getPersistentRDDs.view.mapValues(s => JavaRDD.fromRDD(s)).toMap
       .asJava.asInstanceOf[JMap[java.lang.Integer, JavaRDD[_]]]
   }
 
diff --git 
a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
index 2c8f4f2ca2a..14265f03795 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -369,7 +369,7 @@ private[spark] class PythonWorkerFactory(
         daemon = null
         daemonPort = 0
       } else {
-        simpleWorkers.mapValues(_.destroy())
+        simpleWorkers.view.mapValues(_.destroy())
       }
     }
   }
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala 
b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
index 4b3fd580341..e04c5b2de7d 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
@@ -178,6 +178,6 @@ object InputFormatInfo {
       }
     }
 
-    nodeToSplit.mapValues(_.toSet).toMap
+    nodeToSplit.view.mapValues(_.toSet).toMap
   }
 }
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index d00578e73e9..41f6b3ad64b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -155,7 +155,7 @@ private[spark] class TaskSchedulerImpl(
       .build[String, ExecutorDecommissionState]()
 
   def runningTasksByExecutors: Map[String, Int] = synchronized {
-    executorIdToRunningTaskIds.toMap.mapValues(_.size).toMap
+    executorIdToRunningTaskIds.toMap.view.mapValues(_.size).toMap
   }
 
   // The set of executors we have on each host; this is used to compute 
hostsAlive, which
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index dd53757fe85..c49b2411e76 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -736,7 +736,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
   }
 
   def getExecutorsWithRegistrationTs(): Map[String, Long] = synchronized {
-    executorDataMap.mapValues(v => v.registrationTs).toMap
+    executorDataMap.view.mapValues(v => v.registrationTs).toMap
   }
 
   override def isExecutorActive(id: String): Boolean = synchronized {
diff --git 
a/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala
 
b/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala
index baedccd9b92..4ee864f00e8 100644
--- 
a/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala
+++ 
b/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala
@@ -130,9 +130,10 @@ private[protobuf] class 
ApplicationEnvironmentInfoWrapperSerializer
     new ResourceProfileInfo(
       id = info.getId,
       executorResources =
-        
info.getExecutorResourcesMap.asScala.mapValues(deserializeExecutorResourceRequest).toMap,
+        
info.getExecutorResourcesMap.asScala.view.mapValues(deserializeExecutorResourceRequest)
+          .toMap,
       taskResources =
-        
info.getTaskResourcesMap.asScala.mapValues(deserializeTaskResourceRequest).toMap)
+        
info.getTaskResourcesMap.asScala.view.mapValues(deserializeTaskResourceRequest).toMap)
   }
 
   private def deserializeExecutorResourceRequest(info: 
StoreTypes.ExecutorResourceRequest):
diff --git 
a/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala
 
b/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala
index 97daf37995d..188187c9b75 100644
--- 
a/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala
+++ 
b/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala
@@ -137,7 +137,8 @@ private[protobuf] class ExecutorSummaryWrapperSerializer
       blacklistedInStages = 
binary.getBlacklistedInStagesList.asScala.map(_.toInt).toSet,
       peakMemoryMetrics = peakMemoryMetrics,
       attributes = binary.getAttributesMap.asScala.toMap,
-      resources = 
binary.getResourcesMap.asScala.mapValues(deserializeResourceInformation).toMap,
+      resources =
+        
binary.getResourcesMap.asScala.view.mapValues(deserializeResourceInformation).toMap,
       resourceProfileId = binary.getResourceProfileId,
       isExcluded = binary.getIsExcluded,
       excludedInStages = 
binary.getExcludedInStagesList.asScala.map(_.toInt).toSet)
diff --git 
a/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala
 
b/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala
index 41d1fee1608..5252b8b8c01 100644
--- 
a/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala
+++ 
b/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala
@@ -107,6 +107,6 @@ private[protobuf] class JobDataWrapperSerializer extends 
ProtobufSerDe[JobDataWr
       numCompletedStages = info.getNumCompletedStages,
       numSkippedStages = info.getNumSkippedStages,
       numFailedStages = info.getNumFailedStages,
-      killedTasksSummary = 
info.getKillTasksSummaryMap.asScala.mapValues(_.toInt).toMap)
+      killedTasksSummary = 
info.getKillTasksSummaryMap.asScala.view.mapValues(_.toInt).toMap)
   }
 }
diff --git 
a/core/src/main/scala/org/apache/spark/status/protobuf/StageDataWrapperSerializer.scala
 
b/core/src/main/scala/org/apache/spark/status/protobuf/StageDataWrapperSerializer.scala
index 8793ca7a12c..4fbaff0327d 100644
--- 
a/core/src/main/scala/org/apache/spark/status/protobuf/StageDataWrapperSerializer.scala
+++ 
b/core/src/main/scala/org/apache/spark/status/protobuf/StageDataWrapperSerializer.scala
@@ -382,7 +382,7 @@ private[protobuf] class StageDataWrapperSerializer extends 
ProtobufSerDe[StageDa
     new StageDataWrapper(
       info = info,
       jobIds = binary.getJobIdsList.asScala.map(_.toInt).toSet,
-      locality = binary.getLocalityMap.asScala.mapValues(_.toLong).toMap
+      locality = binary.getLocalityMap.asScala.view.mapValues(_.toLong).toMap
     )
   }
 
@@ -402,7 +402,7 @@ private[protobuf] class StageDataWrapperSerializer extends 
ProtobufSerDe[StageDa
         entry => (entry._1.toLong, deserializeTaskData(entry._2))).toMap)
     } else None
     val executorSummary = if 
(MapUtils.isNotEmpty(binary.getExecutorSummaryMap)) {
-      Some(binary.getExecutorSummaryMap.asScala.mapValues(
+      Some(binary.getExecutorSummaryMap.asScala.view.mapValues(
         ExecutorStageSummarySerializer.deserialize).toMap)
     } else None
     val speculationSummary =
@@ -475,7 +475,7 @@ private[protobuf] class StageDataWrapperSerializer extends 
ProtobufSerDe[StageDa
       tasks = tasks,
       executorSummary = executorSummary,
       speculationSummary = speculationSummary,
-      killedTasksSummary = 
binary.getKilledTasksSummaryMap.asScala.mapValues(_.toInt).toMap,
+      killedTasksSummary = 
binary.getKilledTasksSummaryMap.asScala.view.mapValues(_.toInt).toMap,
       resourceProfileId = binary.getResourceProfileId,
       peakExecutorMetrics = peakExecutorMetrics,
       taskMetricsDistributions = taskMetricsDistributions,
diff --git a/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala 
b/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala
index fb82714949b..9f32d81f1ae 100644
--- a/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala
@@ -65,7 +65,7 @@ class SparkThrowableSuite extends SparkFunSuite {
   }
 
   def checkIfUnique(ss: Seq[Any]): Unit = {
-    val dups = ss.groupBy(identity).mapValues(_.size).filter(_._2 > 
1).keys.toSeq
+    val dups = ss.groupBy(identity).view.mapValues(_.size).filter(_._2 > 
1).keys.toSeq
     assert(dups.isEmpty, s"Duplicate error classes: ${dups.mkString(", ")}")
   }
 
diff --git 
a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala 
b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index a669993352f..c30b4ca4dae 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -805,7 +805,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with 
SharedSparkContext {
         seed: Long,
         n: Long): Unit = {
       val trials = stratifiedData.countByKey()
-      val expectedSampleSize = stratifiedData.countByKey().mapValues(count =>
+      val expectedSampleSize = 
stratifiedData.countByKey().view.mapValues(count =>
         math.ceil(count * samplingRate).toInt)
       val fractions = Map("1" -> samplingRate, "0" -> samplingRate)
       val sample = if (exact) {
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala 
b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 046017af3a3..f925a8b8b71 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -1254,6 +1254,7 @@ class RDDSuite extends SparkFunSuite with 
SharedSparkContext with Eventually {
       .getPartitions
       .map(coalescedRDD.getPreferredLocations(_).head)
       .groupBy(identity)
+      .view
       .mapValues(_.size)
 
     // Make sure the coalesced partitions are distributed fairly evenly between the two locations.
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala
index e392ff53e02..3f99e2b4598 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala
@@ -101,6 +101,7 @@ class ExecutorResourceInfoSuite extends SparkFunSuite {
       // assert that each address was assigned `slots` times
       info.assignedAddrs
         .groupBy(identity)
+        .view
         .mapValues(_.size)
         .foreach(x => assert(x._2 == slots))
 
diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
index 5ab9f644be6..59ebc750af9 100644
--- 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
@@ -299,7 +299,7 @@ class BlockManagerDecommissionIntegrationSuite extends 
SparkFunSuite with LocalS
         val blockLocs = rddUpdates.map { update =>
           (update.blockUpdatedInfo.blockId.name,
             update.blockUpdatedInfo.blockManagerId)}
-        val blocksToManagers = blockLocs.groupBy(_._1).mapValues(_.size)
+        val blocksToManagers = blockLocs.groupBy(_._1).view.mapValues(_.size)
         assert(blocksToManagers.exists(_._2 > 1),
           s"We should have a block that has been on multiple BMs in rdds:\n 
${rddUpdates} from:\n" +
           s"${blocksUpdated}\n but instead we got:\n ${blocksToManagers}")
diff --git 
a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
 
b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
index c16dae77b83..769939a0a22 100644
--- 
a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
@@ -988,7 +988,7 @@ class ShuffleBlockFetcherIteratorSuite extends 
SparkFunSuite with PrivateMethodT
       ShuffleBlockId(0, 1, 0) -> createMockManagedBuffer()
     )
 
-    configureMockTransfer(blocks.mapValues(_ => 
createMockManagedBuffer(0)).toMap)
+    configureMockTransfer(blocks.view.mapValues(_ => 
createMockManagedBuffer(0)).toMap)
 
     val iterator = createShuffleBlockIteratorWithDefaults(
       Map(remoteBmId -> toBlockList(blocks.keys, 1L, 0))
diff --git 
a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
 
b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
index 7bec9612187..02cc2bb35af 100644
--- 
a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
@@ -544,7 +544,7 @@ class ExternalSorterSuite extends SparkFunSuite with 
LocalSparkContext {
     val expected = (0 until 3).map { p =>
       var v = (0 until size).map { i => (i / 4, i) }.filter { case (k, _) => k 
% 3 == p }.toSet
       if (withPartialAgg) {
-        v = v.groupBy(_._1).mapValues { s => s.map(_._2).sum }.toSet
+        v = v.groupBy(_._1).view.mapValues { s => s.map(_._2).sum }.toSet
       }
       (p, v.toSet)
     }.toSet
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala 
b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala
index 5dfb8ec898c..4cad0f16426 100644
--- a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala
@@ -89,6 +89,7 @@ object DFSReadWriteTest {
       .flatMap(_.split("\t"))
       .filter(_.nonEmpty)
       .groupBy(w => w)
+      .view
       .mapValues(_.size)
       .values
       .sum
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/MiniReadWriteTest.scala 
b/examples/src/main/scala/org/apache/spark/examples/MiniReadWriteTest.scala
index c003dc8ba38..9095c9b75af 100644
--- a/examples/src/main/scala/org/apache/spark/examples/MiniReadWriteTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/MiniReadWriteTest.scala
@@ -81,6 +81,7 @@ object MiniReadWriteTest {
       .flatMap(_.split("\t"))
       .filter(_.nonEmpty)
       .groupBy(w => w)
+      .view
       .mapValues(_.size)
       .values
       .sum
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala
 
b/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala
index 2a77702e223..d26c9a3a056 100644
--- 
a/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala
+++ 
b/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala
@@ -101,7 +101,7 @@ object PowerIterationClusteringExample {
       .setInitializationMode("degree")
       .run(circlesRdd)
 
-    val clusters = 
model.assignments.collect().groupBy(_.cluster).mapValues(_.map(_.id))
+    val clusters = 
model.assignments.collect().groupBy(_.cluster).view.mapValues(_.map(_.id))
     val assignments = clusters.toList.sortBy { case (k, v) => v.length }
     val assignmentsStr = assignments
       .map { case (k, v) =>
diff --git 
a/graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPathsSuite.scala 
b/graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPathsSuite.scala
index 527187d51ae..1f158808215 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPathsSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPathsSuite.scala
@@ -34,7 +34,7 @@ class ShortestPathsSuite extends SparkFunSuite with 
LocalSparkContext {
       val graph = Graph.fromEdgeTuples(edges, 1)
       val landmarks = Seq(1, 4).map(_.toLong)
       val results = ShortestPaths.run(graph, landmarks).vertices.collect().map 
{
-        case (v, spMap) => (v, spMap.mapValues(i => i).toMap)
+        case (v, spMap) => (v, spMap.view.mapValues(i => i).toMap)
       }
       assert(results.toSet === shortestPaths)
     }
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringMetrics.scala 
b/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringMetrics.scala
index b8563bed601..ee87f49806a 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringMetrics.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringMetrics.scala
@@ -331,6 +331,7 @@ private[evaluation] object SquaredEuclideanSilhouette 
extends Silhouette {
 
     clustersStatsRDD
       .collectAsMap()
+      .view
       .mapValues {
         case (featureSum: DenseVector, squaredNormSum: Double, weightSum: 
Double) =>
           SquaredEuclideanSilhouette.ClusterStats(featureSum, squaredNormSum, 
weightSum)
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala 
b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala
index cf1751f86f9..ff997194b42 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala
@@ -300,7 +300,7 @@ class VectorIndexerModel private[ml] (
   /** Java-friendly version of [[categoryMaps]] */
   @Since("1.4.0")
   def javaCategoryMaps: JMap[JInt, JMap[JDouble, JInt]] = {
-    categoryMaps.mapValues(_.asJava).toMap.asJava.asInstanceOf[JMap[JInt, 
JMap[JDouble, JInt]]]
+    categoryMaps.view.mapValues(_.asJava).toMap.asJava.asInstanceOf[JMap[JInt, 
JMap[JDouble, JInt]]]
   }
 
   /**
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala 
b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
index 6a22c3580e3..053aaac742b 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
@@ -217,7 +217,7 @@ class Word2VecModel private[ml] (
   @Since("1.5.0")
   @transient lazy val getVectors: DataFrame = {
     val spark = SparkSession.builder().getOrCreate()
-    val wordVec = wordVectors.getVectors.mapValues(vec => 
Vectors.dense(vec.map(_.toDouble)))
+    val wordVec = wordVectors.getVectors.view.mapValues(vec => 
Vectors.dense(vec.map(_.toDouble)))
     spark.createDataFrame(wordVec.toSeq).toDF("word", "vector")
   }
 
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala 
b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala
index 012e942a60a..4e5b8cd5efe 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala
@@ -1292,8 +1292,8 @@ private[spark] object RandomForest extends Logging with 
Serializable {
     }
     // Convert mutable maps to immutable ones.
     val nodesForGroup: Map[Int, Array[LearningNode]] =
-      mutableNodesForGroup.mapValues(_.toArray).toMap
-    val treeToNodeToIndexInfo = 
mutableTreeToNodeToIndexInfo.mapValues(_.toMap).toMap
+      mutableNodesForGroup.view.mapValues(_.toArray).toMap
+    val treeToNodeToIndexInfo = 
mutableTreeToNodeToIndexInfo.view.mapValues(_.toMap).toMap
     (nodesForGroup, treeToNodeToIndexInfo)
   }
 
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala
index a53581e8a60..fce2537b761 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala
@@ -220,7 +220,7 @@ class BisectingKMeans private (
             divisibleIndices.contains(parentIndex(index))
           }
           newClusters = summarize(d, newAssignments, dMeasure)
-          newClusterCenters = 
newClusters.mapValues(_.center).map(identity).toMap
+          newClusterCenters = 
newClusters.view.mapValues(_.center).map(identity).toMap
         }
         if (preIndices != null) {
           preIndices.unpersist()
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
 
b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
index 8ab6aba641a..af0d9e48a3b 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
@@ -443,7 +443,7 @@ class BlockMatrix @Since("1.3.0") (
     val leftMatrix = blockInfo.keys.collect()
     val rightMatrix = other.blockInfo.keys.collect()
 
-    val rightCounterpartsHelper = 
rightMatrix.groupBy(_._1).mapValues(_.map(_._2))
+    val rightCounterpartsHelper = 
rightMatrix.groupBy(_._1).view.mapValues(_.map(_._2))
     val leftDestinations = leftMatrix.map { case (rowIndex, colIndex) =>
       val rightCounterparts = rightCounterpartsHelper.getOrElse(colIndex, 
Array.emptyIntArray)
       val partitions = rightCounterparts.map(b => 
partitioner.getPartition((rowIndex, b)))
@@ -452,7 +452,7 @@ class BlockMatrix @Since("1.3.0") (
         partitions.toSet.map((pid: Int) => pid * midDimSplitNum + 
midDimSplitIndex))
     }.toMap
 
-    val leftCounterpartsHelper = 
leftMatrix.groupBy(_._2).mapValues(_.map(_._1))
+    val leftCounterpartsHelper = 
leftMatrix.groupBy(_._2).view.mapValues(_.map(_._1))
     val rightDestinations = rightMatrix.map { case (rowIndex, colIndex) =>
       val leftCounterparts = leftCounterpartsHelper.getOrElse(rowIndex, 
Array.emptyIntArray)
       val partitions = leftCounterparts.map(b => partitioner.getPartition((b, 
colIndex)))
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala
index 9c761824134..ead9f887fe8 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala
@@ -162,6 +162,7 @@ private[spark] object ChiSqTest extends Logging {
           .map { case ((label, _), c) => (label, c) }
           .toArray
           .groupBy(_._1)
+          .view
           .mapValues(_.map(_._2).sum)
         labelCounts.foreach { case (label, countByLabel) =>
           val nnzByLabel = labelNNZ.getOrElse(label, 0L)
diff --git 
a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala 
b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
index d206b5fd280..08202c7f1f3 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
@@ -855,7 +855,7 @@ class ALSSuite extends MLTest with DefaultReadWriteTest 
with Logging {
 
     Seq(2, 4, 6).foreach { k =>
       val n = math.min(k, numItems).toInt
-      val expectedUpToN = expected.mapValues(_.slice(0, n))
+      val expectedUpToN = expected.view.mapValues(_.slice(0, n))
       val topItems = model.recommendForAllUsers(k)
       assert(topItems.count() == numUsers)
       assert(topItems.columns.contains("user"))
@@ -876,7 +876,7 @@ class ALSSuite extends MLTest with DefaultReadWriteTest 
with Logging {
 
     Seq(2, 3, 4).foreach { k =>
       val n = math.min(k, numUsers).toInt
-      val expectedUpToN = expected.mapValues(_.slice(0, n))
+      val expectedUpToN = expected.view.mapValues(_.slice(0, n))
       val topUsers = getALSModel.recommendForAllItems(k)
       assert(topUsers.count() == numItems)
       assert(topUsers.columns.contains("item"))
@@ -898,7 +898,7 @@ class ALSSuite extends MLTest with DefaultReadWriteTest 
with Logging {
 
     Seq(2, 4, 6).foreach { k =>
       val n = math.min(k, numItems).toInt
-      val expectedUpToN = expected.mapValues(_.slice(0, n))
+      val expectedUpToN = expected.view.mapValues(_.slice(0, n))
       val topItems = model.recommendForUserSubset(userSubset, k)
       assert(topItems.count() == numUsersSubset)
       assert(topItems.columns.contains("user"))
@@ -920,7 +920,7 @@ class ALSSuite extends MLTest with DefaultReadWriteTest 
with Logging {
 
     Seq(2, 3, 4).foreach { k =>
       val n = math.min(k, numUsers).toInt
-      val expectedUpToN = expected.mapValues(_.slice(0, n))
+      val expectedUpToN = expected.view.mapValues(_.slice(0, n))
       val topUsers = model.recommendForItemSubset(itemSubset, k)
       assert(topUsers.count() == numItemsSubset)
       assert(topUsers.columns.contains("item"))
diff --git 
a/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala 
b/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala
index e4cd492be3d..1973f306441 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala
@@ -43,8 +43,8 @@ class Word2VecSuite extends SparkFunSuite with 
MLlibTestSparkContext {
     // and a Word2VecMap give the same values.
     val word2VecMap = model.getVectors
     val newModel = new Word2VecModel(word2VecMap)
-    assert(newModel.getVectors.mapValues(_.toSeq).toMap ===
-      word2VecMap.mapValues(_.toSeq).toMap)
+    assert(newModel.getVectors.view.mapValues(_.toSeq).toMap ===
+      word2VecMap.view.mapValues(_.toSeq).toMap)
   }
 
   test("Word2Vec throws exception when vocabulary is empty") {
@@ -103,8 +103,8 @@ class Word2VecSuite extends SparkFunSuite with 
MLlibTestSparkContext {
     try {
       model.save(sc, path)
       val sameModel = Word2VecModel.load(sc, path)
-      assert(sameModel.getVectors.mapValues(_.toSeq).toMap ===
-        model.getVectors.mapValues(_.toSeq).toMap)
+      assert(sameModel.getVectors.view.mapValues(_.toSeq).toMap ===
+        model.getVectors.view.mapValues(_.toSeq).toMap)
     } finally {
       Utils.deleteRecursively(tempDir)
     }
@@ -138,8 +138,8 @@ class Word2VecSuite extends SparkFunSuite with 
MLlibTestSparkContext {
     try {
       model.save(sc, path)
       val sameModel = Word2VecModel.load(sc, path)
-      assert(sameModel.getVectors.mapValues(_.toSeq).toMap ===
-        model.getVectors.mapValues(_.toSeq).toMap)
+      assert(sameModel.getVectors.view.mapValues(_.toSeq).toMap ===
+        model.getVectors.view.mapValues(_.toSeq).toMap)
     }
     catch {
       case t: Throwable => fail("exception thrown persisting a model " +
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/types/Metadata.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/types/Metadata.scala
index 3677927b9a5..dc5e49fc35b 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/types/Metadata.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/types/Metadata.scala
@@ -210,7 +210,7 @@ object Metadata {
       // `map.mapValues` return `Map` in Scala 2.12 and return `MapView` in Scala 2.13, call
       // `toMap` for Scala version compatibility.
       case map: Map[_, _] =>
-        map.mapValues(hash).toMap.##
+        map.view.mapValues(hash).toMap.##
       case arr: Array[_] =>
         // Seq.empty[T] has the same hashCode regardless of T.
         arr.toSeq.map(hash).##
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 65338f9917b..3a5f60eb376 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1314,7 +1314,7 @@ class Analyzer(override val catalogManager: 
CatalogManager) extends RuleExecutor
         val partCols = partitionColumnNames(r.table)
         validatePartitionSpec(partCols, i.partitionSpec)
 
-        val staticPartitions = 
i.partitionSpec.filter(_._2.isDefined).mapValues(_.get).toMap
+        val staticPartitions = 
i.partitionSpec.filter(_._2.isDefined).view.mapValues(_.get).toMap
         val query = addStaticPartitionColumns(r, 
projectByName.getOrElse(i.query), staticPartitions,
           isByName)
 
@@ -3616,6 +3616,7 @@ class Analyzer(override val catalogManager: 
CatalogManager) extends RuleExecutor
         // `GetStructField` without the name property.
         .collect { case g: GetStructField if g.name.isEmpty => g }
         .groupBy(_.child)
+        .view
         .mapValues(_.map(_.ordinal).distinct.sorted)
 
       structChildToOrdinals.foreach { case (expr, ordinals) =>
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
index 67c57ec2787..0f235e0977b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
@@ -207,7 +207,7 @@ object ExternalCatalogUtils {
   }
 
   def convertNullPartitionValues(spec: TablePartitionSpec): TablePartitionSpec 
= {
-    spec.mapValues(v => if (v == null) DEFAULT_PARTITION_NAME else 
v).map(identity).toMap
+    spec.view.mapValues(v => if (v == null) DEFAULT_PARTITION_NAME else 
v).map(identity).toMap
   }
 }
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index f48ff23f4ad..e71865df94d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -968,7 +968,7 @@ class SessionCatalog(
       } else {
         _.toLowerCase(Locale.ROOT)
       }
-      val nameToCounts = 
viewColumnNames.groupBy(normalizeColName).mapValues(_.length)
+      val nameToCounts = 
viewColumnNames.groupBy(normalizeColName).view.mapValues(_.length)
       val nameToCurrentOrdinal = 
scala.collection.mutable.HashMap.empty[String, Int]
       val viewDDL = buildViewDDL(metadata, isTempView)
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
index b32ef3d95aa..ea985ae5f30 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
@@ -126,7 +126,7 @@ package object expressions  {
     }
 
     private def unique[T](m: Map[T, Seq[Attribute]]): Map[T, Seq[Attribute]] = 
{
-      m.mapValues(_.distinct).toMap
+      m.view.mapValues(_.distinct).toMap
     }
 
     /** Map to use for direct case insensitive attribute lookups. */
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
index 5d4fcf772b8..710c07fab7e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
@@ -150,7 +150,7 @@ object NestedColumnAliasing {
 
     // A reference attribute can have multiple aliases for nested fields.
     val attrToAliases =
-      AttributeMap(attributeToExtractValuesAndAliases.mapValues(_.map(_._2)))
+      
AttributeMap(attributeToExtractValuesAndAliases.view.mapValues(_.map(_._2)))
 
     plan match {
       case Project(projectList, child) =>
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 6f65afada17..48ecb9aee21 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1055,6 +1055,7 @@ object CollapseProject extends Rule[LogicalPlan] with 
AliasHelper {
       .filter(_.references.exists(producerMap.contains))
       .flatMap(collectReferences)
       .groupBy(identity)
+      .view
       .mapValues(_.size)
       .forall {
         case (reference, count) =>
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 2b5f542f22b..d9d04db9ab0 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -1083,7 +1083,7 @@ class AstBuilder extends DataTypeAstBuilder with 
SQLConfHelper with Logging {
     // window w1 as (partition by p_mfgr order by p_name
     //               range between 2 preceding and 2 following),
     //        w2 as w1
-    val windowMapView = baseWindowMap.mapValues {
+    val windowMapView = baseWindowMap.view.mapValues {
       case WindowSpecReference(name) =>
         baseWindowMap.get(name) match {
           case Some(spec: WindowSpecDefinition) =>
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index e7be8a7e29b..0e5dd301953 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -365,7 +365,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
         // `map.mapValues().view.force` return `Map` in Scala 2.12 but return `IndexedSeq` in Scala
         // 2.13, call `toMap` method manually to compatible with Scala 2.12 and Scala 2.13
         // `mapValues` is lazy and we need to force it to materialize
-        m.mapValues(mapChild).view.force.toMap
+        m.view.mapValues(mapChild).view.force.toMap
       case arg: TreeNode[_] if containsChild(arg) => mapTreeNode(arg)
       case Some(child) => Some(mapChild(child))
       case nonChild: AnyRef => nonChild
@@ -786,7 +786,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
         Some(arg.asInstanceOf[BaseType].clone())
       // `map.mapValues().view.force` return `Map` in Scala 2.12 but return `IndexedSeq` in Scala
       // 2.13, call `toMap` method manually to compatible with Scala 2.12 and Scala 2.13
-      case m: Map[_, _] => m.mapValues {
+      case m: Map[_, _] => m.view.mapValues {
         case arg: TreeNode[_] if containsChild(arg) =>
           arg.asInstanceOf[BaseType].clone()
         case other => other
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ToNumberParser.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ToNumberParser.scala
index d56bca30a05..1274751ac94 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ToNumberParser.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ToNumberParser.scala
@@ -334,7 +334,7 @@ class ToNumberParser(numberFormat: String, errorOnFail: 
Boolean) extends Seriali
       )
     }
     // Make sure that the format string does not contain any prohibited duplicate tokens.
-    val inputTokenCounts = formatTokens.groupBy(identity).mapValues(_.size)
+    val inputTokenCounts = 
formatTokens.groupBy(identity).view.mapValues(_.size)
     Seq(DecimalPoint(),
       OptionalPlusOrMinusSign(),
       OptionalMinusSign(),
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index ae5ebd6a974..5fcd71d8bf9 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -228,6 +228,7 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase with Compilat
       case seq => Some(CreateStruct(seq)).map(e => Alias(e, e.sql)()).get
     }
       .groupBy(_.dataType)
+      .view
       .mapValues(values => values.map(value => toSQLId(value.name)).sorted)
       .mapValues(values => if (values.length > 3) values.take(3) :+ "..." else 
values)
       .toList.sortBy(_._1.sql)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index aa3fe8dfb7c..59aa17baa7f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -683,7 +683,7 @@ class SparkSession private(
       val plan = tracker.measurePhase(QueryPlanningTracker.PARSING) {
         val parsedPlan = sessionState.sqlParser.parsePlan(sqlText)
         if (args.nonEmpty) {
-          NameParameterizedQuery(parsedPlan, args.mapValues(lit(_).expr).toMap)
+          NameParameterizedQuery(parsedPlan, 
args.view.mapValues(lit(_).expr).toMap)
         } else {
           parsedPlan
         }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
index dc0857383e7..c2b227d6cad 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
@@ -64,7 +64,7 @@ case class AnalyzePartitionCommand(
         tableId.table, tableId.database.get, schemaColumns, specColumns)
     }
 
-    val filteredSpec = 
normalizedPartitionSpec.filter(_._2.isDefined).mapValues(_.get)
+    val filteredSpec = 
normalizedPartitionSpec.filter(_._2.isDefined).view.mapValues(_.get)
     if (filteredSpec.isEmpty) {
       None
     } else {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 8011cc00743..9b38155851e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -406,7 +406,7 @@ object PartitioningUtils extends SQLConfHelper {
     val distinctPartColNames = 
pathWithPartitionValues.map(_._2.columnNames).distinct
 
     def groupByKey[K, V](seq: Seq[(K, V)]): Map[K, Iterable[V]] =
-      seq.groupBy { case (key, _) => key }.mapValues(_.map { case (_, value) => value }).toMap
+      seq.groupBy { case (key, _) => key }.view.mapValues(_.map { case (_, value) => value }).toMap
 
     val partColNamesToPaths = groupByKey(pathWithPartitionValues.map {
       case (path, partValues) => partValues.columnNames -> path
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala
index b7de20ae293..e5444094673 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala
@@ -81,6 +81,7 @@ trait OrcFiltersBase {
       val dedupPrimitiveFields = primitiveFields
         .groupBy(_._1.toLowerCase(Locale.ROOT))
         .filter(_._2.size == 1)
+        .view
         .mapValues(_.head._2)
       CaseInsensitiveMap(dedupPrimitiveFields.toMap)
     }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index c5360c9a04f..c1d02ba5a22 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -110,6 +110,7 @@ class ParquetFilters(
       primitiveFields
         .groupBy(_._1.toLowerCase(Locale.ROOT))
         .filter(_._2.size == 1)
+        .view
         .mapValues(_.head._2)
       CaseInsensitiveMap(dedupPrimitiveFields.toMap)
     }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index 8552c950f67..bddf6d02f1f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -491,6 +491,7 @@ case class EnsureRequirements(
               val numExpectedPartitions = partValues
                 .map(InternalRowComparableWrapper(_, partitionExprs))
                 .groupBy(identity)
+                .view
                 .mapValues(_.size)
 
               mergedPartValues = mergedPartValues.map { case (partVal, 
numParts) =>
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 702d3ea09b6..e70e94001ee 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -136,9 +136,9 @@ trait ProgressReporter extends Logging {
       from: StreamProgress,
       to: StreamProgress,
       latest: StreamProgress): Unit = {
-    currentTriggerStartOffsets = from.mapValues(_.json).toMap
-    currentTriggerEndOffsets = to.mapValues(_.json).toMap
-    currentTriggerLatestOffsets = latest.mapValues(_.json).toMap
+    currentTriggerStartOffsets = from.view.mapValues(_.json).toMap
+    currentTriggerEndOffsets = to.view.mapValues(_.json).toMap
+    currentTriggerLatestOffsets = latest.view.mapValues(_.json).toMap
     latestStreamProgress = to
   }
 
@@ -243,7 +243,7 @@ trait ProgressReporter extends Logging {
       batchId = currentBatchId,
       batchDuration = processingTimeMills,
       durationMs =
-        new java.util.HashMap(currentDurationsMs.toMap.mapValues(long2Long).toMap.asJava),
+        new java.util.HashMap(currentDurationsMs.toMap.view.mapValues(long2Long).toMap.asJava),
       eventTime = new java.util.HashMap(executionStats.eventTimeStats.asJava),
       stateOperators = executionStats.stateOperators.toArray,
       sources = sourceProgress.toArray,
@@ -297,7 +297,7 @@ trait ProgressReporter extends Logging {
         Map(
           "max" -> stats.max,
           "min" -> stats.min,
-          "avg" -> stats.avg.toLong).mapValues(formatTimestamp)
+          "avg" -> stats.avg.toLong).view.mapValues(formatTimestamp)
     }.headOption.getOrElse(Map.empty) ++ watermarkTimestamp
 
     ExecutionStats(numInputRows, stateOperators, eventTimeStats.toMap)
@@ -307,7 +307,7 @@ trait ProgressReporter extends Logging {
   private def extractSourceToNumInputRows(): Map[SparkDataStream, Long] = {
 
     def sumRows(tuples: Seq[(SparkDataStream, Long)]): Map[SparkDataStream, 
Long] = {
-      tuples.groupBy(_._1).mapValues(_.map(_._2).sum).toMap // sum up rows for each source
+      tuples.groupBy(_._1).view.mapValues(_.map(_._2).sum).toMap // sum up rows for each source
     }
 
     def unrollCTE(plan: LogicalPlan): LogicalPlan = {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 40f63c86a6a..3dfc27b14c2 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -507,7 +507,7 @@ class RocksDB(
       "put" -> DB_WRITE,
       "compaction" -> COMPACTION_TIME
     ).toMap
-    val nativeOpsLatencyMicros = nativeOpsHistograms.mapValues { typ =>
+    val nativeOpsLatencyMicros = nativeOpsHistograms.view.mapValues { typ =>
       RocksDBNativeHistogram(nativeStats.getHistogramData(typ))
     }
     val nativeOpsMetricTickers = Seq(
@@ -530,7 +530,7 @@ class RocksDB(
       /** Number of bytes written during flush */
       "totalBytesWrittenByFlush" -> FLUSH_WRITE_BYTES
     ).toMap
-    val nativeOpsMetrics = nativeOpsMetricTickers.mapValues { typ =>
+    val nativeOpsMetrics = nativeOpsMetricTickers.view.mapValues { typ =>
       nativeStats.getTickerCount(typ)
     }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index def4a4104f7..b36aa264e28 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -156,7 +156,7 @@ trait StateStoreWriter extends StatefulOperator with 
PythonSQLMetrics { self: Sp
       .map(entry => entry._1 -> longMetric(entry._1).value)
 
     val javaConvertedCustomMetrics: java.util.HashMap[String, java.lang.Long] =
-      new java.util.HashMap(customMetrics.mapValues(long2Long).toMap.asJava)
+      new java.util.HashMap(customMetrics.view.mapValues(long2Long).toMap.asJava)
 
     // We now don't report number of shuffle partitions inside the state 
operator. Instead,
     // it will be filled when the stream query progress is reported
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
index 2e088ec8e4b..f66b99a154e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
@@ -79,7 +79,7 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends 
WebUIPage("") with L
             request,
             "running",
             running.toSeq,
-            executionIdToSubExecutions.mapValues(_.toSeq).toMap,
+            executionIdToSubExecutions.view.mapValues(_.toSeq).toMap,
             currentTime,
             showErrorMessage = false,
             showRunningJobs = true,
@@ -105,7 +105,7 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends 
WebUIPage("") with L
           request,
           "completed",
           completed.toSeq,
-          executionIdToSubExecutions.mapValues(_.toSeq).toMap,
+          executionIdToSubExecutions.view.mapValues(_.toSeq).toMap,
           currentTime,
           showErrorMessage = false,
           showRunningJobs = false,
@@ -132,7 +132,7 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends 
WebUIPage("") with L
             request,
             "failed",
             failed.toSeq,
-            executionIdToSubExecutions.mapValues(_.toSeq).toMap,
+            executionIdToSubExecutions.view.mapValues(_.toSeq).toMap,
             currentTime,
             showErrorMessage = true,
             showRunningJobs = false,
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
index 47ff942e5ca..1dece5c8285 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
@@ -343,7 +343,7 @@ class DataFrameStatSuite extends QueryTest with 
SharedSparkSession {
       val columnNames = crosstab.schema.fieldNames
       assert(columnNames(0) === "a_b")
       // reduce by key
-      val expected = data.map(t => (t, 1)).groupBy(_._1).mapValues(_.length)
+      val expected = data.map(t => (t, 1)).groupBy(_._1).view.mapValues(_.length)
       val rows = crosstab.collect()
       rows.foreach { row =>
         val i = row.getString(0).toInt
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
index 226d5098d42..d6dc5165fbb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
@@ -425,7 +425,7 @@ class SQLQueryTestSuite extends QueryTest with 
SharedSparkSession with SQLHelper
       // We need to do cartesian product for all the config dimensions, to get 
a list of
       // config sets, and run the query once for each config set.
       val configDimLines = 
comments.filter(_.startsWith("--CONFIG_DIM")).map(_.substring(12))
-      val configDims = configDimLines.groupBy(_.takeWhile(_ != ' ')).mapValues { lines =>
+      val configDims = configDimLines.groupBy(_.takeWhile(_ != ' ')).view.mapValues { lines =>
         lines.map(_.dropWhile(_ != ' ').substring(1)).map(_.split(",").map { 
kv =>
           val (conf, value) = kv.span(_ != '=')
           conf.trim -> value.substring(1).trim
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
index 814cf2f33ff..60d82fd1ac3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
@@ -527,7 +527,7 @@ class UDFSuite extends QueryTest with SharedSparkSession {
       sparkContext.parallelize(Seq(Row(Map("a" -> new 
BigDecimal("2011000000000002456556"))))),
       StructType(Seq(StructField("col1", MapType(StringType, DecimalType(30, 
0))))))
     val udf2 = org.apache.spark.sql.functions.udf((map: Map[String, 
BigDecimal]) => {
-      map.mapValues(value => if (value == null) null else value.toBigInteger.toString).toMap
+      map.view.mapValues(value => if (value == null) null else value.toBigInteger.toString).toMap
     })
     checkAnswer(df2.select(udf2($"col1")), Seq(Row(Map("a" -> 
"2011000000000002456556"))))
   }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
index 6563a7698e2..057cb527cf0 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
@@ -629,7 +629,7 @@ class PersistedViewTestSuite extends SQLViewTestSuite with 
SharedSparkSession {
         val meta = catalog.getTableRawMetadata(TableIdentifier("test_view", 
Some("default")))
         // simulate a view meta with incompatible schema change
         val newProp = meta.properties
-          .mapValues(_.replace("col_i", "col_j")).toMap
+          .view.mapValues(_.replace("col_i", "col_j")).toMap
         val newSchema = StructType(Seq(StructField("col_j", IntegerType)))
         catalog.alterTable(meta.copy(properties = newProp, schema = newSchema))
         val e = intercept[AnalysisException] {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableDropPartitionSuiteBase.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableDropPartitionSuiteBase.scala
index eaf305414f1..1e786c8e578 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableDropPartitionSuiteBase.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableDropPartitionSuiteBase.scala
@@ -47,7 +47,7 @@ trait AlterTableDropPartitionSuiteBase extends QueryTest with 
DDLCommandTestUtil
       t: String,
       ifExists: String,
       specs: Map[String, Any]*): Unit = {
-    checkPartitions(t, specs.map(_.mapValues(_.toString).toMap): _*)
+    checkPartitions(t, specs.map(_.view.mapValues(_.toString).toMap): _*)
     val specStr = specs.map(partSpecToString).mkString(", ")
     sql(s"ALTER TABLE $t DROP $ifExists $specStr")
     checkPartitions(t)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index a5d5f8ce30f..726ae87b5ce 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -392,7 +392,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSparkSession
     withParquetDataFrame(data) { df =>
       // Structs are converted to `Row`s
       checkAnswer(df, data.map { case Tuple1(m) =>
-        Row(m.mapValues(struct => Row(struct.productIterator.toSeq: _*)))
+        Row(m.view.mapValues(struct => Row(struct.productIterator.toSeq: _*)))
       })
     }
   }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
index cae008af6f0..30315c12b58 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
@@ -216,8 +216,8 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
       expectedNumOfJobs: Int,
       expectedMetrics: Map[Long, (String, Map[String, Any])],
       enableWholeStage: Boolean = false): Unit = {
-    val expectedMetricsPredicates = expectedMetrics.mapValues { case (nodeName, nodeMetrics) =>
-      (nodeName, nodeMetrics.mapValues(expectedMetricValue =>
+    val expectedMetricsPredicates = expectedMetrics.view.mapValues { case (nodeName, nodeMetrics) =>
+      (nodeName, nodeMetrics.view.mapValues(expectedMetricValue =>
         (actualMetricValue: Any) => {
           actualMetricValue.toString.matches(expectedMetricValue.toString)
         }).toMap)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index 9a3313b8069..5affc9ef3b2 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -220,23 +220,23 @@ abstract class SQLAppStatusListenerSuite extends 
SharedSparkSession with JsonTes
     )))
 
     checkAnswer(statusStore.executionMetrics(executionId),
-      accumulatorUpdates.mapValues(_ * 2).toMap)
+      accumulatorUpdates.view.mapValues(_ * 2).toMap)
 
     // Driver accumulator updates don't belong to this execution should be 
filtered and no
     // exception will be thrown.
     listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
 
     checkAnswer(statusStore.executionMetrics(executionId),
-      accumulatorUpdates.mapValues(_ * 2).toMap)
+      accumulatorUpdates.view.mapValues(_ * 2).toMap)
 
     listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", 
Seq(
       // (task id, stage id, stage attempt, accum updates)
       (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
-      (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.mapValues(_ * 
2).toMap))
+      (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.view.mapValues(_ * 
2).toMap))
     )))
 
     checkAnswer(statusStore.executionMetrics(executionId),
-      accumulatorUpdates.mapValues(_ * 3).toMap)
+      accumulatorUpdates.view.mapValues(_ * 3).toMap)
 
     // Retrying a stage should reset the metrics
     listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 
1)))
@@ -250,7 +250,7 @@ abstract class SQLAppStatusListenerSuite extends 
SharedSparkSession with JsonTes
     )))
 
     checkAnswer(statusStore.executionMetrics(executionId),
-      accumulatorUpdates.mapValues(_ * 2).toMap)
+      accumulatorUpdates.view.mapValues(_ * 2).toMap)
 
     // Ignore the task end for the first attempt
     listener.onTaskEnd(SparkListenerTaskEnd(
@@ -258,12 +258,12 @@ abstract class SQLAppStatusListenerSuite extends 
SharedSparkSession with JsonTes
       stageAttemptId = 0,
       taskType = "",
       reason = null,
-      createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 
100).toMap),
+      createTaskInfo(0, 0, accums = accumulatorUpdates.view.mapValues(_ * 
100).toMap),
       new ExecutorMetrics,
       null))
 
     checkAnswer(statusStore.executionMetrics(executionId),
-      accumulatorUpdates.mapValues(_ * 2).toMap)
+      accumulatorUpdates.view.mapValues(_ * 2).toMap)
 
     // Finish two tasks
     listener.onTaskEnd(SparkListenerTaskEnd(
@@ -271,7 +271,7 @@ abstract class SQLAppStatusListenerSuite extends 
SharedSparkSession with JsonTes
       stageAttemptId = 1,
       taskType = "",
       reason = null,
-      createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 2).toMap),
+      createTaskInfo(0, 0, accums = accumulatorUpdates.view.mapValues(_ * 
2).toMap),
       new ExecutorMetrics,
       null))
     listener.onTaskEnd(SparkListenerTaskEnd(
@@ -279,12 +279,12 @@ abstract class SQLAppStatusListenerSuite extends 
SharedSparkSession with JsonTes
       stageAttemptId = 1,
       taskType = "",
       reason = null,
-      createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3).toMap),
+      createTaskInfo(1, 0, accums = accumulatorUpdates.view.mapValues(_ * 
3).toMap),
       new ExecutorMetrics,
       null))
 
     checkAnswer(statusStore.executionMetrics(executionId),
-      accumulatorUpdates.mapValues(_ * 5).toMap)
+      accumulatorUpdates.view.mapValues(_ * 5).toMap)
 
     // Summit a new stage
     listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(1, 
0)))
@@ -298,7 +298,7 @@ abstract class SQLAppStatusListenerSuite extends 
SharedSparkSession with JsonTes
     )))
 
     checkAnswer(statusStore.executionMetrics(executionId),
-      accumulatorUpdates.mapValues(_ * 7).toMap)
+      accumulatorUpdates.view.mapValues(_ * 7).toMap)
 
     // Finish two tasks
     listener.onTaskEnd(SparkListenerTaskEnd(
@@ -306,7 +306,7 @@ abstract class SQLAppStatusListenerSuite extends 
SharedSparkSession with JsonTes
       stageAttemptId = 0,
       taskType = "",
       reason = null,
-      createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 3).toMap),
+      createTaskInfo(0, 0, accums = accumulatorUpdates.view.mapValues(_ * 
3).toMap),
       new ExecutorMetrics,
       null))
     listener.onTaskEnd(SparkListenerTaskEnd(
@@ -314,12 +314,12 @@ abstract class SQLAppStatusListenerSuite extends 
SharedSparkSession with JsonTes
       stageAttemptId = 0,
       taskType = "",
       reason = null,
-      createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3).toMap),
+      createTaskInfo(1, 0, accums = accumulatorUpdates.view.mapValues(_ * 
3).toMap),
       new ExecutorMetrics,
       null))
 
     checkAnswer(statusStore.executionMetrics(executionId),
-      accumulatorUpdates.mapValues(_ * 11).toMap)
+      accumulatorUpdates.view.mapValues(_ * 11).toMap)
 
     assertJobs(statusStore.execution(executionId), running = Seq(0))
 
@@ -334,7 +334,7 @@ abstract class SQLAppStatusListenerSuite extends 
SharedSparkSession with JsonTes
     assertJobs(statusStore.execution(executionId), completed = Seq(0))
 
     checkAnswer(statusStore.executionMetrics(executionId),
-      accumulatorUpdates.mapValues(_ * 11).toMap)
+      accumulatorUpdates.view.mapValues(_ * 11).toMap)
   }
 
   test("control a plan explain mode in listeners via SQLConf") {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
index 4fa49064faa..4c478486c6b 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
@@ -391,7 +391,7 @@ object StreamingQueryStatusAndProgressSuite {
     timestamp = "2016-12-05T20:54:20.827Z",
     batchId = 2L,
     batchDuration = 0L,
-    durationMs = new java.util.HashMap(Map("total" -> 
0L).mapValues(long2Long).toMap.asJava),
+    durationMs = new java.util.HashMap(Map("total" -> 
0L).view.mapValues(long2Long).toMap.asJava),
     eventTime = new java.util.HashMap(Map(
       "max" -> "2016-12-05T20:54:20.827Z",
       "min" -> "2016-12-05T20:54:20.827Z",
@@ -403,7 +403,7 @@ object StreamingQueryStatusAndProgressSuite {
       numShufflePartitions = 2, numStateStoreInstances = 2,
       customMetrics = new 
java.util.HashMap(Map("stateOnCurrentVersionSizeBytes" -> 2L,
         "loadedMapCacheHitCount" -> 1L, "loadedMapCacheMissCount" -> 0L)
-        .mapValues(long2Long).toMap.asJava)
+        .view.mapValues(long2Long).toMap.asJava)
     )),
     sources = Array(
       new SourceProgress(
@@ -429,7 +429,7 @@ object StreamingQueryStatusAndProgressSuite {
     timestamp = "2016-12-05T20:54:20.827Z",
     batchId = 2L,
     batchDuration = 0L,
-    durationMs = new java.util.HashMap(Map("total" -> 
0L).mapValues(long2Long).toMap.asJava),
+    durationMs = new java.util.HashMap(Map("total" -> 
0L).view.mapValues(long2Long).toMap.asJava),
     // empty maps should be handled correctly
     eventTime = new java.util.HashMap(Map.empty[String, String].asJava),
     stateOperators = Array(new StateOperatorProgress(operatorName = "op2",
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
index 44de7af0b4a..e5e873cca12 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
@@ -61,7 +61,7 @@ class ContinuousSuiteBase extends StreamTest {
       c.committedOffsets.lastOption.map { case (_, offset) =>
         offset match {
           case o: RateStreamOffset =>
-            o.partitionToValueAndRunTimeMs.mapValues(_.value).toMap
+            o.partitionToValueAndRunTimeMs.view.mapValues(_.value).toMap
         }
       }
     }
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 5c340522f91..f52ff8cf9a1 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -1261,7 +1261,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
       table: String,
       partialSpec: Option[TablePartitionSpec] = None): Seq[String] = 
withClient {
     val catalogTable = getTable(db, table)
-    val partColNameMap = 
buildLowerCasePartColNameMap(catalogTable).mapValues(escapePathName)
+    val partColNameMap = 
buildLowerCasePartColNameMap(catalogTable).view.mapValues(escapePathName)
     val clientPartitionNames =
       client.getPartitionNames(catalogTable, 
partialSpec.map(toMetaStorePartitionSpec))
     clientPartitionNames.map { partitionPath =>
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala
index af884914ad8..5a124be5679 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala
@@ -65,7 +65,7 @@ private[streaming] class 
JavaStreamingListenerWrapper(javaStreamingListener: Jav
   private def toJavaBatchInfo(batchInfo: BatchInfo): JavaBatchInfo = {
     JavaBatchInfo(
       batchInfo.batchTime,
-      batchInfo.streamIdToInputInfo.mapValues(toJavaStreamInputInfo).toMap.asJava,
+      batchInfo.streamIdToInputInfo.view.mapValues(toJavaStreamInputInfo).toMap.asJava,
       batchInfo.submissionTime,
       batchInfo.processingStartTime.getOrElse(-1),
       batchInfo.processingEndTime.getOrElse(-1),
@@ -73,7 +73,7 @@ private[streaming] class 
JavaStreamingListenerWrapper(javaStreamingListener: Jav
       batchInfo.processingDelay.getOrElse(-1),
       batchInfo.totalDelay.getOrElse(-1),
       batchInfo.numRecords,
-      batchInfo.outputOperationInfos.mapValues(toJavaOutputOperationInfo).toMap.asJava
+      batchInfo.outputOperationInfos.view.mapValues(toJavaOutputOperationInfo).toMap.asJava
     )
   }
 
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
index 02eee36d4be..5c47cde0459 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
@@ -184,7 +184,7 @@ private[streaming] class ReceiverSchedulingPolicy {
 
     val executorWeights: Map[ExecutorCacheTaskLocation, Double] = {
       
receiverTrackingInfoMap.values.flatMap(convertReceiverTrackingInfoToExecutorWeights)
-        .groupBy(_._1).mapValues(_.map(_._2).sum).toMap // Sum weights for each executor
+        .groupBy(_._1).view.mapValues(_.map(_._2).sum).toMap // Sum weights for each executor
     }
 
     val idleExecutors = executors.toSet -- executorWeights.keys
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 342a0a43b50..685ddf67237 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -244,7 +244,7 @@ class ReceiverTracker(ssc: StreamingContext, 
skipReceiverLaunch: Boolean = false
    */
   def allocatedExecutors(): Map[Int, Option[String]] = synchronized {
     if (isTrackerStarted) {
-      endpoint.askSync[Map[Int, ReceiverTrackingInfo]](GetAllReceiverInfo).mapValues {
+      endpoint.askSync[Map[Int, ReceiverTrackingInfo]](GetAllReceiverInfo).view.mapValues {
         _.runningExecutor.map {
           _.executorId
         }
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
index 1af60857bc7..08de8d46c31 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
@@ -98,7 +98,7 @@ private[ui] object BatchUIData {
 
   def apply(batchInfo: BatchInfo): BatchUIData = {
     val outputOperations = mutable.HashMap[OutputOpId, OutputOperationUIData]()
-    outputOperations ++= batchInfo.outputOperationInfos.mapValues(OutputOperationUIData.apply)
+    outputOperations ++= batchInfo.outputOperationInfos.view.mapValues(OutputOperationUIData.apply)
     new BatchUIData(
       batchInfo.batchTime,
       batchInfo.streamIdToInputInfo,
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index 9abf018584c..140eb866f6f 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -216,7 +216,8 @@ private[spark] class StreamingJobProgressListener(ssc: 
StreamingContext)
   def receivedRecordRateWithBatchTime: Map[Int, Seq[(Long, Double)]] = 
synchronized {
     val _retainedBatches = retainedBatches
     val latestBatches = _retainedBatches.map { batchUIData =>
-      (batchUIData.batchTime.milliseconds, batchUIData.streamIdToInputInfo.mapValues(_.numRecords))
+      (batchUIData.batchTime.milliseconds,
+        batchUIData.streamIdToInputInfo.view.mapValues(_.numRecords))
     }
     streamIds.map { streamId =>
       val recordRates = latestBatches.map {
@@ -230,7 +231,7 @@ private[spark] class StreamingJobProgressListener(ssc: 
StreamingContext)
 
   def lastReceivedBatchRecords: Map[Int, Long] = synchronized {
     val lastReceivedBlockInfoOption =
-      lastReceivedBatch.map(_.streamIdToInputInfo.mapValues(_.numRecords))
+      lastReceivedBatch.map(_.streamIdToInputInfo.view.mapValues(_.numRecords))
     lastReceivedBlockInfoOption.map { lastReceivedBlockInfo =>
       streamIds.map { streamId =>
         (streamId, lastReceivedBlockInfo.getOrElse(streamId, 0L))


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org


Reply via email to