This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8b49130b928 KAFKA-19355 Remove interBrokerListenerName from 
ClusterControlManager (#19866)
8b49130b928 is described below

commit 8b49130b928553ad8ee24188b591c2d006d9a394
Author: Hong-Yi Chen <[email protected]>
AuthorDate: Mon Jun 2 01:18:15 2025 +0800

    KAFKA-19355 Remove interBrokerListenerName from ClusterControlManager 
(#19866)
    
    Following the removal of the ZK-to-KRaft migration code in commit
    85bfdf4, controller-to-broker communication is now handled by the
    control-plane listener (`controller.listener.names`). The
    `interBrokerListenerName` parameter in `ClusterControlManager` is no
    longer referenced on the controller side and can be safely removed as
    dead code.
    
    Reviewers: Lan Ding <[email protected]>, Ken Huang <[email protected]>,
    Chia-Ping Tsai <[email protected]>
---
 core/src/main/scala/kafka/server/ControllerServer.scala     |  1 -
 .../org/apache/kafka/controller/ClusterControlManager.java  | 13 -------------
 .../java/org/apache/kafka/controller/QuorumController.java  |  8 --------
 3 files changed, 22 deletions(-)

diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala 
b/core/src/main/scala/kafka/server/ControllerServer.scala
index ff6821b1bb8..e8427fa7e53 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -252,7 +252,6 @@ class ControllerServer(
           
setDelegationTokenExpiryTimeMs(delegationTokenManagerConfigs.delegationTokenExpiryTimeMs).
           
setDelegationTokenExpiryCheckIntervalMs(delegationTokenManagerConfigs.delegationTokenExpiryCheckIntervalMs).
           
setUncleanLeaderElectionCheckIntervalMs(config.uncleanLeaderElectionCheckIntervalMs).
-          setInterBrokerListenerName(config.interBrokerListenerName.value()).
           
setControllerPerformanceSamplePeriodMs(config.controllerPerformanceSamplePeriodMs).
           
setControllerPerformanceAlwaysLogThresholdMs(config.controllerPerformanceAlwaysLogThresholdMs)
       }
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java 
b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
index 64394b07586..14fce5ca0f3 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/ClusterControlManager.java
@@ -92,7 +92,6 @@ public class ClusterControlManager {
         private ReplicaPlacer replicaPlacer = null;
         private FeatureControlManager featureControl = null;
         private BrokerShutdownHandler brokerShutdownHandler = null;
-        private String interBrokerListenerName = "PLAINTEXT";
         private QuorumControllerMetrics metrics = null;
 
         Builder setLogContext(LogContext logContext) {
@@ -135,10 +134,6 @@ public class ClusterControlManager {
             return this;
         }
 
-        Builder setInterBrokerListenerName(String interBrokerListenerName) {
-            this.interBrokerListenerName = interBrokerListenerName;
-            return this;
-        }
 
         Builder setMetrics(QuorumControllerMetrics metrics) {
             this.metrics = metrics;
@@ -175,7 +170,6 @@ public class ClusterControlManager {
                 replicaPlacer,
                 featureControl,
                 brokerShutdownHandler,
-                interBrokerListenerName,
                 metrics
             );
         }
@@ -265,11 +259,6 @@ public class ClusterControlManager {
 
     private final BrokerShutdownHandler brokerShutdownHandler;
 
-    /**
-     * The statically configured inter-broker listener name.
-     */
-    private final String interBrokerListenerName;
-
     /**
      * Maps controller IDs to controller registrations.
      */
@@ -294,7 +283,6 @@ public class ClusterControlManager {
         ReplicaPlacer replicaPlacer,
         FeatureControlManager featureControl,
         BrokerShutdownHandler brokerShutdownHandler,
-        String interBrokerListenerName,
         QuorumControllerMetrics metrics
     ) {
         this.logContext = logContext;
@@ -311,7 +299,6 @@ public class ClusterControlManager {
         this.controllerRegistrations = new TimelineHashMap<>(snapshotRegistry, 
0);
         this.directoryToBroker = new TimelineHashMap<>(snapshotRegistry, 0);
         this.brokerShutdownHandler = brokerShutdownHandler;
-        this.interBrokerListenerName = interBrokerListenerName;
         this.metrics = metrics;
     }
 
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java 
b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
index 365e9503dcb..7dee1e3cd3d 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/QuorumController.java
@@ -220,7 +220,6 @@ public final class QuorumController implements Controller {
         private long delegationTokenExpiryTimeMs;
         private long delegationTokenExpiryCheckIntervalMs = 
TimeUnit.MINUTES.toMillis(5);
         private long uncleanLeaderElectionCheckIntervalMs = 
TimeUnit.MINUTES.toMillis(5);
-        private String interBrokerListenerName = "PLAINTEXT";
 
         public Builder(int nodeId, String clusterId) {
             this.nodeId = nodeId;
@@ -381,10 +380,6 @@ public final class QuorumController implements Controller {
             return this;
         }
 
-        public Builder setInterBrokerListenerName(String 
interBrokerListenerName) {
-            this.interBrokerListenerName = interBrokerListenerName;
-            return this;
-        }
 
         public QuorumController build() throws Exception {
             if (raftClient == null) {
@@ -443,7 +438,6 @@ public final class QuorumController implements Controller {
                     delegationTokenExpiryTimeMs,
                     delegationTokenExpiryCheckIntervalMs,
                     uncleanLeaderElectionCheckIntervalMs,
-                    interBrokerListenerName,
                     controllerPerformanceSamplePeriodMs,
                     controllerPerformanceAlwaysLogThresholdMs
                 );
@@ -1488,7 +1482,6 @@ public final class QuorumController implements Controller 
{
         long delegationTokenExpiryTimeMs,
         long delegationTokenExpiryCheckIntervalMs,
         long uncleanLeaderElectionCheckIntervalMs,
-        String interBrokerListenerName,
         long controllerPerformanceSamplePeriodMs,
         long controllerPerformanceAlwaysLogThresholdMs
     ) {
@@ -1530,7 +1523,6 @@ public final class QuorumController implements Controller 
{
             setReplicaPlacer(replicaPlacer).
             setFeatureControlManager(featureControl).
             setBrokerShutdownHandler(this::handleBrokerShutdown).
-            setInterBrokerListenerName(interBrokerListenerName).
             setMetrics(controllerMetrics).
             build();
         this.configurationControl = new ConfigurationControlManager.Builder().

Reply via email to