This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch lh-support-webservice-bindaddresses-and-advertised-listeners in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit ae50a871d7b86f76002de685622b647cfc969b3a Author: Lari Hotari <[email protected]> AuthorDate: Wed May 20 14:52:46 2026 +0300 [fix][broker] Fix CI failures in webservice bindAddresses/advertisedListeners PR Address five failing unit tests from the previous CI run: - PulsarService.start() rejected configurations without webServicePort/ webServicePortTls even when http/https bindAddresses were configured; relax the precondition to also accept http/https bindAddresses (fixes PulsarMultiListenersWithoutInternalListenerNameTest.setup). - BrokerLookupData.toLookupResult silently fell back to default URLs when an unknown advertisedListenerName was requested; restore the legacy PulsarServerException so callers (ExtensibleLoadManagerWrapper, ServerCnx) fail fast (fixes BrokerLookupDataTest.testConstructors). - LookupResult.toRedirectUri unconditionally removed the listenerName query parameter when the result had no brokerServiceListenerName; preserve the caller's original listenerName when the result doesn't specify one (fixes LookupResultTest.testToRedirectUriPreservesExistingQueryParameters). - PulsarWebResource.getWebServiceListenerName threw NPE in unit tests where httpRequest is not injected; return null when httpRequest is null (fixes PersistentTopicsTest.testCreateExistedPartition and similar mock-based admin tests). - Update RedirectManagerForLoadManagerMigrationTest to set loadManagerMigrationEnabled=true since the redirect manager now no-ops unless the flag is enabled. - Update NamespacesTest to mock getLookupResultForWebRequest with a real LookupResult instead of a URL (the mocks were not updated when the method signature changed, causing ClassCastException at runtime). --- .../org/apache/pulsar/broker/PulsarService.java | 9 +++- .../extensions/data/BrokerLookupData.java | 5 ++ .../apache/pulsar/broker/lookup/LookupResult.java | 6 +-- .../pulsar/broker/web/PulsarWebResource.java | 3 ++ .../apache/pulsar/broker/admin/NamespacesTest.java | 59 +++++++++++++++++----- ...RedirectManagerForLoadManagerMigrationTest.java | 1 + 6 files changed, 64 insertions(+), 19 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java index b587fa3f935..c15153f3443 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java @@ -39,6 +39,7 @@ import java.net.MalformedURLException; import java.time.Clock; import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; @@ -135,6 +136,7 @@ import org.apache.pulsar.broker.transaction.buffer.TransactionBufferProvider; import org.apache.pulsar.broker.transaction.buffer.impl.TransactionBufferClientImpl; import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider; import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStoreProvider; +import org.apache.pulsar.broker.validator.BindAddressValidator; import org.apache.pulsar.broker.validator.MultipleListenerValidator; import org.apache.pulsar.broker.validator.TransactionBatchedWriteValidator; import org.apache.pulsar.broker.web.RestException; @@ -858,8 +860,11 @@ public class PulsarService implements AutoCloseable, ShutdownService { throw new PulsarServerException("Cannot start the service once it was stopped"); } - if (config.getWebServicePort().isEmpty() && config.getWebServicePortTls().isEmpty()) { - throw new IllegalArgumentException("webServicePort/webServicePortTls must be present"); + if (config.getWebServicePort().isEmpty() + && config.getWebServicePortTls().isEmpty() + && BindAddressValidator.validateBindAddresses(config, Arrays.asList("http", "https")).isEmpty()) { + throw new IllegalArgumentException( + "webServicePort/webServicePortTls or http/https bindAddresses must be present"); } if (config.isAuthorizationEnabled() && !config.isAuthenticationEnabled()) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupData.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupData.java index 09ccf94bc87..f3dbec5fadd 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupData.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/extensions/data/BrokerLookupData.java @@ -89,6 +89,11 @@ public record BrokerLookupData (String brokerId, } public LookupResult toLookupResult(LookupOptions options) throws PulsarServerException { + if (options.hasAdvertisedListenerName() + && !advertisedListeners.containsKey(options.getAdvertisedListenerName())) { + throw new PulsarServerException("the broker do not have " + + options.getAdvertisedListenerName() + " listener"); + } return LookupResult.create(this, options); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/LookupResult.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/LookupResult.java index 0cd472f8114..3d46ef16cda 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/LookupResult.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/LookupResult.java @@ -262,12 +262,10 @@ public class LookupResult { // remove the parameter when the type is not redirect uriBuilder.replaceQueryParam("authoritative"); } - // pass the listener parameter + // override the listener parameter only when the lookup result specifies one; + // otherwise leave the original request's listenerName query parameter untouched if (StringUtils.isNotBlank(brokerServiceListenerName)) { uriBuilder.replaceQueryParam(LISTENERNAME_PARAM, brokerServiceListenerName); - } else { - // remove the parameter when the listener name is not specified - uriBuilder.replaceQueryParam(LISTENERNAME_PARAM); } return uriBuilder.build(); } diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java index b8f41d0d327..5bf0ea14da7 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/web/PulsarWebResource.java @@ -185,6 +185,9 @@ public abstract class PulsarWebResource { } public String getWebServiceListenerName() { + if (httpRequest == null) { + return null; + } return (String) httpRequest.getAttribute(WebService.ATTRIBUTE_LISTENER_NAME); } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java index 7f31df02344..0f935908580 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/NamespacesTest.java @@ -37,7 +37,6 @@ import static org.testng.Assert.fail; import com.google.common.collect.Sets; import java.lang.reflect.Field; import java.net.URI; -import java.net.URL; import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; @@ -77,6 +76,7 @@ import org.apache.pulsar.broker.BrokerTestUtil; import org.apache.pulsar.broker.admin.v2.Namespaces; import org.apache.pulsar.broker.admin.v2.PersistentTopics; import org.apache.pulsar.broker.auth.MockedPulsarServiceBaseTest; +import org.apache.pulsar.broker.lookup.LookupResult; import org.apache.pulsar.broker.namespace.LookupOptions; import org.apache.pulsar.broker.namespace.NamespaceEphemeralData; import org.apache.pulsar.broker.namespace.NamespaceService; @@ -738,7 +738,10 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest { doReturn(uri).when(uriInfo).getRequestUri(); // setup to redirect to another broker in the same cluster - doReturn(Optional.of(new URL("http://otherhost" + ":" + 8080))).when(nsSvc) + doReturn(Optional.of(LookupResult.builder() + .type(LookupResult.Type.RedirectUrl) + .httpUrl("http://otherhost:8080") + .build())).when(nsSvc) .getLookupResultForWebRequest(Mockito.argThat(new ArgumentMatcher<NamespaceName>() { @Override public boolean matches(NamespaceName nsname) { @@ -783,7 +786,10 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest { new byte[0], null, null); // setup ownership to localhost - URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress()); + LookupResult localWebServiceUrl = LookupResult.builder() + .type(LookupResult.Type.BrokerUrl) + .httpUrl(pulsar.getSafeWebServiceAddress()) + .build(); LookupOptions options = LookupOptions.builder().authoritative(false) .readOnly(false).build(); doReturn(Optional.of(localWebServiceUrl)).when(nsSvc).getLookupResultForWebRequest(testNs, options); @@ -862,7 +868,10 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest { uriField.set(namespaces, uriInfo); doReturn(URI.create(pulsar.getWebServiceAddress() + "/dummy/uri")).when(uriInfo).getRequestUri(); - URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress()); + LookupResult localWebServiceUrl = LookupResult.builder() + .type(LookupResult.Type.BrokerUrl) + .httpUrl(pulsar.getSafeWebServiceAddress()) + .build(); String bundledNsLocal = "test-delete-namespace-with-bundles"; List<String> boundaries = List.of("0x00000000", "0x80000000", "0xffffffff"); BundlesData bundleData = BundlesData.builder() @@ -940,7 +949,10 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest { @Test public void testUnloadNamespaces() throws Exception { final NamespaceName testNs = this.testLocalNamespaces.get(1); - URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress()); + LookupResult localWebServiceUrl = LookupResult.builder() + .type(LookupResult.Type.BrokerUrl) + .httpUrl(pulsar.getSafeWebServiceAddress()) + .build(); doReturn(Optional.of(localWebServiceUrl)).when(nsSvc) .getLookupResultForWebRequest(Mockito.argThat(ns -> ns.equals(testNs)), Mockito.any()); doReturn(true).when(nsSvc).isServiceUnitOwned(Mockito.argThat(ns -> ns.equals(testNs))); @@ -959,7 +971,10 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest { @SuppressWarnings("deprecation") @Test public void testSplitBundles() throws Exception { - URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress()); + LookupResult localWebServiceUrl = LookupResult.builder() + .type(LookupResult.Type.BrokerUrl) + .httpUrl(pulsar.getSafeWebServiceAddress()) + .build(); String bundledNsLocal = "test-bundled-namespace-1"; List<String> boundaries = List.of("0x00000000", "0xffffffff"); BundlesData bundleData = BundlesData.builder() @@ -1004,7 +1019,10 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest { @SuppressWarnings("deprecation") @Test public void testSplitBundleWithUnDividedRange() throws Exception { - URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress()); + LookupResult localWebServiceUrl = LookupResult.builder() + .type(LookupResult.Type.BrokerUrl) + .httpUrl(pulsar.getSafeWebServiceAddress()) + .build(); String bundledNsLocal = "test-bundled-namespace-1"; List<String> boundaries = List.of("0x00000000", "0x08375b1a", "0x08375b1b", "0xffffffff"); BundlesData bundleData = BundlesData.builder() @@ -1035,7 +1053,10 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest { @Test public void testUnloadNamespaceWithBundles() throws Exception { - URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress()); + LookupResult localWebServiceUrl = LookupResult.builder() + .type(LookupResult.Type.BrokerUrl) + .httpUrl(pulsar.getSafeWebServiceAddress()) + .build(); String bundledNsLocal = "test-bundled-namespace-1"; List<String> boundaries = List.of("0x00000000", "0x80000000", "0xffffffff"); BundlesData bundleData = BundlesData.builder() @@ -1113,7 +1134,10 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest { @Test public void testRetention() throws Exception { try { - URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress()); + LookupResult localWebServiceUrl = LookupResult.builder() + .type(LookupResult.Type.BrokerUrl) + .httpUrl(pulsar.getSafeWebServiceAddress()) + .build(); String bundledNsLocal = "test-bundled-namespace-1"; List<String> boundaries = List.of("0x00000000", "0xffffffff"); BundlesData bundleData = BundlesData.builder() @@ -1203,7 +1227,10 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest { @SuppressWarnings("deprecation") @Test public void testValidateTopicOwnership() throws Exception { - URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress()); + LookupResult localWebServiceUrl = LookupResult.builder() + .type(LookupResult.Type.BrokerUrl) + .httpUrl(pulsar.getSafeWebServiceAddress()) + .build(); String bundledNsLocal = "test-bundled-namespace-1"; List<String> boundaries = List.of("0x00000000", "0xffffffff"); BundlesData bundleData = BundlesData.builder() @@ -1596,7 +1623,7 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest { admin.namespaces().deleteNamespace(namespace); } - private void mockWebUrl(URL localWebServiceUrl, NamespaceName namespace) throws Exception { + private void mockWebUrl(LookupResult localWebServiceUrl, NamespaceName namespace) throws Exception { doReturn(Optional.of(localWebServiceUrl)).when(nsSvc) .getLookupResultForWebRequest(Mockito.argThat(bundle -> bundle.getNamespaceObject().equals(namespace)), Mockito.any()); @@ -2057,7 +2084,10 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest { int initBundleCount = 4; BundlesData data = BundlesData.builder().numBundles(initBundleCount).build(); admin.namespaces().createNamespace(namespace, data); - URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress()); + LookupResult localWebServiceUrl = LookupResult.builder() + .type(LookupResult.Type.BrokerUrl) + .httpUrl(pulsar.getSafeWebServiceAddress()) + .build(); final NamespaceName testNs = NamespaceName.get(namespace); mockWebUrl(localWebServiceUrl, testNs); for (int i = 0; i < 10; i++) { @@ -2618,7 +2648,10 @@ public class NamespacesTest extends MockedPulsarServiceBaseTest { @Test public void testBundleValidationAfterSplit() throws Exception { - URL localWebServiceUrl = new URL(pulsar.getSafeWebServiceAddress()); + LookupResult localWebServiceUrl = LookupResult.builder() + .type(LookupResult.Type.BrokerUrl) + .httpUrl(pulsar.getSafeWebServiceAddress()) + .build(); String bundledNsLocal = "test-bundle-validation-after-split"; List<String> boundaries = List.of("0x00000000", "0xffffffff"); BundlesData bundleData = BundlesData.builder() diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManagerForLoadManagerMigrationTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManagerForLoadManagerMigrationTest.java index 057a71f2a60..ee28f730080 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManagerForLoadManagerMigrationTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/manager/RedirectManagerForLoadManagerMigrationTest.java @@ -54,6 +54,7 @@ public class RedirectManagerForLoadManagerMigrationTest { configuration.setLoadManagerClassName(ModularLoadManagerImpl.class.getName()); configuration.setLoadBalancerDebugModeEnabled(true); + configuration.setLoadManagerMigrationEnabled(true); // Test 1: No load manager class name found. doReturn(CompletableFuture.completedFuture(
