Thanks for the feedback.

Mattijs -

- Constructors link to
http://kafka.apache.org/documentation.html#consumerconfigs for valid
configurations, which lists zookeeper.connect rather than
metadata.broker.list, the value for BROKER_LIST_CONFIG in ConsumerConfig.
Fixed it to just point to ConsumerConfig for now until we finalize the new
configs
- Docs for poll(long) mention consumer.commit(true), which I can't find in
the Consumer docs. For a simple consumer setup, that call is something that
would make a lot of sense.
Missed changing the examples to use consumer.commit(true, offsets). The
suggestions by Jay would change it to commit(offsets) and
commitAsync(offsets), which will hopefully make it easier to understand
those commit APIs.
- Love the addition of MockConsumer, awesome for unit testing :)
I'm not quite satisfied with what it does as of right now, but we will
surely improve it as we start writing the consumer.
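
To make the commit(offsets) / commitAsync(offsets) split concrete, here is a
minimal sketch. Everything in it is an assumption for illustration only:
FakeOffsetStore, the "topic-partition" string keys and the CompletableFuture
return type are hypothetical and are not taken from the proposed javadoc.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch, not the real consumer API.
final class CommitSketch {

    /** In-memory stand-in for an offset store, keyed by "topic-partition". */
    static final class FakeOffsetStore {
        private final Map<String, Long> committed = new ConcurrentHashMap<>();

        /** Synchronous commit: returns only after the offsets are stored. */
        void commit(Map<String, Long> offsets) {
            committed.putAll(offsets);
        }

        /** Asynchronous commit: returns at once; the future completes once stored. */
        CompletableFuture<Void> commitAsync(Map<String, Long> offsets) {
            // Copy first so later changes by the caller do not affect this commit.
            Map<String, Long> snapshot = new HashMap<>(offsets);
            return CompletableFuture.runAsync(() -> commit(snapshot));
        }

        /** Last committed offset for the key, or null if none was committed. */
        Long lastCommitted(String topicPartition) {
            return committed.get(topicPartition);
        }
    }
}
```

The point of two method names is that the call site says whether it blocks,
which a boolean argument does not.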

Jay -

1. ConsumerRebalanceCallback
    a. Makes sense. Renamed to onPartitionsRevoked
    b. Ya, it will be good to make it forward compatible with Java 8
capabilities. We can change it to PartitionsAssignedCallback and
PartitionsRevokedCallback or RebalanceBeginCallback and
RebalanceEndCallback?
    c. Ya, I thought about that but then didn't name it just
RebalanceCallback since there could be a conflict with a controller side
rebalance callback if/when we have one. However, you can argue that at that
time we can name it ControllerRebalanceCallback instead of polluting a user
facing API. So agree with you here.
2. Ya, that is a good idea. Changed to subscribe(String topic,
int...partitions).
3. lastCommittedOffset() is not necessarily a local access since the
consumer can potentially ask for the last committed offsets of partitions
that the consumer does not consume and maintain the offsets for. That's the
reason it is batched right now.
4. Yes, look at
http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html#AUTO_OFFSET_RESET_CONFIG
5. Sure, but that is not part of the consumer API right? I think you're
suggesting looking at OffsetRequest to see if it would do that properly?
6. Good point. Changed to poll(long timeout, TimeUnit) and poll with a
negative timeout will poll indefinitely?
7. Good point. Changed to commit(...) and commitAsync(...)
8. To commit the current position for all partitions owned by the consumer,
you can use commit(). If you don't use group management, then
commit(customListOfPartitions)
9. Forgot to include unsubscribe. Done now.
10. positions() can be called at any time and affects the next fetch on the
next poll(). Fixed the places that said "starting fetch offsets"
11. Can we not look that up by going through the messages returned and
getting the offset from the ConsumerRecord?
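
For 1b, a rough sketch of what two single-method callbacks could look like
when used with Java 8 lambdas. The interface names follow the suggestion
above; FakeConsumer, the method signatures and the use of plain partition
ids are assumptions made up for this example, not the proposed API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not the real consumer API.
final class RebalanceCallbackSketch {

    /** Single-method interface, so a lambda or method reference fits. */
    interface PartitionsAssignedCallback {
        void onPartitionsAssigned(List<Integer> partitions);
    }

    /** Single-method interface for the revocation side. */
    interface PartitionsRevokedCallback {
        void onPartitionsRevoked(List<Integer> partitions);
    }

    /** Stand-in consumer that only tracks owned partitions and simulates a rebalance. */
    static final class FakeConsumer {
        private final List<Integer> owned = new ArrayList<>();
        private final PartitionsAssignedCallback onAssigned;
        private final PartitionsRevokedCallback onRevoked;

        FakeConsumer(PartitionsAssignedCallback onAssigned,
                     PartitionsRevokedCallback onRevoked) {
            this.onAssigned = onAssigned;
            this.onRevoked = onRevoked;
        }

        /** Revokes everything currently owned, then assigns the new partitions. */
        void rebalance(int... newPartitions) {
            onRevoked.onPartitionsRevoked(new ArrayList<>(owned));
            owned.clear();
            for (int p : newPartitions) {
                owned.add(p);
            }
            onAssigned.onPartitionsAssigned(new ArrayList<>(owned));
        }
    }

    /** Runs two simulated rebalances and returns the callback log. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        FakeConsumer consumer = new FakeConsumer(
                assigned -> log.add("assigned " + assigned),
                revoked -> log.add("revoked " + revoked));
        consumer.rebalance(0, 1);
        consumer.rebalance(2);
        return log;
    }
}
```

With a single two-method interface the same caller would need an anonymous
class, which is the trade-off 1b is about.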

One thing that I really found helpful for the API design was writing out
actual code for different scenarios against the API. I think it might be
good to do that for this too--i.e. enumerate the various use cases and code
that use case up to see how it looks
The javadocs include examples for almost all the consumer usage scenarios
that I could come up with. Will add more to the javadocs as I get more
feedback from our users. The advantage of having the examples in the
javadoc itself is that the usage is self-explanatory to new users.
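
As one such scenario, here is a small sketch of the poll(long timeout,
TimeUnit) semantics from item 6, where a negative timeout waits until data
arrives. PollSketch, deliver() and the String records are hypothetical
stand-ins; a BlockingQueue plays the role of fetched data, and returning an
empty list on timeout is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, not the real consumer API.
final class PollSketch {
    // Stands in for records fetched from the brokers.
    private final BlockingQueue<String> fetched = new LinkedBlockingQueue<>();

    /** Test hook: makes a record available to the next poll. */
    void deliver(String record) {
        fetched.add(record);
    }

    /**
     * Waits up to the timeout for data; a negative timeout waits until data
     * arrives. Returns an empty list if the timeout expires first.
     */
    List<String> poll(long timeout, TimeUnit unit) {
        List<String> records = new ArrayList<>();
        try {
            String first = timeout < 0 ? fetched.take() : fetched.poll(timeout, unit);
            if (first != null) {
                records.add(first);
                fetched.drainTo(records); // pick up anything else already buffered
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return records;
    }
}
```

A caller that wants the old blocking-iterator behaviour can loop on
poll(-1, TimeUnit.MILLISECONDS); one that has other work to do passes a
small positive timeout instead.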

Pradeep -

2. Changed to poll(long, TimeUnit) and a negative value for the timeout
would block in the poll forever until there is new data
3. We don't have hierarchical topics support. Would you mind explaining
what you meant?
4. I'm not so sure that we need a class to express a topic which is a
string and a separate class for just partition id. We do have a class for
TopicPartition which uniquely identifies a partition of a topic
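
To illustrate point 4, a value type along these lines is enough to identify
a partition of a topic. This is only a sketch under my own assumptions: the
class name TopicPartitionSketch, its accessors and its toString format are
made up here and may not match the actual TopicPartition class.

```java
import java.util.Objects;

// Hypothetical sketch of a topic-partition value type, not the real class.
final class TopicPartitionSketch {
    private final String topic;
    private final int partition;

    TopicPartitionSketch(String topic, int partition) {
        this.topic = Objects.requireNonNull(topic, "topic");
        this.partition = partition;
    }

    String topic() {
        return topic;
    }

    int partition() {
        return partition;
    }

    /** Two instances are equal when both topic and partition id match. */
    @Override
    public boolean equals(Object other) {
        if (this == other) {
            return true;
        }
        if (!(other instanceof TopicPartitionSketch)) {
            return false;
        }
        TopicPartitionSketch that = (TopicPartitionSketch) other;
        return partition == that.partition && topic.equals(that.topic);
    }

    /** Consistent with equals, so instances work as map keys. */
    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }

    @Override
    public String toString() {
        return topic + "-" + partition;
    }
}
```

Because equals and hashCode cover both fields, such an object can key a map
of offsets directly, without separate wrapper classes for the topic name
and the partition id.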

Thanks,
Neha


On Mon, Feb 10, 2014 at 12:36 PM, Pradeep Gollakota <pradeep...@gmail.com> wrote:

> Couple of very quick thoughts.
>
> 1. +1 about renaming commit(...) and commitAsync(...)
> 2. I'd also like to extend the above for the poll()  method as well. poll()
> and pollWithTimeout(long, TimeUnit)?
> 3. Have you guys given any thought around how this API would be used with
> hierarchical topics?
> 4. Would it make sense to add classes such as TopicId, PartitionId, etc?
> Seems like it would be easier to read code with these classes as opposed to
> string and longs.
>
> - Pradeep
>
>
> On Mon, Feb 10, 2014 at 12:20 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>
> > A few items:
> > 1. ConsumerRebalanceCallback
> >    a. onPartitionsRevoked would be a better name.
> >    b. We should discuss the possibility of splitting this into two
> > interfaces. The motivation would be that in Java 8 single method
> > interfaces can directly take methods which might be more intuitive.
> >    c. If we stick with a single interface I would prefer the name
> > RebalanceCallback as it's more concise
> > 2. Should subscribe(String topic, int partition) be subscribe(String
> > topic, int...partition)?
> > 3. Is lastCommittedOffset call just a local access? If so it would be
> > more convenient not to batch it.
> > 4. How are we going to handle the earliest/latest starting position
> > functionality we currently have. Does that remain a config?
> > 5. Do we need to expose the general ability to get known positions from
> > the log? E.g. the functionality in the OffsetRequest...? That would make
> > the ability to change position a little easier.
> > 6. Should poll(java.lang.Long timeout) be poll(long timeout, TimeUnit
> > unit)? Is it Long because it allows null? If so should we just add a
> > poll() that polls indefinitely?
> > 7. I recommend we remove the boolean parameter from commit as it is
> > really hard to read code that has boolean parameters without named
> > arguments. Can we make it something like commit(...) and
> > commitAsync(...)?
> > 8. What about the common case where you just want to commit the current
> > position for all partitions?
> > 9. How do you unsubscribe?
> > 10. You say in a few places that positions() only impacts the starting
> > position, but surely that isn't the case, right? Surely it controls the
> > fetch position for that partition and can be called at any time?
> > Otherwise it is a pretty weird api, right?
> > 11. How do I get my current position? Not the committed position but the
> > offset of the next message that will be given to me?
> >
> > One thing that I really found helpful for the API design was writing out
> > actual code for different scenarios against the API. I think it might be
> > good to do that for this too--i.e. enumerate the various use cases and
> > code that use case up to see how it looks. I'm not sure if it would be
> > useful to collect these kinds of scenarios from people. I know they have
> > sporadically popped up on the mailing list.
> >
> > -Jay
> >
> >
> > On Mon, Feb 10, 2014 at 10:54 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> >
> > > As mentioned in previous emails, we are also working on a
> > > re-implementation of the consumer. I would like to use this email
> > > thread to discuss the details of the public API. I would also like us
> > > to be picky about this public api now so it is as good as possible and
> > > we don't need to break it in the future.
> > >
> > > The best way to get a feel for the API is actually to take a look at
> > > the javadoc
> > > <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html>,
> > > the hope is to get the api docs good enough so that it is
> > > self-explanatory. You can also take a look at the configs here
> > > <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html>
> > >
> > > Some background info on implementation:
> > >
> > > At a high level the primary difference in this consumer is that it
> > > removes the distinction between the "high-level" and "low-level"
> > > consumer. The new consumer API is non blocking and instead of returning
> > > a blocking iterator, the consumer provides a poll() API that returns a
> > > list of records. We think this is better compared to the blocking
> > > iterators since it effectively decouples the threading strategy used
> > > for processing messages from the consumer. It is worth noting that the
> > > consumer is entirely single threaded and runs in the user thread. The
> > > advantage is that it can be easily rewritten in less
> > > multi-threading-friendly languages. The consumer batches data and
> > > multiplexes I/O over TCP connections to each of the brokers it
> > > communicates with, for high throughput. The consumer also allows long
> > > poll to reduce the end-to-end message latency for low throughput data.
> > >
> > > The consumer provides a group management facility that supports the
> > > concept of a group with multiple consumer instances (just like the
> > > current consumer). This is done through a custom heartbeat and group
> > > management protocol transparent to the user. At the same time, it
> > > allows users the option to subscribe to a fixed set of partitions and
> > > not use group management at all. The offset management strategy
> > > defaults to Kafka based offset management and the API provides a way
> > > for the user to use a customized offset store to manage the consumer's
> > > offsets.
> > >
> > > A key difference in this consumer also is the fact that it does not
> > > depend on zookeeper at all.
> > >
> > > More details about the new consumer design are here
> > > <https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design>
> > >
> > > Please take a look at the new API
> > > <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html>
> > > and give us any thoughts you may have.
> > >
> > > Thanks,
> > > Neha
> > >
> >
>
