Repository: kafka Updated Branches: refs/heads/trunk 647182207 -> 342f34a19
MINOR: InFlightRequests#isEmpty(node) method corrected. - In clearAll method, get operation is removed. - variable name `requestTimeout` changed to `requestTimeoutMs` for clarity Author: Kamal C <kamal.chandraprak...@gmail.com> Reviewers: Rajini Sivaram <rajinisiva...@googlemail.com> Closes #3467 from Kamal15/frequest Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/342f34a1 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/342f34a1 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/342f34a1 Branch: refs/heads/trunk Commit: 342f34a1994caafa3cea94a2f85ffd219b8cebc5 Parents: 6471822 Author: Kamal C <kamal.chandraprak...@gmail.com> Authored: Fri Jun 30 15:07:00 2017 -0400 Committer: Rajini Sivaram <rajinisiva...@googlemail.com> Committed: Fri Jun 30 15:07:00 2017 -0400 ---------------------------------------------------------------------- .../org/apache/kafka/clients/InFlightRequests.java | 17 ++++++----------- .../org/apache/kafka/clients/NetworkClient.java | 2 +- 2 files changed, 7 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/342f34a1/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java index 642f028..f977329 100644 --- a/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java +++ b/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java @@ -110,7 +110,7 @@ final class InFlightRequests { */ public boolean isEmpty(String node) { Deque<NetworkClient.InFlightRequest> queue = requests.get(node); - return queue != null && !queue.isEmpty(); + return queue == null || queue.isEmpty(); } /** @@ -141,22 +141,18 @@ final class InFlightRequests { * @return All the in-flight requests for that node that have been removed */ public Iterable<NetworkClient.InFlightRequest> clearAll(String node) { - Deque<NetworkClient.InFlightRequest> reqs = requests.get(node); - if (reqs == null) { - return Collections.emptyList(); - } else { - return requests.remove(node); - } + Deque<NetworkClient.InFlightRequest> reqs = requests.remove(node); + return (reqs == null) ? Collections.<NetworkClient.InFlightRequest>emptyList() : reqs; } /** * Returns a list of nodes with pending in-flight request, that need to be timed out * * @param now current time in milliseconds - * @param requestTimeout max time to wait for the request to be completed + * @param requestTimeoutMs max time to wait for the request to be completed * @return list of nodes */ - public List<String> getNodesWithTimedOutRequests(long now, int requestTimeout) { + public List<String> getNodesWithTimedOutRequests(long now, int requestTimeoutMs) { List<String> nodeIds = new LinkedList<>(); for (Map.Entry<String, Deque<NetworkClient.InFlightRequest>> requestEntry : requests.entrySet()) { String nodeId = requestEntry.getKey(); @@ -165,11 +161,10 @@ final class InFlightRequests { if (!deque.isEmpty()) { NetworkClient.InFlightRequest request = deque.peekLast(); long timeSinceSend = now - request.sendTimeMs; - if (timeSinceSend > requestTimeout) + if (timeSinceSend > requestTimeoutMs) nodeIds.add(nodeId); } } - return nodeIds; } http://git-wip-us.apache.org/repos/asf/kafka/blob/342f34a1/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java index 59c606f..60b1598 100644 --- a/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java @@ -482,7 +482,7 @@ public class NetworkClient implements KafkaClient { @Override public boolean hasInFlightRequests(String node) { - return this.inFlightRequests.isEmpty(node); + return !this.inFlightRequests.isEmpty(node); } @Override