Yes, but the problem is that poll() actually has side effects if you are
using auto commit. So you have to do an awkward thing where you track the
last offset you've seen and somehow keep this up to date as the partitions
you own change. Likewise, if you want this value prior to reading any
messages, that won't work.
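
As a rough illustration of the semantics discussed in this thread, here is a
minimal in-memory sketch. Everything in it is an assumption made for the
example: the class name PositionSketch, the string partition keys, the
default start offset of 0, and the poll(tp, messages) signature are
hypothetical, and there is no broker or network involved. It is not the
proposed consumer API or its implementation. It only shows that position()
can be a pure read that works before any poll(), while poll() is what moves
the fetch position.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of the semantics discussed in the thread.
// Not the real consumer: no broker, no network, all names are illustrative.
public class PositionSketch {
    // Next offset to fetch, keyed by an illustrative "topic-partition" string.
    private final Map<String, Long> position = new HashMap<>();
    // Last committed offset; stands in for both the in-memory and remote copy.
    private final Map<String, Long> committed = new HashMap<>();

    // seek() only changes in-memory state; it takes effect on the next poll().
    public void seek(String tp, long offset) {
        position.put(tp, offset);
    }

    // position() is a pure read, so it works before any message has been polled.
    // Assumption: a partition that was never sought starts at offset 0.
    public long position(String tp) {
        return position.getOrDefault(tp, 0L);
    }

    // poll() has a side effect: it advances the fetch position by the number
    // of messages "returned" (here simply passed in by the caller).
    public void poll(String tp, int messages) {
        position.merge(tp, (long) messages, Long::sum);
    }

    // commit() saves the current fetch position as the committed offset (batched).
    public void commit(String... tps) {
        for (String tp : tps) {
            committed.put(tp, position(tp));
        }
    }

    // committed() returns the saved offsets for the requested partitions (batched).
    public Map<String, Long> committed(String... tps) {
        Map<String, Long> out = new HashMap<>();
        for (String tp : tps) {
            if (committed.containsKey(tp)) {
                out.put(tp, committed.get(tp));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        PositionSketch c = new PositionSketch();
        c.seek("events-0", 42L);
        System.out.println(c.position("events-0")); // read before any poll()
        c.poll("events-0", 3);
        c.commit("events-0");
        System.out.println(c.committed("events-0"));
    }
}
```

In this model a caller never has to remember the last message returned by
poll() to learn where the consumer is; it just asks position().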

-Jay


On Fri, Feb 21, 2014 at 4:56 PM, Jun Rao <jun...@gmail.com> wrote:

> What's the use case of position()? Isn't that just the nextOffset() on the
> last message returned from poll()?
>
> Thanks,
>
> Jun
>
>
> On Sun, Feb 16, 2014 at 9:12 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
>
> > +1 I think those are good. It is a little weird that changing the fetch
> > point is not batched but changing the commit point is, but I suppose
> > there is no helping that.
> >
> > -Jay
> >
> >
> > On Sat, Feb 15, 2014 at 7:52 AM, Neha Narkhede <neha.narkh...@gmail.com>
> > wrote:
> >
> > > Jay,
> > >
> > > That makes sense. position/seek deal with changing the consumer's
> > > in-memory data, so there is no remote RPC there. For some reason, I got
> > > committed and seek mixed up in my head at that time :)
> > >
> > > So we still end up with
> > >
> > >    long position(TopicPartition tp)
> > >    void seek(TopicPartitionOffset p)
> > >    Map<TopicPartition, Long> committed(TopicPartition tp);
> > >    void commit(TopicPartitionOffset...);
> > >
> > > Thanks,
> > > Neha
> > >
> > > On Friday, February 14, 2014, Jay Kreps <jay.kr...@gmail.com> wrote:
> > >
> > > > Oh, interesting. So I am assuming the following implementation:
> > > > 1. We have an in-memory fetch position which controls the next fetch
> > > > offset.
> > > > 2. Changing this has no effect until you poll again, at which point
> > > > your fetch request will be from the newly specified offset.
> > > > 3. We then have an in-memory but also remotely stored committed
> > > > offset.
> > > > 4. Calling commit has the effect of saving the fetch position as both
> > > > the in-memory committed position and in the remote store.
> > > > 5. Auto-commit is the same as periodically calling commit on all
> > > > positions.
> > > >
> > > > So batching on commit as well as getting the committed position makes
> > > > sense, but batching the fetch position wouldn't, right? I think you
> > > > are actually thinking of a different approach.
> > > >
> > > > -Jay
> > > >
> > > >
> > > > On Thu, Feb 13, 2014 at 10:40 PM, Neha Narkhede <neha.narkh...@gmail.com>
> > > > wrote:
> > > >
> > > > > I think you are saying both, i.e. if you
> > > > > have committed on a partition it returns you that value but if you
> > > > > haven't it does a remote lookup?
> > > > >
> > > > > Correct.
> > > > >
> > > > > The other argument for making committed batched is that commit() is
> > > > > batched, so there is symmetry.
> > > > >
> > > > > position() and seek() are always in memory changes (I assume) so
> > > > > there is no need to batch them.
> > > > >
> > > > > I'm not as sure as you are about that assumption being true.
> > > > > Basically, in my example above, the batching argument for committed()
> > > > > also applies to position(), since one purpose of fetching a
> > > > > partition's offset is to use it to set the position of the consumer
> > > > > to that offset. Since that might lead to a remote OffsetRequest call,
> > > > > I think we probably would be better off batching it.
> > > > >
> > > > > Another option for naming would be position/reposition instead
> > > > > of position/seek.
> > > > >
> > > > > I think position/seek is better since it aligns with Java file
> > > > > APIs.
> > > > >
> > > > > I also think your suggestion about ConsumerPosition makes sense.
> > > > >
> > > > > Thanks,
> > > > > Neha
> > > > > On Feb 13, 2014 9:22 PM, "Jay Kreps" <jay.kr...@gmail.com> wrote:
> > > > >
> > > > > > Hey Neha,
> > > > > >
> > > > > > I actually wasn't proposing the name TopicOffsetPosition, that was
> > > > > > just a typo. I meant TopicPartitionOffset, and I was just
> > > > > > referencing what was in the javadoc. So to restate my proposal
> > > > > > without the typo, using just the existing classes (that naming is a
> > > > > > separate question):
> > > > > >    long position(TopicPartition tp)
> > > > > >    void seek(TopicPartitionOffset p)
> > > > > >    long committed(TopicPartition tp)
> > > > > >    void commit(TopicPartitionOffset...);
> > > > > >
> > > > > > So I may be unclear on committed() (AKA lastCommittedOffset). Is it
> > > > > > returning the in-memory value from the last commit by this
> > > > > > consumer, or is it doing a remote fetch, or both? I think you are
> > > > > > saying both, i.e. if you have committed on a partition it returns
> > > > > > you that value but if you haven't it does a remote lookup?
> > > > > >
> > > > > > The other argument for making committed batched is that commit()
> > > > > > is batched, so there is symmetry.
> > > > > >
> > > > > > position() and seek() are always in memory changes (I assume) so
> > > > > > there is no need to batch them.
> > > > > >
> > > > > > So taking all that into account what if we revise it to
> > > > > >    long position(TopicPartition tp)
> > > > > >    void seek(TopicPartitionOffset p)
> > > > > >    Map<TopicPartition, Long> committed(TopicPartition tp);
> > > > > >    void commit(TopicPartitionOffset...);
> > > > > >
> > > > > > This is not symmetric between position/seek and commit/committed
> > > > > > but it is convenient. Another option for naming would be
> > > > > > position/reposition instead of position/seek.
> > > > > >
> > > > > > With respect to the name TopicPartitionOffset, what I was trying to
> > > > > > say is that I recommend we change that to something shorter. I
> > > > > > think TopicPosition or ConsumerPosition might be better. Position
> > > > > > does not refer to the variables in the object, it refers to the
> > > > > > meaning of the object--it represents a position within a topic. The
> > > > > > offset field in that object is still called the offset.
> > > > > > TopicOffset, PartitionOffset, or ConsumerOffset would all be
> > > > > > workable too. Basically I am just objecting to concatenating three
> > > > > > nouns together. :-)
> > > > > >
> > > > > > -Jay
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > >
> > > > > > On Thu, Feb 13, 2014 at 1:54 PM, Neha Narkhede <neha.narkh...@gmail.com>
> > > > > > wrote:
> > > > > >
> > > > > > > 2. It returns a list of results. But how can you use the list? The
> > > > > > > only way to use the list is to make a map of tp=>offset and then
> > > > > > > look up results in this map (or do a for loop over the list for the
> > > > > > > partition you want). I recommend that if this is an in-memory check
> > > > > > > we just do one at a time. E.g.
> > > > > > > long committedPosition(TopicPosition).
> > > > > > >
> > > > > > > This was discussed in the previous emails. There is a choic
> > >
> >
>
