This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 52d2fa5c8b3 KAFKA-17885: Enable clients to rebootstrap based on
timeout or error code (KIP-1102) (#17720)
52d2fa5c8b3 is described below
commit 52d2fa5c8b3d5f737fe97645c65e73df872e2a67
Author: Rajini Sivaram <[email protected]>
AuthorDate: Wed Nov 13 13:01:08 2024 +0000
KAFKA-17885: Enable clients to rebootstrap based on timeout or error code
(KIP-1102) (#17720)
Implementation of
https://cwiki.apache.org/confluence/display/KAFKA/KIP-1102%3A+Enable+clients+to+rebootstrap+based+on+timeout+or+error+code
- Introduces rebootstrap trigger interval config
metadata.recovery.rebootstrap.trigger.ms, set to 5 minutes by default
- Makes rebootstrap the default for metadata.recovery.strategy
- Adds new error code REBOOTSTRAP_REQUIRED, introduces top-level error code
in metadata response. On this error, clients rebootstrap.
- Configs apply to producers, consumers, share consumers, admin clients,
Connect and KStreams clients.
Reviewers: Apoorv Mittal <[email protected]>, Manikumar Reddy
<[email protected]>
---
checkstyle/suppressions.xml | 2 +-
.../java/org/apache/kafka/clients/ClientUtils.java | 1 +
.../apache/kafka/clients/CommonClientConfigs.java | 11 ++-
.../java/org/apache/kafka/clients/KafkaClient.java | 6 ++
.../org/apache/kafka/clients/NetworkClient.java | 96 ++++++++++++++++++++--
.../kafka/clients/admin/AdminClientConfig.java | 12 ++-
.../kafka/clients/admin/KafkaAdminClient.java | 22 ++++-
.../admin/internals/AdminMetadataManager.java | 22 +++++
.../kafka/clients/consumer/ConsumerConfig.java | 9 +-
.../kafka/clients/producer/ProducerConfig.java | 8 +-
.../errors/RebootstrapRequiredException.java | 30 +++++++
.../org/apache/kafka/common/protocol/Errors.java | 4 +-
.../kafka/common/requests/MetadataRequest.java | 1 +
.../kafka/common/requests/MetadataResponse.java | 4 +
.../resources/common/message/MetadataRequest.json | 3 +-
.../resources/common/message/MetadataResponse.json | 8 +-
.../java/org/apache/kafka/clients/MockClient.java | 5 ++
.../apache/kafka/clients/NetworkClientTest.java | 63 +++++++++++++-
.../kafka/clients/admin/AdminClientConfigTest.java | 3 +-
.../admin/internals/AdminMetadataManagerTest.java | 41 +++++++++
.../kafka/clients/consumer/ConsumerConfigTest.java | 2 +-
.../kafka/clients/producer/ProducerConfigTest.java | 2 +-
.../runtime/distributed/DistributedConfig.java | 13 ++-
.../runtime/distributed/WorkerGroupMember.java | 1 +
.../integration/ConnectWorkerIntegrationTest.java | 3 +
.../kafka/api/AdminClientRebootstrapTest.scala | 11 ++-
.../kafka/api/ConsumerRebootstrapTest.scala | 67 +++++++++++++--
.../kafka/api/ProducerRebootstrapTest.scala | 13 +--
.../integration/kafka/api/RebootstrapTest.scala | 15 ++--
.../org/apache/kafka/streams/StreamsConfig.java | 14 ++++
30 files changed, 446 insertions(+), 46 deletions(-)
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 819b53ddc4a..45873818535 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -119,7 +119,7 @@
files="(Sender|Fetcher|FetchRequestManager|OffsetFetcher|KafkaConsumer|Metrics|RequestResponse|TransactionManager|KafkaAdminClient|Message|KafkaProducer)Test.java"/>
<suppress checks="ClassFanOutComplexity"
-
files="(ConsumerCoordinator|KafkaConsumer|RequestResponse|Fetcher|FetchRequestManager|KafkaAdminClient|Message|KafkaProducer)Test.java"/>
+
files="(ConsumerCoordinator|KafkaConsumer|RequestResponse|Fetcher|FetchRequestManager|KafkaAdminClient|Message|KafkaProducer|NetworkClient)Test.java"/>
<suppress checks="ClassFanOutComplexity"
files="MockAdminClient.java"/>
diff --git a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
index c2c2fca17d1..e29133d0329 100644
--- a/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
+++ b/clients/src/main/java/org/apache/kafka/clients/ClientUtils.java
@@ -247,6 +247,7 @@ public final class ClientUtils {
logContext,
hostResolver,
clientTelemetrySender,
+
config.getLong(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG),
MetadataRecoveryStrategy.forName(config.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG))
);
} catch (Throwable t) {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
index 1a5bb595d6d..9c615bfbcc5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
@@ -230,8 +230,15 @@ public class CommonClientConfigs {
"Brokers appear unavailable when disconnected and no current retry
attempt is in-progress. " +
"Consider increasing <code>reconnect.backoff.ms</code> and
<code>reconnect.backoff.max.ms</code> and " +
"decreasing <code>socket.connection.setup.timeout.ms</code> and
<code>socket.connection.setup.timeout.max.ms</code> " +
- "for the client.";
- public static final String DEFAULT_METADATA_RECOVERY_STRATEGY =
MetadataRecoveryStrategy.NONE.name;
+ "for the client. Rebootstrap is also triggered if connection
cannot be established to any of the brokers for " +
+ "<code>metadata.recovery.rebootstrap.trigger.ms</code>
milliseconds or if server requests rebootstrap.";
+ public static final String DEFAULT_METADATA_RECOVERY_STRATEGY =
MetadataRecoveryStrategy.REBOOTSTRAP.name;
+
+ public static final String METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG
= "metadata.recovery.rebootstrap.trigger.ms";
+ public static final String METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC =
"If a client configured to rebootstrap using " +
+ "<code>metadata.recovery.strategy=rebootstrap</code> is unable to
obtain metadata from any of the brokers in the last known " +
+ "metadata for this interval, client repeats the bootstrap process
using <code>bootstrap.servers</code> configuration.";
+ public static final long DEFAULT_METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS
= 300 * 1000;
/**
* Postprocess the configuration so that exponential backoff is disabled
when reconnect backoff
diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
index 46b64986064..78c13985228 100644
--- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
@@ -122,6 +122,12 @@ public interface KafkaClient extends Closeable {
*/
void close(String nodeId);
+ /**
+ * Closes connections to all nodes. All requests on the connections will
be cleared. ClientRequest
+ * callbacks will not be invoked for the cleared requests, nor will they
be returned from poll().
+ */
+ void closeAll();
+
/**
* Choose the node with the fewest outstanding requests. This method will
prefer a node with an existing connection,
* but will potentially choose a node for which we don't yet have a
connection if all existing connections are in
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 2f3cd781627..044019d6581 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -115,6 +115,9 @@ public class NetworkClient implements KafkaClient {
/* time in ms to wait before retrying to create connection to a server */
private final long reconnectBackoffMs;
+ /* Timeout starting from an attempt to fetch metadata after which client
rebootstraps */
+ private final long rebootstrapTriggerMs;
+
private final MetadataRecoveryStrategy metadataRecoveryStrategy;
private final Time time;
@@ -166,11 +169,51 @@ public class NetworkClient implements KafkaClient {
time,
discoverBrokerVersions,
apiVersions,
- null,
logContext,
+ Long.MAX_VALUE,
metadataRecoveryStrategy);
}
+ public NetworkClient(Selectable selector,
+ Metadata metadata,
+ String clientId,
+ int maxInFlightRequestsPerConnection,
+ long reconnectBackoffMs,
+ long reconnectBackoffMax,
+ int socketSendBuffer,
+ int socketReceiveBuffer,
+ int defaultRequestTimeoutMs,
+ long connectionSetupTimeoutMs,
+ long connectionSetupTimeoutMaxMs,
+ Time time,
+ boolean discoverBrokerVersions,
+ ApiVersions apiVersions,
+ LogContext logContext,
+ long rebootstrapTriggerMs,
+ MetadataRecoveryStrategy metadataRecoveryStrategy) {
+ this(null,
+ metadata,
+ selector,
+ clientId,
+ maxInFlightRequestsPerConnection,
+ reconnectBackoffMs,
+ reconnectBackoffMax,
+ socketSendBuffer,
+ socketReceiveBuffer,
+ defaultRequestTimeoutMs,
+ connectionSetupTimeoutMs,
+ connectionSetupTimeoutMaxMs,
+ time,
+ discoverBrokerVersions,
+ apiVersions,
+ null,
+ logContext,
+ new DefaultHostResolver(),
+ null,
+ rebootstrapTriggerMs,
+ metadataRecoveryStrategy);
+ }
+
public NetworkClient(Selectable selector,
Metadata metadata,
String clientId,
@@ -207,6 +250,7 @@ public class NetworkClient implements KafkaClient {
logContext,
new DefaultHostResolver(),
null,
+ Long.MAX_VALUE,
metadataRecoveryStrategy);
}
@@ -245,6 +289,7 @@ public class NetworkClient implements KafkaClient {
logContext,
new DefaultHostResolver(),
null,
+ Long.MAX_VALUE,
metadataRecoveryStrategy);
}
@@ -267,6 +312,7 @@ public class NetworkClient implements KafkaClient {
LogContext logContext,
HostResolver hostResolver,
ClientTelemetrySender clientTelemetrySender,
+ long rebootstrapTriggerMs,
MetadataRecoveryStrategy metadataRecoveryStrategy) {
/* It would be better if we could pass `DefaultMetadataUpdater` from
the public constructor, but it's not
* possible because `DefaultMetadataUpdater` is an inner class and it
can only be instantiated after the
@@ -298,6 +344,7 @@ public class NetworkClient implements KafkaClient {
this.log = logContext.logger(NetworkClient.class);
this.state = new AtomicReference<>(State.ACTIVE);
this.telemetrySender = (clientTelemetrySender != null) ? new
TelemetrySender(clientTelemetrySender) : null;
+ this.rebootstrapTriggerMs = rebootstrapTriggerMs;
this.metadataRecoveryStrategy = metadataRecoveryStrategy;
}
@@ -401,6 +448,17 @@ public class NetworkClient implements KafkaClient {
long now = time.milliseconds();
cancelInFlightRequests(nodeId, now, null, false);
connectionStates.remove(nodeId);
+ apiVersions.remove(nodeId);
+ nodesNeedingApiVersionsFetch.remove(nodeId);
+ }
+
+ @Override
+ public void closeAll() {
+ log.info("Client requested connection close from all nodes.");
+ List<Node> nodes = this.metadataUpdater.fetchNodes();
+ for (Node node : nodes) {
+ close(node.idString());
+ }
}
/**
@@ -1116,6 +1174,15 @@ public class NetworkClient implements KafkaClient {
// Defined if there is a request in progress, null otherwise
private InProgressData inProgress;
+ /*
+ * The time in wall-clock milliseconds when we started attempts to
fetch metadata. If empty,
+ * metadata has not been requested. This is the start time based on
which rebootstrap is
+ * triggered if metadata is not obtained for the configured
rebootstrap trigger interval.
+ * Set to Optional.of(0L) to force rebootstrap immediately.
+ */
+ private Optional<Long> metadataAttemptStartMs = Optional.empty();
+
+
DefaultMetadataUpdater(Metadata metadata) {
this.metadata = metadata;
this.inProgress = null;
@@ -1146,6 +1213,14 @@ public class NetworkClient implements KafkaClient {
return metadataTimeout;
}
+ if (metadataRecoveryStrategy ==
MetadataRecoveryStrategy.REBOOTSTRAP) {
+ if (!metadataAttemptStartMs.isPresent())
+ metadataAttemptStartMs = Optional.of(now);
+ else if (metadataAttemptStartMs.filter(startMs -> now -
startMs > rebootstrapTriggerMs).isPresent()) {
+ rebootstrap(now);
+ }
+ }
+
// Beware that the behavior of this method and the computation of
timeouts for poll() are
// highly dependent on the behavior of leastLoadedNode.
LeastLoadedNode leastLoadedNode = leastLoadedNode(now);
@@ -1153,7 +1228,7 @@ public class NetworkClient implements KafkaClient {
// Rebootstrap if needed and configured.
if (metadataRecoveryStrategy ==
MetadataRecoveryStrategy.REBOOTSTRAP
&& !leastLoadedNode.hasNodeAvailableOrConnectionReady()) {
- metadata.rebootstrap();
+ rebootstrap(now);
leastLoadedNode = leastLoadedNode(now);
}
@@ -1219,13 +1294,18 @@ public class NetworkClient implements KafkaClient {
if (!errors.isEmpty())
log.warn("The metadata response from the cluster reported a
recoverable issue with correlation id {} : {}", requestHeader.correlationId(),
errors);
- // When talking to the startup phase of a broker, it is possible
to receive an empty metadata set, which
- // we should retry later.
- if (response.brokers().isEmpty()) {
+ if (metadataRecoveryStrategy ==
MetadataRecoveryStrategy.REBOOTSTRAP && response.topLevelError() ==
Errors.REBOOTSTRAP_REQUIRED) {
+ log.info("Rebootstrap requested by server.");
+ metadataAttemptStartMs = Optional.of(0L); // to force
rebootstrap
+ this.metadata.requestUpdate(true);
+ } else if (response.brokers().isEmpty()) {
+ // When talking to the startup phase of a broker, it is
possible to receive an empty metadata set, which
+ // we should retry later.
log.trace("Ignoring empty metadata response with correlation
id {}.", requestHeader.correlationId());
this.metadata.failedUpdate(now);
} else {
this.metadata.update(inProgress.requestVersion, response,
inProgress.isPartialUpdate, now);
+ metadataAttemptStartMs = Optional.empty();
}
inProgress = null;
@@ -1236,6 +1316,12 @@ public class NetworkClient implements KafkaClient {
this.metadata.close();
}
+ private void rebootstrap(long now) {
+ closeAll();
+ metadata.rebootstrap();
+ metadataAttemptStartMs = Optional.of(now);
+ }
+
/**
* Add a metadata request to the list of sends if we can make one
*/
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
index 2a00f450c5c..a87af6be154 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
@@ -141,6 +141,10 @@ public class AdminClientConfig extends AbstractConfig {
public static final String METADATA_RECOVERY_STRATEGY_DOC =
CommonClientConfigs.METADATA_RECOVERY_STRATEGY_DOC;
public static final String DEFAULT_METADATA_RECOVERY_STRATEGY =
CommonClientConfigs.DEFAULT_METADATA_RECOVERY_STRATEGY;
+ public static final String METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG
= CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG;
+ public static final String METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC =
CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC;
+ public static final long DEFAULT_METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS
= CommonClientConfigs.DEFAULT_METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS;
+
/**
* <code>security.providers</code>
*/
@@ -270,7 +274,13 @@ public class AdminClientConfig extends AbstractConfig {
ConfigDef.CaseInsensitiveValidString
.in(Utils.enumOptions(MetadataRecoveryStrategy.class)),
Importance.LOW,
- METADATA_RECOVERY_STRATEGY_DOC);
+ METADATA_RECOVERY_STRATEGY_DOC)
+
.define(METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG,
+ Type.LONG,
+
DEFAULT_METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS,
+ atLeast(0),
+ Importance.LOW,
+
METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC);
}
@Override
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index d9d8ce58460..7e8f42091fc 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -406,6 +406,7 @@ public class KafkaAdminClient extends AdminClient {
private final long retryBackoffMs;
private final long retryBackoffMaxMs;
private final ExponentialBackoff retryBackoff;
+ private final long rebootstrapTriggerMs;
private final MetadataRecoveryStrategy metadataRecoveryStrategy;
private final AdminFetchMetricsManager adminFetchMetricsManager;
private final Optional<ClientTelemetryReporter> clientTelemetryReporter;
@@ -632,6 +633,7 @@ public class KafkaAdminClient extends AdminClient {
List<MetricsReporter> reporters =
CommonClientConfigs.metricsReporters(this.clientId, config);
this.clientTelemetryReporter = clientTelemetryReporter;
this.clientTelemetryReporter.ifPresent(reporters::add);
+ this.rebootstrapTriggerMs =
config.getLong(AdminClientConfig.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG);
this.metadataRecoveryStrategy =
MetadataRecoveryStrategy.forName(config.getString(AdminClientConfig.METADATA_RECOVERY_STRATEGY_CONFIG));
this.adminFetchMetricsManager = new AdminFetchMetricsManager(metrics);
config.logUnused();
@@ -722,10 +724,15 @@ public class KafkaAdminClient extends AdminClient {
private class MetadataUpdateNodeIdProvider implements NodeProvider {
@Override
public Node provide() {
- LeastLoadedNode leastLoadedNode =
client.leastLoadedNode(time.milliseconds());
+ long now = time.milliseconds();
+ if (metadataRecoveryStrategy ==
MetadataRecoveryStrategy.REBOOTSTRAP &&
+ metadataManager.needsRebootstrap(now,
rebootstrapTriggerMs)) {
+ rebootstrap(now);
+ }
+ LeastLoadedNode leastLoadedNode = client.leastLoadedNode(now);
if (metadataRecoveryStrategy ==
MetadataRecoveryStrategy.REBOOTSTRAP
&& !leastLoadedNode.hasNodeAvailableOrConnectionReady()) {
- metadataManager.rebootstrap(time.milliseconds());
+ rebootstrap(now);
}
return leastLoadedNode.node();
@@ -735,6 +742,11 @@ public class KafkaAdminClient extends AdminClient {
public boolean supportsUseControllers() {
return true;
}
+
+ private void rebootstrap(long now) {
+ client.closeAll();
+ metadataManager.rebootstrap(now);
+ }
}
private class ConstantNodeIdProvider implements NodeProvider {
@@ -1701,7 +1713,11 @@ public class KafkaAdminClient extends AdminClient {
public void handleResponse(AbstractResponse abstractResponse) {
MetadataResponse response = (MetadataResponse)
abstractResponse;
long now = time.milliseconds();
- metadataManager.update(response.buildCluster(), now);
+
+ if (response.topLevelError() ==
Errors.REBOOTSTRAP_REQUIRED)
+ metadataManager.initiateRebootstrap();
+ else
+ metadataManager.update(response.buildCluster(), now);
// Unassign all unsent requests after a metadata refresh
to allow for a new
// destination to be selected from the new metadata
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
index 90b237aa749..09fd000e50e 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
@@ -83,6 +83,15 @@ public class AdminMetadataManager {
*/
private long lastMetadataFetchAttemptMs = 0;
+ /**
+ * The time in wall-clock milliseconds when we started attempts to fetch
metadata. If empty,
+ * metadata has not been requested. This is the start time based on which
rebootstrap is
+ * triggered if metadata is not obtained for the configured rebootstrap
trigger interval.
+ * Set to Optional.of(0L) to force rebootstrap immediately.
+ */
+ private Optional<Long> metadataAttemptStartMs = Optional.empty();
+
+
/**
* The current cluster information.
*/
@@ -240,12 +249,18 @@ public class AdminMetadataManager {
return Math.max(0, refreshBackoffMs - timeSinceAttempt);
}
+ public boolean needsRebootstrap(long now, long rebootstrapTriggerMs) {
+ return metadataAttemptStartMs.filter(startMs -> now - startMs >
rebootstrapTriggerMs).isPresent();
+ }
+
/**
* Transition into the UPDATE_PENDING state. Updates
lastMetadataFetchAttemptMs.
*/
public void transitionToUpdatePending(long now) {
this.state = State.UPDATE_PENDING;
this.lastMetadataFetchAttemptMs = now;
+ if (!metadataAttemptStartMs.isPresent())
+ metadataAttemptStartMs = Optional.of(now);
}
public void updateFailed(Throwable exception) {
@@ -289,17 +304,24 @@ public class AdminMetadataManager {
this.state = State.QUIESCENT;
this.fatalException = null;
+ this.metadataAttemptStartMs = Optional.empty();
if (!cluster.nodes().isEmpty()) {
this.cluster = cluster;
}
}
+ public void initiateRebootstrap() {
+ requestUpdate();
+ this.metadataAttemptStartMs = Optional.of(0L);
+ }
+
/**
* Rebootstrap metadata with the cluster previously used for bootstrapping.
*/
public void rebootstrap(long now) {
log.info("Rebootstrapping with {}", this.bootstrapCluster);
update(bootstrapCluster, now);
+ this.metadataAttemptStartMs = Optional.of(now);
}
}
diff --git
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
index cee249c280f..ff9bb6c1166 100644
---
a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
+++
b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java
@@ -654,7 +654,14 @@ public class ConsumerConfig extends AbstractConfig {
ConfigDef.CaseInsensitiveValidString
.in(Utils.enumOptions(MetadataRecoveryStrategy.class)),
Importance.LOW,
-
CommonClientConfigs.METADATA_RECOVERY_STRATEGY_DOC);
+
CommonClientConfigs.METADATA_RECOVERY_STRATEGY_DOC)
+
.define(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG,
+ Type.LONG,
+
CommonClientConfigs.DEFAULT_METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS,
+ atLeast(0),
+ Importance.LOW,
+
CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC);
+
}
@Override
diff --git
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
index 8b360d4d839..217d9495922 100644
---
a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
+++
b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java
@@ -527,7 +527,13 @@ public class ProducerConfig extends AbstractConfig {
ConfigDef.CaseInsensitiveValidString
.in(Utils.enumOptions(MetadataRecoveryStrategy.class)),
Importance.LOW,
-
CommonClientConfigs.METADATA_RECOVERY_STRATEGY_DOC);
+
CommonClientConfigs.METADATA_RECOVERY_STRATEGY_DOC)
+
.define(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG,
+ Type.LONG,
+
CommonClientConfigs.DEFAULT_METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS,
+ atLeast(0),
+ Importance.LOW,
+
CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC);
}
@Override
diff --git
a/clients/src/main/java/org/apache/kafka/common/errors/RebootstrapRequiredException.java
b/clients/src/main/java/org/apache/kafka/common/errors/RebootstrapRequiredException.java
new file mode 100644
index 00000000000..78a66aabd3e
--- /dev/null
+++
b/clients/src/main/java/org/apache/kafka/common/errors/RebootstrapRequiredException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.errors;
+
+public class RebootstrapRequiredException extends ApiException {
+ private static final long serialVersionUID = 1L;
+
+ public RebootstrapRequiredException(String message) {
+ super(message);
+ }
+
+ public RebootstrapRequiredException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
index a80ec308ebb..309ae7bc86a 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
@@ -107,6 +107,7 @@ import
org.apache.kafka.common.errors.PrincipalDeserializationException;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.apache.kafka.common.errors.ReassignmentInProgressException;
import org.apache.kafka.common.errors.RebalanceInProgressException;
+import org.apache.kafka.common.errors.RebootstrapRequiredException;
import org.apache.kafka.common.errors.RecordBatchTooLargeException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.ReplicaNotAvailableException;
@@ -411,7 +412,8 @@ public enum Errors {
INVALID_VOTER_KEY(125, "The voter key doesn't match the receiving
replica's key.", InvalidVoterKeyException::new),
DUPLICATE_VOTER(126, "The voter is already part of the set of voters.",
DuplicateVoterException::new),
VOTER_NOT_FOUND(127, "The voter is not part of the set of voters.",
VoterNotFoundException::new),
- INVALID_REGULAR_EXPRESSION(128, "The regular expression is not valid.",
InvalidRegularExpression::new);
+ INVALID_REGULAR_EXPRESSION(128, "The regular expression is not valid.",
InvalidRegularExpression::new),
+ REBOOTSTRAP_REQUIRED(129, "Client metadata is stale, client should
rebootstrap to obtain new metadata.", RebootstrapRequiredException::new);
private static final Logger log = LoggerFactory.getLogger(Errors.class);
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
index 48609b1666c..2e60e04b2aa 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java
@@ -156,6 +156,7 @@ public class MetadataRequest extends AbstractRequest {
}
responseData.setThrottleTimeMs(throttleTimeMs);
+ responseData.setErrorCode(error.code());
return new MetadataResponse(responseData, true);
}
diff --git
a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
index d7d9a6c3ba4..3a7e4f276d9 100644
---
a/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
+++
b/clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java
@@ -104,6 +104,10 @@ public class MetadataResponse extends AbstractResponse {
return errors;
}
+ public Errors topLevelError() {
+ return Errors.forCode(data.errorCode());
+ }
+
/**
* Get a map of the topicIds which had metadata errors
* @return the map
diff --git a/clients/src/main/resources/common/message/MetadataRequest.json
b/clients/src/main/resources/common/message/MetadataRequest.json
index 552dea0a6f8..437bd0ef50b 100644
--- a/clients/src/main/resources/common/message/MetadataRequest.json
+++ b/clients/src/main/resources/common/message/MetadataRequest.json
@@ -18,7 +18,7 @@
"type": "request",
"listeners": ["zkBroker", "broker"],
"name": "MetadataRequest",
- "validVersions": "0-12",
+ "validVersions": "0-13",
"deprecatedVersions": "0-3",
"flexibleVersions": "9+",
"fields": [
@@ -40,6 +40,7 @@
// Version 11 deprecates IncludeClusterAuthorizedOperations field. This is
now exposed
// by the DescribeCluster API (KIP-700).
// Version 12 supports topic Id.
+ // Version 13 supports top-level error code in the response.
{ "name": "Topics", "type": "[]MetadataRequestTopic", "versions": "0+",
"nullableVersions": "1+",
"about": "The topics to fetch metadata for.", "fields": [
{ "name": "TopicId", "type": "uuid", "versions": "10+", "ignorable":
true, "about": "The topic id." },
diff --git a/clients/src/main/resources/common/message/MetadataResponse.json
b/clients/src/main/resources/common/message/MetadataResponse.json
index 408cdc7940a..d65484b3f67 100644
--- a/clients/src/main/resources/common/message/MetadataResponse.json
+++ b/clients/src/main/resources/common/message/MetadataResponse.json
@@ -42,7 +42,8 @@
// Version 11 deprecates ClusterAuthorizedOperations. This is now exposed
// by the DescribeCluster API (KIP-700).
// Version 12 supports topicId.
- "validVersions": "0-12",
+ // Version 13 supports top-level error code in the response.
+ "validVersions": "0-13",
"flexibleVersions": "9+",
"fields": [
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "3+",
"ignorable": true,
@@ -93,6 +94,9 @@
"about": "32-bit bitfield to represent authorized operations for this
topic." }
]},
{ "name": "ClusterAuthorizedOperations", "type": "int32", "versions":
"8-10", "default": "-2147483648",
- "about": "32-bit bitfield to represent authorized operations for this
cluster." }
+ "about": "32-bit bitfield to represent authorized operations for this
cluster." },
+ { "name": "ErrorCode", "type": "int16", "versions": "13+", "ignorable":
true,
+ "about": "The top-level error code, or 0 if there was no error." }
+
]
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 8a195184e93..6386a11a322 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -588,6 +588,11 @@ public class MockClient implements KafkaClient {
connections.remove(node);
}
+ @Override
+ public void closeAll() {
+ connections.clear();
+ }
+
@Override
public LeastLoadedNode leastLoadedNode(long now) {
// Consistent with NetworkClient, we do not return nodes awaiting
reconnect backoff
diff --git
a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 89f567157c3..3fd75a261d8 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.RebootstrapRequiredException;
import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.internals.ClusterResourceListeners;
import org.apache.kafka.common.message.ApiMessageType;
@@ -59,6 +60,7 @@ import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.net.InetAddress;
+import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -241,6 +243,59 @@ public class NetworkClientTest {
assertEquals(UnsupportedVersionException.class,
metadataUpdater.getAndClearFailure().getClass());
}
+ @Test
+ public void testRebootstrap() {
+ long rebootstrapTriggerMs = 1000;
+ AtomicInteger rebootstrapCount = new AtomicInteger();
+ Metadata metadata = new Metadata(50, 50, 5000, new LogContext(), new
ClusterResourceListeners()) {
+ @Override
+ public synchronized void rebootstrap() {
+ super.rebootstrap();
+ rebootstrapCount.incrementAndGet();
+ }
+ };
+
+ NetworkClient client = new NetworkClient(selector, metadata, "mock",
Integer.MAX_VALUE,
+ reconnectBackoffMsTest, 0, 64 * 1024, 64 * 1024,
+ defaultRequestTimeoutMs, connectionSetupTimeoutMsTest,
connectionSetupTimeoutMaxMsTest, time, false, new ApiVersions(), new
LogContext(),
+ rebootstrapTriggerMs,
+ MetadataRecoveryStrategy.REBOOTSTRAP);
+ MetadataUpdater metadataUpdater = TestUtils.fieldValue(client,
NetworkClient.class, "metadataUpdater");
+ metadata.bootstrap(Collections.singletonList(new
InetSocketAddress("localhost", 9999)));
+
+ metadata.requestUpdate(true);
+ client.poll(0, time.milliseconds());
+ time.sleep(rebootstrapTriggerMs + 1);
+ client.poll(0, time.milliseconds());
+ assertEquals(1, rebootstrapCount.get());
+ time.sleep(1);
+ client.poll(0, time.milliseconds());
+ assertEquals(1, rebootstrapCount.get());
+
+ metadata.requestUpdate(true);
+ client.poll(0, time.milliseconds());
+ assertEquals(1, rebootstrapCount.get());
+ metadataUpdater.handleFailedRequest(time.milliseconds(),
Optional.of(new KafkaException()));
+ client.poll(0, time.milliseconds());
+ assertEquals(1, rebootstrapCount.get());
+ time.sleep(rebootstrapTriggerMs);
+ client.poll(0, time.milliseconds());
+ assertEquals(2, rebootstrapCount.get());
+
+ metadata.requestUpdate(true);
+ client.poll(0, time.milliseconds());
+ assertEquals(2, rebootstrapCount.get());
+
+        MetadataRequest.Builder builder = new MetadataRequest.Builder(Collections.emptyList(), true);
+        ClientRequest request = client.newClientRequest(node.idString(), builder, time.milliseconds(), true);
+        MetadataResponse rebootstrapResponse = (MetadataResponse) builder.build().getErrorResponse(0, new RebootstrapRequiredException("rebootstrap"));
+        metadataUpdater.handleSuccessfulResponse(request.makeHeader(builder.latestAllowedVersion()), time.milliseconds(), rebootstrapResponse);
+ assertEquals(2, rebootstrapCount.get());
+ time.sleep(50);
+ client.poll(0, time.milliseconds());
+ assertEquals(3, rebootstrapCount.get());
+ }
+
private void checkSimpleRequestResponse(NetworkClient networkClient) {
awaitReady(networkClient, node); // has to be before creating any
request, as it may send ApiVersionsRequest and its response is mocked with
correlation id 0
short requestVersion = PRODUCE.latestVersion();
@@ -1086,7 +1141,7 @@ public class NetworkClientTest {
reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024,
64 * 1024,
defaultRequestTimeoutMs, connectionSetupTimeoutMsTest,
connectionSetupTimeoutMaxMsTest,
time, false, new ApiVersions(), null, new LogContext(),
mockHostResolver, mockClientTelemetrySender,
- MetadataRecoveryStrategy.NONE);
+ Long.MAX_VALUE, MetadataRecoveryStrategy.NONE);
// Connect to one the initial addresses, then change the addresses and
disconnect
client.ready(node, time.milliseconds());
@@ -1147,7 +1202,7 @@ public class NetworkClientTest {
reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024,
64 * 1024,
defaultRequestTimeoutMs, connectionSetupTimeoutMsTest,
connectionSetupTimeoutMaxMsTest,
time, false, new ApiVersions(), null, new LogContext(),
mockHostResolver, mockClientTelemetrySender,
- MetadataRecoveryStrategy.NONE);
+ Long.MAX_VALUE, MetadataRecoveryStrategy.NONE);
// First connection attempt should fail
client.ready(node, time.milliseconds());
@@ -1200,7 +1255,7 @@ public class NetworkClientTest {
reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024,
64 * 1024,
defaultRequestTimeoutMs, connectionSetupTimeoutMsTest,
connectionSetupTimeoutMaxMsTest,
time, false, new ApiVersions(), null, new LogContext(),
mockHostResolver, mockClientTelemetrySender,
- MetadataRecoveryStrategy.NONE);
+ Long.MAX_VALUE, MetadataRecoveryStrategy.NONE);
// Connect to one the initial addresses, then change the addresses and
disconnect
client.ready(node, time.milliseconds());
@@ -1309,7 +1364,7 @@ public class NetworkClientTest {
reconnectBackoffMsTest, reconnectBackoffMaxMsTest, 64 * 1024, 64 *
1024,
defaultRequestTimeoutMs, connectionSetupTimeoutMsTest,
connectionSetupTimeoutMaxMsTest,
time, true, new ApiVersions(), null, new LogContext(), new
DefaultHostResolver(), mockClientTelemetrySender,
- MetadataRecoveryStrategy.NONE);
+ Long.MAX_VALUE, MetadataRecoveryStrategy.NONE);
// Send the ApiVersionsRequest
client.ready(node, time.milliseconds());
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientConfigTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientConfigTest.java
index 252aa63109a..6d0ec9e8a8e 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientConfigTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/AdminClientConfigTest.java
@@ -35,7 +35,8 @@ public class AdminClientConfigTest {
public void testDefaultMetadataRecoveryStrategy() {
Map<String, Object> configs = new HashMap<>();
final AdminClientConfig adminClientConfig = new
AdminClientConfig(configs);
- assertEquals(MetadataRecoveryStrategy.NONE.name,
adminClientConfig.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG));
+ assertEquals(MetadataRecoveryStrategy.REBOOTSTRAP.name,
adminClientConfig.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG));
+
}
@Test
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/internals/AdminMetadataManagerTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/AdminMetadataManagerTest.java
index 54ac4175582..5620dd06a5e 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/internals/AdminMetadataManagerTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/internals/AdminMetadataManagerTest.java
@@ -98,6 +98,47 @@ public class AdminMetadataManagerTest {
assertTrue(mgr.isReady());
}
+ @Test
+ public void testNeedsRebootstrap() {
+ long rebootstrapTriggerMs = 1000;
+ mgr.update(Cluster.bootstrap(Collections.singletonList(new
InetSocketAddress("localhost", 9999))), time.milliseconds());
+ assertFalse(mgr.needsRebootstrap(time.milliseconds(),
rebootstrapTriggerMs));
+ assertFalse(mgr.needsRebootstrap(time.milliseconds() + 2000,
rebootstrapTriggerMs));
+
+ mgr.transitionToUpdatePending(time.milliseconds());
+ assertFalse(mgr.needsRebootstrap(time.milliseconds(),
rebootstrapTriggerMs));
+ assertTrue(mgr.needsRebootstrap(time.milliseconds() + 1001,
rebootstrapTriggerMs));
+
+ time.sleep(100);
+ mgr.updateFailed(new RuntimeException());
+ assertFalse(mgr.needsRebootstrap(time.milliseconds() + 900,
rebootstrapTriggerMs));
+ assertTrue(mgr.needsRebootstrap(time.milliseconds() + 901,
rebootstrapTriggerMs));
+
+ time.sleep(1000);
+ mgr.update(mockCluster(), time.milliseconds());
+ assertFalse(mgr.needsRebootstrap(time.milliseconds(),
rebootstrapTriggerMs));
+ assertFalse(mgr.needsRebootstrap(time.milliseconds() + 2000,
rebootstrapTriggerMs));
+
+ time.sleep(1000);
+ mgr.transitionToUpdatePending(time.milliseconds());
+ assertFalse(mgr.needsRebootstrap(time.milliseconds(),
rebootstrapTriggerMs));
+ assertTrue(mgr.needsRebootstrap(time.milliseconds() + 1001,
rebootstrapTriggerMs));
+
+ time.sleep(1001);
+ assertTrue(mgr.needsRebootstrap(time.milliseconds(),
rebootstrapTriggerMs));
+ mgr.rebootstrap(time.milliseconds());
+ assertFalse(mgr.needsRebootstrap(time.milliseconds(),
rebootstrapTriggerMs));
+ assertFalse(mgr.needsRebootstrap(time.milliseconds() + 1000,
rebootstrapTriggerMs));
+ assertTrue(mgr.needsRebootstrap(time.milliseconds() + 1001,
rebootstrapTriggerMs));
+
+ mgr.initiateRebootstrap();
+ assertTrue(mgr.needsRebootstrap(time.milliseconds(),
rebootstrapTriggerMs));
+ mgr.rebootstrap(time.milliseconds());
+ assertFalse(mgr.needsRebootstrap(time.milliseconds(),
rebootstrapTriggerMs));
+ assertFalse(mgr.needsRebootstrap(time.milliseconds() + 1000,
rebootstrapTriggerMs));
+ assertTrue(mgr.needsRebootstrap(time.milliseconds() + 1001,
rebootstrapTriggerMs));
+ }
+
private static Cluster mockCluster() {
HashMap<Integer, Node> nodes = new HashMap<>();
nodes.put(0, new Node(0, "localhost", 8121));
diff --git
a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
index 99c45f05c15..cbe0bd6ef35 100644
---
a/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerConfigTest.java
@@ -210,7 +210,7 @@ public class ConsumerConfigTest {
configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
keyDeserializerClass);
configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
valueDeserializerClass);
final ConsumerConfig consumerConfig = new ConsumerConfig(configs);
- assertEquals(MetadataRecoveryStrategy.NONE.name,
consumerConfig.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG));
+ assertEquals(MetadataRecoveryStrategy.REBOOTSTRAP.name,
consumerConfig.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG));
}
@Test
diff --git
a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
index f3ec9ca96c2..d8c77433545 100644
---
a/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/producer/ProducerConfigTest.java
@@ -106,7 +106,7 @@ public class ProducerConfigTest {
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
keySerializerClass);
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
valueSerializerClass);
final ProducerConfig producerConfig = new ProducerConfig(configs);
- assertEquals(MetadataRecoveryStrategy.NONE.name,
producerConfig.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG));
+ assertEquals(MetadataRecoveryStrategy.REBOOTSTRAP.name,
producerConfig.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG));
}
@Test
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
index 9150000223b..16ab0d47a3c 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedConfig.java
@@ -103,6 +103,10 @@ public final class DistributedConfig extends WorkerConfig {
private static final String METADATA_RECOVERY_STRATEGY_DOC =
CommonClientConfigs.METADATA_RECOVERY_STRATEGY_DOC;
public static final String DEFAULT_METADATA_RECOVERY_STRATEGY =
CommonClientConfigs.DEFAULT_METADATA_RECOVERY_STRATEGY;
+ public static final String METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG
= CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG;
+ private static final String METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC =
CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC;
+ public static final long DEFAULT_METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS
= CommonClientConfigs.DEFAULT_METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS;
+
/**
* <code>worker.sync.timeout.ms</code>
*/
@@ -526,7 +530,14 @@ public final class DistributedConfig extends WorkerConfig {
ConfigDef.CaseInsensitiveValidString
.in(Utils.enumOptions(MetadataRecoveryStrategy.class)),
ConfigDef.Importance.LOW,
- METADATA_RECOVERY_STRATEGY_DOC);
+ METADATA_RECOVERY_STRATEGY_DOC)
+ .define(METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG,
+ ConfigDef.Type.LONG,
+ DEFAULT_METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS,
+ atLeast(0),
+ ConfigDef.Importance.LOW,
+ METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC);
+
}
private final ExactlyOnceSourceSupport exactlyOnceSourceSupport;
diff --git
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index f4fdcaf801e..a3982f070a4 100644
---
a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++
b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -122,6 +122,7 @@ public class WorkerGroupMember {
true,
new ApiVersions(),
logContext,
+
config.getLong(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG),
MetadataRecoveryStrategy.forName(config.getString(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG))
);
this.client = new ConsumerNetworkClient(
diff --git
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
index 50e690057ec..1a78643950d 100644
---
a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
+++
b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectWorkerIntegrationTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.integration;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.provider.FileConfigProvider;
@@ -78,6 +79,7 @@ import javax.ws.rs.core.Response;
import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
import static
org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+import static
org.apache.kafka.clients.CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG;
import static
org.apache.kafka.common.config.AbstractConfig.CONFIG_PROVIDERS_CONFIG;
import static
org.apache.kafka.common.config.TopicConfig.DELETE_RETENTION_MS_CONFIG;
import static org.apache.kafka.common.config.TopicConfig.SEGMENT_MS_CONFIG;
@@ -840,6 +842,7 @@ public class ConnectWorkerIntegrationTest {
// Workaround for KAFKA-15676, which can cause the scheduled rebalance
delay to
// be spuriously triggered after the group coordinator for a Connect
cluster is bounced
workerProps.put(SCHEDULED_REBALANCE_MAX_DELAY_MS_CONFIG, "0");
+ workerProps.put(METADATA_RECOVERY_STRATEGY_CONFIG,
MetadataRecoveryStrategy.NONE.name);
useFixedBrokerPort();
diff --git
a/core/src/test/scala/integration/kafka/api/AdminClientRebootstrapTest.scala
b/core/src/test/scala/integration/kafka/api/AdminClientRebootstrapTest.scala
index 70b514f199b..d9cc326ff94 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientRebootstrapTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientRebootstrapTest.scala
@@ -16,15 +16,18 @@
*/
package kafka.api
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
class AdminClientRebootstrapTest extends RebootstrapTest {
- @Test
- def testRebootstrap(): Unit = {
+ @ParameterizedTest
+ @ValueSource(booleans = Array(false, true))
+ def testRebootstrap(useRebootstrapTriggerMs: Boolean): Unit = {
+
server1.shutdown()
server1.awaitShutdown()
- val adminClient = createAdminClient(configOverrides = clientOverrides)
+ val adminClient = createAdminClient(configOverrides =
clientOverrides(useRebootstrapTriggerMs))
// Only the server 0 is available for the admin client during the
bootstrap.
adminClient.listTopics().names().get()
diff --git
a/core/src/test/scala/integration/kafka/api/ConsumerRebootstrapTest.scala
b/core/src/test/scala/integration/kafka/api/ConsumerRebootstrapTest.scala
index 51a5afe391b..c7c87ce3b84 100644
--- a/core/src/test/scala/integration/kafka/api/ConsumerRebootstrapTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ConsumerRebootstrapTest.scala
@@ -16,17 +16,24 @@
*/
package kafka.api
+import kafka.api.ConsumerRebootstrapTest._
+import kafka.server.QuorumTestHarness.getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit
 import kafka.utils.{TestInfoUtils, TestUtils}
+import org.apache.kafka.clients.CommonClientConfigs
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
+import org.junit.jupiter.api.Assertions.{assertEquals, assertThrows}
 import org.junit.jupiter.params.ParameterizedTest
-import org.junit.jupiter.params.provider.MethodSource
+import org.junit.jupiter.params.provider.{Arguments, MethodSource}

-import java.util.Collections
+import java.time.Duration
+import java.util.{Collections, stream}
+import java.util.concurrent.TimeUnit
+import java.util.concurrent.TimeoutException
class ConsumerRebootstrapTest extends RebootstrapTest {
- @ParameterizedTest(name =
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
-
@MethodSource(Array("getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit"))
- def testRebootstrap(quorum: String, groupProtocol: String): Unit = {
+ @ParameterizedTest(name = RebootstrapTestName)
+ @MethodSource(Array("rebootstrapTestParams"))
+ def testRebootstrap(quorum: String, groupProtocol: String,
useRebootstrapTriggerMs: Boolean): Unit = {
sendRecords(10, 0)
TestUtils.waitUntilTrue(
@@ -37,7 +44,7 @@ class ConsumerRebootstrapTest extends RebootstrapTest {
server1.shutdown()
server1.awaitShutdown()
- val consumer = createConsumer(configOverrides = clientOverrides)
+ val consumer = createConsumer(configOverrides =
clientOverrides(useRebootstrapTriggerMs))
// Only the server 0 is available for the consumer during the bootstrap.
consumer.assign(Collections.singleton(tp))
@@ -77,6 +84,40 @@ class ConsumerRebootstrapTest extends RebootstrapTest {
consumeAndVerifyRecords(consumer, 10, 20, startingKeyAndValueIndex = 20,
startingTimestamp = 20)
}
+ @ParameterizedTest(name = RebootstrapTestName)
+ @MethodSource(Array("rebootstrapTestParams"))
+ def testRebootstrapDisabled(quorum: String, groupProtocol: String,
useRebootstrapTriggerMs: Boolean): Unit = {
+ server1.shutdown()
+ server1.awaitShutdown()
+
+ val configOverrides = clientOverrides(useRebootstrapTriggerMs)
+ configOverrides.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG,
"none")
+ if (useRebootstrapTriggerMs)
+
configOverrides.put(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG,
"1000")
+
+ val producer = createProducer(configOverrides = configOverrides)
+ val consumer = createConsumer(configOverrides = configOverrides)
+ val adminClient = createAdminClient(configOverrides = configOverrides)
+
+ // Only the server 0 is available during the bootstrap.
+ val recordMetadata0 = producer.send(new ProducerRecord(topic, part, 0L,
"key 0".getBytes, "value 0".getBytes)).get(15, TimeUnit.SECONDS)
+ assertEquals(0, recordMetadata0.offset())
+ adminClient.listTopics().names().get(15, TimeUnit.SECONDS)
+ consumer.assign(Collections.singleton(tp))
+ consumeAndVerifyRecords(consumer, 1, 0)
+
+ server0.shutdown()
+ server0.awaitShutdown()
+ server1.startup()
+
+ assertThrows(classOf[TimeoutException], () => producer.send(new
ProducerRecord(topic, part, "key 2".getBytes, "value 2".getBytes)).get(5,
TimeUnit.SECONDS))
+ assertThrows(classOf[TimeoutException], () =>
adminClient.listTopics().names().get(5, TimeUnit.SECONDS))
+
+ val producer2 = createProducer(configOverrides = configOverrides)
+ producer2.send(new ProducerRecord(topic, part, 1L, "key 1".getBytes,
"value 1".getBytes)).get(15, TimeUnit.SECONDS)
+ assertEquals(0, consumer.poll(Duration.ofSeconds(5)).count)
+ }
+
private def sendRecords(numRecords: Int, from: Int): Unit = {
val producer: KafkaProducer[Array[Byte], Array[Byte]] = createProducer()
(from until (numRecords + from)).foreach { i =>
@@ -87,3 +128,17 @@ class ConsumerRebootstrapTest extends RebootstrapTest {
producer.close()
}
}
+
+object ConsumerRebootstrapTest {
+
+ final val RebootstrapTestName =
s"${TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames}.useRebootstrapTriggerMs={2}"
+ def rebootstrapTestParams: stream.Stream[Arguments] = {
+ assertEquals(1,
getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit.count())
+ val args =
getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_ZK_implicit
+ .findFirst().get.get
+ stream.Stream.of(
+ Arguments.of((args :+ true):_*),
+ Arguments.of((args :+ false):_*)
+ )
+ }
+}
diff --git
a/core/src/test/scala/integration/kafka/api/ProducerRebootstrapTest.scala
b/core/src/test/scala/integration/kafka/api/ProducerRebootstrapTest.scala
index 3cb40b6a0cf..d0eabf370cb 100644
--- a/core/src/test/scala/integration/kafka/api/ProducerRebootstrapTest.scala
+++ b/core/src/test/scala/integration/kafka/api/ProducerRebootstrapTest.scala
@@ -18,18 +18,21 @@ package kafka.api
import org.apache.kafka.clients.producer.ProducerRecord
import org.junit.jupiter.api.Assertions.assertEquals
-import org.junit.jupiter.api.Test
+import org.junit.jupiter.params.ParameterizedTest
+import org.junit.jupiter.params.provider.ValueSource
class ProducerRebootstrapTest extends RebootstrapTest {
- @Test
- def testRebootstrap(): Unit = {
+ @ParameterizedTest
+ @ValueSource(booleans = Array(false, true))
+ def testRebootstrap(useRebootstrapTriggerMs: Boolean): Unit = {
server1.shutdown()
server1.awaitShutdown()
- val producer = createProducer(configOverrides = clientOverrides)
+ val producer = createProducer(configOverrides =
clientOverrides(useRebootstrapTriggerMs))
// Only the server 0 is available for the producer during the bootstrap.
- producer.send(new ProducerRecord(topic, part, "key 0".getBytes, "value
0".getBytes)).get()
+ val recordMetadata0 = producer.send(new ProducerRecord(topic, part, "key
0".getBytes, "value 0".getBytes)).get()
+ assertEquals(0, recordMetadata0.offset())
server0.shutdown()
server0.awaitShutdown()
diff --git a/core/src/test/scala/integration/kafka/api/RebootstrapTest.scala
b/core/src/test/scala/integration/kafka/api/RebootstrapTest.scala
index b3b044ebcdb..45324a89c6e 100644
--- a/core/src/test/scala/integration/kafka/api/RebootstrapTest.scala
+++ b/core/src/test/scala/integration/kafka/api/RebootstrapTest.scala
@@ -40,12 +40,17 @@ abstract class RebootstrapTest extends AbstractConsumerTest
{
.map(KafkaConfig.fromProps(_, overridingProps))
}
-  def clientOverrides: Properties = {
+  def clientOverrides(useRebootstrapTriggerMs: Boolean): Properties = {
     val overrides = new Properties()
-    overrides.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, "5000")
-    overrides.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG, "5000")
-    overrides.put(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG, "1000")
-    overrides.put(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG, "1000")
+    if (useRebootstrapTriggerMs) {
+      overrides.put(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG, "5000")
+    } else {
+      overrides.put(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG, "3600000")
+      overrides.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG, "5000")
+      overrides.put(CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG, "5000")
+      overrides.put(CommonClientConfigs.RECONNECT_BACKOFF_MS_CONFIG, "1000")
+      overrides.put(CommonClientConfigs.RECONNECT_BACKOFF_MAX_MS_CONFIG, "1000")
+    }
     overrides.put(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG, "rebootstrap")
     overrides
   }
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index dafef7d7bc4..29b92408ef5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams;
import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -1147,6 +1148,19 @@ public class StreamsConfig extends AbstractConfig {
atLeast(0),
Importance.LOW,
CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC)
+ .define(CommonClientConfigs.METADATA_RECOVERY_STRATEGY_CONFIG,
+ Type.STRING,
+ CommonClientConfigs.DEFAULT_METADATA_RECOVERY_STRATEGY,
+ ConfigDef.CaseInsensitiveValidString
+ .in(Utils.enumOptions(MetadataRecoveryStrategy.class)),
+ Importance.LOW,
+ CommonClientConfigs.METADATA_RECOVERY_STRATEGY_DOC)
+
.define(CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG,
+ Type.LONG,
+
CommonClientConfigs.DEFAULT_METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS,
+ atLeast(0),
+ Importance.LOW,
+
CommonClientConfigs.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_DOC)
.define(ROCKSDB_CONFIG_SETTER_CLASS_CONFIG,
Type.CLASS,
null,