Repository: kafka Updated Branches: refs/heads/0.10.1 ac6cce77e -> abf056c08
KAFKA-4234; Revert automatic offset commit behavior in consumer's `unsubscribe()` Temporarily disable the offset commit (when auto commit is enabled) in the new consumer's `unsubscribe()` method as a workaround for the issue reported in [KAFKA-3491](https://issues.apache.org/jira/browse/KAFKA-3491). For now, a call to `unsubscribe()` can be made to reset the offsets in case processing of the batch received from the most recent `poll()` is interrupted (due to some exception). Author: Vahid Hashemian <[email protected]> Reviewers: Jason Gustafson <[email protected]> Closes #1944 from vahidhashemian/KAFKA-4234 (cherry picked from commit 20322446aa261dec8b51e6e4514307e926a29ba5) Signed-off-by: Jason Gustafson <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/abf056c0 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/abf056c0 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/abf056c0 Branch: refs/heads/0.10.1 Commit: abf056c08565d6966f4feccf722dac98be18dc31 Parents: ac6cce7 Author: Vahid Hashemian <[email protected]> Authored: Fri Sep 30 13:13:24 2016 -0700 Committer: Jason Gustafson <[email protected]> Committed: Fri Sep 30 13:28:55 2016 -0700 ---------------------------------------------------------------------- .../apache/kafka/clients/consumer/KafkaConsumer.java | 4 ---- .../kafka/clients/consumer/KafkaConsumerTest.java | 12 +----------- 2 files changed, 1 insertion(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/abf056c0/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index d5b1a4b..830f071 100644 --- 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -874,10 +874,6 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { public void unsubscribe() { acquire(); try { - // make sure the offsets of topic partitions the consumer is unsubscribing from - // are committed since there will be no following rebalance - this.coordinator.maybeAutoCommitOffsetsNow(); - log.debug("Unsubscribed all topics or patterns and assigned partitions"); this.subscriptions.unsubscribe(); this.coordinator.maybeLeaveGroup(); http://git-wip-us.apache.org/repos/asf/kafka/blob/abf056c0/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java index 0096e72..b1c6962 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/KafkaConsumerTest.java @@ -614,8 +614,7 @@ public class KafkaConsumerTest { * do not immediately change, and the latest consumed offsets of its to-be-revoked * partitions are properly committed (when auto-commit is enabled). * Upon unsubscribing from subscribed topics the consumer subscription and assignment - * are both updated right away and its consumed offsets are committed (if auto-commit - * is enabled). + * are both updated right away but its consumed offsets are not auto committed. 
*/ @Test public void testSubscriptionChangesWithAutoCommitEnabled() { @@ -722,21 +721,12 @@ public class KafkaConsumerTest { assertTrue(consumer.assignment().size() == 2); assertTrue(consumer.assignment().contains(tp0) && consumer.assignment().contains(t3p0)); - // mock the offset commit response for to be revoked partitions - Map<TopicPartition, Long> partitionOffsets2 = new HashMap<>(); - partitionOffsets2.put(tp0, 2L); - partitionOffsets2.put(t3p0, 100L); - commitReceived = prepareOffsetCommitResponse(client, coordinator, partitionOffsets2); - consumer.unsubscribe(); // verify that subscription and assignment are both cleared assertTrue(consumer.subscription().isEmpty()); assertTrue(consumer.assignment().isEmpty()); - // verify that the offset commits occurred as expected - assertTrue(commitReceived.get()); - consumer.close(); }
