Hey Neha,

How do you know n? The whole point is that you need to be able to fetch the
end offset. You can't a priori decide you will load 1m messages without
knowing what is there.

-Jay
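
A minimal sketch of the point above: snapshot the partition's end offset first, then poll until the position reaches it. Everything here is hypothetical and in-memory. `PartitionLog`, `lastOffset()`, `position()` and `poll(maxRecords)` are illustrative stand-ins, not the proposed consumer API, and `lastOffset()` is assumed to return the offset one past the last stored message.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory stand-in for one partition plus a consumer's fetch position.
// Not the Kafka client API; the names only mirror the ones proposed in this thread.
class PartitionLog {
    private final List<String> messages = new ArrayList<>();
    private long position = 0; // offset of the next message poll() will return

    void append(String message) { messages.add(message); }

    // Assumption: "last offset" means the offset the next appended message would get.
    long lastOffset() { return messages.size(); }

    long position() { return position; }

    // Returns up to maxRecords messages starting at the current position.
    List<String> poll(int maxRecords) {
        int from = (int) position;
        int to = Math.min(messages.size(), from + maxRecords);
        List<String> batch = new ArrayList<>(messages.subList(from, to));
        position = to;
        return batch;
    }
}

class BoundedLoad {
    // Reads everything that existed when the load started, then stops,
    // even if more messages are appended afterwards.
    static List<String> drainToSnapshot(PartitionLog log, int batchSize) {
        long end = log.lastOffset(); // fetch the end offset up front
        List<String> loaded = new ArrayList<>();
        while (log.position() < end) {
            // Never ask for more than what remains before the snapshot offset.
            int want = (int) Math.min(batchSize, end - log.position());
            loaded.addAll(log.poll(want));
        }
        return loaded;
    }
}
```

The loop bound comes from the log itself rather than from a caller-supplied n, which is the difference between this and the startOffset + n version quoted below.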


On Tue, Feb 25, 2014 at 9:54 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

> Jay/Robert -
>
> I think what Robert is saying is that we need to think through the offset
> API to enable "batch processing" of topic data. Think of a process that
> periodically kicks off to compute a data summary or do a data load or
> something like that. I think what we need to support this is an api to
> fetch the last offset from the server for a partition. Something like
>    long lastOffset(TopicPartition tp)
> and for symmetry
>    long firstOffset(TopicPartition tp)
>
> Likely this would have to be batched.
>
> A fixed range of data load can be done using the existing APIs as follows.
> This assumes you know the endOffset which can be currentOffset + n (number
> of messages in the load)
>
> long startOffset = consumer.position(partition);
> long endOffset = startOffset + n;
> while(consumer.position(partition) <= endOffset) {
>      // processes messages until endOffset
>      List<ConsumerRecord> messages = consumer.poll(timeout, TimeUnit.MILLISECONDS);
>      process(messages, endOffset);
> }
>
> Does that make sense?
>
>
> On Tue, Feb 25, 2014 at 9:49 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>
> > Thanks for the review, Jun. Here are some comments -
> >
> >
> > 1. The use of ellipsis: This may make passing a list of items from a
> > collection to the api a bit harder. Suppose that you have a list of topics
> > stored in
> >
> > ArrayList<String> topics;
> >
> > If you want to subscribe to all topics in one call, you will have to do:
> >
> > String[] topicArray = new String[topics.size()];
> > consumer.subscribe(topics.toArray(topicArray));
> >
> > A similar argument can be made for the arguably more common use case of
> > subscribing to a single topic as well. In these cases, the user is required
> > to write more code to create a single-item collection and pass it in. Since
> > subscription is extremely lightweight, invoking it multiple times also seems
> > like a workable solution, no?
> >
> > 2. It would be good to document that the following apis are mutually
> > exclusive. Also, if the partition level subscription is specified, there is
> > no group management. Finally, unsubscribe() can only be used to cancel
> > subscriptions with the same pattern. For example, you can't unsubscribe at
> > the partition level if the subscription is done at the topic level.
> >
> > *subscribe*(java.lang.String... topics)
> > *subscribe*(java.lang.String topic, int... partitions)
> >
> > Makes sense. Made the suggested improvements to the docs
> > <http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/Consumer.html#subscribe%28java.lang.String...%29>
> >
> >
> > 3. commit(): The following comment in the doc should probably say "commit
> > offsets for partitions assigned to this consumer".
> >
> >  If no partitions are specified, commits offsets for the subscribed list of
> > topics and partitions to Kafka.
> >
> > Could you give more context on this suggestion? Here is the entire doc -
> >
> > Synchronously commits the specified offsets for the specified list of
> > topics and partitions to *Kafka*. If no partitions are specified, commits
> > offsets for the subscribed list of topics and partitions.
> >
> > The hope is to convey that if no partitions are specified, offsets will be
> > committed for the subscribed list of partitions. One improvement could be to
> > explicitly state that the offsets returned on the last poll will be
> > committed. I updated this to -
> >
> > Synchronously commits the specified offsets for the specified list of
> > topics and partitions to *Kafka*. If no offsets are specified, commits
> > offsets returned on the last {@link #poll(long, TimeUnit) poll()} for the
> > subscribed list of topics and partitions.
> >
> > 4. There is inconsistency in specifying partitions. Sometimes we use
> > TopicPartition and some other times we use String and int (see
> > examples below).
> >
> > void onPartitionsAssigned(Consumer consumer, TopicPartition...partitions)
> >
> > public void *subscribe*(java.lang.String topic, int... partitions)
> >
> > Yes, this was discussed previously. I think generally the consensus seems
> > to be to use the higher level
> > classes everywhere. Made those changes.
> >
> > What's the use case of position()? Isn't that just the nextOffset() on the
> > last message returned from poll()?
> >
> > Yes, except in the case where a rebalance is triggered and poll() is not
> > yet invoked. Here, you would use position() to get the new fetch position
> > for the specific partition. Even if this is not a common use case, IMO it
> > is much easier to use position() to get the fetch offset than invoking
> > nextOffset() on the last message. This also keeps the APIs symmetric, which
> > is nice.
> >
> >
> >
> >
> > On Mon, Feb 24, 2014 at 7:06 PM, Withers, Robert <robert.with...@dish.com> wrote:
> >
> >> That's wonderful.  Thanks for kafka.
> >>
> >> Rob
> >>
> >> On Feb 24, 2014, at 9:58 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >>
> >> Hi Robert,
> >>
> >> Yes, you can check out the callback functions in the new API
> >>
> >> onPartitionDesigned
> >> onPartitionAssigned
> >>
> >> and see if they meet your needs.
> >>
> >> Guozhang
> >>
> >>
> >> On Mon, Feb 24, 2014 at 8:18 AM, Withers, Robert <robert.with...@dish.com> wrote:
> >>
> >> Jun,
> >>
> >> Are you saying it is possible to get events from the high-level consumer
> >> regarding various state machine changes?  For instance, can we get a
> >> notification when a rebalance starts and ends, when a partition is
> >> assigned/unassigned, when an offset is committed on a partition, when a
> >> leader changes and so on?  I call this OOB traffic, since they are not the
> >> core messages streaming, but side-band events, yet they are still
> >> potentially useful to consumers.
> >>
> >> Thank you,
> >> Robert
> >>
> >>
> >> Robert Withers
> >> Staff Analyst/Developer
> >> o: (720) 514-8963
> >> c:  (571) 262-1873
> >>
> >>
> >>
> >> -----Original Message-----
> >> From: Jun Rao [mailto:jun...@gmail.com]
> >> Sent: Sunday, February 23, 2014 4:19 PM
> >> To: users@kafka.apache.org
> >> Subject: Re: New Consumer API discussion
> >>
> >> Robert,
> >>
> >> For the push-oriented api, you can potentially implement your own
> >> MessageHandler with those methods. In the main loop of our new consumer
> >> api, you can just call those methods based on the events you get.
> >>
> >> Also, we already have an api to get the first and the last offset of a
> >> partition (getOffsetsBefore).
> >>
> >> Thanks,
> >>
> >> Jun
> >>
> >>
> >> On Sat, Feb 22, 2014 at 11:29 AM, Withers, Robert
> >> <robert.with...@dish.com> wrote:
> >>
> >> This is a good idea, too.  I would modify it to include stream
> >> marking, then you can have:
> >>
> >> long end = consumer.lastOffset(tp);
> >> consumer.setMark(end);
> >> while(consumer.beforeMark()) {
> >>   process(consumer.pollToMark());
> >> }
> >>
> >> or
> >>
> >> long end = consumer.lastOffset(tp);
> >> consumer.setMark(end);
> >> for(Object msg : consumer.iteratorToMark()) {
> >>   process(msg);
> >> }
> >>
> >> I actually have 4 suggestions, then:
> >>
> >> *   pull: stream marking
> >> *   pull: finite streams, bound by time range (up-to-now, yesterday) or
> >> offset
> >> *   pull: async api
> >> *   push: KafkaMessageSource, for a push model, with msg and OOB events.
> >> Build one in either individual or chunk mode and have a listener for
> >> each msg or a listener for a chunk of msgs.  Make it composable and
> >> policy driven (chunked, range, commitOffsets policy, retry policy,
> >> transactional)
> >>
> >> Thank you,
> >> Robert
> >>
> >> On Feb 22, 2014, at 11:21 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
> >>
> >> I think what Robert is saying is that we need to think through the
> >> offset API to enable "batch processing" of topic data. Think of a
> >> process that periodically kicks off to compute a data summary or do a
> >> data load or something like that. I think what we need to support this
> >> is an api to fetch the last offset from the server for a partition.
> >> Something like
> >>  long lastOffset(TopicPartition tp)
> >> and for symmetry
> >>  long firstOffset(TopicPartition tp)
> >>
> >> Likely this would have to be batched. Essentially we should add this
> >> use case to our set of code examples to write and think through.
> >>
> >> The usage would be something like
> >>
> >> long end = consumer.lastOffset(tp);
> >> while(consumer.position(tp) < end)
> >>   process(consumer.poll());
> >>
> >> -Jay
> >>
> >>
> >> On Sat, Feb 22, 2014 at 1:52 AM, Withers, Robert
> >> <robert.with...@dish.com> wrote:
> >>
> >> Jun,
> >>
> >> I was originally thinking a non-blocking read from a distributed
> >> stream should distinguish between "no local messages, but a fetch is
> >> occurring"
> >> versus "you have drained the stream".  The reason this may be valuable
> >> to me is so I can write consumers that read all known traffic then
> >> terminate.
> >> You caused me to reconsider and I think I am conflating 2 things.  One
> >> is a sync/async api while the other is whether to have an infinite or
> >> finite stream.  Is it possible to build a finite KafkaStream on a
> >> range of messages?
> >>
> >> Perhaps a Simple Consumer would do just fine and then I could start
> >> off getting the writeOffset from zookeeper and tell it to read a
> >> specified range per partition.  I've done this and forked a simple
> >> consumer runnable for each partition, for one of our analyzers.  The
> >> great thing about the high-level consumer is the rebalance, so I can
> >> fork however many stream readers I want and you just figure it out for
> >> me.  In that way you offer us the control over the resource
> >> consumption within a pull model.  This is best to regulate message
> >> pressure, they say.
> >>
> >> Combining that high-level rebalance ability with a ranged partition
> >> drain could be really nice...build the stream with an ending position
> >> and it is a finite stream, but retain the high-level rebalance.  With
> >> a finite stream, you would know the difference of the 2 async
> >> scenarios: fetch-in-progress versus end-of-stream.  With an infinite
> >> stream, you never get end-of-stream.
> >>
> >> Aside from a high-level consumer over a finite range within each
> >> partition, the other feature I can think of is more complicated.  A
> >> high-level consumer has state machine changes that the client cannot
> >> access, to my knowledge.  Our use of kafka has us invoke a message
> >> handler with each message we consume from the KafkaStream, so we
> >> convert a pull-model to a push-model.  Including the idea of receiving
> >> notifications from state machine changes, what would be really nice is
> >> to have a KafkaMessageSource, that is an eventful push model.  If it
> >> were thread-safe, then we could register listeners for various events:
> >>
> >> *   opening-stream
> >> *   closing-stream
> >> *   message-arrived
> >> *   end-of-stream/no-more-messages-in-partition (for finite streams)
> >> *   rebalance started
> >> *   partition assigned
> >> *   partition unassigned
> >> *   rebalance finished
> >> *   partition-offset-committed
> >>
> >> Perhaps that is just our use, but instead of a pull-oriented
> >> KafkaStream, is there any sense in your providing a push-oriented
> >> KafkaMessageSource publishing OOB messages?
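
A rough sketch of the push-oriented idea described above, layered on top of a pull loop. The names `SourceListener`, `MessageSource` and `RecordingListener` are hypothetical and exist only for this illustration; only a few of the listed events are modelled, and the sketch is single-threaded, so it does not address the thread-safety requirement raised above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical listener interface; event names follow the list above.
interface SourceListener {
    default void onMessage(String message) {}
    default void onPartitionAssigned(int partition) {}
    default void onPartitionUnassigned(int partition) {}
    default void onEndOfStream() {}
}

// Hypothetical push-style wrapper: the caller still drives it (pull),
// and the wrapper turns each pulled batch into listener callbacks (push).
class MessageSource {
    private final List<SourceListener> listeners = new ArrayList<>();

    void addListener(SourceListener listener) { listeners.add(listener); }

    // Out-of-band events are fanned out to every registered listener.
    void partitionAssigned(int partition) {
        for (SourceListener l : listeners) l.onPartitionAssigned(partition);
    }

    // Delivers one pulled batch; `last` marks the end of a finite stream.
    void dispatch(List<String> batch, boolean last) {
        for (String message : batch) {
            for (SourceListener l : listeners) l.onMessage(message);
        }
        if (last) {
            for (SourceListener l : listeners) l.onEndOfStream();
        }
    }
}

// Example listener that records the order of the events it sees.
class RecordingListener implements SourceListener {
    final List<String> events = new ArrayList<>();

    @Override public void onMessage(String message) { events.add("msg:" + message); }
    @Override public void onPartitionAssigned(int partition) { events.add("assigned:" + partition); }
    @Override public void onEndOfStream() { events.add("end"); }
}
```

In this arrangement the main loop would call something like poll() and hand each batch to `dispatch`, which is close to what Jun suggests in his reply.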
> >>
> >> thank you,
> >> Robert
> >>
> >> On Feb 21, 2014, at 5:59 PM, Jun Rao <jun...@gmail.com> wrote:
> >>
> >> Robert,
> >>
> >> Could you explain why you want to distinguish btw
> >> FetchingInProgressException and NoMessagePendingException? The
> >> nextMsgs() method that you want is exactly what poll() does.
> >>
> >> Thanks,
> >>
> >> Jun
> >>
> >>
> >> On Wed, Feb 19, 2014 at 8:45 AM, Withers, Robert
> >> <robert.with...@dish.com> wrote:
> >>
> >> I am not clear on why the consumer stream should be positionable,
> >> especially if it is limited to the in-memory fetched messages.  Could
> >> someone explain to me, please?  I really like the idea of committing
> >> the offset specifically on those partitions with changed read offsets,
> >> only.
> >>
> >>
> >>
> >> 2 items I would like to see added to the KafkaStream are:
> >>
> >> *         a non-blocking next(), throws several exceptions
> >> (FetchingInProgressException and a NoMessagePendingException or
> >> something) to differentiate between fetching or no messages left.
> >>
> >> *         A nextMsgs() method which returns all locally available
> >> messages
> >> and kicks off a fetch for the next chunk.
> >>
> >>
> >>
> >> If you are trying to add transactional features, then formally define
> >> a DTP capability and pull in other server frameworks to share the
> >> implementation.  Should it be XA/Open?  How about a new peer2peer DTP
> >> protocol?
> >>
> >>
> >>
> >> Thank you,
> >>
> >> Robert
> >>
> >>
> >>
> >> Robert Withers
> >>
> >> Staff Analyst/Developer
> >>
> >> o: (720) 514-8963
> >>
> >> c:  (571) 262-1873
> >>
> >>
> >>
> >> -----Original Message-----
> >> From: Jay Kreps [mailto:jay.kr...@gmail.com]
> >> Sent: Sunday, February 16, 2014 10:13 AM
> >> To: users@kafka.apache.org
> >> Subject: Re: New Consumer API discussion
> >>
> >>
> >>
> >> +1 I think those are good. It is a little weird that changing the fetch
> >> point is not batched but changing the commit point is, but I suppose
> >> there is no helping that.
> >>
> >>
> >>
> >> -Jay
> >>
> >>
> >>
> >>
> >>
> >> On Sat, Feb 15, 2014 at 7:52 AM, Neha Narkhede
> >> <neha.narkh...@gmail.com> wrote:
> >>
> >>
> >>
> >> Jay,
> >>
> >>
> >>
> >> That makes sense. position/seek deal with changing the consumer's
> >> in-memory data, so there is no remote rpc there. For some reason, I
> >> got committed and seek mixed up in my head at that time :)
> >>
> >>
> >>
> >> So we still end up with
> >>
> >>
> >>
> >> long position(TopicPartition tp)
> >>
> >> void seek(TopicPartitionOffset p)
> >>
> >> Map<TopicPartition, Long> committed(TopicPartition tp);
> >>
> >> void commit(TopicPartitionOffset...);
> >>
> >>
> >>
> >> Thanks,
> >>
> >> Neha
> >>
> >>
> >>
> >> On Friday, February 14, 2014, Jay Kreps <jay.kr...@gmail.com> wrote:
> >>
> >>
> >>
> >> Oh, interesting. So I am assuming the following implementation:
> >>
> >> 1. We have an in-memory fetch position which controls the next fetch
> >> offset.
> >> 2. Changing this has no effect until you poll again at which point
> >> your fetch request will be from the newly specified offset
> >> 3. We then have an in-memory but also remotely stored committed offset.
> >> 4. Calling commit has the effect of saving the fetch position as
> >> both the in memory committed position and in the remote store
> >> 5. Auto-commit is the same as periodically calling commit on all
> >> positions.
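
The assumptions listed above (an in-memory fetch position, plus a committed offset held both in memory and remotely) can be sketched roughly as follows. `OffsetState` is a hypothetical model, not the Kafka client: partitions are plain strings, the remote store is an ordinary map, and returning -1 when nothing has been committed is an assumption of this sketch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the two offsets discussed above; not the Kafka client.
class OffsetState {
    private final Map<String, Long> fetchPosition = new HashMap<>();  // in memory only
    private final Map<String, Long> committedLocal = new HashMap<>(); // in-memory copy of commits
    private final Map<String, Long> remoteStore;                      // stand-in for the remote offset store

    OffsetState(Map<String, Long> remoteStore) { this.remoteStore = remoteStore; }

    // seek() only changes the in-memory fetch position; no remote call is made.
    void seek(String partition, long offset) { fetchPosition.put(partition, offset); }

    // position() is a pure in-memory read (0 if never set, an assumption of this sketch).
    long position(String partition) { return fetchPosition.getOrDefault(partition, 0L); }

    // commit() saves the current fetch position both locally and remotely.
    void commit(String partition) {
        long offset = position(partition);
        committedLocal.put(partition, offset);
        remoteStore.put(partition, offset);
    }

    // committed() answers from memory when this consumer has committed,
    // otherwise it falls back to a lookup in the remote store.
    long committed(String partition) {
        Long local = committedLocal.get(partition);
        if (local != null) return local;
        return remoteStore.getOrDefault(partition, -1L);
    }
}
```

Only `commit` and the fallback path of `committed` touch the remote store here, which is why batching is argued for those two and not for `position`/`seek`.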
> >>
> >>
> >>
> >> So batching on commit as well as getting the committed position
> >> makes sense, but batching the fetch position wouldn't, right? I
> >> think you are actually thinking of a different approach.
> >>
> >>
> >>
> >> -Jay
> >>
> >>
> >>
> >>
> >>
> >> On Thu, Feb 13, 2014 at 10:40 PM, Neha Narkhede
> >> <neha.narkh...@gmail.com> wrote:
> >>
> >>
> >>
> >> I think you are saying both, i.e. if you have committed on a
> >> partition it returns you that value but if you haven't
> >> it does a remote lookup?
> >>
> >> Correct.
> >>
> >> The other argument for making committed batched is that commit()
> >> is batched, so there is symmetry.
> >>
> >> position() and seek() are always in memory changes (I assume) so
> >> there is no need to batch them.
> >>
> >> I'm not as sure as you are about that assumption being true.
> >> Basically in my example above, the batching argument for committed()
> >> also applies to position() since one purpose of fetching a partition's
> >> offset is to use it to set the position of the consumer to that offset.
> >> Since that might lead to a remote OffsetRequest call, I think we
> >> probably would be better off batching it.
> >>
> >>
> >>
> >> Another option for naming would be position/reposition instead of
> >>
> >> position/seek.
> >>
> >>
> >>
> >> I think position/seek is better since it aligns with Java file APIs.
> >>
> >>
> >>
> >> I also think your suggestion about ConsumerPosition makes sense.
> >>
> >>
> >>
> >> Thanks,
> >>
> >> Neha
> >>
> >> On Feb 13, 2014 9:22 PM, "Jay Kreps" <jay.kr...@gmail.com> wrote:
> >>
> >>
> >>
> >> Hey Neha,
> >>
> >>
> >>
> >> I actually wasn't proposing the name TopicOffsetPosition, that was
> >> just a typo. I meant TopicPartitionOffset, and I was just referencing
> >> what was in the javadoc. So to restate my proposal without the typo,
> >> using just the existing classes (that naming is a separate question):
> >>
> >> long position(TopicPartition tp)
> >>
> >> void seek(TopicPartitionOffset p)
> >>
> >> long committed(TopicPartition tp)
> >>
> >> void commit(TopicPartitionOffset...);
> >>
> >>
> >>
> >> So I may be unclear on committed() (AKA lastCommittedOffset). Is
> >> it returning the in-memory value from the last commit by this
> >> consumer, or is it doing a remote fetch, or both? I think you are
> >> saying both, i.e. if you have committed on a partition it returns you
> >> that value but if you haven't it does a remote lookup?
> >>
> >>
> >>
> >> The other argument for making committed batched is that commit()
> >>
> >> is batched, so there is symmetry.
> >>
> >>
> >>
> >> position() and seek() are always in memory changes (I assume) so
> >> there is no need to batch them.
> >>
> >>
> >>
> >> So taking all that into account what if we revise it to
> >>
> >> long position(TopicPartition tp)
> >>
> >> void seek(TopicPartitionOffset p)
> >>
> >> Map<TopicPartition, Long> committed(TopicPartition tp);
> >>
> >> void commit(TopicPartitionOffset...);
> >>
> >>
> >>
> >> This is not symmetric between position/seek and commit/committed
> >> but it is convenient. Another option for naming would be
> >> position/reposition instead of position/seek.
> >>
> >>
> >>
> >> With respect to the name TopicPartitionOffset, what I was trying
> >> to say is that I recommend we change that to something shorter. I think
> >> TopicPosition or ConsumerPosition might be better. Position does not
> >> refer to the variables in the object, it refers to the meaning of the
> >> object--it represents a position within a topic. The offset
> >> field in that object is still called the offset. TopicOffset,
> >> PartitionOffset, or ConsumerOffset would all be workable too. Basically
> >> I am just objecting to concatenating three nouns together. :-)
> >>
> >>
> >>
> >> -Jay
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> On Thu, Feb 13, 2014 at 1:54 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
> >>
> >>
> >>
> >> 2. It returns a list of results. But how can you use the list?
> >> The only way to use the list is to make a map of tp=>offset and then
> >> look up results in this map (or do a for loop over the list for the
> >> partition you want). I recommend that if this is an in-memory check we
> >> just do one at a time. E.g. long committedPosition(TopicPosition).
> >>
> >>
> >>
> >> This was discussed in the previous emails. There is a choic
> >>
> >>
> >>
> >>
> >> --
> >> Robert Withers
> >> robert.with...@dish.com
> >> c: 303.919.5856
> >>
> >>
> >>
> >> --
> >> -- Guozhang
> >>
> >>
> >>
> >>
> >
>
