Whoops!  I just realized I was retrying the function even on success; I
didn't pay enough attention to the output from my calls.  Slightly updated
definitions, which now stop retrying after the first successful call:

class RetryFunction[-A](nTries: Int,f: A => Unit) extends Function[A,Unit]
{
  def apply(a: A): Unit = {
    var tries = 0
    var success = false
    while(!success && tries < nTries) {
      tries += 1
      try {
        f(a)
        success = true
      } catch {
        case scala.util.control.NonFatal(ex) =>
          println(s"failed on input $a, try $tries with $ex")
      }
    }
  }
}

implicit class Retryable[A](f: A => Unit) {
  def retryable(nTries:Int): RetryFunction[A] = new RetryFunction(nTries,f)
}


def tenDiv(x:Int) = println(x + " ---> " + (10 / x))


And example usage:

scala> (-2 to 2).foreach{(tenDiv _).retryable(3)}
-2 ---> -5
-1 ---> -10
failed on input 0, try 1 with java.lang.ArithmeticException: / by zero
failed on input 0, try 2 with java.lang.ArithmeticException: / by zero
failed on input 0, try 3 with java.lang.ArithmeticException: / by zero
1 ---> 10
2 ---> 5
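
For what it's worth, the while loop in RetryFunction could also be written as a tail-recursive helper built on scala.util.Try, which (like the NonFatal pattern above) only catches non-fatal exceptions. This is just a rough sketch: the names RetrySketch and retryRec are made up for illustration, and returning the attempt count as an Option[Int] is my own assumption about what might be handy, not something the original code does.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

// Hypothetical helper, not part of the definitions above.
object RetrySketch {
  // Runs f(a) up to nTries times and stops at the first success.
  // Returns Some(attempt number that succeeded), or None if every try failed.
  def retryRec[A](nTries: Int)(f: A => Unit)(a: A): Option[Int] = {
    @tailrec
    def loop(attempt: Int): Option[Int] =
      if (attempt > nTries) None
      else Try(f(a)) match {
        case Success(_) => Some(attempt)
        case Failure(ex) =>
          // Try only captures non-fatal exceptions, like NonFatal above
          println(s"failed on input $a, try $attempt with $ex")
          loop(attempt + 1)
      }
    loop(1)
  }
}
```

It can be called as (-2 to 2).foreach(x => RetrySketch.retryRec[Int](3)(tenDiv)(x)). The return value makes it possible to tell a record that eventually succeeded from one that was given up on, at the cost of no longer being a plain A => Unit.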





On Thu, Jul 24, 2014 at 2:58 PM, Imran Rashid <im...@therashids.com> wrote:

> Hi Art,
>
> I have some advice that isn't spark-specific at all, so it doesn't
> *exactly* address your questions, but you might still find it helpful.  I
> think using an implicit to add your retrying behavior might be useful.  I
> can think of two options:
>
> 1. enriching RDD itself, eg. to add a .retryForeach, which would have the
> desired behavior.
>
> 2. enriching Function to create a variant with retry behavior.
>
> I prefer option 2, because it could be useful outside of spark, and even
> within spark, you might realize you want to do something similar for more
> than just foreach.
>
> Here's an example.  (probably there is a more functional way to do this,
> to avoid the while loop, but my brain isn't working and that's not the
> point of this anyway)
>
> Let's say we have this function:
>
> def tenDiv(x:Int) = println(10 / x)
>
> and we try applying it to a normal old Range:
>
> scala> (-10 to 10).foreach{tenDiv}
> -1
> -1
> -1
> -1
> -1
> -2
> -2
> -3
> -5
> -10
> java.lang.ArithmeticException: / by zero
>     at .tenDiv(<console>:7)
>
>
> We can enrich Function to add some retry behavior:
>
> class RetryFunction[-A](nTries: Int,f: A => Unit) extends Function[A,Unit]
> {
>   def apply(a: A): Unit = {
>     var tries = 0
>     var success = false
>     while(!success && tries < nTries) {
>       tries += 1
>       try {
>         f(a)
>       } catch {
>         case scala.util.control.NonFatal(ex) =>
>           println(s"failed on try $tries with $ex")
>       }
>     }
>   }
> }
>
> implicit class Retryable[A](f: A => Unit) {
>   def retryable(nTries:Int): RetryFunction[A] = new RetryFunction(nTries,f)
> }
>
>
>
> We "activate" this behavior by calling .retryable(nTries) on our method.
> Like so:
>
> scala> (-2 to 2).foreach{(tenDiv _).retryable(1)}
> -5
> -10
> failed on try 1 with java.lang.ArithmeticException: / by zero
> 10
> 5
>
> scala> (-2 to 2).foreach{(tenDiv _).retryable(3)}
> -5
> -5
> -5
> -10
> -10
> -10
> failed on try 1 with java.lang.ArithmeticException: / by zero
> failed on try 2 with java.lang.ArithmeticException: / by zero
> failed on try 3 with java.lang.ArithmeticException: / by zero
> 10
> 10
> 10
> 5
> 5
> 5
>
>
> You could do the same thing on closures you pass to RDD.foreach.
>
> I should add that I'm often very hesitant to use implicits because they can
> make it harder to follow what's going on in the code.  I think this version
> is OK, though, b/c somebody coming along later and looking at the code at
> least can see the call to "retryable" as a clue.  (I really dislike
> implicit conversions that happen without any hints in the actual code.)
> Hopefully that's enough of a hint for others to figure out what is going
> on.  Eg., intellij will know where that method came from and jump to it,
> and also if you make the name unique enough, you can probably find it with
> plain text search / c-tags.  But it's definitely worth considering for
> yourself.
>
> hope this helps,
> Imran
>
>
>
> On Thu, Jul 24, 2014 at 1:12 PM, Art Peel <found...@gmail.com> wrote:
>
>> Our system works with RDDs generated from Hadoop files. It processes each
>> record in a Hadoop file and for a subset of those records generates output
>> that is written to an external system via RDD.foreach. There are no
>> dependencies between the records that are processed.
>>
>> If writing to the external system fails (due to a detail of what is being
>> written) and throws an exception, I see the following behavior:
>>
>> 1. Spark retries the entire partition (thus wasting time and effort),
>> reaches the problem record and fails again.
>> 2. It repeats step 1 up to the default 4 tries and then gives up. As a
>> result, the rest of records from that Hadoop file are not processed.
>> 3. The executor where the 4th failure occurred is marked as failed and
>> told to shut down and thus I lose a core for processing the remaining
>> Hadoop files, thus slowing down the entire process.
>>
>>
>> For this particular problem, I know how to prevent the underlying
>> exception, but I'd still like to get a handle on error handling for future
>> situations that I haven't yet encountered.
>>
>> My goal is this:
>> Retry the problem record only (rather than starting over at the beginning
>> of the partition) up to N times, then give up and move on to process the
>> rest of the partition.
>>
>> As far as I can tell, I need to supply my own retry behavior and if I
>> want to process records after the problem record I have to swallow
>> exceptions inside the foreach block.
>>
>> My 2 questions are:
>> 1. Is there anything I can do to prevent the executor from being shut
>> down when a failure occurs?
>>
>>
>> 2. Are there ways Spark can help me get closer to my goal of retrying
>> only the problem record without writing my own re-try code and swallowing
>> exceptions?
>>
>> Regards,
>> Art
>>
>>
>
