This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new cbfbbe833d9 KAFKA-19234: broker should return UNAUTHORIZATION error 
for non-existing topic in produce request (#19635)
cbfbbe833d9 is described below

commit cbfbbe833d9f3cff650d354ee3cc03fff76e1ee6
Author: PoAn Yang <[email protected]>
AuthorDate: Wed May 14 11:56:09 2025 -0500

    KAFKA-19234: broker should return UNAUTHORIZATION error for non-existing 
topic in produce request (#19635)
    
    Since topic names are sensitive information, the broker should return a
    TOPIC_AUTHORIZATION_FAILED error for a non-existing topic when the client
    is not authorized for it. The Fetch request also follows this pattern.
    
    Co-authored-by: John Doe <[email protected]>, Ken Huang 
<[email protected]>, Jhen-Yung Hsu <[email protected]>
    
    Reviewers: Jun Rao  <[email protected]>, Chia-Ping Tsai
     <[email protected]>, TaiJuWu  <[email protected]>, TengYao Chi
     <[email protected]>, Ken Huang  <[email protected]>, Jhen-Yung Hsu
     <[email protected]>
---
 core/src/main/scala/kafka/server/KafkaApis.scala   |   7 +-
 .../kafka/api/AuthorizerIntegrationTest.scala      | 188 ++++++++++++++++++---
 2 files changed, 164 insertions(+), 31 deletions(-)

diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 72620d8964c..e8c16c4e881 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -411,10 +411,9 @@ class KafkaApis(val requestChannel: RequestChannel,
         }
 
         val topicPartition = new TopicPartition(topicName, partition.index())
-        if (topicName.isEmpty)
+        // For compatibility with older request versions, return UNKNOWN_TOPIC_ID 
only if the request version uses topic ids (version > 12) and the corresponding 
topic name can't be found.
+        if (topicName.isEmpty && request.header.apiVersion > 12)
           nonExistingTopicResponses += new TopicIdPartition(topicId, 
topicPartition) -> new PartitionResponse(Errors.UNKNOWN_TOPIC_ID)
-        else if (!metadataCache.contains(topicPartition))
-          nonExistingTopicResponses += new TopicIdPartition(topicId, 
topicPartition) -> new PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)
         else
           topicIdToPartitionData += new TopicIdPartition(topicId, 
topicPartition) -> partition
       }
@@ -429,6 +428,8 @@ class KafkaApis(val requestChannel: RequestChannel,
       val memoryRecords = partition.records.asInstanceOf[MemoryRecords]
       if (!authorizedTopics.contains(topicIdPartition.topic))
         unauthorizedTopicResponses += topicIdPartition -> new 
PartitionResponse(Errors.TOPIC_AUTHORIZATION_FAILED)
+      else if (!metadataCache.contains(topicIdPartition.topicPartition))
+        nonExistingTopicResponses += topicIdPartition -> new 
PartitionResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)
       else
         try {
           ProduceRequest.validateRecords(request.header.apiVersion, 
memoryRecords)
diff --git 
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 47f3de97cf1..0df8470fe02 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -53,7 +53,7 @@ import org.apache.kafka.security.authorizer.AclEntry
 import org.apache.kafka.security.authorizer.AclEntry.WILDCARD_HOST
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.{CsvSource, MethodSource}
+import org.junit.jupiter.params.provider.{MethodSource, ValueSource}
 
 import java.util.Collections.singletonList
 import org.apache.kafka.common.message.MetadataRequestData.MetadataRequestTopic
@@ -296,22 +296,22 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
     new requests.MetadataRequest.Builder(List(topic).asJava, 
allowAutoTopicCreation).build()
   }
 
-  private def createProduceRequestWithId(id: Uuid) = {
+  private def createProduceRequest(name: String, id: Uuid, version: Short) = {
     requests.ProduceRequest.builder(new ProduceRequestData()
-      .setTopicData(new ProduceRequestData.TopicProduceDataCollection(
-        Collections.singletonList(new ProduceRequestData.TopicProduceData()
-          .setTopicId(id).setPartitionData(Collections.singletonList(
-          new ProduceRequestData.PartitionProduceData()
-            .setIndex(tp.partition)
-            .setRecords(MemoryRecords.withRecords(Compression.NONE, new 
SimpleRecord("test".getBytes))))))
-        .iterator))
-      .setAcks(1.toShort)
-      .setTimeoutMs(5000))
-      .build()
+        .setTopicData(new ProduceRequestData.TopicProduceDataCollection(
+          util.List.of(new ProduceRequestData.TopicProduceData()
+              .setName(name)
+              .setTopicId(id)
+              .setPartitionData(util.List.of(
+                new ProduceRequestData.PartitionProduceData()
+                  .setIndex(tp.partition)
+                  .setRecords(MemoryRecords.withRecords(Compression.NONE, new 
SimpleRecord("test".getBytes))))))
+            .iterator))
+        .setAcks(1.toShort)
+        .setTimeoutMs(5000))
+      .build(version)
   }
 
-  private def createProduceRequest = 
createProduceRequestWithId(getTopicIds().getOrElse(tp.topic, Uuid.ZERO_UUID))
-
   private def createFetchRequest = {
     val partitionMap = new util.LinkedHashMap[TopicPartition, 
requests.FetchRequest.PartitionData]
     partitionMap.put(tp, new 
requests.FetchRequest.PartitionData(getTopicIds().getOrElse(tp.topic, 
Uuid.ZERO_UUID),
@@ -326,6 +326,13 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
     requests.FetchRequest.Builder.forConsumer(version, 100, Int.MaxValue, 
partitionMap).build()
   }
 
+  private def createFetchRequestWithEmptyTopicNameAndZeroTopicId(version: 
Short) = {
+    val partitionMap = new util.LinkedHashMap[TopicPartition, 
requests.FetchRequest.PartitionData]
+    partitionMap.put(new TopicPartition("", part),
+      new requests.FetchRequest.PartitionData(Uuid.ZERO_UUID, 0, 0, 100, 
Optional.of(27)))
+    requests.FetchRequest.Builder.forConsumer(version, 100, Int.MaxValue, 
partitionMap).build()
+  }
+
   private def createFetchFollowerRequest = {
     val partitionMap = new util.LinkedHashMap[TopicPartition, 
requests.FetchRequest.PartitionData]
     partitionMap.put(tp, new 
requests.FetchRequest.PartitionData(getTopicIds().getOrElse(tp.topic, 
Uuid.ZERO_UUID),
@@ -836,11 +843,13 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
   @Test
   def testAuthorizationWithTopicExisting(): Unit = {
     //First create the topic so we have a valid topic ID
-    sendRequests(mutable.Map(ApiKeys.CREATE_TOPICS -> createTopicsRequest))
+    createTopicWithBrokerPrincipal(topic)
+    val topicId = getTopicIds()(topic)
+    assertNotNull(topicId)
 
     val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest](
       ApiKeys.METADATA -> createMetadataRequest(allowAutoTopicCreation = true),
-      ApiKeys.PRODUCE -> createProduceRequest,
+      ApiKeys.PRODUCE -> createProduceRequest("", topicId, 
ApiKeys.PRODUCE.latestVersion()),
       ApiKeys.FETCH -> createFetchRequest,
       ApiKeys.LIST_OFFSETS -> createListOffsetsRequest,
       ApiKeys.OFFSET_FETCH -> createOffsetFetchRequest,
@@ -888,7 +897,7 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
       ApiKeys.DELETE_TOPICS -> deleteTopicsRequest
     )
 
-    sendRequests(requestKeyToRequest, true)
+    sendRequests(requestKeyToRequest)
   }
 
   /*
@@ -900,7 +909,7 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
     val topicNames = Map(id -> "topic")
     val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest](
       ApiKeys.METADATA -> createMetadataRequest(allowAutoTopicCreation = 
false),
-      ApiKeys.PRODUCE -> createProduceRequestWithId(id),
+      ApiKeys.PRODUCE -> createProduceRequest("", id, 
ApiKeys.PRODUCE.latestVersion()),
       ApiKeys.FETCH -> createFetchRequestWithUnknownTopic(id, 
ApiKeys.FETCH.latestVersion()),
       ApiKeys.LIST_OFFSETS -> createListOffsetsRequest,
       ApiKeys.OFFSET_COMMIT -> createOffsetCommitRequest,
@@ -921,8 +930,76 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
     sendRequests(requestKeyToRequest, false, topicNames)
   }
 
+  /**
+   * Test that the produce request fails with TOPIC_AUTHORIZATION_FAILED if 
the client doesn't have permission
+   * and topic name is used in the request. Even if the topic doesn't exist, 
we return TOPIC_AUTHORIZATION_FAILED to
+   * prevent leaking the topic name.
+   * This case covers produce request versions from the oldest through 12.
+   * Newer versions are covered by testAuthorizationWithTopicNotExisting and 
testAuthorizationWithTopicExisting.
+   */
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testAuthorizationProduceVersionFromOldestTo12(withTopicExisting: 
Boolean): Unit = {
+    if (withTopicExisting) {
+      createTopicWithBrokerPrincipal(topic)
+    }
+
+    for (version <- ApiKeys.PRODUCE.oldestVersion to 12) {
+      val request = createProduceRequest(topic, Uuid.ZERO_UUID, 
version.toShort)
+      val response = connectAndReceive[AbstractResponse](request, listenerName 
= listenerName)
+      val errorCode = response.asInstanceOf[ProduceResponse]
+        .data()
+        .responses()
+        .find(topic, Uuid.ZERO_UUID)
+        .partitionResponses.asScala.find(_.index == part).get
+        .errorCode
+
+      assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(), errorCode, 
s"unexpected error for produce request version $version")
+    }
+  }
+
+  /**
+   * Test that the produce request fails with UNKNOWN_TOPIC_ID if the topic id is 
zero when the request version is >= 13.
+   * Topic ids are supported in produce requests only from version 13 onward.
+   */
+  @Test
+  def testZeroTopicIdForProduceVersionFrom13ToNewest(): Unit = {
+    for (version <- 13 to ApiKeys.PRODUCE.latestVersion()) {
+      val request = createProduceRequest("", Uuid.ZERO_UUID, version.toShort)
+      val response = connectAndReceive[AbstractResponse](request, listenerName 
= listenerName)
+      val errorCode = response.asInstanceOf[ProduceResponse]
+        .data()
+        .responses()
+        .find("", Uuid.ZERO_UUID)
+        .partitionResponses.asScala.find(_.index == part).get
+        .errorCode
+
+      assertEquals(Errors.UNKNOWN_TOPIC_ID.code(), errorCode, s"unexpected 
error for produce request version $version")
+    }
+  }
+
+  /**
+   * Test that the produce request fails with TOPIC_AUTHORIZATION_FAILED if 
the topic name is empty when the request version is <= 12.
+   * Topic names are supported in produce requests only up to and including version 12.
+   */
+  @Test
+  def testEmptyTopicNameForProduceVersionFromOldestTo12(): Unit = {
+    for (version <- ApiKeys.PRODUCE.oldestVersion() to 12) {
+      val request = createProduceRequest("", Uuid.ZERO_UUID, version.toShort)
+      val response = connectAndReceive[AbstractResponse](request, listenerName 
= listenerName)
+      val errorCode = response.asInstanceOf[ProduceResponse]
+        .data()
+        .responses()
+        .find("", Uuid.ZERO_UUID)
+        .partitionResponses.asScala.find(_.index == part).get
+        .errorCode
+
+      assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(), errorCode, 
s"unexpected error for produce request version $version")
+    }
+  }
+
   @ParameterizedTest
-  @CsvSource(value = Array("false", "true"))
+  @ValueSource(booleans = Array(true, false))
   def testTopicIdAuthorization(withTopicExisting: Boolean): Unit = {
     val topicId = if (withTopicExisting) {
       createTopicWithBrokerPrincipal(topic)
@@ -971,18 +1048,73 @@ class AuthorizerIntegrationTest extends 
AbstractAuthorizerIntegrationTest {
     }
   }
 
-  /*
-   * even if the topic doesn't exist, request APIs should not leak the topic 
name
+  /**
+   * Test that the fetch request fails with TOPIC_AUTHORIZATION_FAILED if the 
client doesn't have permission
+   * and topic name is used in the request. Even if the topic doesn't exist, 
we return TOPIC_AUTHORIZATION_FAILED to
+   * prevent leaking the topic name.
+   * This case covers fetch request versions from the oldest through 12.
+   * Newer versions are covered by testAuthorizationWithTopicNotExisting and 
testAuthorizationWithTopicExisting.
    */
-  @Test
-  def testAuthorizationFetchV12WithTopicNotExisting(): Unit = {
+  @ParameterizedTest
+  @ValueSource(booleans = Array(true, false))
+  def testAuthorizationFetchVersionFromOldestTo12(withTopicExisting: Boolean): 
Unit = {
+    if (withTopicExisting) {
+      createTopicWithBrokerPrincipal(topic)
+    }
+
     val id = Uuid.ZERO_UUID
-    val topicNames = Map(id -> "topic")
-    val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest](
-      ApiKeys.FETCH -> createFetchRequestWithUnknownTopic(id, 12),
-    )
+    val topicNames = Map(id -> topic)
+    for (version <- ApiKeys.FETCH.oldestVersion to 12) {
+      val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, 
AbstractRequest](
+        ApiKeys.FETCH -> createFetchRequestWithUnknownTopic(id, 
version.toShort),
+      )
 
-    sendRequests(requestKeyToRequest, false, topicNames)
+      sendRequests(requestKeyToRequest, withTopicExisting, topicNames)
+    }
+  }
+
+  /**
+   * Test that the fetch request fails with UNKNOWN_TOPIC_ID if the topic id is 
zero when the request version is >= 13.
+   * Topic ids are supported in fetch requests only from version 13 onward.
+   */
+  @Test
+  def testZeroTopicIdForFetchVersionFrom13ToNewest(): Unit = {
+    for (version <- 13 to ApiKeys.FETCH.latestVersion) {
+      val request = 
createFetchRequestWithEmptyTopicNameAndZeroTopicId(version.toShort)
+      val response = connectAndReceive[AbstractResponse](request, listenerName 
= listenerName)
+
+      val errorCode = response.asInstanceOf[FetchResponse]
+        .data()
+        .responses()
+        .get(0)
+        .partitions()
+        .get(0)
+        .errorCode
+
+      assertEquals(Errors.UNKNOWN_TOPIC_ID.code(), errorCode, s"unexpected 
error for fetch request version $version")
+    }
+  }
+
+  /**
+   * Test that the fetch request fails with TOPIC_AUTHORIZATION_FAILED if 
the topic name is empty when the request version is <= 12.
+   * Topic names are supported in fetch requests only up to and including version 12.
+   */
+  @Test
+  def testEmptyTopicNameForFetchVersionFromOldestTo12(): Unit = {
+    for (version <- ApiKeys.FETCH.oldestVersion to 12) {
+      val request = 
createFetchRequestWithEmptyTopicNameAndZeroTopicId(version.toShort)
+      val response = connectAndReceive[AbstractResponse](request, listenerName 
= listenerName)
+
+      val errorCode = response.asInstanceOf[FetchResponse]
+        .data()
+        .responses()
+        .get(0)
+        .partitions()
+        .get(0)
+        .errorCode
+
+      assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED.code(), errorCode, 
s"unexpected error for fetch request version $version")
+    }
   }
 
   @Test

Reply via email to