This is an automated email from the ASF dual-hosted git repository.
lhotari pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 8559b689fc6 [feat][client] Implement PIP-234 for sharing thread pools
and DNS resolver/cache across multiple Pulsar Client instances (#24790)
8559b689fc6 is described below
commit 8559b689fc6cdca2d6e13aeffb56996fc46a6f04
Author: Lari Hotari <[email protected]>
AuthorDate: Sun Sep 28 02:21:51 2025 +0300
[feat][client] Implement PIP-234 for sharing thread pools and DNS
resolver/cache across multiple Pulsar Client instances (#24790)
---
build/run_unit_group.sh | 5 +-
.../org/apache/pulsar/broker/PulsarService.java | 2 +-
.../broker/auth/MockedPulsarServiceBaseTest.java | 15 +-
.../broker/service/ExclusiveProducerTest.java | 2 +
.../broker/service/TopicTerminationTest.java | 2 +
.../pulsar/client/api/ConsumerCleanupTest.java | 2 +
.../pulsar/client/api/ProducerCleanupTest.java | 2 +
...MultiListenersWithInternalListenerNameTest.java | 8 +-
.../client/api/SimpleProducerConsumerTest.java | 58 +++++
.../pulsar/client/impl/PulsarTestClient.java | 2 +-
.../pulsar/client/impl/TopicDoesNotExistsTest.java | 4 +-
.../apache/pulsar/client/api/ClientBuilder.java | 14 +
.../pulsar/client/api/DnsResolverConfig.java | 126 +++++++++
.../pulsar/client/api/EventLoopGroupConfig.java | 28 +-
.../client/api/PulsarClientSharedResources.java | 127 +++++++++
.../api/PulsarClientSharedResourcesBuilder.java | 122 +++++++++
.../apache/pulsar/client/api/ThreadPoolConfig.java | 48 ++++
.../org/apache/pulsar/client/api/TimerConfig.java | 35 ++-
.../PulsarClientImplementationBinding.java | 3 +
.../pulsar/client/impl/AutoClusterFailover.java | 11 +-
.../pulsar/client/impl/ClientBuilderImpl.java | 20 +-
.../apache/pulsar/client/impl/ConnectionPool.java | 2 +-
.../client/impl/ControlledClusterFailover.java | 26 +-
.../pulsar/client/impl/DnsResolverGroupImpl.java | 54 +++-
.../org/apache/pulsar/client/impl/HttpClient.java | 14 +-
.../pulsar/client/impl/HttpLookupService.java | 14 +-
.../pulsar/client/impl/PulsarClientImpl.java | 67 ++---
.../PulsarClientImplementationBindingImpl.java | 6 +
.../impl/PulsarClientResourcesConfigurer.java | 149 +++++++++++
.../PulsarClientSharedResourcesBuilderImpl.java | 283 +++++++++++++++++++++
.../impl/PulsarClientSharedResourcesImpl.java | 160 ++++++++++++
.../pulsar/client/util/ExecutorProvider.java | 6 +-
.../client/util/ScheduledExecutorProvider.java | 3 +
.../src/main/resources/findbugsExclude.xml | 9 +
.../client/impl/ControlledClusterFailoverTest.java | 6 +
...PulsarClientSharedResourcesBuilderImplTest.java | 136 ++++++++++
.../pulsar/common/util/netty/DnsResolverUtil.java | 118 ++++++++-
.../pulsar/common/util/netty/EventLoopUtil.java | 76 +++++-
.../pulsar/common/util/netty/DnsResolverTest.java | 1 +
.../common/util/netty/DnsResolverUtilTest.java | 68 +++++
40 files changed, 1727 insertions(+), 107 deletions(-)
diff --git a/build/run_unit_group.sh b/build/run_unit_group.sh
index 02347648b94..cfdf94d6436 100755
--- a/build/run_unit_group.sh
+++ b/build/run_unit_group.sh
@@ -177,10 +177,13 @@ function test_group_other() {
**/OffloadersCacheTest.java
**/PrimitiveSchemaTest.java,
**/BlobStoreManagedLedgerOffloaderTest.java,
- **/BlobStoreManagedLedgerOffloaderStreamingTest.java'
+ **/BlobStoreManagedLedgerOffloaderStreamingTest.java,
+ **/DnsResolverTest.java'
mvn_test -pl managed-ledger -Dinclude='**/ManagedLedgerTest.java,
**/OffloadersCacheTest.java'
+ # DnsResolverTest needs to be run separately since it relies on static field
values
+ mvn_test -pl pulsar-common -Dinclude='**/DnsResolverTest.java'
mvn_test -pl tiered-storage/jcloud
-Dinclude='**/BlobStoreManagedLedgerOffloaderTest.java'
mvn_test -pl tiered-storage/jcloud
-Dinclude='**/BlobStoreManagedLedgerOffloaderStreamingTest.java'
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
index aa5c0b0ad2d..15fe1dcd610 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/PulsarService.java
@@ -401,7 +401,7 @@ public class PulsarService implements AutoCloseable,
ShutdownService {
this.brokerClientSharedLookupExecutorProvider =
new ScheduledExecutorProvider(1,
"broker-client-shared-lookup-executor");
this.brokerClientSharedDnsResolverGroup =
- new DnsResolverGroupImpl(this.ioEventLoopGroup,
+ new DnsResolverGroupImpl(
loadBrokerClientProperties(new
ClientConfigurationData()));
// here in the constructor we don't have the offloader scheduler yet
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
index 6fdfee86b9f..b349ba71922 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/auth/MockedPulsarServiceBaseTest.java
@@ -182,7 +182,15 @@ public abstract class MockedPulsarServiceBaseTest extends
TestRetrySupport {
}
private URI resolveLookupUrl() {
- if (isTcpLookup) {
+ return resolveLookupUrl(isTcpLookup);
+ }
+
+ protected String brokerServiceUrl(boolean usePulsarBinaryProtocol) {
+ return resolveLookupUrl(usePulsarBinaryProtocol).toString();
+ }
+
+ private URI resolveLookupUrl(boolean usePulsarBinaryProtocol) {
+ if (usePulsarBinaryProtocol) {
return URI.create(pulsar.getBrokerServiceUrl());
} else {
return URI.create(brokerUrl != null
@@ -801,5 +809,10 @@ public abstract class MockedPulsarServiceBaseTest extends
TestRetrySupport {
BrokerTestUtil.logTopicStats(log, admin, topic);
}
+ @DataProvider(name = "trueFalse")
+ public static Object[][] trueFalse() {
+ return new Object[][] { { Boolean.TRUE }, { Boolean.FALSE } };
+ }
+
private static final Logger log =
LoggerFactory.getLogger(MockedPulsarServiceBaseTest.class);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
index 9a070efa95d..c108d5b8ca1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/ExclusiveProducerTest.java
@@ -49,6 +49,8 @@ public class ExclusiveProducerTest extends BrokerTestBase {
@BeforeClass
protected void setup() throws Exception {
+ // use Pulsar binary lookup since the HTTP client shares the Pulsar
client timer
+ isTcpLookup = true;
baseSetup();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
index a2436e4360e..e4e361eb0b1 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/TopicTerminationTest.java
@@ -56,6 +56,8 @@ public class TopicTerminationTest extends BrokerTestBase {
@BeforeMethod
@Override
protected void setup() throws Exception {
+ // use Pulsar binary lookup since the HTTP client shares the Pulsar
client timer
+ isTcpLookup = true;
super.baseSetup();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCleanupTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCleanupTest.java
index cc7c2cabb87..0cbfde06381 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCleanupTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ConsumerCleanupTest.java
@@ -34,6 +34,8 @@ public class ConsumerCleanupTest extends ProducerConsumerBase
{
@BeforeClass
@Override
protected void setup() throws Exception {
+ // use Pulsar binary lookup since the HTTP client shares the Pulsar
client timer
+ isTcpLookup = true;
super.internalSetup();
super.producerBaseSetup();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCleanupTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCleanupTest.java
index 5ad3c85441b..f4daba3bf05 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCleanupTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/ProducerCleanupTest.java
@@ -33,6 +33,8 @@ public class ProducerCleanupTest extends ProducerConsumerBase
{
@BeforeMethod
@Override
protected void setup() throws Exception {
+ // use Pulsar binary lookup since the HTTP client shares the Pulsar
client timer
+ isTcpLookup = true;
super.internalSetup();
super.producerBaseSetup();
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithInternalListenerNameTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithInternalListenerNameTest.java
index 094c32c7ae6..2844d211a30 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithInternalListenerNameTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/PulsarMultiListenersWithInternalListenerNameTest.java
@@ -22,6 +22,8 @@ import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doReturn;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.resolver.DefaultNameResolver;
+import io.netty.util.concurrent.ImmediateEventExecutor;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
@@ -137,7 +139,8 @@ public class
PulsarMultiListenersWithInternalListenerNameTest extends MockedPuls
conf.setMaxLookupRedirects(10);
@Cleanup
- LookupService lookupService = useHttp ? new
HttpLookupService(InstrumentProvider.NOOP, conf, eventExecutors) :
+ LookupService lookupService = useHttp ? new
HttpLookupService(InstrumentProvider.NOOP, conf, eventExecutors,
+ null, new
DefaultNameResolver(ImmediateEventExecutor.INSTANCE)) :
new BinaryProtoLookupService((PulsarClientImpl)
this.pulsarClient,
lookupUrl.toString(), "internal", false, this.executorService);
TopicName topicName =
TopicName.get("persistent://public/default/test");
@@ -173,7 +176,8 @@ public class
PulsarMultiListenersWithInternalListenerNameTest extends MockedPuls
conf.setMaxLookupRedirects(10);
@Cleanup
- HttpLookupService lookupService = new
HttpLookupService(InstrumentProvider.NOOP, conf, eventExecutors);
+ HttpLookupService lookupService = new
HttpLookupService(InstrumentProvider.NOOP, conf, eventExecutors, null,
+ new DefaultNameResolver(ImmediateEventExecutor.INSTANCE));
NamespaceService namespaceService = pulsar.getNamespaceService();
LookupResult lookupResult = new
LookupResult(pulsar.getWebServiceAddress(), null,
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
index 2b1d83be591..4e15062b1e2 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/SimpleProducerConsumerTest.java
@@ -42,6 +42,7 @@ import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.util.Timeout;
import java.io.ByteArrayInputStream;
+import java.io.Closeable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
@@ -5304,4 +5305,61 @@ public class SimpleProducerConsumerTest extends
ProducerConsumerBase {
producer.close();
consumer.close();
}
+
+ @Test(dataProvider = "trueFalse")
+ public void testResourceSharingEndToEnd(boolean usePulsarBinaryProtocol)
throws Exception {
+ log.info("-- Starting {} test --", methodName);
+ String topic =
BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/my-topic");
+ int numberOfClients = 50;
+ List<PulsarClient> clients = new ArrayList<>();
+ @Cleanup
+ Closeable closeClients = () -> {
+ for (PulsarClient client : clients) {
+ try {
+ client.close();
+ } catch (PulsarClientException e) {
+ log.error("Failed to close client {}", client, e);
+ }
+ }
+ };
+ @Cleanup
+ PulsarClientSharedResources sharedResources =
PulsarClientSharedResources.builder().build();
+ List<Consumer<byte[]>> consumers = new ArrayList<>();
+
+ for (int i = 0; i < numberOfClients; i++) {
+ PulsarClient client = PulsarClient.builder()
+ .serviceUrl(brokerServiceUrl(usePulsarBinaryProtocol))
+ .sharedResources(sharedResources)
+ .build();
+ clients.add(client);
+ consumers.add(client.newConsumer().topic(topic)
+ .subscriptionName("my-subscriber-name" + i).subscribe());
+ }
+
+ ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer()
+ .topic(topic);
+
+ Producer<byte[]> producer = producerBuilder.create();
+ for (int i = 0; i < 10; i++) {
+ String message = "my-message-" + i;
+ producer.send(message.getBytes());
+ }
+
+ for (Consumer<byte[]> consumer : consumers) {
+ Message<byte[]> msg = null;
+ Set<String> messageSet = new HashSet<>();
+ for (int i = 0; i < 10; i++) {
+ msg = consumer.receive(RECEIVE_TIMEOUT_SECONDS,
TimeUnit.SECONDS);
+ String receivedMessage = new String(msg.getData());
+ log.debug("Received message: [{}]", receivedMessage);
+ String expectedMessage = "my-message-" + i;
+ testMessageOrderAndDuplicates(messageSet, receivedMessage,
expectedMessage);
+ }
+ // Acknowledge the consumption of all messages at once
+ consumer.acknowledgeCumulative(msg);
+ consumer.close();
+ }
+
+ log.info("-- Exiting {} test --", methodName);
+ }
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
index 27301db9179..db330951f74 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/PulsarTestClient.java
@@ -91,7 +91,7 @@ public class PulsarTestClient extends PulsarClientImpl {
private PulsarTestClient(ClientConfigurationData conf, EventLoopGroup
eventLoopGroup, ConnectionPool cnxPool,
AtomicReference<Supplier<ClientCnx>>
clientCnxSupplierReference)
throws PulsarClientException {
- super(conf, eventLoopGroup, cnxPool);
+ super(conf, eventLoopGroup, cnxPool, null, null, null, null, null, new
DnsResolverGroupImpl(conf));
// workaround initialization order issue so that ClientCnx can be
created in this class
clientCnxSupplierReference.set(this::createClientCnx);
}
diff --git
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicDoesNotExistsTest.java
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicDoesNotExistsTest.java
index b48b97978b7..f528c827549 100644
---
a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicDoesNotExistsTest.java
+++
b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/TopicDoesNotExistsTest.java
@@ -39,6 +39,8 @@ public class TopicDoesNotExistsTest extends
ProducerConsumerBase {
@Override
@BeforeClass
public void setup() throws Exception {
+ // use Pulsar binary lookup since the HTTP client shares the Pulsar
client timer
+ isTcpLookup = true;
conf.setAllowAutoTopicCreation(false);
super.internalSetup();
super.producerBaseSetup();
@@ -61,7 +63,7 @@ public class TopicDoesNotExistsTest extends
ProducerConsumerBase {
.create();
Assert.fail("Create producer should failed while topic does not
exists.");
} catch (PulsarClientException e) {
- Assert.assertTrue(e instanceof
PulsarClientException.NotFoundException);
+ Assert.assertTrue(e instanceof
PulsarClientException.TopicDoesNotExistException);
}
Thread.sleep(2000);
HashedWheelTimer timer = (HashedWheelTimer) ((PulsarClientImpl)
pulsarClient).timer();
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
index 034c167a9b3..d31d42bbe63 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java
@@ -734,4 +734,18 @@ public interface ClientBuilder extends Serializable,
Cloneable {
* @throws IllegalArgumentException if the length of description exceeds 64
*/
ClientBuilder description(String description);
+
+ /**
+ * Provide a set of shared client resources to be reused by this client.
+ * <p>
+ * Providing a shared resource instance allows PulsarClient instances to
share resources
+ * (such as IO/event loops, timers, executors, DNS resolver/cache) with
other PulsarClient
+ * instances, reducing memory footprint and thread usage when creating
many clients in the same JVM.
+ *
+ * @param sharedResources the shared resources instance created with
{@link PulsarClientSharedResources#builder()}
+ * @return the client builder instance
+ * @see PulsarClientSharedResources
+ * @see PulsarClientSharedResourcesBuilder
+ */
+ ClientBuilder sharedResources(PulsarClientSharedResources sharedResources);
}
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/DnsResolverConfig.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/DnsResolverConfig.java
new file mode 100644
index 00000000000..09c8f5fccfc
--- /dev/null
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/DnsResolverConfig.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Configuration interface for DNS resolver settings.
+ */
+public interface DnsResolverConfig {
+ /**
+ * Sets the minimum TTL for DNS records.
+ * Defaults to 0.
+ *
+ * @param minTtl the minimum TTL value
+ * @return the DNS resolver configuration instance for chained calls
+ */
+ DnsResolverConfig minTtl(int minTtl);
+
+ /**
+ * Sets the maximum TTL for DNS records.
+ * Defaults to JDK level setting.
+ *
+ * @param maxTtl the maximum TTL value
+ * @return the DNS resolver configuration instance for chained calls
+ */
+ DnsResolverConfig maxTtl(int maxTtl);
+
+ /**
+ * Sets the TTL for negative DNS responses.
+ * Defaults to JDK level setting.
+ *
+ * @param negativeTtl the negative TTL value
+ * @return the DNS resolver configuration instance for chained calls
+ */
+ DnsResolverConfig negativeTtl(int negativeTtl);
+
+ /**
+ * Sets the query timeout in milliseconds.
+ * Defaults to the OS level setting.
+ *
+ * @param queryTimeoutMillis the timeout value in milliseconds
+ * @return the DNS resolver configuration instance for chained calls
+ */
+ DnsResolverConfig queryTimeoutMillis(long queryTimeoutMillis);
+
+ /**
+ * Enables or disables TCP fallback for DNS queries.
+ * Defaults to true.
+ *
+ * @param tcpFallbackEnabled true to enable TCP fallback, false to disable
+ * @return the DNS resolver configuration instance for chained calls
+ */
+ DnsResolverConfig tcpFallbackEnabled(boolean tcpFallbackEnabled);
+
+ /**
+ * Enables or disables TCP fallback on timeout for DNS queries.
+ * Defaults to true.
+ *
+ * @param tcpFallbackOnTimeoutEnabled true to enable TCP fallback on
timeout, false to disable
+ * @return the DNS resolver configuration instance for chained calls
+ */
+ DnsResolverConfig tcpFallbackOnTimeoutEnabled(boolean
tcpFallbackOnTimeoutEnabled);
+
+ /**
+ * Sets the ndots value for DNS resolution. Set this to 1 to disable the
use of DNS search domains.
+ * Defaults to the OS level configuration.
+ *
+ * @param ndots the ndots value
+ * @return the DNS resolver configuration instance for chained calls
+ */
+ DnsResolverConfig ndots(int ndots);
+
+ /**
+ * Sets the search domains for DNS resolution.
+ * Defaults to the OS level configuration.
+ *
+ * @param searchDomains collection of search domains
+ * @return the DNS resolver configuration instance for chained calls
+ */
+ DnsResolverConfig searchDomains(Iterable<String> searchDomains);
+
+ /**
+ * Sets the local bind address and port for DNS lookup client.
+ * By default, any available port is used. This setting is mainly intended for cases
where it's necessary to bind
+ * the DNS client to a specific address and, optionally, to a specific port.
+ *
+ * @param localAddress the socket address (address and port) to bind the
DNS client to
+ * @return the DNS resolver configuration instance for chained calls
+ */
+ DnsResolverConfig localAddress(InetSocketAddress localAddress);
+
+ /**
+ * Sets the DNS server addresses to be used for DNS lookups.
+ * Defaults to the OS level configuration.
+ *
+ * @param addresses collection of DNS server socket addresses (address and
port)
+ * @return the DNS resolver configuration instance for chained calls
+ */
+ DnsResolverConfig serverAddresses(Iterable<InetSocketAddress> addresses);
+
+ /**
+ * Enables or disables detailed trace information in resolution failure exception
messages.
+ * Defaults to true.
+ *
+ * @param traceEnabled true to enable tracing, false to disable
+ * @return the DNS resolver configuration instance for chained calls
+ */
+ DnsResolverConfig traceEnabled(boolean traceEnabled);
+}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ScheduledExecutorProvider.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/EventLoopGroupConfig.java
similarity index 60%
copy from
pulsar-client/src/main/java/org/apache/pulsar/client/util/ScheduledExecutorProvider.java
copy to
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/EventLoopGroupConfig.java
index 3ebd41ebe56..84b8b78ee41 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ScheduledExecutorProvider.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/EventLoopGroupConfig.java
@@ -16,21 +16,17 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.client.util;
+package org.apache.pulsar.client.api;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import lombok.extern.slf4j.Slf4j;
-
-@Slf4j
-public class ScheduledExecutorProvider extends ExecutorProvider {
-
- public ScheduledExecutorProvider(int numThreads, String poolName) {
- super(numThreads, poolName);
- }
-
- @Override
- protected ExecutorService createExecutor(ExtendedThreadFactory
threadFactory) {
- return Executors.newSingleThreadScheduledExecutor(threadFactory);
- }
+/**
+ * Configuration interface for event loop group settings.
+ */
+public interface EventLoopGroupConfig extends
ThreadPoolConfig<EventLoopGroupConfig> {
+ /**
+ * Enables or disables busy-wait polling mode.
+ *
+ * @param enableBusyWait true to enable busy-wait polling, false to disable
+ * @return this config instance for method chaining
+ */
+ EventLoopGroupConfig enableBusyWait(boolean enableBusyWait);
}
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientSharedResources.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientSharedResources.java
new file mode 100644
index 00000000000..34a56ebd608
--- /dev/null
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientSharedResources.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import java.util.Set;
+import org.apache.pulsar.client.internal.DefaultImplementation;
+
+/**
+ * Manages shared resources across multiple PulsarClient instances to optimize
resource utilization.
+ * This interface provides access to common resources such as thread pools and
DNS resolver/cache
+ * that can be shared between multiple PulsarClient instances.
+ *
+ * This allows creating a large number of PulsarClient instances in a single
JVM without wasting resources.
+ * Sharing the DNS resolver and cache reduces the number of DNS
lookups performed by the clients,
+ * which improves latency and reduces load on the DNS servers.
+ *
+ * <p>Key features:
+ * <ul>
+ * <li>Thread pool sharing for various Pulsar operations (IO, timer, lookup,
etc.)</li>
+ * <li>Shared DNS resolver and cache</li>
+ * <li>Resource lifecycle management</li>
+ * </ul>
+ *
+ * <p>Usage example:
+ * <pre>{@code
+ * // To share all possible resources across multiple PulsarClient instances
+ * PulsarClientSharedResources sharedResources =
PulsarClientSharedResources.builder()
+ * .build();
+ * // Use these resources with multiple PulsarClient instances
+ * PulsarClient client1 =
PulsarClient.builder().sharedResources(sharedResources).build();
+ * PulsarClient client2 =
PulsarClient.builder().sharedResources(sharedResources).build();
+ * client1.close();
+ * client2.close();
+ * sharedResources.close();
+ * }</pre>
+ *
+ * <p>The instance should be created using the {@link
PulsarClientSharedResourcesBuilder},
+ * which can be obtained via the {@link #builder()} method.
+ *
+ * @see PulsarClientSharedResourcesBuilder
+ * @see SharedResource
+ */
+public interface PulsarClientSharedResources extends AutoCloseable {
+ enum SharedResource {
+ // pulsar-io threadpool
+ EventLoopGroup(SharedResourceType.EventLoopGroup),
+ // pulsar-external-listener threadpool
+ ListenerExecutor(SharedResourceType.ThreadPool),
+ // pulsar-timer threadpool
+ Timer(SharedResourceType.Timer),
+ // pulsar-client-internal threadpool
+ InternalExecutor(SharedResourceType.ThreadPool),
+ // pulsar-client-scheduled threadpool
+ ScheduledExecutor(SharedResourceType.ThreadPool),
+ // pulsar-lookup threadpool
+ LookupExecutor(SharedResourceType.ThreadPool),
+ // DNS resolver and cache that must be shared together with
eventLoopGroup
+ DnsResolver(SharedResourceType.DnsResolver);
+
+ private final SharedResourceType type;
+
+ SharedResource(SharedResourceType type) {
+ this.type = type;
+ }
+
+ public SharedResourceType getType() {
+ return type;
+ }
+ }
+
+ enum SharedResourceType {
+ EventLoopGroup,
+ ThreadPool,
+ Timer,
+ DnsResolver;
+ }
+
+ /**
+ * Checks if a resource type is contained in the shared resources instance.
+ *
+ * @param sharedResource the type of resource to check
+ * @return true if the resource type is contained in this instance, false
otherwise
+ */
+ boolean contains(SharedResource sharedResource);
+
+ /**
+ * Gets all resource types contained in this shared resources instance.
+ *
+ * @return set of resource types available in this instance
+ */
+ Set<SharedResource> getSharedResources();
+
+ /**
+ * Creates a new builder for constructing instances of {@link
PulsarClientSharedResources}.
+ *
+ * @return a new {@link PulsarClientSharedResourcesBuilder} which can be
used to configure
+ * and build shared resources for use across multiple PulsarClient
instances.
+ */
+ static PulsarClientSharedResourcesBuilder builder() {
+ return
DefaultImplementation.getDefaultImplementation().newSharedResourcesBuilder();
+ }
+
+ /**
+ * Closes this instance of PulsarClientSharedResources and releases all
associated resources.
+ * All PulsarClient instances using these shared resources must be closed
before calling this method.
+ *
+ * @throws PulsarClientException if there is an error while closing the
shared resources
+ */
+ @Override
+ void close() throws PulsarClientException;
+}
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientSharedResourcesBuilder.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientSharedResourcesBuilder.java
new file mode 100644
index 00000000000..3a94d5352b5
--- /dev/null
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/PulsarClientSharedResourcesBuilder.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static
org.apache.pulsar.client.api.PulsarClientSharedResources.SharedResource;
+import java.util.Collection;
+import java.util.function.Consumer;
+
+/**
+ * Builder for configuring and creating {@link PulsarClientSharedResources}.
+ * <p>
+ * Shared resources allow multiple PulsarClient instances to reuse common
components
+ * (for example executors, timers, DNS resolver and cache, event loop),
+ * reducing memory footprint and thread count when many clients are created in
the same JVM.
+ * This allows creating a large number of PulsarClient instances in a single
JVM without wasting resources.
+ * Sharing the DNS resolver and cache reduces the number of DNS
lookups performed by the clients,
+ * which improves latency and reduces load on the DNS servers.
+ * <p>
+ * Typical usage:
+ * <pre>{@code
+ * PulsarClientSharedResources shared = PulsarClientSharedResources
+ * .builder()
+ * // shared resources can be configured using a ClientBuilder
+ * .configureResources(PulsarClient.builder().ioThreads(4))
+ * .build();
+ *
+ * PulsarClient client = PulsarClient.builder()
+ * .sharedResources(shared)
+ * .serviceUrl("pulsar://localhost:6650")
+ * .build();
+ *
+ * client.close();
+ * // it's necessary to close the shared resources after usage
+ * shared.close();
+ * }</pre>
+ */
+public interface PulsarClientSharedResourcesBuilder {
+ /**
+ * Optionally limits the shared resource types to be created and exposed
by the resulting
+ * {@link PulsarClientSharedResources}. If this method isn't called,
resources for all resource types
+ * will be created and exposed.
+ * Calling this method is additive and the builder will accumulate the
shared resource types from all method calls.
+ *
+ * @param sharedResource one or more resource types to include
+ * @return this builder
+ */
+ PulsarClientSharedResourcesBuilder resourceTypes(SharedResource...
sharedResource);
+
+ /**
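+ * Variant of {@link #resourceTypes(SharedResource...)} that accepts the resource types as a collection.
+ *
+ * @param sharedResource one or more resource types to include
+ * @return this builder
+ * @see #resourceTypes(SharedResource...)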
+ */
+ PulsarClientSharedResourcesBuilder
resourceTypes(Collection<SharedResource> sharedResource);
+
+ /**
+ * Share only the configured resources. It's not allowed to use {@link
#resourceTypes(SharedResource...)} when
+ * this method is called.
+ * @return this builder instance for method chaining
+ */
+ PulsarClientSharedResourcesBuilder shareConfigured();
+
+ /**
+ * Builds the {@link PulsarClientSharedResources} instance. It is
necessary to call
+ * {@link PulsarClientSharedResources#close()} on the instance to release
resources. All clients must be closed
+ * before closing the shared resources.
+ *
+ * @return the shared resources to be used in {@link
ClientBuilder#sharedResources(PulsarClientSharedResources)}
+ */
+ PulsarClientSharedResources build();
+
+ /**
+ * Configures a thread pool for the specified shared resource type.
+ *
+ * @param sharedResource the type of shared resource to configure
+ * @param configurer a consumer that configures the thread pool
settings
+ * @return this builder instance for method chaining
+ */
+ PulsarClientSharedResourcesBuilder configureThreadPool(SharedResource
sharedResource,
+
Consumer<ThreadPoolConfig<?>> configurer);
+
+ /**
+ * Configures the event loop group settings.
+ *
+ * @param configurer a consumer that configures the event loop group
settings
+ * @return this builder instance for method chaining
+ */
+ PulsarClientSharedResourcesBuilder
configureEventLoop(Consumer<EventLoopGroupConfig> configurer);
+
+ /**
+ * Configures the DNS resolver settings.
+ *
+ * @param configurer a consumer that configures the DNS resolver settings
+ * @return this builder instance for method chaining
+ */
+ PulsarClientSharedResourcesBuilder
configureDnsResolver(Consumer<DnsResolverConfig> configurer);
+
+ /**
+ * Configures the timer settings.
+ *
+ * @param configurer a consumer that configures the timer settings
+ * @return this builder instance for method chaining
+ */
+ PulsarClientSharedResourcesBuilder configureTimer(Consumer<TimerConfig>
configurer);
+
+}
\ No newline at end of file
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ThreadPoolConfig.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ThreadPoolConfig.java
new file mode 100644
index 00000000000..dd57a6cebfe
--- /dev/null
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ThreadPoolConfig.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+/**
+ * Configuration interface for thread pool settings.
+ *
+ * @param <T> the concrete configuration type; every setter returns it so that calls can be chained
+ */
+public interface ThreadPoolConfig<T extends ThreadPoolConfig<T>> {
+ /**
+ * Sets the name of the thread pool.
+ *
+ * @param name the name to set for the thread pool
+ * @return this config instance for method chaining
+ */
+ T name(String name);
+
+ /**
+ * Sets the number of threads in the thread pool.
+ *
+ * @param numberOfThreads the number of threads to use
+ * @return this config instance for method chaining
+ */
+ T numberOfThreads(int numberOfThreads);
+
+ /**
+ * Sets whether the threads should be daemon threads.
+ *
+ * @param daemon true if the threads should be daemon threads, false otherwise
+ * @return this config instance for method chaining
+ */
+ T daemon(boolean daemon);
+}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ScheduledExecutorProvider.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TimerConfig.java
similarity index 54%
copy from
pulsar-client/src/main/java/org/apache/pulsar/client/util/ScheduledExecutorProvider.java
copy to
pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TimerConfig.java
index 3ebd41ebe56..1c1235704df 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ScheduledExecutorProvider.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/TimerConfig.java
@@ -16,21 +16,28 @@
* specific language governing permissions and limitations
* under the License.
*/
-package org.apache.pulsar.client.util;
+package org.apache.pulsar.client.api;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import lombok.extern.slf4j.Slf4j;
+import java.util.concurrent.TimeUnit;
-@Slf4j
-public class ScheduledExecutorProvider extends ExecutorProvider {
-
- public ScheduledExecutorProvider(int numThreads, String poolName) {
- super(numThreads, poolName);
- }
+/**
+ * Configuration interface for timer settings.
+ */
+public interface TimerConfig {
+ /**
+ * Sets the name of the timer.
+ *
+ * @param name the name to set for the timer
+ * @return this config instance for method chaining
+ */
+ TimerConfig name(String name);
- @Override
- protected ExecutorService createExecutor(ExtendedThreadFactory
threadFactory) {
- return Executors.newSingleThreadScheduledExecutor(threadFactory);
- }
+ /**
+ * Sets the tick duration for the timer.
+ *
+ * @param tickDuration the duration of each tick
+ * @param timeUnit the time unit for the tick duration
+ * @return this config instance for method chaining
+ */
+ TimerConfig tickDuration(long tickDuration, TimeUnit timeUnit);
}
diff --git
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java
index 8fd05bff265..b5f2a3a468e 100644
---
a/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java
+++
b/pulsar-client-api/src/main/java/org/apache/pulsar/client/internal/PulsarClientImplementationBinding.java
@@ -36,6 +36,7 @@ import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessagePayloadFactory;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.PulsarClientSharedResourcesBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TopicMessageId;
import org.apache.pulsar.client.api.schema.GenericRecord;
@@ -255,4 +256,6 @@ public interface PulsarClientImplementationBinding {
Map<String, String> propertiesValue);
TopicMessageId newTopicMessageId(String topic, MessageId messageId);
+
+ PulsarClientSharedResourcesBuilder newSharedResourcesBuilder();
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java
index 844d1e2d253..7615ec9f810 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/AutoClusterFailover.java
@@ -20,6 +20,7 @@ package org.apache.pulsar.client.impl;
import static
org.apache.pulsar.common.util.Runnables.catchingAndLoggingThrowables;
import com.google.common.base.Strings;
+import io.netty.resolver.AddressResolver;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
@@ -63,6 +64,7 @@ public class AutoClusterFailover implements
ServiceUrlProvider {
private final long intervalMs;
private static final int TIMEOUT = 30_000;
private final PulsarServiceNameResolver resolver;
+ private AddressResolver<InetSocketAddress> addressResolver;
private AutoClusterFailover(AutoClusterFailoverBuilderImpl builder) {
this.primary = builder.primary;
@@ -86,6 +88,7 @@ public class AutoClusterFailover implements
ServiceUrlProvider {
@Override
public void initialize(PulsarClient client) {
this.pulsarClient = (PulsarClientImpl) client;
+ this.addressResolver = pulsarClient.getAddressResolver();
ClientConfigurationData config = pulsarClient.getConfiguration();
if (config != null) {
this.primaryAuthentication = config.getAuthentication();
@@ -128,8 +131,14 @@ public class AutoClusterFailover implements
ServiceUrlProvider {
try {
resolver.updateServiceUrl(url);
InetSocketAddress endpoint = resolver.resolveHost();
+ if (!endpoint.isUnresolved()) {
+ // create an unresolved endpoint to ensure that DNS lookup is
performed again
+ endpoint =
InetSocketAddress.createUnresolved(endpoint.getHostString(),
endpoint.getPort());
+ }
+ // Use Netty's DNS resolver and settings to resolve the host name
+ InetSocketAddress resolvedEndpoint =
addressResolver.resolve(endpoint).get(TIMEOUT, TimeUnit.MILLISECONDS);
try (Socket socket = new Socket()) {
- socket.connect(new InetSocketAddress(endpoint.getHostName(),
endpoint.getPort()), TIMEOUT);
+ socket.connect(resolvedEndpoint, TIMEOUT);
}
return true;
} catch (Exception e) {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
index d031c8e1710..5b3a52d5e42 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java
@@ -34,6 +34,7 @@ import org.apache.pulsar.client.api.ProxyProtocol;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import
org.apache.pulsar.client.api.PulsarClientException.UnsupportedAuthenticationException;
+import org.apache.pulsar.client.api.PulsarClientSharedResources;
import org.apache.pulsar.client.api.ServiceUrlProvider;
import org.apache.pulsar.client.api.SizeUnit;
import org.apache.pulsar.client.impl.auth.AuthenticationDisabled;
@@ -43,7 +44,9 @@ import org.apache.pulsar.common.tls.InetAddressUtils;
import org.apache.pulsar.common.util.DefaultPulsarSslFactory;
public class ClientBuilderImpl implements ClientBuilder {
+ private static final long serialVersionUID = 1L;
ClientConfigurationData conf;
+ private transient PulsarClientSharedResourcesImpl sharedResources;
public ClientBuilderImpl() {
this(new ClientConfigurationData());
@@ -68,12 +71,19 @@ public class ClientBuilderImpl implements ClientBuilder {
if (conf.getAuthentication() == null || conf.getAuthentication() ==
AuthenticationDisabled.INSTANCE) {
setAuthenticationFromPropsIfAvailable(conf);
}
- return new PulsarClientImpl(conf);
+ PulsarClientImpl.PulsarClientImplBuilder instanceBuilder =
PulsarClientImpl.builder();
+ instanceBuilder.conf(conf);
+ if (sharedResources != null) {
+ sharedResources.applyTo(instanceBuilder);
+ }
+ return instanceBuilder.build();
}
@Override
public ClientBuilder clone() {
- return new ClientBuilderImpl(conf.clone());
+ ClientBuilderImpl clientBuilder = new ClientBuilderImpl(conf.clone());
+ clientBuilder.sharedResources = sharedResources;
+ return clientBuilder;
}
@Override
@@ -486,6 +496,12 @@ public class ClientBuilderImpl implements ClientBuilder {
return this;
}
+ // Remembers the shared resources; they are applied to the PulsarClientImpl builder
+ // when the client instance is created.
+ @Override
+ public ClientBuilder sharedResources(PulsarClientSharedResources
sharedResources) {
+ // Only PulsarClientSharedResourcesImpl is accepted: any other implementation of the
+ // interface fails on this cast with a ClassCastException. A null argument passes the
+ // cast and clears the field, so no shared resources are applied.
+ this.sharedResources = (PulsarClientSharedResourcesImpl)
sharedResources;
+ return this;
+ }
+
@Override
public ClientBuilder lookupProperties(Map<String, String> properties) {
conf.setLookupProperties(properties);
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
index f8412b5bf29..fea599ea408 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java
@@ -191,7 +191,7 @@ public class ConnectionPool implements AutoCloseable {
private Supplier<AddressResolver<InetSocketAddress>>
createAddressResolver(ClientConfigurationData conf,
EventLoopGroup eventLoopGroup) {
if (dnsResolverGroup == null) {
- dnsResolverGroup = new DnsResolverGroupImpl(eventLoopGroup, conf);
+ dnsResolverGroup = new DnsResolverGroupImpl(conf);
}
return () -> dnsResolverGroup.createAddressResolver(eventLoopGroup);
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java
index c52a70c6cd7..032069786a4 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ControlledClusterFailover.java
@@ -27,6 +27,8 @@ import io.netty.handler.codec.http.HttpResponse;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
+import java.util.Collections;
+import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
@@ -59,27 +61,27 @@ public class ControlledClusterFailover implements
ServiceUrlProvider {
private static final int DEFAULT_CONNECT_TIMEOUT_IN_SECONDS = 10;
private static final int DEFAULT_READ_TIMEOUT_IN_SECONDS = 30;
private static final int DEFAULT_MAX_REDIRECTS = 20;
+ private final Map<String, String> headers;
+ private final String urlProvider;
private PulsarClientImpl pulsarClient;
private volatile String currentPulsarServiceUrl;
private volatile ControlledConfiguration currentControlledConfiguration;
private final ScheduledExecutorService executor;
private final long interval;
- private final AsyncHttpClient httpClient;
- private final BoundRequestBuilder requestBuilder;
+ private AsyncHttpClient httpClient;
+ private BoundRequestBuilder requestBuilder;
private ControlledClusterFailover(ControlledClusterFailoverBuilderImpl
builder) throws IOException {
this.currentPulsarServiceUrl = builder.defaultServiceUrl;
this.interval = builder.interval;
this.executor = Executors.newSingleThreadScheduledExecutor(
new
ExecutorProvider.ExtendedThreadFactory("pulsar-service-provider"));
-
- this.httpClient = buildHttpClient();
- this.requestBuilder = httpClient.prepareGet(builder.urlProvider)
- .addHeader("Accept", "application/json");
-
+ urlProvider = builder.urlProvider;
if (builder.header != null && !builder.header.isEmpty()) {
- builder.header.forEach(requestBuilder::addHeader);
+ headers = new LinkedHashMap<>(builder.header);
+ } else {
+ headers = Collections.emptyMap();
}
}
@@ -101,6 +103,8 @@ public class ControlledClusterFailover implements
ServiceUrlProvider {
&& super.keepAlive(remoteAddress, ahcRequest, request,
response);
}
});
+ confBuilder.setNettyTimer(pulsarClient.timer());
+ confBuilder.setEventLoopGroup(pulsarClient.eventLoopGroup());
AsyncHttpClientConfig config = confBuilder.build();
return new DefaultAsyncHttpClient(config);
}
@@ -108,6 +112,12 @@ public class ControlledClusterFailover implements
ServiceUrlProvider {
@Override
public void initialize(PulsarClient client) {
this.pulsarClient = (PulsarClientImpl) client;
+ this.httpClient = buildHttpClient();
+ this.requestBuilder = httpClient.prepareGet(urlProvider)
+ // share Pulsar client DNS resolver and cache
+ .setNameResolver(pulsarClient.getNameResolver())
+ .addHeader("Accept", "application/json");
+ headers.forEach(requestBuilder::addHeader);
// start to check service url every 30 seconds
this.executor.scheduleAtFixedRate(catchingAndLoggingThrowables(() -> {
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DnsResolverGroupImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DnsResolverGroupImpl.java
index 61af7968f81..0702e6324ba 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DnsResolverGroupImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/DnsResolverGroupImpl.java
@@ -41,36 +41,68 @@ import org.apache.pulsar.common.util.netty.EventLoopUtil;
public class DnsResolverGroupImpl implements AutoCloseable {
private final DnsAddressResolverGroup dnsAddressResolverGroup;
- public DnsResolverGroupImpl(EventLoopGroup eventLoopGroup,
ClientConfigurationData conf) {
+ public DnsResolverGroupImpl(ClientConfigurationData conf) {
Optional<InetSocketAddress> bindAddress =
Optional.ofNullable(conf.getDnsLookupBindAddress())
.map(addr -> new InetSocketAddress(addr,
conf.getDnsLookupBindPort()));
Optional<DnsServerAddressStreamProvider> dnsServerAddresses =
Optional.ofNullable(conf.getDnsServerAddresses())
.filter(Predicate.not(List::isEmpty))
.map(SequentialDnsServerAddressStreamProvider::new);
- this.dnsAddressResolverGroup =
createAddressResolverGroup(eventLoopGroup, bindAddress, dnsServerAddresses);
+ this.dnsAddressResolverGroup = createAddressResolverGroup(bindAddress,
dnsServerAddresses);
}
- public DnsResolverGroupImpl(EventLoopGroup eventLoopGroup,
Optional<InetSocketAddress> bindAddress,
- Optional<DnsServerAddressStreamProvider>
dnsServerAddresses) {
- this.dnsAddressResolverGroup =
createAddressResolverGroup(eventLoopGroup, bindAddress, dnsServerAddresses);
+ public
DnsResolverGroupImpl(PulsarClientSharedResourcesBuilderImpl.DnsResolverResourceConfig
dnsConfig) {
+ this.dnsAddressResolverGroup = createAddressResolverGroup(dnsConfig);
}
- private static DnsAddressResolverGroup
createAddressResolverGroup(EventLoopGroup eventLoopGroup,
-
Optional<InetSocketAddress> bindAddress,
+    /**
+     * Builds the Netty DNS address resolver group from the DNS settings of the
+     * shared resources configuration.
+     *
+     * @param dnsConfig DNS resolver settings collected by the shared resources builder
+     * @return a resolver group backed by a DnsNameResolverBuilder carrying those settings
+     */
+    private DnsAddressResolverGroup createAddressResolverGroup(
+            PulsarClientSharedResourcesBuilderImpl.DnsResolverResourceConfig dnsConfig) {
+        DnsNameResolverBuilder builder = new DnsNameResolverBuilder();
+        builder.traceEnabled(dnsConfig.traceEnabled);
+        builder.channelType(EventLoopUtil.getDatagramChannelClass());
+
+        // A socket channel type is registered only when one of the TCP fallback flags is set.
+        boolean anyTcpFallback = dnsConfig.tcpFallbackEnabled || dnsConfig.tcpFallbackOnTimeoutEnabled;
+        if (anyTcpFallback) {
+            builder.socketChannelType(EventLoopUtil.getClientSocketChannelClass(),
+                    dnsConfig.tcpFallbackOnTimeoutEnabled);
+        }
+
+        builder.ttl(dnsConfig.minTtl, dnsConfig.maxTtl);
+        builder.negativeTtl(dnsConfig.negativeTtl);
+
+        // Values of -1 or below are not passed on, so the builder keeps its own setting.
+        if (dnsConfig.queryTimeoutMillis > -1L) {
+            builder.queryTimeoutMillis(dnsConfig.queryTimeoutMillis);
+        }
+        if (dnsConfig.ndots > -1) {
+            builder.ndots(dnsConfig.ndots);
+        }
+
+        // The remaining settings are optional and only applied when present.
+        if (dnsConfig.localAddress != null) {
+            builder.localAddress(dnsConfig.localAddress);
+        }
+        Optional.ofNullable(dnsConfig.serverAddresses)
+                .map(SequentialDnsServerAddressStreamProvider::new)
+                .ifPresent(builder::nameServerProvider);
+        if (dnsConfig.searchDomains != null) {
+            builder.searchDomains(dnsConfig.searchDomains);
+        }
+        return new DnsAddressResolverGroup(builder);
+    }
+
+
+ private static DnsAddressResolverGroup
createAddressResolverGroup(Optional<InetSocketAddress> bindAddress,
Optional<DnsServerAddressStreamProvider>
dnsServerAddresses) {
- DnsNameResolverBuilder dnsNameResolverBuilder =
createDnsNameResolverBuilder(eventLoopGroup);
+ DnsNameResolverBuilder dnsNameResolverBuilder =
createDnsNameResolverBuilder();
bindAddress.ifPresent(dnsNameResolverBuilder::localAddress);
dnsServerAddresses.ifPresent(dnsNameResolverBuilder::nameServerProvider);
return new DnsAddressResolverGroup(dnsNameResolverBuilder);
}
- private static DnsNameResolverBuilder
createDnsNameResolverBuilder(EventLoopGroup eventLoopGroup) {
+ private static DnsNameResolverBuilder createDnsNameResolverBuilder() {
DnsNameResolverBuilder dnsNameResolverBuilder = new
DnsNameResolverBuilder()
.traceEnabled(true)
-
.channelType(EventLoopUtil.getDatagramChannelClass(eventLoopGroup))
-
.socketChannelType(EventLoopUtil.getClientSocketChannelClass(eventLoopGroup),
true);
+ .channelType(EventLoopUtil.getDatagramChannelClass())
+
.socketChannelType(EventLoopUtil.getClientSocketChannelClass(), true);
DnsResolverUtil.applyJdkDnsCacheSettings(dnsNameResolverBuilder);
return dnsNameResolverBuilder;
}
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
index 633ca606388..72fcf82b859 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpClient.java
@@ -21,9 +21,12 @@ package org.apache.pulsar.client.impl;
import io.netty.channel.EventLoopGroup;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponse;
+import io.netty.resolver.NameResolver;
+import io.netty.util.Timer;
import java.io.Closeable;
import java.io.IOException;
import java.net.HttpURLConnection;
+import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
@@ -65,16 +68,20 @@ public class HttpClient implements Closeable {
protected final AsyncHttpClient httpClient;
protected final ServiceNameResolver serviceNameResolver;
+ private final NameResolver<InetAddress> nameResolver;
protected final Authentication authentication;
protected final ClientConfigurationData clientConf;
protected ScheduledExecutorService executorService;
protected PulsarSslFactory sslFactory;
- protected HttpClient(ClientConfigurationData conf, EventLoopGroup
eventLoopGroup) throws PulsarClientException {
+ protected HttpClient(ClientConfigurationData conf, EventLoopGroup
eventLoopGroup, Timer timer,
+ NameResolver<InetAddress> nameResolver)
+ throws PulsarClientException {
this.authentication = conf.getAuthentication();
this.clientConf = conf;
this.serviceNameResolver = new
PulsarServiceNameResolver(conf.getServiceUrlQuarantineInitDurationMs(),
conf.getServiceUrlQuarantineMaxDurationMs());
+ this.nameResolver = nameResolver;
this.serviceNameResolver.updateServiceUrl(conf.getServiceUrl());
DefaultAsyncHttpClientConfig.Builder confBuilder = new
DefaultAsyncHttpClientConfig.Builder();
@@ -128,6 +135,7 @@ public class HttpClient implements Closeable {
}
}
confBuilder.setEventLoopGroup(eventLoopGroup);
+ confBuilder.setNettyTimer(timer);
AsyncHttpClientConfig config = confBuilder.build();
httpClient = new DefaultAsyncHttpClient(config);
@@ -184,7 +192,9 @@ public class HttpClient implements Closeable {
// auth complete, use a new builder
BoundRequestBuilder builder = httpClient.prepareGet(requestUrl)
- .setHeader("Accept", "application/json");
+ // share the DNS resolver and cache with Pulsar client
+ .setNameResolver(nameResolver)
+ .setHeader("Accept", "application/json");
if (authData.hasDataForHttp()) {
Set<Entry<String, String>> headers;
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
index 4a5557fa869..08c9956b5be 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/HttpLookupService.java
@@ -19,7 +19,10 @@
package org.apache.pulsar.client.impl;
import io.netty.channel.EventLoopGroup;
+import io.netty.resolver.NameResolver;
+import io.netty.util.Timer;
import io.opentelemetry.api.common.Attributes;
+import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.URI;
import java.nio.ByteBuffer;
@@ -65,10 +68,17 @@ public class HttpLookupService implements LookupService {
private final LatencyHistogram histoGetSchema;
private final LatencyHistogram histoListTopics;
+ @Deprecated
public HttpLookupService(InstrumentProvider instrumentProvider,
ClientConfigurationData conf,
- EventLoopGroup eventLoopGroup)
+ EventLoopGroup eventLoopGroup) throws
PulsarClientException {
+ this(instrumentProvider, conf, eventLoopGroup, null, null);
+ }
+
+ public HttpLookupService(InstrumentProvider instrumentProvider,
ClientConfigurationData conf,
+ EventLoopGroup eventLoopGroup, Timer timer,
+ NameResolver<InetAddress> nameResolver)
throws PulsarClientException {
- this.httpClient = new HttpClient(conf, eventLoopGroup);
+ this.httpClient = new HttpClient(conf, eventLoopGroup, timer,
nameResolver);
this.useTls = conf.isUseTls();
this.listenerName = conf.getListenerName();
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
index d38d35926a4..6ffdfa55a9b 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImpl.java
@@ -25,9 +25,10 @@ import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import io.netty.channel.EventLoopGroup;
import io.netty.resolver.AddressResolver;
-import io.netty.util.HashedWheelTimer;
+import io.netty.resolver.NameResolver;
import io.netty.util.Timer;
import java.io.IOException;
+import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.time.Clock;
import java.time.Duration;
@@ -44,7 +45,6 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -97,7 +97,7 @@ import org.apache.pulsar.common.topics.TopicsPatternFactory;
import org.apache.pulsar.common.util.Backoff;
import org.apache.pulsar.common.util.BackoffBuilder;
import org.apache.pulsar.common.util.FutureUtil;
-import org.apache.pulsar.common.util.netty.EventLoopUtil;
+import org.apache.pulsar.common.util.netty.DnsResolverUtil;
import org.jspecify.annotations.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -131,6 +131,7 @@ public class PulsarClientImpl implements PulsarClient {
private final boolean createdEventLoopGroup;
private final boolean createdCnxPool;
private final DnsResolverGroupImpl dnsResolverGroupLocalInstance;
+ @Getter
private final AddressResolver<InetSocketAddress> addressResolver;
public enum State {
@@ -199,12 +200,12 @@ public class PulsarClientImpl implements PulsarClient {
}
@Builder(builderClassName = "PulsarClientImplBuilder")
- private PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup
eventLoopGroup, ConnectionPool connectionPool,
- Timer timer, ExecutorProvider
externalExecutorProvider,
- ExecutorProvider internalExecutorProvider,
- ScheduledExecutorProvider
scheduledExecutorProvider,
- ExecutorProvider lookupExecutorProvider,
- DnsResolverGroupImpl dnsResolverGroup) throws
PulsarClientException {
+ PulsarClientImpl(ClientConfigurationData conf, EventLoopGroup
eventLoopGroup, ConnectionPool connectionPool,
+ Timer timer, ExecutorProvider externalExecutorProvider,
+ ExecutorProvider internalExecutorProvider,
+ ScheduledExecutorProvider scheduledExecutorProvider,
+ ExecutorProvider lookupExecutorProvider,
+ DnsResolverGroupImpl dnsResolverGroup) throws
PulsarClientException {
EventLoopGroup eventLoopGroupReference = null;
ConnectionPool connectionPoolReference = null;
@@ -222,22 +223,26 @@ public class PulsarClientImpl implements PulsarClient {
this.createdExecutorProviders = externalExecutorProvider == null;
this.createdScheduledProviders = scheduledExecutorProvider == null;
this.createdLookupProviders = lookupExecutorProvider == null;
- eventLoopGroupReference = eventLoopGroup != null ? eventLoopGroup
: getEventLoopGroup(conf);
+ eventLoopGroupReference = eventLoopGroup != null ? eventLoopGroup :
+ PulsarClientResourcesConfigurer.createEventLoopGroup(conf);
this.eventLoopGroup = eventLoopGroupReference;
this.instrumentProvider = new
InstrumentProvider(conf.getOpenTelemetry());
clientClock = conf.getClock();
conf.getAuthentication().start();
this.scheduledExecutorProvider = scheduledExecutorProvider != null
? scheduledExecutorProvider :
- new ScheduledExecutorProvider(conf.getNumIoThreads(),
"pulsar-client-scheduled");
+
PulsarClientResourcesConfigurer.createScheduledExecutorProvider(conf);
if (connectionPool != null) {
connectionPoolReference = connectionPool;
dnsResolverGroupLocalInstance = null;
- addressResolver = null;
+ addressResolver = dnsResolverGroup != null
+ ?
dnsResolverGroup.createAddressResolver(eventLoopGroupReference) : null;
} else {
DnsResolverGroupImpl dnsResolverGroupReference;
if (dnsResolverGroup == null) {
dnsResolverGroupReference =
- dnsResolverGroupLocalInstance = new
DnsResolverGroupImpl(eventLoopGroupReference, conf);
+ dnsResolverGroupLocalInstance =
+
PulsarClientResourcesConfigurer.createDnsResolverGroup(conf
+ );
} else {
dnsResolverGroupReference = dnsResolverGroup;
dnsResolverGroupLocalInstance = null;
@@ -254,24 +259,25 @@ public class PulsarClientImpl implements PulsarClient {
}
this.cnxPool = connectionPoolReference;
this.externalExecutorProvider = externalExecutorProvider != null ?
externalExecutorProvider :
- new ExecutorProvider(conf.getNumListenerThreads(),
"pulsar-external-listener");
+
PulsarClientResourcesConfigurer.createExternalExecutorProvider(conf);
this.internalExecutorProvider = internalExecutorProvider != null ?
internalExecutorProvider :
- new ExecutorProvider(conf.getNumIoThreads(),
"pulsar-client-internal");
+
PulsarClientResourcesConfigurer.createInternalExecutorProvider(conf);
this.lookupExecutorProvider = lookupExecutorProvider != null ?
lookupExecutorProvider :
- new ExecutorProvider(1, "pulsar-client-lookup");
+
PulsarClientResourcesConfigurer.createLookupExecutorProvider();
+ if (timer == null) {
+ this.timer = PulsarClientResourcesConfigurer.createTimer();
+ needStopTimer = true;
+ } else {
+ this.timer = timer;
+ }
if (conf.getServiceUrl().startsWith("http")) {
- lookup = new HttpLookupService(instrumentProvider, conf,
this.eventLoopGroup);
+ lookup = new HttpLookupService(instrumentProvider, conf,
this.eventLoopGroup, this.timer,
+ getNameResolver());
} else {
lookup = new BinaryProtoLookupService(this,
conf.getServiceUrl(), conf.getListenerName(),
conf.isUseTls(),
this.scheduledExecutorProvider.getExecutor(),
this.lookupExecutorProvider.getExecutor());
}
- if (timer == null) {
- this.timer = new
HashedWheelTimer(getThreadFactory("pulsar-timer"), 1, TimeUnit.MILLISECONDS);
- needStopTimer = true;
- } else {
- this.timer = timer;
- }
if (conf.getServiceUrlProvider() != null) {
conf.getServiceUrlProvider().initialize(this);
@@ -1211,7 +1217,7 @@ public class PulsarClientImpl implements PulsarClient {
public LookupService createLookup(String url) throws PulsarClientException
{
if (url.startsWith("http")) {
- return new HttpLookupService(instrumentProvider, conf,
eventLoopGroup);
+ return new HttpLookupService(instrumentProvider, conf,
eventLoopGroup, timer, getNameResolver());
} else {
return new BinaryProtoLookupService(this, url,
conf.getListenerName(), conf.isUseTls(),
externalExecutorProvider.getExecutor());
@@ -1299,15 +1305,6 @@ public class PulsarClientImpl implements PulsarClient {
});
}
- private static EventLoopGroup getEventLoopGroup(ClientConfigurationData
conf) {
- ThreadFactory threadFactory = getThreadFactory("pulsar-client-io");
- return EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(),
conf.isEnableBusyWait(), threadFactory);
- }
-
- private static ThreadFactory getThreadFactory(String poolName) {
- return new ExecutorProvider.ExtendedThreadFactory(poolName,
Thread.currentThread().isDaemon());
- }
-
void cleanupProducer(ProducerBase<?> producer) {
producers.remove(producer);
}
@@ -1415,4 +1412,8 @@ public class PulsarClientImpl implements PulsarClient {
public TransactionBuilder newTransaction() {
return new TransactionBuilderImpl(this, tcClient);
}
+
+ NameResolver<InetAddress> getNameResolver() {
+ return DnsResolverUtil.adaptToNameResolver(addressResolver);
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java
index 6fe95140967..0351477985f 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientImplementationBindingImpl.java
@@ -37,6 +37,7 @@ import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.MessageIdAdv;
import org.apache.pulsar.client.api.MessagePayloadFactory;
import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.PulsarClientSharedResourcesBuilder;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TopicMessageId;
import org.apache.pulsar.client.api.schema.GenericRecord;
@@ -403,4 +404,9 @@ public final class PulsarClientImplementationBindingImpl implements PulsarClient
}
return new TopicMessageIdImpl(topic, messageIdAdv);
}
+
+ @Override
+ public PulsarClientSharedResourcesBuilder newSharedResourcesBuilder() {
+ return new PulsarClientSharedResourcesBuilderImpl();
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientResourcesConfigurer.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientResourcesConfigurer.java
new file mode 100644
index 00000000000..28f97e92159
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientResourcesConfigurer.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.HashedWheelTimer;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.util.ExecutorProvider;
+import org.apache.pulsar.client.util.ScheduledExecutorProvider;
+import org.apache.pulsar.common.util.netty.EventLoopUtil;
+
+class PulsarClientResourcesConfigurer {
+ static final String NAME_TIMER = "pulsar-timer";
+ static final String POOL_NAME_LOOKUP_EXECUTOR = "pulsar-client-lookup";
+ static final String POOL_NAME_INTERNAL_EXECUTOR = "pulsar-client-internal";
+ static final String POOL_NAME_LISTENER_EXECUTOR =
"pulsar-external-listener";
+ static final String POOL_NAME_SCHEDULED_EXECUTOR =
"pulsar-client-scheduled";
+ static final String POOL_NAME_EVENT_LOOP_GROUP = "pulsar-client-io";
+ static final int LOOKUP_EXECUTOR_NUM_THREADS = 1;
+
+ static HashedWheelTimer createTimer() {
+ return createTimerInternal(NAME_TIMER, 1L, TimeUnit.MILLISECONDS);
+ }
+
+ private static HashedWheelTimer createTimerInternal(String poolName, long
tickDuration, TimeUnit timeUnit) {
+ return new HashedWheelTimer(createThreadFactory(poolName),
tickDuration, timeUnit);
+ }
+
+ static HashedWheelTimer
createTimer(PulsarClientSharedResourcesBuilderImpl.TimerResourceConfig
resourceConfig) {
+ if (resourceConfig == null) {
+ resourceConfig = new
PulsarClientSharedResourcesBuilderImpl.TimerResourceConfig();
+ }
+ return createTimerInternal(resourceConfig.name,
resourceConfig.tickDuration,
+ resourceConfig.tickDurationTimeUnit);
+ }
+
+ static ExecutorProvider createLookupExecutorProvider() {
+ return createExecutorProviderInternal(LOOKUP_EXECUTOR_NUM_THREADS,
POOL_NAME_LOOKUP_EXECUTOR);
+ }
+
+ static ExecutorProvider createLookupExecutorProviderWithResourceConfig(
+ PulsarClientSharedResourcesBuilderImpl.ThreadPoolResourceConfig
resourceConfig) {
+ return createExecutorProviderInternal(resourceConfig,
POOL_NAME_LOOKUP_EXECUTOR);
+ }
+
+ private static ExecutorProvider createExecutorProviderInternal(int
numThreads, String poolName) {
+ return createExecutorProviderInternal(numThreads, poolName,
Thread.currentThread().isDaemon());
+ }
+
+ private static ExecutorProvider createExecutorProviderInternal(int
numThreads, String poolName, boolean daemon) {
+ return new ExecutorProvider(numThreads, poolName, daemon);
+ }
+
+ static ExecutorProvider
createInternalExecutorProvider(ClientConfigurationData conf) {
+ return createExecutorProviderInternal(conf.getNumIoThreads(),
POOL_NAME_INTERNAL_EXECUTOR);
+ }
+
+ static ExecutorProvider createInternalExecutorProviderWithResourceConfig(
+ PulsarClientSharedResourcesBuilderImpl.ThreadPoolResourceConfig
resourceConfig) {
+ return createExecutorProviderInternal(resourceConfig,
POOL_NAME_INTERNAL_EXECUTOR);
+ }
+
+ static ExecutorProvider
createExternalExecutorProvider(ClientConfigurationData conf) {
+ return createExecutorProviderInternal(conf.getNumListenerThreads(),
POOL_NAME_LISTENER_EXECUTOR);
+ }
+
+ static ExecutorProvider createExternalExecutorProviderWithResourceConfig(
+ PulsarClientSharedResourcesBuilderImpl.ThreadPoolResourceConfig
resourceConfig) {
+ return createExecutorProviderInternal(resourceConfig,
POOL_NAME_LISTENER_EXECUTOR);
+ }
+
+ private static ExecutorProvider createExecutorProviderInternal(
+ PulsarClientSharedResourcesBuilderImpl.ThreadPoolResourceConfig
resourceConfig, String defaultPoolName) {
+ if (resourceConfig == null) {
+ resourceConfig = new
PulsarClientSharedResourcesBuilderImpl.ThreadPoolResourceConfig();
+ }
+ String poolName = StringUtils.isNotBlank(resourceConfig.name) ?
resourceConfig.name : defaultPoolName;
+ return createExecutorProviderInternal(resourceConfig.numberOfThreads,
poolName, resourceConfig.daemon);
+ }
+
+ static ScheduledExecutorProvider
createScheduledExecutorProvider(ClientConfigurationData conf) {
+ return new ScheduledExecutorProvider(conf.getNumIoThreads(),
POOL_NAME_SCHEDULED_EXECUTOR);
+ }
+
+ static ScheduledExecutorProvider
createScheduledExecutorProviderWithResourceConfig(
+ PulsarClientSharedResourcesBuilderImpl.ThreadPoolResourceConfig
resourceConfig) {
+ if (resourceConfig == null) {
+ resourceConfig = new
PulsarClientSharedResourcesBuilderImpl.ThreadPoolResourceConfig();
+ }
+ String poolName =
+ StringUtils.isNotBlank(resourceConfig.name) ?
resourceConfig.name : POOL_NAME_SCHEDULED_EXECUTOR;
+ return new ScheduledExecutorProvider(resourceConfig.numberOfThreads,
poolName, resourceConfig.daemon);
+ }
+
+ static EventLoopGroup createEventLoopGroup(ClientConfigurationData conf) {
+ ThreadFactory threadFactory =
createThreadFactory(POOL_NAME_EVENT_LOOP_GROUP);
+ return EventLoopUtil.newEventLoopGroup(conf.getNumIoThreads(),
conf.isEnableBusyWait(), threadFactory);
+ }
+
+ static EventLoopGroup createEventLoopGroupWithResourceConfig(
+ PulsarClientSharedResourcesBuilderImpl.EventLoopResourceConfig
+ eventLoopResourceConfig) {
+ if (eventLoopResourceConfig == null) {
+ eventLoopResourceConfig = new
PulsarClientSharedResourcesBuilderImpl.EventLoopResourceConfig();
+ }
+ ThreadFactory threadFactory =
createThreadFactory(eventLoopResourceConfig.name,
eventLoopResourceConfig.daemon);
+ return
EventLoopUtil.newEventLoopGroup(eventLoopResourceConfig.numberOfThreads,
+ eventLoopResourceConfig.enableBusyWait, threadFactory);
+ }
+
+ static ThreadFactory createThreadFactory(String poolName) {
+ return createThreadFactory(poolName,
Thread.currentThread().isDaemon());
+ }
+
+ static ThreadFactory createThreadFactory(String poolName, boolean daemon) {
+ return new ExecutorProvider.ExtendedThreadFactory(poolName, daemon);
+ }
+
+ static DnsResolverGroupImpl createDnsResolverGroup(ClientConfigurationData
conf) {
+ return new DnsResolverGroupImpl(conf);
+ }
+
+ static DnsResolverGroupImpl createDnsResolverGroupWithResourceConfig(
+ PulsarClientSharedResourcesBuilderImpl.DnsResolverResourceConfig
resourceConfig) {
+ if (resourceConfig == null) {
+ resourceConfig = new
PulsarClientSharedResourcesBuilderImpl.DnsResolverResourceConfig();
+ }
+ return new DnsResolverGroupImpl(resourceConfig);
+ }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientSharedResourcesBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientSharedResourcesBuilderImpl.java
new file mode 100644
index 00000000000..fe9d43dc41e
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientSharedResourcesBuilderImpl.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import com.google.common.collect.Lists;
+import java.net.InetSocketAddress;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+import org.apache.pulsar.client.api.DnsResolverConfig;
+import org.apache.pulsar.client.api.EventLoopGroupConfig;
+import org.apache.pulsar.client.api.PulsarClientSharedResources;
+import org.apache.pulsar.client.api.PulsarClientSharedResourcesBuilder;
+import org.apache.pulsar.client.api.ThreadPoolConfig;
+import org.apache.pulsar.client.api.TimerConfig;
+import org.apache.pulsar.common.util.netty.DnsResolverUtil;
+
+public class PulsarClientSharedResourcesBuilderImpl implements
PulsarClientSharedResourcesBuilder {
+ Set<PulsarClientSharedResources.SharedResource> sharedResources = new
HashSet<>();
+ Map<PulsarClientSharedResources.SharedResource, ResourceConfig>
resourceConfigs = new HashMap<>();
+ private boolean shareConfigured;
+
+ interface ResourceConfig {
+
+ }
+
+ abstract static class NamedResourceConfig<T> implements ResourceConfig {
+ String name;
+
+ public T name(String name) {
+ this.name = name;
+ return self();
+ }
+
+ @SuppressWarnings("unchecked")
+ T self() {
+ return (T) this;
+ }
+ }
+
+ static class ThreadPoolResourceConfig extends
NamedResourceConfig<ThreadPoolConfig> implements ThreadPoolConfig {
+ int numberOfThreads = Runtime.getRuntime().availableProcessors();
+ boolean daemon = Thread.currentThread().isDaemon();
+
+ @Override
+ public ThreadPoolConfig numberOfThreads(int numberOfThreads) {
+ this.numberOfThreads = numberOfThreads;
+ return this;
+ }
+
+ @Override
+ public ThreadPoolConfig daemon(boolean daemon) {
+ this.daemon = daemon;
+ return this;
+ }
+ }
+
+ static class EventLoopResourceConfig extends
NamedResourceConfig<EventLoopGroupConfig>
+ implements EventLoopGroupConfig {
+ int numberOfThreads = Runtime.getRuntime().availableProcessors();
+ boolean daemon = Thread.currentThread().isDaemon();
+ boolean enableBusyWait;
+
+ EventLoopResourceConfig() {
+ name = PulsarClientResourcesConfigurer.POOL_NAME_EVENT_LOOP_GROUP;
+ }
+
+ @Override
+ public EventLoopGroupConfig numberOfThreads(int numberOfThreads) {
+ this.numberOfThreads = numberOfThreads;
+ return this;
+ }
+
+ @Override
+ public EventLoopGroupConfig daemon(boolean daemon) {
+ this.daemon = daemon;
+ return this;
+ }
+
+ @Override
+ public EventLoopGroupConfig enableBusyWait(boolean enableBusyWait) {
+ this.enableBusyWait = enableBusyWait;
+ return this;
+ }
+ }
+
+ static class TimerResourceConfig extends NamedResourceConfig<TimerConfig>
implements TimerConfig {
+ long tickDuration = 1L;
+ TimeUnit tickDurationTimeUnit = TimeUnit.MILLISECONDS;
+
+ TimerResourceConfig() {
+ name = PulsarClientResourcesConfigurer.NAME_TIMER;
+ }
+
+ @Override
+ public TimerConfig tickDuration(long tickDuration, TimeUnit timeUnit) {
+ this.tickDuration = tickDuration;
+ this.tickDurationTimeUnit = timeUnit;
+ return this;
+ }
+ }
+
+ static class DnsResolverResourceConfig implements ResourceConfig,
DnsResolverConfig {
+ InetSocketAddress localAddress;
+ Collection<InetSocketAddress> serverAddresses;
+ int minTtl = DnsResolverUtil.getDefaultMinTTL();
+ int maxTtl = DnsResolverUtil.getDefaultTTL();
+ int negativeTtl = DnsResolverUtil.getDefaultNegativeTTL();
+ long queryTimeoutMillis = -1L;
+ boolean traceEnabled = true;
+ boolean tcpFallbackEnabled = true;
+ boolean tcpFallbackOnTimeoutEnabled = true;
+ int ndots = -1;
+ Collection<String> searchDomains;
+
+ @Override
+ public DnsResolverConfig localAddress(InetSocketAddress localAddress) {
+ this.localAddress = localAddress;
+ return this;
+ }
+
+ @Override
+ public DnsResolverConfig serverAddresses(Iterable<InetSocketAddress>
addresses) {
+ this.serverAddresses = Lists.newArrayList(addresses);
+ return this;
+ }
+
+ @Override
+ public DnsResolverConfig minTtl(int minTtl) {
+ this.minTtl = minTtl;
+ return this;
+ }
+
+ @Override
+ public DnsResolverConfig maxTtl(int maxTtl) {
+ this.maxTtl = maxTtl;
+ return this;
+ }
+
+ @Override
+ public DnsResolverConfig negativeTtl(int negativeTtl) {
+ this.negativeTtl = negativeTtl;
+ return this;
+ }
+
+ @Override
+ public DnsResolverConfig queryTimeoutMillis(long queryTimeoutMillis) {
+ this.queryTimeoutMillis = queryTimeoutMillis;
+ return this;
+ }
+
+ @Override
+ public DnsResolverConfig traceEnabled(boolean traceEnabled) {
+ this.traceEnabled = traceEnabled;
+ return this;
+ }
+
+ @Override
+ public DnsResolverConfig tcpFallbackEnabled(boolean
tcpFallbackEnabled) {
+ this.tcpFallbackEnabled = tcpFallbackEnabled;
+ return this;
+ }
+
+ @Override
+ public DnsResolverConfig tcpFallbackOnTimeoutEnabled(boolean
tcpFallbackOnTimeoutEnabled) {
+ this.tcpFallbackOnTimeoutEnabled = tcpFallbackOnTimeoutEnabled;
+ return this;
+ }
+
+ @Override
+ public DnsResolverConfig ndots(int ndots) {
+ this.ndots = ndots;
+ return this;
+ }
+
+ @Override
+ public DnsResolverConfig searchDomains(Iterable<String> searchDomains)
{
+ this.searchDomains = Lists.newArrayList(searchDomains);
+ return this;
+ }
+ }
+
+ @Override
+ public PulsarClientSharedResourcesBuilder resourceTypes(
+ PulsarClientSharedResources.SharedResource... sharedResource) {
+ if (shareConfigured) {
+ throw new IllegalStateException("Cannot set resourceTypes when
shareConfigured() has already been called");
+ }
+ return resourceTypes(List.of(sharedResource));
+ }
+
+ @Override
+ public PulsarClientSharedResourcesBuilder resourceTypes(
+ Collection<PulsarClientSharedResources.SharedResource>
sharedResource) {
+ if (shareConfigured) {
+ throw new IllegalStateException("Cannot set resourceTypes when
shareConfigured() has already been called");
+ }
+ sharedResources.addAll(sharedResource);
+ return this;
+ }
+
+ @Override
+ public PulsarClientSharedResourcesBuilder shareConfigured() {
+ if (!sharedResources.isEmpty()) {
+ throw new IllegalStateException("Cannot use shareConfigured() when
resourceTypes has already been set");
+ }
+ shareConfigured = true;
+ return this;
+ }
+
+ @SuppressWarnings("unchecked")
+ private <T extends ResourceConfig> T
getOrCreateConfig(PulsarClientSharedResources.SharedResource sharedResource) {
+ return (T) resourceConfigs.computeIfAbsent(sharedResource, k -> {
+ switch (sharedResource.getType()) {
+ case EventLoopGroup:
+ return new EventLoopResourceConfig();
+ case DnsResolver:
+ return new DnsResolverResourceConfig();
+ case ThreadPool:
+ return new ThreadPoolResourceConfig();
+ case Timer:
+ return new TimerResourceConfig();
+ default:
+ throw new IllegalArgumentException("Unknown resource type:
" + sharedResource.getType());
+ }
+ });
+ }
+
+ @Override
+ public PulsarClientSharedResourcesBuilder configureThreadPool(
+ PulsarClientSharedResources.SharedResource sharedResource,
Consumer<ThreadPoolConfig<?>> configurer) {
+ if (sharedResource.getType() !=
PulsarClientSharedResources.SharedResourceType.ThreadPool) {
+ throw new IllegalArgumentException("The shared resource " +
sharedResource + " doesn't support thread pool"
+ + " configuration");
+ }
+ configurer.accept(getOrCreateConfig(sharedResource));
+ return this;
+ }
+
+ @Override
+ public PulsarClientSharedResourcesBuilder
configureEventLoop(Consumer<EventLoopGroupConfig> configurer) {
+
configurer.accept(getOrCreateConfig(PulsarClientSharedResources.SharedResource.EventLoopGroup));
+ return this;
+ }
+
+ @Override
+ public PulsarClientSharedResourcesBuilder
configureDnsResolver(Consumer<DnsResolverConfig> configurer) {
+
configurer.accept(getOrCreateConfig(PulsarClientSharedResources.SharedResource.DnsResolver));
+ return this;
+ }
+
+ @Override
+ public PulsarClientSharedResourcesBuilder
configureTimer(Consumer<TimerConfig> configurer) {
+
configurer.accept(getOrCreateConfig(PulsarClientSharedResources.SharedResource.Timer));
+ return this;
+ }
+
+ @Override
+ public PulsarClientSharedResources build() {
+ return new PulsarClientSharedResourcesImpl(sharedResources,
resourceConfigs, shareConfigured);
+ }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientSharedResourcesImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientSharedResourcesImpl.java
new file mode 100644
index 00000000000..29953784681
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/PulsarClientSharedResourcesImpl.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import static
org.apache.pulsar.client.impl.PulsarClientResourcesConfigurer.createDnsResolverGroupWithResourceConfig;
+import static
org.apache.pulsar.client.impl.PulsarClientResourcesConfigurer.createEventLoopGroupWithResourceConfig;
+import static
org.apache.pulsar.client.impl.PulsarClientResourcesConfigurer.createExternalExecutorProviderWithResourceConfig;
+import static
org.apache.pulsar.client.impl.PulsarClientResourcesConfigurer.createInternalExecutorProviderWithResourceConfig;
+import static
org.apache.pulsar.client.impl.PulsarClientResourcesConfigurer.createLookupExecutorProviderWithResourceConfig;
+import static
org.apache.pulsar.client.impl.PulsarClientResourcesConfigurer.createScheduledExecutorProviderWithResourceConfig;
+import static
org.apache.pulsar.client.impl.PulsarClientResourcesConfigurer.createTimer;
+import io.netty.channel.EventLoopGroup;
+import io.netty.util.Timer;
+import java.util.EnumSet;
+import java.util.Map;
+import java.util.Set;
+import lombok.Getter;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.PulsarClientSharedResources;
+import org.apache.pulsar.client.util.ExecutorProvider;
+import org.apache.pulsar.client.util.ScheduledExecutorProvider;
+import org.apache.pulsar.common.util.netty.EventLoopUtil;
+
+@Getter
+public class PulsarClientSharedResourcesImpl implements
PulsarClientSharedResources {
+ Set<SharedResource> sharedResources;
+ protected final EventLoopGroup ioEventLoopGroup;
+ private final ExecutorProvider internalExecutorProvider;
+ private final ExecutorProvider externalExecutorProvider;
+ private final ScheduledExecutorProvider scheduledExecutorProvider;
+ private final Timer timer;
+ private final ExecutorProvider lookupExecutorProvider;
+ private final DnsResolverGroupImpl dnsResolverGroup;
+
+ public PulsarClientSharedResourcesImpl(Set<SharedResource> sharedResources,
+ Map<SharedResource,
PulsarClientSharedResourcesBuilderImpl.ResourceConfig>
+ resourceConfigs, boolean
shareConfigured) {
+ if (shareConfigured) {
+ if (resourceConfigs.isEmpty()) {
+ throw new IllegalArgumentException(
+ "No resources have been configured while using
shareConfigured mode");
+ }
+ this.sharedResources = Set.copyOf(resourceConfigs.keySet());
+ } else {
+ if (sharedResources.isEmpty()) {
+ this.sharedResources = EnumSet.allOf(SharedResource.class);
+ } else {
+ this.sharedResources = Set.copyOf(sharedResources);
+ }
+ }
+ this.ioEventLoopGroup =
this.sharedResources.contains(SharedResource.EventLoopGroup)
+ ? createEventLoopGroupWithResourceConfig(
+ getResourceConfig(resourceConfigs,
SharedResource.EventLoopGroup))
+ : null;
+ this.externalExecutorProvider =
this.sharedResources.contains(SharedResource.ListenerExecutor)
+ ? createExternalExecutorProviderWithResourceConfig(
+ getResourceConfig(resourceConfigs,
SharedResource.ListenerExecutor))
+ : null;
+ this.internalExecutorProvider =
this.sharedResources.contains(SharedResource.InternalExecutor)
+ ? createInternalExecutorProviderWithResourceConfig(
+ getResourceConfig(resourceConfigs,
SharedResource.InternalExecutor))
+ : null;
+ this.scheduledExecutorProvider =
this.sharedResources.contains(SharedResource.ScheduledExecutor)
+ ? createScheduledExecutorProviderWithResourceConfig(
+ getResourceConfig(resourceConfigs,
SharedResource.ScheduledExecutor))
+ : null;
+ this.lookupExecutorProvider =
this.sharedResources.contains(SharedResource.LookupExecutor)
+ ? createLookupExecutorProviderWithResourceConfig(
+ getResourceConfig(resourceConfigs,
SharedResource.LookupExecutor))
+ : null;
+ this.timer = this.sharedResources.contains(SharedResource.Timer)
+ ? createTimer(getResourceConfig(resourceConfigs,
SharedResource.Timer))
+ : null;
+ this.dnsResolverGroup =
this.sharedResources.contains(SharedResource.DnsResolver)
+ ? createDnsResolverGroupWithResourceConfig(
+ getResourceConfig(resourceConfigs, SharedResource.DnsResolver))
+ : null;
+ }
+
+ @SuppressWarnings("unchecked")
+ private <T extends PulsarClientSharedResourcesBuilderImpl.ResourceConfig>
T getResourceConfig(
+ Map<SharedResource,
PulsarClientSharedResourcesBuilderImpl.ResourceConfig> resourceConfigs,
+ SharedResource resource) {
+ return (T) resourceConfigs.get(resource);
+ }
+
+ @Override
+ public boolean contains(SharedResource sharedResource) {
+ return sharedResources.contains(sharedResource);
+ }
+
+ public Set<SharedResource> getSharedResources() {
+ return sharedResources;
+ }
+
+ @Override
+ public void close() throws PulsarClientException {
+ if (externalExecutorProvider != null) {
+ externalExecutorProvider.shutdownNow();
+ }
+ if (internalExecutorProvider != null) {
+ internalExecutorProvider.shutdownNow();
+ }
+ if (scheduledExecutorProvider != null) {
+ scheduledExecutorProvider.shutdownNow();
+ }
+ if (lookupExecutorProvider != null) {
+ lookupExecutorProvider.shutdownNow();
+ }
+ if (dnsResolverGroup != null) {
+ dnsResolverGroup.close();
+ }
+ if (timer != null) {
+ timer.stop();
+ }
+ if (ioEventLoopGroup != null) {
+ EventLoopUtil.shutdownGracefully(ioEventLoopGroup);
+ }
+ }
+
+ public void applyTo(PulsarClientImpl.PulsarClientImplBuilder
instanceBuilder) {
+ if (externalExecutorProvider != null) {
+ instanceBuilder.externalExecutorProvider(externalExecutorProvider);
+ }
+ if (internalExecutorProvider != null) {
+ instanceBuilder.internalExecutorProvider(internalExecutorProvider);
+ }
+ if (scheduledExecutorProvider != null) {
+
instanceBuilder.scheduledExecutorProvider(scheduledExecutorProvider);
+ }
+ if (lookupExecutorProvider != null) {
+ instanceBuilder.lookupExecutorProvider(lookupExecutorProvider);
+ }
+ if (dnsResolverGroup != null) {
+ instanceBuilder.dnsResolverGroup(dnsResolverGroup);
+ }
+ if (timer != null) {
+ instanceBuilder.timer(timer);
+ }
+ if (ioEventLoopGroup != null) {
+ instanceBuilder.eventLoopGroup(ioEventLoopGroup);
+ }
+ }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
index 88654c51300..ae3b532a054 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ExecutorProvider.java
@@ -61,13 +61,17 @@ public class ExecutorProvider {
}
public ExecutorProvider(int numThreads, String poolName) {
+ this(numThreads, poolName, Thread.currentThread().isDaemon());
+ }
+
+ public ExecutorProvider(int numThreads, String poolName, boolean daemon) {
checkArgument(numThreads > 0);
this.numThreads = numThreads;
Objects.requireNonNull(poolName);
executors = new ArrayList<>(numThreads);
for (int i = 0; i < numThreads; i++) {
ExtendedThreadFactory threadFactory = new ExtendedThreadFactory(
- poolName, Thread.currentThread().isDaemon());
+ poolName, daemon);
ExecutorService executor = createExecutor(threadFactory);
executors.add(Pair.of(executor, threadFactory));
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ScheduledExecutorProvider.java b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ScheduledExecutorProvider.java
index 3ebd41ebe56..b47659e514a 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/util/ScheduledExecutorProvider.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/util/ScheduledExecutorProvider.java
@@ -24,6 +24,9 @@ import lombok.extern.slf4j.Slf4j;
@Slf4j
public class ScheduledExecutorProvider extends ExecutorProvider {
+ public ScheduledExecutorProvider(int numThreads, String poolName, boolean
daemon) {
+ super(numThreads, poolName, daemon);
+ }
public ScheduledExecutorProvider(int numThreads, String poolName) {
super(numThreads, poolName);
diff --git a/pulsar-client/src/main/resources/findbugsExclude.xml b/pulsar-client/src/main/resources/findbugsExclude.xml
index f7cf6b9cfd5..802c03622b0 100644
--- a/pulsar-client/src/main/resources/findbugsExclude.xml
+++ b/pulsar-client/src/main/resources/findbugsExclude.xml
@@ -257,6 +257,10 @@
<Method name="<init>"/>
<Bug pattern="EI_EXPOSE_REP2"/>
</Match>
+ <Match>
+ <Class
name="org.apache.pulsar.client.impl.PulsarClientSharedResourcesImpl"/>
+ <Bug pattern="EI_EXPOSE_REP"/>
+ </Match>
<Match>
<Class name="org.apache.pulsar.client.impl.ClientCnx"/>
<Method name="getPendingRequests"/>
@@ -712,6 +716,11 @@
<Method name="<init>"/>
<Bug pattern="EI_EXPOSE_REP2"/>
</Match>
+ <Match>
+ <Class name="org.apache.pulsar.client.impl.PulsarClientImpl"/>
+ <Method name="<init>"/>
+ <Bug pattern="EI_EXPOSE_REP2"/>
+ </Match>
<Match>
<Class name="org.apache.pulsar.client.impl.PulsarClientImpl"/>
<Method name="getConfiguration"/>
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java
index 265af6dd23c..ca4aca6c329 100644
--- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ControlledClusterFailoverTest.java
@@ -54,6 +54,12 @@ public class ControlledClusterFailoverTest {
.build();
ControlledClusterFailover controlledClusterFailover =
(ControlledClusterFailover) provider;
+
+ PulsarClientImpl pulsarClient = mock(PulsarClientImpl.class);
+ ConnectionPool connectionPool = mock(ConnectionPool.class);
+ when(pulsarClient.getCnxPool()).thenReturn(connectionPool);
+ controlledClusterFailover.initialize(pulsarClient);
+
Request request =
controlledClusterFailover.getRequestBuilder().build();
Assert.assertTrue(provider instanceof ControlledClusterFailover);
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientSharedResourcesBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientSharedResourcesBuilderImplTest.java
new file mode 100644
index 00000000000..f01906911a6
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientSharedResourcesBuilderImplTest.java
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.client.api.PulsarClient;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.api.PulsarClientSharedResources;
+import org.testng.annotations.Test;
+
+public class PulsarClientSharedResourcesBuilderImplTest {
+ @Test
+ public void testSharedResources() throws PulsarClientException {
+
runClientsWithSharedResources(PulsarClientSharedResources.builder().build(),
1000, false);
+
runClientsWithSharedResources(PulsarClientSharedResources.builder().build(),
1000, true);
+ }
+
+ private static void
runClientsWithSharedResources(PulsarClientSharedResources sharedResources, int
numberOfClients,
+ boolean useHttpLookup)
+ throws PulsarClientException {
+ List<PulsarClient> clients = new ArrayList<>();
+ for (int i = 0; i < numberOfClients; i++) {
+ clients.add(PulsarClient.builder()
+ .serviceUrl(useHttpLookup ? "http://localhost:8080" :
"pulsar://localhost:6650")
+ .sharedResources(sharedResources)
+ .build());
+ }
+ for (PulsarClient client : clients) {
+ client.close();
+ }
+ sharedResources.close();
+ }
+
+ @Test
+ public void testPulsarClientSharedResourcesBuilderUsingPublicAPI() throws
PulsarClientException {
+ PulsarClientSharedResources sharedResources =
PulsarClientSharedResources.builder()
+ .configureEventLoop(eventLoopGroupConfig -> {
+ eventLoopGroupConfig
+ .name("testEventLoop")
+ .numberOfThreads(10);
+ })
+ .configureDnsResolver(dnsResolverConfig -> {
+ dnsResolverConfig.localAddress(new InetSocketAddress(0));
+ })
+
.configureThreadPool(PulsarClientSharedResources.SharedResource.ListenerExecutor,
+ threadPoolConfig -> {
+ threadPoolConfig.name("testListenerThreadPool")
+ .daemon(true)
+ .numberOfThreads(12);
+ }
+ )
+
.configureThreadPool(PulsarClientSharedResources.SharedResource.InternalExecutor,
+ threadPoolConfig -> {
+ threadPoolConfig.name("testInternalThreadPool")
+ .daemon(true)
+ .numberOfThreads(2);
+ }
+ )
+
.configureThreadPool(PulsarClientSharedResources.SharedResource.LookupExecutor,
+ threadPoolConfig -> {
+ threadPoolConfig.name("testLookupThreadPool")
+ .daemon(true)
+ .numberOfThreads(1);
+ }
+ )
+
.configureThreadPool(PulsarClientSharedResources.SharedResource.ScheduledExecutor,
+ threadPoolConfig -> {
+ threadPoolConfig.name("testSchedulerThreadPool")
+ .daemon(true)
+ .numberOfThreads(1);
+ }
+ )
+ .configureTimer(timerConfig -> {
+ timerConfig.name("testTimer").tickDuration(100,
TimeUnit.MILLISECONDS);
+ })
+ .build();
+ runClientsWithSharedResources(sharedResources, 1000, false);
+ sharedResources.close();
+ }
+
+ @Test
+ public void testPartialSharing() throws PulsarClientException {
+ PulsarClientSharedResources sharedResources =
+ PulsarClientSharedResources.builder()
+ .shareConfigured()
+ .configureEventLoop(eventLoopGroupConfig -> {
+
eventLoopGroupConfig.name("testEventLoop").numberOfThreads(10);
+ }).configureDnsResolver(dnsResolverConfig -> {
+ dnsResolverConfig.localAddress(new InetSocketAddress(0));
+ }).build();
+ runClientsWithSharedResources(sharedResources, 2, false);
+ sharedResources.close();
+ }
+
+ @Test
+ public void testDnsResolverConfig() throws PulsarClientException {
+ PulsarClientSharedResources sharedResources =
+ PulsarClientSharedResources.builder()
+ .shareConfigured()
+ .configureDnsResolver(dnsResolverConfig -> {
+ dnsResolverConfig
+ .localAddress(new InetSocketAddress(0))
+ .serverAddresses(List.of(new
InetSocketAddress("8.8.8.8", 53)))
+ .minTtl(0)
+ .maxTtl(45)
+ .negativeTtl(10)
+ .ndots(1)
+ .searchDomains(List.of("mycompany.com"))
+ .queryTimeoutMillis(5000L)
+ .tcpFallbackEnabled(true)
+ .tcpFallbackOnTimeoutEnabled(false)
+ .traceEnabled(false);
+ }).build();
+ runClientsWithSharedResources(sharedResources, 2, false);
+ sharedResources.close();
+ }
+}
\ No newline at end of file
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java
index bcff83acd94..92c025092d3 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/DnsResolverUtil.java
@@ -18,10 +18,24 @@
*/
package org.apache.pulsar.common.util.netty;
+import io.netty.resolver.AddressResolver;
+import io.netty.resolver.DefaultNameResolver;
+import io.netty.resolver.InetNameResolver;
+import io.netty.resolver.InetSocketAddressResolver;
+import io.netty.resolver.NameResolver;
import io.netty.resolver.dns.DnsNameResolverBuilder;
+import io.netty.util.concurrent.EventExecutor;
+import io.netty.util.concurrent.GenericFutureListener;
+import io.netty.util.concurrent.ImmediateEventExecutor;
+import io.netty.util.concurrent.Promise;
+import java.lang.reflect.Field;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
import java.security.Security;
+import java.util.List;
import java.util.Optional;
import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.reflect.FieldUtils;
@Slf4j
public class DnsResolverUtil {
@@ -61,8 +75,15 @@ public class DnsResolverUtil {
.map(Integer::decode)
.filter(i -> i > 0)
.orElseGet(() -> {
- if (System.getSecurityManager() == null) {
- return JDK_DEFAULT_TTL;
+ try {
+ if (System.getSecurityManager() == null) {
+ return JDK_DEFAULT_TTL;
+ }
+ } catch (Throwable t) {
+ log.warn("Cannot use current logic to resolve JDK
default DNS TTL settings. Use "
+ + "sun.net.inetaddr.ttl and
sun.net.inetaddr.negative.ttl system "
+ + "properties for setting default
values for DNS TTL settings. {}",
+ t.getMessage());
}
return DEFAULT_TTL;
});
@@ -92,4 +113,97 @@ public class DnsResolverUtil {
dnsNameResolverBuilder.ttl(MIN_TTL, TTL);
dnsNameResolverBuilder.negativeTtl(NEGATIVE_TTL);
}
+
+ public static int getDefaultMinTTL() {
+ return MIN_TTL;
+ }
+
+ public static int getDefaultTTL() {
+ return TTL;
+ }
+
+ public static int getDefaultNegativeTTL() {
+ return NEGATIVE_TTL;
+ }
+
+ /**
+ * Extracts the underlying Netty NameResolver from an AddressResolver
instance, or creates an adapter as the
+ * fallback. If null is passed in, a default Netty NameResolver is
returned, which delegates to the
+ * blocking JDK DNS resolver.
+ *
+ * @param addressResolver Netty AddressResolver instance or null
+ * @return Netty NameResolver instance
+ */
+ @SuppressWarnings("unchecked")
+ public static NameResolver<InetAddress>
adaptToNameResolver(AddressResolver<InetSocketAddress> addressResolver) {
+ if (addressResolver == null) {
+ return new DefaultNameResolver(ImmediateEventExecutor.INSTANCE);
+ }
+ // Use reflection to extract underlying Netty NameResolver instance.
+ if (InetSocketAddressResolver.class.isInstance(addressResolver)) {
+ try {
+ Field nameResolverField =
+
FieldUtils.getDeclaredField(InetSocketAddressResolver.class, "nameResolver",
true);
+ if (nameResolverField != null) {
+ return (NameResolver<InetAddress>)
FieldUtils.readField(nameResolverField, addressResolver);
+ } else {
+ log.warn("Could not find nameResolver Field in
InetSocketAddressResolver instance.");
+ }
+ } catch (Throwable t) {
+ log.warn("Failed to extract NameResolver from
InetSocketAddressResolver instance. {}", t.getMessage());
+ }
+ }
+ // fallback to use an adapter if reflection fails
+ log.info("Creating NameResolver adapter that wraps an AddressResolver
instance.");
+ return createNameResolverAdapter(addressResolver,
ImmediateEventExecutor.INSTANCE);
+ }
+
+ /**
+ * Creates a NameResolver adapter that wraps an AddressResolver instance.
+ * <p>
+ * This adapter is necessary because Netty doesn't provide a direct
implementation for converting
+ * between AddressResolver and NameResolver, while AsyncHttpClient
specifically requires a NameResolver.
+ * The adapter handles the resolution of hostnames to IP addresses by
delegating to the underlying
+ * AddressResolver.
+ *
+ * @param addressResolver the AddressResolver instance to adapt, handling
InetSocketAddress resolution
+ * @param executor the EventExecutor to be used for executing
resolution tasks
+ * @return a NameResolver instance that wraps the provided AddressResolver
+ */
+ static NameResolver<InetAddress> createNameResolverAdapter(
+ AddressResolver<InetSocketAddress> addressResolver, EventExecutor
executor) {
+ return new InetNameResolver(executor) {
+ @Override
+ protected void doResolve(String inetHost, Promise<InetAddress>
promise) throws Exception {
+ Promise<InetSocketAddress> delegatedPromise =
executor().newPromise();
+
addressResolver.resolve(InetSocketAddress.createUnresolved(inetHost, 1),
delegatedPromise);
+ delegatedPromise.addListener(new
GenericFutureListener<Promise<InetSocketAddress>>() {
+ @Override
+ public void operationComplete(Promise<InetSocketAddress>
future) throws Exception {
+ if (future.isSuccess()) {
+ promise.setSuccess(future.get().getAddress());
+ } else {
+ promise.setFailure(future.cause());
+ }
+ }
+ });
+ }
+
+ @Override
+ protected void doResolveAll(String inetHost,
Promise<List<InetAddress>> promise) throws Exception {
+ Promise<List<InetSocketAddress>> delegatedPromise =
executor().newPromise();
+
addressResolver.resolveAll(InetSocketAddress.createUnresolved(inetHost, 1),
delegatedPromise);
+ delegatedPromise.addListener(new
GenericFutureListener<Promise<List<InetSocketAddress>>>() {
+ @Override
+ public void
operationComplete(Promise<List<InetSocketAddress>> future) throws Exception {
+ if (future.isSuccess()) {
+
promise.setSuccess(future.get().stream().map(InetSocketAddress::getAddress).toList());
+ } else {
+ promise.setFailure(future.cause());
+ }
+ }
+ });
+ }
+ };
+ }
}
diff --git
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/EventLoopUtil.java
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/EventLoopUtil.java
index cfc766890f0..501fe53f3a3 100644
---
a/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/EventLoopUtil.java
+++
b/pulsar-common/src/main/java/org/apache/pulsar/common/util/netty/EventLoopUtil.java
@@ -57,13 +57,7 @@ public class EventLoopUtil {
*/
public static EventLoopGroup newEventLoopGroup(int nThreads, boolean
enableBusyWait, ThreadFactory threadFactory) {
if (Epoll.isAvailable()) {
- String enableIoUring = System.getProperty(ENABLE_IO_URING);
-
- // By default, io_uring will not be enabled, even if available.
The environment variable will be used:
- // enable.io_uring=1
- if (StringUtils.equalsAnyIgnoreCase(enableIoUring, "1", "true")) {
- // Throw exception if IOUring cannot be used
- IOUring.ensureAvailability();
+ if (isIoUringEnabledAndAvailable()) {
return new IOUringEventLoopGroup(nThreads, threadFactory);
} else {
if (!enableBusyWait) {
@@ -96,6 +90,17 @@ public class EventLoopUtil {
}
}
+ private static boolean isIoUringEnabledAndAvailable() {
+ // By default, io_uring will not be enabled, even if available. The
system property will be used:
+ // enable.io_uring=1
+ boolean ioUringEnabled =
StringUtils.equalsAnyIgnoreCase(System.getProperty(ENABLE_IO_URING), "1",
"true");
+ if (ioUringEnabled) {
+ // Throw exception if IOUring cannot be used
+ IOUring.ensureAvailability();
+ }
+ return ioUringEnabled;
+ }
+
/**
* Return a SocketChannel class suitable for the given EventLoopGroup
implementation.
*
@@ -112,6 +117,25 @@ public class EventLoopUtil {
}
}
+ /**
+ * Returns the most appropriate SocketChannel implementation based on the
system's capabilities
+ * and configuration. Precedence:
+ * 1) IO_uring (if available and enabled via the 'enable.io_uring' system property)
+ * 2) Epoll (if available)
+ * 3) NIO (fallback)
+ */
+ public static Class<? extends SocketChannel> getClientSocketChannelClass()
{
+ if (Epoll.isAvailable()) {
+ if (isIoUringEnabledAndAvailable()) {
+ return IOUringSocketChannel.class;
+ } else {
+ return EpollSocketChannel.class;
+ }
+ } else {
+ return NioSocketChannel.class;
+ }
+ }
+
public static Class<? extends ServerSocketChannel>
getServerSocketChannelClass(EventLoopGroup eventLoopGroup) {
if (eventLoopGroup instanceof IOUringEventLoopGroup) {
return IOUringServerSocketChannel.class;
@@ -122,6 +146,25 @@ public class EventLoopUtil {
}
}
+ /**
+ * Returns the most appropriate ServerSocketChannel implementation based
on the system's capabilities
+ * and configuration. Precedence:
+ * 1) IO_uring (if available and enabled via the 'enable.io_uring' system property)
+ * 2) Epoll (if available)
+ * 3) NIO (fallback)
+ */
+ public static Class<? extends ServerSocketChannel>
getServerSocketChannelClass() {
+ if (Epoll.isAvailable()) {
+ if (isIoUringEnabledAndAvailable()) {
+ return IOUringServerSocketChannel.class;
+ } else {
+ return EpollServerSocketChannel.class;
+ }
+ } else {
+ return NioServerSocketChannel.class;
+ }
+ }
+
public static Class<? extends DatagramChannel>
getDatagramChannelClass(EventLoopGroup eventLoopGroup) {
if (eventLoopGroup instanceof IOUringEventLoopGroup) {
return IOUringDatagramChannel.class;
@@ -132,6 +175,25 @@ public class EventLoopUtil {
}
}
+ /**
+ * Returns the most appropriate DatagramChannel implementation based on
the system's capabilities
+ * and configuration. Precedence:
+ * 1) IO_uring (if available and enabled via the 'enable.io_uring' system property)
+ * 2) Epoll (if available)
+ * 3) NIO (fallback)
+ */
+ public static Class<? extends DatagramChannel> getDatagramChannelClass() {
+ if (Epoll.isAvailable()) {
+ if (isIoUringEnabledAndAvailable()) {
+ return IOUringDatagramChannel.class;
+ } else {
+ return EpollDatagramChannel.class;
+ }
+ } else {
+ return NioDatagramChannel.class;
+ }
+ }
+
public static void enableTriggeredMode(ServerBootstrap bootstrap) {
if (Epoll.isAvailable()) {
bootstrap.childOption(EpollChannelOption.EPOLL_MODE,
EpollMode.LEVEL_TRIGGERED);
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/DnsResolverTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/DnsResolverTest.java
index 46599cc45a0..62b59f89368 100644
---
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/DnsResolverTest.java
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/DnsResolverTest.java
@@ -31,6 +31,7 @@ import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+// This test needs to be run separately since it relies on static field values
public class DnsResolverTest {
private static final int MIN_TTL = 0;
private static final int TTL = 101;
diff --git
a/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/DnsResolverUtilTest.java
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/DnsResolverUtilTest.java
new file mode 100644
index 00000000000..beb68be6ed3
--- /dev/null
+++
b/pulsar-common/src/test/java/org/apache/pulsar/common/util/netty/DnsResolverUtilTest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.util.netty;
+
+import static org.testng.Assert.assertEquals;
+import com.google.common.net.InetAddresses;
+import io.netty.resolver.AddressResolver;
+import io.netty.resolver.DefaultNameResolver;
+import io.netty.resolver.NameResolver;
+import io.netty.util.concurrent.ImmediateEventExecutor;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import org.testng.annotations.Test;
+
+public class DnsResolverUtilTest {
+
+ @Test
+ public void testNameResolverAdapter() throws ExecutionException,
InterruptedException {
+ ImmediateEventExecutor executor = ImmediateEventExecutor.INSTANCE;
+ AddressResolver<InetSocketAddress> addressResolver =
+ new DefaultNameResolver(executor).asAddressResolver();
+ NameResolver<InetAddress> nameResolverAdapter =
+ DnsResolverUtil.createNameResolverAdapter(addressResolver,
executor);
+ testNameResolver(nameResolverAdapter);
+ }
+
+ @Test
+ public void testNameResolverExtraction() throws ExecutionException,
InterruptedException {
+ ImmediateEventExecutor executor = ImmediateEventExecutor.INSTANCE;
+ AddressResolver<InetSocketAddress> addressResolver =
+ new DefaultNameResolver(executor).asAddressResolver();
+ NameResolver<InetAddress> nameResolverAdapter =
+ DnsResolverUtil.adaptToNameResolver(addressResolver);
+ testNameResolver(nameResolverAdapter);
+ }
+
+ @Test
+ public void testNameResolverNullFallback() throws ExecutionException,
InterruptedException {
+ NameResolver<InetAddress> nameResolverAdapter =
DnsResolverUtil.adaptToNameResolver(null);
+ testNameResolver(nameResolverAdapter);
+ }
+
+ private static void testNameResolver(NameResolver<InetAddress>
nameResolver)
+ throws InterruptedException, ExecutionException {
+ InetAddress inetAddress = nameResolver.resolve("8.8.8.8").get();
+ assertEquals(inetAddress, InetAddresses.forString("8.8.8.8"));
+ List<InetAddress> inetAddresses =
nameResolver.resolveAll("8.8.8.8").get();
+ assertEquals(inetAddresses.get(0), InetAddresses.forString("8.8.8.8"));
+ }
+}
\ No newline at end of file