This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new cbfbbe833d9 KAFKA-19234: broker should return UNAUTHORIZATION error
for non-existing topic in produce request (#19635)
cbfbbe833d9 is described below
commit cbfbbe833d9f3cff650d354ee3cc03fff76e1ee6
Author: PoAn Yang <[email protected]>
AuthorDate: Wed May 14 11:56:09 2025 -0500
KAFKA-19234: broker should return UNAUTHORIZATION error for non-existing
topic in produce request (#19635)
Since topic name is sensitive information, it should return a
TOPIC_AUTHORIZATION_FAILED error for non-existing topic. The Fetch
request also follows this pattern.
Co-authored-by: John Doe <[email protected]>, Ken Huang
<[email protected]>, Jhen-Yung Hsu <[email protected]>
Reviewers: Jun Rao <[email protected]>, Chia-Ping Tsai
<[email protected]>, TaiJuWu <[email protected]>, TengYao Chi
<[email protected]>, Ken Huang <[email protected]>, Jhen-Yung Hsu
<[email protected]>
---
core/src/main/scala/kafka/server/KafkaApis.scala | 7 +-
.../kafka/api/AuthorizerIntegrationTest.scala | 188 ++++++++++++++++++---
2 files changed, 164 insertions(+), 31 deletions(-)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 72620d8964c..e8c16c4e881 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -411,10 +411,9 @@ class KafkaApis(val requestChannel: RequestChannel,
}
val topicPartition = new TopicPartition(topicName, partition.index())
- if (topicName.isEmpty)
+ // To be compatible with the old version, only return UNKNOWN_TOPIC_ID
if request version uses topicId, but the corresponding topic name can't be
found.
+ if (topicName.isEmpty && request.header.apiVersion > 12)
nonExistingTopicResponses += new TopicIdPartition(topicId,
topicPartition) -> new PartitionResponse(Errors.UNKNOWN_TOPIC_ID)
- else if (!metadataCache.contains(topicPartition))
- nonExistingTopicResponses += new TopicIdPartition(topicId,
topicPartition) -> new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)
else
topicIdToPartitionData += new TopicIdPartition(topicId,
topicPartition) -> partition
}
@@ -429,6 +428,8 @@ class KafkaApis(val requestChannel: RequestChannel,
val memoryRecords = partition.records.asInstanceOf[MemoryRecords]
if (!authorizedTopics.contains(topicIdPartition.topic))
unauthorizedTopicResponses += topicIdPartition -> new
PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED)
+ else if (!metadataCache.contains(topicIdPartition.topicPartition))
+ nonExistingTopicResponses += topicIdPartition -> new
PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)
else
try {
ProduceRequest.validateRecords(request.header.apiVersion,
memoryRecords)
diff --git
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 47f3de97cf1..0df8470fe02 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -53,7 +53,7 @@ import org.apache.kafka.security.authorizer.AclEntry
import org.apache.kafka.security.authorizer.AclEntry.WILDCARD_HOST
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{CsvSource, MethodSource}
+import org.junit.jupiter.params.provider.{MethodSource, ValueSource}
import java.util.Collections.singletonList
import org.apache.kafka.common.message.MetadataRequestData.MetadataRequestTopic
@@ -296,22 +296,22 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
new requests.MetadataRequest.Builder(List(topic).asJava,
allowAutoTopicCreation).build()
}
- private def createProduceRequestWithId(id: Uuid) = {
+ private def createProduceRequest(name: String, id: Uuid, version: Short) = {
requests.ProduceRequest.builder(new ProduceRequestData()
- .setTopicData(new ProduceRequestData.TopicProduceDataCollection(
- Collections.singletonList(new ProduceRequestData.TopicProduceData()
- .setTopicId(id).setPartitionData(Collections.singletonList(
- new ProduceRequestData.PartitionProduceData()
- .setIndex(tp.partition)
- .setRecords(MemoryRecords.withRecords(Compression.NONE, new
SimpleRecord("test".getBytes))))))
- .iterator))
- .setAcks(1.toShort)
- .setTimeoutMs(5000))
- .build()
+ .setTopicData(new ProduceRequestData.TopicProduceDataCollection(
+ util.List.of(new ProduceRequestData.TopicProduceData()
+ .setName(name)
+ .setTopicId(id)
+ .setPartitionData(util.List.of(
+ new ProduceRequestData.PartitionProduceData()
+ .setIndex(tp.partition)
+ .setRecords(MemoryRecords.withRecords(Compression.NONE, new
SimpleRecord("test".getBytes))))))
+ .iterator))
+ .setAcks(1.toShort)
+ .setTimeoutMs(5000))
+ .build(version)
}
- private def createProduceRequest =
createProduceRequestWithId(getTopicIds().getOrElse(tp.topic, Uuid.ZERO_UUID))
-
private def createFetchRequest = {
val partitionMap = new util.LinkedHashMap[TopicPartition,
requests.FetchRequest.PartitionData]
partitionMap.put(tp, new
requests.FetchRequest.PartitionData(getTopicIds().getOrElse(tp.topic,
Uuid.ZERO_UUID),
@@ -326,6 +326,13 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
requests.FetchRequest.Builder.forConsumer(version, 100, Int.MaxValue,
partitionMap).build()
}
+ private def createFetchRequestWithEmptyTopicNameAndZeroTopicId(version:
Short) = {
+ val partitionMap = new util.LinkedHashMap[TopicPartition,
requests.FetchRequest.PartitionData]
+ partitionMap.put(new TopicPartition("", part),
+ new requests.FetchRequest.PartitionData(Uuid.ZERO_UUID, 0, 0, 100,
Optional.of(27)))
+ requests.FetchRequest.Builder.forConsumer(version, 100, Int.MaxValue,
partitionMap).build()
+ }
+
private def createFetchFollowerRequest = {
val partitionMap = new util.LinkedHashMap[TopicPartition,
requests.FetchRequest.PartitionData]
partitionMap.put(tp, new
requests.FetchRequest.PartitionData(getTopicIds().getOrElse(tp.topic,
Uuid.ZERO_UUID),
@@ -836,11 +843,13 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
@Test
def testAuthorizationWithTopicExisting(): Unit = {
//First create the topic so we have a valid topic ID
- sendRequests(mutable.Map(ApiKeys.CREATE_TOPICS -> createTopicsRequest))
+ createTopicWithBrokerPrincipal(topic)
+ val topicId = getTopicIds()(topic)
+ assertNotNull(topicId)
val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest](
ApiKeys.METADATA -> createMetadataRequest(allowAutoTopicCreation = true),
- ApiKeys.PRODUCE -> createProduceRequest,
+ ApiKeys.PRODUCE -> createProduceRequest("", topicId,
ApiKeys.PRODUCE.latestVersion()),
ApiKeys.FETCH -> createFetchRequest,
ApiKeys.LIST_OFFSETS -> createListOffsetsRequest,
ApiKeys.OFFSET_FETCH -> createOffsetFetchRequest,
@@ -888,7 +897,7 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
ApiKeys.DELETE_TOPICS -> deleteTopicsRequest
)
- sendRequests(requestKeyToRequest, true)
+ sendRequests(requestKeyToRequest)
}
/*
@@ -900,7 +909,7 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
val topicNames = Map(id -> "topic")
val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest](
ApiKeys.METADATA -> createMetadataRequest(allowAutoTopicCreation =
false),
- ApiKeys.PRODUCE -> createProduceRequestWithId(id),
+ ApiKeys.PRODUCE -> createProduceRequest("", id,
ApiKeys.PRODUCE.latestVersion()),
ApiKeys.FETCH -> createFetchRequestWithUnknownTopic(id,
ApiKeys.FETCH.latestVersion()),
ApiKeys.LIST_OFFSETS -> createListOffsetsRequest,
ApiKeys.OFFSET_COMMIT -> createOffsetCommitRequest,
@@ -921,8 +930,76 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
sendRequests(requestKeyToRequest, false, topicNames)
}
+ /**
+ * Test that the produce request fails with TOPIC_AUTHORIZATION_FAILED if
the client doesn't have permission
+ * and topic name is used in the request. Even if the topic doesn't exist,
we return TOPIC_AUTHORIZATION_FAILED to
+ * prevent leaking the topic name.
+ * This case covers produce request version from oldest to 12.
+ * The newer version is covered by testAuthorizationWithTopicNotExisting and
testAuthorizationWithTopicExisting.
+ */
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def testAuthorizationProduceVersionFromOldestTo12(withTopicExisting:
Boolean): Unit = {
+ if (withTopicExisting) {
+ createTopicWithBrokerPrincipal(topic)
+ }
+
+ for (version <- ApiKeys.PRODUCE.oldestVersion to 12) {
+ val request = createProduceRequest(topic, Uuid.ZERO_UUID,
version.toShort)
+ val response = connectAndReceive[AbstractResponse](request, listenerName
= listenerName)
+ val errorCode = response.asInstanceOf[ProduceResponse]
+ .data()
+ .responses()
+ .find(topic, Uuid.ZERO_UUID)
+ .partitionResponses.asScala.find(_.index == part).get
+ .errorCode
+
+ assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(), errorCode,
s"unexpected error for produce request version $version")
+ }
+ }
+
+ /**
+ * Test that the produce request fails with UNKNOWN_TOPIC_ID if topic id is
zero when request version >= 13.
+ * The produce request only supports topic id above version 13.
+ */
+ @Test
+ def testZeroTopicIdForProduceVersionFrom13ToNewest(): Unit = {
+ for (version <- 13 to ApiKeys.PRODUCE.latestVersion()) {
+ val request = createProduceRequest("", Uuid.ZERO_UUID, version.toShort)
+ val response = connectAndReceive[AbstractResponse](request, listenerName
= listenerName)
+ val errorCode = response.asInstanceOf[ProduceResponse]
+ .data()
+ .responses()
+ .find("", Uuid.ZERO_UUID)
+ .partitionResponses.asScala.find(_.index == part).get
+ .errorCode
+
+ assertEquals(Errors.UNKNOWN_TOPIC_ID.code(), errorCode, s"unexpected
error for produce request version $version")
+ }
+ }
+
+ /**
+ * Test that the produce request fails with TOPIC_AUTHORIZATION_FAILED if
topic name is empty when request version <= 12.
+ * The produce request only supports topic name below version 12.
+ */
+ @Test
+ def testEmptyTopicNameForProduceVersionFromOldestTo12(): Unit = {
+ for (version <- ApiKeys.PRODUCE.oldestVersion() to 12) {
+ val request = createProduceRequest("", Uuid.ZERO_UUID, version.toShort)
+ val response = connectAndReceive[AbstractResponse](request, listenerName
= listenerName)
+ val errorCode = response.asInstanceOf[ProduceResponse]
+ .data()
+ .responses()
+ .find("", Uuid.ZERO_UUID)
+ .partitionResponses.asScala.find(_.index == part).get
+ .errorCode
+
+ assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(), errorCode,
s"unexpected error for produce request version $version")
+ }
+ }
+
@ParameterizedTest
- @CsvSource(value = Array("false", "true"))
+ @ValueSource(booleans = Array(true, false))
def testTopicIdAuthorization(withTopicExisting: Boolean): Unit = {
val topicId = if (withTopicExisting) {
createTopicWithBrokerPrincipal(topic)
@@ -971,18 +1048,73 @@ class AuthorizerIntegrationTest extends
AbstractAuthorizerIntegrationTest {
}
}
- /*
- * even if the topic doesn't exist, request APIs should not leak the topic
name
+ /**
+ * Test that the fetch request fails with TOPIC_AUTHORIZATION_FAILED if the
client doesn't have permission
+ * and topic name is used in the request. Even if the topic doesn't exist,
we return TOPIC_AUTHORIZATION_FAILED to
+ * prevent leaking the topic name.
+ * This case covers fetch request version from oldest to 12.
+ * The newer version is covered by testAuthorizationWithTopicNotExisting and
testAuthorizationWithTopicExisting.
*/
- @Test
- def testAuthorizationFetchV12WithTopicNotExisting(): Unit = {
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def testAuthorizationFetchVersionFromOldestTo12(withTopicExisting: Boolean):
Unit = {
+ if (withTopicExisting) {
+ createTopicWithBrokerPrincipal(topic)
+ }
+
val id = Uuid.ZERO_UUID
- val topicNames = Map(id -> "topic")
- val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest](
- ApiKeys.FETCH -> createFetchRequestWithUnknownTopic(id, 12),
- )
+ val topicNames = Map(id -> topic)
+ for (version <- ApiKeys.FETCH.oldestVersion to 12) {
+ val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys,
AbstractRequest](
+ ApiKeys.FETCH -> createFetchRequestWithUnknownTopic(id,
version.toShort),
+ )
- sendRequests(requestKeyToRequest, false, topicNames)
+ sendRequests(requestKeyToRequest, withTopicExisting, topicNames)
+ }
+ }
+
+ /**
+ * Test that the fetch request fails with UNKNOWN_TOPIC_ID if topic id is
zero when request version >= 13.
+ * The fetch request only supports topic id above version 13.
+ */
+ @Test
+ def testZeroTopicIdForFetchVersionFrom13ToNewest(): Unit = {
+ for (version <- 13 to ApiKeys.FETCH.latestVersion) {
+ val request =
createFetchRequestWithEmptyTopicNameAndZeroTopicId(version.toShort)
+ val response = connectAndReceive[AbstractResponse](request, listenerName
= listenerName)
+
+ val errorCode = response.asInstanceOf[FetchResponse]
+ .data()
+ .responses()
+ .get(0)
+ .partitions()
+ .get(0)
+ .errorCode
+
+ assertEquals(Errors.UNKNOWN_TOPIC_ID.code(), errorCode, s"unexpected
error for fetch request version $version")
+ }
+ }
+
+ /**
+ * Test that the fetch request fails with TOPIC_AUTHORIZATION_FAILED if
topic name is empty when request version <= 12.
+ * The fetch request only supports topic name below version 12.
+ */
+ @Test
+ def testEmptyTopicNameForFetchVersionFromOldestTo12(): Unit = {
+ for (version <- ApiKeys.FETCH.oldestVersion to 12) {
+ val request =
createFetchRequestWithEmptyTopicNameAndZeroTopicId(version.toShort)
+ val response = connectAndReceive[AbstractResponse](request, listenerName
= listenerName)
+
+ val errorCode = response.asInstanceOf[FetchResponse]
+ .data()
+ .responses()
+ .get(0)
+ .partitions()
+ .get(0)
+ .errorCode
+
+ assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(), errorCode,
s"unexpected error for fetch request version $version")
+ }
}
@Test