If people don't have any more thoughts on this, I will go ahead and submit
a reviewboard to https://issues.apache.org/jira/browse/KAFKA-1328.

Thanks,
Neha


On Mon, Mar 24, 2014 at 5:39 PM, Neha Narkhede <neha.narkh...@gmail.com>wrote:

> I took some time to write some example code using the new consumer APIs to
> cover a range of use cases. This exercise was very useful (thanks for the
> suggestion, Jay!) since I found several improvements to the APIs to make
> them more usable. Here are some of the 
> changes<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/>I
>  made -
>
> 1. Added usage examples to the KafkaConsumer 
> javadoc<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html>.
> I find it useful for the examples to be in the javadoc vs some wiki. Please
> go through these examples and suggest improvements. The goal would be to
> document a limited set of examples that cover every major use case.
> 2. All APIs that either accept or return offsets are changed to
> Map<TopicPartition,Long> instead of TopicPartitionOffset... In all the
> examples that I wrote, it was much easier to deal with offsets and pass
> them around in the consumer APIs if they were maps instead of lists
> 3. Due to the above change, I had to introduce 
> commit()<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html#commit%28%29>and
>  commitAsync() APIs explicitly, in addition to
> commit(Map<TopicPartition,Long> offsets) and
> commitAsync(Map<TopicPartition,Long> offsets), since the no-argument case
> would not be covered automatically with Map as the input parameter to the
> commit APIs
> 4. Offset rewind logic is funky with group management. I took a stab and
> it and wrote examples to cover the various offset rewind uses cases I could
> think of. I'm not so sure I like it, so I encourage people to take a look
> at the examples and provide feedback. This feedback is very critical in
> finalizing the consumer APIs as we might have to add/change APIs to make
> offset rewind intuitive and easy to use. (Please see the 3rd and 4th
> examples 
> here<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html>
> )
>
> Once I have feedback on the above, I will go ahead and submit a review
> board for the new APIs and javadoc.
>
> Thanks
> Neha
>
>
> On Mon, Mar 24, 2014 at 5:29 PM, Neha Narkhede <neha.narkh...@gmail.com>wrote:
>
>> Hey Chris,
>>
>> Really sorry for the late reply, wonder how this fell through the cracks.
>> Anyhow, thanks for the great feedback! Here are my comments -
>>
>>
>> 1. Why is the config String->Object instead of String->String?
>>
>> This is probably more of a feedback about the new config management that
>> we adopted in the new clients. I think it is more convenient to write
>> configs.put("a", 42);
>> instead of
>> configs.put("a", Integer.toString(42));
>>
>> 2. Are these Java docs correct?
>>
>>   KafkaConsumer(java.util.Map<
>> java.lang.String,java.lang.Object> configs)
>>   A consumer is instantiated by providing a set of key-value pairs as
>> configuration and a ConsumerRebalanceCallback implementation
>>
>> There is no ConsumerRebalanceCallback parameter.
>>
>> Fixed.
>>
>>
>> 3. Would like to have a method:
>>
>>   poll(long timeout, java.util.concurrent.TimeUnit timeUnit,
>> TopicPartition... topicAndPartitionsToPoll)
>>
>> I see I can effectively do this by just fiddling with subscribe and
>> unsubscribe before each poll. Is this a low-overhead operation? Can I just
>> unsubscribe from everything after each poll, then re-subscribe to a topic
>> the next iteration. I would probably be doing this in a fairly tight loop.
>>
>> The subscribe and unsubscribe will be very lightweight in-memory
>> operations,
>> so it shouldn't be a problem to just use those APIs directly.
>> Let me know if you think otherwise.
>>
>> 4. The behavior of AUTO_OFFSET_RESET_CONFIG is overloaded. I think there
>> are use cases for decoupling "what to do when no offset exists" from "what
>> to do when I'm out of range". I might want to start from smallest the
>> first time I run, but fail if I ever get offset out of range.
>>
>> How about adding a third option "disable" to "auto.offset.reset"?
>> What this says is that never automatically reset the offset, either if
>> one is not found or if the offset
>> falls out of range. Presumably, you would want to turn this off when you
>> want to control the offsets
>> yourself and use custom rewind/replay logic to reset the consumer's
>> offset. In this case, you would
>> want to turn this feature off so Kafka does not accidentally reset the
>> offset to something else.
>>
>> I'm not so sure when you would want to make the distinction regarding
>> startup and offset falling out
>> of range. Presumably, if you don't trust Kafka to reset the offset, then
>> you can always turn this off
>> and use commit/commitAsync and seek() to set the consumer to the right
>> offset on startup and every
>> time your consumer falls out of range.
>>
>> Does that make sense?
>>
>> 5. ENABLE_JMX could use Java docs, even though it's fairly
>> self-explanatory.
>>
>> Fixed.
>>
>> 6. Clarity about whether FETCH_BUFFER_CONFIG is per-topic/partition, or
>> across all topic/partitions is useful. I believe it's per-topic/partition,
>> right? That is, setting to 2 megs with two TopicAndPartitions would result
>> in 4 megs worth of data coming in per fetch, right?
>>
>> Good point, clarified that. Take a look again to see if it makes sense
>> now.
>>
>> 7. What does the consumer do if METADATA_FETCH_TIMEOUT_CONFIG times out?
>> Retry, or throw exception?
>>
>> Throw a TimeoutException. Clarified that in the 
>> docs<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html>
>> .
>>
>>
>> 8. Does RECONNECT_BACKOFF_MS_CONFIG apply to both metadata requests and
>> fetch requests?
>>
>> Applies to all requests. Clarified that in the docs.
>>
>> 9. What does SESSION_TIMEOUT_MS default to?
>>
>> Defaults are largely TODO, but session.timeout.ms currently defaults to
>> 1000.
>>
>> 10. Is this consumer thread-safe?
>>
>> It should be. Updated the 
>> docs<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html>to
>>  clarify that.
>>
>> 11. How do you use a different offset management strategy? Your email
>> implies that it's pluggable, but I don't see how. "The offset management
>> strategy defaults to Kafka based offset management and the API provides a
>> way for the user to use a customized offset store to manage the consumer
>> 's
>> offsets."
>>
>> 12. If I wish to decouple the consumer from the offset checkpointing, is
>> it OK to use Joel's offset management stuff directly, rather than through
>> the consumer's commit API?
>>
>> For #11 and #12, I updated the 
>> docs<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html>to
>>  include actual usage examples.
>> Could you take a look and see if answers your questions?
>>
>> Thanks,
>> Neha
>>
>>
>>
>> On Mon, Mar 3, 2014 at 10:28 AM, Chris Riccomini <criccom...@linkedin.com
>> > wrote:
>>
>>> Hey Guys,
>>>
>>> Also, for reference, we'll be looking to implement new Samza consumers
>>> which have these APIs:
>>>
>>>
>>> http://samza.incubator.apache.org/learn/documentation/0.7.0/api/javadocs/or
>>> g/apache/samza/system/SystemConsumer.html<http://samza.incubator.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/system/SystemConsumer.html>
>>>
>>>
>>> http://samza.incubator.apache.org/learn/documentation/0.7.0/api/javadocs/or
>>> g/apache/samza/checkpoint/CheckpointManager.html<http://samza.incubator.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/checkpoint/CheckpointManager.html>
>>>
>>>
>>> Question (3) below is a result of having Samza's SystemConsumers poll
>>> allow specific topic/partitions to be specified.
>>>
>>> The split between consumer and checkpoint manager is the reason for
>>> question (12) below.
>>>
>>> Cheers,
>>> Chris
>>>
>>> On 3/3/14 10:19 AM, "Chris Riccomini" <criccom...@linkedin.com> wrote:
>>>
>>> >Hey Guys,
>>> >
>>> >Sorry for the late follow up. Here are my questions/thoughts on the API:
>>> >
>>> >1. Why is the config String->Object instead of String->String?
>>> >
>>> >2. Are these Java docs correct?
>>> >
>>> >  KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object>
>>> configs)
>>> >  A consumer is instantiated by providing a set of key-value pairs as
>>> >configuration and a ConsumerRebalanceCallback implementation
>>> >
>>> >There is no ConsumerRebalanceCallback parameter.
>>> >
>>> >3. Would like to have a method:
>>> >
>>> >  poll(long timeout, java.util.concurrent.TimeUnit timeUnit,
>>> >TopicPartition... topicAndPartitionsToPoll)
>>> >
>>> >I see I can effectively do this by just fiddling with subscribe and
>>> >unsubscribe before each poll. Is this a low-overhead operation? Can I
>>> just
>>> >unsubscribe from everything after each poll, then re-subscribe to a
>>> topic
>>> >the next iteration. I would probably be doing this in a fairly tight
>>> loop.
>>> >
>>> >4. The behavior of AUTO_OFFSET_RESET_CONFIG is overloaded. I think there
>>> >are use cases for decoupling "what to do when no offset exists" from
>>> "what
>>> >to do when I'm out of range". I might want to start from smallest the
>>> >first time I run, but fail if I ever get offset out of range.
>>> >
>>> >5. ENABLE_JMX could use Java docs, even though it's fairly
>>> >self-explanatory.
>>> >
>>> >6. Clarity about whether FETCH_BUFFER_CONFIG is per-topic/partition, or
>>> >across all topic/partitions is useful. I believe it's
>>> per-topic/partition,
>>> >right? That is, setting to 2 megs with two TopicAndPartitions would
>>> result
>>> >in 4 megs worth of data coming in per fetch, right?
>>> >
>>> >7. What does the consumer do if METADATA_FETCH_TIMEOUT_CONFIG times out?
>>> >Retry, or throw exception?
>>> >
>>> >8. Does RECONNECT_BACKOFF_MS_CONFIG apply to both metadata requests and
>>> >fetch requests?
>>> >
>>> >9. What does SESSION_TIMEOUT_MS default to?
>>> >
>>> >10. Is this consumer thread-safe?
>>> >
>>> >11. How do you use a different offset management strategy? Your email
>>> >implies that it's pluggable, but I don't see how. "The offset management
>>> >strategy defaults to Kafka based offset management and the API provides
>>> a
>>> >way for the user to use a customized offset store to manage the
>>> consumer's
>>> >offsets."
>>> >
>>> >12. If I wish to decouple the consumer from the offset checkpointing, is
>>> >it OK to use Joel's offset management stuff directly, rather than
>>> through
>>> >the consumer's commit API?
>>> >
>>> >
>>> >Cheers,
>>> >Chris
>>> >
>>> >On 2/10/14 10:54 AM, "Neha Narkhede" <neha.narkh...@gmail.com> wrote:
>>> >
>>> >>As mentioned in previous emails, we are also working on a
>>> >>re-implementation
>>> >>of the consumer. I would like to use this email thread to discuss the
>>> >>details of the public API. I would also like us to be picky about this
>>> >>public api now so it is as good as possible and we don't need to break
>>> it
>>> >>in the future.
>>> >>
>>> >>The best way to get a feel for the API is actually to take a look at
>>> the
>>> >>javadoc<
>>> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc
>>> >>/
>>> >>doc/kafka/clients/consumer/KafkaConsumer.html>,
>>> >>the hope is to get the api docs good enough so that it is
>>> >>self-explanatory.
>>> >>You can also take a look at the configs
>>> >>here<
>>> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/do
>>> >>c
>>> >>/kafka/clients/consumer/ConsumerConfig.html>
>>> >>
>>> >>Some background info on implementation:
>>> >>
>>> >>At a high level the primary difference in this consumer is that it
>>> >>removes
>>> >>the distinction between the "high-level" and "low-level" consumer. The
>>> >>new
>>> >>consumer API is non blocking and instead of returning a blocking
>>> >>iterator,
>>> >>the consumer provides a poll() API that returns a list of records. We
>>> >>think
>>> >>this is better compared to the blocking iterators since it effectively
>>> >>decouples the threading strategy used for processing messages from the
>>> >>consumer. It is worth noting that the consumer is entirely single
>>> >>threaded
>>> >>and runs in the user thread. The advantage is that it can be easily
>>> >>rewritten in less multi-threading-friendly languages. The consumer
>>> >>batches
>>> >>data and multiplexes I/O over TCP connections to each of the brokers it
>>> >>communicates with, for high throughput. The consumer also allows long
>>> >>poll
>>> >>to reduce the end-to-end message latency for low throughput data.
>>> >>
>>> >>The consumer provides a group management facility that supports the
>>> >>concept
>>> >>of a group with multiple consumer instances (just like the current
>>> >>consumer). This is done through a custom heartbeat and group management
>>> >>protocol transparent to the user. At the same time, it allows users the
>>> >>option to subscribe to a fixed set of partitions and not use group
>>> >>management at all. The offset management strategy defaults to Kafka
>>> based
>>> >>offset management and the API provides a way for the user to use a
>>> >>customized offset store to manage the consumer's offsets.
>>> >>
>>> >>A key difference in this consumer also is the fact that it does not
>>> >>depend
>>> >>on zookeeper at all.
>>> >>
>>> >>More details about the new consumer design are
>>> >>here<
>>> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer
>>> >>+
>>> >>Rewrite+Design>
>>> >>
>>> >>Please take a look at the new
>>> >>API<
>>> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc
>>> >>/
>>> >>kafka/clients/consumer/KafkaConsumer.html>and
>>> >>give us any thoughts you may have.
>>> >>
>>> >>Thanks,
>>> >>Neha
>>> >
>>>
>>>
>>
>

Reply via email to