Re: [akka-user] Re: Streaming http call gives EntityStreamSizeException (2.0-M2)
FYI I created this issue: https://github.com/akka/akka/issues/19237

Any idea on:
"One thing I do notice is that the CPU keeps running high whenever I kill 'curl'. Is there something I should do to close the stream? Suspending curl works fine though."

--
>> Read the docs: http://akka.io/docs/
>> Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html
>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at https://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/d/optout.
[akka-user] Re: Streaming http call gives EntityStreamSizeException (2.0-M2)
Same code with akka-streams 1.0 works fine.

Is this a bug?

Regards,
Jeroen
Re: [akka-user] Re: Streaming http call gives EntityStreamSizeException (2.0-M2)
Hi Konrad,

thanks for your answer. This explains a lot and makes sense. Configuring 'infinite' fixes my issue. The new error description makes it a lot easier.

One thing I do notice is that the CPU keeps running high whenever I kill 'curl'. Is there something I should do to close the stream? Suspending curl works fine though.

Thanks,
Jeroen

On Friday, 18 December 2015 at 14:17:47 UTC+1, Konrad Malawski wrote:
> It's a feature. (yes, really) :-)
>
> Allow me to explain: Akka HTTP always errs on the safe side of things.
> For example, if you write an endpoint that can get POST data, and someone (an attacker) sends you data and never stops sending...
> You do want to kill that connection as soon as you notice something fishy is going on (i.e. perhaps an attack, someone sending loads of data that you never expected).
>
> So the default is to play safe, and limit any HTTP Entity to 8M (which is pretty huge anyway, if you think about it; we've heard people say "whoa, that's a huge default!", but of course they'd trim it down to a lower setting for production, suiting their needs).
>
> If you know that all calls you do will use streaming instead of "accumulate that infinite stream as String" (you can do that, right), you can disable this check by using the setting described here:
> http://doc.akka.io/docs/akka-stream-and-http-experimental/snapshot/scala/http/common/http-model.html#Limiting_message_entity_length
>
> So the value of:
>     akka.http.[server|client].parsing.max-content-length
> (depending on whether you want it for server or client).
>
> Last week I also improved that exception to be more self-explanatory; now it looks like this ( https://github.com/akka/akka/pull/19158 ):
>     s"EntityStreamSizeException: actual entity size ($actualSize) exceeded content length limit ($limit bytes)! " +
>     s"You can configure this by setting `akka.http.[server|client].parsing.max-content-length` or calling `HttpEntity.withSizeLimit` " +
>     s"before materializing the dataBytes stream."
>
> So your question will have been answered by the exception itself, hopefully :-)
>
> I also strongly recommend you have a look at this workshop I did at Scala Exchange a week ago:
> https://skillsmatter.com/skillscasts/6869-workshop-end-to-end-asynchronous-back-pressure-with-akka-streams
> It goes into depth on why and how these things work the way they work.
>
> Thanks for trying out the early 2.0 milestones, we'll be releasing a "2.0" very soon, please upgrade to it then! :-)
> Hope this helps, happy hakking!
>
> --
> Cheers,
> Konrad 'ktoso' Malawski
> Akka <http://akka.io> @ Typesafe <http://typesafe.com>
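A minimal sketch of the setting Konrad describes, applied programmatically instead of in application.conf. It assumes the client-side key (`akka.http.client.parsing.max-content-length`) is the relevant one for the `/pipe` case, because the oversized entity there is the response read through `Http().singleRequest`; the object name and the system name "Test" are only illustrative.

```scala
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

object UnlimitedClientEntities {
  // Override only the client-side entity limit; every other setting
  // still comes from application.conf / reference.conf.
  val config = ConfigFactory
    .parseString("akka.http.client.parsing.max-content-length = infinite")
    .withFallback(ConfigFactory.load())

  // Assumption: the system name matches the one used in the gist ("Test").
  implicit val system: ActorSystem = ActorSystem("Test", config)
}
```

Note that this lifts the limit for every client response handled by that actor system, so it only makes sense when all of those responses are consumed as streams.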
Re: [akka-user] Re: Streaming http call gives EntityStreamSizeException (2.0-M2)
BTW is there a way to set infinite on a specific entity? I put "max-content-length = infinite" in my application.conf, but it seems more reasonable to do this only for the entities where it makes sense. `withSizeLimit` takes a Long, and although I could use Long.MaxValue, that doesn't say "infinite".

--Jeroen
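A small sketch of the per-entity variant asked about above, using the `withSizeLimit` call named in the improved exception message. The helper name `unlimitedBytes` is hypothetical, and `Long.MaxValue` is used as a stand-in for "no limit", as suggested in the question.

```scala
import akka.http.scaladsl.model.HttpResponse
import akka.stream.scaladsl.Source
import akka.util.ByteString

object PerEntityLimit {
  // Raise the limit for this one response only; all other entities keep
  // the configured max-content-length. The limit must be set before the
  // dataBytes stream is materialized.
  def unlimitedBytes(response: HttpResponse): Source[ByteString, Any] =
    response.entity.withSizeLimit(Long.MaxValue).dataBytes
}
```

In the `/pipe` route this would replace the direct `_.entity.dataBytes` call. Later Akka HTTP releases also appear to offer `withoutSizeLimit` on entities, which states the intent more directly; whether it is available depends on the version in use.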
[akka-user] Streaming http call gives EntityStreamSizeException (2.0-M2)
Hi all,

I'm running into an EntityStreamSizeException when streaming data from a streaming response I got by calling another endpoint. It is a little bit like what was presented in the talk by Mathias & Johannes at scalaworld: https://www.youtube.com/watch?v=6VBn9V3S2aQ

I'm using akka-http 2.0-M2 and reproduced my problem in isolation. See the route (and link to full gist below). When I call `curl -i http://localhost:8080/endless` the stream will continue indefinitely. However, when I call `curl -i http://localhost:8080/pipe` it takes a few seconds to get "curl: (56) Recv failure: Connection reset by peer" on the client and the exception below on the server. The source below is just an example to isolate the problem.

Am I doing something wrong? I would expect an endless stream and no limit. I'm using Chunked as stated in http://doc.akka.io/docs/akka-stream-and-http-experimental/snapshot/scala/http/common/http-model.html#HttpEntity

Thanks!
Jeroen

    // Endless source of integers: 1, 2, 3, ...
    val source: Source[Int, Unit] = Source(Stream.from(1))

    val route = (path("endless") & get) {
      // Streams each number (repeated 10 times) as one line, forever.
      complete {
        HttpResponse(
          entity = HttpEntity.Chunked(
            MediaTypes.`text/plain`,
            source.map(nr ⇒ ByteString((nr.toString * 10) + "\n", "UTF-8"))
          )
        )
      }
    } ~
      (path("pipe") & get) {
        // Calls /endless, splits the response into lines and streams them on.
        val s = Http().singleRequest(HttpRequest(uri = "http://localhost:8080/endless")).map {
          _.entity.dataBytes
            .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = 1, allowTruncation = true))
            .map(entry ⇒ entry.utf8String)
        }
        onSuccess(s) { x ⇒
          complete(HttpResponse(
            entity = HttpEntity.Chunked(
              MediaTypes.`text/plain`,
              x.map(x ⇒ ByteString(x + "\n", "UTF-8"))
            )
          ))
        }
      }

*Full gist*: https://gist.github.com/jgordijn/390c9022062cfb9fce8c

*Exception:*

[ERROR] [12/17/2015 22:06:10.493] [Test-akka.actor.default-dispatcher-4] [ActorSystem(Test)] Outgoing request stream error
EntityStreamSizeException(8388608, None)
    at akka.http.scaladsl.model.HttpEntity$$anonfun$limitable$1$$anon$1.onPush(HttpEntity.scala:469)
    at akka.http.scaladsl.model.HttpEntity$$anonfun$limitable$1$$anon$1.onPush(HttpEntity.scala:451)
    at akka.stream.stage.AbstractStage$PushPullGraphLogic$$anon$1.onPush(Stage.scala:54)
    at akka.stream.impl.fusing.GraphInterpreter.processElement$1(GraphInterpreter.scala:535)
    at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:546)
    at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:509)
    at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$runBatch(ActorGraphInterpreter.scala:408)
    at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:346)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
    at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:292)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

[ERROR] [12/17/2015 22:06:10.491] [Test-akka.actor.default-dispatcher-9] [ActorSystem(Test)] Outgoing response stream error
EntityStreamSizeException(8388608, None)
    at akka.http.scaladsl.model.HttpEntity$$anonfun$limitable$1$$anon$1.onPush(HttpEntity.scala:469)
    at akka.http.scaladsl.model.HttpEntity$$anonfun$limitable$1$$anon$1.onPush(HttpEntity.scala:451)
    at akka.stream.stage.AbstractStage$PushPullGraphLogic$$anon$1.onPush(Stage.scala:54)
    at akka.stream.impl.fusing.GraphInterpreter.processElement$1(GraphInterpreter.scala:535)
    at akka.stream.impl.fusing.GraphInterpreter.processEvent(GraphInterpreter.scala:546)
    at akka.stream.impl.fusing.GraphInterpreter.execute(GraphInterpreter.scala:509)
    at akka.stream.impl.fusing.ActorGraphInterpreter.akka$stream$impl$fusing$ActorGraphInterpreter$$runBatch(ActorGraphInterpreter.scala:408)
    at akka.stream.impl.fusing.ActorGraphInterpreter$$anonfun$receive$1.applyOrElse(ActorGraphInterpreter.scala:346)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:467)
    at akka.stream.impl.fusing.ActorGraphInterpreter.aroundReceive(ActorGraphInterpreter.scala:292)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at
[akka-user] Re: ANNOUNCE: Akka Streams HTTP 1.0-RC3
Hi Roland,

This is great! Kudos for the good work.

I wanted to get it going and noticed that http://akka.io/docs/ still mentions two separate HTTP artifacts for Scala and Java. It also does not list all the available artifacts mentioned on http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-RC3/scala/http/introduction.html (specifically: akka-http-spray-json-experimental, akka-http-spray-testkit-experimental, akka-http-spray-xml-experimental).

Have a nice weekend,
Jeroen

On Friday, 22 May 2015 at 17:43:47 UTC+2, rkuhn wrote:
> Dear hakkers,
>
> we, the Akka committers, are pleased to announce the third release candidate for Akka Streams HTTP. The most notable changes since 1.0-RC2 are:
>
> - we integrated the Scala and Java DSLs within combined artifacts again, since the separation was only superficial and did not justify the added build complexity; this entails no movement of classes between packages, we only reduced the number of produced artifacts
> - we added tons of documentation, mostly in HTTP
> - we added HTTPS integration for client and server (but no session renegotiation for now)
> - we smoothed some wrinkles and straightened out some kinks in the DSLs (made argument types less restrictive, added preStart/postStop hooks for Stages, …)
> - we updated the Akka dependency to 2.3.11 (which contains some important regression fixes)
> - and we fixed a few deficiencies here and there (most notably some Actor leaks and Reactive Streams compliance issues).
>
> For the full list please refer to the github milestones for Streams https://github.com/akka/akka/issues?q=milestone%3Astreams-1.0-RC3+is%3Aclosed and HTTP https://github.com/akka/akka/issues?q=milestone%3Ahttp-1.0-RC3+is%3Aclosed .
>
> We'd like to thank Philip L. McMahon, Alexander Golubev, Greg Methvin and 2beaucoup for their contributions!
>
> Please give it a spin!
>
> Happy hakking!
[akka-user] Akka, Camel ActiveMQ oneway
Hi all,

I want to place a message on a JMS queue with Akka-Camel. This works just fine, but I want to be sure that the message was really placed on the queue. The queue is oneway, so I do not expect any response from another consumer. However, when I configure this in my code as oneway, the actor sending to the producer does not get any notification that the message was really delivered to the queue.

ProducerSupport in Akka-Camel has the following code:

    protected def routeResponse(msg: Any): Unit = if (!oneway) sender() ! transformResponse(msg)

This means that I do not get any response when oneway is configured. But the endpoint does give feedback on whether the message was delivered, and this is now lost. I solved it by putting the following code in my own producer:

    override protected def routeResponse(msg: Any): Unit = sender() ! transformResponse(msg)

Is there any reason for the if statement? Can anything go wrong when the if statement is dropped in the akka-camel codebase?

Thanks,
Jeroen
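For context, the workaround above might sit in a producer actor roughly like the sketch below. The class name and the endpoint URI are hypothetical, and the sketch assumes the akka-camel `Producer` and `Oneway` traits from the Akka 2.3 line, where `routeResponse` and `transformResponse` are protected members of `ProducerSupport`.

```scala
import akka.actor.Actor
import akka.camel.{ Oneway, Producer }

// Hypothetical producer; the queue name is an assumption for illustration.
class ConfirmingQueueProducer extends Actor with Producer with Oneway {
  def endpointUri: String = "activemq:queue:orders"

  // Always pass the endpoint's outcome back to the original sender,
  // even though the exchange itself is one-way (in-only).
  override protected def routeResponse(msg: Any): Unit =
    sender() ! transformResponse(msg)
}
```

With this override the sender should see either the processed message or a failure for each message it hands to the producer, so it has to be prepared to handle both.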
Re: [akka-user] Resolve of unknown path to dead letters in Akka Persistence test
I'm seeing the same entries in my log, but everything is working just fine. I haven't looked into it yet, but I'm wondering what causes this message. There are a lot of these log entries and they appear on every startup of my app.

Cheers,
Jeroen

On Wednesday, 22 October 2014 at 09:06:16 UTC+2, Patrik Nordwall wrote:
> I don't think this is anything to worry about, but let us know if you find anything that is not working as expected.
>
> Regards,
> Patrik
>
> On Mon, Oct 20, 2014 at 6:59 PM, Richard Rodseth rrod...@gmail.com wrote:
>> Yes, I'm using 2.3.6. I *think* the result is still correct, but haven't had time to verify.
>>
>> It also happens in an app that does nothing more than instantiate a persistent view:
>>
>>     object AuditPersistentActor extends App {
>>       println("Audit persistent actor app")
>>       implicit val system = ActorSystem("audit-persistent-actor-test")
>>       val view = system.actorOf(SamplePersistentView.props("id-1"))
>>     }
>>
>> [DEBUG] [10/20/2014 09:56:44.240] [main] [EventStream(akka://audit-persistent-actor-test)] logger log1-Logging$DefaultLogger started
>> [DEBUG] [10/20/2014 09:56:44.241] [main] [EventStream(akka://audit-persistent-actor-test)] Default Loggers started
>> [DEBUG] [10/20/2014 09:56:44.655] [audit-persistent-actor-test-akka.persistence.dispatchers.default-replay-dispatcher-9] [akka.serialization.Serialization(akka://audit-persistent-actor-test)] Using serializer[akka.persistence.serialization.MessageSerializer] for message [akka.persistence.PersistentRepr]
>> [DEBUG] [10/20/2014 09:56:44.712] [audit-persistent-actor-test-akka.persistence.dispatchers.default-replay-dispatcher-9] [LocalActorRefProvider(akka://audit-persistent-actor-test)] resolve of unknown path [akka://create-persistent-actors-test/deadLetters] failed
>> [DEBUG] [10/20/2014 09:56:44.716] [audit-persistent-actor-test-akka.persistence.dispatchers.default-replay-dispatcher-9] [LocalActorRefProvider(akka://audit-persistent-actor-test)] resolve of unknown path [akka://create-persistent-actors-test/deadLetters] failed
>> [DEBUG] [10/20/2014 09:56:44.717] [audit-persistent-actor-test-akka.persistence.dispatchers.default-replay-dispatcher-9] [LocalActorRefProvider(akka://audit-persistent-actor-test)] resolve of unknown path [akka://create-persistent-actors-test/deadLetters] failed
>> [DEBUG] [10/20/2014 09:56:44.717] [audit-persistent-actor-test-akka.actor.default-dispatcher-3] [akka://audit-persistent-actor-test/user/$a] Received from journal AggregateAdded(initial content)
>> [DEBUG] [10/20/2014 09:56:44.717] [audit-persistent-actor-test-akka.actor.default-dispatcher-3] [akka://audit-persistent-actor-test/user/$a] Received from journal ContentChanged(Sat Oct 18 11:14:48 PDT 2014)
>> [DEBUG] [10/20/2014 09:56:44.717] [audit-persistent-actor-test-akka.persistence.dispatchers.default-replay-dispatcher-9] [LocalActorRefProvider(akka://audit-persistent-actor-test)] resolve of unknown path [akka://create-persistent-actors-test/deadLetters] failed
>> [DEBUG] [10/20/2014 09:56:44.717] [audit-persistent-actor-test-akka.actor.default-dispatcher-3] [akka://audit-persistent-actor-test/user/$a] Received from journal ContentChanged(Sat Oct 18 11:16:47 PDT 2014)
>> [DEBUG] [10/20/2014 09:56:44.717] [audit-persistent-actor-test-akka.actor.default-dispatcher-3] [akka://audit-persistent-actor-test/user/$a] Received from journal ContentChanged(Sat Oct 18 11:25:44 PDT 2014)
>> [DEBUG] [10/20/2014 09:56:44.718] [audit-persistent-actor-test-akka.persistence.dispatchers.default-replay-dispatcher-9] [LocalActorRefProvider(akka://audit-persistent-actor-test)] resolve of unknown path [akka://create-persistent-actors-test/deadLetters] failed
>> [DEBUG] [10/20/2014 09:56:44.718] [audit-persistent-actor-test-akka.actor.default-dispatcher-3] [akka://audit-persistent-actor-test/user/$a] Received from journal ContentChanged(Sat Oct 18 11:27:14 PDT 2014)
>> [DEBUG] [10/20/2014 09:56:44.718] [audit-persistent-actor-test-akka.persistence.dispatchers.default-replay-dispatcher-9] [LocalActorRefProvider(akka://audit-persistent-actor-test)] resolve of unknown path [akka://create-persistent-actors-test/deadLetters] failed
>> [DEBUG] [10/20/2014 09:56:44.718] [audit-persistent-actor-test-akka.actor.default-dispatcher-3] [akka://audit-persistent-actor-test/user/$a] Received from journal ContentChanged(Sat Oct 18 11:27:42 PDT 2014)
>>
>> On Mon, Oct 20, 2014 at 12:59 AM, Patrik Nordwall patrik@gmail.com wrote:
>>> Hi Richard,
>>>
>>> I don't know where the "resolve of unknown path" originates. I would guess it is something from deserialization. It is logged at debug level. Is the result still correct?
>>>
>>> Are you using Akka 2.3.6?
>>>
>>> Regards,
>>> Patrik
>>>
>>> On Sat, Oct 18, 2014 at 8:58 PM, Richard Rodseth rrod...@gmail.com wrote:
>>>> I have a simple test of Akka persistence with LevelDB. First app creates 1 aggregates,
Re: [akka-user] Event versioning revisited
I thought I read/heard somewhere that Pickling is aiming to become the default serialization mechanism in Scala, but I cannot find any proof (like a SIP) for it (yet). Integrating Pickling in Spark is currently work in progress: https://groups.google.com/forum/#!msg/sbt-dev/7IpZrq79oz0/UunWL0GLYjsJ (see Heather Miller's posts)

-Jeroen

On Wednesday, 22 October 2014 at 18:10:28 UTC+2, rrodseth wrote:
> I'm hoping Typesafe will come up with a great solution.
>
> I noticed that Scala Pickling is making its way into Spark. At the very end of this good talk, he mentions plans to use Pickling in Spark:
> http://functional.tv/post/9769999/scala-by-the-bay2014-matei-zaharia-next-generation-langu
>
> and Heather Miller discusses it on Scalawags:
> https://www.youtube.com/watch?v=uCpw9eBYbOw
>
> I also came across this ticket:
> https://github.com/scala/pickling/issues/39
>
> On Tue, Oct 21, 2014 at 5:17 PM, Richard Rodseth rrod...@gmail.com wrote:
>> Thanks for the input. I wonder if anyone has tried Scala Pickling. At first glance, it seems like it would be a great solution.
>>
>> On Tue, Oct 21, 2014 at 1:47 PM, Jeroen Gordijn jeroen@gmail.com wrote:
>>> Hi,
>>>
>>> I'm not really sure yet about the best solution. I'm currently using JSON. It starts out being really straightforward when using spray-json: just create the formatters. But the moment you have to deal with evolution, you have to deal with the JSON AST and you lose all help from your compiler.
>>>
>>> I'm looking into a Protobuf implementation of the serializer. It adds the overhead of having to specify a definition of each persistable message in a proto file, but I guess it will help a lot with evolution. I found that I have to wrap my messages in a general envelope to indicate which message was serialized. I could not use the Akka Persistence default manifest, as it indicates which domain class was persisted as opposed to the Protobuf class.
>>>
>>> Cheers,
>>> Jeroen
>>>
>>> On Tuesday, 21 October 2014 at 17:31:32 UTC+2, rrodseth wrote:
>>>> Thanks for the references, Patrik. I hope others will share their experiences.
>>>>
>>>> On Tue, Oct 21, 2014 at 4:57 AM, Patrik Nordwall patrik@gmail.com wrote:
>>>>> Hi Richard,
>>>>>
>>>>> I'm also interested in what others are currently using.
>>>>>
>>>>> On Sun, Oct 19, 2014 at 3:35 AM, Richard Rodseth rrod...@gmail.com wrote:
>>>>>> I'm hoping to build a case for using Akka Persistence, and expect the event schema evolution to be a sticking point. People are quite used to their database migration scripts.
>>>>>>
>>>>>> I've seen the threads on this list, but I'm wondering what the current thinking is on the least painful option, and would also like pointers to sample code.
>>>>>
>>>>> I have not heard of any new ideas. Migration and data evolution can be implemented in a custom serializer.
>>>>>
>>>>>> Do Protobuf, Avro and Thrift all require an IDL compiler?
>>>>>
>>>>> Protobuf and Thrift, yes. I'm not sure about Avro. I think it can be used without generated code.
>>>>>
>>>>>> Is Cap'n Proto a viable option?
>>>>>
>>>>> Yes, it looks interesting, especially for high performance, but I have no experience of it. In the same category you also have SBE and FlatBuffers. Here is a nice comparison: https://kentonv.github.io/capnproto/news/2014-06-17-capnproto-flatbuffers-sbe.html
>>>>>
>>>>>> Would something based on Scala macros provide a way to avoid an extra compilation?
>>>>>
>>>>> Scala Pickling: https://github.com/scala/pickling. I don't have much experience of it, and I'm uncertain of how mature it is.
>>>>>
>>>>>> Anything on the horizon I should know about?
>>>>>
>>>>> JSON is well known and has the nice property of being schema-less. CBOR (https://www.rfc-editor.org/info/rfc7049) has similar characteristics but has the advantages of a binary format. I have played with the jackson-dataformat-cbor tool (https://github.com/FasterXML/jackson-dataformat-cbor) and here is an example of an Akka serializer: https://github.com/typesafehub/activator-akka-cluster-sharding-scala/pull/8/files. That sample uses a low-level API, with all the advantages of full control and possibilities of data migration. The drawback is of course the amount of hand-written mapping code. I expect that it will also be possible to use other high-level tools with CBOR, such as Jackson Databinding and Play-json (https://www.playframework.com/documentation/2.3.x/ScalaJson).
>>>>>
>>>>> Cheers,
>>>>> Patrik
>>>>>
>>>>>> Thanks.
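The envelope idea Jeroen describes in this thread (a wrapper that records which message was serialized, independent of the domain class name) can be sketched in plain Scala as below. Everything here is hypothetical: the byte layout, the names `EventEnvelope` and `ContentChangedCodec`, the newline-separated payload (which would break on content containing newlines), and the "unknown" default are illustrative stand-ins for whatever a real Protobuf or CBOR based Akka serializer would do in its `toBinary` and `fromBinary` methods.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical domain event; version 1 only had `content`, version 2 added `author`.
final case class ContentChanged(content: String, author: String)

object EventEnvelope {
  // Layout: [tag length: Int][tag bytes][version: Int][payload bytes]
  def wrap(tag: String, version: Int, payload: Array[Byte]): Array[Byte] = {
    val tagBytes = tag.getBytes(UTF_8)
    ByteBuffer.allocate(4 + tagBytes.length + 4 + payload.length)
      .putInt(tagBytes.length).put(tagBytes).putInt(version).put(payload)
      .array()
  }

  def unwrap(bytes: Array[Byte]): (String, Int, Array[Byte]) = {
    val buf = ByteBuffer.wrap(bytes)
    val tagBytes = new Array[Byte](buf.getInt())
    buf.get(tagBytes)
    val version = buf.getInt()
    val payload = new Array[Byte](buf.remaining())
    buf.get(payload)
    (new String(tagBytes, UTF_8), version, payload)
  }
}

object ContentChangedCodec {
  val Tag = "ContentChanged"

  // Always write the newest version.
  def toBinary(e: ContentChanged): Array[Byte] =
    EventEnvelope.wrap(Tag, 2, s"${e.content}\n${e.author}".getBytes(UTF_8))

  // Read any known version and upgrade it to the current class.
  def fromBinary(bytes: Array[Byte]): ContentChanged =
    EventEnvelope.unwrap(bytes) match {
      case (Tag, 1, payload) =>
        ContentChanged(new String(payload, UTF_8), "unknown")
      case (Tag, 2, payload) =>
        val Array(content, author) = new String(payload, UTF_8).split("\n", 2)
        ContentChanged(content, author)
      case (tag, version, _) =>
        throw new IllegalArgumentException(s"Unsupported event $tag v$version")
    }
}
```

Because the tag and version live in the envelope rather than in the class name, the domain class can be renamed or reshaped while old journal entries remain readable, which is the property the thread is after.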
Re: [akka-user] AtLeastOnceDelivery and crash on recovery
I created a fix in this pull request: https://github.com/akka/akka/pull/16117

Cheers,
Jeroen

On Monday, 20 October 2014 at 18:09:59 UTC+2, Patrik Nordwall wrote:
> Thanks!
Re: [akka-user] Event versioning revisited
Hi,

I'm not really sure yet about the best solution. I'm currently using JSON. It starts out being really straightforward with spray-json: just create the formatters. But the moment you have to deal with evolution, you have to work with the JSON AST and you lose all help from your compiler.

I'm looking into a Protobuf implementation of the serializer. It adds the overhead of having to define each persistable message in a proto file, but I guess it will help a lot with evolution. I found that I have to wrap my messages in a general envelope to indicate which message was serialized. I could not use the Akka Persistence default manifest, as it indicates which domain class was persisted rather than which Protobuf class.

Cheers,
Jeroen

On Tuesday, 21 October 2014 at 17:31:32 UTC+2, rrodseth wrote:
> Thanks for the references, Patrik. I hope others will share their experiences.
>
> On Tue, Oct 21, 2014 at 4:57 AM, Patrik Nordwall patrik@gmail.com wrote:
>> Hi Richard,
>>
>> I'm also interested in what others are currently using.
>>
>> On Sun, Oct 19, 2014 at 3:35 AM, Richard Rodseth rrod...@gmail.com wrote:
>>> I'm hoping to build a case for using Akka Persistence, and expect the event schema evolution to be a sticking point. People are quite used to their database migration scripts. I've seen the threads on this list, but I'm wondering what the current thinking is on the least painful option, and would also like pointers to sample code.
>>
>> I have not heard of any new ideas. Migration and data evolution can be implemented in a custom serializer.
>>
>>> Do Protobuf, Avro and Thrift all require an IDL compiler?
>>
>> Protobuf and Thrift, yes. I'm not sure about Avro. I think it can be used without generated code.
>>
>>> Is Cap'n Proto a viable option?
>>
>> Yes, it looks interesting, especially for high performance, but I have no experience of it. In the same category you also have SBE and FlatBuffers. Here is a nice comparison: https://kentonv.github.io/capnproto/news/2014-06-17-capnproto-flatbuffers-sbe.html
>>
>>> Would something based on Scala macros provide a way to avoid an extra compilation?
>>
>> Scala Pickling https://github.com/scala/pickling. I don't have much experience of it, and I'm uncertain of how mature it is.
>>
>>> Anything on the horizon I should know about?
>>
>> JSON is well known and has the nice property of being schema-less. CBOR https://www.rfc-editor.org/info/rfc7049 has similar characteristics but has the advantages of a binary format. I have played with the jackson-dataformat-cbor https://github.com/FasterXML/jackson-dataformat-cbor tool, and here is an example of an Akka serializer: https://github.com/typesafehub/activator-akka-cluster-sharding-scala/pull/8/files. That sample uses a low-level API, with all the advantages of full control and possibilities of data migration. The drawback is of course the amount of hand-written mapping code. I expect that it will also be possible to use other high-level tools with CBOR, such as Jackson Databinding and Play-json https://www.playframework.com/documentation/2.3.x/ScalaJson.
>>
>> Cheers,
>> Patrik
>>
>>> Thanks.
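The envelope idea from the top of this thread (store a manifest that names the serialized wire type, then upgrade old formats while reading) could look roughly like the sketch below. It is plain Scala and makes several assumptions: `Envelope`, `OrderPlaced`, `EventCodec`, the manifest strings and the default currency are all hypothetical, and a semicolon-delimited string stands in for the Protobuf bytes so that the example stays self-contained.

```scala
// Hypothetical names throughout; this is not an Akka or Protobuf API.

// What gets stored: a manifest naming the wire format, plus the payload.
// A delimited String stands in for the Protobuf bytes here.
final case class Envelope(manifest: String, payload: String)

// The domain event as the application sees it today.
final case class OrderPlaced(orderId: String, amountCents: Long, currency: String)

object EventCodec {
  private val V1 = "OrderPlaced.v1" // payload: orderId;amountCents
  private val V2 = "OrderPlaced.v2" // payload: orderId;amountCents;currency

  // New events are always written in the latest wire format.
  def toEnvelope(e: OrderPlaced): Envelope =
    Envelope(V2, s"${e.orderId};${e.amountCents};${e.currency}")

  // Old envelopes are upgraded while reading; stored data is never rewritten.
  def fromEnvelope(env: Envelope): OrderPlaced = {
    val fields = env.payload.split(";", -1)
    env.manifest match {
      case V1    => OrderPlaced(fields(0), fields(1).toLong, "EUR") // assumed default for the missing field
      case V2    => OrderPlaced(fields(0), fields(1).toLong, fields(2))
      case other => throw new IllegalArgumentException(s"Unknown manifest: $other")
    }
  }
}
```

The point of the manifest is that it identifies the stored format rather than the domain class, so the domain class can change freely as long as every old manifest still has a reader.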
Re: [akka-user] AtLeastOnceDelivery and crash on recovery
Done: https://github.com/akka/akka/issues/16104

I will look into creating a fix for it.

Cheers,
Jeroen

On Saturday, 18 October 2014 at 16:55:50 UTC+2, Patrik Nordwall wrote:
> Hi Jeroen,
>
> Sounds like a bug to me. Please create a GitHub issue. Thanks for the sample. If you know how to fix it, a pull request is welcome.
>
> /Patrik
Re: [akka-user] AtLeastOnceDelivery and crash on recovery
Hi Konrad,

Thanks for your reply. I am aware of the confirmDelivery method. I left it out, because the crash prevents us from reaching it anyway. The case here is that at-least-once delivery sends the message even when the actor crashes, which I didn't expect. I am sending a message to a Camel actor which sends an email (unfortunately the service I'm calling isn't idempotent).

The question is: why does AtLeastOnceDelivery send unconfirmed messages even when the actor crashes and the supervisor strategy is to stop the actor? And when the supervisor strategy is to restart 10x, the message is sent 10x.

Thanks,
Jeroen

On Friday, 17 October 2014 at 16:47:58 UTC+2, Konrad Malawski wrote:
> Hi Jeroen,
>
> Thanks for preparing the example, it made checking what the problem is very simple :-)
>
> Try searching for `confirmDelivery` in your code snippet. It's not there. And with no confirmation, there is no information in the sending actor that the target has really received the message.
>
> The interaction MUST be such that:
>
> [bob]                                  [alice]
> m(1) -----X                                        # message lost!
> m(1) --------------------------------> got m(1)    # message got here
>           X <------------------------- confirm 1   # confirmation lost!
> # re-send triggers
> m(1) --------------------------------> got m(1)    # oh, I know it already!
> confirmDelivery(1) <------------------ confirm 1   # confirmation got to sender
>
> At-least-once delivery semantics imply two things:
> 1) the receiver must be ready to receive the message multiple times (it should simply ignore a message it already knows, or the messages should be idempotent).
> 2) the sender must retry sending a message until it gets a confirmation back from the receiver.
>
> In your case you forgot the 2) rule. AtLeastOnceDelivery has helpers for this, in the form of confirmDelivery.
>
> Please read the example in the docs in detail: http://doc.akka.io/docs/akka/snapshot/scala/persistence.html#at-least-once-delivery
>
> I would also highly recommend watching my or Endre's talks, which are:
> * without past or present, by Endre: http://letitcrash.com/post/83823246588/without-past-and-present
> * distributed consensus aka. what do we eat for lunch, by me: http://letitcrash.com/post/98879157982/introduction-to-distributed-consensus-by-konrad
>
> They have videos and slides, I hope you'll enjoy them (great watch right in time for a Friday evening?)!
>
> Happy hakking!
>
> --
> Cheers,
> Konrad 'ktoso' Malawski
> hAkker @ Typesafe
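The bob/alice exchange quoted above can be played through with a small in-memory model. This is only a sketch in plain Scala: `Sender`, `Receiver` and `AtLeastOnceDemo` are hypothetical names and none of it is the Akka API, although `deliver` and `confirmDelivery` are named after the AtLeastOnceDelivery helpers mentioned in the thread. It shows the two rules: the sender keeps a message pending until it is confirmed, and the receiver remembers delivery ids so that a duplicate is confirmed again without being processed twice.

```scala
import scala.collection.mutable

// Hypothetical in-memory model of the exchange; not the Akka API.
final class Receiver {
  private val seen = mutable.Set.empty[Long]
  var processed: Vector[String] = Vector.empty

  // Returns the id to confirm. A duplicate is confirmed again but not re-processed.
  def receive(deliveryId: Long, payload: String): Long = {
    if (seen.add(deliveryId)) processed = processed :+ payload
    deliveryId
  }
}

final class Sender {
  private var nextId = 0L
  private val unconfirmed = mutable.LinkedHashMap.empty[Long, String]

  // Registers a message; it stays pending until confirmDelivery is called.
  def deliver(payload: String): Long = {
    nextId += 1
    unconfirmed(nextId) = payload
    nextId
  }

  def confirmDelivery(deliveryId: Long): Boolean =
    unconfirmed.remove(deliveryId).isDefined

  // Everything still unconfirmed is due for another send attempt.
  def pending: List[(Long, String)] = unconfirmed.toList
}

object AtLeastOnceDemo {
  def run(): (Vector[String], Int) = {
    val bob = new Sender
    val alice = new Receiver
    val id = bob.deliver("m(1)")
    // Attempt 1: the message is lost, so nothing reaches alice.
    // Attempt 2: alice gets the message, but her confirmation is lost.
    alice.receive(id, "m(1)")
    // Attempt 3: bob re-sends what is pending; alice knows the id, the confirmation arrives.
    bob.pending.foreach { case (i, p) => bob.confirmDelivery(alice.receive(i, p)) }
    (alice.processed, bob.pending.size)
  }
}
```

After the run, alice has processed the payload once and bob has nothing pending. For a destination that is not idempotent, such as the email service mentioned above, this kind of receiver-side bookkeeping is what keeps a re-send from turning into a second email.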
[akka-user] AtLeastOnceDelivery and crash on recovery
Hi,

I stumbled upon behaviour of AtLeastOnceDelivery which I do not understand. When my actor is started, I call deliver in receiveRecover to make sure that my message will eventually be delivered at least once. Because of an error, the actor throws an exception on a later event. This leads to a few restarts and then my actor stops. I expected AtLeastOnceDelivery to postpone sending messages until the actor is completely started. However, on every restart the actor sends messages to the destination.

After looking through the persistence code I noticed that Recovery catches the exception and keeps receiving all stored events (but does not process them in the actor). When all stored events have been received, the actor sends the exception to itself in order to throw it. But AtLeastOnceDelivery has the following code:

    override protected[akka] def aroundReceive(receive: Receive, message: Any): Unit =
      message match {
        case ReplayMessagesSuccess ⇒
          redeliverOverdue()
          super.aroundReceive(receive, message)
        // ...
      }

ReplayMessagesSuccess is received before the exception and leads to sending all pending deliveries. On the next message the actor is restarted and the above process plays out again.

Why does AtLeastOnceDelivery always send the pending deliveries when the journal has been replayed successfully, even when replaying the actual messages led to an exception that causes a restart or stop of the actor? Wouldn't it make more sense to send a RedeliveryTick to self to start the redelivery? This would make sure that delivery only starts when the actor has started successfully.

I attached a test sample below to show the exact problem I'm facing.
Cheers,
Jeroen

    import akka.actor._
    import akka.actor.SupervisorStrategy.{ Stop, Escalate }
    import akka.event.LoggingReceive
    import akka.persistence.{ AtLeastOnceDelivery, PersistentActor }
    import akka.testkit.{ TestKit, TestProbe, ImplicitSender }
    import org.scalatest.WordSpecLike

    // Stops the child on IllegalStateException; otherwise falls back to the default decider.
    class SuperVisor(testProbe: ActorRef) extends Actor {
      import scala.concurrent.duration._

      override val supervisorStrategy =
        OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 10 seconds) {
          case _: IllegalStateException ⇒ Stop
          case t ⇒ super.supervisorStrategy.decider.applyOrElse(t, (_: Any) ⇒ Escalate)
        }

      val crashingActor = context.actorOf(Props(new CrashingActor(testProbe)), "CrashingActor")

      def receive: Receive = LoggingReceive {
        case msg ⇒ crashingActor forward msg
      }
    }

    // Logs everything that reaches the destination before handing it to the probe.
    class WrappingActor(testProbe: ActorRef) extends Actor with ActorLogging {
      def receive = LoggingReceive {
        case msg ⇒
          log.debug("RECEIVED -- " + msg)
          testProbe forward msg
      }
    }

    object CrashingActor {
      case object Message
      case object CrashMessage
      case class SendingMessage(deliveryId: Long, recovering: Boolean)
    }

    class CrashingActor(testProbe: ActorRef) extends PersistentActor with AtLeastOnceDelivery with ActorLogging {
      import CrashingActor._

      override def receive = LoggingReceive {
        case x ⇒
          log.warning("RECEIVED: " + x)
          super.receive(x)
      }

      // Replaying CrashMessage throws, so every recovery fails after Message has been replayed.
      override def receiveRecover: Receive = LoggingReceive {
        case Message ⇒ send()
        case CrashMessage ⇒
          log.error("Crash it")
          throw new IllegalStateException("Intentionally crashed")
        case x ⇒ log.warning("RECOVER MESSAGE: " + x)
      }

      override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
        log.error(reason, "RESTARTING " + message)
      }

      override def receiveCommand: Receive = LoggingReceive {
        case Message ⇒ persist(Message)(_ ⇒ send())
        case CrashMessage ⇒ persist(CrashMessage) { evt ⇒ }
      }

      def send() = {
        log.debug("SENDING " + recoveryRunning)
        deliver(testProbe.path, { id ⇒ SendingMessage(id, false) })
      }
    }

    class CrashIssueSpec extends TestKit(ActorSystem("tst")) with WordSpecLike with ImplicitSender {
      "At least once delivery" should {
        "not send on reboot" in {
          val testProbe = TestProbe()
          val testProbeWrapper = system.actorOf(Props(new WrappingActor(testProbe.ref)), "testProbe")
          val superVisor = system.actorOf(Props(new SuperVisor(testProbeWrapper)), "supervisor")
          superVisor ! CrashingActor.Message
          testProbe.expectMsgType[CrashingActor.SendingMessage]
          superVisor ! CrashingActor.CrashMessage
          val deathProbe = TestProbe()
          deathProbe.watch(superVisor)
          superVisor ! PoisonPill
          deathProbe.expectTerminated(superVisor)
          testProbe.expectNoMsg()
          // Recreating the supervisor triggers recovery, which is where the unexpected send happens.
          system.actorOf(Props(new SuperVisor(testProbeWrapper)), "supervisor")
          testProbe.expectNoMsg()
        }
      }
    }
Re: [akka-user] PersistentActor initialization best practices
Hi Sinan,

The name of the actor that is created in the cluster is stable and can be used as the persistenceId. In the example, the name of the parent concatenated with the entityId is used. You could just as well create the id yourself, like:

    def persistenceId = "orders-" + self.path.name

This way you know the persistenceId, and you can re-create the actor in the same way as you created it in the first place.

HTH

Cheers,
Jeroen

On Tuesday, 23 September 2014 at 09:26:52 UTC+2, sinel wrote:
> Hi,
>
> Even though it has been about a month since the last comment in this thread, I will add my question here since I think it is a direct continuation of this discussion.
>
> I want to follow up on Jeroen's comment. I am facing the same problem of figuring out how to pass instance-specific props to my persistent actor when using cluster sharding. I understand the solution of initializing it with a command, and this would work in the situation of the given AuctionActor example where secondary properties are set, but what about the situation where you would like to set an instance-specific persistence id?
>
> In my case, I am trying to implement a domain actor representing an entity with its own unique identifier. I would like to use the same identifier for persistence (or a hybrid including the entity id) so that I can easily trace events related to this actor in the event logs and conveniently reclaim the actor through the entity id, which I would always know. While using persistence without cluster sharding, passing an identifier via props and assigning it to the persistence id worked fine. However, with cluster sharding, I admit I am confused about how to implement this functionality. Using an initialize command won't work, since I cannot change the persistence id after the fact.
>
> I have looked at the akka-cluster-sharding template with its post example, but this hasn't cleared up my confusion.
>
> a) It looks like separate ids are used for the post entity and for persistence, but I assume that this is just a preference of the template and not a requirement. Since the id extractor recovers the post id, how does the cluster know which persistent actor to use? Looking at the code, the persistence id is totally different and I cannot see how the persistence id would be retrieved from the post id, which is a UUID in the test spec.
>
> b) I also cannot see how the persistence id generated by the combination of self.path.parent.name and self.path.name can be unique for each post actor. Wouldn't the parent be the same for most if not all requests? On the other hand, if there is a single post actor serving all incoming requests, I admit I cannot understand the become logic within the actor, since it looks like it would accept only one AddPost command. How does it handle concurrent add post requests?
>
> c) Whichever way sharding is managing this internally, I cannot see how I could reclaim a post actor outside of sharding. For example, suppose I want to unit test the post actor and check that it recovers its state properly. I would create an instance, then delete it, but how can I then reclaim the same actor to test recovery?
>
> Most probably, I am missing something fundamental regarding how persistent actors are supposed to be used with sharding, and the template hasn't helped clear up my confusion. Any help/guidance would be greatly appreciated.
>
> Thanks,
> Sinan
>
> On Tuesday, August 19, 2014 12:01:04 AM UTC+3, Jeroen Gordijn wrote:
>> Hi,
>>
>> I guess that a downside of 3 is that it makes it less suitable to use in ClusterSharding (in ClusterSharding you cannot pass instance-specific props to an actor). So my preference (especially with domain actors) is to create the actor and initialize it with a command. All domain aggregates should react to commands, so initializing with an event makes the design more consistent IMHO.
>>
>> Cheers,
>> Jeroen
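Regarding question c) in this thread (how to get the "same" actor back in order to test recovery), the reply at the top boils down to this: if the persistenceId is derived from a stable name, then creating a new instance under that name replays the same events. The sketch below models that in plain Scala under loud assumptions: `InMemoryJournal` and `OrderEntity` are hypothetical stand-ins, the constructor parameter `name` plays the role of `self.path.name`, and no Akka class is involved.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a journal: events grouped by persistenceId.
final class InMemoryJournal {
  private val events = mutable.Map.empty[String, Vector[String]]

  def append(persistenceId: String, event: String): Unit =
    events.update(persistenceId, replay(persistenceId) :+ event)

  def replay(persistenceId: String): Vector[String] =
    events.getOrElse(persistenceId, Vector.empty)
}

// Hypothetical stand-in for a persistent entity; `name` plays the role of self.path.name.
final class OrderEntity(name: String, journal: InMemoryJournal) {
  val persistenceId: String = "orders-" + name

  // State is rebuilt from the journal on construction, like recovery when an actor starts.
  private var items: Vector[String] = journal.replay(persistenceId)

  def addItem(item: String): Unit = {
    journal.append(persistenceId, item) // persist first
    items = items :+ item               // then update state
  }

  def currentItems: Vector[String] = items
}
```

Creating `new OrderEntity("42", journal)` a second time yields an instance that already holds the items added through the first one, while an entity with another name starts empty. That mirrors the test scenario of stopping an actor and creating it again under the same name.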
[akka-user] Re: Help with Akka persistence and dead letters
Hi M.,

Your persistenceId should be unique per request or per photo. There should only be one instance of an actor with a given persistenceId; otherwise there are multiple threads writing to and reading from the same store. Every time you instantiate a new actor it will replay the journal and resend the first photo. After that, the actor stops and all other messages go to dead letters.

What are your requirements? What do you want to make resilient? The direction you're currently heading in won't make it resilient; it will merely store every request you receive.

BTW: receiveRecover should have no side effects. You are sending to S3 on every recovery.

Cheers,
Jeroen

On Tuesday, 23 September 2014 at 14:37:04 UTC+2, mat...@wondr.it wrote:
> Hi,
>
> I'm trying to write a resilient photo uploading system using Akka persistence, but I'm having some issues.
>
> I have a supervisor called PhotoUploadSupervisor that spawns a child actor when I need to upload a photo.
>
>     class PhotoUploadSupervisor extends Actor with ActorLogging {
>       import context._
>
>       def newChild = actorOf(Props[PhotoUploadActor])
>
>       override val supervisorStrategy =
>         OneForOneStrategy(maxNrOfRetries = 10, withinTimeRange = 1 minute) {
>           case _: IOException => Restart
>           case _ => Escalate
>         }
>
>       def receive = {
>         case postPhoto: POSTPhoto => newChild ! UploadPhoto(postPhoto)
>       }
>     }
>
> POSTPhoto is a simple case class:
>
>     case class POSTPhoto(
>       blob: String
>     ) {
>       require(blob.nonEmpty)
>     }
>
> This is the actor:
>
>     sealed trait Command
>     case class UploadPhoto(data: POSTPhoto) extends Command
>
>     sealed trait Event
>     case class StateChanged(data: POSTPhoto) extends Event
>
>     class PhotoUploadActor extends PersistentActor with ActorLogging {
>       override def persistenceId = "photoUploadActor"
>
>       var state = POSTPhoto("fakeBlob")
>
>       def updateState(evt: StateChanged) = {
>         state = POSTPhoto(evt.data.blob)
>       }
>
>       val receiveRecover: Receive = {
>         case evt: StateChanged =>
>           updateState(evt)
>           uploadToS3()
>       }
>
>       val receiveCommand: Receive = {
>         case UploadPhoto(data) =>
>           persist(StateChanged(data)) { changed =>
>             updateState(changed)
>             uploadToS3()
>           }
>       }
>
>       private def uploadToS3() = {
>         println(f"Uploading photo in actor ${self.path}")
>         /* Photo upload code goes here */
>         /* It might throw an IOException */
>         context.stop(self)
>       }
>     }
>
> I want the actor to upload the photo and then stop. The supervisor should spawn one child per photo upload request. It looks like it's working, but I get a bunch of dead letters:
>
> [INFO] [09/23/2014 13:33:21.587] [on-spray-can-akka.actor.default-dispatcher-2] [akka://on-spray-can/user/$a/$a] Message [akka.persistence.JournalProtocol$ReplayedMessage] from Actor[akka://on-spray-can/user/$a#-494389522] to Actor[akka://on-spray-can/user/$a/$a#1751213071] was not delivered. [1] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
> [INFO] [09/23/2014 13:33:21.587] [on-spray-can-akka.actor.default-dispatcher-2] [akka://on-spray-can/user/$a/$a] Message [akka.persistence.JournalProtocol$ReplayedMessage] from Actor[akka://on-spray-can/user/$a#-494389522] to Actor[akka://on-spray-can/user/$a/$a#1751213071] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
> [INFO] [09/23/2014 13:33:21.587] [on-spray-can-akka.actor.default-dispatcher-2] [akka://on-spray-can/user/$a/$a] Message [akka.persistence.JournalProtocol$ReplayedMessage] from Actor[akka://on-spray-can/user/$a#644188776] to Actor[akka://on-spray-can/user/$a/$a#1751213071] was not delivered. [3] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
> [INFO] [09/23/2014 13:33:21.587] [on-spray-can-akka.actor.default-dispatcher-2] [akka://on-spray-can/user/$a/$a] Message [akka.persistence.JournalProtocol$ReplayedMessage] from Actor[akka://on-spray-can/user/$a#1549609203] to Actor[akka://on-spray-can/user/$a/$a#1751213071] was not delivered. [4] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'.
> [INFO] [09/23/2014 13:33:21.587] [on-spray-can-akka.actor.default-dispatcher-2] [akka://on-spray-can/user/$a/$a] Message [akka.persistence.JournalProtocol$ReplayedMessage] from Actor[akka://on-spray-can/user/$a#1549609203] to Actor[akka://on-spray-can/user/$a/$a#1751213071] was not delivered. [5] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and
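The two points from the reply at the top of this thread (one persistenceId per photo, and no side effects during recovery) can be illustrated with a small model. This is a sketch in plain Scala, not Akka code: `PhotoUploads`, its in-memory journal and the `uploads` record that stands in for the S3 call are all hypothetical. The command path persists and then performs the side effect once; the recovery path only rebuilds state.

```scala
import scala.collection.mutable

// Hypothetical model; the journal and the "S3" record are both in memory.
final class PhotoUploads {
  private val journal = mutable.Map.empty[String, Vector[String]]

  // Every call to the side effect is recorded here so it can be counted.
  var uploads: Vector[String] = Vector.empty

  private def uploadToS3(blob: String): Unit = uploads = uploads :+ blob

  // Command path: persist the event first, then perform the side effect exactly once.
  def handleUpload(persistenceId: String, blob: String): Unit = {
    journal.update(persistenceId, journal.getOrElse(persistenceId, Vector.empty) :+ blob)
    uploadToS3(blob)
  }

  // Recovery path: rebuild state from the journal only; nothing is uploaded here.
  def recover(persistenceId: String): Option[String] =
    journal.getOrElse(persistenceId, Vector.empty).lastOption
}
```

With ids such as "photo-upload-a" and "photo-upload-b" (illustrative values), each id replays only its own event, and calling `recover` any number of times leaves `uploads` unchanged. With one shared id, every new instance would replay all earlier photos, which matches the replayed messages in the dead-letter log above.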
Re: [akka-user] Question about: Make entries of shards persistent
Done: https://github.com/akka/akka/issues/15797

Cheers,
Jeroen

On Monday, 1 September 2014 at 08:34:12 UTC+2, Patrik Nordwall wrote:
> Jeroen, could you please create an issue https://github.com/akka/akka/issues for this, so it is not lost.
>
> Thanks,
> Patrik
>
> On Fri, Aug 22, 2014 at 10:16 PM, Jeroen Gordijn jeroen@gmail.com wrote:
>> Hi Patrik,
>>
>> Making remembering an explicit choice forces people to think about it. It's a good start. In the future this needs a solution, as sharding is typically used in situations where the solution does not fit on one machine. When someone reaches that point without a solution, it will be a big problem.
>>
>> A first solution that comes to mind is requiring a lower bound on the number of nodes that are up in the cluster before starting entries eagerly. This makes it possible to restart your complete solution when the active entities no longer fit on one machine.
>>
>> Do you (or anybody else) know of any solutions that are used with other sharding technology? Do they read into memory? The point with Akka is that actors are living things and can have behaviour over time, so they need to be active in memory. That is probably something other sharding technologies do not have to deal with, and for them balancing data is more important than behaviour.
>>
>> Cheers,
>> Jeroen
>>
>> On Thursday, 21 August 2014 at 10:25:14 UTC+2, Patrik Nordwall wrote:
>>> Hi Jeroen,
>>>
>>> This is an excellent question and it might need a solution, but first of all, making entries remembered and eagerly started is something you have to specify per entry type. Entries are not remembered by default. It is intended for those entries that need this, i.e. those that are active without input from external messages.
>>>
>>> Cheers,
>>> Patrik
>>>
>>> On Wed, Aug 20, 2014 at 10:39 PM, Jeroen Gordijn jeroen@gmail.com wrote:
>>>> Hi,
>>>>
>>>> I saw this tweet by Patrik: 'Fantastic #Akka contribution by Dominic Black merged to master. Cluster sharding remembers active entries. https://github.com/akka/akka/pull/15597'
>>>>
>>>> I think it is a really cool feature to be able to reactivate active entities in a cluster when rebalancing occurs. So thanks to Dominic for the great work! I will benefit a lot from this feature.
>>>>
>>>> However, I am wondering about fault tolerance. One would typically use sharding when the actors don't fit on one machine (CPU or memory), right? So restarting the sharded entries eagerly could end up in a software solution that is unable to restart again, or that tears down the whole cluster when rebalancing. I guess we need some strategy to prevent this eager restarting when there are not enough nodes in the cluster (and how do we know what is enough?). This will keep the cluster alive. How do the NoSQL databases that support sharding do this?
>>>>
>>>> Is there already a strategy to prevent eager restarting, to make this feature fault tolerant?
>>>>
>>>> Cheers,
>>>> Jeroen
To post to this group, send email to akka...@googlegroups.com javascript:. Visit this group at http://groups.google.com/group/akka-user. For more options, visit https://groups.google.com/d/optout. -- Patrik Nordwall Typesafe http://typesafe.com/ - Reactive apps on the JVM Twitter: @patriknw -- Read the docs: http://akka.io/docs/ Check the FAQ: http://doc.akka.io/docs/akka/current/additional/faq.html Search the archives: https://groups.google.com/group/akka-user --- You received this message because you are subscribed to the Google Groups Akka User List group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com. To post to this group, send email to akka-user@googlegroups.com. Visit this group at http://groups.google.com/group/akka-user. For more options, visit
[akka-user] Re: Akka FSM unhandled event PersistentImpl(PrizeCreated(1,T119)) in state Oldest, where does the state come from?
Hi Dennis,

FSM does not work with PersistentActor at the moment: https://github.com/akka/akka/issues/15279

Cheers,
Jeroen

On Monday, 25 August 2014 22:17:16 UTC+2, Dennis Vriend wrote:
> Hi, I get the following warning:
Re: [akka-user] Question about: Make entries of shards persistent
Hi Patrik,

Making "remembered" an explicit choice forces people to think about it. It's a good start. In the future this needs a solution, as sharding is typically used in situations where the solution does not fit on one machine. When someone reaches that point without a solution, it will be a big problem.

A first solution that comes to mind is requiring a minimum number of nodes to be up in the cluster before entries are eagerly started. This makes it possible to restart your complete solution when the active entities no longer fit on one machine.

Do you (or anybody else) know of any solutions that are used with other sharding technology? Do they read everything into memory? The point with Akka is that actors are living things and can have behaviour over time, so they need to be active in memory. That is probably something other sharding technologies do not have to deal with, since for them balancing data is more important than behaviour.

Cheers,
Jeroen

On Thursday, 21 August 2014 10:25:14 UTC+2, Patrik Nordwall wrote:
> Hi Jeroen,
>
> This is an excellent question and it might need a solution, but first of all, making entries remembered and eagerly started is something you have to specify per entry type. Not all entries are remembered by default. It is intended for those entries that need this, i.e. those that are active without input from external messages.
>
> Cheers, Patrik
[akka-user] Question about: Make entries of shards persistent
Hi,

I saw this tweet by Patrik: 'Fantastic #Akka https://twitter.com/hashtag/Akka?src=hash contribution by Dominic Black merged to master. Cluster sharding remembers active entries. https://github.com/akka/akka/pull/15597 … https://t.co/P32NbQQokG'

I think it is a really cool feature to be able to reactivate active entities in a cluster when rebalancing occurs. So thanks to Dominic for the great work! I will benefit a lot from this feature.

However, I am wondering about fault tolerance. One would typically use sharding when the actors don't fit on one machine (CPU or memory), right? So restarting the sharded entries eagerly could end up in a solution that is unable to restart again, or that tears down the whole cluster when rebalancing. I guess we need some strategy to prevent this eager restarting when there are not enough nodes in the cluster (how do we know what is enough?). This will keep the cluster alive. How do the NoSQL databases that support sharding do this?

Is there already a strategy to prevent eager restarting, to make this feature fault tolerant?

Cheers,
Jeroen
Re: [akka-user] PersistentActor initialization best practices
Hi,

I guess that a downside of 3 is that it makes the actor less suitable for use in ClusterSharding (in ClusterSharding you cannot pass instance-specific props to an actor). So my preference (especially with domain actors) is to create the actor and initialize it with a command. All domain aggregates should react to commands, so initializing with a command makes the design more consistent IMHO.

Cheers,
Jeroen
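To make the "initialize with a command" preference concrete, here is a minimal sketch in plain Scala, without Akka. The names (`Initialize`, `Rename`, `Customer`, `CustomerAggregate`) are hypothetical and only illustrate the shape: the aggregate starts empty, the first command creates it, and every state change goes through an event, which is also what a persistent actor would replay during recovery.

```scala
// Hypothetical domain model; none of these names come from the thread.
sealed trait Command
final case class Initialize(name: String) extends Command
final case class Rename(name: String) extends Command

sealed trait Event
final case class Initialized(name: String) extends Event
final case class Renamed(name: String) extends Event

final case class Customer(name: String)

object CustomerAggregate {
  // Decide: validate a command against the current state and emit events.
  // The aggregate starts as None, so no instance-specific Props are needed.
  def decide(state: Option[Customer], cmd: Command): Either[String, List[Event]] =
    (state, cmd) match {
      case (None, Initialize(n))    => Right(List(Initialized(n)))
      case (None, _)                => Left("not initialized")
      case (Some(_), Initialize(_)) => Left("already initialized")
      case (Some(_), Rename(n))     => Right(List(Renamed(n)))
    }

  // Apply: the only place where state changes; replaying events uses it too.
  def applyEvent(state: Option[Customer], evt: Event): Option[Customer] =
    evt match {
      case Initialized(n) => Some(Customer(n))
      case Renamed(n)     => state.map(_.copy(name = n))
    }
}
```

Because the actor is created without arguments, the same class can be started by sharding on the first message for a given id, and the `Initialize` command is just another message routed to it.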
Re: [akka-user] Re: Reactive applications with CQRS and Akka Persistence?
Hi Ashley,

On Friday, 27 June 2014 06:57:17 UTC+2, Ashley Aitken wrote:
> Hi Jeroen,
>
> On Friday, 27 June 2014 05:13:54 UTC+8, Jeroen Gordijn wrote:
> > Why do you need a view that watches multiple Processors? This can already be achieved. You create a child View for every Processor you would like to watch. The child simply forwards all messages to its parent.
>
> There are two (or three) reasons I would suggest, please correct me if I am wrong:
>
> 1. What if there are a very very large number of Processors (e.g. for each Customer aggregate root)? Do we keep the Views for each Processor (as there could be more than one) active all the time? If they are not active then (AFAIK) they will not be updated, and when a query comes in for that View they will need to be activated and recover their state.

How many aggregates do you expect? Actors are lightweight and you can have ~2.5 million actors per GB of heap (source: akka.io). I would expect there to be more problems with keeping other state in memory. I would keep the actors active until I am certain this will run into problems.

> A View that could be notified of changes to any Processor of a specific type could create/activate and update that Processor's View as needed so that it is ready for a subsequent query. This could include putting the view into an external store.
>
> 2. What if the Processors have a one-to-many association with another type of Processor (e.g. each Customer can have many Songs)? One of the roles of the read side in CQRS is to enable (possibly multiple) denormalised read models. How do we maintain a DenormalisedView of a Processor (e.g. Customer with Songs) without having all the Views of the associated Processors also active?

My first attempt would be to do exactly the same as I mentioned before: just create actors that forward the events to the aggregator. I guess that this could lead to many actors. Otherwise I would create an actor per Processor and let the aggregators subscribe to that.

This also gives you the possibility to enrich the message with some information which is not in the event. I am persisting an OrderSubmitted event. This event does not contain the orderId. In the view I do need the orderId to correlate this event to the correct representation. The View knows the orderId and enriches the event by wrapping it in an Envelope with the orderId. Like:

    case OrderSubmitted => context.parent ! Envelope(orderId, OrderSubmitted)

Otherwise every event needs to contain the aggregate id.

> A DenormalisedView(s) that could be notified of changes to one of its dependent Processors could create/activate that Processor's View and update the denormalised view(s) based on that.
>
> 3. When both (1.) and (2.) above occur (which I think is quite common).
>
> If you can think of a way to efficiently do these at scale I would be very interested to hear. From my understanding of CQRS it is common for the read side to be able to subscribe to notification of changes to the event store so as to efficiently handle the cases above.

I agree that receiving events would be a nice optimization, but you still need to know which aggregate the event belongs to. This will show in your code. To be able to scale with the current solution you could use cluster sharding. However, I don't know the implementation details and don't know how the polling is performed. Do 1000 Views lead to 1 poll every interval?

> Akka Persistence's current approach (as far as I can tell and confirmed somewhat by Patrik) can't really handle the cases above efficiently or at scale because of this lack of more general notifications (i.e. it just updates active Views at regular intervals based solely on one specified Processor).

The events could lead to faster eventual consistency. As events are published per aggregate (processorId), I do not see how this can be automatically merged into one stream without changing your software to include the aggregate id in every event. You could argue that including the id in every event makes things clearer anyway, so in that case I follow your reasoning.

> Patrik in another thread suggested implementing the above scenarios using notifications with an external distributed pub-sub, and Pawel has done just that in his fantastic DDD-Leaven-Akka examples using Apache Camel and ActiveMQ (but there are still limitations, e.g. rebuilding denormalised views).
>
> It's a shame that Akka Persistence doesn't seem (on its own) fully functional for CQRS as yet (IMHO). It is still experimental, of course, so there is great hope that something will be done to address this.

I was not aware of DDD-Leaven-Akka, thanks for mentioning that! I will look into it. Apart from optimization, I do not feel hindered in creating a CQRS application. However, I have not built a large-scale application with it yet.

Cheers,
Jeroen
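The forwarding and enrichment idea in this message can be sketched without Akka. This is only an illustration under assumptions: `OrderForwarder` stands in for the per-Processor child View, `OrderOverview` for the aggregating parent, and `OrderShipped` is a made-up second event. In a real View the forwarding line would be the `context.parent ! Envelope(...)` call quoted in the message.

```scala
sealed trait OrderEvent
case object OrderSubmitted extends OrderEvent
case object OrderShipped extends OrderEvent // hypothetical second event

// The event itself carries no id; the envelope adds it for correlation.
final case class Envelope(orderId: String, event: OrderEvent)

// Stand-in for the child View: it knows which order it watches.
final class OrderForwarder(orderId: String, parent: Envelope => Unit) {
  def receive(event: OrderEvent): Unit = parent(Envelope(orderId, event))
}

// Stand-in for the aggregating parent View: one read model over many orders.
final class OrderOverview {
  private var latest = Map.empty[String, OrderEvent]
  def receive(env: Envelope): Unit = latest += (env.orderId -> env.event)
  def status(orderId: String): Option[OrderEvent] = latest.get(orderId)
}
```

A possible use: create one `OrderForwarder` per order, all pointing at the same `OrderOverview`, and the overview can tell the orders apart even though `OrderSubmitted` itself carries no id.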
Re: [akka-user] Re: Reactive applications with CQRS and Akka Persistence?
On Monday, 23 June 2014 15:27:38 UTC+2, Ashley Aitken wrote:
> Sorry to bring this thread up again but I am still trying to work out the read model in CQRS with Akka Persistence.
>
> On Tuesday, 6 May 2014 15:18:29 UTC+8, Martin Krasser wrote:
> > The availability of (new) events in the journal is also an event. Whether a view is actively notified about their availability (= push) or if the view tries to find out itself (= pull) is rather a technical detail. A pull-based view even allows to decouple a processor from its view(s) in the dimension of lifetime. Why do you see this in conflict with being reactive?
>
> Do you mean that an active View can subscribe to the Event Bus and be notified of the arrival of (new) events in the journal for its Processor(s)? If not, I am beginning to feel that this would be very useful, if not necessary, for efficient view models built with Akka Persistence. Of course, along with Views watching multiple Processors, which I don't believe will be the same as the new Akka Streams.

Why do you need a view that watches multiple Processors? This can already be achieved. You create a child View for every Processor you would like to watch. The child simply forwards all messages to its parent.

Regards,
Jeroen

> Thanks,
> Ashley.
Re: [akka-user] Cluster Singleton duplicated if primary seed restarted
Hi,

The docs also advise against auto-downing. However, I do not really get the alternative. Manual downing would be unworkable, because it could render your application unavailable for too long. So should I implement some strategy in my Akka solution, or in some external monitoring system? How are people using this in production?

Cheers,
Jeroen
Re: [akka-user] Some questions about clustering
On Monday, 19 May 2014 17:13:38 UTC+2, Martynas Mickevičius wrote:
> On Sun, May 18, 2014 at 3:55 AM, Luis Medina lu4...@gmail.com wrote:
> > 8. If I have a cluster aware pool router in a cluster with say 10 nodes and I define: nr-of-instances = 100, will those 100 routees be distributed evenly to each node? In this case, will each node be running 10 routees? Also, if one of the nodes were to go down, would the remaining 9 nodes create more routees in order to compensate for the 10 that were lost?
>
> Yes, routees will be distributed evenly. Routees will be recreated after cluster membership change.

Please correct me if I'm wrong, but this is only true for removed nodes, right? It will not redistribute when a new node is added and nr-of-instances is already reached. So when we have a 2 node cluster with:

- nr-of-instances = 10
- max-nr-of-instances-per-node = 5
- allow-local-routees = on

then both nodes will have 5 routees. When an extra node enters the cluster it will get no routees, as they are all distributed to the first 2 nodes.

Cheers,
Jeroen
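The scenario described above can be checked with a toy model. This is only a sketch of the behaviour as stated in this message (routees are added to the least-loaded node until the total or the per-node cap is reached, and existing routees are never moved); `RouteePlacement.fill` is a hypothetical helper, not Akka's actual router code.

```scala
object RouteePlacement {
  // Adds routees one at a time to the least-loaded node that still has room,
  // until `nrOfInstances` is reached or every node is at `maxPerNode`.
  // Existing routees are never moved; that is the assumption being modelled.
  def fill(current: Map[String, Int], nrOfInstances: Int, maxPerNode: Int): Map[String, Int] = {
    val total = current.values.sum
    val candidates = current.filter { case (_, count) => count < maxPerNode }
    if (total >= nrOfInstances || candidates.isEmpty) current
    else {
      // Ties are broken by node name so the result is deterministic.
      val (node, count) = candidates.minBy { case (name, n) => (n, name) }
      fill(current.updated(node, count + 1), nrOfInstances, maxPerNode)
    }
  }
}
```

In this model, filling two empty nodes with nr-of-instances = 10 and max-nr-of-instances-per-node = 5 gives each node 5 routees, a third node joining afterwards receives none, and only the loss of a node frees up instances that can be recreated elsewhere.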
Re: [akka-user] Active restore ClusterShard
On Friday, 16 May 2014 09:43:33 UTC+2, Patrik Nordwall wrote:
> On Fri, May 16, 2014 at 9:31 AM, Heiko Seeberger heiko.s...@gmail.com wrote:
> > Couldn't we add a watchdog feature to Cluster Sharding itself?
>
> I don't think this is a core feature of Cluster Sharding and I think it can be better implemented at the application level. For example, the application knows how to send a message to a specific actor, since it is the application that defines the id extractor.
>
> /Patrik

I think it would be great from the developer's perspective to be able to mark an actor to survive rebalancing (or crashes), so that the actor itself is re-created on the new node when the shard is moved. Why do you think that this feature should *not* be part of Cluster Sharding? What makes it application level? In my mind sharding is just configuration and not part of the application itself.

It still leaves me wondering how I should prevent my whole cluster from going down when a node crashes and moving the shard results in an out-of-memory error on the new node.

To be clear, I don't have a real problem yet, but I'm trying to get my head around the cluster sharding concepts.

Cheers,
Jeroen

> > Heiko
> >
> > On Fri, May 16, 2014 at 9:25 AM, Patrik Nordwall patrik@gmail.com wrote:
> > > Hi Jeroen,
> > >
> > > On Thu, May 15, 2014 at 10:09 PM, Jeroen Gordijn jeroen@gmail.com wrote:
> > > > Hi,
> > > >
> > > > When rebalancing a shard, the old shard is stopped and a new shard is started on another node, after which all messages for that shard will be sent to the new node. When a message is received, the actor will be created. When akka-persistence is used, the actor will reload all its events and restore its state before processing the new message. But if no message is sent, the actor will not be created. This can be problematic when the actor has some active state with a retry mechanism or timeout. Is my understanding correct?
> > >
> > > Your reasoning is correct. I think you can implement that by letting the actor schedule a keep-alive message to itself, but via the ShardRegion. Normally that will be a local-only message roundtrip via the scheduler and the local ShardRegion, but after rebalancing it will delegate the message to the new node and thereby wake up the actor again.
> > >
> > > What this doesn't solve is when a node crashes. An actor living on that node will not be automatically started somewhere else until a message is sent to it. To solve that I think you have to let the actor register itself with a few (for redundancy) watchdog actors, which watch the actor and know how to send the wake-up message via ClusterSharding.
> > >
> > > Does that make sense?
> > >
> > > Cheers, Patrik
> > >
> > > > Is there a way to actively restore the shard state when the shard is moved to another node? One problem I can see with this is when going back to fewer nodes. This means that the shards will be rebalanced, potentially causing memory problems. This will cause rebalancing and memory problems on the next node, eventually bringing the whole cluster down. Starting the cluster will also be problematic for the same reason.
> > > >
> > > > Cheers, Jeroen
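Patrik's keep-alive suggestion could look roughly like the sketch below. It assumes the Akka 2.3 contrib API (`akka.contrib.pattern.ClusterSharding`) and that sharding has already been started for the type name used here. `KeepAlive`, `RetryingEntry`, the 30 second interval and the 10 shards are made up for illustration. It also assumes the entry actor's name is its entry id. It does not cover the node-crash case, for which the message proposes watchdog actors.

```scala
import akka.actor.{Actor, Cancellable}
import akka.contrib.pattern.ClusterSharding
import scala.concurrent.duration._

// Hypothetical message; it carries the entry id so the ShardRegion can route it.
final case class KeepAlive(entryId: String)

object RetryingEntry {
  val typeName = "RetryingEntry" // assumed type name passed to ClusterSharding.start

  // Same shapes as the idExtractor / shardResolver arguments of start.
  val idExtractor: PartialFunction[Any, (String, Any)] = {
    case msg @ KeepAlive(id) => (id, msg)
  }
  val shardResolver: Any => String = {
    case KeepAlive(id) => (((id.hashCode % 10) + 10) % 10).toString
  }
}

class RetryingEntry extends Actor {
  import context.dispatcher // ExecutionContext used by the scheduler

  // Assumption: sharding names the entry actor after its entry id.
  private val entryId = self.path.name
  private var pending: Option[Cancellable] = None

  private def scheduleKeepAlive(): Unit = {
    pending.foreach(_.cancel()) // at most one outstanding timer per incarnation
    val region = ClusterSharding(context.system).shardRegion(RetryingEntry.typeName)
    // Sent via the region, not to self, so it follows the shard to its new node.
    pending = Some(context.system.scheduler.scheduleOnce(30.seconds, region, KeepAlive(entryId)))
  }

  override def preStart(): Unit = scheduleKeepAlive()

  // Deliberately no cancel in postStop: the timer that is still pending when the
  // shard is handed off is the message that re-creates the entry elsewhere.
  def receive = {
    case KeepAlive(_) => scheduleKeepAlive()
  }
}
```

One consequence of this design is that an entry which stops itself on purpose would also be woken up again, so such an entry would need to cancel `pending` before stopping.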
[akka-user] Re: Akka Camel: Consuming a single message
Hi Sam,

I don't know of a way to do this with the akka-camel Consumer, but you can get to Camel via the CamelExtension:

    val camel = CamelExtension(system)
    val consumerTemplate = camel.context.createConsumerTemplate()
    val message = consumerTemplate.receiveNoWait("activemq:yourQueue")
    if (message == null) { /* no message */ }
    else { /* handle message */ }

This will consume one message if there is any. Create a scheduler to perform this every 5 seconds. I haven't tried this code, but this is what I would try.

Cheers,
Jeroen

On Thursday, 15 May 2014 17:06:16 UTC+2, Sam Starling wrote:
> Hello,
>
> Is it at all possible to get an akka.camel.Consumer to consume a single message periodically? The code in the link below consumes all messages from a queue as quickly as possible, whereas I'd like to consume them at a particular rate (e.g. one every 5 seconds) and have them left in ActiveMQ's queue until the last possible moment.
>
> https://gist.github.com/samstarling/1e228fe6c42468aba0f7
>
> I was thinking of getting the Consumer to consume a single message, and scheduling this using Akka.scheduler.repeat, but I'm open to other options as well.
>
> Thanks,
> Sam
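The "take one message every 5 seconds" part can be sketched independently of Camel. The example below deliberately swaps the Akka scheduler for a plain `java.util.concurrent` scheduler so that it runs on its own. `SingleMessagePoller` and its parameters are hypothetical names, and the receive function is expected to return null when nothing is waiting, which is how Camel's `ConsumerTemplate.receiveNoWait` behaves.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

object SingleMessagePoller {
  // Takes at most one message per call; returns true if one was handled.
  // A null result from `receiveNoWait` means the queue was empty.
  def pollOnce[A <: AnyRef](receiveNoWait: () => A)(handle: A => Unit): Boolean =
    Option(receiveNoWait()) match {
      case Some(message) => handle(message); true
      case None          => false
    }

  // Polls once immediately and then once per interval, on a single thread.
  // The caller is responsible for calling shutdown() on the returned scheduler.
  def start[A <: AnyRef](intervalSeconds: Long)(receiveNoWait: () => A)(handle: A => Unit): ScheduledExecutorService = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val task = new Runnable {
      def run(): Unit =
        try { pollOnce(receiveNoWait)(handle); () }
        catch { case e: Exception => println(s"poll failed: $e") } // keep the schedule alive
    }
    scheduler.scheduleWithFixedDelay(task, 0L, intervalSeconds, TimeUnit.SECONDS)
    scheduler
  }
}
```

With the consumer template from the message, this could presumably be wired up as `SingleMessagePoller.start(5)(() => consumerTemplate.receiveNoWait("activemq:yourQueue"))(exchange => println(exchange.getIn.getBody))`; messages that have not been polled yet stay on the broker.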
Re: [akka-user] Cluster sharding API
@Patrik, I guess you're right. I have looked at the code, and while this call is blocking, the code underneath just creates a few actors, which is non-blocking. So I guess this would be really fast and there would be no gain in putting a lot of effort into making it non-blocking.

@Roland, does that mean that actorOf() will then no longer return a plain ActorRef, but maybe some future or a message after initialization? That would make it harder to use and reason about, wouldn't it? I guess you are taking that into consideration. Is there any information about Akka Gålbma?

On Saturday, 10 May 2014 12:41:18 UTC+2, Akka Team wrote:
> Returning a strict result immediately and performing the work asynchronously are fundamentally at odds with each other, which is why I consider it a minor miracle that system.actorOf() could be made to work, but it still is the biggest mistake that I made in evolving Akka considering all the pain it inflicts on the implementation and its maintainers. Akka Gålbma will not have this feature.
>
> Regards, Roland
>
> On Fri, May 9, 2014 at 7:33 PM, Heiko Seeberger heiko.s...@gmail.com wrote:
> > Maybe we could have `start` return an `ActorRef` immediately, i.e. an asynchronous operation like `actorOf`?
> >
> > Heiko
> >
> > On Fri, May 9, 2014 at 4:48 PM, Patrik Nordwall patrik@gmail.com wrote:
> > > On Fri, May 9, 2014 at 4:43 PM, Jeroen Gordijn jeroen@gmail.com wrote:
> > > > Thanks Patrik, I overlooked that it is blocking, that makes the fix even easier. You mention that you would not do it in the same place and I don't have enough experience to comment on that, but in the course it seemed perfectly valid to do just that. I created a ticket: https://github.com/akka/akka/issues/15157
> > >
> > > Thanks, yes, on second thought I agree. Pass it in to other actors.
> > >
> > > /Patrik
> > >
> > > > Cheers, Jeroen
> > > >
> > > > On Friday, 9 May 2014 16:33:07 UTC+2, Patrik Nordwall wrote:
> > > > > You should be able to do those calls immediately after each other, from the same thread:
> > > > >
> > > > >     ClusterSharding(context.system).start
> > > > >     ClusterSharding(context.system).shardRegion
> > > > >
> > > > > Start doesn't return until it has been initialized. Normally, I don't think these two things are done at the same place in the code, so I don't think there is much value in returning the ActorRef from start, but it would not hurt to do so. Please create a ticket https://github.com/akka/akka/issues. Thanks for reporting.
> > > > >
> > > > > Cheers, Patrik
[akka-user] Cluster sharding API
Hi,

I followed the Advanced Akka course over the last 2 days (which was awesome by the way) and noticed something in the API which I think can be improved. When you start ClusterSharding, the return type is Unit and you have to retrieve the ShardRegion by calling shardRegion on ClusterSharding. However, this cannot take place shortly after starting, because shardRegion(...) will 'throw new IllegalArgumentException(s"Shard type [$typeName] must be started first")'.

In my mind it would be an improvement if start returned an ActorRef immediately and then buffered all requests to it until the ShardRegion is initialized. Am I overlooking some problems with that?

Cheers,
Jeroen

    // Start ShardRegion
    ClusterSharding(context.system).start(
      "player",
      Some(Player.props),
      Player.idExtractor,
      Player.shardResolver(10)
    )

    // Get reference to ShardRegion
    val playerRegion = ClusterSharding(context.system).shardRegion("player")