jsancio commented on code in PR #18387:
URL: https://github.com/apache/kafka/pull/18387#discussion_r1942018528
##########
core/src/main/scala/kafka/server/KafkaConfig.scala:
##########
@@ -560,7 +560,17 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
controllerListenerNames.flatMap { name =>
controllerAdvertisedListeners
.find(endpoint =>
endpoint.listenerName.equals(ListenerName.normalised(name)))
- .orElse(controllerListenersValue.find(endpoint =>
endpoint.listenerName.equals(ListenerName.normalised(name))))
+ .orElse(
+ // If users don't define advertised.listeners, the advertised
controller listeners inherit from listeners configuration
+ // which match listener names in controller.listener.names. Also,
removing "0.0.0.0" host to avoid validation errors.
+ controllerListenersValue
+ .find(endpoint =>
endpoint.listenerName.equals(ListenerName.normalised(name)))
+ .map(endpoint => if (endpoint.host == "0.0.0.0") {
+ new EndPoint(null, endpoint.port, endpoint.listenerName,
endpoint.securityProtocol)
Review Comment:
I think that `null` or `""` works in some network because in the
`ControllerServer` code we do a reverse lookup.
```scala
val listenerInfo = ListenerInfo
.create(config.effectiveAdvertisedControllerListeners.map(_.toJava).asJava)
.withWildcardHostnamesResolved()
.withEphemeralPortsCorrected(name => socketServer.boundPort(new
ListenerName(name)))
```
Notice the use of `withWildcardHostnamesResolved`. Can you document this in
the comment above?
I should also mentioned that `0.0.0.0` is IPv4. Kafka also supports IPv6
which is encoded as `::`.
##########
core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala:
##########
@@ -1278,6 +1278,44 @@ class KafkaConfigTest {
KafkaConfig.fromProps(props)
}
+ @Test
+ def
testImplicitAllBindingControllerListenersCanBeAdvertisedForKRaftCombined():
Unit = {
+ val props = new Properties()
+ props.setProperty(KRaftConfigs.PROCESS_ROLES_CONFIG, "controller")
Review Comment:
The name of the method doesn't look correct since the process role is just
controller.
##########
core/src/main/scala/kafka/server/KafkaConfig.scala:
##########
@@ -575,7 +585,8 @@ class KafkaConfig private(doLog: Boolean, val props:
util.Map[_, _])
if (advertisedListenersProp != null) {
CoreUtils.listenerListToEndPoints(advertisedListenersProp,
effectiveListenerSecurityProtocolMap, requireDistinctPorts=false)
} else {
- listeners
+ // Only use implicit broker listeners here. Implicit controller
listeners are handled in effectiveAdvertisedControllerListeners.
+ listeners.filterNot(l =>
controllerListenerNames.contains(l.listenerName.value()))
Review Comment:
Do you need this? `advertisedListeners` is a private method that contains
all of the advertise listeners if it is set or listeners if it is not set.
`effectiveAdvertisedBrokerListeners` and
`effectiveAdvertisedControllerListeners` do the necessary filtering.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]