Repository: kafka Updated Branches: refs/heads/trunk 5b5f6bbe6 -> 13e483ade
KAFKA-2942: inadvertent auto-commit when pre-fetching can cause message loss Author: Jason Gustafson <[email protected]> Reviewers: Guozhang Wang Closes #623 from hachikuji/KAFKA-2942 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/13e483ad Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/13e483ad Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/13e483ad Branch: refs/heads/trunk Commit: 13e483adeee8d968397a21bde3bb159516f26ff0 Parents: 5b5f6bb Author: Jason Gustafson <[email protected]> Authored: Thu Dec 3 11:01:08 2015 -0800 Committer: Guozhang Wang <[email protected]> Committed: Thu Dec 3 11:01:08 2015 -0800 ---------------------------------------------------------------------- .../kafka/clients/consumer/KafkaConsumer.java | 11 +++++------ .../internals/ConsumerNetworkClient.java | 19 +++++++++++++++---- 2 files changed, 20 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/13e483ad/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 9b36af6..c559593 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -830,13 +830,12 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { // and avoid block waiting for their responses to enable pipelining while the user // is handling the fetched records. // - // NOTE that in this case we need to disable wakeups for the non-blocking poll since - // the consumed positions has already been updated and hence we must return these - // records to users to process before being interrupted + // NOTE that we use quickPoll() in this case which disables wakeups and delayed + // task execution since the consumed positions has already been updated and we + // must return these records to users to process before being interrupted or + // auto-committing offsets fetcher.initFetches(metadata.fetch()); - client.disableWakeups(); - client.poll(0); - client.enableWakeups(); + client.quickPoll(); return new ConsumerRecords<>(records); } http://git-wip-us.apache.org/repos/asf/kafka/blob/13e483ad/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java index 84c312e..f707d6f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java @@ -175,7 +175,7 @@ public class ConsumerNetworkClient implements Closeable { long remaining = timeout; long now = begin; do { - poll(remaining, now); + poll(remaining, now, true); now = time.milliseconds(); long elapsed = now - begin; remaining = timeout - elapsed; @@ -190,10 +190,20 @@ public class ConsumerNetworkClient implements Closeable { * @throws WakeupException if {@link #wakeup()} is called from another thread */ public void poll(long timeout) { - poll(timeout, time.milliseconds()); + poll(timeout, time.milliseconds(), true); } - private void poll(long timeout, long now) { + /** + * Poll for network IO and return immediately. This will not trigger wakeups, + * nor will it execute any delayed tasks. + */ + public void quickPoll() { + disableWakeups(); + poll(0, time.milliseconds(), false); + enableWakeups(); + } + + private void poll(long timeout, long now, boolean executeDelayedTasks) { // send all the requests we can send now trySend(now); @@ -209,7 +219,8 @@ public class ConsumerNetworkClient implements Closeable { checkDisconnects(now); // execute scheduled tasks - delayedTasks.poll(now); + if (executeDelayedTasks) + delayedTasks.poll(now); // try again to send requests since buffer space may have been // cleared or a connect finished in the poll
