Hey Neha,

I actually wasn't proposing the name TopicOffsetPosition, that was just a
typo. I meant TopicPartitionOffset, and I was just referencing what was in
the javadoc. So to restate my proposal without the typo, using just the
existing classes (that naming is a separate question):
   long position(TopicPartition tp)
   void seek(TopicPartitionOffset p)
   long committed(TopicPartition tp)
   void commit(TopicPartitionOffset...);

So I may be unclear on committed() (AKA lastCommittedOffset). Is it
returning the in-memory value from the last commit by this consumer, or is
it doing a remote fetch, or both? I think you are saying both, i.e. if you
have committed on a partition it returns you that value but if you haven't
it does a remote lookup?
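
A minimal sketch of the "both" reading, purely for illustration. The class names (TopicPartition as a stand-in for the javadoc class, CommittedOffsetCache) and the remoteFetch callback are hypothetical and not part of the proposed API; the remote write on commit is left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.ToLongFunction;

/** Hypothetical stand-in for the TopicPartition class from the javadoc. */
final class TopicPartition {
    final String topic;
    final int partition;

    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TopicPartition)) return false;
        TopicPartition other = (TopicPartition) o;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }
}

/** Sketch: committed() answers from memory if this consumer has committed, else looks it up remotely. */
final class CommittedOffsetCache {
    private final Map<TopicPartition, Long> lastCommitted = new HashMap<>();
    // Assumed hook for the remote lookup (broker or custom offset store).
    private final ToLongFunction<TopicPartition> remoteFetch;

    CommittedOffsetCache(ToLongFunction<TopicPartition> remoteFetch) {
        this.remoteFetch = remoteFetch;
    }

    /** Records a commit made by this consumer (remote write omitted in this sketch). */
    void commit(TopicPartition tp, long offset) {
        lastCommitted.put(tp, offset);
    }

    /** Returns the value from the last local commit, or does a remote lookup if there is none. */
    long committed(TopicPartition tp) {
        Long local = lastCommitted.get(tp);
        if (local != null) {
            return local;
        }
        return remoteFetch.applyAsLong(tp);
    }
}
```

With this shape, a caller cannot tell from the signature whether a given call is cheap or goes over the network, which is the ambiguity raised above.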

The other argument for making committed batched is that commit() is
batched, so there is symmetry.

position() and seek() are always in memory changes (I assume) so there is
no need to batch them.

So taking all that into account what if we revise it to
   long position(TopicPartition tp)
   void seek(TopicPartitionOffset p)
   Map<TopicPartition, Long> committed(TopicPartition... tp);
   void commit(TopicPartitionOffset...);

This is not symmetric between position/seek and commit/committed but it is
convenient. Another option for naming would be position/reposition instead
of position/seek.
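
As a rough sketch of how the revised calls might read in use, here is a toy in-memory version. It assumes committed() takes several partitions, which is what the Map return type suggests. OffsetApi, InMemoryOffsets, and the two value classes are hypothetical stand-ins for illustration, not the classes in the javadoc.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical stand-in for the TopicPartition class from the javadoc. */
final class TopicPartition {
    final String topic;
    final int partition;

    TopicPartition(String topic, int partition) {
        this.topic = topic;
        this.partition = partition;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TopicPartition)) return false;
        TopicPartition other = (TopicPartition) o;
        return partition == other.partition && topic.equals(other.topic);
    }

    @Override
    public int hashCode() {
        return Objects.hash(topic, partition);
    }
}

/** Hypothetical stand-in for TopicPartitionOffset: a partition plus an offset within it. */
final class TopicPartitionOffset {
    final TopicPartition partition;
    final long offset;

    TopicPartitionOffset(TopicPartition partition, long offset) {
        this.partition = partition;
        this.offset = offset;
    }
}

/** The four calls from the revised proposal: single-partition position/seek, batched commit/committed. */
interface OffsetApi {
    long position(TopicPartition tp);
    void seek(TopicPartitionOffset p);
    Map<TopicPartition, Long> committed(TopicPartition... tps);
    void commit(TopicPartitionOffset... offsets);
}

/** Toy in-memory implementation, only to show the call shapes. */
final class InMemoryOffsets implements OffsetApi {
    private final Map<TopicPartition, Long> positions = new HashMap<>();
    private final Map<TopicPartition, Long> committedOffsets = new HashMap<>();

    public long position(TopicPartition tp) {
        return positions.getOrDefault(tp, 0L); // 0 is an arbitrary default for this sketch
    }

    public void seek(TopicPartitionOffset p) {
        positions.put(p.partition, p.offset);
    }

    public Map<TopicPartition, Long> committed(TopicPartition... tps) {
        Map<TopicPartition, Long> result = new HashMap<>();
        for (TopicPartition tp : tps) {
            Long offset = committedOffsets.get(tp);
            if (offset != null) {
                result.put(tp, offset); // partitions with no commit are simply absent
            }
        }
        return result;
    }

    public void commit(TopicPartitionOffset... offsets) {
        for (TopicPartitionOffset o : offsets) {
            committedOffsets.put(o.partition, o.offset);
        }
    }
}
```

The map result lets the caller look up one partition directly, e.g. committed(a, b).get(a), without looping over a list.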

With respect to the name TopicPartitionOffset, what I was trying to say is
that I recommend we change that to something shorter. I think TopicPosition
or ConsumerPosition might be better. Position does not refer to the
variables in the object, it refers to the meaning of the object--it
represents a position within a topic. The offset field in that object is
still called the offset. TopicOffset, PartitionOffset, or ConsumerOffset
would all be workable too. Basically I am just objecting to concatenating
three nouns together. :-)

-Jay





On Thu, Feb 13, 2014 at 1:54 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

> 2. It returns a list of results. But how can you use the list? The only way
> to use the list is to make a map of tp=>offset and then look up results in
> this map (or do a for loop over the list for the partition you want). I
> recommend that if this is an in-memory check we just do one at a time. E.g.
> long committedPosition(TopicPosition).
>
> This was discussed in the previous emails. There is a choice between
> returning a map or a list. Some people found the map to be more usable.
>
> What if we made it:
>    long position(TopicPartition tp)
>    void seek(TopicOffsetPosition p)
>    long committed(TopicPartition tp)
>    void commit(TopicOffsetPosition...);
>
> This is fine, but TopicOffsetPosition doesn't make sense. Offset and
> Position is confusing. Also both fetch and commit positions are related to
> partitions, not topics. Some more options are TopicPartitionPosition or
> TopicPartitionOffset. And we should use either position everywhere in Kafka
> or offset but having both is confusing.
>
>    void seek(TopicOffsetPosition p)
>    long committed(TopicPartition tp)
>
> Whether these are batched or not really depends on how flexible we want
> these APIs to be. The question is whether we allow a consumer to fetch or
> set the offsets for partitions that it doesn't own or consume. For example,
> if I choose to skip group management and do my own partition assignment but
> choose Kafka based offset management. I could imagine a use case where I
> want to change the partition assignment on the fly, and to do that, I would
> need to fetch the last committed offsets of partitions that I currently
> don't consume.
>
> If we want to allow this, these APIs would be more performant if batched.
> And would probably look like -
>    Map<TopicPartition, Long> positions(TopicPartition... tp)
>    void seek(TopicOffsetPosition... p)
>    Map<TopicPartition, Long> committed(TopicPartition... tp)
>    void commit(TopicOffsetPosition...)
>
> These are definitely more clunky than the non batched ones though.
>
> Thanks,
> Neha
>
>
>
> On Thu, Feb 13, 2014 at 1:24 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
>
> > Hey guys,
> >
> > One thing that bugs me is the lack of symmetry for the different position
> > calls. The way I see it there are two positions we maintain: the fetch
> > position and the last commit position. There are two things you can do to
> > these positions: get the current value or change the current value. But the
> > names somewhat obscure this:
> >   Fetch position:
> >     - No get
> >     - set by positions(TopicOffsetPosition...)
> >   Committed position:
> >     - get by List<TopicOffsetPosition> lastCommittedPosition(TopicPartition...)
> >     - set by commit or commitAsync
> >
> > The lastCommittedPosition is particularly bothersome because:
> > 1. The name is weird and long
> > 2. It returns a list of results. But how can you use the list? The only way
> > to use the list is to make a map of tp=>offset and then look up results in
> > this map (or do a for loop over the list for the partition you want). I
> > recommend that if this is an in-memory check we just do one at a time. E.g.
> > long committedPosition(TopicPosition).
> >
> > What if we made it:
> >    long position(TopicPartition tp)
> >    void seek(TopicOffsetPosition p)
> >    long committed(TopicPartition tp)
> >    void commit(TopicOffsetPosition...);
> >
> > This still isn't terribly consistent, but I think it is better.
> >
> > I would also like to shorten the name TopicOffsetPosition. Offset and
> > Position are duplicative of each other. So perhaps we could call it a
> > PartitionOffset or a TopicPosition or something like that. In general class
> > names that are just a concatenation of the fields (e.g.
> > TopicAndPartitionAndOffset) seem kind of lazy to me since the name doesn't
> > really describe it, it just enumerates. But that is more of a nitpick.
> >
> > -Jay
> >
> >
> > On Mon, Feb 10, 2014 at 10:54 AM, Neha Narkhede <neha.narkh...@gmail.com>
> > wrote:
> >
> > > As mentioned in previous emails, we are also working on a re-implementation
> > > of the consumer. I would like to use this email thread to discuss the
> > > details of the public API. I would also like us to be picky about this
> > > public api now so it is as good as possible and we don't need to break it
> > > in the future.
> > >
> > > The best way to get a feel for the API is actually to take a look at the
> > > javadoc
> > > <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html>.
> > > The hope is to get the api docs good enough so that it is self-explanatory.
> > > You can also take a look at the configs here
> > > <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html>
> > >
> > > Some background info on implementation:
> > >
> > > At a high level the primary difference in this consumer is that it removes
> > > the distinction between the "high-level" and "low-level" consumer. The new
> > > consumer API is non blocking and instead of returning a blocking iterator,
> > > the consumer provides a poll() API that returns a list of records. We think
> > > this is better compared to the blocking iterators since it effectively
> > > decouples the threading strategy used for processing messages from the
> > > consumer. It is worth noting that the consumer is entirely single threaded
> > > and runs in the user thread. The advantage is that it can be easily
> > > rewritten in less multi-threading-friendly languages. The consumer batches
> > > data and multiplexes I/O over TCP connections to each of the brokers it
> > > communicates with, for high throughput. The consumer also allows long poll
> > > to reduce the end-to-end message latency for low throughput data.
> > >
> > > The consumer provides a group management facility that supports the concept
> > > of a group with multiple consumer instances (just like the current
> > > consumer). This is done through a custom heartbeat and group management
> > > protocol transparent to the user. At the same time, it allows users the
> > > option to subscribe to a fixed set of partitions and not use group
> > > management at all. The offset management strategy defaults to Kafka based
> > > offset management and the API provides a way for the user to use a
> > > customized offset store to manage the consumer's offsets.
> > >
> > > A key difference in this consumer also is the fact that it does not depend
> > > on zookeeper at all.
> > >
> > > More details about the new consumer design are here
> > > <https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design>
> > >
> > > Please take a look at the new API
> > > <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html>
> > > and give us any thoughts you may have.
> > >
> > > Thanks,
> > > Neha
> > >
> >
>
