This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch minor-kafka-updaes in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git
commit 131b8bb56e98fc2ce66e47d5410c8f24d501a8a4 Author: Christopher Batey <[email protected]> AuthorDate: Mon Mar 16 15:36:26 2020 +0000 WIP --- ...ble.Vector.nullSlotAndCopy(Object[][],_int).bgv | Bin 0 -> 18702 bytes ...ble.Vector.nullSlotAndCopy(Object[][],_int).cfg | 5 +++ ...ctorPointer.gotoPosWritable1(int,_int,_int).bgv | Bin 0 -> 9978 bytes ...ctorPointer.gotoPosWritable1(int,_int,_int).cfg | 5 +++ ...ollection.immutable.Vector.appended(Object).bgv | Bin 0 -> 251627 bytes ...ollection.immutable.Vector.appended(Object).cfg | 5 +++ ...ble.Vector.nullSlotAndCopy(Object[][],_int).bgv | Bin 0 -> 18700 bytes ...ble.Vector.nullSlotAndCopy(Object[][],_int).cfg | 5 +++ .../processor/src/main/resources/application.conf | 5 ++- .../main/scala/sample/sharding/kafka/Main.scala | 14 ++++--- .../sample/sharding/kafka/ProcessorSettings.scala | 2 +- .../scala/sample/sharding/kafka/UserEvents.scala | 45 ++++++++------------- .../sharding/kafka/UserEventsKafkaProcessor.scala | 8 ++-- .../sample/sharding/kafka/UserGrpcService.scala | 4 +- 14 files changed, 53 insertions(+), 45 deletions(-) diff --git a/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).bgv b/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).bgv new file mode 100644 index 0000000..317ce7a Binary files /dev/null and b/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).bgv differ diff --git 
a/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).cfg b/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).cfg new file mode 100644 index 0000000..9a4840a --- /dev/null +++ b/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).cfg @@ -0,0 +1,5 @@ +begin_compilation + name " HotSpotCompilation-36914[scala.collection.immutable.Vector.nullSlotAndCopy(Object[][], int)]" + method "HotSpotCompilation-36914[scala.collection.immutable.Vector.nullSlotAndCopy(Object[][], int)]" + date 1583857594397 +end_compilation diff --git a/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int)/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int).bgv b/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int)/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int).bgv new file mode 100644 index 0000000..190c24c Binary files /dev/null and b/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int)/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int).bgv differ diff --git a/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int)/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int).cfg 
b/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int)/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int).cfg new file mode 100644 index 0000000..b8126a4 --- /dev/null +++ b/akka-sample-cqrs-scala/dumps/1583857594249/graal_diagnostics_77321/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int)/scala.collection.immutable.VectorPointer.gotoPosWritable1(int,_int,_int).cfg @@ -0,0 +1,5 @@ +begin_compilation + name " HotSpotCompilation-37214[scala.collection.immutable.VectorPointer.gotoPosWritable1(int, int, int)]" + method "HotSpotCompilation-37214[scala.collection.immutable.VectorPointer.gotoPosWritable1(int, int, int)]" + date 1583859506333 +end_compilation diff --git a/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.appended(Object)/scala.collection.immutable.Vector.appended(Object).bgv b/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.appended(Object)/scala.collection.immutable.Vector.appended(Object).bgv new file mode 100644 index 0000000..0bc4add Binary files /dev/null and b/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.appended(Object)/scala.collection.immutable.Vector.appended(Object).bgv differ diff --git a/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.appended(Object)/scala.collection.immutable.Vector.appended(Object).cfg b/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.appended(Object)/scala.collection.immutable.Vector.appended(Object).cfg new file mode 100644 index 0000000..67a37d2 --- /dev/null +++ 
b/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.appended(Object)/scala.collection.immutable.Vector.appended(Object).cfg @@ -0,0 +1,5 @@ +begin_compilation + name " HotSpotCompilation-35789[scala.collection.immutable.Vector.appended(Object)]" + method "HotSpotCompilation-35789[scala.collection.immutable.Vector.appended(Object)]" + date 1583857658632 +end_compilation diff --git a/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).bgv b/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).bgv new file mode 100644 index 0000000..259b2be Binary files /dev/null and b/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).bgv differ diff --git a/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).cfg b/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).cfg new file mode 100644 index 0000000..9c7e37a --- /dev/null +++ b/akka-sample-cqrs-scala/dumps/1583857594326/graal_diagnostics_75521/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int)/scala.collection.immutable.Vector.nullSlotAndCopy(Object[][],_int).cfg @@ -0,0 +1,5 @@ +begin_compilation + name " HotSpotCompilation-35679[scala.collection.immutable.Vector.nullSlotAndCopy(Object[][], int)]" + method 
"HotSpotCompilation-35679[scala.collection.immutable.Vector.nullSlotAndCopy(Object[][], int)]" + date 1583857594379 +end_compilation diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/application.conf b/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/application.conf index 306e957..6f5594d 100644 --- a/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/application.conf +++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/application.conf @@ -26,8 +26,9 @@ akka { cluster { seed-nodes = [ - "akka://[email protected]:2551", - "akka://[email protected]:2552"] + "akka://[email protected]:2551" + "akka://[email protected]:2552" + ] sharding { retry-interval = 200ms diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala index 301bd8a..283c9d1 100644 --- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala +++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala @@ -9,14 +9,13 @@ import akka.http.scaladsl.model.{HttpRequest, HttpResponse} import akka.management.scaladsl.AkkaManagement import akka.stream.Materializer import com.typesafe.config.{Config, ConfigFactory} -import sample.sharding.kafka.UserEvents.Message import scala.concurrent.Future import scala.util.{Failure, Success} sealed trait Command case object NodeMemberUp extends Command -final case class ShardingStarted(region: ActorRef[Message]) extends Command +final case class ShardingStarted(region: ActorRef[UserEvents.Command]) extends Command object Main { def main(args: Array[String]): Unit = { @@ -24,6 +23,9 @@ object Main { def isInt(s: String): Boolean = s.matches("""\d+""") args.toList match { + case single :: Nil if isInt(single) => + val nr = single.toInt + init(2550 + nr, 8550 + nr, 8080 + nr) case 
portString :: managementPort :: frontEndPort :: Nil if isInt(portString) && isInt(managementPort) && isInt(frontEndPort) => init(portString.toInt, managementPort.toInt, frontEndPort.toInt) @@ -47,14 +49,14 @@ object Main { starting(ctx, None, joinedCluster = false, settings) }, "KafkaToSharding", config(remotingPort, akkaManagementPort)) - def start(ctx: ActorContext[Command], region: ActorRef[Message], settings: ProcessorSettings): Behavior[Command] = { - ctx.log.info("Sharding started and joine cluster. Starting event processor") + def start(ctx: ActorContext[Command], region: ActorRef[UserEvents.Command], settings: ProcessorSettings): Behavior[Command] = { + ctx.log.info("Sharding started and joined cluster. Starting event processor") val eventProcessor = ctx.spawn[Nothing](UserEventsKafkaProcessor(region, settings), "kafka-event-processor") val binding: Future[Http.ServerBinding] = startGrpc(ctx.system, frontEndPort, region) running(binding, eventProcessor) } - def starting(ctx: ActorContext[Command], sharding: Option[ActorRef[Message]], joinedCluster: Boolean, settings: ProcessorSettings): Behavior[Command] = Behaviors + def starting(ctx: ActorContext[Command], sharding: Option[ActorRef[UserEvents.Command]], joinedCluster: Boolean, settings: ProcessorSettings): Behavior[Command] = Behaviors .receive[Command] { case (ctx, ShardingStarted(region)) if joinedCluster => ctx.log.info("Sharding has started") @@ -79,7 +81,7 @@ object Main { } - def startGrpc(system: ActorSystem[_], frontEndPort: Int, region: ActorRef[Message]): Future[Http.ServerBinding] = { + def startGrpc(system: ActorSystem[_], frontEndPort: Int, region: ActorRef[UserEvents.Command]): Future[Http.ServerBinding] = { val mat = Materializer.createMaterializer(system.toClassic) val service: HttpRequest => Future[HttpResponse] = UserServiceHandler(new UserGrpcService(system, region))(mat, system.toClassic) diff --git 
a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/ProcessorSettings.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/ProcessorSettings.scala index af569c1..a47bc75 100644 --- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/ProcessorSettings.scala +++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/ProcessorSettings.scala @@ -36,5 +36,5 @@ final class ProcessorSettings(val bootstrapServers: String, val topics: List[Str * By using the same consumer group id as our entity type key name we can setup multiple consumer groups and connect * each with a different sharded entity coordinator. */ - val entityTypeKey: EntityTypeKey[UserEvents.Message] = EntityTypeKey(groupId) + val entityTypeKey: EntityTypeKey[UserEvents.Command] = EntityTypeKey(groupId) } diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala index 9c94230..2865705 100644 --- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala +++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala @@ -4,7 +4,6 @@ import akka.Done import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.{ActorRef, ActorSystem, Behavior} import akka.cluster.sharding.external.ExternalShardAllocationStrategy -import akka.cluster.sharding.typed.ClusterShardingSettings import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, Entity} import akka.kafka.cluster.sharding.KafkaClusterSharding @@ -12,39 +11,36 @@ import scala.concurrent.Future import scala.concurrent.duration._ object UserEvents { - def init(system: ActorSystem[_], settings: ProcessorSettings): Future[ActorRef[Message]] = { + def init(system: ActorSystem[_], settings: 
ProcessorSettings): Future[ActorRef[Command]] = { import system.executionContext - messageExtractor(settings).map(messageExtractor => { + KafkaClusterSharding(settings.system).messageExtractorNoEnvelope( + timeout = 10.seconds, + topic = settings.topics.head, + entityIdExtractor = (msg: Command) => msg.userId, + settings = settings.kafkaConsumerSettings() + ).map(messageExtractor => { system.log.info("Message extractor created. Initializing sharding") ClusterSharding(system).init( Entity(settings.entityTypeKey)(createBehavior = _ => UserEvents()) .withAllocationStrategy(new ExternalShardAllocationStrategy(system, settings.entityTypeKey.name)) - .withMessageExtractor(messageExtractor) - .withSettings(ClusterShardingSettings(system))) + .withMessageExtractor(messageExtractor)) }) } - sealed trait Message extends CborSerializable { + sealed trait Command extends CborSerializable { def userId: String } - sealed trait UserEvent extends Message - case class UserAction(userId: String, description: String, replyTo: ActorRef[Done]) extends UserEvent - case class UserPurchase(userId: String, product: String, quantity: Long, priceInPence: Long, replyTo: ActorRef[Done]) - extends UserEvent + final case class UserPurchase(userId: String, product: String, quantity: Long, priceInPence: Long, replyTo: ActorRef[Done]) extends Command + final case class GetRunningTotal(userId: String, replyTo: ActorRef[RunningTotal]) extends Command - sealed trait UserQuery extends Message - case class GetRunningTotal(userId: String, replyTo: ActorRef[RunningTotal]) extends UserQuery - case class RunningTotal(totalPurchases: Long, amountSpent: Long) extends CborSerializable + // state + final case class RunningTotal(totalPurchases: Long, amountSpent: Long) extends CborSerializable - def apply(): Behavior[Message] = running(RunningTotal(0, 0)) + def apply(): Behavior[Command] = running(RunningTotal(0, 0)) - private def running(runningTotal: RunningTotal): Behavior[Message] = { + private def 
running(runningTotal: RunningTotal): Behavior[Command] = { Behaviors.setup { ctx => - Behaviors.receiveMessage[Message] { - case UserAction(_, desc, ack) => - ctx.log.info("user event {}", desc) - ack.tell(Done) - Behaviors.same + Behaviors.receiveMessage[Command] { case UserPurchase(id, product, quantity, price, ack) => ctx.log.info("user {} purchase {}, quantity {}, price {}", id, product, quantity, price) ack.tell(Done) @@ -59,13 +55,4 @@ object UserEvents { } } } - - private def messageExtractor(settings: ProcessorSettings): Future[KafkaClusterSharding.KafkaShardingNoEnvelopeExtractor[Message]] = { - KafkaClusterSharding(settings.system).messageExtractorNoEnvelope( - timeout = 10.seconds, - topic = settings.topics.head, - entityIdExtractor = (msg: Message) => msg.userId, - settings = settings.kafkaConsumerSettings() - ) - } } diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala index 39c1c5b..9ad8305 100644 --- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala +++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala @@ -10,7 +10,6 @@ import akka.kafka.cluster.sharding.KafkaClusterSharding import akka.kafka.scaladsl.{Committer, Consumer} import akka.kafka.{CommitterSettings, Subscriptions} import akka.pattern.retry -import sample.sharding.kafka.UserEvents.Message import sample.sharding.kafka.serialization.UserPurchaseProto import scala.concurrent.duration._ @@ -23,22 +22,21 @@ object UserEventsKafkaProcessor { private case class KafkaConsumerStopped(reason: Try[Any]) extends Command - def apply(shardRegion: ActorRef[Message], processorSettings: ProcessorSettings): Behavior[Nothing] = { + def apply(shardRegion: ActorRef[UserEvents.Command], 
processorSettings: ProcessorSettings): Behavior[Nothing] = { Behaviors .setup[Command] { ctx => implicit val classic: ActorSystem = ctx.system.toClassic implicit val ec: ExecutionContextExecutor = ctx.executionContext implicit val scheduler: Scheduler = classic.scheduler - val rebalanceListener = KafkaClusterSharding(classic).rebalanceListener(ctx.system, processorSettings.entityTypeKey) + val rebalanceListener = KafkaClusterSharding(classic).rebalanceListener(processorSettings.entityTypeKey) val subscription = Subscriptions .topics(processorSettings.topics: _*) - // convert rebalance listener to classic ActorRef .withRebalanceListener(rebalanceListener.toClassic) val stream: Future[Done] = Consumer.sourceWithOffsetContext(processorSettings.kafkaConsumerSettings(), subscription) - // MapAsync and Retries will be replaced by reliable delivery in the next 2.6 version + // MapAsync and Retries can be replaced by reliable delivery .mapAsync(20) { record => ctx.log.info(s"user id consumed kafka partition ${record.key()}->${record.partition()}") retry(() => diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala index ebce25d..2e947c6 100644 --- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala +++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala @@ -3,12 +3,12 @@ package sample.sharding.kafka import akka.actor.typed.scaladsl.AskPattern._ import akka.actor.typed.{ActorRef, ActorSystem, Scheduler} import akka.util.Timeout -import sample.sharding.kafka.UserEvents.{GetRunningTotal, Message, RunningTotal} +import sample.sharding.kafka.UserEvents.{GetRunningTotal, Command, RunningTotal} import scala.concurrent.duration._ import scala.concurrent.{ExecutionContextExecutor, Future} -class 
UserGrpcService(system: ActorSystem[_], shardRegion: ActorRef[Message]) extends UserService { +class UserGrpcService(system: ActorSystem[_], shardRegion: ActorRef[Command]) extends UserService { implicit val timeout: Timeout = Timeout(5.seconds) implicit val sched: Scheduler = system.scheduler --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
