This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9db58886097 MINOR: FindCoordinator API does not lookup partition for 
share partition key correctly (#19273)
9db58886097 is described below

commit 9db5888609752db2c4138c4be78b47563e4d1198
Author: David Jacot <[email protected]>
AuthorDate: Mon Mar 24 19:43:23 2025 +0100

    MINOR: FindCoordinator API does not lookup partition for share partition 
key correctly (#19273)
    
    This patch fixes another bug in the FindCoordinator API handling for
    share partition key. `shareCoordinator.foreach` returns `Unit` so
    `shareCoordinator.foreach(coordinator =>
    coordinator.partitionFor(SharePartitionKey.getInstance(key)))` does not
    return the partition for the key.
    
    Reviewers: Jhen-Yung Hsu <[email protected]>, Chia-Ping Tsai 
<[email protected]>
---
 core/src/main/scala/kafka/server/KafkaApis.scala   |  3 ++-
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 31 +++++++++++++++++++++-
 2 files changed, 32 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index a7ddfd8aee4..af851f3ed84 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1180,7 +1180,8 @@ class KafkaApis(val requestChannel: RequestChannel,
           (txnCoordinator.partitionFor(key), TRANSACTION_STATE_TOPIC_NAME)
 
         case CoordinatorType.SHARE =>
-          (shareCoordinator.foreach(coordinator => 
coordinator.partitionFor(SharePartitionKey.getInstance(key))), 
SHARE_GROUP_STATE_TOPIC_NAME)
+          // We know that shareCoordinator is defined at this stage.
+          
(shareCoordinator.get.partitionFor(SharePartitionKey.getInstance(key)), 
SHARE_GROUP_STATE_TOPIC_NAME)
       }
 
       val topicMetadata = 
metadataCache.getTopicMetadata(Set(internalTopicName).asJava, 
request.context.listenerName, false, false).asScala
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index de111e74ff2..d71e5c88778 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -33,6 +33,7 @@ import org.apache.kafka.common.config.ConfigResource
 import org.apache.kafka.common.config.ConfigResource.Type.{BROKER, 
BROKER_LOGGER}
 import org.apache.kafka.common.errors.{ClusterAuthorizationException, 
UnsupportedVersionException}
 import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.internals.Topic.SHARE_GROUP_STATE_TOPIC_NAME
 import org.apache.kafka.common.memory.MemoryPool
 import 
org.apache.kafka.common.message.AddPartitionsToTxnRequestData.{AddPartitionsToTxnTopic,
 AddPartitionsToTxnTopicCollection, AddPartitionsToTxnTransaction, 
AddPartitionsToTxnTransactionCollection}
 import 
org.apache.kafka.common.message.AddPartitionsToTxnResponseData.AddPartitionsToTxnResult
@@ -88,7 +89,7 @@ import org.apache.kafka.server.authorizer.{Action, 
AuthorizationResult, Authoriz
 import org.apache.kafka.server.common.{FeatureVersion, FinalizedFeatures, 
GroupVersion, KRaftVersion, MetadataVersion, RequestLocal, TransactionVersion}
 import org.apache.kafka.server.config.{KRaftConfigs, ReplicationConfigs, 
ServerConfigs, ServerLogConfigs}
 import org.apache.kafka.server.metrics.ClientMetricsTestUtils
-import org.apache.kafka.server.share.{CachedSharePartition, 
ErroneousAndValidPartitionData}
+import org.apache.kafka.server.share.{CachedSharePartition, 
ErroneousAndValidPartitionData, SharePartitionKey}
 import org.apache.kafka.server.quota.ThrottleCallback
 import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch
 import org.apache.kafka.server.share.context.{FinalContext, 
ShareSessionContext}
@@ -789,6 +790,34 @@ class KafkaApisTest extends Logging {
     assertEquals(expectedResponse, response.data)
   }
 
+  @Test
+  def testFindCoordinatorWithValidSharePartitionKey(): Unit = {
+    addTopicToMetadataCache(SHARE_GROUP_STATE_TOPIC_NAME, 10, 3)
+    val key = SharePartitionKey.getInstance("foo", Uuid.randomUuid(), 10)
+
+    val request = new FindCoordinatorRequestData()
+      .setKeyType(CoordinatorType.SHARE.id)
+      .setCoordinatorKeys(asList(key.asCoordinatorKey))
+
+    val requestChannelRequest = buildRequest(new FindCoordinatorRequest.Builder(request).build())
+
+    // Stub the partition lookup before the request is handled; a stub registered afterwards cannot affect the call under test.
+    when(shareCoordinator.partitionFor(ArgumentMatchers.eq(key))).thenReturn(10)
+    kafkaApis = createKafkaApis()
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+    val expectedResponse = new FindCoordinatorResponseData()
+      .setCoordinators(asList(
+        new FindCoordinatorResponseData.Coordinator()
+          .setKey(key.asCoordinatorKey)
+          .setNodeId(0)
+          .setHost("broker0")
+          .setPort(9092)))
+
+    val response = verifyNoThrottling[FindCoordinatorResponse](requestChannelRequest)
+    assertEquals(expectedResponse, response.data)
+  }
+
   @Test
   def testMetadataAutoTopicCreationForOffsetTopic(): Unit = {
     testMetadataAutoTopicCreation(Topic.GROUP_METADATA_TOPIC_NAME, 
enableAutoTopicCreation = true,

Reply via email to