This is an automated email from the ASF dual-hosted git repository.

frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 81881dee838 KAFKA-18760: Deprecate Optional<String> and return String from public Endpoint#listener (#19191)
81881dee838 is described below

commit 81881dee838f420176ee31a2352bf56811648f74
Author: PoAn Yang <[email protected]>
AuthorDate: Wed Apr 30 12:15:33 2025 +0800

    KAFKA-18760: Deprecate Optional<String> and return String from public Endpoint#listener (#19191)
    
    * Deprecate org.apache.kafka.common.Endpoint#listenerName.
    * Add org.apache.kafka.common.Endpoint#listener to replace
    org.apache.kafka.common.Endpoint#listenerName.
    * Replace org.apache.kafka.network.EndPoint with
    org.apache.kafka.common.Endpoint.
    * Deprecate org.apache.kafka.clients.admin.RaftVoterEndpoint#name.
    * Add org.apache.kafka.clients.admin.RaftVoterEndpoint#listener to
    replace org.apache.kafka.clients.admin.RaftVoterEndpoint#name.
    
    Reviewers: Chia-Ping Tsai <[email protected]>, TaiJuWu
     <[email protected]>, Jhen-Yung Hsu <[email protected]>, TengYao
     Chi <[email protected]>, Ken Huang <[email protected]>, Bagda
     Parth, Kuan-Po Tseng <[email protected]>
    
    ---------
    
    Signed-off-by: PoAn Yang <[email protected]>
---
 .../kafka/clients/admin/KafkaAdminClient.java      |  2 +-
 .../kafka/clients/admin/RaftVoterEndpoint.java     | 27 ++++++---
 .../java/org/apache/kafka/common/Endpoint.java     | 25 +++++---
 .../main/scala/kafka/network/SocketServer.scala    | 67 +++++++++++-----------
 .../src/main/scala/kafka/server/BrokerServer.scala |  2 +-
 .../main/scala/kafka/server/ControllerServer.scala |  2 +-
 .../scala/kafka/server/DynamicBrokerConfig.scala   | 16 +++---
 core/src/main/scala/kafka/server/KafkaConfig.scala | 47 ++++++++-------
 .../main/scala/kafka/tools/TestRaftServer.scala    |  3 +-
 core/src/main/scala/kafka/utils/CoreUtils.scala    | 15 +++--
 .../kafka/api/SslAdminIntegrationTest.scala        |  2 +-
 .../kafka/server/KRaftClusterTest.scala            |  2 +-
 ...ListenersWithSameSecurityProtocolBaseTest.scala |  4 +-
 .../unit/kafka/network/SocketServerTest.scala      | 16 +++---
 .../scala/unit/kafka/raft/RaftManagerTest.scala    |  3 +-
 .../server/ControllerRegistrationManagerTest.scala |  2 +-
 .../kafka/server/DynamicBrokerConfigTest.scala     |  2 +-
 .../scala/unit/kafka/server/KafkaConfigTest.scala  | 29 +++++-----
 .../kafka/server/RegistrationTestContext.scala     |  2 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  2 +-
 .../apache/kafka/metadata/BrokerRegistration.java  |  4 +-
 .../org/apache/kafka/metadata/ListenerInfo.java    | 20 +++----
 .../metadata/authorizer/StandardAuthorizer.java    |  2 +-
 .../apache/kafka/metadata/ListenerInfoTest.java    | 11 ++--
 .../authorizer/StandardAuthorizerTest.java         |  4 +-
 .../kafka/server/network/EndpointReadyFutures.java |  4 +-
 .../server/network/EndpointReadyFuturesTest.java   |  2 +-
 .../java/org/apache/kafka/network/EndPoint.java    | 48 ----------------
 .../org/apache/kafka/common/test/TestUtils.java    |  6 +-
 .../apache/kafka/tools/MetadataQuorumCommand.java  |  8 +--
 30 files changed, 179 insertions(+), 200 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index d4c9ec9beeb..5cb5cc292ea 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -4948,7 +4948,7 @@ public class KafkaAdminClient extends AdminClient {
                     new AddRaftVoterRequestData.ListenerCollection();
                 endpoints.forEach(endpoint ->
                     listeners.add(new AddRaftVoterRequestData.Listener().
-                        setName(endpoint.name()).
+                        setName(endpoint.listener()).
                         setHost(endpoint.host()).
                         setPort(endpoint.port())));
                 return new AddRaftVoterRequest.Builder(
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/RaftVoterEndpoint.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/RaftVoterEndpoint.java
index 984ac999393..ba5b39284eb 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/RaftVoterEndpoint.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/RaftVoterEndpoint.java
@@ -26,7 +26,7 @@ import java.util.Objects;
  */
 @InterfaceStability.Stable
 public class RaftVoterEndpoint {
-    private final String name;
+    private final String listener;
     private final String host;
     private final int port;
 
@@ -49,22 +49,33 @@ public class RaftVoterEndpoint {
     /**
      * Create an endpoint for a metadata quorum voter.
      *
-     * @param name              The human-readable name for this endpoint. For 
example, CONTROLLER.
+     * @param listener          The human-readable name for this endpoint. For 
example, CONTROLLER.
      * @param host              The DNS hostname for this endpoint.
      * @param port              The network port for this endpoint.
      */
     public RaftVoterEndpoint(
-        String name,
+        String listener,
         String host,
         int port
     ) {
-        this.name = requireNonNullAllCapsNonEmpty(name);
+        this.listener = requireNonNullAllCapsNonEmpty(listener);
         this.host = Objects.requireNonNull(host);
         this.port = port;
     }
 
+    /**
+     * The listener name for this endpoint.
+     */
+    public String listener() {
+        return listener;
+    }
+
+    /**
+     * @deprecated Since 4.1. Use {@link #listener()} instead. This function 
will be removed in 5.0.
+     */
+    @Deprecated(since = "4.1", forRemoval = true)
     public String name() {
-        return name;
+        return listener;
     }
 
     public String host() {
@@ -79,20 +90,20 @@ public class RaftVoterEndpoint {
     public boolean equals(Object o) {
         if (o == null || (!o.getClass().equals(getClass()))) return false;
         RaftVoterEndpoint other = (RaftVoterEndpoint) o;
-        return name.equals(other.name) &&
+        return listener.equals(other.listener) &&
             host.equals(other.host) &&
             port == other.port;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(name, host, port);
+        return Objects.hash(listener, host, port);
     }
 
     @Override
     public String toString() {
         // enclose IPv6 hosts in square brackets for readability
         String hostString = host.contains(":") ? "[" + host + "]" : host;
-        return name + "://" + hostString + ":" + port;
+        return listener + "://" + hostString + ":" + port;
     }
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/Endpoint.java 
b/clients/src/main/java/org/apache/kafka/common/Endpoint.java
index 8d5e8c6d16a..baa1045929f 100644
--- a/clients/src/main/java/org/apache/kafka/common/Endpoint.java
+++ b/clients/src/main/java/org/apache/kafka/common/Endpoint.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.common;
 
-import org.apache.kafka.common.annotation.InterfaceStability;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 
 import java.util.Objects;
@@ -26,27 +25,35 @@ import java.util.Optional;
  * Represents a broker endpoint.
  */
 
[email protected]
 public class Endpoint {
 
-    private final String listenerName;
+    private final String listener;
     private final SecurityProtocol securityProtocol;
     private final String host;
     private final int port;
 
-    public Endpoint(String listenerName, SecurityProtocol securityProtocol, 
String host, int port) {
-        this.listenerName = listenerName;
+    public Endpoint(String listener, SecurityProtocol securityProtocol, String 
host, int port) {
+        this.listener = listener;
         this.securityProtocol = securityProtocol;
         this.host = host;
         this.port = port;
     }
 
+    /**
+     * Returns the listener name of this endpoint.
+     */
+    public String listener() {
+        return listener;
+    }
+
     /**
      * Returns the listener name of this endpoint. This is non-empty for 
endpoints provided
      * to broker plugins, but may be empty when used in clients.
+     * @deprecated Since 4.1. Use {@link #listener()} instead. This function 
will be removed in 5.0.
      */
+    @Deprecated(since = "4.1", forRemoval = true)
     public Optional<String> listenerName() {
-        return Optional.ofNullable(listenerName);
+        return Optional.ofNullable(listener);
     }
 
     /**
@@ -80,7 +87,7 @@ public class Endpoint {
         }
 
         Endpoint that = (Endpoint) o;
-        return Objects.equals(this.listenerName, that.listenerName) &&
+        return Objects.equals(this.listener, that.listener) &&
             Objects.equals(this.securityProtocol, that.securityProtocol) &&
             Objects.equals(this.host, that.host) &&
             this.port == that.port;
@@ -89,13 +96,13 @@ public class Endpoint {
 
     @Override
     public int hashCode() {
-        return Objects.hash(listenerName, securityProtocol, host, port);
+        return Objects.hash(listener, securityProtocol, host, port);
     }
 
     @Override
     public String toString() {
         return "Endpoint(" +
-            "listenerName='" + listenerName + '\'' +
+            "listenerName='" + listener + '\'' +
             ", securityProtocol=" + securityProtocol +
             ", host='" + host + '\'' +
             ", port=" + port +
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala 
b/core/src/main/scala/kafka/network/SocketServer.scala
index 4163b563f01..884c00002c5 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -29,7 +29,6 @@ import kafka.network.Processor._
 import kafka.network.RequestChannel.{CloseConnectionResponse, 
EndThrottlingResponse, NoOpResponse, SendResponse, StartThrottlingResponse}
 import kafka.network.SocketServer._
 import kafka.server.{BrokerReconfigurable, KafkaConfig}
-import org.apache.kafka.network.EndPoint
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import kafka.utils._
 import org.apache.kafka.common.config.ConfigException
@@ -96,7 +95,7 @@ class SocketServer(
   memoryPoolSensor.add(new Meter(TimeUnit.MILLISECONDS, 
memoryPoolDepletedPercentMetricName, memoryPoolDepletedTimeMetricName))
   private val memoryPool = if (config.queuedMaxBytes > 0) new 
SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, 
memoryPoolSensor) else MemoryPool.NONE
   // data-plane
-  private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, 
DataPlaneAcceptor]()
+  private[network] val dataPlaneAcceptors = new ConcurrentHashMap[Endpoint, 
DataPlaneAcceptor]()
   val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, time, 
apiVersionManager.newRequestMetrics)
 
   private[this] val nextProcessorId: AtomicInteger = new AtomicInteger(0)
@@ -161,8 +160,8 @@ class SocketServer(
    * Therefore, we do not know that any particular request processor will be 
running by the end of
    * this function -- just that it might be running.
    *
-   * @param authorizerFutures     Future per [[EndPoint]] used to wait before 
starting the
-   *                              processor corresponding to the [[EndPoint]]. 
Any endpoint
+   * @param authorizerFutures     Future per [[Endpoint]] used to wait before 
starting the
+   *                              processor corresponding to the [[Endpoint]]. 
Any endpoint
    *                              that does not appear in this map will be 
started once all
    *                              authorizerFutures are complete.
    *
@@ -181,7 +180,7 @@ class SocketServer(
       // Because of ephemeral ports, we need to match acceptors to futures by 
looking at
       // the listener name, rather than the endpoint object.
       val authorizerFuture = authorizerFutures.find {
-        case (endpoint, _) => 
acceptor.endPoint.listenerName.value().equals(endpoint.listenerName().get())
+        case (endpoint, _) => 
acceptor.endPoint.listener.equals(endpoint.listener())
       } match {
         case None => allAuthorizerFuturesComplete
         case Some((_, future)) => future
@@ -210,23 +209,24 @@ class SocketServer(
     enableFuture
   }
 
-  private def createDataPlaneAcceptorAndProcessors(endpoint: EndPoint): Unit = 
synchronized {
+  private def createDataPlaneAcceptorAndProcessors(endpoint: Endpoint): Unit = 
synchronized {
     if (stopped) {
       throw new RuntimeException("Can't create new data plane acceptor and 
processors: SocketServer is stopped.")
     }
-    val parsedConfigs = 
config.valuesFromThisConfigWithPrefixOverride(endpoint.listenerName.configPrefix)
-    connectionQuotas.addListener(config, endpoint.listenerName)
-    val isPrivilegedListener = config.interBrokerListenerName == 
endpoint.listenerName
+    val listenerName =  ListenerName.normalised(endpoint.listener)
+    val parsedConfigs = 
config.valuesFromThisConfigWithPrefixOverride(listenerName.configPrefix)
+    connectionQuotas.addListener(config, listenerName)
+    val isPrivilegedListener = config.interBrokerListenerName == listenerName
     val dataPlaneAcceptor = createDataPlaneAcceptor(endpoint, 
isPrivilegedListener, dataPlaneRequestChannel)
     config.addReconfigurable(dataPlaneAcceptor)
     dataPlaneAcceptor.configure(parsedConfigs)
     dataPlaneAcceptors.put(endpoint, dataPlaneAcceptor)
-    info(s"Created data-plane acceptor and processors for endpoint : 
${endpoint.listenerName}")
+    info(s"Created data-plane acceptor and processors for endpoint : 
${listenerName}")
   }
 
-  private def endpoints = config.listeners.map(l => l.listenerName -> l).toMap
+  private def endpoints = config.listeners.map(l => 
ListenerName.normalised(l.listener) -> l).toMap
 
-  protected def createDataPlaneAcceptor(endPoint: EndPoint, 
isPrivilegedListener: Boolean, requestChannel: RequestChannel): 
DataPlaneAcceptor = {
+  protected def createDataPlaneAcceptor(endPoint: Endpoint, 
isPrivilegedListener: Boolean, requestChannel: RequestChannel): 
DataPlaneAcceptor = {
     new DataPlaneAcceptor(this, endPoint, config, nodeId, connectionQuotas, 
time, isPrivilegedListener, requestChannel, metrics, credentialProvider, 
logContext, memoryPool, apiVersionManager)
   }
 
@@ -277,7 +277,7 @@ class SocketServer(
   /**
    * This method is called to dynamically add listeners.
    */
-  def addListeners(listenersAdded: Seq[EndPoint]): Unit = synchronized {
+  def addListeners(listenersAdded: Seq[Endpoint]): Unit = synchronized {
     if (stopped) {
       throw new RuntimeException("can't add new listeners: SocketServer is 
stopped.")
     }
@@ -297,10 +297,10 @@ class SocketServer(
     }
   }
 
-  def removeListeners(listenersRemoved: Seq[EndPoint]): Unit = synchronized {
+  def removeListeners(listenersRemoved: Seq[Endpoint]): Unit = synchronized {
     info(s"Removing data-plane listeners for endpoints $listenersRemoved")
     listenersRemoved.foreach { endpoint =>
-      connectionQuotas.removeListener(config, endpoint.listenerName)
+      connectionQuotas.removeListener(config, 
ListenerName.normalised(endpoint.listener))
       dataPlaneAcceptors.asScala.remove(endpoint).foreach { acceptor =>
         acceptor.beginShutdown()
         acceptor.close()
@@ -345,7 +345,7 @@ class SocketServer(
   // For test usage
   def dataPlaneAcceptor(listenerName: String): Option[DataPlaneAcceptor] = {
     dataPlaneAcceptors.asScala.foreach { case (endPoint, acceptor) =>
-      if (endPoint.listenerName.value() == listenerName)
+      if (endPoint.listener == listenerName)
         return Some(acceptor)
     }
     None
@@ -374,7 +374,7 @@ object DataPlaneAcceptor {
 }
 
 class DataPlaneAcceptor(socketServer: SocketServer,
-                        endPoint: EndPoint,
+                        endPoint: Endpoint,
                         config: KafkaConfig,
                         nodeId: Int,
                         connectionQuotas: ConnectionQuotas,
@@ -404,7 +404,7 @@ class DataPlaneAcceptor(socketServer: SocketServer,
    * Returns the listener name associated with this reconfigurable. 
Listener-specific
    * configs corresponding to this listener name are provided for 
reconfiguration.
    */
-  override def listenerName(): ListenerName = endPoint.listenerName
+  override def listenerName(): ListenerName = 
ListenerName.normalised(endPoint.listener)
 
   /**
    * Returns the names of configs that may be reconfigured.
@@ -451,7 +451,7 @@ class DataPlaneAcceptor(socketServer: SocketServer,
     val newNumNetworkThreads = 
configs.get(SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG).asInstanceOf[Int]
 
     if (newNumNetworkThreads != processors.length) {
-      info(s"Resizing network thread pool size for ${endPoint.listenerName} 
listener from ${processors.length} to $newNumNetworkThreads")
+      info(s"Resizing network thread pool size for ${endPoint.listener} 
listener from ${processors.length} to $newNumNetworkThreads")
       if (newNumNetworkThreads > processors.length) {
         addProcessors(newNumNetworkThreads - processors.length)
       } else if (newNumNetworkThreads < processors.length) {
@@ -472,7 +472,7 @@ class DataPlaneAcceptor(socketServer: SocketServer,
  * Thread that accepts and configures new connections. There is one of these 
per endpoint.
  */
 private[kafka] abstract class Acceptor(val socketServer: SocketServer,
-                                       val endPoint: EndPoint,
+                                       val endPoint: Endpoint,
                                        var config: KafkaConfig,
                                        nodeId: Int,
                                        val connectionQuotas: ConnectionQuotas,
@@ -515,7 +515,7 @@ private[kafka] abstract class Acceptor(val socketServer: 
SocketServer,
   private val backwardCompatibilityMetricGroup = new 
KafkaMetricsGroup("kafka.network", "Acceptor")
   private val blockedPercentMeterMetricName = 
backwardCompatibilityMetricGroup.metricName(
     "AcceptorBlockedPercent",
-    Map(ListenerMetricTag -> endPoint.listenerName.value).asJava)
+    Map(ListenerMetricTag -> endPoint.listener).asJava)
   private val blockedPercentMeter = 
metricsGroup.newMeter(blockedPercentMeterMetricName,"blocked time", 
TimeUnit.NANOSECONDS)
   private var currentProcessorIndex = 0
   private[network] val throttledSockets = new 
mutable.PriorityQueue[DelayedCloseSocket]()
@@ -523,7 +523,7 @@ private[kafka] abstract class Acceptor(val socketServer: 
SocketServer,
   private[network] val startedFuture = new CompletableFuture[Void]()
 
   val thread: KafkaThread = KafkaThread.nonDaemon(
-    
s"data-plane-kafka-socket-acceptor-${endPoint.listenerName}-${endPoint.securityProtocol}-${endPoint.port}",
+    
s"data-plane-kafka-socket-acceptor-${endPoint.listener}-${endPoint.securityProtocol}-${endPoint.port}",
     this)
 
   def start(): Unit = synchronized {
@@ -535,19 +535,19 @@ private[kafka] abstract class Acceptor(val socketServer: 
SocketServer,
         serverChannel = openServerSocket(endPoint.host, endPoint.port, 
listenBacklogSize)
         debug(s"Opened endpoint ${endPoint.host}:${endPoint.port}")
       }
-      debug(s"Starting processors for listener ${endPoint.listenerName}")
+      debug(s"Starting processors for listener ${endPoint.listener}")
       processors.foreach(_.start())
-      debug(s"Starting acceptor thread for listener ${endPoint.listenerName}")
+      debug(s"Starting acceptor thread for listener ${endPoint.listener}")
       thread.start()
       startedFuture.complete(null)
       started.set(true)
     } catch {
       case e: ClosedChannelException =>
-        debug(s"Refusing to start acceptor for ${endPoint.listenerName} since 
the acceptor has already been shut down.")
+        debug(s"Refusing to start acceptor for ${endPoint.listener} since the 
acceptor has already been shut down.")
         startedFuture.completeExceptionally(e)
       case t: Throwable =>
-        error(s"Unable to start acceptor for ${endPoint.listenerName}", t)
-        startedFuture.completeExceptionally(new RuntimeException(s"Unable to 
start acceptor for ${endPoint.listenerName}", t))
+        error(s"Unable to start acceptor for ${endPoint.listener}", t)
+        startedFuture.completeExceptionally(new RuntimeException(s"Unable to 
start acceptor for ${endPoint.listener}", t))
     }
   }
 
@@ -628,7 +628,7 @@ private[kafka] abstract class Acceptor(val socketServer: 
SocketServer,
       new InetSocketAddress(host, port)
     }
     val serverChannel = socketServer.socketFactory.openServerSocket(
-      endPoint.listenerName.value(),
+      endPoint.listener,
       socketAddress,
       listenBacklogSize,
       recvBufferSize)
@@ -682,14 +682,15 @@ private[kafka] abstract class Acceptor(val socketServer: 
SocketServer,
   private def accept(key: SelectionKey): Option[SocketChannel] = {
     val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
     val socketChannel = serverSocketChannel.accept()
+    val listenerName = ListenerName.normalised(endPoint.listener)
     try {
-      connectionQuotas.inc(endPoint.listenerName, 
socketChannel.socket.getInetAddress, blockedPercentMeter)
+      connectionQuotas.inc(listenerName, socketChannel.socket.getInetAddress, 
blockedPercentMeter)
       configureAcceptedSocketChannel(socketChannel)
       Some(socketChannel)
     } catch {
       case e: TooManyConnectionsException =>
         info(s"Rejected connection from ${e.ip}, address already has the 
configured maximum of ${e.count} connections.")
-        connectionQuotas.closeChannel(this, endPoint.listenerName, 
socketChannel)
+        connectionQuotas.closeChannel(this, listenerName, socketChannel)
         None
       case e: ConnectionThrottledException =>
         val ip = socketChannel.socket.getInetAddress
@@ -699,7 +700,7 @@ private[kafka] abstract class Acceptor(val socketServer: 
SocketServer,
         None
       case e: IOException =>
         error(s"Encountered an error while configuring the connection, closing 
it.", e)
-        connectionQuotas.closeChannel(this, endPoint.listenerName, 
socketChannel)
+        connectionQuotas.closeChannel(this, listenerName, socketChannel)
         None
     }
   }
@@ -741,7 +742,7 @@ private[kafka] abstract class Acceptor(val socketServer: 
SocketServer,
   def wakeup(): Unit = nioSelector.wakeup()
 
   def addProcessors(toCreate: Int): Unit = synchronized {
-    val listenerName = endPoint.listenerName
+    val listenerName = ListenerName.normalised(endPoint.listener)
     val securityProtocol = endPoint.securityProtocol
     val listenerProcessors = new ArrayBuffer[Processor]()
 
@@ -761,7 +762,7 @@ private[kafka] abstract class Acceptor(val socketServer: 
SocketServer,
                    listenerName: ListenerName,
                    securityProtocol: SecurityProtocol,
                    connectionDisconnectListeners: 
Seq[ConnectionDisconnectListener]): Processor = {
-    val name = 
s"data-plane-kafka-network-thread-$nodeId-${endPoint.listenerName}-${endPoint.securityProtocol}-$id"
+    val name = 
s"data-plane-kafka-network-thread-$nodeId-${endPoint.listener}-${endPoint.securityProtocol}-$id"
     new Processor(id,
                   time,
                   config.socketRequestMaxBytes,
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala 
b/core/src/main/scala/kafka/server/BrokerServer.scala
index a2c6d4b98f5..172c02ae924 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -274,7 +274,7 @@ class BrokerServer(
       clientQuotaMetadataManager = new 
ClientQuotaMetadataManager(quotaManagers, socketServer.connectionQuotas)
 
       val listenerInfo = 
ListenerInfo.create(Optional.of(config.interBrokerListenerName.value()),
-          config.effectiveAdvertisedBrokerListeners.map(_.toPublic()).asJava).
+          config.effectiveAdvertisedBrokerListeners.asJava).
             withWildcardHostnamesResolved().
             withEphemeralPortsCorrected(name => socketServer.boundPort(new 
ListenerName(name)))
 
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala 
b/core/src/main/scala/kafka/server/ControllerServer.scala
index 3fbc333f85f..e20deee1af9 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -174,7 +174,7 @@ class ControllerServer(
         sharedServer.socketFactory)
 
       val listenerInfo = ListenerInfo
-        
.create(config.effectiveAdvertisedControllerListeners.map(_.toPublic).asJava)
+        .create(config.effectiveAdvertisedControllerListeners.asJava)
         .withWildcardHostnamesResolved()
         .withEphemeralPortsCorrected(name => socketServer.boundPort(new 
ListenerName(name)))
       
socketServerFirstBoundPortFuture.complete(listenerInfo.firstListener().port())
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala 
b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 27b5c8e16d4..dc4a09e0d23 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -27,7 +27,7 @@ import kafka.raft.KafkaRaftManager
 import kafka.server.DynamicBrokerConfig._
 import kafka.utils.{CoreUtils, Logging}
 import org.apache.kafka.common.Reconfigurable
-import org.apache.kafka.network.EndPoint
+import org.apache.kafka.common.Endpoint
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, 
ConfigException, ConfigResource, SaslConfigs, SslConfigs}
 import org.apache.kafka.common.metadata.{ConfigRecord, MetadataRecordType}
@@ -961,9 +961,9 @@ class DynamicListenerConfig(server: KafkaBroker) extends 
BrokerReconfigurable wi
 
   def validateReconfiguration(newConfig: KafkaConfig): Unit = {
     val oldConfig = server.config
-    val newListeners = newConfig.listeners.map(_.listenerName).toSet
-    val oldAdvertisedListeners = 
oldConfig.effectiveAdvertisedBrokerListeners.map(_.listenerName).toSet
-    val oldListeners = oldConfig.listeners.map(_.listenerName).toSet
+    val newListeners = newConfig.listeners.map(l => 
ListenerName.normalised(l.listener)).toSet
+    val oldAdvertisedListeners = 
oldConfig.effectiveAdvertisedBrokerListeners.map(l => 
ListenerName.normalised(l.listener)).toSet
+    val oldListeners = oldConfig.listeners.map(l => 
ListenerName.normalised(l.listener)).toSet
     if (!oldAdvertisedListeners.subsetOf(newListeners))
       throw new ConfigException(s"Advertised listeners 
'$oldAdvertisedListeners' must be a subset of listeners '$newListeners'")
     if 
(!newListeners.subsetOf(newConfig.effectiveListenerSecurityProtocolMap.keySet))
@@ -988,8 +988,8 @@ class DynamicListenerConfig(server: KafkaBroker) extends 
BrokerReconfigurable wi
     val newListenerMap = listenersToMap(newListeners)
     val oldListeners = oldConfig.listeners
     val oldListenerMap = listenersToMap(oldListeners)
-    val listenersRemoved = oldListeners.filterNot(e => 
newListenerMap.contains(e.listenerName))
-    val listenersAdded = newListeners.filterNot(e => 
oldListenerMap.contains(e.listenerName))
+    val listenersRemoved = oldListeners.filterNot(e => 
newListenerMap.contains(ListenerName.normalised(e.listener)))
+    val listenersAdded = newListeners.filterNot(e => 
oldListenerMap.contains(ListenerName.normalised(e.listener)))
     if (listenersRemoved.nonEmpty || listenersAdded.nonEmpty) {
       LoginManager.closeAll() // Clear SASL login cache to force re-login
       if (listenersRemoved.nonEmpty) 
server.socketServer.removeListeners(listenersRemoved)
@@ -997,8 +997,8 @@ class DynamicListenerConfig(server: KafkaBroker) extends 
BrokerReconfigurable wi
     }
   }
 
-  private def listenersToMap(listeners: Seq[EndPoint]): Map[ListenerName, 
EndPoint] =
-    listeners.map(e => (e.listenerName, e)).toMap
+  private def listenersToMap(listeners: Seq[Endpoint]): Map[ListenerName, 
Endpoint] =
+    listeners.map(e => (ListenerName.normalised(e.listener), e)).toMap
 
 }
 
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala 
b/core/src/main/scala/kafka/server/KafkaConfig.scala
index b982e72e141..53c70f4737a 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit
 import java.util.Properties
 import kafka.utils.{CoreUtils, Logging}
 import kafka.utils.Implicits._
-import org.apache.kafka.common.Reconfigurable
+import org.apache.kafka.common.{Endpoint, KafkaException, Reconfigurable}
 import org.apache.kafka.common.config.{ConfigDef, ConfigException, 
ConfigResource, TopicConfig}
 import org.apache.kafka.common.config.ConfigDef.ConfigKey
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
@@ -34,7 +34,6 @@ import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.security.auth.KafkaPrincipalSerde
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.network.EndPoint
 import org.apache.kafka.coordinator.group.Group.GroupType
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
 import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
@@ -145,6 +144,14 @@ object KafkaConfig {
     }
     output
   }
+
+  private def parseListenerName(connectionString: String): String = {
+    val firstColon = connectionString.indexOf(':')
+    if (firstColon < 0) {
+      throw new KafkaException(s"Unable to parse a listener name from 
$connectionString")
+    }
+    connectionString.substring(0, firstColon).toUpperCase(util.Locale.ROOT)
+  }
 }
 
 /**
@@ -276,8 +283,8 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
   }
 
   val earlyStartListeners: Set[ListenerName] = {
-    val listenersSet = listeners.map(_.listenerName).toSet
-    val controllerListenersSet = controllerListeners.map(_.listenerName).toSet
+    val listenersSet = listeners.map(l => 
ListenerName.normalised(l.listener)).toSet
+    val controllerListenersSet = controllerListeners.map(l => 
ListenerName.normalised(l.listener)).toSet
     Option(getString(ServerConfigs.EARLY_START_LISTENERS_CONFIG)) match {
       case None => controllerListenersSet
       case Some(str) =>
@@ -459,7 +466,7 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
     }
   }
 
-  def listeners: Seq[EndPoint] =
+  def listeners: Seq[Endpoint] =
     
CoreUtils.listenerListToEndPoints(getString(SocketServerConfigs.LISTENERS_CONFIG),
 effectiveListenerSecurityProtocolMap)
 
   def controllerListenerNames: Seq[String] = {
@@ -471,23 +478,23 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
     }
   }
 
-  def controllerListeners: Seq[EndPoint] =
-    listeners.filter(l => 
controllerListenerNames.contains(l.listenerName.value()))
+  def controllerListeners: Seq[Endpoint] =
+    listeners.filter(l => controllerListenerNames.contains(l.listener))
 
   def saslMechanismControllerProtocol: String = 
getString(KRaftConfigs.SASL_MECHANISM_CONTROLLER_PROTOCOL_CONFIG)
 
-  def dataPlaneListeners: Seq[EndPoint] = {
+  def dataPlaneListeners: Seq[Endpoint] = {
     listeners.filterNot { listener =>
-      val name = listener.listenerName.value()
+      val name = listener.listener
         controllerListenerNames.contains(name)
     }
   }
 
-  def effectiveAdvertisedControllerListeners: Seq[EndPoint] = {
+  def effectiveAdvertisedControllerListeners: Seq[Endpoint] = {
     val advertisedListenersProp = 
getString(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG)
     val controllerAdvertisedListeners = if (advertisedListenersProp != null) {
       CoreUtils.listenerListToEndPoints(advertisedListenersProp, 
effectiveListenerSecurityProtocolMap, requireDistinctPorts=false)
-        .filter(l => controllerListenerNames.contains(l.listenerName.value()))
+        .filter(l => controllerListenerNames.contains(l.listener))
     } else {
       Seq.empty
     }
@@ -495,16 +502,16 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
 
     controllerListenerNames.flatMap { name =>
       controllerAdvertisedListeners
-        .find(endpoint => 
endpoint.listenerName.equals(ListenerName.normalised(name)))
+        .find(endpoint => 
ListenerName.normalised(endpoint.listener).equals(ListenerName.normalised(name)))
         .orElse(
           // If users don't define advertised.listeners, the advertised 
controller listeners inherit from listeners configuration
           // which match listener names in controller.listener.names.
           // Removing "0.0.0.0" host to avoid validation errors. This is to be 
compatible with the old behavior before 3.9.
           // The null or "" host does a reverse lookup in 
ListenerInfo#withWildcardHostnamesResolved.
           controllerListenersValue
-            .find(endpoint => 
endpoint.listenerName.equals(ListenerName.normalised(name)))
+            .find(endpoint => 
ListenerName.normalised(endpoint.listener).equals(ListenerName.normalised(name)))
             .map(endpoint => if (endpoint.host == "0.0.0.0") {
-              new EndPoint(null, endpoint.port, endpoint.listenerName, 
endpoint.securityProtocol)
+              new Endpoint(endpoint.listener, endpoint.securityProtocol, null, 
endpoint.port)
             } else {
               endpoint
             })
@@ -512,7 +519,7 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
     }
   }
 
-  def effectiveAdvertisedBrokerListeners: Seq[EndPoint] = {
+  def effectiveAdvertisedBrokerListeners: Seq[Endpoint] = {
     // Use advertised listeners if defined, fallback to listeners otherwise
     val advertisedListenersProp = 
getString(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG)
     val advertisedListeners = if (advertisedListenersProp != null) {
@@ -521,7 +528,7 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
       listeners
     }
     // Only expose broker listeners
-    advertisedListeners.filterNot(l => 
controllerListenerNames.contains(l.listenerName.value()))
+    advertisedListeners.filterNot(l => 
controllerListenerNames.contains(l.listener))
   }
 
   private def getInterBrokerListenerNameAndSecurityProtocol: (ListenerName, 
SecurityProtocol) = {
@@ -563,7 +570,7 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
       // check controller listener names (they won't appear in listeners when 
process.roles=broker)
       // as well as listeners for occurrences of SSL or SASL_*
       if (controllerListenerNames.exists(isSslOrSasl) ||
-        
Csv.parseCsvList(getString(SocketServerConfigs.LISTENERS_CONFIG)).asScala.exists(listenerValue
 => isSslOrSasl(EndPoint.parseListenerName(listenerValue)))) {
+        
Csv.parseCsvList(getString(SocketServerConfigs.LISTENERS_CONFIG)).asScala.exists(listenerValue
 => isSslOrSasl(KafkaConfig.parseListenerName(listenerValue)))) {
         mapValue // don't add default mappings since we found something that 
is SSL or SASL_*
       } else {
         // add the PLAINTEXT mappings for all controller listener names that 
are not explicitly PLAINTEXT
@@ -591,7 +598,7 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
     require(replicaFetchWaitMaxMs <= replicaLagTimeMaxMs, 
"replica.fetch.wait.max.ms should always be less than or equal to 
replica.lag.time.max.ms" +
       " to prevent frequent changes in ISR")
 
-    val advertisedBrokerListenerNames = 
effectiveAdvertisedBrokerListeners.map(_.listenerName).toSet
+    val advertisedBrokerListenerNames = 
effectiveAdvertisedBrokerListeners.map(l => 
ListenerName.normalised(l.listener)).toSet
 
     // validate KRaft-related configs
     val voterIds = QuorumConfig.parseVoterIds(quorumConfig.voters)
@@ -614,7 +621,7 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
         s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must contain at 
least one value appearing in the '${SocketServerConfigs.LISTENERS_CONFIG}' 
configuration when running the KRaft controller role")
     }
     def 
validateControllerListenerNamesMustAppearInListenersForKRaftController(): Unit 
= {
-      val listenerNameValues = listeners.map(_.listenerName.value).toSet
+      val listenerNameValues = listeners.map(_.listener).toSet
       require(controllerListenerNames.forall(cln => 
listenerNameValues.contains(cln)),
         s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must only contain 
values appearing in the '${SocketServerConfigs.LISTENERS_CONFIG}' configuration 
when running the KRaft controller role")
     }
@@ -681,7 +688,7 @@ class KafkaConfig private(doLog: Boolean, val props: 
util.Map[_, _])
       validateControllerListenerNamesMustAppearInListenersForKRaftController()
     }
 
-    val listenerNames = listeners.map(_.listenerName).toSet
+    val listenerNames = listeners.map(l => 
ListenerName.normalised(l.listener)).toSet
     if (processRoles.contains(ProcessRole.BrokerRole)) {
       validateAdvertisedBrokerListenersNonEmptyForBroker()
       require(advertisedBrokerListenerNames.contains(interBrokerListenerName),
diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala 
b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index c07538aadad..a47b9fd4d47 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -30,6 +30,7 @@ import 
org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing
 import org.apache.kafka.common.metrics.stats.{Meter, Percentile, Percentiles}
+import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.{ObjectSerializationCache, Writable}
 import org.apache.kafka.common.security.scram.internals.ScramMechanism
 import 
org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
@@ -88,7 +89,7 @@ class TestRaftServer(
     val endpoints = Endpoints.fromInetSocketAddresses(
       config.effectiveAdvertisedControllerListeners
         .map { endpoint =>
-          (endpoint.listenerName, 
InetSocketAddress.createUnresolved(endpoint.host, endpoint.port))
+          (ListenerName.normalised(endpoint.listener), 
InetSocketAddress.createUnresolved(endpoint.host, endpoint.port))
         }
         .toMap
         .asJava
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala 
b/core/src/main/scala/kafka/utils/CoreUtils.scala
index 98a06f80f46..d41c796374b 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -25,8 +25,8 @@ import com.typesafe.scalalogging.Logger
 import javax.management.ObjectName
 import scala.collection._
 import scala.collection.Seq
-import org.apache.kafka.network.EndPoint
 import org.apache.commons.validator.routines.InetAddressValidator
+import org.apache.kafka.common.Endpoint
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.security.auth.SecurityProtocol
 import org.apache.kafka.common.utils.Utils
@@ -122,22 +122,22 @@ object CoreUtils {
 
   def inWriteLock[T](lock: ReadWriteLock)(fun: => T): T = 
inLock[T](lock.writeLock)(fun)
 
-  def listenerListToEndPoints(listeners: String, securityProtocolMap: 
Map[ListenerName, SecurityProtocol]): Seq[EndPoint] = {
+  def listenerListToEndPoints(listeners: String, securityProtocolMap: 
Map[ListenerName, SecurityProtocol]): Seq[Endpoint] = {
     listenerListToEndPoints(listeners, securityProtocolMap, 
requireDistinctPorts = true)
   }
 
-  private def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners: 
String): Unit = {
+  private def checkDuplicateListenerPorts(endpoints: Seq[Endpoint], listeners: 
String): Unit = {
     val distinctPorts = endpoints.map(_.port).distinct
     require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener 
must have a different port, listeners: $listeners")
   }
 
-  def listenerListToEndPoints(listeners: String, securityProtocolMap: 
Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean): 
Seq[EndPoint] = {
+  def listenerListToEndPoints(listeners: String, securityProtocolMap: 
Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean): 
Seq[Endpoint] = {
     def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
       (inetAddressValidator.isValidInet4Address(first) && 
inetAddressValidator.isValidInet6Address(second)) ||
         (inetAddressValidator.isValidInet6Address(first) && 
inetAddressValidator.isValidInet4Address(second))
 
-    def validate(endPoints: Seq[EndPoint]): Unit = {
-      val distinctListenerNames = endPoints.map(_.listenerName).distinct
+    def validate(endPoints: Seq[Endpoint]): Unit = {
+      val distinctListenerNames = endPoints.map(_.listener).distinct
       require(distinctListenerNames.size == endPoints.size, s"Each listener 
must have a different name, listeners: $listeners")
 
       val (duplicatePorts, _) = endPoints.filter {
@@ -186,8 +186,7 @@ object CoreUtils {
     }
 
     val endPoints = try {
-      SocketServerConfigs.listenerListToEndPoints(listeners, 
securityProtocolMap.asJava).
-        asScala.map(EndPoint.fromPublic)
+      SocketServerConfigs.listenerListToEndPoints(listeners, 
securityProtocolMap.asJava).asScala
     } catch {
       case e: Exception =>
         throw new IllegalArgumentException(s"Error creating broker listeners 
from '$listeners': ${e.getMessage}", e)
diff --git 
a/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala 
b/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala
index 4b1b94336f7..7263fa82847 100644
--- a/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala
@@ -353,7 +353,7 @@ class SslAdminIntegrationTest extends 
SaslSslAdminIntegrationTest {
     val controllerListenerName = 
ListenerName.forSecurityProtocol(extraControllerSecurityProtocol)
     val config = controllerServers.map { s =>
       val listener = s.config.effectiveAdvertisedControllerListeners
-        .find(_.listenerName == controllerListenerName)
+        .find(_.listener == controllerListenerName.value)
         .getOrElse(throw new IllegalArgumentException(s"Could not find 
listener with name $controllerListenerName"))
       Utils.formatAddress(listener.host, 
s.socketServer.boundPort(controllerListenerName))
     }.mkString(",")
diff --git 
a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala 
b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index ac227893d9f..234518ea83a 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -113,7 +113,7 @@ class KRaftClusterTest {
       cluster.format()
       cluster.startup()
       val controller = 
cluster.controllers().values().iterator().asScala.filter(_.controller.isActive).next()
-      val port = 
controller.socketServer.boundPort(controller.config.controllerListeners.head.listenerName)
+      val port = 
controller.socketServer.boundPort(ListenerName.normalised(controller.config.controllerListeners.head.listener))
 
       // shutdown active controller
       controller.shutdown()
diff --git 
a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
 
b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
index bcde663eb52..55e5e3e7d92 100644
--- 
a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
+++ 
b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
@@ -132,7 +132,7 @@ abstract class 
MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT
     TestUtils.ensureConsistentKRaftMetadata(servers, controllerServer)
 
     servers.head.config.listeners.foreach { endPoint =>
-      val listenerName = endPoint.listenerName
+      val listenerName = ListenerName.normalised(endPoint.listener)
 
       val trustStoreFile =
         if (JaasTestUtils.usesSslTransportLayer(endPoint.securityProtocol)) 
Some(this.trustStoreFile)
@@ -155,7 +155,7 @@ abstract class 
MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT
       }
 
       if (JaasTestUtils.usesSaslAuthentication(endPoint.securityProtocol)) {
-        kafkaServerSaslMechanisms(endPoint.listenerName.value).foreach { 
mechanism =>
+        kafkaServerSaslMechanisms(endPoint.listener).foreach { mechanism =>
           addProducerConsumer(listenerName, mechanism, 
Some(kafkaClientSaslProperties(mechanism, dynamicJaasConfig = true)))
         }
       } else {
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala 
b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 1d793e726b8..ac1a98e7fc9 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -22,6 +22,7 @@ import com.yammer.metrics.core.{Gauge, Meter}
 import kafka.server._
 import kafka.utils.Implicits._
 import kafka.utils.TestUtils
+import org.apache.kafka.common.Endpoint
 import org.apache.kafka.common.memory.MemoryPool
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
 import org.apache.kafka.common.message.{ProduceRequestData, 
SaslAuthenticateRequestData, SaslHandshakeRequestData, VoteRequestData}
@@ -36,7 +37,6 @@ import 
org.apache.kafka.common.security.scram.internals.ScramMechanism
 import org.apache.kafka.common.utils._
 import org.apache.kafka.network.RequestConvertToJson
 import org.apache.kafka.network.SocketServerConfigs
-import org.apache.kafka.network.EndPoint
 import org.apache.kafka.security.CredentialProvider
 import org.apache.kafka.server.{ApiVersionManager, SimpleApiVersionManager}
 import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion}
@@ -91,10 +91,10 @@ class SocketServerTest {
 
   private val kafkaLogger = LogManager.getLogger("kafka")
   private var logLevelToRestore: Level = _
-  def endpoint: EndPoint = {
+  def endpoint: Endpoint = {
     KafkaConfig.fromProps(props, doLog = false).dataPlaneListeners.head
   }
-  def listener: String = endpoint.listenerName.value
+  def listener: String = endpoint.listener
   val uncaughtExceptions = new AtomicInteger(0)
 
   @BeforeEach
@@ -840,7 +840,7 @@ class SocketServerTest {
 
       // same as SocketServer.createAcceptor,
       // except the Acceptor overriding a method to inject the exception
-      override protected def createDataPlaneAcceptor(endPoint: EndPoint, 
isPrivilegedListener: Boolean, requestChannel: RequestChannel): 
DataPlaneAcceptor = {
+      override protected def createDataPlaneAcceptor(endPoint: Endpoint, 
isPrivilegedListener: Boolean, requestChannel: RequestChannel): 
DataPlaneAcceptor = {
 
         new DataPlaneAcceptor(this, endPoint, this.config, nodeId, 
connectionQuotas, time, false, requestChannel, serverMetrics, 
this.credentialProvider, new LogContext(), MemoryPool.NONE, 
this.apiVersionManager) {
           override protected def configureAcceptedSocketChannel(socketChannel: 
SocketChannel): Unit = {
@@ -1858,7 +1858,7 @@ class SocketServerTest {
       val failedFuture = new CompletableFuture[Void]()
       failedFuture.completeExceptionally(new RuntimeException("authorizer 
startup failed"))
       assertThrows(classOf[ExecutionException], () => {
-        newServer.enableRequestProcessing(Map(endpoint.toPublic -> 
failedFuture)).get()
+        newServer.enableRequestProcessing(Map(endpoint -> failedFuture)).get()
       })
     } finally {
       shutdownServerAndMetrics(newServer)
@@ -1891,7 +1891,7 @@ class SocketServerTest {
       val authorizerFuture = new CompletableFuture[Void]()
       val enableFuture = newServer.enableRequestProcessing(
         newServer.dataPlaneAcceptors.keys().asScala.
-          map(_.toPublic).map(k => k -> authorizerFuture).toMap)
+          map(k => k -> authorizerFuture).toMap)
       assertFalse(authorizerFuture.isDone)
       assertFalse(enableFuture.isDone)
       newServer.dataPlaneAcceptors.values().forEach(a => 
assertNull(a.serverChannel))
@@ -1992,7 +1992,7 @@ class SocketServerTest {
   }
 
   class TestableAcceptor(socketServer: SocketServer,
-                         endPoint: EndPoint,
+                         endPoint: Endpoint,
                          cfg: KafkaConfig,
                          nodeId: Int,
                          connectionQuotas: ConnectionQuotas,
@@ -2098,7 +2098,7 @@ class SocketServerTest {
     connectionDisconnectListeners = connectionDisconnectListeners
   ) {
 
-    override def createDataPlaneAcceptor(endPoint: EndPoint, 
isPrivilegedListener: Boolean, requestChannel: RequestChannel) : 
DataPlaneAcceptor = {
+    override def createDataPlaneAcceptor(endPoint: Endpoint, 
isPrivilegedListener: Boolean, requestChannel: RequestChannel) : 
DataPlaneAcceptor = {
       new TestableAcceptor(this, endPoint, this.config, 0, connectionQuotas, 
time, isPrivilegedListener, requestChannel, this.metrics, 
this.credentialProvider, new LogContext, MemoryPool.NONE, 
this.apiVersionManager, connectionQueueSize)
     }
 
diff --git a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala 
b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
index 3c816f635db..38187b11eb8 100644
--- a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
@@ -28,6 +28,7 @@ import kafka.utils.TestUtils
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.Uuid
 import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.network.SocketServerConfigs
 import org.apache.kafka.raft.{Endpoints, MetadataLogConfig, QuorumConfig}
@@ -88,7 +89,7 @@ class RaftManagerTest {
     val endpoints = Endpoints.fromInetSocketAddresses(
       config.effectiveAdvertisedControllerListeners
         .map { endpoint =>
-          (endpoint.listenerName, 
InetSocketAddress.createUnresolved(endpoint.host, endpoint.port))
+          (ListenerName.normalised(endpoint.listener), 
InetSocketAddress.createUnresolved(endpoint.host, endpoint.port))
         }
         .toMap
         .asJava
diff --git 
a/core/src/test/scala/unit/kafka/server/ControllerRegistrationManagerTest.scala 
b/core/src/test/scala/unit/kafka/server/ControllerRegistrationManagerTest.scala
index 68f775fb3e7..46ea20758e2 100644
--- 
a/core/src/test/scala/unit/kafka/server/ControllerRegistrationManagerTest.scala
+++ 
b/core/src/test/scala/unit/kafka/server/ControllerRegistrationManagerTest.scala
@@ -76,7 +76,7 @@ class ControllerRegistrationManagerTest {
       "controller-registration-manager-test-",
       createSupportedFeatures(MetadataVersion.IBP_3_7_IV0),
       RecordTestUtils.createTestControllerRegistration(1, 
false).incarnationId(),
-      
ListenerInfo.create(context.config.controllerListeners.map(_.toPublic).asJava),
+      ListenerInfo.create(context.config.controllerListeners.asJava),
       new ExponentialBackoff(1, 2, 100, 0.02))
   }
 
diff --git 
a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 17ad2200dcc..5f87b20d1f5 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -1036,7 +1036,7 @@ class DynamicBrokerConfigTest {
     props.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, 
"SASL_PLAINTEXT://localhost:8181")
     ctx.config.dynamicConfig.updateDefaultConfig(props)
     ctx.config.effectiveAdvertisedBrokerListeners.foreach(e =>
-      assertEquals(SecurityProtocol.PLAINTEXT.name, e.listenerName.value)
+      assertEquals(SecurityProtocol.PLAINTEXT.name, e.listener)
     )
     
assertFalse(ctx.currentDefaultLogConfig.get().originals().containsKey(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG))
   }
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala 
b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 517741cf2d8..55c88666c70 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -22,7 +22,7 @@ import java.util
 import java.util.{Arrays, Collections, Properties}
 import kafka.utils.TestUtils.assertBadConfigContainingMessage
 import kafka.utils.{CoreUtils, TestUtils}
-import org.apache.kafka.common.Node
+import org.apache.kafka.common.{Endpoint, Node}
 import org.apache.kafka.common.config.{ConfigException, SaslConfigs, 
SecurityConfig, SslConfigs, TopicConfig}
 import org.apache.kafka.common.metrics.Sensor
 import org.apache.kafka.common.network.ListenerName
@@ -35,7 +35,6 @@ import 
org.apache.kafka.coordinator.group.GroupCoordinatorConfig
 import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
 import org.apache.kafka.coordinator.transaction.{TransactionLogConfig, 
TransactionStateManagerConfig}
 import org.apache.kafka.network.SocketServerConfigs
-import org.apache.kafka.network.EndPoint
 import org.apache.kafka.raft.{MetadataLogConfig, QuorumConfig}
 import org.apache.kafka.server.config.{DelegationTokenManagerConfigs, 
KRaftConfigs, QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs, 
ServerTopicConfigSynonyms}
 import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
@@ -343,7 +342,7 @@ class KafkaConfigTest {
 
     val config = KafkaConfig.fromProps(props)
     assertEquals(
-      Seq(new EndPoint("lb1.example.com", 9000, 
ListenerName.normalised("CONTROLLER"), SecurityProtocol.PLAINTEXT)),
+      Seq(new Endpoint("CONTROLLER", SecurityProtocol.PLAINTEXT, 
"lb1.example.com", 9000)),
       config.effectiveAdvertisedControllerListeners
     )
   }
@@ -359,7 +358,7 @@ class KafkaConfigTest {
 
     val config = KafkaConfig.fromProps(props)
     assertEquals(
-      Seq(new EndPoint("localhost", 9093, 
ListenerName.normalised("CONTROLLER"), SecurityProtocol.PLAINTEXT)),
+      Seq(new Endpoint("CONTROLLER", SecurityProtocol.PLAINTEXT, "localhost", 
9093)),
       config.effectiveAdvertisedControllerListeners
     )
   }
@@ -377,8 +376,8 @@ class KafkaConfigTest {
     val config = KafkaConfig.fromProps(props)
     assertEquals(
       Seq(
-        new EndPoint("lb1.example.com", 9000, 
ListenerName.normalised("CONTROLLER"), SecurityProtocol.PLAINTEXT),
-        new EndPoint("localhost", 9094, 
ListenerName.normalised("CONTROLLER_NEW"), SecurityProtocol.PLAINTEXT)
+        new Endpoint("CONTROLLER", SecurityProtocol.PLAINTEXT, 
"lb1.example.com", 9000),
+        new Endpoint("CONTROLLER_NEW", SecurityProtocol.PLAINTEXT, 
"localhost", 9094)
       ),
       config.effectiveAdvertisedControllerListeners
     )
@@ -507,9 +506,9 @@ class KafkaConfigTest {
     props.setProperty(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG, 
"REPLICATION")
     val config = KafkaConfig.fromProps(props)
     val expectedListeners = Seq(
-      new EndPoint("localhost", 9091, new ListenerName("CLIENT"), 
SecurityProtocol.SSL),
-      new EndPoint("localhost", 9092, new ListenerName("REPLICATION"), 
SecurityProtocol.SSL),
-      new EndPoint("localhost", 9093, new ListenerName("INTERNAL"), 
SecurityProtocol.PLAINTEXT))
+      new Endpoint("CLIENT", SecurityProtocol.SSL, "localhost", 9091),
+      new Endpoint("REPLICATION", SecurityProtocol.SSL, "localhost", 9092),
+      new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9093))
     assertEquals(expectedListeners, config.listeners)
     assertEquals(expectedListeners, config.effectiveAdvertisedBrokerListeners)
     val expectedSecurityProtocolMap = Map(
@@ -536,14 +535,14 @@ class KafkaConfigTest {
     val config = KafkaConfig.fromProps(props)
 
     val expectedListeners = Seq(
-      new EndPoint("localhost", 9091, new ListenerName("EXTERNAL"), 
SecurityProtocol.SSL),
-      new EndPoint("localhost", 9093, new ListenerName("INTERNAL"), 
SecurityProtocol.PLAINTEXT)
+      new Endpoint("EXTERNAL", SecurityProtocol.SSL, "localhost", 9091),
+      new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9093)
     )
     assertEquals(expectedListeners, config.listeners)
 
     val expectedAdvertisedListeners = Seq(
-      new EndPoint("lb1.example.com", 9000, new ListenerName("EXTERNAL"), 
SecurityProtocol.SSL),
-      new EndPoint("host1", 9093, new ListenerName("INTERNAL"), 
SecurityProtocol.PLAINTEXT)
+      new Endpoint("EXTERNAL", SecurityProtocol.SSL, "lb1.example.com", 9000),
+      new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "host1", 9093)
     )
     assertEquals(expectedAdvertisedListeners, 
config.effectiveAdvertisedBrokerListeners)
 
@@ -593,8 +592,8 @@ class KafkaConfigTest {
     props.setProperty(SocketServerConfigs.LISTENERS_CONFIG, 
"plaintext://localhost:9091,SsL://localhost:9092")
     
props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG, 
"PLAINTEXT:PLAINTEXT,SSL:SSL,CONTROLLER:PLAINTEXT")
     val config = KafkaConfig.fromProps(props)
-    assertEquals(Some("SSL://localhost:9092"), 
config.listeners.find(_.listenerName.value == 
"SSL").map(JTestUtils.endpointToString))
-    assertEquals(Some("PLAINTEXT://localhost:9091"), 
config.listeners.find(_.listenerName.value == 
"PLAINTEXT").map(JTestUtils.endpointToString))
+    assertEquals(Some("SSL://localhost:9092"), 
config.listeners.find(_.listener == "SSL").map(JTestUtils.endpointToString))
+    assertEquals(Some("PLAINTEXT://localhost:9091"), 
config.listeners.find(_.listener == 
"PLAINTEXT").map(JTestUtils.endpointToString))
   }
 
   private def listenerListToEndPoints(listenerList: String,
diff --git 
a/core/src/test/scala/unit/kafka/server/RegistrationTestContext.scala 
b/core/src/test/scala/unit/kafka/server/RegistrationTestContext.scala
index 9bf4d4d7e00..dd5968055e0 100644
--- a/core/src/test/scala/unit/kafka/server/RegistrationTestContext.scala
+++ b/core/src/test/scala/unit/kafka/server/RegistrationTestContext.scala
@@ -66,7 +66,7 @@ class RegistrationTestContext(
   val controllerEpoch = new AtomicInteger(123)
   config.effectiveAdvertisedBrokerListeners.foreach { ep =>
     advertisedListeners.add(new Listener().setHost(ep.host).
-      setName(ep.listenerName.value()).
+      setName(ep.listener).
       setPort(ep.port.shortValue()).
       setSecurityProtocol(ep.securityProtocol.id))
   }
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala 
b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 8281509f609..f237ee3a339 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -181,7 +181,7 @@ object TestUtils extends Logging {
     listenerName: ListenerName
   ): String = {
     brokers.map { s =>
-      val listener = 
s.config.effectiveAdvertisedBrokerListeners.find(_.listenerName == 
listenerName).getOrElse(
+      val listener = 
s.config.effectiveAdvertisedBrokerListeners.find(_.listener == 
listenerName.value).getOrElse(
         sys.error(s"Could not find listener with name ${listenerName.value}"))
       formatAddress(listener.host, s.boundPort(listenerName))
     }.mkString(",")
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java 
b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
index 4f5e9dcca2d..56b2bdf375e 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
@@ -85,7 +85,7 @@ public class BrokerRegistration {
         public Builder setListeners(List<Endpoint> listeners) {
             Map<String, Endpoint> listenersMap = new HashMap<>();
             for (Endpoint endpoint : listeners) {
-                listenersMap.put(endpoint.listenerName().get(), endpoint);
+                listenersMap.put(endpoint.listener(), endpoint);
             }
             this.listeners = listenersMap;
             return this;
@@ -170,7 +170,7 @@ public class BrokerRegistration {
         this.incarnationId = incarnationId;
         Map<String, Endpoint> newListeners = new HashMap<>(listeners.size());
         for (Entry<String, Endpoint> entry : listeners.entrySet()) {
-            if (entry.getValue().listenerName().isEmpty()) {
+            if (entry.getValue().listener().isEmpty()) {
                 throw new IllegalArgumentException("Broker listeners must be 
named.");
             }
             newListeners.put(entry.getKey(), entry.getValue());
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/ListenerInfo.java 
b/metadata/src/main/java/org/apache/kafka/metadata/ListenerInfo.java
index 294e8b0cbd9..58fe7bc0324 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/ListenerInfo.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/ListenerInfo.java
@@ -188,14 +188,14 @@ public final class ListenerInfo {
     ) {
         LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>();
         for (Endpoint listener : rawListeners) {
-            String name = listener.listenerName().get();
+            String name = listener.listener();
             if (Optional.of(name).equals(firstListenerName)) {
                 listeners.put(name, listener);
                 break;
             }
         }
         for (Endpoint listener : rawListeners) {
-            String name = listener.listenerName().get();
+            String name = listener.listener();
             if (!Optional.of(name).equals(firstListenerName)) {
                 listeners.put(name, listener);
             }
@@ -236,11 +236,11 @@ public final class ListenerInfo {
             if (entry.getValue().host() == null || 
entry.getValue().host().trim().isEmpty()) {
                 String newHost = 
InetAddress.getLocalHost().getCanonicalHostName();
                 Endpoint prevEndpoint = entry.getValue();
-                newListeners.put(entry.getKey(), new 
Endpoint(prevEndpoint.listenerName().get(),
+                newListeners.put(entry.getKey(), new 
Endpoint(prevEndpoint.listener(),
                         prevEndpoint.securityProtocol(),
                         newHost,
                         prevEndpoint.port()));
-                log.info("{}: resolved wildcard host to {}", 
entry.getValue().listenerName().get(),
+                log.info("{}: resolved wildcard host to {}", 
entry.getValue().listener(),
                         newHost);
             } else {
                 newListeners.put(entry.getKey(), entry.getValue());
@@ -268,9 +268,9 @@ public final class ListenerInfo {
                 Endpoint prevEndpoint = entry.getValue();
                 int newPort = getBoundPortCallback.apply(entry.getKey());
                 checkPortIsSerializable(newPort);
-                log.info("{}: resolved ephemeral port to {}", 
entry.getValue().listenerName().get(),
+                log.info("{}: resolved ephemeral port to {}", 
entry.getValue().listener(),
                         newPort);
-                newListeners.put(entry.getKey(), new 
Endpoint(prevEndpoint.listenerName().get(),
+                newListeners.put(entry.getKey(), new 
Endpoint(prevEndpoint.listener(),
                         prevEndpoint.securityProtocol(),
                         prevEndpoint.host(),
                         newPort));
@@ -309,7 +309,7 @@ public final class ListenerInfo {
             checkHostIsSerializable(endpoint.host());
             collection.add(new ControllerRegistrationRequestData.Listener().
                 setHost(endpoint.host()).
-                setName(endpoint.listenerName().get()).
+                setName(endpoint.listener()).
                 setPort(endpoint.port()).
                 setSecurityProtocol(endpoint.securityProtocol().id));
         });
@@ -324,7 +324,7 @@ public final class ListenerInfo {
             checkHostIsSerializable(endpoint.host());
             collection.add(new RegisterControllerRecord.ControllerEndpoint().
                 setHost(endpoint.host()).
-                setName(endpoint.listenerName().get()).
+                setName(endpoint.listener()).
                 setPort(endpoint.port()).
                 setSecurityProtocol(endpoint.securityProtocol().id));
         });
@@ -339,7 +339,7 @@ public final class ListenerInfo {
             checkHostIsSerializable(endpoint.host());
             collection.add(new BrokerRegistrationRequestData.Listener().
                 setHost(endpoint.host()).
-                setName(endpoint.listenerName().get()).
+                setName(endpoint.listener()).
                 setPort(endpoint.port()).
                 setSecurityProtocol(endpoint.securityProtocol().id));
         });
@@ -354,7 +354,7 @@ public final class ListenerInfo {
             checkHostIsSerializable(endpoint.host());
             collection.add(new RegisterBrokerRecord.BrokerEndpoint().
                 setHost(endpoint.host()).
-                setName(endpoint.listenerName().get()).
+                setName(endpoint.listener()).
                 setPort(endpoint.port()).
                 setSecurityProtocol(endpoint.securityProtocol().id));
         });
diff --git 
a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
 
b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
index 474c593bb67..2d97683e14e 100644
--- 
a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
+++ 
b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
@@ -132,7 +132,7 @@ public class StandardAuthorizer implements 
ClusterMetadataAuthorizer, Monitorabl
         Map<Endpoint, CompletableFuture<Void>> result = new HashMap<>();
         for (Endpoint endpoint : serverInfo.endpoints()) {
             if (serverInfo.earlyStartListeners().contains(
-                    endpoint.listenerName().orElse(""))) {
+                    endpoint.listener())) {
                 result.put(endpoint, CompletableFuture.completedFuture(null));
             } else {
                 result.put(endpoint, initialLoadFuture);
diff --git 
a/metadata/src/test/java/org/apache/kafka/metadata/ListenerInfoTest.java 
b/metadata/src/test/java/org/apache/kafka/metadata/ListenerInfoTest.java
index effde563917..24f8ebfd292 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/ListenerInfoTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/ListenerInfoTest.java
@@ -27,6 +27,7 @@ import org.junit.jupiter.params.provider.ValueSource;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -93,16 +94,16 @@ public class ListenerInfoTest {
             endpoints.add(ALL.get((i + startIndex) % ALL.size()));
         }
         ListenerInfo listenerInfo = ListenerInfo.create(endpoints);
-        assertEquals(ALL.get(startIndex).listenerName().get(),
-            listenerInfo.firstListener().listenerName().get());
+        assertEquals(ALL.get(startIndex).listener(),
+            listenerInfo.firstListener().listener());
     }
 
     @ParameterizedTest
     @ValueSource(ints = {0, 1, 2, 3})
     public void testCreateWithExplicitFirstListener(int startIndex) {
-        ListenerInfo listenerInfo = 
ListenerInfo.create(ALL.get(startIndex).listenerName(), ALL);
-        assertEquals(ALL.get(startIndex).listenerName().get(),
-            listenerInfo.firstListener().listenerName().get());
+        ListenerInfo listenerInfo = 
ListenerInfo.create(Optional.of(ALL.get(startIndex).listener()), ALL);
+        assertEquals(ALL.get(startIndex).listener(),
+            listenerInfo.firstListener().listener());
     }
 
     @Test
diff --git 
a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
 
b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
index b383d568c0e..b2f9fa68b3a 100644
--- 
a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
+++ 
b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
@@ -125,8 +125,8 @@ public class StandardAuthorizerTest {
         public Collection<String> earlyStartListeners() {
             List<String> result = new ArrayList<>();
             for (Endpoint endpoint : endpoints) {
-                if (endpoint.listenerName().get().equals("CONTROLLER")) {
-                    result.add(endpoint.listenerName().get());
+                if (endpoint.listener().equals("CONTROLLER")) {
+                    result.add(endpoint.listener());
                 }
             }
             return result;
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/network/EndpointReadyFutures.java
 
b/server-common/src/main/java/org/apache/kafka/server/network/EndpointReadyFutures.java
index b81b9b191f9..b02352cca04 100644
--- 
a/server-common/src/main/java/org/apache/kafka/server/network/EndpointReadyFutures.java
+++ 
b/server-common/src/main/java/org/apache/kafka/server/network/EndpointReadyFutures.java
@@ -113,7 +113,7 @@ public class EndpointReadyFutures {
                 List<String> notInInfo = new ArrayList<>();
                 for (Endpoint endpoint : effectiveStartFutures.keySet()) {
                     if (!info.endpoints().contains(endpoint)) {
-                        
notInInfo.add(endpoint.listenerName().orElse("[none]"));
+                        notInInfo.add(endpoint.listener());
                     }
                 }
                 throw new RuntimeException("Found authorizer futures that 
weren't included " +
@@ -146,7 +146,7 @@ public class EndpointReadyFutures {
         final CompletableFuture<Void> future;
 
         EndpointReadyFuture(Endpoint endpoint, Collection<String> stageNames) {
-            this.endpointName = endpoint.listenerName().orElse("UNNAMED");
+            this.endpointName = endpoint.listener();
             this.incomplete = new TreeSet<>(stageNames);
             this.future = new CompletableFuture<>();
         }
diff --git 
a/server-common/src/test/java/org/apache/kafka/server/network/EndpointReadyFuturesTest.java
 
b/server-common/src/test/java/org/apache/kafka/server/network/EndpointReadyFuturesTest.java
index 649fa9cd64b..8ec3711ff8c 100644
--- 
a/server-common/src/test/java/org/apache/kafka/server/network/EndpointReadyFuturesTest.java
+++ 
b/server-common/src/test/java/org/apache/kafka/server/network/EndpointReadyFuturesTest.java
@@ -56,7 +56,7 @@ public final class EndpointReadyFuturesTest {
             Endpoint... endpoints
     ) {
         for (Endpoint endpoint : endpoints) {
-            String name = endpoint.listenerName().get();
+            String name = endpoint.listener();
             CompletableFuture<Void> future = 
readyFutures.futures().get(endpoint);
             assertNotNull(future, "Unable to find future for " + name);
             assertTrue(future.isDone(), "Future for " + name + " is not 
done.");
diff --git a/server/src/main/java/org/apache/kafka/network/EndPoint.java 
b/server/src/main/java/org/apache/kafka/network/EndPoint.java
deleted file mode 100644
index ab1a0e81d6b..00000000000
--- a/server/src/main/java/org/apache/kafka/network/EndPoint.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.network;
-
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.network.ListenerName;
-import org.apache.kafka.common.security.auth.SecurityProtocol;
-
-import java.util.Locale;
-
-public record EndPoint(
-        String host,
-        int port,
-        ListenerName listenerName,
-        SecurityProtocol securityProtocol
-) {
-    public static String parseListenerName(String connectionString) {
-        int firstColon = connectionString.indexOf(':');
-        if (firstColon < 0) {
-            throw new KafkaException("Unable to parse a listener name from " + 
connectionString);
-        }
-        return connectionString.substring(0, 
firstColon).toUpperCase(Locale.ROOT);
-    }
-
-    public static EndPoint fromPublic(org.apache.kafka.common.Endpoint 
endpoint) {
-        return new EndPoint(endpoint.host(), endpoint.port(),
-                new ListenerName(endpoint.listenerName().get()), 
endpoint.securityProtocol());
-    }
-
-    public org.apache.kafka.common.Endpoint toPublic() {
-        return new org.apache.kafka.common.Endpoint(listenerName.value(), 
securityProtocol, host, port);
-    }
-}
\ No newline at end of file
diff --git 
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java
 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java
index 041b24780c1..e2361749318 100644
--- 
a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java
+++ 
b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java
@@ -18,13 +18,13 @@ package org.apache.kafka.common.test;
 
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.common.Endpoint;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.network.ListenerName;
 import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.network.EndPoint;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -106,10 +106,10 @@ public class TestUtils {
     /**
      * Convert EndPoint to String
      */
-    public static String endpointToString(EndPoint endPoint) {
+    public static String endpointToString(Endpoint endPoint) {
         String host = endPoint.host();
         int port = endPoint.port();
-        ListenerName listenerName = endPoint.listenerName();
+        ListenerName listenerName = 
ListenerName.normalised(endPoint.listener());
 
         String hostport = (host == null) ? (":" + port) : 
Utils.formatAddress(host, port);
         return listenerName.value() + "://" + hostport;
diff --git 
a/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java 
b/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java
index a286524af4d..668f1c172fa 100644
--- a/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java
@@ -395,10 +395,10 @@ public class MetadataQuorumCommand {
         Map<String, Endpoint> listeners = new HashMap<>();
         SocketServerConfigs.listenerListToEndPoints(
             props.getOrDefault(SocketServerConfigs.LISTENERS_CONFIG, 
"").toString(),
-            __ -> SecurityProtocol.PLAINTEXT).forEach(e -> 
listeners.put(e.listenerName().get(), e));
+            __ -> SecurityProtocol.PLAINTEXT).forEach(e -> 
listeners.put(e.listener(), e));
         SocketServerConfigs.listenerListToEndPoints(
             
props.getOrDefault(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, 
"").toString(),
-            __ -> SecurityProtocol.PLAINTEXT).forEach(e -> 
listeners.put(e.listenerName().get(), e));
+            __ -> SecurityProtocol.PLAINTEXT).forEach(e -> 
listeners.put(e.listener(), e));
         if (!props.containsKey(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG)) 
{
             throw new 
TerseException(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG +
                 " was not found. Is this a valid controller configuration 
file?");
@@ -412,7 +412,7 @@ public class MetadataQuorumCommand {
                 throw new TerseException("Cannot find information about 
controller listener name: " +
                     listenerName);
             }
-            results.add(new RaftVoterEndpoint(endpoint.listenerName().get(),
+            results.add(new RaftVoterEndpoint(endpoint.listener(),
                     endpoint.host() == null ? "localhost" : endpoint.host(),
                     endpoint.port()));
         }
@@ -443,7 +443,7 @@ public class MetadataQuorumCommand {
         output.append(" and endpoints: ");
         String prefix = "";
         for (RaftVoterEndpoint endpoint : endpoints) {
-            output.append(prefix).append(endpoint.name()).append("://");
+            output.append(prefix).append(endpoint.listener()).append("://");
             if (endpoint.host().contains(":")) {
                 output.append("[");
             }

Reply via email to