Thanks for reporting!
/Patrik

On Thu, Sep 18, 2014 at 6:52 PM, Chris Carter <ch...@carterventures.com>
wrote:

> Thanks Patrik, I've created an issue here:
> https://github.com/akka/akka/issues/15943
>
> I updated the example to include the RecoveryFailure option, I realized
> we're not handling that in our production code either.  Thanks for the
> reminder!
>
> Chris
>
> On Thursday, September 18, 2014 1:11:09 AM UTC-7, Patrik Nordwall wrote:
>>
>> Thank you, Chris!
>>
>> Now I understand the problem.
>> The exception in receiveRecover is caused by event.value.get, and that
>> triggers a restart and doesn't generate the RecoveryFailure message. I
>> agree that we should handle all kinds of exceptions from receiveRecover in
>> the same way.
>>
>> Please create an issue <https://github.com/akka/akka/issues>.
>>
>> The scenario I was thinking about was exceptions from de-serialization
>> and I have verified that those generates RecoveryFailure, and if that is
>> not handled the actor is by default stopped.
>>
>> [ERROR] [09/18/2014 10:02:23.450] 
>> [PersistentActorSpec-akka.actor.default-dispatcher-2]
>> [akka://PersistentActorSpec/user/$a] Processor killed after recovery
>> failure (persistent id = [my.test.id]). To avoid killing processors on
>> recovery failure, a processor must handle RecoveryFailure messages.
>> RecoveryFailure was caused by: java.io.InvalidClassException:
>> akka.persistence.PersistentActorSpec$MyEvent; local class incompatible:
>> stream classdesc serialVersionUID = 8162434214324358497, local class
>> serialVersionUID = -4420973383950594447 (akka.actor.ActorKilledException)
>>
>> By the way, you should use
>>   case _: RecoveryFailure =>
>> to handle the RecoveryFailure message.
>>
>> /Patrik
>>
>>
>> On Thu, Sep 18, 2014 at 8:24 AM, Chris Carter <ch...@carterventures.com>
>> wrote:
>>
>> Correction to the example in my previous email, this version actually
>> uses the test config generated by the code and also handles the
>> RecoveryFailure message:
>>
>> import java.util.concurrent.atomic.AtomicInteger
>>
>> import akka.actor._
>> import akka.persistence.{RecoveryFailure, PersistentActor}
>> import com.typesafe.config.ConfigFactory
>> import org.scalatest.FunSuite
>>
>> case class MyCommand(value: Option[String])
>> case class MyEvent(value: Option[String])
>>
>> object RecoveryMetric {
>>   val count = new AtomicInteger(0)
>>
>>   def inc() = count.incrementAndGet()
>> }
>>
>> class RecoverMe extends PersistentActor with ActorLogging {
>>
>>
>>   val receiveCommand: Receive = {
>>     case cmd: MyCommand => persist(MyEvent(cmd.value)) {
>>       event => log.info("Received: " + event.value.get)
>>     }
>>   }
>>
>>   val receiveRecover: Receive = {
>>     case event: MyEvent => {
>>       RecoveryMetric.inc()
>>       log.info("Recovered: " + event.value.get)
>>     }
>>     case RecoveryFailure => {
>>       self ! Kill
>>     }
>>   }
>>
>>   override def persistenceId: String = "my.test.id"
>> }
>>
>>
>> class RecoveryFailureTest extends FunSuite {
>>
>>   val cfg =
>>     s"""
>>         |akka.loggers = ["akka.event.slf4j.Slf4jLogger"]
>>         |akka.loglevel = "INFO"
>>         |akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
>>         |akka.remote.netty.tcp.port=2551
>>         |akka.remote.netty.tcp.hostname=127.0.0.1
>>         |akka.persistence.journal.plugin = "cassandra-journal"
>>         |cassandra-journal.contact-points = [ 127.0.0.1 ]
>>         |cassandra-journal.port = 9042
>>       """.stripMargin
>>
>>   val akkaCfg = ConfigFactory.parseString(cfg)
>>     .withFallback(ConfigFactory.load())
>>
>>   test("Recovery Fails repeatedly") {
>>     val system = ActorSystem("Test", akkaCfg)
>>     val actor = system.actorOf(Props.create(classOf[RecoverMe]))
>>     actor ! MyCommand(None)
>>     Thread.sleep(5000)
>>     system.shutdown()
>>     Thread.sleep(2000)
>>     println(s"Recovered ${RecoveryMetric.count.toString} times")
>>   }
>>
>> }
>>
>>
>>
>> On Wednesday, September 17, 2014 11:12:59 AM UTC-7, Chris Carter wrote:
>>
>> If it helps, here's a simple test case recreating the scenario as we're
>> experiencing it.  The environment we're running is:
>>
>>    - Cassandra persistence provider 0.3.3
>>    - Scala 2.11
>>    - Akka 2.3.5
>>    - Java 8 (have tested on 7 too)
>>
>> When I run the test below, I see: "Recovered 2933 times".
>>
>> import java.util.concurrent.atomic.AtomicInteger
>>
>> import akka.actor.{ActorLogging, ActorSystem, Props}
>> import akka.persistence.PersistentActor
>> import com.typesafe.config.ConfigFactory
>> import org.scalatest.FunSuite
>>
>> case class MyCommand(value: Option[String])
>> case class MyEvent(value: Option[String])
>>
>> object RecoveryMetric {
>>   val count = new AtomicInteger(0)
>>
>>   def inc() = count.incrementAndGet()
>> }
>>
>> class RecoverMe extends PersistentActor with ActorLogging {
>>
>>
>>   val receiveCommand: Receive = {
>>     case cmd: MyCommand => persist(MyEvent(cmd.value)) {
>>       event => log.info("Received: " + event.value.get)
>>     }
>>   }
>>
>>   val receiveRecover: Receive = {
>>     case event: MyEvent => {
>>       RecoveryMetric.inc()
>>       log.info("Recovered: " + event.value.get)
>>     }
>>   }
>>
>>   override def persistenceId: String = "my.test.id"
>> }
>>
>>
>> class RecoveryFailureTest extends FunSuite {
>>
>>
>>   val cfg =
>>     s"""
>>         |akka.loggers = ["akka.event.slf4j.Slf4jLogger"]
>>         |akka.loglevel = "INFO"
>>         |akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
>>         |akka.remote.netty.tcp.port=2551
>>         |akka.remote.netty.tcp.hostname=127.0.0.1
>>         |akka.persistence.journal.plugin = "cassandra-journal"
>>         |cassandra-journal.contact-points = [ 127.0.0.1 ]
>>         |cassandra-journal.port = 9042
>>       """.stripMargin
>>
>>   ConfigFactory.parseString(cfg)
>>     .withFallback(ConfigFactory.load())
>>
>>   test("Recovery Fails repeatedly") {
>>     val system = ActorSystem("Test")
>>     val actor = system.actorOf(Props.create(classOf[RecoverMe]))
>>     actor ! MyCommand(None)
>>     Thread.sleep(5000)
>>     system.shutdown()
>>     Thread.sleep(2000)
>>     println(s"Recovered ${RecoveryMetric.count.toString} times")
>>   }
>>
>> }
>>
>>
>>
>> On Wednesday, September 17, 2014 10:18:11 AM UTC-7, Patrik Nordwall wrote:
>>
>>
>>
>> On Wed, Sep 17, 2014 at 5:59 PM, Chris Carter <ch...@carterventures.com>
>> wrote:
>>
>> Hi Patrick,
>>
>> Thanks for the reply.  Yes, these are considerations we're working
>> through and absolutely a major point of design that we need to work out.
>> My primary reason for asking the question was to identify where we can
>> provide logic for identifying and dealing with the issue when it arises -
>> first in development as we're ferreting out possible failure scenarios, and
>> then in production in the event we run into corrupt data unexpectedly.
>> Ideally that won't happen, but sometimes things get missed, in which case
>> I'd rather the failure scenario be stop and notify rather than infinite
>> recovery attempts in a death loop.
>>
>>
>> Yes, an infinite recovery loop is not acceptable.
>>
>>
>>   For now, I did end up going ahead with the architecture you describe -
>> a sharded supervisor actor and a persistent child actor.
>>
>> I just checked our dependencies, and it looks like we're on 2.3.5.  I'll
>> try updating to 2.3.6 and see if that helps at all.  I was assuming it was
>> restarted based on what I saw in the logs and what I understand of the
>> actor lifecycle - we hadn't sent any new messages to the actor, but it was
>> stuck in a recovery loop where it was perpetually trying to recover the
>> same message over and over again (ie, the same failure was being logged
>> 1000s of times in a few seconds).  Does that make sense?
>>
>>
>> It makes sense. I will try it tomorrow and report back. You might have
>> found a bug. I expected that it would be stopped if recovery failed.
>>
>> /Patrik
>>
>>
>>
>> Either way - it sounds like the approach we settled on for now (the
>> supervisor + child) is the way to go, so we'll stick with that.  Thanks a
>> lot for the reply and advice!
>>
>> Chris
>>
>> On Wednesday, September 17, 2014 12:06:55 AM UTC-7, Patrik Nordwall wrote:
>>
>> Hi Chris,
>>
>> Currently it is not possible to define the supervision strategy for the
>> shard entries.
>> Your request is valid for consideration and I have created an issue:
>> https://github.com/akka/akka/issues/15901
>> It is not completely trivial, because what does it mean to stop an entry
>> actor when using cluster sharding? It will be started up again when next
>> message arrives.
>>
>> Exceptions during recovery means that the data is corrupt, and that is
>> supposed to stop the actor. Are you sure that it is restarted (in actor
>> lifecycle terminology
>> <http://doc.akka.io/docs/akka/2.3.6/scala/actors.html#Actor_Lifecycle>)?
>> Are you using latest 2.3.6 version? However, it will be started up again
>> when next message arrives, and then you will have same recovery failure.
>>
>> What you can do right now is to split up your entry actor in two actors,
>> one parent supervisor actor, with appropriate supervision strategy, and the
>> real entry actor as child. The parent actor would forward all messages to
>> the child.
>>
>> Regards,
>> Patrik
>>
>> On Tue, Sep 16, 2014 at 8:37 PM, Chris Carter <ch...@carterventures.com>
>> wrote:
>>
>> Hi there,
>>
>> We have an Akka Cluster which is using Cluster Sharding and Persistence
>> to create a series of actors which are sharded across the cluster, and
>> restored via the usual Persistence mechanism.  In certain cases we might
>> see a command come in which is not validated properly and persists an Event
>> which throws an exception upon recover, which then causes the Sharded actor
>> to restart and attempt recover again, which then causes the exception,
>> leading us into a spiraling loop of death.  Ideally we'll individually
>> handle all possible exceptions during Event recovery and properly validate
>> Commands, but while we're in development we're still working on that, so
>> I'd like to create a supervisor strategy for that particular Shard type
>> which will bail on recovery after a few tries.
>>
>> How do I go about this with Graph Sharding?  Right now it's being
>> instantiated from the ActorSystem itself, do I have to create a specific
>> guardian actor for particular Shard systems? Or do I just have to set the
>> default supervisor strategy for the whole system and rely on the Shard
>> actors picking that up?
>>
>> Thanks for any hints you guys can provide!
>>
>> Chris
>>
>> --
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/c
>> urrent/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> ---
>> You received this message because you are subscribed to the Google Groups
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to akka-user+...@googlegroups.com.
>> To post to this group, send email to akka...@googlegroups.com.
>> Visit this group at http://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>>
>>
>>
>> --
>>
>> Patrik Nordwall
>> Typesafe <http://typesafe.com/> -  Reactive apps on the JVM
>> Twitter: @patriknw
>>
>>   --
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/c
>> urrent/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> ---
>> You received this message because you are subscribed to the Google Groups
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to akka-user+...@googlegroups.com.
>> To post to this group, send email to akka...@googlegroups.com.
>> Visit this group at http://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>>
>>
>>
>> --
>>
>> Patrik Nordwall
>> Typesafe <http://typesafe.com/> -  Reactive apps on the JVM
>> Twitter: @patriknw
>>
>>   --
>> >>>>>>>>>> Read the docs: http://akka.io/docs/
>> >>>>>>>>>> Check the FAQ: http://doc.akka.io/docs/akka/
>> current/additional/faq.html
>> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
>> ---
>> You received this message because you are subscribed to the Google Groups
>> "Akka User List" group.
>> To unsubscribe from this group and stop receiving emails from it, send an
>> email to akka-user+...@googlegroups.com.
>> To post to this group, send email to akka...@googlegroups.com.
>> Visit this group at http://groups.google.com/group/akka-user.
>> For more options, visit https://groups.google.com/d/optout.
>>
>>
>>
>>
>> --
>>
>> Patrik Nordwall
>> Typesafe <http://typesafe.com/> -  Reactive apps on th
>> ...
>
>  --
> >>>>>>>>>> Read the docs: http://akka.io/docs/
> >>>>>>>>>> Check the FAQ:
> http://doc.akka.io/docs/akka/current/additional/faq.html
> >>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
> ---
> You received this message because you are subscribed to the Google Groups
> "Akka User List" group.
> To unsubscribe from this group and stop receiving emails from it, send an
> email to akka-user+unsubscr...@googlegroups.com.
> To post to this group, send email to akka-user@googlegroups.com.
> Visit this group at http://groups.google.com/group/akka-user.
> For more options, visit https://groups.google.com/d/optout.
>



-- 

Patrik Nordwall
Typesafe <http://typesafe.com/> -  Reactive apps on the JVM
Twitter: @patriknw

-- 
>>>>>>>>>>      Read the docs: http://akka.io/docs/
>>>>>>>>>>      Check the FAQ: 
>>>>>>>>>> http://doc.akka.io/docs/akka/current/additional/faq.html
>>>>>>>>>>      Search the archives: https://groups.google.com/group/akka-user
--- 
You received this message because you are subscribed to the Google Groups "Akka 
User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.

Reply via email to