Thanks to folks here for the suggestions. I ended up settling on what seems
to be a simple and scalable approach. I am no longer using
sparkContext.textFile with wildcards (it is too slow when working with a
large number of files). Instead, I have implemented the directory traversal
itself as a Spark job, which lets it parallelize across the cluster.

First, a couple of functions. One to traverse directories, and another to
get the lines in a file:

  import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
  import scala.io.Source

  // Recursively list every file under the given path.
  // hadoopConfiguration is assumed to be a Hadoop Configuration that is
  // available wherever this runs (i.e. on the executors); it is not shown here.
  def list_file_names(path: String): Seq[String] = {
    val fs = FileSystem.get(new java.net.URI(path), hadoopConfiguration)
    def f(p: Path): Seq[String] = {
      Option(fs.listStatus(p)).getOrElse(Array[FileStatus]()).
      flatMap {
        case fileStatus if fileStatus.isDir ⇒ f(fileStatus.getPath)
        case fileStatus ⇒ Seq(fileStatus.getPath.toString)
      }
    }
    f(new Path(path))
  }

  // Read a single file and return its lines. getLines is forced into a List
  // so the stream can be closed before returning.
  def read_log_file(path: String): Seq[String] = {
    val fs = FileSystem.get(new java.net.URI(path), hadoopConfiguration)
    val file = fs.open(new Path(path))
    val source = Source.fromInputStream(file)
    try source.getLines.toList finally source.close()
  }
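
As a quick sanity check, the helpers can be run on the driver against a
single prefix before being wired into a job. The record type and date below
are placeholders, not my real layout:

  val sample = list_file_names("s3n://s3-bucket-name/some-record-type/2014/10/07/00/")
  sample.take(5).foreach(println)
  sample.headOption.foreach(p ⇒ println(s"${read_log_file(p).size} lines in $p"))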

Next, I generate a list of "root" paths to scan:

  val paths =
    for {
      record_type ← record_types
      year ← years
      month ← months
      day ← days
      hour ← hours
    } yield s"s3n://s3-bucket-name/$record_type/$year/$month/$day/$hour/"

(In this case, I generate one path per hour per record type.)
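
The record_types, years, months, days, and hours sequences are not shown
above. Purely as an illustration (not my actual values), they could look
something like this, zero-padded to match the S3 key layout:

  val record_types = Seq("clicks", "impressions")   // placeholder names
  val years  = Seq("2014")
  val months = (1 to 12).map(m ⇒ f"$m%02d")
  val days   = (1 to 31).map(d ⇒ f"$d%02d")
  val hours  = (0 to 23).map(h ⇒ f"$h%02d")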

Finally, using Spark, I can build an RDD with the contents of every file in
the path list:

  import org.apache.spark.rdd.RDD

  // One partition per root path, so both the directory listing and the
  // file reads are spread across the cluster.
  val rdd: RDD[String] =
    sparkContext.
      parallelize(paths, paths.size).
      flatMap(list_file_names).
      flatMap(read_log_file)
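
From there it is an ordinary RDD, so the usual operations apply. For
example (just a sketch; the real job parses each line according to our log
format):

  rdd.cache()
  println(s"total log lines: ${rdd.count()}")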

I am posting this info here with the hope that it will be useful to
somebody in the future.

L


On Tue, Oct 7, 2014 at 12:58 AM, deenar.toraskar <deenar.toras...@db.com>
wrote:

> Hi Landon
>
> I had a problem very similar to yours, where we had to process around 5
> million relatively small files on NFS. After trying various options, we did
> something similar to what Matei suggested.
>
> 1) Take the original path and find the subdirectories under that path, then
> parallelize the resulting list. You can configure the depth you want to go
> down to before sending the paths across the cluster.
>
>   def getFileList(srcDir: File, depth: Int): List[File] = {
>     val list = new ListBuffer[File]()
>     if (srcDir.isDirectory()) {
>       srcDir.listFiles().foreach { file: File =>
>         if (file.isFile()) {
>           list += file
>         } else if (depth > 0) {
>           list ++= getFileList(file, depth - 1)
>         } else if (depth < 0) {
>           list ++= getFileList(file, depth)
>         } else {
>           list += file
>         }
>       }
>     } else {
>       list += srcDir
>     }
>     list.toList
>   }
>


-- 
*Landon Kuhn*, *Software Architect*, Janrain, Inc. <http://bit.ly/cKKudR>
E: lan...@janrain.com | M: 971-645-5501 | F: 888-267-9025
