This is an automated email from the ASF dual-hosted git repository. fanningpj pushed a commit to branch wip-seglo-kafka-sharding in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git
commit 27b6c800a34b66e4878562e9cd0ffe286550aa36 Author: Sean Glover <[email protected]> AuthorDate: Wed Feb 26 16:56:45 2020 -0500 Use Alpakka Kafka snapshot --- akka-sample-kafka-to-sharding-scala/build.sbt | 2 +- .../scala/akka/kafka/KafkaClusterSharding.scala | 212 --------------------- .../scala/sample/sharding/kafka/UserEvents.scala | 1 - 3 files changed, 1 insertion(+), 214 deletions(-) diff --git a/akka-sample-kafka-to-sharding-scala/build.sbt b/akka-sample-kafka-to-sharding-scala/build.sbt index 5a08bb7..1a3c8a7 100644 --- a/akka-sample-kafka-to-sharding-scala/build.sbt +++ b/akka-sample-kafka-to-sharding-scala/build.sbt @@ -1,5 +1,5 @@ val AkkaVersion = "2.6.3" -val AlpakkaKafkaVersion = "2.0.1" +val AlpakkaKafkaVersion = "2.0.2+2-88deb905+20200226-1650" val AkkaManagementVersion = "1.0.5" val AkkaHttpVersion = "10.1.11" val KafkaVersion = "2.4.0" diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala deleted file mode 100644 index 6c981d0..0000000 --- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaClusterSharding.scala +++ /dev/null @@ -1,212 +0,0 @@ -package akka.kafka - -import java.util.concurrent.atomic.AtomicInteger - -import akka.actor.typed.Behavior -import akka.actor.typed.scaladsl.adapter._ -import akka.actor.typed.scaladsl.Behaviors -import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem, Extension, ExtensionId} -import akka.annotation.{ApiMayChange, InternalApi} -import akka.cluster.sharding.external.ExternalShardAllocation -import akka.cluster.sharding.typed.scaladsl.EntityTypeKey -import akka.cluster.sharding.typed.{ShardingEnvelope, ShardingMessageExtractor} -import akka.cluster.typed.Cluster -import akka.kafka -import akka.kafka.scaladsl.MetadataClient -import akka.util.Timeout._ -import org.apache.kafka.common.utils.Utils - -import scala.concurrent.duration._ -import scala.concurrent.{ExecutionContextExecutor, Future} -import scala.util.{Failure, Success} - -/** - * Akka Extension to enable Akka Cluster External Sharding with Alpakka Kafka. - */ -class KafkaClusterSharding(system: ExtendedActorSystem) extends Extension { - import KafkaClusterSharding._ - - private val metadataActorCounter = new AtomicInteger - private var _messageExtractor: Option[KafkaShardingMessageExtractor[_]] = None - private var _messageExtractorNoEnvelope: Option[KafkaShardingNoEnvelopeExtractor[_]] = None - - /** - * API MAY CHANGE - * - * Asynchronously return a [[ShardingMessageExtractor]] with a default hashing strategy based on Apache Kafka's - * [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]]. - * - * The number of partitions to use with the hashing strategy will be automatically determined by querying the Kafka - * cluster for the number of partitions of a user provided [[topic]]. Use the [[settings]] parameter to configure - * the Kafka Consumer connection required to retrieve the number of partitions. - * - * All topics used in a Consumer [[Subscription]] must contain the same number of partitions to ensure - * that entities are routed to the same Entity type. - */ - @ApiMayChange - def messageExtractor[M](topic: String, - timeout: FiniteDuration, - settings: ConsumerSettings[_,_]): Future[KafkaShardingMessageExtractor[M]] = _messageExtractor match { - case Some(extractor) => Future.successful(extractor.asInstanceOf[KafkaShardingMessageExtractor[M]]) - case _ => - getPartitionCount(topic, timeout, settings) - .map { kafkaPartitions => - val extractor = new KafkaShardingMessageExtractor[M](kafkaPartitions) - _messageExtractor = Some(extractor) - extractor - }(system.dispatcher) - } - - /** - * API MAY CHANGE - * - * Asynchronously return a [[ShardingMessageExtractor]] with a default hashing strategy based on Apache Kafka's - * [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]]. - * - * The number of partitions to use with the hashing strategy is provided by explicitly with [[kafkaPartitions]]. - * - * All topics used in a Consumer [[Subscription]] must contain the same number of partitions to ensure - * that entities are routed to the same Entity type. - */ - @ApiMayChange - def messageExtractor[M](kafkaPartitions: Int): Future[KafkaShardingMessageExtractor[M]] = - Future.successful(new KafkaShardingMessageExtractor[M](kafkaPartitions)) - - /** - * API MAY CHANGE - * - * Asynchronously return a [[ShardingMessageExtractor]] with a default hashing strategy based on Apache Kafka's - * [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]]. - * - * The number of partitions to use with the hashing strategy will be automatically determined by querying the Kafka - * cluster for the number of partitions of a user provided [[topic]]. Use the [[settings]] parameter to configure - * the Kafka Consumer connection required to retrieve the number of partitions. Use the [[entityIdExtractor]] to pick - * a field from the Entity to use as the entity id for the hashing strategy. - * - * All topics used in a Consumer [[Subscription]] must contain the same number of partitions to ensure - * that entities are routed to the same Entity type. - */ - @ApiMayChange - def messageExtractorNoEnvelope[M](topic: String, - timeout: FiniteDuration, - entityIdExtractor: M => String, - settings: ConsumerSettings[_,_]): Future[KafkaShardingNoEnvelopeExtractor[M]] = _messageExtractorNoEnvelope match { - case Some(extractor) => Future.successful(extractor.asInstanceOf[KafkaShardingNoEnvelopeExtractor[M]]) - case _ => - getPartitionCount(topic, timeout, settings) - .map { kafkaPartitions => - val extractor = new KafkaShardingNoEnvelopeExtractor[M](kafkaPartitions, entityIdExtractor) - _messageExtractorNoEnvelope = Some(extractor) - extractor - }(system.dispatcher) - } - - /** - * API MAY CHANGE - * - * Asynchronously return a [[ShardingMessageExtractor]] with a default hashing strategy based on Apache Kafka's - * [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]]. - * - * The number of partitions to use with the hashing strategy is provided by explicitly with [[kafkaPartitions]]. - * - * All topics used in a Consumer [[Subscription]] must contain the same number of partitions to ensure - * that entities are routed to the same Entity type. - */ - @ApiMayChange - def messageExtractorNoEnvelope[M](kafkaPartitions: Int, entityIdExtractor: M => String): Future[KafkaShardingNoEnvelopeExtractor[M]] = - Future.successful(new KafkaShardingNoEnvelopeExtractor[M](kafkaPartitions, entityIdExtractor)) - - private def getPartitionCount[M](topic: String, timeout: FiniteDuration, settings: ConsumerSettings[_, _]): Future[Int] = { - implicit val ec: ExecutionContextExecutor = system.dispatcher - val actorNum = metadataActorCounter.getAndIncrement() - val consumerActor = system - .systemActorOf(KafkaConsumerActor.props(settings), s"metadata-consumer-actor-$actorNum") - val metadataClient = MetadataClient.create(consumerActor, timeout) - val numPartitions = metadataClient.getPartitionsFor(topic).map(_.length) - numPartitions.map { count => - system.log.info("Retrieved {} partitions for topic '{}'", count, topic) - count - } - } - - // TODO: will require `akka-actors-typed` as a provided dep - /** - * API MAY CHANGE - * - * Create an Alpakka Kafka rebalance listener that handles [[TopicPartitionsAssigned]] events. The [[typeKey]] is used - * to create the [[ExternalShardAllocation]] client. When partitions are assigned to this consumer group member the - * rebalance listener will use the [[ExternalShardAllocation]] client to update the External Sharding strategy - * accordingly so that entities are (eventually) routed to the local Akka cluster member. - * - * Returns an Akka classic [[ActorRef]] that can be passed to an Alpakka Kafka [[ConsumerSettings]]. - */ - @ApiMayChange - def rebalanceListener(system: ActorSystem, typeKey: EntityTypeKey[_]): ActorRef = { - val actor: Behavior[ConsumerRebalanceEvent] = Behaviors.setup { ctx => - val typeKeyName = typeKey.name - val shardAllocationClient = ExternalShardAllocation(ctx.system).clientFor(typeKeyName) - val address = Cluster(ctx.system).selfMember.address - Behaviors.receive[ConsumerRebalanceEvent] { - case (ctx, TopicPartitionsAssigned(_, partitions)) => - import ctx.executionContext - val partitionsList = partitions.mkString(",") - ctx.log.info("Consumer group '{}' is assigning topic partitions to cluster member '{}': [{}]", - typeKeyName, address, partitionsList) - val updates = partitions.map { tp => - val shardId = tp.partition().toString - // Kafka partition number becomes the akka shard id - // TODO: support assigning more than 1 shard id at once? - shardAllocationClient.updateShardLocation(shardId, address) - } - Future - .sequence(updates) - // Each Future returns successfully once a majority of cluster nodes receive the update. There's no point - // blocking here because the rebalance listener is triggered asynchronously. If we want to block during - // rebalance then we should provide an implementation using the `PartitionAssignmentHandler` instead - .onComplete { - case Success(_) => - ctx.log.info("Completed consumer group '{}' assignment of topic partitions to cluster member '{}': [{}]", - typeKeyName, address, partitionsList) - case Failure(ex) => - ctx.log.error("A failure occurred while updating cluster shards", ex) - } - Behaviors.same - case (_, TopicPartitionsRevoked(_, _)) => Behaviors.same - } - } - - system - .toTyped - .systemActorOf(actor, "kafka-cluster-sharding-rebalance-listener") - .toClassic - } -} - -object KafkaClusterSharding extends ExtensionId[KafkaClusterSharding] { - @InternalApi - sealed trait KafkaClusterShardingContract { - def kafkaPartitions: Int - def shardId(entityId: String): String = { - // simplified version of Kafka's `DefaultPartitioner` implementation - val partition = org.apache.kafka.common.utils.Utils.toPositive(Utils.murmur2(entityId.getBytes())) % kafkaPartitions - partition.toString - } - } - - @InternalApi - final class KafkaShardingMessageExtractor[M] private[kafka](val kafkaPartitions: Int) - extends ShardingMessageExtractor[ShardingEnvelope[M], M] with KafkaClusterShardingContract { - override def entityId(envelope: ShardingEnvelope[M]): String = envelope.entityId - override def unwrapMessage(envelope: ShardingEnvelope[M]): M = envelope.message - } - - @InternalApi - final class KafkaShardingNoEnvelopeExtractor[M] private[kafka](val kafkaPartitions: Int, entityIdExtractor: M => String) - extends ShardingMessageExtractor[M, M] with KafkaClusterShardingContract { - override def entityId(message: M): String = entityIdExtractor(message) - override def unwrapMessage(message: M): M = message - } - - override def createExtension(system: ExtendedActorSystem): kafka.KafkaClusterSharding = - new KafkaClusterSharding(system) -} \ No newline at end of file diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala index 763bc49..a7b85d3 100644 --- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala +++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala @@ -72,7 +72,6 @@ object UserEvents { val typeKey = EntityTypeKey[UserEvents.Message](groupId) ClusterSharding(system).init( Entity(typeKey)(createBehavior = _ => UserEvents()) - // NOTE: why does `ExternalShardAllocationStrategy` not accept the type key type itself? .withAllocationStrategy(new ExternalShardAllocationStrategy(system, typeKey.name)) .withMessageExtractor(messageExtractor) .withSettings(ClusterShardingSettings(system))) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
