KAFKA-3418: add javadoc section describing consumer failure detection

Author: Jason Gustafson

Reviewers: Manikumar Reddy, Ismael Juma, Ewen Cheslack-Postava

Closes #1129 from hachikuji/KAFKA-3418

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fe6c481b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fe6c481b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fe6c481b

Branch: refs/heads/0.10.0
Commit: fe6c481b38d6b1b61341e4e1a6237f64accfbfbc
Parents: cea01af
Author: Jason Gustafson
Authored: Fri Apr 29 10:26:01 2016 -0700
Committer: Ewen Cheslack-Postava
Committed: Fri Apr 29 10:26:01 2016 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java | 71 ++++++++++++++++----
 1 file changed, 57 insertions(+), 14 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/fe6c481b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index ad44d16..7290a38 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -76,7 +76,7 @@ import java.util.regex.Pattern;
  * <h3>Offsets and Consumer Position</h3>
  * Kafka maintains a numerical offset for each record in a partition. This offset acts as a kind of unique identifier of
  * a record within that partition, and also denotes the position of the consumer in the partition. That is, a consumer
- * which has position 5 has consumed records with offsets 0 through 4 and will next receive record with offset 5. There
+ * which has position 5 has consumed records with offsets 0 through 4 and will next receive the record with offset 5. There
  * are actually two notions of position relevant to the user of the consumer.
  * <p>
  * The {@link #position(TopicPartition) position} of the consumer gives the offset of the next record that will be given
@@ -95,22 +95,23 @@ import java.util.regex.Pattern;
  *
  * <h3>Consumer Groups and Topic Subscriptions</h3>
  *
- * Kafka uses the concept of <i>consumer groups</i> to allow a pool of processes to divide up the work of consuming and
+ * Kafka uses the concept of <i>consumer groups</i> to allow a pool of processes to divide the work of consuming and
  * processing records. These processes can either be running on the same machine or, as is more likely, they can be
- * distributed over many machines to provide additional scalability and fault tolerance for processing.
+ * distributed over many machines to provide scalability and fault tolerance for processing.
  * <p>
- * Each Kafka consumer is able to configure a consumer group that it belongs to, and can dynamically set the
- * list of topics it wants to subscribe to through {@link #subscribe(Collection, ConsumerRebalanceListener)},
- * or subscribe to all topics matching certain pattern through {@link #subscribe(Pattern, ConsumerRebalanceListener)}.
- * Kafka will deliver each message in the
- * subscribed topics to one process in each consumer group. This is achieved by balancing the partitions in the topic
- * over the consumer processes in each group. So if there is a topic with four partitions, and a consumer group with two
- * processes, each process would consume from two partitions. This group membership is maintained dynamically: if a
- * process fails the partitions assigned to it will be reassigned to other processes in the same group, and if a new
- * process joins the group, partitions will be moved from existing consumers to this new process.
+ * Each Kafka consumer is able to configure a consumer group that it belongs to, and can dynamically set the list
+ * of topics it wants to subscribe to through one of the {@link #subscribe(Collection, ConsumerRebalanceListener) subscribe}
+ * APIs. Kafka will deliver each message in the subscribed topics to one process in each consumer group.
+ * This is achieved by balancing the partitions between all members in the consumer group so that each partition is
+ * assigned to exactly one consumer in the group. So if there is a topic with four partitions, and a consumer group with two
+ * processes, each process would consume from two partitions.
  * <p>
- * So if two processes subscribe to a topic both specifying different groups they will each get all the records in that
- * topic; if they both specify the same group they will each get about half the records.
+ * Membership in a consumer group is maintained dynamically: if a process fails, the partitions assigned to it will
+ * be reassigned to other consumers in the same group. Similarly, if a new consumer joins the group, partitions will be moved
+ * from existing consumers to the new one. This is known as <i>rebalancing</i> the group and is discussed in more
+ * detail <a href="#failuredetection">below</a>. Note that the same process is also used when new partitions are added
+ * to one of the subscribed topics: the group automatically detects the new partitions and rebalances the group so
+ * that every new partition is assigned to one of the members.
  * <p>
  * Conceptually you can think of a consumer group as being a single logical subscriber that happens to be made up of
  * multiple processes. As a multi-subscriber system, Kafka naturally supports having any number of consumer groups for a
@@ -131,6 +132,48 @@ import java.util.regex.Pattern;
  * (similar to the older "simple" consumer) using {@link #assign(Collection)}. In this case, dynamic partition
  * assignment and consumer group coordination will be disabled.
  *
+ * <h3><a name="failuredetection">Detecting Consumer Failures</a></h3>
+ *
+ * After subscribing to a set of topics, the consumer will automatically join the group when {@link #poll(long)} is
+ * invoked. The poll API is designed to ensure consumer liveness. As long as you continue to call poll, the consumer
+ * will stay in the group and continue to receive messages from the partitions it was assigned. Underneath the covers,
+ * the poll API sends periodic heartbeats to the server; when you stop calling poll (perhaps because an exception was thrown),
+ * then no heartbeats will be sent. If a period of the configured <i>session timeout</i> elapses before the server
+ * has received a heartbeat, then the consumer will be kicked out of the group and its partitions will be reassigned.
+ * This is designed to prevent situations where the consumer has failed, yet continues to hold onto the partitions
+ * it was assigned (thus preventing active consumers in the group from taking them). To stay in the group, you
+ * have to prove you are still alive by calling poll.
+ * <p>
+ * The implication of this design is that message processing time in the poll loop must be bounded so that
+ * heartbeats can be sent before expiration of the session timeout. What typically happens when processing time
+ * exceeds the session timeout is that the consumer won't be able to commit offsets for any of the processed records.
+ * For example, this is indicated by a {@link CommitFailedException} thrown from {@link #commitSync()}. This
+ * guarantees that only active members of the group are allowed to commit offsets. If the consumer
+ * has been kicked out of the group, then its partitions will have been assigned to another member, which will be
+ * committing its own offsets as it handles new records. This gives offset commits an isolation guarantee.
+ * <p>
+ * The consumer provides two configuration settings to control this behavior:
+ * <ol>
+ * <li><code>session.timeout.ms</code>: By increasing the session timeout, you can give the consumer more
+ * time to handle a batch of records returned from {@link #poll(long)}. The only drawback is that it
+ * will take longer for the server to detect hard consumer failures, which can cause a delay before
+ * a rebalance can be completed. However, clean shutdown with {@link #close()} is not impacted since
+ * the consumer will send an explicit message to the server to leave the group and cause an immediate
+ * rebalance.</li>
+ * <li><code>max.poll.records</code>: Processing time in the poll loop is typically proportional to the number
+ * of records processed, so it's natural to want to set a limit on the number of records handled at once.
+ * This setting provides that. By default, there is essentially no limit.</li>
+ * </ol>
+ * <p>
+ * For use cases where message processing time varies unpredictably, neither of these options may be viable.
+ * The recommended way to handle these cases is to move message processing to another thread, which allows
+ * the consumer to continue sending heartbeats while the processor is still working. Some care must be taken
+ * to ensure that committed offsets do not get ahead of the actual position. Typically, you must disable automatic
+ * commits and manually commit processed offsets for records only after the thread has finished handling them
+ * (depending on the delivery semantics you need). Note also that you will generally need to {@link #pause(Collection)}
+ * the partition so that no new records are received from poll until after thread has finished handling those
+ * previously returned.
+ *
  * <h3>Usage Examples</h3>
  * The consumer APIs offer flexibility to cover a variety of consumption use cases. Here are some examples to
  * demonstrate how to use them.
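The consumer-group paragraphs in this diff state that each partition goes to exactly one member of the group. Under that rule, four partitions shared by two processes give each process two partitions, and a failed member's partitions move to the survivors. The minimal Java sketch below shows one way to satisfy that rule. The class RangeAssignmentSketch and its contiguous-range strategy are assumptions made for illustration; this is not the assignor code shipped in kafka-clients.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch of balancing one topic's partitions across the members
 * of a consumer group: every partition is owned by exactly one member and the
 * counts differ by at most one. Not Kafka's actual assignor implementation.
 */
final class RangeAssignmentSketch {

    /** Assigns partitions 0..numPartitions-1 to the given member ids. */
    static Map<String, List<Integer>> assign(int numPartitions, List<String> members) {
        List<String> sorted = new ArrayList<>(members);
        Collections.sort(sorted); // deterministic order: same membership, same result
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        if (sorted.isEmpty()) {
            return assignment; // nobody to assign to
        }
        int perMember = numPartitions / sorted.size();
        int extra = numPartitions % sorted.size(); // the first `extra` members get one more
        int next = 0;
        for (int i = 0; i < sorted.size(); i++) {
            int count = perMember + (i < extra ? 1 : 0);
            List<Integer> partitions = new ArrayList<>();
            for (int j = 0; j < count; j++) {
                partitions.add(next++); // contiguous range of partitions per member
            }
            assignment.put(sorted.get(i), partitions);
        }
        return assignment;
    }
}
```

With two members and four partitions, the members receive [0, 1] and [2, 3]. Rerunning the assignment with only one member, as after a failure, gives that member all four partitions. That is the "rebalancing" the javadoc describes, reduced to a pure function of the current membership.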
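The new "Detecting Consumer Failures" section links group membership to heartbeats sent from poll. It also says that commits from a member that has been kicked out of the group fail. The single-threaded model below sketches that rule under several assumptions. Time is passed in explicitly, every poll counts as one heartbeat, a poll from an expired member counts as rejoining, and IllegalStateException stands in for CommitFailedException. GroupLivenessSketch and its methods are hypothetical and do not reproduce the coordinator's real protocol.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical model of session-timeout liveness and commit isolation.
 * A member stays in the group only while its last heartbeat (modelled as a
 * poll) is within the session timeout; commits from expired members fail.
 */
final class GroupLivenessSketch {
    private final long sessionTimeoutMs;
    private final Map<String, Long> lastHeartbeatMs = new HashMap<>();
    private final Map<Integer, Long> committedOffsets = new HashMap<>();

    GroupLivenessSketch(long sessionTimeoutMs) {
        this.sessionTimeoutMs = sessionTimeoutMs;
    }

    /** Models the heartbeat sent while calling poll(); an expired member rejoins here. */
    void poll(String memberId, long nowMs) {
        expireMembers(nowMs);
        lastHeartbeatMs.put(memberId, nowMs);
    }

    boolean isMember(String memberId, long nowMs) {
        expireMembers(nowMs);
        return lastHeartbeatMs.containsKey(memberId);
    }

    /** Accepts a commit only from a member that is still in the group. */
    void commit(String memberId, int partition, long offset, long nowMs) {
        if (!isMember(memberId, nowMs)) {
            // Stand-in for CommitFailedException: the partition may already belong to someone else.
            throw new IllegalStateException(memberId + " is no longer in the group; commit rejected");
        }
        committedOffsets.put(partition, offset);
    }

    /** Returns the committed offset for a partition, or null if none was recorded. */
    Long committed(int partition) {
        return committedOffsets.get(partition);
    }

    /** Drops every member whose last heartbeat is older than the session timeout. */
    private void expireMembers(long nowMs) {
        lastHeartbeatMs.values().removeIf(last -> nowMs - last > sessionTimeoutMs);
    }
}
```

With a 10000 ms session timeout, a member that last polled at t = 0 has left the group by t = 15000, and its commit is rejected. A member that polled at t = 8000 is still in the group at t = 15000 and can commit. This mirrors the isolation guarantee described in the javadoc.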
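The two settings named in the new javadoc, session.timeout.ms and max.poll.records, are ordinary consumer properties. The sketch below puts them in a java.util.Properties object. It also sets enable.auto.commit=false, matching the javadoc's advice to commit manually when processing moves to another thread. The broker address, group name and numeric values are placeholders. batchFitsSession is a hypothetical rough check that ignores poll and commit overhead; it is not a Kafka API.

```java
import java.util.Properties;

/**
 * Illustrative consumer configuration. The keys are real consumer config
 * names; every value is an assumption chosen for the example.
 */
final class ConsumerConfigSketch {

    static Properties build(int sessionTimeoutMs, int maxPollRecords) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // hypothetical broker address
        props.put("group.id", "example-group");            // hypothetical group name
        // Without a heartbeat inside this window the member is removed from the group.
        props.put("session.timeout.ms", Integer.toString(sessionTimeoutMs));
        // Caps how many records one poll() returns, bounding work per loop iteration.
        props.put("max.poll.records", Integer.toString(maxPollRecords));
        // Commit manually, only after records have really been processed.
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    /** Rough assumption: the worst-case time for one full batch must fit inside the session timeout. */
    static boolean batchFitsSession(int maxPollRecords, long worstCaseMsPerRecord, long sessionTimeoutMs) {
        return maxPollRecords * worstCaseMsPerRecord < sessionTimeoutMs;
    }
}
```

The resulting Properties would be passed to the KafkaConsumer constructor. Here is a worked example of the check. At a worst case of 50 ms per record, 500 records take 25000 ms, which fits inside a 30000 ms session timeout. 1000 records at the same rate take 50000 ms, which does not fit. In that case either the timeout must grow or max.poll.records must shrink, which is the trade-off the javadoc's two list items describe.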
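The last paragraph of the new section recommends three things: hand records to another thread, pause the partition while that thread works, and commit only offsets that have actually been processed. The self-contained sketch below models that pattern for a single partition without a broker. HandOffSketch, its methods and its paused flag are hypothetical stand-ins for pause/resume and commitSync on a real consumer. The sketch commits the last processed offset + 1, which matches the javadoc's definition of position as the offset of the next record to be consumed.

```java
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.LongConsumer;

/**
 * Hypothetical single-partition model of moving processing to a worker thread.
 * While a batch is in flight the partition counts as paused, so the poll loop
 * keeps running (and heartbeating) without taking new records, and the
 * committed offset only advances after the worker finishes the whole batch.
 */
final class HandOffSketch implements AutoCloseable {
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private final LongConsumer processor; // processes one record, identified here by its offset
    private Future<Long> inFlight;          // completes with the last offset the worker processed
    private boolean paused;                 // read and written only by the poll-loop thread
    private long committed = -1;            // -1 means nothing has been committed yet

    HandOffSketch(LongConsumer processor) {
        this.processor = processor;
    }

    /** Poll-loop side: hand a freshly polled batch to the worker and pause the partition. */
    void handOff(List<Long> offsets) {
        if (paused) {
            throw new IllegalStateException("previous batch is still being processed");
        }
        if (offsets.isEmpty()) {
            return;
        }
        paused = true; // models pause(): later polls return nothing for this partition
        inFlight = worker.submit(() -> {
            long last = -1;
            for (long offset : offsets) {
                processor.accept(offset);
                last = offset;
            }
            return last;
        });
    }

    /** Poll-loop side: once the batch is done, commit past it and resume the partition. */
    void onPollLoopIteration() {
        if (paused && inFlight.isDone()) {
            committed = getUnchecked(inFlight) + 1; // offset of the next record to consume
            paused = false;                         // models resume()
            inFlight = null;
        }
    }

    /** Helper for tests: block until the current batch, if any, has been processed. */
    void awaitInFlight() {
        if (inFlight != null) {
            getUnchecked(inFlight);
        }
    }

    boolean isPaused() {
        return paused;
    }

    long committed() {
        return committed;
    }

    @Override
    public void close() {
        worker.shutdown();
    }

    private static long getUnchecked(Future<Long> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("processing failed", e.getCause());
        }
    }
}
```

Suppose offsets 5 through 9 are handed off. The partition stays paused while the worker runs. Once the worker finishes, the next poll-loop iteration commits 10 and resumes the partition. Keeping the commit and the pause state on the poll-loop thread reflects the fact that KafkaConsumer is not safe for multi-threaded access.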
