This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch wip-seglo-kafka-sharding
in repository https://gitbox.apache.org/repos/asf/incubator-pekko-samples.git
commit 5c118c69be77f2ceac59f4c04c4e561eaa59738d Author: Sean Glover <[email protected]> AuthorDate: Fri Feb 14 13:14:22 2020 -0500 WIP- use systemActorOf to create metadata client --- .../akka/kafka/KafkaShardingMessageExtractor.scala | 78 ++++++++++++---------- .../scala/sample/sharding/kafka/UserEvents.scala | 29 ++++---- 2 files changed, 60 insertions(+), 47 deletions(-) diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaShardingMessageExtractor.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaShardingMessageExtractor.scala index f02eb41..14353ab 100644 --- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaShardingMessageExtractor.scala +++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/akka/kafka/KafkaShardingMessageExtractor.scala @@ -1,52 +1,61 @@ package akka.kafka +import java.util.concurrent.atomic.AtomicInteger + import akka.actor.{ActorSystem, ExtendedActorSystem} import akka.cluster.sharding.typed.{ShardingEnvelope, ShardingMessageExtractor} +import akka.kafka.DefaultKafkaShardingMessageExtractor.PartitionCountStrategy import akka.kafka.scaladsl.MetadataClient import akka.util.Timeout._ -import org.apache.kafka.clients.producer.Partitioner -import org.apache.kafka.clients.producer.internals.DefaultPartitioner -import org.apache.kafka.common.{Node, PartitionInfo, Cluster => KafkaCluster} +import org.apache.kafka.common.utils.Utils import scala.concurrent.duration._ -import scala.concurrent.{Await, ExecutionContextExecutor} -import scala.jdk.CollectionConverters._ - -private[kafka] trait DefaultKafkaShardingMessageExtractor { - implicit val actorSystem: ActorSystem - implicit val timeout: FiniteDuration - implicit val ec: ExecutionContextExecutor = actorSystem.dispatcher +import scala.concurrent.{Await, ExecutionContext} - val clientSettings: ConsumerSettings[_, _] - val groupId: String - val topic: String +object 
DefaultKafkaShardingMessageExtractor { + sealed trait PartitionCountStrategy { + def groupId: String + def partitions: Int + } + final case class Provided(groupId: String, partitions: Int) extends PartitionCountStrategy + final case class RetrieveFromKafka( + system: ActorSystem, + timeout: FiniteDuration, + groupId: String, + topic: String, + settings: ConsumerSettings[_,_]) + extends PartitionCountStrategy { + import RetrieveFromKafka._ + private implicit val ec: ExecutionContext = system.dispatcher + lazy val partitions: Int = { + val actorNum = metadataActorCounter.getAndIncrement() + val consumerActor = system + .asInstanceOf[ExtendedActorSystem] + .systemActorOf(KafkaConsumerActor.props(settings), s"metadata-consumer-actor-$actorNum") + val metadataClient = MetadataClient.create(consumerActor, timeout) + val numPartitions = metadataClient.getPartitionsFor(topic).map(_.length) + numPartitions.foreach(num => system.log.info("Retrieved {} partitions for topic '{}' for group '{}'", num, topic, groupId)) + Await.result(numPartitions, timeout) + } + } + object RetrieveFromKafka { + private val metadataActorCounter = new AtomicInteger + } +} - private val CLUSTER_ID = "cluster-id" - private val kafkaPartitioner = partitioner() - private val kafkaCluster = cluster(partitions()) +private[kafka] trait DefaultKafkaShardingMessageExtractor { + val strategy: PartitionCountStrategy + private val groupId: String = strategy.groupId + private val kafkaPartitions: Int = strategy.partitions def shardId(entityId: String): String = { - val partition = kafkaPartitioner - .partition(topic, entityId, entityId.getBytes(), null, null, kafkaCluster) + // simplified version of Kafka's `DefaultPartitioner` implementation + val partition = org.apache.kafka.common.utils.Utils.toPositive(Utils.murmur2(entityId.getBytes())) % kafkaPartitions s"$groupId-$partition" } - - def partitions(): List[PartitionInfo] = { - val consumerActor = 
actorSystem.actorOf(KafkaConsumerActor.props(clientSettings), "metadata-consumer-actor") - val metadataClient = MetadataClient.create(consumerActor, timeout) - val partitions = metadataClient.getPartitionsFor(topic) - partitions.foreach(p => actorSystem.log.info("Retrieved %s partitions for topic %s for group %s", p.length, topic, groupId)) - Await.result(partitions, timeout) - } - - def cluster(partitions: List[PartitionInfo]): KafkaCluster = - new KafkaCluster(CLUSTER_ID, List.empty[Node].asJavaCollection, partitions.asJavaCollection, Set.empty[String].asJava, Set.empty[String].asJava) - - def partitioner(): Partitioner = new DefaultPartitioner() } -final class KafkaShardingMessageExtractor[M](val clientSettings: ConsumerSettings[_,_], val groupId: String, val topic: String) - (implicit val actorSystem: ActorSystem, val timeout: FiniteDuration) +final class KafkaShardingMessageExtractor[M](val strategy: PartitionCountStrategy) extends ShardingMessageExtractor[ShardingEnvelope[M], M] with DefaultKafkaShardingMessageExtractor { override def entityId(envelope: ShardingEnvelope[M]): String = envelope.entityId override def unwrapMessage(envelope: ShardingEnvelope[M]): M = envelope.message @@ -60,8 +69,7 @@ final class KafkaShardingMessageExtractor[M](val clientSettings: ConsumerSetting * only contains partitions for the provided topic. If you choose to reuse a different partitioner then make sure your * partitioner doesn't make use of any other Kafka Cluster metadata. 
*/ -abstract class KafkaShardingNoEnvelopeExtractor[M](val clientSettings: ConsumerSettings[_,_], val groupId: String, val topic: String) - (implicit val actorSystem: ActorSystem, val timeout: FiniteDuration) +abstract class KafkaShardingNoEnvelopeExtractor[M](val strategy: PartitionCountStrategy) extends ShardingMessageExtractor[M, M] with DefaultKafkaShardingMessageExtractor { override def unwrapMessage(message: M): M = message } diff --git a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala index f0f4f90..5cf3321 100644 --- a/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala +++ b/akka-sample-kafka-to-sharding-scala/processor/src/main/scala/sample/sharding/kafka/UserEvents.scala @@ -7,6 +7,7 @@ import akka.actor.typed.{ActorRef, ActorSystem, Behavior} import akka.cluster.sharding.external.ExternalShardAllocationStrategy import akka.cluster.sharding.typed.ClusterShardingSettings import akka.cluster.sharding.typed.scaladsl.{ClusterSharding, Entity, EntityTypeKey} +import akka.kafka.DefaultKafkaShardingMessageExtractor.{PartitionCountStrategy, RetrieveFromKafka} import akka.kafka.{ConsumerSettings, KafkaShardingNoEnvelopeExtractor} import org.apache.kafka.common.serialization.StringDeserializer @@ -53,24 +54,28 @@ object UserEvents { } } - /* - * The KafkaShardingMessageExtractor uses the KafkaProducer's underlying DefaultPartitioner so that the same murmur2 - * hashing algorithm is used for Kafka and Akka Cluster Sharding + /** + * Passing in a [[RetrieveFromKafka]] strategy will automatically retrieve the number of partitions of a topic for + * use with the same hashing algorithm used by the Apache Kafka [[org.apache.kafka.clients.producer.internals.DefaultPartitioner]] + * (murmur2) with Akka Cluster Sharding. 
*/ - class UserIdMessageExtractor(clientSettings: ConsumerSettings[_,_], topic: String, groupId: String) - (implicit actorSystem: akka.actor.ActorSystem, timeout: FiniteDuration) - extends KafkaShardingNoEnvelopeExtractor[Message](clientSettings, topic, groupId) { + class UserIdMessageExtractor(strategy: PartitionCountStrategy) + extends KafkaShardingNoEnvelopeExtractor[Message](strategy) { def entityId(message: Message): String = message.userId } def init(system: ActorSystem[_]): ActorRef[Message] = { val processorConfig = ProcessorConfig(system.settings.config.getConfig("kafka-to-sharding-processor")) - implicit val classic: akka.actor.ActorSystem = system.toClassic - implicit val timeout: FiniteDuration = 10.seconds - val clientSettings = ConsumerSettings(classic, new StringDeserializer, new StringDeserializer) - val topic = processorConfig.topics.head - val groupId = processorConfig.groupId - val messageExtractor = new UserIdMessageExtractor(clientSettings, topic, groupId) + val messageExtractor = new UserIdMessageExtractor( + strategy = RetrieveFromKafka( + system = system.toClassic, + timeout = 10.seconds, + groupId = processorConfig.groupId, + topic = processorConfig.topics.head, + settings = ConsumerSettings(system.toClassic, new StringDeserializer, new StringDeserializer) + .withBootstrapServers(processorConfig.bootstrapServers) + ) + ) ClusterSharding(system).init( Entity(TypeKey)(createBehavior = _ => UserEvents()) .withAllocationStrategy(new ExternalShardAllocationStrategy(system, TypeKey.name)) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
