Repository: kafka
Updated Branches:
  refs/heads/0.10.2 ac05c343a -> a9f4b671f


MINOR: Ensure consumer calls poll() if requests are outstanding

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Onur Karaman <okara...@linkedin.com>, Ismael Juma <ism...@juma.me.uk>

Closes #2596 from hachikuji/ensure-poll-with-inflight-requests

(cherry picked from commit 3749832637b58b291a45076658feb8cd6e830247)
Signed-off-by: Jason Gustafson <ja...@confluent.io>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/a9f4b671
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/a9f4b671
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/a9f4b671

Branch: refs/heads/0.10.2
Commit: a9f4b671f5c429e57eff5a543c8ce8a58edea118
Parents: ac05c34
Author: Jason Gustafson <ja...@confluent.io>
Authored: Fri Feb 24 15:49:13 2017 -0800
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Fri Feb 24 15:50:26 2017 -0800

----------------------------------------------------------------------
 .../java/org/apache/kafka/clients/consumer/KafkaConsumer.java     | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/a9f4b671/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 8149ad6..2791bc5 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -1000,9 +1000,8 @@ public class KafkaConsumer<K, V> implements Consumer<K, 
V> {
                     //
                     // NOTE: since the consumed position has already been 
updated, we must not allow
                     // wakeups or any other errors to be triggered prior to 
returning the fetched records.
-                    if (fetcher.sendFetches() > 0) {
+                    if (fetcher.sendFetches() > 0 || 
client.pendingRequestCount() > 0)
                         client.pollNoWakeup();
-                    }
 
                     if (this.interceptors == null)
                         return new ConsumerRecords<>(records);

Reply via email to