Hi Roland,

All good and thanks for being helpful. That clears things up :) 

Btw, PersistentActor has a good ring to it.


On Tuesday, May 27, 2014 8:44:11 AM UTC+1, rkuhn wrote:
>
> Hi Alex,
>
> On 26 May 2014 at 21:57, ahjohannessen <ahjoha...@gmail.com> wrote:
>
> Hi Roland,
>
> the main concern you express is that messages sent from an 
> EventsourcedProcessor are eventually delivered to their recipients, which 
> is what a durable message queue (formerly called PersistentChannel) is for. 
> The main use of the non-persistent Channel is to deduplicate messages, but 
> it fundamentally cannot do that reliably in any case (since a confirmation 
> may have been on its way when it crashed).
>
>
> I don't follow your train of thought here. We use 
> "apply-event-and-notify-via-channel" during normal operation *and* recovery 
> in Earnings. I don't see how making Earnings concern itself with maintaining 
> the state of acks for each and every domain event improves matters - that is 
> what persistent.confirms + channel during recovery is useful for in this 
> case. Earnings' journal is *the* durable message queue here because we just 
> want to make sure that events from it are replicated in a corresponding 
> aggregator due to the fact that there is currently no way to get a global 
> view of EPs of same type.
>
> Earnings itself does not care whether Aggregator gets its messages, it is 
> just a technical workaround for missing global views for EPs of same type. 
> The calculation triggering I mentioned in an earlier post can easily be 
> changed to an EarningsAggregator View based approach, but using a separate 
> channel was less work.
>
> A separate durable message queue seems overkill for our use case.
>
>
> I think we are talking past each other. Let’s make up a name for the new 
> PersistentChannel that will be efficient at redelivery and also clear its 
> state of old messages that are already confirmed (since our Persistence 
> journals are not made for this use-case): say we call it X. Instead of 
> using a Channel, you would continue to do the same thing with an X; X will 
> fully replace both Channel and PersistentChannel.
>
> In addition you will not need this workaround anymore because we will 
> allow you to reliably subscribe to and merge the event streams from any 
> number of PersistentActors.
>
> Does this clear things up, or am I still missing something?
>
> My explanation below, which I can see now must have been confusing to you, 
> was about implementing guaranteed, lossless delivery of events within a 
> PersistentActor network. As I tried to point out, just sending to a Channel 
> does not necessarily achieve the same thing, even if you also do it during 
> recovery, since recovery may never happen (i.e. Channel fails, but actor 
> does not). For replication of events from the journal we can make it work, 
> because the journal will then be the sender (so we can implement retries).
>
> Regards,
>
> Roland
>
>
> This is the general flow we have for all of our registrators, Earnings is 
> one of those:
>
> trait RegistrationFlow[ST] extends EventsourcedProcessor {
>   this: RegistrationBehavior[ST] ⇒
>
>   val pid: String
>   override val processorId: String = pid
>
>   def stream: ActorRef
>
>   final def receiveCommand = commands orElse questions
>   final def receiveRecover = recover
>
>   private def questions: Receive = {
>     case q: Question if questionHandler.isDefinedAt(q) ⇒
>       sender() ! questionHandler(q)(state)
>   }
>
>   private def commands: Receive = {
>     case c: Command if commandHandler.isDefinedAt(c) ⇒
>       commandHandler(c)(state) match {
>         case Success(events)  ⇒ handleEvents(events)(sender() ! ACK(c.correlationId))
>         case Failure(failure) ⇒ sender() ! NACK(failure, c.correlationId)
>       }
>   }
>
>   private def recover: Receive = {
>     case e: Event ⇒ applyAndEmit(e)
>   }
>
>   private def handleEvents(events: Seq[Event])(doneSignal: ⇒ Unit) = {
>     if(events.isEmpty) doneSignal else {
>       val lastEventToPersist = events.last
>       persist(events) { pe ⇒
>         applyAndEmit(pe)
>         if(pe == lastEventToPersist) doneSignal
>       }
>     }
>   }
>
>   private def applyAndEmit(e: Event) = {
>     apply(e)
>     currentPersistentMessage foreach(stream ! _)
>   }
>
>   private def apply(e: Event) = {
>     if (eventHandler.isDefinedAt(e))
>       state = eventHandler(e)(state)
>   }
>
>   private var state = stateBuilder
> }
>
>
>  
>
> In the solution you describe, if your Earnings must make sure to send to 
> the Aggregator, then they will in either case need to resend the message 
> periodically until confirmed, not only during restart, otherwise the 
> message can be deferred indefinitely; this is the same regardless of which 
> transport helper is used. My concrete proposal would be to store 
> unconfirmed outbound messages in the Earnings’ state and resend based on a 
> timer tick—this extends naturally to recovery after a machine crash.
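
A minimal sketch of the "store unconfirmed outbound messages in state and resend on a timer tick" idea described above, written as plain Scala with no Akka dependency. The names `Outbox`, `enqueue`, `confirm` and `dueForResend`, as well as the event names in the demo, are hypothetical and only illustrate the bookkeeping; in a real processor the enqueue and confirm steps would themselves be persisted events so that the outbox is rebuilt during recovery.

```scala
// Hypothetical model of an "outbox" kept in a processor's state.
// None of these names come from Akka; they are illustrative only.
final case class Outbox[A](
    pending: Map[Long, A] = Map.empty[Long, A],
    nextId: Long = 0L
) {

  // Record a message as unconfirmed and return the id to send along with it.
  def enqueue(msg: A): (Outbox[A], Long) =
    (copy(pending = pending + (nextId -> msg), nextId = nextId + 1), nextId)

  // Drop a message once the destination has confirmed it; unknown ids are ignored.
  def confirm(id: Long): Outbox[A] =
    copy(pending = pending - id)

  // On every timer tick, everything still pending is due for another send.
  def dueForResend: Seq[(Long, A)] =
    pending.toSeq.sortBy(_._1)
}

object OutboxDemo {
  def main(args: Array[String]): Unit = {
    val (o1, id0) = Outbox[String]().enqueue("EarningsRegistered")
    val (o2, _)   = o1.enqueue("EarningsAdjusted")
    val o3        = o2.confirm(id0)
    // Only the unconfirmed message remains and would be resent on the next tick.
    println(o3.dueForResend)
  }
}
```

Because the outbox is ordinary state, replaying the journal after a crash restores the set of unconfirmed messages, and the same tick-driven resend covers both normal operation and recovery.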
>
>
> I am sorry, but I don't see what storing "unconfirmed" outbound domain 
> events in Earnings' state gives us other than busywork compared to what we 
> have today. Earnings does not communicate with destinations directly; the 
> aggregator is a purely technical concern because we need global views.
>  
> Of course, I would hope to have a cleaner solution than we currently 
> have, however I don't see how the removal of channels without a viable 
> alternative improves the status quo.
>
>
> I think we’ll see this resending functionality factored out in a common 
> trait soon as more people start building software this way.
>
> Regards,
>
> Roland
>
> On 21 May 2014 at 15:30, ahjohannessen <ahjoha...@gmail.com> wrote:
>
> Hi Roland,
>
> You state the following:
>
> *"...talks about Channel since that was only needed to contain the 
> side-effecting replay nature of command sourced processors."*
>
>
> A channel is also needed when sending events from an eventsourced 
> processor to another actor when one needs at-least-once delivery rather
> than at-most-once:
>
> def receiveRecover: Receive = {
>   case event: String => handleEvent(event)
> }
>  
> def handleEvent(event: String) = {
>   // update state
>   // ...
>   // reliably deliver events
>   channel ! Deliver(Persistent(event), destination.path)
>   // alternatively something that encapsulates this pair
> }
>
>
>
> We are dependent on this very functionality for reliable coordination in 
> our apps and removing Channel leaves us worried.
>
>
> *"...Your description below is rather terse, so it is not fully clear to 
> me how you are using Channel in this case and what a replacement should be, 
> can you elaborate?"*
>
>
> As stated earlier, we have many eventsourced processors of same type, e.g. 
> 10000 instances of "Earnings", that are loaded on demand by a supervisor 
> when a command enters into the domain. When an instance is loaded the 
> command is forwarded to this guy and when the command results in domain 
> events we acknowledge to sender in the persist callback *but* also forward 
> the persistent message to another actor, let's call it "stream", that needs 
> to react to this. It ensures that a calculation is initiated by 
> delivering a message to a "change reactor" via a unique channel. 
> Furthermore "stream" also ensures that a message is delivered, via another 
> unique channel, to an EarningsAggregator that essentially replicates all of 
> those 10000 instances' events, because we need a global view in order to 
> maintain read models. 
>
> In receiveRecover we *also* forward replayed events to "stream" in order 
> to ensure that messages get delivered, via channels, to the respective 
> destinations, 
> in the case of a JVM crash.
>
> During system start up we start all recently active (say last 24 hours) 
> Earnings instances to ensure that all calculations are triggered and that 
> our EarningsAggregator 
> gets the event, again in the name of crash paranoia. These instances shut 
> themselves down after a reasonable receive timeout in order to keep memory 
> footprint low. 
> This roughly happens by sending a "passivate me" message to their 
> supervisor which in turn ensures a proper shutdown and all of the 
> instance's messages are 
> processed by using a combination of poison pill, become, stash and 
> listening to terminated.
>
> In the above I mention Earnings eventsourced processor, however we have 
> around 10 similar types under same supervisor that are similarly structured 
> and have the same
> need of what a channel gives us.
>
> Other usages of channels that we have are communication among processes. 
> Often here it is perfectly fine for us to use channels because it ensures 
> that something eventually
> will happen, some eventsourced processor emits an event to reactor 
> (ordinary actor) via a channel (also during receiveRecover) that transforms 
> the event to a command in the 
> language of another eventsourced processor. All such processes are 
> idempotent and maintaining a queue on both sides is just a lot of useless 
> busywork in most of our use cases.
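
As a rough illustration of what "idempotent" means on the receiving side of an at-least-once channel, here is a small sketch in plain Scala. It assumes each delivery carries a sender id and a sequence number; `Dedup` and `accept` are made-up names, not part of Akka, and the ever-growing set of seen keys is a simplification that a real receiver would need to bound.

```scala
// Hypothetical receiver-side state: every (senderId, sequenceNr) already applied.
final case class Dedup(seen: Set[(String, Long)] = Set.empty) {

  // Returns the updated state and whether the message is new and should be applied.
  // A redelivered message yields `false`, so applying it twice has no effect.
  def accept(senderId: String, seqNr: Long): (Dedup, Boolean) = {
    val key = (senderId, seqNr)
    if (seen(key)) (this, false)
    else (copy(seen = seen + key), true)
  }
}

object DedupDemo {
  def main(args: Array[String]): Unit = {
    val (d1, first)     = Dedup().accept("earnings-1", 1L)
    val (_, redelivery) = d1.accept("earnings-1", 1L)
    println(s"first=$first redelivery=$redelivery")
  }
}
```

With a receiver like this, the sender is free to redeliver during recovery without keeping its own queue of acknowledgements.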
>
> I have no idea of what a replacement of Channel would be because I do not 
> have an issue with it and think it is a good primitive that solves many of 
> our use cases in ways 
> that are satisfying and saves us a lot of work trying to re-implement the 
> same functionality.
>
> *"...My motivation here is to not remove needed functionality without 
> improved replacement."*
>
>
> That is good to know :)
>
>
> On Tuesday, May 20, 2014 7:32:38 PM UTC+1, rkuhn wrote:
>
> Hi Alex,
>
> I have filed the ticket for Processor’s removal (
> https://github.com/akka/akka/issues/15230), which also talks about 
> Channel since that was only needed to contain the side-effecting replay 
> nature of command sourced processors. Your description below is rather 
> terse, so it is not fully clear to me how you are using Channel in this 
> case and what a replacement should be, can you elaborate?
>
> There is also the discussion ticket for reinventing PersistentChannel (
> https://github.com/akka/akka/issues/15231)
>
> ...
