This is an automated email from the ASF dual-hosted git repository.

lhotari pushed a commit to branch lh-support-webservice-bindaddresses-and-advertised-listeners
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 8d528cd83daf1b8a1ee7656d3620f3a7e9fd307a
Author: Lari Hotari <[email protected]>
AuthorDate: Wed May 20 22:34:28 2026 +0300

    [improve][broker] Default internalListenerName and synthesize internal listener from legacy ports

    Replace the brittle "use the first advertised listener as internal" rule
    with a named default. `internalListenerName` now defaults to `internal`
    and the internal advertised listener is auto-derived from the legacy port
    properties so that existing deployments keep working without explicit
    `advertisedListeners`.

    ServiceConfiguration / configs:
    - Add `DEFAULT_INTERNAL_LISTENER_NAME = "internal"` and default the field
      to it.
    - Rewrite the `advertisedListeners`, `internalListenerName`, and
      `bindAddresses` docs (in code, broker.conf, and standalone.conf) to
      describe the merge semantics, the ip:port uniqueness rule, and the
      cluster-internal intent of the internal listener (incl. the k8s caveat).

    MultipleListenerValidator:
    - Synthesize the internal `AdvertisedListener` from `brokerServicePort`,
      `brokerServicePortTls`, `webServicePort`, and `webServicePortTls`.
    - Merge with any explicit URLs declared under the internal listener:
      explicit URLs win, legacy-port URLs fill in the remaining slots, so
      users can override one URL without redeclaring the rest.
    - Fail with a clear error if no internal listener can be assembled.

    BindAddressValidator:
    - Tag migrated legacy bindings with `internalListenerName` (was `null`).
    - Tolerate exact duplicates (same `listener:scheme://ip:port`).
    - Reject conflicts where the same URI maps to different listener names.
    - Reject the same `ip:port` carrying two different schemes (a TCP socket
      can only host one scheme); skip port 0 since the OS assigns distinct
      ephemeral ports per socket.

    PulsarService.start():
    - After binding, unconditionally sync `brokerServicePort` /
      `brokerServicePortTls` from the actually bound channels (was only
      refreshing the `Optional.of(0)` case) and re-compute the cached
      `advertisedListeners` map so it reflects the dynamic ports.

    ServiceConfigurationUtils.getAppliedAdvertisedAddress:
    - Tolerate a synthesized listener that has only a subset of the four URLs
      populated; pick the first non-null URL to derive the host.

    Tests:
    - MultipleListenerValidatorTest: cover defaulting, listener-name
      customization, port-synthesis (binary, web, both, partial), merge
      semantics, and failure paths.
    - BindAddressValidatorTest: cover migrated-binding listener naming,
      duplicate tolerance, listener-name conflict, ip:port cross-scheme
      conflict, port-0 exemption from the ip:port check, and the
      legacy-migration collision case.
    - DynamicBindAddressesIntegrationTest: end-to-end test that boots a broker
      purely from
      `bindAddresses=internal:pulsar://0.0.0.0:0,internal:http://0.0.0.0:0`,
      verifies the runtime ports flow back into the configuration and the
      advertised listener, hostname fallback when `advertisedAddress` is null,
      `pulsar.getBrokerId()` is populated, and that `PulsarAdmin` and
      `PulsarClient` work over the dynamic ports.
    - MultipleInternalBindAddressesTest: confirms that when the internal
      listener has multiple bindings for the same scheme, the first one
      becomes the primary and is what `pulsar.getBrokerServiceUrl()`,
      `pulsar.getWebServiceAddress()`, and the synthesized internal advertised
      listener point at.
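
As an illustration only, the BindAddressValidator rules listed in the commit message (tolerate exact duplicates, reject the same URI under different listener names, reject one ip:port carrying two schemes, exempt port 0) can be sketched as a small self-contained Java class. The names `BindAddressRulesSketch`, `Binding`, `parse`, and `validate` are hypothetical and are not part of the Pulsar code base; the sketch also skips legacy-port migration and scheme filtering, which the real `BindAddressValidator.validateBindAddresses` performs.

```java
import java.net.URI;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-alone sketch of the validation rules; NOT the Pulsar class.
final class BindAddressRulesSketch {

    // Illustrative stand-in for a parsed `listener:scheme://ip:port` entry.
    static final class Binding {
        final String listener;
        final URI address;

        Binding(String listener, URI address) {
            this.listener = listener;
            this.address = address;
        }
    }

    // Split at the first ':' so that the scheme's own "://" stays in the URI part.
    static Binding parse(String entry) {
        int idx = entry.indexOf(':');
        if (idx <= 0) {
            throw new IllegalArgumentException("missing listener name: " + entry);
        }
        return new Binding(entry.substring(0, idx).trim(),
                URI.create(entry.substring(idx + 1).trim()));
    }

    static List<Binding> validate(List<String> entries) {
        // Rule 1 and 2: deduplicate by full URI; same URI with the same listener
        // is tolerated, same URI with a different listener is a conflict.
        Map<URI, Binding> byUri = new LinkedHashMap<>();
        for (String entry : entries) {
            Binding b = parse(entry);
            Binding existing = byUri.putIfAbsent(b.address, b);
            if (existing != null && !existing.listener.equals(b.listener)) {
                throw new IllegalArgumentException("conflicting listener names for "
                        + b.address + ": " + existing.listener + " and " + b.listener);
            }
        }

        // Rule 3: an ip:port may carry only one scheme. Port 0 is skipped because
        // the OS assigns a distinct ephemeral port to each socket.
        Map<String, Binding> byIpPort = new LinkedHashMap<>();
        for (Binding b : byUri.values()) {
            if (b.address.getPort() == 0) {
                continue;
            }
            String ipPort = b.address.getHost() + ":" + b.address.getPort();
            Binding prior = byIpPort.putIfAbsent(ipPort, b);
            if (prior != null) {
                throw new IllegalArgumentException("ip:port " + ipPort
                        + " is bound by two schemes: " + prior.address.getScheme()
                        + " and " + b.address.getScheme());
            }
        }
        return new ArrayList<>(byUri.values());
    }

    public static void main(String[] args) {
        // Two port-0 entries on the same IP pass the ip:port check.
        List<Binding> ok = validate(List.of(
                "internal:pulsar://0.0.0.0:0", "internal:http://0.0.0.0:0"));
        System.out.println("accepted bindings: " + ok.size());
    }
}
```

Keeping the two checks as separate passes mirrors the order described in the commit message: listener-name conflicts are reported first, and the cross-scheme ip:port check then runs only over the deduplicated set.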
--- conf/broker.conf | 40 +++-- conf/standalone.conf | 25 ++- .../apache/pulsar/broker/ServiceConfiguration.java | 55 ++++-- .../pulsar/broker/ServiceConfigurationUtils.java | 20 ++- .../broker/validator/BindAddressValidator.java | 84 ++++++--- .../validator/MultipleListenerValidator.java | 120 +++++++++++-- .../broker/validator/BindAddressValidatorTest.java | 152 +++++++++++++--- .../validator/MultipleListenerValidatorTest.java | 178 +++++++++++++++++-- .../org/apache/pulsar/broker/PulsarService.java | 17 +- .../web/DynamicBindAddressesIntegrationTest.java | 197 +++++++++++++++++++++ .../web/MultipleInternalBindAddressesTest.java | 131 ++++++++++++++ 11 files changed, 908 insertions(+), 111 deletions(-) diff --git a/conf/broker.conf b/conf/broker.conf index d6086f1e389..3d5ad632b42 100644 --- a/conf/broker.conf +++ b/conf/broker.conf @@ -65,24 +65,44 @@ webServiceTlsCiphers= # Hostname or IP address the service binds on, default is 0.0.0.0. bindAddress=0.0.0.0 -# Extra bind addresses for the service: <listener_name>:<scheme>://<host>:<port>,[...] +# Bind addresses for the broker. +# Comma-separated list of <listener_name>:<scheme>://<ip>:<port> entries. +# Supported schemes: pulsar, pulsar+ssl, http, https. +# The <ip> part selects which local network interface the port binds to: use a specific local +# IP, or 0.0.0.0 to bind on all interfaces. A local hostname is also accepted but not +# recommended. +# At runtime, the legacy brokerServicePort, brokerServicePortTls, webServicePort, and +# webServicePortTls properties are migrated into the bind address list (using bindAddress, +# default 0.0.0.0, as the IP) under the listener named by internalListenerName, and merged with +# the entries declared here. +# Each ip:port may be bound by exactly one (listener, scheme) pair — a TCP socket cannot serve +# two protocol schemes at once. 
An entry here that exactly matches a migrated legacy binding +# (same listener:scheme://ip:port) is tolerated; assigning the same ip:port to a different +# listener or to a different scheme fails validation. +# Binding the same listener to multiple ports of the same scheme is allowed but rarely useful. bindAddresses= # Hostname or IP address the service advertises to the outside world. # If not set, the value of InetAddress.getLocalHost().getCanonicalHostName() is used. advertisedAddress= -# Used to specify multiple advertised listeners for the broker. -# The value must format as <listener_name>:pulsar://<host>:<port>, -# multiple listeners should separate with commas. -# Do not use this configuration with advertisedAddress and brokerServicePort. -# The Default value is absent means use advertisedAddress and brokerServicePort. +# Advertised listeners for the broker. +# Comma-separated list of <listener_name>:<scheme>://<host>:<port> entries. +# Supported schemes: pulsar, pulsar+ssl, http, https. +# A listener name may be repeated to declare multiple schemes for the same listener. +# URLs declared here for the listener named by internalListenerName take precedence; any URL +# slots left undeclared are filled in from brokerServicePort, brokerServicePortTls, +# webServicePort, and webServicePortTls, so existing deployments keep working after upgrade. # advertisedListeners= -# Used to specify the internal listener name for the broker. -# The listener name must contain in the advertisedListeners. -# The Default value is absent, the broker uses the first listener as the internal listener. -# internalListenerName= +# Name of the listener used for cluster-internal broker-to-broker communication (lookup +# redirects, replication, admin forwarding). +# The internal listener must advertise an address reachable from other brokers in the same +# cluster. 
It can also serve external traffic when the same DNS names and IPs are routable from +# outside the cluster; this is typically not the case with Kubernetes, where the broker pod IP +# is only reachable in-cluster — configure a separate non-internal listener for external clients +# in that situation. +internalListenerName=internal # Enable or disable the HAProxy protocol. # If true, the real IP addresses of consumers and producers can be obtained when getting topic statistics data. diff --git a/conf/standalone.conf b/conf/standalone.conf index 15038c6aa4f..8d4c07e7f01 100644 --- a/conf/standalone.conf +++ b/conf/standalone.conf @@ -40,13 +40,36 @@ webServicePort=8080 # Hostname or IP address the service binds on, default is 0.0.0.0. bindAddress=0.0.0.0 -# Extra bind addresses for the service: <listener_name>:<scheme>://<host>:<port>,[...] +# Bind addresses for the broker. +# Comma-separated list of <listener_name>:<scheme>://<ip>:<port> entries. +# Supported schemes: pulsar, pulsar+ssl, http, https. +# The <ip> part selects which local network interface the port binds to: use a specific local +# IP, or 0.0.0.0 to bind on all interfaces. A local hostname is also accepted but not +# recommended. +# At runtime, the legacy brokerServicePort, brokerServicePortTls, webServicePort, and +# webServicePortTls properties are migrated into the bind address list (using bindAddress, +# default 0.0.0.0, as the IP) under the listener named by internalListenerName, and merged with +# the entries declared here. +# Each ip:port may be bound by exactly one (listener, scheme) pair — a TCP socket cannot serve +# two protocol schemes at once. An entry here that exactly matches a migrated legacy binding +# (same listener:scheme://ip:port) is tolerated; assigning the same ip:port to a different +# listener or to a different scheme fails validation. +# Binding the same listener to multiple ports of the same scheme is allowed but rarely useful. 
bindAddresses= # Hostname or IP address the service advertises to the outside world. # If not set, the value of InetAddress.getLocalHost().getCanonicalHostName() is used. advertisedAddress= +# Name of the listener used for cluster-internal broker-to-broker communication (lookup +# redirects, replication, admin forwarding). +# The internal listener must advertise an address reachable from other brokers in the same +# cluster. It can also serve external traffic when the same DNS names and IPs are routable from +# outside the cluster; this is typically not the case with Kubernetes, where the broker pod IP +# is only reachable in-cluster — configure a separate non-internal listener for external clients +# in that situation. +internalListenerName=internal + # Enable or disable the HAProxy protocol. # If true, the real IP addresses of consumers and producers can be obtained when getting topic statistics data. haProxyProtocolEnabled=false diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java index 6d8c65d1c1c..9ffdbf57880 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfiguration.java @@ -65,6 +65,12 @@ import org.apache.pulsar.metadata.impl.ZKMetadataStore; @ToString public class ServiceConfiguration implements PulsarConfiguration { + /** + * Default name of the internal advertised listener used for broker-to-broker communication + * within a Pulsar cluster. 
+ */ + public static final String DEFAULT_INTERNAL_LISTENER_NAME = "internal"; + @Category private static final String CATEGORY_SERVER = "Server"; @Category @@ -226,26 +232,45 @@ public class ServiceConfiguration implements PulsarConfiguration { private String advertisedAddress; @FieldContext(category = CATEGORY_SERVER, - doc = "Used to specify multiple advertised listeners for the broker." - + " The value must format as <listener_name>:pulsar://<host>:<port>," - + "multiple listeners should separate with commas." - + "Do not use this configuration with advertisedAddress and brokerServicePort." - + "The Default value is absent means use advertisedAddress and brokerServicePort.") + doc = "Advertised listeners for the broker.\n" + + " Comma-separated list of `<listener_name>:<scheme>://<host>:<port>` entries." + + " Supported schemes: `pulsar`, `pulsar+ssl`, `http`, `https`." + + " A listener name may be repeated to declare multiple schemes for the same listener.\n" + + " URLs declared here for the listener named by `internalListenerName` take precedence;" + + " any URL slots left undeclared are filled in from `brokerServicePort`," + + " `brokerServicePortTls`, `webServicePort`, and `webServicePortTls`, so existing" + + " deployments keep working after upgrade.") private String advertisedListeners; @FieldContext(category = CATEGORY_SERVER, - doc = "Used to specify the internal listener name for the broker." - + "The listener name must contain in the advertisedListeners." - + "The Default value is absent, the broker uses the first listener as the internal listener.") - private String internalListenerName; + doc = "Name of the listener used for cluster-internal broker-to-broker communication" + + " (lookup redirects, replication, admin forwarding).\n" + + " The internal listener must advertise an address reachable from other brokers in" + + " the same cluster. 
It can also serve external traffic when the same DNS names and" + + " IPs are routable from outside the cluster; this is typically not the case with" + + " Kubernetes, where the broker pod IP is only reachable in-cluster — configure a" + + " separate non-internal listener for external clients in that situation.\n" + + " Defaults to `internal`.") + private String internalListenerName = DEFAULT_INTERNAL_LISTENER_NAME; @FieldContext(category = CATEGORY_SERVER, - doc = "Used to specify additional bind addresses for the broker." - + " The value must format as <listener_name>:<scheme>://<host>:<port>," - + " multiple bind addresses should be separated with commas." - + " Associates each bind address with an advertised listener and protocol handler." - + " Note that the brokerServicePort, brokerServicePortTls, webServicePort, and" - + " webServicePortTls properties define additional bindings.") + doc = "Bind addresses for the broker.\n" + + " Comma-separated list of `<listener_name>:<scheme>://<ip>:<port>` entries." + + " Supported schemes: `pulsar`, `pulsar+ssl`, `http`, `https`." + + " The `<ip>` part selects which local network interface the port binds to:" + + " use a specific local IP, or `0.0.0.0` to bind on all interfaces." + + " A local hostname is also accepted but not recommended.\n" + + " At runtime, the legacy `brokerServicePort`, `brokerServicePortTls`," + + " `webServicePort`, and `webServicePortTls` properties are migrated into the bind" + + " address list (using `bindAddress`, default `0.0.0.0`, as the IP) under the listener" + + " named by `internalListenerName`, and merged with the entries declared here.\n" + + " Each `ip:port` may be bound by exactly one (listener, scheme) pair — a TCP socket" + + " cannot serve two protocol schemes at once. 
An entry here that exactly matches a" + + " migrated legacy binding (same `listener:scheme://ip:port`) is tolerated; assigning" + + " the same `ip:port` to a different listener or to a different scheme fails" + + " validation.\n" + + " Binding the same listener to multiple ports of the same scheme is allowed but rarely" + + " useful.") private String bindAddresses; @FieldContext(category = CATEGORY_SERVER, diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfigurationUtils.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfigurationUtils.java index f208a894aea..fb7d5d34706 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfigurationUtils.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/ServiceConfigurationUtils.java @@ -70,9 +70,14 @@ public class ServiceConfigurationUtils { AdvertisedListener advertisedListener = result.get(configuration.getInternalListenerName()); if (advertisedListener != null && !ignoreAdvertisedListener) { - String address = advertisedListener.getBrokerServiceUrl().getHost(); - if (address != null) { - return address; + // The listener may have been synthesized from a subset of the legacy ports, so any of + // the four URLs may be null. Pick the first non-null URL to derive the host. + URI url = firstNonNullUri(advertisedListener.getBrokerServiceUrl(), + advertisedListener.getBrokerServiceUrlTls(), + advertisedListener.getBrokerHttpUrl(), + advertisedListener.getBrokerHttpsUrl()); + if (url != null && url.getHost() != null) { + return url.getHost(); } } @@ -113,6 +118,15 @@ public class ServiceConfigurationUtils { return port.map(p -> URI.create(String.format("%s://%s:%d", scheme, hostname, p))).orElse(null); } + private static URI firstNonNullUri(URI... uris) { + for (URI uri : uris) { + if (uri != null) { + return uri; + } + } + return null; + } + /** * Gets the web service address (hostname). 
*/ diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/BindAddressValidator.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/BindAddressValidator.java index 358ccba6c24..41ae9452d2a 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/BindAddressValidator.java +++ b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/BindAddressValidator.java @@ -25,10 +25,9 @@ import java.util.Collection; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; -import java.util.function.Function; +import java.util.Objects; import java.util.regex.Matcher; import java.util.regex.Pattern; -import java.util.stream.Collectors; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.ServiceConfigurationUtils; @@ -43,13 +42,27 @@ public class BindAddressValidator { /** * Validate the configuration of `bindAddresses`. - * @param config the pulsar broker configure. + * + * <p>Bindings derived from the legacy port properties (`brokerServicePort`, `brokerServicePortTls`, + * `webServicePort`, `webServicePortTls`) are associated with the listener identified by + * `internalListenerName` and merged with the entries declared in `bindAddresses`. Exact duplicates + * (same `listener:scheme://ip:port`) are tolerated. Two failure modes are rejected: + * <ol> + * <li>the same {@code scheme://ip:port} mapped to different listener names, and + * <li>the same {@code ip:port} mapped to two different protocol schemes — because a TCP socket + * can only be bound by one process, an IP+port can carry at most one scheme. + * </ol> + * + * @param config the pulsar broker configuration. * @param schemes a filter on the schemes of the bind addresses, or null to not apply a filter. * @return a list of bind addresses. 
*/ public static List<BindAddress> validateBindAddresses(ServiceConfiguration config, Collection<String> schemes) { - // migrate the existing configuration properties - List<BindAddress> addresses = migrateBindAddresses(config); + String internalListenerName = StringUtils.defaultIfBlank(config.getInternalListenerName(), + ServiceConfiguration.DEFAULT_INTERNAL_LISTENER_NAME); + + // migrate the legacy port-based configuration to bind addresses tagged with the internal listener name + List<BindAddress> addresses = migrateBindAddresses(config, internalListenerName); // parse the list of additional bind addresses Arrays.stream(StringUtils.split(StringUtils.defaultString(config.getBindAddresses()), ",")) @@ -68,38 +81,69 @@ public class BindAddressValidator { addresses.removeIf(a -> !schemes.contains(a.getAddress().getScheme())); } - // remove duplicates so that a bind address migrated from legacy configuration properties is not duplicated - Map<URI, BindAddress> uniqueBindAddresses = - addresses.stream().collect(Collectors.toMap(BindAddress::getAddress, Function.identity(), (a, b) -> { - if (StringUtils.isNotBlank(a.getListenerName()) && StringUtils.isBlank(b.getListenerName())) { - return a; - } - return b; - }, LinkedHashMap::new)); + // Deduplicate by full URI (scheme + ip + port). Tolerate exact duplicates (same URI and + // same listener name) so that a user's bindAddresses entry that matches a migrated binding + // is accepted; reject same URI assigned to different listener names. 
+ Map<URI, BindAddress> uniqueBindAddresses = new LinkedHashMap<>(); + for (BindAddress addr : addresses) { + BindAddress existing = uniqueBindAddresses.get(addr.getAddress()); + if (existing == null) { + uniqueBindAddresses.put(addr.getAddress(), addr); + } else if (Objects.equals(existing.getListenerName(), addr.getListenerName())) { + // exact duplicate, tolerate + continue; + } else { + throw new IllegalArgumentException("bindAddresses: conflicting listener names for " + + addr.getAddress() + ": `" + existing.getListenerName() + "` and `" + + addr.getListenerName() + "`"); + } + } + + // ip:port uniqueness across protocol schemes. A TCP socket can only be bound by one + // listener+scheme combination, so two bindings that share host:port but differ in scheme + // (e.g. pulsar://0.0.0.0:8080 and http://0.0.0.0:8080) cannot both be active. Port 0 is + // skipped because it means "OS-assigned ephemeral port" — the kernel will hand out a unique + // port to each socket, so two port-0 entries with the same IP cannot actually collide. + Map<String, BindAddress> uniqueIpPort = new LinkedHashMap<>(); + for (BindAddress addr : uniqueBindAddresses.values()) { + if (addr.getAddress().getPort() == 0) { + continue; + } + String ipPort = addr.getAddress().getHost() + ":" + addr.getAddress().getPort(); + BindAddress prior = uniqueIpPort.putIfAbsent(ipPort, addr); + if (prior != null) { + throw new IllegalArgumentException("bindAddresses: ip:port `" + ipPort + + "` is bound by two schemes: `" + prior.getListenerName() + ":" + + prior.getAddress().getScheme() + "` and `" + addr.getListenerName() + ":" + + addr.getAddress().getScheme() + "`; an ip:port can carry only one scheme"); + } + } return new ArrayList<>(uniqueBindAddresses.values()); } /** - * Generates bind addresses based on legacy configuration properties. + * Generates bind addresses based on legacy configuration properties. 
The synthesized bindings are + * tagged with the {@code internalListenerName} so that {@link MultipleListenerValidator} and + * downstream code can correlate them with the internal advertised listener. */ - private static List<BindAddress> migrateBindAddresses(ServiceConfiguration config) { - List<BindAddress> addresses = new ArrayList<>(2); + private static List<BindAddress> migrateBindAddresses(ServiceConfiguration config, String internalListenerName) { + List<BindAddress> addresses = new ArrayList<>(4); String bindAddress = config.getBindAddress(); if (config.getBrokerServicePort().isPresent()) { - addresses.add(new BindAddress(null, + addresses.add(new BindAddress(internalListenerName, URI.create(ServiceConfigurationUtils.brokerUrl(bindAddress, config.getBrokerServicePort().get())))); } if (config.getBrokerServicePortTls().isPresent()) { - addresses.add(new BindAddress(null, URI.create( + addresses.add(new BindAddress(internalListenerName, URI.create( ServiceConfigurationUtils.brokerUrlTls(bindAddress, config.getBrokerServicePortTls().get())))); } if (config.getWebServicePort().isPresent()) { - addresses.add(new BindAddress(null, URI.create( + addresses.add(new BindAddress(internalListenerName, URI.create( ServiceConfigurationUtils.webServiceUrl(bindAddress, config.getWebServicePort().get())))); } if (config.getWebServicePortTls().isPresent()) { - addresses.add(new BindAddress(null, URI.create( + addresses.add(new BindAddress(internalListenerName, URI.create( ServiceConfigurationUtils.webServiceUrlTls(bindAddress, config.getWebServicePortTls().get())))); } return addresses; diff --git a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/MultipleListenerValidator.java b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/MultipleListenerValidator.java index c400078e5a1..c300da9682f 100644 --- a/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/MultipleListenerValidator.java +++ 
b/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/validator/MultipleListenerValidator.java @@ -20,7 +20,6 @@ package org.apache.pulsar.broker.validator; import java.net.URI; import java.util.ArrayList; -import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -29,6 +28,7 @@ import java.util.Set; import java.util.TreeSet; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.ServiceConfigurationUtils; import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener; /** @@ -37,18 +37,79 @@ import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener; public final class MultipleListenerValidator { /** - * Validate the configuration of `advertisedListeners`, `internalListenerName`. - * 1. `advertisedListeners` consists of a comma-separated list of endpoints. - * 2. Each endpoint consists of a listener name and an associated address (`listener:scheme://host:port`). - * 3. A listener name may be repeated to define both a non-TLS and a TLS endpoint. - * 4. Duplicate definitions are disallowed. - * 5. If `internalListenerName` is absent, set it to the first listener defined in `advertisedListeners`. - * @param config the pulsar broker configure. - * @return + * Validate the configuration of `advertisedListeners` and `internalListenerName`. + * <ol> + * <li>`advertisedListeners` is a comma-separated list of endpoints in the form + * `listener:scheme://host:port`. Supported schemes are `pulsar`, `pulsar+ssl`, `http`, and `https`. + * <li>A listener name may be repeated to define multiple endpoints (e.g. binary and HTTPS) for the + * same listener; duplicate definitions for the same scheme are rejected. + * <li>`internalListenerName` identifies the listener used for cluster-internal broker-to-broker + * communication. It defaults to {@value ServiceConfiguration#DEFAULT_INTERNAL_LISTENER_NAME}. 
+ * An internal listener entry is synthesized from the legacy `brokerServicePort`, + * `brokerServicePortTls`, `webServicePort`, and `webServicePortTls` properties; any URLs the + * user has explicitly declared for the internal listener in `advertisedListeners` take + * precedence and the legacy-port URLs only fill in the unspecified slots. This allows the + * internal listener to override individual URLs (e.g. a custom TLS hostname) while still + * benefiting from auto-population for the rest. + * </ol> + * @param config the pulsar broker configuration. + * @return the parsed and validated advertised listeners, keyed by listener name. */ public static Map<String, AdvertisedListener> validateAndAnalysisAdvertisedListener(ServiceConfiguration config) { + String internalListenerName = StringUtils.defaultIfBlank(config.getInternalListenerName(), + ServiceConfiguration.DEFAULT_INTERNAL_LISTENER_NAME); + if (StringUtils.isBlank(config.getInternalListenerName())) { + config.setInternalListenerName(internalListenerName); + } + + Map<String, AdvertisedListener> result = parseAdvertisedListeners(config); + + // Merge the legacy-port-derived internal listener URLs into any explicit configuration. + // Explicit URLs in `advertisedListeners` take precedence; the legacy-port URLs fill in the + // unspecified slots so that broker-to-broker communication keeps working without forcing the + // user to redeclare every URL when they only want to override one. 
+ AdvertisedListener fromLegacyPorts = buildInternalListenerFromLegacyPorts(config); + if (fromLegacyPorts != null) { + AdvertisedListener explicit = result.get(internalListenerName); + result.put(internalListenerName, mergeListeners(explicit, fromLegacyPorts)); + } + + if (!result.isEmpty() && !result.containsKey(internalListenerName)) { + throw new IllegalArgumentException("the `advertisedListeners` configuration does not contain " + + "an entry for the internal listener `" + internalListenerName + "`, and the legacy " + + "port properties are not configured so an internal listener cannot be synthesized"); + } + + return result; + } + + /** + * Merges two {@link AdvertisedListener} entries. URLs from {@code override} take precedence; URLs + * from {@code fallback} only fill in the slots that {@code override} leaves null. Either argument + * may be null. + */ + private static AdvertisedListener mergeListeners(AdvertisedListener override, AdvertisedListener fallback) { + if (override == null) { + return fallback; + } + if (fallback == null) { + return override; + } + return AdvertisedListener.builder() + .brokerServiceUrl(override.getBrokerServiceUrl() != null + ? override.getBrokerServiceUrl() : fallback.getBrokerServiceUrl()) + .brokerServiceUrlTls(override.getBrokerServiceUrlTls() != null + ? override.getBrokerServiceUrlTls() : fallback.getBrokerServiceUrlTls()) + .brokerHttpUrl(override.getBrokerHttpUrl() != null + ? override.getBrokerHttpUrl() : fallback.getBrokerHttpUrl()) + .brokerHttpsUrl(override.getBrokerHttpsUrl() != null + ? 
override.getBrokerHttpsUrl() : fallback.getBrokerHttpsUrl()) + .build(); + } + + private static Map<String, AdvertisedListener> parseAdvertisedListeners(ServiceConfiguration config) { if (StringUtils.isBlank(config.getAdvertisedListeners())) { - return Collections.emptyMap(); + return new LinkedHashMap<>(); } Optional<String> firstListenerName = Optional.empty(); Map<String, List<String>> listeners = new LinkedHashMap<>(); @@ -59,20 +120,18 @@ public final class MultipleListenerValidator { + str + " do not contain listener name"); } String listenerName = StringUtils.trim(str.substring(0, index)); - if (!firstListenerName.isPresent()) { + if (firstListenerName.isEmpty()) { firstListenerName = Optional.of(listenerName); } String value = StringUtils.trim(str.substring(index + 1)); listeners.computeIfAbsent(listenerName, k -> new ArrayList<>(2)); listeners.get(listenerName).add(value); } + // For backward compatibility, if `internalListenerName` was left blank, default it to the first + // listener parsed from `advertisedListeners`. if (StringUtils.isBlank(config.getInternalListenerName())) { config.setInternalListenerName(firstListenerName.get()); } - if (!listeners.containsKey(config.getInternalListenerName())) { - throw new IllegalArgumentException("the `advertisedListeners` configure do not contain " - + "`internalListenerName` entry"); - } final Map<String, AdvertisedListener> result = new LinkedHashMap<>(); final Map<String, Set<String>> reverseMappings = new LinkedHashMap<>(); for (final Map.Entry<String, List<String>> entry : listeners.entrySet()) { @@ -136,4 +195,35 @@ public final class MultipleListenerValidator { return result; } + /** + * Synthesize an {@link AdvertisedListener} for the internal listener from the legacy port + * configuration (`brokerServicePort`, `brokerServicePortTls`, `webServicePort`, + * `webServicePortTls`). 
Returns {@code null} if no binary port and no web port is set; the caller
+     * is then responsible for raising an error if the internal listener is still missing after
+     * parsing `advertisedListeners`.
+     */
+    private static AdvertisedListener buildInternalListenerFromLegacyPorts(ServiceConfiguration config) {
+        boolean hasBinaryPort = config.getBrokerServicePort().isPresent()
+                || config.getBrokerServicePortTls().isPresent();
+        boolean hasWebPort = config.getWebServicePort().isPresent()
+                || config.getWebServicePortTls().isPresent();
+        if (!hasBinaryPort && !hasWebPort) {
+            return null;
+        }
+        String host = ServiceConfigurationUtils.getDefaultOrConfiguredAddress(config.getAdvertisedAddress());
+        return AdvertisedListener.builder()
+                .brokerServiceUrl(config.getBrokerServicePort()
+                        .map(port -> URI.create(ServiceConfigurationUtils.brokerUrl(host, port))).orElse(null))
+                .brokerServiceUrlTls(config.getBrokerServicePortTls()
+                        .map(port -> URI.create(ServiceConfigurationUtils.brokerUrlTls(host, port))).orElse(null))
+                .brokerHttpUrl(config.getWebServicePort()
+                        .map(port -> URI.create(ServiceConfigurationUtils.webServiceUrl(host, port))).orElse(null))
+                .brokerHttpsUrl(config.getWebServicePortTls()
+                        .map(port -> URI.create(ServiceConfigurationUtils.webServiceUrlTls(host, port))).orElse(null))
+                .build();
+    }
+
+    // Prevent instantiation
+    private MultipleListenerValidator() {
+    }
 }
diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/BindAddressValidatorTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/BindAddressValidatorTest.java
index a1af57b633c..3495cb1b74c 100644
--- a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/BindAddressValidatorTest.java
+++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/BindAddressValidatorTest.java
@@ -18,7 +18,9 @@
  */
 package org.apache.pulsar.broker.validator;

-import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
 import java.net.URI;
 import java.util.Arrays;
 import java.util.Collections;
@@ -58,9 +60,9 @@ public class BindAddressValidatorTest {
         ServiceConfiguration config = newEmptyConfiguration();
         config.setBindAddresses("internal:pulsar://0.0.0.0:6650,internal:pulsar+ssl://0.0.0.0:6651");
         List<BindAddress> addresses = BindAddressValidator.validateBindAddresses(config, null);
-        assertEquals(Arrays.asList(
+        assertEquals(addresses, Arrays.asList(
                 new BindAddress("internal", URI.create("pulsar://0.0.0.0:6650")),
-                new BindAddress("internal", URI.create("pulsar+ssl://0.0.0.0:6651"))), addresses);
+                new BindAddress("internal", URI.create("pulsar+ssl://0.0.0.0:6651"))));
     }

     @Test
@@ -68,9 +70,9 @@ public class BindAddressValidatorTest {
         ServiceConfiguration config = newEmptyConfiguration();
         config.setBindAddresses("internal:pulsar://0.0.0.0:6650,external:pulsar+ssl://0.0.0.0:6651");
         List<BindAddress> addresses = BindAddressValidator.validateBindAddresses(config, null);
-        assertEquals(Arrays.asList(
+        assertEquals(addresses, Arrays.asList(
                 new BindAddress("internal", URI.create("pulsar://0.0.0.0:6650")),
-                new BindAddress("external", URI.create("pulsar+ssl://0.0.0.0:6651"))), addresses);
+                new BindAddress("external", URI.create("pulsar+ssl://0.0.0.0:6651"))));
     }

     @Test
@@ -82,20 +84,20 @@ public class BindAddressValidatorTest {
         config.setWebServicePortTls(Optional.of(443));
         config.setBindAddress("0.0.0.0");
         List<BindAddress> addresses = BindAddressValidator.validateBindAddresses(config, null);
-        assertEquals(Arrays.asList(
-                new BindAddress(null, URI.create("pulsar://0.0.0.0:6650")),
-                new BindAddress(null, URI.create("pulsar+ssl://0.0.0.0:6651")),
-                new BindAddress(null, URI.create("http://0.0.0.0:8080")),
-                new BindAddress(null, URI.create("https://0.0.0.0:443"))), addresses);
+        assertEquals(addresses, Arrays.asList(
+                new BindAddress("internal", URI.create("pulsar://0.0.0.0:6650")),
+                new BindAddress("internal", URI.create("pulsar+ssl://0.0.0.0:6651")),
+                new BindAddress("internal", URI.create("http://0.0.0.0:8080")),
+                new BindAddress("internal", URI.create("https://0.0.0.0:443"))));
     }

     @Test
     public void testMigrationWithDefaults() {
         ServiceConfiguration config = new ServiceConfiguration();
         List<BindAddress> addresses = BindAddressValidator.validateBindAddresses(config, null);
-        assertEquals(Arrays.asList(
-                new BindAddress(null, URI.create("pulsar://0.0.0.0:6650")),
-                new BindAddress(null, URI.create("http://0.0.0.0:8080"))), addresses);
+        assertEquals(addresses, Arrays.asList(
+                new BindAddress("internal", URI.create("pulsar://0.0.0.0:6650")),
+                new BindAddress("internal", URI.create("http://0.0.0.0:8080"))));
     }

     @Test
@@ -104,9 +106,9 @@ public class BindAddressValidatorTest {
         config.setBrokerServicePort(Optional.of(6650));
         config.setBindAddresses("extra:pulsar://0.0.0.0:6652");
         List<BindAddress> addresses = BindAddressValidator.validateBindAddresses(config, null);
-        assertEquals(Arrays.asList(
-                new BindAddress(null, URI.create("pulsar://0.0.0.0:6650")),
-                new BindAddress("extra", URI.create("pulsar://0.0.0.0:6652"))), addresses);
+        assertEquals(addresses, Arrays.asList(
+                new BindAddress("internal", URI.create("pulsar://0.0.0.0:6650")),
+                new BindAddress("extra", URI.create("pulsar://0.0.0.0:6652"))));
     }

     @Test
@@ -118,20 +120,118 @@ public class BindAddressValidatorTest {

         List<BindAddress> addresses;
         addresses = BindAddressValidator.validateBindAddresses(config, null);
-        assertEquals(Arrays.asList(
-                new BindAddress(null, URI.create("pulsar://0.0.0.0:6650")),
-                new BindAddress(null, URI.create("pulsar+ssl://0.0.0.0:6651")),
+        assertEquals(addresses, Arrays.asList(
+                new BindAddress("internal", URI.create("pulsar://0.0.0.0:6650")),
+                new BindAddress("internal", URI.create("pulsar+ssl://0.0.0.0:6651")),
                 new BindAddress("extra", URI.create("pulsar://0.0.0.0:6652")),
-                new BindAddress("extra", URI.create("http://0.0.0.0:8080"))), addresses);
+                new BindAddress("extra", URI.create("http://0.0.0.0:8080"))));

         addresses = BindAddressValidator.validateBindAddresses(config, Arrays.asList("pulsar", "pulsar+ssl"));
-        assertEquals(Arrays.asList(
-                new BindAddress(null, URI.create("pulsar://0.0.0.0:6650")),
-                new BindAddress(null, URI.create("pulsar+ssl://0.0.0.0:6651")),
-                new BindAddress("extra", URI.create("pulsar://0.0.0.0:6652"))), addresses);
+        assertEquals(addresses, Arrays.asList(
+                new BindAddress("internal", URI.create("pulsar://0.0.0.0:6650")),
+                new BindAddress("internal", URI.create("pulsar+ssl://0.0.0.0:6651")),
+                new BindAddress("extra", URI.create("pulsar://0.0.0.0:6652"))));

         addresses = BindAddressValidator.validateBindAddresses(config, Collections.singletonList("http"));
-        assertEquals(Collections.singletonList(
-                new BindAddress("extra", URI.create("http://0.0.0.0:8080"))), addresses);
+        assertEquals(addresses, Collections.singletonList(
+                new BindAddress("extra", URI.create("http://0.0.0.0:8080"))));
+    }
+
+    @Test
+    public void testMigrationUsesConfiguredInternalListenerName() {
+        ServiceConfiguration config = newEmptyConfiguration();
+        config.setInternalListenerName("region1");
+        config.setBrokerServicePort(Optional.of(6650));
+        config.setWebServicePort(Optional.of(8080));
+        List<BindAddress> addresses = BindAddressValidator.validateBindAddresses(config, null);
+        assertEquals(addresses, Arrays.asList(
+                new BindAddress("region1", URI.create("pulsar://0.0.0.0:6650")),
+                new BindAddress("region1", URI.create("http://0.0.0.0:8080"))));
+    }
+
+    @Test
+    public void testDuplicateMatchingMigratedBindingIsTolerated() {
+        // User explicitly re-declares the migrated binding under the same listener name; this must
+        // be accepted without throwing.
+        ServiceConfiguration config = newEmptyConfiguration();
+        config.setBrokerServicePort(Optional.of(6650));
+        config.setBindAddresses("internal:pulsar://0.0.0.0:6650");
+        List<BindAddress> addresses = BindAddressValidator.validateBindAddresses(config, null);
+        assertEquals(addresses, Collections.singletonList(
+                new BindAddress("internal", URI.create("pulsar://0.0.0.0:6650"))));
+    }
+
+    @Test
+    public void testDuplicateConflictFailsWithError() {
+        // The migrated binding uses the internal listener name, but the user re-declares the same
+        // URI under a different listener name. This is a conflict and must be rejected.
+        ServiceConfiguration config = newEmptyConfiguration();
+        config.setBrokerServicePort(Optional.of(6650));
+        config.setBindAddresses("external:pulsar://0.0.0.0:6650");
+        IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
+                () -> BindAddressValidator.validateBindAddresses(config, null));
+        assertTrue(e.getMessage().contains("conflicting listener names"),
+                "expected conflict message but got: " + e.getMessage());
+    }
+
+    @Test
+    public void testDuplicateBindAddressesEntriesAreTolerated() {
+        // Two identical entries in bindAddresses should not cause a failure.
+        ServiceConfiguration config = newEmptyConfiguration();
+        config.setBindAddresses("extra:pulsar://0.0.0.0:6652,extra:pulsar://0.0.0.0:6652");
+        List<BindAddress> addresses = BindAddressValidator.validateBindAddresses(config, null);
+        assertEquals(addresses, Collections.singletonList(
+                new BindAddress("extra", URI.create("pulsar://0.0.0.0:6652"))));
+    }
+
+    @Test
+    public void testSameIpPortDifferentSchemeSameListenerFails() {
+        // Same ip:port with two different schemes (even for the same listener) cannot both be bound.
+        ServiceConfiguration config = newEmptyConfiguration();
+        config.setBindAddresses("internal:pulsar://0.0.0.0:8080,internal:http://0.0.0.0:8080");
+        IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
+                () -> BindAddressValidator.validateBindAddresses(config, null));
+        assertTrue(e.getMessage().contains("0.0.0.0:8080"),
+                "expected ip:port in message but got: " + e.getMessage());
+        assertTrue(e.getMessage().contains("only one scheme"),
+                "expected scheme-collision wording but got: " + e.getMessage());
+    }
+
+    @Test
+    public void testSameIpPortDifferentSchemeDifferentListenerFails() {
+        // Same ip:port with two different schemes and two different listener names also fails for
+        // the same reason: an ip:port can only be bound by one (listener, scheme).
+        ServiceConfiguration config = newEmptyConfiguration();
+        config.setBindAddresses("internal:pulsar://0.0.0.0:8080,external:http://0.0.0.0:8080");
+        IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
+                () -> BindAddressValidator.validateBindAddresses(config, null));
+        assertTrue(e.getMessage().contains("0.0.0.0:8080"),
+                "expected ip:port in message but got: " + e.getMessage());
+    }
+
+    @Test
+    public void testDynamicPortZeroIsSkippedFromIpPortUniqueness() {
+        // Port 0 means "OS-assigned ephemeral port", so two port-0 bindings with the same IP cannot
+        // actually collide — each socket gets a different kernel-assigned port. The ip:port
+        // uniqueness check therefore skips port-0 entries.
+        ServiceConfiguration config = newEmptyConfiguration();
+        config.setBindAddresses("internal:pulsar://0.0.0.0:0,internal:http://0.0.0.0:0");
+        List<BindAddress> addresses = BindAddressValidator.validateBindAddresses(config, null);
+        assertEquals(addresses, Arrays.asList(
+                new BindAddress("internal", URI.create("pulsar://0.0.0.0:0")),
+                new BindAddress("internal", URI.create("http://0.0.0.0:0"))));
+    }
+
+    @Test
+    public void testLegacyMigrationIpPortCollisionFails() {
+        // brokerServicePort and webServicePort collide on the same ip:port: the migrated bindings
+        // would produce pulsar://0.0.0.0:8080 and http://0.0.0.0:8080, which cannot coexist.
+        ServiceConfiguration config = newEmptyConfiguration();
+        config.setBrokerServicePort(Optional.of(8080));
+        config.setWebServicePort(Optional.of(8080));
+        IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
+                () -> BindAddressValidator.validateBindAddresses(config, null));
+        assertTrue(e.getMessage().contains("0.0.0.0:8080"),
+                "expected ip:port in message but got: " + e.getMessage());
     }
 }
diff --git a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/MultipleListenerValidatorTest.java b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/MultipleListenerValidatorTest.java
index 7e79a40355f..438490d70c1 100644
--- a/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/MultipleListenerValidatorTest.java
+++ b/pulsar-broker-common/src/test/java/org/apache/pulsar/broker/validator/MultipleListenerValidatorTest.java
@@ -18,11 +18,18 @@
  */
 package org.apache.pulsar.broker.validator;

-import static org.testng.AssertJUnit.assertEquals;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
 import java.net.InetAddress;
+import java.net.URI;
+import java.util.Map;
 import java.util.Optional;
 import org.apache.pulsar.broker.ServiceConfiguration;
 import org.apache.pulsar.broker.ServiceConfigurationUtils;
+import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
 import org.testng.annotations.Test;

 /**
@@ -34,7 +41,8 @@ public class MultipleListenerValidatorTest {
     @SuppressWarnings("deprecation")
     @Test
     public void testGetAppliedAdvertised() throws Exception {
-        ServiceConfiguration config = new ServiceConfiguration();
+        ServiceConfiguration config = newConfigWithNoPorts();
+        config.setBrokerServicePort(Optional.of(6650));
         config.setBrokerServicePortTls(Optional.of(6651));
         config.setAdvertisedListeners("internal:pulsar://192.0.0.1:6660, internal:pulsar+ssl://192.0.0.1:6651");
         config.setInternalListenerName("internal");
@@ -43,7 +51,7 @@ public class MultipleListenerValidatorTest {
         assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, true),
                 InetAddress.getLocalHost().getCanonicalHostName());

-        config = new ServiceConfiguration();
+        config = newConfigWithNoPorts();
         config.setBrokerServicePortTls(Optional.of(6651));
         config.setAdvertisedAddress("192.0.0.2");
         assertEquals(ServiceConfigurationUtils.getAppliedAdvertisedAddress(config, false),
@@ -60,7 +68,8 @@ public class MultipleListenerValidatorTest {

     @Test
     public void testListenerDefaulting() {
-        ServiceConfiguration config = new ServiceConfiguration();
+        ServiceConfiguration config = newConfigWithNoPorts();
+        config.setInternalListenerName(null);
         config.setAdvertisedListeners(" internal:pulsar://127.0.0.1:6660, internal:pulsar+ssl://127.0.0.1:6651");
         MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
         assertEquals("internal", config.getInternalListenerName());
@@ -68,14 +77,14 @@ public class MultipleListenerValidatorTest {

     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testMalformedListener() {
-        ServiceConfiguration config = new ServiceConfiguration();
+        ServiceConfiguration config = newConfigWithNoPorts();
         config.setAdvertisedListeners(":pulsar://127.0.0.1:6660");
         MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
     }

     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testListenerDuplicate_1() {
-        ServiceConfiguration config = new ServiceConfiguration();
+        ServiceConfiguration config = newConfigWithNoPorts();
         config.setAdvertisedListeners(" internal:pulsar://127.0.0.1:6660, internal:pulsar+ssl://127.0.0.1:6651,"
                 + " internal:pulsar://192.168.1.11:6660, internal:pulsar+ssl://192.168.1.11:6651");
         config.setInternalListenerName("internal");
@@ -84,7 +93,7 @@ public class MultipleListenerValidatorTest {

     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testListenerDuplicate_2() {
-        ServiceConfiguration config = new ServiceConfiguration();
+        ServiceConfiguration config = newConfigWithNoPorts();
         config.setAdvertisedListeners(" internal:pulsar://127.0.0.1:6660," + " internal:pulsar://192.168.1.11:6660");
         config.setInternalListenerName("internal");
         MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
@@ -92,7 +101,7 @@ public class MultipleListenerValidatorTest {

     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testListenerDuplicate_3() {
-        ServiceConfiguration config = new ServiceConfiguration();
+        ServiceConfiguration config = newConfigWithNoPorts();
         config.setAdvertisedListeners(" internal:pulsar+ssl://127.0.0.1:6661,"
                 + " internal:pulsar+ssl://192.168.1.11:6661");
         config.setInternalListenerName("internal");
@@ -101,18 +110,159 @@ public class MultipleListenerValidatorTest {

     @Test(expectedExceptions = IllegalArgumentException.class)
     public void testDifferentListenerWithSameHostPort() {
-        ServiceConfiguration config = new ServiceConfiguration();
+        ServiceConfiguration config = newConfigWithNoPorts();
         config.setAdvertisedListeners(" internal:pulsar://127.0.0.1:6660," + " external:pulsar://127.0.0.1:6660");
         config.setInternalListenerName("internal");
         MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
     }

-    @Test(expectedExceptions = IllegalArgumentException.class)
-    public void testWithoutListenerNameInAdvertisedListeners() {
+    @Test
+    public void testInternalListenerSynthesizedFromLegacyPorts() {
+        // No advertisedListeners configured, only legacy ports. The internal listener is synthesized.
+        ServiceConfiguration config = newConfigWithNoPorts();
+        config.setAdvertisedAddress("broker-1.example.com");
+        config.setBrokerServicePort(Optional.of(6650));
+        config.setWebServicePort(Optional.of(8080));
+        Map<String, AdvertisedListener> listeners =
+                MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
+        AdvertisedListener internal = listeners.get(ServiceConfiguration.DEFAULT_INTERNAL_LISTENER_NAME);
+        assertNotNull(internal, "expected an `internal` listener to be synthesized from legacy ports");
+        assertEquals(internal.getBrokerServiceUrl(), URI.create("pulsar://broker-1.example.com:6650"));
+        assertEquals(internal.getBrokerHttpUrl(), URI.create("http://broker-1.example.com:8080"));
+        assertNull(internal.getBrokerServiceUrlTls());
+        assertNull(internal.getBrokerHttpsUrl());
+    }
+
+    @Test
+    public void testInternalListenerSynthesizedWithAllPorts() {
+        ServiceConfiguration config = newConfigWithNoPorts();
+        config.setAdvertisedAddress("broker-1.example.com");
+        config.setBrokerServicePort(Optional.of(6650));
+        config.setBrokerServicePortTls(Optional.of(6651));
+        config.setWebServicePort(Optional.of(8080));
+        config.setWebServicePortTls(Optional.of(8443));
+        Map<String, AdvertisedListener> listeners =
+                MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
+        AdvertisedListener internal = listeners.get(ServiceConfiguration.DEFAULT_INTERNAL_LISTENER_NAME);
+        assertNotNull(internal);
+        assertEquals(internal.getBrokerServiceUrl(), URI.create("pulsar://broker-1.example.com:6650"));
+        assertEquals(internal.getBrokerServiceUrlTls(), URI.create("pulsar+ssl://broker-1.example.com:6651"));
+        assertEquals(internal.getBrokerHttpUrl(), URI.create("http://broker-1.example.com:8080"));
+        assertEquals(internal.getBrokerHttpsUrl(), URI.create("https://broker-1.example.com:8443"));
+    }
+
+    @Test
+    public void testInternalListenerAutoAddedWhenAdvertisedListenersDoesNotContainIt() {
+        // User configures advertisedListeners with their own listener name but does not declare the
+        // internal listener. The validator must auto-add it from the legacy ports.
+        ServiceConfiguration config = newConfigWithNoPorts();
+        config.setAdvertisedAddress("broker-1.example.com");
+        config.setBrokerServicePort(Optional.of(6650));
+        config.setWebServicePort(Optional.of(8080));
+        config.setAdvertisedListeners("region1:pulsar://region1.example.com:6660,"
+                + "region1:pulsar+ssl://region1.example.com:6661");
+        Map<String, AdvertisedListener> listeners =
+                MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
+        assertEquals(listeners.size(), 2);
+        AdvertisedListener region1 = listeners.get("region1");
+        assertNotNull(region1);
+        assertEquals(region1.getBrokerServiceUrl(), URI.create("pulsar://region1.example.com:6660"));
+        AdvertisedListener internal = listeners.get("internal");
+        assertNotNull(internal, "internal listener should have been auto-added from legacy ports");
+        assertEquals(internal.getBrokerServiceUrl(), URI.create("pulsar://broker-1.example.com:6650"));
+        assertEquals(internal.getBrokerHttpUrl(), URI.create("http://broker-1.example.com:8080"));
+    }
+
+    @Test
+    public void testExplicitInternalListenerUrlsTakePrecedenceOverLegacyPorts() {
+        // When the user explicitly declares URLs for the internal listener in advertisedListeners, those
+        // URLs take precedence over the legacy-port-derived values. The legacy values still fill in any
+        // URL slots the user did not declare.
+        ServiceConfiguration config = newConfigWithNoPorts();
+        config.setAdvertisedAddress("broker-1.example.com");
+        config.setBrokerServicePort(Optional.of(6650));
+        config.setWebServicePort(Optional.of(8080));
+        config.setAdvertisedListeners("internal:pulsar://explicit.example.com:6660,"
+                + "internal:http://explicit.example.com:8888");
+        Map<String, AdvertisedListener> listeners =
+                MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
+        assertEquals(listeners.size(), 1);
+        AdvertisedListener internal = listeners.get("internal");
+        assertNotNull(internal);
+        // Both URLs were declared explicitly: the user-provided values win over the legacy ports.
+        assertEquals(internal.getBrokerServiceUrl(), URI.create("pulsar://explicit.example.com:6660"));
+        assertEquals(internal.getBrokerHttpUrl(), URI.create("http://explicit.example.com:8888"));
+        // The user did not declare TLS variants and the legacy TLS ports are unset.
+        assertNull(internal.getBrokerServiceUrlTls());
+        assertNull(internal.getBrokerHttpsUrl());
+    }
+
+    @Test
+    public void testPartialInternalListenerOverrideMergesWithLegacyPorts() {
+        // The user overrides only one URL (the binary endpoint) for the internal listener. The remaining
+        // URLs must be filled in from the legacy-port configuration so the listener is complete.
+        ServiceConfiguration config = newConfigWithNoPorts();
+        config.setAdvertisedAddress("broker-1.example.com");
+        config.setBrokerServicePort(Optional.of(6650));
+        config.setBrokerServicePortTls(Optional.of(6651));
+        config.setWebServicePort(Optional.of(8080));
+        config.setWebServicePortTls(Optional.of(8443));
+        config.setAdvertisedListeners("internal:pulsar://override.example.com:6660");
+        Map<String, AdvertisedListener> listeners =
+                MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
+        AdvertisedListener internal = listeners.get("internal");
+        assertNotNull(internal);
+        // Overridden by the explicit advertisedListeners entry:
+        assertEquals(internal.getBrokerServiceUrl(), URI.create("pulsar://override.example.com:6660"));
+        // Filled in from the legacy ports:
+        assertEquals(internal.getBrokerServiceUrlTls(), URI.create("pulsar+ssl://broker-1.example.com:6651"));
+        assertEquals(internal.getBrokerHttpUrl(), URI.create("http://broker-1.example.com:8080"));
+        assertEquals(internal.getBrokerHttpsUrl(), URI.create("https://broker-1.example.com:8443"));
+    }
+
+    @Test
+    public void testInternalListenerNameCustomizable() {
+        // The internal listener name is configurable; the validator must look for the configured name.
+        ServiceConfiguration config = newConfigWithNoPorts();
+        config.setInternalListenerName("region1");
+        config.setAdvertisedAddress("broker-1.example.com");
+        config.setBrokerServicePort(Optional.of(6650));
+        config.setWebServicePort(Optional.of(8080));
+        config.setAdvertisedListeners("region1:pulsar://region1.example.com:6660,"
+                + "region1:http://region1.example.com:8888");
+        Map<String, AdvertisedListener> listeners =
+                MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
+        assertEquals(listeners.size(), 1);
+        AdvertisedListener region1 = listeners.get("region1");
+        assertNotNull(region1);
+        assertEquals(region1.getBrokerServiceUrl(), URI.create("pulsar://region1.example.com:6660"));
+    }
+
+    @Test
+    public void testFailureWhenNoInternalListenerAvailable() {
+        // Custom internal listener name + no matching entry in advertisedListeners + no ports for the
+        // legacy fallback => validation must fail with a clear message.
+        ServiceConfiguration config = newConfigWithNoPorts();
+        config.setInternalListenerName("region1");
+        config.setAdvertisedListeners("region2:pulsar://region2.example.com:6660");
+        IllegalArgumentException e = expectThrows(IllegalArgumentException.class,
+                () -> MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config));
+        assertTrue(e.getMessage().contains("region1"), "expected error to mention the listener name: "
+                + e.getMessage());
+    }
+
+    @Test
+    public void testInternalListenerNameDefaultsToInternalConstant() {
         ServiceConfiguration config = new ServiceConfiguration();
-        config.setAdvertisedListeners(" internal:pulsar://127.0.0.1:6660, internal:pulsar+ssl://127.0.0.1:6651");
-        config.setInternalListenerName("external");
-        MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
+        assertEquals(config.getInternalListenerName(), ServiceConfiguration.DEFAULT_INTERNAL_LISTENER_NAME);
     }

+    private ServiceConfiguration newConfigWithNoPorts() {
+        ServiceConfiguration config = new ServiceConfiguration();
+        config.setBrokerServicePort(Optional.empty());
+        config.setBrokerServicePortTls(Optional.empty());
+        config.setWebServicePort(Optional.empty());
+        config.setWebServicePortTls(Optional.empty());
+        return config;
+    }
 }
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index c15153f3443..f85d12116ef 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -972,13 +972,16 @@ public class PulsarService implements AutoCloseable, ShutdownService {
             this.addWebServerHandlers(webService, metricsServlet, this.config);
             this.webService.start();

-            // Refresh addresses and update configuration, since the port might have been dynamically assigned
-            if (config.getBrokerServicePort().equals(Optional.of(0))) {
-                config.setBrokerServicePort(brokerService.getListenPort());
-            }
-            if (config.getBrokerServicePortTls().equals(Optional.of(0))) {
-                config.setBrokerServicePortTls(brokerService.getListenPortTls());
-            }
+            // Refresh addresses and update configuration based on the actual bound ports. This is
+            // necessary both for dynamic ports (`Optional.of(0)`) and for the case where the broker
+            // is configured only via `bindAddresses` (legacy port properties left as
+            // `Optional.empty()`), so that downstream code — in particular
+            // MultipleListenerValidator.validateAndAnalysisAdvertisedListener — can synthesize the
+            // internal advertised listener from the now-known ports.
+            brokerService.getListenPort().ifPresent(port -> config.setBrokerServicePort(Optional.of(port)));
+            brokerService.getListenPortTls().ifPresent(port -> config.setBrokerServicePortTls(Optional.of(port)));
+            // Recompute the cached advertised listener map now that the bound ports are known.
+            this.advertisedListeners = MultipleListenerValidator.validateAndAnalysisAdvertisedListener(config);
             this.webServiceAddress = webAddress(config);
             this.webServiceAddressTls = webAddressTls(config);
             this.brokerServiceUrl = brokerUrl(config);
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/DynamicBindAddressesIntegrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/DynamicBindAddressesIntegrationTest.java
new file mode 100644
index 00000000000..60cbc82f3ba
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/DynamicBindAddressesIntegrationTest.java
@@ -0,0 +1,197 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.web;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import java.net.InetAddress;
+import java.net.URI;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.Producer;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Integration test for configuring the broker entirely through {@code bindAddresses} with dynamic
+ * ports (port {@code 0}). Verifies that:
+ * <ul>
+ *   <li>None of the legacy {@code brokerServicePort}/{@code webServicePort} (and TLS variants)
+ *       need to be set when {@code bindAddresses} supplies {@code pulsar://} and {@code http://}
+ *       entries under the internal listener;</li>
+ *   <li>after the broker binds, the actual bound ports are reflected back into the configuration
+ *       and the internal advertised listener is synthesized with the real ports;</li>
+ *   <li>{@code advertisedAddress} defaults to the local hostname when left blank;</li>
+ *   <li>{@code PulsarAdmin} and {@code PulsarClient} can talk to the broker via the resolved
+ *       service URLs.</li>
+ * </ul>
+ *
+ * <p>This exercises the post-bind dependency: only after the Netty/Jetty servers have bound is
+ * the dynamic port known, and only then can the advertised listener map be populated.
+ */
+@Test(groups = "broker")
+public class DynamicBindAddressesIntegrationTest extends MockedPulsarServiceBaseTest {
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+        // Provision the standard `public/default` namespace that the admin/client tests below use.
+        // This cannot be done from inside the constructor flow because the admin client only becomes
+        // reachable after the dynamic web port has been bound and surfaced into the configuration.
+        admin.clusters().createCluster(conf.getClusterName(),
+                ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
+        admin.tenants().createTenant("public",
+                TenantInfo.builder()
+                        .allowedClusters(Set.of(conf.getClusterName()))
+                        .build());
+        admin.namespaces().createNamespace("public/default");
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+        // Clear every legacy port property — the broker must come up purely from `bindAddresses`.
+        conf.setBrokerServicePort(Optional.empty());
+        conf.setBrokerServicePortTls(Optional.empty());
+        conf.setWebServicePort(Optional.empty());
+        conf.setWebServicePortTls(Optional.empty());
+        // Bind to all interfaces on a dynamically assigned port for both schemes, tagged with the
+        // internal listener so the connectors are selected as the primary ones for broker-to-broker
+        // communication.
+        conf.setBindAddresses("internal:pulsar://0.0.0.0:0,internal:http://0.0.0.0:0");
+        // Force `advertisedAddress` to be blank so we exercise the hostname-fallback path.
+        conf.setAdvertisedAddress(null);
+        // No explicit `advertisedListeners` either; the internal listener must be synthesized from
+        // the dynamically bound ports.
+        conf.setAdvertisedListeners(null);
+    }
+
+    @Test
+    public void testRuntimeListenerSynthesizedFromDynamicBindAddresses() {
+        ServiceConfiguration runtimeConf = pulsar.getConfiguration();
+
+        // After binding, the legacy port properties must reflect the actual dynamic ports.
+        assertTrue(runtimeConf.getBrokerServicePort().isPresent(),
+                "brokerServicePort should be populated from the bound port");
+        assertTrue(runtimeConf.getWebServicePort().isPresent(),
+                "webServicePort should be populated from the bound port");
+        int boundPulsarPort = runtimeConf.getBrokerServicePort().get();
+        int boundWebPort = runtimeConf.getWebServicePort().get();
+        assertFalse(boundPulsarPort == 0, "the dynamic pulsar port must not still be 0");
+        assertFalse(boundWebPort == 0, "the dynamic web port must not still be 0");
+        // TLS variants stay empty because no TLS binding was declared.
+        assertFalse(runtimeConf.getBrokerServicePortTls().isPresent());
+        assertFalse(runtimeConf.getWebServicePortTls().isPresent());
+
+        // The advertised address must default to the local hostname.
+        String expectedHost;
+        try {
+            expectedHost = InetAddress.getLocalHost().getCanonicalHostName();
+        } catch (Exception e) {
+            throw new AssertionError("unable to resolve canonical local hostname for the test", e);
+        }
+
+        // The internal listener must now exist in the cached map with both URLs populated.
+        Map<String, AdvertisedListener> listeners = pulsar.getAdvertisedListeners();
+        assertNotNull(listeners, "advertised listener map must not be null");
+        AdvertisedListener internal = listeners.get(ServiceConfiguration.DEFAULT_INTERNAL_LISTENER_NAME);
+        assertNotNull(internal,
+                "the `internal` listener must have been synthesized after binding, but the map was: "
+                        + listeners);
+        assertEquals(internal.getBrokerServiceUrl(),
+                URI.create("pulsar://" + expectedHost + ":" + boundPulsarPort));
+        assertEquals(internal.getBrokerHttpUrl(),
+                URI.create("http://" + expectedHost + ":" + boundWebPort));
+
+        // The top-level service URLs must agree with the listener.
+        assertEquals(pulsar.getBrokerServiceUrl(), "pulsar://" + expectedHost + ":" + boundPulsarPort);
+        assertEquals(pulsar.getWebServiceAddress(), "http://" + expectedHost + ":" + boundWebPort);
+
+        // The broker id is derived from the (post-bind) advertised address and web port.
+        String brokerId = pulsar.getBrokerId();
+        assertNotNull(brokerId, "brokerId must be initialized after start()");
+        assertEquals(brokerId, expectedHost + ":" + boundWebPort);
+    }
+
+    @Test
+    public void testPulsarAdminCanCreateTopicOverDynamicBindAddress() throws Exception {
+        String namespace = "public/default";
+        String topic = "persistent://" + namespace + "/dynamic-bind-admin-" + UUID.randomUUID();
+
+        // The `admin` field is built against the runtime web URL by MockedPulsarServiceBaseTest;
+        // exercise it with a real round-trip to confirm the URL is reachable.
+        try (PulsarAdmin localAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(pulsar.getWebServiceAddress())
+                .build()) {
+            localAdmin.topics().createNonPartitionedTopic(topic);
+            assertTrue(localAdmin.topics().getList(namespace).contains(topic),
+                    "topic should be listed under its namespace after creation");
+        }
+    }
+
+    @Test
+    public void testPulsarClientProduceConsumeOverDynamicBindAddress() throws Exception {
+        String topic = "persistent://public/default/dynamic-bind-pubsub-" + UUID.randomUUID();
+        String subscription = "test-sub";
+        String payload = "hello-dynamic-bind";
+
+        try (PulsarClient client = PulsarClient.builder()
+                .serviceUrl(pulsar.getBrokerServiceUrl())
+                .build()) {
+            try (Consumer<String> consumer = client.newConsumer(Schema.STRING)
+                    .topic(topic)
+                    .subscriptionName(subscription)
+                    .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
+                    .subscribe();
+                 Producer<String> producer = client.newProducer(Schema.STRING)
+                         .topic(topic)
+                         .create()) {
+                producer.send(payload);
+                Message<String> received = consumer.receive(10, TimeUnit.SECONDS);
+                assertNotNull(received, "consumer must receive the message produced over the dynamic port");
+                assertEquals(received.getValue(), payload);
+                consumer.acknowledge(received);
+            }
+        }
+    }
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/MultipleInternalBindAddressesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/MultipleInternalBindAddressesTest.java
new file mode 100644
index 00000000000..8b55c4b4b88
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/web/MultipleInternalBindAddressesTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.web;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertTrue;
+import java.net.InetAddress;
+import java.net.URI;
+import java.util.Optional;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest;
+import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Verifies behavior when the internal listener has more than one bind address for the same protocol
+ * scheme. In that case the broker accepts every binding but uses the first declared one (per scheme)
+ * as the primary, which is what drives {@code pulsar.getBrokerServiceUrl()},
+ * {@code pulsar.getWebServiceAddress()}, and the synthesized internal advertised listener.
+ */
+@Test(groups = "broker")
+public class MultipleInternalBindAddressesTest extends MockedPulsarServiceBaseTest {
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        super.internalSetup();
+    }
+
+    @AfterMethod(alwaysRun = true)
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @Override
+    protected void doInitConf() throws Exception {
+        super.doInitConf();
+        // Drive everything off `bindAddresses` so the validator/binder cannot reach back to legacy
+        // ports to find a "default" primary.
+        conf.setBrokerServicePort(Optional.empty());
+        conf.setBrokerServicePortTls(Optional.empty());
+        conf.setWebServicePort(Optional.empty());
+        conf.setWebServicePortTls(Optional.empty());
+        // Two pulsar bindings and two http bindings for the internal listener, on different IPs so
+        // the ip:port uniqueness check is satisfied (each combination is unique). Both use port 0
+        // so the OS assigns distinct dynamic ports.
+        conf.setBindAddresses(
+                "internal:pulsar://0.0.0.0:0,"
+                        + "internal:pulsar://127.0.0.1:0,"
+                        + "internal:http://0.0.0.0:0,"
+                        + "internal:http://127.0.0.1:0");
+        // Hostname-fallback path for the advertised address.
+        conf.setAdvertisedAddress(null);
+        conf.setAdvertisedListeners(null);
+    }
+
+    @Test
+    public void testFirstInternalBindingPerSchemeBecomesPrimary() throws Exception {
+        ServiceConfiguration runtimeConf = pulsar.getConfiguration();
+
+        // The primary listen ports are surfaced via the dedicated methods. By contract, these come
+        // from the FIRST binding (per scheme) that matches the internal listener.
+        int primaryPulsarPort = pulsar.getBrokerListenPort().orElseThrow();
+        int primaryWebPort = pulsar.getWebService().getListenPortHTTP().orElseThrow();
+        assertTrue(primaryPulsarPort > 0, "primary pulsar port must be a real OS-assigned port");
+        assertTrue(primaryWebPort > 0, "primary web port must be a real OS-assigned port");
+        assertTrue(primaryPulsarPort != primaryWebPort,
+                "pulsar and web primaries must have been assigned distinct ports");
+
+        // After binding, the configuration's legacy ports must reflect the primary binding's ports
+        // (not the additional bindings' ports), which is what downstream synthesis depends on.
+        assertEquals(runtimeConf.getBrokerServicePort(), Optional.of(primaryPulsarPort));
+        assertEquals(runtimeConf.getWebServicePort(), Optional.of(primaryWebPort));
+
+        // The bindAddresses config string still records all the original entries the user declared,
+        // proving the multi-binding configuration was accepted up-front.
+        String declared = runtimeConf.getBindAddresses();
+        assertNotNull(declared);
+        assertEquals(countOccurrences(declared, "internal:pulsar://"), 2,
+                "two internal:pulsar:// bindings should be in the declared config: " + declared);
+        assertEquals(countOccurrences(declared, "internal:http://"), 2,
+                "two internal:http:// bindings should be in the declared config: " + declared);
+
+        // pulsar.getBrokerServiceUrl() / pulsar.getWebServiceAddress() must use the primary ports
+        // (not any of the additional bindings' ports).
+        String expectedHost = InetAddress.getLocalHost().getCanonicalHostName();
+        assertEquals(pulsar.getBrokerServiceUrl(), "pulsar://" + expectedHost + ":" + primaryPulsarPort);
+        assertEquals(pulsar.getWebServiceAddress(), "http://" + expectedHost + ":" + primaryWebPort);
+
+        // The synthesized internal advertised listener must agree with the same primary ports.
+        AdvertisedListener internal =
+                pulsar.getAdvertisedListeners().get(ServiceConfiguration.DEFAULT_INTERNAL_LISTENER_NAME);
+        assertNotNull(internal,
+                "the `internal` listener must have been synthesized after binding, but the map was: "
+                        + pulsar.getAdvertisedListeners());
+        assertEquals(internal.getBrokerServiceUrl(),
+                URI.create("pulsar://" + expectedHost + ":" + primaryPulsarPort));
+        assertEquals(internal.getBrokerHttpUrl(),
+                URI.create("http://" + expectedHost + ":" + primaryWebPort));
+    }
+
+    private static int countOccurrences(String haystack, String needle) {
+        int count = 0;
+        int index = 0;
+        while ((index = haystack.indexOf(needle, index)) != -1) {
+            count++;
+            index += needle.length();
+        }
+        return count;
+    }
+}
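
Note on the "first binding per scheme becomes primary" rule exercised by MultipleInternalBindAddressesTest: the following is a minimal, standalone sketch of that selection, not the broker's actual implementation. The class `PrimaryBindingSketch` and the method `primaryPerScheme` are hypothetical names introduced only for illustration, and the parsing assumes the `listener:scheme://ip:port` entry format described in the commit message.

```java
import java.net.URI;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration only; this is not Pulsar's BindAddressValidator.
class PrimaryBindingSketch {

    // For each scheme, returns the first declared binding whose listener name
    // matches internalListenerName. Assumes entries look like
    // "listener:scheme://ip:port", separated by commas.
    public static Map<String, URI> primaryPerScheme(String bindAddresses,
                                                    String internalListenerName) {
        Map<String, URI> primaries = new LinkedHashMap<>();
        for (String entry : bindAddresses.split(",")) {
            String trimmed = entry.trim();
            int sep = trimmed.indexOf(':');
            if (sep <= 0) {
                continue; // skip empty or malformed entries
            }
            String listener = trimmed.substring(0, sep);
            if (!listener.equals(internalListenerName)) {
                continue; // binding belongs to another listener
            }
            URI uri = URI.create(trimmed.substring(sep + 1));
            // putIfAbsent keeps the first binding seen for each scheme.
            primaries.putIfAbsent(uri.getScheme(), uri);
        }
        return primaries;
    }

    public static void main(String[] args) {
        String declared = "internal:pulsar://0.0.0.0:0,"
                + "internal:pulsar://127.0.0.1:0,"
                + "internal:http://0.0.0.0:0,"
                + "internal:http://127.0.0.1:0";
        System.out.println(primaryPerScheme(declared, "internal"));
    }
}
```

With the four bindings declared in the test, this sketch selects the `0.0.0.0` entry for both `pulsar` and `http`, because those are declared first. In the real broker the ports are only known after binding, so the sketch covers the selection order and nothing else.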
