Repository: kafka Updated Branches: refs/heads/0.8.2 de6d4e4f0 -> 8bd57381c
kafka-1642; [Java New Producer Kafka Trunk] CPU Usage Spike to 100% when network connection is lost; patched by Ewen Cheslack-Postava; reviewed by Guozhang Wang and Jun Rao Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8bd57381 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8bd57381 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8bd57381 Branch: refs/heads/0.8.2 Commit: 8bd57381c4f000565881028582a6a1b2a3a07d4d Parents: de6d4e4 Author: Ewen Cheslack-Postava <[email protected]> Authored: Fri Nov 14 14:53:32 2014 -0800 Committer: Jun Rao <[email protected]> Committed: Fri Nov 14 14:53:32 2014 -0800 ---------------------------------------------------------------------- .../kafka/clients/ClusterConnectionStates.java | 21 +++++++ .../org/apache/kafka/clients/KafkaClient.java | 10 +++ .../org/apache/kafka/clients/NetworkClient.java | 13 ++++ .../producer/internals/RecordAccumulator.java | 18 ++++-- .../clients/producer/internals/Sender.java | 13 +++- .../org/apache/kafka/clients/MockClient.java | 5 ++ .../clients/producer/RecordAccumulatorTest.java | 64 ++++++++++++++++---- 7 files changed, 124 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/8bd57381/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java index d304660..8aece7e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java +++ b/clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java @@ -57,6 +57,27 @@ final class ClusterConnectionStates { } /** + * Returns the number of milliseconds to wait, based on the connection state, before attempting to send data. When + * disconnected, this respects the reconnect backoff time. When connecting or connected, this handles slow/stalled + * connections. + * @param node The node to check + * @param now The current time in ms + */ + public long connectionDelay(int node, long now) { + NodeConnectionState state = nodeState.get(node); + if (state == null) return 0; + long timeWaited = now - state.lastConnectAttemptMs; + if (state.state == ConnectionState.DISCONNECTED) { + return Math.max(this.reconnectBackoffMs - timeWaited, 0); + } + else { + // When connecting or connected, we should be able to delay indefinitely since other events (connection or + // data acked) will cause a wakeup once data can be sent. + return Long.MAX_VALUE; + } + } + + /** * Enter the connecting state for the given node. * @param node The id of the node we are connecting to * @param now The current time. http://git-wip-us.apache.org/repos/asf/kafka/blob/8bd57381/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java index 29658d4..3976955 100644 --- a/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/KafkaClient.java @@ -41,6 +41,16 @@ public interface KafkaClient { public boolean ready(Node node, long now); /** + * Returns the number of milliseconds to wait, based on the connection state, before attempting to send data. When + * disconnected, this respects the reconnect backoff time. When connecting or connected, this handles slow/stalled + * connections. + * @param node The node to check + * @param now The current timestamp + * @return The number of milliseconds to wait. + */ + public long connectionDelay(Node node, long now); + + /** * Initiate the sending of the given requests and return any completed responses. Requests can only be sent on ready * connections. * @param requests The requests to send http://git-wip-us.apache.org/repos/asf/kafka/blob/8bd57381/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index eea270a..525b95e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -119,6 +119,19 @@ public class NetworkClient implements KafkaClient { } /** + * Returns the number of milliseconds to wait, based on the connection state, before attempting to send data. When + * disconnected, this respects the reconnect backoff time. When connecting or connected, this handles slow/stalled + * connections. + * @param node The node to check + * @param now The current timestamp + * @return The number of milliseconds to wait. + */ + @Override + public long connectionDelay(Node node, long now) { + return connectionStates.connectionDelay(node.id(), now); + } + + /** * Check if the node with the given id is ready to send more requests. * @param node The given node id * @param now The current time in ms http://git-wip-us.apache.org/repos/asf/kafka/blob/8bd57381/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java index c5d4700..c15485d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java @@ -183,9 +183,9 @@ public final class RecordAccumulator { } /** - * Get a list of nodes whose partitions are ready to be sent, and the time to when any partition will be ready if no - * partitions are ready yet; If the ready nodes list is non-empty, the timeout value will be 0. Also return the flag - * for whether there are any unknown leaders for the accumulated partition batches. + * Get a list of nodes whose partitions are ready to be sent, and the earliest time at which any non-sendable + * partition will be ready; Also return the flag for whether there are any unknown leaders for the accumulated + * partition batches. * <p> * A destination node is ready to send data if ANY one of its partition is not backing off the send and ANY of the * following are true : @@ -219,11 +219,17 @@ public final class RecordAccumulator { long timeToWaitMs = backingOff ? retryBackoffMs : lingerMs; long timeLeftMs = Math.max(timeToWaitMs - waitedTimeMs, 0); boolean full = deque.size() > 1 || batch.records.isFull(); - boolean expired = waitedTimeMs >= lingerMs; + boolean expired = waitedTimeMs >= timeToWaitMs; boolean sendable = full || expired || exhausted || closed; - if (sendable && !backingOff) + if (sendable && !backingOff) { readyNodes.add(leader); - nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs); + } + else { + // Note that this results in a conservative estimate since an un-sendable partition may have + // a leader that will later be found to have sendable data. However, this is good enough + // since we'll just wake up and then sleep again for the remaining time. + nextReadyCheckDelayMs = Math.min(timeLeftMs, nextReadyCheckDelayMs); + } } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/8bd57381/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 8ebe7ed..84a7a07 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -152,10 +152,13 @@ public class Sender implements Runnable { // remove any nodes we aren't ready to send to Iterator<Node> iter = result.readyNodes.iterator(); + long notReadyTimeout = Long.MAX_VALUE; while (iter.hasNext()) { Node node = iter.next(); - if (!this.client.ready(node, now)) + if (!this.client.ready(node, now)) { iter.remove(); + notReadyTimeout = Math.min(notReadyTimeout, this.client.connectionDelay(node, now)); + } } // create produce requests @@ -163,16 +166,22 @@ public class Sender implements Runnable { List<ClientRequest> requests = createProduceRequests(batches, now); sensors.updateProduceRequestMetrics(requests); + // If we have any nodes that are ready to send + have sendable data, poll with 0 timeout so this can immediately + // loop and try sending more data. Otherwise, the timeout is determined by nodes that have partitions with data + // that isn't yet sendable (e.g. lingering, backing off). Note that this specifically does not include nodes + // with sendable data that aren't ready to send since they would cause busy looping. + long pollTimeout = Math.min(result.nextReadyCheckDelayMs, notReadyTimeout); if (result.readyNodes.size() > 0) { log.trace("Nodes with data ready to send: {}", result.readyNodes); log.trace("Created {} produce requests: {}", requests.size(), requests); + pollTimeout = 0; } // if some partitions are already ready to be sent, the select time would be 0; // otherwise if some partition already has some data accumulated but not ready yet, // the select time will be the time difference between now and its linger expiry time; // otherwise the select time will be the time difference between now and the metadata expiry time; - List<ClientResponse> responses = this.client.poll(requests, result.nextReadyCheckDelayMs, now); + List<ClientResponse> responses = this.client.poll(requests, pollTimeout, now); for (ClientResponse response : responses) { if (response.wasDisconnected()) handleDisconnect(response, now); http://git-wip-us.apache.org/repos/asf/kafka/blob/8bd57381/clients/src/test/java/org/apache/kafka/clients/MockClient.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java index aae8d4a..47b5d4a 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java @@ -41,6 +41,11 @@ public class MockClient implements KafkaClient { return found; } + @Override + public long connectionDelay(Node node, long now) { + return 0; + } + public void disconnect(Integer node) { Iterator<ClientRequest> iter = requests.iterator(); while (iter.hasNext()) { http://git-wip-us.apache.org/repos/asf/kafka/blob/8bd57381/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java index 0762b35..2c99324 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java @@ -14,6 +14,7 @@ package org.apache.kafka.clients.producer; import static java.util.Arrays.asList; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertFalse; import java.nio.ByteBuffer; @@ -43,16 +44,20 @@ public class RecordAccumulatorTest { private String topic = "test"; private int partition1 = 0; private int partition2 = 1; - private Node node = new Node(0, "localhost", 1111); + private int partition3 = 2; + private Node node1 = new Node(0, "localhost", 1111); + private Node node2 = new Node(1, "localhost", 1112); private TopicPartition tp1 = new TopicPartition(topic, partition1); private TopicPartition tp2 = new TopicPartition(topic, partition2); - private PartitionInfo part1 = new PartitionInfo(topic, partition1, node, null, null); - private PartitionInfo part2 = new PartitionInfo(topic, partition2, node, null, null); + private TopicPartition tp3 = new TopicPartition(topic, partition3); + private PartitionInfo part1 = new PartitionInfo(topic, partition1, node1, null, null); + private PartitionInfo part2 = new PartitionInfo(topic, partition2, node1, null, null); + private PartitionInfo part3 = new PartitionInfo(topic, partition3, node2, null, null); private MockTime time = new MockTime(); private byte[] key = "key".getBytes(); private byte[] value = "value".getBytes(); private int msgSize = Records.LOG_OVERHEAD + Record.recordSize(key, value); - private Cluster cluster = new Cluster(Collections.singleton(node), Arrays.asList(part1, part2)); + private Cluster cluster = new Cluster(Arrays.asList(node1, node2), Arrays.asList(part1, part2, part3)); private Metrics metrics = new Metrics(time); @Test @@ -65,8 +70,8 @@ public class RecordAccumulatorTest { assertEquals("No partitions should be ready.", 0, accum.ready(cluster, now).readyNodes.size()); } accum.append(tp1, key, value, CompressionType.NONE, null); - assertEquals("Our partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()).readyNodes); - List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node), Integer.MAX_VALUE, 0).get(node.id()); + assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); + List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id()); assertEquals(1, batches.size()); RecordBatch batch = batches.get(0); Iterator<LogEntry> iter = batch.records.iterator(); @@ -83,7 +88,7 @@ public class RecordAccumulatorTest { int batchSize = 512; RecordAccumulator accum = new RecordAccumulator(batchSize, 10 * 1024, 0L, 100L, false, metrics, time); accum.append(tp1, key, new byte[2 * batchSize], CompressionType.NONE, null); - assertEquals("Our partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()).readyNodes); + assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); } @Test @@ -93,8 +98,8 @@ public class RecordAccumulatorTest { accum.append(tp1, key, value, CompressionType.NONE, null); assertEquals("No partitions should be ready", 0, accum.ready(cluster, time.milliseconds()).readyNodes.size()); time.sleep(10); - assertEquals("Our partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()).readyNodes); - List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node), Integer.MAX_VALUE, 0).get(node.id()); + assertEquals("Our partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); + List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), Integer.MAX_VALUE, 0).get(node1.id()); assertEquals(1, batches.size()); RecordBatch batch = batches.get(0); Iterator<LogEntry> iter = batch.records.iterator(); @@ -113,9 +118,9 @@ public class RecordAccumulatorTest { for (int i = 0; i < appends; i++) accum.append(tp, key, value, CompressionType.NONE, null); } - assertEquals("Partition's leader should be ready", Collections.singleton(node), accum.ready(cluster, time.milliseconds()).readyNodes); + assertEquals("Partition's leader should be ready", Collections.singleton(node1), accum.ready(cluster, time.milliseconds()).readyNodes); - List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node), 1024, 0).get(node.id()); + List<RecordBatch> batches = accum.drain(cluster, Collections.singleton(node1), 1024, 0).get(node1.id()); assertEquals("But due to size bound only one partition should have been retrieved", 1, batches.size()); } @@ -145,7 +150,7 @@ public class RecordAccumulatorTest { long now = time.milliseconds(); while (read < numThreads * msgs) { Set<Node> nodes = accum.ready(cluster, now).readyNodes; - List<RecordBatch> batches = accum.drain(cluster, nodes, 5 * 1024, 0).get(node.id()); + List<RecordBatch> batches = accum.drain(cluster, nodes, 5 * 1024, 0).get(node1.id()); if (batches != null) { for (RecordBatch batch : batches) { for (LogEntry entry : batch.records) @@ -159,4 +164,39 @@ public class RecordAccumulatorTest { t.join(); } + + @Test + public void testNextReadyCheckDelay() throws Exception { + // Next check time will use lingerMs since this test won't trigger any retries/backoff + long lingerMs = 10L; + RecordAccumulator accum = new RecordAccumulator(1024, 10 * 1024, lingerMs, 100L, false, metrics, time); + // Just short of going over the limit so we trigger linger time + int appends = 1024 / msgSize; + + // Partition on node1 only + for (int i = 0; i < appends; i++) + accum.append(tp1, key, value, CompressionType.NONE, null); + RecordAccumulator.ReadyCheckResult result = accum.ready(cluster, time.milliseconds()); + assertEquals("No nodes should be ready.", 0, result.readyNodes.size()); + assertEquals("Next check time should be the linger time", lingerMs, result.nextReadyCheckDelayMs); + + time.sleep(lingerMs / 2); + + // Add partition on node2 only + for (int i = 0; i < appends; i++) + accum.append(tp3, key, value, CompressionType.NONE, null); + result = accum.ready(cluster, time.milliseconds()); + assertEquals("No nodes should be ready.", 0, result.readyNodes.size()); + assertEquals("Next check time should be defined by node1, half remaining linger time", lingerMs / 2, result.nextReadyCheckDelayMs); + + // Add data for another partition on node1, enough to make data sendable immediately + for (int i = 0; i < appends+1; i++) + accum.append(tp2, key, value, CompressionType.NONE, null); + result = accum.ready(cluster, time.milliseconds()); + assertEquals("Node1 should be ready", Collections.singleton(node1), result.readyNodes); + // Note this can actually be < linger time because it may use delays from partitions that aren't sendable + // but have leaders with other sendable data. + assertTrue("Next check time should be defined by node2, at most linger time", result.nextReadyCheckDelayMs <= lingerMs); + } + }
