This is already possible with the sc.textFile("/path/to/filedir/*") call. Does that work for you?
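For what it's worth, sc.textFile also takes a comma-separated list of paths (and Hadoop glob patterns), and a collection of URIs can be merged into one RDD with SparkContext.union. A rough sketch, reusing the s3n:// paths from the snippet below as placeholders:

// Comma-separated paths (or a glob) go through the same Hadoop input logic
// and come back as a single RDD.
val combined = sc.textFile("s3n://foo/file.txt,s3n://bar/file.txt,s3n://baz/file.txt")

// Given an arbitrary collection of URIs, union the per-file RDDs instead.
val uris = Seq("s3n://foo/file.txt", "s3n://bar/file.txt", "s3n://baz/file.txt")
val merged = sc.union(uris.map(uri => sc.textFile(uri)))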
Sent from my mobile phone

On Apr 29, 2014 2:46 AM, "Nicholas Chammas" <nicholas.cham...@gmail.com> wrote:

> It would be useful to have some way to open multiple files at once into a
> single RDD (e.g. sc.textFile(iterable_over_uris)). Logically, it would be
> equivalent to opening a single file made by concatenating the various files
> together. This would only be useful, of course, if the source files were all
> in the same format.
>
> Nick
>
>
> On Mon, Apr 28, 2014 at 7:40 PM, Andrew Ash <and...@andrewash.com> wrote:
>
>> The way you've written it there, I would expect those to be serial runs.
>> The reason is that you're looping over matches with a driver-level map,
>> which executes sequentially. Then calling foreachWith on the RDDs runs the
>> action in a blocking way, so you don't get a result back until the cluster
>> finishes.
>>
>> You can have multiple jobs running at the same time, though, by sharing a
>> SparkContext among threads. Rather than run all the foreachWith()s serially
>> on a single thread in the driver, try running each in its own thread (a
>> rough sketch follows below the quoted thread).
>>
>>
>> On Tue, Apr 29, 2014 at 1:35 AM, Art Peel <found...@gmail.com> wrote:
>>
>>> I'm trying to process 3 S3 files in parallel, but they always get
>>> processed serially.
>>> Am I going about this the wrong way? Full details below.
>>>
>>> Regards,
>>> Art
>>>
>>>
>>> I'm trying to do the following on a Spark cluster with 3 slave nodes.
>>>
>>> Given a list of N URLs for files in S3 (with s3n:// URLs), for each file:
>>> 1. search for some items
>>> 2. record the findings from step 1 in an external data store.
>>>
>>> I'd like to process the 3 URLs in parallel on 3 different slave nodes,
>>> but instead, they are getting processed serially on one node.
>>>
>>> I've tried various versions of my code to no avail. I'm also creating
>>> the SparkContext with "spark.scheduler.mode" set to "FAIR".
>>>
>>> Have I made a fundamental mistake?
>>>
>>> I'm running Spark 0.9.1 and my code looks roughly like this:
>>>
>>> def processFiles(sc: SparkContext) {
>>>
>>>   val urls = List("s3n://foo/file.txt", "s3n://bar/file.txt",
>>>     "s3n://baz/file.txt")
>>>
>>>   val hadoopFiles = urls.map(url => {
>>>     sc.hadoopFile(url, classOf[FooInputFormat], classOf[LongWritable],
>>>       classOf[WritableFooRecord])
>>>   })
>>>
>>>   val matches = hadoopFiles.par.map((hadoopFile) => {
>>>     findMatches(hadoopFile)
>>>   })
>>>
>>>   matches.map((matchRDD) => {
>>>     recordMatches(matchRDD)
>>>   })
>>> }
>>>
>>> def findMatches(hadoopFile: RDD): RDD = {
>>>   hadoopFile.map(record => caseClassResultFromSearch(record))
>>> }
>>>
>>> def recordMatches(matchRDD: RDD) {
>>>
>>>   matchRDD.foreachWith(_ => {
>>>     makeRestClient(config)
>>>     // I get 3 jobs referring to the line number of the next line, but
>>>     // the jobs run serially on one node.
>>>   })((matchRecord, client) => {
>>>     client.storeResult(matchRecord)
>>>   })
>>> }
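Following up on Andrew's suggestion of sharing one SparkContext among threads: below is a rough, untested sketch using Scala futures so that each file's job is submitted from its own driver-side thread. It reuses the names from Art's snippet (FooInputFormat, WritableFooRecord, caseClassResultFromSearch, makeRestClient, config, storeResult) and assumes foreachPartition is available in place of foreachWith.

import org.apache.hadoop.io.LongWritable
import org.apache.spark.SparkContext
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

def processFilesConcurrently(sc: SparkContext, urls: List[String]) {
  val jobs = urls.map { url =>
    Future {
      // Each future submits its own Spark job against the shared SparkContext,
      // so the jobs can be scheduled concurrently rather than one after another.
      val records = sc.hadoopFile(url, classOf[FooInputFormat],
        classOf[LongWritable], classOf[WritableFooRecord])
      val matches = records.map(record => caseClassResultFromSearch(record))
      matches.foreachPartition { partition =>
        // One REST client per partition, created on the executor.
        val client = makeRestClient(config)
        partition.foreach(matchRecord => client.storeResult(matchRecord))
      }
    }
  }
  // Block the calling thread until every job has finished.
  jobs.foreach(job => Await.result(job, Duration.Inf))
}

With the FAIR scheduler Art is already enabling, the three jobs submitted this way should share the cluster's task slots instead of queuing behind one another.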