This is an automated email from the ASF dual-hosted git repository.

dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 95ef3449404 MINOR: FindCoordinator API should return INVALID_REQUEST 
when share partition key is invalid (#19272)
95ef3449404 is described below

commit 95ef3449404fecd401822b707709ab9c8d06490b
Author: David Jacot <[email protected]>
AuthorDate: Mon Mar 24 16:29:20 2025 +0100

    MINOR: FindCoordinator API should return INVALID_REQUEST when share 
partition key is invalid (#19272)
    
    At the moment, the FindCoordinator API returns an `UNKNOWN_SERVER_ERROR`
    error when the share partition key is invalid. It seems that the aim was
    to return an `INVALID_REQUEST` error but the code has a small bug
    preventing it from working as expected.
    
    Reviewers: Apoorv Mittal <[email protected]>, Andrew Schofield 
<[email protected]>
---
 core/src/main/scala/kafka/server/KafkaApis.scala   |  2 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    | 24 ++++++++++++++++++++++
 2 files changed, 25 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index b6fbce294aa..a7ddfd8aee4 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1169,7 +1169,7 @@ class KafkaApis(val requestChannel: RequestChannel,
         } catch {
           case e: IllegalArgumentException =>
             error(s"Share coordinator key is invalid", e)
-            (Errors.INVALID_REQUEST, Node.noNode())
+            return (Errors.INVALID_REQUEST, Node.noNode)
         }
       }
       val (partition, internalTopicName) = CoordinatorType.forId(keyType) 
match {
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index d02f35e71c6..de111e74ff2 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -765,6 +765,30 @@ class KafkaApisTest extends Logging {
     }
   }
 
+  @Test
+  def testFindCoordinatorWithInvalidSharePartitionKey(): Unit = {
+    val request = new FindCoordinatorRequestData()
+      .setKeyType(CoordinatorType.SHARE.id)
+      .setCoordinatorKeys(asList(""))
+
+    val requestChannelRequest = buildRequest(new 
FindCoordinatorRequest.Builder(request).build())
+
+    kafkaApis = createKafkaApis()
+    kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+    val expectedResponse = new FindCoordinatorResponseData()
+      .setCoordinators(asList(
+        new FindCoordinatorResponseData.Coordinator()
+          .setKey("")
+          .setErrorCode(Errors.INVALID_REQUEST.code)
+          .setNodeId(-1)
+          .setHost("")
+          .setPort(-1)))
+
+    val response = 
verifyNoThrottling[FindCoordinatorResponse](requestChannelRequest)
+    assertEquals(expectedResponse, response.data)
+  }
+
   @Test
   def testMetadataAutoTopicCreationForOffsetTopic(): Unit = {
     testMetadataAutoTopicCreation(Topic.GROUP_METADATA_TOPIC_NAME, 
enableAutoTopicCreation = true,

Reply via email to