Updated thoughts.

   1. subscribe(String topic, int... partitions) and unsubscribe(String topic,
      int... partitions) should be subscribe(TopicPartition... topicPartitions)
      and unsubscribe(TopicPartition... topicPartitions).
   2. Does it make sense to provide a convenience method to subscribe to
      topics at a particular offset directly? E.g.
      subscribe(TopicPartitionOffset... offsets)
   3. The javadoc makes no mention of what would happen if positions() is
      called with a TopicPartitionOffset for a partition to which the Consumer
      is not subscribed.
   4. The javadoc makes no mention of what would happen if positions() is
      called with two different offsets for a single TopicPartition.
   5. The javadoc shows the lastCommittedOffsets() return type as
      List<TopicPartitionOffset>. This should be either Map<TopicPartition,
      Long> or Map<TopicPartition, TopicPartitionOffset>.
   6. It seems like #4 can be avoided by using Map<TopicPartition, Long> or
      Map<TopicPartition, TopicPartitionOffset> as the argument type.
   7. To address #3, maybe we can return the List<TopicPartitionOffset> that
      are invalid.
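A minimal sketch of points 1, 6 and 7, assuming a simplified in-memory model rather than the proposed consumer. `TopicPartition` here is a local stand-in record, and `PositionTracker` with its `positions(Map<TopicPartition, Long>)` method is hypothetical. Because a Map holds at most one offset per key, the ambiguity in #4 cannot arise. Entries for partitions the consumer is not subscribed to (#3) are handed back to the caller instead of being silently ignored.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Local stand-in for the proposed TopicPartition class (hypothetical, for illustration only).
record TopicPartition(String topic, int partition) {}

// Hypothetical in-memory model of a consumer's subscriptions and fetch positions.
class PositionTracker {
    private final Set<TopicPartition> subscribed = new HashSet<>();
    private final Map<TopicPartition, Long> fetchPositions = new HashMap<>();

    // Point 1: subscribe takes the same TopicPartition type used elsewhere in the API.
    void subscribe(TopicPartition... partitions) {
        for (TopicPartition tp : partitions) {
            subscribed.add(tp);
        }
    }

    // Points 6 and 7: the Map argument allows at most one offset per partition,
    // and entries for unsubscribed partitions are returned rather than applied.
    Map<TopicPartition, Long> positions(Map<TopicPartition, Long> offsets) {
        Map<TopicPartition, Long> rejected = new HashMap<>();
        for (Map.Entry<TopicPartition, Long> e : offsets.entrySet()) {
            if (subscribed.contains(e.getKey())) {
                fetchPositions.put(e.getKey(), e.getValue());
            } else {
                rejected.put(e.getKey(), e.getValue());
            }
        }
        return rejected;
    }

    // Returns the next fetch position, or null if none has been set.
    Long position(TopicPartition tp) {
        return fetchPositions.get(tp);
    }
}
```

This sketch returns the rejected entries as a Map to mirror the argument type, and it applies the valid entries. An all-or-nothing variant that rejects the whole call would be an equally reasonable choice.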



On Tue, Feb 11, 2014 at 12:04 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

> Pradeep,
>
> To be clear, we want to get feedback on the APIs from the javadoc
> (http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html),
> since the wiki will be slightly behind on the APIs.
>
> 1. Regarding consistency, do you have specific feedback on which APIs
> should have different arguments/return types?
> 2. lastCommittedOffsets() does what you said in the javadoc.
>
> Thanks,
> Neha
>
>
> On Tue, Feb 11, 2014 at 11:45 AM, Pradeep Gollakota <pradeep...@gmail.com> wrote:
>
> > Hi Jay,
> >
> > I apologize for derailing the conversation about the consumer API. We
> > should start a new discussion about hierarchical topics if we want to keep
> > talking about it. My final thought on the matter is that hierarchical
> > topics are still an important feature to have in Kafka, because they give
> > us the flexibility to do namespace-level access controls.
> >
> > Getting back to the topic of the Consumer API:
> >
> >    1. Any thoughts on consistency for method arguments and return types?
> >    2. The lastCommittedOffsets() method returns a List<TopicPartitionOffset>,
> >    whereas the confluence page suggested a Map<TopicPartition, Long>. I
> >    would think that a Map is the more appropriate return type.
> >
> >
> >
> > On Tue, Feb 11, 2014 at 8:04 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> >
> > > Hey Pradeep,
> > >
> > > That wiki is fairly old and it predated more flexible subscription
> > > mechanisms. In the high-level consumer you currently have wildcard
> > > subscription, and in the new proposed interface you can actually
> > > subscribe based on any logic you want to create a "union" of streams.
> > > Personally I think this gives you everything you would want with a
> > > hierarchy and more actual flexibility (since you can define groupings
> > > however you want). What do you think?
> > >
> > > -Jay
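The "union of streams" idea can be sketched as a client-side selection step. This assumes the consumer can somehow list the known topics (not shown) and can then subscribe to an explicit set of topic names. `TopicSelector`, `union` and `namespace` are hypothetical names. The "namespace" here is only a naming convention (a dotted prefix), not a broker-side feature, so it provides grouping but not the namespace-level access controls Pradeep mentions.

```java
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.function.Predicate;

// Hypothetical helper: choose the topics to subscribe to with arbitrary logic.
// How the full topic list is obtained (e.g. from cluster metadata) is assumed.
final class TopicSelector {
    private TopicSelector() {}

    // Returns the union of all topics matched by any of the given rules, sorted.
    @SafeVarargs
    static Set<String> union(Collection<String> allTopics, Predicate<String>... rules) {
        Set<String> selected = new TreeSet<>();
        for (String topic : allTopics) {
            for (Predicate<String> rule : rules) {
                if (rule.test(topic)) {
                    selected.add(topic);
                    break;
                }
            }
        }
        return selected;
    }

    // A "namespace" emulated by a naming convention such as "billing.invoices".
    static Predicate<String> namespace(String ns) {
        return topic -> topic.startsWith(ns + ".");
    }
}
```

For example, `union(all, namespace("billing"), t -> t.equals("clicks"))` selects every `billing.*` topic plus `clicks`. Any grouping rule can be written as another predicate.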
> > >
> > >
> > > On Mon, Feb 10, 2014 at 3:37 PM, Pradeep Gollakota <pradeep...@gmail.com> wrote:
> > >
> > > > WRT hierarchical topics, I'm referring to KAFKA-1175
> > > > (https://issues.apache.org/jira/browse/KAFKA-1175). I would just like
> > > > to think through the implications for the Consumer API if and when we
> > > > do implement hierarchical topics. For example, in the proposal
> > > > (https://cwiki.apache.org/confluence/display/KAFKA/Hierarchical+Topics#)
> > > > written by Jay, he says that initially wildcard subscriptions are not
> > > > going to be supported. But does that mean that they will be supported
> > > > in v2? If that's the case, that would change the semantics of the
> > > > Consumer API.
> > > >
> > > > As to having classes for Topic, PartitionId, etc., it looks like I was
> > > > referring to the TopicPartition and TopicPartitionOffset classes (I
> > > > didn't realize these were already there). I was only looking at the
> > > > confluence page, which shows List[(String, Int, Long)] instead of
> > > > List[TopicPartitionOffset] (as is shown in the javadoc). However, I did
> > > > notice that we're not being consistent in the Java version. E.g. on
> > > > the one hand we have commit(TopicPartitionOffset... offsets) and
> > > > lastCommittedOffsets(TopicPartition... partitions); on the other hand
> > > > we have subscribe(String topic, int... partitions). I agree that
> > > > creating a class for TopicId would probably not make much sense today.
> > > > But with hierarchical topics, I may change my mind. This is exactly
> > > > what was done in the HBase API in 0.96 when namespaces were added: the
> > > > 0.96 HBase API introduced a class called 'TableName' to represent the
> > > > namespace and table name.
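To make the TableName comparison concrete, here is a hypothetical `TopicName` value class that pairs a namespace with a name. It is not part of any Kafka API. The `:` separator and the `"default"` namespace for unqualified names are assumptions made only for this sketch.

```java
import java.util.Objects;

// Hypothetical value class, similar in spirit to a namespace-aware table name:
// a topic identified by a namespace plus a name. Not part of any Kafka API.
record TopicName(String namespace, String name) {
    static final String DEFAULT_NAMESPACE = "default"; // assumed convention
    private static final char SEPARATOR = ':';          // assumed convention

    TopicName {
        Objects.requireNonNull(namespace, "namespace");
        Objects.requireNonNull(name, "name");
        if (name.isEmpty() || name.indexOf(SEPARATOR) >= 0) {
            throw new IllegalArgumentException("invalid topic name: " + name);
        }
    }

    // Parses "ns:topic"; a bare "topic" falls into the default namespace.
    static TopicName parse(String qualified) {
        int i = qualified.indexOf(SEPARATOR);
        return i < 0
            ? new TopicName(DEFAULT_NAMESPACE, qualified)
            : new TopicName(qualified.substring(0, i), qualified.substring(i + 1));
    }

    @Override
    public String toString() {
        return namespace + SEPARATOR + name;
    }
}
```

With such a type, signatures like subscribe(TopicName topic, int... partitions) would not need to change if namespaces were added later. That is the compatibility argument being made here.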
> > > >
> > > >
> > > > On Mon, Feb 10, 2014 at 3:08 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> > > >
> > > > > Thanks for the feedback.
> > > > >
> > > > > Mattijs -
> > > > >
> > > > > - Constructors link to
> > > > > http://kafka.apache.org/documentation.html#consumerconfigs for valid
> > > > > configurations, which lists zookeeper.connect rather than
> > > > > metadata.broker.list, the value for BROKER_LIST_CONFIG in
> > > > > ConsumerConfig.
> > > > > Fixed it to just point to ConsumerConfig for now until we finalize the
> > > > > new configs.
> > > > > - Docs for poll(long) mention consumer.commit(true), which I can't find
> > > > > in the Consumer docs. For a simple consumer setup, that call is
> > > > > something that would make a lot of sense.
> > > > > Missed changing the examples to use consumer.commit(true, offsets). The
> > > > > suggestions by Jay would change it to commit(offsets) and
> > > > > commitAsync(offsets), which will hopefully make those commit APIs
> > > > > easier to understand.
> > > > > - Love the addition of MockConsumer, awesome for unit testing :)
> > > > > I'm not quite satisfied with what it does right now, but we will
> > > > > surely improve it as we start writing the consumer.
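The readability gain of commit(offsets) / commitAsync(offsets) over a boolean flag shows up at the call site: `committer.commitAsync(offsets)` says what it does, while `commit(true, offsets)` does not. Below is a hypothetical sketch in which the "store" is an in-memory map rather than Kafka. `OffsetCommitter`, `committed`, and the local `TopicPartition` record are illustrative names. Returning a `CompletableFuture` from the async variant is an assumption.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

// Local stand-in for the proposed TopicPartition class (hypothetical).
record TopicPartition(String topic, int partition) {}

// Hypothetical sketch: two named methods instead of commit(boolean, offsets).
// Offsets go to an in-memory map here, not to Kafka.
class OffsetCommitter {
    private final Map<TopicPartition, Long> store = new ConcurrentHashMap<>();
    private final Executor io;

    OffsetCommitter(Executor io) {
        this.io = io;
    }

    // Blocks until the offsets are stored.
    void commit(Map<TopicPartition, Long> offsets) {
        store.putAll(offsets);
    }

    // Returns immediately; the future completes once the offsets are stored.
    CompletableFuture<Void> commitAsync(Map<TopicPartition, Long> offsets) {
        return CompletableFuture.runAsync(() -> commit(offsets), io);
    }

    // Last committed offset for a partition, or null if none.
    Long committed(TopicPartition tp) {
        return store.get(tp);
    }
}
```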
> > > > >
> > > > > Jay -
> > > > >
> > > > > 1. ConsumerRebalanceCallback
> > > > >     a. Makes sense. Renamed to onPartitionsRevoked.
> > > > >     b. Ya, it will be good to make it forward compatible with Java 8
> > > > >     capabilities. We can change it to PartitionsAssignedCallback and
> > > > >     PartitionsRevokedCallback, or RebalanceBeginCallback and
> > > > >     RebalanceEndCallback?
> > > > >     c. Ya, I thought about that but then didn't name it just
> > > > >     RebalanceCallback, since there could be a conflict with a
> > > > >     controller-side rebalance callback if/when we have one. However,
> > > > >     you can argue that at that time we can name it
> > > > >     ControllerRebalanceCallback instead of polluting a user-facing
> > > > >     API. So I agree with you here.
> > > > > 2. Ya, that is a good idea. Changed to subscribe(String topic,
> > > > > int... partitions).
> > > > > 3. lastCommittedOffset() is not necessarily a local access, since the
> > > > > consumer can potentially ask for the last committed offsets of
> > > > > partitions that it does not consume and maintain the offsets for.
> > > > > That's the reason it is batched right now.
> > > > > 4. Yes, look at
> > > > > http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html#AUTO_OFFSET_RESET_CONFIG
> > > > > 5. Sure, but that is not part of the consumer API, right? I think
> > > > > you're suggesting looking at OffsetRequest to see if it would do that
> > > > > properly?
> > > > > 6. Good point. Changed to poll(long timeout, TimeUnit), and poll with a
> > > > > negative timeout will poll indefinitely?
> > > > > 7. Good point. Changed to commit(...) and commitAsync(...).
> > > > > 8. To commit the current position for all partitions owned by the
> > > > > consumer, you can use commit(). If you don't use group management,
> > > > > then use commit(customListOfPartitions).
> > > > > 9. Forgot to include unsubscribe. Done now.
> > > > > 10. positions() can be called at any time and affects the next fetch
> > > > > on the next poll(). Fixed the places that said "starting fetch
> > > > > offsets".
> > > > > 11. Can we not look that up by going through the messages returned
> > > > > and getting the offset from the ConsumerRecord?
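Splitting the rebalance callback into two single-method interfaces (item 1b) means each can be supplied as a Java 8 lambda or method reference. The sketch below uses the interface names proposed in 1b, which are not a shipped API. `TopicPartition` is a local stand-in, and `RebalanceListeners` is a hypothetical holder. The revoke-then-assign ordering is an assumption.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Local stand-in for the proposed TopicPartition class (hypothetical).
record TopicPartition(String topic, int partition) {}

// Names follow the proposal in this thread; not a shipped Kafka API.
@FunctionalInterface
interface PartitionsAssignedCallback {
    void onPartitionsAssigned(Collection<TopicPartition> partitions);
}

@FunctionalInterface
interface PartitionsRevokedCallback {
    void onPartitionsRevoked(Collection<TopicPartition> partitions);
}

// Hypothetical holder a consumer could invoke around a rebalance:
// revoke the old assignment first, then hand out the new one.
class RebalanceListeners {
    private final PartitionsAssignedCallback onAssigned;
    private final PartitionsRevokedCallback onRevoked;

    RebalanceListeners(PartitionsAssignedCallback onAssigned, PartitionsRevokedCallback onRevoked) {
        this.onAssigned = onAssigned;
        this.onRevoked = onRevoked;
    }

    void rebalance(Collection<TopicPartition> oldAssignment, Collection<TopicPartition> newAssignment) {
        onRevoked.onPartitionsRevoked(oldAssignment);
        onAssigned.onPartitionsAssigned(newAssignment);
    }
}
```

With a single two-method interface, callers would need an anonymous class. With the split, `new RebalanceListeners(this::startProcessing, this::flushAndCommit)` would work directly. Those two method names are hypothetical.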
> > > > >
> > > > > One thing that I really found helpful for the API design was writing
> > > > > out actual code for different scenarios against the API. I think it
> > > > > might be good to do that for this too--i.e. enumerate the various use
> > > > > cases and code each use case up to see how it looks.
> > > > > The javadocs include examples for almost all possible scenarios of
> > > > > consumer usage that I could come up with. Will add more to the
> > > > > javadocs as I get more feedback from our users. The advantage of
> > > > > having the examples in the javadoc itself is that the usage is
> > > > > self-explanatory to new users.
> > > > >
> > > > > Pradeep -
> > > > >
> > > > > 2. Changed to poll(long, TimeUnit), and a negative value for the
> > > > > timeout would block in the poll forever until there is new data.
> > > > > 3. We don't have hierarchical topics support. Would you mind
> > > > > explaining what you meant?
> > > > > 4. I'm not so sure that we need a class to express a topic, which is
> > > > > a string, and a separate class for just the partition id. We do have
> > > > > a TopicPartition class, which uniquely identifies a partition of a
> > > > > topic.
> > > > > Thanks,
> > > > > Neha
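The negative-timeout convention for poll(long, TimeUnit) can be illustrated with a standard `java.util.concurrent.BlockingQueue` standing in for fetched data. `RecordBuffer` is hypothetical and is not the consumer's real internals. `take()`, `poll(long, TimeUnit)` and `drainTo` are the standard BlockingQueue methods. Returning everything already buffered after the first record is a design assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical buffer standing in for fetched data; not the consumer's real internals.
class RecordBuffer<R> {
    private final BlockingQueue<R> fetched = new LinkedBlockingQueue<>();

    void add(R rec) {
        fetched.add(rec);
    }

    // poll(long, TimeUnit) semantics as described above:
    //   timeout < 0  -> block until at least one record is available
    //   timeout >= 0 -> wait at most that long; may return an empty list
    List<R> poll(long timeout, TimeUnit unit) throws InterruptedException {
        R first = timeout < 0 ? fetched.take() : fetched.poll(timeout, unit);
        List<R> batch = new ArrayList<>();
        if (first != null) {
            batch.add(first);
            fetched.drainTo(batch); // also return whatever else is already buffered
        }
        return batch;
    }
}
```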
> > > > >
> > > > >
> > > > > On Mon, Feb 10, 2014 at 12:36 PM, Pradeep Gollakota <pradeep...@gmail.com> wrote:
> > > > >
> > > > > > Couple of very quick thoughts.
> > > > > >
> > > > > > 1. +1 about renaming to commit(...) and commitAsync(...)
> > > > > > 2. I'd also like to extend the above to the poll() method as well:
> > > > > > poll() and pollWithTimeout(long, TimeUnit)?
> > > > > > 3. Have you guys given any thought to how this API would be used
> > > > > > with hierarchical topics?
> > > > > > 4. Would it make sense to add classes such as TopicId, PartitionId,
> > > > > > etc.? It seems like it would be easier to read code with these
> > > > > > classes as opposed to strings and longs.
> > > > > >
> > > > > > - Pradeep
> > > > > >
> > > > > >
> > > > > > On Mon, Feb 10, 2014 at 12:20 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> > > > > >
> > > > > > > A few items:
> > > > > > > 1. ConsumerRebalanceCallback
> > > > > > >    a. onPartitionsRevoked would be a better name.
> > > > > > >    b. We should discuss the possibility of splitting this into two
> > > > > > >    interfaces. The motivation would be that in Java 8, single-method
> > > > > > >    interfaces can directly take methods, which might be more
> > > > > > >    intuitive.
> > > > > > >    c. If we stick with a single interface, I would prefer the name
> > > > > > >    RebalanceCallback, as it's more concise.
> > > > > > > 2. Should subscribe(String topic, int partition) be
> > > > > > > subscribe(String topic, int... partition)?
> > > > > > > 3. Is the lastCommittedOffset call just a local access? If so, it
> > > > > > > would be more convenient not to batch it.
> > > > > > > 4. How are we going to handle the earliest/latest starting position
> > > > > > > functionality we currently have? Does that remain a config?
> > > > > > > 5. Do we need to expose the general ability to get known positions
> > > > > > > from the log? E.g. the functionality in the OffsetRequest...? That
> > > > > > > would make the ability to change position a little easier.
> > > > > > > 6. Should poll(java.lang.Long timeout) be poll(long timeout,
> > > > > > > TimeUnit unit)? Is it Long because it allows null? If so, should we
> > > > > > > just add a poll() that polls indefinitely?
> > > > > > > 7. I recommend we remove the boolean parameter from commit, as it is
> > > > > > > really hard to read code that has boolean parameters without named
> > > > > > > arguments. Can we make it something like commit(...) and
> > > > > > > commitAsync(...)?
> > > > > > > 8. What about the common case where you just want to commit the
> > > > > > > current position for all partitions?
> > > > > > > 9. How do you unsubscribe?
> > > > > > > 10. You say in a few places that positions() only impacts the
> > > > > > > starting position, but surely that isn't the case, right? Surely it
> > > > > > > controls the fetch position for that partition and can be called at
> > > > > > > any time? Otherwise it is a pretty weird API, right?
> > > > > > > 11. How do I get my current position? Not the committed position but
> > > > > > > the offset of the next message that will be given to me?
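Neha's answer to item 11 (derive the position from the returned ConsumerRecords) can be sketched as follows. `ConsumerRecord` and `TopicPartition` below are local stand-in records with only the fields needed, and `PositionFromRecords` is hypothetical. The sketch assumes that the current position is the last returned offset + 1, i.e. "the offset of the next message that will be given to me." It also assumes that records within a partition arrive in offset order.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Local stand-ins with only the fields needed here (hypothetical).
record TopicPartition(String topic, int partition) {}
record ConsumerRecord(String topic, int partition, long offset) {}

// Hypothetical tracker for item 11: position = offset of the next message,
// i.e. last returned offset + 1. Assumes per-partition records arrive in order.
class PositionFromRecords {
    private final Map<TopicPartition, Long> next = new HashMap<>();

    // Call with each batch returned by poll().
    void onRecords(List<ConsumerRecord> batch) {
        for (ConsumerRecord r : batch) {
            next.put(new TopicPartition(r.topic(), r.partition()), r.offset() + 1);
        }
    }

    // Next offset to be returned for the partition, or null if nothing seen yet.
    Long position(TopicPartition tp) {
        return next.get(tp);
    }
}
```

A drawback of this approach is that every application has to repeat this bookkeeping. It also misses positions changed by positions() until new records arrive, which is part of why Jay asks for it as an API.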
> > > > > > >
> > > > > > > One thing that I really found helpful for the API design was
> > > > > > > writing out actual code for different scenarios against the API. I
> > > > > > > think it might be good to do that for this too--i.e. enumerate the
> > > > > > > various use cases and code each use case up to see how it looks. I'm
> > > > > > > not sure if it would be useful to collect these kinds of scenarios
> > > > > > > from people. I know they have sporadically popped up on the mailing
> > > > > > > list.
> > > > > > >
> > > > > > > -Jay
> > > > > > >
> > > > > > >
> > > > > > > On Mon, Feb 10, 2014 at 10:54 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> > > > > > >
> > > > > > > > As mentioned in previous emails, we are also working on a
> > > > > > > > re-implementation of the consumer. I would like to use this email
> > > > > > > > thread to discuss the details of the public API. I would also like
> > > > > > > > us to be picky about this public API now so it is as good as
> > > > > > > > possible and we don't need to break it in the future.
> > > > > > > >
> > > > > > > > The best way to get a feel for the API is actually to take a look
> > > > > > > > at the javadoc
> > > > > > > > (http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html);
> > > > > > > > the hope is to get the API docs good enough that they are
> > > > > > > > self-explanatory. You can also take a look at the configs here:
> > > > > > > > http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html
> > > > > > > >
> > > > > > > > Some background info on implementation:
> > > > > > > >
> > > > > > > > At a high level, the primary difference in this consumer is that
> > > > > > > > it removes the distinction between the "high-level" and
> > > > > > > > "low-level" consumer. The new consumer API is non-blocking:
> > > > > > > > instead of returning a blocking iterator, the consumer provides a
> > > > > > > > poll() API that returns a list of records. We think this is better
> > > > > > > > than the blocking iterators since it effectively decouples the
> > > > > > > > threading strategy used for processing messages from the consumer.
> > > > > > > > It is worth noting that the consumer is entirely single-threaded
> > > > > > > > and runs in the user thread. The advantage is that it can be easily
> > > > > > > > rewritten in less multi-threading-friendly languages. The consumer
> > > > > > > > batches data and multiplexes I/O over TCP connections to each of
> > > > > > > > the brokers it communicates with, for high throughput. The consumer
> > > > > > > > also allows long polling to reduce the end-to-end message latency
> > > > > > > > for low-throughput data.
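To illustrate what decoupling the threading strategy from the consumer means, here is a sketch in which one thread drives poll() and the application chooses how records are processed, in this case with a worker pool. `RecordSource` is a hypothetical one-method view of the proposed poll() API, and `maxEmptyPolls` is an illustrative stopping rule. Offset commits, which would need to wait for the workers, are left out.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical minimal view of the proposed API: poll() returns a list of records.
interface RecordSource<R> {
    List<R> poll(long timeout, TimeUnit unit);
}

// The polling loop runs in the caller's thread; how records are processed
// (here: handed to a worker pool) is the application's choice, not the consumer's.
final class PollLoop {
    private PollLoop() {}

    // Stops after maxEmptyPolls consecutive empty polls (illustrative rule).
    static <R> void run(RecordSource<R> source, ExecutorService workers,
                        Consumer<R> handler, int maxEmptyPolls) {
        int empty = 0;
        while (empty < maxEmptyPolls) {
            List<R> records = source.poll(100, TimeUnit.MILLISECONDS);
            if (records.isEmpty()) {
                empty++;
                continue;
            }
            empty = 0;
            for (R rec : records) {
                workers.submit(() -> handler.accept(rec));
            }
        }
    }
}
```

Replacing the worker pool with a direct `handler.accept(rec)` call gives a fully single-threaded consumer. The polling code stays the same either way.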
> > > > > > > >
> > > > > > > > The consumer provides a group management facility that supports
> > > > > > > > the concept of a group with multiple consumer instances (just like
> > > > > > > > the current consumer). This is done through a custom heartbeat and
> > > > > > > > group management protocol that is transparent to the user. At the
> > > > > > > > same time, it gives users the option to subscribe to a fixed set of
> > > > > > > > partitions and not use group management at all. The offset
> > > > > > > > management strategy defaults to Kafka-based offset management, and
> > > > > > > > the API provides a way for the user to use a customized offset
> > > > > > > > store to manage the consumer's offsets.
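What a customized offset store might look like from the user's side is sketched below. The thread only says the API allows one and does not define its interface, so `OffsetStore`, `InMemoryOffsetStore` and `StartPositions` are hypothetical. The local `TopicPartition` record is a stand-in. The `fallback` offset loosely stands in for an earliest/latest reset policy.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Local stand-in for the proposed TopicPartition class (hypothetical).
record TopicPartition(String topic, int partition) {}

// Hypothetical pluggable offset store; not an interface defined in this thread.
interface OffsetStore {
    Optional<Long> load(TopicPartition tp);
    void save(Map<TopicPartition, Long> offsets);
}

// Simplest possible implementation, e.g. for tests. A real one might write to
// a database alongside the application's processing results.
class InMemoryOffsetStore implements OffsetStore {
    private final Map<TopicPartition, Long> offsets = new HashMap<>();

    @Override
    public Optional<Long> load(TopicPartition tp) {
        return Optional.ofNullable(offsets.get(tp));
    }

    @Override
    public void save(Map<TopicPartition, Long> newOffsets) {
        offsets.putAll(newOffsets);
    }
}

// Starting positions: the stored offset if present, otherwise a fallback
// (standing in for an earliest/latest reset policy).
final class StartPositions {
    private StartPositions() {}

    static Map<TopicPartition, Long> resolve(OffsetStore store, Set<TopicPartition> assigned, long fallback) {
        Map<TopicPartition, Long> result = new HashMap<>();
        for (TopicPartition tp : assigned) {
            result.put(tp, store.load(tp).orElse(fallback));
        }
        return result;
    }
}
```

The resolved map is the kind of argument a Map-based positions() call, as suggested at the top of this thread, could accept directly.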
> > > > > > > >
> > > > > > > > Another key difference in this consumer is that it does not depend
> > > > > > > > on ZooKeeper at all.
> > > > > > > >
> > > > > > > > More details about the new consumer design are here:
> > > > > > > > https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
> > > > > > > > >
> > > > > > > >
> > > > > > > > Please take a look at the new API
> > > > > > > > (http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html)
> > > > > > > > and give us any thoughts you may have.
> > > > > > > >
> > > > > > > > Thanks,
> > > > > > > > Neha
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
