Repository: kafka
Updated Branches:
  refs/heads/0.10.1 8ed595836 -> 8ad31173e


KAFKA-3824; Clarify autocommit delivery semantics for consumer

Author: Jason Gustafson <[email protected]>

Reviewers: Vahid Hashemian <[email protected]>, Jiangjie Qin <[email protected]>, Ismael Juma <[email protected]>

Closes #1936 from hachikuji/KAFKA-3824

(cherry picked from commit f8b69aacd4ed1af1263e46b93879cea2d855df3b)
Signed-off-by: Jason Gustafson <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8ad31173
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8ad31173
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8ad31173

Branch: refs/heads/0.10.1
Commit: 8ad31173e52bf304a40c675680bade35c1f97bfb
Parents: 8ed5958
Author: Jason Gustafson <[email protected]>
Authored: Fri Sep 30 17:25:42 2016 -0700
Committer: Jason Gustafson <[email protected]>
Committed: Fri Sep 30 17:26:05 2016 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java   | 141 ++++++++++---------
 1 file changed, 73 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8ad31173/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 830f071..e263448 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -65,55 +65,54 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 
 /**
- * A Kafka client that consumes records from a Kafka cluster.
+ * A client that consumes records from a Kafka cluster.
  * <p>
- * It will transparently handle the failure of servers in the Kafka cluster, and transparently adapt as partitions of
- * data it fetches migrate within the cluster. This client also interacts with the server to allow groups of
- * consumers to load balance consumption using consumer groups (as described below).
+ * This client transparently handles the failure of Kafka brokers, and transparently adapts as topic partitions
+ * it fetches migrate within the cluster. This client also interacts with the broker to allow groups of
+ * consumers to load balance consumption using <a href="#consumergroups">consumer groups</a>.
  * <p>
  * The consumer maintains TCP connections to the necessary brokers to fetch data.
  * Failure to close the consumer after use will leak these connections.
  * The consumer is not thread-safe. See <a href="#multithreaded">Multi-threaded Processing</a> for more details.
  *
  * <h3>Offsets and Consumer Position</h3>
- * Kafka maintains a numerical offset for each record in a partition. This offset acts as a kind of unique identifier of
- * a record within that partition, and also denotes the position of the consumer in the partition. That is, a consumer
- * which has position 5 has consumed records with offsets 0 through 4 and will next receive the record with offset 5. There
- * are actually two notions of position relevant to the user of the consumer.
+ * Kafka maintains a numerical offset for each record in a partition. This offset acts as a unique identifier of
+ * a record within that partition, and also denotes the position of the consumer in the partition. For example, a consumer
+ * which is at position 5 has consumed records with offsets 0 through 4 and will next receive the record with offset 5. There
+ * are actually two notions of position relevant to the user of the consumer:
  * <p>
  * The {@link #position(TopicPartition) position} of the consumer gives the offset of the next record that will be given
  * out. It will be one larger than the highest offset the consumer has seen in that partition. It automatically advances
- * every time the consumer receives data calls {@link #poll(long)} and receives messages.
+ * every time the consumer receives messages in a call to {@link #poll(long)}.
  * <p>
- * The {@link #commitSync() committed position} is the last offset that has been saved securely. Should the
- * process fail and restart, this is the offset that it will recover to. The consumer can either automatically commit
- * offsets periodically; or it can choose to control this committed position manually by calling
- * {@link #commitSync() commitSync}, which will block until the offsets have been successfully committed
- * or fatal error has happened during the commit process, or {@link #commitAsync(OffsetCommitCallback) commitAsync} which is non-blocking
- * and will trigger {@link OffsetCommitCallback} upon either successfully committed or fatally failed.
+ * The {@link #commitSync() committed position} is the last offset that has been stored securely. Should the
+ * process fail and restart, this is the offset that the consumer will recover to. The consumer can either automatically commit
+ * offsets periodically; or it can choose to control this committed position manually by calling one of the commit APIs
+ * (e.g. {@link #commitSync() commitSync} and {@link #commitAsync(OffsetCommitCallback) commitAsync}).
  * <p>
  * This distinction gives the consumer control over when a record is considered consumed. It is discussed in further
  * detail below.
  *
- * <h3>Consumer Groups and Topic Subscriptions</h3>
+ * <h3><a name="consumergroups">Consumer Groups and Topic Subscriptions</a></h3>
  *
  * Kafka uses the concept of <i>consumer groups</i> to allow a pool of processes to divide the work of consuming and
- * processing records. These processes can either be running on the same machine or, as is more likely, they can be
- * distributed over many machines to provide scalability and fault tolerance for processing.
+ * processing records. These processes can either be running on the same machine or they can be
+ * distributed over many machines to provide scalability and fault tolerance for processing. All consumer instances
+ * sharing the same <code>group.id</code> will be part of the same consumer group.
  * <p>
- * Each Kafka consumer is able to configure a consumer group that it belongs to, and can dynamically set the list
- * of topics it wants to subscribe to through one of the {@link #subscribe(Collection, ConsumerRebalanceListener) subscribe}
- * APIs. Kafka will deliver each message in the subscribed topics to one process in each consumer group.
- * This is achieved by balancing the partitions between all members in the consumer group so that each partition is
- * assigned to exactly one consumer in the group. So if there is a topic with four partitions, and a consumer group with two
- * processes, each process would consume from two partitions.
+ * Each consumer in a group can dynamically set the list of topics it wants to subscribe to through one of the
+ * {@link #subscribe(Collection, ConsumerRebalanceListener) subscribe} APIs. Kafka will deliver each message in the
+ * subscribed topics to one process in each consumer group. This is achieved by balancing the partitions between all
+ * members in the consumer group so that each partition is assigned to exactly one consumer in the group. So if there
+ * is a topic with four partitions, and a consumer group with two processes, each process would consume from two partitions.
  * <p>
  * Membership in a consumer group is maintained dynamically: if a process fails, the partitions assigned to it will
  * be reassigned to other consumers in the same group. Similarly, if a new consumer joins the group, partitions will be moved
  * from existing consumers to the new one. This is known as <i>rebalancing</i> the group and is discussed in more
- * detail <a href="#failuredetection">below</a>. Note that the same process is also used when new partitions are added
- * to one of the subscribed topics: the group automatically detects the new partitions and rebalances the group so
- * that every new partition is assigned to one of the members.
+ * detail <a href="#failuredetection">below</a>. Group rebalancing is also used when new partitions are added
+ * to one of the subscribed topics or when a new topic matching a {@link #subscribe(Pattern, ConsumerRebalanceListener) subscribed regex}
+ * is created. The group will automatically detect the new partitions through periodic metadata refreshes and
+ * assign them to members of the group.
  * <p>
  * Conceptually you can think of a consumer group as being a single logical subscriber that happens to be made up of
  * multiple processes. As a multi-subscriber system, Kafka naturally supports having any number of consumer groups for a
@@ -125,10 +124,9 @@ import java.util.regex.Pattern;
  * have multiple such groups. To get semantics similar to pub-sub in a traditional messaging system each process would
  * have its own consumer group, so each process would subscribe to all the records published to the topic.
  * <p>
- * In addition, when group reassignment happens automatically, consumers can be notified through {@link ConsumerRebalanceListener},
+ * In addition, when group reassignment happens automatically, consumers can be notified through a {@link ConsumerRebalanceListener},
  * which allows them to finish necessary application-level logic such as state cleanup, manual offset
- * commits (note that offsets are always committed for a given consumer group), etc.
- * See <a href="#rebalancecallback">Storing Offsets Outside Kafka</a> for more details
+ * commits, etc. See <a href="#rebalancecallback">Storing Offsets Outside Kafka</a> for more details.
  * <p>
  * It is also possible for the consumer to <a href="#manualassignment">manually assign</a> specific partitions
  * (similar to the older "simple" consumer) using {@link #assign(Collection)}. In this case, dynamic partition
@@ -141,37 +139,36 @@ import java.util.regex.Pattern;
  * will stay in the group and continue to receive messages from the partitions it was assigned. Underneath the covers,
  * the consumer sends periodic heartbeats to the server. If the consumer crashes or is unable to send heartbeats for
  * a duration of <code>session.timeout.ms</code>, then the consumer will be considered dead and its partitions will
- * be reassigned. It is also possible that the consumer could encounter a "livelock" situation where it is continuing
- * to send heartbeats, but no progress is being made. To prevent the consumer from holding onto its partitions
- * indefinitely in this case, we provide a liveness detection mechanism: basically if you don't call poll at least
- * as frequently as the configured <code>poll.interval.ms</code>, then the client will proactively leave the group
- * so that another consumer can take over its partitions. So to stay in the group, you must continue to call poll
+ * be reassigned.
  * <p>
- * The implication of this design is that message processing time in the poll loop must be bounded so that
- * you always ensure that poll() is called at least once every poll interval. If not, then the consumer leaves
- * the group, which typically results in an offset commit failure when the processing of the polled records
- * finally completes (this is indicated by a {@link CommitFailedException} thrown from {@link #commitSync()}).
+ * It is also possible that the consumer could encounter a "livelock" situation where it is continuing
+ * to send heartbeats, but no progress is being made. To prevent the consumer from holding onto its partitions
+ * indefinitely in this case, we provide a liveness detection mechanism using the <code>max.poll.interval.ms</code>
+ * setting. Basically if you don't call poll at least as frequently as the configured max interval,
+ * then the client will proactively leave the group so that another consumer can take over its partitions. When this happens,
+ * you may see an offset commit failure (as indicated by a {@link CommitFailedException} thrown from a call to {@link #commitSync()}).
  * This is a safety mechanism which guarantees that only active members of the group are able to commit offsets.
- * If the consumer has been kicked out of the group, then its partitions will have been assigned to another member,
- * which will be committing its own offsets as it handles new records. This gives offset commits an isolation guarantee.
+ * So to stay in the group, you must continue to call poll.
  * <p>
  * The consumer provides two configuration settings to control the behavior of the poll loop:
  * <ol>
  *     <li><code>max.poll.interval.ms</code>: By increasing the interval between expected polls, you can give
  *     the consumer more time to handle a batch of records returned from {@link #poll(long)}. The drawback
  *     is that increasing this value may delay a group rebalance since the consumer will only join the rebalance
- *     inside the call to poll.</li>
+ *     inside the call to poll. You can use this setting to bound the time to finish a rebalance, but
+ *     you risk slower progress if the consumer cannot actually call {@link #poll(long) poll} often enough.</li>
  *     <li><code>max.poll.records</code>: Use this setting to limit the total records returned from a single
  *     call to poll. This can make it easier to predict the maximum that must be handled within each poll
- *     interval.</li>
+ *     interval. By tuning this value, you may be able to reduce the poll interval, which will reduce the
+ *     impact of group rebalancing.</li>
  * </ol>
  * <p>
- * For use cases where message processing time varies unpredictably, neither of these options may be viable.
+ * For use cases where message processing time varies unpredictably, neither of these options may be sufficient.
  * The recommended way to handle these cases is to move message processing to another thread, which allows
- * the consumer to continue sending heartbeats while the processor is still working. Some care must be taken
+ * the consumer to continue calling {@link #poll(long) poll} while the processor is still working. Some care must be taken
  * to ensure that committed offsets do not get ahead of the actual position. Typically, you must disable automatic
  * commits and manually commit processed offsets for records only after the thread has finished handling them
- * (depending on the delivery semantics you need). Note also that you will generally need to {@link #pause(Collection)}
+ * (depending on the delivery semantics you need). Note also that you will need to {@link #pause(Collection) pause}
  * the partition so that no new records are received from poll until after thread has finished handling those
  * previously returned.
  *
@@ -199,36 +196,26 @@ import java.util.regex.Pattern;
  *     }
  * </pre>
  *
- * Setting <code>enable.auto.commit</code> means that offsets are committed automatically with a frequency controlled by
- * the config <code>auto.commit.interval.ms</code>.
- * <p>
  * The connection to the cluster is bootstrapped by specifying a list of one or more brokers to contact using the
  * configuration <code>bootstrap.servers</code>. This list is just used to discover the rest of the brokers in the
  * cluster and need not be an exhaustive list of servers in the cluster (though you may want to specify more than one in
  * case there are servers down when the client is connecting).
  * <p>
- * In this example the client is subscribing to the topics <i>foo</i> and <i>bar</i> as part of a group of consumers
- * called <i>test</i> as described above.
+ * Setting <code>enable.auto.commit</code> means that offsets are committed automatically with a frequency controlled by
+ * the config <code>auto.commit.interval.ms</code>.
+ * <p>
+ * In this example the consumer is subscribing to the topics <i>foo</i> and <i>bar</i> as part of a group of consumers
+ * called <i>test</i> as configured with <code>group.id</code>.
  * <p>
  * The deserializer settings specify how to turn bytes into objects. For example, by specifying string deserializers, we
  * are saying that our record's key and value will just be simple strings.
  *
  * <h4>Manual Offset Control</h4>
  *
- * Instead of relying on the consumer to periodically commit consumed offsets, users can also control when messages
- * should be considered as consumed and hence commit their offsets. This is useful when the consumption of the messages
+ * Instead of relying on the consumer to periodically commit consumed offsets, users can also control when records
+ * should be considered as consumed and hence commit their offsets. This is useful when the consumption of the
  * are coupled with some processing logic and hence a message should not be considered as consumed until it is completed processing.
- * In this example we will consume a batch of records and batch them up in memory, when we have sufficient records
- * batched we will insert them into a database. If we allowed offsets to auto commit as in the previous example messages
- * would be considered consumed after they were given out by the consumer, and it would be possible that our process
- * could fail after we have read messages into our in-memory buffer but before they had been inserted into the database.
- * To avoid this we will manually commit the offsets only once the corresponding messages have been inserted into the
- * database. This gives us exact control of when a message is considered consumed. This raises the opposite possibility:
- * the process could fail in the interval after the insert into the database but before the commit (even though this
- * would likely just be a few milliseconds, it is a possibility). In this case the process that took over consumption
- * would consume from last committed offset and would repeat the insert of the last batch of data. Used in this way
- * Kafka provides what is often called "at-least once delivery" guarantees, as each message will likely be delivered one
- * time but in failure cases could be duplicated.
+
  * <p>
  * <pre>
  *     Properties props = new Properties();
@@ -254,9 +241,28 @@ import java.util.regex.Pattern;
  *     }
  * </pre>
  *
- * The above example uses {@link #commitSync() commitSync} to mark all received messages as committed. In some cases
- * you may wish to have even finer control over which messages have been committed by specifying an offset explicitly.
- * In the example below we commit offset after we finish handling the messages in each partition.
+ * In this example we will consume a batch of records and batch them up in memory. When we have enough records
+ * batched, we will insert them into a database. If we allowed offsets to auto commit as in the previous example, records
+ * would be considered consumed after they were returned to the user in {@link #poll(long) poll}. It would then be possible
+ * for our process to fail after batching the records, but before they had been inserted into the database.
+ * <p>
+ * To avoid this, we will manually commit the offsets only after the corresponding records have been inserted into the
+ * database. This gives us exact control of when a record is considered consumed. This raises the opposite possibility:
+ * the process could fail in the interval after the insert into the database but before the commit (even though this
+ * would likely just be a few milliseconds, it is a possibility). In this case the process that took over consumption
+ * would consume from last committed offset and would repeat the insert of the last batch of data. Used in this way
+ * Kafka provides what is often called "at-least-once" delivery guarantees, as each record will likely be delivered one
+ * time but in failure cases could be duplicated.
+ * <p>
+ * <b>Note: Using automatic offset commits can also give you "at-least-once" delivery, but the requirement is that
+ * you must consume all data returned from each call to {@link #poll(long)} before any subsequent calls, or before
+ * {@link #close() closing} the consumer. If you fail to do either of these, it is possible for the committed offset
+ * to get ahead of the consumed position, which results in missing records. The advantage of using manual offset
+ * control is that you have direct control over when a record is considered "consumed."</b>
+ * <p>
+ * The above example uses {@link #commitSync() commitSync} to mark all received records as committed. In some cases
+ * you may wish to have even finer control over which records have been committed by specifying an offset explicitly.
+ * In the example below we commit offset after we finish handling the records in each partition.
  * <p>
  * <pre>
  *     try {
@@ -450,7 +456,6 @@ import java.util.regex.Pattern;
  * We have intentionally avoided implementing a particular threading model for processing. This leaves several
  * options for implementing multi-threaded processing of records.
  *
- *
  * <h4>1. One Consumer Per Thread</h4>
  *
  * A simple option is to give each thread its own consumer instance. Here are the pros and cons of this approach: