This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.8 by this push:
     new 62fb6a3ef19 KAFKA-8206: KIP-899: Allow client to rebootstrap  (#13277)
62fb6a3ef19 is described below

commit 62fb6a3ef1996f2b5ec80b7923bf27583531babb
Author: Ivan Yurchenko <[email protected]>
AuthorDate: Wed Jun 12 22:48:32 2024 +0300

    KAFKA-8206: KIP-899: Allow client to rebootstrap  (#13277)
    
    This commit implements KIP-899: Allow producer and consumer clients to
    rebootstrap. It introduces the new setting `metadata.recovery.strategy`,
    applicable to all the types of clients.
    
    Reviewers: Greg Harris <[email protected]>, Rajini Sivaram <[email protected]>
---
 .../java/org/apache/kafka/clients/ClientUtils.java |  4 +-
 .../apache/kafka/clients/CommonClientConfigs.java  | 13 ++++
 .../java/org/apache/kafka/clients/KafkaClient.java |  2 +-
 .../org/apache/kafka/clients/LeastLoadedNode.java  | 43 +++++++++++
 .../java/org/apache/kafka/clients/Metadata.java    |  8 ++
 .../kafka/clients/MetadataRecoveryStrategy.java    | 44 +++++++++++
 .../org/apache/kafka/clients/NetworkClient.java    | 62 ++++++++++-----
 .../kafka/clients/admin/AdminClientConfig.java     | 14 +++-
 .../kafka/clients/admin/KafkaAdminClient.java      | 16 +++-
 .../admin/internals/AdminMetadataManager.java      | 14 ++++
 .../kafka/clients/consumer/ConsumerConfig.java     | 10 ++-
 .../consumer/internals/ConsumerNetworkClient.java  |  2 +-
 .../consumer/internals/NetworkClientDelegate.java  |  4 +-
 .../kafka/clients/producer/ProducerConfig.java     | 10 ++-
 .../kafka/clients/producer/internals/Sender.java   |  2 +-
 .../java/org/apache/kafka/clients/MockClient.java  |  8 +-
 .../apache/kafka/clients/NetworkClientTest.java    | 72 ++++++++++++++----
 .../kafka/clients/admin/AdminClientConfigTest.java | 47 ++++++++++++
 .../kafka/clients/consumer/ConsumerConfigTest.java | 20 +++++
 .../internals/FetchRequestManagerTest.java         |  4 +-
 .../clients/consumer/internals/FetcherTest.java    |  4 +-
 .../kafka/clients/producer/KafkaProducerTest.java  |  5 +-
 .../kafka/clients/producer/ProducerConfigTest.java | 20 +++++
 .../clients/producer/internals/SenderTest.java     | 13 ++--
 .../runtime/distributed/DistributedConfig.java     | 14 +++-
 .../runtime/distributed/WorkerGroupMember.java     |  5 +-
 core/src/main/java/kafka/server/NetworkUtils.java  |  4 +-
 .../kafka/admin/BrokerApiVersionsCommand.scala     |  5 +-
 .../controller/ControllerChannelManager.scala      |  3 +-
 .../TransactionMarkerChannelManager.scala          |  3 +-
 core/src/main/scala/kafka/raft/RaftManager.scala   |  5 +-
 .../scala/kafka/server/BrokerBlockingSender.scala  |  3 +-
 core/src/main/scala/kafka/server/KafkaServer.scala |  5 +-
 .../server/NodeToControllerChannelManager.scala    |  3 +-
 .../kafka/api/AdminClientRebootstrapTest.scala     | 48 ++++++++++++
 .../kafka/api/ConsumerRebootstrapTest.scala        | 87 ++++++++++++++++++++++
 .../kafka/api/ProducerRebootstrapTest.scala        | 52 +++++++++++++
 .../integration/kafka/api/RebootstrapTest.scala    | 52 +++++++++++++
 .../kafka/tools/ReplicaVerificationTool.java       |  4 +-
 .../trogdor/workload/ConnectionStressWorker.java   |  4 +-
 40 files changed, 666 insertions(+), 72 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
index 6b6b56059c8..b08bc12950c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
@@ -245,7 +245,9 @@ public final class ClientUtils {
                     throttleTimeSensor,
                     logContext,
                     hostResolver,
-                    clientTelemetrySender);
+                    clientTelemetrySender,
+                    MetadataRecoveryStrategy.forName(config.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG))
+            );
         } catch (Throwable t) {
             closeQuietly(selector, "Selector");
             closeQuietly(channelBuilder, "ChannelBuilder");
diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index f5a08da2c93..9803d487f41 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -219,6 +219,19 @@ public class CommonClientConfigs {
     public static final String DEFAULT_API_TIMEOUT_MS_DOC = "Specifies the timeout (in milliseconds) for client APIs. " +
             "This configuration is used as the default timeout for all client operations that do not specify a <code>timeout</code> parameter.";
 
+    public static final String METADATA_RECOVERY_STRATEGY_CONFIG = "metadata.recovery.strategy";
+    public static final String METADATA_RECOVERY_STRATEGY_DOC = "Controls how the client recovers when none of the brokers known to it is available. " +
+            "If set to <code>none</code>, the client fails. If set to <code>rebootstrap</code>, " +
+            "the client repeats the bootstrap process using <code>bootstrap.servers</code>. " +
+            "Rebootstrapping is useful when a client communicates with brokers so infrequently " +
+            "that the set of brokers may change entirely before the client refreshes metadata. " +
+            "Metadata recovery is triggered when all last-known brokers appear unavailable simultaneously. " +
+            "Brokers appear unavailable when disconnected and no current retry attempt is in-progress. " +
+            "Consider increasing <code>reconnect.backoff.ms</code> and <code>reconnect.backoff.max.ms</code> and " +
+            "decreasing <code>socket.connection.setup.timeout.ms</code> and <code>socket.connection.setup.timeout.max.ms</code> " +
+            "for the client.";
+    public static final String DEFAULT_METADATA_RECOVERY_STRATEGY = MetadataRecoveryStrategy.NONE.name;
+
     /**
      * Postprocess the configuration so that exponential backoff is disabled when reconnect backoff
      * is explicitly configured but the maximum reconnect backoff is not explicitly configured.
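
For illustration, a sketch of the tuning direction the doc string above suggests once rebootstrap is enabled (the keys are existing client configs; the values are placeholders, not recommendations made by this change):

    Properties props = new Properties();
    props.put("metadata.recovery.strategy", "rebootstrap");
    // Back off longer between reconnect attempts to brokers that look dead...
    props.put("reconnect.backoff.ms", "1000");        // placeholder value
    props.put("reconnect.backoff.max.ms", "10000");   // placeholder value
    // ...and give up on connection attempts sooner, so total unavailability
    // is detected, and rebootstrap triggered, earlier.
    props.put("socket.connection.setup.timeout.ms", "5000");       // placeholder value
    props.put("socket.connection.setup.timeout.max.ms", "10000");  // placeholder value
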
diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
index a03d57b40f8..46b64986064 100644
--- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
@@ -130,7 +130,7 @@ public interface KafkaClient extends Closeable {
      * @param now The current time in ms
      * @return The node with the fewest in-flight requests.
      */
-    Node leastLoadedNode(long now);
+    LeastLoadedNode leastLoadedNode(long now);
 
     /**
      * The number of currently in-flight requests for which we have not yet returned a response
diff --git a/clients/src/main/java/org/apache/kafka/clients/LeastLoadedNode.java b/clients/src/main/java/org/apache/kafka/clients/LeastLoadedNode.java
new file mode 100644
index 00000000000..b2b93e6c94b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/LeastLoadedNode.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import org.apache.kafka.common.Node;
+
+public class LeastLoadedNode {
+    private final Node node;
+    private final boolean atLeastOneConnectionReady;
+
+    public LeastLoadedNode(Node node, boolean atLeastOneConnectionReady) {
+        this.node = node;
+        this.atLeastOneConnectionReady = atLeastOneConnectionReady;
+    }
+
+    public Node node() {
+        return node;
+    }
+
+    /**
+     * Indicates if the least loaded node is available or at least a ready connection exists.
+     *
+     * <p>There may be no node available while ready connections to live nodes exist. This may happen when
+     * the connections are overloaded with in-flight requests. This function takes this into account.
+     */
+    public boolean hasNodeAvailableOrConnectionReady() {
+        return node != null || atLeastOneConnectionReady;
+    }
+}
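
As a reading aid, a minimal sketch of the semantics above (hypothetical calls, not code from this patch):

    // A ready connection exists even though no node is currently selectable:
    // the cluster is still reachable, so rebootstrap is not warranted.
    new LeastLoadedNode(null, true).hasNodeAvailableOrConnectionReady();   // true
    // No node and no ready connection: every known broker looks dead.
    new LeastLoadedNode(null, false).hasNodeAvailableOrConnectionReady();  // false
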
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 30cad44a4bc..9246da01000 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -82,6 +82,8 @@ public class Metadata implements Closeable {
     private final ClusterResourceListeners clusterResourceListeners;
     private boolean isClosed;
     private final Map<TopicPartition, Integer> lastSeenLeaderEpochs;
+    /** Addresses with which the metadata was originally bootstrapped. */
+    private List<InetSocketAddress> bootstrapAddresses;
 
     /**
      * Create a new Metadata instance
@@ -304,6 +306,12 @@ public class Metadata implements Closeable {
         this.needFullUpdate = true;
         this.updateVersion += 1;
         this.metadataSnapshot = MetadataSnapshot.bootstrap(addresses);
+        this.bootstrapAddresses = addresses;
+    }
+
+    public synchronized void rebootstrap() {
+        log.info("Rebootstrapping with {}", this.bootstrapAddresses);
+        this.bootstrap(this.bootstrapAddresses);
     }
 
     /**
diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataRecoveryStrategy.java b/clients/src/main/java/org/apache/kafka/clients/MetadataRecoveryStrategy.java
new file mode 100644
index 00000000000..a4e0340c241
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/MetadataRecoveryStrategy.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import java.util.Locale;
+
+/**
+ * Defines the strategies which clients can follow to deal with the situation when none of the known nodes is available.
+ */
+public enum MetadataRecoveryStrategy {
+    NONE("none"),
+    REBOOTSTRAP("rebootstrap");
+
+    public final String name;
+
+    MetadataRecoveryStrategy(String name) {
+        this.name = name;
+    }
+
+    public static MetadataRecoveryStrategy forName(String name) {
+        if (name == null) {
+            throw new IllegalArgumentException("Illegal 
MetadataRecoveryStrategy: null");
+        }
+        try {
+            return 
MetadataRecoveryStrategy.valueOf(name.toUpperCase(Locale.ROOT));
+        } catch (IllegalArgumentException e) {
+            throw new IllegalArgumentException("Illegal 
MetadataRecoveryStrategy: " + name);
+        }
+    }
+}
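
For illustration, forName matches case-insensitively and rejects unknown values (hypothetical calls, not code from this patch):

    MetadataRecoveryStrategy.forName("REBOOTSTRAP");  // REBOOTSTRAP
    MetadataRecoveryStrategy.forName("none");         // NONE
    MetadataRecoveryStrategy.forName("abc");          // throws IllegalArgumentException
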
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 3a7af6617e7..8ac92acd884 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -114,6 +114,8 @@ public class NetworkClient implements KafkaClient {
     /* time in ms to wait before retrying to create connection to a server */
     private final long reconnectBackoffMs;
 
+    private final MetadataRecoveryStrategy metadataRecoveryStrategy;
+
     private final Time time;
 
     /**
@@ -147,7 +149,8 @@ public class NetworkClient implements KafkaClient {
                          Time time,
                          boolean discoverBrokerVersions,
                          ApiVersions apiVersions,
-                         LogContext logContext) {
+                         LogContext logContext,
+                         MetadataRecoveryStrategy metadataRecoveryStrategy) {
         this(selector,
              metadata,
              clientId,
@@ -163,7 +166,8 @@ public class NetworkClient implements KafkaClient {
              discoverBrokerVersions,
              apiVersions,
              null,
-             logContext);
+             logContext,
+             metadataRecoveryStrategy);
     }
 
     public NetworkClient(Selectable selector,
@@ -181,7 +185,8 @@ public class NetworkClient implements KafkaClient {
                          boolean discoverBrokerVersions,
                          ApiVersions apiVersions,
                          Sensor throttleTimeSensor,
-                         LogContext logContext) {
+                         LogContext logContext,
+                         MetadataRecoveryStrategy metadataRecoveryStrategy) {
         this(null,
              metadata,
              selector,
@@ -200,7 +205,8 @@ public class NetworkClient implements KafkaClient {
              throttleTimeSensor,
              logContext,
              new DefaultHostResolver(),
-             null);
+             null,
+             metadataRecoveryStrategy);
     }
 
     public NetworkClient(Selectable selector,
@@ -217,7 +223,8 @@ public class NetworkClient implements KafkaClient {
                          Time time,
                          boolean discoverBrokerVersions,
                          ApiVersions apiVersions,
-                         LogContext logContext) {
+                         LogContext logContext,
+                         MetadataRecoveryStrategy metadataRecoveryStrategy) {
         this(metadataUpdater,
              null,
              selector,
@@ -236,7 +243,8 @@ public class NetworkClient implements KafkaClient {
              null,
              logContext,
              new DefaultHostResolver(),
-             null);
+             null,
+             metadataRecoveryStrategy);
     }
 
     public NetworkClient(MetadataUpdater metadataUpdater,
@@ -257,7 +265,8 @@ public class NetworkClient implements KafkaClient {
                          Sensor throttleTimeSensor,
                          LogContext logContext,
                          HostResolver hostResolver,
-                         ClientTelemetrySender clientTelemetrySender) {
+                         ClientTelemetrySender clientTelemetrySender,
+                         MetadataRecoveryStrategy metadataRecoveryStrategy) {
         /* It would be better if we could pass `DefaultMetadataUpdater` from the public constructor, but it's not
          * possible because `DefaultMetadataUpdater` is an inner class and it can only be instantiated after the
          * super constructor is invoked.
@@ -288,6 +297,7 @@ public class NetworkClient implements KafkaClient {
         this.log = logContext.logger(NetworkClient.class);
         this.state = new AtomicReference<>(State.ACTIVE);
+        this.telemetrySender = (clientTelemetrySender != null) ? new TelemetrySender(clientTelemetrySender) : null;
+        this.metadataRecoveryStrategy = metadataRecoveryStrategy;
     }
 
     /**
@@ -695,7 +705,7 @@ public class NetworkClient implements KafkaClient {
      * @return The node with the fewest in-flight requests.
      */
     @Override
-    public Node leastLoadedNode(long now) {
+    public LeastLoadedNode leastLoadedNode(long now) {
         List<Node> nodes = this.metadataUpdater.fetchNodes();
         if (nodes.isEmpty())
             throw new IllegalStateException("There are no nodes in the Kafka cluster");
@@ -705,16 +715,25 @@ public class NetworkClient implements KafkaClient {
         Node foundCanConnect = null;
         Node foundReady = null;
 
+        boolean atLeastOneConnectionReady = false;
+
         int offset = this.randOffset.nextInt(nodes.size());
         for (int i = 0; i < nodes.size(); i++) {
             int idx = (offset + i) % nodes.size();
             Node node = nodes.get(idx);
+
+            if (!atLeastOneConnectionReady
+                    && connectionStates.isReady(node.idString(), now)
+                    && selector.isChannelReady(node.idString())) {
+                atLeastOneConnectionReady = true;
+            }
+
             if (canSendRequest(node.idString(), now)) {
                 int currInflight = this.inFlightRequests.count(node.idString());
                 if (currInflight == 0) {
                     // if we find an established connection with no in-flight requests we can stop right away
                     log.trace("Found least loaded node {} connected with no in-flight requests", node);
-                    return node;
+                    return new LeastLoadedNode(node, true);
                 } else if (currInflight < inflight) {
                     // otherwise if this is the best we have found so far, 
record that
                     inflight = currInflight;
@@ -738,16 +757,16 @@ public class NetworkClient implements KafkaClient {
         // which are being established before connecting to new nodes.
         if (foundReady != null) {
             log.trace("Found least loaded node {} with {} inflight requests", 
foundReady, inflight);
-            return foundReady;
+            return new LeastLoadedNode(foundReady, atLeastOneConnectionReady);
         } else if (foundConnecting != null) {
             log.trace("Found least loaded connecting node {}", 
foundConnecting);
-            return foundConnecting;
+            return new LeastLoadedNode(foundConnecting, atLeastOneConnectionReady);
         } else if (foundCanConnect != null) {
             log.trace("Found least loaded node {} with no active connection", 
foundCanConnect);
-            return foundCanConnect;
+            return new LeastLoadedNode(foundCanConnect, atLeastOneConnectionReady);
         } else {
             log.trace("Least loaded node selection failed to find an available 
node");
-            return null;
+            return new LeastLoadedNode(null, atLeastOneConnectionReady);
         }
     }
 
@@ -1122,13 +1141,22 @@ public class NetworkClient implements KafkaClient {
 
             // Beware that the behavior of this method and the computation of timeouts for poll() are
             // highly dependent on the behavior of leastLoadedNode.
-            Node node = leastLoadedNode(now);
-            if (node == null) {
+            LeastLoadedNode leastLoadedNode = leastLoadedNode(now);
+
+            // Rebootstrap if needed and configured.
+            if (metadataRecoveryStrategy == MetadataRecoveryStrategy.REBOOTSTRAP
+                    && !leastLoadedNode.hasNodeAvailableOrConnectionReady()) {
+                metadata.rebootstrap();
+
+                leastLoadedNode = leastLoadedNode(now);
+            }
+
+            if (leastLoadedNode.node() == null) {
                 log.debug("Give up sending metadata request since no node is 
available");
                 return reconnectBackoffMs;
             }
 
-            return maybeUpdate(now, node);
+            return maybeUpdate(now, leastLoadedNode.node());
         }
 
         @Override
@@ -1266,7 +1294,7 @@ public class NetworkClient implements KafkaClient {
 
             // Per KIP-714, let's continue to re-use the same broker for as long as possible.
             if (stickyNode == null) {
-                stickyNode = leastLoadedNode(now);
+                stickyNode = leastLoadedNode(now).node();
                 if (stickyNode == null) {
                     log.debug("Give up sending telemetry request since no node 
is available");
                     return reconnectBackoffMs;
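
Condensed, the metadata-update path changed above now behaves as follows (a paraphrase of the hunk, not additional code in the patch):

    LeastLoadedNode candidate = leastLoadedNode(now);
    if (metadataRecoveryStrategy == MetadataRecoveryStrategy.REBOOTSTRAP
            && !candidate.hasNodeAvailableOrConnectionReady()) {
        metadata.rebootstrap();           // re-seed from the original bootstrap addresses
        candidate = leastLoadedNode(now); // retry selection against the re-seeded set
    }
    if (candidate.node() == null)
        return reconnectBackoffMs;        // still nothing reachable; back off
    return maybeUpdate(now, candidate.node());
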
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
index d740283f506..b64338b1d1b 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.admin;
 
 import org.apache.kafka.clients.ClientDnsLookup;
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
@@ -139,6 +140,10 @@ public class AdminClientConfig extends AbstractConfig {
     public static final String RETRIES_CONFIG = CommonClientConfigs.RETRIES_CONFIG;
     public static final String DEFAULT_API_TIMEOUT_MS_CONFIG = CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_CONFIG;
 
+    public static final String METADATA_RECOVERY_STRATEGY_CONFIG = CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG;
+    public static final String METADATA_RECOVERY_STRATEGY_DOC = CommonClientConfigs.METADATA_RECOVERY_STRATEGY_DOC;
+    public static final String DEFAULT_METADATA_RECOVERY_STRATEGY = CommonClientConfigs.DEFAULT_METADATA_RECOVERY_STRATEGY;
+
     /**
      * <code>security.providers</code>
      */
@@ -262,7 +267,14 @@ public class AdminClientConfig extends AbstractConfig {
                                         Importance.MEDIUM,
                                         SECURITY_PROTOCOL_DOC)
                                 .withClientSslSupport()
-                                .withClientSaslSupport();
+                                .withClientSaslSupport()
+                                .define(METADATA_RECOVERY_STRATEGY_CONFIG,
+                                        Type.STRING,
+                                        DEFAULT_METADATA_RECOVERY_STRATEGY,
+                                        ConfigDef.CaseInsensitiveValidString
+                                                .in(Utils.enumOptions(MetadataRecoveryStrategy.class)),
+                                        Importance.LOW,
+                                        METADATA_RECOVERY_STRATEGY_DOC);
     }
 
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 55646e27820..8f693c9965a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -25,6 +25,8 @@ import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.DefaultHostResolver;
 import org.apache.kafka.clients.HostResolver;
 import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.LeastLoadedNode;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.StaleMetadataException;
 import org.apache.kafka.clients.admin.CreateTopicsResult.TopicMetadataAndConfig;
@@ -399,6 +401,7 @@ public class KafkaAdminClient extends AdminClient {
     private final long retryBackoffMaxMs;
     private final ExponentialBackoff retryBackoff;
     private final boolean clientTelemetryEnabled;
+    private final MetadataRecoveryStrategy metadataRecoveryStrategy;
 
     /**
      * The telemetry requests client instance id.
@@ -612,6 +615,7 @@ public class KafkaAdminClient extends AdminClient {
             retryBackoffMaxMs,
             CommonClientConfigs.RETRY_BACKOFF_JITTER);
+        this.clientTelemetryEnabled = config.getBoolean(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG);
+        this.metadataRecoveryStrategy = MetadataRecoveryStrategy.forName(config.getString(AdminClientConfig.METADATA_RECOVERY_STRATEGY_CONFIG));
         config.logUnused();
         AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());
         log.debug("Kafka admin client initialized");
@@ -698,7 +702,13 @@ public class KafkaAdminClient extends AdminClient {
     private class MetadataUpdateNodeIdProvider implements NodeProvider {
         @Override
         public Node provide() {
-            return client.leastLoadedNode(time.milliseconds());
+            LeastLoadedNode leastLoadedNode = client.leastLoadedNode(time.milliseconds());
+            if (metadataRecoveryStrategy == MetadataRecoveryStrategy.REBOOTSTRAP
+                    && !leastLoadedNode.hasNodeAvailableOrConnectionReady()) {
+                metadataManager.rebootstrap(time.milliseconds());
+            }
+
+            return leastLoadedNode.node();
         }
 
         @Override
@@ -780,7 +790,7 @@ public class KafkaAdminClient extends AdminClient {
             if (metadataManager.isReady()) {
                 // This may return null if all nodes are busy.
                 // In that case, we will postpone node assignment.
-                return client.leastLoadedNode(time.milliseconds());
+                return client.leastLoadedNode(time.milliseconds()).node();
             }
             metadataManager.requestUpdate();
             return null;
@@ -835,7 +845,7 @@ public class KafkaAdminClient extends AdminClient {
                 } else {
                     // This may return null if all nodes are busy.
                     // In that case, we will postpone node assignment.
-                    return client.leastLoadedNode(time.milliseconds());
+                    return client.leastLoadedNode(time.milliseconds()).node();
                 }
             }
             metadataManager.requestUpdate();
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
index f8123c40eff..239f6eecef0 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
@@ -92,6 +92,11 @@ public class AdminMetadataManager {
      */
     private ApiException fatalException = null;
 
+    /**
+     * The cluster with which the metadata was bootstrapped.
+     */
+    private Cluster bootstrapCluster;
+
     public class AdminMetadataUpdater implements MetadataUpdater {
         @Override
         public List<Node> fetchNodes() {
@@ -275,6 +280,7 @@ public class AdminMetadataManager {
     public void update(Cluster cluster, long now) {
         if (cluster.isBootstrapConfigured()) {
             log.debug("Setting bootstrap cluster metadata {}.", cluster);
+            bootstrapCluster = cluster;
         } else {
             log.debug("Updating cluster metadata to {}", cluster);
             this.lastMetadataUpdateMs = now;
@@ -287,4 +293,12 @@ public class AdminMetadataManager {
             this.cluster = cluster;
         }
     }
+
+    /**
+     * Rebootstrap metadata with the cluster previously used for bootstrapping.
+     */
+    public void rebootstrap(long now) {
+        log.info("Rebootstrapping with {}", this.bootstrapCluster);
+        update(bootstrapCluster, now);
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 76bfe7e91a1..c4c10c404b4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer;
 
 import org.apache.kafka.clients.ClientDnsLookup;
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
 import org.apache.kafka.common.IsolationLevel;
 import org.apache.kafka.common.config.AbstractConfig;
 import org.apache.kafka.common.config.ConfigDef;
@@ -656,7 +657,14 @@ public class ConsumerConfig extends AbstractConfig {
                                         Importance.MEDIUM,
                                         CommonClientConfigs.SECURITY_PROTOCOL_DOC)
                                 .withClientSslSupport()
-                                .withClientSaslSupport();
+                                .withClientSaslSupport()
+                                .define(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG,
+                                        Type.STRING,
+                                        CommonClientConfigs.DEFAULT_METADATA_RECOVERY_STRATEGY,
+                                        ConfigDef.CaseInsensitiveValidString
+                                                .in(Utils.enumOptions(MetadataRecoveryStrategy.class)),
+                                        Importance.LOW,
+                                        CommonClientConfigs.METADATA_RECOVERY_STRATEGY_DOC);
     }
 
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 4699f00c151..50c7eb5b028 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -139,7 +139,7 @@ public class ConsumerNetworkClient implements Closeable {
     public Node leastLoadedNode() {
         lock.lock();
         try {
-            return client.leastLoadedNode(time.milliseconds());
+            return client.leastLoadedNode(time.milliseconds()).node();
         } finally {
             lock.unlock();
         }
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
index e2e4d529c00..d069a0d1fb6 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
@@ -163,7 +163,7 @@ public class NetworkClientDelegate implements AutoCloseable {
     }
 
     boolean doSend(final UnsentRequest r, final long currentTimeMs) {
-        Node node = r.node.orElse(client.leastLoadedNode(currentTimeMs));
+        Node node = r.node.orElse(client.leastLoadedNode(currentTimeMs).node());
         if (node == null || nodeUnavailable(node)) {
             log.debug("No broker available to send the request: {}. 
Retrying.", r);
             return false;
@@ -208,7 +208,7 @@ public class NetworkClientDelegate implements AutoCloseable {
     }
 
     public Node leastLoadedNode() {
-        return this.client.leastLoadedNode(time.milliseconds());
+        return this.client.leastLoadedNode(time.milliseconds()).node();
     }
 
     public void wakeup() {
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index c67d60a180a..a59ee81b4a9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.producer;
 
 import org.apache.kafka.clients.ClientDnsLookup;
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
 import org.apache.kafka.common.compress.GzipCompression;
 import org.apache.kafka.common.compress.Lz4Compression;
 import org.apache.kafka.common.compress.ZstdCompression;
@@ -528,7 +529,14 @@ public class ProducerConfig extends AbstractConfig {
                                         null,
                                         new ConfigDef.NonEmptyString(),
                                         Importance.LOW,
-                                        TRANSACTIONAL_ID_DOC);
+                                        TRANSACTIONAL_ID_DOC)
+                                .define(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG,
+                                        Type.STRING,
+                                        CommonClientConfigs.DEFAULT_METADATA_RECOVERY_STRATEGY,
+                                        ConfigDef.CaseInsensitiveValidString
+                                                .in(Utils.enumOptions(MetadataRecoveryStrategy.class)),
+                                        Importance.LOW,
+                                        CommonClientConfigs.METADATA_RECOVERY_STRATEGY_DOC);
     }
 
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index c4e2b73e8b9..b1a3ab9e293 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -484,7 +484,7 @@ public class Sender implements Runnable {
             FindCoordinatorRequest.CoordinatorType coordinatorType = nextRequestHandler.coordinatorType();
             targetNode = coordinatorType != null ?
                     transactionManager.coordinator(coordinatorType) :
-                    client.leastLoadedNode(time.milliseconds());
+                    client.leastLoadedNode(time.milliseconds()).node();
             if (targetNode != null) {
                 if (!awaitNodeReady(targetNode, coordinatorType)) {
                     log.trace("Target node {} not ready within request 
timeout, will retry when node is ready.", targetNode);
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index d6cdd14f365..86d1ddf5f41 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -319,7 +319,7 @@ public class MockClient implements KafkaClient {
         checkTimeoutOfPendingRequests(now);
 
         // We skip metadata updates if all nodes are currently blacked out
-        if (metadataUpdater.isUpdateNeeded() && leastLoadedNode(now) != null) {
+        if (metadataUpdater.isUpdateNeeded() && leastLoadedNode(now).node() != null) {
             MetadataUpdate metadataUpdate = metadataUpdates.poll();
             if (metadataUpdate != null) {
                 metadataUpdater.update(time, metadataUpdate);
@@ -588,13 +588,13 @@ public class MockClient implements KafkaClient {
     }
 
     @Override
-    public Node leastLoadedNode(long now) {
+    public LeastLoadedNode leastLoadedNode(long now) {
         // Consistent with NetworkClient, we do not return nodes awaiting reconnect backoff
         for (Node node : metadataUpdater.fetchNodes()) {
             if (!connectionState(node.idString()).isBackingOff(now))
-                return node;
+                return new LeastLoadedNode(node, true);
         }
-        return null;
+        return new LeastLoadedNode(null, false);
     }
 
     public void setWakeupHook(Runnable wakeupHook) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index cd3ec36f385..4369c8404e1 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -128,7 +128,16 @@ public class NetworkClientTest {
     private NetworkClient createNetworkClient(long reconnectBackoffMaxMs) {
         return new NetworkClient(selector, metadataUpdater, "mock", Integer.MAX_VALUE,
                 reconnectBackoffMsTest, reconnectBackoffMaxMs, 64 * 1024, 64 * 1024,
-                defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, time, true, new ApiVersions(), new LogContext());
+                defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, time, true, new ApiVersions(), new LogContext(),
+                MetadataRecoveryStrategy.NONE);
+    }
+
+    private NetworkClient createNetworkClientWithMaxInFlightRequestsPerConnection(
+            int maxInFlightRequestsPerConnection, long reconnectBackoffMaxMs) {
+        return new NetworkClient(selector, metadataUpdater, "mock", maxInFlightRequestsPerConnection,
+                reconnectBackoffMsTest, reconnectBackoffMaxMs, 64 * 1024, 64 * 1024,
+                defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, time, true, new ApiVersions(), new LogContext(),
+                MetadataRecoveryStrategy.NONE);
     }
 
     private NetworkClient createNetworkClientWithMultipleNodes(long reconnectBackoffMaxMs, long connectionSetupTimeoutMsTest, int nodeNumber) {
@@ -136,26 +145,30 @@ public class NetworkClientTest {
         TestMetadataUpdater metadataUpdater = new TestMetadataUpdater(nodes);
         return new NetworkClient(selector, metadataUpdater, "mock", Integer.MAX_VALUE,
                 reconnectBackoffMsTest, reconnectBackoffMaxMs, 64 * 1024, 64 * 1024,
-                defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, time, true, new ApiVersions(), new LogContext());
+                defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, time, true, new ApiVersions(), new LogContext(),
+                MetadataRecoveryStrategy.NONE);
     }
 
     private NetworkClient createNetworkClientWithStaticNodes() {
         return new NetworkClient(selector, metadataUpdater,
                 "mock-static", Integer.MAX_VALUE, 0, 0, 64 * 1024, 64 * 1024, 
defaultRequestTimeoutMs,
-                connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, 
time, true, new ApiVersions(), new LogContext());
+                connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, 
time, true, new ApiVersions(), new LogContext(),
+                MetadataRecoveryStrategy.NONE);
     }
 
     private NetworkClient createNetworkClientWithNoVersionDiscovery(Metadata metadata) {
         return new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
                 reconnectBackoffMsTest, 0, 64 * 1024, 64 * 1024,
-                defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, time, false, new ApiVersions(), new LogContext());
+                defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, time, false, new ApiVersions(), new LogContext(),
+                MetadataRecoveryStrategy.NONE);
     }
 
     private NetworkClient createNetworkClientWithNoVersionDiscovery() {
         return new NetworkClient(selector, metadataUpdater, "mock", Integer.MAX_VALUE,
                 reconnectBackoffMsTest, reconnectBackoffMaxMsTest,
                 64 * 1024, 64 * 1024, defaultRequestTimeoutMs,
-                connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, time, false, new ApiVersions(), new LogContext());
+                connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, time, false, new ApiVersions(), new LogContext(),
+                MetadataRecoveryStrategy.NONE);
     }
 
     @BeforeEach
@@ -698,14 +711,18 @@ public class NetworkClientTest {
     public void testLeastLoadedNode() {
         client.ready(node, time.milliseconds());
         assertFalse(client.isReady(node, time.milliseconds()));
-        assertEquals(node, client.leastLoadedNode(time.milliseconds()));
+        LeastLoadedNode leastLoadedNode = client.leastLoadedNode(time.milliseconds());
+        assertEquals(node, leastLoadedNode.node());
+        assertTrue(leastLoadedNode.hasNodeAvailableOrConnectionReady());
 
         awaitReady(client, node);
         client.poll(1, time.milliseconds());
         assertTrue(client.isReady(node, time.milliseconds()), "The client should be ready");
 
         // leastloadednode should be our single node
-        Node leastNode = client.leastLoadedNode(time.milliseconds());
+        leastLoadedNode = client.leastLoadedNode(time.milliseconds());
+        assertTrue(leastLoadedNode.hasNodeAvailableOrConnectionReady());
+        Node leastNode = leastLoadedNode.node();
         assertEquals(leastNode.id(), node.id(), "There should be one leastloadednode");
 
         // sleep for longer than reconnect backoff
@@ -716,8 +733,29 @@ public class NetworkClientTest {
 
         client.poll(1, time.milliseconds());
         assertFalse(client.ready(node, time.milliseconds()), "After we forced the disconnection the client is no longer ready.");
-        leastNode = client.leastLoadedNode(time.milliseconds());
-        assertNull(leastNode, "There should be NO leastloadednode");
+        leastLoadedNode = client.leastLoadedNode(time.milliseconds());
+        assertFalse(leastLoadedNode.hasNodeAvailableOrConnectionReady());
+        assertNull(leastLoadedNode.node(), "There should be NO leastloadednode");
+    }
+
+    @Test
+    public void testHasNodeAvailableOrConnectionReady() {
+        NetworkClient client = createNetworkClientWithMaxInFlightRequestsPerConnection(1, reconnectBackoffMaxMsTest);
+        awaitReady(client, node);
+
+        long now = time.milliseconds();
+        LeastLoadedNode leastLoadedNode = client.leastLoadedNode(now);
+        assertEquals(node, leastLoadedNode.node());
+        assertTrue(leastLoadedNode.hasNodeAvailableOrConnectionReady());
+
+        MetadataRequest.Builder builder = new MetadataRequest.Builder(Collections.emptyList(), true);
+        ClientRequest request = client.newClientRequest(node.idString(), builder, now, true);
+        client.send(request, now);
+        client.poll(defaultRequestTimeoutMs, now);
+
+        leastLoadedNode = client.leastLoadedNode(now);
+        assertNull(leastLoadedNode.node());
+        assertTrue(leastLoadedNode.hasNodeAvailableOrConnectionReady());
     }
 
     @Test
@@ -727,7 +765,7 @@ public class NetworkClientTest {
 
         Set<Node> providedNodeIds = new HashSet<>();
         for (int i = 0; i < nodeNumber * 10; i++) {
-            Node node = client.leastLoadedNode(time.milliseconds());
+            Node node = client.leastLoadedNode(time.milliseconds()).node();
             assertNotNull(node, "Should provide a node");
             providedNodeIds.add(node);
             client.ready(node, time.milliseconds());
@@ -800,7 +838,7 @@ public class NetworkClientTest {
         client.poll(1, time.milliseconds());
 
         // leastloadednode should return null since the node is throttled
-        assertNull(client.leastLoadedNode(time.milliseconds()));
+        assertNull(client.leastLoadedNode(time.milliseconds()).node());
     }
 
     @Test
@@ -1046,7 +1084,8 @@ public class NetworkClientTest {
         NetworkClient client = new NetworkClient(metadataUpdater, null, selector, "mock", Integer.MAX_VALUE,
                 reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 64 * 1024,
                 defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest,
-                time, false, new ApiVersions(), null, new LogContext(), mockHostResolver, mockClientTelemetrySender);
+                time, false, new ApiVersions(), null, new LogContext(), mockHostResolver, mockClientTelemetrySender,
+                MetadataRecoveryStrategy.NONE);
 
         // Connect to one the initial addresses, then change the addresses and disconnect
         client.ready(node, time.milliseconds());
@@ -1106,7 +1145,8 @@ public class NetworkClientTest {
         NetworkClient client = new NetworkClient(metadataUpdater, null, selector, "mock", Integer.MAX_VALUE,
                 reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 64 * 1024,
                 defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest,
-                time, false, new ApiVersions(), null, new LogContext(), mockHostResolver, mockClientTelemetrySender);
+                time, false, new ApiVersions(), null, new LogContext(), mockHostResolver, mockClientTelemetrySender,
+                MetadataRecoveryStrategy.NONE);
 
         // First connection attempt should fail
         client.ready(node, time.milliseconds());
@@ -1158,7 +1198,8 @@ public class NetworkClientTest {
         NetworkClient client = new NetworkClient(metadataUpdater, null, selector, "mock", Integer.MAX_VALUE,
                 reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 64 * 1024,
                 defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest,
-                time, false, new ApiVersions(), null, new LogContext(), mockHostResolver, mockClientTelemetrySender);
+                time, false, new ApiVersions(), null, new LogContext(), mockHostResolver, mockClientTelemetrySender,
+                MetadataRecoveryStrategy.NONE);
 
         // Connect to one the initial addresses, then change the addresses and disconnect
         client.ready(node, time.milliseconds());
@@ -1266,7 +1307,8 @@ public class NetworkClientTest {
         NetworkClient client = new NetworkClient(metadataUpdater, null, selector, "mock", Integer.MAX_VALUE,
             reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 64 * 1024,
             defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest,
-            time, true, new ApiVersions(), null, new LogContext(), new DefaultHostResolver(), mockClientTelemetrySender);
+            time, true, new ApiVersions(), null, new LogContext(), new DefaultHostResolver(), mockClientTelemetrySender,
+            MetadataRecoveryStrategy.NONE);
 
         // Send the ApiVersionsRequest
         client.ready(node, time.milliseconds());
diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientConfigTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientConfigTest.java
new file mode 100644
index 00000000000..92dc56fde46
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientConfigTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
+import org.apache.kafka.common.config.ConfigException;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class AdminClientConfigTest {
+    @Test
+    public void testDefaultMetadataRecoveryStrategy() {
+        Map<String, Object> configs = new HashMap<>();
+        final AdminClientConfig adminClientConfig = new AdminClientConfig(configs);
+        assertEquals(MetadataRecoveryStrategy.NONE.name, adminClientConfig.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG));
+    }
+
+    @Test
+    public void testInvalidMetadataRecoveryStrategy() {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG, "abc");
+        ConfigException ce = assertThrows(ConfigException.class, () -> new AdminClientConfig(configs));
+        assertTrue(ce.getMessage().contains(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG));
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
index 8e9fa5722fc..0fc8e6ca485 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.clients.consumer;
 
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.errors.InvalidConfigurationException;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
@@ -191,6 +192,25 @@ public class ConsumerConfigTest {
         assertEquals(remoteAssignorName, consumerConfig.getString(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG));
     }
 
+    @Test
+    public void testDefaultMetadataRecoveryStrategy() {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass);
+        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass);
+        final ConsumerConfig consumerConfig = new ConsumerConfig(configs);
+        assertEquals(MetadataRecoveryStrategy.NONE.name, consumerConfig.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG));
+    }
+
+    @Test
+    public void testInvalidMetadataRecoveryStrategy() {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializerClass);
+        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDeserializerClass);
+        configs.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG, "abc");
+        ConfigException ce = assertThrows(ConfigException.class, () -> new ConsumerConfig(configs));
+        assertTrue(ce.getMessage().contains(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG));
+    }
+
     @ParameterizedTest
     @CsvSource({"consumer, true", "classic, true", "Consumer, true", "Classic, true", "invalid, false"})
     public void testProtocolConfigValidation(String protocol, boolean isValid) {
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
index eaabcb8f814..71a267b5246 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.KafkaClient;
 import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.NodeApiVersions;
@@ -1909,7 +1910,8 @@ public class FetchRequestManagerTest {
         Node node = cluster.nodes().get(0);
         NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
                 1000, 1000, 64 * 1024, 64 * 1024, 1000, 10 * 1000, 127 * 1000,
-                time, true, new ApiVersions(), metricsManager.throttleTimeSensor(), new LogContext());
+                time, true, new ApiVersions(), metricsManager.throttleTimeSensor(), new LogContext(),
+                MetadataRecoveryStrategy.NONE);
 
         ApiVersionsResponse apiVersionsResponse = TestUtils.defaultApiVersionsResponse(
                 400, ApiMessageType.ListenerType.ZK_BROKER);
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 1eac8709934..d0167e9b989 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.FetchSessionHandler;
 import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.NodeApiVersions;
@@ -1905,7 +1906,8 @@ public class FetcherTest {
         Node node = cluster.nodes().get(0);
         NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
                 1000, 1000, 64 * 1024, 64 * 1024, 1000, 10 * 1000, 127 * 1000,
-                time, true, new ApiVersions(), metricsManager.throttleTimeSensor(), new LogContext());
+                time, true, new ApiVersions(), metricsManager.throttleTimeSensor(), new LogContext(),
+                MetadataRecoveryStrategy.NONE);
 
         ApiVersionsResponse apiVersionsResponse = TestUtils.defaultApiVersionsResponse(
             400, ApiMessageType.ListenerType.ZK_BROKER);
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 7d4aa5e3a85..17119a25290 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.producer;
 
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.LeastLoadedNode;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -735,8 +736,8 @@ public class KafkaProducerTest {
         // let mockClient#leastLoadedNode return the node directly so that we can isolate Metadata calls from KafkaProducer for idempotent producer
         MockClient mockClient = new MockClient(Time.SYSTEM, metadata) {
             @Override
-            public Node leastLoadedNode(long now) {
-                return NODE;
+            public LeastLoadedNode leastLoadedNode(long now) {
+                return new LeastLoadedNode(NODE, true);
             }
         };
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
index d7952320e9f..eba5e8a0a7b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.clients.producer;
 
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -98,6 +99,25 @@ public class ProducerConfigTest {
         assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
     }
 
+    @Test
+    public void testDefaultMetadataRecoveryStrategy() {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass);
+        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass);
+        final ProducerConfig producerConfig = new ProducerConfig(configs);
+        assertEquals(MetadataRecoveryStrategy.NONE.name, producerConfig.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG));
+    }
+
+    @Test
+    public void testInvalidMetadataRecoveryStrategy() {
+        Map<String, Object> configs = new HashMap<>();
+        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializerClass);
+        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializerClass);
+        configs.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG, "abc");
+        ConfigException ce = assertThrows(ConfigException.class, () -> new ProducerConfig(configs));
+        assertTrue(ce.getMessage().contains(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG));
+    }
+
     @Test
     public void testCaseInsensitiveSecurityProtocol() {
        final String saslSslLowerCase = SecurityProtocol.SASL_SSL.name.toLowerCase(Locale.ROOT);
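The tests above pin down the default ("none") and the validation of the new setting. For reference, a hypothetical producer that opts into rebootstrapping might look like this; the broker addresses and serializers are placeholders, only the config key and the "rebootstrap" value come from this commit:

    import java.util.Properties;

    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    Properties props = new Properties();
    // Placeholder addresses; any reachable broker from this list can be used
    // for rebootstrapping later.
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
    // "rebootstrap" sends the client back to the bootstrap list when all known
    // brokers become unavailable; the default "none" keeps the old behaviour.
    props.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG, "rebootstrap");
    KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);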
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 5c1088987eb..cfeefc0ae5f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -19,7 +19,9 @@ package org.apache.kafka.clients.producer.internals;
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.LeastLoadedNode;
 import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
 import org.apache.kafka.clients.MetadataSnapshot;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.NetworkClient;
@@ -299,7 +301,8 @@ public class SenderTest {
         Node node = cluster.nodes().get(0);
         NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
                 1000, 1000, 64 * 1024, 64 * 1024, 1000, 10 * 1000, 127 * 1000,
-                time, true, new ApiVersions(), throttleTimeSensor, logContext);
+                time, true, new ApiVersions(), throttleTimeSensor, logContext,
+                MetadataRecoveryStrategy.NONE);
 
         ApiVersionsResponse apiVersionsResponse = TestUtils.defaultApiVersionsResponse(
             400, ApiMessageType.ListenerType.ZK_BROKER);
@@ -3797,12 +3800,12 @@ public class SenderTest {
         client = new MockClient(time, metadata) {
             volatile boolean canSendMore = true;
             @Override
-            public Node leastLoadedNode(long now) {
+            public LeastLoadedNode leastLoadedNode(long now) {
                 for (Node node : metadata.fetch().nodes()) {
                     if (isReady(node, now) && canSendMore)
-                        return node;
+                        return new LeastLoadedNode(node, true);
                 }
-                return null;
+                return new LeastLoadedNode(null, false);
             }
 
             @Override
@@ -3821,7 +3824,7 @@ public class SenderTest {
         while (!client.ready(node, time.milliseconds()))
             client.poll(0, time.milliseconds());
         client.send(request, time.milliseconds());
-        while (client.leastLoadedNode(time.milliseconds()) != null)
+        while (client.leastLoadedNode(time.milliseconds()).node() != null)
             client.poll(0, time.milliseconds());
     }
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
index 14826e982d6..1880fa512df 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.connect.runtime.distributed;
 
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigException;
@@ -96,6 +97,10 @@ public class DistributedConfig extends WorkerConfig {
     public static final String REBALANCE_TIMEOUT_MS_CONFIG = CommonClientConfigs.REBALANCE_TIMEOUT_MS_CONFIG;
     private static final String REBALANCE_TIMEOUT_MS_DOC = CommonClientConfigs.REBALANCE_TIMEOUT_MS_DOC;
 
+    public static final String METADATA_RECOVERY_STRATEGY_CONFIG = CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG;
+    private static final String METADATA_RECOVERY_STRATEGY_DOC = CommonClientConfigs.METADATA_RECOVERY_STRATEGY_DOC;
+    public static final String DEFAULT_METADATA_RECOVERY_STRATEGY = CommonClientConfigs.DEFAULT_METADATA_RECOVERY_STRATEGY;
+
     /**
      * <code>worker.sync.timeout.ms</code>
      */
@@ -512,7 +517,14 @@ public class DistributedConfig extends WorkerConfig {
                             (name, value) -> validateVerificationAlgorithms(crypto, name, (List<String>) value),
                             () -> "A list of one or more MAC algorithms, each supported by the worker JVM"),
                     ConfigDef.Importance.LOW,
-                    INTER_WORKER_VERIFICATION_ALGORITHMS_DOC);
+                    INTER_WORKER_VERIFICATION_ALGORITHMS_DOC)
+            .define(METADATA_RECOVERY_STRATEGY_CONFIG,
+                    ConfigDef.Type.STRING,
+                    DEFAULT_METADATA_RECOVERY_STRATEGY,
+                    ConfigDef.CaseInsensitiveValidString
+                            .in(Utils.enumOptions(MetadataRecoveryStrategy.class)),
+                    ConfigDef.Importance.LOW,
+                    METADATA_RECOVERY_STRATEGY_DOC);
     }
 
     private final ExactlyOnceSourceSupport exactlyOnceSourceSupport;
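Since the same key is now defined for Connect's distributed worker, it can be set in the worker configuration as well. A sketch under assumptions (workerProps stands in for the rest of a valid distributed worker config, e.g. group.id and the internal topic settings, which are omitted here):

    // Hypothetical fragment: workerProps is a Map<String, String> that already
    // holds the remaining required worker settings.
    workerProps.put(DistributedConfig.METADATA_RECOVERY_STRATEGY_CONFIG, "rebootstrap");
    // The validator defined above is case-insensitive, so "REBOOTSTRAP" would
    // also be accepted, while an unknown value fails fast with a ConfigException.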
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index 2c3537ea675..2ea83daf048 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
 import org.apache.kafka.clients.GroupRebalanceConfig;
@@ -119,7 +120,9 @@ public class WorkerGroupMember {
                     time,
                     true,
                     new ApiVersions(),
-                    logContext);
+                    logContext,
+                    MetadataRecoveryStrategy.forName(config.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG))
+            );
             this.client = new ConsumerNetworkClient(
                     logContext,
                     netClient,
diff --git a/core/src/main/java/kafka/server/NetworkUtils.java b/core/src/main/java/kafka/server/NetworkUtils.java
index 5607f2623f9..83093c19e10 100644
--- a/core/src/main/java/kafka/server/NetworkUtils.java
+++ b/core/src/main/java/kafka/server/NetworkUtils.java
@@ -18,6 +18,7 @@ package kafka.server;
 
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ManualMetadataUpdater;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.common.Reconfigurable;
 import org.apache.kafka.common.metrics.Metrics;
@@ -84,7 +85,8 @@ public class NetworkUtils {
             time,
             true,
             new ApiVersions(),
-            logContext
+            logContext,
+            MetadataRecoveryStrategy.NONE
         );
     }
 }
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
index 6cb273f066f..a8b7c8f59b5 100644
--- a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
@@ -26,7 +26,7 @@ import joptsimple.OptionSpec
 import kafka.utils.Implicits._
 import kafka.utils.Logging
 import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.clients.{ApiVersions, ClientDnsLookup, ClientResponse, ClientUtils, CommonClientConfigs, Metadata, NetworkClient, NodeApiVersions}
+import org.apache.kafka.clients.{ApiVersions, ClientDnsLookup, ClientResponse, ClientUtils, CommonClientConfigs, Metadata, MetadataRecoveryStrategy, NetworkClient, NodeApiVersions}
 import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient, RequestFuture}
 import org.apache.kafka.common.config.ConfigDef.ValidString._
 import org.apache.kafka.common.config.ConfigDef.{Importance, Type}
@@ -310,7 +310,8 @@ object BrokerApiVersionsCommand {
         time,
         true,
         new ApiVersions,
-        logContext)
+        logContext,
+        MetadataRecoveryStrategy.NONE)
 
       val highLevelClient = new ConsumerNetworkClient(
         logContext,
diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 67d3f3963b9..793b39538e7 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -164,7 +164,8 @@ class ControllerChannelManager(controllerEpoch: () => Int,
         time,
         false,
         new ApiVersions,
-        logContext
+        logContext,
+        MetadataRecoveryStrategy.NONE
       )
       (networkClient, reconfigurableChannelBuilder)
     }
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index 068dff4cca6..44176d22763 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -96,7 +96,8 @@ object TransactionMarkerChannelManager {
       time,
       false,
       new ApiVersions,
-      logContext
+      logContext,
+      MetadataRecoveryStrategy.NONE
     )
 
     new TransactionMarkerChannelManager(config,
diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala b/core/src/main/scala/kafka/raft/RaftManager.scala
index 48646e0b4d7..65ef855640c 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -30,7 +30,7 @@ import kafka.server.KafkaConfig
 import kafka.utils.CoreUtils
 import kafka.utils.FileLock
 import kafka.utils.Logging
-import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient}
+import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, MetadataRecoveryStrategy, NetworkClient}
 import org.apache.kafka.common.KafkaException
 import org.apache.kafka.common.Node
 import org.apache.kafka.common.TopicPartition
@@ -312,7 +312,8 @@ class KafkaRaftManager[T](
       time,
       discoverBrokerVersions,
       apiVersions,
-      logContext
+      logContext,
+      MetadataRecoveryStrategy.NONE
     )
 
     (controllerListenerName, networkClient)
diff --git a/core/src/main/scala/kafka/server/BrokerBlockingSender.scala b/core/src/main/scala/kafka/server/BrokerBlockingSender.scala
index 7d9fb0512a5..3cb692045b6 100644
--- a/core/src/main/scala/kafka/server/BrokerBlockingSender.scala
+++ b/core/src/main/scala/kafka/server/BrokerBlockingSender.scala
@@ -96,7 +96,8 @@ class BrokerBlockingSender(sourceBroker: BrokerEndPoint,
       time,
       false,
       new ApiVersions,
-      logContext
+      logContext,
+      MetadataRecoveryStrategy.NONE
     )
     (networkClient, reconfigurableChannelBuilder)
   }
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 5b6e04e5a0e..dffd2e7f697 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -30,7 +30,7 @@ import kafka.raft.KafkaRaftManager
 import kafka.server.metadata.{OffsetTrackingListener, ZkConfigRepository, ZkMetadataCache}
 import kafka.utils._
 import kafka.zk.{AdminZkClient, BrokerInfo, KafkaZkClient}
-import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, NetworkClient, NetworkClientUtils}
+import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater, MetadataRecoveryStrategy, NetworkClient, NetworkClientUtils}
 import org.apache.kafka.common.config.ConfigException
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.message.ApiMessageType.ListenerType
@@ -827,7 +827,8 @@ class KafkaServer(
           time,
           false,
           new ApiVersions,
-          logContext)
+          logContext,
+          MetadataRecoveryStrategy.NONE)
       }
 
       var shutdownSucceeded: Boolean = false
diff --git a/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala b/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala
index a0e4bbbc463..6ce6e9e0a48 100644
--- a/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala
@@ -191,7 +191,8 @@ class NodeToControllerChannelManagerImpl(
         time,
         true,
         apiVersions,
-        logContext
+        logContext,
+        MetadataRecoveryStrategy.NONE
       )
     }
     val threadName = s"${threadNamePrefix}to-controller-${channelName}-channel-manager"
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientRebootstrapTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientRebootstrapTest.scala
new file mode 100644
index 00000000000..70b514f199b
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/AdminClientRebootstrapTest.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.api
+
+import org.junit.jupiter.api.Test
+
+class AdminClientRebootstrapTest extends RebootstrapTest {
+  @Test
+  def testRebootstrap(): Unit = {
+    server1.shutdown()
+    server1.awaitShutdown()
+
+    val adminClient = createAdminClient(configOverrides = clientOverrides)
+
+    // Only server 0 is available to the admin client during bootstrap.
+    adminClient.listTopics().names().get()
+
+    server0.shutdown()
+    server0.awaitShutdown()
+    server1.startup()
+
+    // Server 0, originally cached during bootstrap, is now offline.
+    // However, server 1 from the bootstrap list is online.
+    // Should be able to list topics again.
+    adminClient.listTopics().names().get()
+
+    server1.shutdown()
+    server1.awaitShutdown()
+    server0.startup()
+
+    // The same situation, but now server 1 is gone and server 0 is back.
+    adminClient.listTopics().names().get()
+  }
+}
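The scenario this test exercises maps directly onto application code. A hypothetical Java admin client configured for rebootstrap (the bootstrap addresses are placeholders, not from this commit):

    import java.util.Properties;

    import org.apache.kafka.clients.CommonClientConfigs;
    import org.apache.kafka.clients.admin.Admin;
    import org.apache.kafka.clients.admin.AdminClientConfig;

    Properties props = new Properties();
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker0:9092,broker1:9092"); // placeholders
    props.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG, "rebootstrap");
    try (Admin admin = Admin.create(props)) {
        // With rebootstrap enabled, this keeps working as long as at least one
        // broker from the original bootstrap list is reachable, even after every
        // previously cached broker has gone away.
        admin.listTopics().names().get();
    }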
diff --git a/core/src/test/scala/integration/kafka/api/ConsumerRebootstrapTest.scala b/core/src/test/scala/integration/kafka/api/ConsumerRebootstrapTest.scala
new file mode 100644
index 00000000000..9979e3eb91d
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/ConsumerRebootstrapTest.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.api
+
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.junit.jupiter.api.Test
+
+import java.util.Collections
+
+class ConsumerRebootstrapTest extends RebootstrapTest {
+  @Test
+  def testRebootstrap(): Unit = {
+    sendRecords(10, 0)
+
+    TestUtils.waitUntilTrue(
+      () => server0.logManager.logsByTopic(tp.topic()).head.logEndOffset == server1.logManager.logsByTopic(tp.topic()).head.logEndOffset,
+      "Timeout waiting for records to be replicated"
+    )
+
+    server1.shutdown()
+    server1.awaitShutdown()
+
+    val consumer = createConsumer(configOverrides = clientOverrides)
+
+    // Only server 0 is available to the consumer during bootstrap.
+    consumer.assign(Collections.singleton(tp))
+
+    consumeAndVerifyRecords(consumer, 10, 0)
+
+    // Bring back server 1, then shut down server 0.
+    server1.startup()
+
+    TestUtils.waitUntilTrue(
+      () => server0.logManager.logsByTopic(tp.topic()).head.logEndOffset == server1.logManager.logsByTopic(tp.topic()).head.logEndOffset,
+      "Timeout waiting for records to be replicated"
+    )
+
+    server0.shutdown()
+    server0.awaitShutdown()
+    sendRecords(10, 10)
+
+    // Server 0, originally cached during bootstrap, is now offline.
+    // However, server 1 from the bootstrap list is online.
+    // Should be able to consume records.
+    consumeAndVerifyRecords(consumer, 10, 10, startingKeyAndValueIndex = 10, startingTimestamp = 10)
+
+    // Bring back server 0, then shut down server 1.
+    server0.startup()
+
+    TestUtils.waitUntilTrue(
+      () => server0.logManager.logsByTopic(tp.topic()).head.logEndOffset == server1.logManager.logsByTopic(tp.topic()).head.logEndOffset,
+      "Timeout waiting for records to be replicated"
+    )
+
+    server1.shutdown()
+    server1.awaitShutdown()
+    sendRecords(10, 20)
+
+    // The same situation, but now server 1 is gone and server 0 is back.
+    consumeAndVerifyRecords(consumer, 10, 20, startingKeyAndValueIndex = 20, startingTimestamp = 20)
+  }
+
+  private def sendRecords(numRecords: Int, from: Int): Unit = {
+    val producer: KafkaProducer[Array[Byte], Array[Byte]] = createProducer()
+    (from until (numRecords + from)).foreach { i =>
+      val record = new ProducerRecord(tp.topic(), tp.partition(), i.toLong, s"key $i".getBytes, s"value $i".getBytes)
+      producer.send(record)
+    }
+    producer.flush()
+    producer.close()
+  }
+}
diff --git a/core/src/test/scala/integration/kafka/api/ProducerRebootstrapTest.scala b/core/src/test/scala/integration/kafka/api/ProducerRebootstrapTest.scala
new file mode 100644
index 00000000000..3cb40b6a0cf
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/ProducerRebootstrapTest.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.api
+
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+class ProducerRebootstrapTest extends RebootstrapTest {
+  @Test
+  def testRebootstrap(): Unit = {
+    server1.shutdown()
+    server1.awaitShutdown()
+
+    val producer = createProducer(configOverrides = clientOverrides)
+
+    // Only server 0 is available to the producer during bootstrap.
+    producer.send(new ProducerRecord(topic, part, "key 0".getBytes, "value 0".getBytes)).get()
+
+    server0.shutdown()
+    server0.awaitShutdown()
+    server1.startup()
+
+    // Server 0, originally cached during bootstrap, is now offline.
+    // However, server 1 from the bootstrap list is online.
+    // Should be able to produce records.
+    val recordMetadata1 = producer.send(new ProducerRecord(topic, part, "key 1".getBytes, "value 1".getBytes)).get()
+    assertEquals(0, recordMetadata1.offset())
+
+    server1.shutdown()
+    server1.awaitShutdown()
+    server0.startup()
+
+    // The same situation, but now server 1 is gone and server 0 is back.
+    val recordMetadata2 = producer.send(new ProducerRecord(topic, part, "key 1".getBytes, "value 1".getBytes)).get()
+    assertEquals(1, recordMetadata2.offset())
+  }
+}
diff --git a/core/src/test/scala/integration/kafka/api/RebootstrapTest.scala b/core/src/test/scala/integration/kafka/api/RebootstrapTest.scala
new file mode 100644
index 00000000000..b3b044ebcdb
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/RebootstrapTest.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.api
+
+import kafka.server.{KafkaConfig, KafkaServer}
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.config.TopicConfig
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
+
+import java.util.Properties
+
+abstract class RebootstrapTest extends AbstractConsumerTest {
+  override def brokerCount: Int = 2
+
+  def server0: KafkaServer = serverForId(0).get
+  def server1: KafkaServer = serverForId(1).get
+
+  override def generateConfigs: Seq[KafkaConfig] = {
+    val overridingProps = new Properties()
+    overridingProps.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, brokerCount.toString)
+    overridingProps.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "true")
+
+    // In this test, fixed ports are necessary, because brokers must have the
+    // same port after the restart.
+    FixedPortTestUtils.createBrokerConfigs(brokerCount, zkConnect, enableControlledShutdown = false)
+      .map(KafkaConfig.fromProps(_, overridingProps))
+  }
+
+  def clientOverrides: Properties = {
+    val overrides = new Properties()
+    overrides.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, "5000")
+    overrides.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG, "5000")
+    overrides.put(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG, "1000")
+    overrides.put(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG, "1000")
+    overrides.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG, "rebootstrap")
+    overrides
+  }
+}
diff --git a/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java b/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java
index c3e1cc1f7a0..970e695ba92 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.ClientResponse;
 import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.ManualMetadataUpdater;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.NetworkClientUtils;
 import org.apache.kafka.clients.admin.Admin;
@@ -706,7 +707,8 @@ public class ReplicaVerificationTool {
                 time,
                 false,
                 new ApiVersions(),
-                logContext
+                logContext,
+                MetadataRecoveryStrategy.forName(consumerConfig.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG))
             );
         }
 
diff --git a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
index 6cd367c06d9..5afd9db26c3 100644
--- a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
+++ b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.JsonNode;
 import org.apache.kafka.clients.ApiVersions;
 import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.ManualMetadataUpdater;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
 import org.apache.kafka.clients.NetworkClient;
 import org.apache.kafka.clients.NetworkClientUtils;
 import org.apache.kafka.clients.admin.Admin;
@@ -179,7 +180,8 @@ public class ConnectionStressWorker implements TaskWorker {
                             TIME,
                             false,
                             new ApiVersions(),
-                            logContext)) {
+                            logContext,
+                            MetadataRecoveryStrategy.NONE)) {
                             NetworkClientUtils.awaitReady(client, targetNode, TIME, 500);
                         }
                     }
