This is an automated email from the ASF dual-hosted git repository. fanningpj pushed a commit to branch wip-seglo-kafka-sharding in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git
commit f53f3b0a0adbc72aec268fc456f741676afa45f9 Author: Sean Glover <[email protected]> AuthorDate: Tue Feb 18 17:15:27 2020 -0500 RebalanceListener --- .../scala/akka/kafka/KafkaClusterSharding.scala | 131 ++++++++++++++++----- .../main/scala/sample/sharding/kafka/Main.scala | 12 +- .../sample/sharding/kafka/TopicListener.scala | 54 --------- .../scala/sample/sharding/kafka/UserEvents.scala | 23 ++-- .../sharding/kafka/UserEventsKafkaProcessor.scala | 11 +- .../sample/sharding/kafka/UserGrpcService.scala | 4 +- 6 files changed, 127 insertions(+), 108 deletions(-) diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala index 5562c42..5bbd885 100644 --- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala +++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala @@ -2,34 +2,64 @@ package akka.kafka import java.util.concurrent.atomic.AtomicInteger +import akka.actor.typed.Behavior +import akka.actor.typed.scaladsl.Behaviors import akka.actor.{ActorSystem, ExtendedActorSystem} +import akka.cluster.sharding.external.ExternalShardAllocation +import akka.cluster.sharding.typed.scaladsl.EntityTypeKey import akka.cluster.sharding.typed.{ShardingEnvelope, ShardingMessageExtractor} +import akka.cluster.typed.Cluster import akka.kafka.scaladsl.MetadataClient import akka.util.Timeout._ import org.apache.kafka.common.utils.Utils -import scala.concurrent.{ExecutionContextExecutor, Future} import scala.concurrent.duration._ +import scala.concurrent.{ExecutionContextExecutor, Future} +import scala.util.{Failure, Success} +/** + * Utilities to enable Akka Cluster External Sharding with Alpakka Kafka. 
+ */ object KafkaClusterSharding { private val metadataActorCounter = new AtomicInteger + /** + * Asynchronously return a [[ShardingMessageExtractor]] with a default hashing strategy based on Apache Kafka's + * [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]]. + * + * The number of partitions to use with the hashing strategy will be automatically determined by querying the Kafka + * cluster for the number of partitions of a user provided [[topic]]. Use the [[settings]] parameter to configure + * the Kafka Consumer connection required to retrieve the number of partitions. + * + * _Important_: All topics used in a Consumer [[Subscription]] must contain the same number of partitions to ensure + * that entities are routed to the same Entity type. + */ def messageExtractor[M](system: ActorSystem, - groupId: String, topic: String, timeout: FiniteDuration, settings: ConsumerSettings[_,_]): Future[KafkaShardingMessageExtractor[M]] = getPartitionCount(system, topic, timeout, settings) - .map(kafkaPartitions => new KafkaShardingMessageExtractor[M](groupId, kafkaPartitions))(system.dispatcher) + .map(kafkaPartitions => new KafkaShardingMessageExtractor[M](kafkaPartitions))(system.dispatcher) + /** + * Asynchronously return a [[ShardingMessageExtractor]] with a default hashing strategy based on Apache Kafka's + * [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]]. + * + * The number of partitions to use with the hashing strategy will be automatically determined by querying the Kafka + * cluster for the number of partitions of a user provided [[topic]]. Use the [[settings]] parameter to configure + * the Kafka Consumer connection required to retrieve the number of partitions. Use the [[entityIdExtractor]] to pick + * a field from the Entity to use as the entity id for the hashing strategy. 
+ * + * _Important_: All topics used in a Consumer [[Subscription]] must contain the same number of partitions to ensure + * that entities are routed to the same Entity type. + */ def messageExtractorNoEnvelope[M](system: ActorSystem, - groupId: String, topic: String, timeout: FiniteDuration, entityIdExtractor: M => String, settings: ConsumerSettings[_,_]): Future[KafkaShardingNoEnvelopeExtractor[M]] = getPartitionCount(system, topic, timeout, settings) - .map(kafkaPartitions => new KafkaShardingNoEnvelopeExtractor[M](groupId, kafkaPartitions, entityIdExtractor))(system.dispatcher) + .map(kafkaPartitions => new KafkaShardingNoEnvelopeExtractor[M](kafkaPartitions, entityIdExtractor))(system.dispatcher) private def getPartitionCount[M](system: ActorSystem, topic: String, timeout: FiniteDuration, settings: ConsumerSettings[_, _]): Future[Int] = { implicit val ec: ExecutionContextExecutor = system.dispatcher @@ -44,35 +74,72 @@ object KafkaClusterSharding { count } } -} -trait KafkaClusterSharding { - def groupId: String - def kafkaPartitions: Int + sealed trait KafkaClusterSharding { + def kafkaPartitions: Int + def shardId(entityId: String): String = { + // simplified version of Kafka's `DefaultPartitioner` implementation + val partition = org.apache.kafka.common.utils.Utils.toPositive(Utils.murmur2(entityId.getBytes())) % kafkaPartitions + partition.toString + } + } - def shardId(entityId: String): String = { - // simplified version of Kafka's `DefaultPartitioner` implementation - val partition = org.apache.kafka.common.utils.Utils.toPositive(Utils.murmur2(entityId.getBytes())) % kafkaPartitions - s"$groupId-$partition" + final class KafkaShardingMessageExtractor[M](val kafkaPartitions: Int) + extends ShardingMessageExtractor[ShardingEnvelope[M], M] with KafkaClusterSharding { + override def entityId(envelope: ShardingEnvelope[M]): String = envelope.entityId + override def unwrapMessage(envelope: ShardingEnvelope[M]): M = envelope.message } -} -class 
KafkaShardingMessageExtractor[M](val groupId: String, val kafkaPartitions: Int) - extends ShardingMessageExtractor[ShardingEnvelope[M], M] with KafkaClusterSharding { - override def entityId(envelope: ShardingEnvelope[M]): String = envelope.entityId - override def unwrapMessage(envelope: ShardingEnvelope[M]): M = envelope.message -} + final class KafkaShardingNoEnvelopeExtractor[M](val kafkaPartitions: Int, entityIdExtractor: M => String) + extends ShardingMessageExtractor[M, M] with KafkaClusterSharding { + override def entityId(message: M): String = entityIdExtractor(message) + override def unwrapMessage(message: M): M = message + } -/** - * Caveats - * - If Consumer subscription contains multiple topics, each topic has the exact same number of partitions. - * - Values are passed as `null` to the partitioner. - * - A fake [[org.apache.kafka.common.Cluster]] is passed to the [[org.apache.kafka.clients.producer.Partitioner]] that - * only contains partitions for the provided topic. If you choose to reuse a different partitioner then make sure your - * partitioner doesn't make use of any other Kafka Cluster metadata. - */ -class KafkaShardingNoEnvelopeExtractor[M](val groupId: String, val kafkaPartitions: Int, entityIdExtractor: M => String) - extends ShardingMessageExtractor[M, M] with KafkaClusterSharding { - override def entityId(message: M): String = entityIdExtractor(message) - override def unwrapMessage(message: M): M = message -} + // TODO: + // - will require `akka-actor-typed` as another provided dep, or should we just return a classic actor? + // - returning a typed actor is more flexible for the user so that they can easily create it under the `user` guardian + // when running Akka Typed. an alternative would be to create the actor ourselves as a system actor, like is done with + // the KafkaConsumerActor for the metadata client.
+ /** + * The [[RebalanceListener]] handles [[TopicPartitionsAssigned]] events created by the Kafka consumer group rebalance + * listener. As partitions are assigned to this consumer group member we update the External Sharding strategy + * accordingly so that entities are (eventually) routed to the local Akka cluster member. + */ + object RebalanceListener { + def apply(typeKey: EntityTypeKey[_]): Behavior[ConsumerRebalanceEvent] = + Behaviors.setup { ctx => + val typeKeyName = typeKey.name + val shardAllocationClient = ExternalShardAllocation(ctx.system).clientFor(typeKeyName) + val address = Cluster(ctx.system).selfMember.address + Behaviors.receive[ConsumerRebalanceEvent] { + case (ctx, TopicPartitionsAssigned(_, partitions)) => + import ctx.executionContext + val partitionsList = partitions.mkString(",") + ctx.log.info("Consumer group '{}' is assigning topic partitions to cluster member '{}': [{}]", + typeKeyName, address, partitionsList) + val updates = partitions.map { tp => + val shardId = tp.partition().toString + // Kafka partition number becomes the akka shard id + // TODO: support assigning more than 1 shard id at once? + shardAllocationClient.updateShardLocation(shardId, address) + } + // TODO: pipeToSelf since we're closing over local state? + Future + .sequence(updates) + // each Future returns successfully once a majority of cluster nodes receive the update. + // there's no point blocking here because the rebalance listener is triggered asynchronously. 
if we want + // to block rebalances then we should provide an implementation using the `PartitionAssignmentHandler` instead + .onComplete { + case Success(_) => + ctx.log.info("Completed consumer group '{}' assignment of topic partitions to cluster member '{}': [{}]", + typeKeyName, address, partitionsList) + case Failure(ex) => + ctx.log.error("A failure occurred while updating cluster shards", ex) + } + Behaviors.same + case (_, TopicPartitionsRevoked(_, _)) => Behaviors.same + } + } + } +} \ No newline at end of file diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala index a695853..53e9d4d 100644 --- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala +++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/Main.scala @@ -18,7 +18,7 @@ import scala.util.{Failure, Success} sealed trait Command case object NodeMemberUp extends Command case object StartProcessor extends Command -final case class MessageExtractor(strategy: ShardingMessageExtractor[UserEvents.Message, UserEvents.Message]) extends Command +final case class MessageExtractor(extractor: ShardingMessageExtractor[UserEvents.Message, UserEvents.Message]) extends Command object Main { def main(args: Array[String]): Unit = { @@ -57,12 +57,16 @@ object Main { ctx.self.tell(StartProcessor) starting(Some(extractor)) case (ctx, StartProcessor) if extractor.isDefined => - UserEvents.init(ctx.system, extractor.get) - val eventProcessor = ctx.spawn[Nothing](UserEventsKafkaProcessor(extractor.get), "kafka-event-processor") + val messageExtractor = extractor.get + val processorSettings = ProcessorConfig(ctx.system.settings.config.getConfig("kafka-to-sharding-processor")) + UserEvents.init(ctx.system, messageExtractor, processorSettings.groupId) + val eventProcessor = 
ctx.spawn[Nothing](UserEventsKafkaProcessor(messageExtractor), "kafka-event-processor") ctx.watch(eventProcessor) ctx.log.info("Processor started.") - val binding: Future[Http.ServerBinding] = startGrpc(ctx.system, frontEndPort, extractor.get) + val binding: Future[Http.ServerBinding] = startGrpc(ctx.system, frontEndPort, messageExtractor) running(binding, eventProcessor) + case (ctx, StartProcessor) => + Behaviors.same } def running(binding: Future[Http.ServerBinding], processor: ActorRef[Nothing]): Behavior[Command] = Behaviors diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/TopicListener.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/TopicListener.scala deleted file mode 100644 index 3b59790..0000000 --- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/TopicListener.scala +++ /dev/null @@ -1,54 +0,0 @@ -package sample.sharding.kafka - -import akka.actor.typed.Behavior -import akka.actor.typed.scaladsl.Behaviors -import akka.cluster.sharding.external._ -import akka.cluster.typed.Cluster -import akka.kafka.ConsumerRebalanceEvent -import akka.kafka.TopicPartitionsAssigned -import akka.kafka.TopicPartitionsRevoked -import akka.cluster.sharding.typed.scaladsl.EntityTypeKey - -import scala.concurrent.duration._ -import scala.util.Failure -import scala.util.Success - -object TopicListener { - def apply(groupId: String, typeKey: EntityTypeKey[_]): Behavior[ConsumerRebalanceEvent] = - Behaviors.setup { ctx => - import ctx.executionContext - val shardAllocationClient = ExternalShardAllocation(ctx.system).clientFor(typeKey.name) - ctx.system.scheduler.scheduleAtFixedRate(10.seconds, 20.seconds) { () => - shardAllocationClient.shardLocations().onComplete { - case Success(shardLocations) => - ctx.log.info("Current shard locations {}", shardLocations.locations) - case Failure(t) => - ctx.log.error("failed to get shard locations", t) - } - } - val 
address = Cluster(ctx.system).selfMember.address - Behaviors.receiveMessage[ConsumerRebalanceEvent] { - case TopicPartitionsAssigned(sub, partitions) => - // TODO - // - log all partitions assigned in one log line - // - block for shard allocation to complete, add configurable timeout - partitions.foreach(tp => { - val shardId = s"$groupId-${tp.partition()}" - ctx.log.info("Partition [{}] assigned to current node. Updating shard allocation", shardId) - // kafka partition becomes the akka shard - val done = shardAllocationClient.updateShardLocation(shardId, address) - done.onComplete { result => - ctx.log.info("Result for updating shard {}: {}", shardId, result) - } - - }) - Behaviors.same - case TopicPartitionsRevoked(_, topicPartitions) => - ctx.log.info( - "Partitions [{}] of group [{}] revoked from current node. New location will update shard allocation", - topicPartitions.mkString(","), - groupId) - Behaviors.same - } - } -} diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala index 69d815d..1cfa62d 100644 --- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala +++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala @@ -7,17 +7,13 @@ import akka.actor.typed.{ActorRef, ActorSystem, Behavior} import akka.cluster.sharding.external.ExternalShardAllocationStrategy import akka.cluster.sharding.typed.{ClusterShardingSettings, ShardingMessageExtractor} import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, Entity, EntityTypeKey} -import akka.kafka.{ConsumerSettings, KafkaClusterSharding, KafkaShardingNoEnvelopeExtractor} +import akka.kafka.{ConsumerSettings, KafkaClusterSharding} import org.apache.kafka.common.serialization.StringDeserializer import scala.concurrent.Future import 
scala.concurrent.duration._ object UserEvents { - - val TypeKey: EntityTypeKey[UserEvents.Message] = - EntityTypeKey[UserEvents.Message]("user-processing") - sealed trait Message extends CborSerializable { def userId: String } @@ -60,12 +56,11 @@ object UserEvents { * retrieve the number of partitions and use the same hashing algorithm used by the Apache Kafka * [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]] (murmur2) with Akka Cluster Sharding. */ - def messageExtractor(system: ActorSystem[_]): Future[KafkaShardingNoEnvelopeExtractor[Message]] = { + def messageExtractor(system: ActorSystem[_]): Future[KafkaClusterSharding.KafkaShardingNoEnvelopeExtractor[Message]] = { val processorConfig = ProcessorConfig(system.settings.config.getConfig("kafka-to-sharding-processor")) KafkaClusterSharding.messageExtractorNoEnvelope( system = system.toClassic, timeout = 10.seconds, - groupId = processorConfig.groupId, topic = processorConfig.topics.head, entityIdExtractor = (msg: Message) => msg.userId, settings = ConsumerSettings(system.toClassic, new StringDeserializer, new StringDeserializer) @@ -73,13 +68,17 @@ object UserEvents { ) } - def init(system: ActorSystem[_], messageExtractor: ShardingMessageExtractor[Message, Message]): ActorRef[Message] = + def init(system: ActorSystem[_], messageExtractor: ShardingMessageExtractor[Message, Message], groupId: String): ActorRef[Message] = { + // create an Akka Cluster Sharding `EntityTypeKey` for `UserEvents` for this Kafka consumer group + val typeKey = EntityTypeKey[UserEvents.Message](groupId) ClusterSharding(system).init( - Entity(TypeKey)(createBehavior = _ => UserEvents()) - .withAllocationStrategy(new ExternalShardAllocationStrategy(system, TypeKey.name)) + Entity(typeKey)(createBehavior = _ => UserEvents()) + // NOTE: why does `ExternalShardAllocationStrategy` not accept the type key type itself? 
+ .withAllocationStrategy(new ExternalShardAllocationStrategy(system, typeKey.name)) .withMessageExtractor(messageExtractor) .withSettings(ClusterShardingSettings(system))) + } - def querySide(system: ActorSystem[_], messageExtractor: ShardingMessageExtractor[Message, Message]): ActorRef[UserQuery] = - init(system, messageExtractor).narrow[UserQuery] + def querySide(system: ActorSystem[_], messageExtractor: ShardingMessageExtractor[Message, Message], groupId: String): ActorRef[UserQuery] = + init(system, messageExtractor, groupId).narrow[UserQuery] } diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala index 155a51c..941f177 100644 --- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala +++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala @@ -9,8 +9,8 @@ import akka.actor.typed.scaladsl.AskPattern._ import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.adapter._ import akka.cluster.sharding.typed.ShardingMessageExtractor -import akka.kafka.ConsumerSettings -import akka.kafka.Subscriptions +import akka.cluster.sharding.typed.scaladsl.EntityTypeKey +import akka.kafka.{ConsumerSettings, KafkaClusterSharding, Subscriptions} import akka.kafka.scaladsl.Consumer import akka.stream.scaladsl.Sink import akka.stream.scaladsl.Source @@ -40,8 +40,9 @@ object UserEventsKafkaProcessor { implicit val scheduler: Scheduler = classic.scheduler // TODO config val timeout = Timeout(3.seconds) - val rebalancerRef = ctx.spawn(TopicListener(processorSettings.groupId, UserEvents.TypeKey), "rebalancerRef") - val shardRegion = UserEvents.init(ctx.system, extractor) + val typeKey = EntityTypeKey[UserEvents.Message](processorSettings.groupId) + val 
rebalanceListener = ctx.spawn(KafkaClusterSharding.RebalanceListener(typeKey), "kafka-cluster-sharding-rebalance-listener") + val shardRegion = UserEvents.init(ctx.system, extractor, processorSettings.groupId) val consumerSettings = ConsumerSettings(ctx.system.toClassic, new StringDeserializer, new ByteArrayDeserializer) .withBootstrapServers(processorSettings.bootstrapServers) @@ -51,7 +52,7 @@ object UserEventsKafkaProcessor { val subscription = Subscriptions .topics(processorSettings.topics: _*) - .withRebalanceListener(rebalancerRef.toClassic) + .withRebalanceListener(rebalanceListener.toClassic) val kafkaConsumer: Source[ConsumerRecord[String, Array[Byte]], Consumer.Control] = Consumer.plainSource(consumerSettings, subscription) diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala index f3acb7c..3a3e808 100644 --- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala +++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserGrpcService.scala @@ -5,6 +5,7 @@ import akka.actor.typed.ActorSystem import akka.actor.typed.scaladsl.AskPattern._ import akka.cluster.sharding.typed.ShardingMessageExtractor import akka.util.Timeout +import com.typesafe.config.ConfigFactory import sample.sharding.kafka.UserEvents.GetRunningTotal import sample.sharding.kafka.UserEvents.RunningTotal @@ -17,7 +18,8 @@ class UserGrpcService(system: ActorSystem[_], extractor: ShardingMessageExtracto implicit val sched = system.scheduler implicit val ec = system.executionContext - private val shardRegion: ActorRef[UserEvents.UserQuery] = UserEvents.querySide(system, extractor) + private val processorSettings = ProcessorConfig(ConfigFactory.load().getConfig("kafka-to-sharding-processor")) + private val shardRegion: 
ActorRef[UserEvents.UserQuery] = UserEvents.querySide(system, extractor, processorSettings.groupId) override def userStats(in: UserStatsRequest): Future[UserStatsResponse] = { shardRegion --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
