Re: New Consumer API discussion
If people don't have any more thoughts on this, I will go ahead and submit a Review Board request for https://issues.apache.org/jira/browse/KAFKA-1328.

Thanks,
Neha

On Mon, Mar 24, 2014 at 5:39 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

I took some time to write some example code using the new consumer APIs to cover a range of use cases. This exercise was very useful (thanks for the suggestion, Jay!) since I found several improvements to the APIs to make them more usable. Here are some of the changes (http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/) I made:

1. Added usage examples to the KafkaConsumer javadoc (http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html). I find it more useful to have the examples in the javadoc than on a wiki. Please go through these examples and suggest improvements. The goal is to document a limited set of examples that cover every major use case.

2. All APIs that either accept or return offsets are changed to use Map<TopicPartition, Long> instead of TopicPartitionOffset... In all the examples that I wrote, it was much easier to deal with offsets and pass them around in the consumer APIs as maps instead of lists.

3. Due to the above change, I had to introduce the commit() (http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html#commit%28%29) and commitAsync() APIs explicitly, in addition to commit(Map<TopicPartition, Long> offsets) and commitAsync(Map<TopicPartition, Long> offsets), since the no-argument case is not covered automatically when the commit APIs take a Map as the input parameter.

4. Offset rewind logic is funky with group management. I took a stab at it and wrote examples to cover the various offset rewind use cases I could think of. I'm not so sure I like it, so I encourage people to take a look at the examples and provide feedback.
This feedback is critical to finalizing the consumer APIs, as we might have to add or change APIs to make offset rewind intuitive and easy to use. (Please see the 3rd and 4th examples here: http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html)

Once I have feedback on the above, I will go ahead and submit a Review Board request for the new APIs and javadoc.

Thanks,
Neha

On Mon, Mar 24, 2014 at 5:29 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

Hey Chris,

Really sorry for the late reply; I wonder how this fell through the cracks. Anyhow, thanks for the great feedback! Here are my comments:

1. Why is the config String-Object instead of String-String?

This is probably more feedback about the new config management that we adopted in the new clients. I think it is more convenient to write configs.put("a", 42); instead of configs.put("a", Integer.toString(42));

2. Are these Java docs correct?

    KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)
    A consumer is instantiated by providing a set of key-value pairs as configuration and a ConsumerRebalanceCallback implementation

There is no ConsumerRebalanceCallback parameter.

Fixed.

3. Would like to have a method:

    poll(long timeout, java.util.concurrent.TimeUnit timeUnit, TopicPartition... topicAndPartitionsToPoll)

I see I can effectively do this by just fiddling with subscribe and unsubscribe before each poll. Is this a low-overhead operation? Can I just unsubscribe from everything after each poll, then re-subscribe to a topic the next iteration? I would probably be doing this in a fairly tight loop.

The subscribe and unsubscribe will be very lightweight in-memory operations, so it shouldn't be a problem to just use those APIs directly. Let me know if you think otherwise.

4. The behavior of AUTO_OFFSET_RESET_CONFIG is overloaded. I think there are use cases for decoupling what to do when no offset exists from what to do when I'm out of range.
I might want to start from smallest the first time I run, but fail if I ever get offset out of range.

How about adding a third option, disable, to auto.offset.reset? It would mean: never automatically reset the offset, either when one is not found or when the offset falls out of range. Presumably, you would want to turn automatic reset off when you want to control the offsets yourself and use custom rewind/replay logic to reset the consumer's offset, so that Kafka does not accidentally reset the offset to something else. I'm not so sure when you would want to distinguish between startup and the offset falling out of range. Presumably, if you don't trust Kafka to reset the offset, you can always turn this off and use commit/commitAsync and seek() to set the consumer to the right offset on startup and every time your consumer falls out of range. Does that make sense?

5. ENABLE_JMX could use Java docs, even though it's fairly self-explanatory.

Fixed.

6. Clarity about whether FETCH_BUFFER_CONFIG is per-topic/partition, or across all topic/partitions, would be useful. I believe it's per-topic/partition, right? That is, setting it to 2 megs with two TopicAndPartitions would result
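To make the proposal concrete, here is a minimal sketch of how the three reset policies would pick a starting offset for one partition. It assumes SMALLEST/LARGEST mirror the "smallest"/"largest" values of the 0.8 consumer's auto.offset.reset and that DISABLE is the proposed third option. ResetPolicy, OffsetResetSketch and the two exception classes are hypothetical names, not the draft API. With DISABLE, Chris's case (start from smallest on the first run, fail on out-of-range) becomes a few lines of application code.

```java
import java.util.OptionalLong;

// Hypothetical policy values: SMALLEST/LARGEST mirror the 0.8 consumer's
// auto.offset.reset options; DISABLE is the third option proposed in the thread.
enum ResetPolicy { SMALLEST, LARGEST, DISABLE }

class NoOffsetException extends RuntimeException {
    NoOffsetException() { super("no committed offset"); }
}

class OutOfRangeException extends RuntimeException {
    OutOfRangeException(long offset) { super("offset " + offset + " is out of range"); }
}

class OffsetResetSketch {
    // Picks the fetch position for a partition whose log spans [logStart, logEnd],
    // where logEnd is the offset of the next message to be written.
    static long startingOffset(OptionalLong committed, long logStart, long logEnd, ResetPolicy policy) {
        if (committed.isPresent()) {
            long offset = committed.getAsLong();
            if (offset >= logStart && offset <= logEnd) return offset; // valid: use it as-is
            if (policy == ResetPolicy.DISABLE) throw new OutOfRangeException(offset);
        } else if (policy == ResetPolicy.DISABLE) {
            throw new NoOffsetException();
        }
        return policy == ResetPolicy.SMALLEST ? logStart : logEnd;
    }

    // Chris's case under DISABLE: start from the smallest offset when nothing has
    // been committed yet, but let an out-of-range offset propagate as a failure.
    static long smallestOnFirstRunOnly(OptionalLong committed, long logStart, long logEnd) {
        try {
            return startingOffset(committed, logStart, logEnd, ResetPolicy.DISABLE);
        } catch (NoOffsetException e) {
            return logStart; // the application would pass this to seek()
        }
    }
}
```

The exceptions are split by cause so that application code can treat "no offset yet" and "offset out of range" differently, which is the decoupling Chris asked for.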
Re: New Consumer API discussion
I'm not quite sure I fully understood your question. The consumer API exposes a close() method that shuts down the consumer's connections to all brokers and frees up the resources that the consumer uses. I've updated the javadoc for the new consumer API to include a few examples of different ways of using the consumer. You might find it useful: http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html

Thanks,
Neha

On Sun, Mar 16, 2014 at 7:55 PM, Shanmugam, Srividhya <srividhyashanmu...@fico.com> wrote:

Can the consumer API provide a way to shut down the connector by doing a lookup by the consumer group ID? For example, the application may be consuming messages in one thread whereas the shutdown call can be initiated in a different thread.
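Because the new consumer is single-threaded and runs in the user thread, one common way to get the behaviour Srividhya describes is to let another thread request the shutdown and have the polling thread call close() itself. This is a minimal sketch of that pattern, not a feature of the draft API; FakeConsumer stands in for the real consumer, and both it and ShutdownSketch are hypothetical names. No group-ID lookup is involved.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a single-threaded consumer: only the thread that
// owns it ever calls poll() or close().
class FakeConsumer {
    final AtomicInteger polls = new AtomicInteger();
    volatile boolean closed;

    void poll(long timeoutMs) throws InterruptedException {
        polls.incrementAndGet();
        Thread.sleep(timeoutMs); // simulates waiting for data
    }

    void close() { closed = true; }
}

class ShutdownSketch implements Runnable {
    private final FakeConsumer consumer;
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final CountDownLatch stopped = new CountDownLatch(1);

    ShutdownSketch(FakeConsumer consumer) { this.consumer = consumer; }

    @Override
    public void run() {
        try {
            // A bounded poll timeout means the flag is rechecked regularly.
            while (running.get()) consumer.poll(10);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            consumer.close(); // close() always runs on the owning thread
            stopped.countDown();
        }
    }

    // May be called from any thread: asks the loop to stop and waits until close() has run.
    void shutdown() {
        running.set(false);
        try {
            stopped.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The latch makes shutdown() block until the consumer is actually closed, so the calling thread can safely release other resources afterwards.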
Re: New Consumer API discussion
Hey Guys,

Sorry for the late follow-up. Here are my questions/thoughts on the API:

1. Why is the config String-Object instead of String-String?

2. Are these Java docs correct?

    KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)
    A consumer is instantiated by providing a set of key-value pairs as configuration and a ConsumerRebalanceCallback implementation

There is no ConsumerRebalanceCallback parameter.

3. Would like to have a method:

    poll(long timeout, java.util.concurrent.TimeUnit timeUnit, TopicPartition... topicAndPartitionsToPoll)

I see I can effectively do this by just fiddling with subscribe and unsubscribe before each poll. Is this a low-overhead operation? Can I just unsubscribe from everything after each poll, then re-subscribe to a topic the next iteration? I would probably be doing this in a fairly tight loop.

4. The behavior of AUTO_OFFSET_RESET_CONFIG is overloaded. I think there are use cases for decoupling what to do when no offset exists from what to do when I'm out of range. I might want to start from smallest the first time I run, but fail if I ever get offset out of range.

5. ENABLE_JMX could use Java docs, even though it's fairly self-explanatory.

6. Clarity about whether FETCH_BUFFER_CONFIG is per-topic/partition, or across all topic/partitions, would be useful. I believe it's per-topic/partition, right? That is, setting it to 2 megs with two TopicAndPartitions would result in 4 megs worth of data coming in per fetch, right?

7. What does the consumer do if METADATA_FETCH_TIMEOUT_CONFIG times out? Retry, or throw an exception?

8. Does RECONNECT_BACKOFF_MS_CONFIG apply to both metadata requests and fetch requests?

9. What does SESSION_TIMEOUT_MS default to?

10. Is this consumer thread-safe?

11. How do you use a different offset management strategy? Your email implies that it's pluggable, but I don't see how:
"The offset management strategy defaults to Kafka based offset management and the API provides a way for the user to use a customized offset store to manage the consumer's offsets."

12. If I wish to decouple the consumer from the offset checkpointing, is it OK to use Joel's offset management stuff directly, rather than through the consumer's commit API?

Cheers,
Chris

On 2/10/14 10:54 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

As mentioned in previous emails, we are also working on a re-implementation of the consumer. I would like to use this email thread to discuss the details of the public API. I would also like us to be picky about this public API now so it is as good as possible and we don't need to break it in the future.

The best way to get a feel for the API is actually to take a look at the javadoc (http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html); the hope is to get the API docs good enough that they are self-explanatory. You can also take a look at the configs here: http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html

Some background info on implementation:

At a high level, the primary difference in this consumer is that it removes the distinction between the high-level and low-level consumer. The new consumer API is non-blocking: instead of returning a blocking iterator, the consumer provides a poll() API that returns a list of records. We think this is better than blocking iterators since it effectively decouples the threading strategy used for processing messages from the consumer. It is worth noting that the consumer is entirely single-threaded and runs in the user thread. The advantage is that it can be easily rewritten in less multi-threading-friendly languages. The consumer batches data and multiplexes I/O over TCP connections to each of the brokers it communicates with, for high throughput.
The consumer also allows long poll to reduce the end-to-end message latency for low-throughput data.

The consumer provides a group management facility that supports the concept of a group with multiple consumer instances (just like the current consumer). This is done through a custom heartbeat and group management protocol that is transparent to the user. At the same time, it gives users the option to subscribe to a fixed set of partitions and not use group management at all. The offset management strategy defaults to Kafka based offset management, and the API provides a way for the user to use a customized offset store to manage the consumer's offsets.

Another key difference in this consumer is that it does not depend on ZooKeeper at all.

More details about the new consumer design are here: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design

Please take a look at the new API (http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html) and give us any thoughts you may have.

Thanks,
Neha
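The claim that a poll() returning a plain list "decouples the threading strategy used for processing messages from the consumer" can be illustrated with a small sketch: one thread owns the consumer and polls, and the application decides separately how records are processed, here with a fixed thread pool. The Supplier standing in for poll() and the DecoupledProcessing class are hypothetical, not the draft signatures.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class DecoupledProcessing {
    // 'poll' is a hypothetical stand-in for the consumer's poll() call and returns
    // one batch of records as a plain List. Only the calling thread invokes it.
    static int run(Supplier<List<String>> poll, int batches, int workers) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        AtomicInteger processed = new AtomicInteger();
        try {
            for (int i = 0; i < batches; i++) {
                for (String rec : poll.get()) {
                    // The processing strategy (here, a fixed thread pool) is the
                    // application's choice, independent of how records are fetched.
                    pool.submit(() -> handle(rec, processed));
                }
            }
        } finally {
            pool.shutdown();
            try {
                pool.awaitTermination(10, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return processed.get();
    }

    private static void handle(String rec, AtomicInteger processed) {
        if (!rec.isEmpty()) processed.incrementAndGet(); // placeholder for real work
    }
}
```

Committing offsets for records that are still being processed by the pool would need extra bookkeeping, which this sketch leaves out.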
Re: New Consumer API discussion
Hey Guys,

Also, for reference, we'll be looking to implement new Samza consumers which have these APIs:

http://samza.incubator.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/system/SystemConsumer.html
http://samza.incubator.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/checkpoint/CheckpointManager.html

Question (3) below is a result of having Samza's SystemConsumers poll allow specific topic/partitions to be specified. The split between consumer and checkpoint manager is the reason for question (12) below.

Cheers,
Chris

On 3/3/14 10:19 AM, Chris Riccomini <criccom...@linkedin.com> wrote:

3. Would like to have a method:

    poll(long timeout, java.util.concurrent.TimeUnit timeUnit, TopicPartition... topicAndPartitionsToPoll)

I see I can effectively do this by just fiddling with subscribe and unsubscribe before each poll. Is this a low-overhead operation? Can I just unsubscribe from everything after each poll, then re-subscribe to a topic the next iteration? I would probably be doing this in a fairly tight loop.

12. If I wish to decouple the consumer from the offset checkpointing, is it OK to use Joel's offset management stuff directly, rather than through the consumer's commit API?
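Neha's answer to question (3) in this thread is that subscribe and unsubscribe are lightweight in-memory operations, so a per-poll partition filter can be emulated by narrowing the subscription before each poll. The sketch below shows that emulation against a hypothetical in-memory consumer; InMemoryConsumer, the TopicPartition record (a stand-in for the draft class of that name) and pollOnly are illustrative names, and the draft poll(timeout, unit) arguments are omitted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for the draft TopicPartition class.
record TopicPartition(String topic, int partition) {}

// Hypothetical in-memory consumer: subscribe/unsubscribe only mutate a set.
class InMemoryConsumer {
    private final Map<TopicPartition, List<String>> buffered = new LinkedHashMap<>();
    private final Set<TopicPartition> subscribed = new HashSet<>();

    void append(TopicPartition tp, String value) {
        buffered.computeIfAbsent(tp, k -> new ArrayList<>()).add(value);
    }

    void subscribe(TopicPartition... tps) { subscribed.addAll(Arrays.asList(tps)); }

    void unsubscribe(TopicPartition... tps) { subscribed.removeAll(Arrays.asList(tps)); }

    Set<TopicPartition> subscriptions() { return Set.copyOf(subscribed); }

    // Returns and drains buffered records for subscribed partitions only.
    List<String> poll() {
        List<String> out = new ArrayList<>();
        for (Map.Entry<TopicPartition, List<String>> e : buffered.entrySet()) {
            if (subscribed.contains(e.getKey())) {
                out.addAll(e.getValue());
                e.getValue().clear();
            }
        }
        return out;
    }
}

class PollOnly {
    // Emulates the requested poll(..., TopicPartition... toPoll) by resetting the
    // subscription to exactly the requested partitions before polling.
    static List<String> pollOnly(InMemoryConsumer consumer, TopicPartition... toPoll) {
        consumer.unsubscribe(consumer.subscriptions().toArray(new TopicPartition[0]));
        consumer.subscribe(toPoll);
        return consumer.poll();
    }
}
```

Records buffered for partitions that are temporarily unsubscribed stay buffered here; whether the real consumer would keep or discard such prefetched data is not specified in the thread.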
Re: New Consumer API discussion
improvements to the docs: http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/Consumer.html#subscribe%28java.lang.String...%29

3. commit(): The following comment in the doc should probably say "commit offsets for partitions assigned to this consumer":

"If no partitions are specified, commits offsets for the subscribed list of topics and partitions to Kafka."

Could you give more context on this suggestion? Here is the entire doc:

"Synchronously commits the specified offsets for the specified list of topics and partitions to *Kafka*. If no partitions are specified, commits offsets for the subscribed list of topics and partitions."

The hope is to convey that if no partitions are specified, offsets will be committed for the subscribed list of partitions. One improvement could be to explicitly state that the offsets returned on the last poll will be committed. I updated this to:

"Synchronously commits the specified offsets for the specified list of topics and partitions to *Kafka*. If no offsets are specified, commits offsets returned on the last {@link #poll(long, TimeUnit) poll()} for the subscribed list of topics and partitions."

4. There is inconsistency in specifying partitions. Sometimes we use TopicPartition and other times we use String and int (see examples below).

    void onPartitionsAssigned(Consumer consumer, TopicPartition... partitions)
    public void *subscribe*(java.lang.String topic, int... partitions)

Yes, this was discussed previously. I think the general consensus is to use the higher-level classes everywhere. Made those changes.

What's the use case of position()? Isn't that just the nextOffset() on the last message returned from poll()?

Yes, except in the case where a rebalance is triggered and poll() is not yet invoked. Here, you would use position() to get the new fetch position for the specific partition.
Even if this is not a common use case, IMO it is much easier to use position() to get the fetch offset than to invoke nextOffset() on the last message. This also keeps the APIs symmetric, which is nice.

On Mon, Feb 24, 2014 at 7:06 PM, Withers, Robert <robert.with...@dish.com> wrote:

That's wonderful. Thanks for Kafka.
Rob

On Feb 24, 2014, at 9:58 AM, Guozhang Wang <wangg...@gmail.com> wrote:

Hi Robert,

Yes, you can check out the callback functions in the new API, onPartitionDesigned and onPartitionAssigned, and see if they meet your needs.

Guozhang

On Mon, Feb 24, 2014 at 8:18 AM, Withers, Robert <robert.with...@dish.com> wrote:

Jun,

Are you saying it is possible to get events from the high-level consumer regarding various state machine changes? For instance, can we get a notification when a rebalance starts and ends, when a partition is assigned/unassigned, when an offset is committed on a partition, when a leader changes and so on? I call this OOB traffic, since these are not the core message stream but side-band events, yet they are still potentially useful to consumers.

Thank you,
Robert

Robert Withers
Staff Analyst/Developer
o: (720) 514-8963
c: (571) 262-1873

-----Original Message-----
From: Jun Rao [mailto:jun...@gmail.com]
Sent: Sunday, February 23, 2014 4:19 PM
To: users@kafka.apache.org
Subject: Re: New Consumer API discussion

Robert,

For the push-oriented API, you can potentially implement your own MessageHandler with those methods. In the main loop of our new consumer API, you can just call those methods based on the events you get. Also, we already have an API to get the first and the last offset of a partition (getOffsetBefore).

Thanks,
Jun

On Sat, Feb 22, 2014 at 11:29 AM, Withers, Robert <robert.with...@dish.com> wrote:

This is a good idea, too.
I would modify it to include stream marking; then you can have:

    long end = consumer.lastOffset(tp);
    consumer.setMark(end);
    while (consumer.beforeMark()) {
        process(consumer.pollToMark());
    }

or

    long end = consumer.lastOffset(tp);
    consumer.setMark(end);
    for (Object msg : consumer.iteratorToMark()) {
        process(msg);
    }

I actually have 4 suggestions, then:

* pull: stream marking
* pull: finite streams, bound by time range (up-to-now, yesterday) or offset
* pull: async API
* push: KafkaMessageSource, for a push model, with msg and OOB events. Build one in either individual or chunk
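Robert's stream-marking proposal bounds a pull loop to the messages that existed when the mark was set. A minimal sketch of those semantics over an in-memory log, assuming lastOffset() means the offset of the last message currently in the partition and adding a hypothetical maxRecords batch limit to pollToMark(). MarkedPartition is an illustrative stand-in; none of these methods exist in the draft consumer API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical single-partition reader implementing the proposed
// lastOffset/setMark/beforeMark/pollToMark calls over an in-memory log.
class MarkedPartition {
    private final List<String> log; // list index == message offset
    private long position = 0;      // next offset to read
    private long mark = -1;         // last offset to read, inclusive

    MarkedPartition(List<String> log) { this.log = log; }

    // Assumed meaning: offset of the last message currently in the log.
    long lastOffset() { return log.size() - 1; }

    void setMark(long offset) { mark = offset; }

    boolean beforeMark() { return position <= mark; }

    // Returns at most maxRecords messages and never reads past the mark,
    // even if newer messages were appended after setMark().
    List<String> pollToMark(int maxRecords) {
        List<String> batch = new ArrayList<>();
        while (batch.size() < maxRecords && position <= mark) {
            batch.add(log.get((int) position));
            position++;
        }
        return batch;
    }
}
```

Because the mark is fixed when set, the loop terminates even while producers keep appending, which is what makes the stream "finite".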
Re: New Consumer API discussion
a bit harder. Suppose that you have a list of topics stored in ArrayList<String> topics. If you want to subscribe to all topics in one call, you will have to do:

    String[] topicArray = new String[topics.size()];
    consumer.subscribe(topics.toArray(topicArray));

A similar argument can be made for arguably the more common use case of subscribing to a single topic as well. In these cases, the user is required to write more code to create a single-item collection and pass it in.

Since subscription is extremely lightweight, invoking it multiple times also seems like a workable solution, no?

2. It would be good to document that the following APIs are mutually exclusive. Also, if the partition-level subscription is specified, there is no group management. Finally, unsubscribe() can only be used to cancel subscriptions with the same pattern. For example, you can't unsubscribe at the partition level if the subscription is done at the topic level.

    *subscribe*(java.lang.String... topics)
    *subscribe*(java.lang.String topic, int... partitions)

Makes sense. Made the suggested improvements to the docs: http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/Consumer.html#subscribe%28java.lang.String...%29
Re: New Consumer API discussion
On Sat, Feb 22, 2014 at 11:29 AM, Withers, Robert robert.with...@dish.com wrote: This is a good idea, too. I would modify it to include stream marking, then you can have:

    long end = consumer.lastOffset(tp);
    consumer.setMark(end);
    while (consumer.beforeMark()) {
        process(consumer.pollToMark());
    }

or

    long end = consumer.lastOffset(tp);
    consumer.setMark(end);
    for (Object msg : consumer.iteratorToMark()) {
        process(msg);
    }

I actually have 4 suggestions, then:

* pull: stream marking
* pull: finite streams, bound by time range (up-to-now, yesterday) or offset
* pull: async api
* push: KafkaMessageSource, for a push model, with msg and OOB events. Build one in either individual or chunk mode and have a listener for each msg or a listener for a chunk of msgs. Make it composable and policy driven (chunked, range, commitOffsets policy, retry policy, transactional)

Thank you,
Robert

On Feb 22, 2014, at 11:21 AM, Jay Kreps jay.kr...@gmail.com wrote: I think what Robert is saying is that we need to think through the offset API to enable batch processing of topic data.
Think of a process that periodically kicks off to compute a data summary, do a data load, or something like that. I think what we need to support this is an API to fetch the last offset from the server for a partition. Something like

    long lastOffset(TopicPartition tp)

and, for symmetry,

    long firstOffset(TopicPartition tp)

Likely this would have to be batched. Essentially, we should add this use case to our set of code examples to write and think through. The usage would be something like

    long end = consumer.lastOffset(tp);
    while (consumer.position < end)
        process(consumer.poll());

-Jay

On Sat, Feb 22, 2014 at 1:52 AM, Withers, Robert robert.with...@dish.com wrote: Jun, I was originally thinking a non-blocking read from a distributed stream should distinguish between "no local messages, but a fetch is occurring" and "you have drained the stream". This may be valuable to me because it would let me write consumers that read all known traffic and then terminate. You caused me to reconsider, and I think I am conflating two things: one is a sync/async API, the other is whether to have an infinite or a finite stream. Is it possible to build a finite KafkaStream on a range of messages? Perhaps a Simple Consumer would do just fine: I could start by getting the writeOffset from ZooKeeper and tell it to read a specified range per partition. I've done this, forking a simple consumer runnable for each partition, for one of our analyzers. The great thing about the high-level consumer is the rebalance: I can fork however many stream readers I want and you just figure it out for me. In that way you offer us control over resource consumption within a pull model, which, they say, is the best way to regulate message pressure. Combining that high-level rebalance ability with a ranged partition drain could be really nice: build the stream with an ending position and it is a finite stream, but retain the high-level rebalance.
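Jay's loop and Robert's stream-marking proposal share one shape: snapshot an end offset once, then read until the fetch position reaches it. The sketch below illustrates that shape under stated assumptions. `InMemoryPartition`, `lastOffset()`, and `pollUntil()` are hypothetical stand-ins, not methods of any released Kafka client, and `lastOffset()` is assumed to return the offset one past the last stored message, so the condition `position < end` stops exactly at the snapshot.

```java
import java.util.ArrayList;
import java.util.List;

public class BoundedDrain {

    // Hypothetical single-partition log standing in for a consumer; not the Kafka client API.
    static final class InMemoryPartition {
        private final List<String> log = new ArrayList<>();
        private long position = 0; // offset of the next message to fetch

        void append(String msg) { log.add(msg); }

        // Assumed semantics for the proposed lastOffset(tp): one past the last stored message.
        long lastOffset() { return log.size(); }

        long position() { return position; }

        // Returns at most maxRecords messages and never reads at or past mark
        // (roughly Robert's pollToMark()). maxRecords must be positive.
        List<String> pollUntil(long mark, int maxRecords) {
            int from = (int) position;
            int to = (int) Math.min(mark, Math.min(log.size(), position + maxRecords));
            if (to <= from) return new ArrayList<>();
            List<String> batch = new ArrayList<>(log.subList(from, to));
            position = to;
            return batch;
        }
    }

    // Snapshot the end offset once, then poll until the position reaches it.
    static List<String> drainToEnd(InMemoryPartition p, int maxRecords) {
        long end = p.lastOffset();
        List<String> processed = new ArrayList<>();
        while (p.position() < end) {
            processed.addAll(p.pollUntil(end, maxRecords)); // stand-in for process(...)
        }
        return processed;
    }

    public static void main(String[] args) {
        InMemoryPartition p = new InMemoryPartition();
        p.append("a");
        p.append("b");
        p.append("c");
        System.out.println(drainToEnd(p, 2)); // [a, b, c]
        p.append("d");
        System.out.println(drainToEnd(p, 2)); // [d]
    }
}
```

Capping each fetch at the mark, rather than filtering after the fact, means messages appended after the snapshot are left in place for the next run instead of being fetched and skipped.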
With a finite stream, you would know the difference between the two async scenarios: fetch-in-progress versus end-of-stream. With an infinite stream, you never get end-of-stream. Aside from a high-level consumer over a finite range within each partition, the other feature I can think of is more complicated. A high-level consumer has state machine changes that the client cannot access, to my knowledge. Our use of Kafka has us invoke a message handler with each message we consume from the KafkaStream, so we convert a pull model to a push model. Including the idea of receiving notifications from state machine changes, what would be really nice is to have a KafkaMessageSource, that is, an eventful push model. If it were
Re: New Consumer API discussion
Thanks for the review, Jun. Here are some comments -

1. The using of ellipsis: This may make passing a list of items from a collection to the api a bit harder. Suppose that you have a list of topics stored in ArrayList<String> topics; if you want to subscribe to all topics in one call, you will have to do:

    String[] topicArray = new String[topics.size()];
    consumer.subscribe(topics.toArray(topicArray));

A similar argument can be made for the arguably more common use case of subscribing to a single topic. In that case, the user is required to write more code to create a single-item collection and pass it in. Since subscription is extremely lightweight, invoking it multiple times also seems like a workable solution, no?

2. It would be good to document that the following apis are mutually exclusive. Also, if the partition level subscription is specified, there is no group management. Finally, unsubscribe() can only be used to cancel subscriptions with the same pattern. For example, you can't unsubscribe at the partition level if the subscription is done at the topic level.

    subscribe(java.lang.String... topics)
    subscribe(java.lang.String topic, int... partitions)

Makes sense. Made the suggested improvements to the docs: http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/Consumer.html#subscribe%28java.lang.String...%29

3. commit(): The following comment in the doc should probably say "commit offsets for partitions assigned to this consumer": If no partitions are specified, commits offsets for the subscribed list of topics and partitions to Kafka.

Could you give more context on this suggestion? Here is the entire doc - Synchronously commits the specified offsets for the specified list of topics and partitions to *Kafka*. If no partitions are specified, commits offsets for the subscribed list of topics and partitions.
The hope is to convey that if no partitions are specified, offsets will be committed for the subscribed list of partitions. One improvement could be to explicitly state that the offsets returned on the last poll will be committed. I updated this to - Synchronously commits the specified offsets for the specified list of topics and partitions to *Kafka*. If no offsets are specified, commits offsets returned on the last {@link #poll(long, TimeUnit) poll()} for the subscribed list of topics and partitions.

4. There is inconsistency in specifying partitions. Sometimes we use TopicPartition and other times we use String and int (see examples below).

    void onPartitionsAssigned(Consumer consumer, TopicPartition... partitions)
    public void subscribe(java.lang.String topic, int... partitions)

Yes, this was discussed previously. I think the consensus generally seems to be to use the higher-level classes everywhere. Made those changes.

What's the use case of position()? Isn't that just the nextOffset() on the last message returned from poll()?

Yes, except in the case where a rebalance is triggered and poll() has not yet been invoked. Here, you would use position() to get the new fetch position for the specific partition. Even if this is not a common use case, IMO it is much easier to use position() to get the fetch offset than to invoke nextOffset() on the last message. This also keeps the APIs symmetric, which is nice.

On Mon, Feb 24, 2014 at 7:06 PM, Withers, Robert robert.with...@dish.com wrote: That's wonderful. Thanks for Kafka. Rob

On Feb 24, 2014, at 9:58 AM, Guozhang Wang wangg...@gmail.com wrote: Hi Robert, Yes, you can check out the callback functions in the new API, onPartitionDesigned and onPartitionAssigned, and see if they meet your needs.
Guozhang

On Mon, Feb 24, 2014 at 8:18 AM, Withers, Robert robert.with...@dish.com wrote: Jun, are you saying it is possible to get events from the high-level consumer regarding various state machine changes? For instance, can we get a notification when a rebalance starts and ends, when a partition is assigned/unassigned, when an offset is committed on a partition, when a leader changes, and so on? I call this OOB traffic, since these are not the core message stream but side-band events, yet they are still potentially useful to consumers.

Thank you,
Robert

Robert Withers
Staff Analyst/Developer
o: (720) 514-8963
c: (571) 262-1873

-----Original Message-----
From: Jun Rao [mailto:jun...@gmail.com]
Sent: Sunday, February 23, 2014 4:19 PM
To: users@kafka.apache.org
Subject: Re: New Consumer API discussion

Robert,

For the push-oriented API, you can potentially implement your own MessageHandler with those methods. In the main loop of our new consumer API, you can just call those methods based on the events you get. Also, we already have an API to get the first and the last offset of a partition (getOffsetBefore).

Thanks,
Jun

On Sat, Feb 22, 2014 at 11:29 AM
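Jun's suggestion, implementing your own MessageHandler and calling it from the main pull loop based on what each poll reveals, can be sketched roughly as follows. Everything here is hypothetical: `MessageHandler`, `Message`, and `dispatch()` are illustrative names, not part of any Kafka API, and the loop is assumed to be able to observe the current partition assignment after each poll.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class PushAdapter {

    // Hypothetical push-style callbacks in the spirit of Jun's "your own MessageHandler";
    // these names are illustrative and are not part of any Kafka API.
    interface MessageHandler {
        void onPartitionsRevoked(Set<String> partitions);
        void onPartitionsAssigned(Set<String> partitions);
        void onMessage(String partition, String value);
    }

    // A fetched message tagged with its partition, e.g. "t-0" (hypothetical naming).
    static final class Message {
        final String partition;
        final String value;

        Message(String partition, String value) {
            this.partition = partition;
            this.value = value;
        }
    }

    // Partitions owned as of the previous dispatch() call.
    private Set<String> assigned = new HashSet<>();

    // Called once per iteration of the pull loop with the assignment observed after the
    // poll and the messages it returned; diffs the assignment and pushes callbacks.
    void dispatch(Set<String> currentAssignment, List<Message> batch, MessageHandler h) {
        Set<String> revoked = new HashSet<>(assigned);
        revoked.removeAll(currentAssignment);
        Set<String> added = new HashSet<>(currentAssignment);
        added.removeAll(assigned);

        if (!revoked.isEmpty()) h.onPartitionsRevoked(revoked);
        if (!added.isEmpty()) h.onPartitionsAssigned(added);
        for (Message m : batch) h.onMessage(m.partition, m.value);

        assigned = new HashSet<>(currentAssignment);
    }

    // Example handler that records each callback as a line of text (sorted for stable output).
    static final class RecordingHandler implements MessageHandler {
        final List<String> log = new ArrayList<>();

        public void onPartitionsRevoked(Set<String> p) { log.add("revoked " + new TreeSet<>(p)); }
        public void onPartitionsAssigned(Set<String> p) { log.add("assigned " + new TreeSet<>(p)); }
        public void onMessage(String partition, String value) { log.add(partition + ": " + value); }
    }

    public static void main(String[] args) {
        PushAdapter adapter = new PushAdapter();
        RecordingHandler handler = new RecordingHandler();
        adapter.dispatch(Set.of("t-0", "t-1"), List.of(new Message("t-0", "hello")), handler);
        adapter.dispatch(Set.of("t-1"), List.of(), handler); // simulated rebalance: t-0 moved away
        handler.log.forEach(System.out::println);
    }
}
```

Running `main` prints `assigned [t-0, t-1]`, `t-0: hello`, and `revoked [t-0]`. One limitation of diffing after the fact: onPartitionsRevoked fires only once the partitions are already gone, so it cannot be used to commit their offsets in time; that needs a callback invoked by the consumer itself during the rebalance.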
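Point 1 of Neha's reply weighs a varargs subscribe() against one that takes a collection: with varargs, a caller holding a List must convert it to an array, while a single topic needs no wrapper. The call sites below show both cases against a hypothetical `FakeConsumer` that only records topic names; it is not the Kafka consumer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class VarargsSubscribe {

    // Hypothetical stand-in for a consumer with a varargs subscribe(String... topics);
    // it only records topic names and does no I/O.
    static final class FakeConsumer {
        final Set<String> subscriptions = new LinkedHashSet<>(); // keeps insertion order

        void subscribe(String... topics) {
            subscriptions.addAll(Arrays.asList(topics));
        }
    }

    public static void main(String[] args) {
        FakeConsumer consumer = new FakeConsumer();

        // Single topic: varargs needs no wrapper collection.
        consumer.subscribe("clicks");

        // Topics already held in a List: convert to an array first (the extra step in point 1).
        List<String> topics = new ArrayList<>(List.of("orders", "payments"));
        consumer.subscribe(topics.toArray(new String[0]));

        System.out.println(consumer.subscriptions); // [clicks, orders, payments]
    }
}
```

`topics.toArray(new String[0])` is equivalent to the pre-sized `new String[topics.size()]` form quoted in point 1; either way the conversion is a single expression at the call site.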
Re: New Consumer API discussion
saying it is possible to get events from the high-level consumer regarding various state machine changes? For instance, can we get a notification when a rebalance starts and ends, when a partition is assigned/unassigned, when an offset is committed on a partition, when a leader changes and so on? I call this OOB traffic, since they are not the core messages streaming, but side-band events, yet they are still potentially useful to consumers. Thank you, Robert Robert Withers Staff Analyst/Developer o: (720) 514-8963 c: (571) 262-1873 -Original Message- From: Jun Rao [mailto:jun...@gmail.com] Sent: Sunday, February 23, 2014 4:19 PM To: users@kafka.apache.orgmailto:users@kafka.apache.org Subject: Re: New Consumer API discussion Robert, For the push orient api, you can potentially implement your own MessageHandler with those methods. In the main loop of our new consumer api, you can just call those methods based on the events you get. Also, we already have an api to get the first and the last offset of a partition (getOffsetBefore). Thanks, Jun On Sat, Feb 22, 2014 at 11:29 AM, Withers, Robert robert.with...@dish.commailto:robert.with...@dish.comwrote: This is a good idea, too. I would modify it to include stream marking, then you can have: long end = consumer.lastOffset(tp); consumer.setMark(end); while(consumer.beforeMark()) { process(consumer.pollToMark()); } or long end = consumer.lastOffset(tp); consumer.setMark(end); for(Object msg : consumer.iteratorToMark()) { process(msg); } I actually have 4 suggestions, then: * pull: stream marking * pull: finite streams, bound by time range (up-to-now, yesterday) or offset * pull: async api * push: KafkaMessageSource, for a push model, with msg and OOB events. Build one in either individual or chunk mode and have a listener for each msg or a listener for a chunk of msgs. 
Make it composable and policy driven (chunked, range, commitOffsets policy, retry policy, transactional) Thank you, Robert On Feb 22, 2014, at 11:21 AM, Jay Kreps jay.kr...@gmail.commailto: jay.kr...@gmail.commailto: jay.kr...@gmail.commailto:jay.kr...@gmail.com wrote: I think what Robert is saying is that we need to think through the offset API to enable batch processing of topic data. Think of a process that periodically kicks off to compute a data summary or do a data load or something like that. I think what we need to support this is an api to fetch the last offset from the server for a partition. Something like long lastOffset(TopicPartition tp) and for symmetry long firstOffset(TopicPartition tp) Likely this would have to be batched. Essentially we should add this use case to our set of code examples to write and think through. The usage would be something like long end = consumer.lastOffset(tp); while(consumer.position end) process(consumer.poll()); -Jay On Sat, Feb 22, 2014 at 1:52 AM, Withers, Robert robert.with...@dish.commailto:robert.with...@dish.com mailto:robert.with...@dish.comwrote: Jun, I was originally thinking a non-blocking read from a distributed stream should distinguish between no local messages, but a fetch is occurring versus you have drained the stream. The reason this may be valuable to me is so I can write consumers that read all known traffic then terminate. You caused me to reconsider and I think I am conflating 2 things. One is a sync/async api while the other is whether to have an infinite or finite stream. Is it possible to build a finite KafkaStream on a range of messages? Perhaps a Simple Consumer would do just fine and then I could start off getting the writeOffset from zookeeper and tell it to read a specified range per partition. I've done this and forked a simple consumer runnable for each partition, for one of our analyzers. 
The great thing about the high-level consumer is that rebalance, so I can fork however many stream readers I want and you just figure it out for me. In that way you offer us the control over the resource consumption within a pull model. This is best to regulate message pressure, they say. Combining that high-level rebalance ability with a ranged partition drain could be really nice...build the stream with an ending position and it is a finite stream, but retain the high-level rebalance. With a finite stream, you would know the difference of the 2 async scenarios: fetch-in-progress versus end-of-stream. With an infinite stream, you never get end-of-stream. Aside from a high-level consumer over a finite range within each partition, the other feature I can think of is more complicated. A high-level consumer has state machine changes that the client cannot access, to my knowledge. Our use of kafka has us invoke a message handler with each message we consumer from the KafkaStream, so we convert a pull-model to a push-model. Including the idea of receiving
Re: New Consumer API discussion
() on the last message returned from poll()? Yes, except in the case where a rebalance is triggered and poll() is not yet invoked. Here, you would use position() to get the new fetch position for the specific partition. Even if this is not a common use case, IMO it is much easier to use position() to get the fetch offset than invoking nextOffset() on the last message. This also keeps the APIs symmetric, which is nice. On Mon, Feb 24, 2014 at 7:06 PM, Withers, Robert robert.with...@dish.com wrote: That's wonderful. Thanks for kafka. Rob On Feb 24, 2014, at 9:58 AM, Guozhang Wang wangg...@gmail.commailto: wangg...@gmail.com wrote: Hi Robert, Yes, you can check out the callback functions in the new API onPartitionDesigned onPartitionAssigned and see if they meet your needs. Guozhang On Mon, Feb 24, 2014 at 8:18 AM, Withers, Robert robert.with...@dish.commailto:robert.with...@dish.comwrote: Jun, Are you saying it is possible to get events from the high-level consumer regarding various state machine changes? For instance, can we get a notification when a rebalance starts and ends, when a partition is assigned/unassigned, when an offset is committed on a partition, when a leader changes and so on? I call this OOB traffic, since they are not the core messages streaming, but side-band events, yet they are still potentially useful to consumers. Thank you, Robert Robert Withers Staff Analyst/Developer o: (720) 514-8963 c: (571) 262-1873 -Original Message- From: Jun Rao [mailto:jun...@gmail.com] Sent: Sunday, February 23, 2014 4:19 PM To: users@kafka.apache.orgmailto:users@kafka.apache.org Subject: Re: New Consumer API discussion Robert, For the push orient api, you can potentially implement your own MessageHandler with those methods. In the main loop of our new consumer api, you can just call those methods based on the events you get. Also, we already have an api to get the first and the last offset of a partition (getOffsetBefore). 
Thanks, Jun On Sat, Feb 22, 2014 at 11:29 AM, Withers, Robert robert.with...@dish.commailto:robert.with...@dish.comwrote: This is a good idea, too. I would modify it to include stream marking, then you can have: long end = consumer.lastOffset(tp); consumer.setMark(end); while(consumer.beforeMark()) { process(consumer.pollToMark()); } or long end = consumer.lastOffset(tp); consumer.setMark(end); for(Object msg : consumer.iteratorToMark()) { process(msg); } I actually have 4 suggestions, then: * pull: stream marking * pull: finite streams, bound by time range (up-to-now, yesterday) or offset * pull: async api * push: KafkaMessageSource, for a push model, with msg and OOB events. Build one in either individual or chunk mode and have a listener for each msg or a listener for a chunk of msgs. Make it composable and policy driven (chunked, range, commitOffsets policy, retry policy, transactional) Thank you, Robert On Feb 22, 2014, at 11:21 AM, Jay Kreps jay.kr...@gmail.commailto: jay.kr...@gmail.commailto: jay.kr...@gmail.commailto:jay.kr...@gmail.com wrote: I think what Robert is saying is that we need to think through the offset API to enable batch processing of topic data. Think of a process that periodically kicks off to compute a data summary or do a data load or something like that. I think what we need to support this is an api to fetch the last offset from the server for a partition. Something like long lastOffset(TopicPartition tp) and for symmetry long firstOffset(TopicPartition tp) Likely this would have to be batched. Essentially we should add this use case to our set of code examples to write and think through. 
The usage would be something like long end = consumer.lastOffset(tp); while(consumer.position end) process(consumer.poll()); -Jay On Sat, Feb 22, 2014 at 1:52 AM, Withers, Robert robert.with...@dish.commailto:robert.with...@dish.com mailto:robert.with...@dish.comwrote: Jun, I was originally thinking a non-blocking read from a distributed stream should distinguish between no local messages, but a fetch is occurring versus you have drained the stream. The reason this may be valuable to me is so I can write consumers that read all known traffic then terminate. You caused me to reconsider and I think I am conflating 2 things. One is a sync/async api while the other is whether to have an infinite or finite stream. Is it possible to build a finite KafkaStream on a range of messages? Perhaps a Simple Consumer would do just fine and then I could start off getting the writeOffset from zookeeper and tell it to read a specified range per partition. I've done this and forked a simple consumer runnable for each partition, for one of our analyzers. The great thing about the high-level consumer is that rebalance, so I can fork however many stream readers I want and you just figure it out for me
Re: New Consumer API discussion
for the specified list of topics and partitions to *Kafka*. If no offsets are specified, commits offsets returned on the last {@link #poll(long, TimeUnit) poll()} for the subscribed list of topics and partitions. 4. There is inconsistency in specifying partitions. Sometimes we use TopicPartition and some other times we use String and int (see examples below). void onPartitionsAssigned(Consumer consumer, TopicPartition...partitions) public void *subscribe*(java.lang.String topic, int... partitions) Yes, this was discussed previously. I think generally the consensus seems to be to use the higher level classes everywhere. Made those changes. What's the use case of position()? Isn't that just the nextOffset() on the last message returned from poll()? Yes, except in the case where a rebalance is triggered and poll() is not yet invoked. Here, you would use position() to get the new fetch position for the specific partition. Even if this is not a common use case, IMO it is much easier to use position() to get the fetch offset than invoking nextOffset() on the last message. This also keeps the APIs symmetric, which is nice. On Mon, Feb 24, 2014 at 7:06 PM, Withers, Robert robert.with...@dish.com wrote: That's wonderful. Thanks for kafka. Rob On Feb 24, 2014, at 9:58 AM, Guozhang Wang wangg...@gmail.commailto: wangg...@gmail.com wrote: Hi Robert, Yes, you can check out the callback functions in the new API onPartitionDesigned onPartitionAssigned and see if they meet your needs. Guozhang On Mon, Feb 24, 2014 at 8:18 AM, Withers, Robert robert.with...@dish.commailto:robert.with...@dish.comwrote: Jun, Are you saying it is possible to get events from the high-level consumer regarding various state machine changes? For instance, can we get a notification when a rebalance starts and ends, when a partition is assigned/unassigned, when an offset is committed on a partition, when a leader changes and so on? 
I call this OOB traffic, since these are not the core streamed messages but side-band events, yet they are still potentially useful to consumers. Thank you, Robert

Robert Withers
Staff Analyst/Developer
o: (720) 514-8963 c: (571) 262-1873

-----Original Message-----
From: Jun Rao [mailto:jun...@gmail.com]
Sent: Sunday, February 23, 2014 4:19 PM
To: users@kafka.apache.org
Subject: Re: New Consumer API discussion

Robert, For the push-oriented API, you can potentially implement your own MessageHandler with those methods. In the main loop of our new consumer API, you can just call those methods based on the events you get. Also, we already have an API to get the first and the last offset of a partition (getOffsetBefore). Thanks, Jun

On Sat, Feb 22, 2014 at 11:29 AM, Withers, Robert robert.with...@dish.com wrote:

This is a good idea, too. I would modify it to include stream marking; then you can have:

long end = consumer.lastOffset(tp);
consumer.setMark(end);
while(consumer.beforeMark()) {
    process(consumer.pollToMark());
}

or

long end = consumer.lastOffset(tp);
consumer.setMark(end);
for(Object msg : consumer.iteratorToMark()) {
    process(msg);
}

I actually have 4 suggestions, then:
* pull: stream marking
* pull: finite streams, bound by time range (up-to-now, yesterday) or offset
* pull: async API
* push: KafkaMessageSource, for a push model, with msg and OOB events. Build one in either individual or chunk mode and have a listener for each msg or a listener for a chunk of msgs. Make it composable and policy driven (chunked, range, commitOffsets policy, retry policy, transactional)

Thank you, Robert

On Feb 22, 2014, at 11:21 AM, Jay Kreps jay.kr...@gmail.com wrote:

I think what Robert is saying is that we need to think through the offset API to enable batch processing of topic data.
Think of a process that periodically kicks off to compute a data summary, do a data load, or something like that. I think what we need to support this is an API to fetch the last offset from the server for a partition. Something like

long lastOffset(TopicPartition tp)

and, for symmetry,

long firstOffset(TopicPartition tp)

Likely this would have to be batched. Essentially, we should add this use case to our set of code examples to write and think through. The usage would be something like:

long end = consumer.lastOffset(tp);
while(consumer.position < end)
    process(consumer.poll());

-Jay
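Robert's stream-marking variant can be sketched over a single partition. The sketch assumes a hypothetical in-memory PartitionLog in place of a consumer; setMark, beforeMark, pollToMark and iteratorToMark follow the names proposed in the thread, but their behavior here is only one plausible reading: pollToMark never returns a message at or past the mark, and iteratorToMark exposes the same bounded read as an Iterable so it works in a for-each loop.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical single-partition log standing in for a consumer; not Kafka code.
class PartitionLog {
    final List<String> messages = new ArrayList<>(); // list index == offset
    private long position = 0;                       // next offset to read

    long lastOffset() { return messages.size(); }    // offset the next write would get
    long position() { return position; }

    List<String> poll(int max) {
        int from = (int) position;
        int to = Math.min(messages.size(), from + max);
        position = to;
        return new ArrayList<>(messages.subList(from, to));
    }
}

// Sketch of the proposed stream marking: reads stop at the mark even if the log keeps growing.
class MarkedStream {
    private final PartitionLog log;
    private final int batchSize;
    private long mark = Long.MAX_VALUE; // no mark set: unbounded

    MarkedStream(PartitionLog log, int batchSize) {
        this.log = log;
        this.batchSize = batchSize;
    }

    void setMark(long mark) { this.mark = mark; }

    boolean beforeMark() { return log.position() < mark; }

    // Never hands back a message at or beyond the mark.
    List<String> pollToMark() {
        long room = Math.min(batchSize, mark - log.position());
        return room > 0 ? log.poll((int) room) : List.of();
    }

    Iterable<String> iteratorToMark() {
        return () -> new Iterator<String>() {
            private Iterator<String> batch = List.<String>of().iterator();

            @Override
            public boolean hasNext() {
                while (!batch.hasNext() && beforeMark()) {
                    List<String> next = pollToMark();
                    if (next.isEmpty()) break; // nothing available yet; a real client would wait and retry
                    batch = next.iterator();
                }
                return batch.hasNext();
            }

            @Override
            public String next() {
                if (!hasNext()) throw new NoSuchElementException();
                return batch.next();
            }
        };
    }
}
```

Used as in the thread, the mark is set from lastOffset() and then either the while(beforeMark()) loop or a for-each over iteratorToMark() drains up to it; messages appended after the mark is set are not returned.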
Re: New Consumer API discussion
On Sat, Feb 22, 2014 at 1:52 AM, Withers, Robert robert.with...@dish.com wrote:

Jun, I was originally thinking a non-blocking read from a distributed stream should distinguish between "no local messages, but a fetch is occurring" and "you have drained the stream". This may be valuable to me because I could then write consumers that read all known traffic and then terminate. You caused me to reconsider, and I think I am conflating 2 things: one is a sync/async API, the other is whether the stream is infinite or finite. Is it possible to build a finite KafkaStream on a range of messages? Perhaps a SimpleConsumer would do just fine: I could start by getting the writeOffset from ZooKeeper and tell it to read a specified range per partition. I've done this, forking a SimpleConsumer runnable for each partition, for one of our analyzers. The great thing about the high-level consumer is that it rebalances, so I can fork however many stream readers I want and you just figure it out for me. In that way you offer us control over resource consumption within a pull model, which is the best way to regulate message pressure, they say. Combining that high-level rebalance ability with a ranged partition drain could be really nice: build the stream with an ending position and it is a finite stream, but retain the high-level rebalance. With a finite stream, you would know the difference between the 2 async scenarios: fetch-in-progress versus end-of-stream. With an infinite stream, you never get end-of-stream. Aside from a high-level consumer over a finite range within each partition, the other feature I can think of is more complicated. A high-level consumer has state machine changes that the client cannot access, to my knowledge.

Our use of Kafka has us invoke a message handler with each message we consume from the KafkaStream, so we convert a pull model to a push model. Including the idea of receiving notifications of state machine changes, what would be really nice is a KafkaMessageSource, that is, an eventful push model. If it were thread-safe, then we could register listeners for various events:
* opening-stream
* closing-stream
* message-arrived
* end-of-stream/no-more-messages-in-partition (for finite streams)
* rebalance started
* partition assigned
* partition unassigned
* rebalance finished
* partition-offset-committed
Perhaps that is just our use, but instead of a pull-oriented KafkaStream, is there any sense in your providing a push-oriented
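Jun's suggestion (call your own handler methods from the pull loop) and Robert's KafkaMessageSource idea can be combined in a small sketch. Everything here is hypothetical: SourceEvent covers only a few of the events listed above (a real source would also need the rebalance, assignment and commit events), SourceListener and KafkaMessageSourceSketch are illustrative names, and the pull side is just a java.util.Iterator standing in for a finite stream. Listeners are kept in a CopyOnWriteArrayList so they can be registered from other threads while the loop runs, which is one way to meet Robert's thread-safety condition.

```java
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Hypothetical event set: a few of the events proposed in the thread, not a Kafka API.
enum SourceEvent { OPENING_STREAM, MESSAGE_ARRIVED, END_OF_STREAM, CLOSING_STREAM }

// Hypothetical listener; payload is the message for MESSAGE_ARRIVED and null otherwise.
interface SourceListener {
    void on(SourceEvent event, String payload);
}

// Push adapter over a finite pull source, in the spirit of the proposed KafkaMessageSource.
class KafkaMessageSourceSketch {
    // Copy-on-write list: listeners may be added from other threads while run() is iterating.
    private final List<SourceListener> listeners = new CopyOnWriteArrayList<>();

    void addListener(SourceListener listener) { listeners.add(listener); }

    private void fire(SourceEvent event, String payload) {
        for (SourceListener l : listeners) l.on(event, payload);
    }

    // The pull loop: each message pulled from the source is pushed to every listener.
    void run(Iterator<String> pullSource) {
        fire(SourceEvent.OPENING_STREAM, null);
        try {
            while (pullSource.hasNext()) fire(SourceEvent.MESSAGE_ARRIVED, pullSource.next());
            fire(SourceEvent.END_OF_STREAM, null); // only reachable for a finite stream
        } finally {
            fire(SourceEvent.CLOSING_STREAM, null);
        }
    }
}
```

A chunk-mode listener, as Robert suggests, could be layered on top by buffering MESSAGE_ARRIVED payloads and flushing them in groups.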
Re: New Consumer API discussion
with the same pattern. For example, you can't unsubscribe at the partition level if the subscription is done at the topic level.
*subscribe*(java.lang.String... topics)
*subscribe*(java.lang.String topic, int... partitions)

Makes sense. Made the suggested improvements to the docs: http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/Consumer.html#subscribe%28java.lang.String...%29

3. commit(): The following comment in the doc should probably say "commit offsets for partitions assigned to this consumer": "If no partitions are specified, commits offsets for the subscribed list of topics and partitions to Kafka."

Could you give more context on this suggestion? Here is the entire doc: "Synchronously commits the specified offsets for the specified list of topics and partitions to *Kafka*. If no partitions are specified, commits offsets for the subscribed list of topics and partitions." The hope is to convey that if no partitions are specified, offsets will be committed for the subscribed list of partitions. One improvement could be to state explicitly that the offsets returned on the last poll will be committed. I updated this to: "Synchronously commits the specified offsets for the specified list of topics and partitions to *Kafka*. If no offsets are specified, commits offsets returned on the last {@link #poll(long, TimeUnit) poll()} for the subscribed list of topics and partitions."

4. There is inconsistency in specifying partitions. Sometimes we use TopicPartition and other times we use String and int (see examples below).
void onPartitionsAssigned(Consumer consumer, TopicPartition... partitions)
public void *subscribe*(java.lang.String topic, int... partitions)

Yes, this was discussed previously. I think the general consensus is to use the higher-level classes everywhere. Made those changes.

What's the use case of position()? Isn't that just the nextOffset() on the last message returned from poll()?
Yes, except in the case where a rebalance is triggered and poll() is not yet invoked. Here, you would use position() to get the new fetch position for the specific partition. Even if this is not a common use case, IMO it is much easier to use position() to get the fetch offset than invoking nextOffset() on the last message. This also keeps the APIs symmetric, which is nice.
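The commit() and position() semantics discussed above can be illustrated with a small offset tracker. This is a hypothetical sketch, not the consumer implementation: OffsetTracker, Message.nextOffset() and the fallback to the last committed offset (or 0) for a partition that has not been polled yet are assumptions made for illustration. It shows the no-argument commit() committing the positions reached by poll(), the Map<TopicPartition, Long> overload committing exactly what it is given, and position() answering for a newly assigned partition before any poll().

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins; not the Kafka client classes.
record TopicPartition(String topic, int partition) {}
record Message(TopicPartition tp, long offset, String value) {
    long nextOffset() { return offset + 1; } // offset of the following message
}

// Sketch of the commit()/commit(Map) and position() semantics described in the thread.
class OffsetTracker {
    private final Map<TopicPartition, Long> positions = new HashMap<>(); // next offset to fetch
    private final Map<TopicPartition, Long> committed;

    OffsetTracker(Map<TopicPartition, Long> previouslyCommitted) {
        this.committed = new HashMap<>(previouslyCommitted);
    }

    // A poll() implementation would call this for every message it hands to the application.
    void returned(Message m) { positions.put(m.tp(), m.nextOffset()); }

    // Fetch position; for a partition not polled yet (e.g. just after a rebalance)
    // this sketch falls back to the last committed offset, or 0 if there is none.
    long position(TopicPartition tp) {
        Long p = positions.get(tp);
        return p != null ? p : committed.getOrDefault(tp, 0L);
    }

    // No-argument form: commit the positions reached by the last poll().
    void commit() { committed.putAll(positions); }

    // Explicit form: commit exactly the given offsets.
    void commit(Map<TopicPartition, Long> offsets) { committed.putAll(offsets); }

    Map<TopicPartition, Long> committed() { return Map.copyOf(committed); }
}
```

Keeping both overloads mirrors the point made earlier in the thread: once offsets travel as a Map, the no-argument case needs its own method rather than falling out of a varargs parameter.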
Re: New Consumer API discussion
That’s wonderful. Thanks for kafka. Rob On Feb 24, 2014, at 9:58 AM, Guozhang Wang wangg...@gmail.commailto:wangg...@gmail.com wrote: Hi Robert, Yes, you can check out the callback functions in the new API onPartitionDesigned onPartitionAssigned and see if they meet your needs. Guozhang On Mon, Feb 24, 2014 at 8:18 AM, Withers, Robert robert.with...@dish.commailto:robert.with...@dish.comwrote: Jun, Are you saying it is possible to get events from the high-level consumer regarding various state machine changes? For instance, can we get a notification when a rebalance starts and ends, when a partition is assigned/unassigned, when an offset is committed on a partition, when a leader changes and so on? I call this OOB traffic, since they are not the core messages streaming, but side-band events, yet they are still potentially useful to consumers. Thank you, Robert Robert Withers Staff Analyst/Developer o: (720) 514-8963 c: (571) 262-1873 -Original Message- From: Jun Rao [mailto:jun...@gmail.com] Sent: Sunday, February 23, 2014 4:19 PM To: users@kafka.apache.orgmailto:users@kafka.apache.org Subject: Re: New Consumer API discussion Robert, For the push orient api, you can potentially implement your own MessageHandler with those methods. In the main loop of our new consumer api, you can just call those methods based on the events you get. Also, we already have an api to get the first and the last offset of a partition (getOffsetBefore). Thanks, Jun On Sat, Feb 22, 2014 at 11:29 AM, Withers, Robert robert.with...@dish.commailto:robert.with...@dish.comwrote: This is a good idea, too. 
I would modify it to include stream marking; then you can have:

    long end = consumer.lastOffset(tp);
    consumer.setMark(end);
    while (consumer.beforeMark()) {
        process(consumer.pollToMark());
    }

or

    long end = consumer.lastOffset(tp);
    consumer.setMark(end);
    for (Object msg : consumer.iteratorToMark()) {
        process(msg);
    }

I actually have 4 suggestions, then:

* pull: stream marking
* pull: finite streams, bound by time range (up-to-now, yesterday) or offset
* pull: async API
* push: KafkaMessageSource, for a push model, with msg and OOB events. Build one in either individual or chunk mode and have a listener for each msg or a listener for a chunk of msgs. Make it composable and policy driven (chunked, range, commitOffsets policy, retry policy, transactional).

Thank you,
Robert

On Feb 22, 2014, at 11:21 AM, Jay Kreps jay.kr...@gmail.com wrote:

I think what Robert is saying is that we need to think through the offset API to enable batch processing of topic data. Think of a process that periodically kicks off to compute a data summary or do a data load or something like that. I think what we need to support this is an API to fetch the last offset from the server for a partition. Something like

    long lastOffset(TopicPartition tp)

and for symmetry

    long firstOffset(TopicPartition tp)

Likely this would have to be batched. Essentially we should add this use case to our set of code examples to write and think through.

The usage would be something like

    long end = consumer.lastOffset(tp);
    while (consumer.position(tp) < end)
        process(consumer.poll());

-Jay

On Sat, Feb 22, 2014 at 1:52 AM, Withers, Robert robert.with...@dish.com wrote:

Jun,

I was originally thinking a non-blocking read from a distributed stream should distinguish between "no local messages, but a fetch is occurring" versus "you have drained the stream".
The reason this may be valuable to me is so I can write consumers that read all known traffic and then terminate. You caused me to reconsider, and I think I am conflating 2 things. One is a sync/async API, while the other is whether to have an infinite or finite stream. Is it possible to build a finite KafkaStream on a range of messages?

Perhaps a SimpleConsumer would do just fine, and then I could start off by getting the writeOffset from ZooKeeper and tell it to read a specified range per partition. I've done this and forked a SimpleConsumer runnable for each partition, for one of our analyzers. The great thing about the high-level consumer is that it rebalances, so I can fork however many stream readers I want and you just figure it out for me. In that way you offer us control over resource consumption within a pull model. This is the best way to regulate message pressure, they say.

Combining that high-level rebalance ability with a ranged partition drain could be really nice: build the stream with an ending position and it is a finite stream, but retain the high-level rebalance. With a finite stream, you would know the difference between the 2 async scenarios: fetch-in-progress versus end-of-stream. With an infinite stream, you never get end-of-stream.

Aside from a high-level consumer over a finite range within each partition, the other feature I
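Robert's stream-marking proposal can be sketched against a simulated single-partition log. This is a minimal sketch, assuming offsets are list indices and that lastOffset() means the log-end offset (one past the last message); MarkedPartitionReader and its setMark/beforeMark/pollToMark/iteratorToMark methods are hypothetical names taken from the proposal, not a released Kafka API. It illustrates that fixing the mark up front turns an infinite stream into a finite one: messages appended after the mark is set are not read.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical reader for one partition, illustrating the "stream marking" proposal.
// The partition log is simulated by an in-memory list whose indices act as offsets.
// None of these method names belong to a released Kafka consumer API.
class MarkedPartitionReader {
    private final List<String> log;  // simulated partition contents
    private final int maxBatch;      // upper bound on messages returned per pollToMark()
    private long position = 0;       // offset of the next message to read
    private long mark = -1;          // exclusive end offset; -1 means "no mark set"

    MarkedPartitionReader(List<String> log, int maxBatch) {
        this.log = log;
        this.maxBatch = maxBatch;
    }

    // Assumed meaning: the log-end offset, i.e. the offset the next appended message would get.
    long lastOffset() {
        return log.size();
    }

    void setMark(long end) {
        this.mark = end;
    }

    // True while messages remain before the mark; false if no mark was set.
    boolean beforeMark() {
        return mark >= 0 && position < mark;
    }

    // Returns up to maxBatch messages, never reading at or past the mark.
    List<String> pollToMark() {
        List<String> batch = new ArrayList<>();
        while (beforeMark() && batch.size() < maxBatch) {
            batch.add(log.get((int) position));
            position++;
        }
        return batch;
    }

    // One-message-at-a-time view of the same bounded range.
    Iterable<String> iteratorToMark() {
        return () -> new Iterator<String>() {
            @Override
            public boolean hasNext() {
                return beforeMark();
            }

            @Override
            public String next() {
                if (!hasNext()) {
                    throw new NoSuchElementException("reached the mark");
                }
                return log.get((int) position++);
            }
        };
    }
}
```

Setting the mark before draining is what lets a batch job tell "caught up to the snapshot" apart from "waiting for more data".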
Re: New Consumer API discussion
We use Kafka as a durable buffer for 3rd-party event traffic. It acts as the event source in a lambda architecture. We want it to be exactly-once and we are close, though we can lose messages while aggregating for Hadoop. To really tie this all together, I think there should be an Apache project to implement a proper 3-phase distributed transaction capability, which the Kafka and Hadoop communities could implement together. This paper looks promising. It is a 3-RTT protocol, but it is non-blocking. This could be part of a new consumer API at some point. http://ieeexplore.ieee.org/xpl/articleDetails.jsp?arnumber=1703048

regards,
Rob
Re: New Consumer API discussion
Jun,

I was originally thinking a non-blocking read from a distributed stream should distinguish between “no local messages, but a fetch is occurring” versus “you have drained the stream”. The reason this may be valuable to me is so I can write consumers that read all known traffic and then terminate. You caused me to reconsider, and I think I am conflating 2 things. One is a sync/async API, while the other is whether to have an infinite or finite stream. Is it possible to build a finite KafkaStream on a range of messages?

Perhaps a SimpleConsumer would do just fine, and then I could start off by getting the writeOffset from ZooKeeper and tell it to read a specified range per partition. I’ve done this and forked a SimpleConsumer runnable for each partition, for one of our analyzers. The great thing about the high-level consumer is that it rebalances, so I can fork however many stream readers I want and you just figure it out for me. In that way you offer us control over resource consumption within a pull model. This is the best way to regulate message pressure, they say.

Combining that high-level rebalance ability with a ranged partition drain could be really nice: build the stream with an ending position and it is a finite stream, but retain the high-level rebalance. With a finite stream, you would know the difference between the 2 async scenarios: fetch-in-progress versus end-of-stream. With an infinite stream, you never get end-of-stream.

Aside from a high-level consumer over a finite range within each partition, the other feature I can think of is more complicated. A high-level consumer has state machine changes that the client cannot access, to my knowledge. Our use of Kafka has us invoke a message handler with each message we consume from the KafkaStream, so we convert a pull model to a push model. Including the idea of receiving notifications from state machine changes, what would be really nice is to have a KafkaMessageSource, that is, an eventful push model.
If it were thread-safe, then we could register listeners for various events:

* opening-stream
* closing-stream
* message-arrived
* end-of-stream/no-more-messages-in-partition (for finite streams)
* rebalance started
* partition assigned
* partition unassigned
* rebalance finished
* partition-offset-committed

Perhaps that is just our use, but instead of a pull-oriented KafkaStream, is there any sense in your providing a push-oriented KafkaMessageSource publishing OOB messages?

thank you,
Robert

On Feb 21, 2014, at 5:59 PM, Jun Rao jun...@gmail.com wrote:

Robert,

Could you explain why you want to distinguish between FetchingInProgressException and NoMessagePendingException? The nextMsgs() method that you want is exactly what poll() does.

Thanks,
Jun

On Wed, Feb 19, 2014 at 8:45 AM, Withers, Robert robert.with...@dish.com wrote:

I am not clear on why the consumer stream should be positionable, especially if it is limited to the in-memory fetched messages. Could someone explain to me, please? I really like the idea of committing the offset specifically on those partitions with changed read offsets, only.

2 items I would like to see added to the KafkaStream are:

* a non-blocking next() that throws several exceptions (FetchingInProgressException and a NoMessagePendingException or something) to differentiate between fetching and no messages left.
* a nextMsgs() method which returns all locally available messages and kicks off a fetch for the next chunk.

If you are trying to add transactional features, then formally define a DTP capability and pull in other server frameworks to share the implementation. Should it be XA/Open? How about a new peer2peer DTP protocol?
Thank you,
Robert

-----Original Message-----
From: Jay Kreps [mailto:jay.kr...@gmail.com]
Sent: Sunday, February 16, 2014 10:13 AM
To: users@kafka.apache.org
Subject: Re: New Consumer API discussion

+1 I think those are good. It is a little weird that changing the fetch point is not batched but changing the commit point is, but I suppose there is no helping that.

-Jay

On Sat, Feb 15, 2014 at 7:52 AM, Neha Narkhede neha.narkh...@gmail.com wrote:

Jay,

That makes sense. position/seek deal with changing the consumer's in-memory data, so there is no remote RPC there. For some reason, I got committed and seek mixed up in my head at that time :) So we still end up with

    long position(TopicPartition tp)
    void seek(TopicPartitionOffset p)
    Map<TopicPartition, Long> committed(TopicPartition tp);
    void commit(TopicPartitionOffset...);

Thanks,
Neha

On Friday, February 14, 2014, Jay Kreps jay.kr
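The KafkaMessageSource idea (a push model layered over the pull loop, much as Jun suggests with a custom MessageHandler) can be sketched as a small thread-safe listener registry. This is a minimal sketch, not a Kafka API: SourceEvent, SourceListener, KafkaMessageSource, on(), publish() and pushBatch() are hypothetical names. It assumes a single polling thread drives publish() while other threads may register listeners, which is why the per-event lists are CopyOnWriteArrayList inside a ConcurrentHashMap.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

// Event kinds from Robert's list; the names are illustrative, not a Kafka API.
enum SourceEvent {
    OPENING_STREAM, CLOSING_STREAM, MESSAGE_ARRIVED, END_OF_STREAM,
    REBALANCE_STARTED, PARTITION_ASSIGNED, PARTITION_UNASSIGNED,
    REBALANCE_FINISHED, PARTITION_OFFSET_COMMITTED
}

// Callback for one event; payload is a message, a partition id, an offset, or null.
interface SourceListener {
    void onEvent(SourceEvent event, Object payload);
}

// Hypothetical push-oriented wrapper: the polling thread calls publish() for every
// message or state change it observes, and registered listeners are called back.
class KafkaMessageSource {
    private final Map<SourceEvent, List<SourceListener>> listeners = new ConcurrentHashMap<>();

    // Safe to call from any thread, including while publish() is iterating.
    void on(SourceEvent event, SourceListener listener) {
        listeners.computeIfAbsent(event, e -> new CopyOnWriteArrayList<>()).add(listener);
    }

    // Delivers the event to every listener registered for it, in registration order.
    void publish(SourceEvent event, Object payload) {
        for (SourceListener l : listeners.getOrDefault(event, List.of())) {
            l.onEvent(event, payload);
        }
    }

    // Converts one polled batch into push callbacks, optionally signalling end-of-stream.
    void pushBatch(List<String> batch, boolean endOfStream) {
        for (String message : batch) {
            publish(SourceEvent.MESSAGE_ARRIVED, message);
        }
        if (endOfStream) {
            publish(SourceEvent.END_OF_STREAM, null);
        }
    }
}
```

Keeping the pull loop as the only producer of events means ordering is preserved per source; the registry only adds fan-out.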
Re: New Consumer API discussion
I think what Robert is saying is that we need to think through the offset API to enable batch processing of topic data. Think of a process that periodically kicks off to compute a data summary or do a data load or something like that. I think what we need to support this is an API to fetch the last offset from the server for a partition. Something like

    long lastOffset(TopicPartition tp)

and for symmetry

    long firstOffset(TopicPartition tp)

Likely this would have to be batched. Essentially we should add this use case to our set of code examples to write and think through.

The usage would be something like

    long end = consumer.lastOffset(tp);
    while (consumer.position(tp) < end)
        process(consumer.poll());

-Jay

On Sat, Feb 22, 2014 at 1:52 AM, Withers, Robert robert.with...@dish.com wrote:

Jun,

I was originally thinking a non-blocking read from a distributed stream should distinguish between "no local messages, but a fetch is occurring" versus "you have drained the stream". The reason this may be valuable to me is so I can write consumers that read all known traffic and then terminate. You caused me to reconsider, and I think I am conflating 2 things. One is a sync/async API, while the other is whether to have an infinite or finite stream. Is it possible to build a finite KafkaStream on a range of messages?

Perhaps a SimpleConsumer would do just fine, and then I could start off by getting the writeOffset from ZooKeeper and tell it to read a specified range per partition. I've done this and forked a SimpleConsumer runnable for each partition, for one of our analyzers. The great thing about the high-level consumer is that it rebalances, so I can fork however many stream readers I want and you just figure it out for me. In that way you offer us control over resource consumption within a pull model. This is the best way to regulate message pressure, they say.
Combining that high-level rebalance ability with a ranged partition drain could be really nice: build the stream with an ending position and it is a finite stream, but retain the high-level rebalance. With a finite stream, you would know the difference between the 2 async scenarios: fetch-in-progress versus end-of-stream. With an infinite stream, you never get end-of-stream.

Aside from a high-level consumer over a finite range within each partition, the other feature I can think of is more complicated. A high-level consumer has state machine changes that the client cannot access, to my knowledge. Our use of Kafka has us invoke a message handler with each message we consume from the KafkaStream, so we convert a pull model to a push model. Including the idea of receiving notifications from state machine changes, what would be really nice is to have a KafkaMessageSource, that is, an eventful push model. If it were thread-safe, then we could register listeners for various events:

* opening-stream
* closing-stream
* message-arrived
* end-of-stream/no-more-messages-in-partition (for finite streams)
* rebalance started
* partition assigned
* partition unassigned
* rebalance finished
* partition-offset-committed

Perhaps that is just our use, but instead of a pull-oriented KafkaStream, is there any sense in your providing a push-oriented KafkaMessageSource publishing OOB messages?

thank you,
Robert

On Feb 21, 2014, at 5:59 PM, Jun Rao jun...@gmail.com wrote:

Robert,

Could you explain why you want to distinguish between FetchingInProgressException and NoMessagePendingException? The nextMsgs() method that you want is exactly what poll() does.

Thanks,
Jun

On Wed, Feb 19, 2014 at 8:45 AM, Withers, Robert robert.with...@dish.com wrote:

I am not clear on why the consumer stream should be positionable, especially if it is limited to the in-memory fetched messages. Could someone explain to me, please?
I really like the idea of committing the offset specifically on those partitions with changed read offsets, only.

2 items I would like to see added to the KafkaStream are:

* a non-blocking next() that throws several exceptions (FetchingInProgressException and a NoMessagePendingException or something) to differentiate between fetching and no messages left.
* a nextMsgs() method which returns all locally available messages and kicks off a fetch for the next chunk.

If you are trying to add transactional features, then formally define a DTP capability and pull in other server frameworks to share the implementation. Should it be XA/Open? How about a new peer2peer DTP protocol?

Thank you,
Robert

-----Original Message-----
From: Jay Kreps [mailto:jay.kr...@gmail.com]
Sent: Sunday, February 16, 2014 10:13 AM
To: users@kafka.apache.org
Subject: Re: New Consumer API discussion
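Jay's batch-processing loop, with the lastOffset lookup batched as he suggests, might look like the following. It is a sketch against a hypothetical in-memory SimulatedConsumer rather than the real client; TopicPartition and Message are local stand-in records, lastOffsets() is an assumed batched form of the proposed lastOffset(tp), and "last offset" is taken to mean the log-end offset (one past the last message).

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Stand-in for Kafka's TopicPartition (topic name plus partition number).
record TopicPartition(String topic, int partition) {}

// A fetched message together with where it came from.
record Message(TopicPartition tp, long offset, String value) {}

// Hypothetical in-memory consumer exposing the calls discussed in the thread:
// a batched lastOffsets(), position(tp) and poll(). Not the real KafkaConsumer.
class SimulatedConsumer {
    private final Map<TopicPartition, List<String>> logs;
    private final Map<TopicPartition, Long> positions = new HashMap<>();

    SimulatedConsumer(Map<TopicPartition, List<String>> logs) {
        this.logs = logs;
        logs.keySet().forEach(tp -> positions.put(tp, 0L));
    }

    // Batched form of lastOffset(tp): log-end offset per partition in one call.
    Map<TopicPartition, Long> lastOffsets(Collection<TopicPartition> tps) {
        Map<TopicPartition, Long> result = new HashMap<>();
        for (TopicPartition tp : tps) {
            result.put(tp, (long) logs.get(tp).size());
        }
        return result;
    }

    long position(TopicPartition tp) {
        return positions.get(tp);
    }

    // Returns at most one message per partition per call and advances positions.
    List<Message> poll() {
        List<Message> batch = new ArrayList<>();
        for (Map.Entry<TopicPartition, List<String>> e : logs.entrySet()) {
            long pos = positions.get(e.getKey());
            if (pos < e.getValue().size()) {
                batch.add(new Message(e.getKey(), pos, e.getValue().get((int) pos)));
                positions.put(e.getKey(), pos + 1);
            }
        }
        return batch;
    }
}

class BatchDrain {
    // Processes everything that existed when the job started, then returns.
    static List<String> drainToSnapshot(SimulatedConsumer consumer, Collection<TopicPartition> tps) {
        Map<TopicPartition, Long> end = consumer.lastOffsets(tps); // one batched lookup up front
        List<String> processed = new ArrayList<>();
        while (tps.stream().anyMatch(tp -> consumer.position(tp) < end.get(tp))) {
            for (Message m : consumer.poll()) {
                // poll() may return data appended after the snapshot; skip it.
                Long limit = end.get(m.tp());
                if (limit != null && m.offset() < limit) {
                    processed.add(m.value());
                }
            }
        }
        return processed;
    }
}
```

Taking the end offsets once, before the loop, is what makes the job finite: anything produced after the snapshot is left for the next run.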
Re: New Consumer API discussion
What's the use case of position()? Isn't that just the nextOffset() on the last message returned from poll()?

Thanks,
Jun

On Sun, Feb 16, 2014 at 9:12 AM, Jay Kreps jay.kr...@gmail.com wrote:

+1 I think those are good. It is a little weird that changing the fetch point is not batched but changing the commit point is, but I suppose there is no helping that.

-Jay

On Sat, Feb 15, 2014 at 7:52 AM, Neha Narkhede neha.narkh...@gmail.com wrote:

Jay,

That makes sense. position/seek deal with changing the consumer's in-memory data, so there is no remote RPC there. For some reason, I got committed and seek mixed up in my head at that time :) So we still end up with

    long position(TopicPartition tp)
    void seek(TopicPartitionOffset p)
    Map<TopicPartition, Long> committed(TopicPartition tp);
    void commit(TopicPartitionOffset...);

Thanks,
Neha

On Friday, February 14, 2014, Jay Kreps jay.kr...@gmail.com wrote:

Oh, interesting. So I am assuming the following implementation:

1. We have an in-memory fetch position which controls the next fetch offset.
2. Changing this has no effect until you poll again, at which point your fetch request will be from the newly specified offset.
3. We then have an in-memory but also remotely stored committed offset.
4. Calling commit has the effect of saving the fetch position both as the in-memory committed position and in the remote store.
5. Auto-commit is the same as periodically calling commit on all positions.

So batching on commit as well as getting the committed position makes sense, but batching the fetch position wouldn't, right? I think you are actually thinking of a different approach.

-Jay

On Thu, Feb 13, 2014 at 10:40 PM, Neha Narkhede neha.narkh...@gmail.com wrote:

I think you are saying both, i.e. if you have committed on a partition it returns you that value but if you haven't it does a remote lookup?

Correct.

The other argument for making committed batched is that commit() is batched, so there is symmetry.
position() and seek() are always in-memory changes (I assume) so there is no need to batch them.

I'm not as sure as you are about that assumption being true. Basically, in my example above, the batching argument for committed() also applies to position(), since one purpose of fetching a partition's offset is to use it to set the position of the consumer to that offset. Since that might lead to a remote OffsetRequest call, I think we probably would be better off batching it.

Another option for naming would be position/reposition instead of position/seek.

I think position/seek is better since it aligns with Java file APIs.

I also think your suggestion about ConsumerPosition makes sense.

Thanks,
Neha

On Feb 13, 2014 9:22 PM, Jay Kreps jay.kr...@gmail.com wrote:

Hey Neha,

I actually wasn't proposing the name TopicOffsetPosition; that was just a typo. I meant TopicPartitionOffset, and I was just referencing what was in the javadoc. So to restate my proposal without the typo, using just the existing classes (that naming is a separate question):

    long position(TopicPartition tp)
    void seek(TopicPartitionOffset p)
    long committed(TopicPartition tp)
    void commit(TopicPartitionOffset...);

So I may be unclear on committed() (AKA lastCommittedOffset). Is it returning the in-memory value from the last commit by this consumer, or is it doing a remote fetch, or both? I think you are saying both, i.e. if you have committed on a partition it returns you that value but if you haven't it does a remote lookup?

The other argument for making committed batched is that commit() is batched, so there is symmetry. position() and seek() are always in-memory changes (I assume) so there is no need to batch them.
So taking all that into account, what if we revise it to

    long position(TopicPartition tp)
    void seek(TopicPartitionOffset p)
    Map<TopicPartition, Long> committed(TopicPartition tp);
    void commit(TopicPartitionOffset...);

This is not symmetric between position/seek and commit/committed, but it is convenient. Another option for naming would be position/reposition instead of position/seek.

With respect to the name TopicPartitionOffset, what I was trying to say is that I recommend we change that to something shorter. I think TopicPosition or ConsumerPosition might be better. Position does not refer to the variables in the object; it refers to the meaning of the object, which represents a position within a topic. The offset field in that object is still called the offset.
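The five-point model Jay describes (an in-memory fetch position, plus a committed offset held both locally and remotely) can be checked with a small simulation. Everything here is hypothetical: RemoteOffsetStore stands in for wherever commits are persisted, poll(tp, count) pretends that count messages were consumed, and maybeAutoCommit() takes the clock as a parameter so the behaviour is deterministic. The sketch shows why seek needs no RPC while commit does.

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for Kafka's TopicPartition.
record TopicPartition(String topic, int partition) {}

// Hypothetical remote offset store; counts writes so the RPCs are visible.
class RemoteOffsetStore {
    final Map<TopicPartition, Long> stored = new HashMap<>();
    int writes = 0;

    void write(Map<TopicPartition, Long> offsets) {
        stored.putAll(offsets);
        writes++;
    }
}

// Sketch of the five-point model: an in-memory fetch position per partition,
// plus committed offsets kept both in memory and in a remote store.
class OffsetModel {
    private final Map<TopicPartition, Long> fetchPositions = new HashMap<>();   // (1) memory only
    private final Map<TopicPartition, Long> committedOffsets = new HashMap<>(); // (3) memory copy
    private final RemoteOffsetStore store;                                      // (3) remote copy
    private final long autoCommitIntervalMs;
    private long lastAutoCommitMs = 0;

    OffsetModel(RemoteOffsetStore store, long autoCommitIntervalMs) {
        this.store = store;
        this.autoCommitIntervalMs = autoCommitIntervalMs;
    }

    long position(TopicPartition tp) {
        return fetchPositions.getOrDefault(tp, 0L);
    }

    // (2) Local only: no RPC; it just changes where the next poll fetches from.
    void seek(TopicPartition tp, long offset) {
        fetchPositions.put(tp, offset);
    }

    // Simulates a poll that consumes `count` messages; returns the offset the fetch started at.
    long poll(TopicPartition tp, int count) {
        long from = position(tp);
        fetchPositions.put(tp, from + count);
        return from;
    }

    // (4) Saves every fetch position as the committed offset, locally and remotely, in one write.
    void commit() {
        Map<TopicPartition, Long> snapshot = new HashMap<>(fetchPositions);
        committedOffsets.putAll(snapshot);
        store.write(snapshot);
    }

    Long committed(TopicPartition tp) {
        return committedOffsets.get(tp);
    }

    // (5) Auto-commit is just commit() on a timer; the caller supplies the clock here.
    boolean maybeAutoCommit(long nowMs) {
        if (nowMs - lastAutoCommitMs < autoCommitIntervalMs) {
            return false;
        }
        commit();
        lastAutoCommitMs = nowMs;
        return true;
    }
}
```

In this model only commit() (and therefore auto-commit) touches the store, which matches the point that batching matters for commit/committed but not for the in-memory fetch position.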
Re: New Consumer API discussion
Robert,

Could you explain why you want to distinguish between FetchingInProgressException and NoMessagePendingException? The nextMsgs() method that you want is exactly what poll() does.

Thanks,
Jun

On Wed, Feb 19, 2014 at 8:45 AM, Withers, Robert robert.with...@dish.com wrote:

I am not clear on why the consumer stream should be positionable, especially if it is limited to the in-memory fetched messages. Could someone explain to me, please? I really like the idea of committing the offset specifically on those partitions with changed read offsets, only.

2 items I would like to see added to the KafkaStream are:

* a non-blocking next() that throws several exceptions (FetchingInProgressException and a NoMessagePendingException or something) to differentiate between fetching and no messages left.
* a nextMsgs() method which returns all locally available messages and kicks off a fetch for the next chunk.

If you are trying to add transactional features, then formally define a DTP capability and pull in other server frameworks to share the implementation. Should it be XA/Open? How about a new peer2peer DTP protocol?

Thank you,
Robert

-----Original Message-----
From: Jay Kreps [mailto:jay.kr...@gmail.com]
Sent: Sunday, February 16, 2014 10:13 AM
To: users@kafka.apache.org
Subject: Re: New Consumer API discussion

+1 I think those are good. It is a little weird that changing the fetch point is not batched but changing the commit point is, but I suppose there is no helping that.

-Jay

On Sat, Feb 15, 2014 at 7:52 AM, Neha Narkhede neha.narkh...@gmail.com wrote:

Jay,

That makes sense. position/seek deal with changing the consumer's in-memory data, so there is no remote RPC there.
For some reason, I got committed and seek mixed up in my head at that time :) So we still end up with

    long position(TopicPartition tp)
    void seek(TopicPartitionOffset p)
    Map<TopicPartition, Long> committed(TopicPartition tp);
    void commit(TopicPartitionOffset...);

Thanks,
Neha

On Friday, February 14, 2014, Jay Kreps jay.kr...@gmail.com wrote:

Oh, interesting. So I am assuming the following implementation:

1. We have an in-memory fetch position which controls the next fetch offset.
2. Changing this has no effect until you poll again, at which point your fetch request will be from the newly specified offset.
3. We then have an in-memory but also remotely stored committed offset.
4. Calling commit has the effect of saving the fetch position both as the in-memory committed position and in the remote store.
5. Auto-commit is the same as periodically calling commit on all positions.

So batching on commit as well as getting the committed position makes sense, but batching the fetch position wouldn't, right? I think you are actually thinking of a different approach.

-Jay

On Thu, Feb 13, 2014 at 10:40 PM, Neha Narkhede neha.narkh...@gmail.com wrote:

I think you are saying both, i.e. if you have committed on a partition it returns you that value but if you haven't it does a remote lookup?

Correct.

The other argument for making committed batched is that commit() is batched, so there is symmetry. position() and seek() are always in-memory changes (I assume) so there is no need to batch them.

I'm not as sure as you are about that assumption being true. Basically, in my example above, the batching argument for committed() also applies to position(), since one purpose of fetching a partition's offset is to use it to set the position of the consumer to that offset. Since that might lead to a remote OffsetRequest call, I think we probably would be better off batching it.

Another option for naming would be position/reposition instead of position/seek.
I think position/seek is better since it aligns with Java file APIs.

I also think your suggestion about ConsumerPosition makes sense.

Thanks,
Neha

On Feb 13, 2014 9:22 PM, Jay Kreps jay.kr...@gmail.com wrote:

Hey Neha,

I actually wasn't proposing the name TopicOffsetPosition; that was just a typo. I meant TopicPartitionOffset, and I was just referencing what was in the javadoc. So to restate my proposal without the typo, using just the existing classes (that naming is a separate question):

    long position(TopicPartition tp)
    void seek(TopicPartitionOffset p)
    long committed(TopicPartition tp)
    void commit(TopicPartitionOffset
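To make the two "nothing to return" cases in Robert's request concrete, here is a sketch of a non-blocking read that reports them as distinct outcomes. It assumes, as in Robert's finite-stream idea, that the stream knows an exclusive end offset; the names (ReadStatus, ReadResult, NonBlockingStream, tryNext(), deliver()) are hypothetical, and a status value is used in place of the proposed exceptions. nextMsgs() corresponds to what Jun says poll() already does.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// The three outcomes a non-blocking read can have. Returned as values here rather than
// thrown as FetchingInProgressException / NoMessagePendingException.
enum ReadStatus { MESSAGE, FETCH_IN_PROGRESS, END_OF_STREAM }

record ReadResult(ReadStatus status, String message) {}

// Hypothetical stream over one partition: a local buffer filled by a background fetcher,
// plus an exclusive end offset. Use Long.MAX_VALUE for an infinite stream.
class NonBlockingStream {
    private final Deque<String> buffer = new ArrayDeque<>();
    private final long endOffset;
    private long consumed = 0; // messages handed to the caller so far (reading from offset 0)

    NonBlockingStream(long endOffset) {
        this.endOffset = endOffset;
    }

    // Called when a fetch completes.
    void deliver(List<String> fetched) {
        buffer.addAll(fetched);
    }

    // The proposed non-blocking next(): never waits.
    ReadResult tryNext() {
        String m = buffer.poll();
        if (m != null) {
            consumed++;
            return new ReadResult(ReadStatus.MESSAGE, m);
        }
        if (consumed >= endOffset) {
            return new ReadResult(ReadStatus.END_OF_STREAM, null);
        }
        return new ReadResult(ReadStatus.FETCH_IN_PROGRESS, null);
    }

    // The proposed nextMsgs(): everything currently buffered (the role poll() plays).
    List<String> nextMsgs() {
        List<String> out = new ArrayList<>(buffer);
        buffer.clear();
        consumed += out.size();
        return out;
    }
}
```

With endOffset set to Long.MAX_VALUE the stream never reports END_OF_STREAM, which is Robert's point that an infinite stream cannot tell an empty buffer from a drained partition.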
Re: New Consumer API discussion
Jay, That makes sense. position/seek deal with changing the consumer's in-memory data, so there is no remote RPC there. For some reason, I got committed and seek mixed up in my head at that time :) So we still end up with long position(TopicPartition tp) void seek(TopicPartitionOffset p) Map<TopicPartition, Long> committed(TopicPartition tp); void commit(TopicPartitionOffset...); Thanks, Neha On Friday, February 14, 2014, Jay Kreps jay.kr...@gmail.com wrote: Oh, interesting. So I am assuming the following implementation: 1. We have an in-memory fetch position which controls the next fetch offset. 2. Changing this has no effect until you poll again, at which point your fetch request will be from the newly specified offset. 3. We then have an in-memory but also remotely stored committed offset. 4. Calling commit has the effect of saving the fetch position as both the in-memory committed position and in the remote store. 5. Auto-commit is the same as periodically calling commit on all positions. So batching on commit as well as getting the committed position makes sense, but batching the fetch position wouldn't, right? I think you are actually thinking of a different approach. -Jay On Thu, Feb 13, 2014 at 10:40 PM, Neha Narkhede neha.narkh...@gmail.com wrote: I think you are saying both, i.e. if you have committed on a partition it returns you that value but if you haven't it does a remote lookup? Correct. The other argument for making committed batched is that commit() is batched, so there is symmetry. position() and seek() are always in-memory changes (I assume) so there is no need to batch them. I'm not as sure as you are about that assumption being true. Basically in my example above, the batching argument for committed() also applies to position() since one purpose of fetching a partition's offset is to use it to set the position of the consumer to that offset.
Since that might lead to a remote OffsetRequest call, I think we probably would be better off batching it. Another option for naming would be position/reposition instead of position/seek. I think position/seek is better since it aligns with Java file APIs. I also think your suggestion about ConsumerPosition makes sense. Thanks, Neha On Feb 13, 2014 9:22 PM, Jay Kreps jay.kr...@gmail.com wrote: Hey Neha, I actually wasn't proposing the name TopicOffsetPosition, that was just a typo. I meant TopicPartitionOffset, and I was just referencing what was in the javadoc. So to restate my proposal without the typo, using just the existing classes (that naming is a separate question): long position(TopicPartition tp) void seek(TopicPartitionOffset p) long committed(TopicPartition tp) void commit(TopicPartitionOffset...); So I may be unclear on committed() (AKA lastCommittedOffset). Is it returning the in-memory value from the last commit by this consumer, or is it doing a remote fetch, or both? I think you are saying both, i.e. if you have committed on a partition it returns you that value but if you haven't it does a remote lookup? The other argument for making committed batched is that commit() is batched, so there is symmetry. position() and seek() are always in-memory changes (I assume) so there is no need to batch them. So taking all that into account, what if we revise it to long position(TopicPartition tp) void seek(TopicPartitionOffset p) Map<TopicPartition, Long> committed(TopicPartition tp); void commit(TopicPartitionOffset...); This is not symmetric between position/seek and commit/committed but it is convenient. Another option for naming would be position/reposition instead of position/seek. With respect to the name TopicPartitionOffset, what I was trying to say is that I recommend we change that to something shorter. I think TopicPosition or ConsumerPosition might be better.
Position does not refer to the variables in the object, it refers to the meaning of the object--it represents a position within a topic. The offset field in that object is still called the offset. TopicOffset, PartitionOffset, or ConsumerOffset would all be workable too. Basically I am just objecting to concatenating three nouns together. :-) -Jay On Thu, Feb 13, 2014 at 1:54 PM, Neha Narkhede neha.narkh...@gmail.com wrote: 2. It returns a list of results. But how can you use the list? The only way to use the list is to make a map of tp=offset and then look up results in this map (or do a for loop over the list for the partition you want). I recommend that if this is an in-memory check we just do one at a time. E.g. long committedPosition(TopicPosition). This was discussed in the previous emails. There is a choic
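The "both" semantics Neha confirms for committed() (answer from memory when this consumer has committed the partition, otherwise do a remote lookup) can be sketched as a toy model. This is an illustration under stated assumptions, not the proposed implementation: TopicPartition and TopicPartitionOffset are local stand-in records rather than the draft javadoc's classes, RemoteOffsetStore and InMemoryStore are hypothetical stand-ins for remote offset storage, and committed() takes varargs so that lookups for several uncached partitions cost a single round trip.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Local stand-ins for the draft API's value classes (hypothetical, not the Kafka classes).
record TopicPartition(String topic, int partition) {}
record TopicPartitionOffset(TopicPartition tp, long offset) {}

// Hypothetical remote offset storage; one fetch() call models one OffsetRequest round trip.
interface RemoteOffsetStore {
    Map<TopicPartition, Long> fetch(List<TopicPartition> tps);
    void store(Map<TopicPartition, Long> offsets);
}

// Toy store that keeps offsets in a map and counts fetch round trips.
class InMemoryStore implements RemoteOffsetStore {
    final Map<TopicPartition, Long> offsets = new HashMap<>();
    int fetchCalls = 0;

    public Map<TopicPartition, Long> fetch(List<TopicPartition> tps) {
        fetchCalls++;
        Map<TopicPartition, Long> out = new HashMap<>();
        for (TopicPartition tp : tps) {
            Long o = offsets.get(tp);
            if (o != null) out.put(tp, o);
        }
        return out;
    }

    public void store(Map<TopicPartition, Long> batch) { offsets.putAll(batch); }
}

class OffsetTracker {
    private final Map<TopicPartition, Long> fetchPositions = new HashMap<>();
    private final Map<TopicPartition, Long> committedCache = new HashMap<>();
    private final RemoteOffsetStore store;

    OffsetTracker(RemoteOffsetStore store) { this.store = store; }

    // position/seek touch only memory; the next poll() would fetch from here.
    long position(TopicPartition tp) { return fetchPositions.getOrDefault(tp, 0L); }
    void seek(TopicPartitionOffset p) { fetchPositions.put(p.tp(), p.offset()); }

    // Records the offsets both in memory and in the remote store.
    void commit(TopicPartitionOffset... offsets) {
        Map<TopicPartition, Long> batch = new HashMap<>();
        for (TopicPartitionOffset o : offsets) batch.put(o.tp(), o.offset());
        committedCache.putAll(batch);
        store.store(batch);
    }

    // "Both": serve partitions this consumer committed from memory,
    // and look up all remaining partitions remotely in a single request.
    Map<TopicPartition, Long> committed(TopicPartition... tps) {
        Map<TopicPartition, Long> result = new HashMap<>();
        List<TopicPartition> misses = new ArrayList<>();
        for (TopicPartition tp : tps) {
            Long cached = committedCache.get(tp);
            if (cached != null) result.put(tp, cached);
            else misses.add(tp);
        }
        if (!misses.isEmpty()) result.putAll(store.fetch(misses));
        return result;
    }
}
```

Counting fetchCalls is only a way to make the batching argument visible; whether committed() should accept one partition or many is exactly the open question in this exchange.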
Re: New Consumer API discussion
Oh, interesting. So I am assuming the following implementation: 1. We have an in-memory fetch position which controls the next fetch offset. 2. Changing this has no effect until you poll again, at which point your fetch request will be from the newly specified offset. 3. We then have an in-memory but also remotely stored committed offset. 4. Calling commit has the effect of saving the fetch position as both the in-memory committed position and in the remote store. 5. Auto-commit is the same as periodically calling commit on all positions. So batching on commit as well as getting the committed position makes sense, but batching the fetch position wouldn't, right? I think you are actually thinking of a different approach. -Jay On Thu, Feb 13, 2014 at 10:40 PM, Neha Narkhede neha.narkh...@gmail.com wrote: I think you are saying both, i.e. if you have committed on a partition it returns you that value but if you haven't it does a remote lookup? Correct. The other argument for making committed batched is that commit() is batched, so there is symmetry. position() and seek() are always in-memory changes (I assume) so there is no need to batch them. I'm not as sure as you are about that assumption being true. Basically in my example above, the batching argument for committed() also applies to position() since one purpose of fetching a partition's offset is to use it to set the position of the consumer to that offset. Since that might lead to a remote OffsetRequest call, I think we probably would be better off batching it. Another option for naming would be position/reposition instead of position/seek. I think position/seek is better since it aligns with Java file APIs. I also think your suggestion about ConsumerPosition makes sense. Thanks, Neha On Feb 13, 2014 9:22 PM, Jay Kreps jay.kr...@gmail.com wrote: Hey Neha, I actually wasn't proposing the name TopicOffsetPosition, that was just a typo. I meant TopicPartitionOffset, and I was just referencing what was in the javadoc.
So to restate my proposal without the typo, using just the existing classes (that naming is a separate question): long position(TopicPartition tp) void seek(TopicPartitionOffset p) long committed(TopicPartition tp) void commit(TopicPartitionOffset...); So I may be unclear on committed() (AKA lastCommittedOffset). Is it returning the in-memory value from the last commit by this consumer, or is it doing a remote fetch, or both? I think you are saying both, i.e. if you have committed on a partition it returns you that value but if you haven't it does a remote lookup? The other argument for making committed batched is that commit() is batched, so there is symmetry. position() and seek() are always in-memory changes (I assume) so there is no need to batch them. So taking all that into account, what if we revise it to long position(TopicPartition tp) void seek(TopicPartitionOffset p) Map<TopicPartition, Long> committed(TopicPartition tp); void commit(TopicPartitionOffset...); This is not symmetric between position/seek and commit/committed but it is convenient. Another option for naming would be position/reposition instead of position/seek. With respect to the name TopicPartitionOffset, what I was trying to say is that I recommend we change that to something shorter. I think TopicPosition or ConsumerPosition might be better. Position does not refer to the variables in the object, it refers to the meaning of the object--it represents a position within a topic. The offset field in that object is still called the offset. TopicOffset, PartitionOffset, or ConsumerOffset would all be workable too. Basically I am just objecting to concatenating three nouns together. :-) -Jay On Thu, Feb 13, 2014 at 1:54 PM, Neha Narkhede neha.narkh...@gmail.com wrote: 2. It returns a list of results. But how can you use the list? The only way to use the list is to make a map of tp=offset and then look up results in this map (or do a for loop over the list for the partition you want).
I recommend that if this is an in-memory check we just do one at a time. E.g. long committedPosition(TopicPosition). This was discussed in the previous emails. There is a choice between returning a map or a list. Some people found the map to be more usable. What if we made it: long position(TopicPartition tp) void seek(TopicOffsetPosition p) long committed(TopicPartition tp) void commit(TopicOffsetPosition...); This is fine, but TopicOffsetPosition doesn't make sense. Offset and Position together are confusing. Also both fetch and commit positions are related to partitions, not topics. Some more options are TopicPartitionPosition or TopicPartitionOffset. And we should use either position everywhere in Kafka or offset, but having both is confusing.
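Jay's five-point model can be made concrete with a small toy consumer. The sketch below is hypothetical and simulates each partition as a plain offset range; ToyConsumer, ConsumedRecord, and the autoCommitEveryNPolls parameter are illustrative names, not part of the proposed API. It shows that seek() only changes memory until the next poll(), that commit() copies the fetch positions into both the in-memory and the (simulated) remote committed offsets, and that auto-commit is just commit() on a schedule.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

record TopicPartition(String topic, int partition) {}
record ConsumedRecord(TopicPartition tp, long offset) {}

// Toy model of the five points (hypothetical; partitions are just offset ranges).
class ToyConsumer {
    private final Map<TopicPartition, Long> logEndOffsets;                   // simulated partition sizes
    private final Map<TopicPartition, Long> fetchPosition = new HashMap<>(); // point 1
    private final Map<TopicPartition, Long> committed = new HashMap<>();     // point 3, in-memory copy
    final Map<TopicPartition, Long> remoteStore = new HashMap<>();           // point 3, simulated remote copy
    private final int autoCommitEveryNPolls;                                 // point 5; 0 disables it
    private int polls = 0;

    ToyConsumer(Map<TopicPartition, Long> logEndOffsets, int autoCommitEveryNPolls) {
        this.logEndOffsets = logEndOffsets;
        this.autoCommitEveryNPolls = autoCommitEveryNPolls;
        for (TopicPartition tp : logEndOffsets.keySet()) fetchPosition.put(tp, 0L);
    }

    // Point 2: only memory changes; nothing is fetched until the next poll().
    void seek(TopicPartition tp, long offset) { fetchPosition.put(tp, offset); }
    long position(TopicPartition tp) { return fetchPosition.get(tp); }
    Long committed(TopicPartition tp) { return committed.get(tp); }

    // Returns up to maxPerPartition records per partition, starting at the fetch position.
    List<ConsumedRecord> poll(int maxPerPartition) {
        List<ConsumedRecord> out = new ArrayList<>();
        for (Map.Entry<TopicPartition, Long> e : fetchPosition.entrySet()) {
            long pos = e.getValue();
            long end = Math.min(pos + maxPerPartition, logEndOffsets.get(e.getKey()));
            for (long o = pos; o < end; o++) out.add(new ConsumedRecord(e.getKey(), o));
            e.setValue(Math.max(pos, end));
        }
        polls++;
        if (autoCommitEveryNPolls > 0 && polls % autoCommitEveryNPolls == 0) commit();
        return out;
    }

    // Point 4: save every fetch position as both the in-memory and the remote committed offset.
    void commit() {
        committed.putAll(fetchPosition);
        remoteStore.putAll(fetchPosition);
    }
}
```

In this toy, auto-commit fires after poll() has already advanced the positions, so it can commit records the application has not finished processing; that trade-off is one reason an explicit commit() path matters.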
Re: New Consumer API discussion
Pradeep - Thanks for your detailed comments. 1. subscribe(String topic, int... partitions) and unsubscribe(String topic, int... partitions) should be subscribe(TopicPartition... topicPartitions) and unsubscribe(TopicPartition... topicPartitions) I think that is reasonable. Overall, I'm in favor of exposing TopicPartition and TopicPartitionOffset as public APIs. They make the APIs more readable, especially given that the consumer aims to provide a small set of APIs to support a wide range of functionality. I will make that change if there are no objections. 2. Does it make sense to provide a convenience method to subscribe to topics at a particular offset directly? E.g. subscribe(TopicPartitionOffset... offsets) I view subscriptions a little differently. One subscribes to resources. In this case, either topics (when you use group management) or specific partitions. Offsets are specific to the consumption protocol and unrelated to subscription, which just expresses the user's interest in certain resources. Also, if we have one way to specify fetch offsets (positions()), I'd like to avoid creating *n* APIs to do the same thing, since that just makes the consumer APIs more bulky and eventually confusing. 3. The javadoc makes no mention of what would happen if positions() is called with a TopicPartitionOffset to which the Consumer is not subscribed. Good point. Fixed the javadoc (http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html#positions%28kafka.common.TopicPartitionOffset...%29). 4. The javadoc makes no mention of what would happen if positions() is called with two different offsets for a single TopicPartition. positions() can be called multiple times and hence with different offsets. I think I mentioned in the latest javadoc that positions() will change the offset on the next fetch request (poll()). Improved the javadoc to explicitly mention this case. 5.
The javadoc shows lastCommittedOffsets() return type as List<TopicPartitionOffset>. This should either be Map<TopicPartition, Long> or Map<TopicPartition, TopicPartitionOffset>. This depends on how the user would use the committed offsets. One example I could think of, which is mentioned in the javadoc for lastCommittedOffsets(), is to rewind consumption. In this case, you may or may not require random access to a particular partition's offset, depending on whether you want to selectively rewind consumption or not. So it may be fine to return a map. I'm not sure if people can think of other uses of this API though. In any case, if we wanted to change this to a map, I'd prefer Map<TopicPartition, Long>. 6. It seems like #4 can be avoided by using Map<TopicPartition, Long> or Map<TopicPartition, TopicPartitionOffset> as the argument type. How? lastCommittedOffsets() is independent of positions(). I'm not sure I understood your suggestion. 7. To address #3, maybe we can return List<TopicPartitionOffset> that are invalid. I don't particularly see the advantage of returning a list of invalid partitions from position(). It seems a bit awkward to return a list to indicate what is obviously a bug. I prefer throwing an error since the user should just fix that logic. Thanks, Neha On Wed, Feb 12, 2014 at 3:59 PM, Jay Kreps jay.kr...@gmail.com wrote: Ah, gotcha. -Jay On Wed, Feb 12, 2014 at 8:40 AM, Neha Narkhede neha.narkh...@gmail.com wrote: Jay, Well none kind of address the common case which is to commit all partitions. For these I was thinking just commit(); The advantage of this simpler method is that you don't need to bother about partitions, you just consume the messages given to you and then commit them. This is already what the commit() API is supposed to do. Here is the javadoc - * Synchronously commits the specified offsets for the specified list of topics and partitions to Kafka.
If no partitions are specified, * commits offsets for the subscribed list of topics and partitions to Kafka. public void commit(TopicPartitionOffset... offsets); Could you take another look at the javadoc http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html ? I've uploaded changes from the previous discussions and included some of your review suggestions. On Wed, Feb 12, 2014 at 8:32 AM, Neha Narkhede neha.narkh...@gmail.com wrote: Imran, Sorry I am probably missing something basic, but I'm not sure how a multi-threaded consumer would work. I can imagine it's either: a) I just have one thread poll kafka. If I want to process msgs in multiple threads, then I deal w/ that after polling, eg. stick them into a blocking queue or something, and have more threads that read from the queue. b) each thread creates its own KafkaConsumer. They are all registered the same
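The commit() javadoc quoted above, together with Neha's answers to Pradeep's points 4 and 7, suggests three behaviours: a later positions() call overrides an earlier one, a no-argument commit() commits every subscribed partition, and passing an unsubscribed partition is treated as a bug. A minimal sketch of those rules, assuming local stand-in records and a hypothetical SketchConsumer class that does no fetching at all:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

record TopicPartition(String topic, int partition) {}
record TopicPartitionOffset(TopicPartition tp, long offset) {}

// Hypothetical sketch of the behaviour described in the thread; not the real KafkaConsumer.
class SketchConsumer {
    private final Set<TopicPartition> subscribed = new HashSet<>();
    private final Map<TopicPartition, Long> positions = new HashMap<>();
    final Map<TopicPartition, Long> committed = new HashMap<>();

    void subscribe(TopicPartition... tps) {
        for (TopicPartition tp : tps) {
            subscribed.add(tp);
            positions.putIfAbsent(tp, 0L);
        }
    }

    // Later offsets (across calls, or later in the same call) win; a real
    // consumer would use them on the next poll().
    void positions(TopicPartitionOffset... offsets) {
        requireSubscribed(offsets);
        for (TopicPartitionOffset o : offsets) positions.put(o.tp(), o.offset());
    }

    // No arguments: commit the current position of every subscribed partition.
    void commit(TopicPartitionOffset... offsets) {
        if (offsets.length == 0) {
            committed.putAll(positions);
            return;
        }
        requireSubscribed(offsets);
        for (TopicPartitionOffset o : offsets) committed.put(o.tp(), o.offset());
    }

    // Validate the whole call first so a bad argument changes nothing.
    private void requireSubscribed(TopicPartitionOffset... offsets) {
        for (TopicPartitionOffset o : offsets)
            if (!subscribed.contains(o.tp()))
                throw new IllegalArgumentException("not subscribed to " + o.tp());
    }
}
```

Validating the whole argument list before applying it keeps a bad call from half-updating the positions, which makes the "throw an error" choice cheap for callers to recover from.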
Re: New Consumer API discussion
Hey guys, One thing that bugs me is the lack of symmetry for the different position calls. The way I see it there are two positions we maintain: the fetch position and the last commit position. There are two things you can do to these positions: get the current value or change the current value. But the names somewhat obscure this: Fetch position: - No get - set by positions(TopicOffsetPosition...) Committed position: - get by List<TopicOffsetPosition> lastCommittedPosition(TopicPartition...) - set by commit or commitAsync The lastCommittedPosition is particularly bothersome because: 1. The name is weird and long 2. It returns a list of results. But how can you use the list? The only way to use the list is to make a map of tp=offset and then look up results in this map (or do a for loop over the list for the partition you want). I recommend that if this is an in-memory check we just do one at a time. E.g. long committedPosition(TopicPosition). What if we made it: long position(TopicPartition tp) void seek(TopicOffsetPosition p) long committed(TopicPartition tp) void commit(TopicOffsetPosition...); This still isn't terribly consistent, but I think it is better. I would also like to shorten the name TopicOffsetPosition. Offset and Position are duplicative of each other. So perhaps we could call it a PartitionOffset or a TopicPosition or something like that. In general class names that are just a concatenation of the fields (e.g. TopicAndPartitionAndOffset) seem kind of lazy to me since the name doesn't really describe it, it just enumerates. But that is more of a nit pick. -Jay On Mon, Feb 10, 2014 at 10:54 AM, Neha Narkhede neha.narkh...@gmail.com wrote: As mentioned in previous emails, we are also working on a re-implementation of the consumer. I would like to use this email thread to discuss the details of the public API. I would also like us to be picky about this public API now so it is as good as possible and we don't need to break it in the future.
The best way to get a feel for the API is actually to take a look at the javadoc http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html ; the hope is to get the API docs good enough that they are self-explanatory. You can also take a look at the configs here http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html. Some background info on implementation: At a high level the primary difference in this consumer is that it removes the distinction between the high-level and low-level consumer. The new consumer API is non-blocking and instead of returning a blocking iterator, the consumer provides a poll() API that returns a list of records. We think this is better compared to the blocking iterators since it effectively decouples the threading strategy used for processing messages from the consumer. It is worth noting that the consumer is entirely single threaded and runs in the user thread. The advantage is that it can be easily rewritten in less multi-threading-friendly languages. The consumer batches data and multiplexes I/O over TCP connections to each of the brokers it communicates with, for high throughput. The consumer also allows long poll to reduce the end-to-end message latency for low throughput data. The consumer provides a group management facility that supports the concept of a group with multiple consumer instances (just like the current consumer). This is done through a custom heartbeat and group management protocol transparent to the user. At the same time, it allows users the option to subscribe to a fixed set of partitions and not use group management at all. The offset management strategy defaults to Kafka-based offset management and the API provides a way for the user to use a customized offset store to manage the consumer's offsets. A key difference in this consumer also is the fact that it does not depend on ZooKeeper at all.
More details about the new consumer design are here https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design Please take a look at the new API http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html and give us any thoughts you may have. Thanks, Neha
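The poll()-based design described in the announcement puts the processing loop in the application's own thread. A hedged sketch of that loop follows, written against a hypothetical PollingConsumer interface (only poll(timeout) and a no-argument commit(), mirroring the names in this thread) and an in-memory FakeConsumer. Guozhang's point that an indefinite poll is just while (true) poll(timeout) is reflected in the loop shape; maxIdlePolls is added only so the sketch terminates.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical minimal view of the proposed API: poll() returns a batch, commit() commits everything.
interface PollingConsumer<V> {
    List<V> poll(long timeoutMs);
    void commit();
}

class PollLoop {
    // Runs entirely in the caller's thread: poll, handle the whole batch, then commit.
    // A real application would loop while it is running; maxIdlePolls only lets the sketch stop.
    static <V> int run(PollingConsumer<V> consumer, Consumer<V> handler, int maxIdlePolls) {
        int processed = 0;
        int idle = 0;
        while (idle < maxIdlePolls) {
            List<V> batch = consumer.poll(100);
            if (batch.isEmpty()) {
                idle++;
                continue;
            }
            idle = 0;
            for (V item : batch) {
                handler.accept(item);
                processed++;
            }
            consumer.commit(); // commit only after the batch has been handled
        }
        return processed;
    }
}

// In-memory fake that hands out pre-built batches (illustration only).
class FakeConsumer implements PollingConsumer<String> {
    private final Deque<List<String>> batches = new ArrayDeque<>();
    int commits = 0;

    FakeConsumer(List<List<String>> batches) { this.batches.addAll(batches); }

    public List<String> poll(long timeoutMs) {
        return batches.isEmpty() ? List.of() : batches.poll();
    }

    public void commit() { commits++; }
}
```

Committing only after each fully handled batch means that, if the process dies mid-batch, a restarted consumer would resume from the last commit and see those records again: at-least-once rather than at-most-once processing.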
Re: New Consumer API discussion
Hi Neha, 6. It seems like #4 can be avoided by using Map<TopicPartition, Long> or Map<TopicPartition, TopicPartitionOffset> as the argument type. How? lastCommittedOffsets() is independent of positions(). I'm not sure I understood your suggestion. I think of subscription as you're subscribing to a Set of TopicPartitions. Because the argument to positions() is TopicPartitionOffset..., it's conceivable that the method can be called with two offsets for the same TopicPartition. One way to handle this is to accept either the first or the last offset for a TopicPartition. However, if the argument type is changed to Map<TopicPartition, Long>, it precludes the possibility of getting duplicate offsets for the same TopicPartition. 7. To address #3, maybe we can return List<TopicPartitionOffset> that are invalid. I don't particularly see the advantage of returning a list of invalid partitions from position(). It seems a bit awkward to return a list to indicate what is obviously a bug. I prefer throwing an error since the user should just fix that logic. I'm not sure if an Exception is needed or desirable here. I don't see this as a catastrophic failure or a non-recoverable failure. Even if we just write the bad offsets to a log file and call it a day, I'm ok with that. But my main goal is to communicate to the API users somehow that they've provided bad offsets which are simply being ignored. Hi Jay, I would also like to shorten the name TopicOffsetPosition. Offset and Position are duplicative of each other. So perhaps we could call it a PartitionOffset or a TopicPosition or something like that. In general class names that are just a concatenation of the fields (e.g. TopicAndPartitionAndOffset) seem kind of lazy to me since the name doesn't really describe it, it just enumerates. But that is more of a nit pick. 1. Did you mean to say TopicPartitionOffset instead of TopicOffsetPosition? 2. +1 on PartitionOffset The lastCommittedPosition is particularly bothersome because: 1.
The name is weird and long 2. It returns a list of results. But how can you use the list? The only way to use the list is to make a map of tp=offset and then look up results in this map (or do a for loop over the list for the partition you want). This is sort of what I was talking about in my previous email. My suggestion was to change the return type to Map<TopicPartition, Long>. What if we made it: long position(TopicPartition tp) void seek(TopicOffsetPosition p) long committed(TopicPartition tp) void commit(TopicOffsetPosition...); 1. Absolutely love the idea of position(TopicPartition tp). 2. I think we also need to provide a method for accessing all positions, positions(), which maybe returns a Map<TopicPartition, Long>? 3. What is the difference between position(TopicPartition tp) and committed(TopicPartition tp)? 4. +1 on commit(PartitionOffset...) 5. +1 on seek(PartitionOffset p) 6. We should also provide a seek(PartitionOffset... offsets) Finally, in all the methods where we're using varargs, we should use an appropriate Collection data structure. For example, for the subscribe(TopicPartition... partitions) method, I think a more accurate API would be subscribe(Set<TopicPartition> partitions). This allows for the code to be self-documenting.
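Pradeep's argument is that a varargs TopicPartitionOffset... parameter admits the same partition twice, whereas a Map<TopicPartition, Long> parameter cannot by construction. The sketch below shows the policy a varargs API would have to pick; it assumes local stand-in records, and the helper name OffsetArgs.toMap is hypothetical. Here identical repeats are tolerated and conflicting ones are rejected; "last one wins" would simply drop the check.

```java
import java.util.LinkedHashMap;
import java.util.Map;

record TopicPartition(String topic, int partition) {}
record TopicPartitionOffset(TopicPartition tp, long offset) {}

class OffsetArgs {
    // A varargs parameter can name the same partition twice, so the API has to choose a rule.
    // This sketch tolerates identical repeats and rejects conflicting ones.
    // A Map<TopicPartition, Long> parameter would make the question impossible to ask.
    static Map<TopicPartition, Long> toMap(TopicPartitionOffset... offsets) {
        Map<TopicPartition, Long> map = new LinkedHashMap<>();
        for (TopicPartitionOffset o : offsets) {
            Long previous = map.put(o.tp(), o.offset());
            if (previous != null && previous != o.offset())
                throw new IllegalArgumentException("conflicting offsets for " + o.tp());
        }
        return map;
    }
}
```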
Re: New Consumer API discussion
I think you are saying both, i.e. if you have committed on a partition it returns you that value but if you haven't it does a remote lookup? Correct. The other argument for making committed batched is that commit() is batched, so there is symmetry. position() and seek() are always in-memory changes (I assume) so there is no need to batch them. I'm not as sure as you are about that assumption being true. Basically in my example above, the batching argument for committed() also applies to position() since one purpose of fetching a partition's offset is to use it to set the position of the consumer to that offset. Since that might lead to a remote OffsetRequest call, I think we probably would be better off batching it. Another option for naming would be position/reposition instead of position/seek. I think position/seek is better since it aligns with Java file APIs. I also think your suggestion about ConsumerPosition makes sense. Thanks, Neha On Feb 13, 2014 9:22 PM, Jay Kreps jay.kr...@gmail.com wrote: Hey Neha, I actually wasn't proposing the name TopicOffsetPosition, that was just a typo. I meant TopicPartitionOffset, and I was just referencing what was in the javadoc. So to restate my proposal without the typo, using just the existing classes (that naming is a separate question): long position(TopicPartition tp) void seek(TopicPartitionOffset p) long committed(TopicPartition tp) void commit(TopicPartitionOffset...); So I may be unclear on committed() (AKA lastCommittedOffset). Is it returning the in-memory value from the last commit by this consumer, or is it doing a remote fetch, or both? I think you are saying both, i.e. if you have committed on a partition it returns you that value but if you haven't it does a remote lookup? The other argument for making committed batched is that commit() is batched, so there is symmetry. position() and seek() are always in-memory changes (I assume) so there is no need to batch them.
So taking all that into account, what if we revise it to long position(TopicPartition tp) void seek(TopicPartitionOffset p) Map<TopicPartition, Long> committed(TopicPartition tp); void commit(TopicPartitionOffset...); This is not symmetric between position/seek and commit/committed but it is convenient. Another option for naming would be position/reposition instead of position/seek. With respect to the name TopicPartitionOffset, what I was trying to say is that I recommend we change that to something shorter. I think TopicPosition or ConsumerPosition might be better. Position does not refer to the variables in the object, it refers to the meaning of the object--it represents a position within a topic. The offset field in that object is still called the offset. TopicOffset, PartitionOffset, or ConsumerOffset would all be workable too. Basically I am just objecting to concatenating three nouns together. :-) -Jay On Thu, Feb 13, 2014 at 1:54 PM, Neha Narkhede neha.narkh...@gmail.com wrote: 2. It returns a list of results. But how can you use the list? The only way to use the list is to make a map of tp=offset and then look up results in this map (or do a for loop over the list for the partition you want). I recommend that if this is an in-memory check we just do one at a time. E.g. long committedPosition(TopicPosition). This was discussed in the previous emails. There is a choice between returning a map or a list. Some people found the map to be more usable. What if we made it: long position(TopicPartition tp) void seek(TopicOffsetPosition p) long committed(TopicPartition tp) void commit(TopicOffsetPosition...); This is fine, but TopicOffsetPosition doesn't make sense. Offset and Position together are confusing. Also both fetch and commit positions are related to partitions, not topics. Some more options are TopicPartitionPosition or TopicPartitionOffset. And we should use either position everywhere in Kafka or offset, but having both is confusing.
void seek(TopicOffsetPosition p) long committed(TopicPartition tp) Whether these are batched or not really depends on how flexible we want these APIs to be. The question is whether we allow a consumer to fetch or set the offsets for partitions that it doesn't own or consume. For example, say I choose to skip group management and do my own partition assignment, but choose Kafka-based offset management. I could imagine a use case where I want to change the partition assignment on the fly, and to do that, I would need to fetch the last committed offsets of partitions that I currently don't consume. If we want to allow this, these APIs would be more performant if batched, and would probably look like - Map<TopicPartition, Long> positions(TopicPartition... tp) void seek(TopicOffsetPosition... p) Map<TopicPartition, Long> committed(TopicPartition... tp) void
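Neha's case for batching is a consumer that manages its own partition assignment with Kafka-based offset storage and takes over partitions it was not consuming before. A minimal sketch of that flow, assuming a hypothetical RemoteOffsets stand-in that counts round trips and a hypothetical ManualAssignmentConsumer; seek(Map) follows Pradeep's Map-based suggestion rather than the varargs seek in the quoted proposal, and never-committed partitions default to offset 0 purely as a simplification.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

record TopicPartition(String topic, int partition) {}

// Hypothetical stand-in for Kafka-based offset storage; counts round trips.
class RemoteOffsets {
    final Map<TopicPartition, Long> stored = new HashMap<>();
    int requests = 0;

    Map<TopicPartition, Long> fetch(List<TopicPartition> tps) {
        requests++;
        Map<TopicPartition, Long> out = new HashMap<>();
        // Never-committed partitions default to 0 here (a simplification).
        for (TopicPartition tp : tps) out.put(tp, stored.getOrDefault(tp, 0L));
        return out;
    }
}

// Hypothetical consumer that does its own partition assignment.
class ManualAssignmentConsumer {
    private final RemoteOffsets remote;
    private final Map<TopicPartition, Long> positions = new HashMap<>();

    ManualAssignmentConsumer(RemoteOffsets remote) { this.remote = remote; }

    // Batched: one request for any number of partitions, owned or not.
    Map<TopicPartition, Long> committed(TopicPartition... tps) {
        return remote.fetch(List.of(tps));
    }

    void seek(Map<TopicPartition, Long> offsets) { positions.putAll(offsets); }

    Map<TopicPartition, Long> positions(TopicPartition... tps) {
        Map<TopicPartition, Long> out = new HashMap<>();
        for (TopicPartition tp : tps) out.put(tp, positions.get(tp));
        return out;
    }

    // Taking over partitions on the fly: one lookup, then resume from the committed offsets.
    void takeOver(TopicPartition... newlyOwned) {
        seek(committed(newlyOwned));
    }
}
```

With a single-partition committed(tp), the same takeover would cost one request per partition, which is the performance argument for the batched signatures.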
Re: New Consumer API discussion
Imran, Sorry I am probably missing something basic, but I'm not sure how a multi-threaded consumer would work. I can imagine it's either: a) I just have one thread poll kafka. If I want to process msgs in multiple threads, then I deal w/ that after polling, eg. stick them into a blocking queue or something, and have more threads that read from the queue. b) each thread creates its own KafkaConsumer. They are all registered the same way, and I leave it to kafka to figure out what data to give to each one. We designed the new consumer API to not require multi threading on purpose. The reason this is better than the existing ZookeeperConsumerConnector is that it effectively allows the user to use whatever threading model they want and load balance message processing amongst those threads. For example, you might want more threads dedicated to a certain high throughput partition compared to other partitions. In option a) above, you can create your own thread pool and hand over the messages returned by poll using a blocking queue or any other approach. Option b) would work as well and the user has to figure out which topics each KafkaConsumer subscribes to. (a) certainly makes things simple, but I worry about throughput -- is that just as good as having one thread trying to consume each partition? (b) makes it a bit of a pain to figure out how many threads to use. I assume there is no point in using more threads than there are partitions, so first you've got to figure out how many partitions there are in each topic. Might be nice if there were some util functions to simplify this. The user can pick the number of threads. That is still better as only the user knows how slow/fast the message processing of her application is. Also, since the initial call to subscribe doesn't give the partition assignment, does that mean the first call to poll() will always call the ConsumerRebalanceCallback?
Assuming you choose to use group management (by using subscribe(topics)), poll() will invoke the ConsumerRebalanceCallback on every single rebalance attempt. Improved the javadoc (http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerRebalanceCallback.html) to explain that. Could you give that another look? If I'm on the right track, I'd like to expand this example, showing how each MyConsumer can keep track of its partitions' offsets, even in the face of rebalances. As Jay said, I think a minimal code example could really help us see the utility and faults of the api. Sure, please look at the javadoc (http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html). I've tried to include code examples there. Please help in improving those or adding more. Looks like we should add some multi threading examples. I avoided adding those since there are many ways of handling the message processing and it will not be feasible to list all of those. If we list one, people might think that is the only recommended approach. With that said, here is an example of using Option b) above -
List<MyConsumer> consumers = new ArrayList<MyConsumer>();
List<String> topics = new ArrayList<String>();
// populate topics
for (int i = 0; i < numThreads; i++) {
    MyConsumer c = new MyConsumer();
    c.subscribe(topics.get(i));
    consumers.add(c);
}
assert consumers.size() == topics.size();
// poll each consumer in a separate thread.
for (int i = 0; i < numThreads; i++) {
    final MyConsumer consumer = consumers.get(i);
    executorService.submit(new Runnable() {
        @Override
        public void run() {
            new ProcessMessagesTask(consumer);
        }
    });
}
Let me know what you think. Thanks, Neha On Tue, Feb 11, 2014 at 3:54 PM, Jay Kreps jay.kr...@gmail.com wrote: Comments inline: On Mon, Feb 10, 2014 at 2:31 PM, Guozhang Wang wangg...@gmail.com wrote: Hello Jay, Thanks for the detailed comments. 1. Yeah we could discuss a bit more on that. 2.
Since subscribe() is incremental, adding one topic-partition is OK, and personally I think it is cleaner than subscribe(String topic, int...partition)? I am not too particular. Have you actually tried this? I think writing actual sample code is important. 3. Originally I was thinking about two interfaces: getOffsets() // offsets for all partitions that I am consuming now getOffset(topic-partition) // offset of the specified topic-partition, will throw exception if it is not currently consumed. What do you think about these? The naming needs to distinguish committed offset position versus fetch offset position. Also we aren't using the getX convention. 4. Yes, that remains a config. Does that make sense given that you change your position via an api now? 5. Agree. 6. If the time out value is null then it will logically return immediately with whatever data is available. I think an indefinite poll() function could be replaced with just while (true) poll(some-time)? That is fine but we
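Option a) from Imran's question (one thread polls, other threads process) can be sketched with a bounded BlockingQueue and a fixed thread pool. This is one possible arrangement, not a recommended one: the Supplier passed to run() is a hypothetical stand-in for consumer.poll(timeout), and the poison-pill shutdown is an illustrative choice.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Option a): one thread owns the consumer and polls; a pool of workers processes messages.
class PollerWithWorkers {
    private static final String POISON = "__stop__"; // hypothetical shutdown marker

    // `poll` stands in for consumer.poll(timeout); an empty batch ends this sketch.
    static int run(Supplier<List<String>> poll, int workers) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1000);
        AtomicInteger processed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int i = 0; i < workers; i++) {
            pool.submit(() -> {
                try {
                    while (true) {
                        String msg = queue.take();
                        if (POISON.equals(msg)) return null;
                        processed.incrementAndGet(); // real message processing would go here
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return null;
                }
            });
        }
        List<String> batch;
        while (!(batch = poll.get()).isEmpty()) {
            for (String msg : batch) queue.put(msg); // blocks when workers fall behind
        }
        for (int i = 0; i < workers; i++) queue.put(POISON);
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return processed.get();
    }
}
```

One caveat with this shape: once a batch is handed to the queue, the polling thread no longer knows when it has been processed, so committing offsets safely needs extra bookkeeping (for example, workers reporting completed offsets back to the poller).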
Re: New Consumer API discussion
Jay, Well none kind of address the common case which is to commit all partitions. For these I was thinking just commit(); The advantage of this simpler method is that you don't need to bother about partitions, you just consume the messages given to you and then commit them. This is already what the commit() API is supposed to do. Here is the javadoc - * Synchronously commits the specified offsets for the specified list of topics and partitions to Kafka. If no partitions are specified, * commits offsets for the subscribed list of topics and partitions to Kafka. public void commit(TopicPartitionOffset... offsets); Could you take another look at the javadoc (http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html)? I've uploaded changes from the previous discussions and included some of your review suggestions. On Wed, Feb 12, 2014 at 8:32 AM, Neha Narkhede neha.narkh...@gmail.com wrote: Imran, Sorry I am probably missing something basic, but I'm not sure how a multi-threaded consumer would work. I can imagine it's either: a) I just have one thread poll kafka. If I want to process msgs in multiple threads, then I deal w/ that after polling, eg. stick them into a blocking queue or something, and have more threads that read from the queue. b) each thread creates its own KafkaConsumer. They are all registered the same way, and I leave it to kafka to figure out what data to give to each one. We designed the new consumer API to not require multi threading on purpose. The reason this is better than the existing ZookeeperConsumerConnector is that it effectively allows the user to use whatever threading model they want and load balance message processing amongst those threads. For example, you might want more threads dedicated to a certain high throughput partition compared to other partitions. In option a) above, you can create your own thread pool and hand over the messages returned by poll using a blocking queue or any other approach.
Option b) would work as well and the user has to figure out which topics each KafkaConsumer subscribes to. (a) certainly makes things simple, but I worry about throughput -- is that just as good as having one thread trying to consumer each partition? (b) makes it a bit of a pain to figure out how many threads to use. I assume there is no point in using more threads than there are partitions, so first you've got to figure out how many partitions there are in each topic. Might be nice if there were some util functions to simplify this. The user can pick the number of threads. That is still better as only the user knows how slow/fast the message processing of her application is. Also, since the initial call to subscribe doesn't give the partition assignment, does that mean the first call to poll() will always call the ConsumerRebalanceCallback? Assuming you choose to use group management (by using subscribe(topics)), poll() will invoke the ConsumerRebalanceCallback on every single rebalance attempt. Improved the javadochttp://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerRebalanceCallback.htmlto explain that. Could you give that another look? If I'm on the right track, I'd like to expand this example, showing how each MyConsumer can keep track of its partitions offsets, even in the face of rebalances. As Jay said, I think a minimal code example could really help us see the utility faults of the api. Sure, please look at the javadochttp://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html. I've tried to include code examples there. Please help in improving those or adding more. Looks like we should add some multi threading examples. I avoided adding those since there are many ways to handling the message processing and it will not be feasible to list all of those. If we list one, people might think that is the only recommended approach. 
With that said, here is an example of using Option b) above - ListMyConsumer consumers = new ArrayListMyConsumer(); ListString topics = new ArrayListString(); // populate topics assert(consumers.size == topics.size); for (int i = 0; i numThreads; i++) { MyConsumer c = new MyConsumer(); c.subscribe(topics(i)); consumers.add(c); } // poll each consumer in a separate thread. for (int i = 0; i numThreads; i++) { executorService.submit(new Runnable() { @Override public void run() { new ProcessMessagesTask(consumers(i)); } }); } Let me know what you think. Thanks, Neha On Tue, Feb 11, 2014 at 3:54 PM, Jay Kreps jay.kr...@gmail.com wrote: Comments inline: On Mon, Feb 10, 2014 at 2:31 PM, Guozhang Wang wangg...@gmail.com wrote: Hello Jay, Thanks for the detailed comments. 1. Yeah we could discuss a bit more on that. 2. Since
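Option a) from the exchange above (one thread polls, a pool of other threads processes) can be sketched with plain java.util.concurrent classes. This is a minimal, self-contained sketch, not the proposed KafkaConsumer API: `pollBatch` is a hypothetical stand-in for the consumer's poll(), records are plain strings, and the queue capacity and poison-pill shutdown are choices made only for this sketch.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class PollerHandoffSketch {
    // Sentinel telling a worker thread that no more records will arrive.
    static final String POISON_PILL = "__done__";

    // Hypothetical stand-in for consumer.poll(...): batch i holds three fake records.
    static List<String> pollBatch(int i) {
        List<String> batch = new ArrayList<>();
        for (int j = 0; j < 3; j++) {
            batch.add("record-" + i + "-" + j);
        }
        return batch;
    }

    // Polls `batches` batches on the calling thread and processes records on `workers` threads.
    static List<String> run(int batches, int workers) {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);
        List<String> processed = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        for (int w = 0; w < workers; w++) {
            pool.submit(() -> {
                while (true) {
                    String record = queue.take(); // blocks until a record is available
                    if (record.equals(POISON_PILL)) {
                        return null;
                    }
                    processed.add(record.toUpperCase()); // stand-in for real processing
                }
            });
        }
        try {
            // Only this thread would ever touch the (single-threaded) consumer.
            for (int i = 0; i < batches; i++) {
                for (String record : pollBatch(i)) {
                    queue.put(record); // blocks when the workers fall behind
                }
            }
            for (int w = 0; w < workers; w++) {
                queue.put(POISON_PILL); // one sentinel per worker
            }
            pool.shutdown();
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            pool.shutdownNow();
            throw new IllegalStateException(e);
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(run(4, 3).size()); // prints 12
    }
}
```

The bounded queue gives natural backpressure: when the workers fall behind, put() blocks the polling thread. Once records from one partition are spread across several workers they can be processed out of order, which matters if an application needs per-partition ordering.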
Re: New Consumer API discussion
Ah, gotcha.

-Jay
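The commit() semantics settled in this exchange (no arguments means "commit the current position of every subscribed partition") can be illustrated with a toy, in-memory model that also separates the current position from the committed offset. Everything here is hypothetical: `ToyConsumer`, its `consume`/`position`/`committed` methods, and the "topic-partition" string keys exist only for illustration and are not the proposed KafkaConsumer API, which at this point takes TopicPartitionOffset varargs.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical toy model; partitions are identified by "topic-partition" strings.
class ToyConsumer {
    // Offset of the next message that would be handed out (the current position).
    private final Map<String, Long> positions = new HashMap<>();
    // Last committed offset per partition.
    private final Map<String, Long> committedOffsets = new HashMap<>();

    void subscribe(String topic, int... partitions) {
        for (int p : partitions) {
            positions.putIfAbsent(topic + "-" + p, 0L);
        }
    }

    // Pretend a poll returned `count` messages for this partition.
    void consume(String partition, long count) {
        positions.merge(partition, count, Long::sum);
    }

    // With arguments: commit only those partitions. Without: commit every subscribed partition.
    void commit(String... partitions) {
        if (partitions.length == 0) {
            committedOffsets.putAll(positions);
            return;
        }
        for (String p : partitions) {
            Long pos = positions.get(p);
            if (pos == null) {
                throw new IllegalArgumentException("not subscribed: " + p);
            }
            committedOffsets.put(p, pos);
        }
    }

    long position(String partition) {
        return positions.getOrDefault(partition, 0L);
    }

    Long committed(String partition) {
        return committedOffsets.get(partition); // null if never committed
    }

    public static void main(String[] args) {
        ToyConsumer c = new ToyConsumer();
        c.subscribe("clicks", 0, 1);
        c.consume("clicks-0", 5);
        c.consume("clicks-1", 2);
        c.commit("clicks-0");                        // commit one partition explicitly
        System.out.println(c.committed("clicks-1")); // prints null
        c.commit();                                  // common case: commit everything
        System.out.println(c.committed("clicks-1")); // prints 2
    }
}
```

Keeping positions and committed offsets in separate maps also answers the "current position vs. committed position" question raised elsewhere in the thread: the two only coincide right after a commit.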
Re: New Consumer API discussion
Hey Pradeep,

That wiki is fairly old and it predated more flexible subscription mechanisms. In the high-level consumer you currently have wildcard subscription, and in the new proposed interface you can actually subscribe based on any logic you want to create a union of streams. Personally I think this gives you everything you would want from a hierarchy, and more actual flexibility (since you can define groupings however you want). What do you think?

-Jay

On Mon, Feb 10, 2014 at 3:37 PM, Pradeep Gollakota pradeep...@gmail.com wrote:

WRT hierarchical topics, I'm referring to KAFKA-1175 (https://issues.apache.org/jira/browse/KAFKA-1175). I would just like to think through the implications for the Consumer API if and when we do implement hierarchical topics. For example, in the proposal (https://cwiki.apache.org/confluence/display/KAFKA/Hierarchical+Topics#) written by Jay, he says that initially wildcard subscriptions are not going to be supported. But does that mean that they will be supported in v2? If that's the case, that would change the semantics of the Consumer API.

As to having classes for Topic, PartitionId, etc., it looks like I was referring to the TopicPartition and TopicPartitionOffset classes (I didn't realize these were already there). I was only looking at the Confluence page, which shows List[(String, Int, Long)] instead of List[TopicPartitionOffset] (as is shown in the javadoc). However, I did notice that we're not being consistent in the Java version. E.g. we have commit(TopicPartitionOffset... offsets) and lastCommittedOffsets(TopicPartition... partitions) on the one hand. On the other hand we have subscribe(String topic, int... partitions). I agree that creating a class for TopicId would probably not make too much sense today. But with hierarchical topics, I may change my mind. This is exactly what was done in the HBase API in 0.96 when namespaces were added: the 0.96 HBase API introduced a class called 'TableName' to represent the namespace and table name.

On Mon, Feb 10, 2014 at 3:08 PM, Neha Narkhede neha.narkh...@gmail.com wrote:

Thanks for the feedback.

Mattijs -
- Constructors link to http://kafka.apache.org/documentation.html#consumerconfigs for valid configurations, which lists zookeeper.connect rather than metadata.broker.list, the value for BROKER_LIST_CONFIG in ConsumerConfig.
Fixed it to just point to ConsumerConfig for now, until we finalize the new configs.
- Docs for poll(long) mention consumer.commit(true), which I can't find in the Consumer docs. For a simple consumer setup, that call is something that would make a lot of sense.
Missed changing the examples to use consumer.commit(true, offsets). The suggestions by Jay would change it to commit(offsets) and commitAsync(offsets), which will hopefully make those commit APIs easier to understand.
- Love the addition of MockConsumer, awesome for unit testing :)
I'm not quite satisfied with what it does right now, but we will surely improve it as we start writing the consumer.

Jay -
1. ConsumerRebalanceCallback
a. Makes sense. Renamed to onPartitionsRevoked.
b. Ya, it will be good to make it forward compatible with Java 8 capabilities. We can change it to PartitionsAssignedCallback and PartitionsRevokedCallback, or RebalanceBeginCallback and RebalanceEndCallback?
c. Ya, I thought about that, but then didn't name it just RebalanceCallback since there could be a conflict with a controller-side rebalance callback if/when we have one. However, you can argue that at that time we can name it ControllerRebalanceCallback instead of polluting a user-facing API. So I agree with you here.
2. Ya, that is a good idea. Changed to subscribe(String topic, int... partitions).
3. lastCommittedOffset() is not necessarily a local access, since the consumer can potentially ask for the last committed offsets of partitions that it does not consume and maintain the offsets for. That's the reason it is batched right now.
4. Yes, look at http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html#AUTO_OFFSET_RESET_CONFIG
5. Sure, but that is not part of the consumer API, right? I think you're suggesting looking at OffsetRequest to see if it would do that properly?
6. Good point. Changed to poll(long timeout, TimeUnit), and poll with a negative timeout will poll indefinitely?
7. Good point. Changed to commit(...) and commitAsync(...).
8. To commit the current position for all partitions owned by the consumer, you can use commit(). If you don't use group management, then commit(customListOfPartitions).
9. Forgot to include unsubscribe. Done now.
10. positions() can be called at any time and affects the next fetch on the next poll(). Fixed the places that said "starting fetch offsets".
11. Can we not look that up by going through the
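Jay's point that a subscription can follow any user-defined logic, instead of a fixed hierarchy, can be sketched as a filter over known topic names, with the consumer then subscribing to each match. This assumes the application can obtain the list of topic names from somewhere (the thread does not say how), and the dotted "namespace.topic" naming is a hypothetical convention for this sketch, not a Kafka feature.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.regex.Pattern;

class TopicSelection {
    // Returns the topics accepted by an arbitrary, user-defined rule, in input order.
    static List<String> select(List<String> allTopics, Predicate<String> rule) {
        List<String> selected = new ArrayList<>();
        for (String topic : allTopics) {
            if (rule.test(topic)) {
                selected.add(topic);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        List<String> all = Arrays.asList(
            "billing.invoices", "billing.refunds", "web.clicks", "web.pageviews");
        // A "hierarchy" emulated by a naming convention plus an anchored regex...
        Predicate<String> billing = Pattern.compile("^billing\\.").asPredicate();
        // ...or any grouping that a fixed hierarchy could not express.
        Predicate<String> custom = billing.or(t -> t.equals("web.clicks"));
        System.out.println(select(all, billing)); // prints [billing.invoices, billing.refunds]
        System.out.println(select(all, custom));  // prints [billing.invoices, billing.refunds, web.clicks]
        // Each selected topic would then be passed to the consumer's subscribe(...).
    }
}
```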
Re: New Consumer API discussion
Hi Jay,

I apologize for derailing the conversation about the consumer API. We should start a new discussion about hierarchical topics if we want to keep talking about it. My final thought on the matter is that hierarchical topics are still an important feature to have in Kafka, because they give us the flexibility to do namespace-level access controls.

Getting back to the topic of the Consumer API:
1. Any thoughts on consistency for method arguments and return types?
2. The lastCommittedOffsets() method returns a List<TopicPartitionOffset>, whereas the Confluence page suggested a Map<TopicPartition, Long>. I would think that a Map is the more appropriate return type.
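The difference between the two return shapes for lastCommittedOffsets() is easiest to see in code: a map supports a direct lookup by partition, whereas a list has to be scanned. `TopicPartitionKey` and `PartitionOffset` below are simplified, hypothetical stand-ins for the javadoc's TopicPartition and TopicPartitionOffset, written only so the sketch compiles on its own.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical stand-in for TopicPartition; equals/hashCode make it usable as a map key.
final class TopicPartitionKey {
    final String topic;
    final int partition;

    TopicPartitionKey(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override public boolean equals(Object o) {
        if (!(o instanceof TopicPartitionKey)) return false;
        TopicPartitionKey other = (TopicPartitionKey) o;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override public int hashCode() { return Objects.hash(topic, partition); }
}

// Hypothetical stand-in for TopicPartitionOffset.
final class PartitionOffset {
    final TopicPartitionKey tp;
    final long offset;

    PartitionOffset(TopicPartitionKey tp, long offset) {
        this.tp = tp;
        this.offset = offset;
    }
}

class OffsetShapes {
    // List shape: finding one partition's offset means a linear scan.
    static Long findInList(List<PartitionOffset> offsets, TopicPartitionKey tp) {
        for (PartitionOffset o : offsets) {
            if (o.tp.equals(tp)) return o.offset;
        }
        return null;
    }

    // Map shape: built once, then each lookup is a single get().
    static Map<TopicPartitionKey, Long> toMap(List<PartitionOffset> offsets) {
        Map<TopicPartitionKey, Long> map = new HashMap<>();
        for (PartitionOffset o : offsets) {
            map.put(o.tp, o.offset);
        }
        return map;
    }

    public static void main(String[] args) {
        List<PartitionOffset> list = Arrays.asList(
            new PartitionOffset(new TopicPartitionKey("clicks", 0), 42L),
            new PartitionOffset(new TopicPartitionKey("clicks", 1), 7L));
        Map<TopicPartitionKey, Long> map = toMap(list);
        System.out.println(findInList(list, new TopicPartitionKey("clicks", 1))); // prints 7
        System.out.println(map.get(new TopicPartitionKey("clicks", 1)));          // prints 7
    }
}
```

A map can also be passed straight back into a commit call keyed by partition, which is the convenience argument made for Map<TopicPartition, Long> in this thread.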
Re: New Consumer API discussion
Comments inline:

On Mon, Feb 10, 2014 at 2:31 PM, Guozhang Wang wangg...@gmail.com wrote:
> Hello Jay,
>
> Thanks for the detailed comments.
>
> 1. Yeah, we could discuss a bit more on that.
>
> 2. Since subscribe() is incremental, adding one topic-partition is OK, and personally I think it is cleaner than subscribe(String topic, int... partition)?

I am not too particular. Have you actually tried this? I think writing actual sample code is important.

> 3. Originally I was thinking about two interfaces:
> getOffsets() // offsets for all partitions that I am consuming now
> getOffset(topic-partition) // offset of the specified topic-partition; will throw an exception if it is not currently consumed.
> What do you think about these?

The naming needs to distinguish the committed offset position from the fetch offset position. Also, we aren't using the getX convention.

> 4. Yes, that remains a config.

Does that make sense, given that you change your position via an API now?

> 5. Agree.
>
> 6. If the timeout value is null then it will logically return immediately with whatever data is available. I think an indefinite poll() function could be replaced with just while (true) poll(some-time)?

That is fine, but we should provide a no-arg poll for that; poll(null) isn't clear. We should add the TimeUnit as per the post-Java 5 convention, as that makes the call more readable, e.g. poll(5) vs poll(5, TimeUnit.MILLISECONDS).

> 7. I am open to either approach.

Cool.

> 8. I was thinking about two interfaces for the commit functionality:
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
> Do those sound better?

Well, none of those quite address the common case, which is to commit all partitions. For these I was thinking just commit(). The advantage of this simpler method is that you don't need to bother about partitions; you just consume the messages given to you and then commit them.

> 9. Currently I think about unsubscribe as close and re-subscribe, and would like to hear people's opinion about it.

Hmm, I think it is a little weird if there is a subscribe which can be called at any time but no unsubscribe. Would this be hard to do?

> 10. Yes. position() is an API function, and as an API it can be called at any time and will change the next fetch starting offset.

Cool.

> 11. The ConsumerRecord would have the offset info of the message. Is that what you want?

But that is only after I have gotten a message. I'm not sure if that covers all cases or not.

> About use cases: great point. I will add some more examples of using the API functions in the wiki pages.
>
> Guozhang
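Jay's readability argument for poll(long timeout, TimeUnit unit) over poll(Long timeout), together with a separate no-argument poll() instead of poll(null), can be sketched with java.util.concurrent.BlockingQueue, whose poll(long, TimeUnit) and take() methods follow exactly that post-Java 5 convention. `RecordSource` is a hypothetical class, not the consumer API; it only shows how the two signatures read at the call site.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class RecordSource {
    private final BlockingQueue<String> incoming = new LinkedBlockingQueue<>();

    void deliver(String record) {
        incoming.add(record);
    }

    // Waits up to the timeout for a first record, then drains whatever else is ready.
    // Returns an empty list if nothing arrived in time.
    List<String> poll(long timeout, TimeUnit unit) throws InterruptedException {
        List<String> batch = new ArrayList<>();
        String first = incoming.poll(timeout, unit);
        if (first == null) {
            return batch;
        }
        batch.add(first);
        incoming.drainTo(batch);
        return batch;
    }

    // No-argument variant: blocks until at least one record is available.
    List<String> poll() throws InterruptedException {
        List<String> batch = new ArrayList<>();
        batch.add(incoming.take());
        incoming.drainTo(batch);
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        RecordSource source = new RecordSource();
        // The unit is explicit at the call site, unlike a bare poll(5).
        System.out.println(source.poll(5, TimeUnit.MILLISECONDS)); // prints [] after roughly 5 ms
        source.deliver("a");
        source.deliver("b");
        System.out.println(source.poll()); // prints [a, b]
    }
}
```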
New Consumer API discussion
As mentioned in previous emails, we are also working on a re-implementation of the consumer. I would like to use this email thread to discuss the details of the public API. I would also like us to be picky about this public API now, so it is as good as possible and we don't need to break it in the future.

The best way to get a feel for the API is actually to take a look at the javadoc (http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html); the hope is to get the API docs good enough that they are self-explanatory. You can also take a look at the configs here: http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html

Some background info on implementation:

At a high level, the primary difference in this consumer is that it removes the distinction between the high-level and low-level consumer. The new consumer API is non-blocking: instead of returning a blocking iterator, the consumer provides a poll() API that returns a list of records. We think this is better than the blocking iterators, since it effectively decouples the threading strategy used for processing messages from the consumer. It is worth noting that the consumer is entirely single-threaded and runs in the user thread. The advantage is that it can be easily rewritten in less multi-threading-friendly languages. The consumer batches data and multiplexes I/O over TCP connections to each of the brokers it communicates with, for high throughput. The consumer also allows long poll to reduce the end-to-end message latency for low-throughput data.

The consumer provides a group management facility that supports the concept of a group with multiple consumer instances (just like the current consumer). This is done through a custom heartbeat and group management protocol that is transparent to the user. At the same time, it allows users the option to subscribe to a fixed set of partitions and not use group management at all. The offset management strategy defaults to Kafka-based offset management, and the API provides a way for the user to use a customized offset store to manage the consumer's offsets.

Another key difference in this consumer is that it does not depend on ZooKeeper at all. More details about the new consumer design are here: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design

Please take a look at the new API (http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html) and give us any thoughts you may have.

Thanks,
Neha
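Neha's note that the API lets users keep offsets in a customized offset store, rather than the Kafka-based default, suggests a plug-in shape like the one below. This is a hypothetical sketch: the `OffsetStore` interface, its method names, and the `fallback` parameter (standing in for an earliest/latest reset policy) are assumptions for illustration, since the thread does not spell out what the real hook looks like.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical plug-in point; not the actual consumer API.
interface OffsetStore {
    void save(String topicPartition, long offset);

    Long load(String topicPartition); // null if nothing stored yet
}

// One possible custom store: an in-memory map (a real one might use a database).
class InMemoryOffsetStore implements OffsetStore {
    private final Map<String, Long> offsets = new HashMap<>();

    public void save(String topicPartition, long offset) {
        offsets.put(topicPartition, offset);
    }

    public Long load(String topicPartition) {
        return offsets.get(topicPartition);
    }
}

class StartPosition {
    // Where to start fetching: the stored offset if present, otherwise the fallback.
    static long resolve(OffsetStore store, String topicPartition, long fallback) {
        Long stored = store.load(topicPartition);
        return stored != null ? stored : fallback;
    }

    public static void main(String[] args) {
        OffsetStore store = new InMemoryOffsetStore();
        System.out.println(resolve(store, "clicks-0", 0L)); // prints 0 (nothing stored yet)
        store.save("clicks-0", 128L);
        System.out.println(resolve(store, "clicks-0", 0L)); // prints 128
    }
}
```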
Re: New Consumer API discussion
Hey Neha,

This looks really promising. I particularly like the ability to commit offsets for topic/partition tuples over just commit(). Some remarks:

- Constructors link to http://kafka.apache.org/documentation.html#consumerconfigs for valid configurations, which lists zookeeper.connect rather than metadata.broker.list, the value for BROKER_LIST_CONFIG in ConsumerConfig.
- Docs for poll(long) mention consumer.commit(true), which I can't find in the Consumer docs. For a simple consumer setup, that call is something that would make a lot of sense.
- Love the addition of MockConsumer, awesome for unit testing :)

Digging these open discussions on API changes on the mailing list, btw. Keep up the good work :)

Kind regards,
Mattijs
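The appeal of MockConsumer is that application logic written against a consumer interface can be unit-tested without a broker. A minimal sketch of that idea follows; `RecordConsumer`, `FakeConsumer`, and `countErrors` are hypothetical names, and this is not the MockConsumer class from the javadoc, whose methods are not described in this thread.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

// Hypothetical minimal interface that the application codes against.
interface RecordConsumer {
    List<String> poll(long timeoutMs);
}

// Test double: returns pre-loaded batches in order, then empty batches.
class FakeConsumer implements RecordConsumer {
    private final Deque<List<String>> batches = new ArrayDeque<>();

    FakeConsumer add(String... records) {
        batches.add(Arrays.asList(records));
        return this;
    }

    public List<String> poll(long timeoutMs) {
        List<String> next = batches.poll();
        return next != null ? next : Collections.<String>emptyList();
    }
}

class ErrorCounter {
    // Application logic under test: counts records containing "ERROR" over n polls.
    static int countErrors(RecordConsumer consumer, int polls) {
        int errors = 0;
        for (int i = 0; i < polls; i++) {
            for (String record : consumer.poll(100)) {
                if (record.contains("ERROR")) {
                    errors++;
                }
            }
        }
        return errors;
    }

    public static void main(String[] args) {
        RecordConsumer consumer = new FakeConsumer()
            .add("INFO start", "ERROR disk full")
            .add("ERROR timeout");
        System.out.println(countErrors(consumer, 3)); // prints 2
    }
}
```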
Re: New Consumer API discussion
A few items:
1. ConsumerRebalanceCallback
a. onPartitionsRevoked would be a better name.
b. We should discuss the possibility of splitting this into two interfaces. The motivation would be that in Java 8, single-method interfaces can directly take methods, which might be more intuitive.
c. If we stick with a single interface, I would prefer the name RebalanceCallback as it's more concise.
2. Should subscribe(String topic, int partition) be subscribe(String topic, int... partition)?
3. Is the lastCommittedOffset call just a local access? If so, it would be more convenient not to batch it.
4. How are we going to handle the earliest/latest starting position functionality we currently have? Does that remain a config?
5. Do we need to expose the general ability to get known positions from the log? E.g. the functionality in the OffsetRequest...? That would make the ability to change position a little easier.
6. Should poll(java.lang.Long timeout) be poll(long timeout, TimeUnit unit)? Is it Long because it allows null? If so, should we just add a poll() that polls indefinitely?
7. I recommend we remove the boolean parameter from commit, as it is really hard to read code that has boolean parameters without named arguments. Can we make it something like commit(...) and commitAsync(...)?
8. What about the common case where you just want to commit the current position for all partitions?
9. How do you unsubscribe?
10. You say in a few places that positions() only impacts the starting position, but surely that isn't the case, right? Surely it controls the fetch position for that partition and can be called at any time? Otherwise it is a pretty weird API, right?
11. How do I get my current position? Not the committed position, but the offset of the next message that will be given to me?

One thing that I really found helpful for the API design was writing out actual code for different scenarios against the API. I think it might be good to do that for this too, i.e. enumerate the various use cases and code each one up to see how it looks. I'm not sure if it would be useful to collect these kinds of scenarios from people. I know they have sporadically popped up on the mailing list.

-Jay
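Item 1b (splitting ConsumerRebalanceCallback into two single-method interfaces so that Java 8 lambdas can be passed directly) might look like this at the call site. The interface names are one of the pairs floated in this thread (PartitionsAssignedCallback / PartitionsRevokedCallback); the `Rebalancer` class and its `rebalance` method are hypothetical, and partitions are plain strings to keep the sketch self-contained.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

@FunctionalInterface
interface PartitionsAssignedCallback {
    void onPartitionsAssigned(Collection<String> partitions);
}

@FunctionalInterface
interface PartitionsRevokedCallback {
    void onPartitionsRevoked(Collection<String> partitions);
}

// Hypothetical driver that simulates rebalances and fires the two callbacks.
class Rebalancer {
    private final Set<String> owned = new LinkedHashSet<>();
    private final PartitionsAssignedCallback onAssigned;
    private final PartitionsRevokedCallback onRevoked;

    Rebalancer(PartitionsAssignedCallback onAssigned, PartitionsRevokedCallback onRevoked) {
        this.onAssigned = onAssigned;
        this.onRevoked = onRevoked;
    }

    // Revoke everything currently owned, then assign the new set (copies are passed out).
    void rebalance(Collection<String> newAssignment) {
        onRevoked.onPartitionsRevoked(new ArrayList<>(owned));
        owned.clear();
        owned.addAll(newAssignment);
        onAssigned.onPartitionsAssigned(new ArrayList<>(owned));
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        // Each callback is a one-line lambda instead of an anonymous class with two methods.
        Rebalancer r = new Rebalancer(
            parts -> log.add("assigned " + parts),
            parts -> log.add("revoked " + parts));
        r.rebalance(Arrays.asList("clicks-0", "clicks-1"));
        r.rebalance(Arrays.asList("clicks-1"));
        System.out.println(log);
        // prints [revoked [], assigned [clicks-0, clicks-1], revoked [clicks-0, clicks-1], assigned [clicks-1]]
    }
}
```

With a single two-method interface, callers would need an anonymous class even when they only care about one of the events; with two functional interfaces, a method reference to an existing "flush offsets" method would work just as well as a lambda.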
Re: New Consumer API discussion
Couple of very quick thoughts. 1. +1 about renaming commit(...) and commitAsync(...) 2. I'd also like to extend the above for the poll() method as well. poll() and pollWithTimeout(long, TimeUnit)? 3. Have you guys given any thought around how this API would be used with hierarchical topics? 4. Would it make sense to add classes such as TopicId, PartitionId, etc? Seems like it would be easier to read code with these classes as opposed to string and longs. - Pradeep On Mon, Feb 10, 2014 at 12:20 PM, Jay Kreps jay.kr...@gmail.com wrote: A few items: 1. ConsumerRebalanceCallback a. onPartitionsRevoked would be a better name. b. We should discuss the possibility of splitting this into two interfaces. The motivation would be that in Java 8 single method interfaces can directly take methods which might be more intuitive. c. If we stick with a single interface I would prefer the name RebalanceCallback as its more concise 2. Should subscribe(String topic, int partition) should be subscribe(String topic, int...partition)? 3. Is lastCommittedOffset call just a local access? If so it would be more convenient not to batch it. 4. How are we going to handle the earliest/latest starting position functionality we currently have. Does that remain a config? 5. Do we need to expose the general ability to get known positions from the log? E.g. the functionality in the OffsetRequest...? That would make the ability to change position a little easier. 6. Should poll(java.lang.Long timeout) be poll(long timeout, TimeUnit unit)? Is it Long because it allows null? If so should we just add a poll() that polls indefinitely? 7. I recommend we remove the boolean parameter from commit as it is really hard to read code that has boolean parameters without named arguments. Can we make it something like commit(...) and commitAsync(...)? 8. What about the common case where you just want to commit the current position for all partitions? 9. How do you unsubscribe? 10. 
You say in a few places that positions() only impacts the starting position, but surely that isn't the case, right? Surely it controls the fetch position for that partition and can be called at any time? Otherwise it is a pretty weird api, right? 11. How do I get my current position? Not the committed position but the offset of the next message that will be given to me? One thing that I really found helpful for the API design was writing out actual code for different scenarios against the API. I think it might be good to do that for this too--i.e. enumerate the various use cases and code that use case up to see how it looks. I'm not sure if it would be useful to collect these kinds of scenarios from people. I know they have sporadically popped up on the mailing list. -Jay On Mon, Feb 10, 2014 at 10:54 AM, Neha Narkhede neha.narkh...@gmail.com wrote: As mentioned in previous emails, we are also working on a re-implementation of the consumer. I would like to use this email thread to discuss the details of the public API. I would also like us to be picky about this public api now so it is as good as possible and we don't need to break it in the future. The best way to get a feel for the API is actually to take a look at the javadoc http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html ; the hope is to get the api docs good enough so that it is self-explanatory. You can also take a look at the configs here http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html Some background info on implementation: At a high level, the primary difference in this consumer is that it removes the distinction between the high-level and low-level consumer. The new consumer API is non-blocking and instead of returning a blocking iterator, the consumer provides a poll() API that returns a list of records.
We think this is better compared to the blocking iterators since it effectively decouples the threading strategy used for processing messages from the consumer. It is worth noting that the consumer is entirely single threaded and runs in the user thread. The advantage is that it can be easily rewritten in less multi-threading-friendly languages. The consumer batches data and multiplexes I/O over TCP connections to each of the brokers it communicates with, for high throughput. The consumer also allows long poll to reduce the end-to-end message latency for low throughput data.
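The threading decoupling described above can be illustrated with a small sketch: one thread polls and receives batches, and the application decides how to process them, here on an `ExecutorService`. `PollLoopSketch.poll(int)` is a hypothetical, queue-backed stand-in for a consumer's poll, not the proposed `KafkaConsumer` method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: a single polling thread hands batches to an application-chosen worker pool.
final class PollLoopSketch {

    // Local stand-in for data fetched from brokers.
    private final ConcurrentLinkedQueue<String> pending = new ConcurrentLinkedQueue<>();

    PollLoopSketch(List<String> records) {
        pending.addAll(records);
    }

    // Hypothetical non-blocking poll: returns up to maxRecords, possibly an empty list.
    List<String> poll(int maxRecords) {
        List<String> batch = new ArrayList<>();
        String r;
        while (batch.size() < maxRecords && (r = pending.poll()) != null) {
            batch.add(r);
        }
        return batch;
    }

    // Polls on the calling thread until no data is left; returns how many records were processed.
    static int consumeAll(PollLoopSketch consumer, int workers) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        AtomicInteger processed = new AtomicInteger();
        List<String> batch;
        while (!(batch = consumer.poll(10)).isEmpty()) {
            for (String payload : batch) {
                // Processing runs on pool threads; the consumer object itself is never shared.
                pool.submit(() -> {
                    if (!payload.isEmpty()) {
                        processed.incrementAndGet();
                    }
                });
            }
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return processed.get();
    }

    public static void main(String[] args) {
        List<String> records = new ArrayList<>();
        for (int i = 0; i < 25; i++) records.add("msg-" + i);
        System.out.println(consumeAll(new PollLoopSketch(records), 4)); // 25
    }
}
```

Swapping the fixed pool for a single thread, or processing inline, changes nothing on the consumer side, which is the point of returning batches instead of a blocking iterator.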
Re: New Consumer API discussion
Hi Mattijs: We have not updated the wiki pages for config yet, and they will not be updated until we release 0.9 with these changes. Currently consumers do have a commitOffsets function that can be called by the users, but for most use cases auto.commit is turned on and this function gets called by the consumer client itself. Guozhang On Mon, Feb 10, 2014 at 11:18 AM, Mattijs Ugen akaid...@almost3.net wrote: Hey Neha, This looks really promising; I particularly like the ability to commit offsets for topic/partition tuples over just commit(). Some remarks: - Constructors link to http://kafka.apache.org/documentation.html#consumerconfigs for valid configurations, which lists zookeeper.connect rather than metadata.broker.list, the value for BROKER_LIST_CONFIG in ConsumerConfig. - Docs for poll(long) mention consumer.commit(true), which I can't find in the Consumer docs. For a simple consumer setup, that call is something that would make a lot of sense. - Love the addition of MockConsumer, awesome for unittesting :) Digging these open discussions on API changes on the mailing list btw, keep up the good work :) Kind regards, Mattijs -- Guozhang
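Guozhang's note that, with auto.commit on, the client calls the commit function itself can be pictured as a timer check inside the poll path. The sketch below assumes a fixed commit interval; `maybeCommit` and the recorded commit times are hypothetical and only stand in for a real offset commit.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of interval-based auto-commit driven from the poll loop.
final class AutoCommitSketch {
    private final long intervalMs;
    private long lastCommitMs;
    private final List<Long> commitTimes = new ArrayList<>();

    AutoCommitSketch(long intervalMs, long startMs) {
        this.intervalMs = intervalMs;
        this.lastCommitMs = startMs;
    }

    // Assumed to be called on every poll; commits only once the interval has elapsed.
    boolean maybeCommit(long nowMs) {
        if (nowMs - lastCommitMs < intervalMs) {
            return false;
        }
        commitOffsets(nowMs);
        lastCommitMs = nowMs;
        return true;
    }

    // Hypothetical stand-in for the real commit; it only records when it ran.
    private void commitOffsets(long nowMs) {
        commitTimes.add(nowMs);
    }

    List<Long> commitTimes() {
        return commitTimes;
    }

    public static void main(String[] args) {
        AutoCommitSketch c = new AutoCommitSketch(1000, 0);
        for (long t = 0; t <= 3500; t += 500) {
            c.maybeCommit(t);
        }
        System.out.println(c.commitTimes()); // [1000, 2000, 3000]
    }
}
```

Users who turn auto-commit off would simply call the commit function themselves at points of their choosing, such as after a batch has been fully processed.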
Re: New Consumer API discussion
Hello Jay, Thanks for the detailed comments. 1. Yeah we could discuss a bit more on that. 2. Since subscribe() is incremental, adding one topic-partition is OK, and personally I think it is cleaner than subscribe(String topic, int...partition)? 3. Originally I was thinking about two interfaces: getOffsets() // offsets for all partitions that I am consuming now getOffset(topic-partition) // offset of the specified topic-partition, will throw an exception if it is not currently consumed. What do you think about these? 4. Yes, that remains a config. 5. Agree. 6. If the timeout value is null then it will logically return immediately with whatever data is available. I think an indefinite poll() function could be replaced with just while (true) poll(some-time)? 7. I am open to either approach. 8. I was thinking about two interfaces for the commit functionality: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design Do those sound better? 9. Currently I think about un-subscribe as close and re-subscribe, and would like to hear people's opinion about it. 10. Yes. Position() is an API function, and as an API it can be called at any time and will change the next fetching starting offset. 11. The ConsumerRecord would have the offset info of the message. Is that what you want? About use cases: great point. I will add some more examples of using the API functions in the wiki pages. Guozhang
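The two accessors Guozhang proposes in item 3 could behave roughly like this. Assumptions for brevity: partitions are keyed by plain `"topic-partition"` strings instead of a dedicated class, `track` is a hypothetical helper for recording positions, and `IllegalStateException` is an assumed choice for the "not currently consumed" error.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of getOffsets()/getOffset(...) over locally tracked positions.
final class OffsetAccessorSketch {
    // Positions for partitions this consumer currently consumes, keyed "topic-partition" (assumed format).
    private final Map<String, Long> positions = new HashMap<>();

    // Hypothetical helper: record the position of a partition being consumed.
    void track(String topicPartition, long offset) {
        positions.put(topicPartition, offset);
    }

    // Offsets for all partitions being consumed now, as a read-only view.
    Map<String, Long> getOffsets() {
        return Collections.unmodifiableMap(positions);
    }

    // Offset of one partition; fails if this consumer is not consuming it.
    long getOffset(String topicPartition) {
        Long offset = positions.get(topicPartition);
        if (offset == null) {
            throw new IllegalStateException(topicPartition + " is not currently consumed");
        }
        return offset;
    }

    public static void main(String[] args) {
        OffsetAccessorSketch c = new OffsetAccessorSketch();
        c.track("events-0", 17L);
        System.out.println(c.getOffset("events-0")); // 17
        try {
            c.getOffset("events-1");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // events-1 is not currently consumed
        }
    }
}
```

Because both accessors only read local state, neither needs a network round trip, which is the property Jay's item 3 asks about for lastCommittedOffset.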
Re: New Consumer API discussion
Hi Mattijs: 2. As Neha said, one design goal of the new consumer is to have a non-blocking consuming API instead of a blocking API. Do you have a strong reason in mind to still keep the blocking API instead of just using while(no-data) poll(timeout)? 3. No, we have not thought about hierarchical topics. Could you elaborate on some use cases? 4. The Consumer will share some of the common code with the Producer, in which the ProducerRecord has private final String topic; private final Integer partition; private final byte[] key; private final byte[] value; Thanks, Guozhang
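The `while (no-data) poll(timeout)` pattern from item 2 shows how a blocking read can be layered on a non-blocking API. `NonBlockingSource` is a hypothetical interface standing in for the consumer, and the stub in `main` is invented for illustration.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.List;

// Hypothetical sketch: a blocking read built from repeated timed polls.
final class BlockingPollSketch {

    // Stand-in for a consumer whose poll returns an empty list if nothing arrives in time.
    interface NonBlockingSource {
        List<String> poll(long timeoutMs);
    }

    // Keeps polling with a bounded timeout until at least one record is returned.
    static List<String> pollUntilData(NonBlockingSource source, long timeoutMs) {
        List<String> records = source.poll(timeoutMs);
        while (records.isEmpty()) {
            records = source.poll(timeoutMs);
        }
        return records;
    }

    public static void main(String[] args) {
        // Stub that returns nothing twice, then one batch.
        Deque<List<String>> responses = new ArrayDeque<>();
        responses.add(Collections.<String>emptyList());
        responses.add(Collections.<String>emptyList());
        responses.add(Arrays.asList("a", "b"));
        NonBlockingSource stub =
                timeoutMs -> responses.isEmpty() ? Collections.<String>emptyList() : responses.poll();
        System.out.println(pollUntilData(stub, 100)); // [a, b]
    }
}
```

The bounded timeout keeps the loop responsive (for example to a shutdown flag the caller could add to the loop condition), which is one argument for not baking an unbounded blocking call into the API itself.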
Re: New Consumer API discussion
Thanks for the feedback. Mattijs - - Constructors link to http://kafka.apache.org/documentation.html#consumerconfigs for valid configurations, which lists zookeeper.connect rather than metadata.broker.list, the value for BROKER_LIST_CONFIG in ConsumerConfig. Fixed it to just point to ConsumerConfig for now until we finalize the new configs - Docs for poll(long) mention consumer.commit(true), which I can't find in the Consumer docs. For a simple consumer setup, that call is something that would make a lot of sense. Missed changing the examples to use consumer.commit(true, offsets). The suggestions by Jay would change it to commit(offsets) and commitAsync(offsets), which will hopefully make it easier to understand those commit APIs. - Love the addition of MockConsumer, awesome for unittesting :) I'm not quite satisfied with what it does as of right now, but we will surely improve it as we start writing the consumer. Jay - 1. ConsumerRebalanceCallback a. Makes sense. Renamed to onPartitionsRevoked b. Ya, it will be good to make it forward compatible with Java 8 capabilities. We can change it to PartitionsAssignedCallback and PartitionsRevokedCallback or RebalanceBeginCallback and RebalanceEndCallback? c. Ya, I thought about that but then didn't name it just RebalanceCallback since there could be a conflict with a controller side rebalance callback if/when we have one. However, you can argue that at that time we can name it ControllerRebalanceCallback instead of polluting a user facing API. So agree with you here. 2. Ya, that is a good idea. Changed to subscribe(String topic, int...partitions). 3. lastCommittedOffset() is not necessarily a local access since the consumer can potentially ask for the last committed offsets of partitions that the consumer does not consume and maintain the offsets for. That's the reason it is batched right now. 4. 
Yes, look at http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html#AUTO_OFFSET_RESET_CONFIG 5. Sure, but that is not part of the consumer API, right? I think you're suggesting looking at OffsetRequest to see if it would do that properly? 6. Good point. Changed to poll(long timeout, TimeUnit) and poll with a negative timeout will poll indefinitely? 7. Good point. Changed to commit(...) and commitAsync(...). 8. To commit the current position for all partitions owned by the consumer, you can use commit(). If you don't use group management, then commit(customListOfPartitions). 9. Forgot to include unsubscribe. Done now. 10. positions() can be called at any time and affects the next fetch on the next poll(). Fixed the places that said starting fetch offsets. 11. Can we not look that up by going through the messages returned and getting the offset from the ConsumerRecord? One thing that I really found helpful for the API design was writing out actual code for different scenarios against the API. I think it might be good to do that for this too--i.e. enumerate the various use cases and code that use case up to see how it looks. The javadocs include examples for almost all possible scenarios of consumer usage that I could come up with. Will add more to the javadocs as I get more feedback from our users. The advantage of having the examples in the javadoc itself is that the usage is self-explanatory to new users. Pradeep - 2. Changed to poll(long, TimeUnit) and a negative value for the timeout would block in the poll forever until there is new data. 3. We don't have hierarchical topics support. Would you mind explaining what you meant? 4. I'm not so sure that we need a class to express a topic which is a string and a separate class for just partition id.
We do have a class for TopicPartition which uniquely identifies a partition of a topic. Thanks, Neha
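The agreed `poll(long timeout, TimeUnit unit)` semantics, where a negative timeout waits until data arrives, map naturally onto `BlockingQueue.take()` versus `BlockingQueue.poll(long, TimeUnit)`. The sketch below uses a local `LinkedBlockingQueue` as a stand-in for fetched data, and `deliver` is a hypothetical helper; it illustrates the contract only and is not how the consumer fetches from brokers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of poll(long, TimeUnit) where a negative timeout waits indefinitely.
final class TimedPollSketch {
    // Local stand-in for records already fetched from brokers.
    private final BlockingQueue<String> fetched = new LinkedBlockingQueue<>();

    // Hypothetical helper simulating the arrival of a fetched record.
    void deliver(String record) {
        fetched.add(record);
    }

    // Waits up to the timeout (forever if negative) for the first record, then returns all available.
    List<String> poll(long timeout, TimeUnit unit) throws InterruptedException {
        List<String> batch = new ArrayList<>();
        String first = timeout < 0 ? fetched.take() : fetched.poll(timeout, unit);
        if (first == null) {
            return batch; // timed out with no data
        }
        batch.add(first);
        fetched.drainTo(batch); // include anything else already available, without waiting
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        TimedPollSketch c = new TimedPollSketch();
        System.out.println(c.poll(50, TimeUnit.MILLISECONDS)); // [] after roughly 50 ms
        c.deliver("a");
        c.deliver("b");
        System.out.println(c.poll(-1, TimeUnit.MILLISECONDS)); // [a, b]
    }
}
```

A zero timeout falls out of the same code path: `poll(0, unit)` returns immediately with whatever is already available.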
Re: New Consumer API discussion
WRT hierarchical topics, I'm referring to KAFKA-1175 https://issues.apache.org/jira/browse/KAFKA-1175. I would just like to think through the implications for the Consumer API if and when we do implement hierarchical topics. For example, in the proposal https://cwiki.apache.org/confluence/display/KAFKA/Hierarchical+Topics# written by Jay, he says that initially wildcard subscriptions are not going to be supported. But does that mean that they will be supported in v2? If that's the case, that would change the semantics of the Consumer API. As to having classes for Topic, PartitionId, etc., it looks like I was referring to the TopicPartition and TopicPartitionOffset classes (I didn't realize these were already there). I was only looking at the confluence page, which shows List[(String, Int, Long)] instead of List[TopicPartitionOffset] (as is shown in the javadoc). However, I did notice that we're not being consistent in the Java version. E.g. we have commit(TopicPartitionOffset... offsets) and lastCommittedOffsets(TopicPartition... partitions) on the one hand. On the other hand we have subscribe(String topic, int... partitions). I agree that creating a class for TopicId would probably not make much sense today. But with hierarchical topics, I may change my mind. This is exactly what was done in the HBase API in 0.96 when namespaces were added: the 0.96 HBase API introduced a class called 'TableName' to represent the namespace and table name.
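To make the HBase comparison concrete, a dedicated topic-name type for hierarchical topics might look like the sketch below. It is entirely hypothetical: no such class exists in this proposal, and both the `/` separator and the `default` namespace are assumptions chosen for illustration.

```java
import java.util.Objects;

// Hypothetical value type pairing a namespace with a topic name, in the spirit of HBase's TableName.
final class TopicName {
    static final String DEFAULT_NAMESPACE = "default"; // assumed, for illustration only
    private static final char SEPARATOR = '/';          // assumed separator

    private final String namespace;
    private final String name;

    private TopicName(String namespace, String name) {
        this.namespace = Objects.requireNonNull(namespace);
        this.name = Objects.requireNonNull(name);
    }

    static TopicName of(String namespace, String name) {
        return new TopicName(namespace, name);
    }

    // Parses "ns/name"; a bare "name" falls into the assumed default namespace.
    static TopicName parse(String qualified) {
        int i = qualified.indexOf(SEPARATOR);
        return i < 0
                ? new TopicName(DEFAULT_NAMESPACE, qualified)
                : new TopicName(qualified.substring(0, i), qualified.substring(i + 1));
    }

    String namespace() { return namespace; }

    String name() { return name; }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof TopicName)) return false;
        TopicName other = (TopicName) o;
        return namespace.equals(other.namespace) && name.equals(other.name);
    }

    @Override
    public int hashCode() { return Objects.hash(namespace, name); }

    @Override
    public String toString() { return namespace + SEPARATOR + name; }

    public static void main(String[] args) {
        System.out.println(TopicName.parse("metrics/cpu"));  // metrics/cpu
        System.out.println(TopicName.parse("clicks"));       // default/clicks
        System.out.println(TopicName.parse("metrics/cpu").equals(TopicName.of("metrics", "cpu"))); // true
    }
}
```

A value type like this would let subscribe(...) and commit(...) take the same identifier, addressing the consistency point without committing to any particular hierarchical-topics semantics.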