rondagostino commented on code in PR #14083: URL: https://github.com/apache/kafka/pull/14083#discussion_r1298706120
########## core/src/main/scala/kafka/server/KafkaApis.scala: ########## @@ -2999,7 +2999,38 @@ class KafkaApis(val requestChannel: RequestChannel, } def handleCreateTokenRequest(request: RequestChannel.Request): Unit = { + val createTokenRequest = request.body[CreateDelegationTokenRequest] + + val requester = request.context.principal + val ownerPrincipalName = createTokenRequest.data.ownerPrincipalName + val owner = if (ownerPrincipalName == null || ownerPrincipalName.isEmpty) { + request.context.principal + } else { + new KafkaPrincipal(createTokenRequest.data.ownerPrincipalType, ownerPrincipalName) + } + val renewerList = createTokenRequest.data.renewers.asScala.toList.map(entry => + new KafkaPrincipal(entry.principalType, entry.principalName)) + + if (!allowTokenRequests(request)) { + requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => + CreateDelegationTokenResponse.prepareResponse(request.context.requestVersion, requestThrottleMs, + Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, owner, requester)) + } else if (!owner.equals(requester) && !authHelper.authorize(request.context, CREATE_TOKENS, USER, owner.toString)) { + requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => + CreateDelegationTokenResponse.prepareResponse(request.context.requestVersion, requestThrottleMs, + Errors.DELEGATION_TOKEN_AUTHORIZATION_FAILED, owner, requester)) + } else if (renewerList.exists(principal => principal.getPrincipalType != KafkaPrincipal.USER_TYPE)) { + requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs => + CreateDelegationTokenResponse.prepareResponse(request.context.requestVersion, requestThrottleMs, + Errors.INVALID_PRINCIPAL_TYPE, owner, requester)) + } else { + maybeForwardToController(request, handleCreateTokenRequestZk) + } + } + + def handleCreateTokenRequestZk(request: RequestChannel.Request): Unit = { Review Comment: I wonder if this is a broader problem. 
Are there other RPCs that mutate ZooKeeper that need to check that they are the active controller when migration is enabled? It is likely a short window of time, but there does seem to be a possibility that an RPC could make it to a broker and end up mutating ZooKeeper when in fact the active controller is a KRaft controller. @mumrah WDYT? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org