Hi Michael,
with akka-persistence you have several options to maintain persistent
read models:
1.) processor -> channel -> destination processor. The destination
processor itself writes messages (received via channel) to the journal.
2.) eventsourced processor -> (target) processor. The eventsourced
processor sends events to the (target) processor from within its command
handler. When using reliable event delivery
<http://doc.akka.io/docs/akka/2.3.0-RC4/scala/persistence.html#reliable-event-delivery>,
this is more or less equivalent to 1 (from the read model perspective).
3.) processor or eventsourced processor | view. Here, the view reads
directly from a processor's journal. Processor or eventsourced processor
don't send any messages directly to view.
Mixing 3 with other options is not a good idea, as you already
mentioned. What are the differences between these three options?
ref 1.) at-least-once delivery semantics but destination processor must
be able to deal with duplicates as well as out-of-order message
delivery. Depends on the application-specific processing logic if this
is an issue.
ref 2.) at-most-once delivery semantics i.e. (target) processor must
assume that message can be lost. With realiable event delivery, see ref 1.
ref 3.) at-least-once delivery semantics i.e. failed reads (=
incremental replays) from a processor's journal are retried by the view
but a view can make ordering assumptions here: if it maintains a
lastProcessedSequenceNr field, it can safely ignore any messages with a
sequence number <= lastProcessorSequenceNr. Read models for which
message ordering matters should be implemented as views.
More inline ...
On 25.02.14 19:25, delasoul wrote:
Hello,
following the improvement suggestions in the latest cluster sharding
activator example I have added a persistent view and a replicated
journal using
https://github.com/ddevore/akka-persistence-mongo/.
As usual everything works as announced and I think persistence/cluster
sharding is really great and we'll be using it a lot in the future.
Now, I am not too sure how to best use persistent views:
I have added an AuthorListingView (pls. see code at the end of this
post). The view is also sharded and I have deactivated auto updates as
I think in most cases I will not find the correct update interval -
either the update is tried too often without any changes happened or
the delay between a change happening and updating the view would be
too long.
So, I create the AuthorListing with the view's shardRegion:
ClusterSharding(system).start(
typeName = AuthorListing.shardName,
entryProps =
Some(AuthorListing.props(ClusterSharding(system).shardRegion(AuthorListingView.shardName)))
and in AuthorListing when it gets a PostSummary msg I update the
corresponding view "manually" via the shardRegion:
def receive = {
case Persistent(s: PostSummary, _) =>
posts :+= s
log.info("Post added to {}'s list: {}", s.author, s.title)
view ! ListingChanged(s.author)
Forgetting about the duplicate sending of DoUpdate msgs when replaying
( I think Views will be mostly used with EventSourced processors
anyway) this works
but it still feels not correct somehow - I just could send the view
the persisted msg (command or event) directly without the Update,
saving a journal query.
Here you you mix option 3 (see above) with another one. Furthermore,
views never write to a journal.
Also, it is probably not a good solution when the same event has to be
sent to more than one view (using auto update would of course fix this
problem...).
You can always use a view (option 3) or a destination/target processor
(options 1 and 2) that distribute their events to plain actors without
using channels. This way you save redundant journal updates or queries.
Would it be a good idea to update a view's state by just sending
"normal" events(and never use auto-update or Update msgs), but of
course use the replay and snapshot functionality of a view for
starting/restarting?
No, see initial comments.
When updating Views by accessing the journal and not using "messaging"
will the journal not become a bottleneck at a certain point?
Most replicated journals should be able to scale reads well (preferrable
with a configurable read consistency). For actor systems on nodes that
only maintain read models, one could also develop a journal plugin that
is optimized for very fast, scalable reads (e.g. reading from a
datastore that receives updates from the datastore that accepts writes
from processors).
In the moment (we are using eventsourced) we do it the classical way -
command side persisting to the event store and updating the view side
by emitting events and the view persists the view model again.
This is option 1 above (unless you mean that the view directly writes
its current state to a database, for example)
But I really like the idea to also keep the view in memory when
needed, update it, shut it down when not used and bring it back easily
by replaying the corresponding processor or snapshotting the view.
This is possible with all options 1-3 described above.
Hope that helps,
Cheers,
Martin
I think I forgot all other things I additionally wanted to ask by now...
Thanks in advance for any suggestions,
michael
object AuthorListingView {
case class GetPosts(author: String)
case class Posts(list: immutable.IndexedSeq[PostSummary])
case class ListingChanged(author: String)
val idExtractor: ShardRegion.IdExtractor = {
case p @ Persistent(s: PostSummary, _) => (s.author, p)
case ListingChanged(author) => (author, Update(await = true))
case m: GetPosts => (m.author, m)
}
val shardResolver: ShardRegion.ShardResolver = msg => msg match {
case Persistent(s: PostSummary, _) => (math.abs(s.author.hashCode)
% 100).toString
case ListingChanged(author) => (math.abs(author.hashCode) %
100).toString
case GetPosts(author) =>
(math.abs(author.hashCode) % 100).toString }
val shardName: String = "AuthorListingView"
}
class AuthorListingView extends View with ActorLogging {
import AuthorListingView._
log.info(s"viewId: $viewId and processorId: $processorId")
override def processorId: String = this.viewId.replace(shardName,
AuthorListing.shardName)
override def autoUpdate = false
var posts = Vector.empty[PostSummary]
override def receive: Receive = {
case Persistent(s: PostSummary, _) =>
log.info(s"View for ${self.path.name} received summary: $s")
posts :+= s
case GetPosts(_) => sender ! Posts(posts)
}
--
>>>>>>>>>> Read the docs: http://akka.io/docs/
>>>>>>>>>> Check the FAQ: http://akka.io/faq/
>>>>>>>>>> Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google
Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send
an email to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/groups/opt_out.
--
Martin Krasser
blog: http://krasserm.blogspot.com
code: http://github.com/krasserm
twitter: http://twitter.com/mrt1nz
--
Read the docs: http://akka.io/docs/
Check the FAQ: http://akka.io/faq/
Search the archives: https://groups.google.com/group/akka-user
---
You received this message because you are subscribed to the Google Groups "Akka User List" group.
To unsubscribe from this group and stop receiving emails from it, send an email
to akka-user+unsubscr...@googlegroups.com.
To post to this group, send email to akka-user@googlegroups.com.
Visit this group at http://groups.google.com/group/akka-user.
For more options, visit https://groups.google.com/groups/opt_out.