This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 52d2fa5c8b3 KAFKA-17885: Enable clients to rebootstrap based on timeout or error code (KIP-1102) (#17720)
52d2fa5c8b3 is described below

commit 52d2fa5c8b3d5f737fe97645c65e73df872e2a67
Author: Rajini Sivaram <[email protected]>
AuthorDate: Wed Nov 13 13:01:08 2024 +0000

    KAFKA-17885: Enable clients to rebootstrap based on timeout or error code (KIP-1102) (#17720)
    
    Implementation of https://cwiki.apache.org/confluence/display/KAFKA/KIP-1102%3A+Enable+clients+to+rebootstrap+based+on+timeout+or+error+code
    - Introduces the rebootstrap trigger interval config metadata.recovery.rebootstrap.trigger.ms, set to 5 minutes by default
    - Makes rebootstrap the default for metadata.recovery.strategy
    - Adds a new error code REBOOTSTRAP_REQUIRED and introduces a top-level error code in the metadata response. On this error, clients rebootstrap.
    - The configs apply to producers, consumers, share consumers, admin clients, Connect and KStreams clients.
    
    Reviewers: Apoorv Mittal <[email protected]>, Manikumar Reddy <[email protected]>
---
 checkstyle/suppressions.xml                        |  2 +-
 .../java/org/apache/kafka/clients/ClientUtils.java |  1 +
 .../apache/kafka/clients/CommonClientConfigs.java  | 11 ++-
 .../java/org/apache/kafka/clients/KafkaClient.java |  6 ++
 .../org/apache/kafka/clients/NetworkClient.java    | 96 ++++++++++++++++++++--
 .../kafka/clients/admin/AdminClientConfig.java     | 12 ++-
 .../kafka/clients/admin/KafkaAdminClient.java      | 22 ++++-
 .../admin/internals/AdminMetadataManager.java      | 22 +++++
 .../kafka/clients/consumer/ConsumerConfig.java     |  9 +-
 .../kafka/clients/producer/ProducerConfig.java     |  8 +-
 .../errors/RebootstrapRequiredException.java       | 30 +++++++
 .../org/apache/kafka/common/protocol/Errors.java   |  4 +-
 .../kafka/common/requests/MetadataRequest.java     |  1 +
 .../kafka/common/requests/MetadataResponse.java    |  4 +
 .../resources/common/message/MetadataRequest.json  |  3 +-
 .../resources/common/message/MetadataResponse.json |  8 +-
 .../java/org/apache/kafka/clients/MockClient.java  |  5 ++
 .../apache/kafka/clients/NetworkClientTest.java    | 63 +++++++++++++-
 .../kafka/clients/admin/AdminClientConfigTest.java |  3 +-
 .../admin/internals/AdminMetadataManagerTest.java  | 41 +++++++++
 .../kafka/clients/consumer/ConsumerConfigTest.java |  2 +-
 .../kafka/clients/producer/ProducerConfigTest.java |  2 +-
 .../runtime/distributed/DistributedConfig.java     | 13 ++-
 .../runtime/distributed/WorkerGroupMember.java     |  1 +
 .../integration/ConnectWorkerIntegrationTest.java  |  3 +
 .../kafka/api/AdminClientRebootstrapTest.scala     | 11 ++-
 .../kafka/api/ConsumerRebootstrapTest.scala        | 67 +++++++++++++--
 .../kafka/api/ProducerRebootstrapTest.scala        | 13 +--
 .../integration/kafka/api/RebootstrapTest.scala    | 15 ++--
 .../org/apache/kafka/streams/StreamsConfig.java    | 14 ++++
 30 files changed, 446 insertions(+), 46 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 819b53ddc4a..45873818535 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -119,7 +119,7 @@
               
files="(Sender|Fetcher|FetchRequestManager|OffsetFetcher|KafkaConsumer|Metrics|RequestResponse|TransactionManager|KafkaAdminClient|Message|KafkaProducer)Test.java"/>
 
     <suppress checks="ClassFanOutComplexity"
-              
files="(ConsumerCoordinator|KafkaConsumer|RequestResponse|Fetcher|FetchRequestManager|KafkaAdminClient|Message|KafkaProducer)Test.java"/>
+              
files="(ConsumerCoordinator|KafkaConsumer|RequestResponse|Fetcher|FetchRequestManager|KafkaAdminClient|Message|KafkaProducer|NetworkClient)Test.java"/>
 
     <suppress checks="ClassFanOutComplexity"
               files="MockAdminClient.java"/>
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java 
b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
index c2c2fca17d1..e29133d0329 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
@@ -247,6 +247,7 @@ public final class ClientUtils {
                     logContext,
                     hostResolver,
                     clientTelemetrySender,
+                    
config.getLong(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG),
                     
MetadataRecoveryStrategy.forName(config.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG))
             );
         } catch (Throwable t) {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index 1a5bb595d6d..9c615bfbcc5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -230,8 +230,15 @@ public class CommonClientConfigs {
             "Brokers appear unavailable when disconnected and no current retry 
attempt is in-progress. " +
             "Consider increasing <code>reconnect.backoff.ms</code> and 
<code>reconnect.backoff.max.ms</code> and " +
             "decreasing <code>socket.connection.setup.timeout.ms</code> and 
<code>socket.connection.setup.timeout.max.ms</code> " +
-            "for the client.";
-    public static final String DEFAULT_METADATA_RECOVERY_STRATEGY = 
MetadataRecoveryStrategy.NONE.name;
+            "for the client. Rebootstrap is also triggered if connection 
cannot be established to any of the brokers for " +
+            "<code>metadata.recovery.rebootstrap.trigger.ms</code> 
milliseconds or if server requests rebootstrap.";
+    public static final String DEFAULT_METADATA_RECOVERY_STRATEGY = 
MetadataRecoveryStrategy.REBOOTSTRAP.name;
+
+    public static final String METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG 
= "metadata.recovery.rebootstrap.trigger.ms";
+    public static final String METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC = 
"If a client configured to rebootstrap using " +
+            "<code>metadata.recovery.strategy=rebootstrap</code> is unable to 
obtain metadata from any of the brokers in the last known " +
+            "metadata for this interval, client repeats the bootstrap process 
using <code>bootstrap.servers</code> configuration.";
+    public static final long DEFAULT_METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS 
= 300 * 1000;
 
     /**
      * Postprocess the configuration so that exponential backoff is disabled 
when reconnect backoff
diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
index 46b64986064..78c13985228 100644
--- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
@@ -122,6 +122,12 @@ public interface KafkaClient extends Closeable {
      */
     void close(String nodeId);
 
+    /**
+     * Closes connections to all nodes. All requests on the connections will 
be cleared.  ClientRequest
+     * callbacks will not be invoked for the cleared requests, nor will they 
be returned from poll().
+     */
+    void closeAll();
+
     /**
      * Choose the node with the fewest outstanding requests. This method will 
prefer a node with an existing connection,
      * but will potentially choose a node for which we don't yet have a 
connection if all existing connections are in
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 2f3cd781627..044019d6581 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -115,6 +115,9 @@ public class NetworkClient implements KafkaClient {
     /* time in ms to wait before retrying to create connection to a server */
     private final long reconnectBackoffMs;
 
+    /* Timeout starting from an attempt to fetch metadata after which client 
rebootstraps */
+    private final long rebootstrapTriggerMs;
+
     private final MetadataRecoveryStrategy metadataRecoveryStrategy;
 
     private final Time time;
@@ -166,11 +169,51 @@ public class NetworkClient implements KafkaClient {
              time,
              discoverBrokerVersions,
              apiVersions,
-             null,
              logContext,
+             Long.MAX_VALUE,
              metadataRecoveryStrategy);
     }
 
+    public NetworkClient(Selectable selector,
+                         Metadata metadata,
+                         String clientId,
+                         int maxInFlightRequestsPerConnection,
+                         long reconnectBackoffMs,
+                         long reconnectBackoffMax,
+                         int socketSendBuffer,
+                         int socketReceiveBuffer,
+                         int defaultRequestTimeoutMs,
+                         long connectionSetupTimeoutMs,
+                         long connectionSetupTimeoutMaxMs,
+                         Time time,
+                         boolean discoverBrokerVersions,
+                         ApiVersions apiVersions,
+                         LogContext logContext,
+                         long rebootstrapTriggerMs,
+                         MetadataRecoveryStrategy metadataRecoveryStrategy) {
+        this(null,
+                metadata,
+                selector,
+                clientId,
+                maxInFlightRequestsPerConnection,
+                reconnectBackoffMs,
+                reconnectBackoffMax,
+                socketSendBuffer,
+                socketReceiveBuffer,
+                defaultRequestTimeoutMs,
+                connectionSetupTimeoutMs,
+                connectionSetupTimeoutMaxMs,
+                time,
+                discoverBrokerVersions,
+                apiVersions,
+                null,
+                logContext,
+                new DefaultHostResolver(),
+                null,
+                rebootstrapTriggerMs,
+                metadataRecoveryStrategy);
+    }
+
     public NetworkClient(Selectable selector,
                          Metadata metadata,
                          String clientId,
@@ -207,6 +250,7 @@ public class NetworkClient implements KafkaClient {
              logContext,
              new DefaultHostResolver(),
              null,
+             Long.MAX_VALUE,
              metadataRecoveryStrategy);
     }
 
@@ -245,6 +289,7 @@ public class NetworkClient implements KafkaClient {
              logContext,
              new DefaultHostResolver(),
              null,
+             Long.MAX_VALUE,
              metadataRecoveryStrategy);
     }
 
@@ -267,6 +312,7 @@ public class NetworkClient implements KafkaClient {
                          LogContext logContext,
                          HostResolver hostResolver,
                          ClientTelemetrySender clientTelemetrySender,
+                         long rebootstrapTriggerMs,
                          MetadataRecoveryStrategy metadataRecoveryStrategy) {
         /* It would be better if we could pass `DefaultMetadataUpdater` from 
the public constructor, but it's not
          * possible because `DefaultMetadataUpdater` is an inner class and it 
can only be instantiated after the
@@ -298,6 +344,7 @@ public class NetworkClient implements KafkaClient {
         this.log = logContext.logger(NetworkClient.class);
         this.state = new AtomicReference<>(State.ACTIVE);
         this.telemetrySender = (clientTelemetrySender != null) ? new 
TelemetrySender(clientTelemetrySender) : null;
+        this.rebootstrapTriggerMs = rebootstrapTriggerMs;
         this.metadataRecoveryStrategy = metadataRecoveryStrategy;
     }
 
@@ -401,6 +448,17 @@ public class NetworkClient implements KafkaClient {
         long now = time.milliseconds();
         cancelInFlightRequests(nodeId, now, null, false);
         connectionStates.remove(nodeId);
+        apiVersions.remove(nodeId);
+        nodesNeedingApiVersionsFetch.remove(nodeId);
+    }
+
+    @Override
+    public void closeAll() {
+        log.info("Client requested connection close from all nodes.");
+        List<Node> nodes = this.metadataUpdater.fetchNodes();
+        for (Node node : nodes) {
+            close(node.idString());
+        }
     }
 
     /**
@@ -1116,6 +1174,15 @@ public class NetworkClient implements KafkaClient {
         // Defined if there is a request in progress, null otherwise
         private InProgressData inProgress;
 
+        /*
+         * The time in wall-clock milliseconds when we started attempts to 
fetch metadata. If empty,
+         * metadata has not been requested. This is the start time based on 
which rebootstrap is
+         * triggered if metadata is not obtained for the configured 
rebootstrap trigger interval.
+         * Set to Optional.of(0L) to force rebootstrap immediately.
+         */
+        private Optional<Long> metadataAttemptStartMs = Optional.empty();
+
+
         DefaultMetadataUpdater(Metadata metadata) {
             this.metadata = metadata;
             this.inProgress = null;
@@ -1146,6 +1213,14 @@ public class NetworkClient implements KafkaClient {
                 return metadataTimeout;
             }
 
+            if (metadataRecoveryStrategy == 
MetadataRecoveryStrategy.REBOOTSTRAP) {
+                if (!metadataAttemptStartMs.isPresent())
+                    metadataAttemptStartMs = Optional.of(now);
+                else if (metadataAttemptStartMs.filter(startMs -> now - 
startMs > rebootstrapTriggerMs).isPresent()) {
+                    rebootstrap(now);
+                }
+            }
+
             // Beware that the behavior of this method and the computation of 
timeouts for poll() are
             // highly dependent on the behavior of leastLoadedNode.
             LeastLoadedNode leastLoadedNode = leastLoadedNode(now);
@@ -1153,7 +1228,7 @@ public class NetworkClient implements KafkaClient {
             // Rebootstrap if needed and configured.
             if (metadataRecoveryStrategy == 
MetadataRecoveryStrategy.REBOOTSTRAP
                     && !leastLoadedNode.hasNodeAvailableOrConnectionReady()) {
-                metadata.rebootstrap();
+                rebootstrap(now);
 
                 leastLoadedNode = leastLoadedNode(now);
             }
@@ -1219,13 +1294,18 @@ public class NetworkClient implements KafkaClient {
             if (!errors.isEmpty())
                 log.warn("The metadata response from the cluster reported a 
recoverable issue with correlation id {} : {}", requestHeader.correlationId(), 
errors);
 
-            // When talking to the startup phase of a broker, it is possible 
to receive an empty metadata set, which
-            // we should retry later.
-            if (response.brokers().isEmpty()) {
+            if (metadataRecoveryStrategy == 
MetadataRecoveryStrategy.REBOOTSTRAP && response.topLevelError() == 
Errors.REBOOTSTRAP_REQUIRED) {
+                log.info("Rebootstrap requested by server.");
+                metadataAttemptStartMs = Optional.of(0L); // to force 
rebootstrap
+                this.metadata.requestUpdate(true);
+            } else if (response.brokers().isEmpty()) {
+                // When talking to the startup phase of a broker, it is 
possible to receive an empty metadata set, which
+                // we should retry later.
                 log.trace("Ignoring empty metadata response with correlation 
id {}.", requestHeader.correlationId());
                 this.metadata.failedUpdate(now);
             } else {
                 this.metadata.update(inProgress.requestVersion, response, 
inProgress.isPartialUpdate, now);
+                metadataAttemptStartMs = Optional.empty();
             }
 
             inProgress = null;
@@ -1236,6 +1316,12 @@ public class NetworkClient implements KafkaClient {
             this.metadata.close();
         }
 
+        private void rebootstrap(long now) {
+            closeAll();
+            metadata.rebootstrap();
+            metadataAttemptStartMs = Optional.of(now);
+        }
+
         /**
          * Add a metadata request to the list of sends if we can make one
          */
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
index 2a00f450c5c..a87af6be154 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
@@ -141,6 +141,10 @@ public class AdminClientConfig extends AbstractConfig {
     public static final String METADATA_RECOVERY_STRATEGY_DOC = 
CommonClientConfigs.METADATA_RECOVERY_STRATEGY_DOC;
     public static final String DEFAULT_METADATA_RECOVERY_STRATEGY = 
CommonClientConfigs.DEFAULT_METADATA_RECOVERY_STRATEGY;
 
+    public static final String METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG 
= CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG;
+    public static final String METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC = 
CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC;
+    public static final long DEFAULT_METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS 
= CommonClientConfigs.DEFAULT_METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS;
+
     /**
      * <code>security.providers</code>
      */
@@ -270,7 +274,13 @@ public class AdminClientConfig extends AbstractConfig {
                                         ConfigDef.CaseInsensitiveValidString
                                                 
.in(Utils.enumOptions(MetadataRecoveryStrategy.class)),
                                         Importance.LOW,
-                                        METADATA_RECOVERY_STRATEGY_DOC);
+                                        METADATA_RECOVERY_STRATEGY_DOC)
+                                
.define(METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG,
+                                        Type.LONG,
+                                        
DEFAULT_METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS,
+                                        atLeast(0),
+                                        Importance.LOW,
+                                        
METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC);
     }
 
     @Override
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index d9d8ce58460..7e8f42091fc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -406,6 +406,7 @@ public class KafkaAdminClient extends AdminClient {
     private final long retryBackoffMs;
     private final long retryBackoffMaxMs;
     private final ExponentialBackoff retryBackoff;
+    private final long rebootstrapTriggerMs;
     private final MetadataRecoveryStrategy metadataRecoveryStrategy;
     private final AdminFetchMetricsManager adminFetchMetricsManager;
     private final Optional<ClientTelemetryReporter> clientTelemetryReporter;
@@ -632,6 +633,7 @@ public class KafkaAdminClient extends AdminClient {
         List<MetricsReporter> reporters = 
CommonClientConfigs.metricsReporters(this.clientId, config);
         this.clientTelemetryReporter = clientTelemetryReporter;
         this.clientTelemetryReporter.ifPresent(reporters::add);
+        this.rebootstrapTriggerMs = 
config.getLong(AdminClientConfig.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG);
         this.metadataRecoveryStrategy = 
MetadataRecoveryStrategy.forName(config.getString(AdminClientConfig.METADATA_RECOVERY_STRATEGY_CONFIG));
         this.adminFetchMetricsManager = new AdminFetchMetricsManager(metrics);
         config.logUnused();
@@ -722,10 +724,15 @@ public class KafkaAdminClient extends AdminClient {
     private class MetadataUpdateNodeIdProvider implements NodeProvider {
         @Override
         public Node provide() {
-            LeastLoadedNode leastLoadedNode = 
client.leastLoadedNode(time.milliseconds());
+            long now = time.milliseconds();
+            if (metadataRecoveryStrategy == 
MetadataRecoveryStrategy.REBOOTSTRAP &&
+                    metadataManager.needsRebootstrap(now, 
rebootstrapTriggerMs)) {
+                rebootstrap(now);
+            }
+            LeastLoadedNode leastLoadedNode = client.leastLoadedNode(now);
             if (metadataRecoveryStrategy == 
MetadataRecoveryStrategy.REBOOTSTRAP
                     && !leastLoadedNode.hasNodeAvailableOrConnectionReady()) {
-                metadataManager.rebootstrap(time.milliseconds());
+                rebootstrap(now);
             }
 
             return leastLoadedNode.node();
@@ -735,6 +742,11 @@ public class KafkaAdminClient extends AdminClient {
         public boolean supportsUseControllers() {
             return true;
         }
+
+        private void rebootstrap(long now) {
+            client.closeAll();
+            metadataManager.rebootstrap(now);
+        }
     }
 
     private class ConstantNodeIdProvider implements NodeProvider {
@@ -1701,7 +1713,11 @@ public class KafkaAdminClient extends AdminClient {
                 public void handleResponse(AbstractResponse abstractResponse) {
                     MetadataResponse response = (MetadataResponse) 
abstractResponse;
                     long now = time.milliseconds();
-                    metadataManager.update(response.buildCluster(), now);
+
+                    if (response.topLevelError() == 
Errors.REBOOTSTRAP_REQUIRED)
+                        metadataManager.initiateRebootstrap();
+                    else
+                        metadataManager.update(response.buildCluster(), now);
 
                     // Unassign all unsent requests after a metadata refresh 
to allow for a new
                     // destination to be selected from the new metadata
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
index 90b237aa749..09fd000e50e 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
@@ -83,6 +83,15 @@ public class AdminMetadataManager {
      */
     private long lastMetadataFetchAttemptMs = 0;
 
+    /**
+     * The time in wall-clock milliseconds when we started attempts to fetch 
metadata. If empty,
+     * metadata has not been requested. This is the start time based on which 
rebootstrap is
+     * triggered if metadata is not obtained for the configured rebootstrap 
trigger interval.
+     * Set to Optional.of(0L) to force rebootstrap immediately.
+     */
+    private Optional<Long> metadataAttemptStartMs = Optional.empty();
+
+
     /**
      * The current cluster information.
      */
@@ -240,12 +249,18 @@ public class AdminMetadataManager {
         return Math.max(0, refreshBackoffMs - timeSinceAttempt);
     }
 
+    public boolean needsRebootstrap(long now, long rebootstrapTriggerMs) {
+        return metadataAttemptStartMs.filter(startMs -> now - startMs > 
rebootstrapTriggerMs).isPresent();
+    }
+
     /**
      * Transition into the UPDATE_PENDING state.  Updates 
lastMetadataFetchAttemptMs.
      */
     public void transitionToUpdatePending(long now) {
         this.state = State.UPDATE_PENDING;
         this.lastMetadataFetchAttemptMs = now;
+        if (!metadataAttemptStartMs.isPresent())
+            metadataAttemptStartMs = Optional.of(now);
     }
 
     public void updateFailed(Throwable exception) {
@@ -289,17 +304,24 @@ public class AdminMetadataManager {
 
         this.state = State.QUIESCENT;
         this.fatalException = null;
+        this.metadataAttemptStartMs = Optional.empty();
 
         if (!cluster.nodes().isEmpty()) {
             this.cluster = cluster;
         }
     }
 
+    public void initiateRebootstrap() {
+        requestUpdate();
+        this.metadataAttemptStartMs = Optional.of(0L);
+    }
+
     /**
      * Rebootstrap metadata with the cluster previously used for bootstrapping.
      */
     public void rebootstrap(long now) {
         log.info("Rebootstrapping with {}", this.bootstrapCluster);
         update(bootstrapCluster, now);
+        this.metadataAttemptStartMs = Optional.of(now);
     }
 }
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index cee249c280f..ff9bb6c1166 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -654,7 +654,14 @@ public class ConsumerConfig extends AbstractConfig {
                                         ConfigDef.CaseInsensitiveValidString
                                                 
.in(Utils.enumOptions(MetadataRecoveryStrategy.class)),
                                         Importance.LOW,
-                                        
CommonClientConfigs.METADATA_RECOVERY_STRATEGY_DOC);
+                                        
CommonClientConfigs.METADATA_RECOVERY_STRATEGY_DOC)
+                                
.define(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG,
+                                        Type.LONG,
+                                        
CommonClientConfigs.DEFAULT_METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS,
+                                        atLeast(0),
+                                        Importance.LOW,
+                                        
CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC);
+
     }
 
     @Override
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 8b360d4d839..217d9495922 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -527,7 +527,13 @@ public class ProducerConfig extends AbstractConfig {
                                         ConfigDef.CaseInsensitiveValidString
                                                 
.in(Utils.enumOptions(MetadataRecoveryStrategy.class)),
                                         Importance.LOW,
-                                        
CommonClientConfigs.METADATA_RECOVERY_STRATEGY_DOC);
+                                        
CommonClientConfigs.METADATA_RECOVERY_STRATEGY_DOC)
+                                
.define(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG,
+                                        Type.LONG,
+                                        
CommonClientConfigs.DEFAULT_METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS,
+                                        atLeast(0),
+                                        Importance.LOW,
+                                        
CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC);
     }
 
     @Override
diff --git 
a/clients/src/main/java/org/apache/kafka/common/errors/RebootstrapRequiredException.java
 
b/clients/src/main/java/org/apache/kafka/common/errors/RebootstrapRequiredException.java
new file mode 100644
index 00000000000..78a66aabd3e
--- /dev/null
+++ 
b/clients/src/main/java/org/apache/kafka/common/errors/RebootstrapRequiredException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.errors;
+
+public class RebootstrapRequiredException extends ApiException {
+    private static final long serialVersionUID = 1L;
+
+    public RebootstrapRequiredException(String message) {
+        super(message);
+    }
+
+    public RebootstrapRequiredException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index a80ec308ebb..309ae7bc86a 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -107,6 +107,7 @@ import 
org.apache.kafka.common.errors.PrincipalDeserializationException;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.errors.ReassignmentInProgressException;
 import org.apache.kafka.common.errors.RebalanceInProgressException;
+import org.apache.kafka.common.errors.RebootstrapRequiredException;
 import org.apache.kafka.common.errors.RecordBatchTooLargeException;
 import org.apache.kafka.common.errors.RecordTooLargeException;
 import org.apache.kafka.common.errors.ReplicaNotAvailableException;
@@ -411,7 +412,8 @@ public enum Errors {
     INVALID_VOTER_KEY(125, "The voter key doesn't match the receiving 
replica's key.", InvalidVoterKeyException::new),
     DUPLICATE_VOTER(126, "The voter is already part of the set of voters.", 
DuplicateVoterException::new),
     VOTER_NOT_FOUND(127, "The voter is not part of the set of voters.", 
VoterNotFoundException::new),
-    INVALID_REGULAR_EXPRESSION(128, "The regular expression is not valid.", 
InvalidRegularExpression::new);
+    INVALID_REGULAR_EXPRESSION(128, "The regular expression is not valid.", 
InvalidRegularExpression::new),
+    REBOOTSTRAP_REQUIRED(129, "Client metadata is stale, client should 
rebootstrap to obtain new metadata.", RebootstrapRequiredException::new);
 
     private static final Logger log = LoggerFactory.getLogger(Errors.class);
 
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java 
b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
index 48609b1666c..2e60e04b2aa 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
@@ -156,6 +156,7 @@ public class MetadataRequest extends AbstractRequest {
         }
 
         responseData.setThrottleTimeMs(throttleTimeMs);
+        responseData.setErrorCode(error.code());
         return new MetadataResponse(responseData, true);
     }
 
diff --git 
a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java 
b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index d7d9a6c3ba4..3a7e4f276d9 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -104,6 +104,10 @@ public class MetadataResponse extends AbstractResponse {
         return errors;
     }
 
+    /**
+     * Returns the top-level error of this response, decoded from the error code
+     * held in the response data.
+     *
+     * @return the {@link Errors} value corresponding to {@code data.errorCode()}
+     */
+    public Errors topLevelError() {
+        return Errors.forCode(data.errorCode());
+    }
+
     /**
      * Get a map of the topicIds which had metadata errors
      * @return the map
diff --git a/clients/src/main/resources/common/message/MetadataRequest.json 
b/clients/src/main/resources/common/message/MetadataRequest.json
index 552dea0a6f8..437bd0ef50b 100644
--- a/clients/src/main/resources/common/message/MetadataRequest.json
+++ b/clients/src/main/resources/common/message/MetadataRequest.json
@@ -18,7 +18,7 @@
   "type": "request",
   "listeners": ["zkBroker", "broker"],
   "name": "MetadataRequest",
-  "validVersions": "0-12",
+  "validVersions": "0-13",
   "deprecatedVersions": "0-3",
   "flexibleVersions": "9+",
   "fields": [
@@ -40,6 +40,7 @@
     // Version 11 deprecates IncludeClusterAuthorizedOperations field. This is 
now exposed
     // by the DescribeCluster API (KIP-700).
     // Version 12 supports topic Id.
+    // Version 13 supports top-level error code in the response.
     { "name": "Topics", "type": "[]MetadataRequestTopic", "versions": "0+", 
"nullableVersions": "1+",
       "about": "The topics to fetch metadata for.", "fields": [
       { "name": "TopicId", "type": "uuid", "versions": "10+", "ignorable": 
true, "about": "The topic id." },
diff --git a/clients/src/main/resources/common/message/MetadataResponse.json 
b/clients/src/main/resources/common/message/MetadataResponse.json
index 408cdc7940a..d65484b3f67 100644
--- a/clients/src/main/resources/common/message/MetadataResponse.json
+++ b/clients/src/main/resources/common/message/MetadataResponse.json
@@ -42,7 +42,8 @@
   // Version 11 deprecates ClusterAuthorizedOperations. This is now exposed
   // by the DescribeCluster API (KIP-700).
   // Version 12 supports topicId.
-  "validVersions": "0-12",
+  // Version 13 supports top-level error code in the response.
+  "validVersions": "0-13",
   "flexibleVersions": "9+",
   "fields": [
     { "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", 
"ignorable": true,
@@ -93,6 +94,9 @@
         "about": "32-bit bitfield to represent authorized operations for this 
topic." }
     ]},
     { "name": "ClusterAuthorizedOperations", "type": "int32", "versions": 
"8-10", "default": "-2147483648",
-      "about": "32-bit bitfield to represent authorized operations for this 
cluster." }
+      "about": "32-bit bitfield to represent authorized operations for this 
cluster." },
+    { "name": "ErrorCode", "type": "int16", "versions": "13+", "ignorable": 
true,
+      "about": "The top-level error code, or 0 if there was no error." }
+
   ]
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java 
b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 8a195184e93..6386a11a322 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -588,6 +588,11 @@ public class MockClient implements KafkaClient {
         connections.remove(node);
     }
 
+    /**
+     * Clears the {@code connections} collection, so this mock no longer holds
+     * any connection entries afterwards.
+     */
+    @Override
+    public void closeAll() {
+        connections.clear();
+    }
+
     @Override
     public LeastLoadedNode leastLoadedNode(long now) {
         // Consistent with NetworkClient, we do not return nodes awaiting 
reconnect backoff
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 
b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 89f567157c3..3fd75a261d8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.RebootstrapRequiredException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.message.ApiMessageType;
@@ -59,6 +60,7 @@ import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
 import java.net.InetAddress;
+import java.net.InetSocketAddress;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -241,6 +243,59 @@ public class NetworkClientTest {
         assertEquals(UnsupportedVersionException.class, 
metadataUpdater.getAndClearFailure().getClass());
     }
 
+    /**
+     * Checks when a NetworkClient configured with the REBOOTSTRAP recovery strategy and a
+     * 1000 ms rebootstrap trigger calls {@code Metadata.rebootstrap()}: after the trigger
+     * interval elapses without a metadata update, and after a metadata response that was
+     * built as an error response for RebootstrapRequiredException.
+     */
+    @Test
+    public void testRebootstrap() {
+        long rebootstrapTriggerMs = 1000;
+        AtomicInteger rebootstrapCount = new AtomicInteger();
+        // Metadata subclass that counts every rebootstrap() invocation.
+        Metadata metadata = new Metadata(50, 50, 5000, new LogContext(), new 
ClusterResourceListeners()) {
+            @Override
+            public synchronized void rebootstrap() {
+                super.rebootstrap();
+                rebootstrapCount.incrementAndGet();
+            }
+        };
+
+        NetworkClient client = new NetworkClient(selector, metadata, "mock", 
Integer.MAX_VALUE,
+                reconnectBackoffMsTest, 0, 64 * 1024, 64 * 1024,
+                defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, 
connectionSetupTimeoutMaxMsTest, time, false, new ApiVersions(), new 
LogContext(),
+                rebootstrapTriggerMs,
+                MetadataRecoveryStrategy.REBOOTSTRAP);
+        // The updater is read reflectively from the client's "metadataUpdater" field so the
+        // test can feed it failures and responses directly.
+        MetadataUpdater metadataUpdater = TestUtils.fieldValue(client, 
NetworkClient.class, "metadataUpdater");
+        metadata.bootstrap(Collections.singletonList(new 
InetSocketAddress("localhost", 9999)));
+
+        // Phase 1: an update is requested; once more than rebootstrapTriggerMs has passed,
+        // a poll rebootstraps exactly once, and a further poll 1 ms later does not repeat it.
+        metadata.requestUpdate(true);
+        client.poll(0, time.milliseconds());
+        time.sleep(rebootstrapTriggerMs + 1);
+        client.poll(0, time.milliseconds());
+        assertEquals(1, rebootstrapCount.get());
+        time.sleep(1);
+        client.poll(0, time.milliseconds());
+        assertEquals(1, rebootstrapCount.get());
+
+        // Phase 2: a failed metadata request does not rebootstrap by itself; the count
+        // only rises on the poll after another rebootstrapTriggerMs has elapsed.
+        metadata.requestUpdate(true);
+        client.poll(0, time.milliseconds());
+        assertEquals(1, rebootstrapCount.get());
+        metadataUpdater.handleFailedRequest(time.milliseconds(), 
Optional.of(new KafkaException()));
+        client.poll(0, time.milliseconds());
+        assertEquals(1, rebootstrapCount.get());
+        time.sleep(rebootstrapTriggerMs);
+        client.poll(0, time.milliseconds());
+        assertEquals(2, rebootstrapCount.get());
+
+        // Phase 3: requesting an update and polling immediately does not rebootstrap.
+        metadata.requestUpdate(true);
+        client.poll(0, time.milliseconds());
+        assertEquals(2, rebootstrapCount.get());
+
+        // Phase 4: hand the updater an error response built for RebootstrapRequiredException.
+        // The rebootstrap is not counted while the response is handled, only on the poll
+        // after a 50 ms sleep (presumably the backoff passed to the Metadata constructor
+        // above -- TODO confirm).
+        MetadataRequest.Builder builder = new 
MetadataRequest.Builder(Collections.emptyList(), true);
+        ClientRequest request = client.newClientRequest(node.idString(), 
builder, time.milliseconds(), true);
+        MetadataResponse rebootstrapResponse = (MetadataResponse) 
builder.build().getErrorResponse(0, new 
RebootstrapRequiredException("rebootstrap"));
+        
metadataUpdater.handleSuccessfulResponse(request.makeHeader(builder.latestAllowedVersion()),
 time.milliseconds(), rebootstrapResponse);
+        assertEquals(2, rebootstrapCount.get());
+        time.sleep(50);
+        client.poll(0, time.milliseconds());
+        assertEquals(3, rebootstrapCount.get());
+    }
+
     private void checkSimpleRequestResponse(NetworkClient networkClient) {
         awaitReady(networkClient, node); // has to be before creating any 
request, as it may send ApiVersionsRequest and its response is mocked with 
correlation id 0
         short requestVersion = PRODUCE.latestVersion();
@@ -1086,7 +1141,7 @@ public class NetworkClientTest {
                 reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 
64 * 1024,
                 defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, 
connectionSetupTimeoutMaxMsTest,
                 time, false, new ApiVersions(), null, new LogContext(), 
mockHostResolver, mockClientTelemetrySender,
-                MetadataRecoveryStrategy.NONE);
+                Long.MAX_VALUE, MetadataRecoveryStrategy.NONE);
 
         // Connect to one the initial addresses, then change the addresses and 
disconnect
         client.ready(node, time.milliseconds());
@@ -1147,7 +1202,7 @@ public class NetworkClientTest {
                 reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 
64 * 1024,
                 defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, 
connectionSetupTimeoutMaxMsTest,
                 time, false, new ApiVersions(), null, new LogContext(), 
mockHostResolver, mockClientTelemetrySender,
-                MetadataRecoveryStrategy.NONE);
+                Long.MAX_VALUE, MetadataRecoveryStrategy.NONE);
 
         // First connection attempt should fail
         client.ready(node, time.milliseconds());
@@ -1200,7 +1255,7 @@ public class NetworkClientTest {
                 reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 
64 * 1024,
                 defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, 
connectionSetupTimeoutMaxMsTest,
                 time, false, new ApiVersions(), null, new LogContext(), 
mockHostResolver, mockClientTelemetrySender,
-                MetadataRecoveryStrategy.NONE);
+                Long.MAX_VALUE, MetadataRecoveryStrategy.NONE);
 
         // Connect to one the initial addresses, then change the addresses and 
disconnect
         client.ready(node, time.milliseconds());
@@ -1309,7 +1364,7 @@ public class NetworkClientTest {
             reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 64 * 
1024,
             defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, 
connectionSetupTimeoutMaxMsTest,
             time, true, new ApiVersions(), null, new LogContext(), new 
DefaultHostResolver(), mockClientTelemetrySender,
-            MetadataRecoveryStrategy.NONE);
+            Long.MAX_VALUE, MetadataRecoveryStrategy.NONE);
 
         // Send the ApiVersionsRequest
         client.ready(node, time.milliseconds());
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientConfigTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientConfigTest.java
index 252aa63109a..6d0ec9e8a8e 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientConfigTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientConfigTest.java
@@ -35,7 +35,8 @@ public class AdminClientConfigTest {
     public void testDefaultMetadataRecoveryStrategy() {
         Map<String, Object> configs = new HashMap<>();
         final AdminClientConfig adminClientConfig = new 
AdminClientConfig(configs);
-        assertEquals(MetadataRecoveryStrategy.NONE.name, 
adminClientConfig.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG));
+        assertEquals(MetadataRecoveryStrategy.REBOOTSTRAP.name, 
adminClientConfig.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG));
+
     }
 
     @Test
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/internals/AdminMetadataManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/AdminMetadataManagerTest.java
index 54ac4175582..5620dd06a5e 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/internals/AdminMetadataManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/AdminMetadataManagerTest.java
@@ -98,6 +98,47 @@ public class AdminMetadataManagerTest {
         assertTrue(mgr.isReady());
     }
 
+    /**
+     * Walks {@code mgr.needsRebootstrap(now, rebootstrapTriggerMs)} through a sequence of
+     * manager state changes with a 1000 ms trigger, probing it at the current mock time and
+     * at offsets just inside and just beyond the trigger interval.
+     */
+    @Test
+    public void testNeedsRebootstrap() {
+        long rebootstrapTriggerMs = 1000;
+        // After an update with the bootstrap cluster, no rebootstrap is needed, even 2000 ms later.
+        mgr.update(Cluster.bootstrap(Collections.singletonList(new 
InetSocketAddress("localhost", 9999))), time.milliseconds());
+        assertFalse(mgr.needsRebootstrap(time.milliseconds(), 
rebootstrapTriggerMs));
+        assertFalse(mgr.needsRebootstrap(time.milliseconds() + 2000, 
rebootstrapTriggerMs));
+
+        // Once an update is pending, a rebootstrap becomes due 1001 ms after the transition.
+        mgr.transitionToUpdatePending(time.milliseconds());
+        assertFalse(mgr.needsRebootstrap(time.milliseconds(), 
rebootstrapTriggerMs));
+        assertTrue(mgr.needsRebootstrap(time.milliseconds() + 1001, 
rebootstrapTriggerMs));
+
+        // A failed update 100 ms later does not restart the interval: the threshold is still
+        // 1001 ms after the pending transition (100 + 901).
+        time.sleep(100);
+        mgr.updateFailed(new RuntimeException());
+        assertFalse(mgr.needsRebootstrap(time.milliseconds() + 900, 
rebootstrapTriggerMs));
+        assertTrue(mgr.needsRebootstrap(time.milliseconds() + 901, 
rebootstrapTriggerMs));
+
+        // A successful update clears the need again, regardless of how much time passes.
+        time.sleep(1000);
+        mgr.update(mockCluster(), time.milliseconds());
+        assertFalse(mgr.needsRebootstrap(time.milliseconds(), 
rebootstrapTriggerMs));
+        assertFalse(mgr.needsRebootstrap(time.milliseconds() + 2000, 
rebootstrapTriggerMs));
+
+        // A new pending update starts a fresh interval.
+        time.sleep(1000);
+        mgr.transitionToUpdatePending(time.milliseconds());
+        assertFalse(mgr.needsRebootstrap(time.milliseconds(), 
rebootstrapTriggerMs));
+        assertTrue(mgr.needsRebootstrap(time.milliseconds() + 1001, 
rebootstrapTriggerMs));
+
+        // After the interval has really elapsed, rebootstrap(now) clears the need and a new
+        // interval is measured from the rebootstrap time.
+        time.sleep(1001);
+        assertTrue(mgr.needsRebootstrap(time.milliseconds(), 
rebootstrapTriggerMs));
+        mgr.rebootstrap(time.milliseconds());
+        assertFalse(mgr.needsRebootstrap(time.milliseconds(), 
rebootstrapTriggerMs));
+        assertFalse(mgr.needsRebootstrap(time.milliseconds() + 1000, 
rebootstrapTriggerMs));
+        assertTrue(mgr.needsRebootstrap(time.milliseconds() + 1001, 
rebootstrapTriggerMs));
+
+        // initiateRebootstrap() makes a rebootstrap due immediately, without waiting for the
+        // interval; rebootstrap(now) clears it as before.
+        mgr.initiateRebootstrap();
+        assertTrue(mgr.needsRebootstrap(time.milliseconds(), 
rebootstrapTriggerMs));
+        mgr.rebootstrap(time.milliseconds());
+        assertFalse(mgr.needsRebootstrap(time.milliseconds(), 
rebootstrapTriggerMs));
+        assertFalse(mgr.needsRebootstrap(time.milliseconds() + 1000, 
rebootstrapTriggerMs));
+        assertTrue(mgr.needsRebootstrap(time.milliseconds() + 1001, 
rebootstrapTriggerMs));
+    }
+
     private static Cluster mockCluster() {
         HashMap<Integer, Node> nodes = new HashMap<>();
         nodes.put(0, new Node(0, "localhost", 8121));
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
index 99c45f05c15..cbe0bd6ef35 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
@@ -210,7 +210,7 @@ public class ConsumerConfigTest {
         configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
keyDeserializerClass);
         configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
valueDeserializerClass);
         final ConsumerConfig consumerConfig = new ConsumerConfig(configs);
-        assertEquals(MetadataRecoveryStrategy.NONE.name, 
consumerConfig.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG));
+        assertEquals(MetadataRecoveryStrategy.REBOOTSTRAP.name, 
consumerConfig.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG));
     }
 
     @Test
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
index f3ec9ca96c2..d8c77433545 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
@@ -106,7 +106,7 @@ public class ProducerConfigTest {
         configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
keySerializerClass);
         configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
valueSerializerClass);
         final ProducerConfig producerConfig = new ProducerConfig(configs);
-        assertEquals(MetadataRecoveryStrategy.NONE.name, 
producerConfig.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG));
+        assertEquals(MetadataRecoveryStrategy.REBOOTSTRAP.name, 
producerConfig.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG));
     }
 
     @Test
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
index 9150000223b..16ab0d47a3c 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
@@ -103,6 +103,10 @@ public final class DistributedConfig extends WorkerConfig {
     private static final String METADATA_RECOVERY_STRATEGY_DOC = 
CommonClientConfigs.METADATA_RECOVERY_STRATEGY_DOC;
     public static final String DEFAULT_METADATA_RECOVERY_STRATEGY = 
CommonClientConfigs.DEFAULT_METADATA_RECOVERY_STRATEGY;
 
+    public static final String METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG 
= CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG;
+    private static final String METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC = 
CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC;
+    public static final long DEFAULT_METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS 
= CommonClientConfigs.DEFAULT_METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS;
+
     /**
      * <code>worker.sync.timeout.ms</code>
      */
@@ -526,7 +530,14 @@ public final class DistributedConfig extends WorkerConfig {
                     ConfigDef.CaseInsensitiveValidString
                             
.in(Utils.enumOptions(MetadataRecoveryStrategy.class)),
                     ConfigDef.Importance.LOW,
-                    METADATA_RECOVERY_STRATEGY_DOC);
+                    METADATA_RECOVERY_STRATEGY_DOC)
+            .define(METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG,
+                    ConfigDef.Type.LONG,
+                    DEFAULT_METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS,
+                    atLeast(0),
+                    ConfigDef.Importance.LOW,
+                    METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC);
+
     }
 
     private final ExactlyOnceSourceSupport exactlyOnceSourceSupport;
diff --git 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index f4fdcaf801e..a3982f070a4 100644
--- 
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ 
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -122,6 +122,7 @@ public class WorkerGroupMember {
                     true,
                     new ApiVersions(),
                     logContext,
+                    
config.getLong(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG),
                     
MetadataRecoveryStrategy.forName(config.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG))
             );
             this.client = new ConsumerNetworkClient(
diff --git 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
index 50e690057ec..1a78643950d 100644
--- 
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
+++ 
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.integration;
 
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.provider.FileConfigProvider;
@@ -78,6 +79,7 @@ import javax.ws.rs.core.Response;
 
 import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
 import static 
org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+import static 
org.apache.kafka.clients.CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG;
 import static 
org.apache.kafka.common.config.AbstractConfig.CONFIG_PROVIDERS_CONFIG;
 import static 
org.apache.kafka.common.config.TopicConfig.DELETE_RETENTION_MS_CONFIG;
 import static org.apache.kafka.common.config.TopicConfig.SEGMENT_MS_CONFIG;
@@ -840,6 +842,7 @@ public class ConnectWorkerIntegrationTest {
         // Workaround for KAFKA-15676, which can cause the scheduled rebalance 
delay to
         // be spuriously triggered after the group coordinator for a Connect 
cluster is bounced
         workerProps.put(SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG, "0");
+        workerProps.put(METADATA_RECOVERY_STRATEGY_CONFIG, 
MetadataRecoveryStrategy.NONE.name);
 
         useFixedBrokerPort();
 
diff --git 
a/core/src/test/scala/integration/kafka/api/AdminClientRebootstrapTest.scala 
b/core/src/test/scala/integration/kafka/api/AdminClientRebootstrapTest.scala
index 70b514f199b..d9cc326ff94 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientRebootstrapTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientRebootstrapTest.scala
@@ -16,15 +16,18 @@
  */
 package kafka.api
 
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 class AdminClientRebootstrapTest extends RebootstrapTest {
-  @Test
-  def testRebootstrap(): Unit = {
+  @ParameterizedTest
+  @ValueSource(booleans = Array(false, true))
+  def testRebootstrap(useRebootstrapTriggerMs: Boolean): Unit = {
+
     server1.shutdown()
     server1.awaitShutdown()
 
-    val adminClient = createAdminClient(configOverrides = clientOverrides)
+    val adminClient = createAdminClient(configOverrides = 
clientOverrides(useRebootstrapTriggerMs))
 
     // Only the server 0 is available for the admin client during the 
bootstrap.
     adminClient.listTopics().names().get()
diff --git 
a/core/src/test/scala/integration/kafka/api/ConsumerRebootstrapTest.scala 
b/core/src/test/scala/integration/kafka/api/ConsumerRebootstrapTest.scala
index 51a5afe391b..c7c87ce3b84 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerRebootstrapTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerRebootstrapTest.scala
@@ -16,17 +16,24 @@
  */
 package kafka.api
 
+import kafka.api.ConsumerRebootstrapTest._
+import 
kafka.server.QuorumTestHarness.getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit
 import kafka.utils.{TestInfoUtils, TestUtils}
+import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.MethodSource
+import org.junit.jupiter.params.provider.{Arguments, MethodSource}
 
-import java.util.Collections
+import java.time.Duration
+import java.util.{Collections, stream}
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.TimeoutException
 
 class ConsumerRebootstrapTest extends RebootstrapTest {
-  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-  
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit"))
-  def testRebootstrap(quorum: String, groupProtocol: String): Unit = {
+  @ParameterizedTest(name = RebootstrapTestName)
+  @MethodSource(Array("rebootstrapTestParams"))
+  def testRebootstrap(quorum: String, groupProtocol: String, 
useRebootstrapTriggerMs: Boolean): Unit = {
     sendRecords(10, 0)
 
     TestUtils.waitUntilTrue(
@@ -37,7 +44,7 @@ class ConsumerRebootstrapTest extends RebootstrapTest {
     server1.shutdown()
     server1.awaitShutdown()
 
-    val consumer = createConsumer(configOverrides = clientOverrides)
+    val consumer = createConsumer(configOverrides = 
clientOverrides(useRebootstrapTriggerMs))
 
     // Only the server 0 is available for the consumer during the bootstrap.
     consumer.assign(Collections.singleton(tp))
@@ -77,6 +84,40 @@ class ConsumerRebootstrapTest extends RebootstrapTest {
     consumeAndVerifyRecords(consumer, 10, 20, startingKeyAndValueIndex = 20, 
startingTimestamp = 20)
   }
 
+  // Checks that clients created with metadata.recovery.strategy set to "none" do not recover
+  // once server 0 is shut down and server 1 is started instead: requests from the existing
+  // producer and admin client time out and the existing consumer receives nothing, while a
+  // newly created producer can still send.
+  @ParameterizedTest(name = RebootstrapTestName)
+  @MethodSource(Array("rebootstrapTestParams"))
+  def testRebootstrapDisabled(quorum: String, groupProtocol: String, 
useRebootstrapTriggerMs: Boolean): Unit = {
+    server1.shutdown()
+    server1.awaitShutdown()
+
+    // Start from the shared rebootstrap overrides, then switch the strategy off. In the
+    // trigger-based variant the trigger interval is also set to 1000 ms.
+    val configOverrides = clientOverrides(useRebootstrapTriggerMs)
+    configOverrides.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG, 
"none")
+    if (useRebootstrapTriggerMs)
+      
configOverrides.put(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG,
 "1000")
+
+    val producer = createProducer(configOverrides = configOverrides)
+    val consumer = createConsumer(configOverrides = configOverrides)
+    val adminClient = createAdminClient(configOverrides = configOverrides)
+
+    // Only the server 0 is available during the bootstrap.
+    val recordMetadata0 = producer.send(new ProducerRecord(topic, part, 0L, 
"key 0".getBytes, "value 0".getBytes)).get(15, TimeUnit.SECONDS)
+    assertEquals(0, recordMetadata0.offset())
+    adminClient.listTopics().names().get(15, TimeUnit.SECONDS)
+    consumer.assign(Collections.singleton(tp))
+    consumeAndVerifyRecords(consumer, 1, 0)
+
+    // Swap the running broker: stop server 0 and bring server 1 back up.
+    server0.shutdown()
+    server0.awaitShutdown()
+    server1.startup()
+
+    // The existing producer and admin client are expected to time out within 5 seconds.
+    assertThrows(classOf[TimeoutException], () => producer.send(new 
ProducerRecord(topic, part, "key 2".getBytes, "value 2".getBytes)).get(5, 
TimeUnit.SECONDS))
+    assertThrows(classOf[TimeoutException], () => 
adminClient.listTopics().names().get(5, TimeUnit.SECONDS))
+
+    // A freshly created producer can send a record, but the existing consumer still
+    // gets no records from a 5 second poll.
+    val producer2 = createProducer(configOverrides = configOverrides)
+    producer2.send(new ProducerRecord(topic, part, 1L, "key 1".getBytes, 
"value 1".getBytes)).get(15, TimeUnit.SECONDS)
+    assertEquals(0, consumer.poll(Duration.ofSeconds(5)).count)
+  }
+
   private def sendRecords(numRecords: Int, from: Int): Unit = {
     val producer: KafkaProducer[Array[Byte], Array[Byte]] = createProducer()
     (from until (numRecords + from)).foreach { i =>
@@ -87,3 +128,17 @@ class ConsumerRebootstrapTest extends RebootstrapTest {
     producer.close()
   }
 }
+
+// Companion object providing the display name and the argument source used by the
+// parameterized tests in ConsumerRebootstrapTest.
+object ConsumerRebootstrapTest {
+
+  // Base quorum/group-protocol display name with a ".useRebootstrapTriggerMs={2}" suffix;
+  // {2} is presumably the placeholder for the third test argument -- TODO confirm.
+  final val RebootstrapTestName = 
s"${TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames}.useRebootstrapTriggerMs={2}"
+  // Expands the single base argument set into two: one with `true` appended and one with
+  // `false` appended as the last argument.
+  def rebootstrapTestParams: stream.Stream[Arguments] = {
+    // Guard: the expansion below only uses the first element, so the base provider must
+    // yield exactly one argument set.
+    assertEquals(1, 
getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit.count())
+    val args = 
getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit
+      .findFirst().get.get
+    stream.Stream.of(
+      Arguments.of((args :+ true):_*),
+      Arguments.of((args :+ false):_*)
+    )
+  }
+}
diff --git 
a/core/src/test/scala/integration/kafka/api/ProducerRebootstrapTest.scala 
b/core/src/test/scala/integration/kafka/api/ProducerRebootstrapTest.scala
index 3cb40b6a0cf..d0eabf370cb 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerRebootstrapTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerRebootstrapTest.scala
@@ -18,18 +18,21 @@ package kafka.api
 
 import org.apache.kafka.clients.producer.ProducerRecord
 import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
 
 class ProducerRebootstrapTest extends RebootstrapTest {
-  @Test
-  def testRebootstrap(): Unit = {
+  @ParameterizedTest
+  @ValueSource(booleans = Array(false, true))
+  def testRebootstrap(useRebootstrapTriggerMs: Boolean): Unit = {
     server1.shutdown()
     server1.awaitShutdown()
 
-    val producer = createProducer(configOverrides = clientOverrides)
+    val producer = createProducer(configOverrides = 
clientOverrides(useRebootstrapTriggerMs))
 
     // Only the server 0 is available for the producer during the bootstrap.
-    producer.send(new ProducerRecord(topic, part, "key 0".getBytes, "value 
0".getBytes)).get()
+    val recordMetadata0 = producer.send(new ProducerRecord(topic, part, "key 
0".getBytes, "value 0".getBytes)).get()
+    assertEquals(0, recordMetadata0.offset())
 
     server0.shutdown()
     server0.awaitShutdown()
diff --git a/core/src/test/scala/integration/kafka/api/RebootstrapTest.scala 
b/core/src/test/scala/integration/kafka/api/RebootstrapTest.scala
index b3b044ebcdb..45324a89c6e 100644
--- a/core/src/test/scala/integration/kafka/api/RebootstrapTest.scala
+++ b/core/src/test/scala/integration/kafka/api/RebootstrapTest.scala
@@ -40,12 +40,17 @@ abstract class RebootstrapTest extends AbstractConsumerTest 
{
       .map(KafkaConfig.fromProps(_, overridingProps))
   }
 
-  def clientOverrides: Properties = {
+  // Builds the client config overrides for the rebootstrap tests. The recovery strategy is
+  // always set to "rebootstrap". With useRebootstrapTriggerMs = true only the trigger interval
+  // is set (5000 ms); otherwise the trigger is set to 3600000 ms and the socket connection
+  // setup timeouts (5000 ms) and reconnect backoffs (1000 ms) are set as well.
+  def clientOverrides(useRebootstrapTriggerMs: Boolean): Properties = {
     val overrides = new Properties()
-    
overrides.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, 
"5000")
-    
overrides.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG,
 "5000")
-    overrides.put(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG, "1000")
-    overrides.put(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG, "1000")
+    if (useRebootstrapTriggerMs) {
+      
overrides.put(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG,
 "5000")
+    } else {
+      
overrides.put(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG,
 "3600000")
+      
overrides.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, 
"5000")
+      
overrides.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG,
 "5000")
+      overrides.put(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG, "1000")
+      overrides.put(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG, 
"1000")
+    }
     overrides.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG, 
"rebootstrap")
     overrides
   }
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index dafef7d7bc4..29b92408ef5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams;
 
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
 import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.clients.admin.AdminClientConfig;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -1147,6 +1148,19 @@ public class StreamsConfig extends AbstractConfig {
                     atLeast(0),
                     Importance.LOW,
                     CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC)
+            .define(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG,
+                    Type.STRING,
+                    CommonClientConfigs.DEFAULT_METADATA_RECOVERY_STRATEGY,
+                    ConfigDef.CaseInsensitiveValidString
+                    .in(Utils.enumOptions(MetadataRecoveryStrategy.class)),
+                    Importance.LOW,
+                    CommonClientConfigs.METADATA_RECOVERY_STRATEGY_DOC)
+            
.define(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG,
+                    Type.LONG,
+                    
CommonClientConfigs.DEFAULT_METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS,
+                    atLeast(0),
+                    Importance.LOW,
+                    
CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC)
             .define(ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
                     Type.CLASS,
                     null,

Reply via email to