This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 0f33b16fdf7 KAFKA-18085: Abort inflight requests on existing 
connections while rebootstrapping (#17939)
0f33b16fdf7 is described below

commit 0f33b16fdf7ddb76b7f13480ded05374af6c2195
Author: Rajini Sivaram <[email protected]>
AuthorDate: Mon Nov 25 17:58:11 2024 +0000

    KAFKA-18085: Abort inflight requests on existing connections while 
rebootstrapping (#17939)
    
    When disconnecting channels before rebootstrapping due to the rebootstrap 
conditions introduced in KIP-1102, we should ensure that inflight requests are 
aborted, similarly to other disconnections such as request timeouts in clients. 
With the earlier rebootstrapping from KIP-899, we only rebootstrapped when there 
were no connections, so no disconnections were required.
    
    Reviewers: Manikumar Reddy <[email protected]>
---
 .../java/org/apache/kafka/clients/KafkaClient.java |  6 ---
 .../org/apache/kafka/clients/MetadataUpdater.java  | 17 +++++++
 .../org/apache/kafka/clients/NetworkClient.java    | 53 ++++++++++++---------
 .../kafka/clients/admin/KafkaAdminClient.java      | 13 +-----
 .../admin/internals/AdminMetadataManager.java      | 11 ++++-
 .../java/org/apache/kafka/clients/MockClient.java  |  5 --
 .../apache/kafka/clients/NetworkClientTest.java    | 54 +++++++++++++++++++++-
 .../kafka/clients/admin/KafkaAdminClientTest.java  | 17 ++++++-
 8 files changed, 127 insertions(+), 49 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
index 78c13985228..46b64986064 100644
--- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java
@@ -122,12 +122,6 @@ public interface KafkaClient extends Closeable {
      */
     void close(String nodeId);
 
-    /**
-     * Closes connections to all nodes. All requests on the connections will 
be cleared.  ClientRequest
-     * callbacks will not be invoked for the cleared requests, nor will they 
be returned from poll().
-     */
-    void closeAll();
-
     /**
      * Choose the node with the fewest outstanding requests. This method will 
prefer a node with an existing connection,
      * but will potentially choose a node for which we don't yet have a 
connection if all existing connections are in
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java 
b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
index 77f3efadce3..825d2e67f70 100644
--- a/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
+++ b/clients/src/main/java/org/apache/kafka/clients/MetadataUpdater.java
@@ -85,6 +85,23 @@ public interface MetadataUpdater extends Closeable {
      */
     void handleSuccessfulResponse(RequestHeader requestHeader, long now, 
MetadataResponse metadataResponse);
 
+    /**
+     * Returns true if metadata couldn't be fetched for `rebootstrapTriggerMs` 
or if server requested rebootstrap.
+     *
+     * @param now Current time in milliseconds
+     * @param rebootstrapTriggerMs Configured timeout after which rebootstrap 
is triggered
+     */
+    default boolean needsRebootstrap(long now, long rebootstrapTriggerMs) {
+        return false;
+    }
+
+    /**
+     * Performs rebootstrap, replacing the existing cluster with the bootstrap 
cluster.
+     *
+     * @param now Current time in milliseconds
+     */
+    default void rebootstrap(long now) {}
+
     /**
      * Close this updater.
      */
diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
index 044019d6581..839aea3f63e 100644
--- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
@@ -452,15 +452,6 @@ public class NetworkClient implements KafkaClient {
         nodesNeedingApiVersionsFetch.remove(nodeId);
     }
 
-    @Override
-    public void closeAll() {
-        log.info("Client requested connection close from all nodes.");
-        List<Node> nodes = this.metadataUpdater.fetchNodes();
-        for (Node node : nodes) {
-            close(node.idString());
-        }
-    }
-
     /**
      * Returns the number of milliseconds to wait, based on the connection 
state, before attempting to send data. When
      * disconnected, this respects the reconnect backoff time. When connecting 
or connected, this handles slow/stalled
@@ -666,6 +657,7 @@ public class NetworkClient implements KafkaClient {
         handleInitiateApiVersionRequests(updatedNow);
         handleTimedOutConnections(responses, updatedNow);
         handleTimedOutRequests(responses, updatedNow);
+        handleRebootstrap(responses, updatedNow);
         completeResponses(responses);
 
         return responses;
@@ -1123,6 +1115,20 @@ public class NetworkClient implements KafkaClient {
         }
     }
 
+    private void handleRebootstrap(List<ClientResponse> responses, long now) {
+        if (metadataRecoveryStrategy == MetadataRecoveryStrategy.REBOOTSTRAP 
&& metadataUpdater.needsRebootstrap(now, rebootstrapTriggerMs)) {
+            this.metadataUpdater.fetchNodes().forEach(node -> {
+                String nodeId = node.idString();
+                this.selector.close(nodeId);
+                if (connectionStates.isConnecting(nodeId) || 
connectionStates.isConnected(nodeId)) {
+                    log.info("Disconnecting from node {} due to client 
rebootstrap.", nodeId);
+                    processDisconnection(responses, nodeId, now, 
ChannelState.LOCAL_CLOSE);
+                }
+            });
+            metadataUpdater.rebootstrap(now);
+        }
+    }
+
     /**
      * Initiate a connection to the given node
      * @param node the node to connect to
@@ -1213,13 +1219,8 @@ public class NetworkClient implements KafkaClient {
                 return metadataTimeout;
             }
 
-            if (metadataRecoveryStrategy == 
MetadataRecoveryStrategy.REBOOTSTRAP) {
-                if (!metadataAttemptStartMs.isPresent())
-                    metadataAttemptStartMs = Optional.of(now);
-                else if (metadataAttemptStartMs.filter(startMs -> now - 
startMs > rebootstrapTriggerMs).isPresent()) {
-                    rebootstrap(now);
-                }
-            }
+            if (!metadataAttemptStartMs.isPresent())
+                metadataAttemptStartMs = Optional.of(now);
 
             // Beware that the behavior of this method and the computation of 
timeouts for poll() are
             // highly dependent on the behavior of leastLoadedNode.
@@ -1296,8 +1297,7 @@ public class NetworkClient implements KafkaClient {
 
             if (metadataRecoveryStrategy == 
MetadataRecoveryStrategy.REBOOTSTRAP && response.topLevelError() == 
Errors.REBOOTSTRAP_REQUIRED) {
                 log.info("Rebootstrap requested by server.");
-                metadataAttemptStartMs = Optional.of(0L); // to force 
rebootstrap
-                this.metadata.requestUpdate(true);
+                initiateRebootstrap();
             } else if (response.brokers().isEmpty()) {
                 // When talking to the startup phase of a broker, it is 
possible to receive an empty metadata set, which
                 // we should retry later.
@@ -1312,16 +1312,25 @@ public class NetworkClient implements KafkaClient {
         }
 
         @Override
-        public void close() {
-            this.metadata.close();
+        public boolean needsRebootstrap(long now, long rebootstrapTriggerMs) {
+            return metadataAttemptStartMs.filter(startMs -> now - startMs > 
rebootstrapTriggerMs).isPresent();
         }
 
-        private void rebootstrap(long now) {
-            closeAll();
+        @Override
+        public void rebootstrap(long now) {
             metadata.rebootstrap();
             metadataAttemptStartMs = Optional.of(now);
         }
 
+        @Override
+        public void close() {
+            this.metadata.close();
+        }
+
+        private void initiateRebootstrap() {
+            metadataAttemptStartMs = Optional.of(0L); // to force rebootstrap
+        }
+
         /**
          * Add a metadata request to the list of sends if we can make one
          */
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java 
b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index 8f5ae2a6b3b..4fdc8a45bc9 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -406,7 +406,6 @@ public class KafkaAdminClient extends AdminClient {
     private final long retryBackoffMs;
     private final long retryBackoffMaxMs;
     private final ExponentialBackoff retryBackoff;
-    private final long rebootstrapTriggerMs;
     private final MetadataRecoveryStrategy metadataRecoveryStrategy;
     private final Map<TopicPartition, Integer> partitionLeaderCache;
     private final AdminFetchMetricsManager adminFetchMetricsManager;
@@ -634,7 +633,6 @@ public class KafkaAdminClient extends AdminClient {
         List<MetricsReporter> reporters = 
CommonClientConfigs.metricsReporters(this.clientId, config);
         this.clientTelemetryReporter = clientTelemetryReporter;
         this.clientTelemetryReporter.ifPresent(reporters::add);
-        this.rebootstrapTriggerMs = 
config.getLong(AdminClientConfig.METADATA_RECOVERY_REBOOTSTRAP_TRIGGER_MS_CONFIG);
         this.metadataRecoveryStrategy = 
MetadataRecoveryStrategy.forName(config.getString(AdminClientConfig.METADATA_RECOVERY_STRATEGY_CONFIG));
         this.partitionLeaderCache = new HashMap<>();
         this.adminFetchMetricsManager = new AdminFetchMetricsManager(metrics);
@@ -727,14 +725,10 @@ public class KafkaAdminClient extends AdminClient {
         @Override
         public Node provide() {
             long now = time.milliseconds();
-            if (metadataRecoveryStrategy == 
MetadataRecoveryStrategy.REBOOTSTRAP &&
-                    metadataManager.needsRebootstrap(now, 
rebootstrapTriggerMs)) {
-                rebootstrap(now);
-            }
             LeastLoadedNode leastLoadedNode = client.leastLoadedNode(now);
             if (metadataRecoveryStrategy == 
MetadataRecoveryStrategy.REBOOTSTRAP
                     && !leastLoadedNode.hasNodeAvailableOrConnectionReady()) {
-                rebootstrap(now);
+                metadataManager.rebootstrap(now);
             }
 
             return leastLoadedNode.node();
@@ -744,11 +738,6 @@ public class KafkaAdminClient extends AdminClient {
         public boolean supportsUseControllers() {
             return true;
         }
-
-        private void rebootstrap(long now) {
-            client.closeAll();
-            metadataManager.rebootstrap(now);
-        }
     }
 
     private class ConstantNodeIdProvider implements NodeProvider {
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
index c1e92ca26fa..9dc2e190d13 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/admin/internals/AdminMetadataManager.java
@@ -139,6 +139,16 @@ public class AdminMetadataManager {
             // Do nothing
         }
 
+        @Override
+        public boolean needsRebootstrap(long now, long rebootstrapTriggerMs) {
+            return AdminMetadataManager.this.needsRebootstrap(now, 
rebootstrapTriggerMs);
+        }
+
+        @Override
+        public void rebootstrap(long now) {
+            AdminMetadataManager.this.rebootstrap(now);
+        }
+
         @Override
         public void close() {
         }
@@ -312,7 +322,6 @@ public class AdminMetadataManager {
     }
 
     public void initiateRebootstrap() {
-        requestUpdate();
         this.metadataAttemptStartMs = Optional.of(0L);
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java 
b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 6386a11a322..8a195184e93 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -588,11 +588,6 @@ public class MockClient implements KafkaClient {
         connections.remove(node);
     }
 
-    @Override
-    public void closeAll() {
-        connections.clear();
-    }
-
     @Override
     public LeastLoadedNode leastLoadedNode(long now) {
         // Consistent with NetworkClient, we do not return nodes awaiting 
reconnect backoff
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 
b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index 3fd75a261d8..691a3d3a482 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -296,6 +296,56 @@ public class NetworkClientTest {
         assertEquals(3, rebootstrapCount.get());
     }
 
+    @Test
+    public void testInflightRequestsDuringRebootstrap() {
+        long refreshBackoffMs = 50;
+        long rebootstrapTriggerMs = 1000;
+        int defaultRequestTimeoutMs = 5000;
+        AtomicInteger rebootstrapCount = new AtomicInteger();
+        Metadata metadata = new Metadata(refreshBackoffMs, refreshBackoffMs, 
5000, new LogContext(), new ClusterResourceListeners())  {
+            @Override
+            public synchronized void rebootstrap() {
+                super.rebootstrap();
+                rebootstrapCount.incrementAndGet();
+            }
+        };
+        metadata.bootstrap(Collections.singletonList(new 
InetSocketAddress("localhost", 9999)));
+        NetworkClient client = new NetworkClient(selector, metadata, "mock", 
Integer.MAX_VALUE,
+                reconnectBackoffMsTest, 0, 64 * 1024, 64 * 1024,
+                defaultRequestTimeoutMs, connectionSetupTimeoutMsTest, 
connectionSetupTimeoutMaxMsTest, time, false, new ApiVersions(), new 
LogContext(),
+                rebootstrapTriggerMs, MetadataRecoveryStrategy.REBOOTSTRAP);
+
+        MetadataResponse metadataResponse = 
RequestTestUtils.metadataUpdateWith(2, Collections.emptyMap());
+        metadata.updateWithCurrentRequestVersion(metadataResponse, false, 
time.milliseconds());
+        List<Node> nodes = metadata.fetch().nodes();
+        nodes.forEach(node -> {
+            client.ready(node, time.milliseconds());
+            awaitReady(client, node);
+        });
+
+        // Queue a request
+        sendEmptyProduceRequest(client, nodes.get(0).idString());
+        List<ClientResponse> responses = client.poll(0, time.milliseconds());
+        assertEquals(0, responses.size());
+        assertEquals(1, client.inFlightRequestCount());
+
+        // Trigger rebootstrap
+        metadata.requestUpdate(true);
+        time.sleep(refreshBackoffMs);
+        responses = client.poll(0, time.milliseconds());
+        assertEquals(0, responses.size());
+        assertEquals(2, client.inFlightRequestCount());
+        time.sleep(rebootstrapTriggerMs + 1);
+        responses = client.poll(0, time.milliseconds());
+
+        // Verify that inflight produce request was aborted with disconnection
+        assertEquals(1, responses.size());
+        assertEquals(PRODUCE, responses.get(0).requestHeader().apiKey());
+        assertTrue(responses.get(0).wasDisconnected());
+        assertEquals(0, client.inFlightRequestCount());
+        assertEquals(Collections.emptySet(), nodes.stream().filter(node -> 
!client.connectionFailed(node)).collect(Collectors.toSet()));
+    }
+
     private void checkSimpleRequestResponse(NetworkClient networkClient) {
         awaitReady(networkClient, node); // has to be before creating any 
request, as it may send ApiVersionsRequest and its response is mocked with 
correlation id 0
         short requestVersion = PRODUCE.latestVersion();
@@ -738,10 +788,10 @@ public class NetworkClientTest {
     }
 
     private int sendEmptyProduceRequest() {
-        return sendEmptyProduceRequest(node.idString());
+        return sendEmptyProduceRequest(client, node.idString());
     }
 
-    private int sendEmptyProduceRequest(String nodeId) {
+    private int sendEmptyProduceRequest(NetworkClient client, String nodeId) {
         ProduceRequest.Builder builder = ProduceRequest.forCurrentMagic(new 
ProduceRequestData()
                 .setTopicData(new 
ProduceRequestData.TopicProduceDataCollection())
                 .setAcks((short) 1)
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
index 2861f7995cb..17e0a20dbf9 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.clients.ClientDnsLookup;
 import org.apache.kafka.clients.ClientRequest;
 import org.apache.kafka.clients.ClientUtils;
 import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.MetadataRecoveryStrategy;
 import org.apache.kafka.clients.MockClient;
 import org.apache.kafka.clients.NodeApiVersions;
 import org.apache.kafka.clients.admin.DeleteAclsResult.FilterResults;
@@ -882,18 +883,32 @@ public class KafkaAdminClientTest {
 
     @Test
     public void testUnreachableBootstrapServer() throws Exception {
+        verifyUnreachableBootstrapServer(MetadataRecoveryStrategy.REBOOTSTRAP);
+    }
+
+    @Test
+    public void testUnreachableBootstrapServerNoRebootstrap() throws Exception 
{
+        verifyUnreachableBootstrapServer(MetadataRecoveryStrategy.NONE);
+    }
+
+    private void verifyUnreachableBootstrapServer(MetadataRecoveryStrategy 
metadataRecoveryStrategy) throws Exception {
         // This tests the scenario in which the bootstrap server is 
unreachable for a short while,
         // which prevents AdminClient from being able to send the initial 
metadata request
 
         Cluster cluster = Cluster.bootstrap(singletonList(new 
InetSocketAddress("localhost", 8121)));
         Map<Node, Long> unreachableNodes = 
Collections.singletonMap(cluster.nodes().get(0), 200L);
         try (final AdminClientUnitTestEnv env = new 
AdminClientUnitTestEnv(Time.SYSTEM, cluster,
-                AdminClientUnitTestEnv.clientConfigs(), unreachableNodes)) {
+                
AdminClientUnitTestEnv.clientConfigs(AdminClientConfig.METADATA_RECOVERY_STRATEGY_CONFIG,
 metadataRecoveryStrategy.name), unreachableNodes)) {
             Cluster discoveredCluster = mockCluster(3, 0);
             env.kafkaClient().setNodeApiVersions(NodeApiVersions.create());
             env.kafkaClient().prepareResponse(body -> body instanceof 
MetadataRequest,
                     
RequestTestUtils.metadataResponse(discoveredCluster.nodes(), 
discoveredCluster.clusterResource().clusterId(),
                             1, Collections.emptyList()));
+            if (metadataRecoveryStrategy == 
MetadataRecoveryStrategy.REBOOTSTRAP) {
+                env.kafkaClient().prepareResponse(body -> body instanceof 
MetadataRequest,
+                        
RequestTestUtils.metadataResponse(discoveredCluster.nodes(),
+                                
discoveredCluster.clusterResource().clusterId(), 1, Collections.emptyList()));
+            }
             env.kafkaClient().prepareResponse(body -> body instanceof 
CreateTopicsRequest,
                 prepareCreateTopicsResponse("myTopic", Errors.NONE));
 

Reply via email to