Repository: spark
Updated Branches:
  refs/heads/master a1fc059b6 -> ce6ed2abd
[SPARK-3954][Streaming] Optimization to FileInputDStream: convert files to RDDs in a single pass

There are 3 loops over the files sequence in the Spark source:
1. files.map(...)
2. files.zip(fileRDDs)
3. the foreach over the zipped sequence (files.size elements)
This is very time consuming when there are lots of files, so this change collapses the 3 loops over the files sequence into a single loop.

Author: surq <s...@asiainfo.com>

Closes #2811 from surq/SPARK-3954 and squashes the following commits:

321bbe8 [surq] updated the code style: from [for...yield] to [files.map(file=>{})]
88a2c20 [surq] Merge branch 'master' of https://github.com/apache/spark into SPARK-3954
178066f [surq] modify code style [exceeds 100 columns]
626ef97 [surq] remove redundant import (ArrayBuffer)
739341f [surq] improve the speed of converting files to RDDs

Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ce6ed2ab
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ce6ed2ab
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ce6ed2ab

Branch: refs/heads/master
Commit: ce6ed2abd14de26b9ceaa415e9a42fbb1338f5fa
Parents: a1fc059
Author: surq <s...@asiainfo.com>
Authored: Mon Nov 10 17:37:16 2014 -0800
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Mon Nov 10 17:37:16 2014 -0800

----------------------------------------------------------------------
 .../org/apache/spark/streaming/dstream/FileInputDStream.scala | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/spark/blob/ce6ed2ab/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
index 8152b75..55d6cf6 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala
@@ -120,14 +120,15 @@ class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : Clas
 
   /** Generate one RDD from an array of files */
   private def filesToRDD(files: Seq[String]): RDD[(K, V)] = {
-    val fileRDDs = files.map(file => context.sparkContext.newAPIHadoopFile[K, V, F](file))
-    files.zip(fileRDDs).foreach { case (file, rdd) => {
+    val fileRDDs = files.map(file =>{
+      val rdd = context.sparkContext.newAPIHadoopFile[K, V, F](file)
       if (rdd.partitions.size == 0) {
         logError("File " + file + " has no data in it. Spark Streaming can only ingest " +
           "files that have been \"moved\" to the directory assigned to the file stream. " +
           "Refer to the streaming programming guide for more details.")
       }
-    }}
+      rdd
+    })
     new UnionRDD(context.sparkContext, fileRDDs)
   }
----------------------------------------------------------------------
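For illustration, a minimal, self-contained Scala sketch of the pattern the patch applies: instead of traversing the files sequence once to build the RDDs, a second time to zip, and a third time to validate, the load and the empty-file check happen in one map pass. FileRDD and loadFile below are hypothetical stand-ins for newAPIHadoopFile[K, V, F] and the partitions.size check in FileInputDStream; they are not part of the actual Spark code.

// Sketch only: FileRDD and loadFile are hypothetical stand-ins for
// newAPIHadoopFile[K, V, F] and the RDD it returns in FileInputDStream.
object SingleLoopSketch {
  final case class FileRDD(path: String, partitions: Int)

  // Stand-in for context.sparkContext.newAPIHadoopFile[K, V, F](file)
  def loadFile(path: String): FileRDD =
    FileRDD(path, partitions = if (path.endsWith(".empty")) 0 else 2)

  // Before the patch: three traversals of the files sequence.
  def filesToRDDsThreeLoops(files: Seq[String]): Seq[FileRDD] = {
    val fileRDDs = files.map(loadFile)                  // loop 1: build the RDDs
    files.zip(fileRDDs).foreach { case (file, rdd) =>   // loop 2: zip, loop 3: foreach
      if (rdd.partitions == 0) println(s"File $file has no data in it.")
    }
    fileRDDs
  }

  // After the patch: one traversal that loads, validates, and returns each element.
  def filesToRDDsOneLoop(files: Seq[String]): Seq[FileRDD] =
    files.map { file =>
      val rdd = loadFile(file)
      if (rdd.partitions == 0) println(s"File $file has no data in it.")
      rdd
    }

  def main(args: Array[String]): Unit = {
    val files = Seq("a.txt", "b.empty", "c.txt")
    // Both versions produce the same result; the second walks `files` only once.
    assert(filesToRDDsThreeLoops(files) == filesToRDDsOneLoop(files))
    filesToRDDsOneLoop(files).foreach(println)
  }
}

Both versions report the same empty-file condition; the single-pass form simply avoids building and walking the intermediate zipped sequence, which matters when the monitored directory contains many files.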