[ 
https://issues.apache.org/jira/browse/KAFKA-6195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16649110#comment-16649110
 ] 

ASF GitHub Bot commented on KAFKA-6195:
---------------------------------------

rajinisivaram closed pull request #4485: KAFKA-6195: Resolve DNS aliases in 
bootstrap.server
URL: https://github.com/apache/kafka/pull/4485
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/common/config/ClientDnsLookup.java 
b/clients/src/main/java/org/apache/kafka/clients/ClientDnsLookup.java
similarity index 88%
rename from 
clients/src/main/java/org/apache/kafka/common/config/ClientDnsLookup.java
rename to clients/src/main/java/org/apache/kafka/clients/ClientDnsLookup.java
index 4a013b96ff8..96d47c344a0 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ClientDnsLookup.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientDnsLookup.java
@@ -14,14 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package org.apache.kafka.common.config;
+package org.apache.kafka.clients;
 
 import java.util.Locale;
 
 public enum ClientDnsLookup {
 
     DEFAULT("default"),
-    USE_ALL_DNS_IPS("use_all_dns_ips");
+    USE_ALL_DNS_IPS("use_all_dns_ips"),
+    
RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY("resolve_canonical_bootstrap_servers_only");
 
     private String clientDnsLookup;
 
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java 
b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
index 1661ea39695..dce5f3fb65c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.clients;
 
 import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.SaslConfigs;
 import org.apache.kafka.common.network.ChannelBuilder;
@@ -42,10 +41,12 @@
 public final class ClientUtils {
     private static final Logger log = 
LoggerFactory.getLogger(ClientUtils.class);
 
-    private ClientUtils() {}
+    private ClientUtils() {
+    }
 
-    public static List<InetSocketAddress> 
parseAndValidateAddresses(List<String> urls) {
+    public static List<InetSocketAddress> 
parseAndValidateAddresses(List<String> urls, String clientDnsLookup) {
         List<InetSocketAddress> addresses = new ArrayList<>();
+        ClientDnsLookup clientDnsLookupBehaviour = 
ClientDnsLookup.forConfig(clientDnsLookup);
         for (String url : urls) {
             if (url != null && !url.isEmpty()) {
                 try {
@@ -54,15 +55,30 @@ private ClientUtils() {}
                     if (host == null || port == null)
                         throw new ConfigException("Invalid url in " + 
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
 
-                    InetSocketAddress address = new InetSocketAddress(host, 
port);
-
-                    if (address.isUnresolved()) {
-                        log.warn("Removing server {} from {} as DNS resolution 
failed for {}", url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, host);
+                    if (clientDnsLookupBehaviour == 
ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY) {
+                        InetAddress[] inetAddresses = 
InetAddress.getAllByName(host);
+                        for (InetAddress inetAddress : inetAddresses) {
+                            String resolvedCanonicalName = 
inetAddress.getCanonicalHostName();
+                            InetSocketAddress address = new 
InetSocketAddress(resolvedCanonicalName, port);
+                            if (address.isUnresolved()) {
+                                log.warn("Couldn't resolve server {} from {} 
as DNS resolution of the canonical hostname [} failed for {}", url, 
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, resolvedCanonicalName, host);
+                            } else {
+                                addresses.add(address);
+                            }
+                        }
                     } else {
-                        addresses.add(address);
+                        InetSocketAddress address = new 
InetSocketAddress(host, port);
+                        if (address.isUnresolved()) {
+                            log.warn("Couldn't resolve server {} from {} as 
DNS resolution failed for {}", url, 
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, host);
+                        } else {
+                            addresses.add(address);
+                        }
                     }
+
                 } catch (IllegalArgumentException e) {
                     throw new ConfigException("Invalid port in " + 
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
+                } catch (UnknownHostException e) {
+                    throw new ConfigException("Unknown host in " + 
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + ": " + url);
                 }
             }
         }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
index b697de73c85..f198533525b 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
@@ -18,7 +18,6 @@
 
 import java.util.concurrent.ThreadLocalRandom;
 
-import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.errors.AuthenticationException;
 
 import java.net.InetAddress;
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index b179f0272e8..c8e2357b0a6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -41,6 +41,11 @@
                                                        + "discover the full 
cluster membership (which may change dynamically), this list need not contain 
the full set of "
                                                        + "servers (you may 
want more than one, though, in case a server is down).";
 
+    public static final String CLIENT_DNS_LOOKUP_CONFIG = "client.dns.lookup";
+    public static final String CLIENT_DNS_LOOKUP_DOC = "<p>Controls how the 
client uses DNS lookups.</p><p>If set to <code>use_all_dns_ips</code> then, 
when the lookup returns multiple IP addresses for a hostname,"
+            + " they will all be attempted to connect to before failing the 
connection. Applies to both bootstrap and advertised servers.</p>"
+            + "<p>If the value is 
<code>resolve_canonical_bootstrap_servers_only</code> each entry will be 
resolved and expanded into a list of canonical names.</p>";
+
     public static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms";
     public static final String METADATA_MAX_AGE_DOC = "The period of time in 
milliseconds after which we force a refresh of metadata even if we haven't seen 
any partition leadership changes to proactively discover any new brokers or 
partitions.";
 
@@ -93,9 +98,6 @@
                                                          + "elapses the client 
will resend the request if necessary or fail the request if "
                                                          + "retries are 
exhausted.";
 
-    public static final String CLIENT_DNS_LOOKUP_CONFIG = "client.dns.lookup";
-    public static final String CLIENT_DNS_LOOKUP_DOC = "<p>Controls how the 
client uses DNS lookups.</p><p>If set to <code>use_all_dns_ips</code> then, 
when the lookup returns multiple IP addresses for a hostname,"
-                                                        + " they will all be 
attempted to connect to before failing the connection. Applies to both 
bootstrap and advertised servers.</p>";
 
     /**
      * Postprocess the configuration so that exponential backoff is disabled 
when reconnect backoff
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 7ea05f69f3b..c6f0c0b3fb6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -19,7 +19,6 @@
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.metrics.Sensor;
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
index a051de29fb7..47c76ac36af 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
@@ -17,9 +17,9 @@
 
 package org.apache.kafka.clients.admin;
 
+import org.apache.kafka.clients.ClientDnsLookup;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
@@ -43,6 +43,12 @@
     public static final String BOOTSTRAP_SERVERS_CONFIG = 
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
     private static final String BOOTSTRAP_SERVERS_DOC = 
CommonClientConfigs.BOOTSTRAP_SERVERS_DOC;
 
+    /**
+     * <code>client.dns.lookup</code>
+     */
+    public static final String CLIENT_DNS_LOOKUP_CONFIG = 
CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG;
+    private static final String CLIENT_DNS_LOOKUP_DOC = 
CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC;
+
     /**
      * <code>reconnect.backoff.ms</code>
      */
@@ -159,12 +165,14 @@
                                         
in(Sensor.RecordingLevel.INFO.toString(), 
Sensor.RecordingLevel.DEBUG.toString()),
                                         Importance.LOW,
                                         METRICS_RECORDING_LEVEL_DOC)
-                                
.define(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG,
+                                .define(CLIENT_DNS_LOOKUP_CONFIG,
                                         Type.STRING,
                                         ClientDnsLookup.DEFAULT.toString(),
-                                        in(ClientDnsLookup.DEFAULT.toString(), 
ClientDnsLookup.USE_ALL_DNS_IPS.toString()),
+                                        in(ClientDnsLookup.DEFAULT.toString(),
+                                           
ClientDnsLookup.USE_ALL_DNS_IPS.toString(),
+                                           
ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString()),
                                         Importance.MEDIUM,
-                                        
CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC)
+                                        CLIENT_DNS_LOOKUP_DOC)
                                 // security support
                                 .define(SECURITY_PROTOCOL_CONFIG,
                                         Type.STRING,
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 86e14476625..c8418c173f4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -18,10 +18,10 @@
 package org.apache.kafka.clients.admin;
 
 import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientDnsLookup;
 import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.ClientUtils;
-import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.StaleMetadataException;
@@ -46,7 +46,6 @@
 import org.apache.kafka.common.acl.AclBinding;
 import org.apache.kafka.common.acl.AclBindingFilter;
 import org.apache.kafka.common.annotation.InterfaceStability;
-import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.config.ConfigResource;
 import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.AuthenticationException;
@@ -362,7 +361,7 @@ static KafkaAdminClient createInternal(AdminClientConfig 
config, TimeoutProcesso
                 config.getInt(AdminClientConfig.SEND_BUFFER_CONFIG),
                 config.getInt(AdminClientConfig.RECEIVE_BUFFER_CONFIG),
                 (int) TimeUnit.HOURS.toMillis(1),
-                
ClientDnsLookup.forConfig(config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)),
+                
ClientDnsLookup.forConfig(config.getString(AdminClientConfig.CLIENT_DNS_LOOKUP_CONFIG)),
                 time,
                 true,
                 apiVersions,
@@ -414,7 +413,8 @@ private KafkaAdminClient(AdminClientConfig config,
         this.time = time;
         this.metadataManager = metadataManager;
         List<InetSocketAddress> addresses = 
ClientUtils.parseAndValidateAddresses(
-            config.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG));
+            config.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
+            config.getString(AdminClientConfig.CLIENT_DNS_LOOKUP_CONFIG));
         metadataManager.update(Cluster.bootstrap(addresses), 
time.milliseconds());
         this.metrics = metrics;
         this.client = client;
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index a1c9dc250aa..795a762a494 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -16,9 +16,9 @@
  */
 package org.apache.kafka.clients.consumer;
 
+import org.apache.kafka.clients.ClientDnsLookup;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
@@ -90,6 +90,9 @@
      */
     public static final String BOOTSTRAP_SERVERS_CONFIG = 
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
 
+    /** <code>client.dns.lookup</code> */
+    public static final String CLIENT_DNS_LOOKUP_CONFIG = 
CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG;
+
     /**
      * <code>enable.auto.commit</code>
      */
@@ -258,7 +261,7 @@
             " return the LSO";
 
     public static final String DEFAULT_ISOLATION_LEVEL = 
IsolationLevel.READ_UNCOMMITTED.toString().toLowerCase(Locale.ROOT);
-    
+
     static {
         CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG,
                                         Type.LIST,
@@ -266,6 +269,14 @@
                                         new ConfigDef.NonNullValidator(),
                                         Importance.HIGH,
                                         
CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
+                                .define(CLIENT_DNS_LOOKUP_CONFIG,
+                                        Type.STRING,
+                                        ClientDnsLookup.DEFAULT.toString(),
+                                        in(ClientDnsLookup.DEFAULT.toString(),
+                                           
ClientDnsLookup.USE_ALL_DNS_IPS.toString(),
+                                           
ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString()),
+                                        Importance.MEDIUM,
+                                        
CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC)
                                 .define(GROUP_ID_CONFIG, Type.STRING, "", 
Importance.HIGH, GROUP_ID_DOC)
                                 .define(SESSION_TIMEOUT_MS_CONFIG,
                                         Type.INT,
@@ -453,12 +464,6 @@
                                         
in(IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT), 
IsolationLevel.READ_UNCOMMITTED.toString().toLowerCase(Locale.ROOT)),
                                         Importance.MEDIUM,
                                         ISOLATION_LEVEL_DOC)
-                                
.define(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG,
-                                        Type.STRING,
-                                        ClientDnsLookup.DEFAULT.toString(),
-                                        in(ClientDnsLookup.DEFAULT.toString(), 
ClientDnsLookup.USE_ALL_DNS_IPS.toString()),
-                                        Importance.MEDIUM,
-                                        
CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC)
                                 // security support
                                 
.define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
                                         Type.STRING,
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index e79ff07cff0..4061373c450 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -17,8 +17,8 @@
 package org.apache.kafka.clients.consumer;
 
 import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientDnsLookup;
 import org.apache.kafka.clients.ClientUtils;
-import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator;
@@ -36,7 +36,6 @@
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.errors.InterruptException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
@@ -710,8 +709,10 @@ private KafkaConsumer(ConsumerConfig config,
             ClusterResourceListeners clusterResourceListeners = 
configureClusterResourceListeners(keyDeserializer, valueDeserializer, 
reporters, interceptorList);
             this.metadata = new Metadata(retryBackoffMs, 
config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG),
                     true, false, clusterResourceListeners);
-            List<InetSocketAddress> addresses = 
ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG));
-            this.metadata.update(Cluster.bootstrap(addresses), 
Collections.emptySet(), 0);
+            List<InetSocketAddress> addresses = 
ClientUtils.parseAndValidateAddresses(
+                    config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
+                    config.getString(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG));
+            this.metadata.update(Cluster.bootstrap(addresses), 
Collections.<String>emptySet(), 0);
             String metricGrpPrefix = "consumer";
             ConsumerMetrics metricsRegistry = new 
ConsumerMetrics(metricsTags.keySet(), "consumer");
             ChannelBuilder channelBuilder = 
ClientUtils.createChannelBuilder(config);
@@ -732,7 +733,7 @@ private KafkaConsumer(ConsumerConfig config,
                     config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG),
                     config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG),
                     config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG),
-                    
ClientDnsLookup.forConfig(config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)),
+                    
ClientDnsLookup.forConfig(config.getString(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG)),
                     time,
                     true,
                     new ApiVersions(),
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 465e60f449c..c68a014619a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -28,8 +28,8 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientDnsLookup;
 import org.apache.kafka.clients.ClientUtils;
-import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.NetworkClient;
@@ -49,7 +49,6 @@
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.ApiException;
 import org.apache.kafka.common.errors.AuthenticationException;
@@ -407,7 +406,9 @@ public KafkaProducer(Properties properties, Serializer<K> 
keySerializer, Seriali
                     apiVersions,
                     transactionManager,
                     new BufferPool(this.totalMemorySize, 
config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, 
PRODUCER_METRIC_GROUP_NAME));
-            List<InetSocketAddress> addresses = 
ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));
+            List<InetSocketAddress> addresses = 
ClientUtils.parseAndValidateAddresses(
+                    config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
+                    config.getString(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG));
             if (metadata != null) {
                 this.metadata = metadata;
             } else {
@@ -449,7 +450,7 @@ Sender newSender(LogContext logContext, KafkaClient 
kafkaClient, Metadata metada
                 producerConfig.getInt(ProducerConfig.SEND_BUFFER_CONFIG),
                 producerConfig.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG),
                 requestTimeoutMs,
-                
ClientDnsLookup.forConfig(producerConfig.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)),
+                
ClientDnsLookup.forConfig(producerConfig.getString(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG)),
                 time,
                 true,
                 apiVersions,
@@ -496,7 +497,9 @@ private static int configureDeliveryTimeout(ProducerConfig 
config, Logger log) {
     }
 
     private static TransactionManager configureTransactionState(ProducerConfig 
config, LogContext logContext, Logger log) {
+
         TransactionManager transactionManager = null;
+
         boolean userConfiguredIdempotence = false;
         if 
(config.originals().containsKey(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG))
             userConfiguredIdempotence = true;
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index f08159d2e37..c63477d97dc 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -16,10 +16,10 @@
  */
 package org.apache.kafka.clients.producer;
 
+import org.apache.kafka.clients.ClientDnsLookup;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
 import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
@@ -52,6 +52,9 @@
     /** <code>bootstrap.servers</code> */
     public static final String BOOTSTRAP_SERVERS_CONFIG = 
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
 
+    /** <code>client.dns.lookup</code> */
+    public static final String CLIENT_DNS_LOOKUP_CONFIG = 
CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG;
+
     /** <code>metadata.max.age.ms</code> */
     public static final String METADATA_MAX_AGE_CONFIG = 
CommonClientConfigs.METADATA_MAX_AGE_CONFIG;
     private static final String METADATA_MAX_AGE_DOC = 
CommonClientConfigs.METADATA_MAX_AGE_DOC;
@@ -239,6 +242,14 @@
 
     static {
         CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, 
Collections.emptyList(), new ConfigDef.NonNullValidator(), Importance.HIGH, 
CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
+                                .define(CLIENT_DNS_LOOKUP_CONFIG,
+                                        Type.STRING,
+                                        ClientDnsLookup.DEFAULT.toString(),
+                                        in(ClientDnsLookup.DEFAULT.toString(),
+                                           
ClientDnsLookup.USE_ALL_DNS_IPS.toString(),
+                                           
ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString()),
+                                        Importance.MEDIUM,
+                                        
CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC)
                                 .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 
1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC)
                                 .define(RETRIES_CONFIG, Type.INT, 
Integer.MAX_VALUE, between(0, Integer.MAX_VALUE), Importance.HIGH, RETRIES_DOC)
                                 .define(ACKS_CONFIG,
@@ -347,13 +358,7 @@
                                         null,
                                         new ConfigDef.NonEmptyString(),
                                         Importance.LOW,
-                                        TRANSACTIONAL_ID_DOC)
-                                
.define(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG,
-                                        Type.STRING,
-                                        ClientDnsLookup.DEFAULT.toString(),
-                                        in(ClientDnsLookup.DEFAULT.toString(), 
ClientDnsLookup.USE_ALL_DNS_IPS.toString()),
-                                        Importance.MEDIUM,
-                                        
CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC);
+                                        TRANSACTIONAL_ID_DOC);
     }
 
     @Override
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java 
b/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java
index bea464f14a5..35f52a93c1c 100644
--- a/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java
@@ -16,7 +16,6 @@
  */
 package org.apache.kafka.clients;
 
-import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.config.ConfigException;
 import org.junit.Test;
 import static org.junit.Assert.assertEquals;
@@ -31,27 +30,46 @@
 
 public class ClientUtilsTest {
 
+
     @Test
-    public void testParseAndValidateAddresses() {
-        check("127.0.0.1:8000");
-        check("mydomain.com:8080");
-        check("[::1]:8000");
-        check("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:1234", 
"mydomain.com:10000");
-        List<InetSocketAddress> validatedAddresses = 
check("some.invalid.hostname.foo.bar.local:9999", "mydomain.com:10000");
+    public void testParseAndValidateAddresses() throws UnknownHostException {
+        checkWithoutLookup("127.0.0.1:8000");
+        checkWithoutLookup("localhost:8080");
+        checkWithoutLookup("[::1]:8000");
+        checkWithoutLookup("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:1234", 
"localhost:10000");
+        List<InetSocketAddress> validatedAddresses = 
checkWithoutLookup("localhost:10000");
         assertEquals(1, validatedAddresses.size());
         InetSocketAddress onlyAddress = validatedAddresses.get(0);
-        assertEquals("mydomain.com", onlyAddress.getHostName());
+        assertEquals("localhost", onlyAddress.getHostName());
         assertEquals(10000, onlyAddress.getPort());
     }
 
+    @Test
+    public void testParseAndValidateAddressesWithReverseLookup() {
+        checkWithoutLookup("127.0.0.1:8000");
+        checkWithoutLookup("localhost:8080");
+        checkWithoutLookup("[::1]:8000");
+        checkWithoutLookup("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:1234", 
"localhost:10000");
+        List<InetSocketAddress> validatedAddresses = 
checkWithLookup(Arrays.asList("example.com:10000"));
+        assertEquals(2, validatedAddresses.size());
+        InetSocketAddress address = validatedAddresses.get(0);
+        assertEquals("93.184.216.34", address.getHostName());
+        assertEquals(10000, address.getPort());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testInvalidConfig() {
+        
ClientUtils.parseAndValidateAddresses(Arrays.asList("localhost:10000"), 
"random.value");
+    }
+
     @Test(expected = ConfigException.class)
     public void testNoPort() {
-        check("127.0.0.1");
+        checkWithoutLookup("127.0.0.1");
     }
 
     @Test(expected = ConfigException.class)
     public void testOnlyBadHostname() {
-        check("some.invalid.hostname.foo.bar.local:9999");
+        checkWithoutLookup("some.invalid.hostname.foo.bar.local:9999");
     }
 
     @Test
@@ -87,7 +105,12 @@ public void testResolveDnsLookupAllIps() throws 
UnknownHostException {
         assertEquals(2, ClientUtils.resolve("kafka.apache.org", 
ClientDnsLookup.USE_ALL_DNS_IPS).size());
     }
 
-    private List<InetSocketAddress> check(String... url) {
-        return ClientUtils.parseAndValidateAddresses(Arrays.asList(url));
+    private List<InetSocketAddress> checkWithoutLookup(String... url) {
+        return ClientUtils.parseAndValidateAddresses(Arrays.asList(url), 
ClientDnsLookup.DEFAULT.toString());
+    }
+
+    private List<InetSocketAddress> checkWithLookup(List<String> url) {
+        return ClientUtils.parseAndValidateAddresses(url, 
ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString());
     }
+
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
index d4a2a55dd4b..23edaa999ef 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
@@ -27,7 +27,6 @@
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 
-import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.errors.AuthenticationException;
 import org.apache.kafka.common.utils.MockTime;
 import org.junit.Before;
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 
b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index aaf827feee3..8abe9a40085 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -19,7 +19,6 @@
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.network.NetworkReceive;
 import org.apache.kafka.common.protocol.ApiKeys;
 import org.apache.kafka.common.protocol.CommonFields;
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 42deee7c204..94d8d5b1afb 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.clients.consumer.internals;
 
 import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientDnsLookup;
 import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.FetchSessionHandler;
@@ -36,7 +37,6 @@
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.SerializationException;
@@ -2735,7 +2735,7 @@ private void testGetOffsetsForTimesWithError(Errors 
errorForP0,
         String topicName2 = "topic2";
         TopicPartition t2p0 = new TopicPartition(topicName2, 0);
         // Expect a metadata refresh.
-        
metadata.update(Cluster.bootstrap(ClientUtils.parseAndValidateAddresses(Collections.singletonList("1.1.1.1:1111"))),
+        
metadata.update(Cluster.bootstrap(ClientUtils.parseAndValidateAddresses(Collections.singletonList("1.1.1.1:1111"),
 ClientDnsLookup.DEFAULT.toString())),
                         Collections.<String>emptySet(),
                         time.milliseconds());
 
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 6d4d78cb70f..23ca2aeaed7 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -32,6 +32,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientDnsLookup;
 import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.Metadata;
 import org.apache.kafka.clients.MockClient;
@@ -44,7 +45,6 @@
 import org.apache.kafka.common.MetricNameTemplate;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
 import org.apache.kafka.common.errors.NetworkException;
 import org.apache.kafka.common.errors.OutOfOrderSequenceException;
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
index aac9fb24441..be3a70991f0 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -16,9 +16,9 @@
  */
 package org.apache.kafka.connect.runtime;
 
+import org.apache.kafka.clients.ClientDnsLookup;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.common.config.AbstractConfig;
-import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
@@ -61,6 +61,9 @@
             + "than one, though, in case a server is down).";
     public static final String BOOTSTRAP_SERVERS_DEFAULT = "localhost:9092";
 
+    public static final String CLIENT_DNS_LOOKUP_CONFIG = 
CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG;
+    public static final String CLIENT_DNS_LOOKUP_DOC = 
CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC;
+
     public static final String KEY_CONVERTER_CLASS_CONFIG = "key.converter";
     public static final String KEY_CONVERTER_CLASS_DOC =
             "Converter class used to convert between Kafka Connect format and 
the serialized form that is written to Kafka." +
@@ -223,6 +226,14 @@ protected static ConfigDef baseConfigDef() {
         return new ConfigDef()
                 .define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, 
BOOTSTRAP_SERVERS_DEFAULT,
                         Importance.HIGH, BOOTSTRAP_SERVERS_DOC)
+                .define(CLIENT_DNS_LOOKUP_CONFIG,
+                        Type.STRING,
+                        ClientDnsLookup.DEFAULT.toString(),
+                        in(ClientDnsLookup.DEFAULT.toString(),
+                           ClientDnsLookup.USE_ALL_DNS_IPS.toString(),
+                           
ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString()),
+                        Importance.MEDIUM,
+                        CLIENT_DNS_LOOKUP_DOC)
                 .define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS,
                         Importance.HIGH, KEY_CONVERTER_CLASS_DOC)
                 .define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS,
@@ -278,13 +289,7 @@ protected static ConfigDef baseConfigDef() {
                         Collections.emptyList(),
                         Importance.LOW, CONFIG_PROVIDERS_DOC)
                 .define(REST_EXTENSION_CLASSES_CONFIG, Type.LIST, "",
-                        Importance.LOW, REST_EXTENSION_CLASSES_DOC)
-                .define(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG,
-                        Type.STRING,
-                        ClientDnsLookup.DEFAULT.toString(),
-                        in(ClientDnsLookup.DEFAULT.toString(), 
ClientDnsLookup.USE_ALL_DNS_IPS.toString()),
-                        Importance.MEDIUM,
-                        CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC);    
+                        Importance.LOW, REST_EXTENSION_CLASSES_DOC);
     }
 
     private void logInternalConverterDeprecationWarnings(Map<String, String> 
props) {
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index de8e8b27b74..5725ff5f554 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.connect.runtime.distributed;
 
 import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientDnsLookup;
 import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.Metadata;
@@ -24,7 +25,6 @@
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.metrics.JmxReporter;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
@@ -96,7 +96,9 @@ public WorkerGroupMember(DistributedConfig config,
             this.metrics = new Metrics(metricConfig, reporters, time);
             this.retryBackoffMs = 
config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG);
             this.metadata = new Metadata(retryBackoffMs, 
config.getLong(CommonClientConfigs.METADATA_MAX_AGE_CONFIG), true);
-            List<InetSocketAddress> addresses = 
ClientUtils.parseAndValidateAddresses(config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG));
+            List<InetSocketAddress> addresses = 
ClientUtils.parseAndValidateAddresses(
+                    
config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),
+                    
config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG));
             this.metadata.update(Cluster.bootstrap(addresses), 
Collections.<String>emptySet(), 0);
             String metricGrpPrefix = "connect";
             ChannelBuilder channelBuilder = 
ClientUtils.createChannelBuilder(config);
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala 
b/core/src/main/scala/kafka/admin/AdminClient.scala
index 5876b6ec460..aaa09035b50 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -23,6 +23,7 @@ import kafka.coordinator.group.GroupOverview
 import kafka.utils.Logging
 import org.apache.kafka.clients._
 import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient, 
ConsumerProtocol, RequestFuture}
+import org.apache.kafka.common.config.ConfigDef.ValidString._
 import org.apache.kafka.common.config.ConfigDef.{Importance, Type}
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef}
 import org.apache.kafka.common.errors.{AuthenticationException, 
TimeoutException}
@@ -39,7 +40,6 @@ import org.apache.kafka.common.{Cluster, Node, TopicPartition}
 
 import scala.collection.JavaConverters._
 import scala.util.{Failure, Success, Try}
-import org.apache.kafka.common.config.ClientDnsLookup
 
 /**
   * A Scala administrative client for Kafka which supports managing and 
inspecting topics, brokers,
@@ -386,6 +386,14 @@ object AdminClient {
         Type.LIST,
         Importance.HIGH,
         CommonClientConfigs.BOOTSTRAP_SERVERS_DOC)
+      .define(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG,
+        Type.STRING,
+        ClientDnsLookup.DEFAULT.toString,
+        in(ClientDnsLookup.DEFAULT.toString,
+           ClientDnsLookup.USE_ALL_DNS_IPS.toString,
+           ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString),
+        Importance.MEDIUM,
+        CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC)
       .define(
         CommonClientConfigs.SECURITY_PROTOCOL_CONFIG,
         ConfigDef.Type.STRING,
@@ -429,7 +437,8 @@ object AdminClient {
     val retryBackoffMs = 
config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG)
 
     val brokerUrls = 
config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)
-    val brokerAddresses = ClientUtils.parseAndValidateAddresses(brokerUrls)
+    val clientDnsLookup = 
config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)
+    val brokerAddresses = ClientUtils.parseAndValidateAddresses(brokerUrls, 
clientDnsLookup)
     val bootstrapCluster = Cluster.bootstrap(brokerAddresses)
     metadata.update(bootstrapCluster, Collections.emptySet(), 0)
 
diff --git 
a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala 
b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 86b6f94bdd3..7002219efd2 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -39,8 +39,6 @@ import org.apache.kafka.common.{KafkaException, Node, 
TopicPartition}
 import scala.collection.JavaConverters._
 import scala.collection.mutable.HashMap
 import scala.collection.{Set, mutable}
-import org.apache.kafka.common.config.ClientDnsLookup
-
 
 object ControllerChannelManager {
   val QueueSizeMetricName = "QueueSize"
diff --git 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index d0e765c5a41..bd25d94e916 100644
--- 
a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ 
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -36,7 +36,6 @@ import java.util.concurrent.{BlockingQueue, 
ConcurrentHashMap, LinkedBlockingQue
 
 import collection.JavaConverters._
 import scala.collection.{concurrent, immutable}
-import org.apache.kafka.common.config.ClientDnsLookup
 
 object TransactionMarkerChannelManager {
   def apply(config: KafkaConfig,
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala 
b/core/src/main/scala/kafka/server/KafkaServer.scala
index 841ea8272fa..bef0663c26f 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -37,7 +37,7 @@ import kafka.security.CredentialProvider
 import kafka.security.auth.Authorizer
 import kafka.utils._
 import kafka.zk.{BrokerInfo, KafkaZkClient}
-import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, 
NetworkClient, NetworkClientUtils}
+import org.apache.kafka.clients.{ApiVersions, ClientDnsLookup, 
ManualMetadataUpdater, NetworkClient, NetworkClientUtils}
 import org.apache.kafka.common.internals.ClusterResourceListeners
 import org.apache.kafka.common.metrics.{JmxReporter, Metrics, _}
 import org.apache.kafka.common.network._
@@ -51,7 +51,6 @@ import org.apache.kafka.common.{ClusterResource, Node}
 
 import scala.collection.JavaConverters._
 import scala.collection.{Map, Seq, mutable}
-import org.apache.kafka.common.config.ClientDnsLookup
 
 object KafkaServer {
   // Copy the subset of properties that are relevant to Logs
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala 
b/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
index d15fdae6b97..4e642f3cda2 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala
@@ -30,7 +30,6 @@ import org.apache.kafka.common.Node
 import org.apache.kafka.common.requests.AbstractRequest.Builder
 
 import scala.collection.JavaConverters._
-import org.apache.kafka.common.config.ClientDnsLookup
 
 trait BlockingSend {
 
diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala 
b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
index 7871015f3fb..1f87d7acfec 100644
--- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
+++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
@@ -43,7 +43,6 @@ import org.apache.kafka.common.utils.{LogContext, Time}
 import org.apache.kafka.common.{Node, TopicPartition}
 
 import scala.collection.JavaConverters._
-import org.apache.kafka.common.config.ClientDnsLookup
 
 /**
  * For verifying the consistency among replicas.
diff --git 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
index 74ab234cdbe..9f15696fe71 100644
--- 
a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
+++ 
b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
@@ -20,8 +20,8 @@
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.kafka.clients.ApiVersions;
+import org.apache.kafka.clients.ClientDnsLookup;
 import org.apache.kafka.clients.ClientUtils;
-import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.ManualMetadataUpdater;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.NetworkClientUtils;
@@ -30,7 +30,6 @@
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
-import org.apache.kafka.common.config.ClientDnsLookup;
 import org.apache.kafka.common.internals.KafkaFutureImpl;
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.network.ChannelBuilder;
@@ -127,7 +126,8 @@ public void run() {
                 WorkerUtils.addConfigsToProperties(props, 
spec.commonClientConf(), spec.commonClientConf());
                 AdminClientConfig conf = new AdminClientConfig(props);
                 List<InetSocketAddress> addresses = 
ClientUtils.parseAndValidateAddresses(
-                    conf.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG));
+                        
conf.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG),
+                        
conf.getString(AdminClientConfig.CLIENT_DNS_LOOKUP_CONFIG));
                 ManualMetadataUpdater updater = new 
ManualMetadataUpdater(Cluster.bootstrap(addresses).nodes());
                 while (true) {
                     if (doneFuture.isDone()) {
@@ -182,7 +182,7 @@ private boolean attemptConnection(AdminClientConfig conf,
                                     4096,
                                     4096,
                                     1000,
-                                    
ClientDnsLookup.forConfig(conf.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)),
+                                    
ClientDnsLookup.forConfig(conf.getString(AdminClientConfig.CLIENT_DNS_LOOKUP_CONFIG)),
                                     Time.SYSTEM,
                                     false,
                                     new ApiVersions(),


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> DNS alias support for secured connections
> -----------------------------------------
>
>                 Key: KAFKA-6195
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6195
>             Project: Kafka
>          Issue Type: Improvement
>          Components: clients
>            Reporter: Jonathan Skrzypek
>            Priority: Major
>
> It seems clients can't use a dns alias in front of a secured Kafka cluster.
> So applications can only specify a list of hosts or IPs in bootstrap.servers 
> instead of an alias encompassing all cluster nodes.
> Using an alias in bootstrap.servers results in the following error : 
> javax.security.sasl.SaslException: An error: 
> (java.security.PrivilegedActionException: javax.security.sasl.SaslException: 
> GSS initiate failed [Caused by GSSException: No valid credentials provided 
> (Mechanism level: Fail to create credential. (63) - No service creds)]) 
> occurred when evaluating SASL token received from the Kafka Broker. Kafka 
> Client will go to AUTH_FAILED state. [Caused by 
> javax.security.sasl.SaslException: GSS initiate failed [Caused by 
> GSSException: No valid credentials provided (Mechanism level: Fail to create 
> credential. (63) - No service creds)]]
> When using SASL/Kerberos authentication, the kafka server principal is of the 
> form kafka@kafka/broker1.hostname....@example.com
> Kerberos requires that the hosts can be resolved by their FQDNs.
> During SASL handshake, the client will create a SASL token and then send it 
> to kafka for auth.
> But to create a SASL token the client first needs to be able to validate that 
> the broker's kerberos is a valid one.
> There are 3 potential options :
> 1. Creating a single kerberos principal not linked to a host but to an alias 
> and reference it in the broker jaas file.
> But I think the kerberos infrastructure would refuse to validate it, so the 
> SASL handshake would still fail
> 2. Modify the client bootstrap mechanism to detect whether bootstrap.servers 
> contains a dns alias. If it does, resolve and expand the alias to retrieve 
> all hostnames behind it and add them to the list of nodes.
> This could be done by modifying parseAndValidateAddresses() in ClientUtils
> 3. Add a cluster.alias parameter that would be handled by the logic above. 
> Having another parameter to avoid confusion on how bootstrap.servers works 
> behind the scene.
> Thoughts ?
> I would be happy to contribute the change for any of the options.
> I believe the ability to use a dns alias instead of static lists of brokers 
> would bring good deployment flexibility.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to