This is an automated email from the ASF dual-hosted git repository. rsivaram pushed a commit to branch KAFKA-18085-rebootstrap in repository https://gitbox.apache.org/repos/asf/kafka.git
commit 7d3dd08d8e450f19f5cc96e644e56e01af938e38 Author: rajinisivaram <[email protected]> AuthorDate: Mon Nov 25 11:30:57 2024 +0000 KAFKA-18085: Abort inflight requests on existing connections while rebootstrapping --- .../java/org/apache/kafka/clients/KafkaClient.java | 6 --- .../org/apache/kafka/clients/MetadataUpdater.java | 17 +++++++ .../org/apache/kafka/clients/NetworkClient.java | 53 ++++++++++++--------- .../kafka/clients/admin/KafkaAdminClient.java | 13 +----- .../admin/internals/AdminMetadataManager.java | 11 ++++- .../java/org/apache/kafka/clients/MockClient.java | 5 -- .../apache/kafka/clients/NetworkClientTest.java | 54 +++++++++++++++++++++- .../kafka/clients/admin/KafkaAdminClientTest.java | 17 ++++++- 8 files changed, 127 insertions(+), 49 deletions(-) diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java index 78c13985228..46b64986064 100644 --- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java @@ -122,12 +122,6 @@ public interface KafkaClient extends Closeable { */ void close(String nodeId); - /** - * Closes connections to all nodes. All requests on the connections will be cleared. ClientRequest - * callbacks will not be invoked for the cleared requests, nor will they be returned from poll(). - */ - void closeAll(); - /** * Choose the node with the fewest outstanding requests. This method will prefer a node with an existing connection, * but will potentially choose a node for which we don't yet have a connection if all existing connections are in diff --git a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java index 77f3efadce3..825d2e67f70 100644 --- a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java +++ b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java @@ -85,6 +85,23 @@ public interface MetadataUpdater extends Closeable { */ void handleSuccessfulResponse(RequestHeader requestHeader, long now, MetadataResponse metadataResponse); + /** + * Returns true if metadata couldn't be fetched for `rebootstrapTriggerMs` or if server requested rebootstrap. + * + * @param now Current time in milliseconds + * @param rebootstrapTriggerMs Configured timeout after which rebootstrap is triggered + */ + default boolean needsRebootstrap(long now, long rebootstrapTriggerMs) { + return false; + } + + /** + * Performs rebootstrap, replacing the existing cluster with the bootstrap cluster. + * + * @param now Current time in milliseconds + */ + default void rebootstrap(long now) {} + /** * Close this updater. */ diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 044019d6581..839aea3f63e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -452,15 +452,6 @@ public class NetworkClient implements KafkaClient { nodesNeedingApiVersionsFetch.remove(nodeId); } - @Override - public void closeAll() { - log.info("Client requested connection close from all nodes."); - List<Node> nodes = this.metadataUpdater.fetchNodes(); - for (Node node : nodes) { - close(node.idString()); - } - } - /** * Returns the number of milliseconds to wait, based on the connection state, before attempting to send data. When * disconnected, this respects the reconnect backoff time. When connecting or connected, this handles slow/stalled @@ -666,6 +657,7 @@ public class NetworkClient implements KafkaClient { handleInitiateApiVersionRequests(updatedNow); handleTimedOutConnections(responses, updatedNow); handleTimedOutRequests(responses, updatedNow); + handleRebootstrap(responses, updatedNow); completeResponses(responses); return responses; @@ -1123,6 +1115,20 @@ public class NetworkClient implements KafkaClient { } } + private void handleRebootstrap(List<ClientResponse> responses, long now) { + if (metadataRecoveryStrategy == MetadataRecoveryStrategy.REBOOTSTRAP && metadataUpdater.needsRebootstrap(now, rebootstrapTriggerMs)) { + this.metadataUpdater.fetchNodes().forEach(node -> { + String nodeId = node.idString(); + this.selector.close(nodeId); + if (connectionStates.isConnecting(nodeId) || connectionStates.isConnected(nodeId)) { + log.info("Disconnecting from node {} due to client rebootstrap.", nodeId); + processDisconnection(responses, nodeId, now, ChannelState.LOCAL_CLOSE); + } + }); + metadataUpdater.rebootstrap(now); + } + } + /** * Initiate a connection to the given node * @param node the node to connect to @@ -1213,13 +1219,8 @@ public class NetworkClient implements KafkaClient { return metadataTimeout; } - if (metadataRecoveryStrategy == MetadataRecoveryStrategy.REBOOTSTRAP) { - if (!metadataAttemptStartMs.isPresent()) - metadataAttemptStartMs = Optional.of(now); - else if (metadataAttemptStartMs.filter(startMs -> now - startMs > rebootstrapTriggerMs).isPresent()) { - rebootstrap(now); - } - } + if (!metadataAttemptStartMs.isPresent()) + metadataAttemptStartMs = Optional.of(now); // Beware that the behavior of this method and the computation of timeouts for poll() are // highly dependent on the behavior of leastLoadedNode. @@ -1296,8 +1297,7 @@ public class NetworkClient implements KafkaClient { if (metadataRecoveryStrategy == MetadataRecoveryStrategy.REBOOTSTRAP && response.topLevelError() == Errors.REBOOTSTRAP_REQUIRED) { log.info("Rebootstrap requested by server."); - metadataAttemptStartMs = Optional.of(0L); // to force rebootstrap - this.metadata.requestUpdate(true); + initiateRebootstrap(); } else if (response.brokers().isEmpty()) { // When talking to the startup phase of a broker, it is possible to receive an empty metadata set, which // we should retry later. @@ -1312,16 +1312,25 @@ public class NetworkClient implements KafkaClient { } @Override - public void close() { - this.metadata.close(); + public boolean needsRebootstrap(long now, long rebootstrapTriggerMs) { + return metadataAttemptStartMs.filter(startMs -> now - startMs > rebootstrapTriggerMs).isPresent(); } - private void rebootstrap(long now) { - closeAll(); + @Override + public void rebootstrap(long now) { metadata.rebootstrap(); metadataAttemptStartMs = Optional.of(now); } + @Override + public void close() { + this.metadata.close(); + } + + private void initiateRebootstrap() { + metadataAttemptStartMs = Optional.of(0L); // to force rebootstrap + } + /** * Add a metadata request to the list of sends if we can make one */ diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index bfa15077c8e..6b2b8e0e538 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -406,7 +406,6 @@ public class KafkaAdminClient extends AdminClient { private final long retryBackoffMs; private final long retryBackoffMaxMs; private final ExponentialBackoff retryBackoff; - private final long rebootstrapTriggerMs; private final MetadataRecoveryStrategy metadataRecoveryStrategy; private final Map<TopicPartition, Integer> partitionLeaderCache; private final AdminFetchMetricsManager adminFetchMetricsManager; @@ -634,7 +633,6 @@ public class KafkaAdminClient extends AdminClient { List<MetricsReporter> reporters = CommonClientConfigs.metricsReporters(this.clientId, config); this.clientTelemetryReporter = clientTelemetryReporter; this.clientTelemetryReporter.ifPresent(reporters::add); - this.rebootstrapTriggerMs = config.getLong(AdminClientConfig.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG); this.metadataRecoveryStrategy = MetadataRecoveryStrategy.forName(config.getString(AdminClientConfig.METADATA_RECOVERY_STRATEGY_CONFIG)); this.partitionLeaderCache = new HashMap<>(); this.adminFetchMetricsManager = new AdminFetchMetricsManager(metrics); @@ -727,14 +725,10 @@ public class KafkaAdminClient extends AdminClient { @Override public Node provide() { long now = time.milliseconds(); - if (metadataRecoveryStrategy == MetadataRecoveryStrategy.REBOOTSTRAP && - metadataManager.needsRebootstrap(now, rebootstrapTriggerMs)) { - rebootstrap(now); - } LeastLoadedNode leastLoadedNode = client.leastLoadedNode(now); if (metadataRecoveryStrategy == MetadataRecoveryStrategy.REBOOTSTRAP && !leastLoadedNode.hasNodeAvailableOrConnectionReady()) { - rebootstrap(now); + metadataManager.rebootstrap(now); } return leastLoadedNode.node(); @@ -744,11 +738,6 @@ public class KafkaAdminClient extends AdminClient { public boolean supportsUseControllers() { return true; } - - private void rebootstrap(long now) { - client.closeAll(); - metadataManager.rebootstrap(now); - } } private class ConstantNodeIdProvider implements NodeProvider { diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java index c1e92ca26fa..9dc2e190d13 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java @@ -139,6 +139,16 @@ public class AdminMetadataManager { // Do nothing } + @Override + public boolean needsRebootstrap(long now, long rebootstrapTriggerMs) { + return AdminMetadataManager.this.needsRebootstrap(now, rebootstrapTriggerMs); + } + + @Override + public void rebootstrap(long now) { + AdminMetadataManager.this.rebootstrap(now); + } + @Override public void close() { } @@ -312,7 +322,6 @@ public class AdminMetadataManager { } public void initiateRebootstrap() { - requestUpdate(); this.metadataAttemptStartMs = Optional.of(0L); } diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java index 6386a11a322..8a195184e93 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java @@ -588,11 +588,6 @@ public class MockClient implements KafkaClient { connections.remove(node); } - @Override - public void closeAll() { - connections.clear(); - } - @Override public LeastLoadedNode leastLoadedNode(long now) { // Consistent with NetworkClient, we do not return nodes awaiting reconnect backoff diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java index 3fd75a261d8..691a3d3a482 100644 --- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java @@ -296,6 +296,56 @@ public class NetworkClientTest { assertEquals(3, rebootstrapCount.get()); } + @Test + public void testInflightRequestsDuringRebootstrap() { + long refreshBackoffMs = 50; + long rebootstrapTriggerMs = 1000; + int defaultRequestTimeoutMs = 5000; + AtomicInteger rebootstrapCount = new AtomicInteger(); + Metadata metadata = new Metadata(refreshBackoffMs, refreshBackoffMs, 5000, new LogContext(), new ClusterResourceListeners()) { + @Override + public synchronized void rebootstrap() { + super.rebootstrap(); + rebootstrapCount.incrementAndGet(); + } + }; + metadata.bootstrap(Collections.singletonList(new InetSocketAddress("localhost", 9999))); + NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, + reconnectBackoffMsTest, 0, 64 * 1024, 64 * 1024, + defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, connectionSetupTimeoutMaxMsTest, time, false, new ApiVersions(), new LogContext(), + rebootstrapTriggerMs, MetadataRecoveryStrategy.REBOOTSTRAP); + + MetadataResponse metadataResponse = RequestTestUtils.metadataUpdateWith(2, Collections.emptyMap()); + metadata.updateWithCurrentRequestVersion(metadataResponse, false, time.milliseconds()); + List<Node> nodes = metadata.fetch().nodes(); + nodes.forEach(node -> { + client.ready(node, time.milliseconds()); + awaitReady(client, node); + }); + + // Queue a request + sendEmptyProduceRequest(client, nodes.get(0).idString()); + List<ClientResponse> responses = client.poll(0, time.milliseconds()); + assertEquals(0, responses.size()); + assertEquals(1, client.inFlightRequestCount()); + + // Trigger rebootstrap + metadata.requestUpdate(true); + time.sleep(refreshBackoffMs); + responses = client.poll(0, time.milliseconds()); + assertEquals(0, responses.size()); + assertEquals(2, client.inFlightRequestCount()); + time.sleep(rebootstrapTriggerMs + 1); + responses = client.poll(0, time.milliseconds()); + + // Verify that inflight produce request was aborted with disconnection + assertEquals(1, responses.size()); + assertEquals(PRODUCE, responses.get(0).requestHeader().apiKey()); + assertTrue(responses.get(0).wasDisconnected()); + assertEquals(0, client.inFlightRequestCount()); + assertEquals(Collections.emptySet(), nodes.stream().filter(node -> !client.connectionFailed(node)).collect(Collectors.toSet())); + } + private void checkSimpleRequestResponse(NetworkClient networkClient) { awaitReady(networkClient, node); // has to be before creating any request, as it may send ApiVersionsRequest and its response is mocked with correlation id 0 short requestVersion = PRODUCE.latestVersion(); @@ -738,10 +788,10 @@ public class NetworkClientTest { } private int sendEmptyProduceRequest() { - return sendEmptyProduceRequest(node.idString()); + return sendEmptyProduceRequest(client, node.idString()); } - private int sendEmptyProduceRequest(String nodeId) { + private int sendEmptyProduceRequest(NetworkClient client, String nodeId) { ProduceRequest.Builder builder = ProduceRequest.forCurrentMagic(new ProduceRequestData() .setTopicData(new ProduceRequestData.TopicProduceDataCollection()) .setAcks((short) 1) diff --git a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java index d713abbcae7..f11a6427ce4 100644 --- a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java @@ -20,6 +20,7 @@ import org.apache.kafka.clients.ClientDnsLookup; import org.apache.kafka.clients.ClientRequest; import org.apache.kafka.clients.ClientUtils; import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.MetadataRecoveryStrategy; import org.apache.kafka.clients.MockClient; import org.apache.kafka.clients.NodeApiVersions; import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults; @@ -882,18 +883,32 @@ public class KafkaAdminClientTest { @Test public void testUnreachableBootstrapServer() throws Exception { + verifyUnreachableBootstrapServer(MetadataRecoveryStrategy.REBOOTSTRAP); + } + + @Test + public void testUnreachableBootstrapServerNoRebootstrap() throws Exception { + verifyUnreachableBootstrapServer(MetadataRecoveryStrategy.NONE); + } + + private void verifyUnreachableBootstrapServer(MetadataRecoveryStrategy metadataRecoveryStrategy) throws Exception { // This tests the scenario in which the bootstrap server is unreachable for a short while, // which prevents AdminClient from being able to send the initial metadata request Cluster cluster = Cluster.bootstrap(singletonList(new InetSocketAddress("localhost", 8121))); Map<Node, Long> unreachableNodes = Collections.singletonMap(cluster.nodes().get(0), 200L); try (final AdminClientUnitTestEnv env = new AdminClientUnitTestEnv(Time.SYSTEM, cluster, - AdminClientUnitTestEnv.clientConfigs(), unreachableNodes)) { + AdminClientUnitTestEnv.clientConfigs(AdminClientConfig.METADATA_RECOVERY_STRATEGY_CONFIG, metadataRecoveryStrategy.name), unreachableNodes)) { Cluster discoveredCluster = mockCluster(3, 0); env.kafkaClient().setNodeApiVersions(NodeApiVersions.create()); env.kafkaClient().prepareResponse(body -> body instanceof MetadataRequest, RequestTestUtils.metadataResponse(discoveredCluster.nodes(), discoveredCluster.clusterResource().clusterId(), 1, Collections.emptyList())); + if (metadataRecoveryStrategy == MetadataRecoveryStrategy.REBOOTSTRAP) { + env.kafkaClient().prepareResponse(body -> body instanceof MetadataRequest, + RequestTestUtils.metadataResponse(discoveredCluster.nodes(), + discoveredCluster.clusterResource().clusterId(), 1, Collections.emptyList())); + } env.kafkaClient().prepareResponse(body -> body instanceof CreateTopicsRequest, prepareCreateTopicsResponse("myTopic", Errors.NONE));
