Is 
this<http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html#seek%28kafka.common.TopicPartitionOffset...%29>what
you are looking for? Basically, I think from the overall feedback, it
looks like code snippets don't seem to work for overall understanding of
the APIs. I plan to update the javadoc with more complete examples that
have been discussed so far on this thread and generally on the mailing list.

Thanks,
Neha




On Thu, Feb 27, 2014 at 4:17 AM, Robert Withers
<robert.w.with...@gmail.com>wrote:

> Neha,
>
> I see how one might wish to implement onPartitionsAssigned and
> onPartitionsRevoked, but I don't have a sense for how I might supply these
> implementations to a running consumer.  What would the setup code look like
> to start a high-level consumer with these provided implementations?
>
> thanks,
> Rob
>
>
> On Feb 27, 2014, at 3:48 AM, Neha Narkhede <neha.narkh...@gmail.com>
> wrote:
>
> > Rob,
> >
> > The use of the callbacks is explained in the javadoc here -
> >
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerRebalanceCallback.html
> >
> > Let me know if it makes sense. The hope is to improve the javadoc so that
> > it is self explanatory.
> >
> > Thanks,
> > Neha
> >
> >
> > On Wed, Feb 26, 2014 at 9:16 AM, Robert Withers
> > <robert.w.with...@gmail.com>wrote:
> >
> >> Neha, what does the use of the RebalanceBeginCallback and
> >> RebalanceEndCallback look like?
> >>
> >> thanks,
> >> Rob
> >>
> >> On Feb 25, 2014, at 3:51 PM, Neha Narkhede <neha.narkh...@gmail.com>
> >> wrote:
> >>
> >>> How do you know n? The whole point is that you need to be able to fetch
> >> the
> >>> end offset. You can't a priori decide you will load 1m messages without
> >>> knowing what is there.
> >>>
> >>> Hmm. I think what you are pointing out is that in the new consumer API,
> >> we
> >>> don't have a way to issue the equivalent of the existing
> >> getOffsetsBefore()
> >>> API. Agree that is a flaw that we should fix.
> >>>
> >>> Will update the docs/wiki with a few use cases that I've collected so
> far
> >>> and see if the API covers those.
> >>>
> >>> I would prefer PartitionsAssigned and PartitionsRevoked as that seems
> >>> clearer to me
> >>>
> >>> Well the RebalanceBeginCallback interface will have
> >> onPartitionsAssigned()
> >>> as the callback. Similarly, the RebalanceEndCallback interface will
> have
> >>> onPartitionsRevoked() as the callback. Makes sense?
> >>>
> >>> Thanks,
> >>> Neha
> >>>
> >>>
> >>> On Tue, Feb 25, 2014 at 2:38 PM, Jay Kreps <jay.kr...@gmail.com>
> wrote:
> >>>
> >>>> 1. I would prefer PartitionsAssigned and PartitionsRevoked as that
> seems
> >>>> clearer to me.
> >>>>
> >>>> -Jay
> >>>>
> >>>>
> >>>> On Tue, Feb 25, 2014 at 10:19 AM, Neha Narkhede <
> >> neha.narkh...@gmail.com
> >>>>> wrote:
> >>>>
> >>>>> Thanks for the reviews so far! There are a few outstanding questions
> -
> >>>>>
> >>>>> 1.  It will be good to make the rebalance callbacks forward
> compatible
> >>>> with
> >>>>> Java 8 capabilities. We can change it to PartitionsAssignedCallback
> >>>>> and PartitionsRevokedCallback or RebalanceBeginCallback and
> >>>>> RebalanceEndCallback?
> >>>>>
> >>>>> If there are no objections, I will change it to
> RebalanceBeginCallback
> >>>> and
> >>>>> RebalanceEndCallback.
> >>>>>
> >>>>> 2.  The return type for committed() is List<TopicPartitionOffset>.
> >> There
> >>>>> was a suggestion to change it to either be Map<TopicPartition,Long>
> or
> >>>>> Map<TopicPartition, TopicPartitionOffset>
> >>>>>
> >>>>> Do people have feedback on this suggestion?
> >>>>>
> >>>>>
> >>>>> On Tue, Feb 25, 2014 at 9:56 AM, Neha Narkhede <
> >> neha.narkh...@gmail.com
> >>>>>> wrote:
> >>>>>
> >>>>>> Robert,
> >>>>>>
> >>>>>> Are you saying it is possible to get events from the high-level
> >>>>> consumerregarding various state machine changes?  For instance, can
> we
> >>>> get a
> >>>>>> notification when a rebalance starts and ends, when a partition is
> >>>>>> assigned/unassigned, when an offset is committed on a partition,
> when
> >> a
> >>>>>> leader changes and so on?  I call this OOB traffic, since they are
> not
> >>>>> the
> >>>>>> core messages streaming, but side-band events, yet they are still
> >>>>>> potentially useful to consumers.
> >>>>>>
> >>>>>> In the current proposal, you get notified when the state machine
> >>>> changes
> >>>>>> i.e. before and after a rebalance is triggered. Look at
> >>>>>> ConsumerRebalanceCallback<
> >>>>>
> >>>>
> >>
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerRebalanceCallback.html
> >>>>>>
> >>>>>> .Leader changes do not count as state machine changes for consumer
> >>>>>> rebalance purposes.
> >>>>>>
> >>>>>> Thanks,
> >>>>>> Neha
> >>>>>>
> >>>>>>
> >>>>>> On Tue, Feb 25, 2014 at 9:54 AM, Neha Narkhede <
> >>>> neha.narkh...@gmail.com
> >>>>>> wrote:
> >>>>>>
> >>>>>>> Jay/Robert -
> >>>>>>>
> >>>>>>>
> >>>>>>> I think what Robert is saying is that we need to think through the
> >>>>> offset
> >>>>>>> API to enable "batch processing" of topic data. Think of a process
> >>>> that
> >>>>>>> periodically kicks off to compute a data summary or do a data load
> or
> >>>>>>> something like that. I think what we need to support this is an api
> >> to
> >>>>>>> fetch the last offset from the server for a partition. Something
> like
> >>>>>>>  long lastOffset(TopicPartition tp)
> >>>>>>> and for symmetry
> >>>>>>>  long firstOffset(TopicPartition tp)
> >>>>>>>
> >>>>>>> Likely this would have to be batched.
> >>>>>>>
> >>>>>>> A fixed range of data load can be done using the existing APIs as
> >>>>>>> follows. This assumes you know the endOffset which can be
> >>>> currentOffset
> >>>>> + n
> >>>>>>> (number of messages in the load)
> >>>>>>>
> >>>>>>> long startOffset = consumer.position(partition);
> >>>>>>> long endOffset = startOffset + n;
> >>>>>>> while(consumer.position(partition) <= endOffset) {
> >>>>>>>    List<ConsumerRecord> messages = consumer.poll(timeout,
> >>>>>>> TimeUnit.MILLISECONDS);
> >>>>>>>    process(messages, endOffset);          // processes messages
> >>>> until
> >>>>>>> endOffset
> >>>>>>> }
> >>>>>>>
> >>>>>>> Does that make sense?
> >>>>>>>
> >>>>>>>
> >>>>>>> On Tue, Feb 25, 2014 at 9:49 AM, Neha Narkhede <
> >>>> neha.narkh...@gmail.com
> >>>>>> wrote:
> >>>>>>>
> >>>>>>>> Thanks for the review, Jun. Here are some comments -
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> 1. The using of ellipsis: This may make passing a list of items
> from
> >>>> a
> >>>>>>>> collection to the api a bit harder. Suppose that you have a list
> of
> >>>>>>>> topics
> >>>>>>>> stored in
> >>>>>>>>
> >>>>>>>> ArrayList<String> topics;
> >>>>>>>>
> >>>>>>>> If you want subscribe to all topics in one call, you will have to
> >> do:
> >>>>>>>>
> >>>>>>>> String[] topicArray = new String[topics.size()];
> >>>>>>>> consumer.subscribe(topics.
> >>>>>>>> toArray(topicArray));
> >>>>>>>>
> >>>>>>>> A similar argument can be made for arguably the more common use
> case
> >>>> of
> >>>>>>>> subscribing to a single topic as well. In these cases, user is
> >>>> required
> >>>>>>>> to write more
> >>>>>>>> code to create a single item collection and pass it in. Since
> >>>>>>>> subscription is extremely lightweight
> >>>>>>>> invoking it multiple times also seems like a workable solution,
> no?
> >>>>>>>>
> >>>>>>>> 2. It would be good to document that the following apis are
> mutually
> >>>>>>>> exclusive. Also, if the partition level subscription is specified,
> >>>>> there
> >>>>>>>> is
> >>>>>>>> no group management. Finally, unsubscribe() can only be used to
> >>>> cancel
> >>>>>>>> subscriptions with the same pattern. For example, you can't
> >>>> unsubscribe
> >>>>>>>> at
> >>>>>>>> the partition level if the subscription is done at the topic
> level.
> >>>>>>>>
> >>>>>>>> *subscribe*(java.lang.String... topics)
> >>>>>>>> *subscribe*(java.lang.String topic, int... partitions)
> >>>>>>>>
> >>>>>>>> Makes sense. Made the suggested improvements to the docs<
> >>>>>
> >>>>
> >>
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/Consumer.html#subscribe%28java.lang.String...%29
> >>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> 3.commit(): The following comment in the doc should probably say
> >>>>> "commit
> >>>>>>>> offsets for partitions assigned to this consumer".
> >>>>>>>>
> >>>>>>>> If no partitions are specified, commits offsets for the subscribed
> >>>>> list
> >>>>>>>> of
> >>>>>>>> topics and partitions to Kafka.
> >>>>>>>>
> >>>>>>>> Could you give more context on this suggestion? Here is the entire
> >>>> doc
> >>>>> -
> >>>>>>>>
> >>>>>>>> Synchronously commits the specified offsets for the specified list
> >> of
> >>>>>>>> topics and partitions to *Kafka*. If no partitions are specified,
> >>>>>>>> commits offsets for the subscribed list of topics and partitions.
> >>>>>>>>
> >>>>>>>> The hope is to convey that if no partitions are specified, offsets
> >>>> will
> >>>>>>>> be committed for the subscribed list of partitions. One
> improvement
> >>>>> could
> >>>>>>>> be to
> >>>>>>>> explicitly state that the offsets returned on the last poll will
> be
> >>>>>>>> committed. I updated this to -
> >>>>>>>>
> >>>>>>>> Synchronously commits the specified offsets for the specified list
> >> of
> >>>>>>>> topics and partitions to *Kafka*. If no offsets are specified,
> >>>> commits
> >>>>>>>> offsets returned on the last {@link #poll(long, TimeUnit) poll()}
> >> for
> >>>>>>>> the subscribed list of topics and partitions.
> >>>>>>>>
> >>>>>>>> 4. There is inconsistency in specifying partitions. Sometimes we
> use
> >>>>>>>> TopicPartition and some other times we use String and int (see
> >>>>>>>> examples below).
> >>>>>>>>
> >>>>>>>> void onPartitionsAssigned(Consumer consumer,
> >>>>>>>> TopicPartition...partitions)
> >>>>>>>>
> >>>>>>>> public void *subscribe*(java.lang.String topic, int... partitions)
> >>>>>>>>
> >>>>>>>> Yes, this was discussed previously. I think generally the
> consensus
> >>>>>>>> seems to be to use the higher level
> >>>>>>>> classes everywhere. Made those changes.
> >>>>>>>>
> >>>>>>>> What's the use case of position()? Isn't that just the
> nextOffset()
> >>>> on
> >>>>>>>> the
> >>>>>>>> last message returned from poll()?
> >>>>>>>>
> >>>>>>>> Yes, except in the case where a rebalance is triggered and poll()
> is
> >>>>> not
> >>>>>>>> yet invoked. Here, you would use position() to get the new fetch
> >>>>> position
> >>>>>>>> for the specific partition. Even if this is not a common use case,
> >>>> IMO
> >>>>> it
> >>>>>>>> is much easier to use position() to get the fetch offset than
> >>>> invoking
> >>>>>>>> nextOffset() on the last message. This also keeps the APIs
> >> symmetric,
> >>>>> which
> >>>>>>>> is nice.
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Mon, Feb 24, 2014 at 7:06 PM, Withers, Robert <
> >>>>>>>> robert.with...@dish.com> wrote:
> >>>>>>>>
> >>>>>>>>> That's wonderful.  Thanks for kafka.
> >>>>>>>>>
> >>>>>>>>> Rob
> >>>>>>>>>
> >>>>>>>>> On Feb 24, 2014, at 9:58 AM, Guozhang Wang <wangg...@gmail.com
> >>>>> <mailto:
> >>>>>>>>> wangg...@gmail.com>> wrote:
> >>>>>>>>>
> >>>>>>>>> Hi Robert,
> >>>>>>>>>
> >>>>>>>>> Yes, you can check out the callback functions in the new API
> >>>>>>>>>
> >>>>>>>>> onPartitionDesigned
> >>>>>>>>> onPartitionAssigned
> >>>>>>>>>
> >>>>>>>>> and see if they meet your needs.
> >>>>>>>>>
> >>>>>>>>> Guozhang
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Mon, Feb 24, 2014 at 8:18 AM, Withers, Robert <
> >>>>>>>>> robert.with...@dish.com<mailto:robert.with...@dish.com>>wrote:
> >>>>>>>>>
> >>>>>>>>> Jun,
> >>>>>>>>>
> >>>>>>>>> Are you saying it is possible to get events from the high-level
> >>>>> consumer
> >>>>>>>>> regarding various state machine changes?  For instance, can we
> get
> >> a
> >>>>>>>>> notification when a rebalance starts and ends, when a partition
> is
> >>>>>>>>> assigned/unassigned, when an offset is committed on a partition,
> >>>> when
> >>>>> a
> >>>>>>>>> leader changes and so on?  I call this OOB traffic, since they
> are
> >>>> not
> >>>>>>>>> the
> >>>>>>>>> core messages streaming, but side-band events, yet they are still
> >>>>>>>>> potentially useful to consumers.
> >>>>>>>>>
> >>>>>>>>> Thank you,
> >>>>>>>>> Robert
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Robert Withers
> >>>>>>>>> Staff Analyst/Developer
> >>>>>>>>> o: (720) 514-8963
> >>>>>>>>> c:  (571) 262-1873
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> -----Original Message-----
> >>>>>>>>> From: Jun Rao [mailto:jun...@gmail.com]
> >>>>>>>>> Sent: Sunday, February 23, 2014 4:19 PM
> >>>>>>>>> To: users@kafka.apache.org<mailto:users@kafka.apache.org>
> >>>>>>>>> Subject: Re: New Consumer API discussion
> >>>>>>>>>
> >>>>>>>>> Robert,
> >>>>>>>>>
> >>>>>>>>> For the push orient api, you can potentially implement your own
> >>>>>>>>> MessageHandler with those methods. In the main loop of our new
> >>>>> consumer
> >>>>>>>>> api, you can just call those methods based on the events you get.
> >>>>>>>>>
> >>>>>>>>> Also, we already have an api to get the first and the last offset
> >>>> of a
> >>>>>>>>> partition (getOffsetBefore).
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>>
> >>>>>>>>> Jun
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Sat, Feb 22, 2014 at 11:29 AM, Withers, Robert
> >>>>>>>>> <robert.with...@dish.com<mailto:robert.with...@dish.com>>wrote:
> >>>>>>>>>
> >>>>>>>>> This is a good idea, too.  I would modify it to include stream
> >>>>>>>>> marking, then you can have:
> >>>>>>>>>
> >>>>>>>>> long end = consumer.lastOffset(tp);
> >>>>>>>>> consumer.setMark(end);
> >>>>>>>>> while(consumer.beforeMark()) {
> >>>>>>>>> process(consumer.pollToMark());
> >>>>>>>>> }
> >>>>>>>>>
> >>>>>>>>> or
> >>>>>>>>>
> >>>>>>>>> long end = consumer.lastOffset(tp);
> >>>>>>>>> consumer.setMark(end);
> >>>>>>>>> for(Object msg : consumer.iteratorToMark()) {
> >>>>>>>>> process(msg);
> >>>>>>>>> }
> >>>>>>>>>
> >>>>>>>>> I actually have 4 suggestions, then:
> >>>>>>>>>
> >>>>>>>>> *   pull: stream marking
> >>>>>>>>> *   pull: finite streams, bound by time range (up-to-now,
> >> yesterday)
> >>>>> or
> >>>>>>>>> offset
> >>>>>>>>> *   pull: async api
> >>>>>>>>> *   push: KafkaMessageSource, for a push model, with msg and OOB
> >>>>> events.
> >>>>>>>>> Build one in either individual or chunk mode and have a listener
> >> for
> >>>>>>>>> each msg or a listener for a chunk of msgs.  Make it composable
> and
> >>>>>>>>> policy driven (chunked, range, commitOffsets policy, retry
> policy,
> >>>>>>>>> transactional)
> >>>>>>>>>
> >>>>>>>>> Thank you,
> >>>>>>>>> Robert
> >>>>>>>>>
> >>>>>>>>> On Feb 22, 2014, at 11:21 AM, Jay Kreps <jay.kr...@gmail.com
> >>>> <mailto:
> >>>>>>>>> jay.kr...@gmail.com><mailto:
> >>>>>>>>> jay.kr...@gmail.com<mailto:jay.kr...@gmail.com>>> wrote:
> >>>>>>>>>
> >>>>>>>>> I think what Robert is saying is that we need to think through
> the
> >>>>>>>>> offset API to enable "batch processing" of topic data. Think of a
> >>>>>>>>> process that periodically kicks off to compute a data summary or
> do
> >>>> a
> >>>>>>>>> data load or something like that. I think what we need to support
> >>>> this
> >>>>>>>>> is an api to fetch the last offset from the server for a
> partition.
> >>>>>>>>> Something like
> >>>>>>>>> long lastOffset(TopicPartition tp)
> >>>>>>>>> and for symmetry
> >>>>>>>>> long firstOffset(TopicPartition tp)
> >>>>>>>>>
> >>>>>>>>> Likely this would have to be batched. Essentially we should add
> >> this
> >>>>>>>>> use case to our set of code examples to write and think through.
> >>>>>>>>>
> >>>>>>>>> The usage would be something like
> >>>>>>>>>
> >>>>>>>>> long end = consumer.lastOffset(tp);
> >>>>>>>>> while(consumer.position < end)
> >>>>>>>>> process(consumer.poll());
> >>>>>>>>>
> >>>>>>>>> -Jay
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Sat, Feb 22, 2014 at 1:52 AM, Withers, Robert
> >>>>>>>>> <robert.with...@dish.com<mailto:robert.with...@dish.com>
> >>>>>>>>> <mailto:robert.with...@dish.com>>wrote:
> >>>>>>>>>
> >>>>>>>>> Jun,
> >>>>>>>>>
> >>>>>>>>> I was originally thinking a non-blocking read from a distributed
> >>>>>>>>> stream should distinguish between "no local messages, but a fetch
> >> is
> >>>>>>>>> occurring"
> >>>>>>>>> versus "you have drained the stream".  The reason this may be
> >>>> valuable
> >>>>>>>>> to me is so I can write consumers that read all known traffic
> then
> >>>>>>>>> terminate.
> >>>>>>>>> You caused me to reconsider and I think I am conflating 2 things.
> >>>> One
> >>>>>>>>> is a sync/async api while the other is whether to have an
> infinite
> >>>> or
> >>>>>>>>> finite stream.  Is it possible to build a finite KafkaStream on a
> >>>>>>>>> range of messages?
> >>>>>>>>>
> >>>>>>>>> Perhaps a Simple Consumer would do just fine and then I could
> start
> >>>>>>>>> off getting the writeOffset from zookeeper and tell it to read a
> >>>>>>>>> specified range per partition.  I've done this and forked a
> simple
> >>>>>>>>> consumer runnable for each partition, for one of our analyzers.
> >> The
> >>>>>>>>> great thing about the high-level consumer is that rebalance, so I
> >>>> can
> >>>>>>>>> fork however many stream readers I want and you just figure it
> out
> >>>> for
> >>>>>>>>> me.  In that way you offer us the control over the resource
> >>>>>>>>> consumption within a pull model.  This is best to regulate
> message
> >>>>>>>>> pressure, they say.
> >>>>>>>>>
> >>>>>>>>> Combining that high-level rebalance ability with a ranged
> partition
> >>>>>>>>> drain could be really nice...build the stream with an ending
> >>>> position
> >>>>>>>>> and it is a finite stream, but retain the high-level rebalance.
> >>>> With
> >>>>>>>>> a finite stream, you would know the difference of the 2 async
> >>>>>>>>> scenarios: fetch-in-progress versus end-of-stream.  With an
> >> infinite
> >>>>>>>>> stream, you never get end-of-stream.
> >>>>>>>>>
> >>>>>>>>> Aside from a high-level consumer over a finite range within each
> >>>>>>>>> partition, the other feature I can think of is more complicated.
>  A
> >>>>>>>>> high-level consumer has state machine changes that the client
> >> cannot
> >>>>>>>>> access, to my knowledge.  Our use of kafka has us invoke a
> message
> >>>>>>>>> handler with each message we consumer from the KafkaStream, so we
> >>>>>>>>> convert a pull-model to a push-model.  Including the idea of
> >>>> receiving
> >>>>>>>>> notifications from state machine changes, what would be really
> nice
> >>>> is
> >>>>>>>>> to have a KafkaMessageSource, that is an eventful push model.  If
> >> it
> >>>>>>>>> were thread-safe, then we could register listeners for various
> >>>> events:
> >>>>>>>>>
> >>>>>>>>> *   opening-stream
> >>>>>>>>> *   closing-stream
> >>>>>>>>> *   message-arrived
> >>>>>>>>> *   end-of-stream/no-more-messages-in-partition (for finite
> >> streams)
> >>>>>>>>> *   rebalance started
> >>>>>>>>> *   partition assigned
> >>>>>>>>> *   partition unassigned
> >>>>>>>>> *   rebalance finished
> >>>>>>>>> *   partition-offset-committed
> >>>>>>>>>
> >>>>>>>>> Perhaps that is just our use, but instead of a pull-oriented
> >>>>>>>>> KafkaStream, is there any sense in your providing a push-oriented
> >>>>>>>>> KafkaMessageSource publishing OOB messages?
> >>>>>>>>>
> >>>>>>>>> thank you,
> >>>>>>>>> Robert
> >>>>>>>>>
> >>>>>>>>> On Feb 21, 2014, at 5:59 PM, Jun Rao <jun...@gmail.com<mailto:
> >>>>>>>>> jun...@gmail.com><mailto:
> >>>>>>>>> jun...@gmail.com<mailto:jun...@gmail.com>><mailto:
> >>>>>>>>> jun...@gmail.com<mailto:jun...@gmail.com><mailto:
> jun...@gmail.com
> >>>>>>>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>> Robert,
> >>>>>>>>>
> >>>>>>>>> Could you explain why you want to distinguish btw
> >>>>>>>>> FetchingInProgressException and NoMessagePendingException? The
> >>>>>>>>> nextMsgs() method that you want is exactly what poll() does.
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>>
> >>>>>>>>> Jun
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Wed, Feb 19, 2014 at 8:45 AM, Withers, Robert
> >>>>>>>>> <robert.with...@dish.com<mailto:robert.with...@dish.com>
> <mailto:
> >>>>>>>>> robert.with...@dish.com>
> >>>>>>>>> <mailto:robert.with...@dish.com>>wrote:
> >>>>>>>>>
> >>>>>>>>> I am not clear on why the consumer stream should be positionable,
> >>>>>>>>> especially if it is limited to the in-memory fetched messages.
> >>>> Could
> >>>>>>>>> someone explain to me, please?  I really like the idea of
> >> committing
> >>>>>>>>> the offset specifically on those partitions with changed read
> >>>> offsets,
> >>>>>>>>> only.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> 2 items I would like to see added to the KafkaStream are:
> >>>>>>>>>
> >>>>>>>>> *         a non-blocking next(), throws several exceptions
> >>>>>>>>> (FetchingInProgressException and a NoMessagePendingException or
> >>>>>>>>> something) to differentiate between fetching or no messages left.
> >>>>>>>>>
> >>>>>>>>> *         A nextMsgs() method which returns all locally available
> >>>>>>>>> messages
> >>>>>>>>> and kicks off a fetch for the next chunk.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> If you are trying to add transactional features, then formally
> >>>> define
> >>>>>>>>> a DTP capability and pull in other server frameworks to share the
> >>>>>>>>> implementation.  Should it be XA/Open?  How about a new peer2peer
> >>>> DTP
> >>>>>>>>> protocol?
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Thank you,
> >>>>>>>>>
> >>>>>>>>> Robert
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Robert Withers
> >>>>>>>>>
> >>>>>>>>> Staff Analyst/Developer
> >>>>>>>>>
> >>>>>>>>> o: (720) 514-8963
> >>>>>>>>>
> >>>>>>>>> c:  (571) 262-1873
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> -----Original Message-----
> >>>>>>>>> From: Jay Kreps [mailto:jay.kr...@gmail.com]
> >>>>>>>>> Sent: Sunday, February 16, 2014 10:13 AM
> >>>>>>>>> To: users@kafka.apache.org<mailto:users@kafka.apache.org
> ><mailto:
> >>>>>>>>> users@kafka.apache.org><mailto:
> >>>>>>>>> users@kafka.apache.org<mailto:users@kafka.apache.org>>
> >>>>>>>>> Subject: Re: New Consumer API discussion
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> +1 I think those are good. It is a little weird that changing the
> >>>>>>>>> +fetch
> >>>>>>>>>
> >>>>>>>>> point is not batched but changing the commit point is, but I
> >> suppose
> >>>>>>>>> there is no helping that.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> -Jay
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Sat, Feb 15, 2014 at 7:52 AM, Neha Narkhede
> >>>>>>>>> <neha.narkh...@gmail.com<mailto:neha.narkh...@gmail.com>
> <mailto:
> >>>>>>>>> neha.narkh...@gmail.com>
> >>>>>>>>> <mailto:neha.narkh...@gmail.com>
> >>>>>>>>> <mailto:neha.narkh...@gmail.com>>wrote:
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Jay,
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> That makes sense. position/seek deal with changing the consumers
> >>>>>>>>>
> >>>>>>>>> in-memory data, so there is no remote rpc there. For some
> reason, I
> >>>>>>>>>
> >>>>>>>>> got committed and seek mixed up in my head at that time :)
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> So we still end up with
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> long position(TopicPartition tp)
> >>>>>>>>>
> >>>>>>>>> void seek(TopicPartitionOffset p)
> >>>>>>>>>
> >>>>>>>>> Map<TopicPartition, Long> committed(TopicPartition tp);
> >>>>>>>>>
> >>>>>>>>> void commit(TopicPartitionOffset...);
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>>
> >>>>>>>>> Neha
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Friday, February 14, 2014, Jay Kreps <jay.kr...@gmail.com
> >>>> <mailto:
> >>>>>>>>> jay.kr...@gmail.com><mailto:
> >>>>>>>>> jay.kr...@gmail.com<mailto:jay.kr...@gmail.com>><mailto:
> >>>>>>>>> jay.kr...@gmail.com<mailto:jay.kr...@gmail.com><mailto:
> >>>>>>>>> jay.kr...@gmail.com>><mailto:
> >>>>>>>>> jay.kr...@gmail.com<mailto:jay.kr...@gmail.com><mailto:
> >>>>>>>>> jay.kr...@gmail.com><mailto:jay.kreps@gmail
> >>>>>>>>> .com>>>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Oh, interesting. So I am assuming the following implementation:
> >>>>>>>>>
> >>>>>>>>> 1. We have an in-memory fetch position which controls the next
> >> fetch
> >>>>>>>>>
> >>>>>>>>> offset.
> >>>>>>>>>
> >>>>>>>>> 2. Changing this has no effect until you poll again at which
> point
> >>>>>>>>>
> >>>>>>>>> your fetch request will be from the newly specified offset 3. We
> >>>>>>>>>
> >>>>>>>>> then have an in-memory but also remotely stored committed offset.
> >>>>>>>>>
> >>>>>>>>> 4. Calling commit has the effect of saving the fetch position as
> >>>>>>>>>
> >>>>>>>>> both the in memory committed position and in the remote store 5.
> >>>>>>>>>
> >>>>>>>>> Auto-commit is the same as periodically calling commit on all
> >>>>>>>>>
> >>>>>>>>> positions.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> So batching on commit as well as getting the committed position
> >>>>>>>>>
> >>>>>>>>> makes sense, but batching the fetch position wouldn't, right? I
> >>>>>>>>>
> >>>>>>>>> think you are actually thinking of a different approach.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> -Jay
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Thu, Feb 13, 2014 at 10:40 PM, Neha Narkhede
> >>>>>>>>>
> >>>>>>>>> <neha.narkh...@gmail.com<mailto:neha.narkh...@gmail.com><mailto:
> >>>>>>>>> neha.narkh...@gmail.com><mailto:
> >>>>>>>>> neha.narkh...@gmail.com<mailto:neha.narkh...@gmail.com>>
> >>>>>>>>>
> >>>>>>>>> <javascript:;>
> >>>>>>>>>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> I think you are saying both, i.e. if you have committed on a
> >>>>>>>>>
> >>>>>>>>> partition it returns you that value but if you
> >>>>>>>>>
> >>>>>>>>> haven't
> >>>>>>>>>
> >>>>>>>>> it does a remote lookup?
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Correct.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> The other argument for making committed batched is that commit()
> >>>>>>>>>
> >>>>>>>>> is batched, so there is symmetry.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> position() and seek() are always in memory changes (I assume) so
> >>>>>>>>>
> >>>>>>>>> there
> >>>>>>>>>
> >>>>>>>>> is
> >>>>>>>>>
> >>>>>>>>> no need to batch them.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> I'm not as sure as you are about that assumption being true.
> >>>>>>>>>
> >>>>>>>>> Basically
> >>>>>>>>>
> >>>>>>>>> in
> >>>>>>>>>
> >>>>>>>>> my example above, the batching argument for committed() also
> >>>>>>>>>
> >>>>>>>>> applies to
> >>>>>>>>>
> >>>>>>>>> position() since one purpose of fetching a partition's offset is
> >>>>>>>>>
> >>>>>>>>> to use
> >>>>>>>>>
> >>>>>>>>> it
> >>>>>>>>>
> >>>>>>>>> to set the position of the consumer to that offset. Since that
> >>>>>>>>>
> >>>>>>>>> might
> >>>>>>>>>
> >>>>>>>>> lead
> >>>>>>>>>
> >>>>>>>>> to a remote OffsetRequest call, I think we probably would be
> >>>>>>>>>
> >>>>>>>>> better off batching it.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Another option for naming would be position/reposition instead of
> >>>>>>>>>
> >>>>>>>>> position/seek.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> I think position/seek is better since it aligns with Java file
> >> APIs.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> I also think your suggestion about ConsumerPosition makes sense.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>>
> >>>>>>>>> Neha
> >>>>>>>>>
> >>>>>>>>> On Feb 13, 2014 9:22 PM, "Jay Kreps" <jay.kr...@gmail.com
> <mailto:
> >>>>>>>>> jay.kr...@gmail.com><mailto:
> >>>>>>>>> jay.kr...@gmail.com<mailto:jay.kr...@gmail.com>><mailto:
> >>>>>>>>> jay.kr...@gmail.com<mailto:jay.kr...@gmail.com><mailto:
> >>>>>>>>> jay.kr...@gmail.com>><mailto:
> >>>>>>>>> jay.kr...@gmail.com<mailto:jay.kr...@gmail.com><mailto:
> >>>>>>>>> jay.kr...@gmail.com><mailto:jay.kreps@gmail
> >>>>>>>>> .com>>>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> Hey Neha,
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> I actually wasn't proposing the name TopicOffsetPosition, that
> >>>>>>>>>
> >>>>>>>>> was
> >>>>>>>>>
> >>>>>>>>> just a
> >>>>>>>>>
> >>>>>>>>> typo. I meant TopicPartitionOffset, and I was just referencing
> >>>>>>>>>
> >>>>>>>>> what
> >>>>>>>>>
> >>>>>>>>> was
> >>>>>>>>>
> >>>>>>>>> in
> >>>>>>>>>
> >>>>>>>>> the javadoc. So to restate my proposal without the typo, using
> >>>>>>>>>
> >>>>>>>>> just
> >>>>>>>>>
> >>>>>>>>> the
> >>>>>>>>>
> >>>>>>>>> existing classes (that naming is a separate question):
> >>>>>>>>>
> >>>>>>>>> long position(TopicPartition tp)
> >>>>>>>>>
> >>>>>>>>> void seek(TopicPartitionOffset p)
> >>>>>>>>>
> >>>>>>>>> long committed(TopicPartition tp)
> >>>>>>>>>
> >>>>>>>>> void commit(TopicPartitionOffset...);
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> So I may be unclear on committed() (AKA lastCommittedOffset). Is
> >>>>>>>>>
> >>>>>>>>> it returning the in-memory value from the last commit by this
> >>>>>>>>>
> >>>>>>>>> consumer,
> >>>>>>>>>
> >>>>>>>>> or
> >>>>>>>>>
> >>>>>>>>> is
> >>>>>>>>>
> >>>>>>>>> it doing a remote fetch, or both? I think you are saying both,
> i.e.
> >>>>>>>>>
> >>>>>>>>> if
> >>>>>>>>>
> >>>>>>>>> you
> >>>>>>>>>
> >>>>>>>>> have committed on a partition it returns you that value but if
> >>>>>>>>>
> >>>>>>>>> you
> >>>>>>>>>
> >>>>>>>>> haven't
> >>>>>>>>>
> >>>>>>>>> it does a remote lookup?
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> The other argument for making committed batched is that commit()
> >>>>>>>>>
> >>>>>>>>> is batched, so there is symmetry.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> position() and seek() are always in memory changes (I assume) so
> >>>>>>>>>
> >>>>>>>>> there
> >>>>>>>>>
> >>>>>>>>> is
> >>>>>>>>>
> >>>>>>>>> no need to batch them.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> So taking all that into account what if we revise it to
> >>>>>>>>>
> >>>>>>>>> long position(TopicPartition tp)
> >>>>>>>>>
> >>>>>>>>> void seek(TopicPartitionOffset p)
> >>>>>>>>>
> >>>>>>>>> Map<TopicPartition, Long> committed(TopicPartition tp);
> >>>>>>>>>
> >>>>>>>>> void commit(TopicPartitionOffset...);
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> This is not symmetric between position/seek and commit/committed
> >>>>>>>>>
> >>>>>>>>> but
> >>>>>>>>>
> >>>>>>>>> it
> >>>>>>>>>
> >>>>>>>>> is
> >>>>>>>>>
> >>>>>>>>> convenient. Another option for naming would be
> >>>>>>>>>
> >>>>>>>>> position/reposition
> >>>>>>>>>
> >>>>>>>>> instead
> >>>>>>>>>
> >>>>>>>>> of position/seek.
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> With respect to the name TopicPartitionOffset, what I was trying
> >>>>>>>>>
> >>>>>>>>> to
> >>>>>>>>>
> >>>>>>>>> say
> >>>>>>>>>
> >>>>>>>>> is
> >>>>>>>>>
> >>>>>>>>> that I recommend we change that to something shorter. I think
> >>>>>>>>>
> >>>>>>>>> TopicPosition
> >>>>>>>>>
> >>>>>>>>> or ConsumerPosition might be better. Position does not refer to
> >>>>>>>>>
> >>>>>>>>> the variables in the object, it refers to the meaning of the
> >>>>>>>>>
> >>>>>>>>> object--it represents a position within a topic. The offset
> >>>>>>>>>
> >>>>>>>>> field in that object
> >>>>>>>>>
> >>>>>>>>> is
> >>>>>>>>>
> >>>>>>>>> still called the offset. TopicOffset, PartitionOffset, or
> >>>>>>>>>
> >>>>>>>>> ConsumerOffset
> >>>>>>>>>
> >>>>>>>>> would all be workable too. Basically I am just objecting to
> >>>>>>>>>
> >>>>>>>>> concatenating
> >>>>>>>>>
> >>>>>>>>> three nouns together. :-)
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> -Jay
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Thu, Feb 13, 2014 at 1:54 PM, Neha Narkhede <
> >>>>>>>>>
> >>>>>>>>> neha.narkh...@gmail.com<mailto:neha.narkh...@gmail.com><mailto:
> >>>>>>>>> neha.narkh...@gmail.com><mailto:
> >>>>>>>>> neha.narkh...@gmail.com<mailto:neha.narkh...@gmail.com>><mailto:
> >>>>>>>>> neha.narkh...@gmail.com<mailto:neha.narkh...@gmail.com><mailto:
> >>>>>>>>> neha.narkh...@gmail.com>>
> >>>>>>>>>
> >>>>>>>>> wrote:
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> 2. It returns a list of results. But how can you use the list?
> >>>>>>>>>
> >>>>>>>>> The
> >>>>>>>>>
> >>>>>>>>> only
> >>>>>>>>>
> >>>>>>>>> way
> >>>>>>>>>
> >>>>>>>>> to use the list is to make a map of tp=>offset and then look
> >>>>>>>>>
> >>>>>>>>> up
> >>>>>>>>>
> >>>>>>>>> results
> >>>>>>>>>
> >>>>>>>>> in
> >>>>>>>>>
> >>>>>>>>> this map (or do a for loop over the list for the partition you
> >>>>>>>>>
> >>>>>>>>> want). I
> >>>>>>>>>
> >>>>>>>>> recommend that if this is an in-memory check we just do one at
> >>>>>>>>>
> >>>>>>>>> a
> >>>>>>>>>
> >>>>>>>>> time.
> >>>>>>>>>
> >>>>>>>>> E.g.
> >>>>>>>>>
> >>>>>>>>> long committedPosition(
> >>>>>>>>>
> >>>>>>>>> TopicPosition).
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> This was discussed in the previous emails. There is a choic
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>> Robert Withers
> >>>>>>>>> robert.with...@dish.com<mailto:robert.with...@dish.com><mailto:
> >>>>>>>>> robert.with...@dish.com><mailto:
> >>>>>>>>> robert.with...@dish.com<mailto:robert.with...@dish.com>>
> >>>>>>>>> c: 303.919.5856
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>> Robert Withers
> >>>>>>>>> robert.with...@dish.com<mailto:robert.with...@dish.com><mailto:
> >>>>>>>>> robert.with...@dish.com>
> >>>>>>>>> c: 303.919.5856
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>> -- Guozhang
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> --
> >>>>>>>>> Robert Withers
> >>>>>>>>> robert.with...@dish.com<mailto:robert.with...@dish.com>
> >>>>>>>>> c: 303.919.5856
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>
> >>>>>>>
> >>>>>>
> >>>>>
> >>>>
> >>
> >>
>
>
>

Reply via email to