This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0f33b16fdf7 KAFKA-18085: Abort inflight requests on existing
connections while rebootstrapping (#17939)
0f33b16fdf7 is described below
commit 0f33b16fdf7ddb76b7f13480ded05374af6c2195
Author: Rajini Sivaram <[email protected]>
AuthorDate: Mon Nov 25 17:58:11 2024 +0000
KAFKA-18085: Abort inflight requests on existing connections while
rebootstrapping (#17939)
When disconnecting channels before rebootstrapping due to the rebootstrap
conditions introduced in KIP-1102, we should ensure that inflight requests are
aborted, as with other disconnections such as request timeouts in clients. With
the earlier rebootstrapping from KIP-899, we only rebootstrapped when there
were no connections, so no disconnections were required.
Reviewers: Manikumar Reddy <[email protected]>
---
.../java/org/apache/kafka/clients/KafkaClient.java | 6 ---
.../org/apache/kafka/clients/MetadataUpdater.java | 17 +++++++
.../org/apache/kafka/clients/NetworkClient.java | 53 ++++++++++++---------
.../kafka/clients/admin/KafkaAdminClient.java | 13 +-----
.../admin/internals/AdminMetadataManager.java | 11 ++++-
.../java/org/apache/kafka/clients/MockClient.java | 5 --
.../apache/kafka/clients/NetworkClientTest.java | 54 +++++++++++++++++++++-
.../kafka/clients/admin/KafkaAdminClientTest.java | 17 ++++++-
8 files changed, 127 insertions(+), 49 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
index 78c13985228..46b64986064 100644
--- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
@@ -122,12 +122,6 @@ public interface KafkaClient extends Closeable {
*/
void close(String nodeId);
- /**
- * Closes connections to all nodes. All requests on the connections will
be cleared. ClientRequest
- * callbacks will not be invoked for the cleared requests, nor will they
be returned from poll().
- */
- void closeAll();
-
/**
* Choose the node with the fewest outstanding requests. This method will
prefer a node with an existing connection,
* but will potentially choose a node for which we don't yet have a
connection if all existing connections are in
diff --git
a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
index 77f3efadce3..825d2e67f70 100644
--- a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
+++ b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
@@ -85,6 +85,23 @@ public interface MetadataUpdater extends Closeable {
*/
void handleSuccessfulResponse(RequestHeader requestHeader, long now,
MetadataResponse metadataResponse);
+ /**
+ * Returns true if metadata couldn't be fetched for `rebootstrapTriggerMs`
or if server requested rebootstrap.
+ *
+ * @param now Current time in milliseconds
+ * @param rebootstrapTriggerMs Configured timeout after which rebootstrap
is triggered
+ */
+ default boolean needsRebootstrap(long now, long rebootstrapTriggerMs) {
+ return false;
+ }
+
+ /**
+ * Performs rebootstrap, replacing the existing cluster with the bootstrap
cluster.
+ *
+ * @param now Current time in milliseconds
+ */
+ default void rebootstrap(long now) {}
+
/**
* Close this updater.
*/
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 044019d6581..839aea3f63e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -452,15 +452,6 @@ public class NetworkClient implements KafkaClient {
nodesNeedingApiVersionsFetch.remove(nodeId);
}
- @Override
- public void closeAll() {
- log.info("Client requested connection close from all nodes.");
- List<Node> nodes = this.metadataUpdater.fetchNodes();
- for (Node node : nodes) {
- close(node.idString());
- }
- }
-
/**
* Returns the number of milliseconds to wait, based on the connection
state, before attempting to send data. When
* disconnected, this respects the reconnect backoff time. When connecting
or connected, this handles slow/stalled
@@ -666,6 +657,7 @@ public class NetworkClient implements KafkaClient {
handleInitiateApiVersionRequests(updatedNow);
handleTimedOutConnections(responses, updatedNow);
handleTimedOutRequests(responses, updatedNow);
+ handleRebootstrap(responses, updatedNow);
completeResponses(responses);
return responses;
@@ -1123,6 +1115,20 @@ public class NetworkClient implements KafkaClient {
}
}
+ private void handleRebootstrap(List<ClientResponse> responses, long now) {
+ if (metadataRecoveryStrategy == MetadataRecoveryStrategy.REBOOTSTRAP
&& metadataUpdater.needsRebootstrap(now, rebootstrapTriggerMs)) {
+ this.metadataUpdater.fetchNodes().forEach(node -> {
+ String nodeId = node.idString();
+ this.selector.close(nodeId);
+ if (connectionStates.isConnecting(nodeId) ||
connectionStates.isConnected(nodeId)) {
+ log.info("Disconnecting from node {} due to client
rebootstrap.", nodeId);
+ processDisconnection(responses, nodeId, now,
ChannelState.LOCAL_CLOSE);
+ }
+ });
+ metadataUpdater.rebootstrap(now);
+ }
+ }
+
/**
* Initiate a connection to the given node
* @param node the node to connect to
@@ -1213,13 +1219,8 @@ public class NetworkClient implements KafkaClient {
return metadataTimeout;
}
- if (metadataRecoveryStrategy ==
MetadataRecoveryStrategy.REBOOTSTRAP) {
- if (!metadataAttemptStartMs.isPresent())
- metadataAttemptStartMs = Optional.of(now);
- else if (metadataAttemptStartMs.filter(startMs -> now -
startMs > rebootstrapTriggerMs).isPresent()) {
- rebootstrap(now);
- }
- }
+ if (!metadataAttemptStartMs.isPresent())
+ metadataAttemptStartMs = Optional.of(now);
// Beware that the behavior of this method and the computation of
timeouts for poll() are
// highly dependent on the behavior of leastLoadedNode.
@@ -1296,8 +1297,7 @@ public class NetworkClient implements KafkaClient {
if (metadataRecoveryStrategy ==
MetadataRecoveryStrategy.REBOOTSTRAP && response.topLevelError() ==
Errors.REBOOTSTRAP_REQUIRED) {
log.info("Rebootstrap requested by server.");
- metadataAttemptStartMs = Optional.of(0L); // to force
rebootstrap
- this.metadata.requestUpdate(true);
+ initiateRebootstrap();
} else if (response.brokers().isEmpty()) {
// When talking to the startup phase of a broker, it is
possible to receive an empty metadata set, which
// we should retry later.
@@ -1312,16 +1312,25 @@ public class NetworkClient implements KafkaClient {
}
@Override
- public void close() {
- this.metadata.close();
+ public boolean needsRebootstrap(long now, long rebootstrapTriggerMs) {
+ return metadataAttemptStartMs.filter(startMs -> now - startMs >
rebootstrapTriggerMs).isPresent();
}
- private void rebootstrap(long now) {
- closeAll();
+ @Override
+ public void rebootstrap(long now) {
metadata.rebootstrap();
metadataAttemptStartMs = Optional.of(now);
}
+ @Override
+ public void close() {
+ this.metadata.close();
+ }
+
+ private void initiateRebootstrap() {
+ metadataAttemptStartMs = Optional.of(0L); // to force rebootstrap
+ }
+
/**
* Add a metadata request to the list of sends if we can make one
*/
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 8f5ae2a6b3b..4fdc8a45bc9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -406,7 +406,6 @@ public class KafkaAdminClient extends AdminClient {
private final long retryBackoffMs;
private final long retryBackoffMaxMs;
private final ExponentialBackoff retryBackoff;
- private final long rebootstrapTriggerMs;
private final MetadataRecoveryStrategy metadataRecoveryStrategy;
private final Map<TopicPartition, Integer> partitionLeaderCache;
private final AdminFetchMetricsManager adminFetchMetricsManager;
@@ -634,7 +633,6 @@ public class KafkaAdminClient extends AdminClient {
List<MetricsReporter> reporters =
CommonClientConfigs.metricsReporters(this.clientId, config);
this.clientTelemetryReporter = clientTelemetryReporter;
this.clientTelemetryReporter.ifPresent(reporters::add);
- this.rebootstrapTriggerMs =
config.getLong(AdminClientConfig.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG);
this.metadataRecoveryStrategy =
MetadataRecoveryStrategy.forName(config.getString(AdminClientConfig.METADATA_RECOVERY_STRATEGY_CONFIG));
this.partitionLeaderCache = new HashMap<>();
this.adminFetchMetricsManager = new AdminFetchMetricsManager(metrics);
@@ -727,14 +725,10 @@ public class KafkaAdminClient extends AdminClient {
@Override
public Node provide() {
long now = time.milliseconds();
- if (metadataRecoveryStrategy ==
MetadataRecoveryStrategy.REBOOTSTRAP &&
- metadataManager.needsRebootstrap(now,
rebootstrapTriggerMs)) {
- rebootstrap(now);
- }
LeastLoadedNode leastLoadedNode = client.leastLoadedNode(now);
if (metadataRecoveryStrategy ==
MetadataRecoveryStrategy.REBOOTSTRAP
&& !leastLoadedNode.hasNodeAvailableOrConnectionReady()) {
- rebootstrap(now);
+ metadataManager.rebootstrap(now);
}
return leastLoadedNode.node();
@@ -744,11 +738,6 @@ public class KafkaAdminClient extends AdminClient {
public boolean supportsUseControllers() {
return true;
}
-
- private void rebootstrap(long now) {
- client.closeAll();
- metadataManager.rebootstrap(now);
- }
}
private class ConstantNodeIdProvider implements NodeProvider {
diff --git
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
index c1e92ca26fa..9dc2e190d13 100644
---
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
+++
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
@@ -139,6 +139,16 @@ public class AdminMetadataManager {
// Do nothing
}
+ @Override
+ public boolean needsRebootstrap(long now, long rebootstrapTriggerMs) {
+ return AdminMetadataManager.this.needsRebootstrap(now,
rebootstrapTriggerMs);
+ }
+
+ @Override
+ public void rebootstrap(long now) {
+ AdminMetadataManager.this.rebootstrap(now);
+ }
+
@Override
public void close() {
}
@@ -312,7 +322,6 @@ public class AdminMetadataManager {
}
public void initiateRebootstrap() {
- requestUpdate();
this.metadataAttemptStartMs = Optional.of(0L);
}
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 6386a11a322..8a195184e93 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -588,11 +588,6 @@ public class MockClient implements KafkaClient {
connections.remove(node);
}
- @Override
- public void closeAll() {
- connections.clear();
- }
-
@Override
public LeastLoadedNode leastLoadedNode(long now) {
// Consistent with NetworkClient, we do not return nodes awaiting
reconnect backoff
diff --git
a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 3fd75a261d8..691a3d3a482 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -296,6 +296,56 @@ public class NetworkClientTest {
assertEquals(3, rebootstrapCount.get());
}
+ @Test
+ public void testInflightRequestsDuringRebootstrap() {
+ long refreshBackoffMs = 50;
+ long rebootstrapTriggerMs = 1000;
+ int defaultRequestTimeoutMs = 5000;
+ AtomicInteger rebootstrapCount = new AtomicInteger();
+ Metadata metadata = new Metadata(refreshBackoffMs, refreshBackoffMs,
5000, new LogContext(), new ClusterResourceListeners()) {
+ @Override
+ public synchronized void rebootstrap() {
+ super.rebootstrap();
+ rebootstrapCount.incrementAndGet();
+ }
+ };
+ metadata.bootstrap(Collections.singletonList(new
InetSocketAddress("localhost", 9999)));
+ NetworkClient client = new NetworkClient(selector, metadata, "mock",
Integer.MAX_VALUE,
+ reconnectBackoffMsTest, 0, 64 * 1024, 64 * 1024,
+ defaultRequestTimeoutMs, connectionSetupTimeoutMsTest,
connectionSetupTimeoutMaxMsTest, time, false, new ApiVersions(), new
LogContext(),
+ rebootstrapTriggerMs, MetadataRecoveryStrategy.REBOOTSTRAP);
+
+ MetadataResponse metadataResponse =
RequestTestUtils.metadataUpdateWith(2, Collections.emptyMap());
+ metadata.updateWithCurrentRequestVersion(metadataResponse, false,
time.milliseconds());
+ List<Node> nodes = metadata.fetch().nodes();
+ nodes.forEach(node -> {
+ client.ready(node, time.milliseconds());
+ awaitReady(client, node);
+ });
+
+ // Queue a request
+ sendEmptyProduceRequest(client, nodes.get(0).idString());
+ List<ClientResponse> responses = client.poll(0, time.milliseconds());
+ assertEquals(0, responses.size());
+ assertEquals(1, client.inFlightRequestCount());
+
+ // Trigger rebootstrap
+ metadata.requestUpdate(true);
+ time.sleep(refreshBackoffMs);
+ responses = client.poll(0, time.milliseconds());
+ assertEquals(0, responses.size());
+ assertEquals(2, client.inFlightRequestCount());
+ time.sleep(rebootstrapTriggerMs + 1);
+ responses = client.poll(0, time.milliseconds());
+
+ // Verify that inflight produce request was aborted with disconnection
+ assertEquals(1, responses.size());
+ assertEquals(PRODUCE, responses.get(0).requestHeader().apiKey());
+ assertTrue(responses.get(0).wasDisconnected());
+ assertEquals(0, client.inFlightRequestCount());
+ assertEquals(Collections.emptySet(), nodes.stream().filter(node ->
!client.connectionFailed(node)).collect(Collectors.toSet()));
+ }
+
private void checkSimpleRequestResponse(NetworkClient networkClient) {
awaitReady(networkClient, node); // has to be before creating any
request, as it may send ApiVersionsRequest and its response is mocked with
correlation id 0
short requestVersion = PRODUCE.latestVersion();
@@ -738,10 +788,10 @@ public class NetworkClientTest {
}
private int sendEmptyProduceRequest() {
- return sendEmptyProduceRequest(node.idString());
+ return sendEmptyProduceRequest(client, node.idString());
}
- private int sendEmptyProduceRequest(String nodeId) {
+ private int sendEmptyProduceRequest(NetworkClient client, String nodeId) {
ProduceRequest.Builder builder = ProduceRequest.forCurrentMagic(new
ProduceRequestData()
.setTopicData(new
ProduceRequestData.TopicProduceDataCollection())
.setAcks((short) 1)
diff --git
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 2861f7995cb..17e0a20dbf9 100644
---
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.ClientRequest;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults;
@@ -882,18 +883,32 @@ public class KafkaAdminClientTest {
@Test
public void testUnreachableBootstrapServer() throws Exception {
+ verifyUnreachableBootstrapServer(MetadataRecoveryStrategy.REBOOTSTRAP);
+ }
+
+ @Test
+ public void testUnreachableBootstrapServerNoRebootstrap() throws Exception
{
+ verifyUnreachableBootstrapServer(MetadataRecoveryStrategy.NONE);
+ }
+
+ private void verifyUnreachableBootstrapServer(MetadataRecoveryStrategy
metadataRecoveryStrategy) throws Exception {
// This tests the scenario in which the bootstrap server is
unreachable for a short while,
// which prevents AdminClient from being able to send the initial
metadata request
Cluster cluster = Cluster.bootstrap(singletonList(new
InetSocketAddress("localhost", 8121)));
Map<Node, Long> unreachableNodes =
Collections.singletonMap(cluster.nodes().get(0), 200L);
try (final AdminClientUnitTestEnv env = new
AdminClientUnitTestEnv(Time.SYSTEM, cluster,
- AdminClientUnitTestEnv.clientConfigs(), unreachableNodes)) {
+
AdminClientUnitTestEnv.clientConfigs(AdminClientConfig.METADATA_RECOVERY_STRATEGY_CONFIG,
metadataRecoveryStrategy.name), unreachableNodes)) {
Cluster discoveredCluster = mockCluster(3, 0);
env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
env.kafkaClient().prepareResponse(body -> body instanceof
MetadataRequest,
RequestTestUtils.metadataResponse(discoveredCluster.nodes(),
discoveredCluster.clusterResource().clusterId(),
1, Collections.emptyList()));
+ if (metadataRecoveryStrategy ==
MetadataRecoveryStrategy.REBOOTSTRAP) {
+ env.kafkaClient().prepareResponse(body -> body instanceof
MetadataRequest,
+
RequestTestUtils.metadataResponse(discoveredCluster.nodes(),
+
discoveredCluster.clusterResource().clusterId(), 1, Collections.emptyList()));
+ }
env.kafkaClient().prepareResponse(body -> body instanceof
CreateTopicsRequest,
prepareCreateTopicsResponse("myTopic", Errors.NONE));