This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 375ed19fba6 KAFKA-19100: Use ProcessRole instead of String in AclApis (#19406)
375ed19fba6 is described below
commit 375ed19fba6580cb157643d153dffeba9f3df435
Author: Xuan-Zhang Gong <[email protected]>
AuthorDate: Tue Apr 8 17:09:55 2025 +0800
KAFKA-19100: Use ProcessRole instead of String in AclApis (#19406)
Use the ProcessRole enum instead of hard-coded role name strings ("broker", "controller") when constructing AclApis.
Reviewers: Mickael Maison <[email protected]>, PoAn Yang <[email protected]>,
Jhen-Yung Hsu <[email protected]>, Ken Huang <[email protected]>
---
core/src/main/scala/kafka/server/AclApis.scala | 5 +++--
core/src/main/scala/kafka/server/ControllerApis.scala | 3 ++-
core/src/main/scala/kafka/server/KafkaApis.scala | 4 ++--
3 files changed, 7 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/kafka/server/AclApis.scala
b/core/src/main/scala/kafka/server/AclApis.scala
index fe4adf5f937..4bf57f8a5cc 100644
--- a/core/src/main/scala/kafka/server/AclApis.scala
+++ b/core/src/main/scala/kafka/server/AclApis.scala
@@ -30,6 +30,7 @@ import org.apache.kafka.common.requests._
import org.apache.kafka.common.resource.Resource.CLUSTER_NAME
import org.apache.kafka.common.resource.ResourceType
import org.apache.kafka.security.authorizer.AuthorizerUtils
+import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.authorizer._
import java.util
@@ -45,9 +46,9 @@ import scala.jdk.OptionConverters.RichOptional
class AclApis(authHelper: AuthHelper,
authorizer: Option[Authorizer],
requestHelper: RequestHandlerHelper,
- name: String,
+ role: ProcessRole,
config: KafkaConfig) extends Logging {
- this.logIdent = "[AclApis-%s-%s] ".format(name, config.nodeId)
+ this.logIdent = "[AclApis-%s-%s] ".format(role, config.nodeId)
private val alterAclsPurgatory =
new DelayedFuturePurgatory(purgatoryName = "AlterAcls", brokerId =
config.nodeId)
diff --git a/core/src/main/scala/kafka/server/ControllerApis.scala
b/core/src/main/scala/kafka/server/ControllerApis.scala
index 0b3c1ea1dac..da9a1b86322 100644
--- a/core/src/main/scala/kafka/server/ControllerApis.scala
+++ b/core/src/main/scala/kafka/server/ControllerApis.scala
@@ -56,6 +56,7 @@ import
org.apache.kafka.image.publisher.ControllerRegistrationsPublisher
import org.apache.kafka.metadata.{BrokerHeartbeatReply,
BrokerRegistrationReply}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.server.ProcessRole
import org.apache.kafka.server.authorizer.Authorizer
import org.apache.kafka.server.common.{ApiMessageAndVersion, RequestLocal}
@@ -84,7 +85,7 @@ class ControllerApis(
val configHelper = new ConfigHelper(metadataCache, config, metadataCache)
val requestHelper = new RequestHandlerHelper(requestChannel, quotas, time)
val runtimeLoggerManager = new RuntimeLoggerManager(config.nodeId,
logger.underlying)
- private val aclApis = new AclApis(authHelper, authorizer, requestHelper,
"controller", config)
+ private val aclApis = new AclApis(authHelper, authorizer, requestHelper,
ProcessRole.ControllerRole, config)
def isClosed: Boolean = aclApis.isClosed
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala
b/core/src/main/scala/kafka/server/KafkaApis.scala
index 8356bc4e732..30f53f3d26b 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -58,7 +58,7 @@ import org.apache.kafka.common.{Node, TopicIdPartition,
TopicPartition, Uuid}
import org.apache.kafka.coordinator.group.{Group, GroupConfigManager,
GroupCoordinator}
import org.apache.kafka.coordinator.share.ShareCoordinator
import org.apache.kafka.metadata.{ConfigRepository, MetadataCache}
-import org.apache.kafka.server.ClientMetricsManager
+import org.apache.kafka.server.{ClientMetricsManager, ProcessRole}
import org.apache.kafka.server.authorizer._
import org.apache.kafka.server.common.{GroupVersion, RequestLocal,
TransactionVersion}
import org.apache.kafka.server.share.context.ShareFetchContext
@@ -113,7 +113,7 @@ class KafkaApis(val requestChannel: RequestChannel,
val configHelper = new ConfigHelper(metadataCache, config, configRepository)
val authHelper = new AuthHelper(authorizer)
val requestHelper = new RequestHandlerHelper(requestChannel, quotas, time)
- val aclApis = new AclApis(authHelper, authorizer, requestHelper, "broker",
config)
+ val aclApis = new AclApis(authHelper, authorizer, requestHelper,
ProcessRole.BrokerRole, config)
val configManager = new ConfigAdminManager(brokerId, config,
configRepository)
val describeTopicPartitionsRequestHandler = new
DescribeTopicPartitionsRequestHandler(
metadataCache, authHelper, config)