This is an automated email from the ASF dual-hosted git repository. lhotari pushed a commit to branch branch-4.1 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit e626c0232adef93fdbe43e44fff0eba560b22538 Author: Ruimin MA <[email protected]> AuthorDate: Tue Nov 4 23:42:33 2025 +0800 [feat][client] PIP-234: Support shared resources in PulsarAdmin to reduce thread usage (#24893) (cherry picked from commit d186946f6f35f478d7e8e49ca5ac4986cc5568d2) --- .../pulsar/client/admin/PulsarAdminBuilder.java | 14 +++ .../admin/internal/PulsarAdminBuilderImpl.java | 12 ++- .../client/admin/internal/PulsarAdminImpl.java | 10 +- .../admin/internal/http/AsyncHttpConnector.java | 104 ++++++++++++++++++--- .../internal/http/AsyncHttpConnectorProvider.java | 5 +- .../admin/internal/PulsarAdminBuilderImplTest.java | 96 +++++++++++++++++++ .../internal/http/AsyncHttpConnectorTest.java | 10 +- 7 files changed, 228 insertions(+), 23 deletions(-) diff --git a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java index 216376c7eb6..4add07d588a 100644 --- a/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java +++ b/pulsar-client-admin-api/src/main/java/org/apache/pulsar/client/admin/PulsarAdminBuilder.java @@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; +import org.apache.pulsar.client.api.PulsarClientSharedResources; /** * Builder class for a {@link PulsarAdmin} instance. @@ -393,4 +394,17 @@ public interface PulsarAdminBuilder { * @throws IllegalArgumentException if the length of description exceeds 64 */ PulsarAdminBuilder description(String description); + + /** + * Provide a set of shared client resources to be reused by this client. + * <p> + * Providing a shared resource instance allows PulsarClient instances to share resources + * (only support IO/event loops, timers, DNS resolver/cache) with other PulsarClient + * instances, reducing memory footprint and thread usage when creating many clients in the same JVM. + * + * @param sharedResources the shared resources instance created with {@link PulsarClientSharedResources#builder()} + * @return the adminClient builder instance + */ + PulsarAdminBuilder sharedResources(PulsarClientSharedResources sharedResources); + } \ No newline at end of file diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java index a046f13f60e..f94f6890736 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImpl.java @@ -29,6 +29,8 @@ import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.PulsarClientException; import org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException; +import org.apache.pulsar.client.api.PulsarClientSharedResources; +import org.apache.pulsar.client.impl.PulsarClientSharedResourcesImpl; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils; @@ -39,10 +41,12 @@ public class PulsarAdminBuilderImpl implements PulsarAdminBuilder { private ClassLoader clientBuilderClassLoader = null; private boolean acceptGzipCompression = true; + private transient PulsarClientSharedResourcesImpl sharedResources; @Override public PulsarAdmin build() throws PulsarClientException { - return new PulsarAdminImpl(conf.getServiceUrl(), conf, clientBuilderClassLoader, acceptGzipCompression); + return new PulsarAdminImpl(conf.getServiceUrl(), conf, + clientBuilderClassLoader, acceptGzipCompression, sharedResources); } public PulsarAdminBuilderImpl() { @@ -292,4 +296,10 @@ public class PulsarAdminBuilderImpl implements PulsarAdminBuilder { this.conf.setDescription(description); return this; } + + @Override + public PulsarAdminBuilder sharedResources(PulsarClientSharedResources sharedResources) { + this.sharedResources = (PulsarClientSharedResourcesImpl) sharedResources; + return this; + } } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java index aaea8a89f8d..e4ca7724ca1 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/PulsarAdminImpl.java @@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit; import javax.ws.rs.client.Client; import javax.ws.rs.client.ClientBuilder; import javax.ws.rs.client.WebTarget; +import lombok.Getter; import org.apache.commons.lang3.StringUtils; import org.apache.pulsar.client.admin.Bookies; import org.apache.pulsar.client.admin.BrokerStats; @@ -56,6 +57,7 @@ import org.apache.pulsar.client.admin.internal.http.AsyncHttpConnectorProvider; import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationFactory; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.PulsarClientSharedResourcesImpl; import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.common.net.ServiceURI; @@ -91,6 +93,7 @@ public class PulsarAdminImpl implements PulsarAdmin { private final ResourceQuotas resourceQuotas; private final ClientConfigurationData clientConfigData; private final Client client; + @Getter private final AsyncHttpConnector asyncHttpConnector; private final String serviceUrl; private final Lookup lookups; @@ -106,11 +109,12 @@ public class PulsarAdminImpl implements PulsarAdmin { public PulsarAdminImpl(String serviceUrl, ClientConfigurationData clientConfigData, ClassLoader clientBuilderClassLoader) throws PulsarClientException { - this(serviceUrl, clientConfigData, clientBuilderClassLoader, true); + this(serviceUrl, clientConfigData, clientBuilderClassLoader, true, null); } public PulsarAdminImpl(String serviceUrl, ClientConfigurationData clientConfigData, - ClassLoader clientBuilderClassLoader, boolean acceptGzipCompression) + ClassLoader clientBuilderClassLoader, boolean acceptGzipCompression, + PulsarClientSharedResourcesImpl sharedResources) throws PulsarClientException { checkArgument(StringUtils.isNotBlank(serviceUrl), "Service URL needs to be specified"); @@ -157,7 +161,7 @@ public class PulsarAdminImpl implements PulsarAdmin { Math.toIntExact(clientConfigData.getConnectionTimeoutMs()), Math.toIntExact(clientConfigData.getReadTimeoutMs()), Math.toIntExact(clientConfigData.getRequestTimeoutMs()), - clientConfigData.getAutoCertRefreshSeconds()); + clientConfigData.getAutoCertRefreshSeconds(), sharedResources); long requestTimeoutMs = clientConfigData.getRequestTimeoutMs(); this.clusters = new ClustersImpl(root, auth, requestTimeoutMs); diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java index b9ef758e3c6..ed17df8bd73 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnector.java @@ -28,12 +28,15 @@ import static org.asynchttpclient.util.HttpConstants.ResponseStatusCodes.SEE_OTH import static org.asynchttpclient.util.HttpConstants.ResponseStatusCodes.TEMPORARY_REDIRECT_307; import static org.asynchttpclient.util.MiscUtils.isNonEmpty; import com.spotify.futures.ConcurrencyReducer; +import io.netty.channel.EventLoopGroup; import io.netty.handler.codec.http.DefaultHttpHeaders; import io.netty.handler.codec.http.HttpRequest; import io.netty.handler.codec.http.HttpResponse; +import io.netty.resolver.NameResolver; import io.netty.util.concurrent.DefaultThreadFactory; import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.URI; import java.security.GeneralSecurityException; @@ -53,6 +56,7 @@ import javax.ws.rs.ProcessingException; import javax.ws.rs.client.Client; import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.Response.Status; +import lombok.Data; import lombok.Getter; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; @@ -60,13 +64,17 @@ import org.apache.commons.lang3.Validate; import org.apache.pulsar.PulsarVersion; import org.apache.pulsar.client.admin.internal.PulsarAdminImpl; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.impl.PulsarClientSharedResourcesImpl; import org.apache.pulsar.client.impl.PulsarServiceNameResolver; import org.apache.pulsar.client.impl.ServiceNameResolver; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; +import org.apache.pulsar.client.util.ExecutorProvider; import org.apache.pulsar.client.util.PulsarHttpAsyncSslEngineFactory; import org.apache.pulsar.common.util.FutureUtil; import org.apache.pulsar.common.util.PulsarSslConfiguration; import org.apache.pulsar.common.util.PulsarSslFactory; +import org.apache.pulsar.common.util.netty.DnsResolverUtil; +import org.apache.pulsar.common.util.netty.EventLoopUtil; import org.asynchttpclient.AsyncCompletionHandlerBase; import org.asynchttpclient.AsyncHandler; import org.asynchttpclient.AsyncHttpClient; @@ -103,6 +111,10 @@ public class AsyncHttpConnector implements Connector, AsyncHttpRequestExecutor { new DefaultThreadFactory("delayer")); private ScheduledExecutorService sslRefresher; private final boolean acceptGzipCompression; + @Getter + private final NameResolver<InetAddress> nameResolver; + private final EventLoopGroup eventLoopGroup; + private final boolean createdEventLoopGroup; private final Map<String, ConcurrencyReducer<Response>> concurrencyReducers = new ConcurrentHashMap<>(); private PulsarSslFactory sslFactory; @@ -112,33 +124,66 @@ public class AsyncHttpConnector implements Connector, AsyncHttpRequestExecutor { (int) client.getConfiguration().getProperty(ClientProperties.READ_TIMEOUT), PulsarAdminImpl.DEFAULT_REQUEST_TIMEOUT_SECONDS * 1000, autoCertRefreshTimeSeconds, - conf, acceptGzipCompression); + conf, acceptGzipCompression, null); } @SneakyThrows public AsyncHttpConnector(int connectTimeoutMs, int readTimeoutMs, int requestTimeoutMs, int autoCertRefreshTimeSeconds, ClientConfigurationData conf, - boolean acceptGzipCompression) { + boolean acceptGzipCompression, + PulsarClientSharedResourcesImpl sharedResources) { Validate.notEmpty(conf.getServiceUrl(), "Service URL is not provided"); serviceNameResolver = new PulsarServiceNameResolver(); String serviceUrl = conf.getServiceUrl(); serviceNameResolver.updateServiceUrl(serviceUrl); this.acceptGzipCompression = acceptGzipCompression; + SharedResourceHolder sharedResourceHolder = + buildResourcesIfConfigured(sharedResources); + this.nameResolver = sharedResourceHolder.getNameResolver(); + this.eventLoopGroup = sharedResourceHolder.getEventLoopGroup(); + this.createdEventLoopGroup = sharedResourceHolder.isCreateEventLoop(); AsyncHttpClientConfig asyncHttpClientConfig = createAsyncHttpClientConfig(conf, connectTimeoutMs, readTimeoutMs, requestTimeoutMs, - autoCertRefreshTimeSeconds); + autoCertRefreshTimeSeconds, sharedResources); httpClient = createAsyncHttpClient(asyncHttpClientConfig); this.requestTimeout = requestTimeoutMs > 0 ? Duration.ofMillis(requestTimeoutMs) : null; this.maxRetries = httpClient.getConfig().getMaxRequestRetry(); } + private SharedResourceHolder buildResourcesIfConfigured( + PulsarClientSharedResourcesImpl sharedResources) { + EventLoopGroup eventLoopGroup = null; + NameResolver<InetAddress> nameResolver = null; + boolean createdEventLoopGroup = false; + if (sharedResources != null && sharedResources.getDnsResolverGroup() != null) { + if (sharedResources.getIoEventLoopGroup() != null) { + eventLoopGroup = sharedResources.getIoEventLoopGroup(); + } else { + // build an EventLoopGroup with default value + eventLoopGroup = EventLoopUtil.newEventLoopGroup( + Runtime.getRuntime().availableProcessors(), false, + new ExecutorProvider.ExtendedThreadFactory("pulsar-admin-client-io", + Thread.currentThread().isDaemon())); + createdEventLoopGroup = true; + } + nameResolver = DnsResolverUtil.adaptToNameResolver( + sharedResources.getDnsResolverGroup().createAddressResolver(eventLoopGroup)); + } else { + return SharedResourceHolder.EMPTY; + } + return new SharedResourceHolder(nameResolver, eventLoopGroup, createdEventLoopGroup); + } + private AsyncHttpClientConfig createAsyncHttpClientConfig(ClientConfigurationData conf, int connectTimeoutMs, int readTimeoutMs, - int requestTimeoutMs, int autoCertRefreshTimeSeconds) + int requestTimeoutMs, + int autoCertRefreshTimeSeconds, + PulsarClientSharedResourcesImpl sharedResources) throws GeneralSecurityException, IOException { DefaultAsyncHttpClientConfig.Builder confBuilder = new DefaultAsyncHttpClientConfig.Builder(); - configureAsyncHttpClientConfig(conf, connectTimeoutMs, readTimeoutMs, requestTimeoutMs, confBuilder); + configureAsyncHttpClientConfig(conf, connectTimeoutMs, + readTimeoutMs, requestTimeoutMs, confBuilder, sharedResources); if (conf.getServiceUrl().startsWith("https://")) { configureAsyncHttpClientSslEngineFactory(conf, autoCertRefreshTimeSeconds, confBuilder); } @@ -148,7 +193,8 @@ public class AsyncHttpConnector implements Connector, AsyncHttpRequestExecutor { private void configureAsyncHttpClientConfig(ClientConfigurationData conf, int connectTimeoutMs, int readTimeoutMs, int requestTimeoutMs, - DefaultAsyncHttpClientConfig.Builder confBuilder) { + DefaultAsyncHttpClientConfig.Builder confBuilder, + PulsarClientSharedResourcesImpl sharedResources) { if (conf.getConnectionsPerBroker() > 0) { confBuilder.setMaxConnectionsPerHost(conf.getConnectionsPerBroker()); // Use the request timeout value for acquireFreeChannelTimeout so that we don't need to add @@ -159,6 +205,14 @@ public class AsyncHttpConnector implements Connector, AsyncHttpRequestExecutor { if (conf.getConnectionMaxIdleSeconds() > 0) { confBuilder.setPooledConnectionIdleTimeout(conf.getConnectionMaxIdleSeconds() * 1000); } + if (sharedResources != null) { + if (this.eventLoopGroup != null) { + confBuilder.setEventLoopGroup(this.eventLoopGroup); + } + if (sharedResources.getTimer() != null) { + confBuilder.setNettyTimer(sharedResources.getTimer()); + } + } confBuilder.setCookieStore(null); confBuilder.setUseProxyProperties(true); confBuilder.setFollowRedirect(false); @@ -177,7 +231,7 @@ public class AsyncHttpConnector implements Connector, AsyncHttpRequestExecutor { HttpRequest request, HttpResponse response) { // Close connection upon a server error or per HTTP spec return (response.status().code() / 100 != 5) - && super.keepAlive(remoteAddress, ahcRequest, request, response); + && super.keepAlive(remoteAddress, ahcRequest, request, response); } }); confBuilder.setDisableHttpsEndpointIdentificationAlgorithm(!conf.isTlsHostnameVerificationEnable()); @@ -331,9 +385,9 @@ public class AsyncHttpConnector implements Connector, AsyncHttpRequestExecutor { throwable); } resultFuture.completeExceptionally( - new RetryException("Could not complete the operation. Number of retries " - + "has been exhausted. Failed reason: " + throwable.getMessage(), - throwable)); + new RetryException("Could not complete the operation. Number of retries " + + "has been exhausted. Failed reason: " + throwable.getMessage(), + throwable)); } } } else { @@ -376,7 +430,7 @@ public class AsyncHttpConnector implements Connector, AsyncHttpRequestExecutor { } public CompletableFuture<Response> executeRequest(Request request, - Supplier<AsyncHandler<Response>> handlerSupplier) { + Supplier<AsyncHandler<Response>> handlerSupplier) { return executeRequest(request, handlerSupplier, 0); } @@ -426,6 +480,9 @@ public class AsyncHttpConnector implements Connector, AsyncHttpRequestExecutor { if (switchToGet) { builder.setMethod(GET); } + if (this.nameResolver != null) { + builder.setNameResolver(this.nameResolver); + } builder.setUri(newUri); if (keepBody) { builder.setCharset(request.getCharset()); @@ -433,7 +490,7 @@ public class AsyncHttpConnector implements Connector, AsyncHttpRequestExecutor { builder.setFormParams(request.getFormParams()); } else if (request.getStringData() != null) { builder.setBody(request.getStringData()); - } else if (request.getByteData() != null){ + } else if (request.getByteData() != null) { builder.setBody(request.getByteData()); } else if (request.getByteBufferData() != null) { builder.setBody(request.getByteBufferData()); @@ -485,6 +542,9 @@ public class AsyncHttpConnector implements Connector, AsyncHttpRequestExecutor { BoundRequestBuilder builder = httpClient.prepare(currentRequest.getMethod(), currentRequest.getUri().toString()); + if (this.nameResolver != null) { + builder.setNameResolver(this.nameResolver); + } if (currentRequest.hasEntity()) { ByteArrayOutputStream outStream = new ByteArrayOutputStream(); currentRequest.setStreamProvider(contentLength -> outStream); @@ -518,6 +578,9 @@ public class AsyncHttpConnector implements Connector, AsyncHttpRequestExecutor { if (sslRefresher != null) { sslRefresher.shutdownNow(); } + if (createdEventLoopGroup && eventLoopGroup != null && !eventLoopGroup.isShutdown()) { + eventLoopGroup.shutdownGracefully(); + } } catch (IOException e) { log.warn("Failed to close http client", e); } @@ -556,4 +619,21 @@ public class AsyncHttpConnector implements Connector, AsyncHttpRequestExecutor { } } + @Data + private static class SharedResourceHolder { + static final SharedResourceHolder EMPTY = new SharedResourceHolder(null, null, false); + + final NameResolver<InetAddress> nameResolver; + final EventLoopGroup eventLoopGroup; + final boolean createEventLoop; + + SharedResourceHolder(NameResolver<InetAddress> nameResolver, + EventLoopGroup eventLoopGroup, + boolean createEventLoop) { + this.nameResolver = nameResolver; + this.eventLoopGroup = eventLoopGroup; + this.createEventLoop = createEventLoop; + } + } + } diff --git a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java index d20dc848494..caaa356f7c7 100644 --- a/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java +++ b/pulsar-client-admin/src/main/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorProvider.java @@ -20,6 +20,7 @@ package org.apache.pulsar.client.admin.internal.http; import javax.ws.rs.client.Client; import javax.ws.rs.core.Configuration; +import org.apache.pulsar.client.impl.PulsarClientSharedResourcesImpl; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.glassfish.jersey.client.spi.Connector; import org.glassfish.jersey.client.spi.ConnectorProvider; @@ -51,8 +52,8 @@ public class AsyncHttpConnectorProvider implements ConnectorProvider { public AsyncHttpConnector getConnector(int connectTimeoutMs, int readTimeoutMs, int requestTimeoutMs, - int autoCertRefreshTimeSeconds) { + int autoCertRefreshTimeSeconds, PulsarClientSharedResourcesImpl sharedResources) { return new AsyncHttpConnector(connectTimeoutMs, readTimeoutMs, requestTimeoutMs, autoCertRefreshTimeSeconds, - conf, acceptGzipCompression); + conf, acceptGzipCompression, sharedResources); } } diff --git a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImplTest.java b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImplTest.java index 9339c893492..987b054426b 100644 --- a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImplTest.java +++ b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/PulsarAdminBuilderImplTest.java @@ -23,10 +23,16 @@ import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException import static org.assertj.core.api.Assertions.assertThatThrownBy; import com.google.gson.Gson; import com.google.gson.JsonObject; +import io.netty.channel.EventLoopGroup; +import io.netty.resolver.NameResolver; +import io.netty.util.Timer; import java.io.IOException; +import java.net.InetAddress; +import java.net.InetSocketAddress; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.TimeUnit; import lombok.Cleanup; import lombok.SneakyThrows; import org.apache.pulsar.client.admin.PulsarAdmin; @@ -35,6 +41,7 @@ import org.apache.pulsar.client.api.Authentication; import org.apache.pulsar.client.api.AuthenticationDataProvider; import org.apache.pulsar.client.api.EncodedAuthenticationParameterSupport; import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.client.api.PulsarClientSharedResources; import org.apache.pulsar.client.impl.auth.AuthenticationDisabled; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.testng.Assert; @@ -188,6 +195,95 @@ public class PulsarAdminBuilderImplTest { PulsarAdmin.builder().serviceHttpUrl("http://localhost:8080").description("forked").build(); } + @Test + public void testClientBuildWithSharedResources() throws PulsarClientException { + PulsarClientSharedResources sharedResources = PulsarClientSharedResources.builder() + .configureEventLoop(eventLoopGroupConfig -> { + eventLoopGroupConfig + .name("testEventLoop") + .numberOfThreads(20); + }) + .configureDnsResolver(dnsResolverConfig -> { + dnsResolverConfig.localAddress(new InetSocketAddress(0)); + }) + .configureTimer(timerConfig -> { + timerConfig.name("testTimer").tickDuration(100, TimeUnit.MILLISECONDS); + }) + .build(); + // create two adminClients and check if they share the same event loop group and netty timer + @Cleanup + PulsarAdminImpl pulsarAdminImpl1 = + (PulsarAdminImpl) PulsarAdmin.builder() + .serviceHttpUrl("http://localhost:8080") + .sharedResources(sharedResources) + .build(); + @Cleanup + PulsarAdminImpl pulsarAdminImpl2 = + (PulsarAdminImpl) PulsarAdmin.builder() + .serviceHttpUrl("http://localhost:8080") + .sharedResources(sharedResources) + .build(); + + EventLoopGroup eventLoopGroup1 = + pulsarAdminImpl1.getAsyncHttpConnector().getHttpClient().getConfig().getEventLoopGroup(); + EventLoopGroup eventLoopGroup2 = + pulsarAdminImpl2.getAsyncHttpConnector().getHttpClient().getConfig().getEventLoopGroup(); + Timer nettyTimer1 = pulsarAdminImpl1.getAsyncHttpConnector().getHttpClient().getConfig().getNettyTimer(); + Timer nettyTimer2 = pulsarAdminImpl2.getAsyncHttpConnector().getHttpClient().getConfig().getNettyTimer(); + assertThat(eventLoopGroup1).isSameAs(eventLoopGroup2); + assertThat(nettyTimer1).isSameAs(nettyTimer2); + sharedResources.close(); + } + + @Test + public void testClientBuildWithSharedDnsResolverOnly() throws PulsarClientException { + PulsarClientSharedResources sharedResources = PulsarClientSharedResources.builder() + .shareConfigured() + .configureDnsResolver(dnsResolverConfig -> { + dnsResolverConfig.localAddress(new InetSocketAddress(0)); + }) + .build(); + + @Cleanup + PulsarAdminImpl pulsarAdminImpl1 = + (PulsarAdminImpl) PulsarAdmin.builder() + .serviceHttpUrl("http://localhost:8080") + .sharedResources(sharedResources) + .build(); + @Cleanup + PulsarAdminImpl pulsarAdminImpl2 = + (PulsarAdminImpl) PulsarAdmin.builder() + .serviceHttpUrl("http://localhost:8080") + .sharedResources(sharedResources) + .build(); + + EventLoopGroup eventLoopGroup1 = + pulsarAdminImpl1.getAsyncHttpConnector().getHttpClient().getConfig().getEventLoopGroup(); + EventLoopGroup eventLoopGroup2 = + pulsarAdminImpl2.getAsyncHttpConnector().getHttpClient().getConfig().getEventLoopGroup(); + Timer nettyTimer1 = pulsarAdminImpl1.getAsyncHttpConnector().getHttpClient().getConfig().getNettyTimer(); + Timer nettyTimer2 = pulsarAdminImpl2.getAsyncHttpConnector().getHttpClient().getConfig().getNettyTimer(); + NameResolver<InetAddress> nameResolver1 = pulsarAdminImpl1.getAsyncHttpConnector().getNameResolver(); + NameResolver<InetAddress> nameResolver2 = pulsarAdminImpl2.getAsyncHttpConnector().getNameResolver(); + + // test eventLoop will be created when dnsResolver is configured + assertThat(eventLoopGroup1).isNotNull(); + assertThat(eventLoopGroup2).isNotNull(); + assertThat(eventLoopGroup2).isNotSameAs(eventLoopGroup1); + + // timer will not be created when timer is not configured + assertThat(nettyTimer1).isSameAs(nettyTimer2).isNull(); + + assertThat(nameResolver1).isNotNull(); + assertThat(nameResolver2).isNotNull(); + + // test eventLoop will shut down when AsyncHttpConnector is closed + pulsarAdminImpl1.getAsyncHttpConnector().close(); + assertThat(eventLoopGroup1.isShuttingDown()).isTrue(); + + sharedResources.close(); + } + @Test public void testClientDescriptionLengthExceed64() { String longDescription = "a".repeat(65); diff --git a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorTest.java b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorTest.java index f8518b59310..e0b4b13a03d 100644 --- a/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorTest.java +++ b/pulsar-client-admin/src/test/java/org/apache/pulsar/client/admin/internal/http/AsyncHttpConnectorTest.java @@ -176,7 +176,7 @@ public class AsyncHttpConnectorTest { }; @Cleanup AsyncHttpConnector connector = new AsyncHttpConnector(5000, requestTimeout, - requestTimeout, 0, conf, false) { + requestTimeout, 0, conf, false, null) { @Override protected CompletableFuture<Response> oneShot(InetSocketAddress host, ClientRequest request) { // delay the response to simulate a timeout @@ -226,7 +226,7 @@ public class AsyncHttpConnectorTest { @Cleanup AsyncHttpConnector connector = new AsyncHttpConnector(5000, 5000, - 5000, 0, conf, false); + 5000, 0, conf, false, null); Request request = new RequestBuilder("GET") .setUrl("http://localhost:" + server.port() + "/admin/v2/clusters") @@ -272,7 +272,7 @@ public class AsyncHttpConnectorTest { @Cleanup AsyncHttpConnector connector = new AsyncHttpConnector(5000, 5000, - 5000, 0, conf, false); + 5000, 0, conf, false, null); Request request = new RequestBuilder("GET") .setUrl("http://localhost:" + server.port() + "/path1") @@ -298,7 +298,7 @@ public class AsyncHttpConnectorTest { @Cleanup AsyncHttpConnector connector = new AsyncHttpConnector(5000, 5000, - 5000, 0, conf, false); + 5000, 0, conf, false, null); Request request = new RequestBuilder("POST") .setUrl("http://localhost:" + server.port() + "/path1") @@ -322,7 +322,7 @@ public class AsyncHttpConnectorTest { @Cleanup AsyncHttpConnector connector = new AsyncHttpConnector(5000, 5000, - 5000, 0, conf, false); + 5000, 0, conf, false, null); Request request = new RequestBuilder("POST") .setUrl("http://localhost:" + server.port() + "/concurrency-test")
