Hi everyone, I would like some advice from the Akka experts. This month I have been working on a use case in which I fetch data from Kafka. I have a set of devices (which will grow over time) pushing their data to Kafka, and I need to fetch it. I know there is a library (akka-reactive-kafka) that does the job, but because I'm a newbie in Akka, I need and want to fully understand Akka before using a library built on it.
I'm looking for a way to scale out my application, because the number of devices in its scope will grow. Here is the current architecture of my app:

    ActorSystem -> KafkaListener -> Data_Buffer -> Block_Buffer -> WriteToDb

Each "->" describes a "parent -> child" relation, and each actor holds caches. For example, the Data_Buffer actor keeps a Map[Device, List[Sample]] for the device samples, and so on. I know that Akka scales by default, but lately I have become interested in routers, particularly the consistent hashing router. I wrote a POC to illustrate my idea of using the hashing router:

    package com.bioserenity.test.hashingrouter

    import akka.actor._
    import akka.routing.ConsistentHashingPool
    import akka.routing.ConsistentHashingRouter.{ ConsistentHashable, ConsistentHashMapping, ConsistentHashableEnvelope }
    import akka.testkit.{ TestKit, ImplicitSender }
    import org.scalatest.{ WordSpecLike, Matchers, BeforeAndAfterAll }

    object Cache {
      def props(actor: ActorRef): Props = Props(new Cache(actor))
    }

    // Front actor: wraps every request in an envelope so the router picks the routee from the key.
    class Cache(actor: ActorRef) extends Actor {
      def hashMapping: ConsistentHashMapping = {
        case Evict(key) => key
      }

      val cache: ActorRef = context.actorOf(
        ConsistentHashingPool(10, hashMapping = hashMapping).props(Props[SubCache]),
        name = "cache")

      def receive = {
        case m: Entry => cache ! ConsistentHashableEnvelope(message = m, hashKey = m.key)
        case m: Get   => cache ! ConsistentHashableEnvelope(message = m, hashKey = m.key)
        case m: Evict => cache ! ConsistentHashableEnvelope(message = m, hashKey = m.key)
        // Replies from the routees arrive here and are passed on to `actor`.
        case l: List[_] =>
          actor ! l
          println("receive " + l.toString)
      }
    }

    class SubCache extends Actor {
      // One buffer per routee: every key routed to this routee shares it.
      var l = scala.collection.mutable.ListBuffer[String]()

      def receive = {
        case Entry(key, value) =>
          println(s"${self.path} receive new request $key")
          l += value
          println(l)
        case Get(key) =>
          println("Ask data of " + key)
          sender() ! l.toList
        case Evict(key) =>
          l.clear()
      }
    }

    final case class Evict(key: String)
    final case class Get(key: String) extends ConsistentHashable {
      override def consistentHashKey: Any = key
    }
    final case class Entry(key: String, value: String)

    class TEST_HashingRouter extends TestKit(ActorSystem("foo"))
        with ImplicitSender with WordSpecLike with Matchers with BeforeAndAfterAll {

      // Stop the actor system once the tests have run.
      override def afterAll(): Unit = TestKit.shutdownActorSystem(system)

      val cache: ActorRef = system.actorOf(Cache.props(testActor), name = "cache")

      "cache" must {
        "put values in the cache" in {
          cache ! Entry("hello", "HELLO")
          cache ! Entry("hello", "HELLO2")
          cache ! Entry("hi", "HI")
          cache ! Entry("hi", "HI3")
          cache ! Get("hello")
          expectMsg(List("HELLO", "HELLO2"))
          cache ! Get("hi")
          expectMsg(List("HI", "HI3"))
          cache ! Evict("hi")
          cache ! Get("hi")
          expectMsg(List())
          cache ! Entry("hI", "bar")
          expectNoMsg
        }
      }
    }

*I want to use a pool of actors, with one actor per device, and manage the lifecycle of each device over time. Because I can configure the number of actors in the hashing router pool, I could start with a pool of 1000 actors (so 1000 devices), each automatically managing one device's data. With my current knowledge of Akka, my question is: should I move to the second architecture with the hashing router in order to scale to a large number of devices, or is my current architecture enough to scale out my application even for a large number of devices?*

Cheers, Alifirat Kilic.
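
P.S. To make the "pool of N actors for N devices" idea easier to discuss, here is a minimal sketch in plain Scala (no Akka) of the property a hashing router relies on: the same key always goes to the same slot, but different keys can share a slot when there are more keys than slots. The helper `routeeFor`, the `device-N` names and the simple hash-modulo scheme are assumptions made for illustration only. Akka's router uses a consistent-hash ring, which this sketch does not reproduce.

```scala
object HashRoutingSketch {
  // Hypothetical helper: maps a key to one of `slots` routee indices (0 until slots).
  // floorMod keeps the result non-negative even for negative hash codes.
  def routeeFor(key: String, slots: Int): Int =
    Math.floorMod(key.hashCode, slots)

  def main(args: Array[String]): Unit = {
    val slots = 10
    val devices = (1 to 25).map(i => s"device-$i")

    // Group the device keys by the slot they are routed to.
    val assignment = devices.groupBy(routeeFor(_, slots))

    // The same key is always routed to the same slot.
    assert(routeeFor("device-1", slots) == routeeFor("device-1", slots))

    // 25 keys over 10 slots: at least one slot must hold 3 or more keys.
    assert(assignment.values.exists(_.size >= 3))

    assignment.toSeq.sortBy(_._1).foreach { case (slot, keys) =>
      println(s"routee $slot <- ${keys.mkString(", ")}")
    }
  }
}
```

If the real router behaves similarly, a routee such as SubCache may receive several devices whenever the pool is smaller than the number of devices, so it would probably have to keep its data per key (for example in a Map) rather than in one shared list.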