This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 9db566a  [SPARK-34310][CORE][SQL] Replaces map and flatten with flatMap
9db566a is described below

commit 9db566a8821c02427434c551ee6e4d2501563dfa
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Mon Feb 1 08:21:35 2021 -0600

    [SPARK-34310][CORE][SQL] Replaces map and flatten with flatMap
    
    ### What changes were proposed in this pull request?
    Replaces `collection.map(f1).flatten` with `collection.flatMap(f1)` where possible. The two forms are semantically equivalent, but `flatMap` is simpler.
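    
    As a minimal sketch of the equivalence (the values `xs`, `f`, and `lookup` below are hypothetical, not taken from this patch):
    
        val xs = Seq(1, 2, 3)
        val f = (n: Int) => Seq(n, n * 10)
        // map then flatten: builds an intermediate Seq[Seq[Int]] before flattening
        val viaMapFlatten = xs.map(f).flatten      // Seq(1, 10, 2, 20, 3, 30)
        // flatMap: the same result in a single traversal, with no intermediate nested collection
        val viaFlatMap = xs.flatMap(f)             // Seq(1, 10, 2, 20, 3, 30)
        assert(viaMapFlatten == viaFlatMap)
    
        // The same rewrite applies to Option lookups, as in the RewriteDistinctAggregates change below
        val lookup = Map("a" -> 1)
        val viaOptionMapFlatten = Option("a").map(lookup.get(_)).flatten   // Some(1)
        val viaOptionFlatMap = Option("a").flatMap(lookup.get)             // Some(1)
        assert(viaOptionMapFlatten == viaOptionFlatMap)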
    
    ### Why are the changes needed?
    Code simplification.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Passes the Jenkins or GitHub Actions tests.
    
    Closes #31416 from LuciferYang/SPARK-34310.
    
    Authored-by: yangjie01 <yangji...@baidu.com>
    Signed-off-by: Sean Owen <sro...@gmail.com>
---
 core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala          | 2 +-
 .../spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala      | 2 +-
 .../spark/sql/execution/datasources/HadoopFileLinesReaderSuite.scala  | 4 ++--
 3 files changed, 4 insertions(+), 4 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
index d5a811d..1dec977 100644
--- a/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala
@@ -124,7 +124,7 @@ private[spark] class SubtractedRDD[K: ClassTag, V: ClassTag, W: ClassTag](
     integrate(0, t => getSeq(t._1) += t._2)
     // the second dep is rdd2; remove all of its keys
     integrate(1, t => map.remove(t._1))
-    map.asScala.iterator.map(t => t._2.iterator.map((t._1, _))).flatten
+    map.asScala.iterator.flatMap(t => t._2.iterator.map((t._1, _)))
   }
 
   override def clearDependencies(): Unit = {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala
index aa2610d..188435d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/RewriteDistinctAggregates.scala
@@ -292,7 +292,7 @@ object RewriteDistinctAggregates extends Rule[LogicalPlan] {
           // Final aggregate
           val operators = expressions.map { e =>
             val af = e.aggregateFunction
-            val condition = e.filter.map(distinctAggFilterAttrLookup.get(_)).flatten
+            val condition = e.filter.flatMap(distinctAggFilterAttrLookup.get)
             val naf = if (af.children.forall(_.foldable)) {
               // If aggregateFunction's children are all foldable, we only put the first child in
               // distinctAggGroups. So here we only need to rewrite the first child to
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReaderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReaderSuite.scala
index 3c83388..771ddbd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReaderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/HadoopFileLinesReaderSuite.scala
@@ -36,13 +36,13 @@ class HadoopFileLinesReaderSuite extends SharedSparkSession {
     val delimOpt = delimiter.map(_.getBytes(StandardCharsets.UTF_8))
     Files.write(path.toPath, text.getBytes(StandardCharsets.UTF_8))
 
-    val lines = ranges.map { case (start, length) =>
+    val lines = ranges.flatMap { case (start, length) =>
       val file = PartitionedFile(InternalRow.empty, path.getCanonicalPath, start, length)
       val hadoopConf = conf.getOrElse(spark.sessionState.newHadoopConf())
       val reader = new HadoopFileLinesReader(file, delimOpt, hadoopConf)
 
       reader.map(_.toString)
-    }.flatten
+    }
 
     lines
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
