This is an automated email from the ASF dual-hosted git repository.

viirya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 8302492  [SPARK-34059][SQL][CORE] Use for/foreach rather than map to make sure execute it eagerly
8302492 is described below

commit 830249284df4f5574aba7762cd981244d7b2dfaa
Author: HyukjinKwon <gurwls...@apache.org>
AuthorDate: Sun Jan 10 15:22:24 2021 -0800

[SPARK-34059][SQL][CORE] Use for/foreach rather than map to make sure execute it eagerly

### What changes were proposed in this pull request?

This PR is basically a follow-up of https://github.com/apache/spark/pull/14332. Calling `map` alone might leave the computation unexecuted due to lazy evaluation, e.g.:

```
scala> val foo = Seq(1,2,3)
foo: Seq[Int] = List(1, 2, 3)

scala> foo.map(println)
1
2
3
res0: Seq[Unit] = List((), (), ())

scala> foo.view.map(println)
res1: scala.collection.SeqView[Unit,Seq[_]] = SeqViewM(...)

scala> foo.view.foreach(println)
1
2
3
```

We should use `foreach` instead to make sure the side effects are executed where the output is unused or `Unit`.

### Why are the changes needed?

To prevent potential issues caused by `map` not being executed.

### Does this PR introduce _any_ user-facing change?

No, the current code does not appear to cause any problem for now.

### How was this patch tested?

I found these items by running IntelliJ inspections, double-checked them one by one, and fixed them. Ideally, these should be all such instances across the codebase.

Closes #31110 from HyukjinKwon/SPARK-34059.

Authored-by: HyukjinKwon <gurwls...@apache.org>
Signed-off-by: Liang-Chi Hsieh <vii...@gmail.com>
---
 core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala    | 2 +-
 .../test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala   | 2 +-
 .../spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala     | 2 +-
 .../src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala   | 2 +-
 .../spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala      | 4 ++--
 .../src/main/scala/org/apache/spark/sql/execution/CacheManager.scala  | 2 +-
 .../sql/execution/datasources/v2/SetCatalogAndNamespaceExec.scala     | 4 ++--
 .../spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala    | 4 ++--
 .../sql/execution/columnar/compression/PassThroughEncodingSuite.scala | 4 ++--
 9 files changed, 13 insertions(+), 13 deletions(-)
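For context, the minimal standalone sketch below (not code from this commit) illustrates the behavior described above: on a strict `Seq`, `map` runs eagerly but allocates an unused `Seq[Unit]`; on a lazy view it silently defers the side effect, while `foreach` always runs it. The `logLine` helper is hypothetical and only stands in for the side-effecting calls touched by this patch.

```scala
object EagerSideEffects {
  // Hypothetical side-effecting helper used only for this illustration.
  private def logLine(x: Any): Unit = println(x)

  def main(args: Array[String]): Unit = {
    val items = Seq(1, 2, 3)

    // On a strict Seq, map still runs eagerly, but it builds a useless List[Unit].
    items.map(logLine)           // prints 1, 2, 3; the List((), (), ()) result is discarded

    // On a lazy view, map only builds an unevaluated pipeline: nothing is printed.
    items.view.map(logLine)      // no output until the view is forced

    // foreach is eager on both strict and lazy collections and returns Unit,
    // which makes the intent (side effects only) explicit.
    items.view.foreach(logLine)  // prints 1, 2, 3

    // The same reasoning applies to Option receivers such as serverSocket in this
    // patch: foreach states that only the side effect matters, instead of producing
    // a throwaway Option[Unit].
    val maybePort: Option[Int] = Some(8080)
    maybePort.foreach(p => logLine(s"binding to port $p"))
  }
}
```

Only the `items.view.map(logLine)` call produces no output; that silent no-op is the trap this change guards against.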
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index f49cb3c..8d9f2be 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -244,7 +244,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT](
           /* backlog */ 1,
           InetAddress.getByName("localhost")))
         // A call to accept() for ServerSocket shall block infinitely.
-        serverSocket.map(_.setSoTimeout(0))
+        serverSocket.foreach(_.setSoTimeout(0))
         new Thread("accept-connections") {
           setDaemon(true)
diff --git a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala
index 8256965..12ebddf 100644
--- a/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala
+++ b/external/avro/src/test/scala/org/apache/spark/sql/avro/AvroLogicalTypeSuite.scala
@@ -246,7 +246,7 @@ abstract class AvroLogicalTypeSuite extends QueryTest with SharedSparkSession {
       dataFileWriter.create(schema, new File(avroFile))
       val logicalType = LogicalTypes.decimal(precision, scale)
-      decimalInputData.map { x =>
+      decimalInputData.foreach { x =>
         val avroRec = new GenericData.Record(schema)
         val decimal = new java.math.BigDecimal(x).setScale(scale)
         val bytes =
diff --git a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
index a5a2611..e9fbff7 100644
--- a/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
+++ b/resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendUtil.scala
@@ -138,7 +138,7 @@ private[spark] object MesosSchedulerBackendUtil extends Logging {
     val containerInfo = ContainerInfo.newBuilder()
       .setType(containerType)
-    conf.get(EXECUTOR_DOCKER_IMAGE).map { image =>
+    conf.get(EXECUTOR_DOCKER_IMAGE).foreach { image =>
       val forcePullImage = conf
         .get(EXECUTOR_DOCKER_FORCE_PULL_IMAGE).contains(true)
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index ac50c1c..f236fc3 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -263,7 +263,7 @@ private[yarn] class YarnAllocator(
   private def getPendingAtLocation(
       location: String): Map[Int, Seq[ContainerRequest]] = synchronized {
     val allContainerRequests = new mutable.HashMap[Int, Seq[ContainerRequest]]
-    rpIdToResourceProfile.keys.map { id =>
+    rpIdToResourceProfile.keys.foreach { id =>
      val profResource = rpIdToYarnResource.get(id)
      val result = amClient.getMatchingRequests(getContainerPriority(id), location, profResource)
        .asScala.flatMap(_.asScala)
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
index fa77947..4867daf 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationsSuite.scala
@@ -685,7 +685,7 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
         isMapGroupsWithState = false, null, streamRelation).groupBy("*")(count("*")),
       OutputMode.Append())
-    Seq(Inner, LeftOuter, RightOuter).map { joinType =>
+    Seq(Inner, LeftOuter, RightOuter).foreach { joinType =>
       assertFailOnGlobalWatermarkLimit(
         s"stream-stream $joinType after FlatMapGroupsWithState in Append mode",
         streamRelation.join(
@@ -718,7 +718,7 @@ class UnsupportedOperationsSuite extends SparkFunSuite with SQLHelper {
      Deduplicate(Seq(attribute), streamRelation).groupBy("a")(count("*")),
      OutputMode.Append())
-    Seq(Inner, LeftOuter, RightOuter).map { joinType =>
+    Seq(Inner, LeftOuter, RightOuter).foreach { joinType =>
      assertPassOnGlobalWatermarkLimit(
        s"$joinType join after deduplicate in Append mode",
        streamRelation.join(Deduplicate(Seq(attribute), streamRelation), joinType = joinType,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index b367194..0c6f22d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -221,7 +221,7 @@ class CacheManager extends Logging with AdaptiveSparkPlanHelper {
       // Remove the cache entry before creating a new ones.
       cachedData = cachedData.filterNot(cd => needToRecache.exists(_ eq cd))
     }
-    needToRecache.map { cd =>
+    needToRecache.foreach { cd =>
       cd.cachedRepresentation.cacheBuilder.clearCache()
       val sessionWithConfigsOff = SparkSession.getOrCloneSessionWithConfigsOff(
         spark, forceDisableConfigs)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SetCatalogAndNamespaceExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SetCatalogAndNamespaceExec.scala
index 9e6f00e..b13cea2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SetCatalogAndNamespaceExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/SetCatalogAndNamespaceExec.scala
@@ -32,8 +32,8 @@ case class SetCatalogAndNamespaceExec(
   override protected def run(): Seq[InternalRow] = {
     // The catalog is updated first because CatalogManager resets the current namespace
     // when the current catalog is set.
-    catalogName.map(catalogManager.setCurrentCatalog)
-    namespace.map(ns => catalogManager.setCurrentNamespace(ns.toArray))
+    catalogName.foreach(catalogManager.setCurrentCatalog)
+    namespace.foreach(ns => catalogManager.setCurrentNamespace(ns.toArray))
     Seq.empty
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
index 27558e5..c4258fa 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/jdbc/JDBCTableCatalog.scala
@@ -127,7 +127,7 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging
     var tableComment: String = ""
     var tableProperties: String = ""
     if (!properties.isEmpty) {
-      properties.asScala.map {
+      properties.asScala.foreach {
        case (k, v) => k match {
          case TableCatalog.PROP_COMMENT => tableComment = v
          case TableCatalog.PROP_PROVIDER =>
@@ -226,7 +226,7 @@ class JDBCTableCatalog extends TableCatalog with SupportsNamespaces with Logging
       case Array(db) if !namespaceExists(namespace) =>
         var comment = ""
         if (!metadata.isEmpty) {
-          metadata.asScala.map {
+          metadata.asScala.foreach {
            case (k, v) => k match {
              case SupportsNamespaces.PROP_COMMENT => comment = v
              case SupportsNamespaces.PROP_OWNER => // ignore
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/PassThroughEncodingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/PassThroughEncodingSuite.scala
index f946a67..c6fe64d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/PassThroughEncodingSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/compression/PassThroughEncodingSuite.scala
@@ -46,7 +46,7 @@ class PassThroughSuite extends SparkFunSuite {
     val builder = TestCompressibleColumnBuilder(columnStats, columnType, PassThrough)
-    input.map { value =>
+    input.foreach { value =>
       val row = new GenericInternalRow(1)
       columnType.setField(row, 0, value)
       builder.appendFrom(row, 0)
@@ -98,7 +98,7 @@ class PassThroughSuite extends SparkFunSuite {
     val row = new GenericInternalRow(1)
     val nullRow = new GenericInternalRow(1)
     nullRow.setNullAt(0)
-    input.map { value =>
+    input.foreach { value =>
       if (value == nullValue) {
         builder.appendFrom(nullRow, 0)
       } else {

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org