This is an automated email from the ASF dual-hosted git repository. tison pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 25c662d0de1 [improve][client] Add a way to configure which DNS use (#21227) 25c662d0de1 is described below commit 25c662d0de105a95ce39a5de695d774b60e8419e Author: Diego Salvi <lothruin.mir...@gmail.com> AuthorDate: Wed Oct 25 00:16:39 2023 +0200 [improve][client] Add a way to configure which DNS use (#21227) Co-authored-by: tison <wander4...@gmail.com> --- .../java/org/apache/pulsar/client/api/ClientBuilder.java | 8 ++++++++ .../org/apache/pulsar/client/impl/ClientBuilderImpl.java | 13 +++++++++++++ .../java/org/apache/pulsar/client/impl/ConnectionPool.java | 5 +++++ .../pulsar/client/impl/conf/ClientConfigurationData.java | 10 ++++++++++ .../apache/pulsar/client/impl/ClientBuilderImplTest.java | 14 ++++++++++++++ .../apache/pulsar/client/impl/PulsarClientImplTest.java | 11 +++++++++++ .../client/impl/conf/ConfigurationDataUtilsTest.java | 8 +++++++- 7 files changed, 68 insertions(+), 1 deletion(-) diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java index 8b959690a03..b180f6ba7f9 100644 --- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java +++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ClientBuilder.java @@ -21,6 +21,7 @@ package org.apache.pulsar.client.api; import java.io.Serializable; import java.net.InetSocketAddress; import java.time.Clock; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -595,6 +596,13 @@ public interface ClientBuilder extends Serializable, Cloneable { */ ClientBuilder dnsLookupBind(String address, int port); + /** + * Set dns lookup server addresses. + * @param addresses dnsServerAddresses + * @return + */ + ClientBuilder dnsServerAddresses(List<InetSocketAddress> addresses); + /** * Set socks5 proxy address. * @param socks5ProxyAddress diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java index 7677045f089..9a86d81c93f 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ClientBuilderImpl.java @@ -21,6 +21,7 @@ package org.apache.pulsar.client.impl; import static com.google.common.base.Preconditions.checkArgument; import java.net.InetSocketAddress; import java.time.Clock; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -36,6 +37,7 @@ import org.apache.pulsar.client.api.ServiceUrlProvider; import org.apache.pulsar.client.api.SizeUnit; import org.apache.pulsar.client.impl.conf.ClientConfigurationData; import org.apache.pulsar.client.impl.conf.ConfigurationDataUtils; +import org.apache.pulsar.common.tls.InetAddressUtils; public class ClientBuilderImpl implements ClientBuilder { ClientConfigurationData conf; @@ -393,6 +395,17 @@ public class ClientBuilderImpl implements ClientBuilder { return this; } + @Override + public ClientBuilder dnsServerAddresses(List<InetSocketAddress> addresses) { + for (InetSocketAddress address : addresses) { + String ip = address.getHostString(); + checkArgument(InetAddressUtils.isIPv4Address(ip) || InetAddressUtils.isIPv6Address(ip), + "DnsServerAddresses need to be valid IPv4 or IPv6 addresses"); + } + conf.setDnsServerAddresses(addresses); + return this; + } + @Override public ClientBuilder socks5ProxyAddress(InetSocketAddress socks5ProxyAddress) { conf.setSocks5ProxyAddress(socks5ProxyAddress); diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java index ef3a9249c82..9750911b37c 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConnectionPool.java @@ -29,6 +29,7 @@ import io.netty.channel.EventLoopGroup; import io.netty.resolver.AddressResolver; import io.netty.resolver.dns.DnsAddressResolverGroup; import io.netty.resolver.dns.DnsNameResolverBuilder; +import io.netty.resolver.dns.SequentialDnsServerAddressStreamProvider; import io.netty.util.concurrent.Future; import io.netty.util.concurrent.ScheduledFuture; import java.net.InetSocketAddress; @@ -156,6 +157,10 @@ public class ConnectionPool implements AutoCloseable { conf.getDnsLookupBindPort()); dnsNameResolverBuilder.localAddress(addr); } + List<InetSocketAddress> serverAddresses = conf.getDnsServerAddresses(); + if (serverAddresses != null && !serverAddresses.isEmpty()) { + dnsNameResolverBuilder.nameServerProvider(new SequentialDnsServerAddressStreamProvider(serverAddresses)); + } DnsResolverUtil.applyJdkDnsCacheSettings(dnsNameResolverBuilder); // use DnsAddressResolverGroup to create the AddressResolver since it contains a solution // to prevent cache stampede / thundering herds problem when a DNS entry expires while the system diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java index 7d94675ccba..1dc1c2a8689 100644 --- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java +++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/conf/ClientConfigurationData.java @@ -19,11 +19,14 @@ package org.apache.pulsar.client.impl.conf; import com.fasterxml.jackson.annotation.JsonIgnore; +import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import io.swagger.annotations.ApiModelProperty; import java.io.Serializable; import java.net.InetSocketAddress; import java.net.URI; import java.time.Clock; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; @@ -359,6 +362,13 @@ public class ClientConfigurationData implements Serializable, Cloneable { ) private int dnsLookupBindPort = 0; + @ApiModelProperty( + name = "dnsServerAddresses", + value = "The Pulsar client dns lookup server address" + ) + @SuppressFBWarnings({"EI_EXPOSE_REP2", "EI_EXPOSE_REP"}) + private List<InetSocketAddress> dnsServerAddresses = new ArrayList<>(); + // socks5 @ApiModelProperty( name = "socks5ProxyAddress", diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientBuilderImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientBuilderImplTest.java index 9a39c906b8f..f56427b1203 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientBuilderImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ClientBuilderImplTest.java @@ -23,6 +23,8 @@ import static org.assertj.core.api.Assertions.assertThat; import com.google.gson.Gson; import com.google.gson.JsonObject; import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -101,6 +103,18 @@ public class ClientBuilderImplTest { PulsarClient.builder().dnsLookupBind("localhost", 65536).build(); } + @Test(expectedExceptions = IllegalArgumentException.class) + public void testClientBuilderWithIllegalDNSServerHostname() throws PulsarClientException { + PulsarClient.builder().dnsServerAddresses( + Arrays.asList(new InetSocketAddress("1.2.3.4", 53), new InetSocketAddress("localhost",53))); + } + + @Test() + public void testClientBuilderWithDNSServerIP() throws PulsarClientException { + PulsarClient.builder().dnsServerAddresses( + Arrays.asList(new InetSocketAddress("1.2.3.4", 53))); + } + @Test public void testConnectionMaxIdleSeconds() throws Exception { // test config disabled. diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java index c8278ccbd7a..9a4cfce0cc3 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/PulsarClientImplTest.java @@ -37,6 +37,7 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.channel.EventLoopGroup; +import io.netty.resolver.dns.DefaultDnsServerAddressStreamProvider; import io.netty.util.HashedWheelTimer; import io.netty.util.concurrent.DefaultThreadFactory; import java.lang.reflect.Field; @@ -190,6 +191,16 @@ public class PulsarClientImplTest { client.timer().stop(); } + @Test + public void testInitializeWithDNSServerAddresses() throws Exception { + ClientConfigurationData conf = new ClientConfigurationData(); + conf.setDnsServerAddresses(DefaultDnsServerAddressStreamProvider.defaultAddressList()); + conf.setServiceUrl("pulsar://localhost:6650"); + initializeEventLoopGroup(conf); + PulsarClientImpl client = new PulsarClientImpl(conf, eventLoopGroup); + client.shutdown(); + } + @Test public void testResourceCleanup() throws Exception { ClientConfigurationData conf = new ClientConfigurationData(); diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java index 0e0117d400f..cf4a2ae1813 100644 --- a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java +++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/conf/ConfigurationDataUtilsTest.java @@ -30,6 +30,8 @@ import static org.testng.Assert.fail; import java.io.IOException; import java.net.InetSocketAddress; +import java.util.Arrays; +import java.util.List; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -63,7 +65,10 @@ public class ConfigurationDataUtilsTest { config.put("authParamMap", authParamMap); config.put("dnsLookupBindAddress", "0.0.0.0"); config.put("dnsLookupBindPort", 0); - + List<InetSocketAddress> dnsServerAddresses = Arrays.asList(new InetSocketAddress[] { + new InetSocketAddress("1.1.1.1", 53), new InetSocketAddress("2.2.2.2",100) + }); + config.put("dnsServerAddresses", dnsServerAddresses); confData = ConfigurationDataUtils.loadData(config, confData, ClientConfigurationData.class); assertEquals("pulsar://localhost:6650", confData.getServiceUrl()); assertEquals(70000, confData.getMaxLookupRequest()); @@ -74,6 +79,7 @@ public class ConfigurationDataUtilsTest { assertEquals("v2", confData.getAuthParamMap().get("k2")); assertEquals("0.0.0.0", confData.getDnsLookupBindAddress()); assertEquals(0, confData.getDnsLookupBindPort()); + assertEquals(dnsServerAddresses, confData.getDnsServerAddresses()); } @Test