This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 8302492  [SPARK-34059][SQL][CORE] Use for/foreach rather than map to 
make sure execute it eagerly
8302492 is described below

commit 830249284df4f5574aba7762cd981244d7b2dfaa
Author: HyukjinKwon <gurwls...@apache.org>
AuthorDate: Sun Jan 10 15:22:24 2021 -0800

    [SPARK-34059][SQL][CORE] Use for/foreach rather than map to make sure 
execute it eagerly
    
    ### What changes were proposed in this pull request?
    
    This PR is basically a followup of 
https://github.com/apache/spark/pull/14332.
    Calling `map` alone might leave the function unexecuted when the 
collection is lazily evaluated, e.g.:
    
    ```
    scala> val foo = Seq(1,2,3)
    foo: Seq[Int] = List(1, 2, 3)
    
    scala> foo.map(println)
    1
    2
    3
    res0: Seq[Unit] = List((), (), ())
    
    scala> foo.view.map(println)
    res1: scala.collection.SeqView[Unit,Seq[_]] = SeqViewM(...)
    
    scala> foo.view.foreach(println)
    1
    2
    3
    ```
    
    Where the output is unused or `Unit`, we should use `foreach` instead to 
make sure the function is actually executed.
    
    ### Why are the changes needed?
    
    To prevent potential issues caused by the function passed to `map` never being executed.
    
    ### Does this PR introduce _any_ user-facing change?
    
    No, the current code does not appear to cause any problems for now.
    
    ### How was this patch tested?
    
    I found these items by running an IntelliJ inspection, double-checked them 
one by one, and fixed them. Ideally, these should be all the instances across the codebase.
    
    Closes #31110 from HyukjinKwon/SPARK-34059.
    
    Authored-by: HyukjinKwon <gurwls...@apache.org>
    Signed-off-by: Liang-Chi Hsieh <vii...@gmail.com>
---
 core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala    | 2 +-
 .../test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala   | 2 +-
 .../spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala     | 2 +-
 .../src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala   | 2 +-
 .../spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala      | 4 ++--
 .../src/main/scala/org/apache/spark/sql/execution/CacheManager.scala  | 2 +-
 .../sql/execution/datasources/v2/SetCatalogAndNamespaceExec.scala     | 4 ++--
 .../spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala    | 4 ++--
 .../sql/execution/columnar/compression/PassThroughEncodingSuite.scala | 4 ++--
 9 files changed, 13 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index f49cb3c..8d9f2be 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -244,7 +244,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
             /* backlog */ 1,
             InetAddress.getByName("localhost")))
           // A call to accept() for ServerSocket shall block infinitely.
-          serverSocket.map(_.setSoTimeout(0))
+          serverSocket.foreach(_.setSoTimeout(0))
           new Thread("accept-connections") {
             setDaemon(true)
 
diff --git 
a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala
 
b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala
index 8256965..12ebddf 100644
--- 
a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala
+++ 
b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala
@@ -246,7 +246,7 @@ abstract class AvroLogicalTypeSuite extends QueryTest with 
SharedSparkSession {
     dataFileWriter.create(schema, new File(avroFile))
     val logicalType = LogicalTypes.decimal(precision, scale)
 
-    decimalInputData.map { x =>
+    decimalInputData.foreach { x =>
       val avroRec = new GenericData.Record(schema)
       val decimal = new java.math.BigDecimal(x).setScale(scale)
       val bytes =
diff --git 
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
 
b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
index a5a2611..e9fbff7 100644
--- 
a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
+++ 
b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
@@ -138,7 +138,7 @@ private[spark] object MesosSchedulerBackendUtil extends 
Logging {
     val containerInfo = ContainerInfo.newBuilder()
       .setType(containerType)
 
-    conf.get(EXECUTOR_DOCKER_IMAGE).map { image =>
+    conf.get(EXECUTOR_DOCKER_IMAGE).foreach { image =>
       val forcePullImage = conf
         .get(EXECUTOR_DOCKER_FORCE_PULL_IMAGE).contains(true)
 
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index ac50c1c..f236fc3 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -263,7 +263,7 @@ private[yarn] class YarnAllocator(
   private def getPendingAtLocation(
       location: String): Map[Int, Seq[ContainerRequest]] = synchronized {
     val allContainerRequests = new mutable.HashMap[Int, Seq[ContainerRequest]]
-    rpIdToResourceProfile.keys.map { id =>
+    rpIdToResourceProfile.keys.foreach { id =>
       val profResource = rpIdToYarnResource.get(id)
       val result = amClient.getMatchingRequests(getContainerPriority(id), 
location, profResource)
         .asScala.flatMap(_.asScala)
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
index fa77947..4867daf 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -685,7 +685,7 @@ class UnsupportedOperationsSuite extends SparkFunSuite with 
SQLHelper {
         isMapGroupsWithState = false, null, 
streamRelation).groupBy("*")(count("*")),
       OutputMode.Append())
 
-    Seq(Inner, LeftOuter, RightOuter).map { joinType =>
+    Seq(Inner, LeftOuter, RightOuter).foreach { joinType =>
       assertFailOnGlobalWatermarkLimit(
         s"stream-stream $joinType after FlatMapGroupsWithState in Append mode",
         streamRelation.join(
@@ -718,7 +718,7 @@ class UnsupportedOperationsSuite extends SparkFunSuite with 
SQLHelper {
       Deduplicate(Seq(attribute), streamRelation).groupBy("a")(count("*")),
       OutputMode.Append())
 
-    Seq(Inner, LeftOuter, RightOuter).map { joinType =>
+    Seq(Inner, LeftOuter, RightOuter).foreach { joinType =>
       assertPassOnGlobalWatermarkLimit(
         s"$joinType join after deduplicate in Append mode",
         streamRelation.join(Deduplicate(Seq(attribute), streamRelation), 
joinType = joinType,
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index b367194..0c6f22d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -221,7 +221,7 @@ class CacheManager extends Logging with 
AdaptiveSparkPlanHelper {
       // Remove the cache entry before creating a new ones.
       cachedData = cachedData.filterNot(cd => needToRecache.exists(_ eq cd))
     }
-    needToRecache.map { cd =>
+    needToRecache.foreach { cd =>
       cd.cachedRepresentation.cacheBuilder.clearCache()
       val sessionWithConfigsOff = SparkSession.getOrCloneSessionWithConfigsOff(
         spark, forceDisableConfigs)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SetCatalogAndNamespaceExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SetCatalogAndNamespaceExec.scala
index 9e6f00e..b13cea2 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SetCatalogAndNamespaceExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SetCatalogAndNamespaceExec.scala
@@ -32,8 +32,8 @@ case class SetCatalogAndNamespaceExec(
   override protected def run(): Seq[InternalRow] = {
     // The catalog is updated first because CatalogManager resets the current 
namespace
     // when the current catalog is set.
-    catalogName.map(catalogManager.setCurrentCatalog)
-    namespace.map(ns => catalogManager.setCurrentNamespace(ns.toArray))
+    catalogName.foreach(catalogManager.setCurrentCatalog)
+    namespace.foreach(ns => catalogManager.setCurrentNamespace(ns.toArray))
 
     Seq.empty
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
index 27558e5..c4258fa 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
@@ -127,7 +127,7 @@ class JDBCTableCatalog extends TableCatalog with 
SupportsNamespaces with Logging
     var tableComment: String = ""
     var tableProperties: String = ""
     if (!properties.isEmpty) {
-      properties.asScala.map {
+      properties.asScala.foreach {
         case (k, v) => k match {
           case TableCatalog.PROP_COMMENT => tableComment = v
           case TableCatalog.PROP_PROVIDER =>
@@ -226,7 +226,7 @@ class JDBCTableCatalog extends TableCatalog with 
SupportsNamespaces with Logging
     case Array(db) if !namespaceExists(namespace) =>
       var comment = ""
       if (!metadata.isEmpty) {
-        metadata.asScala.map {
+        metadata.asScala.foreach {
           case (k, v) => k match {
             case SupportsNamespaces.PROP_COMMENT => comment = v
             case SupportsNamespaces.PROP_OWNER => // ignore
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/PassThroughEncodingSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/PassThroughEncodingSuite.scala
index f946a67..c6fe64d 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/PassThroughEncodingSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/PassThroughEncodingSuite.scala
@@ -46,7 +46,7 @@ class PassThroughSuite extends SparkFunSuite {
 
       val builder = TestCompressibleColumnBuilder(columnStats, columnType, 
PassThrough)
 
-      input.map { value =>
+      input.foreach { value =>
         val row = new GenericInternalRow(1)
         columnType.setField(row, 0, value)
         builder.appendFrom(row, 0)
@@ -98,7 +98,7 @@ class PassThroughSuite extends SparkFunSuite {
       val row = new GenericInternalRow(1)
       val nullRow = new GenericInternalRow(1)
       nullRow.setNullAt(0)
-      input.map { value =>
+      input.foreach { value =>
         if (value == nullValue) {
           builder.appendFrom(nullRow, 0)
         } else {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to