Neha, what does the use of the RebalanceBeginCallback and RebalanceEndCallback 
look like?

thanks,
Rob

On Feb 25, 2014, at 3:51 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

> How do you know n? The whole point is that you need to be able to fetch the
> end offset. You can't a priori decide you will load 1m messages without
> knowing what is there.
> 
> Hmm. I think what you are pointing out is that in the new consumer API, we
> don't have a way to issue the equivalent of the existing getOffsetsBefore()
> API. Agree that is a flaw that we should fix.
> 
> Will update the docs/wiki with a few use cases that I've collected so far
> and see if the API covers those.
> 
> I would prefer PartitionsAssigned and PartitionsRevoked as that seems
> clearer to me
> 
> Well the RebalanceBeginCallback interface will have onPartitionsAssigned()
> as the callback. Similarly, the RebalanceEndCallback interface will have
> onPartitionsRevoked() as the callback. Makes sense?
> 
> Thanks,
> Neha
> 
> 
> On Tue, Feb 25, 2014 at 2:38 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> 
>> 1. I would prefer PartitionsAssigned and PartitionsRevoked as that seems
>> clearer to me.
>> 
>> -Jay
>> 
>> 
>> On Tue, Feb 25, 2014 at 10:19 AM, Neha Narkhede <neha.narkh...@gmail.com
>>> wrote:
>> 
>>> Thanks for the reviews so far! There are a few outstanding questions -
>>> 
>>> 1.  It will be good to make the rebalance callbacks forward compatible
>> with
>>> Java 8 capabilities. We can change it to PartitionsAssignedCallback
>>> and PartitionsRevokedCallback or RebalanceBeginCallback and
>>> RebalanceEndCallback?
>>> 
>>> If there are no objections, I will change it to RebalanceBeginCallback
>> and
>>> RebalanceEndCallback.
>>> 
>>> 2.  The return type for committed() is List<TopicPartitionOffset>. There
>>> was a suggestion to change it to either be Map<TopicPartition,Long> or
>>> Map<TopicPartition, TopicPartitionOffset>
>>> 
>>> Do people have feedback on this suggestion?
>>> 
>>> 
>>> On Tue, Feb 25, 2014 at 9:56 AM, Neha Narkhede <neha.narkh...@gmail.com
>>>> wrote:
>>> 
>>>> Robert,
>>>> 
>>>> Are you saying it is possible to get events from the high-level
>>> consumerregarding various state machine changes?  For instance, can we
>> get a
>>>> notification when a rebalance starts and ends, when a partition is
>>>> assigned/unassigned, when an offset is committed on a partition, when a
>>>> leader changes and so on?  I call this OOB traffic, since they are not
>>> the
>>>> core messages streaming, but side-band events, yet they are still
>>>> potentially useful to consumers.
>>>> 
>>>> In the current proposal, you get notified when the state machine
>> changes
>>>> i.e. before and after a rebalance is triggered. Look at
>>>> ConsumerRebalanceCallback<
>>> 
>> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerRebalanceCallback.html
>>>> 
>>>> .Leader changes do not count as state machine changes for consumer
>>>> rebalance purposes.
>>>> 
>>>> Thanks,
>>>> Neha
>>>> 
>>>> 
>>>> On Tue, Feb 25, 2014 at 9:54 AM, Neha Narkhede <
>> neha.narkh...@gmail.com
>>>> wrote:
>>>> 
>>>>> Jay/Robert -
>>>>> 
>>>>> 
>>>>> I think what Robert is saying is that we need to think through the
>>> offset
>>>>> API to enable "batch processing" of topic data. Think of a process
>> that
>>>>> periodically kicks off to compute a data summary or do a data load or
>>>>> something like that. I think what we need to support this is an api to
>>>>> fetch the last offset from the server for a partition. Something like
>>>>>   long lastOffset(TopicPartition tp)
>>>>> and for symmetry
>>>>>   long firstOffset(TopicPartition tp)
>>>>> 
>>>>> Likely this would have to be batched.
>>>>> 
>>>>> A fixed range of data load can be done using the existing APIs as
>>>>> follows. This assumes you know the endOffset which can be
>> currentOffset
>>> + n
>>>>> (number of messages in the load)
>>>>> 
>>>>> long startOffset = consumer.position(partition);
>>>>> long endOffset = startOffset + n;
>>>>> while(consumer.position(partition) <= endOffset) {
>>>>>     List<ConsumerRecord> messages = consumer.poll(timeout,
>>>>> TimeUnit.MILLISECONDS);
>>>>>     process(messages, endOffset);          // processes messages
>> until
>>>>> endOffset
>>>>> }
>>>>> 
>>>>> Does that make sense?
>>>>> 
>>>>> 
>>>>> On Tue, Feb 25, 2014 at 9:49 AM, Neha Narkhede <
>> neha.narkh...@gmail.com
>>>> wrote:
>>>>> 
>>>>>> Thanks for the review, Jun. Here are some comments -
>>>>>> 
>>>>>> 
>>>>>> 1. The using of ellipsis: This may make passing a list of items from
>> a
>>>>>> collection to the api a bit harder. Suppose that you have a list of
>>>>>> topics
>>>>>> stored in
>>>>>> 
>>>>>> ArrayList<String> topics;
>>>>>> 
>>>>>> If you want subscribe to all topics in one call, you will have to do:
>>>>>> 
>>>>>> String[] topicArray = new String[topics.size()];
>>>>>> consumer.subscribe(topics.
>>>>>> toArray(topicArray));
>>>>>> 
>>>>>> A similar argument can be made for arguably the more common use case
>> of
>>>>>> subscribing to a single topic as well. In these cases, user is
>> required
>>>>>> to write more
>>>>>> code to create a single item collection and pass it in. Since
>>>>>> subscription is extremely lightweight
>>>>>> invoking it multiple times also seems like a workable solution, no?
>>>>>> 
>>>>>> 2. It would be good to document that the following apis are mutually
>>>>>> exclusive. Also, if the partition level subscription is specified,
>>> there
>>>>>> is
>>>>>> no group management. Finally, unsubscribe() can only be used to
>> cancel
>>>>>> subscriptions with the same pattern. For example, you can't
>> unsubscribe
>>>>>> at
>>>>>> the partition level if the subscription is done at the topic level.
>>>>>> 
>>>>>> *subscribe*(java.lang.String... topics)
>>>>>> *subscribe*(java.lang.String topic, int... partitions)
>>>>>> 
>>>>>> Makes sense. Made the suggested improvements to the docs<
>>> 
>> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/Consumer.html#subscribe%28java.lang.String...%29
>>>> 
>>>>>> 
>>>>>> 
>>>>>> 3.commit(): The following comment in the doc should probably say
>>> "commit
>>>>>> offsets for partitions assigned to this consumer".
>>>>>> 
>>>>>> If no partitions are specified, commits offsets for the subscribed
>>> list
>>>>>> of
>>>>>> topics and partitions to Kafka.
>>>>>> 
>>>>>> Could you give more context on this suggestion? Here is the entire
>> doc
>>> -
>>>>>> 
>>>>>> Synchronously commits the specified offsets for the specified list of
>>>>>> topics and partitions to *Kafka*. If no partitions are specified,
>>>>>> commits offsets for the subscribed list of topics and partitions.
>>>>>> 
>>>>>> The hope is to convey that if no partitions are specified, offsets
>> will
>>>>>> be committed for the subscribed list of partitions. One improvement
>>> could
>>>>>> be to
>>>>>> explicitly state that the offsets returned on the last poll will be
>>>>>> committed. I updated this to -
>>>>>> 
>>>>>> Synchronously commits the specified offsets for the specified list of
>>>>>> topics and partitions to *Kafka*. If no offsets are specified,
>> commits
>>>>>> offsets returned on the last {@link #poll(long, TimeUnit) poll()} for
>>>>>> the subscribed list of topics and partitions.
>>>>>> 
>>>>>> 4. There is inconsistency in specifying partitions. Sometimes we use
>>>>>> TopicPartition and some other times we use String and int (see
>>>>>> examples below).
>>>>>> 
>>>>>> void onPartitionsAssigned(Consumer consumer,
>>>>>> TopicPartition...partitions)
>>>>>> 
>>>>>> public void *subscribe*(java.lang.String topic, int... partitions)
>>>>>> 
>>>>>> Yes, this was discussed previously. I think generally the consensus
>>>>>> seems to be to use the higher level
>>>>>> classes everywhere. Made those changes.
>>>>>> 
>>>>>> What's the use case of position()? Isn't that just the nextOffset()
>> on
>>>>>> the
>>>>>> last message returned from poll()?
>>>>>> 
>>>>>> Yes, except in the case where a rebalance is triggered and poll() is
>>> not
>>>>>> yet invoked. Here, you would use position() to get the new fetch
>>> position
>>>>>> for the specific partition. Even if this is not a common use case,
>> IMO
>>> it
>>>>>> is much easier to use position() to get the fetch offset than
>> invoking
>>>>>> nextOffset() on the last message. This also keeps the APIs symmetric,
>>> which
>>>>>> is nice.
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> On Mon, Feb 24, 2014 at 7:06 PM, Withers, Robert <
>>>>>> robert.with...@dish.com> wrote:
>>>>>> 
>>>>>>> That's wonderful.  Thanks for kafka.
>>>>>>> 
>>>>>>> Rob
>>>>>>> 
>>>>>>> On Feb 24, 2014, at 9:58 AM, Guozhang Wang <wangg...@gmail.com
>>> <mailto:
>>>>>>> wangg...@gmail.com>> wrote:
>>>>>>> 
>>>>>>> Hi Robert,
>>>>>>> 
>>>>>>> Yes, you can check out the callback functions in the new API
>>>>>>> 
>>>>>>> onPartitionDesigned
>>>>>>> onPartitionAssigned
>>>>>>> 
>>>>>>> and see if they meet your needs.
>>>>>>> 
>>>>>>> Guozhang
>>>>>>> 
>>>>>>> 
>>>>>>> On Mon, Feb 24, 2014 at 8:18 AM, Withers, Robert <
>>>>>>> robert.with...@dish.com<mailto:robert.with...@dish.com>>wrote:
>>>>>>> 
>>>>>>> Jun,
>>>>>>> 
>>>>>>> Are you saying it is possible to get events from the high-level
>>> consumer
>>>>>>> regarding various state machine changes?  For instance, can we get a
>>>>>>> notification when a rebalance starts and ends, when a partition is
>>>>>>> assigned/unassigned, when an offset is committed on a partition,
>> when
>>> a
>>>>>>> leader changes and so on?  I call this OOB traffic, since they are
>> not
>>>>>>> the
>>>>>>> core messages streaming, but side-band events, yet they are still
>>>>>>> potentially useful to consumers.
>>>>>>> 
>>>>>>> Thank you,
>>>>>>> Robert
>>>>>>> 
>>>>>>> 
>>>>>>> Robert Withers
>>>>>>> Staff Analyst/Developer
>>>>>>> o: (720) 514-8963
>>>>>>> c:  (571) 262-1873
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> -----Original Message-----
>>>>>>> From: Jun Rao [mailto:jun...@gmail.com]
>>>>>>> Sent: Sunday, February 23, 2014 4:19 PM
>>>>>>> To: users@kafka.apache.org<mailto:users@kafka.apache.org>
>>>>>>> Subject: Re: New Consumer API discussion
>>>>>>> 
>>>>>>> Robert,
>>>>>>> 
>>>>>>> For the push orient api, you can potentially implement your own
>>>>>>> MessageHandler with those methods. In the main loop of our new
>>> consumer
>>>>>>> api, you can just call those methods based on the events you get.
>>>>>>> 
>>>>>>> Also, we already have an api to get the first and the last offset
>> of a
>>>>>>> partition (getOffsetBefore).
>>>>>>> 
>>>>>>> Thanks,
>>>>>>> 
>>>>>>> Jun
>>>>>>> 
>>>>>>> 
>>>>>>> On Sat, Feb 22, 2014 at 11:29 AM, Withers, Robert
>>>>>>> <robert.with...@dish.com<mailto:robert.with...@dish.com>>wrote:
>>>>>>> 
>>>>>>> This is a good idea, too.  I would modify it to include stream
>>>>>>> marking, then you can have:
>>>>>>> 
>>>>>>> long end = consumer.lastOffset(tp);
>>>>>>> consumer.setMark(end);
>>>>>>> while(consumer.beforeMark()) {
>>>>>>>  process(consumer.pollToMark());
>>>>>>> }
>>>>>>> 
>>>>>>> or
>>>>>>> 
>>>>>>> long end = consumer.lastOffset(tp);
>>>>>>> consumer.setMark(end);
>>>>>>> for(Object msg : consumer.iteratorToMark()) {
>>>>>>>  process(msg);
>>>>>>> }
>>>>>>> 
>>>>>>> I actually have 4 suggestions, then:
>>>>>>> 
>>>>>>> *   pull: stream marking
>>>>>>> *   pull: finite streams, bound by time range (up-to-now, yesterday)
>>> or
>>>>>>> offset
>>>>>>> *   pull: async api
>>>>>>> *   push: KafkaMessageSource, for a push model, with msg and OOB
>>> events.
>>>>>>> Build one in either individual or chunk mode and have a listener for
>>>>>>> each msg or a listener for a chunk of msgs.  Make it composable and
>>>>>>> policy driven (chunked, range, commitOffsets policy, retry policy,
>>>>>>> transactional)
>>>>>>> 
>>>>>>> Thank you,
>>>>>>> Robert
>>>>>>> 
>>>>>>> On Feb 22, 2014, at 11:21 AM, Jay Kreps <jay.kr...@gmail.com
>> <mailto:
>>>>>>> jay.kr...@gmail.com><mailto:
>>>>>>> jay.kr...@gmail.com<mailto:jay.kr...@gmail.com>>> wrote:
>>>>>>> 
>>>>>>> I think what Robert is saying is that we need to think through the
>>>>>>> offset API to enable "batch processing" of topic data. Think of a
>>>>>>> process that periodically kicks off to compute a data summary or do
>> a
>>>>>>> data load or something like that. I think what we need to support
>> this
>>>>>>> is an api to fetch the last offset from the server for a partition.
>>>>>>> Something like
>>>>>>> long lastOffset(TopicPartition tp)
>>>>>>> and for symmetry
>>>>>>> long firstOffset(TopicPartition tp)
>>>>>>> 
>>>>>>> Likely this would have to be batched. Essentially we should add this
>>>>>>> use case to our set of code examples to write and think through.
>>>>>>> 
>>>>>>> The usage would be something like
>>>>>>> 
>>>>>>> long end = consumer.lastOffset(tp);
>>>>>>> while(consumer.position < end)
>>>>>>>  process(consumer.poll());
>>>>>>> 
>>>>>>> -Jay
>>>>>>> 
>>>>>>> 
>>>>>>> On Sat, Feb 22, 2014 at 1:52 AM, Withers, Robert
>>>>>>> <robert.with...@dish.com<mailto:robert.with...@dish.com>
>>>>>>> <mailto:robert.with...@dish.com>>wrote:
>>>>>>> 
>>>>>>> Jun,
>>>>>>> 
>>>>>>> I was originally thinking a non-blocking read from a distributed
>>>>>>> stream should distinguish between "no local messages, but a fetch is
>>>>>>> occurring"
>>>>>>> versus "you have drained the stream".  The reason this may be
>> valuable
>>>>>>> to me is so I can write consumers that read all known traffic then
>>>>>>> terminate.
>>>>>>> You caused me to reconsider and I think I am conflating 2 things.
>> One
>>>>>>> is a sync/async api while the other is whether to have an infinite
>> or
>>>>>>> finite stream.  Is it possible to build a finite KafkaStream on a
>>>>>>> range of messages?
>>>>>>> 
>>>>>>> Perhaps a Simple Consumer would do just fine and then I could start
>>>>>>> off getting the writeOffset from zookeeper and tell it to read a
>>>>>>> specified range per partition.  I've done this and forked a simple
>>>>>>> consumer runnable for each partition, for one of our analyzers.  The
>>>>>>> great thing about the high-level consumer is that rebalance, so I
>> can
>>>>>>> fork however many stream readers I want and you just figure it out
>> for
>>>>>>> me.  In that way you offer us the control over the resource
>>>>>>> consumption within a pull model.  This is best to regulate message
>>>>>>> pressure, they say.
>>>>>>> 
>>>>>>> Combining that high-level rebalance ability with a ranged partition
>>>>>>> drain could be really nice...build the stream with an ending
>> position
>>>>>>> and it is a finite stream, but retain the high-level rebalance.
>> With
>>>>>>> a finite stream, you would know the difference of the 2 async
>>>>>>> scenarios: fetch-in-progress versus end-of-stream.  With an infinite
>>>>>>> stream, you never get end-of-stream.
>>>>>>> 
>>>>>>> Aside from a high-level consumer over a finite range within each
>>>>>>> partition, the other feature I can think of is more complicated.  A
>>>>>>> high-level consumer has state machine changes that the client cannot
>>>>>>> access, to my knowledge.  Our use of kafka has us invoke a message
>>>>>>> handler with each message we consumer from the KafkaStream, so we
>>>>>>> convert a pull-model to a push-model.  Including the idea of
>> receiving
>>>>>>> notifications from state machine changes, what would be really nice
>> is
>>>>>>> to have a KafkaMessageSource, that is an eventful push model.  If it
>>>>>>> were thread-safe, then we could register listeners for various
>> events:
>>>>>>> 
>>>>>>> *   opening-stream
>>>>>>> *   closing-stream
>>>>>>> *   message-arrived
>>>>>>> *   end-of-stream/no-more-messages-in-partition (for finite streams)
>>>>>>> *   rebalance started
>>>>>>> *   partition assigned
>>>>>>> *   partition unassigned
>>>>>>> *   rebalance finished
>>>>>>> *   partition-offset-committed
>>>>>>> 
>>>>>>> Perhaps that is just our use, but instead of a pull-oriented
>>>>>>> KafkaStream, is there any sense in your providing a push-oriented
>>>>>>> KafkaMessageSource publishing OOB messages?
>>>>>>> 
>>>>>>> thank you,
>>>>>>> Robert
>>>>>>> 
>>>>>>> On Feb 21, 2014, at 5:59 PM, Jun Rao <jun...@gmail.com<mailto:
>>>>>>> jun...@gmail.com><mailto:
>>>>>>> jun...@gmail.com<mailto:jun...@gmail.com>><mailto:
>>>>>>> jun...@gmail.com<mailto:jun...@gmail.com><mailto:jun...@gmail.com
>>>>> 
>>>>>>> wrote:
>>>>>>> 
>>>>>>> Robert,
>>>>>>> 
>>>>>>> Could you explain why you want to distinguish btw
>>>>>>> FetchingInProgressException and NoMessagePendingException? The
>>>>>>> nextMsgs() method that you want is exactly what poll() does.
>>>>>>> 
>>>>>>> Thanks,
>>>>>>> 
>>>>>>> Jun
>>>>>>> 
>>>>>>> 
>>>>>>> On Wed, Feb 19, 2014 at 8:45 AM, Withers, Robert
>>>>>>> <robert.with...@dish.com<mailto:robert.with...@dish.com> <mailto:
>>>>>>> robert.with...@dish.com>
>>>>>>> <mailto:robert.with...@dish.com>>wrote:
>>>>>>> 
>>>>>>> I am not clear on why the consumer stream should be positionable,
>>>>>>> especially if it is limited to the in-memory fetched messages.
>> Could
>>>>>>> someone explain to me, please?  I really like the idea of committing
>>>>>>> the offset specifically on those partitions with changed read
>> offsets,
>>>>>>> only.
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 2 items I would like to see added to the KafkaStream are:
>>>>>>> 
>>>>>>> *         a non-blocking next(), throws several exceptions
>>>>>>> (FetchingInProgressException and a NoMessagePendingException or
>>>>>>> something) to differentiate between fetching or no messages left.
>>>>>>> 
>>>>>>> *         A nextMsgs() method which returns all locally available
>>>>>>> messages
>>>>>>> and kicks off a fetch for the next chunk.
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> If you are trying to add transactional features, then formally
>> define
>>>>>>> a DTP capability and pull in other server frameworks to share the
>>>>>>> implementation.  Should it be XA/Open?  How about a new peer2peer
>> DTP
>>>>>>> protocol?
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> Thank you,
>>>>>>> 
>>>>>>> Robert
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> Robert Withers
>>>>>>> 
>>>>>>> Staff Analyst/Developer
>>>>>>> 
>>>>>>> o: (720) 514-8963
>>>>>>> 
>>>>>>> c:  (571) 262-1873
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> -----Original Message-----
>>>>>>> From: Jay Kreps [mailto:jay.kr...@gmail.com]
>>>>>>> Sent: Sunday, February 16, 2014 10:13 AM
>>>>>>> To: users@kafka.apache.org<mailto:users@kafka.apache.org><mailto:
>>>>>>> users@kafka.apache.org><mailto:
>>>>>>> users@kafka.apache.org<mailto:users@kafka.apache.org>>
>>>>>>> Subject: Re: New Consumer API discussion
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> +1 I think those are good. It is a little weird that changing the
>>>>>>> +fetch
>>>>>>> 
>>>>>>> point is not batched but changing the commit point is, but I suppose
>>>>>>> there is no helping that.
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> -Jay
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> On Sat, Feb 15, 2014 at 7:52 AM, Neha Narkhede
>>>>>>> <neha.narkh...@gmail.com<mailto:neha.narkh...@gmail.com> <mailto:
>>>>>>> neha.narkh...@gmail.com>
>>>>>>> <mailto:neha.narkh...@gmail.com>
>>>>>>> <mailto:neha.narkh...@gmail.com>>wrote:
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> Jay,
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> That makes sense. position/seek deal with changing the consumers
>>>>>>> 
>>>>>>> in-memory data, so there is no remote rpc there. For some reason, I
>>>>>>> 
>>>>>>> got committed and seek mixed up in my head at that time :)
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> So we still end up with
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> long position(TopicPartition tp)
>>>>>>> 
>>>>>>> void seek(TopicPartitionOffset p)
>>>>>>> 
>>>>>>> Map<TopicPartition, Long> committed(TopicPartition tp);
>>>>>>> 
>>>>>>> void commit(TopicPartitionOffset...);
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> Thanks,
>>>>>>> 
>>>>>>> Neha
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> On Friday, February 14, 2014, Jay Kreps <jay.kr...@gmail.com
>> <mailto:
>>>>>>> jay.kr...@gmail.com><mailto:
>>>>>>> jay.kr...@gmail.com<mailto:jay.kr...@gmail.com>><mailto:
>>>>>>> jay.kr...@gmail.com<mailto:jay.kr...@gmail.com><mailto:
>>>>>>> jay.kr...@gmail.com>><mailto:
>>>>>>> jay.kr...@gmail.com<mailto:jay.kr...@gmail.com><mailto:
>>>>>>> jay.kr...@gmail.com><mailto:jay.kreps@gmail
>>>>>>> .com>>>
>>>>>>> wrote:
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> Oh, interesting. So I am assuming the following implementation:
>>>>>>> 
>>>>>>> 1. We have an in-memory fetch position which controls the next fetch
>>>>>>> 
>>>>>>> offset.
>>>>>>> 
>>>>>>> 2. Changing this has no effect until you poll again at which point
>>>>>>> 
>>>>>>> your fetch request will be from the newly specified offset 3. We
>>>>>>> 
>>>>>>> then have an in-memory but also remotely stored committed offset.
>>>>>>> 
>>>>>>> 4. Calling commit has the effect of saving the fetch position as
>>>>>>> 
>>>>>>> both the in memory committed position and in the remote store 5.
>>>>>>> 
>>>>>>> Auto-commit is the same as periodically calling commit on all
>>>>>>> 
>>>>>>> positions.
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> So batching on commit as well as getting the committed position
>>>>>>> 
>>>>>>> makes sense, but batching the fetch position wouldn't, right? I
>>>>>>> 
>>>>>>> think you are actually thinking of a different approach.
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> -Jay
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> On Thu, Feb 13, 2014 at 10:40 PM, Neha Narkhede
>>>>>>> 
>>>>>>> <neha.narkh...@gmail.com<mailto:neha.narkh...@gmail.com><mailto:
>>>>>>> neha.narkh...@gmail.com><mailto:
>>>>>>> neha.narkh...@gmail.com<mailto:neha.narkh...@gmail.com>>
>>>>>>> 
>>>>>>> <javascript:;>
>>>>>>> 
>>>>>>> wrote:
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> I think you are saying both, i.e. if you have committed on a
>>>>>>> 
>>>>>>> partition it returns you that value but if you
>>>>>>> 
>>>>>>> haven't
>>>>>>> 
>>>>>>> it does a remote lookup?
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> Correct.
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> The other argument for making committed batched is that commit()
>>>>>>> 
>>>>>>> is batched, so there is symmetry.
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> position() and seek() are always in memory changes (I assume) so
>>>>>>> 
>>>>>>> there
>>>>>>> 
>>>>>>> is
>>>>>>> 
>>>>>>> no need to batch them.
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> I'm not as sure as you are about that assumption being true.
>>>>>>> 
>>>>>>> Basically
>>>>>>> 
>>>>>>> in
>>>>>>> 
>>>>>>> my example above, the batching argument for committed() also
>>>>>>> 
>>>>>>> applies to
>>>>>>> 
>>>>>>> position() since one purpose of fetching a partition's offset is
>>>>>>> 
>>>>>>> to use
>>>>>>> 
>>>>>>> it
>>>>>>> 
>>>>>>> to set the position of the consumer to that offset. Since that
>>>>>>> 
>>>>>>> might
>>>>>>> 
>>>>>>> lead
>>>>>>> 
>>>>>>> to a remote OffsetRequest call, I think we probably would be
>>>>>>> 
>>>>>>> better off batching it.
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> Another option for naming would be position/reposition instead of
>>>>>>> 
>>>>>>> position/seek.
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> I think position/seek is better since it aligns with Java file APIs.
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> I also think your suggestion about ConsumerPosition makes sense.
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> Thanks,
>>>>>>> 
>>>>>>> Neha
>>>>>>> 
>>>>>>> On Feb 13, 2014 9:22 PM, "Jay Kreps" <jay.kr...@gmail.com<mailto:
>>>>>>> jay.kr...@gmail.com><mailto:
>>>>>>> jay.kr...@gmail.com<mailto:jay.kr...@gmail.com>><mailto:
>>>>>>> jay.kr...@gmail.com<mailto:jay.kr...@gmail.com><mailto:
>>>>>>> jay.kr...@gmail.com>><mailto:
>>>>>>> jay.kr...@gmail.com<mailto:jay.kr...@gmail.com><mailto:
>>>>>>> jay.kr...@gmail.com><mailto:jay.kreps@gmail
>>>>>>> .com>>>
>>>>>>> wrote:
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> Hey Neha,
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> I actually wasn't proposing the name TopicOffsetPosition, that
>>>>>>> 
>>>>>>> was
>>>>>>> 
>>>>>>> just a
>>>>>>> 
>>>>>>> typo. I meant TopicPartitionOffset, and I was just referencing
>>>>>>> 
>>>>>>> what
>>>>>>> 
>>>>>>> was
>>>>>>> 
>>>>>>> in
>>>>>>> 
>>>>>>> the javadoc. So to restate my proposal without the typo, using
>>>>>>> 
>>>>>>> just
>>>>>>> 
>>>>>>> the
>>>>>>> 
>>>>>>> existing classes (that naming is a separate question):
>>>>>>> 
>>>>>>> long position(TopicPartition tp)
>>>>>>> 
>>>>>>> void seek(TopicPartitionOffset p)
>>>>>>> 
>>>>>>> long committed(TopicPartition tp)
>>>>>>> 
>>>>>>> void commit(TopicPartitionOffset...);
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> So I may be unclear on committed() (AKA lastCommittedOffset). Is
>>>>>>> 
>>>>>>> it returning the in-memory value from the last commit by this
>>>>>>> 
>>>>>>> consumer,
>>>>>>> 
>>>>>>> or
>>>>>>> 
>>>>>>> is
>>>>>>> 
>>>>>>> it doing a remote fetch, or both? I think you are saying both, i.e.
>>>>>>> 
>>>>>>> if
>>>>>>> 
>>>>>>> you
>>>>>>> 
>>>>>>> have committed on a partition it returns you that value but if
>>>>>>> 
>>>>>>> you
>>>>>>> 
>>>>>>> haven't
>>>>>>> 
>>>>>>> it does a remote lookup?
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> The other argument for making committed batched is that commit()
>>>>>>> 
>>>>>>> is batched, so there is symmetry.
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> position() and seek() are always in memory changes (I assume) so
>>>>>>> 
>>>>>>> there
>>>>>>> 
>>>>>>> is
>>>>>>> 
>>>>>>> no need to batch them.
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> So taking all that into account what if we revise it to
>>>>>>> 
>>>>>>> long position(TopicPartition tp)
>>>>>>> 
>>>>>>> void seek(TopicPartitionOffset p)
>>>>>>> 
>>>>>>> Map<TopicPartition, Long> committed(TopicPartition tp);
>>>>>>> 
>>>>>>> void commit(TopicPartitionOffset...);
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> This is not symmetric between position/seek and commit/committed
>>>>>>> 
>>>>>>> but
>>>>>>> 
>>>>>>> it
>>>>>>> 
>>>>>>> is
>>>>>>> 
>>>>>>> convenient. Another option for naming would be
>>>>>>> 
>>>>>>> position/reposition
>>>>>>> 
>>>>>>> instead
>>>>>>> 
>>>>>>> of position/seek.
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> With respect to the name TopicPartitionOffset, what I was trying
>>>>>>> 
>>>>>>> to
>>>>>>> 
>>>>>>> say
>>>>>>> 
>>>>>>> is
>>>>>>> 
>>>>>>> that I recommend we change that to something shorter. I think
>>>>>>> 
>>>>>>> TopicPosition
>>>>>>> 
>>>>>>> or ConsumerPosition might be better. Position does not refer to
>>>>>>> 
>>>>>>> the variables in the object, it refers to the meaning of the
>>>>>>> 
>>>>>>> object--it represents a position within a topic. The offset
>>>>>>> 
>>>>>>> field in that object
>>>>>>> 
>>>>>>> is
>>>>>>> 
>>>>>>> still called the offset. TopicOffset, PartitionOffset, or
>>>>>>> 
>>>>>>> ConsumerOffset
>>>>>>> 
>>>>>>> would all be workable too. Basically I am just objecting to
>>>>>>> 
>>>>>>> concatenating
>>>>>>> 
>>>>>>> three nouns together. :-)
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> -Jay
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> On Thu, Feb 13, 2014 at 1:54 PM, Neha Narkhede <
>>>>>>> 
>>>>>>> neha.narkh...@gmail.com<mailto:neha.narkh...@gmail.com><mailto:
>>>>>>> neha.narkh...@gmail.com><mailto:
>>>>>>> neha.narkh...@gmail.com<mailto:neha.narkh...@gmail.com>><mailto:
>>>>>>> neha.narkh...@gmail.com<mailto:neha.narkh...@gmail.com><mailto:
>>>>>>> neha.narkh...@gmail.com>>
>>>>>>> 
>>>>>>> wrote:
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 2. It returns a list of results. But how can you use the list?
>>>>>>> 
>>>>>>> The
>>>>>>> 
>>>>>>> only
>>>>>>> 
>>>>>>> way
>>>>>>> 
>>>>>>> to use the list is to make a map of tp=>offset and then look
>>>>>>> 
>>>>>>> up
>>>>>>> 
>>>>>>> results
>>>>>>> 
>>>>>>> in
>>>>>>> 
>>>>>>> this map (or do a for loop over the list for the partition you
>>>>>>> 
>>>>>>> want). I
>>>>>>> 
>>>>>>> recommend that if this is an in-memory check we just do one at
>>>>>>> 
>>>>>>> a
>>>>>>> 
>>>>>>> time.
>>>>>>> 
>>>>>>> E.g.
>>>>>>> 
>>>>>>> long committedPosition(
>>>>>>> 
>>>>>>> TopicPosition).
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> This was discussed in the previous emails. There is a choic
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> --
>>>>>>> Robert Withers
>>>>>>> robert.with...@dish.com<mailto:robert.with...@dish.com><mailto:
>>>>>>> robert.with...@dish.com><mailto:
>>>>>>> robert.with...@dish.com<mailto:robert.with...@dish.com>>
>>>>>>> c: 303.919.5856
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> --
>>>>>>> Robert Withers
>>>>>>> robert.with...@dish.com<mailto:robert.with...@dish.com><mailto:
>>>>>>> robert.with...@dish.com>
>>>>>>> c: 303.919.5856
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> --
>>>>>>> -- Guozhang
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> --
>>>>>>> Robert Withers
>>>>>>> robert.with...@dish.com<mailto:robert.with...@dish.com>
>>>>>>> c: 303.919.5856
>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>> 
>> 

Attachment: smime.p7s
Description: S/MIME cryptographic signature

Reply via email to