This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch 
lh-support-webservice-bindaddresses-and-advertised-listeners
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit 8d528cd83daf1b8a1ee7656d3620f3a7e9fd307a
Author: Lari Hotari <[email protected]>
AuthorDate: Wed May 20 22:34:28 2026 +0300

    [improve][broker] Default internalListenerName and synthesize internal 
listener from legacy ports
    
    Replace the brittle "use the first advertised listener as internal" rule 
with a
    named default. `internalListenerName` now defaults to `internal` and the
    internal advertised listener is auto-derived from the legacy port 
properties so
    that existing deployments keep working without explicit 
`advertisedListeners`.
    
    ServiceConfiguration / configs:
    - Add `DEFAULT_INTERNAL_LISTENER_NAME = "internal"` and default the field 
to it.
    - Rewrite the `advertisedListeners`, `internalListenerName`, and 
`bindAddresses`
      docs (in code, broker.conf, and standalone.conf) to describe the merge
      semantics, the ip:port uniqueness rule, and the cluster-internal intent 
of the
      internal listener (incl. the k8s caveat).
    
    MultipleListenerValidator:
    - Synthesize the internal `AdvertisedListener` from `brokerServicePort`,
      `brokerServicePortTls`, `webServicePort`, and `webServicePortTls`.
    - Merge with any explicit URLs declared under the internal listener: 
explicit
      URLs win, legacy-port URLs fill in the remaining slots, so users can 
override
      one URL without redeclaring the rest.
    - Fail with a clear error if no internal listener can be assembled.
    
    BindAddressValidator:
    - Tag migrated legacy bindings with `internalListenerName` (was `null`).
    - Tolerate exact duplicates (same `listener:scheme://ip:port`).
    - Reject conflicts where the same URI maps to different listener names.
    - Reject the same `ip:port` carrying two different schemes (a TCP socket can
      only host one scheme); skip port 0 since the OS assigns distinct ephemeral
      ports per socket.
    
    PulsarService.start():
    - After binding, unconditionally sync `brokerServicePort` /
      `brokerServicePortTls` from the actually bound channels (was only 
refreshing
      the `Optional.of(0)` case) and re-compute the cached `advertisedListeners`
      map so it reflects the dynamic ports.
    
    ServiceConfigurationUtils.getAppliedAdvertisedAddress:
    - Tolerate a synthesized listener that has only a subset of the four URLs
      populated; pick the first non-null URL to derive the host.
    
    Tests:
    - MultipleListenerValidatorTest: cover defaulting, listener-name 
customization,
      port-synthesis (binary, web, both, partial), merge semantics, and failure
      paths.
    - BindAddressValidatorTest: cover migrated-binding listener naming, 
duplicate
      tolerance, listener-name conflict, ip:port cross-scheme conflict, port-0
      exemption from the ip:port check, and the legacy-migration collision case.
    - DynamicBindAddressesIntegrationTest: end-to-end test that boots a broker
      purely from 
`bindAddresses=internal:pulsar://0.0.0.0:0,internal:http://0.0.0.0:0`,
      verifies the runtime ports flow back into the configuration and the
      advertised listener, hostname fallback when `advertisedAddress` is null,
      `pulsar.getBrokerId()` is populated, and that `PulsarAdmin` and 
`PulsarClient`
      work over the dynamic ports.
    - MultipleInternalBindAddressesTest: confirms that when the internal 
listener
      has multiple bindings for the same scheme, the first one becomes the 
primary
      and is what `pulsar.getBrokerServiceUrl()`, 
`pulsar.getWebServiceAddress()`,
      and the synthesized internal advertised listener point at.
---
 conf/broker.conf                                   |  40 +++--
 conf/standalone.conf                               |  25 ++-
 .../apache/pulsar/broker/ServiceConfiguration.java |  55 ++++--
 .../pulsar/broker/ServiceConfigurationUtils.java   |  20 ++-
 .../broker/validator/BindAddressValidator.java     |  84 ++++++---
 .../validator/MultipleListenerValidator.java       | 120 +++++++++++--
 .../broker/validator/BindAddressValidatorTest.java | 152 +++++++++++++---
 .../validator/MultipleListenerValidatorTest.java   | 178 +++++++++++++++++--
 .../org/apache/pulsar/broker/PulsarService.java    |  17 +-
 .../web/DynamicBindAddressesIntegrationTest.java   | 197 +++++++++++++++++++++
 .../web/MultipleInternalBindAddressesTest.java     | 131 ++++++++++++++
 11 files changed, 908 insertions(+), 111 deletions(-)

diff --git a/conf/broker.conf b/conf/broker.conf
index d6086f1e389..3d5ad632b42 100644
--- a/conf/broker.conf
+++ b/conf/broker.conf
@@ -65,24 +65,44 @@ webServiceTlsCiphers=
 # Hostname or IP address the service binds on, default is 0.0.0.0.
 bindAddress=0.0.0.0
 
-# Extra bind addresses for the service: 
<listener_name>:<scheme>://<host>:<port>,[...]
+# Bind addresses for the broker.
+# Comma-separated list of <listener_name>:<scheme>://<ip>:<port> entries.
+# Supported schemes: pulsar, pulsar+ssl, http, https.
+# The <ip> part selects which local network interface the port binds to: use a 
specific local
+# IP, or 0.0.0.0 to bind on all interfaces. A local hostname is also accepted 
but not
+# recommended.
+# At runtime, the legacy brokerServicePort, brokerServicePortTls, 
webServicePort, and
+# webServicePortTls properties are migrated into the bind address list (using 
bindAddress,
+# default 0.0.0.0, as the IP) under the listener named by 
internalListenerName, and merged with
+# the entries declared here.
+# Each ip:port may be bound by exactly one (listener, scheme) pair — a TCP 
socket cannot serve
+# two protocol schemes at once. An entry here that exactly matches a migrated 
legacy binding
+# (same listener:scheme://ip:port) is tolerated; assigning the same ip:port to 
a different
+# listener or to a different scheme fails validation.
+# Binding the same listener to multiple ports of the same scheme is allowed 
but rarely useful.
 bindAddresses=
 
 # Hostname or IP address the service advertises to the outside world.
 # If not set, the value of InetAddress.getLocalHost().getCanonicalHostName() 
is used.
 advertisedAddress=
 
-# Used to specify multiple advertised listeners for the broker.
-# The value must format as <listener_name>:pulsar://<host>:<port>,
-# multiple listeners should separate with commas.
-# Do not use this configuration with advertisedAddress and brokerServicePort.
-# The Default value is absent means use advertisedAddress and 
brokerServicePort.
+# Advertised listeners for the broker.
+# Comma-separated list of <listener_name>:<scheme>://<host>:<port> entries.
+# Supported schemes: pulsar, pulsar+ssl, http, https.
+# A listener name may be repeated to declare multiple schemes for the same 
listener.
+# URLs declared here for the listener named by internalListenerName take 
precedence; any URL
+# slots left undeclared are filled in from brokerServicePort, 
brokerServicePortTls,
+# webServicePort, and webServicePortTls, so existing deployments keep working 
after upgrade.
 # advertisedListeners=
 
-# Used to specify the internal listener name for the broker.
-# The listener name must contain in the advertisedListeners.
-# The Default value is absent, the broker uses the first listener as the 
internal listener.
-# internalListenerName=
+# Name of the listener used for cluster-internal broker-to-broker 
communication (lookup
+# redirects, replication, admin forwarding).
+# The internal listener must advertise an address reachable from other brokers 
in the same
+# cluster. It can also serve external traffic when the same DNS names and IPs 
are routable from
+# outside the cluster; this is typically not the case with Kubernetes, where 
the broker pod IP
+# is only reachable in-cluster — configure a separate non-internal listener 
for external clients
+# in that situation.
+internalListenerName=internal
 
 # Enable or disable the HAProxy protocol.
 # If true, the real IP addresses of consumers and producers can be obtained 
when getting topic statistics data.
diff --git a/conf/standalone.conf b/conf/standalone.conf
index 15038c6aa4f..8d4c07e7f01 100644
--- a/conf/standalone.conf
+++ b/conf/standalone.conf
@@ -40,13 +40,36 @@ webServicePort=8080
 # Hostname or IP address the service binds on, default is 0.0.0.0.
 bindAddress=0.0.0.0
 
-# Extra bind addresses for the service: 
<listener_name>:<scheme>://<host>:<port>,[...]
+# Bind addresses for the broker.
+# Comma-separated list of <listener_name>:<scheme>://<ip>:<port> entries.
+# Supported schemes: pulsar, pulsar+ssl, http, https.
+# The <ip> part selects which local network interface the port binds to: use a 
specific local
+# IP, or 0.0.0.0 to bind on all interfaces. A local hostname is also accepted 
but not
+# recommended.
+# At runtime, the legacy brokerServicePort, brokerServicePortTls, 
webServicePort, and
+# webServicePortTls properties are migrated into the bind address list (using 
bindAddress,
+# default 0.0.0.0, as the IP) under the listener named by 
internalListenerName, and merged with
+# the entries declared here.
+# Each ip:port may be bound by exactly one (listener, scheme) pair — a TCP 
socket cannot serve
+# two protocol schemes at once. An entry here that exactly matches a migrated 
legacy binding
+# (same listener:scheme://ip:port) is tolerated; assigning the same ip:port to 
a different
+# listener or to a different scheme fails validation.
+# Binding the same listener to multiple ports of the same scheme is allowed 
but rarely useful.
 bindAddresses=
 
 # Hostname or IP address the service advertises to the outside world.
 # If not set, the value of InetAddress.getLocalHost().getCanonicalHostName() 
is used.
 advertisedAddress=
 
+# Name of the listener used for cluster-internal broker-to-broker 
communication (lookup
+# redirects, replication, admin forwarding).
+# The internal listener must advertise an address reachable from other brokers 
in the same
+# cluster. It can also serve external traffic when the same DNS names and IPs 
are routable from
+# outside the cluster; this is typically not the case with Kubernetes, where 
the broker pod IP
+# is only reachable in-cluster — configure a separate non-internal listener 
for external clients
+# in that situation.
+internalListenerName=internal
+
 # Enable or disable the HAProxy protocol.
 # If true, the real IP addresses of consumers and producers can be obtained 
when getting topic statistics data.
 haProxyProtocolEnabled=false
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
index 6d8c65d1c1c..9ffdbf57880 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java
@@ -65,6 +65,12 @@ import org.apache.pulsar.metadata.impl.ZKMetadataStore;
 @ToString
 public class ServiceConfiguration implements PulsarConfiguration {
 
+    /**
+     * Default name of the internal advertised listener used for 
broker-to-broker communication
+     * within a Pulsar cluster.
+     */
+    public static final String DEFAULT_INTERNAL_LISTENER_NAME = "internal";
+
     @Category
     private static final String CATEGORY_SERVER = "Server";
     @Category
@@ -226,26 +232,45 @@ public class ServiceConfiguration implements 
PulsarConfiguration {
     private String advertisedAddress;
 
     @FieldContext(category = CATEGORY_SERVER,
-            doc = "Used to specify multiple advertised listeners for the 
broker."
-                    + " The value must format as 
<listener_name>:pulsar://<host>:<port>,"
-                    + "multiple listeners should separate with commas."
-                    + "Do not use this configuration with advertisedAddress 
and brokerServicePort."
-                    + "The Default value is absent means use advertisedAddress 
and brokerServicePort.")
+            doc = "Advertised listeners for the broker.\n"
+                    + " Comma-separated list of 
`<listener_name>:<scheme>://<host>:<port>` entries."
+                    + " Supported schemes: `pulsar`, `pulsar+ssl`, `http`, 
`https`."
+                    + " A listener name may be repeated to declare multiple 
schemes for the same listener.\n"
+                    + " URLs declared here for the listener named by 
`internalListenerName` take precedence;"
+                    + " any URL slots left undeclared are filled in from 
`brokerServicePort`,"
+                    + " `brokerServicePortTls`, `webServicePort`, and 
`webServicePortTls`, so existing"
+                    + " deployments keep working after upgrade.")
     private String advertisedListeners;
 
     @FieldContext(category = CATEGORY_SERVER,
-            doc = "Used to specify the internal listener name for the broker."
-                    + "The listener name must contain in the 
advertisedListeners."
-                    + "The Default value is absent, the broker uses the first 
listener as the internal listener.")
-    private String internalListenerName;
+            doc = "Name of the listener used for cluster-internal 
broker-to-broker communication"
+                    + " (lookup redirects, replication, admin forwarding).\n"
+                    + " The internal listener must advertise an address 
reachable from other brokers in"
+                    + " the same cluster. It can also serve external traffic 
when the same DNS names and"
+                    + " IPs are routable from outside the cluster; this is 
typically not the case with"
+                    + " Kubernetes, where the broker pod IP is only reachable 
in-cluster — configure a"
+                    + " separate non-internal listener for external clients in 
that situation.\n"
+                    + " Defaults to `internal`.")
+    private String internalListenerName = DEFAULT_INTERNAL_LISTENER_NAME;
 
     @FieldContext(category = CATEGORY_SERVER,
-            doc = "Used to specify additional bind addresses for the broker."
-                    + " The value must format as 
<listener_name>:<scheme>://<host>:<port>,"
-                    + " multiple bind addresses should be separated with 
commas."
-                    + " Associates each bind address with an advertised 
listener and protocol handler."
-                    + " Note that the brokerServicePort, brokerServicePortTls, 
webServicePort, and"
-                    + " webServicePortTls properties define additional 
bindings.")
+            doc = "Bind addresses for the broker.\n"
+                    + " Comma-separated list of 
`<listener_name>:<scheme>://<ip>:<port>` entries."
+                    + " Supported schemes: `pulsar`, `pulsar+ssl`, `http`, 
`https`."
+                    + " The `<ip>` part selects which local network interface 
the port binds to:"
+                    + " use a specific local IP, or `0.0.0.0` to bind on all 
interfaces."
+                    + " A local hostname is also accepted but not 
recommended.\n"
+                    + " At runtime, the legacy `brokerServicePort`, 
`brokerServicePortTls`,"
+                    + " `webServicePort`, and `webServicePortTls` properties 
are migrated into the bind"
+                    + " address list (using `bindAddress`, default `0.0.0.0`, 
as the IP) under the listener"
+                    + " named by `internalListenerName`, and merged with the 
entries declared here.\n"
+                    + " Each `ip:port` may be bound by exactly one (listener, 
scheme) pair — a TCP socket"
+                    + " cannot serve two protocol schemes at once. An entry 
here that exactly matches a"
+                    + " migrated legacy binding (same 
`listener:scheme://ip:port`) is tolerated; assigning"
+                    + " the same `ip:port` to a different listener or to a 
different scheme fails"
+                    + " validation.\n"
+                    + " Binding the same listener to multiple ports of the 
same scheme is allowed but rarely"
+                    + " useful.")
     private String bindAddresses;
 
     @FieldContext(category = CATEGORY_SERVER,
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfigurationUtils.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfigurationUtils.java
index f208a894aea..fb7d5d34706 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfigurationUtils.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfigurationUtils.java
@@ -70,9 +70,14 @@ public class ServiceConfigurationUtils {
 
         AdvertisedListener advertisedListener = 
result.get(configuration.getInternalListenerName());
         if (advertisedListener != null && !ignoreAdvertisedListener) {
-            String address = 
advertisedListener.getBrokerServiceUrl().getHost();
-            if (address != null) {
-                return address;
+            // The listener may have been synthesized from a subset of the 
legacy ports, so any of
+            // the four URLs may be null. Pick the first non-null URL to 
derive the host.
+            URI url = firstNonNullUri(advertisedListener.getBrokerServiceUrl(),
+                    advertisedListener.getBrokerServiceUrlTls(),
+                    advertisedListener.getBrokerHttpUrl(),
+                    advertisedListener.getBrokerHttpsUrl());
+            if (url != null && url.getHost() != null) {
+                return url.getHost();
             }
         }
 
@@ -113,6 +118,15 @@ public class ServiceConfigurationUtils {
         return port.map(p -> URI.create(String.format("%s://%s:%d", scheme, 
hostname, p))).orElse(null);
     }
 
+    private static URI firstNonNullUri(URI... uris) {
+        for (URI uri : uris) {
+            if (uri != null) {
+                return uri;
+            }
+        }
+        return null;
+    }
+
     /**
      * Gets the web service address (hostname).
      */
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/BindAddressValidator.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/BindAddressValidator.java
index 358ccba6c24..41ae9452d2a 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/BindAddressValidator.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/BindAddressValidator.java
@@ -25,10 +25,9 @@ import java.util.Collection;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.function.Function;
+import java.util.Objects;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-import java.util.stream.Collectors;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.ServiceConfigurationUtils;
@@ -43,13 +42,27 @@ public class BindAddressValidator {
 
     /**
      * Validate the configuration of `bindAddresses`.
-     * @param config the pulsar broker configure.
+     *
+     * <p>Bindings derived from the legacy port properties 
(`brokerServicePort`, `brokerServicePortTls`,
+     * `webServicePort`, `webServicePortTls`) are associated with the listener 
identified by
+     * `internalListenerName` and merged with the entries declared in 
`bindAddresses`. Exact duplicates
+     * (same `listener:scheme://ip:port`) are tolerated. Two failure modes are 
rejected:
+     * <ol>
+     *   <li>the same {@code scheme://ip:port} mapped to different listener 
names, and
+     *   <li>the same {@code ip:port} mapped to two different protocol schemes 
— because a TCP socket
+     *       can only be bound by one process, an IP+port can carry at most 
one scheme.
+     * </ol>
+     *
+     * @param config the pulsar broker configuration.
      * @param schemes a filter on the schemes of the bind addresses, or null 
to not apply a filter.
      * @return a list of bind addresses.
      */
     public static List<BindAddress> validateBindAddresses(ServiceConfiguration 
config, Collection<String> schemes) {
-        // migrate the existing configuration properties
-        List<BindAddress> addresses = migrateBindAddresses(config);
+        String internalListenerName = 
StringUtils.defaultIfBlank(config.getInternalListenerName(),
+                ServiceConfiguration.DEFAULT_INTERNAL_LISTENER_NAME);
+
+        // migrate the legacy port-based configuration to bind addresses 
tagged with the internal listener name
+        List<BindAddress> addresses = migrateBindAddresses(config, 
internalListenerName);
 
         // parse the list of additional bind addresses
         
Arrays.stream(StringUtils.split(StringUtils.defaultString(config.getBindAddresses()),
 ","))
@@ -68,38 +81,69 @@ public class BindAddressValidator {
             addresses.removeIf(a -> 
!schemes.contains(a.getAddress().getScheme()));
         }
 
-        // remove duplicates so that a bind address migrated from legacy 
configuration properties is not duplicated
-        Map<URI, BindAddress> uniqueBindAddresses =
-                
addresses.stream().collect(Collectors.toMap(BindAddress::getAddress, 
Function.identity(), (a, b) -> {
-                    if (StringUtils.isNotBlank(a.getListenerName()) && 
StringUtils.isBlank(b.getListenerName())) {
-                        return a;
-                    }
-                    return b;
-                }, LinkedHashMap::new));
+        // Deduplicate by full URI (scheme + ip + port). Tolerate exact 
duplicates (same URI and
+        // same listener name) so that a user's bindAddresses entry that 
matches a migrated binding
+        // is accepted; reject same URI assigned to different listener names.
+        Map<URI, BindAddress> uniqueBindAddresses = new LinkedHashMap<>();
+        for (BindAddress addr : addresses) {
+            BindAddress existing = uniqueBindAddresses.get(addr.getAddress());
+            if (existing == null) {
+                uniqueBindAddresses.put(addr.getAddress(), addr);
+            } else if (Objects.equals(existing.getListenerName(), 
addr.getListenerName())) {
+                // exact duplicate, tolerate
+                continue;
+            } else {
+                throw new IllegalArgumentException("bindAddresses: conflicting 
listener names for "
+                        + addr.getAddress() + ": `" + 
existing.getListenerName() + "` and `"
+                        + addr.getListenerName() + "`");
+            }
+        }
+
+        // ip:port uniqueness across protocol schemes. A TCP socket can only 
be bound by one
+        // listener+scheme combination, so two bindings that share host:port 
but differ in scheme
+        // (e.g. pulsar://0.0.0.0:8080 and http://0.0.0.0:8080) cannot both be 
active. Port 0 is
+        // skipped because it means "OS-assigned ephemeral port" — the kernel 
will hand out a unique
+        // port to each socket, so two port-0 entries with the same IP cannot 
actually collide.
+        Map<String, BindAddress> uniqueIpPort = new LinkedHashMap<>();
+        for (BindAddress addr : uniqueBindAddresses.values()) {
+            if (addr.getAddress().getPort() == 0) {
+                continue;
+            }
+            String ipPort = addr.getAddress().getHost() + ":" + 
addr.getAddress().getPort();
+            BindAddress prior = uniqueIpPort.putIfAbsent(ipPort, addr);
+            if (prior != null) {
+                throw new IllegalArgumentException("bindAddresses: ip:port `" 
+ ipPort
+                        + "` is bound by two schemes: `" + 
prior.getListenerName() + ":"
+                        + prior.getAddress().getScheme() + "` and `" + 
addr.getListenerName() + ":"
+                        + addr.getAddress().getScheme() + "`; an ip:port can 
carry only one scheme");
+            }
+        }
 
         return new ArrayList<>(uniqueBindAddresses.values());
     }
 
     /**
-     * Generates bind addresses based on legacy configuration properties.
+     * Generates bind addresses based on legacy configuration properties. The 
synthesized bindings are
+     * tagged with the {@code internalListenerName} so that {@link 
MultipleListenerValidator} and
+     * downstream code can correlate them with the internal advertised 
listener.
      */
-    private static List<BindAddress> migrateBindAddresses(ServiceConfiguration 
config) {
-        List<BindAddress> addresses = new ArrayList<>(2);
+    private static List<BindAddress> migrateBindAddresses(ServiceConfiguration 
config, String internalListenerName) {
+        List<BindAddress> addresses = new ArrayList<>(4);
         String bindAddress = config.getBindAddress();
         if (config.getBrokerServicePort().isPresent()) {
-            addresses.add(new BindAddress(null,
+            addresses.add(new BindAddress(internalListenerName,
                     
URI.create(ServiceConfigurationUtils.brokerUrl(bindAddress, 
config.getBrokerServicePort().get()))));
         }
         if (config.getBrokerServicePortTls().isPresent()) {
-            addresses.add(new BindAddress(null, URI.create(
+            addresses.add(new BindAddress(internalListenerName, URI.create(
                     ServiceConfigurationUtils.brokerUrlTls(bindAddress, 
config.getBrokerServicePortTls().get()))));
         }
         if (config.getWebServicePort().isPresent()) {
-            addresses.add(new BindAddress(null, URI.create(
+            addresses.add(new BindAddress(internalListenerName, URI.create(
                     ServiceConfigurationUtils.webServiceUrl(bindAddress, 
config.getWebServicePort().get()))));
         }
         if (config.getWebServicePortTls().isPresent()) {
-            addresses.add(new BindAddress(null, URI.create(
+            addresses.add(new BindAddress(internalListenerName, URI.create(
                     ServiceConfigurationUtils.webServiceUrlTls(bindAddress, 
config.getWebServicePortTls().get()))));
         }
         return addresses;
diff --git 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/MultipleListenerValidator.java
 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/MultipleListenerValidator.java
index c400078e5a1..c300da9682f 100644
--- 
a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/MultipleListenerValidator.java
+++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/MultipleListenerValidator.java
@@ -20,7 +20,6 @@ package org.apache.pulsar.broker.validator;
 
 import java.net.URI;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -29,6 +28,7 @@ import java.util.Set;
 import java.util.TreeSet;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.ServiceConfigurationUtils;
 import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
 
 /**
@@ -37,18 +37,79 @@ import 
org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
 public final class MultipleListenerValidator {
 
     /**
-     * Validate the configuration of `advertisedListeners`, 
`internalListenerName`.
-     * 1. `advertisedListeners` consists of a comma-separated list of 
endpoints.
-     * 2. Each endpoint consists of a listener name and an associated address 
(`listener:scheme://host:port`).
-     * 3. A listener name may be repeated to define both a non-TLS and a TLS 
endpoint.
-     * 4. Duplicate definitions are disallowed.
-     * 5. If `internalListenerName` is absent, set it to the first listener 
defined in `advertisedListeners`.
-     * @param config the pulsar broker configure.
-     * @return
+     * Validate the configuration of `advertisedListeners` and 
`internalListenerName`.
+     * <ol>
+     * <li>`advertisedListeners` is a comma-separated list of endpoints in the 
form
+     *     `listener:scheme://host:port`. Supported schemes are `pulsar`, 
`pulsar+ssl`, `http`, and `https`.
+     * <li>A listener name may be repeated to define multiple endpoints (e.g. 
binary and HTTPS) for the
+     *     same listener; duplicate definitions for the same scheme are 
rejected.
+     * <li>`internalListenerName` identifies the listener used for 
cluster-internal broker-to-broker
+     *     communication. It defaults to {@value 
ServiceConfiguration#DEFAULT_INTERNAL_LISTENER_NAME}.
+     *     An internal listener entry is synthesized from the legacy 
`brokerServicePort`,
+     *     `brokerServicePortTls`, `webServicePort`, and `webServicePortTls` 
properties; any URLs the
+     *     user has explicitly declared for the internal listener in 
`advertisedListeners` take
+     *     precedence and the legacy-port URLs only fill in the unspecified 
slots. This allows the
+     *     internal listener to override individual URLs (e.g. a custom TLS 
hostname) while still
+     *     benefiting from auto-population for the rest.
+     * </ol>
+     * @param config the pulsar broker configuration.
+     * @return the parsed and validated advertised listeners, keyed by 
listener name.
      */
     public static Map<String, AdvertisedListener> 
validateAndAnalysisAdvertisedListener(ServiceConfiguration config) {
+        String internalListenerName = 
StringUtils.defaultIfBlank(config.getInternalListenerName(),
+                ServiceConfiguration.DEFAULT_INTERNAL_LISTENER_NAME);
+        if (StringUtils.isBlank(config.getInternalListenerName())) {
+            config.setInternalListenerName(internalListenerName);
+        }
+
+        Map<String, AdvertisedListener> result = 
parseAdvertisedListeners(config);
+
+        // Merge the legacy-port-derived internal listener URLs into any 
explicit configuration.
+        // Explicit URLs in `advertisedListeners` take precedence; the 
legacy-port URLs fill in the
+        // unspecified slots so that broker-to-broker communication keeps 
working without forcing the
+        // user to redeclare every URL when they only want to override one.
+        AdvertisedListener fromLegacyPorts = 
buildInternalListenerFromLegacyPorts(config);
+        if (fromLegacyPorts != null) {
+            AdvertisedListener explicit = result.get(internalListenerName);
+            result.put(internalListenerName, mergeListeners(explicit, 
fromLegacyPorts));
+        }
+
+        if (!result.isEmpty() && !result.containsKey(internalListenerName)) {
+            throw new IllegalArgumentException("the `advertisedListeners` 
configuration does not contain "
+                    + "an entry for the internal listener `" + 
internalListenerName + "`, and the legacy "
+                    + "port properties are not configured so an internal 
listener cannot be synthesized");
+        }
+
+        return result;
+    }
+
+    /**
+     * Merges two {@link AdvertisedListener} entries. URLs from {@code 
override} take precedence; URLs
+     * from {@code fallback} only fill in the slots that {@code override} 
leaves null. Either argument
+     * may be null.
+     */
+    private static AdvertisedListener mergeListeners(AdvertisedListener 
override, AdvertisedListener fallback) {
+        if (override == null) {
+            return fallback;
+        }
+        if (fallback == null) {
+            return override;
+        }
+        return AdvertisedListener.builder()
+                .brokerServiceUrl(override.getBrokerServiceUrl() != null
+                        ? override.getBrokerServiceUrl() : 
fallback.getBrokerServiceUrl())
+                .brokerServiceUrlTls(override.getBrokerServiceUrlTls() != null
+                        ? override.getBrokerServiceUrlTls() : 
fallback.getBrokerServiceUrlTls())
+                .brokerHttpUrl(override.getBrokerHttpUrl() != null
+                        ? override.getBrokerHttpUrl() : 
fallback.getBrokerHttpUrl())
+                .brokerHttpsUrl(override.getBrokerHttpsUrl() != null
+                        ? override.getBrokerHttpsUrl() : 
fallback.getBrokerHttpsUrl())
+                .build();
+    }
+
+    private static Map<String, AdvertisedListener> 
parseAdvertisedListeners(ServiceConfiguration config) {
         if (StringUtils.isBlank(config.getAdvertisedListeners())) {
-            return Collections.emptyMap();
+            return new LinkedHashMap<>();
         }
         Optional<String> firstListenerName = Optional.empty();
         Map<String, List<String>> listeners = new LinkedHashMap<>();
@@ -59,20 +120,18 @@ public final class MultipleListenerValidator {
                         + str + " do not contain listener name");
             }
             String listenerName = StringUtils.trim(str.substring(0, index));
-            if (!firstListenerName.isPresent()) {
+            if (firstListenerName.isEmpty()) {
                 firstListenerName = Optional.of(listenerName);
             }
             String value = StringUtils.trim(str.substring(index + 1));
             listeners.computeIfAbsent(listenerName, k -> new ArrayList<>(2));
             listeners.get(listenerName).add(value);
         }
+        // For backward compatibility, if `internalListenerName` was left 
blank, default it to the first
+        // listener parsed from `advertisedListeners`.
         if (StringUtils.isBlank(config.getInternalListenerName())) {
             config.setInternalListenerName(firstListenerName.get());
         }
-        if (!listeners.containsKey(config.getInternalListenerName())) {
-            throw new IllegalArgumentException("the `advertisedListeners` 
configure do not contain "
-                    + "`internalListenerName` entry");
-        }
         final Map<String, AdvertisedListener> result = new LinkedHashMap<>();
         final Map<String, Set<String>> reverseMappings = new LinkedHashMap<>();
         for (final Map.Entry<String, List<String>> entry : 
listeners.entrySet()) {
@@ -136,4 +195,35 @@ public final class MultipleListenerValidator {
         return result;
     }
 
+    /**
+     * Synthesize an {@link AdvertisedListener} for the internal listener from 
the legacy port
+     * configuration (`brokerServicePort`, `brokerServicePortTls`, 
`webServicePort`,
+     * `webServicePortTls`). Returns {@code null} if no binary port and no web 
port is set; the caller
+     * is then responsible for raising an error if the internal listener is 
still missing after
+     * parsing `advertisedListeners`.
+     */
+    private static AdvertisedListener 
buildInternalListenerFromLegacyPorts(ServiceConfiguration config) {
+        boolean hasBinaryPort = config.getBrokerServicePort().isPresent()
+                || config.getBrokerServicePortTls().isPresent();
+        boolean hasWebPort = config.getWebServicePort().isPresent()
+                || config.getWebServicePortTls().isPresent();
+        if (!hasBinaryPort && !hasWebPort) {
+            return null;
+        }
+        String host = 
ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress());
+        return AdvertisedListener.builder()
+                .brokerServiceUrl(config.getBrokerServicePort()
+                        .map(port -> 
URI.create(ServiceConfigurationUtils.brokerUrl(host, port))).orElse(null))
+                .brokerServiceUrlTls(config.getBrokerServicePortTls()
+                        .map(port -> 
URI.create(ServiceConfigurationUtils.brokerUrlTls(host, port))).orElse(null))
+                .brokerHttpUrl(config.getWebServicePort()
+                        .map(port -> 
URI.create(ServiceConfigurationUtils.webServiceUrl(host, port))).orElse(null))
+                .brokerHttpsUrl(config.getWebServicePortTls()
+                        .map(port -> 
URI.create(ServiceConfigurationUtils.webServiceUrlTls(host, 
port))).orElse(null))
+                .build();
+    }
+
+    // Prevent instantiation
+    private MultipleListenerValidator() {
+    }
 }
diff --git 
a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/BindAddressValidatorTest.java
 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/BindAddressValidatorTest.java
index a1af57b633c..3495cb1b74c 100644
--- 
a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/BindAddressValidatorTest.java
+++ 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/BindAddressValidatorTest.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pulsar.broker.validator;
 
-import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
 import java.net.URI;
 import java.util.Arrays;
 import java.util.Collections;
@@ -58,9 +60,9 @@ public class BindAddressValidatorTest {
         ServiceConfiguration config = newEmptyConfiguration();
         
config.setBindAddresses("internal:pulsar://0.0.0.0:6650,internal:pulsar+ssl://0.0.0.0:6651");
         List<BindAddress> addresses = 
BindAddressValidator.validateBindAddresses(config, null);
-        assertEquals(Arrays.asList(
+        assertEquals(addresses, Arrays.asList(
                 new BindAddress("internal", 
URI.create("pulsar://0.0.0.0:6650")),
-                new BindAddress("internal", 
URI.create("pulsar+ssl://0.0.0.0:6651"))), addresses);
+                new BindAddress("internal", 
URI.create("pulsar+ssl://0.0.0.0:6651"))));
     }
 
     @Test
@@ -68,9 +70,9 @@ public class BindAddressValidatorTest {
         ServiceConfiguration config = newEmptyConfiguration();
         
config.setBindAddresses("internal:pulsar://0.0.0.0:6650,external:pulsar+ssl://0.0.0.0:6651");
         List<BindAddress> addresses = 
BindAddressValidator.validateBindAddresses(config, null);
-        assertEquals(Arrays.asList(
+        assertEquals(addresses, Arrays.asList(
                 new BindAddress("internal", 
URI.create("pulsar://0.0.0.0:6650")),
-                new BindAddress("external", 
URI.create("pulsar+ssl://0.0.0.0:6651"))), addresses);
+                new BindAddress("external", 
URI.create("pulsar+ssl://0.0.0.0:6651"))));
     }
 
     @Test
@@ -82,20 +84,20 @@ public class BindAddressValidatorTest {
         config.setWebServicePortTls(Optional.of(443));
         config.setBindAddress("0.0.0.0");
         List<BindAddress> addresses = 
BindAddressValidator.validateBindAddresses(config, null);
-        assertEquals(Arrays.asList(
-                new BindAddress(null, URI.create("pulsar://0.0.0.0:6650")),
-                new BindAddress(null, URI.create("pulsar+ssl://0.0.0.0:6651")),
-                new BindAddress(null, URI.create("http://0.0.0.0:8080";)),
-                new BindAddress(null, URI.create("https://0.0.0.0:443";))), 
addresses);
+        assertEquals(addresses, Arrays.asList(
+                new BindAddress("internal", 
URI.create("pulsar://0.0.0.0:6650")),
+                new BindAddress("internal", 
URI.create("pulsar+ssl://0.0.0.0:6651")),
+                new BindAddress("internal", URI.create("http://0.0.0.0:8080";)),
+                new BindAddress("internal", 
URI.create("https://0.0.0.0:443";))));
     }
 
     @Test
     public void testMigrationWithDefaults() {
         ServiceConfiguration config = new ServiceConfiguration();
         List<BindAddress> addresses = 
BindAddressValidator.validateBindAddresses(config, null);
-        assertEquals(Arrays.asList(
-                new BindAddress(null, URI.create("pulsar://0.0.0.0:6650")),
-                new BindAddress(null, URI.create("http://0.0.0.0:8080";))), 
addresses);
+        assertEquals(addresses, Arrays.asList(
+                new BindAddress("internal", 
URI.create("pulsar://0.0.0.0:6650")),
+                new BindAddress("internal", 
URI.create("http://0.0.0.0:8080";))));
     }
 
     @Test
@@ -104,9 +106,9 @@ public class BindAddressValidatorTest {
         config.setBrokerServicePort(Optional.of(6650));
         config.setBindAddresses("extra:pulsar://0.0.0.0:6652");
         List<BindAddress> addresses = 
BindAddressValidator.validateBindAddresses(config, null);
-        assertEquals(Arrays.asList(
-                new BindAddress(null, URI.create("pulsar://0.0.0.0:6650")),
-                new BindAddress("extra", 
URI.create("pulsar://0.0.0.0:6652"))), addresses);
+        assertEquals(addresses, Arrays.asList(
+                new BindAddress("internal", 
URI.create("pulsar://0.0.0.0:6650")),
+                new BindAddress("extra", 
URI.create("pulsar://0.0.0.0:6652"))));
     }
 
     @Test
@@ -118,20 +120,118 @@ public class BindAddressValidatorTest {
 
         List<BindAddress> addresses;
         addresses = BindAddressValidator.validateBindAddresses(config, null);
-        assertEquals(Arrays.asList(
-                new BindAddress(null, URI.create("pulsar://0.0.0.0:6650")),
-                new BindAddress(null, URI.create("pulsar+ssl://0.0.0.0:6651")),
+        assertEquals(addresses, Arrays.asList(
+                new BindAddress("internal", 
URI.create("pulsar://0.0.0.0:6650")),
+                new BindAddress("internal", 
URI.create("pulsar+ssl://0.0.0.0:6651")),
                 new BindAddress("extra", URI.create("pulsar://0.0.0.0:6652")),
-                new BindAddress("extra", URI.create("http://0.0.0.0:8080";))), 
addresses);
+                new BindAddress("extra", URI.create("http://0.0.0.0:8080";))));
 
         addresses = BindAddressValidator.validateBindAddresses(config, 
Arrays.asList("pulsar", "pulsar+ssl"));
-        assertEquals(Arrays.asList(
-                new BindAddress(null, URI.create("pulsar://0.0.0.0:6650")),
-                new BindAddress(null, URI.create("pulsar+ssl://0.0.0.0:6651")),
-                new BindAddress("extra", 
URI.create("pulsar://0.0.0.0:6652"))), addresses);
+        assertEquals(addresses, Arrays.asList(
+                new BindAddress("internal", 
URI.create("pulsar://0.0.0.0:6650")),
+                new BindAddress("internal", 
URI.create("pulsar+ssl://0.0.0.0:6651")),
+                new BindAddress("extra", 
URI.create("pulsar://0.0.0.0:6652"))));
 
         addresses = BindAddressValidator.validateBindAddresses(config, 
Collections.singletonList("http"));
-        assertEquals(Collections.singletonList(
-                new BindAddress("extra", URI.create("http://0.0.0.0:8080";))), 
addresses);
+        assertEquals(addresses, Collections.singletonList(
+                new BindAddress("extra", URI.create("http://0.0.0.0:8080";))));
+    }
+
+    @Test
+    public void testMigrationUsesConfiguredInternalListenerName() {
+        ServiceConfiguration config = newEmptyConfiguration();
+        config.setInternalListenerName("region1");
+        config.setBrokerServicePort(Optional.of(6650));
+        config.setWebServicePort(Optional.of(8080));
+        List<BindAddress> addresses = 
BindAddressValidator.validateBindAddresses(config, null);
+        assertEquals(addresses, Arrays.asList(
+                new BindAddress("region1", 
URI.create("pulsar://0.0.0.0:6650")),
+                new BindAddress("region1", 
URI.create("http://0.0.0.0:8080";))));
+    }
+
+    @Test
+    public void testDuplicateMatchingMigratedBindingIsTolerated() {
+        // User explicitly re-declares the migrated binding under the same 
listener name; this must
+        // be accepted without throwing.
+        ServiceConfiguration config = newEmptyConfiguration();
+        config.setBrokerServicePort(Optional.of(6650));
+        config.setBindAddresses("internal:pulsar://0.0.0.0:6650");
+        List<BindAddress> addresses = 
BindAddressValidator.validateBindAddresses(config, null);
+        assertEquals(addresses, Collections.singletonList(
+                new BindAddress("internal", 
URI.create("pulsar://0.0.0.0:6650"))));
+    }
+
+    @Test
+    public void testDuplicateConflictFailsWithError() {
+        // The migrated binding uses the internal listener name, but the user 
re-declares the same
+        // URI under a different listener name. This is a conflict and must be 
rejected.
+        ServiceConfiguration config = newEmptyConfiguration();
+        config.setBrokerServicePort(Optional.of(6650));
+        config.setBindAddresses("external:pulsar://0.0.0.0:6650");
+        IllegalArgumentException e = 
expectThrows(IllegalArgumentException.class,
+                () -> BindAddressValidator.validateBindAddresses(config, 
null));
+        assertTrue(e.getMessage().contains("conflicting listener names"),
+                "expected conflict message but got: " + e.getMessage());
+    }
+
+    @Test
+    public void testDuplicateBindAddressesEntriesAreTolerated() {
+        // Two identical entries in bindAddresses should not cause a failure.
+        ServiceConfiguration config = newEmptyConfiguration();
+        
config.setBindAddresses("extra:pulsar://0.0.0.0:6652,extra:pulsar://0.0.0.0:6652");
+        List<BindAddress> addresses = 
BindAddressValidator.validateBindAddresses(config, null);
+        assertEquals(addresses, Collections.singletonList(
+                new BindAddress("extra", 
URI.create("pulsar://0.0.0.0:6652"))));
+    }
+
+    @Test
+    public void testSameIpPortDifferentSchemeSameListenerFails() {
+        // Same ip:port with two different schemes (even for the same 
listener) cannot both be bound.
+        ServiceConfiguration config = newEmptyConfiguration();
+        
config.setBindAddresses("internal:pulsar://0.0.0.0:8080,internal:http://0.0.0.0:8080";);
+        IllegalArgumentException e = 
expectThrows(IllegalArgumentException.class,
+                () -> BindAddressValidator.validateBindAddresses(config, 
null));
+        assertTrue(e.getMessage().contains("0.0.0.0:8080"),
+                "expected ip:port in message but got: " + e.getMessage());
+        assertTrue(e.getMessage().contains("only one scheme"),
+                "expected scheme-collision wording but got: " + 
e.getMessage());
+    }
+
+    @Test
+    public void testSameIpPortDifferentSchemeDifferentListenerFails() {
+        // Same ip:port with two different schemes and two different listener 
names also fails for
+        // the same reason: an ip:port can only be bound by one (listener, 
scheme).
+        ServiceConfiguration config = newEmptyConfiguration();
+        
config.setBindAddresses("internal:pulsar://0.0.0.0:8080,external:http://0.0.0.0:8080";);
+        IllegalArgumentException e = 
expectThrows(IllegalArgumentException.class,
+                () -> BindAddressValidator.validateBindAddresses(config, 
null));
+        assertTrue(e.getMessage().contains("0.0.0.0:8080"),
+                "expected ip:port in message but got: " + e.getMessage());
+    }
+
+    @Test
+    public void testDynamicPortZeroIsSkippedFromIpPortUniqueness() {
+        // Port 0 means "OS-assigned ephemeral port", so two port-0 bindings 
with the same IP cannot
+        // actually collide — each socket gets a different kernel-assigned 
port. The ip:port
+        // uniqueness check therefore skips port-0 entries.
+        ServiceConfiguration config = newEmptyConfiguration();
+        
config.setBindAddresses("internal:pulsar://0.0.0.0:0,internal:http://0.0.0.0:0";);
+        List<BindAddress> addresses = 
BindAddressValidator.validateBindAddresses(config, null);
+        assertEquals(addresses, Arrays.asList(
+                new BindAddress("internal", URI.create("pulsar://0.0.0.0:0")),
+                new BindAddress("internal", URI.create("http://0.0.0.0:0";))));
+    }
+
+    @Test
+    public void testLegacyMigrationIpPortCollisionFails() {
+        // brokerServicePort and webServicePort collide on the same ip:port: 
the migrated bindings
+        // would produce pulsar://0.0.0.0:8080 and http://0.0.0.0:8080, which 
cannot coexist.
+        ServiceConfiguration config = newEmptyConfiguration();
+        config.setBrokerServicePort(Optional.of(8080));
+        config.setWebServicePort(Optional.of(8080));
+        IllegalArgumentException e = 
expectThrows(IllegalArgumentException.class,
+                () -> BindAddressValidator.validateBindAddresses(config, 
null));
+        assertTrue(e.getMessage().contains("0.0.0.0:8080"),
+                "expected ip:port in message but got: " + e.getMessage());
     }
 }
diff --git 
a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/MultipleListenerValidatorTest.java
 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/MultipleListenerValidatorTest.java
index 7e79a40355f..438490d70c1 100644
--- 
a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/MultipleListenerValidatorTest.java
+++ 
b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/MultipleListenerValidatorTest.java
@@ -18,11 +18,18 @@
  */
 package org.apache.pulsar.broker.validator;
 
-import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
 import java.net.InetAddress;
+import java.net.URI;
+import java.util.Map;
 import java.util.Optional;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.ServiceConfigurationUtils;
+import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
 import org.testng.annotations.Test;
 
 /**
@@ -34,7 +41,8 @@ public class MultipleListenerValidatorTest {
     @SuppressWarnings("deprecation")
     @Test
     public void testGetAppliedAdvertised() throws Exception {
-        ServiceConfiguration config = new ServiceConfiguration();
+        ServiceConfiguration config = newConfigWithNoPorts();
+        config.setBrokerServicePort(Optional.of(6650));
         config.setBrokerServicePortTls(Optional.of(6651));
         config.setAdvertisedListeners("internal:pulsar://192.0.0.1:6660, 
internal:pulsar+ssl://192.0.0.1:6651");
         config.setInternalListenerName("internal");
@@ -43,7 +51,7 @@ public class MultipleListenerValidatorTest {
         
assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, 
true),
                 InetAddress.getLocalHost().getCanonicalHostName());
 
-        config = new ServiceConfiguration();
+        config = newConfigWithNoPorts();
         config.setBrokerServicePortTls(Optional.of(6651));
         config.setAdvertisedAddress("192.0.0.2");
         
assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, 
false),
@@ -60,7 +68,8 @@ public class MultipleListenerValidatorTest {
 
     @Test
     public void testListenerDefaulting() {
-        ServiceConfiguration config = new ServiceConfiguration();
+        ServiceConfiguration config = newConfigWithNoPorts();
+        config.setInternalListenerName(null);
         config.setAdvertisedListeners(" internal:pulsar://127.0.0.1:6660, 
internal:pulsar+ssl://127.0.0.1:6651");
         
MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
         assertEquals("internal", config.getInternalListenerName());
@@ -68,14 +77,14 @@ public class MultipleListenerValidatorTest {
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testMalformedListener() {
-        ServiceConfiguration config = new ServiceConfiguration();
+        ServiceConfiguration config = newConfigWithNoPorts();
         config.setAdvertisedListeners(":pulsar://127.0.0.1:6660");
         
MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
     }
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testListenerDuplicate_1() {
-        ServiceConfiguration config = new ServiceConfiguration();
+        ServiceConfiguration config = newConfigWithNoPorts();
         config.setAdvertisedListeners(" internal:pulsar://127.0.0.1:6660, 
internal:pulsar+ssl://127.0.0.1:6651,"
                 + " internal:pulsar://192.168.1.11:6660, 
internal:pulsar+ssl://192.168.1.11:6651");
         config.setInternalListenerName("internal");
@@ -84,7 +93,7 @@ public class MultipleListenerValidatorTest {
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testListenerDuplicate_2() {
-        ServiceConfiguration config = new ServiceConfiguration();
+        ServiceConfiguration config = newConfigWithNoPorts();
         config.setAdvertisedListeners(" internal:pulsar://127.0.0.1:6660," + " 
internal:pulsar://192.168.1.11:6660");
         config.setInternalListenerName("internal");
         
MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
@@ -92,7 +101,7 @@ public class MultipleListenerValidatorTest {
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testListenerDuplicate_3() {
-        ServiceConfiguration config = new ServiceConfiguration();
+        ServiceConfiguration config = newConfigWithNoPorts();
         config.setAdvertisedListeners(" internal:pulsar+ssl://127.0.0.1:6661,"
                 + " internal:pulsar+ssl://192.168.1.11:6661");
         config.setInternalListenerName("internal");
@@ -101,18 +110,159 @@ public class MultipleListenerValidatorTest {
 
     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testDifferentListenerWithSameHostPort() {
-        ServiceConfiguration config = new ServiceConfiguration();
+        ServiceConfiguration config = newConfigWithNoPorts();
         config.setAdvertisedListeners(" internal:pulsar://127.0.0.1:6660," + " 
external:pulsar://127.0.0.1:6660");
         config.setInternalListenerName("internal");
         
MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
     }
 
-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testWithoutListenerNameInAdvertisedListeners() {
+    @Test
+    public void testInternalListenerSynthesizedFromLegacyPorts() {
+        // No advertisedListeners configured, only legacy ports. The internal 
listener is synthesized.
+        ServiceConfiguration config = newConfigWithNoPorts();
+        config.setAdvertisedAddress("broker-1.example.com");
+        config.setBrokerServicePort(Optional.of(6650));
+        config.setWebServicePort(Optional.of(8080));
+        Map<String, AdvertisedListener> listeners =
+                
MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
+        AdvertisedListener internal = 
listeners.get(ServiceConfiguration.DEFAULT_INTERNAL_LISTENER_NAME);
+        assertNotNull(internal, "expected an `internal` listener to be 
synthesized from legacy ports");
+        assertEquals(internal.getBrokerServiceUrl(), 
URI.create("pulsar://broker-1.example.com:6650"));
+        assertEquals(internal.getBrokerHttpUrl(), 
URI.create("http://broker-1.example.com:8080";));
+        assertNull(internal.getBrokerServiceUrlTls());
+        assertNull(internal.getBrokerHttpsUrl());
+    }
+
+    @Test
+    public void testInternalListenerSynthesizedWithAllPorts() {
+        ServiceConfiguration config = newConfigWithNoPorts();
+        config.setAdvertisedAddress("broker-1.example.com");
+        config.setBrokerServicePort(Optional.of(6650));
+        config.setBrokerServicePortTls(Optional.of(6651));
+        config.setWebServicePort(Optional.of(8080));
+        config.setWebServicePortTls(Optional.of(8443));
+        Map<String, AdvertisedListener> listeners =
+                
MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
+        AdvertisedListener internal = 
listeners.get(ServiceConfiguration.DEFAULT_INTERNAL_LISTENER_NAME);
+        assertNotNull(internal);
+        assertEquals(internal.getBrokerServiceUrl(), 
URI.create("pulsar://broker-1.example.com:6650"));
+        assertEquals(internal.getBrokerServiceUrlTls(), 
URI.create("pulsar+ssl://broker-1.example.com:6651"));
+        assertEquals(internal.getBrokerHttpUrl(), 
URI.create("http://broker-1.example.com:8080";));
+        assertEquals(internal.getBrokerHttpsUrl(), 
URI.create("https://broker-1.example.com:8443";));
+    }
+
+    @Test
+    public void 
testInternalListenerAutoAddedWhenAdvertisedListenersDoesNotContainIt() {
+        // User configures advertisedListeners with their own listener name 
but does not declare the
+        // internal listener. The validator must auto-add it from the legacy 
ports.
+        ServiceConfiguration config = newConfigWithNoPorts();
+        config.setAdvertisedAddress("broker-1.example.com");
+        config.setBrokerServicePort(Optional.of(6650));
+        config.setWebServicePort(Optional.of(8080));
+        
config.setAdvertisedListeners("region1:pulsar://region1.example.com:6660,"
+                + "region1:pulsar+ssl://region1.example.com:6661");
+        Map<String, AdvertisedListener> listeners =
+                
MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
+        assertEquals(listeners.size(), 2);
+        AdvertisedListener region1 = listeners.get("region1");
+        assertNotNull(region1);
+        assertEquals(region1.getBrokerServiceUrl(), 
URI.create("pulsar://region1.example.com:6660"));
+        AdvertisedListener internal = listeners.get("internal");
+        assertNotNull(internal, "internal listener should have been auto-added 
from legacy ports");
+        assertEquals(internal.getBrokerServiceUrl(), 
URI.create("pulsar://broker-1.example.com:6650"));
+        assertEquals(internal.getBrokerHttpUrl(), 
URI.create("http://broker-1.example.com:8080";));
+    }
+
+    @Test
+    public void 
testExplicitInternalListenerUrlsTakePrecedenceOverLegacyPorts() {
+        // When the user explicitly declares URLs for the internal listener in 
advertisedListeners, those
+        // URLs take precedence over the legacy-port-derived values. The 
legacy values still fill in any
+        // URL slots the user did not declare.
+        ServiceConfiguration config = newConfigWithNoPorts();
+        config.setAdvertisedAddress("broker-1.example.com");
+        config.setBrokerServicePort(Optional.of(6650));
+        config.setWebServicePort(Optional.of(8080));
+        
config.setAdvertisedListeners("internal:pulsar://explicit.example.com:6660,"
+                + "internal:http://explicit.example.com:8888";);
+        Map<String, AdvertisedListener> listeners =
+                
MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
+        assertEquals(listeners.size(), 1);
+        AdvertisedListener internal = listeners.get("internal");
+        assertNotNull(internal);
+        // Both URLs were declared explicitly: the user-provided values win 
over the legacy ports.
+        assertEquals(internal.getBrokerServiceUrl(), 
URI.create("pulsar://explicit.example.com:6660"));
+        assertEquals(internal.getBrokerHttpUrl(), 
URI.create("http://explicit.example.com:8888";));
+        // The user did not declare TLS variants and the legacy TLS ports are 
unset.
+        assertNull(internal.getBrokerServiceUrlTls());
+        assertNull(internal.getBrokerHttpsUrl());
+    }
+
+    @Test
+    public void testPartialInternalListenerOverrideMergesWithLegacyPorts() {
+        // The user overrides only one URL (the binary endpoint) for the 
internal listener. The remaining
+        // URLs must be filled in from the legacy-port configuration so the 
listener is complete.
+        ServiceConfiguration config = newConfigWithNoPorts();
+        config.setAdvertisedAddress("broker-1.example.com");
+        config.setBrokerServicePort(Optional.of(6650));
+        config.setBrokerServicePortTls(Optional.of(6651));
+        config.setWebServicePort(Optional.of(8080));
+        config.setWebServicePortTls(Optional.of(8443));
+        
config.setAdvertisedListeners("internal:pulsar://override.example.com:6660");
+        Map<String, AdvertisedListener> listeners =
+                
MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
+        AdvertisedListener internal = listeners.get("internal");
+        assertNotNull(internal);
+        // Overridden by the explicit advertisedListeners entry:
+        assertEquals(internal.getBrokerServiceUrl(), 
URI.create("pulsar://override.example.com:6660"));
+        // Filled in from the legacy ports:
+        assertEquals(internal.getBrokerServiceUrlTls(), 
URI.create("pulsar+ssl://broker-1.example.com:6651"));
+        assertEquals(internal.getBrokerHttpUrl(), 
URI.create("http://broker-1.example.com:8080";));
+        assertEquals(internal.getBrokerHttpsUrl(), 
URI.create("https://broker-1.example.com:8443";));
+    }
+
+    @Test
+    public void testInternalListenerNameCustomizable() {
+        // The internal listener name is configurable; the validator must look 
for the configured name.
+        ServiceConfiguration config = newConfigWithNoPorts();
+        config.setInternalListenerName("region1");
+        config.setAdvertisedAddress("broker-1.example.com");
+        config.setBrokerServicePort(Optional.of(6650));
+        config.setWebServicePort(Optional.of(8080));
+        
config.setAdvertisedListeners("region1:pulsar://region1.example.com:6660,"
+                + "region1:http://region1.example.com:8888";);
+        Map<String, AdvertisedListener> listeners =
+                
MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
+        assertEquals(listeners.size(), 1);
+        AdvertisedListener region1 = listeners.get("region1");
+        assertNotNull(region1);
+        assertEquals(region1.getBrokerServiceUrl(), 
URI.create("pulsar://region1.example.com:6660"));
+    }
+
+    @Test
+    public void testFailureWhenNoInternalListenerAvailable() {
+        // Custom internal listener name + no matching entry in 
advertisedListeners + no ports for the
+        // legacy fallback => validation must fail with a clear message.
+        ServiceConfiguration config = newConfigWithNoPorts();
+        config.setInternalListenerName("region1");
+        
config.setAdvertisedListeners("region2:pulsar://region2.example.com:6660");
+        IllegalArgumentException e = 
expectThrows(IllegalArgumentException.class,
+                () -> 
MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config));
+        assertTrue(e.getMessage().contains("region1"), "expected error to 
mention the listener name: "
+                + e.getMessage());
+    }
+
+    @Test
+    public void testInternalListenerNameDefaultsToInternalConstant() {
         ServiceConfiguration config = new ServiceConfiguration();
-        config.setAdvertisedListeners(" internal:pulsar://127.0.0.1:6660, 
internal:pulsar+ssl://127.0.0.1:6651");
-        config.setInternalListenerName("external");
-        
MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
+        assertEquals(config.getInternalListenerName(), 
ServiceConfiguration.DEFAULT_INTERNAL_LISTENER_NAME);
     }
 
+    private ServiceConfiguration newConfigWithNoPorts() {
+        ServiceConfiguration config = new ServiceConfiguration();
+        config.setBrokerServicePort(Optional.empty());
+        config.setBrokerServicePortTls(Optional.empty());
+        config.setWebServicePort(Optional.empty());
+        config.setWebServicePortTls(Optional.empty());
+        return config;
+    }
 }
diff --git 
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java 
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index c15153f3443..f85d12116ef 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -972,13 +972,16 @@ public class PulsarService implements AutoCloseable, 
ShutdownService {
             this.addWebServerHandlers(webService, metricsServlet, this.config);
             this.webService.start();
 
-            // Refresh addresses and update configuration, since the port 
might have been dynamically assigned
-            if (config.getBrokerServicePort().equals(Optional.of(0))) {
-                config.setBrokerServicePort(brokerService.getListenPort());
-            }
-            if (config.getBrokerServicePortTls().equals(Optional.of(0))) {
-                
config.setBrokerServicePortTls(brokerService.getListenPortTls());
-            }
+            // Refresh addresses and update configuration based on the actual 
bound ports. This is
+            // necessary both for dynamic ports (`Optional.of(0)`) and for the 
case where the broker
+            // is configured only via `bindAddresses` (legacy port properties 
left as
+            // `Optional.empty()`), so that downstream code — in particular
+            // MultipleListenerValidator.validateAndAnalysisAdvertisedListener 
— can synthesize the
+            // internal advertised listener from the now-known ports.
+            brokerService.getListenPort().ifPresent(port -> 
config.setBrokerServicePort(Optional.of(port)));
+            brokerService.getListenPortTls().ifPresent(port -> 
config.setBrokerServicePortTls(Optional.of(port)));
+            // Recompute the cached advertised listener map now that the bound 
ports are known.
+            this.advertisedListeners = 
MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
             this.webServiceAddress = webAddress(config);
             this.webServiceAddressTls = webAddressTls(config);
             this.brokerServiceUrl = brokerUrl(config);
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/DynamicBindAddressesIntegrationTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/DynamicBindAddressesIntegrationTest.java
new file mode 100644
index 00000000000..60cbc82f3ba
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/DynamicBindAddressesIntegrationTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.web;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import java.net.InetAddress;
+import java.net.URI;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Integration test for configuring the broker entirely through {@code 
bindAddresses} with dynamic
+ * ports (port {@code 0}). Verifies that:
+ * <ul>
+ *   <li>None of the legacy {@code brokerServicePort}/{@code webServicePort} 
(and TLS variants)
+ *       need to be set when {@code bindAddresses} supplies {@code pulsar://} 
and {@code http://}
+ *       entries under the internal listener;</li>
+ *   <li>after the broker binds, the actual bound ports are reflected back 
into the configuration
+ *       and the internal advertised listener is synthesized with the real 
ports;</li>
+ *   <li>{@code advertisedAddress} defaults to the local hostname when left 
blank;</li>
+ *   <li>{@code PulsarAdmin} and {@code PulsarClient} can talk to the broker 
via the resolved
+ *       service URLs.</li>
+ * </ul>
+ *
+ * <p>This exercises the post-bind dependency: only after the Netty/Jetty 
servers have bound is
+ * the dynamic port known, and only then can the advertised listener map be 
populated.
+ */
+@Test(groups = "broker")
+public class DynamicBindAddressesIntegrationTest extends 
MockedPulsarServiceBaseTest {
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        // Provision the standard `public/default` namespace that the 
admin/client tests below use.
+        // This cannot be done from inside the constructor flow because the 
admin client only becomes
+        // reachable after the dynamic web port has been bound and surfaced 
into the configuration.
+        admin.clusters().createCluster(conf.getClusterName(),
+                
ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+        admin.tenants().createTenant("public",
+                TenantInfo.builder()
+                        .allowedClusters(Set.of(conf.getClusterName()))
+                        .build());
+        admin.namespaces().createNamespace("public/default");
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+        // Clear every legacy port property — the broker must come up purely 
from `bindAddresses`.
+        conf.setBrokerServicePort(Optional.empty());
+        conf.setBrokerServicePortTls(Optional.empty());
+        conf.setWebServicePort(Optional.empty());
+        conf.setWebServicePortTls(Optional.empty());
+        // Bind to all interfaces on a dynamically assigned port for both 
schemes, tagged with the
+        // internal listener so the connectors are selected as the primary 
ones for broker-to-broker
+        // communication.
+        
conf.setBindAddresses("internal:pulsar://0.0.0.0:0,internal:http://0.0.0.0:0";);
+        // Force `advertisedAddress` to be blank so we exercise the 
hostname-fallback path.
+        conf.setAdvertisedAddress(null);
+        // No explicit `advertisedListeners` either; the internal listener 
must be synthesized from
+        // the dynamically bound ports.
+        conf.setAdvertisedListeners(null);
+    }
+
+    @Test
+    public void testRuntimeListenerSynthesizedFromDynamicBindAddresses() {
+        ServiceConfiguration runtimeConf = pulsar.getConfiguration();
+
+        // After binding, the legacy port properties must reflect the actual 
dynamic ports.
+        assertTrue(runtimeConf.getBrokerServicePort().isPresent(),
+                "brokerServicePort should be populated from the bound port");
+        assertTrue(runtimeConf.getWebServicePort().isPresent(),
+                "webServicePort should be populated from the bound port");
+        int boundPulsarPort = runtimeConf.getBrokerServicePort().get();
+        int boundWebPort = runtimeConf.getWebServicePort().get();
+        assertFalse(boundPulsarPort == 0, "the dynamic pulsar port must not 
still be 0");
+        assertFalse(boundWebPort == 0, "the dynamic web port must not still be 
0");
+        // TLS variants stay empty because no TLS binding was declared.
+        assertFalse(runtimeConf.getBrokerServicePortTls().isPresent());
+        assertFalse(runtimeConf.getWebServicePortTls().isPresent());
+
+        // The advertised address must default to the local hostname.
+        String expectedHost;
+        try {
+            expectedHost = InetAddress.getLocalHost().getCanonicalHostName();
+        } catch (Exception e) {
+            throw new AssertionError("unable to resolve canonical local 
hostname for the test", e);
+        }
+
+        // The internal listener must now exist in the cached map with both 
URLs populated.
+        Map<String, AdvertisedListener> listeners = 
pulsar.getAdvertisedListeners();
+        assertNotNull(listeners, "advertised listener map must not be null");
+        AdvertisedListener internal = 
listeners.get(ServiceConfiguration.DEFAULT_INTERNAL_LISTENER_NAME);
+        assertNotNull(internal,
+                "the `internal` listener must have been synthesized after 
binding, but the map was: "
+                        + listeners);
+        assertEquals(internal.getBrokerServiceUrl(),
+                URI.create("pulsar://" + expectedHost + ":" + 
boundPulsarPort));
+        assertEquals(internal.getBrokerHttpUrl(),
+                URI.create("http://"; + expectedHost + ":" + boundWebPort));
+
+        // The top-level service URLs must agree with the listener.
+        assertEquals(pulsar.getBrokerServiceUrl(), "pulsar://" + expectedHost 
+ ":" + boundPulsarPort);
+        assertEquals(pulsar.getWebServiceAddress(), "http://"; + expectedHost + 
":" + boundWebPort);
+
+        // The broker id is derived from the (post-bind) advertised address 
and web port.
+        String brokerId = pulsar.getBrokerId();
+        assertNotNull(brokerId, "brokerId must be initialized after start()");
+        assertEquals(brokerId, expectedHost + ":" + boundWebPort);
+    }
+
+    @Test
+    public void testPulsarAdminCanCreateTopicOverDynamicBindAddress() throws 
Exception {
+        String namespace = "public/default";
+        String topic = "persistent://" + namespace + "/dynamic-bind-admin-" + 
UUID.randomUUID();
+
+        // The `admin` field is built against the runtime web URL by 
MockedPulsarServiceBaseTest;
+        // exercise it with a real round-trip to confirm the URL is reachable.
+        try (PulsarAdmin localAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(pulsar.getWebServiceAddress())
+                .build()) {
+            localAdmin.topics().createNonPartitionedTopic(topic);
+            assertTrue(localAdmin.topics().getList(namespace).contains(topic),
+                    "topic should be listed under its namespace after 
creation");
+        }
+    }
+
+    @Test
+    public void testPulsarClientProduceConsumeOverDynamicBindAddress() throws 
Exception {
+        String topic = "persistent://public/default/dynamic-bind-pubsub-" + 
UUID.randomUUID();
+        String subscription = "test-sub";
+        String payload = "hello-dynamic-bind";
+
+        try (PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .build()) {
+            try (Consumer<String> consumer = client.newConsumer(Schema.STRING)
+                    .topic(topic)
+                    .subscriptionName(subscription)
+                    
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                    .subscribe();
+                 Producer<String> producer = client.newProducer(Schema.STRING)
+                         .topic(topic)
+                         .create()) {
+                producer.send(payload);
+                Message<String> received = consumer.receive(10, 
TimeUnit.SECONDS);
+                assertNotNull(received, "consumer must receive the message 
produced over the dynamic port");
+                assertEquals(received.getValue(), payload);
+                consumer.acknowledge(received);
+            }
+        }
+    }
+}
diff --git 
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/MultipleInternalBindAddressesTest.java
 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/MultipleInternalBindAddressesTest.java
new file mode 100644
index 00000000000..8b55c4b4b88
--- /dev/null
+++ 
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/MultipleInternalBindAddressesTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.web;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import java.net.InetAddress;
+import java.net.URI;
+import java.util.Optional;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Verifies behavior when the internal listener has more than one bind address 
for the same protocol
+ * scheme. In that case the broker accepts every binding but uses the first 
declared one (per scheme)
+ * as the primary, which is what drives {@code pulsar.getBrokerServiceUrl()},
+ * {@code pulsar.getWebServiceAddress()}, and the synthesized internal 
advertised listener.
+ */
+@Test(groups = "broker")
+public class MultipleInternalBindAddressesTest extends 
MockedPulsarServiceBaseTest {
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+        // Drive everything off `bindAddresses` so the validator/binder cannot 
reach back to legacy
+        // ports to find a "default" primary.
+        conf.setBrokerServicePort(Optional.empty());
+        conf.setBrokerServicePortTls(Optional.empty());
+        conf.setWebServicePort(Optional.empty());
+        conf.setWebServicePortTls(Optional.empty());
+        // Two pulsar bindings and two http bindings for the internal 
listener, on different IPs so
+        // the ip:port uniqueness check is satisfied (each combination is 
unique). Both use port 0
+        // so the OS assigns distinct dynamic ports.
+        conf.setBindAddresses(
+                "internal:pulsar://0.0.0.0:0,"
+                        + "internal:pulsar://127.0.0.1:0,"
+                        + "internal:http://0.0.0.0:0,";
+                        + "internal:http://127.0.0.1:0";);
+        // Hostname-fallback path for the advertised address.
+        conf.setAdvertisedAddress(null);
+        conf.setAdvertisedListeners(null);
+    }
+
+    @Test
+    public void testFirstInternalBindingPerSchemeBecomesPrimary() throws 
Exception {
+        ServiceConfiguration runtimeConf = pulsar.getConfiguration();
+
+        // The primary listen ports are surfaced via the dedicated methods. By 
contract, these come
+        // from the FIRST binding (per scheme) that matches the internal 
listener.
+        int primaryPulsarPort = pulsar.getBrokerListenPort().orElseThrow();
+        int primaryWebPort = 
pulsar.getWebService().getListenPortHTTP().orElseThrow();
+        assertTrue(primaryPulsarPort > 0, "primary pulsar port must be a real 
OS-assigned port");
+        assertTrue(primaryWebPort > 0, "primary web port must be a real 
OS-assigned port");
+        assertTrue(primaryPulsarPort != primaryWebPort,
+                "pulsar and web primaries must have been assigned distinct 
ports");
+
+        // After binding, the configuration's legacy ports must reflect the 
primary binding's ports
+        // (not the additional bindings' ports), which is what downstream 
synthesis depends on.
+        assertEquals(runtimeConf.getBrokerServicePort(), 
Optional.of(primaryPulsarPort));
+        assertEquals(runtimeConf.getWebServicePort(), 
Optional.of(primaryWebPort));
+
+        // The bindAddresses config string still records all the original 
entries the user declared,
+        // proving the multi-binding configuration was accepted up-front.
+        String declared = runtimeConf.getBindAddresses();
+        assertNotNull(declared);
+        assertEquals(countOccurrences(declared, "internal:pulsar://"), 2,
+                "two internal:pulsar:// bindings should be in the declared 
config: " + declared);
+        assertEquals(countOccurrences(declared, "internal:http://";), 2,
+                "two internal:http:// bindings should be in the declared 
config: " + declared);
+
+        // pulsar.getBrokerServiceUrl() / pulsar.getWebServiceAddress() must 
use the primary ports
+        // (not any of the additional bindings' ports).
+        String expectedHost = 
InetAddress.getLocalHost().getCanonicalHostName();
+        assertEquals(pulsar.getBrokerServiceUrl(), "pulsar://" + expectedHost 
+ ":" + primaryPulsarPort);
+        assertEquals(pulsar.getWebServiceAddress(), "http://"; + expectedHost + 
":" + primaryWebPort);
+
+        // The synthesized internal advertised listener must agree with the 
same primary ports.
+        AdvertisedListener internal =
+                
pulsar.getAdvertisedListeners().get(ServiceConfiguration.DEFAULT_INTERNAL_LISTENER_NAME);
+        assertNotNull(internal,
+                "the `internal` listener must have been synthesized after 
binding, but the map was: "
+                        + pulsar.getAdvertisedListeners());
+        assertEquals(internal.getBrokerServiceUrl(),
+                URI.create("pulsar://" + expectedHost + ":" + 
primaryPulsarPort));
+        assertEquals(internal.getBrokerHttpUrl(),
+                URI.create("http://"; + expectedHost + ":" + primaryWebPort));
+    }
+
+    private static int countOccurrences(String haystack, String needle) {
+        int count = 0;
+        int index = 0;
+        while ((index = haystack.indexOf(needle, index)) != -1) {
+            count++;
+            index += needle.length();
+        }
+        return count;
+    }
+}

Reply via email to