This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch wip-chbatey-reliable-delivery
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git
commit 311a02733d5abb4c005f58697858e695e32a7dd8
Author: Christopher Batey <[email protected]>
AuthorDate: Tue Mar 3 14:27:00 2020 +0000

    WIP: migrate kafka write side to use reliable delivery
---
 akka-sample-kafka-to-sharding-scala/build.sbt      |   2 +-
 .../processor/src/main/resources/logback.xml       |  30 ++--
 .../main/scala/sample/sharding/kafka/Main.scala    |   1 -
 .../scala/sample/sharding/kafka/UserEvents.scala   |  91 +++++++----
 .../sharding/kafka/UserEventsKafkaProcessor.scala  | 179 ++++++++++++++++++---
 .../sample/sharding/kafka/UserGrpcService.scala    |  14 +-
 6 files changed, 245 insertions(+), 72 deletions(-)

diff --git a/akka-sample-kafka-to-sharding-scala/build.sbt b/akka-sample-kafka-to-sharding-scala/build.sbt
index 10f8cbb..70d63f5 100644
--- a/akka-sample-kafka-to-sharding-scala/build.sbt
+++ b/akka-sample-kafka-to-sharding-scala/build.sbt
@@ -1,4 +1,4 @@
-val AkkaVersion = "2.6.3"
+val AkkaVersion = "2.6.3+135-0a7adf56+20200301-1347" // TODO upgrade to 2.0.0
 val AlpakkaKafkaVersion = "1.1.0"
 val AkkaManagementVersion = "1.0.5"
 
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/logback.xml b/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/logback.xml
index 31d2e1d..32f5c5b 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/logback.xml
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/resources/logback.xml
@@ -1,17 +1,15 @@
-<?xml version="1.0" encoding="UTF-8"?>
+<?xml version="1.0" encoding="utf-8"?>
 <configuration>
-
-  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-    <encoder>
-      <pattern>[%date{ISO8601}] [%level] [%logger] [%thread] [%X{akkaSource}] - %msg%n</pattern>
-    </encoder>
-  </appender>
-
-  <logger name="org.apache.kafka" level="WARN" />
-
-  <logger name="akka.cluster.sharding" level="DEBUG" />
-
-  <root level="INFO">
-    <appender-ref ref="STDOUT"/>
-  </root>
-</configuration>
\ No newline at end of file
+  <appender name="STDOUT"
+            class="ch.qos.logback.core.ConsoleAppender">
+    <encoder>
+      <pattern>[%date{ISO8601}] [%level] [%logger] [%thread]
+        [%X{akkaSource}] - %msg%n</pattern>
+    </encoder>
+  </appender>
+  <logger name="org.apache.kafka" level="WARN" />
+  <logger name="akka.cluster.sharding" level="DEBUG" />
+  <root level="DEBUG">
+    <appender-ref ref="STDOUT" />
+  </root>
+</configuration>
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala
index 96fce26..79d3943 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala
@@ -45,7 +45,6 @@ object Main {
         .receiveMessage[MemberUp] {
           case MemberUp(member) if member.uniqueAddress == cluster.selfMember.uniqueAddress =>
             ctx.log.info("Joined the cluster. Starting sharding and kafka processor")
-            UserEvents.init(ctx.system)
             val eventProcessor = ctx.spawn[Nothing](UserEventsKafkaProcessor(), "kafka-event-processor")
             ctx.watch(eventProcessor)
             Behaviors.same
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
index f093fff..d4a4b86 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala
@@ -11,44 +11,81 @@ import akka.cluster.sharding.typed.Murmur2NoEnvelopeMessageExtractor
 import akka.cluster.sharding.typed.scaladsl.ClusterSharding
 import akka.cluster.sharding.typed.scaladsl.Entity
 import akka.cluster.sharding.typed.scaladsl.EntityTypeKey
+import akka.actor.typed.delivery.ConsumerController
+import akka.actor.typed.delivery.ConsumerController.Start
+import akka.actor.typed.delivery.ConsumerController
+import akka.cluster.sharding.typed.delivery.ShardingConsumerController
+import akka.cluster.sharding.typed.ShardingEnvelope
+import akka.actor.typed.delivery.ConsumerController.SequencedMessage
+import akka.cluster.sharding.typed.Murmur2MessageExtractor
+import akka.actor.typed.delivery.ConsumerController.Confirmed
 
 object UserEvents {
 
-  val TypeKey: EntityTypeKey[UserEvents.Message] =
-    EntityTypeKey[UserEvents.Message]("user-processing")
+  val TypeKey: EntityTypeKey[SequencedMessage[UserEvents.Message]] =
+    EntityTypeKey[SequencedMessage[UserEvents.Message]]("user-processing")
 
   sealed trait Message extends CborSerializable {
     def userId: String
   }
   sealed trait UserEvent extends Message
-  case class UserAction(userId: String, description: String, replyTo: ActorRef[Done]) extends UserEvent
-  case class UserPurchase(userId: String, product: String, quantity: Long, priceInPence: Long, replyTo: ActorRef[Done])
+  case class UserAction(userId: String, description: String) extends UserEvent
+  case class UserPurchase(userId: String, product: String, quantity: Long, priceInPence: Long)
       extends UserEvent
 
   sealed trait UserQuery extends Message
   case class GetRunningTotal(userId: String, replyTo: ActorRef[RunningTotal]) extends UserQuery
 
+  final case class UserEventDelivery(message: Message, confirmTo: ActorRef[ConsumerController.Confirmed], seqNr: Long)
+
   case class RunningTotal(totalPurchases: Long, amountSpent: Long) extends CborSerializable
 
-  def apply(): Behavior[Message] = running(RunningTotal(0, 0))
+  def shardingInit(system: ActorSystem[_]): ActorRef[ShardingEnvelope[SequencedMessage[Message]]] = {
+    val processorConfig = ProcessorConfig(system.settings.config.getConfig("kafka-to-sharding-processor"))
+    val entity: Entity[SequencedMessage[Message], ShardingEnvelope[SequencedMessage[Message]]] = Entity[SequencedMessage[Message]](TypeKey)(_ => {
+      ShardingConsumerController(controller => UserEvents(controller))
+    })
+    val entityWithExtractor =
+      entity
+        .withAllocationStrategy(new ExternalShardAllocationStrategy(system, TypeKey.name))
+        .withMessageExtractor(new Murmur2MessageExtractor[SequencedMessage[Message]](processorConfig.nrPartitions))
+        .withSettings(ClusterShardingSettings(system))
+
+    ClusterSharding(system).init(entityWithExtractor)
+  }
+
+  def apply(controller: ActorRef[ConsumerController.Start[Message]]): Behavior[UserEventDelivery] = {
+    Behaviors.setup { ctx =>
+      val messageAdapter: ActorRef[ConsumerController.Delivery[Message]] =
+        ctx.messageAdapter(d => UserEventDelivery(d.msg, d.confirmTo, d.seqNr))
+      controller ! Start(messageAdapter)
+      running(RunningTotal(0, 0))
+    }
+  }
 
-  private def running(runningTotal: RunningTotal): Behavior[Message] = {
+  private def running(runningTotal: RunningTotal): Behavior[UserEventDelivery] = {
     Behaviors.setup { ctx =>
-      Behaviors.receiveMessage[Message] {
-        case UserAction(_, desc, ack) =>
-          ctx.log.info("user event {}", desc)
-          ack.tell(Done)
-          Behaviors.same
-        case UserPurchase(id, product, quantity, price, ack) =>
-          ctx.log.info("user {} purchase {}, quantity {}, price {}", id, product, quantity, price)
-          ack.tell(Done)
-          running(
-            runningTotal.copy(
-              totalPurchases = runningTotal.totalPurchases + 1,
-              amountSpent = runningTotal.amountSpent + (quantity * price)))
-        case GetRunningTotal(_, replyTo) =>
-          replyTo ! runningTotal
-          Behaviors.same
+      Behaviors.receiveMessage {
+        case UserEventDelivery(msg, confirmTo, seqNr) =>
+          msg match {
+            case UserAction(_, desc) =>
+              ctx.log.info("user event {}", desc)
+              confirmTo ! Confirmed(seqNr)
+              Behaviors.same
+            case UserPurchase(id, product, quantity, price) =>
+              ctx.log.info("user {} purchase {}, quantity {}, price {}", id, product, quantity, price)
+              confirmTo ! Confirmed(seqNr)
+              running(
+                runningTotal.copy(
+                  totalPurchases = runningTotal.totalPurchases + 1,
+                  amountSpent = runningTotal.amountSpent + (quantity * price)
+                )
+              )
+            case GetRunningTotal(_, replyTo) =>
+              replyTo ! runningTotal
+              confirmTo ! Confirmed(seqNr)
+              Behaviors.same
+          }
       }
     }
   }
@@ -58,20 +95,20 @@ object UserEvents {
    * have keys that are strings
    */
   class UserIdMessageExtractor(nrKafkaPartitions: Int)
-      extends Murmur2NoEnvelopeMessageExtractor[Message](nrKafkaPartitions) {
-    override def entityId(message: Message): String = message.userId
+      extends Murmur2NoEnvelopeMessageExtractor[SequencedMessage[Message]](nrKafkaPartitions) {
+    override def entityId(message: SequencedMessage[Message]): String = message.msg.userId
   }
 
+  /*
   def init(system: ActorSystem[_]): ActorRef[Message] = {
     val processorConfig = ProcessorConfig(system.settings.config.getConfig("kafka-to-sharding-processor"))
     ClusterSharding(system).init(
       Entity(TypeKey)(createBehavior = _ => UserEvents())
         .withAllocationStrategy(new ExternalShardAllocationStrategy(system, TypeKey.name))
        .withMessageExtractor(new UserIdMessageExtractor(processorConfig.nrPartitions))
-        .withSettings(ClusterShardingSettings(system)))
+        .withSettings(ClusterShardingSettings(system))
+    )
   }
+   */
 
-  def querySide(system: ActorSystem[_]): ActorRef[UserQuery] = {
-    init(system).narrow[UserQuery]
-  }
 }
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
index 8c69c83..8f15bb2 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala
@@ -14,20 +14,35 @@ import akka.kafka.scaladsl.Consumer
 import akka.stream.scaladsl.Sink
 import akka.stream.scaladsl.Source
 import akka.util.Timeout
+import akka.cluster.sharding.typed.delivery._
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.clients.consumer.ConsumerRecord
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.kafka.common.serialization.StringDeserializer
 import sample.sharding.kafka.serialization.UserPurchaseProto
 
-import scala.concurrent.ExecutionContextExecutor
-import scala.concurrent.Future
+import scala.concurrent.{ExecutionContextExecutor, Future, Promise}
 import scala.concurrent.duration._
 import scala.util.Try
+import akka.cluster.sharding.typed.ShardingMessageExtractor
+import akka.stream.stage.{GraphStage, GraphStageLogic, GraphStageLogicWithLogging, GraphStageWithMaterializedValue, InHandler, OutHandler}
+import akka.stream.FlowShape
+import akka.cluster.sharding.typed.ShardingEnvelope
+import akka.stream.Inlet
+import akka.stream.Outlet
+import akka.stream.Attributes
+import akka.actor.ActorRef
+import akka.cluster.sharding.typed.delivery.ShardingProducerController.RequestNext
+import akka.stream.SinkShape
+import akka.NotUsed
+import akka.stream.scaladsl.Keep
+
+import scala.util.control.NonFatal
 
 object UserEventsKafkaProcessor {
 
   sealed trait Command
+
   private case class KafkaConsumerStopped(reason: Try[Any]) extends Command
 
   def apply(): Behavior[Nothing] = {
@@ -40,7 +55,14 @@ object UserEventsKafkaProcessor {
       // TODO config
       val timeout = Timeout(3.seconds)
       val rebalancerRef = ctx.spawn(TopicListener(UserEvents.TypeKey), "rebalancerRef")
-      val shardRegion = UserEvents.init(ctx.system)
+
+      // FIXME
+      val shardRegion = UserEvents.shardingInit(ctx.system)
+
+      val shardingProducerController = ctx.spawn(
+        ShardingProducerController[UserEvents.Message]("producer-id", shardRegion, None),
+        s"shardingController"
+      )
       val consumerSettings =
         ConsumerSettings(ctx.system.toClassic, new StringDeserializer, new ByteArrayDeserializer)
           .withBootstrapServers(processorSettings.bootstrapServers)
@@ -53,32 +75,27 @@
       val kafkaConsumer: Source[ConsumerRecord[String, Array[Byte]], Consumer.Control] =
         Consumer.plainSource(consumerSettings, subscription)
 
-      // TODO use committable source and reliable delivery (once released)?
-      val stream: Future[Done] = kafkaConsumer
+      val stream = kafkaConsumer
        .log("kafka-consumer")
        .filter(_.key() != null) // no entity id
-        .mapAsync(20) { record =>
+        .map { record =>
          // alternatively the user id could be in the message rather than use the kafka key
          ctx.log.info(s"entityId->partition ${record.key()}->${record.partition()}")
          ctx.log.info("Forwarding message for entity {} to cluster sharding", record.key())
-          // idempotency?
-          retry(
-            () =>
-              shardRegion.ask[Done](replyTo => {
-                val purchaseProto = UserPurchaseProto.parseFrom(record.value())
-                UserEvents.UserPurchase(
-                  purchaseProto.userId,
-                  purchaseProto.product,
-                  purchaseProto.quantity,
-                  purchaseProto.price,
-                  replyTo)
-              })(timeout, ctx.system.scheduler),
-            3,
-            1.second)
+          val purchaseProto = UserPurchaseProto.parseFrom(record.value())
+          ShardingEnvelope[UserEvents.Message](purchaseProto.userId,
+            UserEvents.UserPurchase(
+              purchaseProto.userId,
+              purchaseProto.product,
+              purchaseProto.quantity,
+              purchaseProto.price
+            ))
        }
-        .runWith(Sink.ignore)
+        .toMat(ShardingToReliableDelivery.sink(shardingProducerController))(Keep.right)
+        .run()
 
      stream.onComplete { result =>
+        println("Stream finished " + result)
        ctx.self ! KafkaConsumerStopped(result)
      }
      Behaviors.receiveMessage[Command] {
@@ -89,5 +106,125 @@
         }
       }
       .narrow
   }
+}
+
+object ShardingToReliableDelivery {
+  def sink[M](
+      producerController: akka.actor.typed.ActorRef[ShardingProducerController.Command[M]]
+  ): Sink[ShardingEnvelope[M], Future[Done]] = {
+    Sink.fromGraph(new ShardingToReliableDeliveryStage(producerController))
+  }
+}
+
+/**
+ * Forwards messages to sharded entities via reliable delivery.
+ *
+ * Will keep requesting messages until there is a max number of messages buffered or
+ * a single entity has too many buffered messages
+ *
+ * Should one slow entity be allowed to slow everything down? Option to start dropping for a given entity?
+ */
+class ShardingToReliableDeliveryStage[M](
+    producerController: akka.actor.typed.ActorRef[ShardingProducerController.Command[M]]
+) extends GraphStageWithMaterializedValue[SinkShape[ShardingEnvelope[M]], Future[Done]] {
+
+  val in = Inlet[ShardingEnvelope[M]]("ShardingToReliableDelivery.in")
+
+  val shape = SinkShape.of(in)
+
+  val MaxBufferedMessages = 1000
+  val MaxBufferedForSingleEntity = 100
+
+  override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, Future[Done]) = {
+    val promise = Promise[Done]()
+    val logic = new GraphStageLogicWithLogging(shape) {
+      var currentRequest: Option[RequestNext[M]] = None
+
+      private def receive(message: (ActorRef, Any)): Unit = message match {
+        case (_, rn: RequestNext[M]) =>
+          try {
+            currentRequest = Some(rn.asInstanceOf[RequestNext[M]])
+            pullIfShardingDemand()
+          } catch {
+            case NonFatal(t) =>
+              failStage(t)
+              promise.tryFailure(t)
+          }
+        case msg =>
+          log.warning("unexpected message to stage actor {}", msg)
+      }
+
+      override def preStart(): Unit = {
+        import akka.actor.typed.scaladsl.adapter._
+        val stageActor: ActorRef = getStageActor(receive).ref
+        val asTyped = stageActor.toTyped[ShardingProducerController.RequestNext[M]]
+        producerController ! ShardingProducerController.Start(asTyped)
+        pull(in)
+      }
+
+      override def postStop(): Unit = {
+        super.postStop()
+        promise.trySuccess(Done)
+      }
+
+      /**
+       * Enforces that max total buffered messages or max for a signle
+       * entity.
+       */
+      private def reliableDeliveryFull(): Boolean = {
+        currentRequest.exists(request => {
+          val entityTooMany = request.bufferedForEntitiesWithoutDemand.find(_._2 >= MaxBufferedForSingleEntity)
+          if (entityTooMany.isDefined) {
+            log.info("Entity {} has max buffered messages. Not producing demand.", entityTooMany.get._1)
+            true
+          } else if (request.bufferedForEntitiesWithoutDemand.values.sum > MaxBufferedMessages) {
+            log.info("Max number of messages in flight to reliable delivery")
+            true
+          } else {
+            false
+          }
+        })
+      }
+
+      /**
+       * Pulls a request has been received from reliable delivery, and that request
+       * has not hit max buffers.
+       */
+      def pullIfShardingDemand(): Unit = {
+        if (currentRequest.isDefined && !reliableDeliveryFull() && !hasBeenPulled(in)) {
+          pull(in)
+        }
+      }
+
+      setHandler(
+        in,
+        new InHandler {
+          override def onUpstreamFinish(): Unit = {
+            log.info("Upstream finished")
+            super.onUpstreamFinish()
+            promise.trySuccess(Done)
+          }
+
+          override def onUpstreamFailure(ex: Throwable): Unit = {
+            log.error("Upstream failed", ex)
+            super.onUpstreamFailure(ex)
+            promise.tryFailure(ex)
+          }
+
+          override def onPush(): Unit = {
+            log.info("onPush")
+            // there should be demand from sharding and downstream, otherwise there would not have been a pull
+            val next = grab(in)
+            currentRequest match {
+              case None => throw new IllegalStateException("onPush called when no demand from sharding")
+              case Some(request) => request.sendNextTo ! next
+            }
+            pullIfShardingDemand()
+          }
+        }
+      )
+    }
+    (logic, promise.future)
+  }
 }
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala
index adfaf67..759f0aa 100644
--- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala
+++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala
@@ -2,6 +2,7 @@ package sample.sharding.kafka
 
 import akka.actor.typed.ActorRef
 import akka.actor.typed.ActorSystem
+import akka.actor.typed.delivery.ConsumerController.SequencedMessage
 import akka.actor.typed.scaladsl.AskPattern._
 import akka.util.Timeout
 import sample.sharding.kafka.UserEvents.GetRunningTotal
@@ -9,6 +10,7 @@ import sample.sharding.kafka.UserEvents.RunningTotal
 
 import scala.concurrent.Future
 import scala.concurrent.duration._
+import akka.cluster.sharding.typed.ShardingEnvelope
 
 class UserGrpcService(system: ActorSystem[_]) extends UserService {
 
@@ -16,11 +18,11 @@ class UserGrpcService(system: ActorSystem[_]) extends UserService {
   implicit val sched = system.scheduler
   implicit val ec = system.executionContext
 
-  private val shardRegion: ActorRef[UserEvents.UserQuery] = UserEvents.querySide(system)
-
-  override def userStats(in: UserStatsRequest): Future[UserStatsResponse] = {
-    shardRegion
-      .ask[RunningTotal](replyTo => GetRunningTotal(in.id, replyTo))
-      .map(runningTotal => UserStatsResponse(in.id, runningTotal.totalPurchases, runningTotal.amountSpent))
+  override def userStats(in: UserStatsRequest): Future[UserStatsResponse] = {
+    // shardRegion
+    //   .ask[RunningTotal](replyTo => GetRunningTotal(in.id, replyTo))
+    //   .map(runningTotal => UserStatsResponse(in.id, runningTotal.totalPurchases, runningTotal.amountSpent))
+    ???
   }
+
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
