Re: Strategies for reading large numbers of files

2014-10-21 Thread Landon Kuhn
Thanks to folks here for the suggestions. I ended up settling on what seems
to be a simple and scalable approach. I am no longer using
sparkContext.textFile with wildcards (it is too slow when working with a
large number of files). Instead, I have implemented the directory traversal
as a Spark job, so the traversal itself is parallelized across the cluster.

First, a couple of helper functions: one to traverse directories, and another
to read the lines of a file:

  import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}
  import scala.io.Source

  // Recursively list the full path of every file under the given path.
  // (hadoopConfiguration is a Hadoop Configuration available in scope.)
  def list_file_names(path: String): Seq[String] = {
    val fs = FileSystem.get(new java.net.URI(path), hadoopConfiguration)
    def f(path: Path): Seq[String] = {
      Option(fs.listStatus(path)).getOrElse(Array[FileStatus]()).flatMap {
        case fileStatus if fileStatus.isDir ⇒ f(fileStatus.getPath)
        case fileStatus ⇒ Seq(fileStatus.getPath.toString)
      }
    }
    f(new Path(path))
  }

  // Read all lines of a single file; close the underlying stream when done.
  def read_log_file(path: String): Seq[String] = {
    val fs = FileSystem.get(new java.net.URI(path), hadoopConfiguration)
    val file = fs.open(new Path(path))
    val source = Source.fromInputStream(file)
    try source.getLines.toList
    finally source.close()
  }

Next, I generate a list of "root" paths to scan:

  val paths =
    for {
      record_type ← record_types
      year        ← years
      month       ← months
      day         ← days
      hour        ← hours
    } yield s"s3n://s3-bucket-name/$record_type/$year/$month/$day/$hour/"

(In this case, I generate one path per hour per record type.)
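
For reference, the sequences above are ordinary collections of path components.
With hypothetical values like the ones below (the real values are defined
elsewhere), the comprehension yields one S3 prefix per record type per hour:

  // Hypothetical example values, for illustration only:
  val record_types = Seq("purchase", "login")
  val years        = Seq("2014")
  val months       = Seq("01")
  val days         = Seq("01", "02")
  val hours        = (0 to 23).map(h ⇒ f"$h%02d")

  // The comprehension then yields prefixes such as:
  //   s3n://s3-bucket-name/purchase/2014/01/01/00/
  //   s3n://s3-bucket-name/login/2014/01/02/23/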

Finally, using Spark, I can build an RDD with the contents of every file in
the path list:

  val rdd: RDD[String] =
    sparkContext.
      parallelize(paths, paths.size). // one partition per path, so listing runs in parallel
      flatMap(list_file_names).       // expand each prefix into the files beneath it
      flatMap(read_log_file)          // expand each file into its lines
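
From there, compacting into a sensible number of output files (the original
goal) is just a coalesce and a write. A minimal sketch, with a placeholder
output path and partition count:

  // Sketch only: write the lines back out as ~1,000 larger files.
  rdd.
    coalesce(1000).
    saveAsTextFile("s3n://s3-bucket-name/compacted/")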

I am posting this info here with the hope that it will be useful to
somebody in the future.

L


On Tue, Oct 7, 2014 at 12:58 AM, deenar.toraskar wrote:

> Hi Landon
>
> I had a problem very similar to yours, where we had to process around 5
> million relatively small files on NFS. After trying various options, we did
> something similar to what Matei suggested.
>
> 1) Take the original path, find the subdirectories under that path, and
> then parallelize the resulting list. You can configure the depth you want
> to go down to before sending the paths across the cluster (see the sketch
> after the listing below).
>
>   import java.io.File
>   import scala.collection.mutable.ListBuffer
>
>   // Recursively collect files down to the given depth. A negative depth
>   // means unlimited recursion; directories sitting at the depth cut-off
>   // are returned as-is so they can be expanded later on the executors.
>   def getFileList(srcDir: File, depth: Int): List[File] = {
>     val list = new ListBuffer[File]()
>     if (srcDir.isDirectory()) {
>       srcDir.listFiles().foreach { file =>
>         if (file.isFile()) {
>           list += file
>         } else if (depth > 0) {
>           list ++= getFileList(file, depth - 1)
>         } else if (depth < 0) {
>           list ++= getFileList(file, depth)
>         } else {
>           list += file
>         }
>       }
>     } else {
>       list += srcDir
>     }
>     list.toList
>   }
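>
> (For reference, a minimal sketch of the parallelize step described in 1);
> the root path "/nfs/data", the depth values, the partition count, and the
> use of sc and scala.io.Source are illustrative assumptions, not the exact
> code we ran:)
>
>   // Sketch: list down to a fixed depth on the driver, then let each task
>   // expand and read everything below the path it was handed.
>   val roots = getFileList(new File("/nfs/data"), 2).map(_.getPath)
>   val lines = sc.
>     parallelize(roots, roots.size).
>     flatMap(p => getFileList(new File(p), -1)).  // unlimited depth on executors
>     flatMap(f => scala.io.Source.fromFile(f).getLines().toList)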
>




Re: Strategies for reading large numbers of files

2014-10-06 Thread Landon Kuhn
Nicholas, thanks for the tip. Your suggestion certainly seemed like the
right approach, but after a few days of fiddling I've come to the
conclusion that s3distcp will not work for my use case. It is unable to
flatten directory hierarchies, which I need because my source directories
contain hour/minute/second parts.

See https://forums.aws.amazon.com/message.jspa?messageID=478960. It seems
that s3distcp can only combine files in the same path.

Thanks again. That gave me a lot to go on. Any further suggestions?

L


On Thu, Oct 2, 2014 at 4:15 PM, Nicholas Chammas  wrote:

> I believe this is known as the "Hadoop Small Files Problem", and it
> affects Spark as well. The best approach I've seen to merging small files
> like this is by using s3distcp, as suggested here
> <http://snowplowanalytics.com/blog/2013/05/30/dealing-with-hadoops-small-files-problem/>,
> as a pre-processing step.
>
> It would be great if Spark could somehow handle this common situation out
> of the box, but for now I don't think it does.
>
> Nick
>
> On Thu, Oct 2, 2014 at 7:10 PM, Landon Kuhn  wrote:
>
>> Hello, I'm trying to use Spark to process a large number of files in S3.
>> I'm running into an issue that I believe is related to the high number of
>> files, and the resources required to build the listing within the driver
>> program. If anyone in the Spark community can provide insight or guidance,
>> it would be greatly appreciated.
>>
>> The task at hand is to read ~100 million files stored in S3, and
>> repartition the data into a sensible number of files (perhaps 1,000). The
>> files are organized in a directory structure like so:
>>
>>
>> s3://bucket/event_type/year/month/day/hour/minute/second/customer_id/file_name
>>
>> (Note that each file is very small, containing 1-10 records. Unfortunately,
>> this is an artifact of the upstream systems that put data in S3.)
>>
>> My Spark program is simple, and works when I target a relatively specific
>> subdirectory. For example:
>>
>>
>> sparkContext.textFile("s3n://bucket/purchase/2014/01/01/00/*/*/*/*").coalesce(...).write(...)
>>
>> This targets 1 hour's worth of purchase records, containing about 10,000
>> files. The driver program blocks (I assume it is making S3 calls to
>> traverse the directories), and during this time no activity is visible in
>> the driver UI. After about a minute, the stages and tasks allocate in the
>> UI, and then everything progresses and completes within a few minutes.
>>
>> I need to process all the data (several years' worth). Something like:
>>
>>
>> sparkContext.textFile("s3n://bucket/*/*/*/*/*/*/*/*/*").coalesce(...).write(...)
>>
>> This blocks "forever" (I have only run the program for as long as
>> overnight). The stages and tasks never appear in the UI. I assume Spark is
>> building the file listing, which will take too long and/or cause the
>> driver to eventually run out of memory.
>>
>> I would appreciate any comments or suggestions. I'm happy to provide more
>> information if that would be helpful.
>>
>> Thanks
>>
>> Landon
>>
>>
>




Strategies for reading large numbers of files

2014-10-02 Thread Landon Kuhn
Hello, I'm trying to use Spark to process a large number of files in S3.
I'm running into an issue that I believe is related to the high number of
files, and the resources required to build the listing within the driver
program. If anyone in the Spark community can provide insight or guidance,
it would be greatly appreciated.

The task at hand is to read ~100 million files stored in S3, and
repartition the data into a sensible number of files (perhaps 1,000). The
files are organized in a directory structure like so:


s3://bucket/event_type/year/month/day/hour/minute/second/customer_id/file_name

(Note that each file is very small, containing 1-10 records. Unfortunately,
this is an artifact of the upstream systems that put data in S3.)

My Spark program is simple, and works when I target a relatively specific
subdirectory. For example:


sparkContext.textFile("s3n://bucket/purchase/2014/01/01/00/*/*/*/*").coalesce(...).write(...)

This targets 1 hour's worth of purchase records, containing about 10,000
files. The driver program blocks (I assume it is making S3 calls to
traverse the directories), and during this time no activity is visible in
the driver UI. After about a minute, the stages and tasks allocate in the
UI, and then everything progresses and completes within a few minutes.

I need to process all the data (several years' worth). Something like:


sparkContext.textFile("s3n://bucket/*/*/*/*/*/*/*/*/*").coalesce(...).write(...)

This blocks "forever" (I have only run the program for as long as
overnight). The stages and tasks never appear in the UI. I assume Spark is
building the file listing, which will take too long and/or cause the
driver to eventually run out of memory.

I would appreciate any comments or suggestions. I'm happy to provide more
information if that would be helpful.

Thanks

Landon