This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch 2.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.8 by this push:
new 7633ea8 KAFKA-12394; Return `TOPIC_AUTHORIZATION_FAILED` in delete
topic response if no describe permission (#10223)
7633ea8 is described below
commit 7633ea89ca9135d8c1773db9096a5b21e80ed885
Author: Jason Gustafson <[email protected]>
AuthorDate: Tue Mar 2 10:20:07 2021 -0800
KAFKA-12394; Return `TOPIC_AUTHORIZATION_FAILED` in delete topic response
if no describe permission (#10223)
We now accept topicIds in the `DeleteTopic` request. If the client
principal does not have `Describe` permission, then we return
`TOPIC_AUTHORIZATION_FAILED`. This is justified because the topicId is not
considered sensitive. However, in this case, we should not return the name of
the topic in the response since we do consider it sensitive.
Reviewers: David Jacot <[email protected]>, dengziming
<[email protected]>, Justine Olshan <[email protected]>, Chia-Ping
Tsai <[email protected]>
---
.../scala/kafka/controller/ControllerContext.scala | 4 +
core/src/main/scala/kafka/server/KafkaApis.scala | 37 +++--
.../kafka/api/AuthorizerIntegrationTest.scala | 132 +++++++----------
.../scala/unit/kafka/server/KafkaApisTest.scala | 161 ++++++++++++++++++++-
4 files changed, 235 insertions(+), 99 deletions(-)
diff --git a/core/src/main/scala/kafka/controller/ControllerContext.scala
b/core/src/main/scala/kafka/controller/ControllerContext.scala
index 0428301..379196a 100644
--- a/core/src/main/scala/kafka/controller/ControllerContext.scala
+++ b/core/src/main/scala/kafka/controller/ControllerContext.scala
@@ -463,6 +463,10 @@ class ControllerContext {
}.keySet
}
+ def topicName(topicId: Uuid): Option[String] = {
+ topicNames.get(topicId)
+ }
+
def clearPartitionLeadershipInfo(): Unit = partitionLeadershipInfo.clear()
def partitionWithLeadersCount: Int = partitionLeadershipInfo.size
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 5dafdd6..c3ae6f0 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1870,7 +1870,7 @@ class KafkaApis(val requestChannel: RequestChannel,
if (topic.name() != null && topic.topicId() != Uuid.ZERO_UUID)
throw new InvalidRequestException("Topic name and topic ID can not
both be specified.")
val name = if (topic.topicId() == Uuid.ZERO_UUID) topic.name()
- else
zkSupport.controller.controllerContext.topicNames.getOrElse(topic.topicId(),
null)
+ else
zkSupport.controller.controllerContext.topicName(topic.topicId).orNull
results.add(new DeletableTopicResult()
.setName(name)
.setTopicId(topic.topicId()))
@@ -1880,20 +1880,27 @@ class KafkaApis(val requestChannel: RequestChannel,
val authorizedDeleteTopics =
authHelper.filterByAuthorized(request.context, DELETE, TOPIC,
results.asScala.filter(result => result.name() != null))(_.name)
results.forEach { topic =>
- val unresolvedTopicId = !(topic.topicId() == Uuid.ZERO_UUID) &&
topic.name() == null
- if (!config.usesTopicId &&
topicIdsFromRequest.contains(topic.topicId)) {
- topic.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
- topic.setErrorMessage("Topic IDs are not supported on the server.")
- } else if (unresolvedTopicId)
- topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
- else if (topicIdsFromRequest.contains(topic.topicId) &&
!authorizedDescribeTopics(topic.name))
- topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
- else if (!authorizedDeleteTopics.contains(topic.name))
- topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
- else if (!metadataCache.contains(topic.name))
- topic.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
- else
- toDelete += topic.name
+ val unresolvedTopicId = topic.topicId() != Uuid.ZERO_UUID &&
topic.name() == null
+ if (!config.usesTopicId &&
topicIdsFromRequest.contains(topic.topicId)) {
+ topic.setErrorCode(Errors.UNSUPPORTED_VERSION.code)
+ topic.setErrorMessage("Topic IDs are not supported on the server.")
+ } else if (unresolvedTopicId) {
+ topic.setErrorCode(Errors.UNKNOWN_TOPIC_ID.code)
+ } else if (topicIdsFromRequest.contains(topic.topicId) &&
!authorizedDescribeTopics.contains(topic.name)) {
+
+ // Because the client does not have Describe permission, the name
should
+ // not be returned in the response. Note, however, that we do not
consider
+ // the topicId itself to be sensitive, so there is no reason to
obscure
+ // this case with `UNKNOWN_TOPIC_ID`.
+ topic.setName(null)
+ topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+ } else if (!authorizedDeleteTopics.contains(topic.name)) {
+ topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
+ } else if (!metadataCache.contains(topic.name)) {
+ topic.setErrorCode(Errors.UNKNOWN_TOPIC_OR_PARTITION.code)
+ } else {
+ toDelete += topic.name
+ }
}
// If no authorized topics return immediately
if (toDelete.isEmpty)
diff --git
a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index fec75eb..634475b 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -62,6 +62,8 @@ import org.apache.kafka.common.{ElectionType, IsolationLevel,
Node, TopicPartiti
import org.apache.kafka.test.{TestUtils => JTestUtils}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
@@ -240,9 +242,14 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
})
)
- val requestKeysToErrorWithIds = (id: Uuid) => Map[ApiKeys, Nothing =>
Errors](
- ApiKeys.DELETE_TOPICS -> ((resp: requests.DeleteTopicsResponse) =>
Errors.forCode(resp.data.responses.asScala.find(_.topicId ==
id).get.errorCode()))
- )
+ def findErrorForTopicId(id: Uuid, response: AbstractResponse): Errors = {
+ response match {
+ case res: DeleteTopicsResponse =>
+ Errors.forCode(res.data.responses.asScala.find(_.topicId ==
id).get.errorCode)
+ case _ =>
+ fail(s"Unexpected response type $response")
+ }
+ }
val requestKeysToAcls = Map[ApiKeys, Map[ResourcePattern,
Set[AccessControlEntry]]](
ApiKeys.METADATA -> topicDescribeAcl,
@@ -528,12 +535,12 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
.setTimeoutMs(5000)).build()
}
- private def deleteTopicsWithIdsRequest(id: Uuid = getTopicIds()(topic)):
DeleteTopicsRequest = {
+ private def deleteTopicsWithIdsRequest(topicId: Uuid): DeleteTopicsRequest =
{
new DeleteTopicsRequest.Builder(
new DeleteTopicsRequestData()
.setTopics(Collections.singletonList(
new DeleteTopicsRequestData.DeleteTopicState()
- .setTopicId(id)))
+ .setTopicId(topicId)))
.setTimeoutMs(5000)).build()
}
@@ -726,29 +733,53 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
sendRequests(requestKeyToRequest)
}
- @Test
- def testAuthorizationDeleteTopicsIdWithTopicExisting(): Unit = {
- sendRequests(mutable.Map(ApiKeys.CREATE_TOPICS -> createTopicsRequest))
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def testTopicIdAuthorization(withTopicExisting: Boolean): Unit = {
+ val topicId = if (withTopicExisting) {
+ createTopic(topic)
+ getTopicIds()(topic)
+ } else {
+ Uuid.randomUuid()
+ }
+
+ val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest](
+ ApiKeys.DELETE_TOPICS -> deleteTopicsWithIdsRequest(topicId)
+ )
- val id = getTopicIds()(topic)
+ def sendAndVerify(
+ request: AbstractRequest,
+ isAuthorized: Boolean,
+ isDescribeAuthorized: Boolean
+ ): Unit = {
+ val response = connectAndReceive[AbstractResponse](request)
+ val error = findErrorForTopicId(topicId, response)
+ if (!withTopicExisting) {
+ assertEquals(Errors.UNKNOWN_TOPIC_ID, error)
+ } else if (!isDescribeAuthorized || !isAuthorized) {
+ assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED, error)
+ }
+ }
- for ((key, request) <- mutable.Map(ApiKeys.DELETE_TOPICS ->
deleteTopicsWithIdsRequest())) {
+ for ((key, request) <- requestKeyToRequest) {
removeAllClientAcls()
- val resources = requestKeysToAcls(key).map(_._1.resourceType).toSet
- sendRequestWithIdAndVerifyResponseError(request, resources, isAuthorized
= false, topicExists = true, describeAuthorized = false, id = id)
+ sendAndVerify(request, isAuthorized = false, isDescribeAuthorized =
false)
+
+ val describeAcls = topicDescribeAcl(topicResource)
+ addAndVerifyAcls(describeAcls, topicResource)
val resourceToAcls = requestKeysToAcls(key)
resourceToAcls.get(topicResource).foreach { acls =>
- val describeAcls = topicDescribeAcl(topicResource)
val isAuthorized = describeAcls == acls
- addAndVerifyAcls(describeAcls, topicResource)
- sendRequestWithIdAndVerifyResponseError(request, resources,
isAuthorized = isAuthorized, topicExists = true, describeAuthorized = true, id
= id)
- removeAllClientAcls()
+ sendAndVerify(request, isAuthorized = isAuthorized,
isDescribeAuthorized = true)
}
- for ((resource, acls) <- resourceToAcls)
+ removeAllClientAcls()
+ for ((resource, acls) <- resourceToAcls) {
addAndVerifyAcls(acls, resource)
- sendRequestWithIdAndVerifyResponseError(request, resources, isAuthorized
= true, topicExists = true, describeAuthorized = true, id = id)
+ }
+
+ sendAndVerify(request, isAuthorized = true, isDescribeAuthorized = true)
}
}
@@ -779,33 +810,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
}
@Test
- def testAuthorizationDeleteTopicsIdWithTopicNotExisting(): Unit = {
- val id = Uuid.randomUuid()
- val requestKeyToRequest = mutable.LinkedHashMap[ApiKeys, AbstractRequest](
- ApiKeys.DELETE_TOPICS -> deleteTopicsWithIdsRequest(id),
- )
-
- for ((key, request) <- requestKeyToRequest) {
- removeAllClientAcls()
- val resources = requestKeysToAcls(key).map(_._1.resourceType).toSet
- sendRequestWithIdAndVerifyResponseError(request, resources, isAuthorized
= false, topicExists = false, describeAuthorized = false, id = id)
-
- val resourceToAcls = requestKeysToAcls(key)
- resourceToAcls.get(topicResource).foreach { acls =>
- val describeAcls = topicDescribeAcl(topicResource)
- val isAuthorized = describeAcls == acls
- addAndVerifyAcls(describeAcls, topicResource)
- sendRequestWithIdAndVerifyResponseError(request, resources,
isAuthorized = isAuthorized, topicExists = false, describeAuthorized = true, id
= id)
- removeAllClientAcls()
- }
-
- for ((resource, acls) <- resourceToAcls)
- addAndVerifyAcls(acls, resource)
- sendRequestWithIdAndVerifyResponseError(request, resources, isAuthorized
= true, topicExists = false, describeAuthorized = true, id = id)
- }
- }
-
- @Test
def testCreateTopicAuthorizationWithClusterCreate(): Unit = {
removeAllClientAcls()
val resources = Set[ResourceType](TOPIC)
@@ -2025,44 +2029,6 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
}
}
- private def sendRequestWithIdAndVerifyResponseError(request: AbstractRequest,
- resources: Set[ResourceType],
- isAuthorized: Boolean,
- topicExists: Boolean,
- describeAuthorized: Boolean,
- id: Uuid): AbstractResponse = {
- val apiKey = request.apiKey
- val response = connectAndReceive[AbstractResponse](request)
- val error =
requestKeysToErrorWithIds(id)(apiKey).asInstanceOf[AbstractResponse =>
Errors](response)
-
- val authorizationErrors = resources.flatMap { resourceType =>
- if (resourceType == TOPIC) {
- if (isAuthorized)
- Set(Errors.UNKNOWN_TOPIC_ID,
AclEntry.authorizationError(ResourceType.TOPIC))
- else if (describeAuthorized)
- Set(AclEntry.authorizationError(ResourceType.TOPIC))
- else
- Set(Errors.UNKNOWN_TOPIC_ID)
- } else {
- Set(AclEntry.authorizationError(resourceType))
- }
- }
-
- if (topicExists)
- if (isAuthorized)
- assertFalse(authorizationErrors.contains(error), s"$apiKey should be
allowed. Found unexpected authorization error $error")
- else
- assertTrue(authorizationErrors.contains(error), s"$apiKey should be
forbidden. Found error $error but expected one of $authorizationErrors")
- else if (resources == Set(TOPIC))
- if (isAuthorized)
- assertEquals(Errors.UNKNOWN_TOPIC_ID, error, s"$apiKey had an
unexpected error")
- else {
- assertEquals(Errors.UNKNOWN_TOPIC_ID, error, s"$apiKey had an
unexpected error")
- }
-
- response
- }
-
private def sendRequestAndVerifyResponseError(request: AbstractRequest,
resources: Set[ResourceType],
isAuthorized: Boolean,
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index e80c6eb..b89b29c 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -26,7 +26,7 @@ import java.util.{Collections, Optional, Properties, Random}
import kafka.api.{ApiVersion, KAFKA_0_10_2_IV0, KAFKA_2_2_IV1, LeaderAndIsr}
import kafka.cluster.{Broker, Partition}
-import kafka.controller.KafkaController
+import kafka.controller.{ControllerContext, KafkaController}
import
kafka.coordinator.group.GroupCoordinatorConcurrencyTest.{JoinGroupCallback,
SyncGroupCallback}
import kafka.coordinator.group._
import kafka.coordinator.transaction.{InitProducerIdResult,
TransactionCoordinator}
@@ -76,6 +76,8 @@ import org.easymock.EasyMock._
import org.easymock.{Capture, EasyMock, IAnswer}
import org.junit.jupiter.api.Assertions._
import org.junit.jupiter.api.{AfterEach, Test}
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
import org.mockito.{ArgumentMatchers, Mockito}
import scala.annotation.nowarn
@@ -3390,6 +3392,163 @@ class KafkaApisTest {
}
+ @Test
+ def testDeleteTopicsByIdAuthorization(): Unit = {
+ val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+ val controllerContext: ControllerContext =
EasyMock.mock(classOf[ControllerContext])
+
+ EasyMock.expect(clientControllerQuotaManager.newQuotaFor(
+ EasyMock.anyObject(classOf[RequestChannel.Request]),
+ EasyMock.anyShort()
+ )).andReturn(UnboundedControllerMutationQuota)
+ EasyMock.expect(controller.isActive).andReturn(true)
+
EasyMock.expect(controller.controllerContext).andStubReturn(controllerContext)
+
+ // Try to delete three topics:
+ // 1. One without describe permission
+ // 2. One without delete permission
+ // 3. One which is authorized, but doesn't exist
+
+ expectTopicAuthorization(authorizer, AclOperation.DESCRIBE, Map(
+ "foo" -> AuthorizationResult.DENIED,
+ "bar" -> AuthorizationResult.ALLOWED
+ ))
+
+ expectTopicAuthorization(authorizer, AclOperation.DELETE, Map(
+ "foo" -> AuthorizationResult.DENIED,
+ "bar" -> AuthorizationResult.DENIED
+ ))
+
+ val topicIdsMap = Map(
+ Uuid.randomUuid() -> Some("foo"),
+ Uuid.randomUuid() -> Some("bar"),
+ Uuid.randomUuid() -> None
+ )
+
+ topicIdsMap.foreach { case (topicId, topicNameOpt) =>
+
EasyMock.expect(controllerContext.topicName(topicId)).andReturn(topicNameOpt)
+ }
+
+ val topicDatas = topicIdsMap.keys.map { topicId =>
+ new DeleteTopicsRequestData.DeleteTopicState().setTopicId(topicId)
+ }.toList
+ val deleteRequest = new DeleteTopicsRequest.Builder(new
DeleteTopicsRequestData()
+ .setTopics(topicDatas.asJava))
+ .build(ApiKeys.DELETE_TOPICS.latestVersion)
+
+ val request = buildRequest(deleteRequest)
+ val capturedResponse = expectNoThrottling()
+
+ EasyMock.replay(replicaManager, clientRequestQuotaManager,
clientControllerQuotaManager,
+ requestChannel, txnCoordinator, controller, controllerContext,
authorizer)
+ createKafkaApis(authorizer =
Some(authorizer)).handleDeleteTopicsRequest(request)
+
+ val deleteResponse = readResponse(deleteRequest, capturedResponse)
+ .asInstanceOf[DeleteTopicsResponse]
+
+ topicIdsMap.foreach { case (topicId, nameOpt) =>
+ val response = deleteResponse.data.responses.asScala.find(_.topicId ==
topicId).get
+ nameOpt match {
+ case Some("foo") =>
+ assertNull(response.name)
+ assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED,
Errors.forCode(response.errorCode))
+ case Some("bar") =>
+ assertEquals("bar", response.name)
+ assertEquals(Errors.TOPIC_AUTHORIZATION_FAILED,
Errors.forCode(response.errorCode))
+ case None =>
+ assertNull(response.name)
+ assertEquals(Errors.UNKNOWN_TOPIC_ID,
Errors.forCode(response.errorCode))
+ case _ =>
+ fail("Unexpected topic id/name mapping")
+ }
+ }
+ }
+
+ @ParameterizedTest
+ @ValueSource(booleans = Array(true, false))
+ def testDeleteTopicsByNameAuthorization(usePrimitiveTopicNameArray:
Boolean): Unit = {
+ val authorizer: Authorizer = EasyMock.niceMock(classOf[Authorizer])
+
+ EasyMock.expect(clientControllerQuotaManager.newQuotaFor(
+ EasyMock.anyObject(classOf[RequestChannel.Request]),
+ EasyMock.anyShort()
+ )).andReturn(UnboundedControllerMutationQuota)
+ EasyMock.expect(controller.isActive).andReturn(true)
+
+ // Try to delete three topics:
+ // 1. One without describe permission
+ // 2. One without delete permission
+ // 3. One which is authorized, but doesn't exist
+
+ expectTopicAuthorization(authorizer, AclOperation.DESCRIBE, Map(
+ "foo" -> AuthorizationResult.DENIED,
+ "bar" -> AuthorizationResult.ALLOWED,
+ "baz" -> AuthorizationResult.ALLOWED
+ ))
+
+ expectTopicAuthorization(authorizer, AclOperation.DELETE, Map(
+ "foo" -> AuthorizationResult.DENIED,
+ "bar" -> AuthorizationResult.DENIED,
+ "baz" -> AuthorizationResult.ALLOWED
+ ))
+
+ val deleteRequest = if (usePrimitiveTopicNameArray) {
+ new DeleteTopicsRequest.Builder(new DeleteTopicsRequestData()
+ .setTopicNames(List("foo", "bar", "baz").asJava))
+ .build(5.toShort)
+ } else {
+ val topicDatas = List(
+ new DeleteTopicsRequestData.DeleteTopicState().setName("foo"),
+ new DeleteTopicsRequestData.DeleteTopicState().setName("bar"),
+ new DeleteTopicsRequestData.DeleteTopicState().setName("baz")
+ )
+ new DeleteTopicsRequest.Builder(new DeleteTopicsRequestData()
+ .setTopics(topicDatas.asJava))
+ .build(ApiKeys.DELETE_TOPICS.latestVersion)
+ }
+
+ val request = buildRequest(deleteRequest)
+ val capturedResponse = expectNoThrottling()
+
+ EasyMock.replay(replicaManager, clientRequestQuotaManager,
clientControllerQuotaManager,
+ requestChannel, txnCoordinator, controller, authorizer)
+ createKafkaApis(authorizer =
Some(authorizer)).handleDeleteTopicsRequest(request)
+
+ val deleteResponse = readResponse(deleteRequest, capturedResponse)
+ .asInstanceOf[DeleteTopicsResponse]
+
+ def lookupErrorCode(topic: String): Option[Errors] = {
+ Option(deleteResponse.data.responses().find(topic))
+ .map(result => Errors.forCode(result.errorCode))
+ }
+
+ assertEquals(Some(Errors.TOPIC_AUTHORIZATION_FAILED),
lookupErrorCode("foo"))
+ assertEquals(Some(Errors.TOPIC_AUTHORIZATION_FAILED),
lookupErrorCode("bar"))
+ assertEquals(Some(Errors.UNKNOWN_TOPIC_OR_PARTITION),
lookupErrorCode("baz"))
+ }
+
+ def expectTopicAuthorization(
+ authorizer: Authorizer,
+ aclOperation: AclOperation,
+ topicResults: Map[String, AuthorizationResult]
+ ): Unit = {
+ val expectedActions = topicResults.keys.map { topic =>
+ val pattern = new ResourcePattern(ResourceType.TOPIC, topic,
PatternType.LITERAL)
+ topic -> new Action(aclOperation, pattern, 1, true, true)
+ }.toMap
+
+ val actionsCapture: Capture[util.List[Action]] = EasyMock.newCapture()
+ EasyMock.expect(authorizer.authorize(anyObject[RequestContext],
EasyMock.capture(actionsCapture)))
+ .andAnswer(() => {
+ actionsCapture.getValue.asScala.map { action =>
+ val topic = action.resourcePattern.name
+ assertEquals(expectedActions(topic), action)
+ topicResults(topic)
+ }.asJava
+ })
+ .once()
+ }
+
private def createMockRequest(): RequestChannel.Request = {
val request: RequestChannel.Request =
EasyMock.createNiceMock(classOf[RequestChannel.Request])
val requestHeader: RequestHeader =
EasyMock.createNiceMock(classOf[RequestHeader])