Whoops! I just realized I was retrying the function even on success; I didn't pay enough attention to the output from my calls. Slightly updated definitions:
class RetryFunction[-A](nTries: Int, f: A => Unit) extends Function[A, Unit] {
  def apply(a: A): Unit = {
    var tries = 0
    var success = false
    while (!success && tries < nTries) {
      tries += 1
      try {
        f(a)
        success = true
      } catch {
        case scala.util.control.NonFatal(ex) =>
          println(s"failed on input $a, try $tries with $ex")
      }
    }
  }
}

implicit class Retryable[A](f: A => Unit) {
  def retryable(nTries: Int): RetryFunction[A] = new RetryFunction(nTries, f)
}

def tenDiv(x: Int) = println(x + " ---> " + (10 / x))

and example usage:

scala> (-2 to 2).foreach{(tenDiv _).retryable(3)}
-2 ---> -5
-1 ---> -10
failed on input 0, try 1 with java.lang.ArithmeticException: / by zero
failed on input 0, try 2 with java.lang.ArithmeticException: / by zero
failed on input 0, try 3 with java.lang.ArithmeticException: / by zero
1 ---> 10
2 ---> 5

On Thu, Jul 24, 2014 at 2:58 PM, Imran Rashid <im...@therashids.com> wrote:
> Hi Art,
>
> I have some advice that isn't Spark-specific at all, so it doesn't
> *exactly* address your questions, but you might still find it helpful. I
> think using an implicit to add your retrying behavior might be useful. I
> can think of two options:
>
> 1. enriching RDD itself, e.g. to add a .retryForeach, which would have
> the desired behavior.
>
> 2. enriching Function to create a variant with retry behavior.
>
> I prefer option 2, because it could be useful outside of Spark, and even
> within Spark, you might realize you want to do something similar for more
> than just foreach.
>
> Here's an example. (Probably there is a more functional way to do this,
> to avoid the while loop; a recursive variant is sketched at the end of
> this thread. But my brain isn't working and that's not the point of this
> anyway.)
>
> Let's say we have this function:
>
> def tenDiv(x: Int) = println(10 / x)
>
> and we try applying it to a normal old Range:
>
> scala> (-10 to 10).foreach{tenDiv}
> -1
> -1
> -1
> -1
> -1
> -2
> -2
> -3
> -5
> -10
> java.lang.ArithmeticException: / by zero
>         at .tenDiv(<console>:7)
>
> We can enrich Function to add some retry behavior:
>
> class RetryFunction[-A](nTries: Int, f: A => Unit) extends Function[A, Unit] {
>   def apply(a: A): Unit = {
>     var tries = 0
>     var success = false
>     while (!success && tries < nTries) {
>       tries += 1
>       try {
>         f(a)
>       } catch {
>         case scala.util.control.NonFatal(ex) =>
>           println(s"failed on try $tries with $ex")
>       }
>     }
>   }
> }
>
> implicit class Retryable[A](f: A => Unit) {
>   def retryable(nTries: Int): RetryFunction[A] = new RetryFunction(nTries, f)
> }
>
> We "activate" this behavior by calling .retryable(nTries) on our method,
> like so:
>
> scala> (-2 to 2).foreach{(tenDiv _).retryable(1)}
> -5
> -10
> failed on try 1 with java.lang.ArithmeticException: / by zero
> 10
> 5
>
> scala> (-2 to 2).foreach{(tenDiv _).retryable(3)}
> -5
> -5
> -5
> -10
> -10
> -10
> failed on try 1 with java.lang.ArithmeticException: / by zero
> failed on try 2 with java.lang.ArithmeticException: / by zero
> failed on try 3 with java.lang.ArithmeticException: / by zero
> 10
> 10
> 10
> 5
> 5
> 5
>
> You could do the same thing on closures you pass to RDD.foreach.
>
> I should add that I'm often very hesitant to use implicits, because they
> can make it harder to follow what's going on in the code. I think this
> version is OK, though, because somebody coming along later and looking at
> the code can at least see the call to "retryable" as a clue. (I really
> dislike implicit conversions that happen without any hints in the actual
> code.) Hopefully that's enough of a hint for others to figure out what is
> going on.
> E.g., IntelliJ will know where that method came from and can jump to it,
> and if you make the name unique enough, you can probably find it with a
> plain-text search / ctags. But it's definitely worth considering for
> yourself.
>
> Hope this helps,
> Imran
>
>
> On Thu, Jul 24, 2014 at 1:12 PM, Art Peel <found...@gmail.com> wrote:
>
>> Our system works with RDDs generated from Hadoop files. It processes
>> each record in a Hadoop file, and for a subset of those records it
>> generates output that is written to an external system via RDD.foreach.
>> There are no dependencies between the records that are processed.
>>
>> If writing to the external system fails (due to a detail of what is
>> being written) and throws an exception, I see the following behavior:
>>
>> 1. Spark retries the entire partition (thus wasting time and effort),
>> reaches the problem record, and fails again.
>> 2. It repeats step 1 up to the default 4 tries and then gives up. As a
>> result, the rest of the records from that Hadoop file are not processed.
>> 3. The executor where the 4th failure occurred is marked as failed and
>> told to shut down, so I lose a core for processing the remaining Hadoop
>> files, slowing down the entire process.
>>
>> For this particular problem, I know how to prevent the underlying
>> exception, but I'd still like to get a handle on error handling for
>> future situations that I haven't yet encountered.
>>
>> My goal is this:
>> Retry the problem record only (rather than starting over at the
>> beginning of the partition) up to N times, then give up and move on to
>> process the rest of the partition.
>>
>> As far as I can tell, I need to supply my own retry behavior, and if I
>> want to process records after the problem record, I have to swallow
>> exceptions inside the foreach block.
>>
>> My 2 questions are:
>> 1. Is there anything I can do to prevent the executor from being shut
>> down when a failure occurs?
>>
>> 2. Are there ways Spark can help me get closer to my goal of retrying
>> only the problem record, without writing my own retry code and
>> swallowing exceptions?
>>
>> Regards,
>> Art
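
A footnote on Art's goal (retry only the problem record, then move on): the corrected RetryFunction at the top already swallows the exception after the last try (it only logs it), so passing a retryable function to RDD.foreach should keep one bad record from failing the task, and hence from triggering the whole-partition retry and the executor shutdown. A minimal, untested sketch; writeRecord, writeAllWithRetries, and lines are hypothetical stand-ins, and note that RetryFunction/Retryable would need to be serializable (e.g. "extends Function[A, Unit] with Serializable") for Spark to ship them to the executors:

import org.apache.spark.rdd.RDD

// Hypothetical stand-ins: writeRecord is the real write to the external
// system, lines is the RDD built from the Hadoop file.
def writeRecord(line: String): Unit = { /* write to the external system */ }

def writeAllWithRetries(lines: RDD[String]): Unit = {
  // Each failing record is retried up to 3 times in place. After the last
  // try the exception has already been swallowed (only logged), so the
  // rest of the partition is still processed and the task never fails.
  lines.foreach(((s: String) => writeRecord(s)).retryable(3))
}

This sidesteps Spark's retry machinery rather than customizing it; as far as I know there is no per-record retry hook in RDD.foreach itself.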
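
And since the message above mentions a more functional way to avoid the while loop, here is one possible shape for it: a sketch (untested) using tail recursion, with the same log-and-give-up-after-nTries semantics as the corrected version at the top. RecursiveRetryFunction is just an illustrative name:

import scala.annotation.tailrec
import scala.util.control.NonFatal

class RecursiveRetryFunction[-A](nTries: Int, f: A => Unit)
    extends Function[A, Unit] {

  def apply(a: A): Unit = attempt(a, 1)

  @tailrec
  private def attempt(a: A, tryNum: Int): Unit = {
    val succeeded =
      try { f(a); true }
      catch {
        case NonFatal(ex) =>
          println(s"failed on input $a, try $tryNum with $ex")
          false
      }
    // Recurse only on failure, and only while tries remain; @tailrec makes
    // the compiler verify this call compiles down to a loop.
    if (!succeeded && tryNum < nTries) attempt(a, tryNum + 1)
  }
}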