KAFKA-3419: clarify difference between topic subscription and partition assignment

Author: Jason Gustafson
Reviewers: Ashish Singh, Ismael Juma, Guozhang Wang

Closes #1158 from hachikuji/KAFKA-3419

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/eb08e493
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/eb08e493
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/eb08e493

Branch: refs/heads/0.10.0
Commit: eb08e493228e2e34eae361922796dcffb920e78d
Parents: 09f4a7f
Author: Jason Gustafson
Authored: Sun Apr 3 13:44:05 2016 -0700
Committer: Gwen Shapira
Committed: Tue Apr 5 17:08:53 2016 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java | 41 ++++++++++----------
 1 file changed, 21 insertions(+), 20 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/eb08e493/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index b15d07f..c457c83 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -125,8 +125,9 @@ import java.util.regex.Pattern;
  * commits (note that offsets are always committed for a given consumer group), etc.
  * See <a href="#rebalancecallback">Storing Offsets Outside Kafka</a> for more details
  * <p>
- * It is also possible for the consumer to manually specify the partitions that are assigned to it through {@link #assign(Collection)},
- * which disables this dynamic partition assignment.
+ * It is also possible for the consumer to <a href="#manualassignment">manually assign</a> specific partitions
+ * (similar to the older "simple" consumer) using {@link #assign(Collection)}. In this case, dynamic partition
+ * assignment and consumer group coordination will be disabled.
  *
  * <h3>Usage Examples</h3>
  * The consumer APIs offer flexibility to cover a variety of consumption use cases. Here are some examples to
@@ -242,27 +243,23 @@ import java.util.regex.Pattern;
  * <b>Note: The committed offset should always be the offset of the next message that your application will read.</b>
  * Thus, when calling {@link #commitSync(Map) commitSync(offsets)} you should add one to the offset of the last message processed.
  *
- * <h4>Subscribing To Specific Partitions</h4>
+ * <h4><a name="manualassignment">Manual Partition Assignment</a></h4>
  *
- * In the previous examples we subscribed to the topics we were interested in and let Kafka give our particular process
- * a fair share of the partitions for those topics. This provides a simple load balancing mechanism so multiple
- * instances of our program can divided up the work of processing records.
+ * In the previous examples, we subscribed to the topics we were interested in and let Kafka dynamically assign a
+ * fair share of the partitions for those topics based on the active consumers in the group. However, in
+ * some cases you may need finer control over the specific partitions that are assigned. For example:
  * <p>
- * In this mode the consumer will just get the partitions it subscribes to and if the consumer instance fails no attempt
- * will be made to rebalance partitions to other instances.
- * <p>
- * There are several cases where this makes sense:
  * <ul>
- * <li>The first case is if the process is maintaining some kind of local state associated with that partition (like a
- * local on-disk key-value store) and hence it should only get records for the partition it is maintaining on disk.
- * <li>Another case is if the process itself is highly available and will be restarted if it fails (perhaps using a
+ * <li>If the process is maintaining some kind of local state associated with that partition (like a
+ * local on-disk key-value store), then it should only get records for the partition it is maintaining on disk.
+ * <li>If the process itself is highly available and will be restarted if it fails (perhaps using a
  * cluster management framework like YARN, Mesos, or AWS facilities, or as part of a stream processing framework). In
- * this case there is no need for Kafka to detect the failure and reassign the partition, rather the consuming process
+ * this case there is no need for Kafka to detect the failure and reassign the partition since the consuming process
  * will be restarted on another machine.
  * </ul>
  * <p>
- * This mode is easy to specify, rather than subscribing to the topic, the consumer just subscribes to particular
- * partitions:
+ * To use this mode, instead of subscribing to the topic using {@link #subscribe(Collection) subscribe}, you just call
+ * {@link #assign(Collection)} with the full list of partitions that you want to consume.
  *
  * <pre>
  * String topic = "foo";
@@ -271,11 +268,15 @@ import java.util.regex.Pattern;
  * consumer.assign(Arrays.asList(partition0, partition1));
  * </pre>
  *
- * The group that the consumer specifies is still used for committing offsets, but now the set of partitions will only
- * be changed if the consumer specifies new partitions, and no attempt at failure detection will be made.
+ * Once assigned, you can call {@link #poll(long) poll} in a loop, just as in the preceding examples to consume
+ * records. The group that the consumer specifies is still used for committing offsets, but now the set of partitions
+ * will only change with another call to {@link #assign(Collection) assign}. Manual partition assignment does
+ * not use group coordination, so consumer failures will not cause assigned partitions to be rebalanced. Each consumer
+ * acts independently even if it shares a groupId with another consumer. To avoid offset commit conflicts, you should
+ * usually ensure that the groupId is unique for each consumer instance.
  * <p>
- * It isn't possible to mix both subscription to specific partitions (with no load balancing) and to topics (with load
- * balancing) using the same consumer instance.
+ * Note that it isn't possible to mix manual partition assignment (i.e. using {@link #assign(Collection) assign})
+ * with dynamic partition assignment through topic subscription (i.e. using {@link #subscribe(Collection) subscribe}).
  *
  * <h4><a name="rebalancecallback">Storing Offsets Outside Kafka</h4>
  *
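
The revised Javadoc makes each manually assigned consumer responsible for two things: choosing its own partitions and, usually, using its own groupId. Below is a minimal, dependency-free Java sketch of one way to do both. It assumes a hypothetical static round-robin scheme in which instance i of n owns every partition p with p % n == i, and a hypothetical "<app>-instance-<i>" naming pattern for group.id. The names ManualAssignmentPlan, partitionsFor, and consumerConfig are illustrative only and are not Kafka APIs. The property keys themselves (bootstrap.servers, group.id, enable.auto.commit, key.deserializer, value.deserializer) are standard consumer settings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of the Kafka client API. It only computes which
// partitions an instance would pass to assign(...) and which config it would use.
public class ManualAssignmentPlan {

    // Static round-robin split: instance i of n owns every partition p with p % n == i.
    // Manual assignment does no group coordination, so nothing rebalances these
    // partitions if an instance dies. If the instance is restarted with the same
    // index (e.g. by a cluster manager), it computes the same partitions again.
    public static List<Integer> partitionsFor(int instanceIndex, int instanceCount, int partitionCount) {
        if (instanceCount <= 0 || instanceIndex < 0 || instanceIndex >= instanceCount) {
            throw new IllegalArgumentException("instanceIndex must be in [0, instanceCount)");
        }
        List<Integer> owned = new ArrayList<>();
        for (int p = instanceIndex; p < partitionCount; p += instanceCount) {
            owned.add(p);
        }
        return owned;
    }

    // Consumer properties for one manually assigned instance. The group.id includes the
    // instance index so that offset commits from different instances cannot conflict.
    public static Properties consumerConfig(String bootstrapServers, String appName, int instanceIndex) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", appName + "-instance-" + instanceIndex);
        props.put("enable.auto.commit", "false"); // commit explicitly, e.g. with commitSync(...)
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        // Example: 3 instances sharing the 8 partitions of topic "foo".
        for (int i = 0; i < 3; i++) {
            Properties props = consumerConfig("localhost:9092", "foo-reader", i);
            System.out.println("instance " + i + " -> partitions " + partitionsFor(i, 3, 8)
                    + ", group.id=" + props.getProperty("group.id"));
        }
    }
}
```

Under the same assumptions, each instance would then create `new KafkaConsumer<String, String>(props)`, wrap its partition numbers in `new TopicPartition(topic, p)`, and pass that list to `assign(...)` before polling. The modulo split is only one choice. Any deterministic mapping works as long as every partition is owned by exactly one instance, because manual assignment will not detect a gap or an overlap.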
