Repository: kafka Updated Branches: refs/heads/1.0 2a3219413 -> 3d35294b1
KAFKA-6104; Added unit tests for ClusterConnectionStates Author: Soenke Liebau <soenke.lie...@opencore.com> Reviewers: Ted Yu <yuzhih...@gmail.com>, Ismael Juma <ism...@juma.me.uk> Closes #4113 from soenkeliebau/KAFKA-6104 (cherry picked from commit 021f2e7e246268da46be80fd87d76bb9961b93d8) Signed-off-by: Ismael Juma <ism...@juma.me.uk> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3d35294b Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3d35294b Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3d35294b Branch: refs/heads/1.0 Commit: 3d35294b18e01394ff611d8ca06f0e192504e799 Parents: 2a32194 Author: Soenke Liebau <soenke.lie...@opencore.com> Authored: Mon Oct 23 11:57:45 2017 +0100 Committer: Ismael Juma <ism...@juma.me.uk> Committed: Mon Oct 23 14:15:33 2017 +0100 ---------------------------------------------------------------------- .../clients/ClusterConnectionStatesTest.java | 181 +++++++++++++++++++ 1 file changed, 181 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/3d35294b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java new file mode 100644 index 0000000..21b8719 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.clients; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import org.apache.kafka.common.errors.AuthenticationException; +import org.apache.kafka.common.utils.MockTime; +import org.junit.Before; +import org.junit.Test; + +public class ClusterConnectionStatesTest { + + private final MockTime time = new MockTime(); + private final long reconnectBackoffMs = 10 * 1000; + private final long reconnectBackoffMax = 60 * 1000; + private final double reconnectBackoffJitter = 0.2; + private final String nodeId1 = "1001"; + private final String nodeId2 = "2002"; + + private ClusterConnectionStates connectionStates; + + @Before + public void setup() { + this.connectionStates = new ClusterConnectionStates(reconnectBackoffMs, reconnectBackoffMax); + } + + @Test + public void testClusterConnectionStateChanges() { + assertTrue(connectionStates.canConnect(nodeId1, time.milliseconds())); + + // Start connecting to Node and check state + connectionStates.connecting(nodeId1, time.milliseconds()); + assertEquals(connectionStates.connectionState(nodeId1), ConnectionState.CONNECTING); + assertTrue(connectionStates.isConnecting(nodeId1)); + assertFalse(connectionStates.isReady(nodeId1)); + assertFalse(connectionStates.isBlackedOut(nodeId1, time.milliseconds())); + assertFalse(connectionStates.hasReadyNodes()); + + time.sleep(100); + + // Successful connection + connectionStates.ready(nodeId1); + assertEquals(connectionStates.connectionState(nodeId1), ConnectionState.READY); + assertTrue(connectionStates.isReady(nodeId1)); + assertTrue(connectionStates.hasReadyNodes()); + assertFalse(connectionStates.isConnecting(nodeId1)); + assertFalse(connectionStates.isBlackedOut(nodeId1, time.milliseconds())); + assertEquals(connectionStates.connectionDelay(nodeId1, time.milliseconds()), Long.MAX_VALUE); + + time.sleep(15000); + + // Disconnected from broker + connectionStates.disconnected(nodeId1, time.milliseconds()); + assertEquals(connectionStates.connectionState(nodeId1), ConnectionState.DISCONNECTED); + assertTrue(connectionStates.isDisconnected(nodeId1)); + assertTrue(connectionStates.isBlackedOut(nodeId1, time.milliseconds())); + assertFalse(connectionStates.isConnecting(nodeId1)); + assertFalse(connectionStates.hasReadyNodes()); + assertFalse(connectionStates.canConnect(nodeId1, time.milliseconds())); + + // After disconnecting we expect a backoff value equal to the reconnect.backoff.ms setting (plus minus 20% jitter) + double backoffTolerance = reconnectBackoffMs * reconnectBackoffJitter; + long currentBackoff = connectionStates.connectionDelay(nodeId1, time.milliseconds()); + assertEquals(reconnectBackoffMs, currentBackoff, backoffTolerance); + + time.sleep(currentBackoff + 1); + // after waiting for the current backoff value we should be allowed to connect again + assertTrue(connectionStates.canConnect(nodeId1, time.milliseconds())); + } + + @Test + public void testMultipleNodeConnectionStates() { + // Check initial state, allowed to connect to all nodes, but no nodes shown as ready + assertTrue(connectionStates.canConnect(nodeId1, time.milliseconds())); + assertTrue(connectionStates.canConnect(nodeId2, time.milliseconds())); + assertFalse(connectionStates.hasReadyNodes()); + + // Start connecting one node and check that the pool only shows ready nodes after + // successful connect + connectionStates.connecting(nodeId2, time.milliseconds()); + assertFalse(connectionStates.hasReadyNodes()); + time.sleep(1000); + connectionStates.ready(nodeId2); + assertTrue(connectionStates.hasReadyNodes()); + + // Connect second node and check that both are shown as ready, pool should immediately + // show ready nodes, since node2 is already connected + connectionStates.connecting(nodeId1, time.milliseconds()); + assertTrue(connectionStates.hasReadyNodes()); + time.sleep(1000); + connectionStates.ready(nodeId1); + assertTrue(connectionStates.hasReadyNodes()); + + time.sleep(12000); + + // disconnect nodes and check proper state of pool throughout + connectionStates.disconnected(nodeId2, time.milliseconds()); + assertTrue(connectionStates.hasReadyNodes()); + assertTrue(connectionStates.isBlackedOut(nodeId2, time.milliseconds())); + assertFalse(connectionStates.isBlackedOut(nodeId1, time.milliseconds())); + time.sleep(connectionStates.connectionDelay(nodeId2, time.milliseconds())); + // by the time node1 disconnects node2 should have been unblocked again + connectionStates.disconnected(nodeId1, time.milliseconds() + 1); + assertTrue(connectionStates.isBlackedOut(nodeId1, time.milliseconds())); + assertFalse(connectionStates.isBlackedOut(nodeId2, time.milliseconds())); + assertFalse(connectionStates.hasReadyNodes()); + } + + @Test + public void testAuthorizationFailed() { + // Try connecting + connectionStates.connecting(nodeId1, time.milliseconds()); + + time.sleep(100); + + connectionStates.authenticationFailed(nodeId1, time.milliseconds(), new AuthenticationException("No path to CA for certificate!")); + time.sleep(1000); + assertEquals(connectionStates.connectionState(nodeId1), ConnectionState.AUTHENTICATION_FAILED); + assertTrue(connectionStates.authenticationException(nodeId1) instanceof AuthenticationException); + assertFalse(connectionStates.hasReadyNodes()); + assertFalse(connectionStates.canConnect(nodeId1, time.milliseconds())); + + time.sleep(connectionStates.connectionDelay(nodeId1, time.milliseconds()) + 1); + + assertTrue(connectionStates.canConnect(nodeId1, time.milliseconds())); + } + + + @Test + public void testRemoveNode() { + connectionStates.connecting(nodeId1, time.milliseconds()); + time.sleep(1000); + connectionStates.ready(nodeId1); + time.sleep(10000); + + connectionStates.disconnected(nodeId1, time.milliseconds()); + // Node is disconnected and blocked, removing it from the list should reset all blocks + connectionStates.remove(nodeId1); + assertTrue(connectionStates.canConnect(nodeId1, time.milliseconds())); + assertFalse(connectionStates.isBlackedOut(nodeId1, time.milliseconds())); + assertEquals(connectionStates.connectionDelay(nodeId1, time.milliseconds()), 0L); + } + + @Test + public void testMaxReconnectBackoff() { + long effectiveMaxReconnectBackoff = Math.round(reconnectBackoffMax * (1 + reconnectBackoffJitter)); + connectionStates.connecting(nodeId1, time.milliseconds()); + time.sleep(1000); + connectionStates.disconnected(nodeId1, time.milliseconds()); + + // Do 100 reconnect attempts and check that MaxReconnectBackoff (plus jitter) is not exceeded + for (int i = 0; i < 100; i++) { + long reconnectBackoff = connectionStates.connectionDelay(nodeId1, time.milliseconds()); + assertTrue(reconnectBackoff <= effectiveMaxReconnectBackoff); + assertFalse(connectionStates.canConnect(nodeId1, time.milliseconds())); + time.sleep(reconnectBackoff + 1); + assertTrue(connectionStates.canConnect(nodeId1, time.milliseconds())); + connectionStates.connecting(nodeId1, time.milliseconds()); + time.sleep(10); + connectionStates.disconnected(nodeId1, time.milliseconds()); + } + } +}