chia7712 commented on code in PR #21080:
URL: https://github.com/apache/kafka/pull/21080#discussion_r2960755692


##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -538,7 +539,22 @@ static KafkaAdminClient createInternal(
                 config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
                 config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG),
                 adminAddresses.usingBootstrapControllers());
-            
metadataManager.update(Cluster.bootstrap(adminAddresses.addresses()), 
time.milliseconds());
+
+            // Get the appropriate bootstrap configuration
+            List<String> bootstrapAddressesToUse = 
adminAddresses.usingBootstrapControllers()
+                ? 
config.getList(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG)
+                : config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
+
+            // Create unresolved addresses for bootstrap cluster (defer DNS 
resolution to NetworkClient.poll())
+            List<InetSocketAddress> unresolvedAddresses = new ArrayList<>();
+            for (String address : bootstrapAddressesToUse) {
+                String host = Utils.getHost(address);
+                Integer port = Utils.getPort(address);
+                if (host != null && port != null) {
+                    
unresolvedAddresses.add(InetSocketAddress.createUnresolved(host, port));
+                }
+            }
+            metadataManager.update(Cluster.bootstrap(unresolvedAddresses), 
time.milliseconds());

Review Comment:
   The cluster created by `Cluster.bootstrap` has `isBootstrapConfigured=true`, 
so `bootstrapCluster` will be updated immediately. This makes 
`isBootstrapped()` incorrectly return true



##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -538,7 +539,22 @@ static KafkaAdminClient createInternal(
                 config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
                 config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG),
                 adminAddresses.usingBootstrapControllers());
-            
metadataManager.update(Cluster.bootstrap(adminAddresses.addresses()), 
time.milliseconds());
+
+            // Get the appropriate bootstrap configuration
+            List<String> bootstrapAddressesToUse = 
adminAddresses.usingBootstrapControllers()
+                ? 
config.getList(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG)
+                : config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
+
+            // Create unresolved addresses for bootstrap cluster (defer DNS 
resolution to NetworkClient.poll())
+            List<InetSocketAddress> unresolvedAddresses = new ArrayList<>();
+            for (String address : bootstrapAddressesToUse) {
+                String host = Utils.getHost(address);
+                Integer port = Utils.getPort(address);
+                if (host != null && port != null) {
+                    
unresolvedAddresses.add(InetSocketAddress.createUnresolved(host, port));
+                }
+            }
+            metadataManager.update(Cluster.bootstrap(unresolvedAddresses), 
time.milliseconds());

Review Comment:
   Maybe we should leverage `MetadataUpdater#bootstrap` to set the resolved 
bootstrap servers once the DNS resolution succeeds in `NetworkClient`?



##########
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java:
##########
@@ -1171,6 +1197,112 @@ private boolean isTelemetryApi(ApiKeys apiKey) {
         return apiKey == ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS || apiKey == 
ApiKeys.PUSH_TELEMETRY;
     }
 
+    public static class BootstrapConfiguration {
+        public final List<String> bootstrapServers;
+        public final ClientDnsLookup clientDnsLookup;
+        public final long bootstrapResolveTimeoutMs;
+        public final long retryBackoffMs;
+        private final boolean isBootstrapDisabled;
+
+        private BootstrapConfiguration(final List<String> bootstrapServers,
+                                       final ClientDnsLookup clientDnsLookup,
+                                       final long bootstrapResolveTimeoutMs,
+                                       final long retryBackoffMs,
+                                       final boolean isBootstrapDisabled) {
+            this.bootstrapServers = bootstrapServers;
+            this.clientDnsLookup = clientDnsLookup;
+            this.bootstrapResolveTimeoutMs = bootstrapResolveTimeoutMs;
+            this.retryBackoffMs = retryBackoffMs;
+            this.isBootstrapDisabled = isBootstrapDisabled;
+        }
+
+        public static BootstrapConfiguration enabled(final List<String> 
bootstrapServers,
+                                                      final ClientDnsLookup 
clientDnsLookup,
+                                                      final long 
bootstrapResolveTimeoutMs,
+                                                      final long 
retryBackoffMs) {
+            return new BootstrapConfiguration(bootstrapServers, 
clientDnsLookup, bootstrapResolveTimeoutMs, retryBackoffMs, false);
+        }
+
+        public static BootstrapConfiguration disabled() {
+            return new BootstrapConfiguration(List.of(), null, 0, 0, true);
+        }
+    }
+
+    /**
+     * Ensures that the client has successfully resolved and bootstrapped with 
the configured bootstrap servers.
+     * This method will retry DNS resolution until one of the following 
conditions is met:
+     * <ul>
+     *   <li>DNS resolution succeeds and bootstrap is complete (method returns 
normally)</li>
+     *   <li>Bootstrap timeout expires (throws 
BootstrapResolutionException)</li>
+     *   <li>Poll timeout is reached but bootstrap timeout hasn't expired 
(method returns to retry later)</li>
+     * </ul>
+     *
+     * @param pollTimeoutMs The poll timeout in milliseconds, controlling how 
long this method should block
+     *                      during a single poll invocation
+     * @param currentTimeMs The current time in milliseconds
+     * @throws BootstrapResolutionException if the bootstrap timeout expires
+     *         before DNS resolution succeeds
+     *
+     * @implNote This method performs blocking DNS lookups via {@link 
InetAddress#getAllByName(String)}.
+     *           Each DNS lookup may block for an indefinite amount of time 
depending on network conditions
+     *           and DNS server responsiveness. As a result, this method may 
block for longer than pollTimeoutMs
+     *           if DNS resolution is slow. The method will retry DNS 
resolution in a loop with brief sleeps
+     *           (up to 100ms) between attempts until one of the exit 
conditions above is met.
+     */
+    void ensureBootstrapped(final long pollTimeoutMs, final long 
currentTimeMs) {
+        if (bootstrapConfiguration.isBootstrapDisabled || 
metadataUpdater.isBootstrapped())
+            return;
+
+        // Start the bootstrap timer on first call to ensure it starts 
counting from the first poll
+        if (!bootstrapTimerStarted) {
+            bootstrapTimer.update(currentTimeMs);
+            
bootstrapTimer.reset(bootstrapConfiguration.bootstrapResolveTimeoutMs);
+            bootstrapTimerStarted = true;
+        }
+
+        // Handle potential overflow when adding timeout to current time
+        long pollDeadlineMs;
+        if (currentTimeMs > Long.MAX_VALUE - pollTimeoutMs)
+            pollDeadlineMs = Long.MAX_VALUE;
+        else
+            pollDeadlineMs = currentTimeMs + pollTimeoutMs;
+
+        while (true) {
+            long now = time.milliseconds();
+            bootstrapTimer.update(now);
+
+            List<InetSocketAddress> servers = ClientUtils.parseAddresses(
+                bootstrapConfiguration.bootstrapServers, 
bootstrapConfiguration.clientDnsLookup);
+
+            if (!servers.isEmpty()) {
+                // Resolution succeeded
+                
bootstrapTimer.reset(bootstrapConfiguration.bootstrapResolveTimeoutMs);
+                metadataUpdater.bootstrap(servers);
+                return;
+            }
+
+            if (bootstrapTimer.isExpired()) {
+                // Bootstrap timeout expired before poll timeout
+                throw new BootstrapResolutionException("Timeout while 
attempting to resolve bootstrap servers.");
+            }
+
+            if (now >= pollDeadlineMs) {
+                // Poll timeout reached but bootstrap timeout hasn't expired 
yet
+                return;
+            }
+
+            // Sleep before retrying to avoid tight loop and reduce load on 
DNS server
+            // Use the standard retry backoff to prevent overloading DNS with 
requests
+            long remainingPollTimeMs = pollDeadlineMs - now;
+            long remainingBootstrapTimeMs = bootstrapTimer.remainingMs();
+            long sleepTimeMs = Math.min(Math.min(remainingPollTimeMs, 
remainingBootstrapTimeMs), bootstrapConfiguration.retryBackoffMs);
+
+            if (sleepTimeMs > 0) {

Review Comment:
   What happens if users try to interrupt the poll?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to