This is an automated email from the ASF dual-hosted git repository.
frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 81881dee838 KAFKA-18760: Deprecate Optional<String> and return String from public Endpoint#listener (#19191)
81881dee838 is described below
commit 81881dee838f420176ee31a2352bf56811648f74
Author: PoAn Yang <[email protected]>
AuthorDate: Wed Apr 30 12:15:33 2025 +0800
KAFKA-18760: Deprecate Optional<String> and return String from public Endpoint#listener (#19191)
* Deprecate org.apache.kafka.common.Endpoint#listenerName.
* Add org.apache.kafka.common.Endpoint#listener to replace org.apache.kafka.common.Endpoint#listenerName.
* Replace org.apache.kafka.network.EndPoint with org.apache.kafka.common.Endpoint.
* Deprecate org.apache.kafka.clients.admin.RaftVoterEndpoint#name.
* Add org.apache.kafka.clients.admin.RaftVoterEndpoint#listener to replace org.apache.kafka.clients.admin.RaftVoterEndpoint#name (a migration sketch follows this list).
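For callers the rename is mechanical. A minimal migration sketch (the class name, listener name, and host/port values are illustrative, not part of this patch):

    import org.apache.kafka.clients.admin.RaftVoterEndpoint;
    import org.apache.kafka.common.Endpoint;
    import org.apache.kafka.common.security.auth.SecurityProtocol;

    public class ListenerMigrationSketch {
        @SuppressWarnings("removal")
        public static void main(String[] args) {
            Endpoint endpoint = new Endpoint("CONTROLLER", SecurityProtocol.PLAINTEXT, "localhost", 9093);
            // Deprecated since 4.1, removal planned for 5.0: returns Optional<String>.
            String viaOptional = endpoint.listenerName().get();
            // Replacement: returns the listener name as a plain String.
            String viaString = endpoint.listener();
            // Same rename on the admin side: listener() replaces name().
            RaftVoterEndpoint voter = new RaftVoterEndpoint("CONTROLLER", "localhost", 9093);
            System.out.println(viaOptional + " " + viaString + " " + voter.listener());
        }
    }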
Reviewers: Chia-Ping Tsai <[email protected]>, TaiJuWu <[email protected]>, Jhen-Yung Hsu <[email protected]>, TengYao Chi <[email protected]>, Ken Huang <[email protected]>, Bagda Parth, Kuan-Po Tseng <[email protected]>
---------
Signed-off-by: PoAn Yang <[email protected]>
---
.../kafka/clients/admin/KafkaAdminClient.java | 2 +-
.../kafka/clients/admin/RaftVoterEndpoint.java | 27 ++++++---
.../java/org/apache/kafka/common/Endpoint.java | 25 +++++---
.../main/scala/kafka/network/SocketServer.scala | 67 +++++++++++-----------
.../src/main/scala/kafka/server/BrokerServer.scala | 2 +-
.../main/scala/kafka/server/ControllerServer.scala | 2 +-
.../scala/kafka/server/DynamicBrokerConfig.scala | 16 +++---
core/src/main/scala/kafka/server/KafkaConfig.scala | 47 ++++++++-------
.../main/scala/kafka/tools/TestRaftServer.scala | 3 +-
core/src/main/scala/kafka/utils/CoreUtils.scala | 15 +++--
.../kafka/api/SslAdminIntegrationTest.scala | 2 +-
.../kafka/server/KRaftClusterTest.scala | 2 +-
...ListenersWithSameSecurityProtocolBaseTest.scala | 4 +-
.../unit/kafka/network/SocketServerTest.scala | 16 +++---
.../scala/unit/kafka/raft/RaftManagerTest.scala | 3 +-
.../server/ControllerRegistrationManagerTest.scala | 2 +-
.../kafka/server/DynamicBrokerConfigTest.scala | 2 +-
.../scala/unit/kafka/server/KafkaConfigTest.scala | 29 +++++-----
.../kafka/server/RegistrationTestContext.scala | 2 +-
.../test/scala/unit/kafka/utils/TestUtils.scala | 2 +-
.../apache/kafka/metadata/BrokerRegistration.java | 4 +-
.../org/apache/kafka/metadata/ListenerInfo.java | 20 +++----
.../metadata/authorizer/StandardAuthorizer.java | 2 +-
.../apache/kafka/metadata/ListenerInfoTest.java | 11 ++--
.../authorizer/StandardAuthorizerTest.java | 4 +-
.../kafka/server/network/EndpointReadyFutures.java | 4 +-
.../server/network/EndpointReadyFuturesTest.java | 2 +-
.../java/org/apache/kafka/network/EndPoint.java | 48 ----------------
.../org/apache/kafka/common/test/TestUtils.java | 6 +-
.../apache/kafka/tools/MetadataQuorumCommand.java | 8 +--
30 files changed, 179 insertions(+), 200 deletions(-)
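Note on a pattern that recurs throughout the Scala changes below: Endpoint now exposes the listener as a plain String, so call sites that still need a ListenerName bridge via ListenerName.normalised. A minimal sketch (class name and endpoint values are illustrative):

    import org.apache.kafka.common.Endpoint;
    import org.apache.kafka.common.network.ListenerName;
    import org.apache.kafka.common.security.auth.SecurityProtocol;

    public class NormalisedBridgeSketch {
        public static void main(String[] args) {
            Endpoint endpoint = new Endpoint("CONTROLLER", SecurityProtocol.PLAINTEXT, "localhost", 9093);
            // Wrap the String-valued accessor back into a ListenerName.
            ListenerName listenerName = ListenerName.normalised(endpoint.listener());
            // Listener-scoped config prefix, e.g. "listener.name.controller."
            System.out.println(listenerName.configPrefix());
        }
    }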
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index d4c9ec9beeb..5cb5cc292ea 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -4948,7 +4948,7 @@ public class KafkaAdminClient extends AdminClient {
new AddRaftVoterRequestData.ListenerCollection();
endpoints.forEach(endpoint ->
listeners.add(new AddRaftVoterRequestData.Listener().
- setName(endpoint.name()).
+ setName(endpoint.listener()).
setHost(endpoint.host()).
setPort(endpoint.port())));
return new AddRaftVoterRequest.Builder(
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/RaftVoterEndpoint.java b/clients/src/main/java/org/apache/kafka/clients/admin/RaftVoterEndpoint.java
index 984ac999393..ba5b39284eb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/RaftVoterEndpoint.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/RaftVoterEndpoint.java
@@ -26,7 +26,7 @@ import java.util.Objects;
*/
@InterfaceStability.Stable
public class RaftVoterEndpoint {
- private final String name;
+ private final String listener;
private final String host;
private final int port;
@@ -49,22 +49,33 @@ public class RaftVoterEndpoint {
/**
* Create an endpoint for a metadata quorum voter.
*
- @param name The human-readable name for this endpoint. For example, CONTROLLER.
+ @param listener The human-readable name for this endpoint. For example, CONTROLLER.
* @param host The DNS hostname for this endpoint.
* @param port The network port for this endpoint.
*/
public RaftVoterEndpoint(
- String name,
+ String listener,
String host,
int port
) {
- this.name = requireNonNullAllCapsNonEmpty(name);
+ this.listener = requireNonNullAllCapsNonEmpty(listener);
this.host = Objects.requireNonNull(host);
this.port = port;
}
+ /**
+ * The listener name for this endpoint.
+ */
+ public String listener() {
+ return listener;
+ }
+
+ /**
+ * @deprecated Since 4.1. Use {@link #listener()} instead. This function will be removed in 5.0.
+ */
+ @Deprecated(since = "4.1", forRemoval = true)
public String name() {
- return name;
+ return listener;
}
public String host() {
@@ -79,20 +90,20 @@ public class RaftVoterEndpoint {
public boolean equals(Object o) {
if (o == null || (!o.getClass().equals(getClass()))) return false;
RaftVoterEndpoint other = (RaftVoterEndpoint) o;
- return name.equals(other.name) &&
+ return listener.equals(other.listener) &&
host.equals(other.host) &&
port == other.port;
}
@Override
public int hashCode() {
- return Objects.hash(name, host, port);
+ return Objects.hash(listener, host, port);
}
@Override
public String toString() {
// enclose IPv6 hosts in square brackets for readability
String hostString = host.contains(":") ? "[" + host + "]" : host;
- return name + "://" + hostString + ":" + port;
+ return listener + "://" + hostString + ":" + port;
}
}
diff --git a/clients/src/main/java/org/apache/kafka/common/Endpoint.java b/clients/src/main/java/org/apache/kafka/common/Endpoint.java
index 8d5e8c6d16a..baa1045929f 100644
--- a/clients/src/main/java/org/apache/kafka/common/Endpoint.java
+++ b/clients/src/main/java/org/apache/kafka/common/Endpoint.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.common;
-import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import java.util.Objects;
@@ -26,27 +25,35 @@ import java.util.Optional;
* Represents a broker endpoint.
*/
[email protected]
public class Endpoint {
- private final String listenerName;
+ private final String listener;
private final SecurityProtocol securityProtocol;
private final String host;
private final int port;
- public Endpoint(String listenerName, SecurityProtocol securityProtocol, String host, int port) {
- this.listenerName = listenerName;
+ public Endpoint(String listener, SecurityProtocol securityProtocol, String host, int port) {
+ this.listener = listener;
this.securityProtocol = securityProtocol;
this.host = host;
this.port = port;
}
+ /**
+ * Returns the listener name of this endpoint.
+ */
+ public String listener() {
+ return listener;
+ }
+
/**
* Returns the listener name of this endpoint. This is non-empty for endpoints provided
* to broker plugins, but may be empty when used in clients.
+ * @deprecated Since 4.1. Use {@link #listener()} instead. This function will be removed in 5.0.
*/
+ @Deprecated(since = "4.1", forRemoval = true)
public Optional<String> listenerName() {
- return Optional.ofNullable(listenerName);
+ return Optional.ofNullable(listener);
}
/**
@@ -80,7 +87,7 @@ public class Endpoint {
}
Endpoint that = (Endpoint) o;
- return Objects.equals(this.listenerName, that.listenerName) &&
+ return Objects.equals(this.listener, that.listener) &&
Objects.equals(this.securityProtocol, that.securityProtocol) &&
Objects.equals(this.host, that.host) &&
this.port == that.port;
@@ -89,13 +96,13 @@ public class Endpoint {
@Override
public int hashCode() {
- return Objects.hash(listenerName, securityProtocol, host, port);
+ return Objects.hash(listener, securityProtocol, host, port);
}
@Override
public String toString() {
return "Endpoint(" +
- "listenerName='" + listenerName + '\'' +
+ "listenerName='" + listener + '\'' +
", securityProtocol=" + securityProtocol +
", host='" + host + '\'' +
", port=" + port +
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index 4163b563f01..884c00002c5 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -29,7 +29,6 @@ import kafka.network.Processor._
import kafka.network.RequestChannel.{CloseConnectionResponse,
EndThrottlingResponse, NoOpResponse, SendResponse, StartThrottlingResponse}
import kafka.network.SocketServer._
import kafka.server.{BrokerReconfigurable, KafkaConfig}
-import org.apache.kafka.network.EndPoint
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import kafka.utils._
import org.apache.kafka.common.config.ConfigException
@@ -96,7 +95,7 @@ class SocketServer(
memoryPoolSensor.add(new Meter(TimeUnit.MILLISECONDS,
memoryPoolDepletedPercentMetricName, memoryPoolDepletedTimeMetricName))
private val memoryPool = if (config.queuedMaxBytes > 0) new
SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false,
memoryPoolSensor) else MemoryPool.NONE
// data-plane
- private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, DataPlaneAcceptor]()
+ private[network] val dataPlaneAcceptors = new ConcurrentHashMap[Endpoint, DataPlaneAcceptor]()
val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests, time,
apiVersionManager.newRequestMetrics)
private[this] val nextProcessorId: AtomicInteger = new AtomicInteger(0)
@@ -161,8 +160,8 @@ class SocketServer(
* Therefore, we do not know that any particular request processor will be
running by the end of
* this function -- just that it might be running.
*
- * @param authorizerFutures Future per [[EndPoint]] used to wait before starting the
- * processor corresponding to the [[EndPoint]]. Any endpoint
+ * @param authorizerFutures Future per [[Endpoint]] used to wait before starting the
+ * processor corresponding to the [[Endpoint]]. Any endpoint
* that does not appear in this map will be
started once all
* authorizerFutures are complete.
*
@@ -181,7 +180,7 @@ class SocketServer(
// Because of ephemeral ports, we need to match acceptors to futures by
looking at
// the listener name, rather than the endpoint object.
val authorizerFuture = authorizerFutures.find {
- case (endpoint, _) => acceptor.endPoint.listenerName.value().equals(endpoint.listenerName().get())
+ case (endpoint, _) => acceptor.endPoint.listener.equals(endpoint.listener())
} match {
case None => allAuthorizerFuturesComplete
case Some((_, future)) => future
@@ -210,23 +209,24 @@ class SocketServer(
enableFuture
}
- private def createDataPlaneAcceptorAndProcessors(endpoint: EndPoint): Unit = synchronized {
+ private def createDataPlaneAcceptorAndProcessors(endpoint: Endpoint): Unit = synchronized {
if (stopped) {
throw new RuntimeException("Can't create new data plane acceptor and
processors: SocketServer is stopped.")
}
- val parsedConfigs = config.valuesFromThisConfigWithPrefixOverride(endpoint.listenerName.configPrefix)
- connectionQuotas.addListener(config, endpoint.listenerName)
- val isPrivilegedListener = config.interBrokerListenerName == endpoint.listenerName
+ val listenerName = ListenerName.normalised(endpoint.listener)
+ val parsedConfigs = config.valuesFromThisConfigWithPrefixOverride(listenerName.configPrefix)
+ connectionQuotas.addListener(config, listenerName)
+ val isPrivilegedListener = config.interBrokerListenerName == listenerName
val dataPlaneAcceptor = createDataPlaneAcceptor(endpoint,
isPrivilegedListener, dataPlaneRequestChannel)
config.addReconfigurable(dataPlaneAcceptor)
dataPlaneAcceptor.configure(parsedConfigs)
dataPlaneAcceptors.put(endpoint, dataPlaneAcceptor)
- info(s"Created data-plane acceptor and processors for endpoint : ${endpoint.listenerName}")
+ info(s"Created data-plane acceptor and processors for endpoint : ${listenerName}")
}
- private def endpoints = config.listeners.map(l => l.listenerName -> l).toMap
+ private def endpoints = config.listeners.map(l => ListenerName.normalised(l.listener) -> l).toMap
- protected def createDataPlaneAcceptor(endPoint: EndPoint, isPrivilegedListener: Boolean, requestChannel: RequestChannel): DataPlaneAcceptor = {
+ protected def createDataPlaneAcceptor(endPoint: Endpoint, isPrivilegedListener: Boolean, requestChannel: RequestChannel): DataPlaneAcceptor = {
new DataPlaneAcceptor(this, endPoint, config, nodeId, connectionQuotas,
time, isPrivilegedListener, requestChannel, metrics, credentialProvider,
logContext, memoryPool, apiVersionManager)
}
@@ -277,7 +277,7 @@ class SocketServer(
/**
* This method is called to dynamically add listeners.
*/
- def addListeners(listenersAdded: Seq[EndPoint]): Unit = synchronized {
+ def addListeners(listenersAdded: Seq[Endpoint]): Unit = synchronized {
if (stopped) {
throw new RuntimeException("can't add new listeners: SocketServer is
stopped.")
}
@@ -297,10 +297,10 @@ class SocketServer(
}
}
- def removeListeners(listenersRemoved: Seq[EndPoint]): Unit = synchronized {
+ def removeListeners(listenersRemoved: Seq[Endpoint]): Unit = synchronized {
info(s"Removing data-plane listeners for endpoints $listenersRemoved")
listenersRemoved.foreach { endpoint =>
- connectionQuotas.removeListener(config, endpoint.listenerName)
+ connectionQuotas.removeListener(config, ListenerName.normalised(endpoint.listener))
dataPlaneAcceptors.asScala.remove(endpoint).foreach { acceptor =>
acceptor.beginShutdown()
acceptor.close()
@@ -345,7 +345,7 @@ class SocketServer(
// For test usage
def dataPlaneAcceptor(listenerName: String): Option[DataPlaneAcceptor] = {
dataPlaneAcceptors.asScala.foreach { case (endPoint, acceptor) =>
- if (endPoint.listenerName.value() == listenerName)
+ if (endPoint.listener == listenerName)
return Some(acceptor)
}
None
@@ -374,7 +374,7 @@ object DataPlaneAcceptor {
}
class DataPlaneAcceptor(socketServer: SocketServer,
- endPoint: EndPoint,
+ endPoint: Endpoint,
config: KafkaConfig,
nodeId: Int,
connectionQuotas: ConnectionQuotas,
@@ -404,7 +404,7 @@ class DataPlaneAcceptor(socketServer: SocketServer,
* Returns the listener name associated with this reconfigurable.
Listener-specific
* configs corresponding to this listener name are provided for
reconfiguration.
*/
- override def listenerName(): ListenerName = endPoint.listenerName
+ override def listenerName(): ListenerName = ListenerName.normalised(endPoint.listener)
/**
* Returns the names of configs that may be reconfigured.
@@ -451,7 +451,7 @@ class DataPlaneAcceptor(socketServer: SocketServer,
val newNumNetworkThreads =
configs.get(SocketServerConfigs.NUM_NETWORK_THREADS_CONFIG).asInstanceOf[Int]
if (newNumNetworkThreads != processors.length) {
- info(s"Resizing network thread pool size for ${endPoint.listenerName} listener from ${processors.length} to $newNumNetworkThreads")
+ info(s"Resizing network thread pool size for ${endPoint.listener} listener from ${processors.length} to $newNumNetworkThreads")
if (newNumNetworkThreads > processors.length) {
addProcessors(newNumNetworkThreads - processors.length)
} else if (newNumNetworkThreads < processors.length) {
@@ -472,7 +472,7 @@ class DataPlaneAcceptor(socketServer: SocketServer,
* Thread that accepts and configures new connections. There is one of these
per endpoint.
*/
private[kafka] abstract class Acceptor(val socketServer: SocketServer,
- val endPoint: EndPoint,
+ val endPoint: Endpoint,
var config: KafkaConfig,
nodeId: Int,
val connectionQuotas: ConnectionQuotas,
@@ -515,7 +515,7 @@ private[kafka] abstract class Acceptor(val socketServer:
SocketServer,
private val backwardCompatibilityMetricGroup = new
KafkaMetricsGroup("kafka.network", "Acceptor")
private val blockedPercentMeterMetricName =
backwardCompatibilityMetricGroup.metricName(
"AcceptorBlockedPercent",
- Map(ListenerMetricTag -> endPoint.listenerName.value).asJava)
+ Map(ListenerMetricTag -> endPoint.listener).asJava)
private val blockedPercentMeter =
metricsGroup.newMeter(blockedPercentMeterMetricName,"blocked time",
TimeUnit.NANOSECONDS)
private var currentProcessorIndex = 0
private[network] val throttledSockets = new
mutable.PriorityQueue[DelayedCloseSocket]()
@@ -523,7 +523,7 @@ private[kafka] abstract class Acceptor(val socketServer:
SocketServer,
private[network] val startedFuture = new CompletableFuture[Void]()
val thread: KafkaThread = KafkaThread.nonDaemon(
- s"data-plane-kafka-socket-acceptor-${endPoint.listenerName}-${endPoint.securityProtocol}-${endPoint.port}",
+ s"data-plane-kafka-socket-acceptor-${endPoint.listener}-${endPoint.securityProtocol}-${endPoint.port}",
this)
def start(): Unit = synchronized {
@@ -535,19 +535,19 @@ private[kafka] abstract class Acceptor(val socketServer:
SocketServer,
serverChannel = openServerSocket(endPoint.host, endPoint.port,
listenBacklogSize)
debug(s"Opened endpoint ${endPoint.host}:${endPoint.port}")
}
- debug(s"Starting processors for listener ${endPoint.listenerName}")
+ debug(s"Starting processors for listener ${endPoint.listener}")
processors.foreach(_.start())
- debug(s"Starting acceptor thread for listener ${endPoint.listenerName}")
+ debug(s"Starting acceptor thread for listener ${endPoint.listener}")
thread.start()
startedFuture.complete(null)
started.set(true)
} catch {
case e: ClosedChannelException =>
- debug(s"Refusing to start acceptor for ${endPoint.listenerName} since the acceptor has already been shut down.")
+ debug(s"Refusing to start acceptor for ${endPoint.listener} since the acceptor has already been shut down.")
startedFuture.completeExceptionally(e)
case t: Throwable =>
- error(s"Unable to start acceptor for ${endPoint.listenerName}", t)
- startedFuture.completeExceptionally(new RuntimeException(s"Unable to start acceptor for ${endPoint.listenerName}", t))
+ error(s"Unable to start acceptor for ${endPoint.listener}", t)
+ startedFuture.completeExceptionally(new RuntimeException(s"Unable to start acceptor for ${endPoint.listener}", t))
}
}
@@ -628,7 +628,7 @@ private[kafka] abstract class Acceptor(val socketServer:
SocketServer,
new InetSocketAddress(host, port)
}
val serverChannel = socketServer.socketFactory.openServerSocket(
- endPoint.listenerName.value(),
+ endPoint.listener,
socketAddress,
listenBacklogSize,
recvBufferSize)
@@ -682,14 +682,15 @@ private[kafka] abstract class Acceptor(val socketServer:
SocketServer,
private def accept(key: SelectionKey): Option[SocketChannel] = {
val serverSocketChannel = key.channel().asInstanceOf[ServerSocketChannel]
val socketChannel = serverSocketChannel.accept()
+ val listenerName = ListenerName.normalised(endPoint.listener)
try {
- connectionQuotas.inc(endPoint.listenerName, socketChannel.socket.getInetAddress, blockedPercentMeter)
+ connectionQuotas.inc(listenerName, socketChannel.socket.getInetAddress, blockedPercentMeter)
configureAcceptedSocketChannel(socketChannel)
Some(socketChannel)
} catch {
case e: TooManyConnectionsException =>
info(s"Rejected connection from ${e.ip}, address already has the
configured maximum of ${e.count} connections.")
- connectionQuotas.closeChannel(this, endPoint.listenerName, socketChannel)
+ connectionQuotas.closeChannel(this, listenerName, socketChannel)
None
case e: ConnectionThrottledException =>
val ip = socketChannel.socket.getInetAddress
@@ -699,7 +700,7 @@ private[kafka] abstract class Acceptor(val socketServer:
SocketServer,
None
case e: IOException =>
error(s"Encountered an error while configuring the connection, closing
it.", e)
- connectionQuotas.closeChannel(this, endPoint.listenerName, socketChannel)
+ connectionQuotas.closeChannel(this, listenerName, socketChannel)
None
}
}
@@ -741,7 +742,7 @@ private[kafka] abstract class Acceptor(val socketServer:
SocketServer,
def wakeup(): Unit = nioSelector.wakeup()
def addProcessors(toCreate: Int): Unit = synchronized {
- val listenerName = endPoint.listenerName
+ val listenerName = ListenerName.normalised(endPoint.listener)
val securityProtocol = endPoint.securityProtocol
val listenerProcessors = new ArrayBuffer[Processor]()
@@ -761,7 +762,7 @@ private[kafka] abstract class Acceptor(val socketServer:
SocketServer,
listenerName: ListenerName,
securityProtocol: SecurityProtocol,
connectionDisconnectListeners:
Seq[ConnectionDisconnectListener]): Processor = {
- val name = s"data-plane-kafka-network-thread-$nodeId-${endPoint.listenerName}-${endPoint.securityProtocol}-$id"
+ val name = s"data-plane-kafka-network-thread-$nodeId-${endPoint.listener}-${endPoint.securityProtocol}-$id"
new Processor(id,
time,
config.socketRequestMaxBytes,
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index a2c6d4b98f5..172c02ae924 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -274,7 +274,7 @@ class BrokerServer(
clientQuotaMetadataManager = new
ClientQuotaMetadataManager(quotaManagers, socketServer.connectionQuotas)
val listenerInfo =
ListenerInfo.create(Optional.of(config.interBrokerListenerName.value()),
- config.effectiveAdvertisedBrokerListeners.map(_.toPublic()).asJava).
+ config.effectiveAdvertisedBrokerListeners.asJava).
withWildcardHostnamesResolved().
withEphemeralPortsCorrected(name => socketServer.boundPort(new
ListenerName(name)))
diff --git a/core/src/main/scala/kafka/server/ControllerServer.scala b/core/src/main/scala/kafka/server/ControllerServer.scala
index 3fbc333f85f..e20deee1af9 100644
--- a/core/src/main/scala/kafka/server/ControllerServer.scala
+++ b/core/src/main/scala/kafka/server/ControllerServer.scala
@@ -174,7 +174,7 @@ class ControllerServer(
sharedServer.socketFactory)
val listenerInfo = ListenerInfo
- .create(config.effectiveAdvertisedControllerListeners.map(_.toPublic).asJava)
+ .create(config.effectiveAdvertisedControllerListeners.asJava)
.withWildcardHostnamesResolved()
.withEphemeralPortsCorrected(name => socketServer.boundPort(new
ListenerName(name)))
socketServerFirstBoundPortFuture.complete(listenerInfo.firstListener().port())
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 27b5c8e16d4..dc4a09e0d23 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -27,7 +27,7 @@ import kafka.raft.KafkaRaftManager
import kafka.server.DynamicBrokerConfig._
import kafka.utils.{CoreUtils, Logging}
import org.apache.kafka.common.Reconfigurable
-import org.apache.kafka.network.EndPoint
+import org.apache.kafka.common.Endpoint
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
import org.apache.kafka.common.config.{AbstractConfig, ConfigDef,
ConfigException, ConfigResource, SaslConfigs, SslConfigs}
import org.apache.kafka.common.metadata.{ConfigRecord, MetadataRecordType}
@@ -961,9 +961,9 @@ class DynamicListenerConfig(server: KafkaBroker) extends
BrokerReconfigurable wi
def validateReconfiguration(newConfig: KafkaConfig): Unit = {
val oldConfig = server.config
- val newListeners = newConfig.listeners.map(_.listenerName).toSet
- val oldAdvertisedListeners = oldConfig.effectiveAdvertisedBrokerListeners.map(_.listenerName).toSet
- val oldListeners = oldConfig.listeners.map(_.listenerName).toSet
+ val newListeners = newConfig.listeners.map(l => ListenerName.normalised(l.listener)).toSet
+ val oldAdvertisedListeners = oldConfig.effectiveAdvertisedBrokerListeners.map(l => ListenerName.normalised(l.listener)).toSet
+ val oldListeners = oldConfig.listeners.map(l => ListenerName.normalised(l.listener)).toSet
if (!oldAdvertisedListeners.subsetOf(newListeners))
throw new ConfigException(s"Advertised listeners
'$oldAdvertisedListeners' must be a subset of listeners '$newListeners'")
if
(!newListeners.subsetOf(newConfig.effectiveListenerSecurityProtocolMap.keySet))
@@ -988,8 +988,8 @@ class DynamicListenerConfig(server: KafkaBroker) extends
BrokerReconfigurable wi
val newListenerMap = listenersToMap(newListeners)
val oldListeners = oldConfig.listeners
val oldListenerMap = listenersToMap(oldListeners)
- val listenersRemoved = oldListeners.filterNot(e => newListenerMap.contains(e.listenerName))
- val listenersAdded = newListeners.filterNot(e => oldListenerMap.contains(e.listenerName))
+ val listenersRemoved = oldListeners.filterNot(e => newListenerMap.contains(ListenerName.normalised(e.listener)))
+ val listenersAdded = newListeners.filterNot(e => oldListenerMap.contains(ListenerName.normalised(e.listener)))
if (listenersRemoved.nonEmpty || listenersAdded.nonEmpty) {
LoginManager.closeAll() // Clear SASL login cache to force re-login
if (listenersRemoved.nonEmpty)
server.socketServer.removeListeners(listenersRemoved)
@@ -997,8 +997,8 @@ class DynamicListenerConfig(server: KafkaBroker) extends
BrokerReconfigurable wi
}
}
- private def listenersToMap(listeners: Seq[EndPoint]): Map[ListenerName, EndPoint] =
- listeners.map(e => (e.listenerName, e)).toMap
+ private def listenersToMap(listeners: Seq[Endpoint]): Map[ListenerName, Endpoint] =
+ listeners.map(e => (ListenerName.normalised(e.listener), e)).toMap
}
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index b982e72e141..53c70f4737a 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -22,7 +22,7 @@ import java.util.concurrent.TimeUnit
import java.util.Properties
import kafka.utils.{CoreUtils, Logging}
import kafka.utils.Implicits._
-import org.apache.kafka.common.Reconfigurable
+import org.apache.kafka.common.{Endpoint, KafkaException, Reconfigurable}
import org.apache.kafka.common.config.{ConfigDef, ConfigException,
ConfigResource, TopicConfig}
import org.apache.kafka.common.config.ConfigDef.ConfigKey
import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
@@ -34,7 +34,6 @@ import org.apache.kafka.common.record.TimestampType
import org.apache.kafka.common.security.auth.KafkaPrincipalSerde
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.network.EndPoint
import org.apache.kafka.coordinator.group.Group.GroupType
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
import org.apache.kafka.coordinator.group.{GroupConfig, GroupCoordinatorConfig}
@@ -145,6 +144,14 @@ object KafkaConfig {
}
output
}
+
+ private def parseListenerName(connectionString: String): String = {
+ val firstColon = connectionString.indexOf(':')
+ if (firstColon < 0) {
+ throw new KafkaException(s"Unable to parse a listener name from $connectionString")
+ }
+ connectionString.substring(0, firstColon).toUpperCase(util.Locale.ROOT)
+ }
}
/**
@@ -276,8 +283,8 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
}
val earlyStartListeners: Set[ListenerName] = {
- val listenersSet = listeners.map(_.listenerName).toSet
- val controllerListenersSet = controllerListeners.map(_.listenerName).toSet
+ val listenersSet = listeners.map(l => ListenerName.normalised(l.listener)).toSet
+ val controllerListenersSet = controllerListeners.map(l => ListenerName.normalised(l.listener)).toSet
Option(getString(ServerConfigs.EARLY_START_LISTENERS_CONFIG)) match {
case None => controllerListenersSet
case Some(str) =>
@@ -459,7 +466,7 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
}
}
- def listeners: Seq[EndPoint] =
+ def listeners: Seq[Endpoint] =
CoreUtils.listenerListToEndPoints(getString(SocketServerConfigs.LISTENERS_CONFIG),
effectiveListenerSecurityProtocolMap)
def controllerListenerNames: Seq[String] = {
@@ -471,23 +478,23 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
}
}
- def controllerListeners: Seq[EndPoint] =
- listeners.filter(l => controllerListenerNames.contains(l.listenerName.value()))
+ def controllerListeners: Seq[Endpoint] =
+ listeners.filter(l => controllerListenerNames.contains(l.listener))
def saslMechanismControllerProtocol: String =
getString(KRaftConfigs.SASL_MECHANISM_CONTROLLER_PROTOCOL_CONFIG)
- def dataPlaneListeners: Seq[EndPoint] = {
+ def dataPlaneListeners: Seq[Endpoint] = {
listeners.filterNot { listener =>
- val name = listener.listenerName.value()
+ val name = listener.listener
controllerListenerNames.contains(name)
}
}
- def effectiveAdvertisedControllerListeners: Seq[EndPoint] = {
+ def effectiveAdvertisedControllerListeners: Seq[Endpoint] = {
val advertisedListenersProp =
getString(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG)
val controllerAdvertisedListeners = if (advertisedListenersProp != null) {
CoreUtils.listenerListToEndPoints(advertisedListenersProp,
effectiveListenerSecurityProtocolMap, requireDistinctPorts=false)
- .filter(l => controllerListenerNames.contains(l.listenerName.value()))
+ .filter(l => controllerListenerNames.contains(l.listener))
} else {
Seq.empty
}
@@ -495,16 +502,16 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
controllerListenerNames.flatMap { name =>
controllerAdvertisedListeners
- .find(endpoint => endpoint.listenerName.equals(ListenerName.normalised(name)))
+ .find(endpoint => ListenerName.normalised(endpoint.listener).equals(ListenerName.normalised(name)))
.orElse(
// If users don't define advertised.listeners, the advertised
controller listeners inherit from listeners configuration
// which match listener names in controller.listener.names.
// Removing "0.0.0.0" host to avoid validation errors. This is to be
compatible with the old behavior before 3.9.
// The null or "" host does a reverse lookup in
ListenerInfo#withWildcardHostnamesResolved.
controllerListenersValue
- .find(endpoint => endpoint.listenerName.equals(ListenerName.normalised(name)))
+ .find(endpoint => ListenerName.normalised(endpoint.listener).equals(ListenerName.normalised(name)))
.map(endpoint => if (endpoint.host == "0.0.0.0") {
- new EndPoint(null, endpoint.port, endpoint.listenerName, endpoint.securityProtocol)
+ new Endpoint(endpoint.listener, endpoint.securityProtocol, null, endpoint.port)
} else {
endpoint
})
@@ -512,7 +519,7 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
}
}
- def effectiveAdvertisedBrokerListeners: Seq[EndPoint] = {
+ def effectiveAdvertisedBrokerListeners: Seq[Endpoint] = {
// Use advertised listeners if defined, fallback to listeners otherwise
val advertisedListenersProp =
getString(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG)
val advertisedListeners = if (advertisedListenersProp != null) {
@@ -521,7 +528,7 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
listeners
}
// Only expose broker listeners
- advertisedListeners.filterNot(l => controllerListenerNames.contains(l.listenerName.value()))
+ advertisedListeners.filterNot(l => controllerListenerNames.contains(l.listener))
}
private def getInterBrokerListenerNameAndSecurityProtocol: (ListenerName,
SecurityProtocol) = {
@@ -563,7 +570,7 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
// check controller listener names (they won't appear in listeners when
process.roles=broker)
// as well as listeners for occurrences of SSL or SASL_*
if (controllerListenerNames.exists(isSslOrSasl) ||
- Csv.parseCsvList(getString(SocketServerConfigs.LISTENERS_CONFIG)).asScala.exists(listenerValue => isSslOrSasl(EndPoint.parseListenerName(listenerValue)))) {
+ Csv.parseCsvList(getString(SocketServerConfigs.LISTENERS_CONFIG)).asScala.exists(listenerValue => isSslOrSasl(KafkaConfig.parseListenerName(listenerValue)))) {
mapValue // don't add default mappings since we found something that
is SSL or SASL_*
} else {
// add the PLAINTEXT mappings for all controller listener names that
are not explicitly PLAINTEXT
@@ -591,7 +598,7 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
require(replicaFetchWaitMaxMs <= replicaLagTimeMaxMs,
"replica.fetch.wait.max.ms should always be less than or equal to
replica.lag.time.max.ms" +
" to prevent frequent changes in ISR")
- val advertisedBrokerListenerNames = effectiveAdvertisedBrokerListeners.map(_.listenerName).toSet
+ val advertisedBrokerListenerNames = effectiveAdvertisedBrokerListeners.map(l => ListenerName.normalised(l.listener)).toSet
// validate KRaft-related configs
val voterIds = QuorumConfig.parseVoterIds(quorumConfig.voters)
@@ -614,7 +621,7 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must contain at
least one value appearing in the '${SocketServerConfigs.LISTENERS_CONFIG}'
configuration when running the KRaft controller role")
}
def
validateControllerListenerNamesMustAppearInListenersForKRaftController(): Unit
= {
- val listenerNameValues = listeners.map(_.listenerName.value).toSet
+ val listenerNameValues = listeners.map(_.listener).toSet
require(controllerListenerNames.forall(cln =>
listenerNameValues.contains(cln)),
s"${KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG} must only contain
values appearing in the '${SocketServerConfigs.LISTENERS_CONFIG}' configuration
when running the KRaft controller role")
}
@@ -681,7 +688,7 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
validateControllerListenerNamesMustAppearInListenersForKRaftController()
}
- val listenerNames = listeners.map(_.listenerName).toSet
+ val listenerNames = listeners.map(l => ListenerName.normalised(l.listener)).toSet
if (processRoles.contains(ProcessRole.BrokerRole)) {
validateAdvertisedBrokerListenersNonEmptyForBroker()
require(advertisedBrokerListenerNames.contains(interBrokerListenerName),
diff --git a/core/src/main/scala/kafka/tools/TestRaftServer.scala b/core/src/main/scala/kafka/tools/TestRaftServer.scala
index c07538aadad..a47b9fd4d47 100644
--- a/core/src/main/scala/kafka/tools/TestRaftServer.scala
+++ b/core/src/main/scala/kafka/tools/TestRaftServer.scala
@@ -30,6 +30,7 @@ import
org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.metrics.stats.Percentiles.BucketSizing
import org.apache.kafka.common.metrics.stats.{Meter, Percentile, Percentiles}
+import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.{ObjectSerializationCache, Writable}
import org.apache.kafka.common.security.scram.internals.ScramMechanism
import
org.apache.kafka.common.security.token.delegation.internals.DelegationTokenCache
@@ -88,7 +89,7 @@ class TestRaftServer(
val endpoints = Endpoints.fromInetSocketAddresses(
config.effectiveAdvertisedControllerListeners
.map { endpoint =>
- (endpoint.listenerName, InetSocketAddress.createUnresolved(endpoint.host, endpoint.port))
+ (ListenerName.normalised(endpoint.listener), InetSocketAddress.createUnresolved(endpoint.host, endpoint.port))
}
.toMap
.asJava
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala b/core/src/main/scala/kafka/utils/CoreUtils.scala
index 98a06f80f46..d41c796374b 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -25,8 +25,8 @@ import com.typesafe.scalalogging.Logger
import javax.management.ObjectName
import scala.collection._
import scala.collection.Seq
-import org.apache.kafka.network.EndPoint
import org.apache.commons.validator.routines.InetAddressValidator
+import org.apache.kafka.common.Endpoint
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.security.auth.SecurityProtocol
import org.apache.kafka.common.utils.Utils
@@ -122,22 +122,22 @@ object CoreUtils {
def inWriteLock[T](lock: ReadWriteLock)(fun: => T): T =
inLock[T](lock.writeLock)(fun)
- def listenerListToEndPoints(listeners: String, securityProtocolMap: Map[ListenerName, SecurityProtocol]): Seq[EndPoint] = {
+ def listenerListToEndPoints(listeners: String, securityProtocolMap: Map[ListenerName, SecurityProtocol]): Seq[Endpoint] = {
listenerListToEndPoints(listeners, securityProtocolMap,
requireDistinctPorts = true)
}
- private def checkDuplicateListenerPorts(endpoints: Seq[EndPoint], listeners: String): Unit = {
+ private def checkDuplicateListenerPorts(endpoints: Seq[Endpoint], listeners: String): Unit = {
val distinctPorts = endpoints.map(_.port).distinct
require(distinctPorts.size == endpoints.map(_.port).size, s"Each listener
must have a different port, listeners: $listeners")
}
- def listenerListToEndPoints(listeners: String, securityProtocolMap: Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean): Seq[EndPoint] = {
+ def listenerListToEndPoints(listeners: String, securityProtocolMap: Map[ListenerName, SecurityProtocol], requireDistinctPorts: Boolean): Seq[Endpoint] = {
def validateOneIsIpv4AndOtherIpv6(first: String, second: String): Boolean =
(inetAddressValidator.isValidInet4Address(first) &&
inetAddressValidator.isValidInet6Address(second)) ||
(inetAddressValidator.isValidInet6Address(first) &&
inetAddressValidator.isValidInet4Address(second))
- def validate(endPoints: Seq[EndPoint]): Unit = {
- val distinctListenerNames = endPoints.map(_.listenerName).distinct
+ def validate(endPoints: Seq[Endpoint]): Unit = {
+ val distinctListenerNames = endPoints.map(_.listener).distinct
require(distinctListenerNames.size == endPoints.size, s"Each listener
must have a different name, listeners: $listeners")
val (duplicatePorts, _) = endPoints.filter {
@@ -186,8 +186,7 @@ object CoreUtils {
}
val endPoints = try {
- SocketServerConfigs.listenerListToEndPoints(listeners, securityProtocolMap.asJava).
- asScala.map(EndPoint.fromPublic)
+ SocketServerConfigs.listenerListToEndPoints(listeners, securityProtocolMap.asJava).asScala
} catch {
case e: Exception =>
throw new IllegalArgumentException(s"Error creating broker listeners
from '$listeners': ${e.getMessage}", e)
diff --git a/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala
index 4b1b94336f7..7263fa82847 100644
--- a/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SslAdminIntegrationTest.scala
@@ -353,7 +353,7 @@ class SslAdminIntegrationTest extends
SaslSslAdminIntegrationTest {
val controllerListenerName =
ListenerName.forSecurityProtocol(extraControllerSecurityProtocol)
val config = controllerServers.map { s =>
val listener = s.config.effectiveAdvertisedControllerListeners
- .find(_.listenerName == controllerListenerName)
+ .find(_.listener == controllerListenerName.value)
.getOrElse(throw new IllegalArgumentException(s"Could not find
listener with name $controllerListenerName"))
Utils.formatAddress(listener.host,
s.socketServer.boundPort(controllerListenerName))
}.mkString(",")
diff --git a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
index ac227893d9f..234518ea83a 100644
--- a/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
+++ b/core/src/test/scala/integration/kafka/server/KRaftClusterTest.scala
@@ -113,7 +113,7 @@ class KRaftClusterTest {
cluster.format()
cluster.startup()
val controller =
cluster.controllers().values().iterator().asScala.filter(_.controller.isActive).next()
- val port = controller.socketServer.boundPort(controller.config.controllerListeners.head.listenerName)
+ val port = controller.socketServer.boundPort(ListenerName.normalised(controller.config.controllerListeners.head.listener))
// shutdown active controller
controller.shutdown()
diff --git a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
index bcde663eb52..55e5e3e7d92 100644
--- a/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
+++ b/core/src/test/scala/integration/kafka/server/MultipleListenersWithSameSecurityProtocolBaseTest.scala
@@ -132,7 +132,7 @@ abstract class
MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT
TestUtils.ensureConsistentKRaftMetadata(servers, controllerServer)
servers.head.config.listeners.foreach { endPoint =>
- val listenerName = endPoint.listenerName
+ val listenerName = ListenerName.normalised(endPoint.listener)
val trustStoreFile =
if (JaasTestUtils.usesSslTransportLayer(endPoint.securityProtocol))
Some(this.trustStoreFile)
@@ -155,7 +155,7 @@ abstract class
MultipleListenersWithSameSecurityProtocolBaseTest extends QuorumT
}
if (JaasTestUtils.usesSaslAuthentication(endPoint.securityProtocol)) {
- kafkaServerSaslMechanisms(endPoint.listenerName.value).foreach { mechanism =>
+ kafkaServerSaslMechanisms(endPoint.listener).foreach { mechanism =>
addProducerConsumer(listenerName, mechanism,
Some(kafkaClientSaslProperties(mechanism, dynamicJaasConfig = true)))
}
} else {
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index 1d793e726b8..ac1a98e7fc9 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -22,6 +22,7 @@ import com.yammer.metrics.core.{Gauge, Meter}
import kafka.server._
import kafka.utils.Implicits._
import kafka.utils.TestUtils
+import org.apache.kafka.common.Endpoint
import org.apache.kafka.common.memory.MemoryPool
import org.apache.kafka.common.message.ApiMessageType.ListenerType
import org.apache.kafka.common.message.{ProduceRequestData,
SaslAuthenticateRequestData, SaslHandshakeRequestData, VoteRequestData}
@@ -36,7 +37,6 @@ import
org.apache.kafka.common.security.scram.internals.ScramMechanism
import org.apache.kafka.common.utils._
import org.apache.kafka.network.RequestConvertToJson
import org.apache.kafka.network.SocketServerConfigs
-import org.apache.kafka.network.EndPoint
import org.apache.kafka.security.CredentialProvider
import org.apache.kafka.server.{ApiVersionManager, SimpleApiVersionManager}
import org.apache.kafka.server.common.{FinalizedFeatures, MetadataVersion}
@@ -91,10 +91,10 @@ class SocketServerTest {
private val kafkaLogger = LogManager.getLogger("kafka")
private var logLevelToRestore: Level = _
- def endpoint: EndPoint = {
+ def endpoint: Endpoint = {
KafkaConfig.fromProps(props, doLog = false).dataPlaneListeners.head
}
- def listener: String = endpoint.listenerName.value
+ def listener: String = endpoint.listener
val uncaughtExceptions = new AtomicInteger(0)
@BeforeEach
@@ -840,7 +840,7 @@ class SocketServerTest {
// same as SocketServer.createAcceptor,
// except the Acceptor overriding a method to inject the exception
- override protected def createDataPlaneAcceptor(endPoint: EndPoint, isPrivilegedListener: Boolean, requestChannel: RequestChannel): DataPlaneAcceptor = {
+ override protected def createDataPlaneAcceptor(endPoint: Endpoint, isPrivilegedListener: Boolean, requestChannel: RequestChannel): DataPlaneAcceptor = {
new DataPlaneAcceptor(this, endPoint, this.config, nodeId,
connectionQuotas, time, false, requestChannel, serverMetrics,
this.credentialProvider, new LogContext(), MemoryPool.NONE,
this.apiVersionManager) {
override protected def configureAcceptedSocketChannel(socketChannel:
SocketChannel): Unit = {
@@ -1858,7 +1858,7 @@ class SocketServerTest {
val failedFuture = new CompletableFuture[Void]()
failedFuture.completeExceptionally(new RuntimeException("authorizer
startup failed"))
assertThrows(classOf[ExecutionException], () => {
- newServer.enableRequestProcessing(Map(endpoint.toPublic -> failedFuture)).get()
+ newServer.enableRequestProcessing(Map(endpoint -> failedFuture)).get()
})
} finally {
shutdownServerAndMetrics(newServer)
@@ -1891,7 +1891,7 @@ class SocketServerTest {
val authorizerFuture = new CompletableFuture[Void]()
val enableFuture = newServer.enableRequestProcessing(
newServer.dataPlaneAcceptors.keys().asScala.
- map(_.toPublic).map(k => k -> authorizerFuture).toMap)
+ map(k => k -> authorizerFuture).toMap)
assertFalse(authorizerFuture.isDone)
assertFalse(enableFuture.isDone)
newServer.dataPlaneAcceptors.values().forEach(a =>
assertNull(a.serverChannel))
@@ -1992,7 +1992,7 @@ class SocketServerTest {
}
class TestableAcceptor(socketServer: SocketServer,
- endPoint: EndPoint,
+ endPoint: Endpoint,
cfg: KafkaConfig,
nodeId: Int,
connectionQuotas: ConnectionQuotas,
@@ -2098,7 +2098,7 @@ class SocketServerTest {
connectionDisconnectListeners = connectionDisconnectListeners
) {
- override def createDataPlaneAcceptor(endPoint: EndPoint, isPrivilegedListener: Boolean, requestChannel: RequestChannel) : DataPlaneAcceptor = {
+ override def createDataPlaneAcceptor(endPoint: Endpoint, isPrivilegedListener: Boolean, requestChannel: RequestChannel) : DataPlaneAcceptor = {
new TestableAcceptor(this, endPoint, this.config, 0, connectionQuotas,
time, isPrivilegedListener, requestChannel, this.metrics,
this.credentialProvider, new LogContext, MemoryPool.NONE,
this.apiVersionManager, connectionQueueSize)
}
diff --git a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
index 3c816f635db..38187b11eb8 100644
--- a/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/raft/RaftManagerTest.scala
@@ -28,6 +28,7 @@ import kafka.utils.TestUtils
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.Uuid
import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.utils.Time
import org.apache.kafka.network.SocketServerConfigs
import org.apache.kafka.raft.{Endpoints, MetadataLogConfig, QuorumConfig}
@@ -88,7 +89,7 @@ class RaftManagerTest {
val endpoints = Endpoints.fromInetSocketAddresses(
config.effectiveAdvertisedControllerListeners
.map { endpoint =>
- (endpoint.listenerName, InetSocketAddress.createUnresolved(endpoint.host, endpoint.port))
+ (ListenerName.normalised(endpoint.listener), InetSocketAddress.createUnresolved(endpoint.host, endpoint.port))
}
.toMap
.asJava
diff --git a/core/src/test/scala/unit/kafka/server/ControllerRegistrationManagerTest.scala b/core/src/test/scala/unit/kafka/server/ControllerRegistrationManagerTest.scala
index 68f775fb3e7..46ea20758e2 100644
--- a/core/src/test/scala/unit/kafka/server/ControllerRegistrationManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ControllerRegistrationManagerTest.scala
@@ -76,7 +76,7 @@ class ControllerRegistrationManagerTest {
"controller-registration-manager-test-",
createSupportedFeatures(MetadataVersion.IBP_3_7_IV0),
RecordTestUtils.createTestControllerRegistration(1,
false).incarnationId(),
- ListenerInfo.create(context.config.controllerListeners.map(_.toPublic).asJava),
+ ListenerInfo.create(context.config.controllerListeners.asJava),
new ExponentialBackoff(1, 2, 100, 0.02))
}
diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 17ad2200dcc..5f87b20d1f5 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -1036,7 +1036,7 @@ class DynamicBrokerConfigTest {
props.put(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG,
"SASL_PLAINTEXT://localhost:8181")
ctx.config.dynamicConfig.updateDefaultConfig(props)
ctx.config.effectiveAdvertisedBrokerListeners.foreach(e =>
- assertEquals(SecurityProtocol.PLAINTEXT.name, e.listenerName.value)
+ assertEquals(SecurityProtocol.PLAINTEXT.name, e.listener)
)
assertFalse(ctx.currentDefaultLogConfig.get().originals().containsKey(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG))
}
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 517741cf2d8..55c88666c70 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -22,7 +22,7 @@ import java.util
import java.util.{Arrays, Collections, Properties}
import kafka.utils.TestUtils.assertBadConfigContainingMessage
import kafka.utils.{CoreUtils, TestUtils}
-import org.apache.kafka.common.Node
+import org.apache.kafka.common.{Endpoint, Node}
import org.apache.kafka.common.config.{ConfigException, SaslConfigs,
SecurityConfig, SslConfigs, TopicConfig}
import org.apache.kafka.common.metrics.Sensor
import org.apache.kafka.common.network.ListenerName
@@ -35,7 +35,6 @@ import
org.apache.kafka.coordinator.group.GroupCoordinatorConfig
import org.apache.kafka.coordinator.group.modern.share.ShareGroupConfig
import org.apache.kafka.coordinator.transaction.{TransactionLogConfig,
TransactionStateManagerConfig}
import org.apache.kafka.network.SocketServerConfigs
-import org.apache.kafka.network.EndPoint
import org.apache.kafka.raft.{MetadataLogConfig, QuorumConfig}
import org.apache.kafka.server.config.{DelegationTokenManagerConfigs,
KRaftConfigs, QuotaConfig, ReplicationConfigs, ServerConfigs, ServerLogConfigs,
ServerTopicConfigSynonyms}
import org.apache.kafka.server.log.remote.storage.RemoteLogManagerConfig
@@ -343,7 +342,7 @@ class KafkaConfigTest {
val config = KafkaConfig.fromProps(props)
assertEquals(
- Seq(new EndPoint("lb1.example.com", 9000, ListenerName.normalised("CONTROLLER"), SecurityProtocol.PLAINTEXT)),
+ Seq(new Endpoint("CONTROLLER", SecurityProtocol.PLAINTEXT, "lb1.example.com", 9000)),
config.effectiveAdvertisedControllerListeners
)
}
@@ -359,7 +358,7 @@ class KafkaConfigTest {
val config = KafkaConfig.fromProps(props)
assertEquals(
- Seq(new EndPoint("localhost", 9093, ListenerName.normalised("CONTROLLER"), SecurityProtocol.PLAINTEXT)),
+ Seq(new Endpoint("CONTROLLER", SecurityProtocol.PLAINTEXT, "localhost", 9093)),
config.effectiveAdvertisedControllerListeners
)
}
@@ -377,8 +376,8 @@ class KafkaConfigTest {
val config = KafkaConfig.fromProps(props)
assertEquals(
Seq(
- new EndPoint("lb1.example.com", 9000, ListenerName.normalised("CONTROLLER"), SecurityProtocol.PLAINTEXT),
- new EndPoint("localhost", 9094, ListenerName.normalised("CONTROLLER_NEW"), SecurityProtocol.PLAINTEXT)
+ new Endpoint("CONTROLLER", SecurityProtocol.PLAINTEXT, "lb1.example.com", 9000),
+ new Endpoint("CONTROLLER_NEW", SecurityProtocol.PLAINTEXT, "localhost", 9094)
),
config.effectiveAdvertisedControllerListeners
)
@@ -507,9 +506,9 @@ class KafkaConfigTest {
props.setProperty(ReplicationConfigs.INTER_BROKER_LISTENER_NAME_CONFIG,
"REPLICATION")
val config = KafkaConfig.fromProps(props)
val expectedListeners = Seq(
- new EndPoint("localhost", 9091, new ListenerName("CLIENT"), SecurityProtocol.SSL),
- new EndPoint("localhost", 9092, new ListenerName("REPLICATION"), SecurityProtocol.SSL),
- new EndPoint("localhost", 9093, new ListenerName("INTERNAL"), SecurityProtocol.PLAINTEXT))
+ new Endpoint("CLIENT", SecurityProtocol.SSL, "localhost", 9091),
+ new Endpoint("REPLICATION", SecurityProtocol.SSL, "localhost", 9092),
+ new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9093))
assertEquals(expectedListeners, config.listeners)
assertEquals(expectedListeners, config.effectiveAdvertisedBrokerListeners)
val expectedSecurityProtocolMap = Map(
@@ -536,14 +535,14 @@ class KafkaConfigTest {
val config = KafkaConfig.fromProps(props)
val expectedListeners = Seq(
- new EndPoint("localhost", 9091, new ListenerName("EXTERNAL"), SecurityProtocol.SSL),
- new EndPoint("localhost", 9093, new ListenerName("INTERNAL"), SecurityProtocol.PLAINTEXT)
+ new Endpoint("EXTERNAL", SecurityProtocol.SSL, "localhost", 9091),
+ new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "localhost", 9093)
)
assertEquals(expectedListeners, config.listeners)
val expectedAdvertisedListeners = Seq(
- new EndPoint("lb1.example.com", 9000, new ListenerName("EXTERNAL"), SecurityProtocol.SSL),
- new EndPoint("host1", 9093, new ListenerName("INTERNAL"), SecurityProtocol.PLAINTEXT)
+ new Endpoint("EXTERNAL", SecurityProtocol.SSL, "lb1.example.com", 9000),
+ new Endpoint("INTERNAL", SecurityProtocol.PLAINTEXT, "host1", 9093)
)
assertEquals(expectedAdvertisedListeners,
config.effectiveAdvertisedBrokerListeners)
@@ -593,8 +592,8 @@ class KafkaConfigTest {
props.setProperty(SocketServerConfigs.LISTENERS_CONFIG,
"plaintext://localhost:9091,SsL://localhost:9092")
props.setProperty(SocketServerConfigs.LISTENER_SECURITY_PROTOCOL_MAP_CONFIG,
"PLAINTEXT:PLAINTEXT,SSL:SSL,CONTROLLER:PLAINTEXT")
val config = KafkaConfig.fromProps(props)
- assertEquals(Some("SSL://localhost:9092"), config.listeners.find(_.listenerName.value == "SSL").map(JTestUtils.endpointToString))
- assertEquals(Some("PLAINTEXT://localhost:9091"), config.listeners.find(_.listenerName.value == "PLAINTEXT").map(JTestUtils.endpointToString))
+ assertEquals(Some("SSL://localhost:9092"), config.listeners.find(_.listener == "SSL").map(JTestUtils.endpointToString))
+ assertEquals(Some("PLAINTEXT://localhost:9091"), config.listeners.find(_.listener == "PLAINTEXT").map(JTestUtils.endpointToString))
}
private def listenerListToEndPoints(listenerList: String,
diff --git a/core/src/test/scala/unit/kafka/server/RegistrationTestContext.scala b/core/src/test/scala/unit/kafka/server/RegistrationTestContext.scala
index 9bf4d4d7e00..dd5968055e0 100644
--- a/core/src/test/scala/unit/kafka/server/RegistrationTestContext.scala
+++ b/core/src/test/scala/unit/kafka/server/RegistrationTestContext.scala
@@ -66,7 +66,7 @@ class RegistrationTestContext(
val controllerEpoch = new AtomicInteger(123)
config.effectiveAdvertisedBrokerListeners.foreach { ep =>
advertisedListeners.add(new Listener().setHost(ep.host).
- setName(ep.listenerName.value()).
+ setName(ep.listener).
setPort(ep.port.shortValue()).
setSecurityProtocol(ep.securityProtocol.id))
}
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 8281509f609..f237ee3a339 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -181,7 +181,7 @@ object TestUtils extends Logging {
listenerName: ListenerName
): String = {
brokers.map { s =>
- val listener = s.config.effectiveAdvertisedBrokerListeners.find(_.listenerName == listenerName).getOrElse(
+ val listener = s.config.effectiveAdvertisedBrokerListeners.find(_.listener == listenerName.value).getOrElse(
sys.error(s"Could not find listener with name ${listenerName.value}"))
formatAddress(listener.host, s.boundPort(listenerName))
}.mkString(",")
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
index 4f5e9dcca2d..56b2bdf375e 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/BrokerRegistration.java
@@ -85,7 +85,7 @@ public class BrokerRegistration {
public Builder setListeners(List<Endpoint> listeners) {
Map<String, Endpoint> listenersMap = new HashMap<>();
for (Endpoint endpoint : listeners) {
- listenersMap.put(endpoint.listenerName().get(), endpoint);
+ listenersMap.put(endpoint.listener(), endpoint);
}
this.listeners = listenersMap;
return this;
@@ -170,7 +170,7 @@ public class BrokerRegistration {
this.incarnationId = incarnationId;
Map<String, Endpoint> newListeners = new HashMap<>(listeners.size());
for (Entry<String, Endpoint> entry : listeners.entrySet()) {
- if (entry.getValue().listenerName().isEmpty()) {
+ if (entry.getValue().listener().isEmpty()) {
throw new IllegalArgumentException("Broker listeners must be
named.");
}
newListeners.put(entry.getKey(), entry.getValue());
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/ListenerInfo.java b/metadata/src/main/java/org/apache/kafka/metadata/ListenerInfo.java
index 294e8b0cbd9..58fe7bc0324 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/ListenerInfo.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/ListenerInfo.java
@@ -188,14 +188,14 @@ public final class ListenerInfo {
) {
LinkedHashMap<String, Endpoint> listeners = new LinkedHashMap<>();
for (Endpoint listener : rawListeners) {
- String name = listener.listenerName().get();
+ String name = listener.listener();
if (Optional.of(name).equals(firstListenerName)) {
listeners.put(name, listener);
break;
}
}
for (Endpoint listener : rawListeners) {
- String name = listener.listenerName().get();
+ String name = listener.listener();
if (!Optional.of(name).equals(firstListenerName)) {
listeners.put(name, listener);
}
@@ -236,11 +236,11 @@ public final class ListenerInfo {
if (entry.getValue().host() == null || entry.getValue().host().trim().isEmpty()) {
String newHost = InetAddress.getLocalHost().getCanonicalHostName();
Endpoint prevEndpoint = entry.getValue();
- newListeners.put(entry.getKey(), new Endpoint(prevEndpoint.listenerName().get(),
+ newListeners.put(entry.getKey(), new Endpoint(prevEndpoint.listener(),
prevEndpoint.securityProtocol(),
newHost,
prevEndpoint.port()));
- log.info("{}: resolved wildcard host to {}", entry.getValue().listenerName().get(),
+ log.info("{}: resolved wildcard host to {}", entry.getValue().listener(),
newHost);
} else {
newListeners.put(entry.getKey(), entry.getValue());
@@ -268,9 +268,9 @@ public final class ListenerInfo {
Endpoint prevEndpoint = entry.getValue();
int newPort = getBoundPortCallback.apply(entry.getKey());
checkPortIsSerializable(newPort);
- log.info("{}: resolved ephemeral port to {}",
entry.getValue().listenerName().get(),
+ log.info("{}: resolved ephemeral port to {}",
entry.getValue().listener(),
newPort);
- newListeners.put(entry.getKey(), new
Endpoint(prevEndpoint.listenerName().get(),
+ newListeners.put(entry.getKey(), new
Endpoint(prevEndpoint.listener(),
prevEndpoint.securityProtocol(),
prevEndpoint.host(),
newPort));
@@ -309,7 +309,7 @@ public final class ListenerInfo {
checkHostIsSerializable(endpoint.host());
collection.add(new ControllerRegistrationRequestData.Listener().
setHost(endpoint.host()).
- setName(endpoint.listenerName().get()).
+ setName(endpoint.listener()).
setPort(endpoint.port()).
setSecurityProtocol(endpoint.securityProtocol().id));
});
@@ -324,7 +324,7 @@ public final class ListenerInfo {
checkHostIsSerializable(endpoint.host());
collection.add(new RegisterControllerRecord.ControllerEndpoint().
setHost(endpoint.host()).
- setName(endpoint.listenerName().get()).
+ setName(endpoint.listener()).
setPort(endpoint.port()).
setSecurityProtocol(endpoint.securityProtocol().id));
});
@@ -339,7 +339,7 @@ public final class ListenerInfo {
checkHostIsSerializable(endpoint.host());
collection.add(new BrokerRegistrationRequestData.Listener().
setHost(endpoint.host()).
- setName(endpoint.listenerName().get()).
+ setName(endpoint.listener()).
setPort(endpoint.port()).
setSecurityProtocol(endpoint.securityProtocol().id));
});
@@ -354,7 +354,7 @@ public final class ListenerInfo {
checkHostIsSerializable(endpoint.host());
collection.add(new RegisterBrokerRecord.BrokerEndpoint().
setHost(endpoint.host()).
- setName(endpoint.listenerName().get()).
+ setName(endpoint.listener()).
setPort(endpoint.port()).
setSecurityProtocol(endpoint.securityProtocol().id));
});
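Each of the four serialization paths in ListenerInfo now reads the name via Endpoint#listener rather than listenerName().get(). A minimal sketch of one such field-by-field copy (the enclosing ListenerInfo method names are elided in the hunks above, so this stands alone):

    import org.apache.kafka.common.Endpoint;
    import org.apache.kafka.common.message.BrokerRegistrationRequestData;
    import org.apache.kafka.common.security.auth.SecurityProtocol;

    public class ListenerSerializationSketch {
        public static void main(String[] args) {
            // Illustrative endpoint; any named listener works the same way.
            Endpoint endpoint = new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "broker0", 9092);

            // Same field-by-field copy as the broker registration path above.
            BrokerRegistrationRequestData.Listener listener =
                new BrokerRegistrationRequestData.Listener().
                    setHost(endpoint.host()).
                    setName(endpoint.listener()).
                    setPort(endpoint.port()).
                    setSecurityProtocol(endpoint.securityProtocol().id);
            System.out.println(listener.name() + "://" + listener.host() + ":" + listener.port());
        }
    }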
diff --git a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
index 474c593bb67..2d97683e14e 100644
--- a/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
+++ b/metadata/src/main/java/org/apache/kafka/metadata/authorizer/StandardAuthorizer.java
@@ -132,7 +132,7 @@ public class StandardAuthorizer implements ClusterMetadataAuthorizer, Monitorabl
Map<Endpoint, CompletableFuture<Void>> result = new HashMap<>();
for (Endpoint endpoint : serverInfo.endpoints()) {
if (serverInfo.earlyStartListeners().contains(
- endpoint.listenerName().orElse(""))) {
+ endpoint.listener())) {
result.put(endpoint, CompletableFuture.completedFuture(null));
} else {
result.put(endpoint, initialLoadFuture);
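StandardAuthorizer completes futures immediately for early-start listeners and defers the rest until the initial metadata load; the change replaces listenerName().orElse("") with the non-optional listener(). A self-contained sketch of that dispatch, with illustrative endpoints:

    import org.apache.kafka.common.Endpoint;
    import org.apache.kafka.common.security.auth.SecurityProtocol;

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.CompletableFuture;

    public class EarlyStartFuturesSketch {
        public static void main(String[] args) {
            Set<String> earlyStartListeners = Set.of("CONTROLLER");
            List<Endpoint> endpoints = List.of(
                new Endpoint("CONTROLLER", SecurityProtocol.PLAINTEXT, "localhost", 9093),
                new Endpoint("PLAINTEXT", SecurityProtocol.PLAINTEXT, "localhost", 9092));
            CompletableFuture<Void> initialLoadFuture = new CompletableFuture<>();

            // Early-start listeners are ready immediately; the rest wait
            // for the initial metadata load, as in the hunk above.
            Map<Endpoint, CompletableFuture<Void>> result = new HashMap<>();
            for (Endpoint endpoint : endpoints) {
                if (earlyStartListeners.contains(endpoint.listener())) {
                    result.put(endpoint, CompletableFuture.completedFuture(null));
                } else {
                    result.put(endpoint, initialLoadFuture);
                }
            }
            System.out.println(result.size()); // 2
        }
    }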
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/ListenerInfoTest.java b/metadata/src/test/java/org/apache/kafka/metadata/ListenerInfoTest.java
index effde563917..24f8ebfd292 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/ListenerInfoTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/ListenerInfoTest.java
@@ -27,6 +27,7 @@ import org.junit.jupiter.params.provider.ValueSource;
import java.util.ArrayList;
import java.util.List;
+import java.util.Optional;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
@@ -93,16 +94,16 @@ public class ListenerInfoTest {
endpoints.add(ALL.get((i + startIndex) % ALL.size()));
}
ListenerInfo listenerInfo = ListenerInfo.create(endpoints);
- assertEquals(ALL.get(startIndex).listenerName().get(),
- listenerInfo.firstListener().listenerName().get());
+ assertEquals(ALL.get(startIndex).listener(),
+ listenerInfo.firstListener().listener());
}
@ParameterizedTest
@ValueSource(ints = {0, 1, 2, 3})
public void testCreateWithExplicitFirstListener(int startIndex) {
- ListenerInfo listenerInfo = ListenerInfo.create(ALL.get(startIndex).listenerName(), ALL);
- assertEquals(ALL.get(startIndex).listenerName().get(),
- listenerInfo.firstListener().listenerName().get());
+ ListenerInfo listenerInfo = ListenerInfo.create(Optional.of(ALL.get(startIndex).listener()), ALL);
+ assertEquals(ALL.get(startIndex).listener(),
+ listenerInfo.firstListener().listener());
}
@Test
diff --git a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
index b383d568c0e..b2f9fa68b3a 100644
--- a/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
+++ b/metadata/src/test/java/org/apache/kafka/metadata/authorizer/StandardAuthorizerTest.java
@@ -125,8 +125,8 @@ public class StandardAuthorizerTest {
public Collection<String> earlyStartListeners() {
List<String> result = new ArrayList<>();
for (Endpoint endpoint : endpoints) {
- if (endpoint.listenerName().get().equals("CONTROLLER")) {
- result.add(endpoint.listenerName().get());
+ if (endpoint.listener().equals("CONTROLLER")) {
+ result.add(endpoint.listener());
}
}
return result;
diff --git a/server-common/src/main/java/org/apache/kafka/server/network/EndpointReadyFutures.java b/server-common/src/main/java/org/apache/kafka/server/network/EndpointReadyFutures.java
index b81b9b191f9..b02352cca04 100644
--- a/server-common/src/main/java/org/apache/kafka/server/network/EndpointReadyFutures.java
+++ b/server-common/src/main/java/org/apache/kafka/server/network/EndpointReadyFutures.java
@@ -113,7 +113,7 @@ public class EndpointReadyFutures {
List<String> notInInfo = new ArrayList<>();
for (Endpoint endpoint : effectiveStartFutures.keySet()) {
if (!info.endpoints().contains(endpoint)) {
- notInInfo.add(endpoint.listenerName().orElse("[none]"));
+ notInInfo.add(endpoint.listener());
}
}
throw new RuntimeException("Found authorizer futures that weren't included " +
@@ -146,7 +146,7 @@ public class EndpointReadyFutures {
final CompletableFuture<Void> future;
EndpointReadyFuture(Endpoint endpoint, Collection<String> stageNames) {
- this.endpointName = endpoint.listenerName().orElse("UNNAMED");
+ this.endpointName = endpoint.listener();
this.incomplete = new TreeSet<>(stageNames);
this.future = new CompletableFuture<>();
}
diff --git a/server-common/src/test/java/org/apache/kafka/server/network/EndpointReadyFuturesTest.java b/server-common/src/test/java/org/apache/kafka/server/network/EndpointReadyFuturesTest.java
index 649fa9cd64b..8ec3711ff8c 100644
--- a/server-common/src/test/java/org/apache/kafka/server/network/EndpointReadyFuturesTest.java
+++ b/server-common/src/test/java/org/apache/kafka/server/network/EndpointReadyFuturesTest.java
@@ -56,7 +56,7 @@ public final class EndpointReadyFuturesTest {
Endpoint... endpoints
) {
for (Endpoint endpoint : endpoints) {
- String name = endpoint.listenerName().get();
+ String name = endpoint.listener();
CompletableFuture<Void> future =
readyFutures.futures().get(endpoint);
assertNotNull(future, "Unable to find future for " + name);
assertTrue(future.isDone(), "Future for " + name + " is not
done.");
diff --git a/server/src/main/java/org/apache/kafka/network/EndPoint.java b/server/src/main/java/org/apache/kafka/network/EndPoint.java
deleted file mode 100644
index ab1a0e81d6b..00000000000
--- a/server/src/main/java/org/apache/kafka/network/EndPoint.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.kafka.network;
-
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.network.ListenerName;
-import org.apache.kafka.common.security.auth.SecurityProtocol;
-
-import java.util.Locale;
-
-public record EndPoint(
- String host,
- int port,
- ListenerName listenerName,
- SecurityProtocol securityProtocol
-) {
- public static String parseListenerName(String connectionString) {
- int firstColon = connectionString.indexOf(':');
- if (firstColon < 0) {
- throw new KafkaException("Unable to parse a listener name from " + connectionString);
- }
- return connectionString.substring(0, firstColon).toUpperCase(Locale.ROOT);
- }
-
- public static EndPoint fromPublic(org.apache.kafka.common.Endpoint endpoint) {
- return new EndPoint(endpoint.host(), endpoint.port(),
- new ListenerName(endpoint.listenerName().get()), endpoint.securityProtocol());
- }
-
- public org.apache.kafka.common.Endpoint toPublic() {
- return new org.apache.kafka.common.Endpoint(listenerName.value(), securityProtocol, host, port);
- }
-}
\ No newline at end of file
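With the internal EndPoint record deleted, its callers construct org.apache.kafka.common.Endpoint directly; ListenerName.normalised covers the upper-casing that the removed parseListenerName helper performed on the scheme before the first colon. A sketch of the replacement path, assuming only the public API shown in this commit:

    import org.apache.kafka.common.Endpoint;
    import org.apache.kafka.common.network.ListenerName;
    import org.apache.kafka.common.security.auth.SecurityProtocol;

    public class EndpointMigrationSketch {
        public static void main(String[] args) {
            // Before: new EndPoint(host, port, new ListenerName(...), protocol),
            // plus EndPoint.fromPublic/toPublic round-trips.
            // After: the public Endpoint is used everywhere directly.
            Endpoint endpoint = new Endpoint("CONTROLLER", SecurityProtocol.PLAINTEXT, "localhost", 9093);

            // ListenerName.normalised upper-cases the raw name, covering what
            // the removed parseListenerName helper did for "controller://...".
            ListenerName name = ListenerName.normalised(endpoint.listener());
            System.out.println(name.value()); // CONTROLLER
        }
    }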
diff --git a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java
index 041b24780c1..e2361749318 100644
--- a/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java
+++ b/test-common/test-common-runtime/src/main/java/org/apache/kafka/common/test/TestUtils.java
@@ -18,13 +18,13 @@ package org.apache.kafka.common.test;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.common.Endpoint;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.network.ListenerName;
import org.apache.kafka.common.utils.Exit;
import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.network.EndPoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -106,10 +106,10 @@ public class TestUtils {
/**
* Convert EndPoint to String
*/
- public static String endpointToString(EndPoint endPoint) {
+ public static String endpointToString(Endpoint endPoint) {
String host = endPoint.host();
int port = endPoint.port();
- ListenerName listenerName = endPoint.listenerName();
+ ListenerName listenerName = ListenerName.normalised(endPoint.listener());
String hostport = (host == null) ? (":" + port) : Utils.formatAddress(host, port);
return listenerName.value() + "://" + hostport;
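The updated helper derives the ListenerName from the plain listener() string. A standalone version of the same logic, exercised with an illustrative endpoint:

    import org.apache.kafka.common.Endpoint;
    import org.apache.kafka.common.network.ListenerName;
    import org.apache.kafka.common.security.auth.SecurityProtocol;
    import org.apache.kafka.common.utils.Utils;

    public class EndpointToStringSketch {
        // Same shape as the updated helper: listener name, then host:port.
        static String endpointToString(Endpoint endPoint) {
            String host = endPoint.host();
            int port = endPoint.port();
            ListenerName listenerName = ListenerName.normalised(endPoint.listener());
            String hostport = (host == null) ? (":" + port) : Utils.formatAddress(host, port);
            return listenerName.value() + "://" + hostport;
        }

        public static void main(String[] args) {
            Endpoint e = new Endpoint("SSL", SecurityProtocol.SSL, "localhost", 9092);
            System.out.println(endpointToString(e)); // SSL://localhost:9092
        }
    }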
diff --git a/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java b/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java
index a286524af4d..668f1c172fa 100644
--- a/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java
+++ b/tools/src/main/java/org/apache/kafka/tools/MetadataQuorumCommand.java
@@ -395,10 +395,10 @@ public class MetadataQuorumCommand {
Map<String, Endpoint> listeners = new HashMap<>();
SocketServerConfigs.listenerListToEndPoints(
props.getOrDefault(SocketServerConfigs.LISTENERS_CONFIG, "").toString(),
- __ -> SecurityProtocol.PLAINTEXT).forEach(e -> listeners.put(e.listenerName().get(), e));
+ __ -> SecurityProtocol.PLAINTEXT).forEach(e -> listeners.put(e.listener(), e));
SocketServerConfigs.listenerListToEndPoints(
props.getOrDefault(SocketServerConfigs.ADVERTISED_LISTENERS_CONFIG, "").toString(),
- __ -> SecurityProtocol.PLAINTEXT).forEach(e -> listeners.put(e.listenerName().get(), e));
+ __ -> SecurityProtocol.PLAINTEXT).forEach(e -> listeners.put(e.listener(), e));
if (!props.containsKey(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG)) {
throw new TerseException(KRaftConfigs.CONTROLLER_LISTENER_NAMES_CONFIG +
" was not found. Is this a valid controller configuration file?");
@@ -412,7 +412,7 @@ public class MetadataQuorumCommand {
throw new TerseException("Cannot find information about
controller listener name: " +
listenerName);
}
- results.add(new RaftVoterEndpoint(endpoint.listenerName().get(),
+ results.add(new RaftVoterEndpoint(endpoint.listener(),
endpoint.host() == null ? "localhost" : endpoint.host(),
endpoint.port()));
}
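RaftVoterEndpoint now exposes the listener name via listener(), with name() deprecated as an alias. A sketch of constructing an endpoint and formatting it the way the output loop below does, bracketing IPv6 hosts; the wrapper class name here is illustrative:

    import org.apache.kafka.clients.admin.RaftVoterEndpoint;

    public class RaftVoterEndpointSketch {
        public static void main(String[] args) {
            RaftVoterEndpoint endpoint = new RaftVoterEndpoint("CONTROLLER", "localhost", 9093);

            // New accessor; the deprecated name() returns the same value.
            StringBuilder output = new StringBuilder();
            output.append(endpoint.listener()).append("://");
            // Bracket IPv6 literals the same way the command does.
            if (endpoint.host().contains(":")) {
                output.append("[").append(endpoint.host()).append("]");
            } else {
                output.append(endpoint.host());
            }
            output.append(":").append(endpoint.port());
            System.out.println(output); // CONTROLLER://localhost:9093
        }
    }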
@@ -443,7 +443,7 @@ public class MetadataQuorumCommand {
output.append(" and endpoints: ");
String prefix = "";
for (RaftVoterEndpoint endpoint : endpoints) {
- output.append(prefix).append(endpoint.name()).append("://");
+ output.append(prefix).append(endpoint.listener()).append("://");
if (endpoint.host().contains(":")) {
output.append("[");
}