This is an automated email from the ASF dual-hosted git repository. fanningpj pushed a commit to branch wip-seglo-kafka-sharding in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git
commit 7aa68a292e0b20cb14584849e3f9ac149623239d Author: Sean Glover <[email protected]> AuthorDate: Wed Feb 26 13:40:02 2020 -0500 Create as extension --- akka-sample-kafka-to-sharding-scala/README.md | 6 +- .../scala/akka/kafka/KafkaClusterSharding.scala | 129 ++++++++++++++------- .../scala/sample/sharding/kafka/UserEvents.scala | 3 +- .../sharding/kafka/UserEventsKafkaProcessor.scala | 2 +- 4 files changed, 95 insertions(+), 45 deletions(-) diff --git a/akka-sample-kafka-to-sharding-scala/README.md b/akka-sample-kafka-to-sharding-scala/README.md index 20ccf8b..732db5b 100644 --- a/akka-sample-kafka-to-sharding-scala/README.md +++ b/akka-sample-kafka-to-sharding-scala/README.md @@ -46,8 +46,8 @@ sbt "kafka / run" In the Kafka server window you'll see the following when the server is ready: ``` -12:46:47.022 INFO [run-main-0 ] sample.sharding.embeddedkafka.Main$ Kafka running on port '9092' -12:46:47.022 INFO [run-main-0 ] sample.sharding.embeddedkafka.Main$ Topic 'user-events' with '128' partitions created +12:06:59.711 INFO [run-main-0 ] s.s.embeddedkafka.KafkaBroker$ Kafka running: localhost:9092 +12:06:59.711 INFO [run-main-0 ] s.s.embeddedkafka.KafkaBroker$ Topic 'user-events' with 128 partitions created ``` If you want to use a different Kafka cluster then then update the `applications.conf`s in each project to point to your @@ -183,7 +183,7 @@ the correct node even if that moves due to a kafka rebalance. A gRPC client is included which can be started with... ``` -sbt "client/run" +sbt "client / run" ``` It assumes there is one of the nodes running its front end port on 8081. 
diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala index 2fbaa7f..6c981d0 100644 --- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala +++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala @@ -5,12 +5,13 @@ import java.util.concurrent.atomic.AtomicInteger import akka.actor.typed.Behavior import akka.actor.typed.scaladsl.adapter._ import akka.actor.typed.scaladsl.Behaviors -import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem} +import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem, Extension, ExtensionId} import akka.annotation.{ApiMayChange, InternalApi} import akka.cluster.sharding.external.ExternalShardAllocation import akka.cluster.sharding.typed.scaladsl.EntityTypeKey import akka.cluster.sharding.typed.{ShardingEnvelope, ShardingMessageExtractor} import akka.cluster.typed.Cluster +import akka.kafka import akka.kafka.scaladsl.MetadataClient import akka.util.Timeout._ import org.apache.kafka.common.utils.Utils @@ -20,10 +21,14 @@ import scala.concurrent.{ExecutionContextExecutor, Future} import scala.util.{Failure, Success} /** - * Utilities to enable Akka Cluster External Sharding with Alpakka Kafka. + * Akka Extension to enable Akka Cluster External Sharding with Alpakka Kafka. */ -object KafkaClusterSharding { +class KafkaClusterSharding(system: ExtendedActorSystem) extends Extension { + import KafkaClusterSharding._ + private val metadataActorCounter = new AtomicInteger + private var _messageExtractor: Option[KafkaShardingMessageExtractor[_]] = None + private var _messageExtractorNoEnvelope: Option[KafkaShardingNoEnvelopeExtractor[_]] = None /** * API MAY CHANGE @@ -39,12 +44,33 @@ object KafkaClusterSharding { * that entities are routed to the same Entity type. 
*/ @ApiMayChange - def messageExtractor[M](system: ActorSystem, - topic: String, + def messageExtractor[M](topic: String, timeout: FiniteDuration, - settings: ConsumerSettings[_,_]): Future[KafkaShardingMessageExtractor[M]] = - getPartitionCount(system, topic, timeout, settings) - .map(kafkaPartitions => new KafkaShardingMessageExtractor[M](kafkaPartitions))(system.dispatcher) + settings: ConsumerSettings[_,_]): Future[KafkaShardingMessageExtractor[M]] = _messageExtractor match { + case Some(extractor) => Future.successful(extractor.asInstanceOf[KafkaShardingMessageExtractor[M]]) + case _ => + getPartitionCount(topic, timeout, settings) + .map { kafkaPartitions => + val extractor = new KafkaShardingMessageExtractor[M](kafkaPartitions) + _messageExtractor = Some(extractor) + extractor + }(system.dispatcher) + } + + /** + * API MAY CHANGE + * + * Asynchronously return a [[ShardingMessageExtractor]] with a default hashing strategy based on Apache Kafka's + * [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]]. + * + * The number of partitions to use with the hashing strategy is provided explicitly with [[kafkaPartitions]]. + * + * All topics used in a Consumer [[Subscription]] must contain the same number of partitions to ensure + * that entities are routed to the same Entity type. + */ + @ApiMayChange + def messageExtractor[M](kafkaPartitions: Int): Future[KafkaShardingMessageExtractor[M]] = + Future.successful(new KafkaShardingMessageExtractor[M](kafkaPartitions)) /** * API MAY CHANGE @@ -61,19 +87,39 @@ object KafkaClusterSharding { * that entities are routed to the same Entity type. 
*/ @ApiMayChange - def messageExtractorNoEnvelope[M](system: ActorSystem, - topic: String, + def messageExtractorNoEnvelope[M](topic: String, timeout: FiniteDuration, entityIdExtractor: M => String, - settings: ConsumerSettings[_,_]): Future[KafkaShardingNoEnvelopeExtractor[M]] = - getPartitionCount(system, topic, timeout, settings) - .map(kafkaPartitions => new KafkaShardingNoEnvelopeExtractor[M](kafkaPartitions, entityIdExtractor))(system.dispatcher) + settings: ConsumerSettings[_,_]): Future[KafkaShardingNoEnvelopeExtractor[M]] = _messageExtractorNoEnvelope match { + case Some(extractor) => Future.successful(extractor.asInstanceOf[KafkaShardingNoEnvelopeExtractor[M]]) + case _ => + getPartitionCount(topic, timeout, settings) + .map { kafkaPartitions => + val extractor = new KafkaShardingNoEnvelopeExtractor[M](kafkaPartitions, entityIdExtractor) + _messageExtractorNoEnvelope = Some(extractor) + extractor + }(system.dispatcher) + } - private def getPartitionCount[M](system: ActorSystem, topic: String, timeout: FiniteDuration, settings: ConsumerSettings[_, _]): Future[Int] = { + /** + * API MAY CHANGE + * + * Asynchronously return a [[ShardingMessageExtractor]] with a default hashing strategy based on Apache Kafka's + * [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]]. + * + * The number of partitions to use with the hashing strategy is provided explicitly with [[kafkaPartitions]]. + * + * All topics used in a Consumer [[Subscription]] must contain the same number of partitions to ensure + * that entities are routed to the same Entity type. 
+ */ + @ApiMayChange + def messageExtractorNoEnvelope[M](kafkaPartitions: Int, entityIdExtractor: M => String): Future[KafkaShardingNoEnvelopeExtractor[M]] = + Future.successful(new KafkaShardingNoEnvelopeExtractor[M](kafkaPartitions, entityIdExtractor)) + + private def getPartitionCount[M](topic: String, timeout: FiniteDuration, settings: ConsumerSettings[_, _]): Future[Int] = { implicit val ec: ExecutionContextExecutor = system.dispatcher val actorNum = metadataActorCounter.getAndIncrement() val consumerActor = system - .asInstanceOf[ExtendedActorSystem] .systemActorOf(KafkaConsumerActor.props(settings), s"metadata-consumer-actor-$actorNum") val metadataClient = MetadataClient.create(consumerActor, timeout) val numPartitions = metadataClient.getPartitionsFor(topic).map(_.length) @@ -83,30 +129,6 @@ object KafkaClusterSharding { } } - @InternalApi - sealed trait KafkaClusterSharding { - def kafkaPartitions: Int - def shardId(entityId: String): String = { - // simplified version of Kafka's `DefaultPartitioner` implementation - val partition = org.apache.kafka.common.utils.Utils.toPositive(Utils.murmur2(entityId.getBytes())) % kafkaPartitions - partition.toString - } - } - - @InternalApi - final class KafkaShardingMessageExtractor[M] private[kafka](val kafkaPartitions: Int) - extends ShardingMessageExtractor[ShardingEnvelope[M], M] with KafkaClusterSharding { - override def entityId(envelope: ShardingEnvelope[M]): String = envelope.entityId - override def unwrapMessage(envelope: ShardingEnvelope[M]): M = envelope.message - } - - @InternalApi - final class KafkaShardingNoEnvelopeExtractor[M] private[kafka](val kafkaPartitions: Int, entityIdExtractor: M => String) - extends ShardingMessageExtractor[M, M] with KafkaClusterSharding { - override def entityId(message: M): String = entityIdExtractor(message) - override def unwrapMessage(message: M): M = message - } - // TODO: will require `akka-actors-typed` as a provided dep /** * API MAY CHANGE @@ -158,4 +180,33 @@ 
object KafkaClusterSharding { .systemActorOf(actor, "kafka-cluster-sharding-rebalance-listener") .toClassic } +} + +object KafkaClusterSharding extends ExtensionId[KafkaClusterSharding] { + @InternalApi + sealed trait KafkaClusterShardingContract { + def kafkaPartitions: Int + def shardId(entityId: String): String = { + // simplified version of Kafka's `DefaultPartitioner` implementation + val partition = org.apache.kafka.common.utils.Utils.toPositive(Utils.murmur2(entityId.getBytes())) % kafkaPartitions + partition.toString + } + } + + @InternalApi + final class KafkaShardingMessageExtractor[M] private[kafka](val kafkaPartitions: Int) + extends ShardingMessageExtractor[ShardingEnvelope[M], M] with KafkaClusterShardingContract { + override def entityId(envelope: ShardingEnvelope[M]): String = envelope.entityId + override def unwrapMessage(envelope: ShardingEnvelope[M]): M = envelope.message + } + + @InternalApi + final class KafkaShardingNoEnvelopeExtractor[M] private[kafka](val kafkaPartitions: Int, entityIdExtractor: M => String) + extends ShardingMessageExtractor[M, M] with KafkaClusterShardingContract { + override def entityId(message: M): String = entityIdExtractor(message) + override def unwrapMessage(message: M): M = message + } + + override def createExtension(system: ExtendedActorSystem): kafka.KafkaClusterSharding = + new KafkaClusterSharding(system) } \ No newline at end of file diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala index 1cfa62d..763bc49 100644 --- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala +++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala @@ -58,8 +58,7 @@ object UserEvents { */ def messageExtractor(system: ActorSystem[_]): 
Future[KafkaClusterSharding.KafkaShardingNoEnvelopeExtractor[Message]] = { val processorConfig = ProcessorConfig(system.settings.config.getConfig("kafka-to-sharding-processor")) - KafkaClusterSharding.messageExtractorNoEnvelope( - system = system.toClassic, + KafkaClusterSharding(system.toClassic).messageExtractorNoEnvelope( timeout = 10.seconds, topic = processorConfig.topics.head, entityIdExtractor = (msg: Message) => msg.userId, diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala index 43ad4bb..6c8d4c0 100644 --- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala +++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEventsKafkaProcessor.scala @@ -40,7 +40,7 @@ object UserEventsKafkaProcessor { // TODO config val timeout = Timeout(3.seconds) val typeKey = EntityTypeKey[UserEvents.Message](processorSettings.groupId) - val rebalanceListener = KafkaClusterSharding.rebalanceListener(classic, typeKey) + val rebalanceListener = KafkaClusterSharding(classic).rebalanceListener(classic, typeKey) val shardRegion = UserEvents.init(ctx.system, extractor, processorSettings.groupId) val consumerSettings = ConsumerSettings(classic, new StringDeserializer, new ByteArrayDeserializer) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
