This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch lh-support-webservice-bindaddresses-and-advertised-listeners in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 60e5a4cce161958e79f2bc23bb31a5677b69154a Author: Lari Hotari <[email protected]> AuthorDate: Wed May 20 19:31:22 2026 +0300 Address review comment --- .../apache/pulsar/broker/lookup/LookupResult.java | 27 ++++++++++++++-------- .../broker/namespace/NamespaceEphemeralData.java | 6 +++++ .../pulsar/broker/namespace/NamespaceService.java | 17 +++++++------- .../pulsar/broker/web/RestProducerContext.java | 2 +- .../pulsar/broker/lookup/LookupResultTest.java | 6 ++++- 5 files changed, 38 insertions(+), 20 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/LookupResult.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/LookupResult.java index 3d46ef16cda..c45614a8be3 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/LookupResult.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/LookupResult.java @@ -22,6 +22,8 @@ import static org.apache.pulsar.broker.lookup.v2.TopicLookup.LISTENERNAME_PARAM; import java.net.URI; import java.util.Map; import java.util.Objects; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Response; import javax.ws.rs.core.UriBuilder; import lombok.Builder; import lombok.Getter; @@ -176,14 +178,14 @@ public class LookupResult { } } - // for backwards compatibility, parse the brokerId from webServiceUrl or webServiceUrlTls - // this might be the case temporarily when the broker upgrade happens and there are mixed versions - // of brokers in the cluster + // for backwards compatibility, derive the brokerId from webServiceUrl or webServiceUrlTls by + // parsing the URL and taking host:port. This is a transient state that may occur during a + // rolling upgrade when older brokers in the cluster do not yet publish a brokerId in their + // ephemeral data; once all brokers have been upgraded, the brokerId field is always populated. if (brokerId == null && (webServiceUrl != null || webServiceUrlTls != null)) { - if (webServiceUrl != null) { - brokerId = webServiceUrl.substring("http://".length()); - } else { - brokerId = webServiceUrlTls.substring("https://".length()); + URI url = URI.create(webServiceUrl != null ? webServiceUrl : webServiceUrlTls); + if (url.getHost() != null && url.getPort() != -1) { + brokerId = url.getHost() + ":" + url.getPort(); } } @@ -246,9 +248,14 @@ public class LookupResult { private URI toRedirectUriInternal(URI requestUri, boolean authoritativeRedirect) { boolean requireHttps = "https".equalsIgnoreCase(requestUri.getScheme()); String webServiceUrl = requireHttps ? lookupData.getHttpUrlTls() : lookupData.getHttpUrl(); - Objects.requireNonNull(webServiceUrl, () -> "No " - + (requireHttps ? "https" : "http") - + " URL configured for broker " + lookupData.getBrokerId()); + if (webServiceUrl == null) { + // Preserve the legacy 412 error semantics when the redirect target broker has no URL + // configured for the requested scheme. + throw new WebApplicationException(Response.status(Response.Status.PRECONDITION_FAILED) + .entity("No " + (requireHttps ? "https" : "http") + + " URL configured for broker " + lookupData.getBrokerId()) + .build()); + } URI webServiceUri = URI.create(webServiceUrl); UriBuilder uriBuilder = UriBuilder.fromUri(requestUri) // use the path and query parameters from the request URI diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceEphemeralData.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceEphemeralData.java index 4b4cb53ff0a..a580eb1314b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceEphemeralData.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceEphemeralData.java @@ -22,6 +22,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; import lombok.Data; +import lombok.EqualsAndHashCode; import lombok.NoArgsConstructor; import org.apache.pulsar.policies.data.loadbalancer.AdvertisedListener; import org.jspecify.annotations.NonNull; @@ -29,6 +30,11 @@ import org.jspecify.annotations.NonNull; @Data @NoArgsConstructor public class NamespaceEphemeralData { + // The brokerId field was added later. During a rolling upgrade, entries written by older brokers may not + // contain it (LookupResult derives it from the URL host:port in that case). Exclude it from equals/hashCode + // so that consumers of this object do not see spurious "changed" notifications when the same logical owner + // transitions from a missing brokerId to a populated one. + @EqualsAndHashCode.Exclude private String brokerId; private String nativeUrl; private String nativeUrlTls; diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java index b1d97a6a9cc..db6ce90c203 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java @@ -328,11 +328,14 @@ public class NamespaceService implements AutoCloseable { } /** - * Return the LookupResult of the broker who's owning a particular service unit. - * The LookupResult won't necessary be the broker who is owning the service unit. When the cluster contains - * multiple brokers with different load manager implementations, the LookupResult will be - * <p> - * If the service unit is not owned, return an empty optional + * Return the LookupResult of the broker that owns a particular service unit. + * + * <p>The returned LookupResult will not necessarily point to the broker that currently owns the service unit. + * When the cluster contains multiple brokers with different load manager implementations (e.g. during a + * load-manager migration), the LookupResult may point to a broker running the expected load manager, so the + * caller redirects the request to a broker that can handle it. + * + * <p>If the service unit is not owned, return an empty optional. */ public Optional<LookupResult> getLookupResultForWebRequest(ServiceUnitId suName, LookupOptions options) throws Exception { @@ -573,9 +576,7 @@ public class NamespaceService implements AutoCloseable { .get(config.getMetadataStoreOperationTimeoutSeconds(), SECONDS); if (candidateBroker == null) { - Optional<LeaderBroker> currentLeader = pulsar.getLeaderElectionService().getCurrentLeader() - // make a copy to avoid races, lookups will tolerate stale leader information - .map(Function.identity()); + Optional<LeaderBroker> currentLeader = pulsar.getLeaderElectionService().getCurrentLeader(); if (options.isAuthoritative()) { // leader broker already assigned the current broker as owner diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestProducerContext.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestProducerContext.java index 6b9192845ab..488a8be2110 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestProducerContext.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/RestProducerContext.java @@ -52,7 +52,7 @@ public class RestProducerContext implements TopicEventsListener { public void handleEvent(String topicName, TopicEvent event, EventStage stage, Throwable t) { // remove topic from owning topics when it's unloaded if (event == TopicEvent.UNLOAD && stage == EventStage.SUCCESS) { - addOrRemoveTopic(topicName, event == TopicEvent.UNLOAD); + addOrRemoveTopic(topicName, true); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/LookupResultTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/LookupResultTest.java index 553186e24eb..917bd1a3128 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/LookupResultTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/lookup/LookupResultTest.java @@ -25,10 +25,13 @@ import static org.testng.Assert.assertNotNull; import static org.testng.Assert.assertNull; import static org.testng.Assert.assertThrows; import static org.testng.Assert.assertTrue; +import static org.testng.Assert.expectThrows; import java.net.URI; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.Response; import org.apache.pulsar.broker.loadbalance.extensions.data.BrokerLookupData; import org.apache.pulsar.broker.namespace.LookupOptions; import org.apache.pulsar.broker.namespace.NamespaceEphemeralData; @@ -370,7 +373,8 @@ public class LookupResultTest { URI request = URI.create("https://original-host:1234/admin"); - assertThrows(NullPointerException.class, () -> result.toRedirectUri(request)); + WebApplicationException e = expectThrows(WebApplicationException.class, () -> result.toRedirectUri(request)); + assertEquals(e.getResponse().getStatus(), Response.Status.PRECONDITION_FAILED.getStatusCode()); } @Test
