Repository: kafka Updated Branches: refs/heads/trunk 44f6c4b94 -> 0785feeb0
KAFKA-2459: Mark last committed timestamp to fix connection backoff This fix applies to three JIRAs, since they are all connected. KAFKA-2459: Connection backoff/blackout period should start when a connection is disconnected, not when the connection attempt was initiated. Backoff when connection is disconnected. KAFKA-2615: Poll() method is broken wrt time. Added Time through the NetworkClient API. Minimal change. KAFKA-1843: Metadata fetch/refresh in new producer should handle all node connection states gracefully. I've partially addressed this for a specific failure case in the JIRA. Author: Eno Thereska <[email protected]> Reviewers: Ewen Cheslack-Postava, Jason Gustafson, Ismael Juma, Guozhang Wang Closes #290 from enothereska/trunk Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0785feeb Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0785feeb Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0785feeb Branch: refs/heads/trunk Commit: 0785feeb0fae2a4f75a59147197c5138109b1b39 Parents: 44f6c4b Author: Eno Thereska <[email protected]> Authored: Wed Oct 21 10:04:49 2015 -0700 Committer: Guozhang Wang <[email protected]> Committed: Wed Oct 21 10:04:49 2015 -0700 ---------------------------------------------------------------------- .../kafka/clients/ClusterConnectionStates.java | 4 +- .../org/apache/kafka/clients/NetworkClient.java | 88 ++++++++++++++++---- .../kafka/clients/consumer/KafkaConsumer.java | 2 +- .../internals/ConsumerNetworkClient.java | 13 +-- .../kafka/clients/producer/KafkaProducer.java | 2 +- .../apache/kafka/clients/NetworkClientTest.java | 33 +++++++- .../org/apache/kafka/test/MockSelector.java | 6 ++ .../controller/ControllerChannelManager.scala | 3 +- .../main/scala/kafka/server/KafkaServer.scala | 3 +- .../kafka/server/ReplicaFetcherThread.scala | 3 +- 10 files changed, 129 insertions(+), 28 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/0785feeb/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java index 6c58211..a8101da 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java @@ -115,10 +115,12 @@ final class ClusterConnectionStates { /** * Enter the disconnected state for the given node * @param id The connection we have disconnected + * @param now The current time */ - public void disconnected(String id) { + public void disconnected(String id, long now) { NodeConnectionState nodeState = nodeState(id); nodeState.state = ConnectionState.DISCONNECTED; + nodeState.lastConnectAttemptMs = now; } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/0785feeb/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 6f39ac9..4265004 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -15,8 +15,10 @@ package org.apache.kafka.clients; import java.io.IOException; import java.net.InetSocketAddress; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.Set; @@ -33,6 +35,7 @@ import org.apache.kafka.common.requests.MetadataResponse; import 
org.apache.kafka.common.requests.RequestHeader; import org.apache.kafka.common.requests.RequestSend; import org.apache.kafka.common.requests.ResponseHeader; +import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,7 +54,13 @@ public class NetworkClient implements KafkaClient { private final Selectable selector; private final MetadataUpdater metadataUpdater; - + + /* a list of nodes we've connected to in the past */ + private final List<Integer> nodesEverSeen; + private final Map<Integer, Node> nodesEverSeenById; + /* random offset into nodesEverSeen list */ + private final Random randOffset; + /* the state of each node's connection */ private final ClusterConnectionStates connectionStates; @@ -75,6 +84,8 @@ public class NetworkClient implements KafkaClient { /* max time in ms for the producer to wait for acknowledgement from server*/ private final int requestTimeoutMs; + + private final Time time; public NetworkClient(Selectable selector, Metadata metadata, @@ -83,9 +94,10 @@ public class NetworkClient implements KafkaClient { long reconnectBackoffMs, int socketSendBuffer, int socketReceiveBuffer, - int requestTimeoutMs) { + int requestTimeoutMs, + Time time) { this(null, metadata, selector, clientId, maxInFlightRequestsPerConnection, - reconnectBackoffMs, socketSendBuffer, socketReceiveBuffer, requestTimeoutMs); + reconnectBackoffMs, socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time); } public NetworkClient(Selectable selector, @@ -95,9 +107,10 @@ public class NetworkClient implements KafkaClient { long reconnectBackoffMs, int socketSendBuffer, int socketReceiveBuffer, - int requestTimeoutMs) { + int requestTimeoutMs, + Time time) { this(metadataUpdater, null, selector, clientId, maxInFlightRequestsPerConnection, reconnectBackoffMs, - socketSendBuffer, socketReceiveBuffer, requestTimeoutMs); + socketSendBuffer, socketReceiveBuffer, requestTimeoutMs, time); } 
private NetworkClient(MetadataUpdater metadataUpdater, @@ -107,7 +120,9 @@ public class NetworkClient implements KafkaClient { int maxInFlightRequestsPerConnection, long reconnectBackoffMs, int socketSendBuffer, - int socketReceiveBuffer, int requestTimeoutMs) { + int socketReceiveBuffer, + int requestTimeoutMs, + Time time) { /* It would be better if we could pass `DefaultMetadataUpdater` from the public constructor, but it's not * possible because `DefaultMetadataUpdater` is an inner class and it can only be instantiated after the @@ -127,8 +142,13 @@ public class NetworkClient implements KafkaClient { this.socketSendBuffer = socketSendBuffer; this.socketReceiveBuffer = socketReceiveBuffer; this.correlation = 0; - this.nodeIndexOffset = new Random().nextInt(Integer.MAX_VALUE); + this.randOffset = new Random(); + this.nodeIndexOffset = this.randOffset.nextInt(Integer.MAX_VALUE); this.requestTimeoutMs = requestTimeoutMs; + this.nodesEverSeen = new ArrayList<>(); + this.nodesEverSeenById = new HashMap<>(); + + this.time = time; } /** @@ -255,6 +275,7 @@ public class NetworkClient implements KafkaClient { @Override public List<ClientResponse> poll(long timeout, long now) { long metadataTimeout = metadataUpdater.maybeUpdate(now); + long updatedNow = now; try { this.selector.poll(Utils.min(timeout, metadataTimeout, requestTimeoutMs)); } catch (IOException e) { @@ -262,12 +283,13 @@ public class NetworkClient implements KafkaClient { } // process completed actions + updatedNow = this.time.milliseconds(); List<ClientResponse> responses = new ArrayList<>(); - handleCompletedSends(responses, now); - handleCompletedReceives(responses, now); - handleDisconnections(responses, now); + handleCompletedSends(responses, updatedNow); + handleCompletedReceives(responses, updatedNow); + handleDisconnections(responses, updatedNow); handleConnections(); - handleTimedOutRequests(responses, now); + handleTimedOutRequests(responses, updatedNow); // invoke callbacks for (ClientResponse 
response : responses) { @@ -364,6 +386,20 @@ public class NetworkClient implements KafkaClient { found = node; } } + + // if we found no node in the current list, try one from the nodes seen before + if (found == null && nodesEverSeen.size() > 0) { + int offset = randOffset.nextInt(nodesEverSeen.size()); + for (int i = 0; i < nodesEverSeen.size(); i++) { + int idx = Utils.abs((offset + i) % nodesEverSeen.size()); + Node node = nodesEverSeenById.get(nodesEverSeen.get(idx)); + log.debug("No node found. Trying previously-seen node with ID {}", node.id()); + if (!this.connectionStates.isBlackedOut(node.idString(), now)) { + found = node; + } + } + } + return found; } @@ -375,7 +411,7 @@ public class NetworkClient implements KafkaClient { * @param now The current time */ private void processDisconnection(List<ClientResponse> responses, String nodeId, long now) { - connectionStates.disconnected(nodeId); + connectionStates.disconnected(nodeId, now); for (ClientRequest request : this.inFlightRequests.clearAll(nodeId)) { log.trace("Cancelled request {} due to node {} being disconnected", request, nodeId); if (!metadataUpdater.maybeHandleDisconnection(request)) @@ -489,7 +525,7 @@ public class NetworkClient implements KafkaClient { this.socketReceiveBuffer); } catch (IOException e) { /* attempt failed, we'll try again after the backoff */ - connectionStates.disconnected(nodeConnectionId); + connectionStates.disconnected(nodeConnectionId, now); /* maybe the problem is our metadata, update it */ metadataUpdater.requestUpdate(); log.debug("Error connecting to node {} at {}:{}:", node.id(), node.host(), node.port(), e); @@ -532,7 +568,7 @@ public class NetworkClient implements KafkaClient { // if there is no node available to connect, back off refreshing metadata long metadataTimeout = Math.max(Math.max(timeToNextMetadataUpdate, timeToNextReconnectAttempt), waitForMetadataFetch); - + if (metadataTimeout == 0) { // Beware that the behavior of this method and the computation of 
timeouts for poll() are // highly dependent on the behavior of leastLoadedNode. @@ -570,6 +606,29 @@ public class NetworkClient implements KafkaClient { this.metadata.requestUpdate(); } + /* + * Keep track of any nodes we've ever seen. Add current + * alive nodes to this tracking list. + * @param nodes Current alive nodes + */ + private void updateNodesEverSeen(List<Node> nodes) { + Node existing = null; + for (Node n : nodes) { + existing = nodesEverSeenById.get(n.id()); + if (existing == null) { + nodesEverSeenById.put(n.id(), n); + log.debug("Adding node {} to nodes ever seen", n.id()); + nodesEverSeen.add(n.id()); + } else { + // check if the nodes are really equal. There could be a case + // where node.id() is the same but node has moved to different host + if (!existing.equals(n)) { + nodesEverSeenById.put(n.id(), n); + } + } + } + } + private void handleResponse(RequestHeader header, Struct body, long now) { this.metadataFetchInProgress = false; MetadataResponse response = new MetadataResponse(body); @@ -582,6 +641,7 @@ public class NetworkClient implements KafkaClient { // created which means we will get errors and no nodes until it exists if (cluster.nodes().size() > 0) { this.metadata.update(cluster, now); + this.updateNodesEverSeen(cluster.nodes()); } else { log.trace("Ignoring empty metadata response with correlation id {}.", header.correlationId()); this.metadata.failedUpdate(now); http://git-wip-us.apache.org/repos/asf/kafka/blob/0785feeb/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 24051f2..2f7f153 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -527,7 +527,7 @@ 
public class KafkaConsumer<K, V> implements Consumer<K, V> { config.getLong(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG), config.getInt(ConsumerConfig.SEND_BUFFER_CONFIG), config.getInt(ConsumerConfig.RECEIVE_BUFFER_CONFIG), - config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG)); + config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG), time); this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs); OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(config.getString(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toUpperCase()); this.subscriptions = new SubscriptionState(offsetResetStrategy); http://git-wip-us.apache.org/repos/asf/kafka/blob/0785feeb/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java index 0b611fb..4153eb3 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java @@ -175,7 +175,8 @@ public class ConsumerNetworkClient implements Closeable { private void poll(long timeout, long now) { // send all the requests we can send now pollUnsentRequests(now); - + now = time.milliseconds(); + // ensure we don't poll any longer than the deadline for // the next scheduled task timeout = Math.min(timeout, delayedTasks.nextTimeout(now)); @@ -190,7 +191,7 @@ public class ConsumerNetworkClient implements Closeable { pollUnsentRequests(now); // fail all requests that couldn't be sent - clearUnsentRequests(now); + clearUnsentRequests(); } @@ -228,11 +229,13 @@ public class ConsumerNetworkClient implements Closeable { } private void pollUnsentRequests(long now) { - while 
(trySend(now)) + while (trySend(now)) { clientPoll(0, now); + now = time.milliseconds(); + } } - private void clearUnsentRequests(long now) { + private void clearUnsentRequests() { // clear all unsent requests and fail their corresponding futures for (Map.Entry<Node, List<ClientRequest>> requestEntry: unsent.entrySet()) { Iterator<ClientRequest> iterator = requestEntry.getValue().iterator(); @@ -273,7 +276,7 @@ public class ConsumerNetworkClient implements Closeable { private void clientPoll(long timeout, long now) { client.poll(timeout, now); if (wakeup.get()) { - clearUnsentRequests(now); + clearUnsentRequests(); wakeup.set(false); throw new ConsumerWakeupException(); } http://git-wip-us.apache.org/repos/asf/kafka/blob/0785feeb/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 44280e0..ff3bfe6 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -276,7 +276,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { config.getLong(ProducerConfig.RECONNECT_BACKOFF_MS_CONFIG), config.getInt(ProducerConfig.SEND_BUFFER_CONFIG), config.getInt(ProducerConfig.RECEIVE_BUFFER_CONFIG), - this.requestTimeoutMs); + this.requestTimeoutMs, time); this.sender = new Sender(client, this.metadata, this.accumulator, http://git-wip-us.apache.org/repos/asf/kafka/blob/0785feeb/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java index 2379896..12136d8 100644 --- 
a/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java @@ -52,10 +52,12 @@ public class NetworkClientTest { private int nodeId = 1; private Cluster cluster = TestUtils.singletonCluster("test", nodeId); private Node node = cluster.nodes().get(0); - private NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, 0, 64 * 1024, 64 * 1024, requestTimeoutMs); - + private long reconnectBackoffMsTest = 10 * 1000; + private NetworkClient client = new NetworkClient(selector, metadata, "mock", Integer.MAX_VALUE, reconnectBackoffMsTest, + 64 * 1024, 64 * 1024, requestTimeoutMs, time); + private NetworkClient clientWithStaticNodes = new NetworkClient(selector, new ManualMetadataUpdater(Arrays.asList(node)), - "mock-static", Integer.MAX_VALUE, 0, 64 * 1024, 64 * 1024, requestTimeoutMs); + "mock-static", Integer.MAX_VALUE, 0, 64 * 1024, 64 * 1024, requestTimeoutMs, time); @Before public void setup() { @@ -149,6 +151,31 @@ public class NetworkClientTest { assertEquals(node.idString(), disconnectedNode); } + @Test + public void testLeastLoadedNode() { + Node leastNode = null; + client.ready(node, time.milliseconds()); + awaitReady(client, node); + client.poll(1, time.milliseconds()); + assertTrue("The client should be ready", client.isReady(node, time.milliseconds())); + + // leastloadednode should be our single node + leastNode = client.leastLoadedNode(time.milliseconds()); + assertEquals("There should be one leastloadednode", leastNode.id(), node.id()); + + // sleep for longer than reconnect backoff + time.sleep(reconnectBackoffMsTest); + + // CLOSE node + selector.close(node.idString()); + + client.poll(1, time.milliseconds()); + assertFalse("After we forced the disconnection the client is no longer ready.", client.ready(node, time.milliseconds())); + leastNode = client.leastLoadedNode(time.milliseconds()); + assertEquals("There should be NO 
leastloadednode", leastNode, null); + + } + private static class TestCallbackHandler implements RequestCompletionHandler { public boolean executed = false; public ClientResponse response; http://git-wip-us.apache.org/repos/asf/kafka/blob/0785feeb/clients/src/test/java/org/apache/kafka/test/MockSelector.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/test/MockSelector.java b/clients/src/test/java/org/apache/kafka/test/MockSelector.java index 5a5f963..b39ff7e 100644 --- a/clients/src/test/java/org/apache/kafka/test/MockSelector.java +++ b/clients/src/test/java/org/apache/kafka/test/MockSelector.java @@ -55,6 +55,12 @@ public class MockSelector implements Selectable { @Override public void close(String id) { this.disconnected.add(id); + for (int i = 0; i < this.connected.size(); i++) { + if (this.connected.get(i).equals(id)) { + this.connected.remove(i); + break; + } + } } public void clear() { http://git-wip-us.apache.org/repos/asf/kafka/blob/0785feeb/core/src/main/scala/kafka/controller/ControllerChannelManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala index 3756822..d86c8ce 100755 --- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala +++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala @@ -106,7 +106,8 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf 0, Selectable.USE_DEFAULT_BUFFER_SIZE, Selectable.USE_DEFAULT_BUFFER_SIZE, - config.requestTimeoutMs + config.requestTimeoutMs, + time ) } val threadName = threadNamePrefix match { http://git-wip-us.apache.org/repos/asf/kafka/blob/0785feeb/core/src/main/scala/kafka/server/KafkaServer.scala ---------------------------------------------------------------------- diff --git 
a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala index 510957b..beea83a 100755 --- a/core/src/main/scala/kafka/server/KafkaServer.scala +++ b/core/src/main/scala/kafka/server/KafkaServer.scala @@ -317,7 +317,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = SystemTime, threadNamePr 0, Selectable.USE_DEFAULT_BUFFER_SIZE, Selectable.USE_DEFAULT_BUFFER_SIZE, - config.requestTimeoutMs) + config.requestTimeoutMs, + kafkaMetricsTime) } var shutdownSucceeded: Boolean = false http://git-wip-us.apache.org/repos/asf/kafka/blob/0785feeb/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala index 5993bbb..4affd89 100644 --- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala +++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala @@ -84,7 +84,8 @@ class ReplicaFetcherThread(name: String, 0, Selectable.USE_DEFAULT_BUFFER_SIZE, brokerConfig.replicaSocketReceiveBufferBytes, - brokerConfig.requestTimeoutMs + brokerConfig.requestTimeoutMs, + time ) }
