Hi folks, I am just starting with event sourcing and still struggle to solve domain problems with it. Maybe a user database is not a good fit for ES, dunno :)
What do I want to achieve:

* Users can be created (having an immutable UUID and an email which is used as username)
* Email can be changed
* Email should be unique (since it is used as login username)
* Users need to be looked up by email (for login) and by UUID (for everything else)

*Lookup by email and change email*

Since the user database is potentially unbounded, I wanted to use sharding over the UUID to be able to split the state held in memory across nodes. So I would end up with multiple ES event streams, one for each user UUID. So getting the current user state by ID is done. But how do I look up the user by email? Would it be a common solution to have a secondary event stream, one for each email, and have that be updated whenever a new user is created or a user changes its email address?

*Uniqueness of email*

As I read, uniqueness is expensive to guarantee. I thought about not creating the user object directly, but instead, on registration, sending an email to the given address that contains a verification link. This link would not just contain a nonce; it would be an encrypted and signed JWT. Only when the user clicks that link is the user account actually created. This does not totally solve the uniqueness constraint, but it at least ensures that two different people cannot create an account with the same email.

*Choosing persistenceId (and maybe also shardingId/entityId)*

Here is some code I have so far, but it is not totally working (for example, the akka.persistence.Update message is not properly propagated through the sharding). Also, when using sharding this fails, because multiple instances of UserProcessor share the same `persistenceId`. But I did not know what to do: when I include the user UUID in the `persistenceId`, which would make sense, the `EmailView` does not get all events to build up the email lookup dictionary. TL;DR: this code is probably total nonsense.

// UserProcessor.scala
package io.airfocus.user

import akka.actor.{Actor, ActorLogging, ActorRef}
import akka.persistence.PersistentActor
import de.choffmeister.auth.common.{PBKDF2, PasswordHasher, Plain}
import io.airfocus.common.{RandomString, WithPassivation}
import io.airfocus.model.User
import io.airfocus.model.UserProcessor._

import scala.concurrent.duration._

class UserProcessor(userView: ActorRef, emailView: ActorRef) extends Actor with ActorLogging with PersistentActor with WithPassivation {
  private val passwordHasher = new PasswordHasher("pbkdf2", "hmac-sha1" :: "10000" :: "128" :: Nil, PBKDF2 :: Plain :: Nil)

  override def passivationTimeout: FiniteDuration = 5.seconds

  // Shared by every UserProcessor instance; this is what breaks under sharding.
  override def persistenceId: String = "users"

  override def receiveRecover: Receive = {
    case ev: CreateUser =>
  }

  override def receiveCommand: Receive = withPassivation({
    case cmd @ CreateUser(userId, email, password) =>
      persistAsync(UserCreated(User(id = userId, emails = Map(email -> Some(RandomString.base32(32))), passwordHash = passwordHasher.hash(password)))) { ev =>
        userView ! io.airfocus.model.UserView.Update(userId)
        emailView ! io.airfocus.model.EmailView.Update(email)
        sender() ! ev.user
      }
  })

  override def preStart() = log.info("Starting {}", self.path)

  override def postStop() = log.info("Stopped {}", self.path)
}

// UserView.scala
package io.airfocus.user

import java.util.UUID

import akka.actor.{Actor, ActorLogging}
import akka.persistence.PersistentView
import io.airfocus.common.WithPassivation
import io.airfocus.model.User
import io.airfocus.model.UserProcessor._
import io.airfocus.model.UserView._

import scala.concurrent.duration._

class UserView extends Actor with ActorLogging with PersistentView with WithPassivation {
  var users = Map.empty[UUID, User]

  override def passivationTimeout: FiniteDuration = 5.seconds

  override def persistenceId: String = "users"

  override def viewId: String = s"users-user-${self.path.name}"

  override def autoUpdate: Boolean = true

  override def autoUpdateInterval: FiniteDuration = 1.second

  override def receive: Receive = withPassivation({
    case ev @ UserCreated(u) =>
      println(ev)
      users = users + (u.id -> u)
    case GetUser(userId) =>
      sender() ! users.get(userId)
    case Update(_) =>
      self ! akka.persistence.Update(await = true)
  })
}

// EmailView.scala
package io.airfocus.user

import java.util.UUID

import akka.actor.{Actor, ActorLogging}
import akka.persistence.PersistentView
import io.airfocus.common.WithPassivation
import io.airfocus.model.EmailView.{LookupEmail, Update}
import io.airfocus.model.UserProcessor._

import scala.concurrent.duration._

class EmailView extends Actor with ActorLogging with PersistentView with WithPassivation {
  var users = Map.empty[String, UUID]

  override def passivationTimeout: FiniteDuration = 5.seconds

  override def persistenceId: String = "users"

  override def viewId: String = s"users-email-${self.path.name}"

  override def receive: Receive = withPassivation({
    case UserCreated(u) =>
      users = users ++ u.emails.map(_._1 -> u.id)
    case LookupEmail(email) =>
      sender() ! users.get(email)
    case Update(_) =>
      self ! akka.persistence.Update(await = true)
  })
}

*Other stuff*

There are still more questions in my mind the more I think about all this. I guess my use case is pretty common, so if anyone has any other ideas or links on this specific topic, I would be happy.

Thanks
Christian
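
P.S. To make the "secondary event stream for emails" idea a bit more concrete, here is a minimal sketch in plain Scala, deliberately without Akka so it can be run on its own. All names in it (EmailEvent, EmailReserved, EmailReleased, EmailIndex) are made up for illustration and are not part of the code above. It also assumes that a single writer owns the index (or one shard of it), so it does not by itself answer the sharding question; it only shows how lookup, change of email and the uniqueness check could be expressed as a fold over events.

```scala
import java.util.UUID

// Hypothetical events of the email index stream (names are illustrative only).
sealed trait EmailEvent
final case class EmailReserved(email: String, userId: UUID) extends EmailEvent
final case class EmailReleased(email: String) extends EmailEvent

// The index state is a pure fold over the events, so it can be rebuilt by replay.
final case class EmailIndex(byEmail: Map[String, UUID] = Map.empty) {

  // Lookup for login: email -> user UUID.
  def lookup(email: String): Option[UUID] = byEmail.get(email)

  // Event handler: applies an already persisted event to the state.
  def applyEvent(ev: EmailEvent): EmailIndex = ev match {
    case EmailReserved(email, userId) => copy(byEmail = byEmail + (email -> userId))
    case EmailReleased(email)         => copy(byEmail = byEmail - email)
  }

  // Command handler: returns the events to persist, or a rejection.
  // Assumption: only one writer handles commands for this index.
  def reserve(email: String, userId: UUID): Either[String, List[EmailEvent]] =
    byEmail.get(email) match {
      case Some(owner) if owner != userId => Left(s"email already taken: $email")
      case Some(_)                        => Right(Nil) // same user again: nothing to do
      case None                           => Right(List(EmailReserved(email, userId)))
    }

  // Changing an email reserves the new address first and then releases the old one.
  def change(oldEmail: String, newEmail: String, userId: UUID): Either[String, List[EmailEvent]] =
    if (!byEmail.get(oldEmail).contains(userId))
      Left(s"$oldEmail does not belong to user $userId")
    else
      reserve(newEmail, userId) match {
        case Right(Nil) => Right(Nil)
        case Right(evs) => Right(evs :+ EmailReleased(oldEmail))
        case rejected   => rejected
      }
}

object EmailIndex {
  // Rebuilds the index from the full event history.
  def replay(events: Seq[EmailEvent]): EmailIndex =
    events.foldLeft(EmailIndex())((index, ev) => index.applyEvent(ev))
}
```

In this sketch a user would only be created after reserve succeeded, and the per-user stream would still be keyed by UUID. Whether such an index should be one stream, one stream per email, or sharded by a hash of the email is exactly the part I am unsure about.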