This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8827ce47010 KAFKA-19113: Migrate DelegationTokenManager to server 
module (#19424)
8827ce47010 is described below

commit 8827ce47010e1a6cb35119be2724dd47bf9c1ddb
Author: PoAn Yang <[email protected]>
AuthorDate: Mon Apr 14 22:49:45 2025 +0800

    KAFKA-19113: Migrate DelegationTokenManager to server module (#19424)
    
    1. Migrate DelegationTokenManager to server module.
    2. Rewrite DelegationTokenManager in Java.
    3. Move DelegationTokenManagerConfigs out of KafkaConfig.
    
    Reviewers: Mickael Maison <[email protected]>

---
 checkstyle/import-control-server.xml               |   1 +
 .../kafka/server/builders/KafkaApisBuilder.java    |   2 +-
 .../src/main/scala/kafka/server/BrokerServer.scala |   6 +-
 .../main/scala/kafka/server/ControllerApis.scala   |   6 +-
 .../main/scala/kafka/server/ControllerServer.scala |  17 +--
 .../kafka/server/DelegationTokenManager.scala      | 143 ---------------------
 core/src/main/scala/kafka/server/KafkaApis.scala   |  15 ++-
 core/src/main/scala/kafka/server/KafkaConfig.scala |   9 +-
 .../server/metadata/DelegationTokenPublisher.scala |   2 +-
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |   8 --
 .../kafka/server/DelegationTokenManager.java       | 125 ++++++++++++++++++
 .../config/DelegationTokenManagerConfigs.java      |  36 ++++++
 .../config/DelegationTokenManagerConfigsTest.java  |  78 +++++++++++
 13 files changed, 266 insertions(+), 182 deletions(-)

diff --git a/checkstyle/import-control-server.xml 
b/checkstyle/import-control-server.xml
index b45b3c41c27..a35719e8761 100644
--- a/checkstyle/import-control-server.xml
+++ b/checkstyle/import-control-server.xml
@@ -82,6 +82,7 @@
   <allow pkg="org.apache.kafka.raft" />
 
   <subpackage name="server">
+    <allow pkg="javax.crypto" />
     <allow pkg="org.apache.kafka.server" />
     <allow pkg="org.apache.kafka.image" />
     <allow pkg="org.apache.kafka.storage.internals.log" />
diff --git a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java 
b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
index 86f8458eef1..8724fc3cb8e 100644
--- a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
+++ b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
@@ -21,7 +21,6 @@ import kafka.coordinator.transaction.TransactionCoordinator;
 import kafka.network.RequestChannel;
 import kafka.server.ApiVersionManager;
 import kafka.server.AutoTopicCreationManager;
-import kafka.server.DelegationTokenManager;
 import kafka.server.FetchManager;
 import kafka.server.ForwardingManager;
 import kafka.server.KafkaApis;
@@ -38,6 +37,7 @@ import org.apache.kafka.coordinator.share.ShareCoordinator;
 import org.apache.kafka.metadata.ConfigRepository;
 import org.apache.kafka.metadata.MetadataCache;
 import org.apache.kafka.server.ClientMetricsManager;
+import org.apache.kafka.server.DelegationTokenManager;
 import org.apache.kafka.server.authorizer.Authorizer;
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
 
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 5f52e6abdcb..398c085d0d5 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -46,7 +46,7 @@ import org.apache.kafka.metadata.publisher.AclPublisher
 import org.apache.kafka.security.CredentialProvider
 import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.common.{ApiMessageAndVersion, 
DirectoryEventHandler, NodeToControllerChannelManager, TopicIdPartition}
-import org.apache.kafka.server.config.ConfigType
+import org.apache.kafka.server.config.{ConfigType, 
DelegationTokenManagerConfigs}
 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
 import org.apache.kafka.server.metrics.{ClientMetricsReceiverPlugin, 
KafkaYammerMetrics}
 import org.apache.kafka.server.network.{EndpointReadyFutures, 
KafkaAuthorizerServerInfo}
@@ -54,7 +54,7 @@ import 
org.apache.kafka.server.share.persister.{DefaultStatePersister, NoOpState
 import org.apache.kafka.server.share.session.ShareSessionCache
 import org.apache.kafka.server.util.timer.{SystemTimer, SystemTimerReaper}
 import org.apache.kafka.server.util.{Deadline, FutureUtils, KafkaScheduler}
-import org.apache.kafka.server.{AssignmentsManager, BrokerFeatures, 
ClientMetricsManager, DelayedActionQueue, ProcessRole}
+import org.apache.kafka.server.{AssignmentsManager, BrokerFeatures, 
ClientMetricsManager, DelayedActionQueue, DelegationTokenManager, ProcessRole}
 import org.apache.kafka.storage.internals.log.LogDirFailureChannel
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 
@@ -355,7 +355,7 @@ class BrokerServer(
       )
 
       /* start token manager */
-      tokenManager = new DelegationTokenManager(config, tokenCache, time)
+      tokenManager = new DelegationTokenManager(new 
DelegationTokenManagerConfigs(config), tokenCache)
 
       /* initializing the groupConfigManager */
       groupConfigManager = new 
GroupConfigManager(config.groupCoordinatorConfig.extractGroupConfigMap(config.shareGroupConfig))
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala 
b/core/src/main/scala/kafka/server/ControllerApis.scala
index b294ddd3108..fbcb0e8572d 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -56,7 +56,7 @@ import 
org.apache.kafka.image.publisher.ControllerRegistrationsPublisher
 import org.apache.kafka.metadata.{BrokerHeartbeatReply, 
BrokerRegistrationReply}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.server.ProcessRole
+import org.apache.kafka.server.{DelegationTokenManager, ProcessRole}
 import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.common.{ApiMessageAndVersion, RequestLocal}
 
@@ -970,7 +970,7 @@ class ControllerApis(
           new RenewDelegationTokenResponseData()
               .setThrottleTimeMs(requestThrottleMs)
               .setErrorCode(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED.code)
-              .setExpiryTimestampMs(DelegationTokenManager.ErrorTimestamp)))
+              .setExpiryTimestampMs(DelegationTokenManager.ERROR_TIMESTAMP)))
         CompletableFuture.completedFuture[Unit](())
     } else {
       val context = new ControllerRequestContext(
@@ -994,7 +994,7 @@ class ControllerApis(
           new ExpireDelegationTokenResponseData()
               .setThrottleTimeMs(requestThrottleMs)
               .setErrorCode(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED.code)
-              .setExpiryTimestampMs(DelegationTokenManager.ErrorTimestamp)))
+              .setExpiryTimestampMs(DelegationTokenManager.ERROR_TIMESTAMP)))
       CompletableFuture.completedFuture[Unit](())
     } else {
       val context = new ControllerRequestContext(
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala 
b/core/src/main/scala/kafka/server/ControllerServer.scala
index 3ddf97d2705..c2826ec8bfc 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -40,11 +40,11 @@ import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
 import org.apache.kafka.metadata.publisher.{AclPublisher, FeaturesPublisher}
 import org.apache.kafka.raft.QuorumConfig
 import org.apache.kafka.security.CredentialProvider
-import org.apache.kafka.server.ProcessRole
+import org.apache.kafka.server.{DelegationTokenManager, ProcessRole}
 import org.apache.kafka.server.authorizer.Authorizer
 import 
org.apache.kafka.server.config.ServerLogConfigs.{ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG,
 CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG}
 import org.apache.kafka.server.common.{ApiMessageAndVersion, KRaftVersion, 
NodeToControllerChannelManager}
-import org.apache.kafka.server.config.ConfigType
+import org.apache.kafka.server.config.{ConfigType, 
DelegationTokenManagerConfigs}
 import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics, 
LinuxIoMetricsCollector}
 import org.apache.kafka.server.network.{EndpointReadyFutures, 
KafkaAuthorizerServerInfo}
 import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy}
@@ -206,9 +206,10 @@ class ControllerServer(
         
QuorumFeatures.defaultSupportedFeatureMap(config.unstableFeatureVersionsEnabled),
         controllerNodes.asScala.map(node => Integer.valueOf(node.id())).asJava)
 
+      val delegationTokenManagerConfigs = new 
DelegationTokenManagerConfigs(config)
       val delegationTokenKeyString = {
-        if (config.tokenAuthEnabled) {
-          config.delegationTokenSecretKey.value
+        if (delegationTokenManagerConfigs.tokenAuthEnabled) {
+          delegationTokenManagerConfigs.delegationTokenSecretKey.value
         } else {
           null
         }
@@ -247,9 +248,9 @@ class ControllerServer(
           
setNonFatalFaultHandler(sharedServer.nonFatalQuorumControllerFaultHandler).
           setDelegationTokenCache(tokenCache).
           setDelegationTokenSecretKey(delegationTokenKeyString).
-          setDelegationTokenMaxLifeMs(config.delegationTokenMaxLifeMs).
-          setDelegationTokenExpiryTimeMs(config.delegationTokenExpiryTimeMs).
-          
setDelegationTokenExpiryCheckIntervalMs(config.delegationTokenExpiryCheckIntervalMs).
+          
setDelegationTokenMaxLifeMs(delegationTokenManagerConfigs.delegationTokenMaxLifeMs).
+          
setDelegationTokenExpiryTimeMs(delegationTokenManagerConfigs.delegationTokenExpiryTimeMs).
+          
setDelegationTokenExpiryCheckIntervalMs(delegationTokenManagerConfigs.delegationTokenExpiryCheckIntervalMs).
           
setUncleanLeaderElectionCheckIntervalMs(config.uncleanLeaderElectionCheckIntervalMs).
           setInterBrokerListenerName(config.interBrokerListenerName.value()).
           
setControllerPerformanceSamplePeriodMs(config.controllerPerformanceSamplePeriodMs).
@@ -361,7 +362,7 @@ class ControllerServer(
           config,
           sharedServer.metadataPublishingFaultHandler,
           "controller",
-          new DelegationTokenManager(config, tokenCache, time)
+          new DelegationTokenManager(delegationTokenManagerConfigs, tokenCache)
       ))
 
       // Set up the metrics publisher.
diff --git a/core/src/main/scala/kafka/server/DelegationTokenManager.scala 
b/core/src/main/scala/kafka/server/DelegationTokenManager.scala
deleted file mode 100644
index b74e57999fb..00000000000
--- a/core/src/main/scala/kafka/server/DelegationTokenManager.scala
+++ /dev/null
@@ -1,143 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import java.nio.charset.StandardCharsets
-import java.security.InvalidKeyException
-import javax.crypto.spec.SecretKeySpec
-import javax.crypto.{Mac, SecretKey}
-import kafka.utils.Logging
-import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.common.security.scram.internals.{ScramFormatter, 
ScramMechanism}
-import org.apache.kafka.common.security.scram.ScramCredential
-import 
org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
-import org.apache.kafka.common.security.token.delegation.{DelegationToken, 
TokenInformation}
-import org.apache.kafka.common.utils.Time
-
-import java.util
-import java.util.stream.Collectors
-import scala.jdk.CollectionConverters._
-import scala.collection.mutable
-
-object DelegationTokenManager {
-  private val DefaultHmacAlgorithm = "HmacSHA512"
-  val ErrorTimestamp = -1
-
-  /**
-   * Convert the byte[] to a secret key
-   * @param keybytes the byte[] to create the secret key from
-   * @return the secret key
-   */
-  private def createSecretKey(keybytes: Array[Byte]) : SecretKey = {
-    new SecretKeySpec(keybytes, DefaultHmacAlgorithm)
-  }
-
-  /**
-   * Compute HMAC of the identifier using the secret key
-   * @param tokenId the bytes of the identifier
-   * @param secretKey  the secret key
-   * @return  String of the generated hmac
-   */
-  def createHmac(tokenId: String, secretKey: SecretKey) : Array[Byte] = {
-    val mac =  Mac.getInstance(DefaultHmacAlgorithm)
-    try
-      mac.init(secretKey)
-    catch {
-      case ike: InvalidKeyException => throw new 
IllegalArgumentException("Invalid key to HMAC computation", ike)
-    }
-    mac.doFinal(tokenId.getBytes(StandardCharsets.UTF_8))
-  }
-
-  def filterToken(requesterPrincipal: KafkaPrincipal, owners : 
Option[List[KafkaPrincipal]], token: TokenInformation,
-                  authorizeToken: String => Boolean, authorizeRequester: 
KafkaPrincipal  => Boolean) : Boolean = {
-
-    val allow =
-    //exclude tokens which are not requested
-      if (owners.isDefined && !owners.get.exists(owner => 
token.ownerOrRenewer(owner))) {
-        false
-        //Owners and the renewers can describe their own tokens
-      } else if (token.ownerOrRenewer(requesterPrincipal)) {
-        true
-        // Check permission for non-owned tokens
-      } else if (authorizeToken(token.tokenId) || 
authorizeRequester(token.owner)) {
-        true
-      }
-      else {
-        false
-      }
-
-    allow
-  }
-}
-
-class DelegationTokenManager(val config: KafkaConfig,
-                             val tokenCache: DelegationTokenCache,
-                             val time: Time) extends Logging {
-  this.logIdent = s"[Token Manager on Node ${config.brokerId}]: "
-
-  import DelegationTokenManager._
-
-  val secretKey: SecretKey = {
-    val keyBytes =  if (config.tokenAuthEnabled) 
config.delegationTokenSecretKey.value.getBytes(StandardCharsets.UTF_8) else null
-    if (keyBytes == null || keyBytes.isEmpty) null
-    else
-      createSecretKey(keyBytes)
-  }
-
-  /**
-   * @param hmacString
-   */
-  private def prepareScramCredentials(hmacString: String) : Map[String, 
ScramCredential] = {
-    val scramCredentialMap = mutable.Map[String, ScramCredential]()
-
-    def scramCredential(mechanism: ScramMechanism): ScramCredential = {
-      new ScramFormatter(mechanism).generateCredential(hmacString, 
mechanism.minIterations)
-    }
-
-    for (mechanism <- ScramMechanism.values)
-      scramCredentialMap(mechanism.mechanismName) = scramCredential(mechanism)
-
-    scramCredentialMap.toMap
-  }
-
-  /**
-   * @param token
-   */
-  def updateToken(token: DelegationToken): Unit = {
-    val hmacString = token.hmacAsBase64String
-    val scramCredentialMap = prepareScramCredentials(hmacString)
-    tokenCache.updateCache(token, scramCredentialMap.asJava)
-  }
-
-  def getDelegationToken(tokenInfo: TokenInformation): DelegationToken = {
-    val hmac = createHmac(tokenInfo.tokenId, secretKey)
-    new DelegationToken(tokenInfo, hmac)
-  }
-
-  /**
-   *
-   * @param tokenId
-   */
-  def removeToken(tokenId: String): Unit = {
-    tokenCache.removeCache(tokenId)
-  }
-
-  def getTokens(filterToken: TokenInformation => Boolean): 
util.List[DelegationToken] = {
-    tokenCache.tokens.stream().filter(token => 
filterToken(token)).map(getDelegationToken).collect(Collectors.toList[DelegationToken])
-  }
-}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index b229f4e63a5..1d482344a43 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -60,9 +60,10 @@ import org.apache.kafka.common.{Node, TopicIdPartition, 
TopicPartition, Uuid}
 import org.apache.kafka.coordinator.group.{Group, GroupConfig, 
GroupConfigManager, GroupCoordinator}
 import org.apache.kafka.coordinator.share.ShareCoordinator
 import org.apache.kafka.metadata.{ConfigRepository, MetadataCache}
-import org.apache.kafka.server.{ClientMetricsManager, ProcessRole}
+import org.apache.kafka.server.{ClientMetricsManager, DelegationTokenManager, 
ProcessRole}
 import org.apache.kafka.server.authorizer._
 import org.apache.kafka.server.common.{GroupVersion, RequestLocal, 
TransactionVersion}
+import org.apache.kafka.server.config.DelegationTokenManagerConfigs
 import org.apache.kafka.server.share.context.ShareFetchContext
 import org.apache.kafka.server.share.{ErroneousAndValidPartitionData, 
SharePartitionKey}
 import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch
@@ -2201,7 +2202,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           new ExpireDelegationTokenResponseData()
               .setThrottleTimeMs(requestThrottleMs)
               .setErrorCode(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED.code)
-              .setExpiryTimestampMs(DelegationTokenManager.ErrorTimestamp)))
+              .setExpiryTimestampMs(DelegationTokenManager.ERROR_TIMESTAMP)))
     } else {
       forwardToController(request)
     }
@@ -2214,7 +2215,7 @@ class KafkaApis(val requestChannel: RequestChannel,
           new RenewDelegationTokenResponseData()
             .setThrottleTimeMs(requestThrottleMs)
             .setErrorCode(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED.code)
-            .setExpiryTimestampMs(DelegationTokenManager.ErrorTimestamp)))
+            .setExpiryTimestampMs(DelegationTokenManager.ERROR_TIMESTAMP)))
     } else {
       forwardToController(request)
     }
@@ -2233,7 +2234,7 @@ class KafkaApis(val requestChannel: RequestChannel,
 
     if (!allowTokenRequests(request))
       sendResponseCallback(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED, 
Collections.emptyList)
-    else if (!config.tokenAuthEnabled)
+    else if (!new DelegationTokenManagerConfigs(config).tokenAuthEnabled)
       sendResponseCallback(Errors.DELEGATION_TOKEN_AUTH_DISABLED, 
Collections.emptyList)
     else {
       val requestPrincipal = request.context.principal
@@ -2242,10 +2243,10 @@ class KafkaApis(val requestChannel: RequestChannel,
         sendResponseCallback(Errors.NONE, Collections.emptyList)
       }
       else {
-        val owners = if (describeTokenRequest.data.owners == null)
-          None
+        val owners: Optional[util.List[KafkaPrincipal]] = if 
(describeTokenRequest.data.owners == null)
+          Optional.empty()
         else
-          Some(describeTokenRequest.data.owners.asScala.map(p => new 
KafkaPrincipal(p.principalType(), p.principalName)).toList)
+          Optional.of(describeTokenRequest.data.owners.stream().map(p => new 
KafkaPrincipal(p.principalType(), p.principalName)).toList)
         def authorizeToken(tokenId: String) = 
authHelper.authorize(request.context, DESCRIBE, DELEGATION_TOKEN, tokenId)
         def authorizeRequester(owner: KafkaPrincipal) = 
authHelper.authorize(request.context, DESCRIBE_TOKENS, USER, owner.toString)
         def eligible(token: TokenInformation) = DelegationTokenManager
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 87bfd9cd21f..962488bc140 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -44,7 +44,7 @@ import org.apache.kafka.security.authorizer.AuthorizerUtils
 import org.apache.kafka.server.ProcessRole
 import org.apache.kafka.server.authorizer.Authorizer
 import org.apache.kafka.server.common.MetadataVersion
-import org.apache.kafka.server.config.{AbstractKafkaConfig, 
DelegationTokenManagerConfigs, KRaftConfigs, QuotaConfig, ReplicationConfigs, 
ServerConfigs, ServerLogConfigs}
+import org.apache.kafka.server.config.{AbstractKafkaConfig, KRaftConfigs, 
QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs}
 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
 import org.apache.kafka.server.metrics.MetricConfigs
 import org.apache.kafka.server.util.Csv
@@ -429,13 +429,6 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
   def interBrokerSecurityProtocol = 
getInterBrokerListenerNameAndSecurityProtocol._2
   def saslMechanismInterBrokerProtocol = 
getString(BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG)
 
-  /** ********* DelegationToken Configuration **************/
-  val delegationTokenSecretKey = 
getPassword(DelegationTokenManagerConfigs.DELEGATION_TOKEN_SECRET_KEY_CONFIG)
-  val tokenAuthEnabled = delegationTokenSecretKey != null && 
delegationTokenSecretKey.value.nonEmpty
-  val delegationTokenMaxLifeMs = 
getLong(DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFETIME_CONFIG)
-  val delegationTokenExpiryTimeMs = 
getLong(DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_TIME_MS_CONFIG)
-  val delegationTokenExpiryCheckIntervalMs = 
getLong(DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_MS_CONFIG)
-
   /** ********* Fetch Configuration **************/
   val maxIncrementalFetchSessionCacheSlots = 
getInt(ServerConfigs.MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS_CONFIG)
   val fetchMaxBytes = getInt(ServerConfigs.FETCH_MAX_BYTES_CONFIG)
diff --git 
a/core/src/main/scala/kafka/server/metadata/DelegationTokenPublisher.scala 
b/core/src/main/scala/kafka/server/metadata/DelegationTokenPublisher.scala
index 34e14442b4d..0e12c34b3c5 100644
--- a/core/src/main/scala/kafka/server/metadata/DelegationTokenPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/DelegationTokenPublisher.scala
@@ -17,11 +17,11 @@
 
 package kafka.server.metadata
 
-import kafka.server.DelegationTokenManager
 import kafka.server.KafkaConfig
 import kafka.utils.Logging
 import org.apache.kafka.image.loader.LoaderManifest
 import org.apache.kafka.image.{MetadataDelta, MetadataImage}
+import org.apache.kafka.server.DelegationTokenManager
 import org.apache.kafka.server.fault.FaultHandler
 
 
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 1f6c3755131..f6819ac8080 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1173,14 +1173,6 @@ class KafkaConfigTest {
     assertEquals(123L, config.logFlushIntervalMs)
     assertEquals(CompressionType.SNAPPY, 
config.groupCoordinatorConfig.offsetTopicCompressionType)
     assertEquals(Sensor.RecordingLevel.DEBUG.toString, 
config.metricRecordingLevel)
-    assertEquals(false, config.tokenAuthEnabled)
-    assertEquals(7 * 24 * 60L * 60L * 1000L, config.delegationTokenMaxLifeMs)
-    assertEquals(24 * 60L * 60L * 1000L, config.delegationTokenExpiryTimeMs)
-    assertEquals(1 * 60L * 1000L * 60, 
config.delegationTokenExpiryCheckIntervalMs)
-
-    
defaults.setProperty(DelegationTokenManagerConfigs.DELEGATION_TOKEN_SECRET_KEY_CONFIG,
 "1234567890")
-    val config1 = KafkaConfig.fromProps(defaults)
-    assertEquals(true, config1.tokenAuthEnabled)
   }
 
   @Test
diff --git 
a/server/src/main/java/org/apache/kafka/server/DelegationTokenManager.java 
b/server/src/main/java/org/apache/kafka/server/DelegationTokenManager.java
new file mode 100644
index 00000000000..54832fbd502
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/DelegationTokenManager.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.scram.ScramCredential;
+import org.apache.kafka.common.security.scram.internals.ScramFormatter;
+import org.apache.kafka.common.security.scram.internals.ScramMechanism;
+import org.apache.kafka.common.security.token.delegation.DelegationToken;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import 
org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
+import org.apache.kafka.server.config.DelegationTokenManagerConfigs;
+
+import java.nio.charset.StandardCharsets;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+import javax.crypto.Mac;
+import javax.crypto.SecretKey;
+import javax.crypto.spec.SecretKeySpec;
+
+public class DelegationTokenManager {
+    private static final String DEFAULT_HMAC_ALGORITHM = "HmacSHA512";
+    public static final long ERROR_TIMESTAMP = -1;
+
+    private final DelegationTokenCache tokenCache;
+    private final SecretKey secretKey;
+
+    public DelegationTokenManager(DelegationTokenManagerConfigs config, 
DelegationTokenCache tokenCache) {
+        this.tokenCache = tokenCache;
+
+        byte[] keyBytes = config.tokenAuthEnabled() ? 
config.delegationTokenSecretKey().value().getBytes(StandardCharsets.UTF_8) : 
null;
+        if (keyBytes == null || keyBytes.length == 0) {
+            this.secretKey = null;
+        } else {
+            this.secretKey = createSecretKey(keyBytes);
+        }
+    }
+
+    private static SecretKey createSecretKey(byte[] keyBytes) {
+        return new SecretKeySpec(keyBytes, DEFAULT_HMAC_ALGORITHM);
+    }
+
+    public static byte[] createHmac(String tokenId, SecretKey secretKey) {
+        try {
+            Mac mac = Mac.getInstance(DEFAULT_HMAC_ALGORITHM);
+            mac.init(secretKey);
+            return mac.doFinal(tokenId.getBytes(StandardCharsets.UTF_8));
+        } catch (InvalidKeyException e) {
+            throw new IllegalArgumentException("Invalid key to HMAC 
computation", e);
+        } catch (Exception e) {
+            throw new RuntimeException("Error while creating HMAC", e);
+        }
+    }
+
+    private Map<String, ScramCredential> prepareScramCredentials(String 
hmacString) throws NoSuchAlgorithmException {
+        Map<String, ScramCredential> scramCredentialMap = new HashMap<>();
+        for (ScramMechanism mechanism : ScramMechanism.values()) {
+            ScramFormatter formatter = new ScramFormatter(mechanism);
+            scramCredentialMap.put(mechanism.mechanismName(), 
formatter.generateCredential(hmacString, mechanism.minIterations()));
+        }
+        return scramCredentialMap;
+    }
+
+    public void updateToken(DelegationToken token) throws 
NoSuchAlgorithmException {
+        String hmacString = token.hmacAsBase64String();
+        Map<String, ScramCredential> scramCredentialMap = 
prepareScramCredentials(hmacString);
+        tokenCache.updateCache(token, scramCredentialMap);
+    }
+
+    public DelegationToken getDelegationToken(TokenInformation tokenInfo) {
+        byte[] hmac = createHmac(tokenInfo.tokenId(), secretKey);
+        return new DelegationToken(tokenInfo, hmac);
+    }
+
+    public void removeToken(String tokenId) {
+        tokenCache.removeCache(tokenId);
+    }
+
+    public List<DelegationToken> getTokens(Predicate<TokenInformation> 
filterToken) {
+        return tokenCache.tokens().stream()
+            .filter(filterToken)
+            .map(this::getDelegationToken)
+            .toList();
+    }
+
+    public static boolean filterToken(
+        KafkaPrincipal requesterPrincipal,
+        Optional<List<KafkaPrincipal>> owners,
+        TokenInformation token,
+        Function<String, Boolean> authorizeToken,
+        Function<KafkaPrincipal, Boolean> authorizeRequester
+    ) {
+        if (owners.isPresent() && 
owners.get().stream().noneMatch(token::ownerOrRenewer)) {
+            //exclude tokens which are not requested
+            return false;
+        } else if (token.ownerOrRenewer(requesterPrincipal)) {
+            //Owners and the renewers can describe their own tokens
+            return true;
+        } else {
+            // Check permission for non-owned tokens
+            return authorizeToken.apply(token.tokenId()) || 
authorizeRequester.apply(token.owner());
+        }
+    }
+}
diff --git 
a/server/src/main/java/org/apache/kafka/server/config/DelegationTokenManagerConfigs.java
 
b/server/src/main/java/org/apache/kafka/server/config/DelegationTokenManagerConfigs.java
index b07e76326b5..d94426d1835 100644
--- 
a/server/src/main/java/org/apache/kafka/server/config/DelegationTokenManagerConfigs.java
+++ 
b/server/src/main/java/org/apache/kafka/server/config/DelegationTokenManagerConfigs.java
@@ -16,7 +16,9 @@
  */
 package org.apache.kafka.server.config;
 
+import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.types.Password;
 
 import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
 import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
@@ -49,4 +51,38 @@ public class DelegationTokenManagerConfigs {
             
.define(DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFETIME_CONFIG, 
LONG, DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFE_TIME_MS_DEFAULT, 
atLeast(1), MEDIUM, 
DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFE_TIME_DOC)
             
.define(DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_TIME_MS_CONFIG, 
LONG, DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_TIME_MS_DEFAULT, 
atLeast(1), MEDIUM, 
DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_TIME_MS_DOC)
             
.define(DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_MS_CONFIG,
 LONG, 
DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_MS_DEFAULT,
 atLeast(1), LOW, 
DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_DOC);
+
+    private final Password delegationTokenSecretKey;
+    private final boolean tokenAuthEnabled;
+    private final long delegationTokenMaxLifeMs;
+    private final long delegationTokenExpiryTimeMs;
+    private final long delegationTokenExpiryCheckIntervalMs;
+
+    public DelegationTokenManagerConfigs(AbstractConfig config) {
+        this.delegationTokenSecretKey = 
config.getPassword(DELEGATION_TOKEN_SECRET_KEY_CONFIG);
+        this.tokenAuthEnabled = delegationTokenSecretKey != null && 
!delegationTokenSecretKey.value().isEmpty();
+        this.delegationTokenMaxLifeMs = 
config.getLong(DELEGATION_TOKEN_MAX_LIFETIME_CONFIG);
+        this.delegationTokenExpiryTimeMs = 
config.getLong(DELEGATION_TOKEN_EXPIRY_TIME_MS_CONFIG);
+        this.delegationTokenExpiryCheckIntervalMs = 
config.getLong(DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_MS_CONFIG);
+    }
+
+    public Password delegationTokenSecretKey() {
+        return delegationTokenSecretKey;
+    }
+
+    public boolean tokenAuthEnabled() {
+        return tokenAuthEnabled;
+    }
+
+    public long delegationTokenMaxLifeMs() {
+        return delegationTokenMaxLifeMs;
+    }
+
+    public long delegationTokenExpiryTimeMs() {
+        return delegationTokenExpiryTimeMs;
+    }
+
+    public long delegationTokenExpiryCheckIntervalMs() {
+        return delegationTokenExpiryCheckIntervalMs;
+    }
 }
diff --git 
a/server/src/test/java/org/apache/kafka/server/config/DelegationTokenManagerConfigsTest.java
 
b/server/src/test/java/org/apache/kafka/server/config/DelegationTokenManagerConfigsTest.java
new file mode 100644
index 00000000000..4b20952b3ad
--- /dev/null
+++ 
b/server/src/test/java/org/apache/kafka/server/config/DelegationTokenManagerConfigsTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.types.Password;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Map;
+
+import static 
org.apache.kafka.server.config.DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_MS_CONFIG;
+import static 
org.apache.kafka.server.config.DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_TIME_MS_CONFIG;
+import static 
org.apache.kafka.server.config.DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFETIME_CONFIG;
+import static 
org.apache.kafka.server.config.DelegationTokenManagerConfigs.DELEGATION_TOKEN_SECRET_KEY_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class DelegationTokenManagerConfigsTest {
+    @Test
+    void testDefaults() {
+        DelegationTokenManagerConfigs config = new 
DelegationTokenManagerConfigs(new 
AbstractConfig(DelegationTokenManagerConfigs.CONFIG_DEF, Map.of()));
+        assertNull(config.delegationTokenSecretKey());
+        assertFalse(config.tokenAuthEnabled());
+        
assertEquals(DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFE_TIME_MS_DEFAULT,
 config.delegationTokenMaxLifeMs());
+        
assertEquals(DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_TIME_MS_DEFAULT,
 config.delegationTokenExpiryTimeMs());
+        
assertEquals(DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_MS_DEFAULT,
 config.delegationTokenExpiryCheckIntervalMs());
+    }
+
+    @Test
+    void testOverride() {
+        DelegationTokenManagerConfigs config = new 
DelegationTokenManagerConfigs(
+            new AbstractConfig(DelegationTokenManagerConfigs.CONFIG_DEF,
+                Map.of(
+                    DELEGATION_TOKEN_SECRET_KEY_CONFIG, "test",
+                    DELEGATION_TOKEN_MAX_LIFETIME_CONFIG, "500",
+                    DELEGATION_TOKEN_EXPIRY_TIME_MS_CONFIG, "200",
+                    DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_MS_CONFIG, "100"
+                )
+            )
+        );
+        assertEquals(new Password("test"), config.delegationTokenSecretKey());
+        assertTrue(config.tokenAuthEnabled());
+        assertEquals(500L, config.delegationTokenMaxLifeMs());
+        assertEquals(200L, config.delegationTokenExpiryTimeMs());
+        assertEquals(100L, config.delegationTokenExpiryCheckIntervalMs());
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = {
+        DELEGATION_TOKEN_MAX_LIFETIME_CONFIG,
+        DELEGATION_TOKEN_EXPIRY_TIME_MS_CONFIG,
+        DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_MS_CONFIG
+    })
+    void testInvalidProperty(String field) {
+        assertThrows(Exception.class, () -> new DelegationTokenManagerConfigs(
+            new AbstractConfig(DelegationTokenManagerConfigs.CONFIG_DEF, 
Map.of(field, "not_a_number"))));
+    }
+}


Reply via email to