This is an automated email from the ASF dual-hosted git repository.

rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new eeb1e702eb KAFKA-13879: Reconnect exponential backoff is ineffective in some cases (#12131)
eeb1e702eb is described below

commit eeb1e702eb7a43d88f11458f739672e2b7aa4871
Author: chern <[email protected]>
AuthorDate: Tue May 10 03:36:42 2022 -0700

    KAFKA-13879: Reconnect exponential backoff is ineffective in some cases (#12131)
    
    When a client connects to an SSL listener using the PLAINTEXT security
    protocol, it considers channel setup complete as soon as the TCP connection
    is established, even though channel setup is not actually complete yet. The
    client then resets its reconnect exponential backoff and sends an
    ApiVersions request. Because the broker expects an SSL handshake, the
    ApiVersions request causes the connection to be closed. The client then
    reconnects without any exponential backoff, since the backoff has already
    been reset.
    
    This commit removes the reset of the reconnect exponential backoff that
    happened when the ApiVersions request was sent. In the normal case, where
    channel setup does complete, the reconnect exponential backoff is still
    reset when the node becomes ready, which happens after the ApiVersions
    response is received. Inter-broker clients, which do not send an ApiVersions
    request and go directly to the ready state, continue to reset the backoff
    before any request has succeeded.
    
    Reviewers: Rajini Sivaram <[email protected]>
---
 .../kafka/clients/ClusterConnectionStates.java     |  1 -
 .../kafka/clients/ClusterConnectionStatesTest.java | 38 ++++++++++++++--------
 .../apache/kafka/clients/NetworkClientTest.java    | 31 ++++++++++++++----
 3 files changed, 48 insertions(+), 22 deletions(-)

diff --git 
a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
index 95efdbeae4..f4d9092258 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
@@ -246,7 +246,6 @@ final class ClusterConnectionStates {
     public void checkingApiVersions(String id) {
         NodeConnectionState nodeState = nodeState(id);
         nodeState.state = ConnectionState.CHECKING_API_VERSIONS;
-        resetReconnectBackoff(nodeState);
         resetConnectionSetupTimeout(nodeState);
         connectingNodes.remove(id);
     }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
index 72cc123921..96fe89ca11 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
@@ -231,20 +231,8 @@ public class ClusterConnectionStatesTest {
 
     @Test
     public void testExponentialReconnectBackoff() {
-        double reconnectBackoffMaxExp = Math.log(reconnectBackoffMax / 
(double) Math.max(reconnectBackoffMs, 1))
-            / Math.log(reconnectBackoffExpBase);
-
-        // Run through 10 disconnects and check that reconnect backoff value 
is within expected range for every attempt
-        for (int i = 0; i < 10; i++) {
-            connectionStates.connecting(nodeId1, time.milliseconds(), 
"localhost");
-            connectionStates.disconnected(nodeId1, time.milliseconds());
-            // Calculate expected backoff value without jitter
-            long expectedBackoff = 
Math.round(Math.pow(reconnectBackoffExpBase, Math.min(i, 
reconnectBackoffMaxExp))
-                * reconnectBackoffMs);
-            long currentBackoff = connectionStates.connectionDelay(nodeId1, 
time.milliseconds());
-            assertEquals(expectedBackoff, currentBackoff, 
reconnectBackoffJitter * expectedBackoff);
-            time.sleep(connectionStates.connectionDelay(nodeId1, 
time.milliseconds()) + 1);
-        }
+        verifyReconnectExponentialBackoff(false);
+        verifyReconnectExponentialBackoff(true);
     }
 
     @Test
@@ -426,4 +414,26 @@ public class ClusterConnectionStatesTest {
         this.connectionStates = new 
ClusterConnectionStates(reconnectBackoffMs, reconnectBackoffMax,
                 connectionSetupTimeoutMs, connectionSetupTimeoutMaxMs, new 
LogContext(), this.multipleIPHostResolver);
     }
+
+    private void verifyReconnectExponentialBackoff(boolean 
enterCheckingApiVersionState) {
+        double reconnectBackoffMaxExp = Math.log(reconnectBackoffMax / 
(double) Math.max(reconnectBackoffMs, 1))
+            / Math.log(reconnectBackoffExpBase);
+
+        connectionStates.remove(nodeId1);
+        // Run through 10 disconnects and check that reconnect backoff value 
is within expected range for every attempt
+        for (int i = 0; i < 10; i++) {
+            connectionStates.connecting(nodeId1, time.milliseconds(), 
"localhost");
+            if (enterCheckingApiVersionState) {
+                connectionStates.checkingApiVersions(nodeId1);
+            }
+
+            connectionStates.disconnected(nodeId1, time.milliseconds());
+            // Calculate expected backoff value without jitter
+            long expectedBackoff = 
Math.round(Math.pow(reconnectBackoffExpBase, Math.min(i, 
reconnectBackoffMaxExp))
+                * reconnectBackoffMs);
+            long currentBackoff = connectionStates.connectionDelay(nodeId1, 
time.milliseconds());
+            assertEquals(expectedBackoff, currentBackoff, 
reconnectBackoffJitter * expectedBackoff);
+            time.sleep(connectionStates.connectionDelay(nodeId1, 
time.milliseconds()) + 1);
+        }
+    }
 }
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 
b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index fe1e9d1920..63b44835f6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -82,6 +82,8 @@ public class NetworkClientTest {
     protected final long reconnectBackoffMaxMsTest = 10 * 10000;
     protected final long connectionSetupTimeoutMsTest = 5 * 1000;
     protected final long connectionSetupTimeoutMaxMsTest = 127 * 1000;
+    private final int reconnectBackoffExpBase = 
ClusterConnectionStates.RECONNECT_BACKOFF_EXP_BASE;
+    private final double reconnectBackoffJitter = 
ClusterConnectionStates.RECONNECT_BACKOFF_JITTER;
     private final TestMetadataUpdater metadataUpdater = new 
TestMetadataUpdater(Collections.singletonList(node));
     private final NetworkClient client = 
createNetworkClient(reconnectBackoffMaxMsTest);
     private final NetworkClient clientWithNoExponentialBackoff = 
createNetworkClient(reconnectBackoffMsTest);
@@ -831,13 +833,28 @@ public class NetworkClientTest {
 
     @Test
     public void testServerDisconnectAfterInternalApiVersionRequest() throws 
Exception {
-        awaitInFlightApiVersionRequest();
-        selector.serverDisconnect(node.idString());
-
-        // The failed ApiVersion request should not be forwarded to upper 
layers
-        List<ClientResponse> responses = client.poll(0, time.milliseconds());
-        assertFalse(client.hasInFlightRequests(node.idString()));
-        assertTrue(responses.isEmpty());
+        final long numIterations = 5;
+        double reconnectBackoffMaxExp = Math.log(reconnectBackoffMaxMsTest / 
(double) Math.max(reconnectBackoffMsTest, 1))
+            / Math.log(reconnectBackoffExpBase);
+        for (int i = 0; i < numIterations; i++) {
+            selector.clear();
+            awaitInFlightApiVersionRequest();
+            selector.serverDisconnect(node.idString());
+
+            // The failed ApiVersion request should not be forwarded to upper 
layers
+            List<ClientResponse> responses = client.poll(0, 
time.milliseconds());
+            assertFalse(client.hasInFlightRequests(node.idString()));
+            assertTrue(responses.isEmpty());
+
+            long expectedBackoff = 
Math.round(Math.pow(reconnectBackoffExpBase, Math.min(i, 
reconnectBackoffMaxExp))
+                * reconnectBackoffMsTest);
+            long delay = client.connectionDelay(node, time.milliseconds());
+            assertEquals(expectedBackoff, delay, reconnectBackoffJitter * 
expectedBackoff);
+            if (i == numIterations - 1) {
+                break;
+            }
+            time.sleep(delay + 1);
+        }
     }
 
     @Test

Reply via email to