Repository: spark
Updated Branches:
  refs/heads/branch-1.1 64945f868 -> 3d889dfc1


[SPARK-3954][Streaming] Optimization to FileInputDStream

about convert files to RDDS there are 3 loops with files sequence in spark 
source.
loops files sequence:
1.files.map(...)
2.files.zip(fileRDDs)
3.files-size.foreach
It's will very time consuming when lots of files.So I do the following 
correction:
3 loops with files sequence => only one loop

Author: surq <s...@asiainfo.com>

Closes #2811 from surq/SPARK-3954 and squashes the following commits:

321bbe8 [surq]  updated the code style.The style from [for...yield]to 
[files.map(file=>{})]
88a2c20 [surq] Merge branch 'master' of https://github.com/apache/spark into 
SPARK-3954
178066f [surq] modify code's style. [Exceeds 100 columns]
626ef97 [surq] remove redundant import(ArrayBuffer)
739341f [surq] promote the speed of convert files to RDDS

(cherry picked from commit ce6ed2abd14de26b9ceaa415e9a42fbb1338f5fa)
Signed-off-by: Tathagata Das <tathagata.das1...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3d889dfc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3d889dfc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3d889dfc

Branch: refs/heads/branch-1.1
Commit: 3d889dfc1b470b8e2149e56352059464cc10a252
Parents: 64945f8
Author: surq <s...@asiainfo.com>
Authored: Mon Nov 10 17:37:16 2014 -0800
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Mon Nov 10 17:38:22 2014 -0800

----------------------------------------------------------------------
 .../org/apache/spark/streaming/dstream/FileInputDStream.scala | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3d889dfc/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 8152b75..55d6cf6 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -120,14 +120,15 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: 
NewInputFormat[K,V] : Clas
 
   /** Generate one RDD from an array of files */
   private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
-    val fileRDDs = files.map(file => context.sparkContext.newAPIHadoopFile[K, 
V, F](file))
-    files.zip(fileRDDs).foreach { case (file, rdd) => {
+    val fileRDDs = files.map(file =>{
+      val rdd = context.sparkContext.newAPIHadoopFile[K, V, F](file)
       if (rdd.partitions.size == 0) {
         logError("File " + file + " has no data in it. Spark Streaming can 
only ingest " +
           "files that have been \"moved\" to the directory assigned to the 
file stream. " +
           "Refer to the streaming programming guide for more details.")
       }
-    }}
+      rdd
+    })
     new UnionRDD(context.sparkContext, fileRDDs)
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to