Hi Michael,

with akka-persistence you have several options to maintain persistent read models:

1.) processor -> channel -> destination processor. The destination processor itself writes messages (received via channel) to the journal.
2.) eventsourced processor -> (target) processor. The eventsourced processor sends events to the (target) processor from within its command handler. When using reliable event delivery <http://doc.akka.io/docs/akka/2.3.0-RC4/scala/persistence.html#reliable-event-delivery>, this is more or less equivalent to 1 (from the read model perspective).
3.) processor or eventsourced processor | view. Here, the view reads directly from a processor's journal. Neither a processor nor an eventsourced processor sends any messages directly to the view.

Mixing 3 with other options is not a good idea, as you already mentioned. What are the differences between these three options?

ref 1.) at-least-once delivery semantics, but the destination processor must be able to deal with duplicates as well as out-of-order message delivery. Whether this is an issue depends on the application-specific processing logic.
ref 2.) at-most-once delivery semantics, i.e. the (target) processor must assume that messages can be lost. With reliable event delivery, see ref 1.
ref 3.) at-least-once delivery semantics, i.e. failed reads (= incremental replays) from a processor's journal are retried by the view, but a view can make ordering assumptions here: if it maintains a lastProcessedSequenceNr field, it can safely ignore any messages with a sequence number <= lastProcessedSequenceNr. Read models for which message ordering matters should be implemented as views.
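
To illustrate the lastProcessedSequenceNr idea from ref 3, here is a minimal sketch in plain Scala without any Akka dependency. The names Replayed and SequenceFilter are hypothetical and not part of akka-persistence, and the sketch assumes that journal sequence numbers are positive and strictly increasing per processor:

```scala
// Hypothetical model of a message replayed from a processor's journal.
final case class Replayed[A](sequenceNr: Long, payload: A)

// Hypothetical read model that ignores messages it has already applied.
final class SequenceFilter[A] {
  // Assumption: real sequence numbers are > 0, so 0 means "nothing processed yet".
  private var lastProcessedSequenceNr = 0L
  private var state = Vector.empty[A]

  /** Applies the message unless it was seen before; returns true if it was applied. */
  def handle(msg: Replayed[A]): Boolean =
    if (msg.sequenceNr <= lastProcessedSequenceNr) {
      false // duplicate from a retried incremental replay: safe to ignore
    } else {
      state :+= msg.payload
      lastProcessedSequenceNr = msg.sequenceNr
      true
    }

  def current: Vector[A] = state
}
```

A retried replay that delivers sequence numbers 1, 2, 2, 3 would then leave the state with exactly three entries. The same filter would not be safe for option 1, where messages may also arrive out of order.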

More inline ...

On 25.02.14 19:25, delasoul wrote:
Hello,

following the improvement suggestions in the latest cluster sharding activator example I have added a persistent view and a replicated journal using
https://github.com/ddevore/akka-persistence-mongo/.
As usual everything works as announced and I think persistence/cluster sharding is really great and we'll be using it a lot in the future.
Now, I am not too sure how to best use persistent views:
I have added an AuthorListingView (pls. see code at the end of this post). The view is also sharded and I have deactivated auto updates, as I think in most cases I will not find the correct update interval: either the update is tried too often without any changes having happened, or the delay between a change happening and updating the view would be too long.
So, I create the AuthorListing with the view's shardRegion:

ClusterSharding(system).start(
  typeName = AuthorListing.shardName,
  entryProps = Some(AuthorListing.props(ClusterSharding(system).shardRegion(AuthorListingView.shardName)))

and in AuthorListing when it gets a PostSummary msg I update the corresponding view "manually" via the shardRegion:

def receive = {
    case Persistent(s: PostSummary, _) =>
      posts :+= s
      log.info("Post added to {}'s list: {}", s.author, s.title)
      view ! ListingChanged(s.author)

Setting aside the duplicate sending of DoUpdate msgs when replaying (I think Views will mostly be used with EventSourced processors anyway), this works, but it still does not feel right somehow: I could just send the view the persisted msg (command or event) directly without the Update, saving a journal query.

Here you mix option 3 (see above) with another one. Furthermore, views never write to a journal.

Also, it is probably not a good solution when the same event has to be sent to more than one view (using auto update would of course fix this problem...).

You can always use a view (option 3) or a destination/target processor (options 1 and 2) that distribute their events to plain actors without using channels. This way you save redundant journal updates or queries.

Would it be a good idea to update a view's state by just sending "normal" events(and never use auto-update or Update msgs), but of course use the replay and snapshot functionality of a view for starting/restarting?

No, see initial comments.

When updating Views by accessing the journal and not using "messaging" will the journal not become a bottleneck at a certain point?

Most replicated journals should be able to scale reads well (preferably with a configurable read consistency). For actor systems on nodes that only maintain read models, one could also develop a journal plugin that is optimized for very fast, scalable reads (e.g. reading from a datastore that receives updates from the datastore that accepts writes from processors).

At the moment (we are using eventsourced) we do it the classical way: the command side persists to the event store and updates the view side by emitting events, and the view persists the view model again.

This is option 1 above (unless you mean that the view directly writes its current state to a database, for example)

But I really like the idea to also keep the view in memory when needed, update it, shut it down when not used and bring it back easily by replaying the corresponding processor or snapshotting the view.

This is possible with all options 1-3 described above.
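
As a rough illustration of bringing a read model back from a snapshot plus a replay, here is a plain-Scala sketch with no Akka dependency. Event, Snapshot and ReadModelRecovery are hypothetical names chosen for this example, and the journal is modelled as an in-memory sequence:

```scala
// Hypothetical journal entry and snapshot types for an author listing.
final case class Event(sequenceNr: Long, post: String)
final case class Snapshot(sequenceNr: Long, posts: Vector[String])

object ReadModelRecovery {
  /** Rebuilds the read model from an optional snapshot and the journaled events. */
  def recover(snapshot: Option[Snapshot], journal: Seq[Event]): Snapshot = {
    // Without a snapshot, start from an empty state at sequence number 0.
    val start = snapshot.getOrElse(Snapshot(0L, Vector.empty))
    journal
      .filter(_.sequenceNr > start.sequenceNr) // skip events the snapshot already covers
      .sortBy(_.sequenceNr)                    // apply the rest in journal order
      .foldLeft(start)((s, e) => Snapshot(e.sequenceNr, s.posts :+ e.post))
  }
}
```

Starting from a snapshot only shortens the replay; the resulting state is the same as replaying the whole journal from the beginning.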

Hope that helps,

Cheers,
Martin


I think I forgot all other things I additionally wanted to ask by now...

Thanks in advance for any suggestions,

michael



object AuthorListingView {

  case class GetPosts(author: String)
  case class Posts(list: immutable.IndexedSeq[PostSummary])
  case class ListingChanged(author: String)

  val idExtractor: ShardRegion.IdExtractor = {
    case p @ Persistent(s: PostSummary, _) => (s.author, p)
    case ListingChanged(author) => (author, Update(await = true))
    case m: GetPosts => (m.author, m)
  }

  val shardResolver: ShardRegion.ShardResolver = msg => msg match {
    case Persistent(s: PostSummary, _) => (math.abs(s.author.hashCode) % 100).toString
    case ListingChanged(author) => (math.abs(author.hashCode) % 100).toString
    case GetPosts(author) => (math.abs(author.hashCode) % 100).toString
  }

  val shardName: String = "AuthorListingView"
}


class AuthorListingView extends View with ActorLogging {
  import AuthorListingView._

  log.info(s"viewId: $viewId and processorId: $processorId")

  override def processorId: String = this.viewId.replace(shardName, AuthorListing.shardName)

  override def autoUpdate = false

  var posts = Vector.empty[PostSummary]

  override def receive: Receive = {
    case Persistent(s: PostSummary, _) =>
      log.info(s"View for ${self.path.name} received summary: $s")
      posts :+= s
    case GetPosts(_) => sender ! Posts(posts)
  }
}



--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://akka.io/faq/
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group. To unsubscribe from this group and stop receiving emails from it, send an email to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/groups/opt_out.

--
Martin Krasser

blog:    http://krasserm.blogspot.com
code:    http://github.com/krasserm
twitter: http://twitter.com/mrt1nz
