Oh snap! I didn’t know that! Confirmed that both the wildcard syntax and the comma-separated syntax work in PySpark. For example:

    sc.textFile('s3n://file1,s3n://file2').count()

Art,

Would this approach work for you? It would let you load your 3 files into a single RDD, which your workers could then all work on in parallel.

Nick
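In Scala, a rough (untested) sketch of the same idea for your three files could look like the following. It uses textFile for simplicity; since, as Matei notes below, the comma-separated path handling comes from Hadoop's FileInputFormat, the same path string should also work with sc.hadoopFile and your FooInputFormat:

    import org.apache.spark.SparkContext

    def processFilesAsOneRDD(sc: SparkContext) {

      val urls = List("s3n://foo/file.txt", "s3n://bar/file.txt", "s3n://baz/file.txt")

      // One comma-separated path string -> one RDD spanning all three files,
      // so a single job's tasks can run across the whole cluster at once.
      val allRecords = sc.textFile(urls.mkString(","))

      println(allRecords.count())

      // If the files need your custom input format, the same trick should apply:
      // sc.hadoopFile(urls.mkString(","), classOf[FooInputFormat],
      //   classOf[LongWritable], classOf[WritableFooRecord])
    }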
On Mon, Apr 28, 2014 at 9:09 PM, Matei Zaharia <matei.zaha...@gmail.com> wrote:

> Actually wildcards work too, e.g. s3n://bucket/file1*, and I believe so do comma-separated lists (e.g. s3n://file1,s3n://file2). These are all inherited from FileInputFormat in Hadoop.
>
> Matei
>
> On Apr 28, 2014, at 6:05 PM, Andrew Ash <and...@andrewash.com> wrote:
>
> This is already possible with the sc.textFile("/path/to/filedir/*") call. Does that work for you?
>
> Sent from my mobile phone
>
> On Apr 29, 2014 2:46 AM, "Nicholas Chammas" <nicholas.cham...@gmail.com> wrote:
>
>> It would be useful to have some way to open multiple files at once into a single RDD (e.g. sc.textFile(iterable_over_uris)). Logically, it would be equivalent to opening a single file made by concatenating the various files together. This would only be useful, of course, if the source files were all in the same format.
>>
>> Nick
>>
>> On Mon, Apr 28, 2014 at 7:40 PM, Andrew Ash <and...@andrewash.com> wrote:
>>
>>> The way you've written it there, I would expect those to be serial runs. The reason is that you're looping over matches with a driver-level map, which runs serially. Then calling foreachWith on the RDDs executes the action in a blocking way, so you don't get a result back until the cluster finishes.
>>>
>>> You can have multiple jobs running at the same time, though, by sharing a SparkContext among threads. Rather than run all the foreachWith()s in serial on a single thread in the driver, try running each in its own thread.
>>>
>>> On Tue, Apr 29, 2014 at 1:35 AM, Art Peel <found...@gmail.com> wrote:
>>>
>>>> I’m trying to process 3 S3 files in parallel, but they always get processed serially. Am I going about this the wrong way? Full details below.
>>>>
>>>> Regards,
>>>> Art
>>>>
>>>> I’m trying to do the following on a Spark cluster with 3 slave nodes.
>>>>
>>>> Given a list of N URLs for files in S3 (s3n:// URLs):
>>>>
>>>> For each file:
>>>> 1. search for some items
>>>> 2. record the findings from step 1 in an external data store.
>>>>
>>>> I’d like to process the 3 URLs in parallel on 3 different slave nodes, but instead they are getting processed serially on one node.
>>>>
>>>> I’ve tried various versions of my code to no avail. I’m also creating the SparkContext with "spark.scheduler.mode" set to "FAIR".
>>>>
>>>> Have I made a fundamental mistake?
>>>>
>>>> I’m running Spark 0.9.1 and my code looks roughly like this:
>>>>
>>>> def processFiles(sc: SparkContext) {
>>>>
>>>>   val urls = List("s3n://foo/file.txt", "s3n://bar/file.txt", "s3n://baz/file.txt")
>>>>
>>>>   val hadoopFiles = urls.map(url => {
>>>>     sc.hadoopFile(url, classOf[FooInputFormat], classOf[LongWritable], classOf[WritableFooRecord])
>>>>   })
>>>>
>>>>   val matches = hadoopFiles.par.map((hadoopFile) => {
>>>>     findMatches(hadoopFile)
>>>>   })
>>>>
>>>>   matches.map((matchRDD) => {
>>>>     recordMatches(matchRDD)
>>>>   })
>>>> }
>>>>
>>>> def findMatches(hadoopFile: RDD): RDD = {
>>>>   hadoopFile.map(record => caseClassResultFromSearch(record))
>>>> }
>>>>
>>>> def recordMatches(matchRDD: RDD) {
>>>>
>>>>   matchRDD.foreachWith(_ => {
>>>>     makeRestClient(config)
>>>>   // I get 3 jobs referring to the line number of the next line, but the
>>>>   // jobs run serially on one node.
>>>>   })((matchRecord, client) => {
>>>>     client.storeResult(matchRecord)
>>>>   })
>>>> }
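P.S. Art, if combining the files into one RDD doesn't fit your case, here is a rough, untested sketch of the threaded approach Andrew describes above. It reuses your findMatches/recordMatches, FooInputFormat, and WritableFooRecord (those names come from your code, not from Spark); the only change is that each per-file pipeline is submitted from its own thread against the shared SparkContext, so the three blocking jobs can overlap instead of running one after another:

    import java.util.concurrent.Executors
    import scala.concurrent.{Await, ExecutionContext, Future}
    import scala.concurrent.duration.Duration
    import org.apache.hadoop.io.LongWritable
    import org.apache.spark.SparkContext

    def processFilesConcurrently(sc: SparkContext) {

      val urls = List("s3n://foo/file.txt", "s3n://bar/file.txt", "s3n://baz/file.txt")

      // One thread per file; each thread submits and then blocks on its own Spark job.
      implicit val ec = ExecutionContext.fromExecutorService(
        Executors.newFixedThreadPool(urls.size))

      val jobs = urls.map(url => Future {
        // Same per-file pipeline as before, just started from its own thread
        // against the shared SparkContext.
        val hadoopFile = sc.hadoopFile(url, classOf[FooInputFormat],
          classOf[LongWritable], classOf[WritableFooRecord])
        recordMatches(findMatches(hadoopFile)) // blocking Spark action
      })

      // Wait for all of the per-file jobs to finish before returning.
      Await.result(Future.sequence(jobs), Duration.Inf)
      ec.shutdown()
    }

With "spark.scheduler.mode" set to "FAIR" as you already do, the jobs submitted from the different threads should be able to share the cluster rather than queue up behind each other.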