Oh snap! I didn’t know that!

Confirmed that both the wildcard syntax and the comma-separated syntax work
in PySpark. For example:

sc.textFile('s3n://file1,s3n://file2').count()

Art,

Would this approach work for you? It would let you load your 3 files into a
single RDD, which your workers could then all work on in parallel.
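
For your hadoopFile-based version, a rough (untested) sketch of the same idea
would be to join the paths with commas, assuming sc.hadoopFile honors
comma-separated paths the same way sc.textFile does (both go through
FileInputFormat); FooInputFormat, WritableFooRecord, findMatches and
recordMatches below are your own classes and methods:

import org.apache.hadoop.io.LongWritable

val urls = List("s3n://foo/file.txt", "s3n://bar/file.txt",
  "s3n://baz/file.txt")

// One RDD spanning all three files; its partitions are distributed across
// the cluster, so a single action works on the files in parallel.
val allFiles = sc.hadoopFile(
  urls.mkString(","),
  classOf[FooInputFormat],
  classOf[LongWritable],
  classOf[WritableFooRecord])

// One findMatches/recordMatches pass over the combined RDD instead of one
// Spark job per file.
recordMatches(findMatches(allFiles))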

Nick

On Mon, Apr 28, 2014 at 9:09 PM, Matei Zaharia <matei.zaha...@gmail.com>
wrote:

Actually wildcards work too, e.g. s3n://bucket/file1*, and I believe so do
> comma-separated lists (e.g. s3n://file1,s3n://file2). These are all
> inherited from FileInputFormat in Hadoop.
>
> Matei
>
> On Apr 28, 2014, at 6:05 PM, Andrew Ash <and...@andrewash.com> wrote:
>
> This is already possible with the sc.textFile("/path/to/filedir/*") call.
> Does that work for you?
>
> Sent from my mobile phone
> On Apr 29, 2014 2:46 AM, "Nicholas Chammas" <nicholas.cham...@gmail.com>
> wrote:
>
>> It would be useful to have some way to open multiple files at once into a
>> single RDD (e.g. sc.textFile(iterable_over_uris)). Logically, it would
>> be equivalent to opening a single file which is made by concatenating the
>> various files together. This would only be useful, of course, if the source
>> files were all in the same format.
>>
>> Nick
>>
>>
>> On Mon, Apr 28, 2014 at 7:40 PM, Andrew Ash <and...@andrewash.com> wrote:
>>
>>> The way you've written it there, I would expect those to run serially.
>>> The reason is that you're looping over matches with a driver-level map,
>>> which runs serially. Then calling foreachWith on the RDDs executes the
>>> action in a blocking way, so you don't get a result back until the
>>> cluster finishes.
>>>
>>> You can have multiple jobs running at the same time though by sharing a
>>> SparkContext among threads.  Rather than run all the foreachWith()s in
>>> serial on a single thread in the driver, try running each in its own thread.
>>>
>>>
>>>
>>>
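
A rough, untested sketch of what Andrew describes, using plain Scala futures
and the hadoopFiles / findMatches / recordMatches names from Art's code
further down:

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// One future per file: the blocking foreachWith inside recordMatches() now
// ties up its own thread rather than the single driver thread, so the three
// jobs can be submitted to the shared SparkContext concurrently.
val jobs = hadoopFiles.map { hadoopFile =>
  Future { recordMatches(findMatches(hadoopFile)) }
}

// Wait for all three jobs to finish.
Await.result(Future.sequence(jobs), Duration.Inf)

Since Art is already using the FAIR scheduler, the concurrently submitted
jobs should then be able to share the cluster instead of queuing behind each
other.
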
>>> On Tue, Apr 29, 2014 at 1:35 AM, Art Peel <found...@gmail.com> wrote:
>>>
>>>>
>>>> I’m trying to process 3 S3 files in parallel, but they always get
>>>> processed serially.
>>>> Am I going about this the wrong way?  Full details below.
>>>>
>>>> Regards,
>>>> Art
>>>>
>>>>
>>>>
>>>> I’m trying to do the following on a Spark cluster with 3 slave nodes.
>>>>
>>>> Given a list of N URLS for files in S3 (with s3n:// urls),
>>>>
>>>> For each file:
>>>> 1. search for some items
>>>> 2. record the findings from step 1 in an external data store.
>>>>
>>>> I’d like to process the 3 URLs in parallel on 3 different slave nodes,
>>>> but instead, they are getting processed serially on one node.
>>>>
>>>> I’ve tried various versions of my code to no avail. I’m also creating
>>>> the SparkContext with "spark.scheduler.mode", "FAIR".
>>>>
>>>> Have I made a fundamental mistake?
>>>>
>>>> I’m running Spark 0.9.1 and my code looks roughly like this:
>>>>
>>>> def processFiles(sc: SparkContext) {
>>>>
>>>>     val urls = List("s3n://foo/file.txt", "s3n://bar/file.txt",
>>>>       "s3n://baz/file.txt")
>>>>
>>>>     val hadoopFiles = urls.map(url => {
>>>>       sc.hadoopFile(url, classOf[FooInputFormat],
>>>> classOf[LongWritable], classOf[WritableFooRecord])
>>>>     })
>>>>
>>>>     val matches = hadoopFiles.par.map((hadoopFile) => {
>>>>
>>>>       findMatches(hadoopFile)
>>>>     })
>>>>
>>>>
>>>>     matches.map((matchRDD) => {
>>>>       recordMatches(matchRDD)
>>>>     })
>>>>   }
>>>>
>>>>
>>>>   def findMatches(hadoopFile: RDD): RDD = {
>>>>
>>>>     hadoopFile.map( record => caseClassResultFromSearch(record) )
>>>>
>>>>   }
>>>>
>>>>   def recordMatches(matchRDD: RDD) {
>>>>
>>>>
>>>>     matchRDD.foreachWith(_ => {
>>>>
>>>>       makeRestClient(config)
>>>>
>>>>     // I get 3 jobs referring to the line number of the next line,
>>>>     // but the jobs run serially on one node.
>>>>     })((matchRecord, client) => {
>>>>
>>>>       client.storeResult(matchRecord)
>>>>
>>>>     })
>>>>
>>>>
>>>>   }
>>>>
>>>>
>>>>
>>>
>>
>
