[GitHub] [kafka] mumrah commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-18 Thread via GitHub


mumrah commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1298714610


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -2999,7 +2999,38 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleCreateTokenRequest(request: RequestChannel.Request): Unit = {
+val createTokenRequest = request.body[CreateDelegationTokenRequest]
+
+val requester = request.context.principal
+val ownerPrincipalName = createTokenRequest.data.ownerPrincipalName
+val owner = if (ownerPrincipalName == null || ownerPrincipalName.isEmpty) {
+  request.context.principal
+} else {
+  new KafkaPrincipal(createTokenRequest.data.ownerPrincipalType, 
ownerPrincipalName)
+}
+val renewerList = 
createTokenRequest.data.renewers.asScala.toList.map(entry =>
+  new KafkaPrincipal(entry.principalType, entry.principalName))
+
+if (!allowTokenRequests(request)) {
+  requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+
CreateDelegationTokenResponse.prepareResponse(request.context.requestVersion, 
requestThrottleMs,
+  Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, owner, requester))
+} else if (!owner.equals(requester) && 
!authHelper.authorize(request.context, CREATE_TOKENS, USER, owner.toString)) {
+  requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+
CreateDelegationTokenResponse.prepareResponse(request.context.requestVersion, 
requestThrottleMs,
+  Errors.DELEGATION_TOKEN_AUTHORIZATION_FAILED, owner, requester))
+} else if (renewerList.exists(principal => principal.getPrincipalType != 
KafkaPrincipal.USER_TYPE)) {
+  requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+
CreateDelegationTokenResponse.prepareResponse(request.context.requestVersion, 
requestThrottleMs,
+  Errors.INVALID_PRINCIPAL_TYPE, owner, requester))
+} else {
+  maybeForwardToController(request, handleCreateTokenRequestZk)
+}
+  }
+
+  def handleCreateTokenRequestZk(request: RequestChannel.Request): Unit = {

Review Comment:
   @rondagostino it's a good call out. We should audit KafkaApis to ensure any 
mutating request handler is protected by `KafkaController#isActive`. I suspect 
delegation tokens is a bit of an outlier since it does not have a controller 
RPC (in ZK mode), but it would be good to check the other handlers to be 
certain.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] mumrah commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-09 Thread via GitHub


mumrah commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1289141277


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -1498,6 +1543,30 @@ private void cancelNextWriteNoOpRecord() {
 queue.cancelDeferred(WRITE_NO_OP_RECORD);
 }
 
+private static final String WRITE_REMOVE_DELEGATIONTOKEN_RECORD = 
"writeRemoveDelegationTokenRecord";

Review Comment:
   I think should be something like "maybeExpireDelegationTokens" or something 
to reflect what this event is doing (rather than what record ends up being used)



##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -176,8 +176,11 @@ public enum MetadataVersion {
 // Support for SCRAM
 IBP_3_5_IV2(11, "3.5", "IV2", true),
 
-// Remove leader epoch bump when KRaft controller shrinks the ISR 
(KAFKA-15021)
-IBP_3_6_IV0(12, "3.6", "IV0", false);
+// Support for Remove leader epoch bump when KRaft controller shrinks the 
ISR (KAFKA-15021)
+IBP_3_6_IV0(12, "3.6", "IV0", false),
+
+// Support for DelegationTokens

Review Comment:
   nit: "KRaft support for DelegationTokens"



##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java:
##
@@ -633,4 +641,35 @@ void handleAclsDelta(AclsImage image, AclsDelta delta, 
KRaftMigrationOperationCo
 migrationClient.aclClient().writeResourceAcls(resourcePattern, 
accessControlEntries, migrationState));
 });
 }
+
+void handleDelegationTokenDelta(DelegationTokenImage image, 
DelegationTokenDelta delta, KRaftMigrationOperationConsumer operationConsumer) {
+Set updatedResources = delta.changes().keySet();

Review Comment:
   nit: "updatedTokens" ?



##
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
+import 
org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
+import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
+import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
+import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
+import org.apache.kafka.common.metadata.DelegationTokenRecord;
+import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import 
org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.DelegationTokenData;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.charset.StandardCharsets;
+import javax.crypto.spec.SecretKeySpec;
+import javax.crypto.Mac;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH;
+import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE;
+import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
+
+/**
+ * Manages DelegationTokens.
+ */
+public class DelegationTokenControlManager {
+