continuing processing when errors occur
Our system works with RDDs generated from Hadoop files. It processes each record in a Hadoop file and, for a subset of those records, generates output that is written to an external system via RDD.foreach. There are no dependencies between the records being processed.

If writing to the external system fails (due to a detail of what is being written) and throws an exception, I see the following behavior:

1. Spark retries the entire partition (wasting time and effort), reaches the problem record, and fails again.
2. It repeats step 1 up to the default 4 tries and then gives up. As a result, the rest of the records from that Hadoop file are not processed.
3. The executor where the 4th failure occurred is marked as failed and told to shut down, so I lose a core for processing the remaining Hadoop files, which slows down the entire job.

For this particular problem I know how to prevent the underlying exception, but I'd still like to get a handle on error handling for future situations I haven't yet encountered. My goal is this: retry the problem record only (rather than starting over at the beginning of the partition) up to N times, then give up and move on to process the rest of the partition.

As far as I can tell, I need to supply my own retry behavior, and if I want to process the records after the problem record, I have to swallow exceptions inside the foreach block.

My 2 questions are:

1. Is there anything I can do to prevent the executor from being shut down when a failure occurs?
2. Are there ways Spark can help me get closer to my goal of retrying only the problem record, without writing my own retry code and swallowing exceptions?

Regards,

Art
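To make the goal concrete, here is a sketch of the kind of retry-and-skip behavior I have in mind. This is my own code, not a Spark facility; `writeToExternalSystem` stands in for our actual client call, and the retry count is arbitrary:

```scala
// Hypothetical helper, not a Spark API: retry one operation up to
// maxTries times, returning None if every attempt fails.
def withRetries[A](maxTries: Int)(op: => A): Option[A] = {
  var attempt = 0
  while (attempt < maxTries) {
    try {
      return Some(op)
    } catch {
      case _: Exception => attempt += 1
    }
  }
  None // give up on this record only
}

// Inside the foreach: swallow the final failure so the rest of the
// partition is still processed.
rdd.foreach { record =>
  if (withRetries(3)(writeToExternalSystem(record)).isEmpty) {
    // log the problem record somewhere and move on
  }
}
```

The drawback is that Spark never sees the exception, so failures are only visible in whatever logging I add myself.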
processing s3n:// files in parallel
I'm trying to process 3 S3 files in parallel, but they always get processed serially. Am I going about this the wrong way? Full details below.

Regards,

Art

I'm trying to do the following on a Spark cluster with 3 slave nodes. Given a list of N URLs for files in S3 (with s3n:// URLs), for each file:

1. search for some items
2. record the findings from step 1 in an external data store

I'd like to process the 3 URLs in parallel on 3 different slave nodes, but instead they are getting processed serially on one node. I've tried various versions of my code to no avail. I'm also creating the SparkContext with "spark.scheduler.mode" set to "FAIR". Have I made a fundamental mistake?

I'm running Spark 0.9.1, and my code looks roughly like this:

    def processFiles(sc: SparkContext) {
      val urls = List("s3n://foo/file.txt", "s3n://bar/file.txt", "s3n://baz/file.txt")

      val hadoopFiles = urls.map { url =>
        sc.hadoopFile(url, classOf[FooInputFormat], classOf[LongWritable],
          classOf[WritableFooRecord])
      }

      val matches = hadoopFiles.par.map { hadoopFile =>
        findMatches(hadoopFile)
      }

      matches.map { matchRDD =>
        recordMatches(matchRDD)
      }
    }

    def findMatches(hadoopFile: RDD[(LongWritable, WritableFooRecord)]) = {
      hadoopFile.map(record => caseClassResultFromSearch(record))
    }

    def recordMatches[T](matchRDD: RDD[T]) {
      matchRDD.foreachWith(_ => {
        makeRestClient(config)
        // I get 3 jobs referring to the line number of the next line,
        // but the jobs run serially on one node.
      })((matchRecord, client) => {
        client.storeResult(matchRecord)
      })
    }
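In case it clarifies what I'm after: here is a sketch, reusing the names from the code above, of the concurrency I expected `.par` to give me, expressed instead with plain Scala futures so each action is submitted as its own Spark job:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Sketch only: kick off one Spark job per URL from its own thread, so
// the three actions can (in principle) be scheduled concurrently
// rather than one after another.
def processFilesConcurrently(sc: SparkContext, urls: List[String]) {
  val jobs = urls.map { url =>
    Future {
      val hadoopFile = sc.hadoopFile(url, classOf[FooInputFormat],
        classOf[LongWritable], classOf[WritableFooRecord])
      recordMatches(findMatches(hadoopFile))
    }
  }
  // Block until every file has been processed.
  jobs.foreach(Await.result(_, Duration.Inf))
}
```

Is this kind of multi-threaded job submission what the FAIR scheduler expects, or should `.par` over the driver-side collection have been enough?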