Thanks to folks here for the suggestions. I ended up settling on what seems to be a simple and scalable approach. I am no longer using sparkContext.textFile with wildcards (it is too slow when working with a large number of files). Instead, I have implemented the directory traversal as a Spark job, so the traversal itself parallelizes across the cluster.
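(For reference, the wildcard approach was essentially a single glob read over the whole bucket, roughly like the line below; the glob just mirrors the path layout shown further down.)

// Roughly the old approach: one glob over the bucket. The listing is done
// sequentially on the driver, which crawls with a large number of files.
val allLines = sparkContext.textFile("s3n://s3-bucket-name/*/*/*/*/*/*")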
First, a couple of functions: one to traverse directories, and another to get the lines in a file:

import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
import org.apache.spark.rdd.RDD
import scala.io.Source

// Recursively list every file under the given path using the Hadoop FileSystem API.
// Assumes a Hadoop Configuration named hadoopConfiguration is in scope (e.g. new Configuration()).
def list_file_names(path: String): Seq[String] = {
  val fs = FileSystem.get(new java.net.URI(path), hadoopConfiguration)
  def f(path: Path): Seq[String] = {
    Option(fs.listStatus(path)).getOrElse(Array[FileStatus]()).
      flatMap {
        case fileStatus if fileStatus.isDir ⇒ f(fileStatus.getPath)
        case fileStatus ⇒ Seq(fileStatus.getPath.toString)
      }
  }
  f(new Path(path))
}

// Open one file and return its lines, closing the stream when done.
def read_log_file(path: String): Seq[String] = {
  val fs = FileSystem.get(new java.net.URI(path), hadoopConfiguration)
  val file = fs.open(new Path(path))
  val source = Source.fromInputStream(file)
  try source.getLines.toList finally file.close()
}

Next, I generate a list of "root" paths to scan:

val paths = for {
  record_type ← record_types
  year ← years
  month ← months
  day ← days
  hour ← hours
} yield s"s3n://s3-bucket-name/$record_type/$year/$month/$day/$hour/"

(In this case, I generate one path per hour per record type.)

Finally, using Spark, I can build an RDD with the contents of every file in the path list:

val rdd: RDD[String] = sparkContext.
  parallelize(paths, paths.size).
  flatMap(list_file_names).
  flatMap(read_log_file)

I am posting this info here with the hope that it will be useful to somebody in the future.

L

On Tue, Oct 7, 2014 at 12:58 AM, deenar.toraskar <deenar.toras...@db.com> wrote:
> Hi Landon
>
> I had a problem very similar to yours, where we have to process around 5
> million relatively small files on NFS. After trying various options, we did
> something similar to what Matei suggested.
>
> 1) Take the original path and find the subdirectories under that path, then
> parallelize the resulting list. You can configure the depth you want to go
> down to before sending the paths across the cluster.
>
> def getFileList(srcDir: File, depth: Int): List[File] = {
>   var list: ListBuffer[File] = new ListBuffer[File]()
>   if (srcDir.isDirectory()) {
>     srcDir.listFiles().foreach((file: File) =>
>       if (file.isFile()) {
>         list += file
>       } else {
>         if (depth > 0) {
>           list ++= getFileList(file, depth - 1)
>         } else if (depth < 0) {
>           list ++= getFileList(file, depth)
>         } else {
>           list += file
>         }
>       })
>   } else {
>     list += srcDir
>   }
>   list.toList
> }
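To make the quoted suggestion concrete, here is a rough sketch of how the list produced by getFileList could be parallelized and read, in the same style as the S3 code above. The root directory, the depth, and the readLines helper are illustrative placeholders rather than part of the original suggestion:

import java.io.File
import scala.io.Source
// (getFileList from the quoted code additionally needs scala.collection.mutable.ListBuffer.)

// Hypothetical helper: read one file that is visible to every executor via NFS.
def readLines(path: String): Seq[String] = {
  val source = Source.fromFile(path)
  try source.getLines.toList finally source.close()
}

// List files on the driver down to a fixed depth, then fan the reads out across the cluster.
val nfsFiles = getFileList(new File("/nfs/logs"), 2).map(_.getPath)  // "/nfs/logs" and depth 2 are placeholders

val nfsLines = sparkContext.
  parallelize(nfsFiles, nfsFiles.size).
  flatMap(readLines)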