KAFKA-3418: add javadoc section describing consumer failure detection

Author: Jason Gustafson <[email protected]>

Reviewers: Manikumar Reddy <[email protected]>, Ismael Juma 
<[email protected]>, Ewen Cheslack-Postava <[email protected]>

Closes #1129 from hachikuji/KAFKA-3418


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/fe6c481b
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/fe6c481b
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/fe6c481b

Branch: refs/heads/0.10.0
Commit: fe6c481b38d6b1b61341e4e1a6237f64accfbfbc
Parents: cea01af
Author: Jason Gustafson <[email protected]>
Authored: Fri Apr 29 10:26:01 2016 -0700
Committer: Ewen Cheslack-Postava <[email protected]>
Committed: Fri Apr 29 10:26:01 2016 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java   | 71 ++++++++++++++++----
 1 file changed, 57 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/fe6c481b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index ad44d16..7290a38 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -76,7 +76,7 @@ import java.util.regex.Pattern;
  * <h3>Offsets and Consumer Position</h3>
  * Kafka maintains a numerical offset for each record in a partition. This 
offset acts as a kind of unique identifier of
  * a record within that partition, and also denotes the position of the 
consumer in the partition. That is, a consumer
- * which has position 5 has consumed records with offsets 0 through 4 and will 
next receive record with offset 5. There
+ * which has position 5 has consumed records with offsets 0 through 4 and will 
next receive the record with offset 5. There
  * are actually two notions of position relevant to the user of the consumer.
  * <p>
  * The {@link #position(TopicPartition) position} of the consumer gives the 
offset of the next record that will be given
@@ -95,22 +95,23 @@ import java.util.regex.Pattern;
  *
  * <h3>Consumer Groups and Topic Subscriptions</h3>
  *
- * Kafka uses the concept of <i>consumer groups</i> to allow a pool of 
processes to divide up the work of consuming and
+ * Kafka uses the concept of <i>consumer groups</i> to allow a pool of 
processes to divide the work of consuming and
  * processing records. These processes can either be running on the same 
machine or, as is more likely, they can be
- * distributed over many machines to provide additional scalability and fault 
tolerance for processing.
+ * distributed over many machines to provide scalability and fault tolerance 
for processing.
  * <p>
- * Each Kafka consumer is able to configure a consumer group that it belongs 
to, and can dynamically set the
- * list of topics it wants to subscribe to through {@link 
#subscribe(Collection, ConsumerRebalanceListener)},
- * or subscribe to all topics matching certain pattern through {@link 
#subscribe(Pattern, ConsumerRebalanceListener)}.
- * Kafka will deliver each message in the
- * subscribed topics to one process in each consumer group. This is achieved 
by balancing the partitions in the topic
- * over the consumer processes in each group. So if there is a topic with four 
partitions, and a consumer group with two
- * processes, each process would consume from two partitions. This group 
membership is maintained dynamically: if a
- * process fails the partitions assigned to it will be reassigned to other 
processes in the same group, and if a new
- * process joins the group, partitions will be moved from existing consumers 
to this new process.
+ * Each Kafka consumer is able to configure a consumer group that it belongs 
to, and can dynamically set the list
+ * of topics it wants to subscribe to through one of the {@link 
#subscribe(Collection, ConsumerRebalanceListener) subscribe}
+ * APIs. Kafka will deliver each message in the subscribed topics to one 
process in each consumer group.
+ * This is achieved by balancing the partitions between all members in the 
consumer group so that each partition is
+ * assigned to exactly one consumer in the group. So if there is a topic with 
four partitions, and a consumer group with two
+ * processes, each process would consume from two partitions.
  * <p>
- * So if two processes subscribe to a topic both specifying different groups 
they will each get all the records in that
- * topic; if they both specify the same group they will each get about half 
the records.
+ * Membership in a consumer group is maintained dynamically: if a process 
fails, the partitions assigned to it will
+ * be reassigned to other consumers in the same group. Similarly, if a new 
consumer joins the group, partitions will be moved
+ * from existing consumers to the new one. This is known as <i>rebalancing</i> 
the group and is discussed in more
+ * detail <a href="#failuredetection">below</a>. Note that the same process is 
also used when new partitions are added
+ * to one of the subscribed topics: the group automatically detects the new 
partitions and rebalances the group so
+ * that every new partition is assigned to one of the members.
  * <p>
  * Conceptually you can think of a consumer group as being a single logical 
subscriber that happens to be made up of
  * multiple processes. As a multi-subscriber system, Kafka naturally supports 
having any number of consumer groups for a
@@ -131,6 +132,48 @@ import java.util.regex.Pattern;
  * (similar to the older "simple" consumer) using {@link #assign(Collection)}. 
In this case, dynamic partition
  * assignment and consumer group coordination will be disabled.
  *
+ * <h3><a name="failuredetection">Detecting Consumer Failures</a></h3>
+ *
+ * After subscribing to a set of topics, the consumer will automatically join 
the group when {@link #poll(long)} is
+ * invoked. The poll API is designed to ensure consumer liveness. As long as 
you continue to call poll, the consumer
+ * will stay in the group and continue to receive messages from the partitions 
it was assigned. Underneath the covers,
+ * the poll API sends periodic heartbeats to the server; when you stop calling 
poll (perhaps because an exception was thrown),
+ * then no heartbeats will be sent. If a period of the configured <i>session 
timeout</i> elapses before the server
+ * has received a heartbeat, then the consumer will be kicked out of the group 
and its partitions will be reassigned.
+ * This is designed to prevent situations where the consumer has failed, yet 
continues to hold onto the partitions
+ * it was assigned (thus preventing active consumers in the group from taking 
them). To stay in the group, you
+ * have to prove you are still alive by calling poll.
+ * <p>
+ * The implication of this design is that message processing time in the poll 
loop must be bounded so that
+ * heartbeats can be sent before expiration of the session timeout. What 
typically happens when processing time
+ * exceeds the session timeout is that the consumer won't be able to commit 
offsets for any of the processed records.
+ * For example, this is indicated by a {@link CommitFailedException} thrown 
from {@link #commitSync()}. This
+ * guarantees that only active members of the group are allowed to commit 
offsets. If the consumer
+ * has been kicked out of the group, then its partitions will have been 
assigned to another member, which will be
+ * committing its own offsets as it handles new records. This gives offset 
commits an isolation guarantee.
+ * <p>
+ * The consumer provides two configuration settings to control this behavior:
+ * <ol>
+ *     <li><code>session.timeout.ms</code>: By increasing the session timeout, 
you can give the consumer more
+ *     time to handle a batch of records returned from {@link #poll(long)}. 
The only drawback is that it
+ *     will take longer for the server to detect hard consumer failures, which 
can cause a delay before
+ *     a rebalance can be completed. However, clean shutdown with {@link 
#close()} is not impacted since
+ *     the consumer will send an explicit message to the server to leave the 
group and cause an immediate
+ *     rebalance.</li>
+ *     <li><code>max.poll.records</code>: Processing time in the poll loop is 
typically proportional to the number
+ *     of records processed, so it's natural to want to set a limit on the 
number of records handled at once.
+ *     This setting provides that. By default, there is essentially no 
limit.</li>
+ * </ol>
+ * <p>
+ * For use cases where message processing time varies unpredictably, neither 
of these options may be viable.
+ * The recommended way to handle these cases is to move message processing to 
another thread, which allows
+ * the consumer to continue sending heartbeats while the processor is still 
working. Some care must be taken
+ * to ensure that committed offsets do not get ahead of the actual position. 
Typically, you must disable automatic
+ * commits and manually commit processed offsets for records only after the 
thread has finished handling them
+ * (depending on the delivery semantics you need). Note also that you will 
generally need to {@link #pause(Collection)}
+ * the partition so that no new records are received from poll until after the
thread has finished handling those
+ * previously returned.
+ *
  * <h3>Usage Examples</h3>
  * The consumer APIs offer flexibility to cover a variety of consumption use 
cases. Here are some examples to
  * demonstrate how to use them.
