This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.8 by this push:
new 62fb6a3ef19 KAFKA-8206: KIP-899: Allow client to rebootstrap (#13277)
62fb6a3ef19 is described below
commit 62fb6a3ef1996f2b5ec80b7923bf27583531babb
Author: Ivan Yurchenko <[email protected]>
AuthorDate: Wed Jun 12 22:48:32 2024 +0300
KAFKA-8206: KIP-899: Allow client to rebootstrap (#13277)
This commit implements KIP-899: Allow producer and consumer clients to
rebootstrap. It introduces the new setting `metadata.recovery.strategy`,
applicable to all client types.
Reviewers: Greg Harris <[email protected]>, Rajini Sivaram
<[email protected]>
---
.../java/org/apache/kafka/clients/ClientUtils.java | 4 +-
.../apache/kafka/clients/CommonClientConfigs.java | 13 ++++
.../java/org/apache/kafka/clients/KafkaClient.java | 2 +-
.../org/apache/kafka/clients/LeastLoadedNode.java | 43 +++++++++++
.../java/org/apache/kafka/clients/Metadata.java | 8 ++
.../kafka/clients/MetadataRecoveryStrategy.java | 44 +++++++++++
.../org/apache/kafka/clients/NetworkClient.java | 62 ++++++++++-----
.../kafka/clients/admin/AdminClientConfig.java | 14 +++-
.../kafka/clients/admin/KafkaAdminClient.java | 16 +++-
.../admin/internals/AdminMetadataManager.java | 14 ++++
.../kafka/clients/consumer/ConsumerConfig.java | 10 ++-
.../consumer/internals/ConsumerNetworkClient.java | 2 +-
.../consumer/internals/NetworkClientDelegate.java | 4 +-
.../kafka/clients/producer/ProducerConfig.java | 10 ++-
.../kafka/clients/producer/internals/Sender.java | 2 +-
.../java/org/apache/kafka/clients/MockClient.java | 8 +-
.../apache/kafka/clients/NetworkClientTest.java | 72 ++++++++++++++----
.../kafka/clients/admin/AdminClientConfigTest.java | 47 ++++++++++++
.../kafka/clients/consumer/ConsumerConfigTest.java | 20 +++++
.../internals/FetchRequestManagerTest.java | 4 +-
.../clients/consumer/internals/FetcherTest.java | 4 +-
.../kafka/clients/producer/KafkaProducerTest.java | 5 +-
.../kafka/clients/producer/ProducerConfigTest.java | 20 +++++
.../clients/producer/internals/SenderTest.java | 13 ++--
.../runtime/distributed/DistributedConfig.java | 14 +++-
.../runtime/distributed/WorkerGroupMember.java | 5 +-
core/src/main/java/kafka/server/NetworkUtils.java | 4 +-
.../kafka/admin/BrokerApiVersionsCommand.scala | 5 +-
.../controller/ControllerChannelManager.scala | 3 +-
.../TransactionMarkerChannelManager.scala | 3 +-
core/src/main/scala/kafka/raft/RaftManager.scala | 5 +-
.../scala/kafka/server/BrokerBlockingSender.scala | 3 +-
core/src/main/scala/kafka/server/KafkaServer.scala | 5 +-
.../server/NodeToControllerChannelManager.scala | 3 +-
.../kafka/api/AdminClientRebootstrapTest.scala | 48 ++++++++++++
.../kafka/api/ConsumerRebootstrapTest.scala | 87 ++++++++++++++++++++++
.../kafka/api/ProducerRebootstrapTest.scala | 52 +++++++++++++
.../integration/kafka/api/RebootstrapTest.scala | 52 +++++++++++++
.../kafka/tools/ReplicaVerificationTool.java | 4 +-
.../trogdor/workload/ConnectionStressWorker.java | 4 +-
40 files changed, 666 insertions(+), 72 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
index 6b6b56059c8..b08bc12950c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
@@ -245,7 +245,9 @@ public final class ClientUtils {
throttleTimeSensor,
logContext,
hostResolver,
- clientTelemetrySender);
+ clientTelemetrySender,
+
MetadataRecoveryStrategy.forName(config.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG))
+ );
} catch (Throwable t) {
closeQuietly(selector, "Selector");
closeQuietly(channelBuilder, "ChannelBuilder");
diff --git
a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index f5a08da2c93..9803d487f41 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -219,6 +219,19 @@ public class CommonClientConfigs {
public static final String DEFAULT_API_TIMEOUT_MS_DOC = "Specifies the
timeout (in milliseconds) for client APIs. " +
"This configuration is used as the default timeout for all client
operations that do not specify a <code>timeout</code> parameter.";
+ public static final String METADATA_RECOVERY_STRATEGY_CONFIG =
"metadata.recovery.strategy";
+ public static final String METADATA_RECOVERY_STRATEGY_DOC = "Controls how
the client recovers when none of the brokers known to it is available. " +
+ "If set to <code>none</code>, the client fails. If set to
<code>rebootstrap</code>, " +
+ "the client repeats the bootstrap process using
<code>bootstrap.servers</code>. " +
+ "Rebootstrapping is useful when a client communicates with brokers
so infrequently " +
+ "that the set of brokers may change entirely before the client
refreshes metadata. " +
+ "Metadata recovery is triggered when all last-known brokers appear
unavailable simultaneously. " +
+ "Brokers appear unavailable when disconnected and no current retry
attempt is in-progress. " +
+ "Consider increasing <code>reconnect.backoff.ms</code> and
<code>reconnect.backoff.max.ms</code> and " +
+ "decreasing <code>socket.connection.setup.timeout.ms</code> and
<code>socket.connection.setup.timeout.max.ms</code> " +
+ "for the client.";
+ public static final String DEFAULT_METADATA_RECOVERY_STRATEGY =
MetadataRecoveryStrategy.NONE.name;
+
/**
* Postprocess the configuration so that exponential backoff is disabled
when reconnect backoff
* is explicitly configured but the maximum reconnect backoff is not
explicitly configured.
diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
index a03d57b40f8..46b64986064 100644
--- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
@@ -130,7 +130,7 @@ public interface KafkaClient extends Closeable {
* @param now The current time in ms
* @return The node with the fewest in-flight requests.
*/
- Node leastLoadedNode(long now);
+ LeastLoadedNode leastLoadedNode(long now);
/**
* The number of currently in-flight requests for which we have not yet
returned a response
diff --git
a/clients/src/main/java/org/apache/kafka/clients/LeastLoadedNode.java
b/clients/src/main/java/org/apache/kafka/clients/LeastLoadedNode.java
new file mode 100644
index 00000000000..b2b93e6c94b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/LeastLoadedNode.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import org.apache.kafka.common.Node;
+
+public class LeastLoadedNode {
+ private final Node node;
+ private final boolean atLeastOneConnectionReady;
+
+ public LeastLoadedNode(Node node, boolean atLeastOneConnectionReady) {
+ this.node = node;
+ this.atLeastOneConnectionReady = atLeastOneConnectionReady;
+ }
+
+ public Node node() {
+ return node;
+ }
+
+ /**
+ * Indicates if the least loaded node is available or at least a ready
connection exists.
+ *
+ * <p>There may be no node available while ready connections to live nodes
exist. This may happen when
+ * the connections are overloaded with in-flight requests. This function
takes this into account.
+ */
+ public boolean hasNodeAvailableOrConnectionReady() {
+ return node != null || atLeastOneConnectionReady;
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
index 30cad44a4bc..9246da01000 100644
--- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java
+++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java
@@ -82,6 +82,8 @@ public class Metadata implements Closeable {
private final ClusterResourceListeners clusterResourceListeners;
private boolean isClosed;
private final Map<TopicPartition, Integer> lastSeenLeaderEpochs;
+ /** Addresses with which the metadata was originally bootstrapped. */
+ private List<InetSocketAddress> bootstrapAddresses;
/**
* Create a new Metadata instance
@@ -304,6 +306,12 @@ public class Metadata implements Closeable {
this.needFullUpdate = true;
this.updateVersion += 1;
this.metadataSnapshot = MetadataSnapshot.bootstrap(addresses);
+ this.bootstrapAddresses = addresses;
+ }
+
+ public synchronized void rebootstrap() {
+ log.info("Rebootstrapping with {}", this.bootstrapAddresses);
+ this.bootstrap(this.bootstrapAddresses);
}
/**
diff --git
a/clients/src/main/java/org/apache/kafka/clients/MetadataRecoveryStrategy.java
b/clients/src/main/java/org/apache/kafka/clients/MetadataRecoveryStrategy.java
new file mode 100644
index 00000000000..a4e0340c241
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/clients/MetadataRecoveryStrategy.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients;
+
+import java.util.Locale;
+
+/**
+ * Defines the strategies which clients can follow to deal with the situation
when none of the known nodes is available.
+ */
+public enum MetadataRecoveryStrategy {
+ NONE("none"),
+ REBOOTSTRAP("rebootstrap");
+
+ public final String name;
+
+ MetadataRecoveryStrategy(String name) {
+ this.name = name;
+ }
+
+ public static MetadataRecoveryStrategy forName(String name) {
+ if (name == null) {
+ throw new IllegalArgumentException("Illegal
MetadataRecoveryStrategy: null");
+ }
+ try {
+ return
MetadataRecoveryStrategy.valueOf(name.toUpperCase(Locale.ROOT));
+ } catch (IllegalArgumentException e) {
+ throw new IllegalArgumentException("Illegal
MetadataRecoveryStrategy: " + name);
+ }
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 3a7af6617e7..8ac92acd884 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -114,6 +114,8 @@ public class NetworkClient implements KafkaClient {
/* time in ms to wait before retrying to create connection to a server */
private final long reconnectBackoffMs;
+ private final MetadataRecoveryStrategy metadataRecoveryStrategy;
+
private final Time time;
/**
@@ -147,7 +149,8 @@ public class NetworkClient implements KafkaClient {
Time time,
boolean discoverBrokerVersions,
ApiVersions apiVersions,
- LogContext logContext) {
+ LogContext logContext,
+ MetadataRecoveryStrategy metadataRecoveryStrategy) {
this(selector,
metadata,
clientId,
@@ -163,7 +166,8 @@ public class NetworkClient implements KafkaClient {
discoverBrokerVersions,
apiVersions,
null,
- logContext);
+ logContext,
+ metadataRecoveryStrategy);
}
public NetworkClient(Selectable selector,
@@ -181,7 +185,8 @@ public class NetworkClient implements KafkaClient {
boolean discoverBrokerVersions,
ApiVersions apiVersions,
Sensor throttleTimeSensor,
- LogContext logContext) {
+ LogContext logContext,
+ MetadataRecoveryStrategy metadataRecoveryStrategy) {
this(null,
metadata,
selector,
@@ -200,7 +205,8 @@ public class NetworkClient implements KafkaClient {
throttleTimeSensor,
logContext,
new DefaultHostResolver(),
- null);
+ null,
+ metadataRecoveryStrategy);
}
public NetworkClient(Selectable selector,
@@ -217,7 +223,8 @@ public class NetworkClient implements KafkaClient {
Time time,
boolean discoverBrokerVersions,
ApiVersions apiVersions,
- LogContext logContext) {
+ LogContext logContext,
+ MetadataRecoveryStrategy metadataRecoveryStrategy) {
this(metadataUpdater,
null,
selector,
@@ -236,7 +243,8 @@ public class NetworkClient implements KafkaClient {
null,
logContext,
new DefaultHostResolver(),
- null);
+ null,
+ metadataRecoveryStrategy);
}
public NetworkClient(MetadataUpdater metadataUpdater,
@@ -257,7 +265,8 @@ public class NetworkClient implements KafkaClient {
Sensor throttleTimeSensor,
LogContext logContext,
HostResolver hostResolver,
- ClientTelemetrySender clientTelemetrySender) {
+ ClientTelemetrySender clientTelemetrySender,
+ MetadataRecoveryStrategy metadataRecoveryStrategy) {
/* It would be better if we could pass `DefaultMetadataUpdater` from
the public constructor, but it's not
* possible because `DefaultMetadataUpdater` is an inner class and it
can only be instantiated after the
* super constructor is invoked.
@@ -288,6 +297,7 @@ public class NetworkClient implements KafkaClient {
this.log = logContext.logger(NetworkClient.class);
this.state = new AtomicReference<>(State.ACTIVE);
this.telemetrySender = (clientTelemetrySender != null) ? new
TelemetrySender(clientTelemetrySender) : null;
+ this.metadataRecoveryStrategy = metadataRecoveryStrategy;
}
/**
@@ -695,7 +705,7 @@ public class NetworkClient implements KafkaClient {
* @return The node with the fewest in-flight requests.
*/
@Override
- public Node leastLoadedNode(long now) {
+ public LeastLoadedNode leastLoadedNode(long now) {
List<Node> nodes = this.metadataUpdater.fetchNodes();
if (nodes.isEmpty())
throw new IllegalStateException("There are no nodes in the Kafka
cluster");
@@ -705,16 +715,25 @@ public class NetworkClient implements KafkaClient {
Node foundCanConnect = null;
Node foundReady = null;
+ boolean atLeastOneConnectionReady = false;
+
int offset = this.randOffset.nextInt(nodes.size());
for (int i = 0; i < nodes.size(); i++) {
int idx = (offset + i) % nodes.size();
Node node = nodes.get(idx);
+
+ if (!atLeastOneConnectionReady
+ && connectionStates.isReady(node.idString(), now)
+ && selector.isChannelReady(node.idString())) {
+ atLeastOneConnectionReady = true;
+ }
+
if (canSendRequest(node.idString(), now)) {
int currInflight =
this.inFlightRequests.count(node.idString());
if (currInflight == 0) {
// if we find an established connection with no in-flight
requests we can stop right away
log.trace("Found least loaded node {} connected with no
in-flight requests", node);
- return node;
+ return new LeastLoadedNode(node, true);
} else if (currInflight < inflight) {
// otherwise if this is the best we have found so far,
record that
inflight = currInflight;
@@ -738,16 +757,16 @@ public class NetworkClient implements KafkaClient {
// which are being established before connecting to new nodes.
if (foundReady != null) {
log.trace("Found least loaded node {} with {} inflight requests",
foundReady, inflight);
- return foundReady;
+ return new LeastLoadedNode(foundReady, atLeastOneConnectionReady);
} else if (foundConnecting != null) {
log.trace("Found least loaded connecting node {}",
foundConnecting);
- return foundConnecting;
+ return new LeastLoadedNode(foundConnecting,
atLeastOneConnectionReady);
} else if (foundCanConnect != null) {
log.trace("Found least loaded node {} with no active connection",
foundCanConnect);
- return foundCanConnect;
+ return new LeastLoadedNode(foundCanConnect,
atLeastOneConnectionReady);
} else {
log.trace("Least loaded node selection failed to find an available
node");
- return null;
+ return new LeastLoadedNode(null, atLeastOneConnectionReady);
}
}
@@ -1122,13 +1141,22 @@ public class NetworkClient implements KafkaClient {
// Beware that the behavior of this method and the computation of
timeouts for poll() are
// highly dependent on the behavior of leastLoadedNode.
- Node node = leastLoadedNode(now);
- if (node == null) {
+ LeastLoadedNode leastLoadedNode = leastLoadedNode(now);
+
+ // Rebootstrap if needed and configured.
+ if (metadataRecoveryStrategy ==
MetadataRecoveryStrategy.REBOOTSTRAP
+ && !leastLoadedNode.hasNodeAvailableOrConnectionReady()) {
+ metadata.rebootstrap();
+
+ leastLoadedNode = leastLoadedNode(now);
+ }
+
+ if (leastLoadedNode.node() == null) {
log.debug("Give up sending metadata request since no node is
available");
return reconnectBackoffMs;
}
- return maybeUpdate(now, node);
+ return maybeUpdate(now, leastLoadedNode.node());
}
@Override
@@ -1266,7 +1294,7 @@ public class NetworkClient implements KafkaClient {
// Per KIP-714, let's continue to re-use the same broker for as
long as possible.
if (stickyNode == null) {
- stickyNode = leastLoadedNode(now);
+ stickyNode = leastLoadedNode(now).node();
if (stickyNode == null) {
log.debug("Give up sending telemetry request since no node
is available");
return reconnectBackoffMs;
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
index d740283f506..b64338b1d1b 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
@@ -19,6 +19,7 @@ package org.apache.kafka.clients.admin;
import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
@@ -139,6 +140,10 @@ public class AdminClientConfig extends AbstractConfig {
public static final String RETRIES_CONFIG =
CommonClientConfigs.RETRIES_CONFIG;
public static final String DEFAULT_API_TIMEOUT_MS_CONFIG =
CommonClientConfigs.DEFAULT_API_TIMEOUT_MS_CONFIG;
+ public static final String METADATA_RECOVERY_STRATEGY_CONFIG =
CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG;
+ public static final String METADATA_RECOVERY_STRATEGY_DOC =
CommonClientConfigs.METADATA_RECOVERY_STRATEGY_DOC;
+ public static final String DEFAULT_METADATA_RECOVERY_STRATEGY =
CommonClientConfigs.DEFAULT_METADATA_RECOVERY_STRATEGY;
+
/**
* <code>security.providers</code>
*/
@@ -262,7 +267,14 @@ public class AdminClientConfig extends AbstractConfig {
Importance.MEDIUM,
SECURITY_PROTOCOL_DOC)
.withClientSslSupport()
- .withClientSaslSupport();
+ .withClientSaslSupport()
+ .define(METADATA_RECOVERY_STRATEGY_CONFIG,
+ Type.STRING,
+ DEFAULT_METADATA_RECOVERY_STRATEGY,
+ ConfigDef.CaseInsensitiveValidString
+
.in(Utils.enumOptions(MetadataRecoveryStrategy.class)),
+ Importance.LOW,
+ METADATA_RECOVERY_STRATEGY_DOC);
}
@Override
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 55646e27820..8f693c9965a 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -25,6 +25,8 @@ import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.DefaultHostResolver;
import org.apache.kafka.clients.HostResolver;
import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.LeastLoadedNode;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.StaleMetadataException;
import
org.apache.kafka.clients.admin.CreateTopicsResult.TopicMetadataAndConfig;
@@ -399,6 +401,7 @@ public class KafkaAdminClient extends AdminClient {
private final long retryBackoffMaxMs;
private final ExponentialBackoff retryBackoff;
private final boolean clientTelemetryEnabled;
+ private final MetadataRecoveryStrategy metadataRecoveryStrategy;
/**
* The telemetry requests client instance id.
@@ -612,6 +615,7 @@ public class KafkaAdminClient extends AdminClient {
retryBackoffMaxMs,
CommonClientConfigs.RETRY_BACKOFF_JITTER);
this.clientTelemetryEnabled =
config.getBoolean(AdminClientConfig.ENABLE_METRICS_PUSH_CONFIG);
+ this.metadataRecoveryStrategy =
MetadataRecoveryStrategy.forName(config.getString(AdminClientConfig.METADATA_RECOVERY_STRATEGY_CONFIG));
config.logUnused();
AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics,
time.milliseconds());
log.debug("Kafka admin client initialized");
@@ -698,7 +702,13 @@ public class KafkaAdminClient extends AdminClient {
private class MetadataUpdateNodeIdProvider implements NodeProvider {
@Override
public Node provide() {
- return client.leastLoadedNode(time.milliseconds());
+ LeastLoadedNode leastLoadedNode =
client.leastLoadedNode(time.milliseconds());
+ if (metadataRecoveryStrategy ==
MetadataRecoveryStrategy.REBOOTSTRAP
+ && !leastLoadedNode.hasNodeAvailableOrConnectionReady()) {
+ metadataManager.rebootstrap(time.milliseconds());
+ }
+
+ return leastLoadedNode.node();
}
@Override
@@ -780,7 +790,7 @@ public class KafkaAdminClient extends AdminClient {
if (metadataManager.isReady()) {
// This may return null if all nodes are busy.
// In that case, we will postpone node assignment.
- return client.leastLoadedNode(time.milliseconds());
+ return client.leastLoadedNode(time.milliseconds()).node();
}
metadataManager.requestUpdate();
return null;
@@ -835,7 +845,7 @@ public class KafkaAdminClient extends AdminClient {
} else {
// This may return null if all nodes are busy.
// In that case, we will postpone node assignment.
- return client.leastLoadedNode(time.milliseconds());
+ return client.leastLoadedNode(time.milliseconds()).node();
}
}
metadataManager.requestUpdate();
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
index f8123c40eff..239f6eecef0 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
@@ -92,6 +92,11 @@ public class AdminMetadataManager {
*/
private ApiException fatalException = null;
+ /**
+ * The cluster with which the metadata was bootstrapped.
+ */
+ private Cluster bootstrapCluster;
+
public class AdminMetadataUpdater implements MetadataUpdater {
@Override
public List<Node> fetchNodes() {
@@ -275,6 +280,7 @@ public class AdminMetadataManager {
public void update(Cluster cluster, long now) {
if (cluster.isBootstrapConfigured()) {
log.debug("Setting bootstrap cluster metadata {}.", cluster);
+ bootstrapCluster = cluster;
} else {
log.debug("Updating cluster metadata to {}", cluster);
this.lastMetadataUpdateMs = now;
@@ -287,4 +293,12 @@ public class AdminMetadataManager {
this.cluster = cluster;
}
}
+
+ /**
+ * Rebootstrap metadata with the cluster previously used for bootstrapping.
+ */
+ public void rebootstrap(long now) {
+ log.info("Rebootstrapping with {}", this.bootstrapCluster);
+ update(bootstrapCluster, now);
+ }
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index 76bfe7e91a1..c4c10c404b4 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.consumer;
import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.common.IsolationLevel;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
@@ -656,7 +657,14 @@ public class ConsumerConfig extends AbstractConfig {
Importance.MEDIUM,
CommonClientConfigs.SECURITY_PROTOCOL_DOC)
.withClientSslSupport()
- .withClientSaslSupport();
+ .withClientSaslSupport()
+
.define(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG,
+ Type.STRING,
+
CommonClientConfigs.DEFAULT_METADATA_RECOVERY_STRATEGY,
+ ConfigDef.CaseInsensitiveValidString
+
.in(Utils.enumOptions(MetadataRecoveryStrategy.class)),
+ Importance.LOW,
+
CommonClientConfigs.METADATA_RECOVERY_STRATEGY_DOC);
}
@Override
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
index 4699f00c151..50c7eb5b028 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java
@@ -139,7 +139,7 @@ public class ConsumerNetworkClient implements Closeable {
public Node leastLoadedNode() {
lock.lock();
try {
- return client.leastLoadedNode(time.milliseconds());
+ return client.leastLoadedNode(time.milliseconds()).node();
} finally {
lock.unlock();
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
index e2e4d529c00..d069a0d1fb6 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java
@@ -163,7 +163,7 @@ public class NetworkClientDelegate implements AutoCloseable
{
}
boolean doSend(final UnsentRequest r, final long currentTimeMs) {
- Node node = r.node.orElse(client.leastLoadedNode(currentTimeMs));
+ Node node =
r.node.orElse(client.leastLoadedNode(currentTimeMs).node());
if (node == null || nodeUnavailable(node)) {
log.debug("No broker available to send the request: {}.
Retrying.", r);
return false;
@@ -208,7 +208,7 @@ public class NetworkClientDelegate implements AutoCloseable
{
}
public Node leastLoadedNode() {
- return this.client.leastLoadedNode(time.milliseconds());
+ return this.client.leastLoadedNode(time.milliseconds()).node();
}
public void wakeup() {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index c67d60a180a..a59ee81b4a9 100644
---
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.producer;
import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.common.compress.GzipCompression;
import org.apache.kafka.common.compress.Lz4Compression;
import org.apache.kafka.common.compress.ZstdCompression;
@@ -528,7 +529,14 @@ public class ProducerConfig extends AbstractConfig {
null,
new ConfigDef.NonEmptyString(),
Importance.LOW,
- TRANSACTIONAL_ID_DOC);
+ TRANSACTIONAL_ID_DOC)
+
.define(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG,
+ Type.STRING,
+
CommonClientConfigs.DEFAULT_METADATA_RECOVERY_STRATEGY,
+ ConfigDef.CaseInsensitiveValidString
+
.in(Utils.enumOptions(MetadataRecoveryStrategy.class)),
+ Importance.LOW,
+
CommonClientConfigs.METADATA_RECOVERY_STRATEGY_DOC);
}
@Override
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index c4e2b73e8b9..b1a3ab9e293 100644
---
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -484,7 +484,7 @@ public class Sender implements Runnable {
FindCoordinatorRequest.CoordinatorType coordinatorType =
nextRequestHandler.coordinatorType();
targetNode = coordinatorType != null ?
transactionManager.coordinator(coordinatorType) :
- client.leastLoadedNode(time.milliseconds());
+ client.leastLoadedNode(time.milliseconds()).node();
if (targetNode != null) {
if (!awaitNodeReady(targetNode, coordinatorType)) {
log.trace("Target node {} not ready within request
timeout, will retry when node is ready.", targetNode);
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index d6cdd14f365..86d1ddf5f41 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -319,7 +319,7 @@ public class MockClient implements KafkaClient {
checkTimeoutOfPendingRequests(now);
// We skip metadata updates if all nodes are currently blacked out
- if (metadataUpdater.isUpdateNeeded() && leastLoadedNode(now) != null) {
+ if (metadataUpdater.isUpdateNeeded() && leastLoadedNode(now).node() !=
null) {
MetadataUpdate metadataUpdate = metadataUpdates.poll();
if (metadataUpdate != null) {
metadataUpdater.update(time, metadataUpdate);
@@ -588,13 +588,13 @@ public class MockClient implements KafkaClient {
}
@Override
- public Node leastLoadedNode(long now) {
+ public LeastLoadedNode leastLoadedNode(long now) {
// Consistent with NetworkClient, we do not return nodes awaiting
reconnect backoff
for (Node node : metadataUpdater.fetchNodes()) {
if (!connectionState(node.idString()).isBackingOff(now))
- return node;
+ return new LeastLoadedNode(node, true);
}
- return null;
+ return new LeastLoadedNode(null, false);
}
public void setWakeupHook(Runnable wakeupHook) {
diff --git
a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index cd3ec36f385..4369c8404e1 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -128,7 +128,16 @@ public class NetworkClientTest {
private NetworkClient createNetworkClient(long reconnectBackoffMaxMs) {
return new NetworkClient(selector, metadataUpdater, "mock",
Integer.MAX_VALUE,
reconnectBackoffMsTest, reconnectBackoffMaxMs, 64 * 1024, 64 *
1024,
- defaultRequestTimeoutMs, connectionSetupTimeoutMsTest,
connectionSetupTimeoutMaxMsTest, time, true, new ApiVersions(), new
LogContext());
+ defaultRequestTimeoutMs, connectionSetupTimeoutMsTest,
connectionSetupTimeoutMaxMsTest, time, true, new ApiVersions(), new
LogContext(),
+ MetadataRecoveryStrategy.NONE);
+ }
+
+ private NetworkClient
createNetworkClientWithMaxInFlightRequestsPerConnection(
+ int maxInFlightRequestsPerConnection, long reconnectBackoffMaxMs) {
+ return new NetworkClient(selector, metadataUpdater, "mock",
maxInFlightRequestsPerConnection,
+ reconnectBackoffMsTest, reconnectBackoffMaxMs, 64 * 1024, 64 *
1024,
+ defaultRequestTimeoutMs, connectionSetupTimeoutMsTest,
connectionSetupTimeoutMaxMsTest, time, true, new ApiVersions(), new
LogContext(),
+ MetadataRecoveryStrategy.NONE);
}
private NetworkClient createNetworkClientWithMultipleNodes(long
reconnectBackoffMaxMs, long connectionSetupTimeoutMsTest, int nodeNumber) {
@@ -136,26 +145,30 @@ public class NetworkClientTest {
TestMetadataUpdater metadataUpdater = new TestMetadataUpdater(nodes);
return new NetworkClient(selector, metadataUpdater, "mock",
Integer.MAX_VALUE,
reconnectBackoffMsTest, reconnectBackoffMaxMs, 64 * 1024, 64 *
1024,
- defaultRequestTimeoutMs, connectionSetupTimeoutMsTest,
connectionSetupTimeoutMaxMsTest, time, true, new ApiVersions(), new
LogContext());
+ defaultRequestTimeoutMs, connectionSetupTimeoutMsTest,
connectionSetupTimeoutMaxMsTest, time, true, new ApiVersions(), new
LogContext(),
+ MetadataRecoveryStrategy.NONE);
}
private NetworkClient createNetworkClientWithStaticNodes() {
return new NetworkClient(selector, metadataUpdater,
"mock-static", Integer.MAX_VALUE, 0, 0, 64 * 1024, 64 * 1024,
defaultRequestTimeoutMs,
- connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest,
time, true, new ApiVersions(), new LogContext());
+ connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest,
time, true, new ApiVersions(), new LogContext(),
+ MetadataRecoveryStrategy.NONE);
}
private NetworkClient createNetworkClientWithNoVersionDiscovery(Metadata
metadata) {
return new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE,
reconnectBackoffMsTest, 0, 64 * 1024, 64 * 1024,
- defaultRequestTimeoutMs, connectionSetupTimeoutMsTest,
connectionSetupTimeoutMaxMsTest, time, false, new ApiVersions(), new
LogContext());
+ defaultRequestTimeoutMs, connectionSetupTimeoutMsTest,
connectionSetupTimeoutMaxMsTest, time, false, new ApiVersions(), new
LogContext(),
+ MetadataRecoveryStrategy.NONE);
}
private NetworkClient createNetworkClientWithNoVersionDiscovery() {
return new NetworkClient(selector, metadataUpdater, "mock",
Integer.MAX_VALUE,
reconnectBackoffMsTest, reconnectBackoffMaxMsTest,
64 * 1024, 64 * 1024, defaultRequestTimeoutMs,
- connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest,
time, false, new ApiVersions(), new LogContext());
+ connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest,
time, false, new ApiVersions(), new LogContext(),
+ MetadataRecoveryStrategy.NONE);
}
@BeforeEach
@@ -698,14 +711,18 @@ public class NetworkClientTest {
public void testLeastLoadedNode() {
client.ready(node, time.milliseconds());
assertFalse(client.isReady(node, time.milliseconds()));
- assertEquals(node, client.leastLoadedNode(time.milliseconds()));
+ LeastLoadedNode leastLoadedNode =
client.leastLoadedNode(time.milliseconds());
+ assertEquals(node, leastLoadedNode.node());
+ assertTrue(leastLoadedNode.hasNodeAvailableOrConnectionReady());
awaitReady(client, node);
client.poll(1, time.milliseconds());
assertTrue(client.isReady(node, time.milliseconds()), "The client
should be ready");
// leastloadednode should be our single node
- Node leastNode = client.leastLoadedNode(time.milliseconds());
+ leastLoadedNode = client.leastLoadedNode(time.milliseconds());
+ assertTrue(leastLoadedNode.hasNodeAvailableOrConnectionReady());
+ Node leastNode = leastLoadedNode.node();
assertEquals(leastNode.id(), node.id(), "There should be one
leastloadednode");
// sleep for longer than reconnect backoff
@@ -716,8 +733,29 @@ public class NetworkClientTest {
client.poll(1, time.milliseconds());
assertFalse(client.ready(node, time.milliseconds()), "After we forced
the disconnection the client is no longer ready.");
- leastNode = client.leastLoadedNode(time.milliseconds());
- assertNull(leastNode, "There should be NO leastloadednode");
+ leastLoadedNode = client.leastLoadedNode(time.milliseconds());
+ assertFalse(leastLoadedNode.hasNodeAvailableOrConnectionReady());
+ assertNull(leastLoadedNode.node(), "There should be NO
leastloadednode");
+ }
+
+ @Test
+ public void testHasNodeAvailableOrConnectionReady() {
+ NetworkClient client =
createNetworkClientWithMaxInFlightRequestsPerConnection(1,
reconnectBackoffMaxMsTest);
+ awaitReady(client, node);
+
+ long now = time.milliseconds();
+ LeastLoadedNode leastLoadedNode = client.leastLoadedNode(now);
+ assertEquals(node, leastLoadedNode.node());
+ assertTrue(leastLoadedNode.hasNodeAvailableOrConnectionReady());
+
+ MetadataRequest.Builder builder = new
MetadataRequest.Builder(Collections.emptyList(), true);
+ ClientRequest request = client.newClientRequest(node.idString(),
builder, now, true);
+ client.send(request, now);
+ client.poll(defaultRequestTimeoutMs, now);
+
+ leastLoadedNode = client.leastLoadedNode(now);
+ assertNull(leastLoadedNode.node());
+ assertTrue(leastLoadedNode.hasNodeAvailableOrConnectionReady());
}
@Test
@@ -727,7 +765,7 @@ public class NetworkClientTest {
Set<Node> providedNodeIds = new HashSet<>();
for (int i = 0; i < nodeNumber * 10; i++) {
- Node node = client.leastLoadedNode(time.milliseconds());
+ Node node = client.leastLoadedNode(time.milliseconds()).node();
assertNotNull(node, "Should provide a node");
providedNodeIds.add(node);
client.ready(node, time.milliseconds());
@@ -800,7 +838,7 @@ public class NetworkClientTest {
client.poll(1, time.milliseconds());
// leastloadednode should return null since the node is throttled
- assertNull(client.leastLoadedNode(time.milliseconds()));
+ assertNull(client.leastLoadedNode(time.milliseconds()).node());
}
@Test
@@ -1046,7 +1084,8 @@ public class NetworkClientTest {
NetworkClient client = new NetworkClient(metadataUpdater, null,
selector, "mock", Integer.MAX_VALUE,
reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024,
64 * 1024,
defaultRequestTimeoutMs, connectionSetupTimeoutMsTest,
connectionSetupTimeoutMaxMsTest,
- time, false, new ApiVersions(), null, new LogContext(),
mockHostResolver, mockClientTelemetrySender);
+ time, false, new ApiVersions(), null, new LogContext(),
mockHostResolver, mockClientTelemetrySender,
+ MetadataRecoveryStrategy.NONE);
// Connect to one the initial addresses, then change the addresses and
disconnect
client.ready(node, time.milliseconds());
@@ -1106,7 +1145,8 @@ public class NetworkClientTest {
NetworkClient client = new NetworkClient(metadataUpdater, null,
selector, "mock", Integer.MAX_VALUE,
reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024,
64 * 1024,
defaultRequestTimeoutMs, connectionSetupTimeoutMsTest,
connectionSetupTimeoutMaxMsTest,
- time, false, new ApiVersions(), null, new LogContext(),
mockHostResolver, mockClientTelemetrySender);
+ time, false, new ApiVersions(), null, new LogContext(),
mockHostResolver, mockClientTelemetrySender,
+ MetadataRecoveryStrategy.NONE);
// First connection attempt should fail
client.ready(node, time.milliseconds());
@@ -1158,7 +1198,8 @@ public class NetworkClientTest {
NetworkClient client = new NetworkClient(metadataUpdater, null,
selector, "mock", Integer.MAX_VALUE,
reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024,
64 * 1024,
defaultRequestTimeoutMs, connectionSetupTimeoutMsTest,
connectionSetupTimeoutMaxMsTest,
- time, false, new ApiVersions(), null, new LogContext(),
mockHostResolver, mockClientTelemetrySender);
+ time, false, new ApiVersions(), null, new LogContext(),
mockHostResolver, mockClientTelemetrySender,
+ MetadataRecoveryStrategy.NONE);
// Connect to one the initial addresses, then change the addresses and
disconnect
client.ready(node, time.milliseconds());
@@ -1266,7 +1307,8 @@ public class NetworkClientTest {
NetworkClient client = new NetworkClient(metadataUpdater, null,
selector, "mock", Integer.MAX_VALUE,
reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 64 *
1024,
defaultRequestTimeoutMs, connectionSetupTimeoutMsTest,
connectionSetupTimeoutMaxMsTest,
- time, true, new ApiVersions(), null, new LogContext(), new
DefaultHostResolver(), mockClientTelemetrySender);
+ time, true, new ApiVersions(), null, new LogContext(), new
DefaultHostResolver(), mockClientTelemetrySender,
+ MetadataRecoveryStrategy.NONE);
// Send the ApiVersionsRequest
client.ready(node, time.milliseconds());
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientConfigTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientConfigTest.java
new file mode 100644
index 00000000000..92dc56fde46
--- /dev/null
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientConfigTest.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.admin;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
+import org.apache.kafka.common.config.ConfigException;
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class AdminClientConfigTest {
+ @Test
+ public void testDefaultMetadataRecoveryStrategy() {
+ Map<String, Object> configs = new HashMap<>();
+ final AdminClientConfig adminClientConfig = new
AdminClientConfig(configs);
+ assertEquals(MetadataRecoveryStrategy.NONE.name,
adminClientConfig.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG));
+ }
+
+ @Test
+ public void testInvalidMetadataRecoveryStrategy() {
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG,
"abc");
+ ConfigException ce = assertThrows(ConfigException.class, () -> new
AdminClientConfig(configs));
+
assertTrue(ce.getMessage().contains(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG));
+ }
+}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
index 8e9fa5722fc..0fc8e6ca485 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.clients.consumer;
import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.errors.InvalidConfigurationException;
import org.apache.kafka.common.security.auth.SecurityProtocol;
@@ -191,6 +192,25 @@ public class ConsumerConfigTest {
assertEquals(remoteAssignorName,
consumerConfig.getString(ConsumerConfig.GROUP_REMOTE_ASSIGNOR_CONFIG));
}
+ @Test
+ public void testDefaultMetadataRecoveryStrategy() {
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
keyDeserializerClass);
+ configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
valueDeserializerClass);
+ final ConsumerConfig consumerConfig = new ConsumerConfig(configs);
+ assertEquals(MetadataRecoveryStrategy.NONE.name,
consumerConfig.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG));
+ }
+
+ @Test
+ public void testInvalidMetadataRecoveryStrategy() {
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
keyDeserializerClass);
+ configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
valueDeserializerClass);
+ configs.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG,
"abc");
+ ConfigException ce = assertThrows(ConfigException.class, () -> new
ConsumerConfig(configs));
+
assertTrue(ce.getMessage().contains(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG));
+ }
+
@ParameterizedTest
@CsvSource({"consumer, true", "classic, true", "Consumer, true", "Classic,
true", "invalid, false"})
public void testProtocolConfigValidation(String protocol, boolean isValid)
{
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
index eaabcb8f814..71a267b5246 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetchRequestManagerTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.NodeApiVersions;
@@ -1909,7 +1910,8 @@ public class FetchRequestManagerTest {
Node node = cluster.nodes().get(0);
NetworkClient client = new NetworkClient(selector, metadata, "mock",
Integer.MAX_VALUE,
1000, 1000, 64 * 1024, 64 * 1024, 1000, 10 * 1000, 127 * 1000,
- time, true, new ApiVersions(),
metricsManager.throttleTimeSensor(), new LogContext());
+ time, true, new ApiVersions(),
metricsManager.throttleTimeSensor(), new LogContext(),
+ MetadataRecoveryStrategy.NONE);
ApiVersionsResponse apiVersionsResponse =
TestUtils.defaultApiVersionsResponse(
400, ApiMessageType.ListenerType.ZK_BROKER);
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 1eac8709934..d0167e9b989 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -21,6 +21,7 @@ import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.FetchSessionHandler;
import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.NodeApiVersions;
@@ -1905,7 +1906,8 @@ public class FetcherTest {
Node node = cluster.nodes().get(0);
NetworkClient client = new NetworkClient(selector, metadata, "mock",
Integer.MAX_VALUE,
1000, 1000, 64 * 1024, 64 * 1024, 1000, 10 * 1000, 127 * 1000,
- time, true, new ApiVersions(),
metricsManager.throttleTimeSensor(), new LogContext());
+ time, true, new ApiVersions(),
metricsManager.throttleTimeSensor(), new LogContext(),
+ MetadataRecoveryStrategy.NONE);
ApiVersionsResponse apiVersionsResponse =
TestUtils.defaultApiVersionsResponse(
400, ApiMessageType.ListenerType.ZK_BROKER);
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
index 7d4aa5e3a85..17119a25290 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/KafkaProducerTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients.producer;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.KafkaClient;
+import org.apache.kafka.clients.LeastLoadedNode;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -735,8 +736,8 @@ public class KafkaProducerTest {
// let mockClient#leastLoadedNode return the node directly so that we
can isolate Metadata calls from KafkaProducer for idempotent producer
MockClient mockClient = new MockClient(Time.SYSTEM, metadata) {
@Override
- public Node leastLoadedNode(long now) {
- return NODE;
+ public LeastLoadedNode leastLoadedNode(long now) {
+ return new LeastLoadedNode(NODE, true);
}
};
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
index d7952320e9f..eba5e8a0a7b 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.clients.producer;
import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.security.auth.SecurityProtocol;
import org.apache.kafka.common.serialization.ByteArraySerializer;
@@ -98,6 +99,25 @@ public class ProducerConfigTest {
assertTrue(ce.getMessage().contains(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG));
}
+ @Test
+ public void testDefaultMetadataRecoveryStrategy() {
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
keySerializerClass);
+ configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
valueSerializerClass);
+ final ProducerConfig producerConfig = new ProducerConfig(configs);
+ assertEquals(MetadataRecoveryStrategy.NONE.name,
producerConfig.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG));
+ }
+
+ @Test
+ public void testInvalidMetadataRecoveryStrategy() {
+ Map<String, Object> configs = new HashMap<>();
+ configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
keySerializerClass);
+ configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
valueSerializerClass);
+ configs.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG,
"abc");
+ ConfigException ce = assertThrows(ConfigException.class, () -> new
ProducerConfig(configs));
+
assertTrue(ce.getMessage().contains(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG));
+ }
+
@Test
public void testCaseInsensitiveSecurityProtocol() {
final String saslSslLowerCase =
SecurityProtocol.SASL_SSL.name.toLowerCase(Locale.ROOT);
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index 5c1088987eb..cfeefc0ae5f 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -19,7 +19,9 @@ package org.apache.kafka.clients.producer.internals;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientResponse;
+import org.apache.kafka.clients.LeastLoadedNode;
import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.clients.MetadataSnapshot;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NetworkClient;
@@ -299,7 +301,8 @@ public class SenderTest {
Node node = cluster.nodes().get(0);
NetworkClient client = new NetworkClient(selector, metadata, "mock",
Integer.MAX_VALUE,
1000, 1000, 64 * 1024, 64 * 1024, 1000, 10 * 1000, 127 * 1000,
- time, true, new ApiVersions(), throttleTimeSensor, logContext);
+ time, true, new ApiVersions(), throttleTimeSensor, logContext,
+ MetadataRecoveryStrategy.NONE);
ApiVersionsResponse apiVersionsResponse =
TestUtils.defaultApiVersionsResponse(
400, ApiMessageType.ListenerType.ZK_BROKER);
@@ -3797,12 +3800,12 @@ public class SenderTest {
client = new MockClient(time, metadata) {
volatile boolean canSendMore = true;
@Override
- public Node leastLoadedNode(long now) {
+ public LeastLoadedNode leastLoadedNode(long now) {
for (Node node : metadata.fetch().nodes()) {
if (isReady(node, now) && canSendMore)
- return node;
+ return new LeastLoadedNode(node, true);
}
- return null;
+ return new LeastLoadedNode(null, false);
}
@Override
@@ -3821,7 +3824,7 @@ public class SenderTest {
while (!client.ready(node, time.milliseconds()))
client.poll(0, time.milliseconds());
client.send(request, time.milliseconds());
- while (client.leastLoadedNode(time.milliseconds()) != null)
+ while (client.leastLoadedNode(time.milliseconds()).node() != null)
client.poll(0, time.milliseconds());
}
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
index 14826e982d6..1880fa512df 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
@@ -17,6 +17,7 @@
package org.apache.kafka.connect.runtime.distributed;
import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
@@ -96,6 +97,10 @@ public class DistributedConfig extends WorkerConfig {
public static final String REBALANCE_TIMEOUT_MS_CONFIG =
CommonClientConfigs.REBALANCE_TIMEOUT_MS_CONFIG;
private static final String REBALANCE_TIMEOUT_MS_DOC =
CommonClientConfigs.REBALANCE_TIMEOUT_MS_DOC;
+ public static final String METADATA_RECOVERY_STRATEGY_CONFIG =
CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG;
+ private static final String METADATA_RECOVERY_STRATEGY_DOC =
CommonClientConfigs.METADATA_RECOVERY_STRATEGY_DOC;
+ public static final String DEFAULT_METADATA_RECOVERY_STRATEGY =
CommonClientConfigs.DEFAULT_METADATA_RECOVERY_STRATEGY;
+
/**
* <code>worker.sync.timeout.ms</code>
*/
@@ -512,7 +517,14 @@ public class DistributedConfig extends WorkerConfig {
(name, value) ->
validateVerificationAlgorithms(crypto, name, (List<String>) value),
() -> "A list of one or more MAC algorithms, each
supported by the worker JVM"),
ConfigDef.Importance.LOW,
- INTER_WORKER_VERIFICATION_ALGORITHMS_DOC);
+ INTER_WORKER_VERIFICATION_ALGORITHMS_DOC)
+ .define(METADATA_RECOVERY_STRATEGY_CONFIG,
+ ConfigDef.Type.STRING,
+ DEFAULT_METADATA_RECOVERY_STRATEGY,
+ ConfigDef.CaseInsensitiveValidString
+
.in(Utils.enumOptions(MetadataRecoveryStrategy.class)),
+ ConfigDef.Importance.LOW,
+ METADATA_RECOVERY_STRATEGY_DOC);
}
private final ExactlyOnceSourceSupport exactlyOnceSourceSupport;
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index 2c3537ea675..2ea83daf048 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.Metadata;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.GroupRebalanceConfig;
@@ -119,7 +120,9 @@ public class WorkerGroupMember {
time,
true,
new ApiVersions(),
- logContext);
+ logContext,
+
MetadataRecoveryStrategy.forName(config.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG))
+ );
this.client = new ConsumerNetworkClient(
logContext,
netClient,
diff --git a/core/src/main/java/kafka/server/NetworkUtils.java
b/core/src/main/java/kafka/server/NetworkUtils.java
index 5607f2623f9..83093c19e10 100644
--- a/core/src/main/java/kafka/server/NetworkUtils.java
+++ b/core/src/main/java/kafka/server/NetworkUtils.java
@@ -18,6 +18,7 @@ package kafka.server;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ManualMetadataUpdater;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.common.Reconfigurable;
import org.apache.kafka.common.metrics.Metrics;
@@ -84,7 +85,8 @@ public class NetworkUtils {
time,
true,
new ApiVersions(),
- logContext
+ logContext,
+ MetadataRecoveryStrategy.NONE
);
}
}
\ No newline at end of file
diff --git a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
index 6cb273f066f..a8b7c8f59b5 100644
--- a/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
+++ b/core/src/main/scala/kafka/admin/BrokerApiVersionsCommand.scala
@@ -26,7 +26,7 @@ import joptsimple.OptionSpec
import kafka.utils.Implicits._
import kafka.utils.Logging
import org.apache.kafka.common.utils.Utils
-import org.apache.kafka.clients.{ApiVersions, ClientDnsLookup, ClientResponse,
ClientUtils, CommonClientConfigs, Metadata, NetworkClient, NodeApiVersions}
+import org.apache.kafka.clients.{ApiVersions, ClientDnsLookup, ClientResponse,
ClientUtils, CommonClientConfigs, Metadata, MetadataRecoveryStrategy,
NetworkClient, NodeApiVersions}
import org.apache.kafka.clients.consumer.internals.{ConsumerNetworkClient,
RequestFuture}
import org.apache.kafka.common.config.ConfigDef.ValidString._
import org.apache.kafka.common.config.ConfigDef.{Importance, Type}
@@ -310,7 +310,8 @@ object BrokerApiVersionsCommand {
time,
true,
new ApiVersions,
- logContext)
+ logContext,
+ MetadataRecoveryStrategy.NONE)
val highLevelClient = new ConsumerNetworkClient(
logContext,
diff --git
a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index 67d3f3963b9..793b39538e7 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -164,7 +164,8 @@ class ControllerChannelManager(controllerEpoch: () => Int,
time,
false,
new ApiVersions,
- logContext
+ logContext,
+ MetadataRecoveryStrategy.NONE
)
(networkClient, reconfigurableChannelBuilder)
}
diff --git
a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
index 068dff4cca6..44176d22763 100644
---
a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
+++
b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala
@@ -96,7 +96,8 @@ object TransactionMarkerChannelManager {
time,
false,
new ApiVersions,
- logContext
+ logContext,
+ MetadataRecoveryStrategy.NONE
)
new TransactionMarkerChannelManager(config,
diff --git a/core/src/main/scala/kafka/raft/RaftManager.scala
b/core/src/main/scala/kafka/raft/RaftManager.scala
index 48646e0b4d7..65ef855640c 100644
--- a/core/src/main/scala/kafka/raft/RaftManager.scala
+++ b/core/src/main/scala/kafka/raft/RaftManager.scala
@@ -30,7 +30,7 @@ import kafka.server.KafkaConfig
import kafka.utils.CoreUtils
import kafka.utils.FileLock
import kafka.utils.Logging
-import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater,
NetworkClient}
+import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater,
MetadataRecoveryStrategy, NetworkClient}
import org.apache.kafka.common.KafkaException
import org.apache.kafka.common.Node
import org.apache.kafka.common.TopicPartition
@@ -312,7 +312,8 @@ class KafkaRaftManager[T](
time,
discoverBrokerVersions,
apiVersions,
- logContext
+ logContext,
+ MetadataRecoveryStrategy.NONE
)
(controllerListenerName, networkClient)
diff --git a/core/src/main/scala/kafka/server/BrokerBlockingSender.scala
b/core/src/main/scala/kafka/server/BrokerBlockingSender.scala
index 7d9fb0512a5..3cb692045b6 100644
--- a/core/src/main/scala/kafka/server/BrokerBlockingSender.scala
+++ b/core/src/main/scala/kafka/server/BrokerBlockingSender.scala
@@ -96,7 +96,8 @@ class BrokerBlockingSender(sourceBroker: BrokerEndPoint,
time,
false,
new ApiVersions,
- logContext
+ logContext,
+ MetadataRecoveryStrategy.NONE
)
(networkClient, reconfigurableChannelBuilder)
}
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala
b/core/src/main/scala/kafka/server/KafkaServer.scala
index 5b6e04e5a0e..dffd2e7f697 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -30,7 +30,7 @@ import kafka.raft.KafkaRaftManager
import kafka.server.metadata.{OffsetTrackingListener, ZkConfigRepository,
ZkMetadataCache}
import kafka.utils._
import kafka.zk.{AdminZkClient, BrokerInfo, KafkaZkClient}
-import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater,
NetworkClient, NetworkClientUtils}
+import org.apache.kafka.clients.{ApiVersions, ManualMetadataUpdater,
MetadataRecoveryStrategy, NetworkClient, NetworkClientUtils}
import org.apache.kafka.common.config.ConfigException
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.ApiMessageType.ListenerType
@@ -827,7 +827,8 @@ class KafkaServer(
time,
false,
new ApiVersions,
- logContext)
+ logContext,
+ MetadataRecoveryStrategy.NONE)
}
var shutdownSucceeded: Boolean = false
diff --git
a/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala
b/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala
index a0e4bbbc463..6ce6e9e0a48 100644
--- a/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/server/NodeToControllerChannelManager.scala
@@ -191,7 +191,8 @@ class NodeToControllerChannelManagerImpl(
time,
true,
apiVersions,
- logContext
+ logContext,
+ MetadataRecoveryStrategy.NONE
)
}
val threadName =
s"${threadNamePrefix}to-controller-${channelName}-channel-manager"
diff --git
a/core/src/test/scala/integration/kafka/api/AdminClientRebootstrapTest.scala
b/core/src/test/scala/integration/kafka/api/AdminClientRebootstrapTest.scala
new file mode 100644
index 00000000000..70b514f199b
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/AdminClientRebootstrapTest.scala
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.api
+
+import org.junit.jupiter.api.Test
+
+class AdminClientRebootstrapTest extends RebootstrapTest {
+ @Test
+ def testRebootstrap(): Unit = {
+ server1.shutdown()
+ server1.awaitShutdown()
+
+ val adminClient = createAdminClient(configOverrides = clientOverrides)
+
+ // Only the server 0 is available for the admin client during the
bootstrap.
+ adminClient.listTopics().names().get()
+
+ server0.shutdown()
+ server0.awaitShutdown()
+ server1.startup()
+
+ // The server 0, originally cached during the bootstrap, is offline.
+ // However, the server 1 from the bootstrap list is online.
+ // Should be able to list topics again.
+ adminClient.listTopics().names().get()
+
+ server1.shutdown()
+ server1.awaitShutdown()
+ server0.startup()
+
+ // The same situation, but the server 1 has gone and server 0 is back.
+ adminClient.listTopics().names().get()
+ }
+}
diff --git
a/core/src/test/scala/integration/kafka/api/ConsumerRebootstrapTest.scala
b/core/src/test/scala/integration/kafka/api/ConsumerRebootstrapTest.scala
new file mode 100644
index 00000000000..9979e3eb91d
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/ConsumerRebootstrapTest.scala
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.api
+
+import kafka.utils.TestUtils
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.junit.jupiter.api.Test
+
+import java.util.Collections
+
+/**
+ * Verifies that a consumer configured with
+ * `metadata.recovery.strategy=rebootstrap` keeps consuming when the brokers
+ * it has cached go offline, as long as some broker from the bootstrap list
+ * is available.
+ */
+class ConsumerRebootstrapTest extends RebootstrapTest {
+  @Test
+  def testRebootstrap(): Unit = {
+    sendRecords(10, 0)
+    waitUntilReplicated()
+
+    server1.shutdown()
+    server1.awaitShutdown()
+
+    val consumer = createConsumer(configOverrides = clientOverrides)
+
+    // Only the server 0 is available for the consumer during the bootstrap.
+    consumer.assign(Collections.singleton(tp))
+
+    consumeAndVerifyRecords(consumer, 10, 0)
+
+    // Bring back the server 1 and shut down 0.
+    server1.startup()
+    waitUntilReplicated()
+
+    server0.shutdown()
+    server0.awaitShutdown()
+    sendRecords(10, 10)
+
+    // The server 0, originally cached during the bootstrap, is offline.
+    // However, the server 1 from the bootstrap list is online.
+    // Should be able to consume records.
+    consumeAndVerifyRecords(consumer, 10, 10, startingKeyAndValueIndex = 10, startingTimestamp = 10)
+
+    // Bring back the server 0 and shut down 1.
+    server0.startup()
+    waitUntilReplicated()
+
+    server1.shutdown()
+    server1.awaitShutdown()
+    sendRecords(10, 20)
+
+    // The same situation, but the server 1 has gone and server 0 is back.
+    consumeAndVerifyRecords(consumer, 10, 20, startingKeyAndValueIndex = 20, startingTimestamp = 20)
+  }
+
+  // Blocks until both brokers' logs for the test partition have the same
+  // end offset, i.e. all produced records have been fully replicated.
+  // Extracted because the test needs this barrier before every failover.
+  private def waitUntilReplicated(): Unit = {
+    TestUtils.waitUntilTrue(
+      () => server0.logManager.logsByTopic(tp.topic()).head.logEndOffset == server1.logManager.logsByTopic(tp.topic()).head.logEndOffset,
+      "Timeout waiting for records to be replicated"
+    )
+  }
+
+  // Produces `numRecords` records with keys/values/timestamps starting at
+  // `from`, then flushes and closes the producer so all sends are durable.
+  private def sendRecords(numRecords: Int, from: Int): Unit = {
+    val producer: KafkaProducer[Array[Byte], Array[Byte]] = createProducer()
+    (from until (numRecords + from)).foreach { i =>
+      val record = new ProducerRecord(tp.topic(), tp.partition(), i.toLong, s"key $i".getBytes, s"value $i".getBytes)
+      producer.send(record)
+    }
+    producer.flush()
+    producer.close()
+  }
+}
diff --git
a/core/src/test/scala/integration/kafka/api/ProducerRebootstrapTest.scala
b/core/src/test/scala/integration/kafka/api/ProducerRebootstrapTest.scala
new file mode 100644
index 00000000000..3cb40b6a0cf
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/ProducerRebootstrapTest.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.api
+
+import org.apache.kafka.clients.producer.ProducerRecord
+import org.junit.jupiter.api.Assertions.assertEquals
+import org.junit.jupiter.api.Test
+
+/**
+ * Verifies that a producer configured with
+ * `metadata.recovery.strategy=rebootstrap` keeps producing when the brokers
+ * it has cached go offline, as long as some broker from the bootstrap list
+ * is available.
+ */
+class ProducerRebootstrapTest extends RebootstrapTest {
+  @Test
+  def testRebootstrap(): Unit = {
+    server1.shutdown()
+    server1.awaitShutdown()
+
+    val producer = createProducer(configOverrides = clientOverrides)
+
+    // Only the server 0 is available for the producer during the bootstrap.
+    producer.send(new ProducerRecord(topic, part, "key 0".getBytes, "value 0".getBytes)).get()
+
+    server0.shutdown()
+    server0.awaitShutdown()
+    server1.startup()
+
+    // The server 0, originally cached during the bootstrap, is offline.
+    // However, the server 1 from the bootstrap list is online.
+    // Should be able to produce records.
+    // Offset 0: server 1 was down for the first send, so its log starts empty
+    // and this record is its first entry (unclean leader election is enabled).
+    val recordMetadata1 = producer.send(new ProducerRecord(topic, part, "key 1".getBytes, "value 1".getBytes)).get()
+    assertEquals(0, recordMetadata1.offset())
+
+    server1.shutdown()
+    server1.awaitShutdown()
+    server0.startup()
+
+    // The same situation, but the server 1 has gone and server 0 is back.
+    // Use a distinct key/value ("key 2") so this record is distinguishable
+    // from the previous send when debugging (was a copy-paste of "key 1").
+    // Offset 1: server 0 still holds only the first record at offset 0.
+    val recordMetadata2 = producer.send(new ProducerRecord(topic, part, "key 2".getBytes, "value 2".getBytes)).get()
+    assertEquals(1, recordMetadata2.offset())
+  }
+}
diff --git a/core/src/test/scala/integration/kafka/api/RebootstrapTest.scala
b/core/src/test/scala/integration/kafka/api/RebootstrapTest.scala
new file mode 100644
index 00000000000..b3b044ebcdb
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/api/RebootstrapTest.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.api
+
+import kafka.server.{KafkaConfig, KafkaServer}
+import org.apache.kafka.clients.CommonClientConfigs
+import org.apache.kafka.common.config.TopicConfig
+import org.apache.kafka.coordinator.group.GroupCoordinatorConfig
+
+import java.util.Properties
+
+/**
+ * Base class for client rebootstrap tests (KIP-899). Runs a two-broker
+ * cluster on fixed ports and supplies client overrides that enable the
+ * "rebootstrap" metadata recovery strategy.
+ */
+abstract class RebootstrapTest extends AbstractConsumerTest {
+  override def brokerCount: Int = 2
+
+  // Convenience accessors for the two brokers under test.
+  def server0: KafkaServer = serverForId(0).get
+  def server1: KafkaServer = serverForId(1).get
+
+  override def generateConfigs: Seq[KafkaConfig] = {
+    val props = new Properties()
+    props.put(GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, brokerCount.toString)
+    props.put(TopicConfig.UNCLEAN_LEADER_ELECTION_ENABLE_CONFIG, "true")
+
+    // Fixed ports are required: each broker must come back on the same
+    // port after a restart so the clients' bootstrap list stays valid.
+    FixedPortTestUtils.createBrokerConfigs(brokerCount, zkConnect, enableControlledShutdown = false)
+      .map(KafkaConfig.fromProps(_, props))
+  }
+
+  /** Client-side overrides shared by all rebootstrap tests. */
+  def clientOverrides: Properties = {
+    val overrides = new Properties()
+    // Short connection-setup timeouts and reconnect backoffs keep the
+    // tests fast while a cached broker is offline; "rebootstrap" makes
+    // the client fall back to the original bootstrap servers.
+    Map(
+      CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG -> "5000",
+      CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG -> "5000",
+      CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG -> "1000",
+      CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG -> "1000",
+      CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG -> "rebootstrap"
+    ).foreach { case (key, value) => overrides.put(key, value) }
+    overrides
+  }
+}
diff --git
a/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java
b/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java
index c3e1cc1f7a0..970e695ba92 100644
--- a/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java
+++ b/tools/src/main/java/org/apache/kafka/tools/ReplicaVerificationTool.java
@@ -23,6 +23,7 @@ import org.apache.kafka.clients.ClientResponse;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.ManualMetadataUpdater;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.NetworkClientUtils;
import org.apache.kafka.clients.admin.Admin;
@@ -706,7 +707,8 @@ public class ReplicaVerificationTool {
time,
false,
new ApiVersions(),
- logContext
+ logContext,
+
MetadataRecoveryStrategy.forName(consumerConfig.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG))
);
}
diff --git
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
index 6cd367c06d9..5afd9db26c3 100644
---
a/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
+++
b/trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConnectionStressWorker.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.ManualMetadataUpdater;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.clients.NetworkClient;
import org.apache.kafka.clients.NetworkClientUtils;
import org.apache.kafka.clients.admin.Admin;
@@ -179,7 +180,8 @@ public class ConnectionStressWorker implements TaskWorker {
TIME,
false,
new ApiVersions(),
- logContext)) {
+ logContext,
+ MetadataRecoveryStrategy.NONE)) {
NetworkClientUtils.awaitReady(client, targetNode,
TIME, 500);
}
}