chia7712 commented on code in PR #21080:
URL: https://github.com/apache/kafka/pull/21080#discussion_r2960755692
##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -538,7 +539,22 @@ static KafkaAdminClient createInternal(
config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG),
adminAddresses.usingBootstrapControllers());
-
metadataManager.update(Cluster.bootstrap(adminAddresses.addresses()),
time.milliseconds());
+
+ // Get the appropriate bootstrap configuration
+ List<String> bootstrapAddressesToUse =
adminAddresses.usingBootstrapControllers()
+ ?
config.getList(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG)
+ : config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
+
+ // Create unresolved addresses for bootstrap cluster (defer DNS
resolution to NetworkClient.poll())
+ List<InetSocketAddress> unresolvedAddresses = new ArrayList<>();
+ for (String address : bootstrapAddressesToUse) {
+ String host = Utils.getHost(address);
+ Integer port = Utils.getPort(address);
+ if (host != null && port != null) {
+
unresolvedAddresses.add(InetSocketAddress.createUnresolved(host, port));
+ }
+ }
+ metadataManager.update(Cluster.bootstrap(unresolvedAddresses),
time.milliseconds());
Review Comment:
The cluster created by `Cluster.bootstrap` has `isBootstrapConfigured=true`,
so `bootstrapCluster` will be updated immediately. This makes
`isBootstrapped()` incorrectly return true
##########
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java:
##########
@@ -538,7 +539,22 @@ static KafkaAdminClient createInternal(
config.getLong(AdminClientConfig.RETRY_BACKOFF_MS_CONFIG),
config.getLong(AdminClientConfig.METADATA_MAX_AGE_CONFIG),
adminAddresses.usingBootstrapControllers());
-
metadataManager.update(Cluster.bootstrap(adminAddresses.addresses()),
time.milliseconds());
+
+ // Get the appropriate bootstrap configuration
+ List<String> bootstrapAddressesToUse =
adminAddresses.usingBootstrapControllers()
+ ?
config.getList(AdminClientConfig.BOOTSTRAP_CONTROLLERS_CONFIG)
+ : config.getList(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG);
+
+ // Create unresolved addresses for bootstrap cluster (defer DNS
resolution to NetworkClient.poll())
+ List<InetSocketAddress> unresolvedAddresses = new ArrayList<>();
+ for (String address : bootstrapAddressesToUse) {
+ String host = Utils.getHost(address);
+ Integer port = Utils.getPort(address);
+ if (host != null && port != null) {
+
unresolvedAddresses.add(InetSocketAddress.createUnresolved(host, port));
+ }
+ }
+ metadataManager.update(Cluster.bootstrap(unresolvedAddresses),
time.milliseconds());
Review Comment:
Maybe we should leverage `MetadataUpdater#bootstrap` to set the resolved
bootstrap servers once the DNS resolution succeeds in `NetworkClient`?
##########
clients/src/main/java/org/apache/kafka/clients/NetworkClient.java:
##########
@@ -1171,6 +1197,112 @@ private boolean isTelemetryApi(ApiKeys apiKey) {
return apiKey == ApiKeys.GET_TELEMETRY_SUBSCRIPTIONS || apiKey ==
ApiKeys.PUSH_TELEMETRY;
}
+ public static class BootstrapConfiguration {
+ public final List<String> bootstrapServers;
+ public final ClientDnsLookup clientDnsLookup;
+ public final long bootstrapResolveTimeoutMs;
+ public final long retryBackoffMs;
+ private final boolean isBootstrapDisabled;
+
+ private BootstrapConfiguration(final List<String> bootstrapServers,
+ final ClientDnsLookup clientDnsLookup,
+ final long bootstrapResolveTimeoutMs,
+ final long retryBackoffMs,
+ final boolean isBootstrapDisabled) {
+ this.bootstrapServers = bootstrapServers;
+ this.clientDnsLookup = clientDnsLookup;
+ this.bootstrapResolveTimeoutMs = bootstrapResolveTimeoutMs;
+ this.retryBackoffMs = retryBackoffMs;
+ this.isBootstrapDisabled = isBootstrapDisabled;
+ }
+
+ public static BootstrapConfiguration enabled(final List<String>
bootstrapServers,
+ final ClientDnsLookup
clientDnsLookup,
+ final long
bootstrapResolveTimeoutMs,
+ final long
retryBackoffMs) {
+ return new BootstrapConfiguration(bootstrapServers,
clientDnsLookup, bootstrapResolveTimeoutMs, retryBackoffMs, false);
+ }
+
+ public static BootstrapConfiguration disabled() {
+ return new BootstrapConfiguration(List.of(), null, 0, 0, true);
+ }
+ }
+
+ /**
+ * Ensures that the client has successfully resolved and bootstrapped with
the configured bootstrap servers.
+ * This method will retry DNS resolution until one of the following
conditions is met:
+ * <ul>
+ * <li>DNS resolution succeeds and bootstrap is complete (method returns
normally)</li>
+ * <li>Bootstrap timeout expires (throws
BootstrapResolutionException)</li>
+ * <li>Poll timeout is reached but bootstrap timeout hasn't expired
(method returns to retry later)</li>
+ * </ul>
+ *
+ * @param pollTimeoutMs The poll timeout in milliseconds, controlling how
long this method should block
+ * during a single poll invocation
+ * @param currentTimeMs The current time in milliseconds
+ * @throws BootstrapResolutionException if the bootstrap timeout expires
+ * before DNS resolution succeeds
+ *
+ * @implNote This method performs blocking DNS lookups via {@link
InetAddress#getAllByName(String)}.
+ * Each DNS lookup may block for an indefinite amount of time
depending on network conditions
+ * and DNS server responsiveness. As a result, this method may
block for longer than pollTimeoutMs
+ * if DNS resolution is slow. The method will retry DNS
resolution in a loop with brief sleeps
+ * (up to 100ms) between attempts until one of the exit
conditions above is met.
+ */
+ void ensureBootstrapped(final long pollTimeoutMs, final long
currentTimeMs) {
+ if (bootstrapConfiguration.isBootstrapDisabled ||
metadataUpdater.isBootstrapped())
+ return;
+
+ // Start the bootstrap timer on first call to ensure it starts
counting from the first poll
+ if (!bootstrapTimerStarted) {
+ bootstrapTimer.update(currentTimeMs);
+
bootstrapTimer.reset(bootstrapConfiguration.bootstrapResolveTimeoutMs);
+ bootstrapTimerStarted = true;
+ }
+
+ // Handle potential overflow when adding timeout to current time
+ long pollDeadlineMs;
+ if (currentTimeMs > Long.MAX_VALUE - pollTimeoutMs)
+ pollDeadlineMs = Long.MAX_VALUE;
+ else
+ pollDeadlineMs = currentTimeMs + pollTimeoutMs;
+
+ while (true) {
+ long now = time.milliseconds();
+ bootstrapTimer.update(now);
+
+ List<InetSocketAddress> servers = ClientUtils.parseAddresses(
+ bootstrapConfiguration.bootstrapServers,
bootstrapConfiguration.clientDnsLookup);
+
+ if (!servers.isEmpty()) {
+ // Resolution succeeded
+
bootstrapTimer.reset(bootstrapConfiguration.bootstrapResolveTimeoutMs);
+ metadataUpdater.bootstrap(servers);
+ return;
+ }
+
+ if (bootstrapTimer.isExpired()) {
+ // Bootstrap timeout expired before poll timeout
+ throw new BootstrapResolutionException("Timeout while
attempting to resolve bootstrap servers.");
+ }
+
+ if (now >= pollDeadlineMs) {
+ // Poll timeout reached but bootstrap timeout hasn't expired
yet
+ return;
+ }
+
+ // Sleep before retrying to avoid tight loop and reduce load on
DNS server
+ // Use the standard retry backoff to prevent overloading DNS with
requests
+ long remainingPollTimeMs = pollDeadlineMs - now;
+ long remainingBootstrapTimeMs = bootstrapTimer.remainingMs();
+ long sleepTimeMs = Math.min(Math.min(remainingPollTimeMs,
remainingBootstrapTimeMs), bootstrapConfiguration.retryBackoffMs);
+
+ if (sleepTimeMs > 0) {
Review Comment:
What happens if users try to interrupt the poll?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]