This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 486b991f221 KAFKA-18711 Move DelegationTokenPublisher to metadata
module (#20475)
486b991f221 is described below
commit 486b991f221dd75637402e2ff0a10d0a482e6547
Author: Maros Orsak <[email protected]>
AuthorDate: Wed Sep 24 14:19:08 2025 +0200
KAFKA-18711 Move DelegationTokenPublisher to metadata module (#20475)
Basically, one of the refactor tasks. In this PR, I have moved
`DelegationTokenPublisher` to the metadata module. Similar to the
`ScramPublisher` migration (commit feee50f476), I have moved
`DelegationTokenManager` to the server-common module, as it would
otherwise create a circular dependency. Moreover, I have made multiple
changes throughout the codebase to reference `DelegationTokenManager`
from server-common instead of the server module.
Reviewers: Mickael Maison <[email protected]>, Chia-Ping Tsai
<[email protected]>
---
checkstyle/import-control-server-common.xml | 4 ++
.../kafka/server/builders/KafkaApisBuilder.java | 2 +-
.../src/main/scala/kafka/server/BrokerServer.scala | 8 +--
.../main/scala/kafka/server/ControllerApis.scala | 3 +-
.../main/scala/kafka/server/ControllerServer.scala | 13 ++--
core/src/main/scala/kafka/server/KafkaApis.scala | 3 +-
.../server/metadata/BrokerMetadataPublisher.scala | 4 +-
.../server/metadata/DelegationTokenPublisher.scala | 83 ----------------------
.../metadata/BrokerMetadataPublisherTest.scala | 2 +-
.../publisher/DelegationTokenPublisher.java | 73 +++++++++++++++++++
.../kafka/security}/DelegationTokenManager.java | 2 +-
.../config/DelegationTokenManagerConfigs.java | 0
12 files changed, 97 insertions(+), 100 deletions(-)
diff --git a/checkstyle/import-control-server-common.xml
b/checkstyle/import-control-server-common.xml
index 21b13ed91d2..95a014b87e4 100644
--- a/checkstyle/import-control-server-common.xml
+++ b/checkstyle/import-control-server-common.xml
@@ -33,6 +33,7 @@
<allow pkg="javax.net.ssl" />
<allow pkg="javax.security" />
<allow pkg="net.jqwik.api" />
+ <allow pkg="javax.crypto" />
<!-- no one depends on the server -->
<disallow pkg="kafka" />
@@ -49,6 +50,9 @@
<!-- persistent collection factories/non-library-specific wrappers -->
<allow pkg="org.apache.kafka.server.immutable" exact-match="true" />
+ <!-- allow config classes for server package -->
+ <allow pkg="org.apache.kafka.server.config" />
+
<subpackage name="queue">
<allow pkg="org.apache.kafka.test" />
</subpackage>
diff --git a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
index ecbb6c8b154..e03ab35e90e 100644
--- a/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
+++ b/core/src/main/java/kafka/server/builders/KafkaApisBuilder.java
@@ -36,9 +36,9 @@ import org.apache.kafka.coordinator.group.GroupCoordinator;
import org.apache.kafka.coordinator.share.ShareCoordinator;
import org.apache.kafka.metadata.ConfigRepository;
import org.apache.kafka.metadata.MetadataCache;
+import org.apache.kafka.security.DelegationTokenManager;
import org.apache.kafka.server.ApiVersionManager;
import org.apache.kafka.server.ClientMetricsManager;
-import org.apache.kafka.server.DelegationTokenManager;
import org.apache.kafka.server.authorizer.Authorizer;
import org.apache.kafka.storage.log.metrics.BrokerTopicStats;
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala
b/core/src/main/scala/kafka/server/BrokerServer.scala
index 47085169979..689c62b8687 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -42,8 +42,8 @@ import org.apache.kafka.coordinator.share.{ShareCoordinator,
ShareCoordinatorRec
import org.apache.kafka.coordinator.transaction.ProducerIdManager
import org.apache.kafka.image.publisher.{BrokerRegistrationTracker,
MetadataPublisher}
import org.apache.kafka.metadata.{BrokerState, ListenerInfo}
-import org.apache.kafka.metadata.publisher.{AclPublisher, ScramPublisher}
-import org.apache.kafka.security.CredentialProvider
+import org.apache.kafka.metadata.publisher.{AclPublisher,
DelegationTokenPublisher, ScramPublisher}
+import org.apache.kafka.security.{CredentialProvider, DelegationTokenManager}
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.{ApiMessageAndVersion,
DirectoryEventHandler, NodeToControllerChannelManager, TopicIdPartition}
import org.apache.kafka.server.config.{ConfigType,
DelegationTokenManagerConfigs}
@@ -54,7 +54,7 @@ import
org.apache.kafka.server.share.persister.{DefaultStatePersister, NoOpState
import org.apache.kafka.server.share.session.ShareSessionCache
import org.apache.kafka.server.util.timer.{SystemTimer, SystemTimerReaper}
import org.apache.kafka.server.util.{Deadline, FutureUtils, KafkaScheduler}
-import org.apache.kafka.server.{AssignmentsManager, BrokerFeatures,
ClientMetricsManager, DefaultApiVersionManager, DelayedActionQueue,
DelegationTokenManager, ProcessRole}
+import org.apache.kafka.server.{AssignmentsManager, BrokerFeatures,
ClientMetricsManager, DefaultApiVersionManager, DelayedActionQueue, ProcessRole}
import org.apache.kafka.server.transaction.AddPartitionsToTxnManager
import org.apache.kafka.storage.internals.log.LogDirFailureChannel
import org.apache.kafka.storage.log.metrics.BrokerTopicStats
@@ -502,7 +502,7 @@ class BrokerServer(
"broker",
credentialProvider),
new DelegationTokenPublisher(
- config,
+ config.nodeId,
sharedServer.metadataPublishingFaultHandler,
"broker",
tokenManager),
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala
b/core/src/main/scala/kafka/server/ControllerApis.scala
index ac9a2d9eff1..f10b769d9c1 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -55,7 +55,8 @@ import org.apache.kafka.metadata.{BrokerHeartbeatReply,
BrokerRegistrationReply}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.raft.RaftManager
-import org.apache.kafka.server.{ApiVersionManager, DelegationTokenManager,
ProcessRole}
+import org.apache.kafka.security.DelegationTokenManager
+import org.apache.kafka.server.{ApiVersionManager, ProcessRole}
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.{ApiMessageAndVersion, RequestLocal}
import org.apache.kafka.server.quota.ControllerMutationQuota
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala
b/core/src/main/scala/kafka/server/ControllerServer.scala
index badcb9b2d8a..e41705ed3ba 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -22,7 +22,7 @@ import kafka.raft.KafkaRaftManager
import kafka.server.QuotaFactory.QuotaManagers
import scala.collection.immutable
-import kafka.server.metadata.{ClientQuotaMetadataManager,
DelegationTokenPublisher, DynamicClientQuotaPublisher, DynamicConfigPublisher,
DynamicTopicClusterQuotaPublisher, KRaftMetadataCache,
KRaftMetadataCachePublisher}
+import kafka.server.metadata.{ClientQuotaMetadataManager,
DynamicClientQuotaPublisher, DynamicConfigPublisher,
DynamicTopicClusterQuotaPublisher, KRaftMetadataCache,
KRaftMetadataCachePublisher}
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.common.internals.Plugin
import org.apache.kafka.common.message.ApiMessageType.ListenerType
@@ -38,14 +38,15 @@ import
org.apache.kafka.image.publisher.{ControllerRegistrationsPublisher, Metad
import org.apache.kafka.metadata.{KafkaConfigSchema, ListenerInfo}
import org.apache.kafka.metadata.authorizer.ClusterMetadataAuthorizer
import org.apache.kafka.metadata.bootstrap.BootstrapMetadata
-import org.apache.kafka.metadata.publisher.{AclPublisher, FeaturesPublisher,
ScramPublisher}
+import org.apache.kafka.metadata.publisher.{AclPublisher,
DelegationTokenPublisher, FeaturesPublisher, ScramPublisher}
import org.apache.kafka.raft.QuorumConfig
-import org.apache.kafka.security.CredentialProvider
-import org.apache.kafka.server.{DelegationTokenManager, ProcessRole,
SimpleApiVersionManager}
+import org.apache.kafka.security.{CredentialProvider, DelegationTokenManager}
+import org.apache.kafka.server.{ProcessRole, SimpleApiVersionManager}
import org.apache.kafka.server.authorizer.Authorizer
import
org.apache.kafka.server.config.ServerLogConfigs.{ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG,
CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG}
import org.apache.kafka.server.common.{ApiMessageAndVersion, KRaftVersion,
NodeToControllerChannelManager}
-import org.apache.kafka.server.config.{ConfigType,
DelegationTokenManagerConfigs}
+import org.apache.kafka.server.config.ConfigType
+import org.apache.kafka.server.config.DelegationTokenManagerConfigs
import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics,
LinuxIoMetricsCollector}
import org.apache.kafka.server.network.{EndpointReadyFutures,
KafkaAuthorizerServerInfo}
import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy}
@@ -360,7 +361,7 @@ class ControllerServer(
// We need a tokenManager for the Publisher
// The tokenCache in the tokenManager is the same used in
DelegationTokenControlManager
metadataPublishers.add(new DelegationTokenPublisher(
- config,
+ config.nodeId,
sharedServer.metadataPublishingFaultHandler,
"controller",
new DelegationTokenManager(delegationTokenManagerConfigs, tokenCache)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index d3935c8e507..4cbef3fa648 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -60,7 +60,8 @@ import org.apache.kafka.common.{Node, TopicIdPartition,
TopicPartition, Uuid}
import org.apache.kafka.coordinator.group.{Group, GroupConfig,
GroupConfigManager, GroupCoordinator}
import org.apache.kafka.coordinator.share.ShareCoordinator
import org.apache.kafka.metadata.{ConfigRepository, MetadataCache}
-import org.apache.kafka.server.{ApiVersionManager, ClientMetricsManager,
DelegationTokenManager, ProcessRole}
+import org.apache.kafka.security.DelegationTokenManager
+import org.apache.kafka.server.{ApiVersionManager, ClientMetricsManager,
ProcessRole}
import org.apache.kafka.server.authorizer._
import org.apache.kafka.server.common.{GroupVersion, RequestLocal,
ShareVersion, StreamsVersion, TransactionVersion}
import org.apache.kafka.server.config.DelegationTokenManagerConfigs
diff --git
a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
index 30ea835b5be..8df8a275580 100644
--- a/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
+++ b/core/src/main/scala/kafka/server/metadata/BrokerMetadataPublisher.scala
@@ -33,7 +33,7 @@ import
org.apache.kafka.coordinator.transaction.TransactionLogConfig
import org.apache.kafka.image.loader.LoaderManifest
import org.apache.kafka.image.publisher.MetadataPublisher
import org.apache.kafka.image.{MetadataDelta, MetadataImage, TopicDelta}
-import org.apache.kafka.metadata.publisher.{AclPublisher, ScramPublisher}
+import org.apache.kafka.metadata.publisher.{AclPublisher,
DelegationTokenPublisher, ScramPublisher}
import org.apache.kafka.server.common.MetadataVersion.MINIMUM_VERSION
import org.apache.kafka.server.common.{FinalizedFeatures, RequestLocal,
ShareVersion}
import org.apache.kafka.server.fault.FaultHandler
@@ -227,7 +227,7 @@ class BrokerMetadataPublisher(
scramPublisher.onMetadataUpdate(delta, newImage, manifest)
// Apply DelegationToken delta.
- delegationTokenPublisher.onMetadataUpdate(delta, newImage)
+ delegationTokenPublisher.onMetadataUpdate(delta, newImage, manifest)
// Apply ACL delta.
aclPublisher.onMetadataUpdate(delta, newImage, manifest)
diff --git
a/core/src/main/scala/kafka/server/metadata/DelegationTokenPublisher.scala
b/core/src/main/scala/kafka/server/metadata/DelegationTokenPublisher.scala
deleted file mode 100644
index 0e12c34b3c5..00000000000
--- a/core/src/main/scala/kafka/server/metadata/DelegationTokenPublisher.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package kafka.server.metadata
-
-import kafka.server.KafkaConfig
-import kafka.utils.Logging
-import org.apache.kafka.image.loader.LoaderManifest
-import org.apache.kafka.image.{MetadataDelta, MetadataImage}
-import org.apache.kafka.server.DelegationTokenManager
-import org.apache.kafka.server.fault.FaultHandler
-
-
-class DelegationTokenPublisher(
- conf: KafkaConfig,
- faultHandler: FaultHandler,
- nodeType: String,
- tokenManager: DelegationTokenManager,
-) extends Logging with org.apache.kafka.image.publisher.MetadataPublisher {
- logIdent = s"[${name()}] "
-
- var _firstPublish = true
-
- override def name(): String = s"DelegationTokenPublisher $nodeType
id=${conf.nodeId}"
-
- override def onMetadataUpdate(
- delta: MetadataDelta,
- newImage: MetadataImage,
- manifest: LoaderManifest
- ): Unit = {
- onMetadataUpdate(delta, newImage)
- }
-
- def onMetadataUpdate(
- delta: MetadataDelta,
- newImage: MetadataImage,
- ): Unit = {
- val deltaName = if (_firstPublish) {
- s"initial MetadataDelta up to ${newImage.highestOffsetAndEpoch().offset}"
- } else {
- s"update MetadataDelta up to ${newImage.highestOffsetAndEpoch().offset}"
- }
- try {
- if (_firstPublish) {
- // Initialize the tokenCache with the Image
- Option(newImage.delegationTokens()).foreach { delegationTokenImage =>
- delegationTokenImage.tokens().forEach { (_, delegationTokenData) =>
-
tokenManager.updateToken(tokenManager.getDelegationToken(delegationTokenData.tokenInformation()))
- }
- }
- _firstPublish = false
- }
- // Apply changes to DelegationTokens.
- Option(delta.delegationTokenDelta()).foreach { delegationTokenDelta =>
- delegationTokenDelta.changes().forEach {
- case (tokenId, delegationTokenData) =>
- if (delegationTokenData.isPresent) {
-
tokenManager.updateToken(tokenManager.getDelegationToken(delegationTokenData.get().tokenInformation()))
- } else {
- tokenManager.removeToken(tokenId)
- }
- }
- }
- } catch {
- case t: Throwable => faultHandler.handleFault("Uncaught exception while
" +
- s"publishing DelegationToken changes from $deltaName", t)
- }
- }
-}
diff --git
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
index 828ca0d7ad4..32727a4c3cc 100644
---
a/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
+++
b/core/src/test/scala/unit/kafka/server/metadata/BrokerMetadataPublisherTest.scala
@@ -40,7 +40,7 @@ import org.apache.kafka.coordinator.group.GroupCoordinator
import org.apache.kafka.coordinator.share.ShareCoordinator
import org.apache.kafka.image.{AclsImage, ClientQuotasImage, ClusterImageTest,
ConfigurationsImage, DelegationTokenImage, FeaturesImage, MetadataDelta,
MetadataImage, MetadataImageTest, MetadataProvenance, ProducerIdsImage,
ScramImage, TopicsImage}
import org.apache.kafka.image.loader.LogDeltaManifest
-import org.apache.kafka.metadata.publisher.{AclPublisher, ScramPublisher}
+import org.apache.kafka.metadata.publisher.{AclPublisher,
DelegationTokenPublisher, ScramPublisher}
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.LeaderAndEpoch
import org.apache.kafka.server.common.{KRaftVersion, MetadataVersion,
ShareVersion}
diff --git
a/metadata/src/main/java/org/apache/kafka/metadata/publisher/DelegationTokenPublisher.java
b/metadata/src/main/java/org/apache/kafka/metadata/publisher/DelegationTokenPublisher.java
new file mode 100644
index 00000000000..347b0d7f531
--- /dev/null
+++
b/metadata/src/main/java/org/apache/kafka/metadata/publisher/DelegationTokenPublisher.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.metadata.publisher;
+
+import org.apache.kafka.image.DelegationTokenImage;
+import org.apache.kafka.image.MetadataDelta;
+import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.image.loader.LoaderManifest;
+import org.apache.kafka.image.publisher.MetadataPublisher;
+import org.apache.kafka.security.DelegationTokenManager;
+import org.apache.kafka.server.fault.FaultHandler;
+
+public class DelegationTokenPublisher implements MetadataPublisher {
+ private final int nodeId;
+ private final FaultHandler faultHandler;
+ private final String nodeType;
+ private final DelegationTokenManager tokenManager;
+ private boolean firstPublish = true;
+
+ public DelegationTokenPublisher(int nodeId, FaultHandler faultHandler,
String nodeType, DelegationTokenManager tokenManager) {
+ this.nodeId = nodeId;
+ this.faultHandler = faultHandler;
+ this.nodeType = nodeType;
+ this.tokenManager = tokenManager;
+ }
+
+ @Override
+ public final String name() {
+ return "DelegationTokenPublisher " + nodeType + " id=" + nodeId;
+ }
+
+ @Override
+ public void onMetadataUpdate(MetadataDelta delta, MetadataImage newImage,
LoaderManifest manifest) {
+ var first = firstPublish;
+ try {
+ if (firstPublish) {
+ // Initialize the tokenCache with the Image
+ DelegationTokenImage delegationTokenImage =
newImage.delegationTokens();
+ for (var token : delegationTokenImage.tokens().entrySet()) {
+
tokenManager.updateToken(tokenManager.getDelegationToken(token.getValue().tokenInformation()));
+ }
+ firstPublish = false;
+ }
+ // Apply changes to DelegationTokens.
+ for (var token :
delta.getOrCreateDelegationTokenDelta().changes().entrySet()) {
+ var tokenId = token.getKey();
+ var delegationTokenData = token.getValue();
+ if (delegationTokenData.isPresent())
+
tokenManager.updateToken(tokenManager.getDelegationToken(delegationTokenData.get().tokenInformation()));
+ else
+ tokenManager.removeToken(tokenId);
+ }
+ } catch (Throwable t) {
+ var msg = String.format("Uncaught exception while publishing
DelegationToken changes from %s MetadataDelta up to %s",
+ first ? "initial" : "update",
newImage.highestOffsetAndEpoch().offset());
+ faultHandler.handleFault(msg, t);
+ }
+ }
+}
\ No newline at end of file
diff --git
a/server/src/main/java/org/apache/kafka/server/DelegationTokenManager.java
b/server-common/src/main/java/org/apache/kafka/security/DelegationTokenManager.java
similarity index 99%
rename from
server/src/main/java/org/apache/kafka/server/DelegationTokenManager.java
rename to
server-common/src/main/java/org/apache/kafka/security/DelegationTokenManager.java
index 54832fbd502..ef82a0702c8 100644
--- a/server/src/main/java/org/apache/kafka/server/DelegationTokenManager.java
+++
b/server-common/src/main/java/org/apache/kafka/security/DelegationTokenManager.java
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.kafka.server;
+package org.apache.kafka.security;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.scram.ScramCredential;
diff --git
a/server/src/main/java/org/apache/kafka/server/config/DelegationTokenManagerConfigs.java
b/server-common/src/main/java/org/apache/kafka/server/config/DelegationTokenManagerConfigs.java
similarity index 100%
rename from
server/src/main/java/org/apache/kafka/server/config/DelegationTokenManagerConfigs.java
rename to
server-common/src/main/java/org/apache/kafka/server/config/DelegationTokenManagerConfigs.java