rondagostino commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1298706120


##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -2999,7 +2999,38 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleCreateTokenRequest(request: RequestChannel.Request): Unit = {
+    val createTokenRequest = request.body[CreateDelegationTokenRequest]
+
+    val requester = request.context.principal
+    val ownerPrincipalName = createTokenRequest.data.ownerPrincipalName
+    val owner = if (ownerPrincipalName == null || ownerPrincipalName.isEmpty) {
+      request.context.principal
+    } else {
+      new KafkaPrincipal(createTokenRequest.data.ownerPrincipalType, 
ownerPrincipalName)
+    }
+    val renewerList = 
createTokenRequest.data.renewers.asScala.toList.map(entry =>
+      new KafkaPrincipal(entry.principalType, entry.principalName))
+
+    if (!allowTokenRequests(request)) {
+      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+        
CreateDelegationTokenResponse.prepareResponse(request.context.requestVersion, 
requestThrottleMs,
+          Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, owner, requester))
+    } else if (!owner.equals(requester) && 
!authHelper.authorize(request.context, CREATE_TOKENS, USER, owner.toString)) {
+      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+        
CreateDelegationTokenResponse.prepareResponse(request.context.requestVersion, 
requestThrottleMs,
+          Errors.DELEGATION_TOKEN_AUTHORIZATION_FAILED, owner, requester))
+    } else if (renewerList.exists(principal => principal.getPrincipalType != 
KafkaPrincipal.USER_TYPE)) {
+      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+        
CreateDelegationTokenResponse.prepareResponse(request.context.requestVersion, 
requestThrottleMs,
+          Errors.INVALID_PRINCIPAL_TYPE, owner, requester))
+    } else {
+      maybeForwardToController(request, handleCreateTokenRequestZk)
+    }
+  }
+
+  def handleCreateTokenRequestZk(request: RequestChannel.Request): Unit = {

Review Comment:
   I wonder if this is a broader problem.  Are there other RPCs that mutate 
Zookeeper that need to check that they are the active controller when migration 
is enabled?  It is likely a short window of time, but there does seem to be a 
possibility that an RPC could make it to a broker and end up mutating Zookeeper 
when in fact the active controller is a KRaft controller.  @mumrah WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to