[GitHub] [kafka] rondagostino commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-18 Thread via GitHub


rondagostino commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1298761957


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -5290,8 +5290,10 @@ class KafkaApisTest {
 
 // read the header from the buffer first so that the body can be read next from the Request constructor
 val header = RequestHeader.parse(buffer)
-val context = new RequestContext(header, "1", InetAddress.getLocalHost, KafkaPrincipal.ANONYMOUS,
-  listenerName, SecurityProtocol.PLAINTEXT, ClientInformation.EMPTY, fromPrivilegedListener,
+// DelegationTokens require the context authenticated to be a
+// non SecurityProtocol.PLAINTEXT and a non KafkaPrincipal.ANONYMOUS
+val context = new RequestContext(header, "1", InetAddress.getLocalHost, new KafkaPrincipal(KafkaPrincipal.USER_TYPE, "Alice"),
+  listenerName, SecurityProtocol.SSL, ClientInformation.EMPTY, fromPrivilegedListener,

Review Comment:
   A comment about how we test the failure case via integration tests (e.g. 
DelegationTokenRequestsOnPlainTextTest.scala) would be good to add so people 
don't think we missed testing that.
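
   One possible wording for that note, purely as an illustration (the phrasing is a suggestion, not settled text):

```scala
// DelegationToken requests are rejected for PLAINTEXT / ANONYMOUS contexts, so this unit
// test only exercises the accepted (SSL, authenticated principal) path; the rejection path
// is covered by integration tests such as DelegationTokenRequestsOnPlainTextTest.
```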



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[GitHub] [kafka] rondagostino commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-18 Thread via GitHub


rondagostino commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1298759956


##
core/src/test/scala/unit/kafka/server/KafkaApisTest.scala:
##
@@ -6069,19 +6071,19 @@ class KafkaApisTest {
   @Test
   def testRaftShouldAlwaysForwardCreateTokenRequest(): Unit = {
 metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
-verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleCreateTokenRequest)
+verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleCreateTokenRequestZk)

Review Comment:
   I think this change is incorrect.  We are checking that a KRaft broker forwards to the KRaft controller, and {handleCreateTokenRequestZk, handleRenewTokenRequestZk, handleExpireTokenRequestZk} aren't invoked on KRaft brokers.
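
   In other words, a minimal sketch of the suggested fix is to keep the assertion pointed at the public handler, assuming the existing `verifyShouldAlwaysForwardErrorMessage` helper stays as-is:

```scala
metadataCache = MetadataCache.kRaftMetadataCache(brokerId)
verifyShouldAlwaysForwardErrorMessage(createKafkaApis(raftSupport = true).handleCreateTokenRequest)
```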






[GitHub] [kafka] rondagostino commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-18 Thread via GitHub


rondagostino commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1298748051


##
core/src/test/scala/unit/kafka/server/DelegationTokenRequestsTest.scala:
##
@@ -109,13 +127,44 @@ class DelegationTokenRequestsTest extends BaseRequestTest with SaslSetup {
 val expireResult2 = adminClient.expireDelegationToken(token2.hmac())
 expiryTimestamp = expireResult2.expiryTimestamp().get()

+val expireResult3 = adminClient.expireDelegationToken(token3.hmac())
+expiryTimestamp = expireResult3.expiryTimestamp().get()
+
+TestUtils.waitUntilTrue(() => brokers.forall(server => server.tokenCache.tokens().size() == 0),
+  "Timed out waiting for token to propagate to all servers")
+
 tokens = adminClient.describeDelegationToken().delegationTokens().get()
 assertTrue(tokens.size == 0)

 //create token with invalid principal type
-val renewer3 = List(SecurityUtils.parseKafkaPrincipal("Group:Renewer3")).asJava
-val createResult3 = adminClient.createDelegationToken(new CreateDelegationTokenOptions().renewers(renewer3))
-assertThrows(classOf[ExecutionException], () => createResult3.delegationToken().get()).getCause.isInstanceOf[InvalidPrincipalTypeException]
+val renewer4 = List(SecurityUtils.parseKafkaPrincipal("Group:Renewer4")).asJava
+val createResult4 = adminClient.createDelegationToken(new CreateDelegationTokenOptions().renewers(renewer4))
+val createResult4Error = assertThrows(classOf[ExecutionException], () => createResult4.delegationToken().get())
+assertTrue(createResult4Error.getCause.isInstanceOf[InvalidPrincipalTypeException])
+
+// Try to renew a deleted token
+val renewResultPostDelete = adminClient.renewDelegationToken(token1.hmac())
+val renewResultPostDeleteError = assertThrows(classOf[ExecutionException], () => renewResultPostDelete.expiryTimestamp().get())
+assertTrue(renewResultPostDeleteError.getCause.isInstanceOf[DelegationTokenNotFoundException])
+
+// Create a DelegationToken with a short lifetime to validate the expire code
+val createResult5 = adminClient.createDelegationToken(new CreateDelegationTokenOptions()
+  .renewers(renewer1)
+  .maxlifeTimeMs(60 * 1000))
+val token5 = createResult5.delegationToken().get()
+
+TestUtils.waitUntilTrue(() => brokers.forall(server => server.tokenCache.tokens().size() == 1),
+  "Timed out waiting for token to propagate to all servers")
+
+Thread.sleep(2 * 60 *1000)

Review Comment:
   Maybe we can use `TestUtils.waitUntilTrue()` instead, with a 2-minute timeout
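
   For example, something along these lines; this is only a sketch -- the condition is illustrative (whatever state the 2-minute sleep is meant to wait for), and it relies on the `waitTimeMs` parameter that `TestUtils.waitUntilTrue()` already takes:

```scala
// Poll for up to 2 minutes instead of sleeping unconditionally for 2 minutes.
TestUtils.waitUntilTrue(() => brokers.forall(server => server.tokenCache.tokens().size() == 0),
  "Timed out waiting for the short-lived token to expire on all servers",
  waitTimeMs = 2 * 60 * 1000)
```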






[GitHub] [kafka] rondagostino commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-18 Thread via GitHub


rondagostino commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1298740130


##
core/src/main/scala/kafka/server/BrokerServer.scala:
##
@@ -279,10 +280,7 @@ class BrokerServer(
   )
 
   /* start token manager */
-  if (config.tokenAuthEnabled) {
-throw new UnsupportedOperationException("Delegation tokens are not supported")
-  }
-  tokenManager = new DelegationTokenManager(config, tokenCache, time , null)
+  tokenManager = new DelegationTokenManager(config, tokenCache, time)
   tokenManager.startup() // does nothing, we just need a token manager in order to compile right now...

Review Comment:
   Can we get rid of this comment?
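
   i.e. the wiring would read roughly like this once the stale note is dropped (same code as in the diff, shown only for clarity):

```scala
/* start token manager */
tokenManager = new DelegationTokenManager(config, tokenCache, time)
tokenManager.startup()
```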






[GitHub] [kafka] rondagostino commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-18 Thread via GitHub


rondagostino commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1298734803


##
core/src/main/scala/kafka/server/ControllerApis.scala:
##
@@ -842,6 +848,114 @@ class ControllerApis(val requestChannel: RequestChannel,
   }
   }
 
+  def allowTokenRequests(request: RequestChannel.Request): Boolean = {
+val protocol = request.context.securityProtocol
+if (request.context.principal.tokenAuthenticated ||
+  // We allow forwarded requests to use PLAINTEXT for testing purposes
+  (request.isForwarded == false && protocol == SecurityProtocol.PLAINTEXT) ||
+  // disallow requests from 1-way SSL
+  (protocol == SecurityProtocol.SSL && request.context.principal == KafkaPrincipal.ANONYMOUS))
+  false
+else
+  true
+  }

Review Comment:
   Special-casing for PLAINTEXT, etc. -- smells pretty bad.  I'm thinking the appropriate check here is just whether the request is forwarded or not.  Given that it takes cluster privileges to send an envelope request, we can trust the broker to perform these checks.  The controllers could conceivably be within some private network that only the brokers have access to, for example; in that case, who knows what the authentication between broker and controller looks like.
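
   A sketch of that simplification, reusing the method signature from the diff above; whether any further checks belong on the controller side is exactly the open question here, so this is illustrative rather than the agreed implementation:

```scala
def allowTokenRequests(request: RequestChannel.Request): Boolean = {
  // Sending an envelope (forwarded) request already requires cluster privileges,
  // so trust the forwarding broker to have rejected PLAINTEXT/ANONYMOUS and
  // token-authenticated principals before the request got here.
  request.isForwarded
}
```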






[GitHub] [kafka] rondagostino commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-18 Thread via GitHub


rondagostino commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1298718349


##
core/src/main/scala/kafka/server/DelegationTokenManager.scala:
##
@@ -165,16 +99,16 @@ object DelegationTokenManager {
 
 class DelegationTokenManager(val config: KafkaConfig,
  val tokenCache: DelegationTokenCache,
- val time: Time,
- val zkClient: KafkaZkClient) extends Logging {
+ val time: Time) extends Logging {
   this.logIdent = s"[Token Manager on Broker ${config.brokerId}]: "

Review Comment:
   Let's change this to refer to `Token Manager on Node` instead since this 
could be on a controller now.
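
   Something like the following (a sketch; whether `config.nodeId` or another identifier is the right choice on the controller side is an assumption):

```scala
this.logIdent = s"[Token Manager on Node ${config.nodeId}]: "
```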






[GitHub] [kafka] rondagostino commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-18 Thread via GitHub


rondagostino commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1298706120


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -2999,7 +2999,38 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleCreateTokenRequest(request: RequestChannel.Request): Unit = {
+val createTokenRequest = request.body[CreateDelegationTokenRequest]
+
+val requester = request.context.principal
+val ownerPrincipalName = createTokenRequest.data.ownerPrincipalName
+val owner = if (ownerPrincipalName == null || ownerPrincipalName.isEmpty) {
+  request.context.principal
+} else {
+  new KafkaPrincipal(createTokenRequest.data.ownerPrincipalType, ownerPrincipalName)
+}
+val renewerList = createTokenRequest.data.renewers.asScala.toList.map(entry =>
+  new KafkaPrincipal(entry.principalType, entry.principalName))
+
+if (!allowTokenRequests(request)) {
+  requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+CreateDelegationTokenResponse.prepareResponse(request.context.requestVersion, requestThrottleMs,
+  Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, owner, requester))
+} else if (!owner.equals(requester) && !authHelper.authorize(request.context, CREATE_TOKENS, USER, owner.toString)) {
+  requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+CreateDelegationTokenResponse.prepareResponse(request.context.requestVersion, requestThrottleMs,
+  Errors.DELEGATION_TOKEN_AUTHORIZATION_FAILED, owner, requester))
+} else if (renewerList.exists(principal => principal.getPrincipalType != KafkaPrincipal.USER_TYPE)) {
+  requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+CreateDelegationTokenResponse.prepareResponse(request.context.requestVersion, requestThrottleMs,
+  Errors.INVALID_PRINCIPAL_TYPE, owner, requester))
+} else {
+  maybeForwardToController(request, handleCreateTokenRequestZk)
+}
+  }
+
+  def handleCreateTokenRequestZk(request: RequestChannel.Request): Unit = {

Review Comment:
   I wonder if this is a broader problem.  Are there other RPCs that mutate 
Zookeeper that need to check that they are the active controller when migration 
is enabled?  It is likely a short window of time, but there does seem to be a 
possibility that an RPC could make it to a broker and end up mutating Zookeeper 
when in fact the active controller is a KRaft controller.  @mumrah WDYT?






[GitHub] [kafka] rondagostino commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-18 Thread via GitHub


rondagostino commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1298687894


##
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
+import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
+import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
+import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
+import org.apache.kafka.common.metadata.DelegationTokenRecord;
+import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.DelegationTokenData;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.charset.StandardCharsets;
+import javax.crypto.spec.SecretKeySpec;
+import javax.crypto.Mac;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH;
+import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE;
+import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
+
+/**
+ * Manages DelegationTokens.
+ */
+public class DelegationTokenControlManager {
+private Time time = Time.SYSTEM;
+
+static class Builder {
+private LogContext logContext = null;
+private DelegationTokenCache tokenCache = null;
+private String secretKeyString = null;
+private long tokenDefaultMaxLifetime = 0;
+private long tokenDefaultRenewLifetime = 0;
+
+Builder setLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder setTokenCache(DelegationTokenCache tokenCache) {
+this.tokenCache = tokenCache;
+return this;
+}
+
+Builder setTokenKeyString(String secretKeyString) {
+this.secretKeyString = secretKeyString;
+return this;
+}
+
+Builder setDelegationTokenMaxLifeMs(long tokenDefaultMaxLifetime) {
+this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime;
+return this;
+}
+
+Builder setDelegationTokenExpiryTimeMs(long tokenDefaultRenewLifetime) {
+this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime;
+return this;
+}
+
+DelegationTokenControlManager build() {
+if (logContext == null) logContext = new LogContext();
+return new DelegationTokenControlManager(
+  logContext,
+  tokenCache,
+  secretKeyString,
+  tokenDefaultMaxLifetime,
+  tokenDefaultRenewLifetime);
+}
+}
+
+private final Logger log;
+private final DelegationTokenCache tokenCache;
+private final String secretKeyString;
+private final long tokenDefaultMaxLifetime;
+private final long tokenDefaultRenewLifetime;
+
+private 

[GitHub] [kafka] rondagostino commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-18 Thread via GitHub


rondagostino commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1298683576


##
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
+import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
+import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
+import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
+import org.apache.kafka.common.metadata.DelegationTokenRecord;
+import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.DelegationTokenData;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.charset.StandardCharsets;
+import javax.crypto.spec.SecretKeySpec;
+import javax.crypto.Mac;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH;
+import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE;
+import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
+
+/**
+ * Manages DelegationTokens.
+ */
+public class DelegationTokenControlManager {
+private Time time = Time.SYSTEM;
+
+static class Builder {
+private LogContext logContext = null;
+private DelegationTokenCache tokenCache = null;
+private String tokenSecretKeyString = null;
+private long tokenDefaultMaxLifetimeMs = 0;
+private long tokenDefaultRenewLifetimeMs = 0;
+
+Builder setLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder setTokenCache(DelegationTokenCache tokenCache) {
+this.tokenCache = tokenCache;
+return this;
+}
+
+Builder setDelegationTokenSecretKey(String tokenSecretKeyString) {
+this.tokenSecretKeyString = tokenSecretKeyString;
+return this;
+}
+
+Builder setDelegationTokenMaxLifeMs(long tokenDefaultMaxLifetimeMs) {
+this.tokenDefaultMaxLifetimeMs = tokenDefaultMaxLifetimeMs;
+return this;
+}
+
+Builder setDelegationTokenExpiryTimeMs(long tokenDefaultRenewLifetimeMs) {
+this.tokenDefaultRenewLifetimeMs = tokenDefaultRenewLifetimeMs;
+return this;
+}
+
+DelegationTokenControlManager build() {
+if (logContext == null) logContext = new LogContext();
+return new DelegationTokenControlManager(
+  logContext,
+  tokenCache,
+  tokenSecretKeyString,
+  tokenDefaultMaxLifetimeMs,
+  tokenDefaultRenewLifetimeMs);
+}
+}
+
+private final Logger log;
+private final DelegationTokenCache tokenCache;
+private final String tokenSecretKeyString;
+private final long tokenDefaultMaxLifetimeMs;
+private final long 

[GitHub] [kafka] rondagostino commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-18 Thread via GitHub


rondagostino commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1298682884


##
metadata/src/main/java/org/apache/kafka/controller/DelegationTokenControlManager.java:
##
@@ -0,0 +1,359 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.controller;
+
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData;
+import org.apache.kafka.common.message.CreateDelegationTokenRequestData.CreatableRenewers;
+import org.apache.kafka.common.message.CreateDelegationTokenResponseData;
+import org.apache.kafka.common.message.ExpireDelegationTokenRequestData;
+import org.apache.kafka.common.message.ExpireDelegationTokenResponseData;
+import org.apache.kafka.common.message.RenewDelegationTokenRequestData;
+import org.apache.kafka.common.message.RenewDelegationTokenResponseData;
+import org.apache.kafka.common.metadata.DelegationTokenRecord;
+import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord;
+import org.apache.kafka.common.requests.ApiError;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.metadata.DelegationTokenData;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.charset.StandardCharsets;
+import javax.crypto.spec.SecretKeySpec;
+import javax.crypto.Mac;
+
+import org.slf4j.Logger;
+
+import java.util.ArrayList;
+import java.util.Base64;
+import java.util.List;
+
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_AUTH_DISABLED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_EXPIRED;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_NOT_FOUND;
+import static org.apache.kafka.common.protocol.Errors.DELEGATION_TOKEN_OWNER_MISMATCH;
+import static org.apache.kafka.common.protocol.Errors.INVALID_PRINCIPAL_TYPE;
+import static org.apache.kafka.common.protocol.Errors.NONE;
+import static org.apache.kafka.common.protocol.Errors.UNSUPPORTED_VERSION;
+
+/**
+ * Manages DelegationTokens.
+ */
+public class DelegationTokenControlManager {
+private Time time = Time.SYSTEM;
+
+static class Builder {
+private LogContext logContext = null;
+private DelegationTokenCache tokenCache = null;
+private String secretKeyString = null;
+private long tokenDefaultMaxLifetime = 0;
+private long tokenDefaultRenewLifetime = 0;
+
+Builder setLogContext(LogContext logContext) {
+this.logContext = logContext;
+return this;
+}
+
+Builder setTokenCache(DelegationTokenCache tokenCache) {
+this.tokenCache = tokenCache;
+return this;
+}
+
+Builder setTokenKeyString(String secretKeyString) {
+this.secretKeyString = secretKeyString;
+return this;
+}
+
+Builder setDelegationTokenMaxLifeMs(long tokenDefaultMaxLifetime) {
+this.tokenDefaultMaxLifetime = tokenDefaultMaxLifetime;
+return this;
+}
+
+Builder setDelegationTokenExpiryTimeMs(long tokenDefaultRenewLifetime) {
+this.tokenDefaultRenewLifetime = tokenDefaultRenewLifetime;
+return this;
+}
+
+DelegationTokenControlManager build() {
+if (logContext == null) logContext = new LogContext();
+return new DelegationTokenControlManager(
+  logContext,
+  tokenCache,
+  secretKeyString,
+  tokenDefaultMaxLifetime,
+  tokenDefaultRenewLifetime);
+}
+}
+
+private final Logger log;
+private final DelegationTokenCache tokenCache;
+private final String secretKeyString;
+private final long tokenDefaultMaxLifetime;
+private final long tokenDefaultRenewLifetime;
+
+private 

[GitHub] [kafka] rondagostino commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-17 Thread via GitHub


rondagostino commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1297350659


##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -215,9 +215,9 @@ class KafkaApis(val requestChannel: RequestChannel,
 case ApiKeys.DESCRIBE_LOG_DIRS => handleDescribeLogDirsRequest(request)
 case ApiKeys.SASL_AUTHENTICATE => handleSaslAuthenticateRequest(request)
 case ApiKeys.CREATE_PARTITIONS => maybeForwardToController(request, handleCreatePartitionsRequest)
-case ApiKeys.CREATE_DELEGATION_TOKEN => maybeForwardToController(request, handleCreateTokenRequest)
-case ApiKeys.RENEW_DELEGATION_TOKEN => maybeForwardToController(request, handleRenewTokenRequest)
-case ApiKeys.EXPIRE_DELEGATION_TOKEN => maybeForwardToController(request, handleExpireTokenRequest)
+case ApiKeys.CREATE_DELEGATION_TOKEN => handleCreateTokenRequest(request)
+case ApiKeys.RENEW_DELEGATION_TOKEN => handleRenewTokenRequest(request)
+case ApiKeys.EXPIRE_DELEGATION_TOKEN => handleExpireTokenRequest(request)

Review Comment:
   Needs a comment about why we can't forward here
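
   An illustrative wording, based on what the handlers later in this review actually do (validation against the requesting principal happens on the broker before forwarding); the exact phrasing is only a suggestion:

```scala
// Delegation token requests are not forwarded directly: the broker first validates them
// against the requesting principal (allowTokenRequests, owner authorization, renewer
// principal types) and only then forwards the request to the controller.
case ApiKeys.CREATE_DELEGATION_TOKEN => handleCreateTokenRequest(request)
case ApiKeys.RENEW_DELEGATION_TOKEN => handleRenewTokenRequest(request)
case ApiKeys.EXPIRE_DELEGATION_TOKEN => handleExpireTokenRequest(request)
```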



##
core/src/main/scala/kafka/server/KafkaApis.scala:
##
@@ -2999,7 +2999,38 @@ class KafkaApis(val requestChannel: RequestChannel,
   }
 
   def handleCreateTokenRequest(request: RequestChannel.Request): Unit = {
+val createTokenRequest = request.body[CreateDelegationTokenRequest]
+
+val requester = request.context.principal
+val ownerPrincipalName = createTokenRequest.data.ownerPrincipalName
+val owner = if (ownerPrincipalName == null || ownerPrincipalName.isEmpty) {
+  request.context.principal
+} else {
+  new KafkaPrincipal(createTokenRequest.data.ownerPrincipalType, ownerPrincipalName)
+}
+val renewerList = createTokenRequest.data.renewers.asScala.toList.map(entry =>
+  new KafkaPrincipal(entry.principalType, entry.principalName))
+
+if (!allowTokenRequests(request)) {
+  requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+CreateDelegationTokenResponse.prepareResponse(request.context.requestVersion, requestThrottleMs,
+  Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, owner, requester))
+} else if (!owner.equals(requester) && !authHelper.authorize(request.context, CREATE_TOKENS, USER, owner.toString)) {
+  requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+CreateDelegationTokenResponse.prepareResponse(request.context.requestVersion, requestThrottleMs,
+  Errors.DELEGATION_TOKEN_AUTHORIZATION_FAILED, owner, requester))
+} else if (renewerList.exists(principal => principal.getPrincipalType != KafkaPrincipal.USER_TYPE)) {
+  requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
+CreateDelegationTokenResponse.prepareResponse(request.context.requestVersion, requestThrottleMs,
+  Errors.INVALID_PRINCIPAL_TYPE, owner, requester))
+} else {
+  maybeForwardToController(request, handleCreateTokenRequestZk)
+}
+  }
+
+  def handleCreateTokenRequestZk(request: RequestChannel.Request): Unit = {

Review Comment:
   Need to check that we are the active controller (and in the other 
renew/expire methods as well)
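
   A sketch of the kind of guard being asked for. Caveats: the `requireZkOrThrow` pattern and `zkSupport.controller.isActive` are assumed here, borrowed from how other ZK-only paths in KafkaApis are gated, and returning `NOT_CONTROLLER` with the requesting principal as both owner and requester is an assumption rather than something this PR has settled:

```scala
def handleCreateTokenRequestZk(request: RequestChannel.Request): Unit = {
  val zkSupport = metadataSupport.requireZkOrThrow(KafkaApis.shouldAlwaysForward(request))
  // Only the active ZK controller should mutate delegation token state in ZooKeeper;
  // during a migration the active controller may already be a KRaft controller.
  if (!zkSupport.controller.isActive) {
    requestHelper.sendResponseMaybeThrottle(request, requestThrottleMs =>
      CreateDelegationTokenResponse.prepareResponse(request.context.requestVersion, requestThrottleMs,
        Errors.NOT_CONTROLLER, request.context.principal, request.context.principal))
  } else {
    // ... existing ZooKeeper-backed create-token logic ...
  }
}
```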






[GitHub] [kafka] rondagostino commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-16 Thread via GitHub


rondagostino commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1296233969


##
metadata/src/main/java/org/apache/kafka/metadata/DelegationTokenData.java:
##


Review Comment:
   Is a Unit test necessary, do you think?  I also realize we don't have one 
for `TokenInformation`.



##
metadata/src/test/java/org/apache/kafka/image/DelegationTokenImageTest.java:
##
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.image;
+
+import org.apache.kafka.common.metadata.DelegationTokenRecord;
+import org.apache.kafka.common.metadata.RemoveDelegationTokenRecord;
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import org.apache.kafka.common.utils.SecurityUtils;
+import org.apache.kafka.image.writer.ImageWriterOptions;
+import org.apache.kafka.image.writer.RecordListWriter;
+import org.apache.kafka.metadata.RecordTestUtils;
+import org.apache.kafka.metadata.DelegationTokenData;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.common.MetadataVersion;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+
+@Timeout(value = 40)
+public class DelegationTokenImageTest {
+public final static DelegationTokenImage IMAGE1;
+
+public final static List<ApiMessageAndVersion> DELTA1_RECORDS;
+
+final static DelegationTokenDelta DELTA1;
+
+final static DelegationTokenImage IMAGE2;
+
+static DelegationTokenData randomDelegationTokenData(String tokenId, long expireTimestamp) {
+TokenInformation ti = new TokenInformation(
+tokenId,
+SecurityUtils.parseKafkaPrincipal(KafkaPrincipal.USER_TYPE + ":" + "fred"),
+SecurityUtils.parseKafkaPrincipal(KafkaPrincipal.USER_TYPE + ":" + "fred"),
+new ArrayList(),
+0,
+1000,
+expireTimestamp);
+return new DelegationTokenData(ti);
+}
+
+static {
+Map<String, DelegationTokenData> image1 = new HashMap<>();
+image1.put("somerandomuuid1", randomDelegationTokenData("somerandomuuid1", 100));
+image1.put("somerandomuuid2", randomDelegationTokenData("somerandomuuid2", 100));
+image1.put("somerandomuuid3", randomDelegationTokenData("somerandomuuid3", 100));
+IMAGE1 = new DelegationTokenImage(image1);
+
+DELTA1_RECORDS = new ArrayList<>();
+DELTA1_RECORDS.add(new ApiMessageAndVersion(new DelegationTokenRecord().
+setOwner(KafkaPrincipal.USER_TYPE + ":" + "fred").
+setRequester(KafkaPrincipal.USER_TYPE + ":" + "fred").
+setIssueTimestamp(0).
+setMaxTimestamp(1000).
+setExpirationTimestamp(200).
+setTokenId("somerandomuuid1"), (short) 0));
+DELTA1_RECORDS.add(new ApiMessageAndVersion(new RemoveDelegationTokenRecord().
+setTokenId("somerandomuuid3"), (short) 0));
+
+DELTA1 = new DelegationTokenDelta(IMAGE1);
+RecordTestUtils.replayAll(DELTA1, DELTA1_RECORDS);
+
+Map<String, DelegationTokenData> image2 = new HashMap<>();
+image2.put("somerandomuuid1", randomDelegationTokenData("somerandomuuid1", 200));
+image2.put("somerandomuuid2", randomDelegationTokenData("somerandomuuid2", 100));
+IMAGE2 = new DelegationTokenImage(image2);
+}
+
+@Test
+public void testEmptyImageRoundTrip() throws Throwable {
+testToImageAndBack(DelegationTokenImage.EMPTY);
+}
+
+@Test
+public void testImage1RoundTrip() throws Throwable {
+testToImageAndBack(IMAGE1);
+}
+
+@Test
+public void testApplyDelta1() throws Throwable {
+assertEquals(IMAGE2, DELTA1.apply());
+}
+
+@Test
+public void testImage2RoundTrip() throws Throwable {
+testToImageAndBack(IMAGE2);
+}
+
+private void testToImageAndBack(DelegationTokenImage image) throws Throwable {
+

[GitHub] [kafka] rondagostino commented on a diff in pull request #14083: KAFKA-15219: KRaft support for DelegationTokens

2023-08-14 Thread via GitHub


rondagostino commented on code in PR #14083:
URL: https://github.com/apache/kafka/pull/14083#discussion_r1293962499


##
metadata/src/test/java/org/apache/kafka/image/DelegationTokenImageTest.java:
##


Review Comment:
   Can we include `RemoveDelegationTokenRecord` in a test?



##
metadata/src/main/resources/common/metadata/DelegationTokenRecord.json:
##
@@ -22,8 +22,10 @@
   "fields": [
 { "name": "Owner", "type": "string", "versions": "0+",
   "about": "The delegation token owner." },
+{ "name": "Requester", "type": "string", "versions": "0+",
+  "about": "The delegation token requester." },

Review Comment:
   Normally we append new stuff at the end, but this record has never been used 
before, so I'm okay with inserting in the middle in this case.



##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -173,11 +173,14 @@ public enum MetadataVersion {
 // Adds replica epoch to Fetch request (KIP-903).
 IBP_3_5_IV1(10, "3.5", "IV1", false),
 
-// Support for SCRAM
+// KRaft support for SCRAM
 IBP_3_5_IV2(11, "3.5", "IV2", true),
 
-// Remove leader epoch bump when KRaft controller shrinks the ISR (KAFKA-15021)
-IBP_3_6_IV0(12, "3.6", "IV0", false);
+// Support for Remove leader epoch bump when KRaft controller shrinks the ISR (KAFKA-15021)
+IBP_3_6_IV0(12, "3.6", "IV0", false),
+
+// KRaft support for DelegationTokens

Review Comment:
   `s/DelegationTokens/Delegation Tokens/`



##
metadata/src/test/java/org/apache/kafka/image/MetadataImageTest.java:
##


Review Comment:
   `testApplyDelta1()` needs to have 
`records.addAll(DelegationTokenImageTest.DELTA1_RECORDS);`. It actually 
concerns me that this test is passing without it.  I added it locally and it 
still passes.  How could that be?  Something must be wrong somewhere.  Could 
you take a look?



##
server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java:
##
@@ -267,6 +270,10 @@ public boolean isLeaderEpochBumpRequiredOnIsrShrink() {
 return !this.isAtLeast(IBP_3_6_IV0);
 }
 
+public boolean isDelegationTokenSupported() {
+return this.isAtLeast(IBP_3_6_IV1);
+}

Review Comment:
   Need a test for this in `MetadataVersionTest`


