Repository: kafka Updated Branches: refs/heads/trunk f7354e779 -> 022bf1295
MINOR: Use ConcurrentMap for ConsumerNetworkClient UnsentRequests Author: Jason Gustafson <[email protected]> Reviewers: Ismael Juma <[email protected]> Closes #2656 from hachikuji/minor-cleanup-unsent-requests Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/022bf129 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/022bf129 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/022bf129 Branch: refs/heads/trunk Commit: 022bf129518e33e165f9ceefc4ab9e622952d3bd Parents: f7354e7 Author: Jason Gustafson <[email protected]> Authored: Wed Mar 8 23:16:53 2017 +0000 Committer: Ismael Juma <[email protected]> Committed: Wed Mar 8 23:16:53 2017 +0000 ---------------------------------------------------------------------- .../kafka/clients/consumer/KafkaConsumer.java | 2 +- .../internals/ConsumerNetworkClient.java | 95 ++++++++++++-------- .../clients/consumer/internals/Fetcher.java | 2 +- 3 files changed, 58 insertions(+), 41 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/022bf129/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 51b00af..612f446 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -1004,7 +1004,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { // // NOTE: since the consumed position has already been updated, we must not allow // wakeups or any other errors to be triggered prior to returning the fetched records. - if (fetcher.sendFetches() > 0 || client.hasPendingRequest()) + if (fetcher.sendFetches() > 0 || client.hasPendingRequests()) client.pollNoWakeup(); if (this.interceptors == null) http://git-wip-us.apache.org/repos/asf/kafka/blob/022bf129/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java index eb25359..478ed3f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java @@ -37,11 +37,11 @@ import java.io.IOException; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashMap; import java.util.Iterator; import java.util.List; -import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.kafka.common.errors.InterruptException; @@ -242,6 +242,9 @@ public class ConsumerNetworkClient implements Closeable { // fail requests that couldn't be sent if they have expired failExpiredRequests(now); + + // clean unsent requests collection to keep the map from growing indefinitely + unsent.clean(); } // called without the lock to avoid deadlock potential if handlers need to acquire locks @@ -271,12 +274,12 @@ public class ConsumerNetworkClient implements Closeable { long startMs = time.milliseconds(); long remainingMs = timeoutMs; - while (hasPendingRequest(node) && remainingMs > 0) { + while (hasPendingRequests(node) && remainingMs > 0) { poll(remainingMs); remainingMs = timeoutMs - (time.milliseconds() - startMs); } - return !hasPendingRequest(node); + return !hasPendingRequests(node); } /** @@ -297,8 +300,8 @@ public class ConsumerNetworkClient implements Closeable { * @param node The node in question * @return A boolean indicating whether there is pending request */ - public boolean hasPendingRequest(Node node) { - if (unsent.hasRequest(node)) + public boolean hasPendingRequests(Node node) { + if (unsent.hasRequests(node)) return true; synchronized (this) { return client.inFlightRequestCount(node.idString()) > 0; @@ -321,8 +324,8 @@ public class ConsumerNetworkClient implements Closeable { * have been transmitted (i.e. in-flight requests) and those which are awaiting transmission. * @return A boolean indicating whether there is pending request */ - public boolean hasPendingRequest() { - if (unsent.hasRequest()) + public boolean hasPendingRequests() { + if (unsent.hasRequests()) return true; synchronized (this) { return client.inFlightRequestCount() > 0; @@ -350,8 +353,7 @@ public class ConsumerNetworkClient implements Closeable { // by NetworkClient, so we just need to check whether connections for any of the unsent // requests have been disconnected; if they have, then we complete the corresponding future // and set the disconnect flag in the ClientResponse - List<Node> nodes = unsent.nodes(); - for (Node node : nodes) { + for (Node node : unsent.nodes()) { if (client.connectionFailed(node)) { // Remove entry before invoking request callback to avoid callbacks handling // coordinator failures traversing the unsent list again. @@ -368,7 +370,7 @@ public class ConsumerNetworkClient implements Closeable { private void failExpiredRequests(long now) { // clear all expired unsent requests and fail their corresponding futures - List<ClientRequest> expiredRequests = unsent.removeExpiredRequests(now, unsentExpiryMs); + Collection<ClientRequest> expiredRequests = unsent.removeExpiredRequests(now, unsentExpiryMs); for (ClientRequest request : expiredRequests) { RequestFutureCompletionHandler handler = (RequestFutureCompletionHandler) request.callback(); handler.onFailure(new TimeoutException("Failed to send request after " + unsentExpiryMs + " ms.")); @@ -471,12 +473,12 @@ public class ConsumerNetworkClient implements Closeable { } } - public class RequestFutureCompletionHandler implements RequestCompletionHandler { + private class RequestFutureCompletionHandler implements RequestCompletionHandler { private final RequestFuture<ClientResponse> future; private ClientResponse response; private RuntimeException e; - public RequestFutureCompletionHandler() { + private RequestFutureCompletionHandler() { this.future = new RequestFuture<>(); } @@ -525,55 +527,55 @@ public class ConsumerNetworkClient implements Closeable { boolean shouldBlock(); } - /* * A threadsafe helper class to hold requests per node that have not been sent yet */ private final static class UnsentRequests { - private final Map<Node, ConcurrentLinkedQueue<ClientRequest>> unsent; + private final ConcurrentMap<Node, ConcurrentLinkedQueue<ClientRequest>> unsent; - public UnsentRequests() { - unsent = new HashMap<>(); + private UnsentRequests() { + unsent = new ConcurrentHashMap<>(); } - public synchronized void put(Node node, ClientRequest request) { - ConcurrentLinkedQueue<ClientRequest> nodeUnsent = unsent.get(node); - if (nodeUnsent == null) { - nodeUnsent = new ConcurrentLinkedQueue<>(); - unsent.put(node, nodeUnsent); + public void put(Node node, ClientRequest request) { + // the lock protects the put from a concurrent removal of the queue for the node + synchronized (unsent) { + ConcurrentLinkedQueue<ClientRequest> requests = unsent.get(node); + if (requests == null) { + requests = new ConcurrentLinkedQueue<>(); + unsent.put(node, requests); + } + requests.add(request); } - nodeUnsent.add(request); } - public synchronized int requestCount(Node node) { + public int requestCount(Node node) { ConcurrentLinkedQueue<ClientRequest> requests = unsent.get(node); return requests == null ? 0 : requests.size(); } - public synchronized int requestCount() { + public int requestCount() { int total = 0; for (ConcurrentLinkedQueue<ClientRequest> requests : unsent.values()) total += requests.size(); return total; } - public synchronized boolean hasRequest(Node node) { + public boolean hasRequests(Node node) { ConcurrentLinkedQueue<ClientRequest> requests = unsent.get(node); return requests != null && !requests.isEmpty(); } - public synchronized boolean hasRequest() { + public boolean hasRequests() { for (ConcurrentLinkedQueue<ClientRequest> requests : unsent.values()) if (!requests.isEmpty()) return true; return false; } - public synchronized List<ClientRequest> removeExpiredRequests(long now, long unsentExpiryMs) { + public Collection<ClientRequest> removeExpiredRequests(long now, long unsentExpiryMs) { List<ClientRequest> expiredRequests = new ArrayList<>(); - Iterator<ConcurrentLinkedQueue<ClientRequest>> iterator = unsent.values().iterator(); - while (iterator.hasNext()) { - ConcurrentLinkedQueue<ClientRequest> requests = iterator.next(); + for (ConcurrentLinkedQueue<ClientRequest> requests : unsent.values()) { Iterator<ClientRequest> requestIterator = requests.iterator(); while (requestIterator.hasNext()) { ClientRequest request = requestIterator.next(); @@ -583,24 +585,39 @@ public class ConsumerNetworkClient implements Closeable { } else break; } - if (requests.isEmpty()) - iterator.remove(); } return expiredRequests; } - public synchronized Collection<ClientRequest> remove(Node node) { - ConcurrentLinkedQueue<ClientRequest> requests = unsent.remove(node); - return requests == null ? Collections.<ClientRequest>emptyList() : requests; + public void clean() { + // the lock protects removal from a concurrent put which could otherwise mutate the + // queue after it has been removed from the map + synchronized (unsent) { + Iterator<ConcurrentLinkedQueue<ClientRequest>> iterator = unsent.values().iterator(); + while (iterator.hasNext()) { + ConcurrentLinkedQueue<ClientRequest> requests = iterator.next(); + if (requests.isEmpty()) + iterator.remove(); + } + } + } + + public Collection<ClientRequest> remove(Node node) { + // the lock protects removal from a concurrent put which could otherwise mutate the + // queue after it has been removed from the map + synchronized (unsent) { + ConcurrentLinkedQueue<ClientRequest> requests = unsent.remove(node); + return requests == null ? Collections.<ClientRequest>emptyList() : requests; + } } - public synchronized Iterator<ClientRequest> requestIterator(Node node) { + public Iterator<ClientRequest> requestIterator(Node node) { ConcurrentLinkedQueue<ClientRequest> requests = unsent.get(node); return requests == null ? Collections.<ClientRequest>emptyIterator() : requests.iterator(); } - public synchronized List<Node> nodes() { - return new ArrayList<>(unsent.keySet()); + public Collection<Node> nodes() { + return unsent.keySet(); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/022bf129/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 536e4e8..441206a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -713,7 +713,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener { Node node = cluster.leaderFor(partition); if (node == null) { metadata.requestUpdate(); - } else if (!this.client.hasPendingRequest(node)) { + } else if (!this.client.hasPendingRequests(node)) { // if there is a leader and no in-flight requests, issue a new fetch LinkedHashMap<TopicPartition, FetchRequest.PartitionData> fetch = fetchable.get(node); if (fetch == null) {
