The way you've written it there, I would expect that to be serial runs.
 The reason is, you're looping over matches with a driver-level map, which
is serialized.  Then calling foreachWith on the RDDs executes the action in
a blocking way, so you don't get a result back until the cluster finishes.

You can have multiple jobs running at the same time though by sharing a
SparkContext among threads.  Rather than run all the foreachWith()s in
serial on a single thread in the driver, try running each in its own thread.




On Tue, Apr 29, 2014 at 1:35 AM, Art Peel <found...@gmail.com> wrote:

>
> I’m trying to process 3 S3 files in parallel, but they always get
> processed serially.
> Am I going about this the wrong way?  Full details below.
>
> Regards,
> Art
>
>
>
> I’m trying to do the following on a Spark cluster with 3 slave nodes.
>
> Given a list of N URLS for files in S3 (with s3n:// urls),
>
> For each file:
> 1. search for some items
>         2. record the findings from step 1 in an external data store.
>
> I’d like to process the 3 URLs in parallel on 3 different slave nodes, but
> instead, they are getting processed serially on one node.
>
> I’ve tried various versions of my code to no avail. I’m also creating the
> SparkContext with "spark.scheduler.mode", "FAIR”.
>
> Have I made a fundamental mistake?
>
> I’m running Spark 0.9.1 and my code looks roughly like this:
>
> def processFiles(sc: SparkContext) {
>
>     val urls = List("s3n://foo/file.txt", "s3n://bar/file.txt",
> "s3n://baz/file.txt")
>
>     val hadoopFiles = urls.map(url => {
>       sc.hadoopFile(url, classOf[FooInputFormat], classOf[LongWritable],
> classOf[WritableFooRecord])
>     })
>
>     val matches = hadoopFiles.par.map((hadoopFile) => {
>
>       findMatches(hadoopFile)
>     })
>
>
>     matches.map((matchRDD) => {
>       recordMatches(matchRDD)
>     })
>   }
>
>
>   def findMatches(hadoopFile: RDD): RDD = {
>
>     hadoopFile.map( record => caseClassResultFromSearch(record) )
>
>   }
>
>   def recordMatches(matchRDD: RDD) {
>
>
>     matchRDD.foreachWith(_ => {
>
>       makeRestClient(config)
>
>     // I get 3 jobs referring to the line number of the next line, but the
> jobs run serially on one node.
>     })((matchRecord, client) => {
>
>       client.storeResult(matchRecord)
>
>     }
>
>
>   }
>
>
>

Reply via email to