This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 95ef3449404 MINOR: FindCoordinator API should return INVALID_REQUEST
when share partition key is invalid (#19272)
95ef3449404 is described below
commit 95ef3449404fecd401822b707709ab9c8d06490b
Author: David Jacot <[email protected]>
AuthorDate: Mon Mar 24 16:29:20 2025 +0100
MINOR: FindCoordinator API should return INVALID_REQUEST when share
partition key is invalid (#19272)
At the moment, the FindCoordinator API returns an `UNKNOWN_SERVER_ERROR`
error when the share partition key is invalid. It seems that the aim was
to return an `INVALID_REQUEST` error, but the code has a small bug
preventing it from working as expected.
Reviewers: Apoorv Mittal <[email protected]>, Andrew Schofield
<[email protected]>
---
core/src/main/scala/kafka/server/KafkaApis.scala | 2 +-
.../scala/unit/kafka/server/KafkaApisTest.scala | 24 ++++++++++++++++++++++
2 files changed, 25 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index b6fbce294aa..a7ddfd8aee4 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1169,7 +1169,7 @@ class KafkaApis(val requestChannel: RequestChannel,
} catch {
case e: IllegalArgumentException =>
error(s"Share coordinator key is invalid", e)
- (Errors.INVALID_REQUEST, Node.noNode())
+ return (Errors.INVALID_REQUEST, Node.noNode)
}
}
val (partition, internalTopicName) = CoordinatorType.forId(keyType) match {
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index d02f35e71c6..de111e74ff2 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -765,6 +765,30 @@ class KafkaApisTest extends Logging {
}
}
+ @Test
+ def testFindCoordinatorWithInvalidSharePartitionKey(): Unit = {
+ val request = new FindCoordinatorRequestData()
+ .setKeyType(CoordinatorType.SHARE.id)
+ .setCoordinatorKeys(asList(""))
+
+ val requestChannelRequest = buildRequest(new FindCoordinatorRequest.Builder(request).build())
+
+ kafkaApis = createKafkaApis()
+ kafkaApis.handle(requestChannelRequest, RequestLocal.noCaching)
+
+ val expectedResponse = new FindCoordinatorResponseData()
+ .setCoordinators(asList(
+ new FindCoordinatorResponseData.Coordinator()
+ .setKey("")
+ .setErrorCode(Errors.INVALID_REQUEST.code)
+ .setNodeId(-1)
+ .setHost("")
+ .setPort(-1)))
+
+ val response = verifyNoThrottling[FindCoordinatorResponse](requestChannelRequest)
+ assertEquals(expectedResponse, response.data)
+ }
+
@Test
def testMetadataAutoTopicCreationForOffsetTopic(): Unit = {
testMetadataAutoTopicCreation(Topic.GROUP_METADATA_TOPIC_NAME,
enableAutoTopicCreation = true,