[ https://issues.apache.org/jira/browse/KAFKA-6195?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16649110#comment-16649110 ]
ASF GitHub Bot commented on KAFKA-6195: --------------------------------------- rajinisivaram closed pull request #4485: KAFKA-6195: Resolve DNS aliases in bootstrap.server URL: https://github.com/apache/kafka/pull/4485 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/clients/src/main/java/org/apache/kafka/common/config/ClientDnsLookup.java b/clients/src/main/java/org/apache/kafka/clients/ClientDnsLookup.java similarity index 88% rename from clients/src/main/java/org/apache/kafka/common/config/ClientDnsLookup.java rename to clients/src/main/java/org/apache/kafka/clients/ClientDnsLookup.java index 4a013b96ff8..96d47c344a0 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/ClientDnsLookup.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClientDnsLookup.java @@ -14,14 +14,15 @@ * See the License for the specific language governing permissions and * limitations under the License. 
*/ -package org.apache.kafka.common.config; +package org.apache.kafka.clients; import java.util.Locale; public enum ClientDnsLookup { DEFAULT("default"), - USE_ALL_DNS_IPS("use_all_dns_ips"); + USE_ALL_DNS_IPS("use_all_dns_ips"), + RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY("resolve_canonical_bootstrap_servers_only"); private String clientDnsLookup; diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java index 1661ea39695..dce5f3fb65c 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java @@ -17,7 +17,6 @@ package org.apache.kafka.clients; import org.apache.kafka.common.config.AbstractConfig; -import org.apache.kafka.common.config.ClientDnsLookup; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.network.ChannelBuilder; @@ -42,10 +41,12 @@ public final class ClientUtils { private static final Logger log = LoggerFactory.getLogger(ClientUtils.class); - private ClientUtils() {} + private ClientUtils() { + } - public static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls) { + public static List<InetSocketAddress> parseAndValidateAddresses(List<String> urls, String clientDnsLookup) { List<InetSocketAddress> addresses = new ArrayList<>(); + ClientDnsLookup clientDnsLookupBehaviour = ClientDnsLookup.forConfig(clientDnsLookup); for (String url : urls) { if (url != null && !url.isEmpty()) { try { @@ -54,15 +55,30 @@ private ClientUtils() {} if (host == null || port == null) throw new ConfigException("Invalid url in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + ": " + url); - InetSocketAddress address = new InetSocketAddress(host, port); - - if (address.isUnresolved()) { - log.warn("Removing server {} from {} as DNS resolution failed for {}", url, 
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, host); + if (clientDnsLookupBehaviour == ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY) { + InetAddress[] inetAddresses = InetAddress.getAllByName(host); + for (InetAddress inetAddress : inetAddresses) { + String resolvedCanonicalName = inetAddress.getCanonicalHostName(); + InetSocketAddress address = new InetSocketAddress(resolvedCanonicalName, port); + if (address.isUnresolved()) { + log.warn("Couldn't resolve server {} from {} as DNS resolution of the canonical hostname {} failed for {}", url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, resolvedCanonicalName, host); + } else { + addresses.add(address); + } + } + } else { - addresses.add(address); + InetSocketAddress address = new InetSocketAddress(host, port); + if (address.isUnresolved()) { + log.warn("Couldn't resolve server {} from {} as DNS resolution failed for {}", url, CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, host); + } else { + addresses.add(address); + } + } + } catch (IllegalArgumentException e) { throw new ConfigException("Invalid port in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + ": " + url); + } catch (UnknownHostException e) { + throw new ConfigException("Unknown host in " + CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG + ": " + url); } } } diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java index b697de73c85..f198533525b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java @@ -18,7 +18,6 @@ import java.util.concurrent.ThreadLocalRandom; -import org.apache.kafka.common.config.ClientDnsLookup; import org.apache.kafka.common.errors.AuthenticationException; import java.net.InetAddress; diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java index b179f0272e8..c8e2357b0a6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java +++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java @@ -41,6 +41,11 @@ + "discover the full cluster membership (which may change dynamically), this list need not contain the full set of " + "servers (you may want more than one, though, in case a server is down)."; + public static final String CLIENT_DNS_LOOKUP_CONFIG = "client.dns.lookup"; + public static final String CLIENT_DNS_LOOKUP_DOC = "<p>Controls how the client uses DNS lookups.</p><p>If set to <code>use_all_dns_ips</code> then, when the lookup returns multiple IP addresses for a hostname," + + " they will all be attempted to connect to before failing the connection. Applies to both bootstrap and advertised servers.</p>" + + "<p>If the value is <code>resolve_canonical_bootstrap_servers_only</code> each entry will be resolved and expanded into a list of canonical names.</p>"; + public static final String METADATA_MAX_AGE_CONFIG = "metadata.max.age.ms"; public static final String METADATA_MAX_AGE_DOC = "The period of time in milliseconds after which we force a refresh of metadata even if we haven't seen any partition leadership changes to proactively discover any new brokers or partitions."; @@ -93,9 +98,6 @@ + "elapses the client will resend the request if necessary or fail the request if " + "retries are exhausted."; - public static final String CLIENT_DNS_LOOKUP_CONFIG = "client.dns.lookup"; - public static final String CLIENT_DNS_LOOKUP_DOC = "<p>Controls how the client uses DNS lookups.</p><p>If set to <code>use_all_dns_ips</code> then, when the lookup returns multiple IP addresses for a hostname," - + " they will all be attempted to connect to before failing the connection. 
Applies to both bootstrap and advertised servers.</p>"; /** * Postprocess the configuration so that exponential backoff is disabled when reconnect backoff diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 7ea05f69f3b..c6f0c0b3fb6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -19,7 +19,6 @@ import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.config.ClientDnsLookup; import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.metrics.Sensor; diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java index a051de29fb7..47c76ac36af 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java @@ -17,9 +17,9 @@ package org.apache.kafka.clients.admin; +import org.apache.kafka.clients.ClientDnsLookup; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.common.config.AbstractConfig; -import org.apache.kafka.common.config.ClientDnsLookup; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; @@ -43,6 +43,12 @@ public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; private static final String BOOTSTRAP_SERVERS_DOC = CommonClientConfigs.BOOTSTRAP_SERVERS_DOC; + /** + * <code>client.dns.lookup</code> + */ + public static final String CLIENT_DNS_LOOKUP_CONFIG = 
CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG; + private static final String CLIENT_DNS_LOOKUP_DOC = CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC; + /** * <code>reconnect.backoff.ms</code> */ @@ -159,12 +165,14 @@ in(Sensor.RecordingLevel.INFO.toString(), Sensor.RecordingLevel.DEBUG.toString()), Importance.LOW, METRICS_RECORDING_LEVEL_DOC) - .define(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG, + .define(CLIENT_DNS_LOOKUP_CONFIG, Type.STRING, ClientDnsLookup.DEFAULT.toString(), - in(ClientDnsLookup.DEFAULT.toString(), ClientDnsLookup.USE_ALL_DNS_IPS.toString()), + in(ClientDnsLookup.DEFAULT.toString(), + ClientDnsLookup.USE_ALL_DNS_IPS.toString(), + ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString()), Importance.MEDIUM, - CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC) + CLIENT_DNS_LOOKUP_DOC) // security support .define(SECURITY_PROTOCOL_CONFIG, Type.STRING, diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 86e14476625..c8418c173f4 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -18,10 +18,10 @@ package org.apache.kafka.clients.admin; import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientDnsLookup; import org.apache.kafka.clients.ClientRequest; import org.apache.kafka.clients.ClientResponse; import org.apache.kafka.clients.ClientUtils; -import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.KafkaClient; import org.apache.kafka.clients.NetworkClient; import org.apache.kafka.clients.StaleMetadataException; @@ -46,7 +46,6 @@ import org.apache.kafka.common.acl.AclBinding; import org.apache.kafka.common.acl.AclBindingFilter; import org.apache.kafka.common.annotation.InterfaceStability; -import org.apache.kafka.common.config.ClientDnsLookup; import 
org.apache.kafka.common.config.ConfigResource; import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.errors.AuthenticationException; @@ -362,7 +361,7 @@ static KafkaAdminClient createInternal(AdminClientConfig config, TimeoutProcesso config.getInt(AdminClientConfig.SEND_BUFFER_CONFIG), config.getInt(AdminClientConfig.RECEIVE_BUFFER_CONFIG), (int) TimeUnit.HOURS.toMillis(1), - ClientDnsLookup.forConfig(config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)), + ClientDnsLookup.forConfig(config.getString(AdminClientConfig.CLIENT_DNS_LOOKUP_CONFIG)), time, true, apiVersions, @@ -414,7 +413,8 @@ private KafkaAdminClient(AdminClientConfig config, this.time = time; this.metadataManager = metadataManager; List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses( - config.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG)); + config.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG), + config.getString(AdminClientConfig.CLIENT_DNS_LOOKUP_CONFIG)); metadataManager.update(Cluster.bootstrap(addresses), time.milliseconds()); this.metrics = metrics; this.client = client; diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index a1c9dc250aa..795a762a494 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -16,9 +16,9 @@ */ package org.apache.kafka.clients.consumer; +import org.apache.kafka.clients.ClientDnsLookup; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.common.config.AbstractConfig; -import org.apache.kafka.common.config.ClientDnsLookup; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; @@ -90,6 +90,9 @@ */ public static final String 
BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; + /** <code>client.dns.lookup</code> */ + public static final String CLIENT_DNS_LOOKUP_CONFIG = CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG; + /** * <code>enable.auto.commit</code> */ @@ -258,7 +261,7 @@ " return the LSO"; public static final String DEFAULT_ISOLATION_LEVEL = IsolationLevel.READ_UNCOMMITTED.toString().toLowerCase(Locale.ROOT); - + static { CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, @@ -266,6 +269,14 @@ new ConfigDef.NonNullValidator(), Importance.HIGH, CommonClientConfigs.BOOTSTRAP_SERVERS_DOC) + .define(CLIENT_DNS_LOOKUP_CONFIG, + Type.STRING, + ClientDnsLookup.DEFAULT.toString(), + in(ClientDnsLookup.DEFAULT.toString(), + ClientDnsLookup.USE_ALL_DNS_IPS.toString(), + ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString()), + Importance.MEDIUM, + CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC) .define(GROUP_ID_CONFIG, Type.STRING, "", Importance.HIGH, GROUP_ID_DOC) .define(SESSION_TIMEOUT_MS_CONFIG, Type.INT, @@ -453,12 +464,6 @@ in(IsolationLevel.READ_COMMITTED.toString().toLowerCase(Locale.ROOT), IsolationLevel.READ_UNCOMMITTED.toString().toLowerCase(Locale.ROOT)), Importance.MEDIUM, ISOLATION_LEVEL_DOC) - .define(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG, - Type.STRING, - ClientDnsLookup.DEFAULT.toString(), - in(ClientDnsLookup.DEFAULT.toString(), ClientDnsLookup.USE_ALL_DNS_IPS.toString()), - Importance.MEDIUM, - CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC) // security support .define(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, Type.STRING, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index e79ff07cff0..4061373c450 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -17,8 +17,8 @@ package 
org.apache.kafka.clients.consumer; import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientDnsLookup; import org.apache.kafka.clients.ClientUtils; -import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.NetworkClient; import org.apache.kafka.clients.consumer.internals.ConsumerCoordinator; @@ -36,7 +36,6 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.config.ClientDnsLookup; import org.apache.kafka.common.errors.InterruptException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.internals.ClusterResourceListeners; @@ -710,8 +709,10 @@ private KafkaConsumer(ConsumerConfig config, ClusterResourceListeners clusterResourceListeners = configureClusterResourceListeners(keyDeserializer, valueDeserializer, reporters, interceptorList); this.metadata = new Metadata(retryBackoffMs, config.getLong(ConsumerConfig.METADATA_MAX_AGE_CONFIG), true, false, clusterResourceListeners); - List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)); - this.metadata.update(Cluster.bootstrap(addresses), Collections.emptySet(), 0); + List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses( + config.getList(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG), + config.getString(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG)); + this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), 0); String metricGrpPrefix = "consumer"; ConsumerMetrics metricsRegistry = new ConsumerMetrics(metricsTags.keySet(), "consumer"); ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config); @@ -732,7 +733,7 @@ private KafkaConsumer(ConsumerConfig config, config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG), 
config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG), config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), - ClientDnsLookup.forConfig(config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)), + ClientDnsLookup.forConfig(config.getString(ConsumerConfig.CLIENT_DNS_LOOKUP_CONFIG)), time, true, new ApiVersions(), diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 465e60f449c..c68a014619a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -28,8 +28,8 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientDnsLookup; import org.apache.kafka.clients.ClientUtils; -import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.KafkaClient; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.NetworkClient; @@ -49,7 +49,6 @@ import org.apache.kafka.common.MetricName; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.config.ClientDnsLookup; import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.errors.ApiException; import org.apache.kafka.common.errors.AuthenticationException; @@ -407,7 +406,9 @@ public KafkaProducer(Properties properties, Serializer<K> keySerializer, Seriali apiVersions, transactionManager, new BufferPool(this.totalMemorySize, config.getInt(ProducerConfig.BATCH_SIZE_CONFIG), metrics, time, PRODUCER_METRIC_GROUP_NAME)); - List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)); + List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses( + 
config.getList(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG), + config.getString(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG)); if (metadata != null) { this.metadata = metadata; } else { @@ -449,7 +450,7 @@ Sender newSender(LogContext logContext, KafkaClient kafkaClient, Metadata metada producerConfig.getInt(ProducerConfig.SEND_BUFFER_CONFIG), producerConfig.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG), requestTimeoutMs, - ClientDnsLookup.forConfig(producerConfig.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)), + ClientDnsLookup.forConfig(producerConfig.getString(ProducerConfig.CLIENT_DNS_LOOKUP_CONFIG)), time, true, apiVersions, @@ -496,7 +497,9 @@ private static int configureDeliveryTimeout(ProducerConfig config, Logger log) { } private static TransactionManager configureTransactionState(ProducerConfig config, LogContext logContext, Logger log) { + TransactionManager transactionManager = null; + boolean userConfiguredIdempotence = false; if (config.originals().containsKey(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG)) userConfiguredIdempotence = true; diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index f08159d2e37..c63477d97dc 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -16,10 +16,10 @@ */ package org.apache.kafka.clients.producer; +import org.apache.kafka.clients.ClientDnsLookup; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.producer.internals.DefaultPartitioner; import org.apache.kafka.common.config.AbstractConfig; -import org.apache.kafka.common.config.ClientDnsLookup; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; @@ -52,6 +52,9 @@ /** 
<code>bootstrap.servers</code> */ public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; + /** <code>client.dns.lookup</code> */ + public static final String CLIENT_DNS_LOOKUP_CONFIG = CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG; + /** <code>metadata.max.age.ms</code> */ public static final String METADATA_MAX_AGE_CONFIG = CommonClientConfigs.METADATA_MAX_AGE_CONFIG; private static final String METADATA_MAX_AGE_DOC = CommonClientConfigs.METADATA_MAX_AGE_DOC; @@ -239,6 +242,14 @@ static { CONFIG = new ConfigDef().define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, Collections.emptyList(), new ConfigDef.NonNullValidator(), Importance.HIGH, CommonClientConfigs.BOOTSTRAP_SERVERS_DOC) + .define(CLIENT_DNS_LOOKUP_CONFIG, + Type.STRING, + ClientDnsLookup.DEFAULT.toString(), + in(ClientDnsLookup.DEFAULT.toString(), + ClientDnsLookup.USE_ALL_DNS_IPS.toString(), + ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString()), + Importance.MEDIUM, + CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC) .define(BUFFER_MEMORY_CONFIG, Type.LONG, 32 * 1024 * 1024L, atLeast(0L), Importance.HIGH, BUFFER_MEMORY_DOC) .define(RETRIES_CONFIG, Type.INT, Integer.MAX_VALUE, between(0, Integer.MAX_VALUE), Importance.HIGH, RETRIES_DOC) .define(ACKS_CONFIG, @@ -347,13 +358,7 @@ null, new ConfigDef.NonEmptyString(), Importance.LOW, - TRANSACTIONAL_ID_DOC) - .define(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG, - Type.STRING, - ClientDnsLookup.DEFAULT.toString(), - in(ClientDnsLookup.DEFAULT.toString(), ClientDnsLookup.USE_ALL_DNS_IPS.toString()), - Importance.MEDIUM, - CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC); + TRANSACTIONAL_ID_DOC); } @Override diff --git a/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java b/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java index bea464f14a5..35f52a93c1c 100644 --- a/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java +++ 
b/clients/src/test/java/org/apache/kafka/clients/ClientUtilsTest.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.clients; -import org.apache.kafka.common.config.ClientDnsLookup; import org.apache.kafka.common.config.ConfigException; import org.junit.Test; import static org.junit.Assert.assertEquals; @@ -31,27 +30,46 @@ public class ClientUtilsTest { + @Test - public void testParseAndValidateAddresses() { - check("127.0.0.1:8000"); - check("mydomain.com:8080"); - check("[::1]:8000"); - check("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:1234", "mydomain.com:10000"); - List<InetSocketAddress> validatedAddresses = check("some.invalid.hostname.foo.bar.local:9999", "mydomain.com:10000"); + public void testParseAndValidateAddresses() throws UnknownHostException { + checkWithoutLookup("127.0.0.1:8000"); + checkWithoutLookup("localhost:8080"); + checkWithoutLookup("[::1]:8000"); + checkWithoutLookup("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:1234", "localhost:10000"); + List<InetSocketAddress> validatedAddresses = checkWithoutLookup("localhost:10000"); assertEquals(1, validatedAddresses.size()); InetSocketAddress onlyAddress = validatedAddresses.get(0); - assertEquals("mydomain.com", onlyAddress.getHostName()); + assertEquals("localhost", onlyAddress.getHostName()); assertEquals(10000, onlyAddress.getPort()); } + @Test + public void testParseAndValidateAddressesWithReverseLookup() { + checkWithoutLookup("127.0.0.1:8000"); + checkWithoutLookup("localhost:8080"); + checkWithoutLookup("[::1]:8000"); + checkWithoutLookup("[2001:db8:85a3:8d3:1319:8a2e:370:7348]:1234", "localhost:10000"); + List<InetSocketAddress> validatedAddresses = checkWithLookup(Arrays.asList("example.com:10000")); + assertEquals(2, validatedAddresses.size()); + InetSocketAddress address = validatedAddresses.get(0); + assertEquals("93.184.216.34", address.getHostName()); + assertEquals(10000, address.getPort()); + } + + @Test(expected = IllegalArgumentException.class) + public void testInvalidConfig() { + 
ClientUtils.parseAndValidateAddresses(Arrays.asList("localhost:10000"), "random.value"); + } + @Test(expected = ConfigException.class) public void testNoPort() { - check("127.0.0.1"); + checkWithoutLookup("127.0.0.1"); } @Test(expected = ConfigException.class) public void testOnlyBadHostname() { - check("some.invalid.hostname.foo.bar.local:9999"); + checkWithoutLookup("some.invalid.hostname.foo.bar.local:9999"); } @Test @@ -87,7 +105,12 @@ public void testResolveDnsLookupAllIps() throws UnknownHostException { assertEquals(2, ClientUtils.resolve("kafka.apache.org", ClientDnsLookup.USE_ALL_DNS_IPS).size()); } - private List<InetSocketAddress> check(String... url) { - return ClientUtils.parseAndValidateAddresses(Arrays.asList(url)); + private List<InetSocketAddress> checkWithoutLookup(String... url) { + return ClientUtils.parseAndValidateAddresses(Arrays.asList(url), ClientDnsLookup.DEFAULT.toString()); + } + + private List<InetSocketAddress> checkWithLookup(List<String> url) { + return ClientUtils.parseAndValidateAddresses(url, ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString()); } + } diff --git a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java index d4a2a55dd4b..23edaa999ef 100644 --- a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java @@ -27,7 +27,6 @@ import java.net.InetAddress; import java.net.UnknownHostException; -import org.apache.kafka.common.config.ClientDnsLookup; import org.apache.kafka.common.errors.AuthenticationException; import org.apache.kafka.common.utils.MockTime; import org.junit.Before; diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java index aaf827feee3..8abe9a40085 100644 --- 
a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java @@ -19,7 +19,6 @@ import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.config.ClientDnsLookup; import org.apache.kafka.common.network.NetworkReceive; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.CommonFields; diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 42deee7c204..94d8d5b1afb 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -17,6 +17,7 @@ package org.apache.kafka.clients.consumer.internals; import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientDnsLookup; import org.apache.kafka.clients.ClientRequest; import org.apache.kafka.clients.ClientUtils; import org.apache.kafka.clients.FetchSessionHandler; @@ -36,7 +37,6 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.config.ClientDnsLookup; import org.apache.kafka.common.errors.InvalidTopicException; import org.apache.kafka.common.errors.RecordTooLargeException; import org.apache.kafka.common.errors.SerializationException; @@ -2735,7 +2735,7 @@ private void testGetOffsetsForTimesWithError(Errors errorForP0, String topicName2 = "topic2"; TopicPartition t2p0 = new TopicPartition(topicName2, 0); // Expect a metadata refresh. 
- metadata.update(Cluster.bootstrap(ClientUtils.parseAndValidateAddresses(Collections.singletonList("1.1.1.1:1111"))), + metadata.update(Cluster.bootstrap(ClientUtils.parseAndValidateAddresses(Collections.singletonList("1.1.1.1:1111"), ClientDnsLookup.DEFAULT.toString())), Collections.<String>emptySet(), time.milliseconds()); diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 6d4d78cb70f..23ca2aeaed7 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientDnsLookup; import org.apache.kafka.clients.ClientRequest; import org.apache.kafka.clients.Metadata; import org.apache.kafka.clients.MockClient; @@ -44,7 +45,6 @@ import org.apache.kafka.common.MetricNameTemplate; import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; -import org.apache.kafka.common.config.ClientDnsLookup; import org.apache.kafka.common.errors.ClusterAuthorizationException; import org.apache.kafka.common.errors.NetworkException; import org.apache.kafka.common.errors.OutOfOrderSequenceException; diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java index aac9fb24441..be3a70991f0 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java @@ -16,9 +16,9 @@ */ package org.apache.kafka.connect.runtime; +import org.apache.kafka.clients.ClientDnsLookup; import 
org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.common.config.AbstractConfig; -import org.apache.kafka.common.config.ClientDnsLookup; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; @@ -61,6 +61,9 @@ + "than one, though, in case a server is down)."; public static final String BOOTSTRAP_SERVERS_DEFAULT = "localhost:9092"; + public static final String CLIENT_DNS_LOOKUP_CONFIG = CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG; + public static final String CLIENT_DNS_LOOKUP_DOC = CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC; + public static final String KEY_CONVERTER_CLASS_CONFIG = "key.converter"; public static final String KEY_CONVERTER_CLASS_DOC = "Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka." + @@ -223,6 +226,14 @@ protected static ConfigDef baseConfigDef() { return new ConfigDef() .define(BOOTSTRAP_SERVERS_CONFIG, Type.LIST, BOOTSTRAP_SERVERS_DEFAULT, Importance.HIGH, BOOTSTRAP_SERVERS_DOC) + .define(CLIENT_DNS_LOOKUP_CONFIG, + Type.STRING, + ClientDnsLookup.DEFAULT.toString(), + in(ClientDnsLookup.DEFAULT.toString(), + ClientDnsLookup.USE_ALL_DNS_IPS.toString(), + ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString()), + Importance.MEDIUM, + CLIENT_DNS_LOOKUP_DOC) .define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, KEY_CONVERTER_CLASS_DOC) .define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, @@ -278,13 +289,7 @@ protected static ConfigDef baseConfigDef() { Collections.emptyList(), Importance.LOW, CONFIG_PROVIDERS_DOC) .define(REST_EXTENSION_CLASSES_CONFIG, Type.LIST, "", - Importance.LOW, REST_EXTENSION_CLASSES_DOC) - .define(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG, - Type.STRING, - ClientDnsLookup.DEFAULT.toString(), - in(ClientDnsLookup.DEFAULT.toString(), ClientDnsLookup.USE_ALL_DNS_IPS.toString()), - Importance.MEDIUM, - 
CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC); + Importance.LOW, REST_EXTENSION_CLASSES_DOC); } private void logInternalConverterDeprecationWarnings(Map<String, String> props) { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java index de8e8b27b74..5725ff5f554 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java @@ -17,6 +17,7 @@ package org.apache.kafka.connect.runtime.distributed; import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientDnsLookup; import org.apache.kafka.clients.ClientUtils; import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.Metadata; @@ -24,7 +25,6 @@ import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.KafkaException; -import org.apache.kafka.common.config.ClientDnsLookup; import org.apache.kafka.common.metrics.JmxReporter; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; @@ -96,7 +96,9 @@ public WorkerGroupMember(DistributedConfig config, this.metrics = new Metrics(metricConfig, reporters, time); this.retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG); this.metadata = new Metadata(retryBackoffMs, config.getLong(CommonClientConfigs.METADATA_MAX_AGE_CONFIG), true); - List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses(config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG)); + List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses( + config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG), + config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)); 
this.metadata.update(Cluster.bootstrap(addresses), Collections.<String>emptySet(), 0); String metricGrpPrefix = "connect"; ChannelBuilder channelBuilder = ClientUtils.createChannelBuilder(config); diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala index 5876b6ec460..aaa09035b50 100644 --- a/core/src/main/scala/kafka/admin/AdminClient.scala +++ b/core/src/main/scala/kafka/admin/AdminClient.scala @@ -23,6 +23,7 @@ import kafka.coordinator.group.GroupOverview import kafka.utils.Logging import org.apache.kafka.clients._ import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient, ConsumerProtocol, RequestFuture} +import org.apache.kafka.common.config.ConfigDef.ValidString._ import org.apache.kafka.common.config.ConfigDef.{Importance, Type} import org.apache.kafka.common.config.{AbstractConfig, ConfigDef} import org.apache.kafka.common.errors.{AuthenticationException, TimeoutException} @@ -39,7 +40,6 @@ import org.apache.kafka.common.{Cluster, Node, TopicPartition} import scala.collection.JavaConverters._ import scala.util.{Failure, Success, Try} -import org.apache.kafka.common.config.ClientDnsLookup /** * A Scala administrative client for Kafka which supports managing and inspecting topics, brokers, @@ -386,6 +386,14 @@ object AdminClient { Type.LIST, Importance.HIGH, CommonClientConfigs.BOOTSTRAP_SERVERS_DOC) + .define(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG, + Type.STRING, + ClientDnsLookup.DEFAULT.toString, + in(ClientDnsLookup.DEFAULT.toString, + ClientDnsLookup.USE_ALL_DNS_IPS.toString, + ClientDnsLookup.RESOLVE_CANONICAL_BOOTSTRAP_SERVERS_ONLY.toString), + Importance.MEDIUM, + CommonClientConfigs.CLIENT_DNS_LOOKUP_DOC) .define( CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, ConfigDef.Type.STRING, @@ -429,7 +437,8 @@ object AdminClient { val retryBackoffMs = config.getLong(CommonClientConfigs.RETRY_BACKOFF_MS_CONFIG) val brokerUrls = 
config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG) - val brokerAddresses = ClientUtils.parseAndValidateAddresses(brokerUrls) + val clientDnsLookup = config.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG) + val brokerAddresses = ClientUtils.parseAndValidateAddresses(brokerUrls, clientDnsLookup) val bootstrapCluster = Cluster.bootstrap(brokerAddresses) metadata.update(bootstrapCluster, Collections.emptySet(), 0) diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index 86b6f94bdd3..7002219efd2 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -39,8 +39,6 @@ import org.apache.kafka.common.{KafkaException, Node, TopicPartition} import scala.collection.JavaConverters._ import scala.collection.mutable.HashMap import scala.collection.{Set, mutable} -import org.apache.kafka.common.config.ClientDnsLookup - object ControllerChannelManager { val QueueSizeMetricName = "QueueSize" diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala index d0e765c5a41..bd25d94e916 100644 --- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala +++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala @@ -36,7 +36,6 @@ import java.util.concurrent.{BlockingQueue, ConcurrentHashMap, LinkedBlockingQue import collection.JavaConverters._ import scala.collection.{concurrent, immutable} -import org.apache.kafka.common.config.ClientDnsLookup object TransactionMarkerChannelManager { def apply(config: KafkaConfig, diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 841ea8272fa..bef0663c26f 100755 --- 
a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -37,7 +37,7 @@ import kafka.security.CredentialProvider import kafka.security.auth.Authorizer import kafka.utils._ import kafka.zk.{BrokerInfo, KafkaZkClient} -import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient, NetworkClientUtils} +import org.apache.kafka.clients.{ApiVersions, ClientDnsLookup, ManualMetadataUpdater, NetworkClient, NetworkClientUtils} import org.apache.kafka.common.internals.ClusterResourceListeners import org.apache.kafka.common.metrics.{JmxReporter, Metrics, _} import org.apache.kafka.common.network._ @@ -51,7 +51,6 @@ import org.apache.kafka.common.{ClusterResource, Node} import scala.collection.JavaConverters._ import scala.collection.{Map, Seq, mutable} -import org.apache.kafka.common.config.ClientDnsLookup object KafkaServer { // Copy the subset of properties that are relevant to Logs diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala b/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala index d15fdae6b97..4e642f3cda2 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherBlockingSend.scala @@ -30,7 +30,6 @@ import org.apache.kafka.common.Node import org.apache.kafka.common.requests.AbstractRequest.Builder import scala.collection.JavaConverters._ -import org.apache.kafka.common.config.ClientDnsLookup trait BlockingSend { diff --git a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala index 7871015f3fb..1f87d7acfec 100644 --- a/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala +++ b/core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala @@ -43,7 +43,6 @@ import org.apache.kafka.common.utils.{LogContext, Time} import org.apache.kafka.common.{Node, TopicPartition} import 
scala.collection.JavaConverters._ -import org.apache.kafka.common.config.ClientDnsLookup /** * For verifying the consistency among replicas. diff --git a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java index 74ab234cdbe..9f15696fe71 100644 --- a/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java +++ b/tools/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java @@ -20,8 +20,8 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.kafka.clients.ApiVersions; +import org.apache.kafka.clients.ClientDnsLookup; import org.apache.kafka.clients.ClientUtils; -import org.apache.kafka.clients.CommonClientConfigs; import org.apache.kafka.clients.ManualMetadataUpdater; import org.apache.kafka.clients.NetworkClient; import org.apache.kafka.clients.NetworkClientUtils; @@ -30,7 +30,6 @@ import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; -import org.apache.kafka.common.config.ClientDnsLookup; import org.apache.kafka.common.internals.KafkaFutureImpl; import org.apache.kafka.common.metrics.Metrics; import org.apache.kafka.common.network.ChannelBuilder; @@ -127,7 +126,8 @@ public void run() { WorkerUtils.addConfigsToProperties(props, spec.commonClientConf(), spec.commonClientConf()); AdminClientConfig conf = new AdminClientConfig(props); List<InetSocketAddress> addresses = ClientUtils.parseAndValidateAddresses( - conf.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG)); + conf.getList(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG), + conf.getString(AdminClientConfig.CLIENT_DNS_LOOKUP_CONFIG)); ManualMetadataUpdater updater = new ManualMetadataUpdater(Cluster.bootstrap(addresses).nodes()); while (true) { if (doneFuture.isDone()) { @@ -182,7 +182,7 @@ private boolean 
attemptConnection(AdminClientConfig conf, 4096, 4096, 1000, - ClientDnsLookup.forConfig(conf.getString(CommonClientConfigs.CLIENT_DNS_LOOKUP_CONFIG)), + ClientDnsLookup.forConfig(conf.getString(AdminClientConfig.CLIENT_DNS_LOOKUP_CONFIG)), Time.SYSTEM, false, new ApiVersions(), ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > DNS alias support for secured connections > ----------------------------------------- > > Key: KAFKA-6195 > URL: https://issues.apache.org/jira/browse/KAFKA-6195 > Project: Kafka > Issue Type: Improvement > Components: clients > Reporter: Jonathan Skrzypek > Priority: Major > > It seems clients can't use a dns alias in front of a secured Kafka cluster. > So applications can only specify a list of hosts or IPs in bootstrap.servers > instead of an alias encompassing all cluster nodes. > Using an alias in bootstrap.servers results in the following error : > javax.security.sasl.SaslException: An error: > (java.security.PrivilegedActionException: javax.security.sasl.SaslException: > GSS initiate failed [Caused by GSSException: No valid credentials provided > (Mechanism level: Fail to create credential. (63) - No service creds)]) > occurred when evaluating SASL token received from the Kafka Broker. Kafka > Client will go to AUTH_FAILED state. [Caused by > javax.security.sasl.SaslException: GSS initiate failed [Caused by > GSSException: No valid credentials provided (Mechanism level: Fail to create > credential. (63) - No service creds)]] > When using SASL/Kerberos authentication, the kafka server principal is of the > form kafka/broker1.hostname.com@EXAMPLE.COM > Kerberos requires that the hosts can be resolved by their FQDNs. 
> During SASL handshake, the client will create a SASL token and then send it > to kafka for auth. > But to create a SASL token the client first needs to be able to validate that > the broker's kerberos principal is a valid one. > There are 3 potential options: > 1. Creating a single kerberos principal not linked to a host but to an alias > and reference it in the broker jaas file. > But I think the kerberos infrastructure would refuse to validate it, so the > SASL handshake would still fail > 2. Modify the client bootstrap mechanism to detect whether bootstrap.servers > contains a dns alias. If it does, resolve and expand the alias to retrieve > all hostnames behind it and add them to the list of nodes. > This could be done by modifying parseAndValidateAddresses() in ClientUtils > 3. Add a cluster.alias parameter that would be handled by the logic above. > Having another parameter to avoid confusion on how bootstrap.servers works > behind the scenes. > Thoughts? > I would be happy to contribute the change for any of the options. > I believe the ability to use a dns alias instead of static lists of brokers > would bring good deployment flexibility. -- This message was sent by Atlassian JIRA (v7.6.3#76005)