Hello,

I find myself needing to process a large number of files (~28M) 
stored in a deeply nested folder hierarchy (Pairtree: a multi-level, 
hashtable-on-disk-like structure). Here's an example path: 

./udel/pairtree_root/31/74/11/11/56/89/39/31741111568939/31741111568939.zip

I can't scan the entire folder structure ahead of time to build a list of 
files and then use sc.parallelize(list, ...) to create an RDD to process, 
because traversing the entire hierarchy would take a very long time. 
(Also, the folder contents are fluid, meaning that files are added and deleted 
periodically.)  I'm thinking I would need to use Spark Streaming, whereby I use 
something like java.nio.file.Files.walkFileTree(...) to "discover" these files 
and "stream" them into Spark as they're found.
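For context, here's a stdlib-only sketch of the discovery walk on its own (no 
Spark), applying the same accept logic against a throwaway directory tree; the 
paths and contents are made up for illustration:

```scala
import java.nio.file.{Files, FileVisitResult, Path, SimpleFileVisitor}
import java.nio.file.attribute.BasicFileAttributes
import scala.collection.mutable.ListBuffer

// Build a small throwaway tree so the walk has something to find.
val root = Files.createTempDirectory("pairtree-demo")
val leaf = Files.createDirectories(root.resolve("31/74/11/31741111"))
Files.write(leaf.resolve("31741111.zip"), Array[Byte](1, 2, 3))
Files.write(leaf.resolve("ignore.txt"), Array[Byte](1))

// Collect every non-empty .zip file, depth-first, as the visitor sees it.
val found = ListBuffer.empty[String]
Files.walkFileTree(root, new SimpleFileVisitor[Path] {
  override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
    if (attrs.isRegularFile && file.toString.toLowerCase.endsWith(".zip") && attrs.size() > 0)
      found += file.toString
    FileVisitResult.CONTINUE
  }
})

println(found)
```

The receiver below does the same walk, except each match is handed to store() 
instead of a local buffer.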

What do the experts think?  Is there a better way of handling this?
The unit of parallelization is a single file (i.e. the "processing" operates at 
a file level).

Here's what I've created so far:

import java.nio.file.{FileSystems, Files, FileVisitResult, Path, SimpleFileVisitor}
import java.nio.file.attribute.BasicFileAttributes
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class DirectoryExplorerReceiver(dir: String,
                                accept: (String, BasicFileAttributes) => Boolean)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  override def onStart(): Unit = {
    val context = scala.concurrent.ExecutionContext.Implicits.global
    val rootPath = FileSystems.getDefault.getPath(dir)

    // Walk the tree on a background thread; store() hands each match to Spark.
    context.execute(new Runnable {
      override def run(): Unit =
        Files.walkFileTree(rootPath, new SimpleFileVisitor[Path] {
          override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
            if (isStopped())
              return FileVisitResult.TERMINATE

            if (accept(file.toString, attrs))
              store(file.toString)

            FileVisitResult.CONTINUE
          }
        })
    })
  }

  // The visitor checks isStopped() on every file, so nothing more to do here.
  override def onStop(): Unit = {}
}

And use it like this:

val ssc = new StreamingContext(conf, Seconds(10))
val zipFiles = ssc.receiverStream(
  new DirectoryExplorerReceiver(
    dataDir,
    (filePath, attrs) =>
      attrs.isRegularFile &&
      filePath.toLowerCase.endsWith(".zip") &&
      attrs.size() > 0
  )
)

val processedData = zipFiles.map(zipFile => doSomethingUseful(zipFile))

This basically uses a 10-second batch interval to "stream" all files discovered 
during that interval (Seconds(10) here is the batch interval, not a sliding 
window).
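One small thing I've found handy: the accept predicate can be factored out into 
a plain function so its logic is testable without Spark or a real file-system 
walk. A sketch (the name acceptZip is hypothetical):

```scala
// Hypothetical helper mirroring the predicate passed to the receiver:
// keep only non-empty regular files whose name ends in ".zip" (case-insensitive).
def acceptZip(filePath: String, isRegular: Boolean, size: Long): Boolean =
  isRegular && filePath.toLowerCase.endsWith(".zip") && size > 0

println(acceptZip("/udel/pairtree_root/x.zip", isRegular = true, size = 3))
```

The receiver can then be constructed with (p, a) => acceptZip(p, a.isRegularFile, a.size()).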

Thanks for any comments/suggestions.

-Boris
