Repository: kafka
Updated Branches:
  refs/heads/0.10.1 8ed595836 -> 8ad31173e
KAFKA-3824; Clarify autocommit delivery semantics for consumer

Author: Jason Gustafson
Reviewers: Vahid Hashemian, Jiangjie Qin, Ismael Juma

Closes #1936 from hachikuji/KAFKA-3824

(cherry picked from commit f8b69aacd4ed1af1263e46b93879cea2d855df3b)
Signed-off-by: Jason Gustafson

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8ad31173
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8ad31173
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8ad31173

Branch: refs/heads/0.10.1
Commit: 8ad31173e52bf304a40c675680bade35c1f97bfb
Parents: 8ed5958
Author: Jason Gustafson
Authored: Fri Sep 30 17:25:42 2016 -0700
Committer: Jason Gustafson
Committed: Fri Sep 30 17:26:05 2016 -0700

----------------------------------------------------------------------
 .../kafka/clients/consumer/KafkaConsumer.java | 141 ++++++++++---------
 1 file changed, 73 insertions(+), 68 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/8ad31173/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
index 830f071..e263448 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
@@ -65,55 +65,54 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 
 /**
- * A Kafka client that consumes records from a Kafka cluster.
+ * A client that consumes records from a Kafka cluster.
  * <p>
- * It will transparently handle the failure of servers in the Kafka cluster, and transparently adapt as partitions of
- * data it fetches migrate within the cluster. This client also interacts with the server to allow groups of
- * consumers to load balance consumption using consumer groups (as described below).
+ * This client transparently handles the failure of Kafka brokers, and transparently adapts as topic partitions
+ * it fetches migrate within the cluster. This client also interacts with the broker to allow groups of
+ * consumers to load balance consumption using <a href="#consumergroups">consumer groups</a>.
  * <p>
  * The consumer maintains TCP connections to the necessary brokers to fetch data.
  * Failure to close the consumer after use will leak these connections.
  * The consumer is not thread-safe. See <a href="#multithreaded">Multi-threaded Processing</a> for more details.
  *
  * <h3>Offsets and Consumer Position</h3>
- * Kafka maintains a numerical offset for each record in a partition. This offset acts as a kind of unique identifier of
- * a record within that partition, and also denotes the position of the consumer in the partition. That is, a consumer
- * which has position 5 has consumed records with offsets 0 through 4 and will next receive the record with offset 5. There
- * are actually two notions of position relevant to the user of the consumer.
+ * Kafka maintains a numerical offset for each record in a partition. This offset acts as a unique identifier of
+ * a record within that partition, and also denotes the position of the consumer in the partition. For example, a consumer
+ * which is at position 5 has consumed records with offsets 0 through 4 and will next receive the record with offset 5. There
+ * are actually two notions of position relevant to the user of the consumer:
  * <p>
  * The {@link #position(TopicPartition) position} of the consumer gives the offset of the next record that will be given
  * out. It will be one larger than the highest offset the consumer has seen in that partition. It automatically advances
- * every time the consumer receives data calls {@link #poll(long)} and receives messages.
+ * every time the consumer receives messages in a call to {@link #poll(long)}.
  * <p>
- * The {@link #commitSync() committed position} is the last offset that has been saved securely. Should the
- * process fail and restart, this is the offset that it will recover to. The consumer can either automatically commit
- * offsets periodically; or it can choose to control this committed position manually by calling
- * {@link #commitSync() commitSync}, which will block until the offsets have been successfully committed
- * or fatal error has happened during the commit process, or {@link #commitAsync(OffsetCommitCallback) commitAsync} which is non-blocking
- * and will trigger {@link OffsetCommitCallback} upon either successfully committed or fatally failed.
+ * The {@link #commitSync() committed position} is the last offset that has been stored securely. Should the
+ * process fail and restart, this is the offset that the consumer will recover to. The consumer can either automatically commit
+ * offsets periodically; or it can choose to control this committed position manually by calling one of the commit APIs
+ * (e.g. {@link #commitSync() commitSync} and {@link #commitAsync(OffsetCommitCallback) commitAsync}).
  * <p>
  * This distinction gives the consumer control over when a record is considered consumed. It is discussed in further
  * detail below.
  *
- * <h3>Consumer Groups and Topic Subscriptions</h3>
+ * <h3><a name="consumergroups">Consumer Groups and Topic Subscriptions</a></h3>
  *
  * Kafka uses the concept of <i>consumer groups</i> to allow a pool of processes to divide the work of consuming and
- * processing records. These processes can either be running on the same machine or, as is more likely, they can be
- * distributed over many machines to provide scalability and fault tolerance for processing.
+ * processing records. These processes can either be running on the same machine or they can be
+ * distributed over many machines to provide scalability and fault tolerance for processing. All consumer instances
+ * sharing the same <code>group.id</code> will be part of the same consumer group.
  * <p>
- * Each Kafka consumer is able to configure a consumer group that it belongs to, and can dynamically set the list
- * of topics it wants to subscribe to through one of the {@link #subscribe(Collection, ConsumerRebalanceListener) subscribe}
- * APIs. Kafka will deliver each message in the subscribed topics to one process in each consumer group.
- * This is achieved by balancing the partitions between all members in the consumer group so that each partition is
- * assigned to exactly one consumer in the group. So if there is a topic with four partitions, and a consumer group with two
- * processes, each process would consume from two partitions.
+ * Each consumer in a group can dynamically set the list of topics it wants to subscribe to through one of the
+ * {@link #subscribe(Collection, ConsumerRebalanceListener) subscribe} APIs. Kafka will deliver each message in the
+ * subscribed topics to one process in each consumer group. This is achieved by balancing the partitions between all
+ * members in the consumer group so that each partition is assigned to exactly one consumer in the group. So if there
+ * is a topic with four partitions, and a consumer group with two processes, each process would consume from two partitions.
  * <p>
  * Membership in a consumer group is maintained dynamically: if a process fails, the partitions assigned to it will
  * be reassigned to other consumers in the same group. Similarly, if a new consumer joins the group, partitions will be moved
  * from existing consumers to the new one. This is known as <i>rebalancing</i> the group and is discussed in more
- * detail <a href="#failuredetection">below</a>. Note that the same process is also used when new partitions are added
- * to one of the subscribed topics: the group automatically detects the new partitions and rebalances the group so
- * that every new partition is assigned to one of the members.
+ * detail <a href="#failuredetection">below</a>. Group rebalancing is also used when new partitions are added
+ * to one of the subscribed topics or when a new topic matching a {@link #subscribe(Pattern, ConsumerRebalanceListener) subscribed regex}
+ * is created. The group will automatically detect the new partitions through periodic metadata refreshes and
+ * assign them to members of the group.
  * <p>
  * Conceptually you can think of a consumer group as being a single logical subscriber that happens to be made up of
  * multiple processes. As a multi-subscriber system, Kafka naturally supports having any number of consumer groups for a
@@ -125,10 +124,9 @@ import java.util.regex.Pattern;
  * have multiple such groups. To get semantics similar to pub-sub in a traditional messaging system each process would
  * have its own consumer group, so each process would subscribe to all the records published to the topic.
  * <p>
- * In addition, when group reassignment happens automatically, consumers can be notified through {@link ConsumerRebalanceListener},
+ * In addition, when group reassignment happens automatically, consumers can be notified through a {@link ConsumerRebalanceListener},
  * which allows them to finish necessary application-level logic such as state cleanup, manual offset
- * commits (note that offsets are always committed for a given consumer group), etc.
- * See <a href="#rebalancecallback">Storing Offsets Outside Kafka</a> for more details
+ * commits, etc. See <a href="#rebalancecallback">Storing Offsets Outside Kafka</a> for more details.
  * <p>
  * It is also possible for the consumer to <a href="#manualassignment">manually assign</a> specific partitions
  * (similar to the older "simple" consumer) using {@link #assign(Collection)}. In this case, dynamic partition
@@ -141,37 +139,36 @@ import java.util.regex.Pattern;
  * will stay in the group and continue to receive messages from the partitions it was assigned. Underneath the covers,
  * the consumer sends periodic heartbeats to the server. If the consumer crashes or is unable to send heartbeats for
  * a duration of <code>session.timeout.ms</code>, then the consumer will be considered dead and its partitions will
- * be reassigned. It is also possible that the consumer could encounter a "livelock" situation where it is continuing
- * to send heartbeats, but no progress is being made. To prevent the consumer from holding onto its partitions
- * indefinitely in this case, we provide a liveness detection mechanism: basically if you don't call poll at least
- * as frequently as the configured <code>poll.interval.ms</code>, then the client will proactively leave the group
- * so that another consumer can take over its partitions. So to stay in the group, you must continue to call poll
+ * be reassigned.
  * <p>
- * The implication of this design is that message processing time in the poll loop must be bounded so that
- * you always ensure that poll() is called at least once every poll interval. If not, then the consumer leaves
- * the group, which typically results in an offset commit failure when the processing of the polled records
- * finally completes (this is indicated by a {@link CommitFailedException} thrown from {@link #commitSync()}).
+ * It is also possible that the consumer could encounter a "livelock" situation where it is continuing
+ * to send heartbeats, but no progress is being made. To prevent the consumer from holding onto its partitions
+ * indefinitely in this case, we provide a liveness detection mechanism using the <code>max.poll.interval.ms</code>
+ * setting. Basically if you don't call poll at least as frequently as the configured max interval,
+ * then the client will proactively leave the group so that another consumer can take over its partitions. When this happens,
+ * you may see an offset commit failure (as indicated by a {@link CommitFailedException} thrown from a call to {@link #commitSync()}).
  * This is a safety mechanism which guarantees that only active members of the group are able to commit offsets.
- * If the consumer has been kicked out of the group, then its partitions will have been assigned to another member,
- * which will be committing its own offsets as it handles new records. This gives offset commits an isolation guarantee.
+ * So to stay in the group, you must continue to call poll.
  * <p>
  * The consumer provides two configuration settings to control the behavior of the poll loop:
  * <ol>
  * <li><code>max.poll.interval.ms</code>: By increasing the interval between expected polls, you can give
  * the consumer more time to handle a batch of records returned from {@link #poll(long)}. The drawback
  * is that increasing this value may delay a group rebalance since the consumer will only join the rebalance
- * inside the call to poll.</li>
+ * inside the call to poll. You can use this setting to bound the time to finish a rebalance, but
+ * you risk slower progress if the consumer cannot actually call {@link #poll(long) poll} often enough.</li>
  * <li><code>max.poll.records</code>: Use this setting to limit the total records returned from a single
  * call to poll. This can make it easier to predict the maximum that must be handled within each poll
- * interval.
+ * interval. By tuning this value, you may be able to reduce the poll interval, which will reduce the
+ * impact of group rebalancing.</li>
  * </ol>
  * <p>
- * For use cases where message processing time varies unpredictably, neither of these options may be viable.
+ * For use cases where message processing time varies unpredictably, neither of these options may be sufficient.
  * The recommended way to handle these cases is to move message processing to another thread, which allows
- * the consumer to continue sending heartbeats while the processor is still working. Some care must be taken
+ * the consumer to continue calling {@link #poll(long) poll} while the processor is still working. Some care must be taken
  * to ensure that committed offsets do not get ahead of the actual position. Typically, you must disable automatic
  * commits and manually commit processed offsets for records only after the thread has finished handling them
- * (depending on the delivery semantics you need). Note also that you will generally need to {@link #pause(Collection)}
+ * (depending on the delivery semantics you need). Note also that you will need to {@link #pause(Collection) pause}
  * the partition so that no new records are received from poll until after thread has finished handling those
  * previously returned.
  *
@@ -199,36 +196,26 @@ import java.util.regex.Pattern;
  * }
  * </pre>
  *
- * Setting <code>enable.auto.commit</code> means that offsets are committed automatically with a frequency controlled by
- * the config <code>auto.commit.interval.ms</code>.
- * <p>
  * The connection to the cluster is bootstrapped by specifying a list of one or more brokers to contact using the
  * configuration <code>bootstrap.servers</code>. This list is just used to discover the rest of the brokers in the
  * cluster and need not be an exhaustive list of servers in the cluster (though you may want to specify more than one in
  * case there are servers down when the client is connecting).
  * <p>
- * In this example the client is subscribing to the topics <i>foo</i> and <i>bar</i> as part of a group of consumers
- * called <i>test</i> as described above.
+ * Setting <code>enable.auto.commit</code> means that offsets are committed automatically with a frequency controlled by
+ * the config <code>auto.commit.interval.ms</code>.
+ * <p>
+ * In this example the consumer is subscribing to the topics <i>foo</i> and <i>bar</i> as part of a group of consumers
+ * called <i>test</i> as configured with <code>group.id</code>.
  * <p>
  * The deserializer settings specify how to turn bytes into objects. For example, by specifying string deserializers, we
  * are saying that our record's key and value will just be simple strings.
  *
  * <h4>Manual Offset Control</h4>
  *
- * Instead of relying on the consumer to periodically commit consumed offsets, users can also control when messages
- * should be considered as consumed and hence commit their offsets. This is useful when the consumption of the messages
+ * Instead of relying on the consumer to periodically commit consumed offsets, users can also control when records
+ * should be considered as consumed and hence commit their offsets. This is useful when the consumption of the
  * are coupled with some processing logic and hence a message should not be considered as consumed until it is completed processing.
- * In this example we will consume a batch of records and batch them up in memory, when we have sufficient records
- * batched we will insert them into a database. If we allowed offsets to auto commit as in the previous example messages
- * would be considered consumed after they were given out by the consumer, and it would be possible that our process
- * could fail after we have read messages into our in-memory buffer but before they had been inserted into the database.
- * To avoid this we will manually commit the offsets only once the corresponding messages have been inserted into the
- * database. This gives us exact control of when a message is considered consumed. This raises the opposite possibility:
- * the process could fail in the interval after the insert into the database but before the commit (even though this
- * would likely just be a few milliseconds, it is a possibility). In this case the process that took over consumption
- * would consume from last committed offset and would repeat the insert of the last batch of data. Used in this way
- * Kafka provides what is often called "at-least once delivery" guarantees, as each message will likely be delivered one
- * time but in failure cases could be duplicated.
+ * <p>
  * <pre>
  * Properties props = new Properties();
@@ -254,9 +241,28 @@ import java.util.regex.Pattern;
  * }
  * </pre>
  *
- * The above example uses {@link #commitSync() commitSync} to mark all received messages as committed. In some cases
- * you may wish to have even finer control over which messages have been committed by specifying an offset explicitly.
- * In the example below we commit offset after we finish handling the messages in each partition.
+ * In this example we will consume a batch of records and batch them up in memory. When we have enough records
+ * batched, we will insert them into a database. If we allowed offsets to auto commit as in the previous example, records
+ * would be considered consumed after they were returned to the user in {@link #poll(long) poll}. It would then be possible
+ * for our process to fail after batching the records, but before they had been inserted into the database.
+ * <p>
+ * To avoid this, we will manually commit the offsets only after the corresponding records have been inserted into the
+ * database. This gives us exact control of when a record is considered consumed. This raises the opposite possibility:
+ * the process could fail in the interval after the insert into the database but before the commit (even though this
+ * would likely just be a few milliseconds, it is a possibility). In this case the process that took over consumption
+ * would consume from last committed offset and would repeat the insert of the last batch of data. Used in this way
+ * Kafka provides what is often called "at-least-once" delivery guarantees, as each record will likely be delivered one
+ * time but in failure cases could be duplicated.
+ * <p>
+ * <b>Note: Using automatic offset commits can also give you "at-least-once" delivery, but the requirement is that
+ * you must consume all data returned from each call to {@link #poll(long)} before any subsequent calls, or before
+ * {@link #close() closing} the consumer. If you fail to do either of these, it is possible for the committed offset
+ * to get ahead of the consumed position, which results in missing records. The advantage of using manual offset
+ * control is that you have direct control over when a record is considered "consumed."</b>
+ * <p>
+ * The above example uses {@link #commitSync() commitSync} to mark all received records as committed. In some cases
+ * you may wish to have even finer control over which records have been committed by specifying an offset explicitly.
+ * In the example below we commit offset after we finish handling the records in each partition.
  * <p>
  * <pre>
  * try {
@@ -450,7 +456,6 @@ import java.util.regex.Pattern;
  * We have intentionally avoided implementing a particular threading model for processing. This leaves several
  * options for implementing multi-threaded processing of records.
  *
- *
  * <h4>1. One Consumer Per Thread</h4>
  *
  * A simple option is to give each thread its own consumer instance. Here are the pros and cons of this approach:
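The revised Javadoc separates two notions of position. The consumer's position is the offset of the next record `poll` will return, one larger than the highest offset seen. The committed position is the offset a restarted consumer recovers to. The toy class below is a minimal sketch that models only that distinction for a single partition. `PartitionCursor` and its methods are hypothetical and are not part of the Kafka client. The real client exposes these values through `position(TopicPartition)` and `committed(TopicPartition)`.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical toy model of one partition as seen by one consumer (not Kafka client code).
 * position  = offset of the next record poll() hands out
 * committed = offset a restarted consumer resumes from
 */
final class PartitionCursor {
    private final long logEndOffset; // records with offsets 0 .. logEndOffset - 1 exist
    private long position;           // next offset to return
    private long committed;          // last committed position

    PartitionCursor(long logEndOffset) {
        this.logEndOffset = logEndOffset;
    }

    /** Returns up to maxRecords offsets starting at the current position, advancing the position. */
    List<Long> poll(int maxRecords) {
        List<Long> batch = new ArrayList<>();
        while (batch.size() < maxRecords && position < logEndOffset) {
            batch.add(position++);
        }
        return batch;
    }

    long position() {
        return position;
    }

    long committed() {
        return committed;
    }

    /** Stores the current position, i.e. one past the highest offset handed out so far. */
    void commit() {
        committed = position;
    }

    /** Simulates a crash and restart: the in-memory position is recovered from the committed offset. */
    void restart() {
        position = committed;
    }
}
```

Take a partition with ten records. Polling five of them moves the position to 5, because offsets 0 through 4 have been consumed, which matches the Javadoc's example. Suppose the consumer then commits, polls three more, and restarts. It resumes at the committed offset 5, so offsets 5 through 7 are handed out again.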
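The Manual Offset Control text buffers records, inserts them into a database, and commits only after the insert. It explains that a crash between the insert and the commit produces duplicates rather than lost records. The JDK-only simulation below replays that scenario as a sketch under stated assumptions. `AtLeastOnceDemo`, its `run` method, and the in-memory list standing in for the database are all hypothetical. Plain offsets stand in for records.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation of "insert the batch, then commit" (no Kafka types involved). */
final class AtLeastOnceDemo {
    final List<Long> database = new ArrayList<>(); // stand-in for the external database
    long committed = 0;                            // offset a restarted consumer resumes from

    /**
     * Consumes offsets [committed, logEnd) in batches of batchSize. When the 0-based batch index
     * equals crashAfterBatch, the process "dies" after inserting that batch but before committing it.
     * Returns true if every batch was inserted and committed.
     */
    boolean run(long logEnd, int batchSize, int crashAfterBatch) {
        long position = committed;
        int batchIndex = 0;
        while (position < logEnd) {
            List<Long> buffer = new ArrayList<>();
            while (buffer.size() < batchSize && position < logEnd) {
                buffer.add(position++);
            }
            database.addAll(buffer);      // 1. insert the buffered batch
            if (batchIndex == crashAfterBatch) {
                return false;             // crash window: inserted, not yet committed
            }
            committed = position;         // 2. commit only after the insert succeeded
            batchIndex++;
        }
        return true;
    }
}
```

The first run commits offset 3, inserts offsets 3 to 5, and then crashes. The second run resumes at 3 and inserts offsets 3 to 5 again. Every record reaches the database at least once, and the last batch is duplicated.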
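The new bold note says automatic commits give at-least-once delivery only if every record returned by `poll` is consumed before the next `poll` or before `close`. The sketch below models why. It makes a simplifying assumption that does not describe the client internals: `auto.commit.interval.ms` has always elapsed, so each `poll` and `close` first commits the position reached by the previous `poll`. `AutoCommitModel` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of automatic offset commits. Simplifying assumption: the auto-commit
 * interval has always elapsed, so poll() and close() first commit the current position,
 * treating every record returned by the previous poll() as consumed.
 */
final class AutoCommitModel {
    private final long logEnd;
    private long position;
    long committed;

    AutoCommitModel(long logEnd, long startOffset) {
        this.logEnd = logEnd;
        this.position = startOffset;
        this.committed = startOffset;
    }

    List<Long> poll(int maxRecords) {
        committed = position; // auto-commit of everything handed out so far
        List<Long> batch = new ArrayList<>();
        while (batch.size() < maxRecords && position < logEnd) {
            batch.add(position++);
        }
        return batch;
    }

    void close() {
        committed = position; // closing also commits under this model
    }
}
```

Suppose the application handles only offsets 0 and 1 and then calls `poll` again. That call commits 5, so after a crash the restarted consumer never sees offsets 2, 3 and 4. The committed offset has gotten ahead of the consumed position, which is the missing-records case the note warns about.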
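The diff's context ends by introducing the Javadoc example that commits per partition after handling that partition's records. The committed value is the offset of the next record to read, so it equals the last handled offset plus one. The helper below is a sketch that computes those values with plain JDK types. The `topic-partition` string keys and the `PartitionCommits` name are hypothetical. In the real client, each value would be wrapped in an `OffsetAndMetadata` and passed, keyed by `TopicPartition`, to the `commitSync(Map<TopicPartition, OffsetAndMetadata>)` overload.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper: per-partition offsets to commit after each partition's records are handled. */
final class PartitionCommits {
    /**
     * processedByPartition maps a partition key (e.g. "foo-0") to the offsets handled for it,
     * in the ascending order poll() returns them. The committed value is the next offset to read.
     */
    static Map<String, Long> offsetsToCommit(Map<String, List<Long>> processedByPartition) {
        Map<String, Long> commits = new LinkedHashMap<>();
        for (Map.Entry<String, List<Long>> entry : processedByPartition.entrySet()) {
            List<Long> offsets = entry.getValue();
            if (offsets.isEmpty()) {
                continue; // nothing handled: leave this partition's committed offset unchanged
            }
            long lastOffset = offsets.get(offsets.size() - 1);
            commits.put(entry.getKey(), lastOffset + 1);
        }
        return commits;
    }
}
```

Committing `lastOffset` itself instead of `lastOffset + 1` would cause the last handled record to be delivered again after a restart.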
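The reworded list of poll-loop settings suggests bounding the work done per poll. If `max.poll.records` times the worst-case time per record stays under `max.poll.interval.ms`, the consumer can call `poll` again before it is treated as stalled. The sketch below shows that arithmetic together with a `Properties` object that uses configuration keys appearing in the diff. Several pieces are assumptions rather than Kafka defaults: `PollBudget`, the per-record timing (which you would have to measure yourself), and the sample values. A real consumer also needs the key and value deserializer settings, which are omitted here.

```java
import java.util.Properties;

/** Hypothetical helper for sizing the poll loop; the per-record time is an assumed, measured input. */
final class PollBudget {
    /** Largest max.poll.records whose worst-case processing time still fits in one poll interval. */
    static int maxSafePollRecords(long maxPollIntervalMs, long worstCaseMsPerRecord) {
        if (worstCaseMsPerRecord <= 0) {
            throw new IllegalArgumentException("worst-case time per record must be positive");
        }
        return (int) Math.min(maxPollIntervalMs / worstCaseMsPerRecord, Integer.MAX_VALUE);
    }

    /** True if a full batch of maxPollRecords can be handled before the next poll is due. */
    static boolean fitsInInterval(int maxPollRecords, long worstCaseMsPerRecord, long maxPollIntervalMs) {
        return (long) maxPollRecords * worstCaseMsPerRecord <= maxPollIntervalMs;
    }

    /** Consumer settings named in the Javadoc; deserializer settings are deliberately omitted. */
    static Properties consumerProps(String bootstrapServers, String groupId,
                                    long maxPollIntervalMs, int maxPollRecords) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        props.setProperty("enable.auto.commit", "false"); // offsets committed manually after processing
        props.setProperty("max.poll.interval.ms", Long.toString(maxPollIntervalMs));
        props.setProperty("max.poll.records", Integer.toString(maxPollRecords));
        return props;
    }
}
```

Take a 300000 ms interval and an assumed worst case of 250 ms per record. At most 1200 records fit in one interval, so a batch of 500 leaves headroom (125000 ms), while 2000 records (500000 ms) would not fit.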
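For unpredictable processing times, the revised text recommends moving processing to another thread and pausing the partition. The consumer keeps calling `poll` and commits only after the worker has finished. The JDK-only sketch below illustrates that ordering. `HandoffSketch` and the `pollWhilePaused` callback are hypothetical stand-ins. With the real client you would call `consumer.pause(partitions)` before handing off and keep calling `poll`, which returns no records for paused partitions. You would then call `consumer.resume(partitions)` and commit the returned offset.

```java
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical sketch of handing a polled batch to a worker thread (JDK only, no Kafka types). */
final class HandoffSketch {
    /**
     * Processes batch on a worker thread while the caller keeps invoking pollWhilePaused,
     * a stand-in for calling poll() on a consumer whose partitions are paused.
     * Returns the offset that is safe to commit once the worker has finished.
     */
    static long processBatchInWorker(List<Long> batch, Runnable pollWhilePaused) {
        if (batch.isEmpty()) {
            throw new IllegalArgumentException("nothing to process, nothing to commit");
        }
        ExecutorService worker = Executors.newSingleThreadExecutor();
        AtomicLong lastProcessed = new AtomicLong(-1);
        try {
            Future<?> done = worker.submit(() -> {
                for (long offset : batch) {
                    // application-specific handling of the record at `offset` would go here
                    lastProcessed.set(offset);
                }
            });
            while (!done.isDone()) {
                pollWhilePaused.run(); // keeps the poll loop going; paused partitions yield no records
            }
            done.get(); // rethrows any failure from the worker
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the worker", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("worker failed; nothing committed", e.getCause());
        } finally {
            worker.shutdown();
        }
        // Commit position: one past the last record the worker actually finished.
        return lastProcessed.get() + 1;
    }
}
```

The committed offset is derived from what the worker finished, not from what `poll` returned, so committed offsets cannot get ahead of the actual position.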
