This is an automated email from the ASF dual-hosted git repository.
rsivaram pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new eeb1e702eb KAFKA-13879: Reconnect exponential backoff is ineffective
in some cases (#12131)
eeb1e702eb is described below
commit eeb1e702eb7a43d88f11458f739672e2b7aa4871
Author: chern <[email protected]>
AuthorDate: Tue May 10 03:36:42 2022 -0700
KAFKA-13879: Reconnect exponential backoff is ineffective in some cases
(#12131)
When a client connects to an SSL listener using the PLAINTEXT security protocol,
the client considers the channel setup complete as soon as the TCP connection is
established. In reality the channel setup is not yet complete. The client then
resets the reconnect exponential backoff and issues an API version request. Since
the broker expects an SSL handshake, the API version request causes the connection
to be disconnected. The client then reconnects without exponential backoff, since
the backoff has already been reset.
This commit removes the reset of the reconnect exponential backoff when sending
the API version request. In the good case, where the channel setup is complete,
the reconnect exponential backoff is reset when the node becomes ready, which
is after the API version response is received. Inter-broker clients, which do not
send an API version request and go directly to the ready state, continue to reset
the backoff before any successful requests.
Reviewers: Rajini Sivaram <[email protected]>
---
.../kafka/clients/ClusterConnectionStates.java | 1 -
.../kafka/clients/ClusterConnectionStatesTest.java | 38 ++++++++++++++--------
.../apache/kafka/clients/NetworkClientTest.java | 31 ++++++++++++++----
3 files changed, 48 insertions(+), 22 deletions(-)
diff --git
a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
index 95efdbeae4..f4d9092258 100644
---
a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
+++
b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
@@ -246,7 +246,6 @@ final class ClusterConnectionStates {
public void checkingApiVersions(String id) {
NodeConnectionState nodeState = nodeState(id);
nodeState.state = ConnectionState.CHECKING_API_VERSIONS;
- resetReconnectBackoff(nodeState);
resetConnectionSetupTimeout(nodeState);
connectingNodes.remove(id);
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
index 72cc123921..96fe89ca11 100644
---
a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
+++
b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
@@ -231,20 +231,8 @@ public class ClusterConnectionStatesTest {
@Test
public void testExponentialReconnectBackoff() {
- double reconnectBackoffMaxExp = Math.log(reconnectBackoffMax /
(double) Math.max(reconnectBackoffMs, 1))
- / Math.log(reconnectBackoffExpBase);
-
- // Run through 10 disconnects and check that reconnect backoff value
is within expected range for every attempt
- for (int i = 0; i < 10; i++) {
- connectionStates.connecting(nodeId1, time.milliseconds(),
"localhost");
- connectionStates.disconnected(nodeId1, time.milliseconds());
- // Calculate expected backoff value without jitter
- long expectedBackoff =
Math.round(Math.pow(reconnectBackoffExpBase, Math.min(i,
reconnectBackoffMaxExp))
- * reconnectBackoffMs);
- long currentBackoff = connectionStates.connectionDelay(nodeId1,
time.milliseconds());
- assertEquals(expectedBackoff, currentBackoff,
reconnectBackoffJitter * expectedBackoff);
- time.sleep(connectionStates.connectionDelay(nodeId1,
time.milliseconds()) + 1);
- }
+ verifyReconnectExponentialBackoff(false);
+ verifyReconnectExponentialBackoff(true);
}
@Test
@@ -426,4 +414,26 @@ public class ClusterConnectionStatesTest {
this.connectionStates = new
ClusterConnectionStates(reconnectBackoffMs, reconnectBackoffMax,
connectionSetupTimeoutMs, connectionSetupTimeoutMaxMs, new
LogContext(), this.multipleIPHostResolver);
}
+
+ private void verifyReconnectExponentialBackoff(boolean
enterCheckingApiVersionState) {
+ double reconnectBackoffMaxExp = Math.log(reconnectBackoffMax /
(double) Math.max(reconnectBackoffMs, 1))
+ / Math.log(reconnectBackoffExpBase);
+
+ connectionStates.remove(nodeId1);
+ // Run through 10 disconnects and check that reconnect backoff value
is within expected range for every attempt
+ for (int i = 0; i < 10; i++) {
+ connectionStates.connecting(nodeId1, time.milliseconds(),
"localhost");
+ if (enterCheckingApiVersionState) {
+ connectionStates.checkingApiVersions(nodeId1);
+ }
+
+ connectionStates.disconnected(nodeId1, time.milliseconds());
+ // Calculate expected backoff value without jitter
+ long expectedBackoff =
Math.round(Math.pow(reconnectBackoffExpBase, Math.min(i,
reconnectBackoffMaxExp))
+ * reconnectBackoffMs);
+ long currentBackoff = connectionStates.connectionDelay(nodeId1,
time.milliseconds());
+ assertEquals(expectedBackoff, currentBackoff,
reconnectBackoffJitter * expectedBackoff);
+ time.sleep(connectionStates.connectionDelay(nodeId1,
time.milliseconds()) + 1);
+ }
+ }
}
diff --git
a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
index fe1e9d1920..63b44835f6 100644
--- a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java
@@ -82,6 +82,8 @@ public class NetworkClientTest {
protected final long reconnectBackoffMaxMsTest = 10 * 10000;
protected final long connectionSetupTimeoutMsTest = 5 * 1000;
protected final long connectionSetupTimeoutMaxMsTest = 127 * 1000;
+ private final int reconnectBackoffExpBase =
ClusterConnectionStates.RECONNECT_BACKOFF_EXP_BASE;
+ private final double reconnectBackoffJitter =
ClusterConnectionStates.RECONNECT_BACKOFF_JITTER;
private final TestMetadataUpdater metadataUpdater = new
TestMetadataUpdater(Collections.singletonList(node));
private final NetworkClient client =
createNetworkClient(reconnectBackoffMaxMsTest);
private final NetworkClient clientWithNoExponentialBackoff =
createNetworkClient(reconnectBackoffMsTest);
@@ -831,13 +833,28 @@ public class NetworkClientTest {
@Test
public void testServerDisconnectAfterInternalApiVersionRequest() throws
Exception {
- awaitInFlightApiVersionRequest();
- selector.serverDisconnect(node.idString());
-
- // The failed ApiVersion request should not be forwarded to upper
layers
- List<ClientResponse> responses = client.poll(0, time.milliseconds());
- assertFalse(client.hasInFlightRequests(node.idString()));
- assertTrue(responses.isEmpty());
+ final long numIterations = 5;
+ double reconnectBackoffMaxExp = Math.log(reconnectBackoffMaxMsTest /
(double) Math.max(reconnectBackoffMsTest, 1))
+ / Math.log(reconnectBackoffExpBase);
+ for (int i = 0; i < numIterations; i++) {
+ selector.clear();
+ awaitInFlightApiVersionRequest();
+ selector.serverDisconnect(node.idString());
+
+ // The failed ApiVersion request should not be forwarded to upper
layers
+ List<ClientResponse> responses = client.poll(0,
time.milliseconds());
+ assertFalse(client.hasInFlightRequests(node.idString()));
+ assertTrue(responses.isEmpty());
+
+ long expectedBackoff =
Math.round(Math.pow(reconnectBackoffExpBase, Math.min(i,
reconnectBackoffMaxExp))
+ * reconnectBackoffMsTest);
+ long delay = client.connectionDelay(node, time.milliseconds());
+ assertEquals(expectedBackoff, delay, reconnectBackoffJitter *
expectedBackoff);
+ if (i == numIterations - 1) {
+ break;
+ }
+ time.sleep(delay + 1);
+ }
}
@Test