This is already possible with the sc.textFile("/path/to/filedir/*") call.
Does that work for you?
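
For reference, here is a minimal sketch of what I have in mind, assuming plain-text inputs. The names MultiFileRead, globFor, joinPaths, readDir, readAll and readAllViaUnion are made up for illustration. As far as I know, textFile passes the path string to Hadoop's FileInputFormat, which accepts globs and comma-separated lists, so an explicit list of URIs can be joined with commas. That is an assumption worth checking on your version, and it breaks if a path itself contains a comma.

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

object MultiFileRead {
  // Hypothetical helper: builds a glob that matches every file directly under dir.
  def globFor(dir: String): String = dir.stripSuffix("/") + "/*"

  // Hypothetical helper: joins URIs into one comma-separated path string.
  // Assumes no individual URI contains a comma.
  def joinPaths(uris: Iterable[String]): String = uris.mkString(",")

  // Reads every file in a directory into a single RDD of lines.
  def readDir(sc: SparkContext, dir: String): RDD[String] =
    sc.textFile(globFor(dir))

  // Reads an explicit list of files into a single RDD of lines.
  def readAll(sc: SparkContext, uris: Iterable[String]): RDD[String] =
    sc.textFile(joinPaths(uris))

  // Alternative that avoids the comma convention: one RDD per URI, then a union.
  def readAllViaUnion(sc: SparkContext, uris: Iterable[String]): RDD[String] =
    sc.union(uris.toSeq.map(uri => sc.textFile(uri)))
}
```

Either way you end up with one RDD, so a single action on it is scheduled across the cluster as one job rather than one job per file.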

On Apr 29, 2014 2:46 AM, "Nicholas Chammas" <nicholas.cham...@gmail.com>
wrote:

> It would be useful to have some way to open multiple files at once into a
> single RDD (e.g. sc.textFile(iterable_over_uris)). Logically, it would be
> equivalent to opening a single file which is made by concatenating the
> various files together. This would only be useful, of course, if the source
> files were all in the same format.
>
> Nick
>
>
> On Mon, Apr 28, 2014 at 7:40 PM, Andrew Ash <and...@andrewash.com> wrote:
>
>> The way you've written it there, I would expect the jobs to run serially.
>> The reason is that you're looping over matches with a driver-level map, which
>> runs sequentially. Calling foreachWith on each RDD then executes the action in
>> a blocking way, so you don't get a result back until the cluster finishes.
>>
>> You can have multiple jobs running at the same time though by sharing a
>> SparkContext among threads.  Rather than run all the foreachWith()s in
>> serial on a single thread in the driver, try running each in its own thread.
>>
>>
>>
>>
>> On Tue, Apr 29, 2014 at 1:35 AM, Art Peel <found...@gmail.com> wrote:
>>
>>>
>>> I’m trying to process 3 S3 files in parallel, but they always get
>>> processed serially.
>>> Am I going about this the wrong way?  Full details below.
>>>
>>> Regards,
>>> Art
>>>
>>>
>>>
>>> I’m trying to do the following on a Spark cluster with 3 slave nodes.
>>>
>>> Given a list of N URLs for files in S3 (with s3n:// URLs),
>>>
>>> For each file:
>>> 1. search for some items
>>> 2. record the findings from step 1 in an external data store.
>>>
>>> I’d like to process the 3 URLs in parallel on 3 different slave nodes,
>>> but instead, they are getting processed serially on one node.
>>>
>>> I’ve tried various versions of my code to no avail. I’m also creating
>>> the SparkContext with "spark.scheduler.mode", "FAIR".
>>>
>>> Have I made a fundamental mistake?
>>>
>>> I’m running Spark 0.9.1 and my code looks roughly like this:
>>>
>>> def processFiles(sc: SparkContext) {
>>>
>>>     val urls = List("s3n://foo/file.txt", "s3n://bar/file.txt",
>>>       "s3n://baz/file.txt")
>>>
>>>     val hadoopFiles = urls.map(url => {
>>>       sc.hadoopFile(url, classOf[FooInputFormat], classOf[LongWritable],
>>>         classOf[WritableFooRecord])
>>>     })
>>>
>>>     val matches = hadoopFiles.par.map((hadoopFile) => {
>>>
>>>       findMatches(hadoopFile)
>>>     })
>>>
>>>
>>>     matches.map((matchRDD) => {
>>>       recordMatches(matchRDD)
>>>     })
>>>   }
>>>
>>>
>>>   def findMatches(hadoopFile: RDD): RDD = {
>>>
>>>     hadoopFile.map( record => caseClassResultFromSearch(record) )
>>>
>>>   }
>>>
>>>   def recordMatches(matchRDD: RDD) {
>>>
>>>
>>>     matchRDD.foreachWith(_ => {
>>>
>>>       makeRestClient(config)
>>>
>>>     // I get 3 jobs referring to the line number of the next line, but
>>>     // the jobs run serially on one node.
>>>     })((matchRecord, client) => {
>>>
>>>       client.storeResult(matchRecord)
>>>
>>>     })
>>>
>>>
>>>   }
>>>
>>>
>>>
>>
>
