Hello, I need to process a large number of files (28M) stored in a deeply nested folder hierarchy (Pairtree, a multi-level, hashtable-on-disk-like structure). Here's an example path:
./udel/pairtree_root/31/74/11/11/56/89/39/31741111568939/31741111568939.zip

I can't scan the entire folder structure ahead of time to build a list of files and then use sc.parallelize(list, ...) to create an RDD to process, because traversing the entire folder structure would take a very long time. (Also, the folder content is fluid: files are added and deleted periodically.) I'm thinking I would need to use Spark Streaming, where I use something like java.nio.file.Files.walkFileTree(...) to "discover" these files and "stream" them into Spark as they're found. What do the experts think? Is there a better way of handling this? The unit of parallelization is a single file (i.e. the "processing" operates at a file level).

Here's what I've created so far:

```scala
import java.nio.file._
import java.nio.file.attribute.BasicFileAttributes
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class DirectoryExplorerReceiver(
    dir: String,
    accept: (String, BasicFileAttributes) => Boolean)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  override def onStart(): Unit = {
    val context = scala.concurrent.ExecutionContext.Implicits.global
    val rootPath = FileSystems.getDefault.getPath(dir)
    // Walk the tree on a background thread so onStart() returns immediately.
    context.execute(new Runnable {
      override def run(): Unit =
        Files.walkFileTree(rootPath, new SimpleFileVisitor[Path] {
          override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
            if (isStopped()) return FileVisitResult.TERMINATE
            if (accept(file.toString, attrs)) store(file.toFile.getPath)
            FileVisitResult.CONTINUE
          }
        })
    })
  }

  // The walk checks isStopped() on every file, so nothing extra to do here.
  override def onStop(): Unit = {}
}
```

And use it like this:

```scala
val ssc = new StreamingContext(conf, Seconds(10))

val zipFiles = ssc.receiverStream(
  new DirectoryExplorerReceiver(
    dataDir,
    (filePath, attrs) =>
      attrs.isRegularFile &&
      filePath.toLowerCase.endsWith(".zip") &&
      attrs.size() > 0
  )
)

val processedData = zipFiles.map(zipFile => doSomethingUseful(zipFile))
```

This basically uses a 10-second window to "stream" all files discovered during that window.

Thanks for any comments/suggestions.

-Boris
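EDIT: One thing I noticed re-reading my own snippet: DStream transformations like map are lazy, so as posted nothing would actually execute. A minimal sketch of the missing pieces (assuming the ssc and processedData from above; the count() output operation is just a placeholder for whatever the real sink would be):

```scala
// Sketch only: assumes `ssc` and `processedData` from the snippet above,
// plus Spark Streaming on the classpath.
processedData.foreachRDD { rdd =>
  // Some output operation is required; without one, the lazy DStream never runs.
  println(s"Processed ${rdd.count()} files in this batch")
}

ssc.start()             // start the receiver and begin the 10-second batches
ssc.awaitTermination()  // block until the context is stopped
```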
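EDIT 2: For anyone unfamiliar with the NIO API, here is the walkFileTree discovery pattern on its own, outside Spark, as a self-contained sketch (the temp-tree setup and the discover/WalkDemo names are just for illustration):

```scala
import java.nio.file._
import java.nio.file.attribute.BasicFileAttributes
import scala.collection.mutable.ListBuffer

object WalkDemo {
  // Collect every path under `root` that the predicate accepts.
  def discover(root: Path, accept: (String, BasicFileAttributes) => Boolean): List[String] = {
    val found = ListBuffer.empty[String]
    Files.walkFileTree(root, new SimpleFileVisitor[Path] {
      override def visitFile(file: Path, attrs: BasicFileAttributes): FileVisitResult = {
        if (accept(file.toString, attrs)) found += file.toString
        FileVisitResult.CONTINUE
      }
    })
    found.toList
  }

  def main(args: Array[String]): Unit = {
    // Build a tiny throwaway tree that mimics the pairtree layout.
    val root = Files.createTempDirectory("pairtree")
    val leaf = Files.createDirectories(root.resolve("31/74/31743174"))
    Files.write(leaf.resolve("31743174.zip"), Array[Byte](1, 2, 3))
    Files.write(leaf.resolve("notes.txt"), Array[Byte](1))

    val zips = discover(root,
      (p, a) => a.isRegularFile && p.toLowerCase.endsWith(".zip") && a.size() > 0)
    println(zips.size) // expect 1: only the non-empty .zip matches
  }
}
```

The same accept predicate plugs straight into the receiver above.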