This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 8827ce47010 KAFKA-19113: Migrate DelegationTokenManager to server
module (#19424)
8827ce47010 is described below
commit 8827ce47010e1a6cb35119be2724dd47bf9c1ddb
Author: PoAn Yang <[email protected]>
AuthorDate: Mon Apr 14 22:49:45 2025 +0800
KAFKA-19113: Migrate DelegationTokenManager to server module (#19424)
1. Migrate DelegationTokenManager to server module.
2. Rewrite DelegationTokenManager in Java.
3. Move DelegationTokenManagerConfigs out of KafkaConfig.
Reviewers: Mickael Maison <[email protected]>
---
checkstyle/import-control-server.xml | 1 +
.../kafka/server/builders/KafkaApisBuilder.java | 2 +-
.../src/main/scala/kafka/server/BrokerServer.scala | 6 +-
.../main/scala/kafka/server/ControllerApis.scala | 6 +-
.../main/scala/kafka/server/ControllerServer.scala | 17 +--
.../kafka/server/DelegationTokenManager.scala | 143 ---------------------
core/src/main/scala/kafka/server/KafkaApis.scala | 15 ++-
core/src/main/scala/kafka/server/KafkaConfig.scala | 9 +-
.../server/metadata/DelegationTokenPublisher.scala | 2 +-
.../scala/unit/kafka/server/KafkaConfigTest.scala | 8 --
.../kafka/server/DelegationTokenManager.java | 125 ++++++++++++++++++
.../config/DelegationTokenManagerConfigs.java | 36 ++++++
.../config/DelegationTokenManagerConfigsTest.java | 78 +++++++++++
13 files changed, 266 insertions(+), 182 deletions(-)
diff --git a/checkstyle/import-control-server.xml
b/checkstyle/import-control-server.xml
index b45b3c41c27..a35719e8761 100644
--- a/checkstyle/import-control-server.xml
+++ b/checkstyle/import-control-server.xml
@@ -82,6 +82,7 @@
<allow pkg="org.apache.kafka.raft" />
<subpackage name="server">
+ <allow pkg="javax.crypto" />
<allow pkg="org.apache.kafka.server" />
<allow pkg="org.apache.kafka.image" />
<allow pkg="org.apache.kafka.storage.internals.log" />
diff --git a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
index 86f8458eef1..8724fc3cb8e 100644
--- a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
+++ b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
@@ -21,7 +21,6 @@ import kafka.coordinator.transaction.TransactionCoordinator;
import kafka.network.RequestChannel;
import kafka.server.ApiVersionManager;
import kafka.server.AutoTopicCreationManager;
-import kafka.server.DelegationTokenManager;
import kafka.server.FetchManager;
import kafka.server.ForwardingManager;
import kafka.server.KafkaApis;
@@ -38,6 +37,7 @@ import org.apache.kafka.coordinator.share.ShareCoordinator;
import org.apache.kafka.metadata.ConfigRepository;
import org.apache.kafka.metadata.MetadataCache;
import org.apache.kafka.server.ClientMetricsManager;
+import org.apache.kafka.server.DelegationTokenManager;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 5f52e6abdcb..398c085d0d5 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -46,7 +46,7 @@ import org.apache.kafka.metadata.publisher.AclPublisher
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.{ApiMessageAndVersion,
DirectoryEventHandler, NodeToControllerChannelManager, TopicIdPartition}
-import org.apache.kafka.server.config.ConfigType
+import org.apache.kafka.server.config.{ConfigType,
DelegationTokenManagerConfigs}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.{ClientMetricsReceiverPlugin,
KafkaYammerMetrics}
import org.apache.kafka.server.network.{EndpointReadyFutures,
KafkaAuthorizerServerInfo}
@@ -54,7 +54,7 @@ import
org.apache.kafka.server.share.persister.{DefaultStatePersister, NoOpState
import org.apache.kafka.server.share.session.ShareSessionCache
import org.apache.kafka.server.util.timer.{SystemTimer, SystemTimerReaper}
import org.apache.kafka.server.util.{Deadline, FutureUtils, KafkaScheduler}
-import org.apache.kafka.server.{AssignmentsManager, BrokerFeatures,
ClientMetricsManager, DelayedActionQueue, ProcessRole}
+import org.apache.kafka.server.{AssignmentsManager, BrokerFeatures,
ClientMetricsManager, DelayedActionQueue, DelegationTokenManager, ProcessRole}
import org.apache.kafka.storage.internals.log.LogDirFailureChannel
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
@@ -355,7 +355,7 @@ class BrokerServer(
)
/* start token manager */
- tokenManager = new DelegationTokenManager(config, tokenCache, time)
+ tokenManager = new DelegationTokenManager(new
DelegationTokenManagerConfigs(config), tokenCache)
/* initializing the groupConfigManager */
groupConfigManager = new
GroupConfigManager(config.groupCoordinatorConfig.extractGroupConfigMap(config.shareGroupConfig))
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala
b/core/src/main/scala/kafka/server/ControllerApis.scala
index b294ddd3108..fbcb0e8572d 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -56,7 +56,7 @@ import
org.apache.kafka.image.publisher.ControllerRegistrationsPublisher
import org.apache.kafka.metadata.{BrokerHeartbeatReply,
BrokerRegistrationReply}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.server.ProcessRole
+import org.apache.kafka.server.{DelegationTokenManager, ProcessRole}
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.{ApiMessageAndVersion, RequestLocal}
@@ -970,7 +970,7 @@ class ControllerApis(
new RenewDelegationTokenResponseData()
.setThrottleTimeMs(requestThrottleMs)
.setErrorCode(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED.code)
- .setExpiryTimestampMs(DelegationTokenManager.ErrorTimestamp)))
+ .setExpiryTimestampMs(DelegationTokenManager.ERROR_TIMESTAMP)))
CompletableFuture.completedFuture[Unit](())
} else {
val context = new ControllerRequestContext(
@@ -994,7 +994,7 @@ class ControllerApis(
new ExpireDelegationTokenResponseData()
.setThrottleTimeMs(requestThrottleMs)
.setErrorCode(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED.code)
- .setExpiryTimestampMs(DelegationTokenManager.ErrorTimestamp)))
+ .setExpiryTimestampMs(DelegationTokenManager.ERROR_TIMESTAMP)))
CompletableFuture.completedFuture[Unit](())
} else {
val context = new ControllerRequestContext(
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala
b/core/src/main/scala/kafka/server/ControllerServer.scala
index 3ddf97d2705..c2826ec8bfc 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -40,11 +40,11 @@ import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
import org.apache.kafka.metadata.publisher.{AclPublisher, FeaturesPublisher}
import org.apache.kafka.raft.QuorumConfig
import org.apache.kafka.security.CredentialProvider
-import org.apache.kafka.server.ProcessRole
+import org.apache.kafka.server.{DelegationTokenManager, ProcessRole}
import org.apache.kafka.server.authorizer.Authorizer
import
org.apache.kafka.server.config.ServerLogConfigs.{ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG,
CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG}
import org.apache.kafka.server.common.{ApiMessageAndVersion, KRaftVersion,
NodeToControllerChannelManager}
-import org.apache.kafka.server.config.ConfigType
+import org.apache.kafka.server.config.{ConfigType,
DelegationTokenManagerConfigs}
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics,
LinuxIoMetricsCollector}
import org.apache.kafka.server.network.{EndpointReadyFutures,
KafkaAuthorizerServerInfo}
import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy}
@@ -206,9 +206,10 @@ class ControllerServer(
QuorumFeatures.defaultSupportedFeatureMap(config.unstableFeatureVersionsEnabled),
controllerNodes.asScala.map(node => Integer.valueOf(node.id())).asJava)
+ val delegationTokenManagerConfigs = new
DelegationTokenManagerConfigs(config)
val delegationTokenKeyString = {
- if (config.tokenAuthEnabled) {
- config.delegationTokenSecretKey.value
+ if (delegationTokenManagerConfigs.tokenAuthEnabled) {
+ delegationTokenManagerConfigs.delegationTokenSecretKey.value
} else {
null
}
@@ -247,9 +248,9 @@ class ControllerServer(
setNonFatalFaultHandler(sharedServer.nonFatalQuorumControllerFaultHandler).
setDelegationTokenCache(tokenCache).
setDelegationTokenSecretKey(delegationTokenKeyString).
- setDelegationTokenMaxLifeMs(config.delegationTokenMaxLifeMs).
- setDelegationTokenExpiryTimeMs(config.delegationTokenExpiryTimeMs).
-
setDelegationTokenExpiryCheckIntervalMs(config.delegationTokenExpiryCheckIntervalMs).
+
setDelegationTokenMaxLifeMs(delegationTokenManagerConfigs.delegationTokenMaxLifeMs).
+
setDelegationTokenExpiryTimeMs(delegationTokenManagerConfigs.delegationTokenExpiryTimeMs).
+
setDelegationTokenExpiryCheckIntervalMs(delegationTokenManagerConfigs.delegationTokenExpiryCheckIntervalMs).
setUncleanLeaderElectionCheckIntervalMs(config.uncleanLeaderElectionCheckIntervalMs).
setInterBrokerListenerName(config.interBrokerListenerName.value()).
setControllerPerformanceSamplePeriodMs(config.controllerPerformanceSamplePeriodMs).
@@ -361,7 +362,7 @@ class ControllerServer(
config,
sharedServer.metadataPublishingFaultHandler,
"controller",
- new DelegationTokenManager(config, tokenCache, time)
+ new DelegationTokenManager(delegationTokenManagerConfigs, tokenCache)
))
// Set up the metrics publisher.
diff --git a/core/src/main/scala/kafka/server/DelegationTokenManager.scala
b/core/src/main/scala/kafka/server/DelegationTokenManager.scala
deleted file mode 100644
index b74e57999fb..00000000000
--- a/core/src/main/scala/kafka/server/DelegationTokenManager.scala
+++ /dev/null
@@ -1,143 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server
-
-import java.nio.charset.StandardCharsets
-import java.security.InvalidKeyException
-import javax.crypto.spec.SecretKeySpec
-import javax.crypto.{Mac, SecretKey}
-import kafka.utils.Logging
-import org.apache.kafka.common.security.auth.KafkaPrincipal
-import org.apache.kafka.common.security.scram.internals.{ScramFormatter,
ScramMechanism}
-import org.apache.kafka.common.security.scram.ScramCredential
-import
org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
-import org.apache.kafka.common.security.token.delegation.{DelegationToken,
TokenInformation}
-import org.apache.kafka.common.utils.Time
-
-import java.util
-import java.util.stream.Collectors
-import scala.jdk.CollectionConverters._
-import scala.collection.mutable
-
-object DelegationTokenManager {
- private val DefaultHmacAlgorithm = "HmacSHA512"
- val ErrorTimestamp = -1
-
- /**
- * Convert the byte[] to a secret key
- * @param keybytes the byte[] to create the secret key from
- * @return the secret key
- */
- private def createSecretKey(keybytes: Array[Byte]) : SecretKey = {
- new SecretKeySpec(keybytes, DefaultHmacAlgorithm)
- }
-
- /**
- * Compute HMAC of the identifier using the secret key
- * @param tokenId the bytes of the identifier
- * @param secretKey the secret key
- * @return String of the generated hmac
- */
- def createHmac(tokenId: String, secretKey: SecretKey) : Array[Byte] = {
- val mac = Mac.getInstance(DefaultHmacAlgorithm)
- try
- mac.init(secretKey)
- catch {
- case ike: InvalidKeyException => throw new
IllegalArgumentException("Invalid key to HMAC computation", ike)
- }
- mac.doFinal(tokenId.getBytes(StandardCharsets.UTF_8))
- }
-
- def filterToken(requesterPrincipal: KafkaPrincipal, owners :
Option[List[KafkaPrincipal]], token: TokenInformation,
- authorizeToken: String => Boolean, authorizeRequester:
KafkaPrincipal => Boolean) : Boolean = {
-
- val allow =
- //exclude tokens which are not requested
- if (owners.isDefined && !owners.get.exists(owner =>
token.ownerOrRenewer(owner))) {
- false
- //Owners and the renewers can describe their own tokens
- } else if (token.ownerOrRenewer(requesterPrincipal)) {
- true
- // Check permission for non-owned tokens
- } else if (authorizeToken(token.tokenId) ||
authorizeRequester(token.owner)) {
- true
- }
- else {
- false
- }
-
- allow
- }
-}
-
-class DelegationTokenManager(val config: KafkaConfig,
- val tokenCache: DelegationTokenCache,
- val time: Time) extends Logging {
- this.logIdent = s"[Token Manager on Node ${config.brokerId}]: "
-
- import DelegationTokenManager._
-
- val secretKey: SecretKey = {
- val keyBytes = if (config.tokenAuthEnabled)
config.delegationTokenSecretKey.value.getBytes(StandardCharsets.UTF_8) else null
- if (keyBytes == null || keyBytes.isEmpty) null
- else
- createSecretKey(keyBytes)
- }
-
- /**
- * @param hmacString
- */
- private def prepareScramCredentials(hmacString: String) : Map[String,
ScramCredential] = {
- val scramCredentialMap = mutable.Map[String, ScramCredential]()
-
- def scramCredential(mechanism: ScramMechanism): ScramCredential = {
- new ScramFormatter(mechanism).generateCredential(hmacString,
mechanism.minIterations)
- }
-
- for (mechanism <- ScramMechanism.values)
- scramCredentialMap(mechanism.mechanismName) = scramCredential(mechanism)
-
- scramCredentialMap.toMap
- }
-
- /**
- * @param token
- */
- def updateToken(token: DelegationToken): Unit = {
- val hmacString = token.hmacAsBase64String
- val scramCredentialMap = prepareScramCredentials(hmacString)
- tokenCache.updateCache(token, scramCredentialMap.asJava)
- }
-
- def getDelegationToken(tokenInfo: TokenInformation): DelegationToken = {
- val hmac = createHmac(tokenInfo.tokenId, secretKey)
- new DelegationToken(tokenInfo, hmac)
- }
-
- /**
- *
- * @param tokenId
- */
- def removeToken(tokenId: String): Unit = {
- tokenCache.removeCache(tokenId)
- }
-
- def getTokens(filterToken: TokenInformation => Boolean):
util.List[DelegationToken] = {
- tokenCache.tokens.stream().filter(token =>
filterToken(token)).map(getDelegationToken).collect(Collectors.toList[DelegationToken])
- }
-}
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index b229f4e63a5..1d482344a43 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -60,9 +60,10 @@ import org.apache.kafka.common.{Node, TopicIdPartition,
TopicPartition, Uuid}
import org.apache.kafka.coordinator.group.{Group, GroupConfig,
GroupConfigManager, GroupCoordinator}
import org.apache.kafka.coordinator.share.ShareCoordinator
import org.apache.kafka.metadata.{ConfigRepository, MetadataCache}
-import org.apache.kafka.server.{ClientMetricsManager, ProcessRole}
+import org.apache.kafka.server.{ClientMetricsManager, DelegationTokenManager,
ProcessRole}
import org.apache.kafka.server.authorizer._
import org.apache.kafka.server.common.{GroupVersion, RequestLocal,
TransactionVersion}
+import org.apache.kafka.server.config.DelegationTokenManagerConfigs
import org.apache.kafka.server.share.context.ShareFetchContext
import org.apache.kafka.server.share.{ErroneousAndValidPartitionData,
SharePartitionKey}
import org.apache.kafka.server.share.acknowledge.ShareAcknowledgementBatch
@@ -2201,7 +2202,7 @@ class KafkaApis(val requestChannel: RequestChannel,
new ExpireDelegationTokenResponseData()
.setThrottleTimeMs(requestThrottleMs)
.setErrorCode(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED.code)
- .setExpiryTimestampMs(DelegationTokenManager.ErrorTimestamp)))
+ .setExpiryTimestampMs(DelegationTokenManager.ERROR_TIMESTAMP)))
} else {
forwardToController(request)
}
@@ -2214,7 +2215,7 @@ class KafkaApis(val requestChannel: RequestChannel,
new RenewDelegationTokenResponseData()
.setThrottleTimeMs(requestThrottleMs)
.setErrorCode(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED.code)
- .setExpiryTimestampMs(DelegationTokenManager.ErrorTimestamp)))
+ .setExpiryTimestampMs(DelegationTokenManager.ERROR_TIMESTAMP)))
} else {
forwardToController(request)
}
@@ -2233,7 +2234,7 @@ class KafkaApis(val requestChannel: RequestChannel,
if (!allowTokenRequests(request))
sendResponseCallback(Errors.DELEGATION_TOKEN_REQUEST_NOT_ALLOWED,
Collections.emptyList)
- else if (!config.tokenAuthEnabled)
+ else if (!new DelegationTokenManagerConfigs(config).tokenAuthEnabled)
sendResponseCallback(Errors.DELEGATION_TOKEN_AUTH_DISABLED,
Collections.emptyList)
else {
val requestPrincipal = request.context.principal
@@ -2242,10 +2243,10 @@ class KafkaApis(val requestChannel: RequestChannel,
sendResponseCallback(Errors.NONE, Collections.emptyList)
}
else {
- val owners = if (describeTokenRequest.data.owners == null)
- None
+ val owners: Optional[util.List[KafkaPrincipal]] = if
(describeTokenRequest.data.owners == null)
+ Optional.empty()
else
- Some(describeTokenRequest.data.owners.asScala.map(p => new
KafkaPrincipal(p.principalType(), p.principalName)).toList)
+ Optional.of(describeTokenRequest.data.owners.stream().map(p => new
KafkaPrincipal(p.principalType(), p.principalName)).toList)
def authorizeToken(tokenId: String) =
authHelper.authorize(request.context, DESCRIBE, DELEGATION_TOKEN, tokenId)
def authorizeRequester(owner: KafkaPrincipal) =
authHelper.authorize(request.context, DESCRIBE_TOKENS, USER, owner.toString)
def eligible(token: TokenInformation) = DelegationTokenManager
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 87bfd9cd21f..962488bc140 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -44,7 +44,7 @@ import org.apache.kafka.security.authorizer.AuthorizerUtils
import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.MetadataVersion
-import org.apache.kafka.server.config.{AbstractKafkaConfig,
DelegationTokenManagerConfigs, KRaftConfigs, QuotaConfig, ReplicationConfigs,
ServerConfigs, ServerLogConfigs}
+import org.apache.kafka.server.config.{AbstractKafkaConfig, KRaftConfigs,
QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
import org.apache.kafka.server.metrics.MetricConfigs
import org.apache.kafka.server.util.Csv
@@ -429,13 +429,6 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
def interBrokerSecurityProtocol =
getInterBrokerListenerNameAndSecurityProtocol._2
def saslMechanismInterBrokerProtocol =
getString(BrokerSecurityConfigs.SASL_MECHANISM_INTER_BROKER_PROTOCOL_CONFIG)
- /** ********* DelegationToken Configuration **************/
- val delegationTokenSecretKey =
getPassword(DelegationTokenManagerConfigs.DELEGATION_TOKEN_SECRET_KEY_CONFIG)
- val tokenAuthEnabled = delegationTokenSecretKey != null &&
delegationTokenSecretKey.value.nonEmpty
- val delegationTokenMaxLifeMs =
getLong(DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFETIME_CONFIG)
- val delegationTokenExpiryTimeMs =
getLong(DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_TIME_MS_CONFIG)
- val delegationTokenExpiryCheckIntervalMs =
getLong(DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_MS_CONFIG)
-
/** ********* Fetch Configuration **************/
val maxIncrementalFetchSessionCacheSlots =
getInt(ServerConfigs.MAX_INCREMENTAL_FETCH_SESSION_CACHE_SLOTS_CONFIG)
val fetchMaxBytes = getInt(ServerConfigs.FETCH_MAX_BYTES_CONFIG)
diff --git
a/core/src/main/scala/kafka/server/metadata/DelegationTokenPublisher.scala
b/core/src/main/scala/kafka/server/metadata/DelegationTokenPublisher.scala
index 34e14442b4d..0e12c34b3c5 100644
--- a/core/src/main/scala/kafka/server/metadata/DelegationTokenPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/DelegationTokenPublisher.scala
@@ -17,11 +17,11 @@
package kafka.server.metadata
-import kafka.server.DelegationTokenManager
import kafka.server.KafkaConfig
import kafka.utils.Logging
import org.apache.kafka.image.loader.LoaderManifest
import org.apache.kafka.image.{MetadataDelta, MetadataImage}
+import org.apache.kafka.server.DelegationTokenManager
import org.apache.kafka.server.fault.FaultHandler
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 1f6c3755131..f6819ac8080 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -1173,14 +1173,6 @@ class KafkaConfigTest {
assertEquals(123L, config.logFlushIntervalMs)
assertEquals(CompressionType.SNAPPY,
config.groupCoordinatorConfig.offsetTopicCompressionType)
assertEquals(Sensor.RecordingLevel.DEBUG.toString,
config.metricRecordingLevel)
- assertEquals(false, config.tokenAuthEnabled)
- assertEquals(7 * 24 * 60L * 60L * 1000L, config.delegationTokenMaxLifeMs)
- assertEquals(24 * 60L * 60L * 1000L, config.delegationTokenExpiryTimeMs)
- assertEquals(1 * 60L * 1000L * 60,
config.delegationTokenExpiryCheckIntervalMs)
-
-
defaults.setProperty(DelegationTokenManagerConfigs.DELEGATION_TOKEN_SECRET_KEY_CONFIG,
"1234567890")
- val config1 = KafkaConfig.fromProps(defaults)
- assertEquals(true, config1.tokenAuthEnabled)
}
@Test
diff --git
a/server/src/main/java/org/apache/kafka/server/DelegationTokenManager.java
b/server/src/main/java/org/apache/kafka/server/DelegationTokenManager.java
new file mode 100644
index 00000000000..54832fbd502
--- /dev/null
+++ b/server/src/main/java/org/apache/kafka/server/DelegationTokenManager.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server;
+
+import org.apache.kafka.common.security.auth.KafkaPrincipal;
+import org.apache.kafka.common.security.scram.ScramCredential;
+import org.apache.kafka.common.security.scram.internals.ScramFormatter;
+import org.apache.kafka.common.security.scram.internals.ScramMechanism;
+import org.apache.kafka.common.security.token.delegation.DelegationToken;
+import org.apache.kafka.common.security.token.delegation.TokenInformation;
+import
org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache;
+import org.apache.kafka.server.config.DelegationTokenManagerConfigs;
+
+import java.nio.charset.StandardCharsets;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.function.Predicate;
+
+import javax.crypto.Mac;
+import javax.crypto.SecretKey;
+import javax.crypto.spec.SecretKeySpec;
+
+public class DelegationTokenManager {
+ private static final String DEFAULT_HMAC_ALGORITHM = "HmacSHA512";
+ public static final long ERROR_TIMESTAMP = -1;
+
+ private final DelegationTokenCache tokenCache;
+ private final SecretKey secretKey;
+
+ public DelegationTokenManager(DelegationTokenManagerConfigs config,
DelegationTokenCache tokenCache) {
+ this.tokenCache = tokenCache;
+
+ byte[] keyBytes = config.tokenAuthEnabled() ?
config.delegationTokenSecretKey().value().getBytes(StandardCharsets.UTF_8) :
null;
+ if (keyBytes == null || keyBytes.length == 0) {
+ this.secretKey = null;
+ } else {
+ this.secretKey = createSecretKey(keyBytes);
+ }
+ }
+
+ private static SecretKey createSecretKey(byte[] keyBytes) {
+ return new SecretKeySpec(keyBytes, DEFAULT_HMAC_ALGORITHM);
+ }
+
+ public static byte[] createHmac(String tokenId, SecretKey secretKey) {
+ try {
+ Mac mac = Mac.getInstance(DEFAULT_HMAC_ALGORITHM);
+ mac.init(secretKey);
+ return mac.doFinal(tokenId.getBytes(StandardCharsets.UTF_8));
+ } catch (InvalidKeyException e) {
+ throw new IllegalArgumentException("Invalid key to HMAC
computation", e);
+ } catch (Exception e) {
+ throw new RuntimeException("Error while creating HMAC", e);
+ }
+ }
+
+ private Map<String, ScramCredential> prepareScramCredentials(String
hmacString) throws NoSuchAlgorithmException {
+ Map<String, ScramCredential> scramCredentialMap = new HashMap<>();
+ for (ScramMechanism mechanism : ScramMechanism.values()) {
+ ScramFormatter formatter = new ScramFormatter(mechanism);
+ scramCredentialMap.put(mechanism.mechanismName(),
formatter.generateCredential(hmacString, mechanism.minIterations()));
+ }
+ return scramCredentialMap;
+ }
+
+ public void updateToken(DelegationToken token) throws
NoSuchAlgorithmException {
+ String hmacString = token.hmacAsBase64String();
+ Map<String, ScramCredential> scramCredentialMap =
prepareScramCredentials(hmacString);
+ tokenCache.updateCache(token, scramCredentialMap);
+ }
+
+ public DelegationToken getDelegationToken(TokenInformation tokenInfo) {
+ byte[] hmac = createHmac(tokenInfo.tokenId(), secretKey);
+ return new DelegationToken(tokenInfo, hmac);
+ }
+
+ public void removeToken(String tokenId) {
+ tokenCache.removeCache(tokenId);
+ }
+
+ public List<DelegationToken> getTokens(Predicate<TokenInformation>
filterToken) {
+ return tokenCache.tokens().stream()
+ .filter(filterToken)
+ .map(this::getDelegationToken)
+ .toList();
+ }
+
+ public static boolean filterToken(
+ KafkaPrincipal requesterPrincipal,
+ Optional<List<KafkaPrincipal>> owners,
+ TokenInformation token,
+ Function<String, Boolean> authorizeToken,
+ Function<KafkaPrincipal, Boolean> authorizeRequester
+ ) {
+ if (owners.isPresent() &&
owners.get().stream().noneMatch(token::ownerOrRenewer)) {
+ //exclude tokens which are not requested
+ return false;
+ } else if (token.ownerOrRenewer(requesterPrincipal)) {
+ //Owners and the renewers can describe their own tokens
+ return true;
+ } else {
+ // Check permission for non-owned tokens
+ return authorizeToken.apply(token.tokenId()) ||
authorizeRequester.apply(token.owner());
+ }
+ }
+}
diff --git
a/server/src/main/java/org/apache/kafka/server/config/DelegationTokenManagerConfigs.java
b/server/src/main/java/org/apache/kafka/server/config/DelegationTokenManagerConfigs.java
index b07e76326b5..d94426d1835 100644
---
a/server/src/main/java/org/apache/kafka/server/config/DelegationTokenManagerConfigs.java
+++
b/server/src/main/java/org/apache/kafka/server/config/DelegationTokenManagerConfigs.java
@@ -16,7 +16,9 @@
*/
package org.apache.kafka.server.config;
+import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.types.Password;
import static org.apache.kafka.common.config.ConfigDef.Importance.LOW;
import static org.apache.kafka.common.config.ConfigDef.Importance.MEDIUM;
@@ -49,4 +51,38 @@ public class DelegationTokenManagerConfigs {
.define(DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFETIME_CONFIG,
LONG, DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFE_TIME_MS_DEFAULT,
atLeast(1), MEDIUM,
DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFE_TIME_DOC)
.define(DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_TIME_MS_CONFIG,
LONG, DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_TIME_MS_DEFAULT,
atLeast(1), MEDIUM,
DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_TIME_MS_DOC)
.define(DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_MS_CONFIG,
LONG,
DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_MS_DEFAULT,
atLeast(1), LOW,
DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_DOC);
+
+ private final Password delegationTokenSecretKey;
+ private final boolean tokenAuthEnabled;
+ private final long delegationTokenMaxLifeMs;
+ private final long delegationTokenExpiryTimeMs;
+ private final long delegationTokenExpiryCheckIntervalMs;
+
+ public DelegationTokenManagerConfigs(AbstractConfig config) {
+ this.delegationTokenSecretKey =
config.getPassword(DELEGATION_TOKEN_SECRET_KEY_CONFIG);
+ this.tokenAuthEnabled = delegationTokenSecretKey != null &&
!delegationTokenSecretKey.value().isEmpty();
+ this.delegationTokenMaxLifeMs =
config.getLong(DELEGATION_TOKEN_MAX_LIFETIME_CONFIG);
+ this.delegationTokenExpiryTimeMs =
config.getLong(DELEGATION_TOKEN_EXPIRY_TIME_MS_CONFIG);
+ this.delegationTokenExpiryCheckIntervalMs =
config.getLong(DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_MS_CONFIG);
+ }
+
+ public Password delegationTokenSecretKey() {
+ return delegationTokenSecretKey;
+ }
+
+ public boolean tokenAuthEnabled() {
+ return tokenAuthEnabled;
+ }
+
+ public long delegationTokenMaxLifeMs() {
+ return delegationTokenMaxLifeMs;
+ }
+
+ public long delegationTokenExpiryTimeMs() {
+ return delegationTokenExpiryTimeMs;
+ }
+
+ public long delegationTokenExpiryCheckIntervalMs() {
+ return delegationTokenExpiryCheckIntervalMs;
+ }
}
diff --git
a/server/src/test/java/org/apache/kafka/server/config/DelegationTokenManagerConfigsTest.java
b/server/src/test/java/org/apache/kafka/server/config/DelegationTokenManagerConfigsTest.java
new file mode 100644
index 00000000000..4b20952b3ad
--- /dev/null
+++
b/server/src/test/java/org/apache/kafka/server/config/DelegationTokenManagerConfigsTest.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.config;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.types.Password;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import java.util.Map;
+
+import static
org.apache.kafka.server.config.DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_MS_CONFIG;
+import static
org.apache.kafka.server.config.DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_TIME_MS_CONFIG;
+import static
org.apache.kafka.server.config.DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFETIME_CONFIG;
+import static
org.apache.kafka.server.config.DelegationTokenManagerConfigs.DELEGATION_TOKEN_SECRET_KEY_CONFIG;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class DelegationTokenManagerConfigsTest {
+ @Test
+ void testDefaults() {
+ DelegationTokenManagerConfigs config = new
DelegationTokenManagerConfigs(new
AbstractConfig(DelegationTokenManagerConfigs.CONFIG_DEF, Map.of()));
+ assertNull(config.delegationTokenSecretKey());
+ assertFalse(config.tokenAuthEnabled());
+
assertEquals(DelegationTokenManagerConfigs.DELEGATION_TOKEN_MAX_LIFE_TIME_MS_DEFAULT,
config.delegationTokenMaxLifeMs());
+
assertEquals(DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_TIME_MS_DEFAULT,
config.delegationTokenExpiryTimeMs());
+
assertEquals(DelegationTokenManagerConfigs.DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_MS_DEFAULT,
config.delegationTokenExpiryCheckIntervalMs());
+ }
+
+ @Test
+ void testOverride() {
+ DelegationTokenManagerConfigs config = new
DelegationTokenManagerConfigs(
+ new AbstractConfig(DelegationTokenManagerConfigs.CONFIG_DEF,
+ Map.of(
+ DELEGATION_TOKEN_SECRET_KEY_CONFIG, "test",
+ DELEGATION_TOKEN_MAX_LIFETIME_CONFIG, "500",
+ DELEGATION_TOKEN_EXPIRY_TIME_MS_CONFIG, "200",
+ DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_MS_CONFIG, "100"
+ )
+ )
+ );
+ assertEquals(new Password("test"), config.delegationTokenSecretKey());
+ assertTrue(config.tokenAuthEnabled());
+ assertEquals(500L, config.delegationTokenMaxLifeMs());
+ assertEquals(200L, config.delegationTokenExpiryTimeMs());
+ assertEquals(100L, config.delegationTokenExpiryCheckIntervalMs());
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {
+ DELEGATION_TOKEN_MAX_LIFETIME_CONFIG,
+ DELEGATION_TOKEN_EXPIRY_TIME_MS_CONFIG,
+ DELEGATION_TOKEN_EXPIRY_CHECK_INTERVAL_MS_CONFIG
+ })
+ void testInvalidProperty(String field) {
+ assertThrows(Exception.class, () -> new DelegationTokenManagerConfigs(
+ new AbstractConfig(DelegationTokenManagerConfigs.CONFIG_DEF,
Map.of(field, "not_a_number"))));
+ }
+}