lhotari opened a new pull request, #25851:
URL: https://github.com/apache/pulsar/pull/25851

   ### Motivation
   
   [PIP-61: Advertised multiple 
addresses](https://github.com/apache/pulsar/blob/master/pip/pip-61.md) and 
[PIP-95: Smart Listener Selection with Multiple Bind 
Addresses](https://github.com/apache/pulsar/blob/master/pip/pip-95.md) are not 
fully implemented for the broker webservice (HTTP/HTTPS). Support for 
`bindAddresses` was completely missing on the webservice side, and Admin API 
redirects between brokers did not preserve the listener the request originally 
arrived on. This PR closes both gaps and, while doing so, simplifies the 
configuration model. It has two main goals:
   
   **Goal 1 — Simpler configuration: the internal listener does not have to be 
declared in `advertisedListeners` or `bindAddresses`.**
   
   - The `*Port` properties (`brokerServicePort` / `brokerServicePortTls` / 
`webServicePort` / `webServicePortTls`) now serve a dual role: the same port 
number is used both for the local socket bind port **and** for the advertised 
port of the internal listener, with `advertisedAddress` supplying the hostname. 
Setting just the `*Port` properties is therefore enough to fully configure the 
internal listener — no entry in `advertisedListeners` or `bindAddresses` is 
required for it.
   - Adding an external listener no longer forces re-declaring the internal 
one: `advertisedListeners` (and `bindAddresses`, when PIP-95 is used) only 
needs entries for the additional listeners.
   - `internalListenerName` defaults to `internal`. This default is chosen so 
that the auto-configured port-derived internal listener cannot accidentally 
collide with any user-declared listener name. Overriding the internal 
listener's URLs in `advertisedListeners` is discouraged because pointing them 
at an external load balancer would route cluster-internal traffic outside the 
cluster.
   - The previous "use the first advertised listener as the internal one" rule 
was brittle, but is preserved as a backwards-compat fallback: leaving 
`internalListenerName` blank still uses the first entry in 
`advertisedListeners` as the internal listener.
   
   **Goal 2 — PIP-95 smart listener selection for the HTTP/HTTPS Admin API, 
enabling a proxyless deployment.**
   
   This was one of the original PIP-61 goals that the existing code only 
delivered for the Pulsar binary protocol; the Admin API was missing. Before 
this PR, the broker had no way to know which advertised listener a given Admin 
API request belonged to, so admin redirects had to send clients back through a 
single Admin URL — typically requiring an HTTP reverse proxy in front of the 
brokers to multiplex Admin API calls per listener.
   
   With this PR, PIP-95 smart listener selection is extended to HTTP/HTTPS:
   - Each HTTP/HTTPS advertised listener is bound to a unique port via 
`bindAddresses`.
   - The broker identifies the listener from the destination port the request 
arrived on.
   - Admin API redirect responses (lookup redirects, admin forwarding to the 
owner or leader broker) preserve that listener name and route the client to the 
matching listener on the target broker.
   
   This enables a **fully proxyless deployment**: a layer-4 (TCP/IP) load 
balancer in front of the brokers can forward both the Pulsar binary protocol 
and the HTTP/HTTPS Admin API directly to the listener-specific `ip:port` 
configured in `bindAddresses`, with no HTTP reverse proxy needed.
   
   **Goal 3 — Foundation for per-listener authentication and authorization 
plugins.**
   
   This PR does not change any auth plugin behavior, but it puts the necessary 
plumbing in place: after this PR, every incoming connection on both the broker 
service and the web service has its listener identity resolved from the 
destination port (the PIP-95 "smart listener" feature, now extended to 
HTTP/HTTPS as well). With the listener identity available on each request, 
follow-up work can restrict authentication or authorization plugins to specific 
listeners.
   
   A motivating example is the static JWT token that brokers use for internal 
broker-to-broker communication. Today such a token has to be accepted by every 
listener, so leaking it gives an attacker direct access from any 
externally-reachable endpoint. Once auth plugins can be scoped per listener, 
the static JWT can be limited to the internal listener only — a leaked internal 
token then becomes useless without direct network access to the 
cluster-internal listener's `ip:port`, while external clients continue to 
authenticate via OAuth (or similar) on the external listeners.
   
   Additional notable behavior changes:
   
   - Both the Pulsar binary protocol (`pulsar` & `pulsar+ssl`) and the web API 
(`http` & `https`) can now share the same listener name; the previous 
restriction is removed. For HTTP broker lookups, the binary-protocol listener 
name defaults to the webservice listener name when there is a matching `pulsar` 
/ `pulsar+ssl` entry under the same listener.
   
   ### Modifications
   
   Main change — webservice listener support:
   
   - Add bindAddresses + advertisedListeners (http/https) support to the broker 
webservice (`WebService`, `BindAddressValidator`, `MultipleListenerValidator`).
   - Per-request listener tracking: an `AddListenerAttributeFilter` records the 
listener name on the servlet request, and 
`PulsarWebResource#getWebServiceListenerName()` exposes it to admin handlers so 
redirects preserve the listener.
   - Centralize redirect-URI construction in `LookupResult#toRedirectUri(...)` 
to remove duplicated host/port/scheme-swap logic from `NamespacesBase`, 
`PulsarWebResource`, `TopicLookupBase`, and `TopicsBase`. The redirect URI 
carries the correct `authoritative` flag and listener query param. Listener URL 
overrides are applied through a small `UrlOverride` helper so the broker- and 
web-service paths read as a sequence of small steps rather than nested branches.
   - Topic-lookup redirects use a dedicated 
`LookupResult#toLookupRedirectUri(URI)` method that always injects the resolved 
listener name as a `listenerName` query parameter. This is the case where the 
original request may have carried the listener via the `X-Pulsar-Listener-Name` 
header (headers do not survive HTTP redirects, so the parameter form must be 
propagated). Other (admin) redirect paths use `toRedirectUri` and leave the 
request's query string alone, since admin endpoints do not understand 
`listenerName`.
   - The 412 entity emitted when the target broker has no URL for the request 
scheme now mentions the resolved listener name to aid debugging.
   - `LookupOptions` gains `webServiceAdvertisedListenerName` and drops the 
`requestHttps` boolean (the scheme is now derived per-request from the incoming 
`requestUri`).
   - Plumb `brokerId` through `LookupData` / `ServiceLookupData` / 
`LocalBrokerData` / `NamespaceEphemeralData` and use broker-id equality for 
ownership checks rather than URL string comparison.
   - For rolling-upgrade safety, a missing web-service listener on the target 
broker does not fail the lookup; the redirect falls back to the broker's 
default URL. A missing broker-service listener still fails the lookup as before.
   
   Configuration — defaults, migration, and validation:
   
   - Default `internalListenerName` to `internal` and add the public constant 
`ServiceConfiguration.DEFAULT_INTERNAL_LISTENER_NAME`.
   - Backwards-compat fallback: if the user explicitly sets 
`internalListenerName=` (blank) and `advertisedListeners` is non-empty, the 
validator uses the first listener parsed from `advertisedListeners` as the 
internal listener — matching the pre-PR behavior. Only when both are blank does 
the validator fall back to the constant default `internal`.
   - `MultipleListenerValidator` (the new 
`validateAndUpdateAdvertisedListeners` entry point — renamed from 
`validateAndAnalysisAdvertisedListener` so the name reflects that it also 
writes back the resolved `internalListenerName`) now synthesizes the internal 
advertised listener from the *Port properties (`brokerServicePort` / 
`brokerServicePortTls` / `webServicePort` / `webServicePortTls`), using 
`advertisedAddress` as the host. Each *Port value is reused as both the local 
socket bind port and the advertised port for the internal listener. Any URLs 
the user declares explicitly under the internal listener in 
`advertisedListeners` override the corresponding port-derived URLs (the rest 
still fill in from the *Port properties), but doing so is discouraged because 
it can route cluster-internal traffic via whatever external endpoint is 
advertised.
     - *Goal*: Setting just `advertisedAddress` and the *Port properties is 
enough to configure the internal listener; an additional external listener can 
be added to `advertisedListeners` without touching the internal listener at all.
   - `BindAddressValidator` migrations are now tagged with 
`internalListenerName` (was `null`). Validation tolerates exact duplicates 
(`listener:scheme://ip:port`), rejects the same URI assigned to different 
listener names, and rejects the same `ip:port` being used by two different 
schemes (a TCP socket can host only one scheme). Port `0` is exempt from the 
ip:port cross-scheme check because the OS assigns distinct ephemeral ports per 
socket; port `0` is documented as supported only for the internal listener.
     - Routing model per scheme: for the Pulsar binary protocol (`pulsar` / 
`pulsar+ssl`), `bindAddresses` is optional — clients may pass an explicit 
`listenerName` and connect to any binary-protocol port the broker exposes. For 
the HTTP/HTTPS Admin API (`http` / `https`), `bindAddresses` is the only 
routing mechanism, because the broker identifies the listener from the 
destination port the request arrived on; any HTTP/HTTPS advertised listener 
that should be reachable from outside the cluster therefore needs a dedicated 
entry in `bindAddresses` on a unique port.
   - Consistent listener-name validation between `advertisedListeners` and 
`bindAddresses`: listener names must match `[A-Za-z0-9_-]+` (no whitespace, no 
characters that would need URL-encoding). Both validators delegate to a single 
shared `validateListenerName` helper.
   - Whitespace handling: comma-separated entries in both `advertisedListeners` 
and `bindAddresses` are trimmed end-to-end, and empty entries (extra/trailing 
commas, blank-only segments from configurations split across multiple lines) 
are skipped.
   - IPv6 support: bare IPv6 literals are wrapped in square brackets when 
embedded in URLs. The four URL builders in `ServiceConfigurationUtils` 
(`brokerUrl` / `brokerUrlTls` / `webServiceUrl` / `webServiceUrlTls`) rename 
their `host` parameter to `ipOrHost` and apply the formatter — 
`io.netty.util.NetUtil.isValidIpV6Address` decides whether to bracket. The same 
helper is used for ip:port uniqueness keys and error messages in 
`MultipleListenerValidator` and `BindAddressValidator`, so `[::1]:6650` is 
unambiguous in both validation and reporting.
   - `PulsarService.start()`: after the Netty/Jetty servers bind, 
unconditionally sync `brokerServicePort` / `brokerServicePortTls` from the 
actually bound channels (was only refreshing the `Optional.of(0)` case), then 
re-compute the cached `advertisedListeners` map so it reflects the dynamic 
ports. This makes a `bindAddresses=...:0`-only configuration fully supported.
   - `ServiceConfigurationUtils.getAppliedAdvertisedAddress`: tolerate a 
synthesized internal listener that only has a subset of the four URLs populated 
(e.g. only HTTPS); pick the first non-null URL to derive the host.
   - Rewrite the documentation for the eight listener-related properties in 
`ServiceConfiguration` and mirror it in `conf/broker.conf` and 
`conf/standalone.conf`. Each `*Port` property explicitly says it serves as both 
the bind port and the advertised port of the internal listener. 
`advertisedListeners` is reframed so its primary purpose comes first (declaring 
additional, typically external, listeners); the auto-configured internal 
listener and the discouraged-override caveat come after, and the legacy 
fallback (blank `internalListenerName` ⇒ first listener parsed here becomes the 
internal one) is called out in its own paragraph. `internalListenerName` is 
described in terms of cluster-internal traffic, with the Kubernetes 
external-listener guidance. The `bindAddresses` documentation calls out that 
port `0` is supported only for the internal listener. The six shared property 
comment blocks are byte-identical between `broker.conf` and `standalone.conf`.
   
   Bundled changes (not strictly required for the webservice listener support, 
but related to the same redirect/lookup code paths touched by this PR — 
split-out PRs would only churn the same files):
   
   - Move REST-producer topic ownership tracking out of 
`BrokerService.owningTopics` into a dedicated `RestProducerContext`. Eagerly 
registers ownership after a successful lookup to close a race against the 
asynchronous `TopicEvent.LOAD` listener notification.
   - `PulsarService#loadNamespaceTopics`: add a `failedCount` counter so 
partial topic-load failures surface in the log line instead of silently being 
lost.
   - `NamespaceBundle#hasNonPersistentTopic`: make `volatile` to fix a 
concurrency hazard observed in the surrounding code.
   - Rename `RedirectManager` → `RedirectManagerForLoadManagerMigration` (and 
its method `findRedirectLookupResultAsync` → 
`redirectIfLoadBalancerOnBrokerIsNotExpected`). The previous name suggested the 
class managed *all* lookup redirects, which it does not. Its only 
responsibility is the load-manager-migration path: when 
`loadManagerMigrationEnabled` is set, it compares the broker's current 
`loadManagerClassName` against the latest entry in the service lookup data and, 
if they differ, redirects to a random broker running the expected load manager. 
The new name + Javadoc make the narrow scope (and the 
`loadManagerMigrationEnabled` gate) explicit so future readers don't mistake it 
for a general redirect facility.
   
   ### Verifying this change
   
   This change added tests and can be verified as follows:
   
   - `LookupResultTest` — unit coverage for `LookupResult.create(...)` 
factories, redirect URI building, and the backward-compatible `brokerId` 
derivation from `webServiceUrl` / `webServiceUrlTls` host:port. Includes cases 
for `toLookupRedirectUri` injecting the `listenerName` query parameter on 
topic-lookup redirects, `toRedirectUri` leaving the query string alone on admin 
redirects (including when the original request already carried `listenerName`), 
and the listener-aware 412 entity.
   - `MultipleListenerValidatorTest` — defaulting, custom internal listener 
name, port synthesis (binary, web, both, partial), merge semantics with 
explicit URLs, failure paths, whitespace trimming around comma-separated 
entries, and empty-entry skipping.
   - `BindAddressValidatorTest` — migrated-binding listener naming, duplicate 
tolerance, listener-name conflict, ip:port cross-scheme conflict, port-0 
exemption from the ip:port check, legacy-migration ip:port collision, 
whitespace trimming, and empty-entry skipping.
   - `ServiceConfigurationUtilsTest` — hostname, IPv4, bare IPv6, and 
pre-bracketed IPv6 for each of the four URL builders; asserts the resulting 
strings round-trip through `URI.create(...)` with the expected port.
   - `NamespaceServiceListenerResolutionTest` — 
`NamespaceService.resolveBrokerServiceLookupResult` behavior: broker-service 
listener mismatch fails the lookup, web-service listener mismatch falls back to 
the default URL (rolling-upgrade safety), and a fully matching listener 
succeeds for both.
   - `DynamicBindAddressesIntegrationTest` — end-to-end test that boots a 
broker purely from 
`bindAddresses=internal:pulsar://0.0.0.0:0,internal:http://0.0.0.0:0`, asserts 
that the runtime ports flow back into the configuration and the advertised 
listener, that the hostname fallback is used when `advertisedAddress` is null, 
that `pulsar.getBrokerId()` is populated, and that `PulsarAdmin` and 
`PulsarClient` can create a topic / produce / consume over the dynamic ports.
   - `MultipleInternalBindAddressesTest` — confirms that when the internal 
listener carries multiple bindings for the same scheme, the first one becomes 
the primary and that `pulsar.getBrokerServiceUrl()`, 
`pulsar.getWebServiceAddress()`, and the synthesized internal advertised 
listener all reflect that primary.
   - `PulsarMultiListenersWithInternalListenerNameTest` — extended end-to-end 
coverage for the webservice listener routing.
   - `RedirectManagerForLoadManagerMigrationTest` (renamed) — coverage for the 
load-manager-migration redirect path.
   - Existing integration / namespace / topic-owner tests adjusted for the new 
lookup API.
   
   ### Does this pull request potentially affect one of the following parts:
   
   - [x] The public API
   - [x] The REST endpoints
   - [x] The threading model
   - [x] The default values of configurations
   - [x] Anything that affects deployment
   
   ### Documentation
   
   - [x] `doc-not-needed`
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to