Re: [akka-user] Re: Feedback on BackoffSupervisor
There is an alternative to calling context.stop(self) on persist failures that I didn't follow up on in the pull request discussion: you can throw a special exception that inherits from Throwable (let's call it `PersistFailureException`).

So, the motivation behind context.stop(self) on a persist failure is that you don't want to have to worry about the child actor stopping itself correctly (to guarantee safety). We also know that the actor can be restarted (without having to be unconditionally stopped), as long as it's done with an exponential backoff.

Therefore by throwing `PersistFailureException` we can guarantee that by default, if the developer does not have a supervisor strategy that handles this special exception, it will bubble up all the way to the root guardian and stop the entire actor system.

So now instead of the documentation saying that the Persistent Actor always shuts itself down, it will say:
"Your SupervisorStrategy must handle the `PersistFailureException`, either by specifying a `Stop` directive on a normal supervisor, or by using the `Restart` directive under the `TransparentExponentialBackoffSupervisor`. Otherwise, if left unhandled, this exception will bubble all the way up to the root guardian and cause the ActorSystem to shut down."

On Thursday, December 17, 2015 at 3:18:47 AM UTC-8, Patrik Nordwall wrote:
>
> On Thu, Dec 17, 2015 at 11:58 AM, Raymond Roestenburg <raymond.r...@gmail.com> wrote:
>
>> Hi Patrik,
>>
>> Henry Mai looped me in on the existence of the TransparentExponentialBackoffSupervisor in akka-contrib and PR https://github.com/akka/akka/pull/18776 (thanks Henry!).
>> I find it confusing that there are two. As a user I would like one BackoffSupervisor, in akka.pattern, not two, one of which is in contrib (I know, users always want the impossible!).
>
> Me too. I tried, but failed. It was motivated by that they are different, and should be different classes.
>
>> This is a very common pattern that pops up all the time which deserves a spot in akka.pattern IMHO.
>> Almost any time you're working with some network you can't just use the default supervisor stuff since it causes too fast reconnects, I think it should be part of akka-actor, not akka-contrib.
>>
>> Both have the issue I was referring to, so I could implement the fix in both now, sidestepping the issue of wanting just one version.
>>
>> But let me apply some logic, (subjective, possibly flawed :-)
>>
>> Are there any plans (or even possibility) to change the 'Stop behavior' in akka persistence?
>
> Why? We had flawed failure handling in Akka Persistence 2.3. We must enforce stopping of the persistent actor when there are journal failures, because the state must not diverge from what has actually been stored.
> http://doc.akka.io/docs/akka/2.4.1/scala/persistence.html#Failures
>
>> If it does not, I'm assuming everyone agrees that Stop is a good option to indicate failure (in certain scenarios), which I think means that there is a good reason to have options on one implementation.
>>
>> If Stop is not good to indicate failure, I would expect akka-persistence to change, which also means there is just one version of the BackoffSupervisor.
>>
>> But then, what happens if someone puts this in front of a remote actor, where deathwatch (i.e Stop) is the only option to indicate failure?...
>>
>> "Location transparency, anyone?" ;-)
>>
>> Cheers,
>> Ray
>>
>> Cheers,
>> Ray
>>
>> On Wednesday, December 16, 2015 at 9:30:43 PM UTC+2, Raymond Roestenburg wrote:
>>>
>>> Ok, I'll put a PR in for this.
>>>
>>> Cheers,
>>> Ray
>>>
>>> On Wednesday, December 16, 2015 at 7:09:59 PM UTC+2, Patrik Nordwall wrote:
>>>>
>>>> Sounds like a good improvement. PR is very welcome.
>>>>
>>>> Cheers,
>>>> Patrik
>>>>
>>>> On Tue, Dec 15, 2015 at 1:31 PM, Raymond Roestenburg <raymond.r...@gmail.com> wrote:
>>>>
>>>>> BTW if you think this is a good idea, I would be happy to provide a PR for it (since I can't move to 2.4.1 yet anyway and I'm back-porting it right now).
>>>>>
>>>>> On Tuesday, December 15, 2015 at 2:28:36 PM UTC+2, Raymond Roestenb
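For readers following the proposal at the top of this message, here is a minimal sketch of what handling such an exception in user code might look like. `PersistFailureException` is hypothetical (it is only the name suggested in the message and is not part of Akka), and `PersistSupervision` and `Parent` are made-up names as well. The sketch assumes classic Akka's `OneForOneStrategy` and `SupervisorStrategy.defaultDecider`, and it is not how Akka Persistence actually reports journal failures.

```scala
import akka.actor.{Actor, OneForOneStrategy, Props, SupervisorStrategy}
import akka.actor.SupervisorStrategy.{Decider, Stop}

// Hypothetical exception from the proposal; extends Throwable, not Exception.
class PersistFailureException(msg: String) extends Throwable(msg)

object PersistSupervision {
  // Stop the child on a persist failure, otherwise fall back to Akka's default decider.
  val decider: Decider =
    ({ case _: PersistFailureException => Stop }: Decider)
      .orElse(SupervisorStrategy.defaultDecider)
}

// Hypothetical parent that applies the decider to its single child.
class Parent(childProps: Props) extends Actor {
  override val supervisorStrategy: SupervisorStrategy =
    OneForOneStrategy()(PersistSupervision.decider)

  private val child = context.actorOf(childProps, "persistent-child")

  def receive: Receive = {
    case msg => child forward msg
  }
}
```

A parent that does not install a decider like this leaves the exception unhandled at its level, which is the escalation case the message describes.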
Re: [akka-user] Re: Feedback on BackoffSupervisor
That is not true.
The default behavior to restart does not apply to custom exceptions that extend from Throwable. It will escalate custom exceptions that extend from Throwable.
You can test this out yourself by creating a parent actor and a child actor within the parent actor and have it throw something extending from a Throwable.

Here is an example that demonstrates that it escalates and causes the system to shutdown:

Code:
package somepackage

import akka.actor._

class CustomException extends Throwable

class B extends Actor {
  self ! "Throw"

  def receive: Receive = {
    case "Throw" => throw new CustomException
  }
}


class A extends Actor {
  val child = context.actorOf(Props(new B))

  def receive: Receive = {
    case _ =>
  }
}

object Main extends App {
  val system = ActorSystem()
  system.actorOf(Props(new A))
}


Output:
[info] Running somepackage.Main
[ERROR] [12/17/2015 13:09:51.738] [default-akka.actor.default-dispatcher-3] [LocalActorRefProvider(akka://default)] guardian failed, shutting down system
somepackage.CustomException
    at somepackage.B$$anonfun$receive$1.applyOrElse(A.scala:11)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
    at somepackage.B.aroundReceive(A.scala:7)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

[ERROR] [12/17/2015 13:09:51.739] [default-akka.actor.default-dispatcher-3] [akka://default/user] null
somepackage.CustomException
    at somepackage.B$$anonfun$receive$1.applyOrElse(A.scala:11)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
    at somepackage.B.aroundReceive(A.scala:7)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

[success] Total time: 4 s, completed Dec 17, 2015 1:09:51 PM

On Thursday, December 17, 2015 at 12:55:36 PM UTC-8, Patrik Nordwall wrote:
>
> On Thu, Dec 17, 2015 at 6:14 PM, Henry Mai wrote:
>
>> There is an alternative to not doing the context.stop(self) on persist failures, that I didn't follow up on in the discussion in the pull request; you can throw a special exception that inherits from Throwable (let's call this `PersistFailureException`)
>
> That is almost what we had in 2.3, and that does not ensure that the actor is stopped unconditionally. By default it will be restarted.
>
>> So, the motivation behind context.stop(self) on a persist failure is that you don't want to have to worry about the child actor stopping itself correctly (to guarantee safety). Additionally we also know that the actor can be restarted (without having to be unconditionally stopped), as long as it's done with an exponential backoff.
>>
>> Therefore by throwing `PersistFailureException` we can guarantee that by default, if the developer does not have a supervisor strategy that handles this special exception, it will bubble up all the way to the root guardian and stop the entire actor system.
>
> That is not how default supervision works. By default the actor will be restarted.
>
>> So now instead of the documentation saying that the Persistent Actor always shuts itself down, it will say:
>> "Your SupervisorStrategy must handle the `PersistFailureException`, either by specifying a `Stop` directive on a normal supervisor, or by using the `Restart` directive under the `TransparentExponentialBackoffSupervisor`. Otherwise if left unhandled this exception will bubble all the way up to the root guardian and cause the ActorSystem to shutdown."
>>
>> On Thursday, December 17, 2015 at 3:18:47 AM UTC-8, Patrik Nordwall
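The disagreement in this exchange comes down to which throwables the default decider covers. The following is a small sketch for checking that directly, assuming classic Akka's `SupervisorStrategy.defaultDecider`; the two exception classes and the `DefaultDeciderCheck` object are made up for illustration.

```scala
import akka.actor.SupervisorStrategy
import akka.actor.SupervisorStrategy.{Decider, Restart}

// Illustrative types only; neither is part of Akka.
class CustomThrowable extends Throwable
class CustomFailure extends Exception

object DefaultDeciderCheck {
  val decider: Decider = SupervisorStrategy.defaultDecider

  def main(args: Array[String]): Unit = {
    // An ordinary Exception subclass is covered by the default decider (Restart).
    println(decider.isDefinedAt(new CustomFailure))
    println(decider(new CustomFailure) == Restart)

    // A direct Throwable subclass is not covered; a supervisor using the
    // default decider escalates such a failure to its own parent.
    println(decider.isDefinedAt(new CustomThrowable))
  }
}
```

This only inspects the decider itself; the log output in the message shows the resulting escalation all the way to the guardian in a running system.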
Re: [akka-user] Re: Feedback on BackoffSupervisor
> User can install supervision that catch and for example resume, which can result in that the in-memory state is different from what is actually stored.

Sure, the user can catch all Throwables, but my statement was that it doesn't happen by default.

> We have a solution that works. Why change?

I'm not necessarily suggesting that we have to change it right now. This is more of a discussion/exploration of an alternative to overloading the meaning of stopping an actor as a failure for akka persistence.
Roland mentioned that you guys are "making all lifecycle events (like child failure) normal messages in Akka Typed" (https://github.com/akka/akka/pull/18776#discussion_r43050078).
I think this also opens the path to an alternative to having the persistent actor stop itself (my assumption here is that supervisors in akka typed are going to be explicit actors rather than the parents being implicit supervisors).

On Thursday, December 17, 2015 at 10:15:10 PM UTC-8, Patrik Nordwall wrote:
>
> Ah, I missed that you extended Throwable.
> That will anyway not enforce that the actor is always stopped when there are storage exceptions. User can install supervision that catch and for example resume, which can result in that the in-memory state is different from what is actually stored.
>
> We have a solution that works. Why change? I'm sorry that I'm not open for more flexibility in this error handling, but it's no fun to take responsibility for support questions about data corruption.
>
> On Thu, 17 Dec 2015 at 22:10, Henry Mai wrote:
>
>> That is not true.
>> The default behavior to restart does not apply to custom exceptions that extend from Throwable. It will escalate custom exceptions that extend from Throwable.
>> You can test this out yourself by creating a parent actor and a child actor within the parent actor and have it throw something extending from a Throwable.
Re: [akka-user] Feedback on BackoffSupervisor
> The only Actor that can supervise another Actor must be the one that has created it, anything else would be rather pointless because only the parent can react to the termination of its child Actor by re-creating it.

I wasn't suggesting that something other than the parent supervise the child. To clarify what I'm saying: in untyped Akka right now, all parent actors are automatically supervisors in addition to their defined actor behavior. I believe this is why the life cycle events ended up the way they are right now (not explicit messages).
The explicit supervision that I'm talking about is the Erlang OTP model of supervision, where the supervisor is an actor (still a parent actor) with a dedicated behavior for supervising children.
So while you can still have Akka implicit supervision with lifecycle events as messages, you would end up with actors that are logically two separate actor behaviors (the actual behavior + the supervision behavior).

> Dealing with failure and/or termination is an interesting question that I do not yet have a fully consistent and hardened answer to (which explains why Persistence does not yet exist for typed Actors).

Akka Typed, while not purposefully, has had some of its semantics changed to be closer to the Erlang style of doing things. For example, if a sender reference is required, it must be explicitly provided in the message, rather than implicitly provided by the system. Another example is having explicit messages for life cycle events.
So what I'm saying is that it may be useful to look at Erlang, which may or may not have answers to these kinds of questions.

On Friday, December 18, 2015 at 1:50:14 AM UTC-8, rkuhn wrote:
>
> Hi Henry,
>
> On 18 Dec 2015, at 07:29, Henry Mai wrote:
>
> > User can install supervision that catch and for example resume, which can result in that the in-memory state is different from what is actually stored.
>
> Sure, the user can catch all Throwables, but my statement was that it doesn't happen by default.
>
> > We have a solution that works. Why change?
>
> I'm not necessarily suggesting that we have to change it right now. This is more of a discussion/exploration of an alternative to overloading the meaning of stopping an actor as a failure for akka persistence.
> Roland mentioned that you guys are "making all lifecycle events (like child failure) normal messages in Akka Typed" (https://github.com/akka/akka/pull/18776#discussion_r43050078).
> I think also opens the path to an alternative to having the persistent actor stop itself (my assumption here is that supervisors in akka typed are going to be explicit actors rather than the parents being implicit supervisors).
>
>
> The only Actor that can supervise another Actor must be the one that has created it, anything else would be rather pointless because only the parent can react to the termination of its child Actor by re-creating it.
>
> But Persistence in Akka Typed may indeed look totally different from the current PersistentActor, not in spirit or semantics but in how it is expressed in code. Dealing with failure and/or termination is an interesting question that I do not yet have a fully consistent and hardened answer to (which explains why Persistence does not yet exist for typed Actors).
>
> Regards,
>
> Roland
>
>
> On Thursday, December 17, 2015 at 10:15:10 PM UTC-8, Patrik Nordwall wrote:
>
> Ah, I missed that you extended Throwable.
> That will anyway not enforce that the actor is always stopped when there are storage exceptions. User can install supervision that catch and for example resume, which can result in that the in-memory state is different from what is actually stored.
>
> We have a solution that works. Why change?
[akka-user] Re: Akka webscoket client support - outgoing messages
I think you might be looking for something like this: https://github.com/playframework/playframework/blob/master/framework/src/play-streams/src/main/scala/play/api/libs/streams/ActorFlow.scala

On Saturday, June 4, 2016 at 2:36:58 AM UTC-7, Marcelo Alves wrote:
>
> Hello,
> I'm new to Scala and Akka development. I need to implement a websocket client, and my difficulty is with outgoing messages. I don't really know how to do that using the akka ws-client support. My flow:
>
> val flow: Flow[Message, Message, Promise[Option[Message]]] =
>   Flow.fromSinkAndSourceMat(
>     sink,
>     Source.maybe[Message])(Keep.right)
>
> The sink is OK! I can receive all messages, but now I need to implement the outgoing messages. Can anyone help me with that?
>
> Thanks!!

--
>> Read the docs: http://akka.io/docs/
>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
[akka-user] Re: Using TestProbe to automate test by replying automatically just to ensure that the test goes
Defining a receive for TestProbe won't do anything. Instead, take a look at Auto-Pilot for the behavior that you want: http://doc.akka.io/docs/akka/current/scala/testing.html#Auto-Pilot

On Thursday, August 4, 2016 at 3:59:50 PM UTC-7, Maatary Okouya wrote:
>
> I am trying to get a test probe to reply with an acknowledgement whenever it receives any message.
>
> I wrote the following code in my test but it does not work:
>
> val chgtWriter = new TestProbe(system) {
>   def receive: Receive = {
>     case m => println("receive messagereplying with ACK"); sender() ! ACK
>   }
> }
>
> Is there a way to do that? The actor that is actually sending the message to the test probe is definitely running on another thread than the TestThread. Below you can see the full test as currently crafted.
>
> feature("The changeSetActor periodically fetch new change set following a schedule") {
>
>   scenario("A ChangeSetActor fetch new changeset from a Fetcher Actor that return a full and an empty ChangeSet"){
>
>     Given("a ChangeSetActor with a schedule of fetching a message every 10 seconds, a ChangeFetcher and a ChangeWriter")
>
>     val chgtFetcher = TestProbe()
>
>     val chgtWriter = new TestProbe(system) {
>       def receive: Receive = {
>         case m => println("receive message {} replying with ACK"); sender() ! ACK
>       }
>     }
>     val fromTime = Instant.now().truncatedTo(ChronoUnit.SECONDS)
>     val chgtActor = system.actorOf(ChangeSetActor.props(chgtWriter.ref, chgtFetcher.ref, fromTime))
>
>     When("all are started")
>
>     Then("The Change Fetcher should receive at least 3 messages from the ChangeSetActor within 40 seconds")
>
>     var changesetSNum = 1
>
>     val received = chgtFetcher.receiveWhile( 40 seconds) {
>
>       case FetchNewChangeSet(m) => {
>
>         println(s"received: FetchNewChangeSet(${m}")
>
>         if (changesetSNum == 1) {
>           chgtFetcher.reply(NewChangeSet(changeSet1))
>           changesetSNum += 1
>         }
>         else
>           chgtFetcher.reply(NoAvailableChangeSet)
>       }
>
>     }
>
>     received.size should be (3)
>   }
>
> }
>
> The changeSetActor is fully tested and works. The test hangs with the ChangeWriter. It never receives a message in the receive method.
[akka-user] Re: Using TestProbe to automate test by replying automatically just to ensure that the test goes
> What is testActor ? There is no variable of that name? what does it represent ?

testActor in the example is just some arbitrary actor that is created in the test code (not shown in the docs). It refers to the B actor in this sentence: "This code can be used to forward messages, e.g. in a chain A --> Probe --> B, as long as a certain protocol is obeyed."
It is just for example purposes; you don't actually need to use an additional actor.

In your case (given the code you pasted before) you probably just want to do this:

import akka.actor.ActorRef
import akka.testkit.{TestActor, TestProbe}

val probe = TestProbe()
probe.setAutoPilot(new TestActor.AutoPilot {
  def run(sender: ActorRef, msg: Any): TestActor.AutoPilot =
    msg match {
      case _ =>
        // `sender` is the ActorRef parameter of run, so it takes no parentheses
        sender ! ACK
        TestActor.KeepRunning
    }
})

> 1 - what is the current one referring to here: the next message or the auto-pilot ? To me, this sentence is just confusing, as much as the example. I believe i would need more help to understand it.

"current one" is referring to the current auto-pilot.
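To see the auto-pilot reply end to end, here is a minimal, self-contained sketch assuming akka-testkit is on the classpath. `ACK` stands in for the acknowledgement message from the question, and `AutoPilotDemo`, `writer`, and `client` are names made up for this illustration.

```scala
import akka.actor.{ActorRef, ActorSystem}
import akka.testkit.{TestActor, TestKit, TestProbe}
import scala.concurrent.duration._

// Stand-in for the acknowledgement message used in the question.
case object ACK

object AutoPilotDemo {
  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("autopilot-demo")
    try {
      // Probe that acknowledges every message it receives.
      val writer = TestProbe()
      writer.setAutoPilot(new TestActor.AutoPilot {
        def run(sender: ActorRef, msg: Any): TestActor.AutoPilot = {
          sender ! ACK
          TestActor.KeepRunning // keep this auto-pilot installed for later messages
        }
      })

      // A second probe plays the role of the actor under test.
      val client = TestProbe()
      client.send(writer.ref, "hello")
      client.expectMsg(3.seconds, ACK)

      // Messages handled by the auto-pilot are still queued for assertions.
      writer.expectMsg(3.seconds, "hello")
    } finally {
      TestKit.shutdownActorSystem(system)
    }
  }
}
```

Returning `TestActor.KeepRunning` keeps the same auto-pilot for the next message; returning a different `AutoPilot` instance from `run` would replace it.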