[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-18 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1299014422


##
core/src/main/scala/kafka/server/ControllerApis.scala:
##
@@ -842,6 +848,114 @@ class ControllerApis(val requestChannel: RequestChannel,
   }
   }
 
+  def allowTokenRequests(request: RequestChannel.Request): Boolean = {
+    val protocol = request.context.securityProtocol
+    if (request.context.principal.tokenAuthenticated ||
+      // We allow forwarded requests to use PLAINTEXT for testing purposes
+      (request.isForwarded == false && protocol == SecurityProtocol.PLAINTEXT) ||
+      // disallow requests from 1-way SSL
+      (protocol == SecurityProtocol.SSL && request.context.principal == KafkaPrincipal.ANONYMOUS))
+      false
+    else
+      true
+  }

Review Comment:
   Updated with comment



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-18 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1299014398


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -6069,19 +6071,19 @@ class KafkaApisTest {
   @Test
   def testRaftShouldAlwaysForwardCreateTokenRequest(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleCreateTokenRequest)
+    verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleCreateTokenRequestZk)

Review Comment:
   I added a comment






[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-18 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1299014357


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -5290,8 +5290,10 @@ class KafkaApisTest {
 
     // read the header from the buffer first so that the body can be read next from the Request constructor
     val header = RequestHeader.parse(buffer)
-    val context = new RequestContext(header, "1", InetAddress.getLocalHost, KafkaPrincipal.ANONYMOUS,
-      listenerName, SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, fromPrivilegedListener,
+    // DelegationTokens require the context authenticated to be a
+    // non SecurityProtocol.PLAINTEXT and a non KafkaPrincipal.ANONYMOUS
+    val context = new RequestContext(header, "1", InetAddress.getLocalHost, new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "Alice"),
+      listenerName, SecurityProtocol.SSL, ClientInformation.EMPTY, fromPrivilegedListener,

Review Comment:
   Done






[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-18 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1298798195


##
core/src/main/scala/kafka/server/BrokerServer.scala:
##
@@ -279,10 +280,7 @@ class BrokerServer(
   )
 
   /* start token manager */
-  if (config.tokenAuthEnabled) {
-    throw new UnsupportedOperationException("Delegation tokens are not supported")
-  }
-  tokenManager = new DelegationTokenManager(config, tokenCache, time , null)
+  tokenManager = new DelegationTokenManager(config, tokenCache, time)
   tokenManager.startup() // does nothing, we just need a token manager in order to compile right now...

Review Comment:
   Done






[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-18 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1298784076


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -6069,19 +6071,19 @@ class KafkaApisTest {
   @Test
   def testRaftShouldAlwaysForwardCreateTokenRequest(): Unit = {
     metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-    verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleCreateTokenRequest)
+    verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleCreateTokenRequestZk)

Review Comment:
   The test checks the error message returned when the request was not 
forwarded and we are in KRaft mode.






[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-18 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1298770601


##
core/src/main/scala/kafka/server/ControllerApis.scala:
##
@@ -842,6 +848,114 @@ class ControllerApis(val requestChannel: RequestChannel,
   }
   }
 
+  def allowTokenRequests(request: RequestChannel.Request): Boolean = {
+    val protocol = request.context.securityProtocol
+    if (request.context.principal.tokenAuthenticated ||
+      // We allow forwarded requests to use PLAINTEXT for testing purposes
+      (request.isForwarded == false && protocol == SecurityProtocol.PLAINTEXT) ||
+      // disallow requests from 1-way SSL
+      (protocol == SecurityProtocol.SSL && request.context.principal == KafkaPrincipal.ANONYMOUS))
+      false
+    else
+      true
+  }

Review Comment:
   Yes, I agree that special-casing PLAINTEXT is generally bad, and no 
production system should ever use PLAINTEXT. Here I am doing just what you 
suggested: I allow PLAINTEXT only for forwarded requests, and that should only 
ever happen on non-production systems used for testing. The other disallowed 
cases (token-authenticated principals and 1-way SSL) are still rejected for 
forwarded requests.
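
A minimal Java sketch of the decision that the `allowTokenRequests` hunk quoted above makes, for readers who want to see the cases side by side. The `Protocol` enum, the `TokenRequestGate` class, and the boolean parameters are hypothetical stand-ins for the Kafka request context; only the three rejection rules are taken from the diff.

```java
// Hypothetical stand-in for Kafka's SecurityProtocol; not the real enum.
enum Protocol { PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL }

// Hypothetical helper that mirrors the three rejection rules in the quoted hunk.
final class TokenRequestGate {
    private TokenRequestGate() { }

    static boolean allow(boolean tokenAuthenticated,
                         boolean forwarded,
                         Protocol protocol,
                         boolean anonymousPrincipal) {
        // A principal that authenticated with a delegation token is rejected.
        if (tokenAuthenticated) {
            return false;
        }
        // PLAINTEXT is tolerated only when the request was forwarded.
        if (!forwarded && protocol == Protocol.PLAINTEXT) {
            return false;
        }
        // 1-way SSL leaves the principal anonymous, so it is rejected.
        if (protocol == Protocol.SSL && anonymousPrincipal) {
            return false;
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println("forwarded PLAINTEXT:  " + allow(false, true, Protocol.PLAINTEXT, false));
        System.out.println("direct PLAINTEXT:     " + allow(false, false, Protocol.PLAINTEXT, false));
        System.out.println("1-way SSL, forwarded: " + allow(false, true, Protocol.SSL, true));
    }
}
```

Forwarding relaxes only the PLAINTEXT rule in this sketch; the token-authenticated and anonymous-SSL rules apply whether or not the request was forwarded.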






[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-18 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1298728194


##
core/src/main/scala/kafka/server/DelegationTokenManager.scala:
##
@@ -165,16 +99,16 @@ object DelegationTokenManager {
 
 class DelegationTokenManager(val config: KafkaConfig,
                              val tokenCache: DelegationTokenCache,
-                             val time: Time,
-                             val zkClient: KafkaZkClient) extends Logging {
+                             val time: Time) extends Logging {
   this.logIdent = s"[Token Manager on Broker ${config.brokerId}]: "

Review Comment:
   Done






[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-18 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1298721551


##
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
+import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
+import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
+import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
+import org.apache.kafka.common.metadata.DelegationTokenRecord;
+import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.DelegationTokenData;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.charset.StandardCharsets;
+import javax.crypto.spec.SecretKeySpec;
+import javax.crypto.Mac;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH;
+import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE;
+import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
+
+/**
+ * Manages DelegationTokens.
+ */
+public class DelegationTokenControlManager {
+    private Time time = Time.SYSTEM;
+
+    static class Builder {
+        private LogContext logContext = null;
+        private DelegationTokenCache tokenCache = null;
+        private String tokenSecretKeyString = null;
+        private long tokenDefaultMaxLifetimeMs = 0;
+        private long tokenDefaultRenewLifetimeMs = 0;
+
+        Builder setLogContext(LogContext logContext) {
+            this.logContext = logContext;
+            return this;
+        }
+
+        Builder setTokenCache(DelegationTokenCache tokenCache) {
+            this.tokenCache = tokenCache;
+            return this;
+        }
+
+        Builder setDelegationTokenSecretKey(String tokenSecretKeyString) {
+            this.tokenSecretKeyString = tokenSecretKeyString;
+            return this;
+        }
+
+        Builder setDelegationTokenMaxLifeMs(long tokenDefaultMaxLifetimeMs) {
+            this.tokenDefaultMaxLifetimeMs = tokenDefaultMaxLifetimeMs;
+            return this;
+        }
+
+        Builder setDelegationTokenExpiryTimeMs(long tokenDefaultRenewLifetimeMs) {
+            this.tokenDefaultRenewLifetimeMs = tokenDefaultRenewLifetimeMs;
+            return this;
+        }
+
+        DelegationTokenControlManager build() {
+            if (logContext == null) logContext = new LogContext();
+            return new DelegationTokenControlManager(
+              logContext,
+              tokenCache,
+              tokenSecretKeyString,
+              tokenDefaultMaxLifetimeMs,
+              tokenDefaultRenewLifetimeMs);
+        }
+    }
+
+    private final Logger log;
+    private final DelegationTokenCache tokenCache;
+    private final String tokenSecretKeyString;
+    private final long tokenDefaultMaxLifetimeMs;
+    private final long 

[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-18 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1298461774


##
core/src/main/scala/kafka/server/ControllerApis.scala:
##
@@ -842,6 +848,114 @@ class ControllerApis(val requestChannel: RequestChannel,
   }
   }
 
+  def allowTokenRequests(request: RequestChannel.Request): Boolean = {
+    val protocol = request.context.securityProtocol
+    if (request.context.principal.tokenAuthenticated ||
+      // We have to allow PLAINTEXT for the controller Apis
+      // protocol == SecurityProtocol.PLAINTEXT ||

Review Comment:
   Yes and done!






[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-17 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1297944016


##
metadata/src/main/java/org/apache/kafka/image/DelegationTokenDelta.java:
##
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.metadata.DelegationTokenRecord;
+import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord;
+import org.apache.kafka.metadata.DelegationTokenData;
+import org.apache.kafka.server.common.MetadataVersion;
+
+import java.util.HashMap;
+import java.util.Map.Entry;
+import java.util.Map;
+import java.util.Optional;
+
+
+/**
+ * Represents changes to delegation tokens in the metadata image.
+ */
+public final class DelegationTokenDelta {
+    private final DelegationTokenImage image;
+
+    // Key is TokenID which is contained in the value TokenInformation
+    private final Map<String, Optional<DelegationTokenData>> changes = new HashMap<>();
+
+    public DelegationTokenDelta(DelegationTokenImage image) {
+        this.image = image;
+    }
+
+    public void finishSnapshot() {
+        for (String tokenId : image.tokens().keySet()) {
+            if (!changes.containsKey(tokenId)) {
+                // If the tokenId from the image did not appear in the snapshot, mark it as removed
+                changes.put(tokenId, Optional.empty());
+
+            }
+        }
+    }
+
+    public DelegationTokenImage image() {
+        return image;
+    }
+
+    public Map<String, Optional<DelegationTokenData>> changes() {
+        return changes;
+    }
+
+    public void replay(DelegationTokenRecord record) {
+        changes.put(record.tokenId(), Optional.of(DelegationTokenData.fromRecord(record)));
+    }
+
+    public void replay(RemoveDelegationTokenRecord record) {
+        changes.put(record.tokenId(), Optional.empty());
+    }
+
+    public void handleMetadataVersionChange(MetadataVersion changedMetadataVersion) {
+        // nothing to do
+    }
+
+    public DelegationTokenImage apply() {
+        Map<String, DelegationTokenData> newTokens = new HashMap<>();
+        for (Entry<String, DelegationTokenData> entry : image.tokens().entrySet()) {

Review Comment:
   Comments added
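
For context, the quoted `apply()` is cut off right at its loop, so the following is only a sketch of how a change map like the one in `DelegationTokenDelta` could be folded into an image, not the code from the PR. It assumes the convention visible in `replay` and `finishSnapshot` above: a present `Optional` means create or update, an empty one means remove. The class name `DeltaSketch` and the generic value type are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical illustration; the real apply() body is not visible in the quoted diff.
final class DeltaSketch {
    private DeltaSketch() { }

    // Builds a new map from the base image plus the pending changes.
    static <V> Map<String, V> apply(Map<String, V> image,
                                    Map<String, Optional<V>> changes) {
        Map<String, V> result = new HashMap<>();
        for (Map.Entry<String, V> entry : image.entrySet()) {
            // Carry over only the entries the delta does not mention.
            if (!changes.containsKey(entry.getKey())) {
                result.put(entry.getKey(), entry.getValue());
            }
        }
        for (Map.Entry<String, Optional<V>> change : changes.entrySet()) {
            // Present value: create or update. Empty value: leave the key out (removal).
            change.getValue().ifPresent(value -> result.put(change.getKey(), value));
        }
        return result;
    }
}
```

Building a fresh map rather than mutating the input keeps the old image usable while the new one is assembled, which is the usual reason for an image/delta split.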






[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-17 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1297930601


##
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
+import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
+import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
+import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
+import org.apache.kafka.common.metadata.DelegationTokenRecord;
+import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.DelegationTokenData;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.charset.StandardCharsets;
+import javax.crypto.spec.SecretKeySpec;
+import javax.crypto.Mac;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH;
+import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE;
+import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
+
+/**
+ * Manages DelegationTokens.
+ */
+public class DelegationTokenControlManager {
+    private Time time = Time.SYSTEM;
+
+    static class Builder {
+        private LogContext logContext = null;
+        private DelegationTokenCache tokenCache = null;
+        private String secretKeyString = null;
+        private long tokenDefaultMaxLifetime = 0;
+        private long tokenDefaultRenewLifetime = 0;
+
+        Builder setLogContext(LogContext logContext) {
+            this.logContext = logContext;
+            return this;
+        }
+
+        Builder setTokenCache(DelegationTokenCache tokenCache) {
+            this.tokenCache = tokenCache;
+            return this;
+        }
+
+        Builder setTokenKeyString(String secretKeyString) {
+            this.secretKeyString = secretKeyString;
+            return this;
+        }
+
+        Builder setDelegationTokenMaxLifeMs(long tokenDefaultMaxLifetime) {
+            this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime;
+            return this;
+        }
+
+        Builder setDelegationTokenExpiryTimeMs(long tokenDefaultRenewLifetime) {
+            this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime;
+            return this;
+        }
+
+        DelegationTokenControlManager build() {
+            if (logContext == null) logContext = new LogContext();
+            return new DelegationTokenControlManager(
+              logContext,
+              tokenCache,
+              secretKeyString,
+              tokenDefaultMaxLifetime,
+              tokenDefaultRenewLifetime);
+        }
+    }
+
+    private final Logger log;
+    private final DelegationTokenCache tokenCache;
+    private final String secretKeyString;
+    private final long tokenDefaultMaxLifetime;
+    private final long tokenDefaultRenewLifetime;
+
+    private 

[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-17 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1297918868


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -2999,7 +2999,38 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleCreateTokenRequest(request: RequestChannel.Request): Unit = {
+    val createTokenRequest = request.body[CreateDelegationTokenRequest]
+
+    val requester = request.context.principal
+    val ownerPrincipalName = createTokenRequest.data.ownerPrincipalName
+    val owner = if (ownerPrincipalName == null || ownerPrincipalName.isEmpty) {
+      request.context.principal
+    } else {
+      new KafkaPrincipal(createTokenRequest.data.ownerPrincipalType, ownerPrincipalName)
+    }
+    val renewerList = createTokenRequest.data.renewers.asScala.toList.map(entry =>
+      new KafkaPrincipal(entry.principalType, entry.principalName))
+
+    if (!allowTokenRequests(request)) {
+      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+        CreateDelegationTokenResponse.prepareResponse(request.context.requestVersion, requestThrottleMs,
+          Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, owner, requester))
+    } else if (!owner.equals(requester) && !authHelper.authorize(request.context, CREATE_TOKENS, USER, owner.toString)) {
+      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+        CreateDelegationTokenResponse.prepareResponse(request.context.requestVersion, requestThrottleMs,
+          Errors.DELEGATION_TOKEN_AUTHORIZATION_FAILED, owner, requester))
+    } else if (renewerList.exists(principal => principal.getPrincipalType != KafkaPrincipal.USER_TYPE)) {
+      requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+        CreateDelegationTokenResponse.prepareResponse(request.context.requestVersion, requestThrottleMs,
+          Errors.INVALID_PRINCIPAL_TYPE, owner, requester))
+    } else {
+      maybeForwardToController(request, handleCreateTokenRequestZk)
+    }
+  }
+
+  def handleCreateTokenRequestZk(request: RequestChannel.Request): Unit = {

Review Comment:
   We only need to check if we are the active controller if we are in 
migration. I've updated the code to reflect that.
   



##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -215,9 +215,9 @@ class KafkaApis(val requestChannel: RequestChannel,
         case ApiKeys.DESCRIBE_LOG_DIRS => handleDescribeLogDirsRequest(request)
         case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
         case ApiKeys.CREATE_PARTITIONS => maybeForwardToController(request, handleCreatePartitionsRequest)
-        case ApiKeys.CREATE_DELEGATION_TOKEN => maybeForwardToController(request, handleCreateTokenRequest)
-        case ApiKeys.RENEW_DELEGATION_TOKEN => maybeForwardToController(request, handleRenewTokenRequest)
-        case ApiKeys.EXPIRE_DELEGATION_TOKEN => maybeForwardToController(request, handleExpireTokenRequest)
+        case ApiKeys.CREATE_DELEGATION_TOKEN => handleCreateTokenRequest(request)
+        case ApiKeys.RENEW_DELEGATION_TOKEN => handleRenewTokenRequest(request)
+        case ApiKeys.EXPIRE_DELEGATION_TOKEN => handleExpireTokenRequest(request)

Review Comment:
   Comment added.






[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-17 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1297648037


##
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
+import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
+import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
+import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
+import org.apache.kafka.common.metadata.DelegationTokenRecord;
+import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.DelegationTokenData;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.charset.StandardCharsets;
+import javax.crypto.spec.SecretKeySpec;
+import javax.crypto.Mac;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH;
+import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE;
+import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
+
+/**
+ * Manages DelegationTokens.
+ */
+public class DelegationTokenControlManager {
+    private Time time = Time.SYSTEM;
+
+    static class Builder {
+        private LogContext logContext = null;
+        private DelegationTokenCache tokenCache = null;
+        private String secretKeyString = null;
+        private long tokenDefaultMaxLifetime = 0;
+        private long tokenDefaultRenewLifetime = 0;
+
+        Builder setLogContext(LogContext logContext) {
+            this.logContext = logContext;
+            return this;
+        }
+
+        Builder setTokenCache(DelegationTokenCache tokenCache) {
+            this.tokenCache = tokenCache;
+            return this;
+        }
+
+        Builder setTokenKeyString(String secretKeyString) {
+            this.secretKeyString = secretKeyString;
+            return this;
+        }
+
+        Builder setDelegationTokenMaxLifeMs(long tokenDefaultMaxLifetime) {
+            this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime;
+            return this;
+        }
+
+        Builder setDelegationTokenExpiryTimeMs(long tokenDefaultRenewLifetime) {
+            this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime;
+            return this;
+        }
+
+        DelegationTokenControlManager build() {
+            if (logContext == null) logContext = new LogContext();
+            return new DelegationTokenControlManager(
+              logContext,
+              tokenCache,
+              secretKeyString,
+              tokenDefaultMaxLifetime,
+              tokenDefaultRenewLifetime);
+        }
+    }
+
+    private final Logger log;
+    private final DelegationTokenCache tokenCache;
+    private final String secretKeyString;
+    private final long tokenDefaultMaxLifetime;
+    private final long tokenDefaultRenewLifetime;
+
+    private 

[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-17 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1297647792


##
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
+import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
+import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
+import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
+import org.apache.kafka.common.metadata.DelegationTokenRecord;
+import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.DelegationTokenData;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.charset.StandardCharsets;
+import javax.crypto.spec.SecretKeySpec;
+import javax.crypto.Mac;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH;
+import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE;
+import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
+
+/**
+ * Manages DelegationTokens.
+ */
+public class DelegationTokenControlManager {
+    private Time time = Time.SYSTEM;
+
+    static class Builder {
+        private LogContext logContext = null;
+        private DelegationTokenCache tokenCache = null;
+        private String secretKeyString = null;
+        private long tokenDefaultMaxLifetime = 0;
+        private long tokenDefaultRenewLifetime = 0;
+
+        Builder setLogContext(LogContext logContext) {
+            this.logContext = logContext;
+            return this;
+        }
+
+        Builder setTokenCache(DelegationTokenCache tokenCache) {
+            this.tokenCache = tokenCache;
+            return this;
+        }
+
+        Builder setTokenKeyString(String secretKeyString) {
+            this.secretKeyString = secretKeyString;
+            return this;
+        }
+
+        Builder setDelegationTokenMaxLifeMs(long tokenDefaultMaxLifetime) {
+            this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime;
+            return this;
+        }
+
+        Builder setDelegationTokenExpiryTimeMs(long tokenDefaultRenewLifetime) {
+            this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime;
+            return this;
+        }
+
+        DelegationTokenControlManager build() {
+            if (logContext == null) logContext = new LogContext();
+            return new DelegationTokenControlManager(
+                logContext,
+                tokenCache,
+                secretKeyString,
+                tokenDefaultMaxLifetime,
+                tokenDefaultRenewLifetime);
+        }
+    }
+
+    private final Logger log;
+    private final DelegationTokenCache tokenCache;
+    private final String secretKeyString;
+    private final long tokenDefaultMaxLifetime;
+    private final long tokenDefaultRenewLifetime;
+
+    private 

[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-17 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1297472470


##
metadata/src/test/java/org/apache/kafka/image/DelegationTokenImageTest.java:
##
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.metadata.DelegationTokenRecord;
+import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import org.apache.kafka.common.utils.SecurityUtils;
+import org.apache.kafka.image.writer.ImageWriterOptions;
+import org.apache.kafka.image.writer.RecordListWriter;
+import org.apache.kafka.metadata.RecordTestUtils;
+import org.apache.kafka.metadata.DelegationTokenData;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+
+@Timeout(value = 40)
+public class DelegationTokenImageTest {
+    public final static DelegationTokenImage IMAGE1;
+
+    public final static List<ApiMessageAndVersion> DELTA1_RECORDS;
+
+    final static DelegationTokenDelta DELTA1;
+
+    final static DelegationTokenImage IMAGE2;
+
+    static DelegationTokenData randomDelegationTokenData(String tokenId, long expireTimestamp) {
+        TokenInformation ti = new TokenInformation(
+            tokenId,
+            SecurityUtils.parseKafkaPrincipal(KafkaPrincipal.USER_TYPE + ":" + "fred"),
+            SecurityUtils.parseKafkaPrincipal(KafkaPrincipal.USER_TYPE + ":" + "fred"),
+            new ArrayList<KafkaPrincipal>(),
+            0,
+            1000,
+            expireTimestamp);
+        return new DelegationTokenData(ti);
+    }
+
+    static {
+        Map<String, DelegationTokenData> image1 = new HashMap<>();
+        image1.put("somerandomuuid1", randomDelegationTokenData("somerandomuuid1", 100));
+        image1.put("somerandomuuid2", randomDelegationTokenData("somerandomuuid2", 100));
+        image1.put("somerandomuuid3", randomDelegationTokenData("somerandomuuid3", 100));
+        IMAGE1 = new DelegationTokenImage(image1);
+
+        DELTA1_RECORDS = new ArrayList<>();
+        DELTA1_RECORDS.add(new ApiMessageAndVersion(new DelegationTokenRecord().
+            setOwner(KafkaPrincipal.USER_TYPE + ":" + "fred").
+            setRequester(KafkaPrincipal.USER_TYPE + ":" + "fred").
+            setIssueTimestamp(0).
+            setMaxTimestamp(1000).
+            setExpirationTimestamp(200).
+            setTokenId("somerandomuuid1"), (short) 0));
+        DELTA1_RECORDS.add(new ApiMessageAndVersion(new RemoveDelegationTokenRecord().
+            setTokenId("somerandomuuid3"), (short) 0));
+
+        DELTA1 = new DelegationTokenDelta(IMAGE1);
+        RecordTestUtils.replayAll(DELTA1, DELTA1_RECORDS);
+
+        Map<String, DelegationTokenData> image2 = new HashMap<>();
+        image2.put("somerandomuuid1", randomDelegationTokenData("somerandomuuid1", 200));
+        image2.put("somerandomuuid2", randomDelegationTokenData("somerandomuuid2", 100));
+        IMAGE2 = new DelegationTokenImage(image2);
+    }
+
+    @Test
+    public void testEmptyImageRoundTrip() throws Throwable {
+        testToImageAndBack(DelegationTokenImage.EMPTY);
+    }
+
+    @Test
+    public void testImage1RoundTrip() throws Throwable {
+        testToImageAndBack(IMAGE1);
+    }
+
+    @Test
+    public void testApplyDelta1() throws Throwable {
+        assertEquals(IMAGE2, DELTA1.apply());
+    }
+
+    @Test
+    public void testImage2RoundTrip() throws Throwable {
+        testToImageAndBack(IMAGE2);
+    }
+
+    private void testToImageAndBack(DelegationTokenImage image) throws Throwable {
+        RecordListWriter writer = new RecordListWriter();
+        image.write(writer, new ImageWriterOptions.Builder().build());
+        DelegationTokenDelta delta = new DelegationTokenDelta(DelegationTokenImage.EMPTY);
+

[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-17 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1297471447


##
metadata/src/main/java/org/apache/kafka/metadata/DelegationTokenData.java:
##


Review Comment:
   TokenInformation is legacy, so I'm not updating a test for it. I did add
   toRecord/fromRecord, which are tested in the new DelegationTokenDataTest.
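
   For illustration only, the kind of round trip such a test exercises can be
   sketched as below. This is a minimal, self-contained sketch and not the code
   from the PR: TokenData and TokenRecord are hypothetical stand-ins for
   DelegationTokenData and DelegationTokenRecord, and only three fields are
   modeled.

```java
import java.util.Objects;

public class TokenRoundTripSketch {

    // Hypothetical stand-in for the metadata record (not the real DelegationTokenRecord).
    static final class TokenRecord {
        final String tokenId;
        final String owner;
        final long expirationTimestamp;

        TokenRecord(String tokenId, String owner, long expirationTimestamp) {
            this.tokenId = tokenId;
            this.owner = owner;
            this.expirationTimestamp = expirationTimestamp;
        }
    }

    // Hypothetical stand-in for the in-memory token data (not the real DelegationTokenData).
    static final class TokenData {
        final String tokenId;
        final String owner;
        final long expirationTimestamp;

        TokenData(String tokenId, String owner, long expirationTimestamp) {
            this.tokenId = tokenId;
            this.owner = owner;
            this.expirationTimestamp = expirationTimestamp;
        }

        // Copy every modeled field into the record form.
        TokenRecord toRecord() {
            return new TokenRecord(tokenId, owner, expirationTimestamp);
        }

        // Rebuild the in-memory form from a record.
        static TokenData fromRecord(TokenRecord record) {
            return new TokenData(record.tokenId, record.owner, record.expirationTimestamp);
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof TokenData)) return false;
            TokenData other = (TokenData) o;
            return tokenId.equals(other.tokenId)
                && owner.equals(other.owner)
                && expirationTimestamp == other.expirationTimestamp;
        }

        @Override
        public int hashCode() {
            return Objects.hash(tokenId, owner, expirationTimestamp);
        }
    }

    public static void main(String[] args) {
        TokenData original = new TokenData("somerandomuuid1", "User:fred", 100L);
        TokenData restored = TokenData.fromRecord(original.toRecord());
        // The round trip should preserve every modeled field.
        if (!original.equals(restored)) {
            throw new AssertionError("round trip changed the token data");
        }
        System.out.println("round trip ok");
    }
}
```

   The check is field-by-field equality after converting to a record and back,
   so a field dropped by either conversion would make the test fail.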






[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-17 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1297470130


##
metadata/src/main/java/org/apache/kafka/metadata/DelegationTokenData.java:
##


Review Comment:
   I have added a DelegationTokenDataTest.






[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-16 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1296578432


##
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##
@@ -0,0 +1,359 @@

[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-16 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1296577535


##
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##
@@ -0,0 +1,359 @@

[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-16 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1296577263


##
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##
@@ -0,0 +1,359 @@

[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-16 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1296576872


##
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##
@@ -0,0 +1,359 @@

[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-16 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1296576768


##
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
+import 
org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
+import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
+import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
+import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
+import org.apache.kafka.common.metadata.DelegationTokenRecord;
+import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import 
org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.DelegationTokenData;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.charset.StandardCharsets;
+import javax.crypto.spec.SecretKeySpec;
+import javax.crypto.Mac;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH;
+import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE;
+import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
+
+/**
+ * Manages DelegationTokens.
+ */
+public class DelegationTokenControlManager {
+private Time time = Time.SYSTEM;
+
+static class Builder {
+private LogContext logContext = null;
+private DelegationTokenCache tokenCache = null;
+private String secretKeyString = null;
+private long tokenDefaultMaxLifetime = 0;
+private long tokenDefaultRenewLifetime = 0;
+
+Builder setLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder setTokenCache(DelegationTokenCache tokenCache) {
+this.tokenCache = tokenCache;
+return this;
+}
+
+Builder setTokenKeyString(String secretKeyString) {
+this.secretKeyString = secretKeyString;
+return this;
+}
+
+Builder setDelegationTokenMaxLifeMs(long tokenDefaultMaxLifetime) {
+this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime;
+return this;
+}
+
+Builder setDelegationTokenExpiryTimeMs(long tokenDefaultRenewLifetime) {
+this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime;
+return this;
+}
+
+DelegationTokenControlManager build() {
+if (logContext == null) logContext = new LogContext();
+return new DelegationTokenControlManager(
+  logContext,
+  tokenCache,
+  secretKeyString,
+  tokenDefaultMaxLifetime,
+  tokenDefaultRenewLifetime);
+}
+}
+
+private final Logger log;
+private final DelegationTokenCache tokenCache;
+private final String secretKeyString;
+private final long tokenDefaultMaxLifetime;
+private final long tokenDefaultRenewLifetime;
+
+private 

[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-16 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1296576660


##
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
+import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
+import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
+import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
+import org.apache.kafka.common.metadata.DelegationTokenRecord;
+import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.DelegationTokenData;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.charset.StandardCharsets;
+import javax.crypto.spec.SecretKeySpec;
+import javax.crypto.Mac;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH;
+import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE;
+import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
+
+/**
+ * Manages DelegationTokens.
+ */
+public class DelegationTokenControlManager {
+private Time time = Time.SYSTEM;
+
+static class Builder {
+private LogContext logContext = null;
+private DelegationTokenCache tokenCache = null;
+private String secretKeyString = null;
+private long tokenDefaultMaxLifetime = 0;
+private long tokenDefaultRenewLifetime = 0;
+
+Builder setLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder setTokenCache(DelegationTokenCache tokenCache) {
+this.tokenCache = tokenCache;
+return this;
+}
+
+Builder setTokenKeyString(String secretKeyString) {
+this.secretKeyString = secretKeyString;
+return this;
+}
+
+Builder setDelegationTokenMaxLifeMs(long tokenDefaultMaxLifetime) {
+this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime;
+return this;
+}
+
+Builder setDelegationTokenExpiryTimeMs(long tokenDefaultRenewLifetime) {
+this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime;
+return this;
+}
+
+DelegationTokenControlManager build() {
+if (logContext == null) logContext = new LogContext();
+return new DelegationTokenControlManager(
+  logContext,
+  tokenCache,
+  secretKeyString,
+  tokenDefaultMaxLifetime,
+  tokenDefaultRenewLifetime);
+}
+}
+
+private final Logger log;
+private final DelegationTokenCache tokenCache;
+private final String secretKeyString;
+private final long tokenDefaultMaxLifetime;
+private final long tokenDefaultRenewLifetime;
+
+private 

[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-16 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1296576394


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -202,6 +211,11 @@ static public class Builder {
 private BootstrapMetadata bootstrapMetadata = null;
 private int maxRecordsPerBatch = MAX_RECORDS_PER_BATCH;
 private boolean zkMigrationEnabled = false;
+private DelegationTokenCache tokenCache;
+private String tokenKeyString;

Review Comment:
   The config variable is `delegationTokenSecretKey`, which is a String.
   DelegationTokenManager also uses the name secretKey, but there it sometimes
   refers to a byte array or some other type rather than a String.
   I wanted to be explicit that this is a String and that it is not modified
   until it is used to create a DelegationToken.
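
For illustration, here is a minimal sketch of how a String secret key could be kept untouched and only converted to key material at the moment a token HMAC is computed. The class name `TokenHmacSketch`, the method `createHmac`, and the choice of `HmacSHA512` are assumptions made for this example and are not taken from the PR; only the `javax.crypto`, `StandardCharsets`, and `Base64` classes mirror the imports visible in the diff.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.util.Base64;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

// Hypothetical helper; not the PR's DelegationTokenControlManager.
final class TokenHmacSketch {
    // Assumption: the HMAC algorithm name is chosen for this sketch only.
    static final String HMAC_ALGORITHM = "HmacSHA512";

    // The secret stays a String until this point; it is converted to bytes
    // only when an HMAC for a token id is actually computed.
    static String createHmac(String secretKeyString, String tokenId) {
        try {
            byte[] keyBytes = secretKeyString.getBytes(StandardCharsets.UTF_8);
            Mac mac = Mac.getInstance(HMAC_ALGORITHM);
            mac.init(new SecretKeySpec(keyBytes, HMAC_ALGORITHM));
            byte[] hmac = mac.doFinal(tokenId.getBytes(StandardCharsets.UTF_8));
            return Base64.getEncoder().encodeToString(hmac);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("HMAC computation failed", e);
        }
    }

    public static void main(String[] args) {
        // Example values are placeholders, not real configuration.
        System.out.println(createHmac("example-secret", "example-token-id"));
    }
}
```

Keeping the field typed as String and deferring the byte conversion avoids ambiguity about which representation of the key a caller is holding.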






[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-16 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1296572585


##
metadata/src/test/java/org/apache/kafka/image/DelegationTokenImageTest.java:
##
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.metadata.DelegationTokenRecord;
+import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import org.apache.kafka.common.utils.SecurityUtils;
+import org.apache.kafka.image.writer.ImageWriterOptions;
+import org.apache.kafka.image.writer.RecordListWriter;
+import org.apache.kafka.metadata.RecordTestUtils;
+import org.apache.kafka.metadata.DelegationTokenData;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+
+@Timeout(value = 40)
+public class DelegationTokenImageTest {
+public final static DelegationTokenImage IMAGE1;
+
+public final static List<ApiMessageAndVersion> DELTA1_RECORDS;
+
+final static DelegationTokenDelta DELTA1;
+
+final static DelegationTokenImage IMAGE2;
+
+static DelegationTokenData randomDelegationTokenData(String tokenId, long expireTimestamp) {
+TokenInformation ti = new TokenInformation(
+tokenId,
+SecurityUtils.parseKafkaPrincipal(KafkaPrincipal.USER_TYPE + ":" + "fred"),
+SecurityUtils.parseKafkaPrincipal(KafkaPrincipal.USER_TYPE + ":" + "fred"),
+new ArrayList<KafkaPrincipal>(),
+0,
+1000,
+expireTimestamp);
+return new DelegationTokenData(ti);
+}
+
+static {
+Map<String, DelegationTokenData> image1 = new HashMap<>();
+image1.put("somerandomuuid1", randomDelegationTokenData("somerandomuuid1", 100));
+image1.put("somerandomuuid2", randomDelegationTokenData("somerandomuuid2", 100));
+image1.put("somerandomuuid3", randomDelegationTokenData("somerandomuuid3", 100));
+IMAGE1 = new DelegationTokenImage(image1);
+
+DELTA1_RECORDS = new ArrayList<>();
+DELTA1_RECORDS.add(new ApiMessageAndVersion(new DelegationTokenRecord().
+setOwner(KafkaPrincipal.USER_TYPE + ":" + "fred").
+setRequester(KafkaPrincipal.USER_TYPE + ":" + "fred").
+setIssueTimestamp(0).
+setMaxTimestamp(1000).
+setExpirationTimestamp(200).
+setTokenId("somerandomuuid1"), (short) 0));
+DELTA1_RECORDS.add(new ApiMessageAndVersion(new RemoveDelegationTokenRecord().
+setTokenId("somerandomuuid3"), (short) 0));
+
+DELTA1 = new DelegationTokenDelta(IMAGE1);
+RecordTestUtils.replayAll(DELTA1, DELTA1_RECORDS);
+
+Map<String, DelegationTokenData> image2 = new HashMap<>();
+image2.put("somerandomuuid1", randomDelegationTokenData("somerandomuuid1", 200));
+image2.put("somerandomuuid2", randomDelegationTokenData("somerandomuuid2", 100));
+IMAGE2 = new DelegationTokenImage(image2);
+}
+
+@Test
+public void testEmptyImageRoundTrip() throws Throwable {
+testToImageAndBack(DelegationTokenImage.EMPTY);
+}
+
+@Test
+public void testImage1RoundTrip() throws Throwable {
+testToImageAndBack(IMAGE1);
+}
+
+@Test
+public void testApplyDelta1() throws Throwable {
+assertEquals(IMAGE2, DELTA1.apply());
+}
+
+@Test
+public void testImage2RoundTrip() throws Throwable {
+testToImageAndBack(IMAGE2);
+}
+
+private void testToImageAndBack(DelegationTokenImage image) throws Throwable {
+RecordListWriter writer = new RecordListWriter();
+image.write(writer, new ImageWriterOptions.Builder().build());
+DelegationTokenDelta delta = new DelegationTokenDelta(DelegationTokenImage.EMPTY);
+

[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-16 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1296567527


##
metadata/src/main/java/org/apache/kafka/image/node/DelegationTokenDataNode.java:
##


Review Comment:
   There are unit tests to check the redaction state. See
   MetadataNodeRedactionCriteriaTest.java.






[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-16 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1296410110


##
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##

[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-16 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1296409238


##
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##

[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-16 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1296399634


##
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##

[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-16 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1296382585


##
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
+import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
+import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
+import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
+import org.apache.kafka.common.metadata.DelegationTokenRecord;
+import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.DelegationTokenData;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.charset.StandardCharsets;
+import javax.crypto.spec.SecretKeySpec;
+import javax.crypto.Mac;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH;
+import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE;
+import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
+
+/**
+ * Manages DelegationTokens.
+ */
+public class DelegationTokenControlManager {
+private Time time = Time.SYSTEM;
+
+static class Builder {
+private LogContext logContext = null;
+private DelegationTokenCache tokenCache = null;
+private String secretKeyString = null;
+private long tokenDefaultMaxLifetime = 0;
+private long tokenDefaultRenewLifetime = 0;
+
+Builder setLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder setTokenCache(DelegationTokenCache tokenCache) {
+this.tokenCache = tokenCache;
+return this;
+}
+
+Builder setTokenKeyString(String secretKeyString) {

Review Comment:
   The Kafka config string is "delegation.token.secret.key".
   The Kafka config variable is `delegationTokenSecretKey`.
   So I probably need to rename this to `setDelegationTokenSecretKey`.
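
As a rough sketch of the rename discussed above, the builder setter could take the same name as the config variable. `TokenManagerSketch` and its accessor are hypothetical stand-ins written for this example, not the PR's `DelegationTokenControlManager`; only the setter name `setDelegationTokenSecretKey` comes from the comment.

```java
// Hypothetical stand-in class used only to illustrate the naming.
final class TokenManagerSketch {
    private final String delegationTokenSecretKey;

    private TokenManagerSketch(String delegationTokenSecretKey) {
        this.delegationTokenSecretKey = delegationTokenSecretKey;
    }

    // Accessor added for the example so the value can be inspected.
    String delegationTokenSecretKey() {
        return delegationTokenSecretKey;
    }

    static final class Builder {
        private String delegationTokenSecretKey = null;

        // Setter named after the config variable, replacing setTokenKeyString.
        Builder setDelegationTokenSecretKey(String delegationTokenSecretKey) {
            this.delegationTokenSecretKey = delegationTokenSecretKey;
            return this;
        }

        TokenManagerSketch build() {
            return new TokenManagerSketch(delegationTokenSecretKey);
        }
    }

    public static void main(String[] args) {
        TokenManagerSketch manager = new TokenManagerSketch.Builder()
            .setDelegationTokenSecretKey("example-secret") // placeholder value
            .build();
        System.out.println(manager.delegationTokenSecretKey());
    }
}
```

Matching the setter to the config variable name lets a reader trace the value from "delegation.token.secret.key" to the field without a naming translation step.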






[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-16 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1296379603


##
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
+import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
+import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
+import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
+import org.apache.kafka.common.metadata.DelegationTokenRecord;
+import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.DelegationTokenData;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.charset.StandardCharsets;
+import javax.crypto.spec.SecretKeySpec;
+import javax.crypto.Mac;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH;
+import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE;
+import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
+
+/**
+ * Manages DelegationTokens.
+ */
+public class DelegationTokenControlManager {
+private Time time = Time.SYSTEM;
+
+static class Builder {
+private LogContext logContext = null;
+private DelegationTokenCache tokenCache = null;
+private String secretKeyString = null;
+private long tokenDefaultMaxLifetime = 0;
+private long tokenDefaultRenewLifetime = 0;
+
+Builder setLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder setTokenCache(DelegationTokenCache tokenCache) {
+this.tokenCache = tokenCache;
+return this;
+}
+
+Builder setTokenKeyString(String secretKeyString) {
+this.secretKeyString = secretKeyString;
+return this;
+}
+
+Builder setDelegationTokenMaxLifeMs(long tokenDefaultMaxLifetime) {
+this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime;
+return this;
+}
+
+Builder setDelegationTokenExpiryTimeMs(long tokenDefaultRenewLifetime) {

Review Comment:
   The Kafka config string is "delegation.token.expiry.time.ms".
   The Kafka config variable is `delegationTokenExpiryTimeMs`.
   The builder takes the config values to instantiate the manager, so the
method names should match those inputs. Internally, the names can be more
sensible.
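The naming convention discussed here could look roughly like the sketch below. It is only an illustration: `TokenManagerSketch` and its nested `Builder` are hypothetical stand-ins, not the PR's `DelegationTokenControlManager`, and the setter `setDelegationTokenSecretKey` is the rename proposed in this thread rather than confirmed final code. The setters follow the config variable names, while the fields keep the internal names from the diff.

```java
// Hypothetical sketch, not the PR's actual classes: setter names follow the
// KafkaConfig variables, while the fields keep the internal names from the diff.
final class TokenManagerSketch {
    final String secretKeyString;
    final long tokenDefaultMaxLifetime;
    final long tokenDefaultRenewLifetime;

    private TokenManagerSketch(String secretKeyString,
                               long tokenDefaultMaxLifetime,
                               long tokenDefaultRenewLifetime) {
        this.secretKeyString = secretKeyString;
        this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime;
        this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime;
    }

    static final class Builder {
        private String secretKeyString = null;
        private long tokenDefaultMaxLifetime = 0;
        private long tokenDefaultRenewLifetime = 0;

        // Mirrors "delegation.token.secret.key" / delegationTokenSecretKey.
        Builder setDelegationTokenSecretKey(String secretKeyString) {
            this.secretKeyString = secretKeyString;
            return this;
        }

        // Mirrors "delegation.token.max.lifetime.ms" / delegationTokenMaxLifeMs.
        Builder setDelegationTokenMaxLifeMs(long tokenDefaultMaxLifetime) {
            this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime;
            return this;
        }

        // Mirrors "delegation.token.expiry.time.ms" / delegationTokenExpiryTimeMs.
        Builder setDelegationTokenExpiryTimeMs(long tokenDefaultRenewLifetime) {
            this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime;
            return this;
        }

        TokenManagerSketch build() {
            return new TokenManagerSketch(
                secretKeyString, tokenDefaultMaxLifetime, tokenDefaultRenewLifetime);
        }
    }
}
```

A caller reading values off the config can then pair each config variable with the setter of the same name, which is the point of the convention.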






[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-16 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1296376127


##
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##
@@ -0,0 +1,359 @@
+/**
+ * Manages DelegationTokens.
+ */
+public class DelegationTokenControlManager {
+private Time time = Time.SYSTEM;
+
+static class Builder {
+private LogContext logContext = null;
+private DelegationTokenCache tokenCache = null;
+private String secretKeyString = null;
+private long tokenDefaultMaxLifetime = 0;
+private long tokenDefaultRenewLifetime = 0;
+
+Builder setLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder setTokenCache(DelegationTokenCache tokenCache) {
+this.tokenCache = tokenCache;
+return this;
+}
+
+Builder setTokenKeyString(String secretKeyString) {
+this.secretKeyString = secretKeyString;
+return this;
+}
+
+Builder setDelegationTokenMaxLifeMs(long tokenDefaultMaxLifetime) {

Review Comment:
   The Kafka config string is "delegation.token.max.lifetime.ms".
   The Kafka config variable is `delegationTokenMaxLifeMs`.
   So either `Life` or `Lifetime` would match an existing name.






[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-16 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1296352968


##
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##
@@ -0,0 +1,359 @@
+/**
+ * Manages DelegationTokens.
+ */
+public class DelegationTokenControlManager {
+private Time time = Time.SYSTEM;
+
+static class Builder {
+private LogContext logContext = null;
+private DelegationTokenCache tokenCache = null;
+private String secretKeyString = null;
+private long tokenDefaultMaxLifetime = 0;
+private long tokenDefaultRenewLifetime = 0;
+
+Builder setLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder setTokenCache(DelegationTokenCache tokenCache) {
+this.tokenCache = tokenCache;
+return this;
+}
+
+Builder setTokenKeyString(String secretKeyString) {
+this.secretKeyString = secretKeyString;
+return this;
+}
+
+Builder setDelegationTokenMaxLifeMs(long tokenDefaultMaxLifetime) {
+this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime;
+return this;
+}
+
+Builder setDelegationTokenExpiryTimeMs(long tokenDefaultRenewLifetime) {
+this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime;
+return this;
+}
+
+DelegationTokenControlManager build() {
+if (logContext == null) logContext = new LogContext();
+return new DelegationTokenControlManager(
+  logContext,
+  tokenCache,
+  secretKeyString,
+  tokenDefaultMaxLifetime,
+  tokenDefaultRenewLifetime);
+}
+}
+
+private final Logger log;
+private final DelegationTokenCache tokenCache;
+private final String secretKeyString;
+private final long tokenDefaultMaxLifetime;
+private final long tokenDefaultRenewLifetime;
+
+private 

[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-16 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1296349986


##
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##
@@ -0,0 +1,359 @@
+/**
+ * Manages DelegationTokens.
+ */
+public class DelegationTokenControlManager {
+private Time time = Time.SYSTEM;
+
+static class Builder {
+private LogContext logContext = null;
+private DelegationTokenCache tokenCache = null;
+private String secretKeyString = null;
+private long tokenDefaultMaxLifetime = 0;
+private long tokenDefaultRenewLifetime = 0;
+
+Builder setLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder setTokenCache(DelegationTokenCache tokenCache) {
+this.tokenCache = tokenCache;
+return this;
+}
+
+Builder setTokenKeyString(String secretKeyString) {
+this.secretKeyString = secretKeyString;
+return this;
+}
+
+Builder setDelegationTokenMaxLifeMs(long tokenDefaultMaxLifetime) {
+this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime;
+return this;
+}
+
+Builder setDelegationTokenExpiryTimeMs(long tokenDefaultRenewLifetime) {
+this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime;
+return this;
+}
+
+DelegationTokenControlManager build() {
+if (logContext == null) logContext = new LogContext();
+return new DelegationTokenControlManager(
+  logContext,
+  tokenCache,
+  secretKeyString,
+  tokenDefaultMaxLifetime,
+  tokenDefaultRenewLifetime);
+}
+}
+
+private final Logger log;
+private final DelegationTokenCache tokenCache;
+private final String secretKeyString;
+private final long tokenDefaultMaxLifetime;
+private final long tokenDefaultRenewLifetime;
+
+private 

[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-16 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1296013735


##
core/src/main/scala/kafka/server/DelegationTokenManager.scala:
##
@@ -257,6 +153,13 @@ class DelegationTokenManager(val config: KafkaConfig,
 scramCredentialMap.toMap
   }
 
+  /**
+   * @param token
+   */
+  def updateToken(token: DelegationToken): Unit = {

Review Comment:
   > Should we protect this as well with the lock? The ZK variant is protected 
through the create/renew/expire methods but this isn't. (As I see we only use 
it through `DelegationTokenPublisher.onMetadataUpdate` but I don't know if 
calls to that come from multiple threads or just a single one.)
   
   The calls come from just a single thread. They have to, because replaying
records out of order could lead to inconsistent state.
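To illustrate why ordering matters here, the following is a minimal sketch under stated assumptions: `ReplayOrderSketch` and `TokenRecord` are hypothetical stand-ins for the metadata records, not Kafka's `DelegationTokenRecord` or `RemoveDelegationTokenRecord`, and the cache is a plain map. It shows that replaying the same two records in a different order leaves a different final state.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins for metadata records; not Kafka's record types.
final class ReplayOrderSketch {
    static final class TokenRecord {
        final String tokenId;
        final boolean remove;   // true models a removal record
        final long expiryMs;

        TokenRecord(String tokenId, boolean remove, long expiryMs) {
            this.tokenId = tokenId;
            this.remove = remove;
            this.expiryMs = expiryMs;
        }
    }

    // Applies records strictly in the order given, as a single replay thread would.
    static Map<String, Long> replay(List<TokenRecord> records) {
        Map<String, Long> cache = new HashMap<>();
        for (TokenRecord r : records) {
            if (r.remove) {
                cache.remove(r.tokenId);
            } else {
                cache.put(r.tokenId, r.expiryMs);
            }
        }
        return cache;
    }
}
```

Replaying "create t1, remove t1" leaves the cache empty, while "remove t1, create t1" leaves a token that should no longer exist. With a single thread applying updates in log order, that interleaving cannot happen, so no extra lock is needed for this path.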






[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-16 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1296005323


##
core/src/main/scala/kafka/server/ControllerApis.scala:
##
@@ -842,6 +847,75 @@ class ControllerApis(val requestChannel: RequestChannel,
   }
   }
 
+  def handleCreateDelegationTokenRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {

Review Comment:
   In the ZK case we always have to do the checks in the broker
(`KafkaApis.scala` or `DelegationTokenManagerZk.scala`).
   
   In the KRaft case where the client talks directly to the controller, we
have to do the checks in the controller (`ControllerApis.scala` or
`DelegationTokenControlManager.java`).
   
   In the KRaft case where the client talks to the broker and the broker
forwards the request to the controller, we have to check the connection
authentication in the broker, because the information about how the
client-to-broker connection was authenticated is not forwarded. We cannot allow
clients authenticated with delegation tokens to create, renew, or expire
delegation tokens.
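The connection check described above can be sketched as follows. This is a simplified Java rendering modeled on the `allowTokenRequests` method this PR adds to `ControllerApis.scala`. The `Protocol` enum and the boolean parameters are hypothetical simplifications of Kafka's `SecurityProtocol`, `KafkaPrincipal`, and `RequestChannel.Request`, so treat it as an illustration rather than the PR's code.

```java
// Hypothetical, simplified types; the real check reads these values from
// the request context (security protocol, principal, forwarded flag).
final class TokenRequestGateSketch {
    enum Protocol { PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL }

    static boolean allowTokenRequests(Protocol protocol,
                                      boolean tokenAuthenticated,
                                      boolean anonymousPrincipal,
                                      boolean forwarded) {
        // A client that authenticated with a delegation token may not
        // create, renew, or expire delegation tokens.
        if (tokenAuthenticated) {
            return false;
        }
        // PLAINTEXT is tolerated only for forwarded requests (testing).
        if (!forwarded && protocol == Protocol.PLAINTEXT) {
            return false;
        }
        // 1-way SSL yields an anonymous principal, which is rejected.
        if (protocol == Protocol.SSL && anonymousPrincipal) {
            return false;
        }
        return true;
    }
}
```

In the forwarding case the broker has to evaluate the token-authentication condition itself, since the controller never sees how the original client connection was authenticated.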
   






[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-15 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1295339433


##
core/src/main/scala/kafka/server/ControllerApis.scala:
##
@@ -842,6 +847,75 @@ class ControllerApis(val requestChannel: RequestChannel,
   }
   }
 
+  def handleCreateDelegationTokenRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
+val alterRequest = request.body[CreateDelegationTokenRequest]
+
+val requester = request.context.principal
+val ownerPrincipalName = alterRequest.data.ownerPrincipalName
+val ownerPrincipalType = alterRequest.data.ownerPrincipalType
+val owner = if (ownerPrincipalName == null || ownerPrincipalName.isEmpty) {
+  request.context.principal
+} else {
+  new KafkaPrincipal(ownerPrincipalType, ownerPrincipalName)
+}
+
+// Requester is always allowed to create token for self
+if (!owner.equals(requester) && 

Review Comment:
   I added the code; please look it over.
   
   One change is that I had to allow PLAINTEXT, because all our
broker-to-controller forwarding tests use PLAINTEXT.






[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-15 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1295289182


##
core/src/main/scala/kafka/server/DelegationTokenManager.scala:
##
@@ -186,57 +134,28 @@ class DelegationTokenManager(val config: KafkaConfig,
   val tokenMaxLifetime: Long = config.delegationTokenMaxLifeMs
   val defaultTokenRenewTime: Long = config.delegationTokenExpiryTimeMs
   val tokenRemoverScanInterval: Long = config.delegationTokenExpiryCheckIntervalMs
-  private val lock = new Object()
-  private var tokenChangeListener: ZkNodeChangeNotificationListener = _
 
   def startup(): Unit = {
 if (config.tokenAuthEnabled) {
-  zkClient.createDelegationTokenPaths()
   loadCache()
-  tokenChangeListener = new ZkNodeChangeNotificationListener(zkClient, DelegationTokenChangeNotificationZNode.path, DelegationTokenChangeNotificationSequenceZNode.SequenceNumberPrefix, TokenChangedNotificationHandler)
-  tokenChangeListener.init()
 }
   }
 
   def shutdown(): Unit = {
 if (config.tokenAuthEnabled) {

Review Comment:
   Done






[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-15 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1295288501


##
core/src/main/scala/kafka/server/metadata/DelegationTokenPublisher.scala:
##
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server.metadata
+
+import kafka.server.DelegationTokenManager
+import kafka.server.KafkaConfig
+import kafka.utils.Logging
+import org.apache.kafka.image.loader.LoaderManifest
+import org.apache.kafka.image.{MetadataDelta, MetadataImage}
+import org.apache.kafka.server.fault.FaultHandler
+
+
+class DelegationTokenPublisher(
+  conf: KafkaConfig,
+  faultHandler: FaultHandler,
+  nodeType: String,
+  tokenManager: DelegationTokenManager,
+) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher {
+  logIdent = s"[${name()}] "
+
+  var _firstPublish = true
+
+  override def name(): String = s"DelegationTokenPublisher ${nodeType} id=${conf.nodeId}"
+
+  override def onMetadataUpdate(
+delta: MetadataDelta,
+newImage: MetadataImage,
+manifest: LoaderManifest
+  ): Unit = {
+onMetadataUpdate(delta, newImage)
+  }
+
+  def onMetadataUpdate(
+delta: MetadataDelta,
+newImage: MetadataImage,
+  ): Unit = {
+val deltaName = if (_firstPublish) {
+  s"initial MetadataDelta up to ${newImage.highestOffsetAndEpoch().offset}"
+} else {
+  s"update MetadataDelta up to ${newImage.highestOffsetAndEpoch().offset}"
+}
+try {
+  if (_firstPublish) {
+// Initialize the tokenCache with the Image
+Option(newImage.delegationTokens()).foreach { delegationTokenImage =>
+  delegationTokenImage.tokens().forEach { (tokenId, delegationTokenData) =>
+tokenManager.updateToken(tokenManager.getDelegationToken(delegationTokenData.tokenInformation()))
+  }
+}
+_firstPublish = false
+  }
+  // Apply changes to DelegationTokens.
+  Option(delta.delegationTokenDelta()).foreach { delegationTokenDelta =>
+delegationTokenDelta.changes().forEach { 
+  case (tokenId, delegationTokenData) => 

Review Comment:
   > nit: `tokenId` isn't used, you can just replace it with `_`
   
   It's used in the else clause when we remove the token.
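The update-or-remove pattern this reply refers to can be sketched like this. The quoted diff is cut off before the branch, so the shape below is an assumption: the changes are modeled as a map from token id to an `Optional` payload, and `TokenDeltaApplySketch` with its `String` payload is a hypothetical stand-in for the publisher and `DelegationTokenData`.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: a present value means "create or update the token",
// an empty value means "remove the token with this id".
final class TokenDeltaApplySketch {
    static void apply(Map<String, Optional<String>> changes, Map<String, String> cache) {
        changes.forEach((tokenId, maybeData) -> {
            if (maybeData.isPresent()) {
                cache.put(tokenId, maybeData.get());
            } else {
                // No payload is available here, so the key is the only handle.
                cache.remove(tokenId);
            }
        });
    }
}
```

In the removal branch there is no payload to read an id from, which is why the key cannot be replaced with `_`.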






[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-15 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1295230735


##
metadata/src/test/java/org/apache/kafka/image/DelegationTokenImageTest.java:
##


Review Comment:
   Yes and Done






[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-15 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1295230635


##
metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java:
##


Review Comment:
   Fixed






[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-15 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1295140057


##
metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java:
##


Review Comment:
   This is because the comparison uses `toString()`, which redacts sensitive
data, and I didn't remove any of the delegation tokens between IMAGE1 and
IMAGE2, so all the keys match. Once I add a RemoveDelegationTokenRecord to the
test, that should cause it to fail.
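The effect described here, where a string-based comparison cannot see differences in redacted fields, can be reproduced with a small sketch. `RedactedCompareSketch.TokenData` and its `toString()` format are hypothetical and do not reflect how Kafka's image classes actually print.

```java
import java.util.Arrays;

// Hypothetical sketch: toString() hides the secret, so comparing rendered
// strings only detects differences in the non-redacted fields (the keys).
final class RedactedCompareSketch {
    static final class TokenData {
        final String tokenId;
        final byte[] hmac;

        TokenData(String tokenId, byte[] hmac) {
            this.tokenId = tokenId;
            this.hmac = hmac;
        }

        @Override
        public String toString() {
            return "TokenData(tokenId=" + tokenId + ", hmac=[hidden])";
        }

        // Field-by-field comparison that does look at the secret.
        boolean sameContents(TokenData other) {
            return tokenId.equals(other.tokenId) && Arrays.equals(hmac, other.hmac);
        }
    }
}
```

Two tokens with the same id but different HMACs render to the same string, so a `toString()`-based check treats them as equal. Only a change to the set of keys, such as a removal, becomes visible to that kind of check.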



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-15 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1294925543


##
metadata/src/main/resources/common/metadata/DelegationTokenRecord.json:
##
@@ -22,8 +22,10 @@
   "fields": [
 { "name": "Owner", "type": "string", "versions": "0+",
   "about": "The delegation token owner." },
+{ "name": "Requester", "type": "string", "versions": "0+",
+  "about": "The delegation token requester." },

Review Comment:
   Understood. This is also a case where the original creator of the fields
missed this field from TokenInformation, which is what ZK uses today.






[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-15 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1294918864


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -267,6 +270,10 @@ public boolean isLeaderEpochBumpRequiredOnIsrShrink() {
 return !this.isAtLeast(IBP_3_6_IV0);
 }
 
+public boolean isDelegationTokenSupported() {
+return this.isAtLeast(IBP_3_6_IV1);
+}

Review Comment:
   Done. 
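For readers unfamiliar with how such a version gate works, here is a minimal sketch. `MetadataVersionSketch` is a hypothetical three-value enum, and implementing `isAtLeast` through declaration order is an assumption made for illustration. Only the `isDelegationTokenSupported` body is taken from the quoted diff.

```java
// Hypothetical cut-down enum; the real MetadataVersion has many more
// constants and extra per-version data.
enum MetadataVersionSketch {
    IBP_3_5_IV2,
    IBP_3_6_IV0,
    IBP_3_6_IV1;

    // Assumed here: versions are ordered by declaration order.
    boolean isAtLeast(MetadataVersionSketch other) {
        return this.compareTo(other) >= 0;
    }

    boolean isDelegationTokenSupported() {
        return this.isAtLeast(IBP_3_6_IV1);
    }
}
```

With this ordering, `IBP_3_6_IV0` reports no delegation token support while `IBP_3_6_IV1` does.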






[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-15 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1294766805


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -173,11 +173,14 @@ public enum MetadataVersion {
 // Adds replica epoch to Fetch request (KIP-903).
 IBP_3_5_IV1(10, "3.5", "IV1", false),
 
-// Support for SCRAM
+// KRaft support for SCRAM
 IBP_3_5_IV2(11, "3.5", "IV2", true),
 
-// Remove leader epoch bump when KRaft controller shrinks the ISR (KAFKA-15021)
-IBP_3_6_IV0(12, "3.6", "IV0", false);
+// Support for Remove leader epoch bump when KRaft controller shrinks the ISR (KAFKA-15021)
+IBP_3_6_IV0(12, "3.6", "IV0", false),
+
+// KRaft support for DelegationTokens

Review Comment:
   Fixed.
   The conflict with the metadata transaction commit has been addressed, but make
   sure to look closely at the changes around that and delegation tokens in
   MetadataVersion.java.






[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-15 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1294648669


##
core/src/main/scala/kafka/server/DelegationTokenManager.scala:
##
@@ -186,57 +133,28 @@ class DelegationTokenManager(val config: KafkaConfig,
   val tokenMaxLifetime: Long = config.delegationTokenMaxLifeMs
   val defaultTokenRenewTime: Long = config.delegationTokenExpiryTimeMs
   val tokenRemoverScanInterval: Long = config.delegationTokenExpiryCheckIntervalMs

Review Comment:
   Yes. 






[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-15 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1294624738


##
core/src/main/scala/kafka/server/ControllerApis.scala:
##
@@ -842,6 +847,75 @@ class ControllerApis(val requestChannel: RequestChannel,
   }
   }
 
+  def handleCreateDelegationTokenRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
+val alterRequest = request.body[CreateDelegationTokenRequest]

Review Comment:
   Fixed






[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-15 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1294623852


##
core/src/main/scala/kafka/server/ControllerApis.scala:
##
@@ -842,6 +847,75 @@ class ControllerApis(val requestChannel: RequestChannel,
   }
   }
 
+  def handleCreateDelegationTokenRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
+val alterRequest = request.body[CreateDelegationTokenRequest]

Review Comment:
   I went with createTokenRequest.






[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-14 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1293796381


##
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
+import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
+import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
+import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
+import org.apache.kafka.common.metadata.DelegationTokenRecord;
+import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.DelegationTokenData;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.charset.StandardCharsets;
+import javax.crypto.spec.SecretKeySpec;
+import javax.crypto.Mac;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH;
+import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE;
+import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
+
+/**
+ * Manages DelegationTokens.
+ */
+public class DelegationTokenControlManager {
+private Time time = Time.SYSTEM;
+
+static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private DelegationTokenCache tokenCache = null;
+private String secretKeyString = null;
+private long tokenDefaultMaxLifetime = 0;
+private long tokenDefaultRenewLifetime = 0;
+private long tokenRemoverScanInterval = 0;
+
+Builder setLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder setTokenCache(DelegationTokenCache tokenCache) {
+this.tokenCache = tokenCache;
+return this;
+}
+
+Builder setTokenKeyString(String secretKeyString) {
+this.secretKeyString = secretKeyString;
+return this;
+}
+
+Builder setDelegationTokenMaxLifeMs(long tokenDefaultMaxLifetime) {
+this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime;
+return this;
+}
+
+Builder setDelegationTokenExpiryTimeMs(long tokenDefaultRenewLifetime) {
+this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime;
+return this;
+}
+
+Builder setDelegationTokenExpiryCheckIntervalMs(long tokenRemoverScanInterval) {
+this.tokenRemoverScanInterval = tokenRemoverScanInterval;
+return this;
+}
+
+DelegationTokenControlManager build() {
+if (logContext == null) 

[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-14 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1293793859


##
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
+import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
+import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
+import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
+import org.apache.kafka.common.metadata.DelegationTokenRecord;
+import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.DelegationTokenData;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.charset.StandardCharsets;
+import javax.crypto.spec.SecretKeySpec;
+import javax.crypto.Mac;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH;
+import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE;
+import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
+
+/**
+ * Manages DelegationTokens.
+ */
+public class DelegationTokenControlManager {
+private Time time = Time.SYSTEM;
+
+static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private DelegationTokenCache tokenCache = null;
+private String secretKeyString = null;
+private long tokenDefaultMaxLifetime = 0;
+private long tokenDefaultRenewLifetime = 0;
+private long tokenRemoverScanInterval = 0;
+
+Builder setLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder setTokenCache(DelegationTokenCache tokenCache) {
+this.tokenCache = tokenCache;
+return this;
+}
+
+Builder setTokenKeyString(String secretKeyString) {
+this.secretKeyString = secretKeyString;
+return this;
+}
+
+Builder setDelegationTokenMaxLifeMs(long tokenDefaultMaxLifetime) {
+this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime;
+return this;
+}
+
+Builder setDelegationTokenExpiryTimeMs(long tokenDefaultRenewLifetime) {
+this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime;
+return this;
+}
+
+Builder setDelegationTokenExpiryCheckIntervalMs(long tokenRemoverScanInterval) {
+this.tokenRemoverScanInterval = tokenRemoverScanInterval;
+return this;
+}
+
+DelegationTokenControlManager build() {
+if (logContext == null) 

[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-14 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1293786529


##
core/src/main/scala/kafka/server/ControllerApis.scala:
##
@@ -842,6 +847,75 @@ class ControllerApis(val requestChannel: RequestChannel,
   }
   }
 
+  def handleCreateDelegationTokenRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
+val alterRequest = request.body[CreateDelegationTokenRequest]
+
+val requester = request.context.principal
+val ownerPrincipalName = alterRequest.data.ownerPrincipalName
+val ownerPrincipalType = alterRequest.data.ownerPrincipalType
+val owner = if (ownerPrincipalName == null || ownerPrincipalName.isEmpty) {
+  request.context.principal
+} else {
+  new KafkaPrincipal(ownerPrincipalType, ownerPrincipalName)
+}
+
+// Requester is always allowed to create token for self
+if (!owner.equals(requester) &&
+  !authHelper.authorize(request.context, CREATE_TOKENS, USER, owner.toString)) {
+  requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+CreateDelegationTokenResponse.prepareResponse(request.context.requestVersion, requestThrottleMs,
+  Errors.DELEGATION_TOKEN_AUTHORIZATION_FAILED, owner, requester))
+}
+
+val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
+  OptionalLong.empty())
+
+// Copy the response data to a new response so we can apply the request version
+controller.createDelegationToken(context, alterRequest.data)
+  .thenApply[Unit] { response =>
+ requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+   CreateDelegationTokenResponse.prepareResponse(
+ request.context.requestVersion,
+ requestThrottleMs,
+ Errors.forCode(response.errorCode()),
+ new KafkaPrincipal(response.principalType(), response.principalName()),
+ new KafkaPrincipal(response.tokenRequesterPrincipalType(), response.tokenRequesterPrincipalName()),
+ response.issueTimestampMs(),
+ response.expiryTimestampMs(),
+ response.maxTimestampMs(),
+ response.tokenId(),
+ ByteBuffer.wrap(response.hmac())))
+  }
+  }
+
+  def handleRenewDelegationTokenRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
+ val alterRequest = request.body[RenewDelegationTokenRequest]

Review Comment:
   Fixed



##
core/src/main/scala/kafka/server/ControllerApis.scala:
##
@@ -842,6 +847,75 @@ class ControllerApis(val requestChannel: RequestChannel,
   }
   }
 
+  def handleCreateDelegationTokenRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
+val alterRequest = request.body[CreateDelegationTokenRequest]
+
+val requester = request.context.principal
+val ownerPrincipalName = alterRequest.data.ownerPrincipalName
+val ownerPrincipalType = alterRequest.data.ownerPrincipalType
+val owner = if (ownerPrincipalName == null || ownerPrincipalName.isEmpty) {
+  request.context.principal
+} else {
+  new KafkaPrincipal(ownerPrincipalType, ownerPrincipalName)
+}
+
+// Requester is always allowed to create token for self
+if (!owner.equals(requester) &&
+  !authHelper.authorize(request.context, CREATE_TOKENS, USER, owner.toString)) {
+  requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+CreateDelegationTokenResponse.prepareResponse(request.context.requestVersion, requestThrottleMs,
+  Errors.DELEGATION_TOKEN_AUTHORIZATION_FAILED, owner, requester))
+}
+
+val context = new ControllerRequestContext(request.context.header.data, request.context.principal,
+  OptionalLong.empty())
+
+// Copy the response data to a new response so we can apply the request version
+controller.createDelegationToken(context, alterRequest.data)
+  .thenApply[Unit] { response =>
+ requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+   CreateDelegationTokenResponse.prepareResponse(
+ request.context.requestVersion,
+ requestThrottleMs,
+ Errors.forCode(response.errorCode()),
+ new KafkaPrincipal(response.principalType(), response.principalName()),
+ new KafkaPrincipal(response.tokenRequesterPrincipalType(), response.tokenRequesterPrincipalName()),
+ response.issueTimestampMs(),
+ response.expiryTimestampMs(),
+ response.maxTimestampMs(),
+ response.tokenId(),
+ ByteBuffer.wrap(response.hmac())))
+  }
+  }
+
+  def handleRenewDelegationTokenRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
+ val alterRequest = request.body[RenewDelegationTokenRequest]
+
+ val context = new ControllerRequestContext(
+   request.context.header.data,

[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-14 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1293786264


##
core/src/main/scala/kafka/server/DelegationTokenManager.scala:
##
@@ -24,30 +24,20 @@ import java.util.Base64
 
 import javax.crypto.spec.SecretKeySpec
 import javax.crypto.{Mac, SecretKey}
-import kafka.common.{NotificationHandler, ZkNodeChangeNotificationListener}
-import kafka.utils.{CoreUtils, Json, Logging}
-import kafka.zk.{DelegationTokenChangeNotificationSequenceZNode, DelegationTokenChangeNotificationZNode, DelegationTokensZNode, KafkaZkClient}
+import kafka.utils.Logging
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.security.scram.internals.{ScramFormatter, ScramMechanism}
 import org.apache.kafka.common.security.scram.ScramCredential
 import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
 import org.apache.kafka.common.security.token.delegation.{DelegationToken, TokenInformation}
-import org.apache.kafka.common.utils.{Sanitizer, SecurityUtils, Time}
+import org.apache.kafka.common.utils.Time
 
 import scala.jdk.CollectionConverters._
 import scala.collection.mutable
 
 object DelegationTokenManager {
   val DefaultHmacAlgorithm = "HmacSHA512"
-  val OwnerKey ="owner"
-  val TokenRequesterKey = "tokenRequester"
-  val RenewersKey = "renewers"
-  val IssueTimestampKey = "issueTimestamp"
-  val MaxTimestampKey = "maxTimestamp"
-  val ExpiryTimestampKey = "expiryTimestamp"
-  val TokenIdKey = "tokenId"
-  val VersionKey = "version"
   val CurrentVersion = 3
   val ErrorTimestamp = -1
 

Review Comment:
   Removed






[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-14 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1293675820


##
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
+import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
+import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
+import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
+import org.apache.kafka.common.metadata.DelegationTokenRecord;
+import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.DelegationTokenData;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.charset.StandardCharsets;
+import javax.crypto.spec.SecretKeySpec;
+import javax.crypto.Mac;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH;
+import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE;
+import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
+
+/**
+ * Manages DelegationTokens.
+ */
+public class DelegationTokenControlManager {
+private Time time = Time.SYSTEM;
+
+static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private DelegationTokenCache tokenCache = null;
+private String secretKeyString = null;
+private long tokenDefaultMaxLifetime = 0;
+private long tokenDefaultRenewLifetime = 0;
+private long tokenRemoverScanInterval = 0;
+
+Builder setLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder setTokenCache(DelegationTokenCache tokenCache) {
+this.tokenCache = tokenCache;
+return this;
+}
+
+Builder setTokenKeyString(String secretKeyString) {
+this.secretKeyString = secretKeyString;
+return this;
+}
+
+Builder setDelegationTokenMaxLifeMs(long tokenDefaultMaxLifetime) {
+this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime;
+return this;
+}
+
+Builder setDelegationTokenExpiryTimeMs(long tokenDefaultRenewLifetime) {
+this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime;
+return this;
+}
+
+Builder setDelegationTokenExpiryCheckIntervalMs(long tokenRemoverScanInterval) {
+this.tokenRemoverScanInterval = tokenRemoverScanInterval;
+return this;
+}
+
+DelegationTokenControlManager build() {
+if (logContext == null) 

[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-14 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1293465257


##
core/src/main/scala/kafka/server/ControllerApis.scala:
##
@@ -842,6 +847,75 @@ class ControllerApis(val requestChannel: RequestChannel,
   }
   }
 
+  def handleCreateDelegationTokenRequest(request: RequestChannel.Request): CompletableFuture[Unit] = {
+val alterRequest = request.body[CreateDelegationTokenRequest]
+
+val requester = request.context.principal
+val ownerPrincipalName = alterRequest.data.ownerPrincipalName
+val ownerPrincipalType = alterRequest.data.ownerPrincipalType
+val owner = if (ownerPrincipalName == null || ownerPrincipalName.isEmpty) {
+  request.context.principal
+} else {
+  new KafkaPrincipal(ownerPrincipalType, ownerPrincipalName)
+}
+
+// Requester is always allowed to create token for self
+if (!owner.equals(requester) && 

Review Comment:
   We may in the future allow talking directly to the kcontroller nodes, so all
   checks need to be done in the kcontroller too.
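
To make the check under discussion concrete, here is a minimal Java sketch of the owner/requester rule from the quoted diff: an empty owner name means the requester itself, a requester may always create a token for itself, and any other owner is passed to an authorizer. `Principal`, `TokenCreateCheck`, and the `BiPredicate` authorizer are hypothetical stand-ins for illustration, not Kafka classes.

```java
import java.util.Objects;
import java.util.function.BiPredicate;

// Hypothetical stand-in for KafkaPrincipal; only type and name matter here.
final class Principal {
    final String type;
    final String name;

    Principal(String type, String name) {
        this.type = type;
        this.name = name;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof Principal)) return false;
        Principal p = (Principal) o;
        return type.equals(p.type) && name.equals(p.name);
    }

    @Override
    public int hashCode() {
        return Objects.hash(type, name);
    }
}

final class TokenCreateCheck {
    // A null or empty owner name means "create the token for the requester".
    static Principal resolveOwner(Principal requester, String ownerType, String ownerName) {
        if (ownerName == null || ownerName.isEmpty()) return requester;
        return new Principal(ownerType, ownerName);
    }

    // The requester may always create a token for itself; for any other owner
    // the supplied authorizer (an assumed callback standing in for the ACL
    // check) makes the decision.
    static boolean mayCreate(Principal requester, Principal owner,
                             BiPredicate<Principal, Principal> authorizer) {
        return owner.equals(requester) || authorizer.test(requester, owner);
    }
}
```

Because the rule is a pure function of the two principals and the authorizer, the same check could in principle be run wherever the request is handled, which is the point of the comment above.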
   






[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-14 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1293458695


##
metadata/src/main/java/org/apache/kafka/controller/QuorumController.java:
##
@@ -1498,6 +1543,30 @@ private void cancelNextWriteNoOpRecord() {
 queue.cancelDeferred(WRITE_NO_OP_RECORD);
 }
 
+private static final String WRITE_REMOVE_DELEGATIONTOKEN_RECORD = "writeRemoveDelegationTokenRecord";

Review Comment:
   I changed it to SWEEP_EXPIRED_DELEGATION_TOKENS and
   "sweepExpiredDelegationTokens", as that better describes what it is doing.
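
For readers following the rename, here is a minimal Java sketch of what a sweep of expired tokens could look like. It is not the code in this PR: `ExpiredTokenSweep`, the map from token id to expiry timestamp, and the strict `<` comparison are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

final class ExpiredTokenSweep {
    // Removes every entry whose expiry timestamp (ms) is strictly before nowMs
    // and returns the ids that were removed. The input map is mutated in place.
    static List<String> sweep(Map<String, Long> expiryByTokenId, long nowMs) {
        List<String> removed = new ArrayList<>();
        Iterator<Map.Entry<String, Long>> it = expiryByTokenId.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (entry.getValue() < nowMs) {
                removed.add(entry.getKey());
                it.remove(); // safe removal while iterating
            }
        }
        return removed;
    }
}
```

A periodic task named along the lines of "sweepExpiredDelegationTokens" would call something like `sweep` on each run; how the removals are then recorded is outside this sketch.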






[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-14 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1293455737


##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java:
##
@@ -633,4 +641,35 @@ void handleAclsDelta(AclsImage image, AclsDelta delta, KRaftMigrationOperationCo
 migrationClient.aclClient().writeResourceAcls(resourcePattern, accessControlEntries, migrationState));
 });
 }
+
+void handleDelegationTokenDelta(DelegationTokenImage image, DelegationTokenDelta delta, KRaftMigrationOperationConsumer operationConsumer) {
+Set updatedResources = delta.changes().keySet();

Review Comment:
   Fixed






[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-14 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1293448066


##
core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala:
##
@@ -109,13 +127,44 @@ class DelegationTokenRequestsTest extends BaseRequestTest with SaslSetup {
 val expireResult2 = adminClient.expireDelegationToken(token2.hmac())
 expiryTimestamp = expireResult2.expiryTimestamp().get()
 
+val expireResult3 = adminClient.expireDelegationToken(token3.hmac())
+expiryTimestamp = expireResult3.expiryTimestamp().get()
+
+TestUtils.waitUntilTrue(() => brokers.forall(server => server.tokenCache.tokens().size() == 0),
+  "Timed out waiting for token to propagate to all servers")
+
 tokens = adminClient.describeDelegationToken().delegationTokens().get()
 assertTrue(tokens.size == 0)
 
 //create token with invalid principal type
-val renewer3 = List(SecurityUtils.parseKafkaPrincipal("Group:Renewer3")).asJava
-val createResult3 = adminClient.createDelegationToken(new CreateDelegationTokenOptions().renewers(renewer3))
-assertThrows(classOf[ExecutionException], () => createResult3.delegationToken().get()).getCause.isInstanceOf[InvalidPrincipalTypeException]
+val renewer4 = List(SecurityUtils.parseKafkaPrincipal("Group:Renewer4")).asJava
+val createResult4 = adminClient.createDelegationToken(new CreateDelegationTokenOptions().renewers(renewer4))
+val createResult4Error = assertThrows(classOf[ExecutionException], () => createResult4.delegationToken().get())
+assertTrue(createResult4Error.getCause.isInstanceOf[InvalidPrincipalTypeException])
+
+// Try to renew a deleted token
+val renewResultPostDelete = adminClient.renewDelegationToken(token1.hmac())
+val renewResultPostDeleteError = assertThrows(classOf[ExecutionException], () => renewResultPostDelete.expiryTimestamp().get())
+assertTrue(renewResultPostDeleteError.getCause.isInstanceOf[DelegationTokenNotFoundException])
+
+// Create a DelegationToken with a short lifetime to validate the expire code
+val createResult5 = adminClient.createDelegationToken(new CreateDelegationTokenOptions()
+  .renewers(renewer1)
+  .maxlifeTimeMs(60 * 1000))
+val token5 = createResult5.delegationToken().get()
+
+TestUtils.waitUntilTrue(() => brokers.forall(server => server.tokenCache.tokens().size() == 1),
+  "Timed out waiting for token to propagate to all servers")
+
+Thread.sleep(2 * 60 *1000)

Review Comment:
   > is this Thread.sleep required?
   
   Yes, we are testing that the sweep of expired tokens works. We create a token
   with a short lifetime, and the sweep of expired tokens is set up to run every
   minute. We then check that the sweep removed the token, so it is no longer in
   the cache.
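
As an aside, the same kind of check can also be phrased as a bounded poll, in the spirit of the `TestUtils.waitUntilTrue` calls already used in this test. The following is only a minimal Java sketch under that assumption: `PollingWait.waitUntil` and its parameters are hypothetical names, not part of Kafka's test utilities.

```java
import java.util.function.BooleanSupplier;

final class PollingWait {
    // Polls the condition until it holds or timeoutMs has elapsed.
    // Returns true if the condition held, false on timeout or interruption.
    static boolean waitUntil(BooleanSupplier condition, long timeoutMs, long pauseMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            if (condition.getAsBoolean()) return true;
            if (System.nanoTime() - deadline >= 0) return false;
            try {
                Thread.sleep(pauseMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt flag
                return false;
            }
        }
    }
}
```

With a helper like this, a test could wait for the token cache to become empty with a timeout somewhat longer than the sweep interval, and return as soon as the sweep has run.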






[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-14 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1293441755


##
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
+import 
org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
+import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
+import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
+import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
+import org.apache.kafka.common.metadata.DelegationTokenRecord;
+import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import 
org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.DelegationTokenData;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.charset.StandardCharsets;
+import javax.crypto.spec.SecretKeySpec;
+import javax.crypto.Mac;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH;
+import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE;
+import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
+
+/**
+ * Manages DelegationTokens.
+ */
+public class DelegationTokenControlManager {
+private Time time = Time.SYSTEM;
+
+static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private DelegationTokenCache tokenCache = null;
+private String secretKeyString = null;
+private long tokenDefaultMaxLifetime = 0;
+private long tokenDefaultRenewLifetime = 0;
+private long tokenRemoverScanInterval = 0;
+
+Builder setLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder setTokenCache(DelegationTokenCache tokenCache) {
+this.tokenCache = tokenCache;
+return this;
+}
+
+Builder setTokenKeyString(String secretKeyString) {
+this.secretKeyString = secretKeyString;
+return this;
+}
+
+Builder setDelegationTokenMaxLifeMs(long tokenDefaultMaxLifetime) {
+this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime;
+return this;
+}
+
+Builder setDelegationTokenExpiryTimeMs(long tokenDefaultRenewLifetime) 
{
+this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime;
+return this;
+}
+
+Builder setDelegationTokenExpiryCheckIntervalMs(long 
tokenRemoverScanInterval) {
+this.tokenRemoverScanInterval = tokenRemoverScanInterval;
+return this;
+}
+
+DelegationTokenControlManager build() {
+if (logContext == null) 

[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-14 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1293439494


##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -176,8 +176,11 @@ public enum MetadataVersion {
 // Support for SCRAM
 IBP_3_5_IV2(11, "3.5", "IV2", true),
 
-// Remove leader epoch bump when KRaft controller shrinks the ISR 
(KAFKA-15021)
-IBP_3_6_IV0(12, "3.6", "IV0", false);
+// Support for Remove leader epoch bump when KRaft controller shrinks the 
ISR (KAFKA-15021)
+IBP_3_6_IV0(12, "3.6", "IV0", false),
+
+// Support for DelegationTokens

Review Comment:
   Fixed



##
metadata/src/main/java/org/apache/kafka/metadata/migration/KRaftMigrationZkWriter.java:
##
@@ -633,4 +641,35 @@ void handleAclsDelta(AclsImage image, AclsDelta delta, 
KRaftMigrationOperationCo
 migrationClient.aclClient().writeResourceAcls(resourcePattern, 
accessControlEntries, migrationState));
 });
 }
+
+void handleDelegationTokenDelta(DelegationTokenImage image, 
DelegationTokenDelta delta, KRaftMigrationOperationConsumer operationConsumer) {
+Set updatedResources = delta.changes().keySet();
+updatedResources.forEach(tokenId -> {
+DelegationTokenData tokenData = image.tokens().get(tokenId);
+if (tokenData == null) {
+operationConsumer.accept("DeleteDelegationToken", "Delete 
DelegationToken for " + tokenId, migrationState ->
+
migrationClient.delegationTokenClient().deleteDelegationToken(tokenId, 
migrationState));
+} else {
+operationConsumer.accept("UpdateDelegationToken", "Update 
DelegationToken for " + tokenId, migrationState ->
+
migrationClient.delegationTokenClient().writeDelegationToken(tokenId, 
tokenData.tokenInformation(), migrationState));
+}
+});
+}
+
+void handleDelegationTokenSnapshot(DelegationTokenImage image, 
KRaftMigrationOperationConsumer operationConsumer) {
+image.tokens().keySet().forEach(tokenId -> {
+DelegationTokenData tokenData = image.tokens().get(tokenId);
+operationConsumer.accept("UpdateDelegationToken", "Update 
DelegationToken for " + tokenId, migrationState ->
+
migrationClient.delegationTokenClient().writeDelegationToken(tokenId, 
tokenData.tokenInformation(), migrationState));
+});
+
+List tokens = 
migrationClient.delegationTokenClient().getDelegationTokens();
+tokens.forEach(tokenId -> {
+if (!image.tokens().containsKey(tokenId)) {
+operationConsumer.accept("DeleteDelegationToken", "Delete 
DelegationToken for " + tokenId, migrationState ->
+
migrationClient.delegationTokenClient().deleteDelegationToken(tokenId, 
migrationState));
+

Review Comment:
   Fixed






[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-12 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1292539194


##
core/src/main/scala/kafka/server/DelegationTokenManager.scala:
##
@@ -165,10 +112,11 @@ object DelegationTokenManager {
 
 class DelegationTokenManager(val config: KafkaConfig,
  val tokenCache: DelegationTokenCache,
- val time: Time,
- val zkClient: KafkaZkClient) extends Logging {
+ val time: Time) extends Logging {
   this.logIdent = s"[Token Manager on Broker ${config.brokerId}]: "
 
+  protected val lock = new Object()
+

Review Comment:
   Done






[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-12 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1292538597


##
core/src/main/scala/kafka/server/metadata/KRaftMetadataCache.scala:
##
@@ -400,4 +400,10 @@ class KRaftMetadataCache(val brokerId: Int) extends 
MetadataCache with Logging w
   image.highestOffsetAndEpoch().offset,
   true)
   }
+
+  // Return a list of TokenInformation for a list of KafkaPrincipals

Review Comment:
   Done. I thought I was going to implement the describe DelegationToken but 
then I realized I could keep the old code.






[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-12 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1292538778


##
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
+import 
org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
+import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
+import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
+import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
+import org.apache.kafka.common.metadata.DelegationTokenRecord;
+import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import 
org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.DelegationTokenData;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.charset.StandardCharsets;
+import javax.crypto.spec.SecretKeySpec;
+import javax.crypto.Mac;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH;
+import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE;
+import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
+
+/**
+ * Manages DelegationTokens.
+ */
+public class DelegationTokenControlManager {
+private Time time = Time.SYSTEM;
+
+static class Builder {
+private LogContext logContext = null;
+private SnapshotRegistry snapshotRegistry = null;
+private DelegationTokenCache tokenCache = null;
+private String secretKeyString = null;
+private long tokenDefaultMaxLifetime = 0;
+private long tokenDefaultRenewLifetime = 0;
+private long tokenRemoverScanInterval = 0;
+
+Builder setLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder setSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+this.snapshotRegistry = snapshotRegistry;
+return this;
+}
+
+Builder setTokenCache(DelegationTokenCache tokenCache) {
+this.tokenCache = tokenCache;
+return this;
+}
+
+Builder setTokenKeyString(String secretKeyString) {
+this.secretKeyString = secretKeyString;
+return this;
+}
+
+Builder setDelegationTokenMaxLifeMs(long tokenDefaultMaxLifetime) {
+this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime;
+return this;
+}
+
+Builder setDelegationTokenExpiryTimeMs(long tokenDefaultRenewLifetime) 
{
+this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime;
+return this;
+}
+
+Builder setDelegationTokenExpiryCheckIntervalMs(long 
tokenRemoverScanInterval) {
+this.tokenRemoverScanInterval = tokenRemoverScanInterval;
+return this;
+}
+
+DelegationTokenControlManager build() {
+if (logContext == null) 

[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-12 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1292538611


##
core/src/test/scala/integration/kafka/api/DelegationTokenEndToEndAuthorizationTest.scala:
##
@@ -145,12 +163,12 @@ class DelegationTokenEndToEndAuthorizationTest extends 
EndToEndAuthorizationTest
   val privilegedAdminClient = 
createScramAdminClient(kafkaClientSaslMechanism, kafkaPrincipal.getName, 
kafkaPassword)
   try {
 val token = 
adminClient.createDelegationToken(createDelegationTokenOptions()).delegationToken().get()
-if (assert) {
-  assertToken(token)
-}
+//if (assert) {

Review Comment:
   Done and tested






[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-12 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1292538382


##
core/src/main/scala/kafka/server/MetadataSupport.scala:
##
@@ -69,6 +69,16 @@ sealed trait MetadataSupport {
   handler(request)
 }
   }
+
+  def alsoMaybeForward(request: RequestChannel.Request,

Review Comment:
   Removed. I added it for some early testing.






[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-02 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1282518474


##
core/src/main/scala/kafka/server/DelegationTokenManager.scala:
##
@@ -165,10 +112,11 @@ object DelegationTokenManager {
 
 class DelegationTokenManager(val config: KafkaConfig,

Review Comment:
   > I think the functionality of this class has changed drastically, as in the 
new implementation it seems to be a cache only. Having 
DelegationTokenManagerZk extend this is a little confusing, as you need 
to throw exceptions in methods that this class would never really use. What do 
you think about refactoring this a bit and splitting it into 2 parts:
   > 
   > * a cache that has a zk and a kraft impl
   > * a forwarder that also has a zk and a kraft impl
   > 
   > You can then initialize it accordingly in `BrokerServer` and `KafkaServer`.
   
   See the answer in the next comment. 
   
   One additional reason this doesn't work is that KafkaApis needs to access the 
TokenCache via a DelegationTokenManager in both ZK and KRaft modes, so separate 
ZK and KRaft implementations would need a common base class anyway.
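   
   A rough sketch of the shape this argument implies: one base type through 
which callers reach the cache, with the ZK-specific behaviour layered on top 
in a subclass. All names here (`TokenManagerSketch`, `BaseTokenManager`, 
`ZkTokenManager`, `lookupOwner`) are hypothetical and heavily simplified; this 
is not the code in the PR.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative only: hypothetical names, not the classes in the PR.
final class TokenManagerSketch {

    // Stand-in for the shared token cache (token id -> owner).
    static final class TokenCache {
        private final Map<String, String> ownerByTokenId = new ConcurrentHashMap<>();

        void put(String tokenId, String owner) {
            ownerByTokenId.put(tokenId, owner);
        }

        String owner(String tokenId) {
            return ownerByTokenId.get(tokenId);
        }
    }

    // Common base: callers read the cache through this type in either mode.
    static class BaseTokenManager {
        protected final TokenCache cache;

        BaseTokenManager(TokenCache cache) {
            this.cache = cache;
        }

        String ownerOf(String tokenId) {
            return cache.owner(tokenId);
        }

        // Applies a token update that was decided elsewhere
        // (assumed here to arrive from some external source of truth).
        void updateToken(String tokenId, String owner) {
            cache.put(tokenId, owner);
        }
    }

    // ZK-style variant: the manager itself also creates tokens.
    static final class ZkTokenManager extends BaseTokenManager {
        ZkTokenManager(TokenCache cache) {
            super(cache);
        }

        String createToken(String owner) {
            String tokenId = "zk-" + owner; // placeholder id scheme for the sketch
            // A real implementation would persist the token before caching it.
            updateToken(tokenId, owner);
            return tokenId;
        }
    }

    // A caller that only needs cache access works against the base type.
    static String lookupOwner(BaseTokenManager manager, String tokenId) {
        return manager.ownerOf(tokenId);
    }

    public static void main(String[] args) {
        ZkTokenManager zk = new ZkTokenManager(new TokenCache());
        String id = zk.createToken("alice");
        System.out.println(lookupOwner(zk, id));

        BaseTokenManager other = new BaseTokenManager(new TokenCache());
        other.updateToken("t1", "bob");
        System.out.println(lookupOwner(other, "t1"));
    }
}
```

   Because `lookupOwner` takes the base type, the caller does not need to know 
which mode it is running in.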






[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-02 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1282514451


##
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
+import 
org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
+import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
+import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
+import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
+import org.apache.kafka.common.metadata.DelegationTokenRecord;
+import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import 
org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.DelegationTokenData;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.charset.StandardCharsets;
+import javax.crypto.spec.SecretKeySpec;
+import javax.crypto.Mac;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND;
+import static 
org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH;
+import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE;
+import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
+
+/**
+ * Manages DelegationTokens.
+ */
+public class DelegationTokenControlManager {

Review Comment:
   > It seems to me that the overall way of creating/expiring etc. a token is 
very similar in both Zk and KRaft, as we aim to keep the functionality mostly 
intact. Do you think it should be refactored into common code that has 
`createToken`, `expireToken` etc. methods that can be used both in this class 
and `DelegationTokenManagerZk`?
   
   The answer to why I coded it this way is long; I spent a lot of time 
trying different things before I finally settled on the current implementation.
   
   First, the original DelegationTokenManager was and is the interface for 
updating the TokenCache for DelegationTokens. Authentication using 
DelegationTokens is handled elsewhere using the TokenCache and has not changed 
at all. I felt that reimplementing the TokenCache and then updating the 
authentication mechanisms was too much work, so I had to keep some interface to 
the TokenCache for both Zk and KRaft.
   
   In the original ZK implementation, updates go through the 
DelegationTokenManager, which queries the TokenCache for state, updates the 
token state, pushes the new token to Zk, and updates the local TokenCache. The 
other brokers pull the new state of the token and update their local TokenCache 
in the DelegationTokenManager. This workflow has not changed in the new code, but 
the Zk-specific code has moved to DelegationTokenManagerZk, which inherits the 
common code from DelegationTokenManager and overrides the methods it needs to 
achieve 100% backwards compatibility.
   
   The KRaft way is to forward the request to a controller which queries a 
local TokenCache for state, updates the token and pushes the token to the 
metadata log. Then on replay of 

[GitHub] [kafka] pprovenzano commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-02 Thread via GitHub


pprovenzano commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1282479785


##
core/src/main/scala/kafka/server/ControllerApis.scala:
##
@@ -842,6 +847,75 @@ class ControllerApis(val requestChannel: RequestChannel,
   }
   }
 
+  def handleCreateDelegationTokenRequest(request: RequestChannel.Request): 
CompletableFuture[Unit] = {

Review Comment:
   > Is it allowed for users to send direct requests like this to the 
controller (bypassing the broker)?
   
   Right now, no, but in the future maybe yes. 
   One strong motivation of the KRaft design is that you can make the 
controllers more secure if there is no direct communication to the controller 
from an external source. 


