continuing processing when errors occur

2014-07-24 Thread Art Peel
Our system works with RDDs generated from Hadoop files. It processes each
record in a Hadoop file and for a subset of those records generates output
that is written to an external system via RDD.foreach. There are no
dependencies between the records that are processed.
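
For concreteness, the write path is roughly the sketch below (shouldEmit, toOutput and externalWrite stand in for our own code, so treat this as an illustration rather than the real thing):

import org.apache.spark.SparkContext

// Rough sketch of the pattern described above: read a Hadoop file, derive
// output for a subset of the records, and push each result to an external
// system from inside RDD.foreach. shouldEmit, toOutput and externalWrite
// are stand-ins for our own code.
def writeOutputs(sc: SparkContext, path: String) {
  val records = sc.textFile(path)
  records.filter(line => shouldEmit(line)).foreach { line =>
    externalWrite(toOutput(line))   // this call can throw for a problem record
  }
}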

If writing to the external system fails (due to a detail of what is being
written) and throws an exception, I see the following behavior:

1. Spark retries the entire partition (thus wasting time and effort),
reaches the problem record and fails again.
2. It repeats step 1 up to the default 4 tries and then gives up. As a
result, the rest of the records from that Hadoop file are not processed.
3. The executor where the 4th failure occurred is marked as failed and told
to shut down, so I lose a core for processing the remaining Hadoop files,
which slows down the entire process.
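
If I understand correctly, the 4 tries in step 2 come from
spark.task.maxFailures, which I could change when building the SparkContext,
but that only changes how many whole-partition attempts I get; it doesn't help
with retrying a single record. A sketch, with an arbitrary value of 8:

import org.apache.spark.{SparkConf, SparkContext}

// spark.task.maxFailures controls how many times a failed task (and with it
// the whole partition) is attempted before the job gives up; the default is 4.
val conf = new SparkConf()
  .setAppName("hadoop-file-export")          // placeholder app name
  .set("spark.task.maxFailures", "8")        // arbitrary value for illustration
val sc = new SparkContext(conf)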

For this particular problem, I know how to prevent the underlying
exception, but I'd still like to get a handle on error handling for future
situations that I haven't yet encountered.

My goal is this:
Retry the problem record only (rather than starting over at the beginning
of the partition) up to N times, then give up and move on to process the
rest of the partition.

As far as I can tell, I need to supply my own retry behavior, and if I want
to process records after the problem record, I have to swallow exceptions
inside the foreach block.
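
Roughly what I have in mind is the sketch below (maxAttempts is arbitrary, and
records, shouldEmit, toOutput and externalWrite are the same stand-ins as in
the earlier sketch):

// Per-record retry inside the foreach block. A record that still fails
// after maxAttempts attempts is logged and skipped, so the exception is
// swallowed and the rest of the partition keeps going.
val maxAttempts = 3

records.filter(line => shouldEmit(line)).foreach { line =>
  var attempt = 0
  var done = false
  while (!done && attempt < maxAttempts) {
    attempt += 1
    try {
      externalWrite(toOutput(line))
      done = true
    } catch {
      case e: Exception =>
        if (attempt >= maxAttempts) {
          // Give up on this record only; don't let the task (and partition) fail.
          System.err.println("Skipping record after " + maxAttempts + " attempts: " + e)
        }
    }
  }
}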

My 2 questions are:
1. Is there anything I can do to prevent the executor from being shut down
when a failure occurs?

2. Are there ways Spark can help me get closer to my goal of retrying only
the problem record without writing my own re-try code and swallowing
exceptions?

Regards,
Art


processing s3n:// files in parallel

2014-04-28 Thread Art Peel
I’m trying to process 3 S3 files in parallel, but they always get processed
serially.
Am I going about this the wrong way?  Full details below.

Regards,
Art



I’m trying to do the following on a Spark cluster with 3 slave nodes.

Given a list of N URLs for files in S3 (with s3n:// URLs),

For each file:
1. search for some items
2. record the findings from step 1 in an external data store.

I’d like to process the 3 URLs in parallel on 3 different slave nodes, but
instead, they are getting processed serially on one node.

I’ve tried various versions of my code to no avail. I’m also creating the
SparkContext with "spark.scheduler.mode" set to "FAIR".

Have I made a fundamental mistake?

I’m running Spark 0.9.1 and my code looks roughly like this:

import org.apache.hadoop.io.LongWritable
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

def processFiles(sc: SparkContext) {

  val urls = List("s3n://foo/file.txt", "s3n://bar/file.txt",
    "s3n://baz/file.txt")

  // One RDD per S3 file (FooInputFormat and WritableFooRecord are our own classes).
  val hadoopFiles = urls.map { url =>
    sc.hadoopFile(url, classOf[FooInputFormat], classOf[LongWritable],
      classOf[WritableFooRecord])
  }

  // Parallel collection, in the hope of submitting the per-file jobs concurrently.
  val matches = hadoopFiles.par.map { hadoopFile =>
    findMatches(hadoopFile)
  }

  matches.map { matchRDD =>
    recordMatches(matchRDD)
  }
}

def findMatches(hadoopFile: RDD[(LongWritable, WritableFooRecord)]) = {
  hadoopFile.map(record => caseClassResultFromSearch(record))
}

def recordMatches[T](matchRDD: RDD[T]) {
  // I get 3 jobs referring to the line number of the foreachWith call below,
  // but the jobs run serially on one node.
  matchRDD.foreachWith(_ => {
    makeRestClient(config)
  })((matchRecord, client) => {
    client.storeResult(matchRecord)
  })
}
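
One alternative I may try, in case the problem is how the jobs get submitted:
as far as I can tell, concurrent jobs inside a single SparkContext have to be
submitted from separate threads, so something like the sketch below
(scala.concurrent Futures; the pool size of 3 just matches the number of
files). The other option I'm considering is simply unioning the three per-file
RDDs into one RDD so everything runs as a single job.

import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

// Sketch: submit one Spark job per file from its own thread, instead of
// relying on the parallel collection above. findMatches and recordMatches
// are the same methods as in the code above.
def processFilesConcurrently(sc: SparkContext) {
  implicit val ec = ExecutionContext.fromExecutorService(
    Executors.newFixedThreadPool(3))

  val urls = List("s3n://foo/file.txt", "s3n://bar/file.txt",
    "s3n://baz/file.txt")

  val jobs = urls.map { url =>
    Future {
      val hadoopFile = sc.hadoopFile(url, classOf[FooInputFormat],
        classOf[LongWritable], classOf[WritableFooRecord])
      recordMatches(findMatches(hadoopFile))   // one Spark job per file
    }
  }

  jobs.foreach(job => Await.result(job, Duration.Inf))
  ec.shutdown()
}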