This is an automated email from the ASF dual-hosted git repository. fanningpj pushed a commit to branch revert-44-wip/orcart-example in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git
commit 818193fcc81c6a58bb279fde54f31734a1a58d5f Author: Johan Andrén <[email protected]> AuthorDate: Fri Jun 15 09:03:00 2018 +0200 Revert "alternate ShoppingCart based on ORMultiMap (#44)" This reverts commit 0744270dd525b0a41ce11cc6a8446fc750d18495. --- .../sample/distributeddata/ShoppingORCart.scala | 143 --------------------- .../distributeddata/ShoppingORCartSpec.scala | 116 ----------------- 2 files changed, 259 deletions(-) diff --git a/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ShoppingORCart.scala b/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ShoppingORCart.scala deleted file mode 100644 index aefbf4e..0000000 --- a/akka-sample-distributed-data-scala/src/main/scala/sample/distributeddata/ShoppingORCart.scala +++ /dev/null @@ -1,143 +0,0 @@ -package sample.distributeddata - -import java.util.UUID - -import scala.concurrent.duration._ -import akka.actor.Actor -import akka.actor.ActorRef -import akka.actor.Props -import akka.cluster.Cluster -import akka.cluster.ddata.{DistributedData, ORMultiMap, ORMultiMapKey} - -import scala.collection.immutable.HashSet - -object ShoppingORCart { - import akka.cluster.ddata.Replicator._ - - def props(userId: String): Props = Props(new ShoppingORCart(userId)) - - case object GetCart - final case class ChangeItemQuantity(item: LineItem) - final case class RemoveItem(productId: String) - - final case class Cart(items: Set[LineItem]) - final case class LineItem(productId: String, title: String, quantity: Int) - final case class SingleLineItem(productId: String, title: String, uid: String) - - //#read-write-majority-orcart - private val timeout = 3.seconds - private val readMajority = ReadMajority(timeout) - private val writeMajority = WriteMajority(timeout) - //#read-write-majority-orcart - - def getUniqueId(implicit cluster: Cluster): String = { - cluster.selfUniqueAddress.toString + UUID.randomUUID().toString - } - -} - -class ShoppingORCart(userId: String) extends Actor { - import ShoppingORCart._ - import akka.cluster.ddata.Replicator._ - - val replicator = DistributedData(context.system).replicator - implicit val cluster = Cluster(context.system) - - val DataKey = ORMultiMapKey[String, SingleLineItem]("cart-" + userId) - - def receive = receiveGetCart - .orElse[Any, Unit](receiveAddItem) - .orElse[Any, Unit](receiveRemoveItem) - .orElse[Any, Unit](receiveOther) - - //#get-orcart - def receiveGetCart: Receive = { - case GetCart ⇒ - replicator ! Get(DataKey, readMajority, Some(sender())) - - case g @ GetSuccess(DataKey, Some(replyTo: ActorRef)) ⇒ - val data = g.get(DataKey) - val entrySet = data.entries.groupBy(_._1).map { - entry ⇒ LineItem(entry._2.values.head.head.productId, entry._2.values.head.head.title, entry._2.values.head.size) - }.toSet - val cart = Cart(entrySet) - replyTo ! cart - - case NotFound(DataKey, Some(replyTo: ActorRef)) ⇒ - replyTo ! Cart(Set.empty) - - case GetFailure(DataKey, Some(replyTo: ActorRef)) ⇒ - // ReadMajority failure, try again with local read - replicator ! Get(DataKey, ReadLocal, Some(replyTo)) - } - //#get-orcart - - //#add-item-orcart - def receiveAddItem: Receive = { - case cmd @ ChangeItemQuantity(item) ⇒ - val update = Update(DataKey, ORMultiMap.emptyWithValueDeltas[String, SingleLineItem], writeMajority, Some(cmd)) { - cart ⇒ updateCart(cart, item) - } - replicator ! update - } - //#add-item-orcart - - def updateCart(data: ORMultiMap[String, SingleLineItem], item: LineItem): ORMultiMap[String, SingleLineItem] = - data.get(item.productId) match { - case Some(entries) ⇒ - val existingQuantity = entries.size - if (existingQuantity == item.quantity) { - data - } else if (existingQuantity < item.quantity) { - var newEntries = HashSet[SingleLineItem]() - (1 to (item.quantity - existingQuantity)).foreach { _ ⇒ - newEntries = newEntries + SingleLineItem(item.productId, item.title, getUniqueId) - } - val ops = newEntries.foldLeft(data) { case (d, item) ⇒ d.addBinding(item.productId, item) } - ops - } else { - val existingItems = entries.toVector - val ops = (1 to (existingQuantity - item.quantity)).foldLeft(data) { - case (d, index) ⇒ - d.removeBinding(item.productId, existingItems(index - 1)) - } - ops - } - case None ⇒ - var items: Set[SingleLineItem] = new HashSet[SingleLineItem]() - (1 to item.quantity).foreach { _ ⇒ - items = items + SingleLineItem(item.productId, item.title, getUniqueId) - } - data + (item.productId → items) - } - - //#remove-item-orcart - def receiveRemoveItem: Receive = { - case cmd @ RemoveItem(productId) ⇒ - // Try to fetch latest from a majority of nodes first, since ORMap - // remove must have seen the item to be able to remove it. - replicator ! Get(DataKey, readMajority, Some(cmd)) - - case GetSuccess(DataKey, Some(RemoveItem(productId))) ⇒ - replicator ! Update(DataKey, ORMultiMap.emptyWithValueDeltas[String, SingleLineItem], writeMajority, None) { - _ - productId // be careful, use ORMultiMap.emptyWithValueDeltas for safety, LWWMap and vanilla ORMultiMap can result in merge anomaly - } - - case GetFailure(DataKey, Some(RemoveItem(productId))) ⇒ - // ReadMajority failed, fall back to best effort local value - replicator ! Update(DataKey, ORMultiMap.emptyWithValueDeltas[String, SingleLineItem], writeMajority, None) { - _ - productId - } - - case NotFound(DataKey, Some(RemoveItem(productId))) ⇒ - // nothing to remove - } - //#remove-item-orcart - - def receiveOther: Receive = { - case _: UpdateSuccess[_] | _: UpdateTimeout[_] ⇒ - // UpdateTimeout, will eventually be replicated - case e: UpdateFailure[_] ⇒ throw new IllegalStateException("Unexpected failure: " + e) - } - -} diff --git a/akka-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/ShoppingORCartSpec.scala b/akka-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/ShoppingORCartSpec.scala deleted file mode 100644 index 3a7ca2e..0000000 --- a/akka-sample-distributed-data-scala/src/multi-jvm/scala/sample/distributeddata/ShoppingORCartSpec.scala +++ /dev/null @@ -1,116 +0,0 @@ -package sample.distributeddata - -import scala.concurrent.duration._ -import akka.cluster.Cluster -import akka.cluster.ddata.DistributedData -import akka.cluster.ddata.Replicator.GetReplicaCount -import akka.cluster.ddata.Replicator.ReplicaCount -import akka.remote.testconductor.RoleName -import akka.remote.testkit.MultiNodeConfig -import akka.remote.testkit.MultiNodeSpec -import akka.testkit._ -import com.typesafe.config.ConfigFactory - -object ShoppingORCartSpec extends MultiNodeConfig { - val node1 = role("node-1") - val node2 = role("node-2") - val node3 = role("node-3") - - commonConfig(ConfigFactory.parseString(""" - akka.loglevel = INFO - akka.actor.provider = "cluster" - akka.log-dead-letters-during-shutdown = off - """)) - -} - -class ShoppingORCartSpecMultiJvmNode1 extends ShoppingORCartSpec -class ShoppingORCartSpecMultiJvmNode2 extends ShoppingORCartSpec -class ShoppingORCartSpecMultiJvmNode3 extends ShoppingORCartSpec - -class ShoppingORCartSpec extends MultiNodeSpec(ShoppingORCartSpec) with STMultiNodeSpec with ImplicitSender { - import ShoppingORCartSpec._ - import ShoppingORCart._ - - override def initialParticipants = roles.size - - val cluster = Cluster(system) - val shoppingORCart = system.actorOf(ShoppingORCart.props("user-1")) - - def join(from: RoleName, to: RoleName): Unit = { - runOn(from) { - cluster join node(to).address - } - enterBarrier(from.name + "-joined") - } - - "Demo of a replicated shopping cart" must { - "join cluster" in within(20.seconds) { - join(node1, node1) - join(node2, node1) - join(node3, node1) - - awaitAssert { - DistributedData(system).replicator ! GetReplicaCount - expectMsg(ReplicaCount(roles.size)) - } - enterBarrier("after-1") - } - - "handle updates directly after start" in within(15.seconds) { - runOn(node2) { - shoppingORCart ! ShoppingORCart.ChangeItemQuantity(LineItem("1", "Apples", quantity = 2)) - shoppingORCart ! ShoppingORCart.ChangeItemQuantity(LineItem("2", "Oranges", quantity = 3)) - } - enterBarrier("updates-done") - - awaitAssert { - shoppingORCart ! ShoppingORCart.GetCart - val cart = expectMsgType[Cart] - cart.items should be(Set(LineItem("1", "Apples", quantity = 2), LineItem("2", "Oranges", quantity = 3))) - } - - enterBarrier("after-2") - } - - "handle updates from different nodes" in within(5.seconds) { - runOn(node2) { - shoppingORCart ! ShoppingORCart.ChangeItemQuantity(LineItem("1", "Apples", quantity = 5)) - shoppingORCart ! ShoppingORCart.RemoveItem("2") - } - runOn(node3) { - shoppingORCart ! ShoppingORCart.ChangeItemQuantity(LineItem("3", "Bananas", quantity = 4)) - } - enterBarrier("updates-done") - - awaitAssert { - shoppingORCart ! ShoppingORCart.GetCart - val cart = expectMsgType[Cart] - cart.items should be(Set(LineItem("1", "Apples", quantity = 5), LineItem("3", "Bananas", quantity = 4))) - } - - enterBarrier("after-3") - } - - "handle more updates from different nodes" in within(5.seconds) { - runOn(node2) { - shoppingORCart ! ShoppingORCart.ChangeItemQuantity(LineItem("1", "Apples", quantity = 2)) - } - runOn(node3) { - shoppingORCart ! ShoppingORCart.ChangeItemQuantity(LineItem("3", "Bananas", quantity = 6)) - } - enterBarrier("updates-done") - - awaitAssert { - shoppingORCart ! ShoppingORCart.GetCart - val cart = expectMsgType[Cart] - cart.items should be(Set(LineItem("1", "Apples", quantity = 2), LineItem("3", "Bananas", quantity = 6))) - } - - enterBarrier("after-3") - } - - } - -} - --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
