Jay/Robert -

I think what Robert is saying is that we need to think through the offset
API to enable "batch processing" of topic data. Think of a process that
periodically kicks off to compute a data summary or do a data load or
something like that. I think what we need to support this is an api to
fetch the last offset from the server for a partition. Something like
   long lastOffset(TopicPartition tp)
and for symmetry
   long firstOffset(TopicPartition tp)

Likely this would have to be batched.
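
A batched form might look something like this (a sketch only; these names
and the Map return type are assumptions, not part of the current proposal):

   Map<TopicPartition, Long> lastOffsets(TopicPartition... tps)
   Map<TopicPartition, Long> firstOffsets(TopicPartition... tps)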

A fixed-range data load can be done using the existing APIs as follows.
This assumes you know the end offset, which can be currentOffset + n
(where n is the number of messages in the load):

long startOffset = consumer.position(partition);
long endOffset = startOffset + n;
while (consumer.position(partition) < endOffset) {
     List<ConsumerRecord> messages = consumer.poll(timeout, TimeUnit.MILLISECONDS);
     process(messages, endOffset);     // processes messages up to endOffset
}

Does that make sense?


On Tue, Feb 25, 2014 at 9:49 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:

> Thanks for the review, Jun. Here are some comments -
>
>
> 1. The use of ellipsis (Java varargs): This may make passing a list of
> items from a collection to the api a bit harder. Suppose that you have a
> list of topics stored in
>
> ArrayList<String> topics;
>
> If you want to subscribe to all topics in one call, you will have to do:
>
> String[] topicArray = new String[topics.size()];
> consumer.subscribe(topics.toArray(topicArray));
>
> A similar argument can be made for arguably the more common use case of
> subscribing to a single topic. In these cases, the user is required to
> write more code to create a single-item collection and pass it in. Since
> subscription is extremely lightweight, invoking it multiple times also
> seems like a workable solution, no?
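>
> For example, with varargs the common cases stay one-liners, and a
> collection can still be handled with a loop instead of a copy (a sketch
> against the proposed api; the topic names are made up):
>
> consumer.subscribe("events");
> consumer.subscribe("events", "logs", "metrics");
> for (String topic : topics)
>     consumer.subscribe(topic);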
>
> 2. It would be good to document that the following apis are mutually
> exclusive. Also, if the partition level subscription is specified, there is
> no group management. Finally, unsubscribe() can only be used to cancel
> subscriptions with the same pattern. For example, you can't unsubscribe at
> the partition level if the subscription is done at the topic level.
>
> *subscribe*(java.lang.String... topics)
> *subscribe*(java.lang.String topic, int... partitions)
>
> Makes sense. Made the suggested improvements to the docs:
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/Consumer.html#subscribe%28java.lang.String...%29
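>
> To illustrate the mutual exclusivity (hypothetical topic and partitions;
> the second call would not be allowed after the first):
>
> consumer.subscribe("events");        // topic-level, group-managed
> consumer.subscribe("events", 0, 1);  // partition-level, no group management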
>
>
> 3. commit(): The following comment in the doc should probably say "commit
> offsets for partitions assigned to this consumer".
>
>  If no partitions are specified, commits offsets for the subscribed list of
> topics and partitions to Kafka.
>
> Could you give more context on this suggestion? Here is the entire doc -
>
> Synchronously commits the specified offsets for the specified list of
> topics and partitions to *Kafka*. If no partitions are specified, commits
> offsets for the subscribed list of topics and partitions.
>
> The hope is to convey that if no partitions are specified, offsets will be
> committed for the subscribed list of partitions. One improvement could be to
> explicitly state that the offsets returned on the last poll will be
> committed. I updated this to -
>
> Synchronously commits the specified offsets for the specified list of
> topics and partitions to *Kafka*. If no offsets are specified, commits
> offsets returned on the last {@link #poll(long, TimeUnit) poll()} for the
> subscribed list of topics and partitions.
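>
> In code, the two forms would look like this (a sketch; the
> TopicPartitionOffset constructor shape here is an assumption):
>
> consumer.commit();  // commits offsets returned on the last poll()
> consumer.commit(new TopicPartitionOffset("events", 0, 1234L));  // explicit offset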
>
> 4. There is inconsistency in specifying partitions. Sometimes we use
> TopicPartition and some other times we use String and int (see
> examples below).
>
> void onPartitionsAssigned(Consumer consumer, TopicPartition... partitions)
>
> public void *subscribe*(java.lang.String topic, int... partitions)
>
> Yes, this was discussed previously. I think generally the consensus seems
> to be to use the higher-level classes everywhere. Made those changes.
>
> What's the use case of position()? Isn't that just the nextOffset() on the
> last message returned from poll()?
>
> Yes, except in the case where a rebalance is triggered and poll() is not
> yet invoked. Here, you would use position() to get the new fetch position
> for the specific partition. Even if this is not a common use case, IMO it
> is much easier to use position() to get the fetch offset than invoking
> nextOffset() on the last message. This also keeps the APIs symmetric, which
> is nice.
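>
> For example, inside the rebalance callback (sketch only; the handling is
> made up):
>
> public void onPartitionsAssigned(Consumer consumer, TopicPartition... partitions) {
>     for (TopicPartition tp : partitions) {
>         long fetchOffset = consumer.position(tp);  // valid before any poll()
>     }
> }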
>
>
>
>
> On Mon, Feb 24, 2014 at 7:06 PM, Withers, Robert <robert.with...@dish.com> wrote:
>
>> That's wonderful.  Thanks for kafka.
>>
>> Rob
>>
>> On Feb 24, 2014, at 9:58 AM, Guozhang Wang <wangg...@gmail.com> wrote:
>>
>> Hi Robert,
>>
>> Yes, you can check out the callback functions in the new API
>>
>> onPartitionsAssigned
>> onPartitionsRevoked
>>
>> and see if they meet your needs.
>>
>> Guozhang
>>
>>
>> On Mon, Feb 24, 2014 at 8:18 AM, Withers, Robert <robert.with...@dish.com> wrote:
>>
>> Jun,
>>
>> Are you saying it is possible to get events from the high-level consumer
>> regarding various state machine changes?  For instance, can we get a
>> notification when a rebalance starts and ends, when a partition is
>> assigned/unassigned, when an offset is committed on a partition, when a
>> leader changes, and so on?  I call this OOB traffic, since these are not
>> the core message stream but side-band events, which are still potentially
>> useful to consumers.
>>
>> Thank you,
>> Robert
>>
>>
>> Robert Withers
>> Staff Analyst/Developer
>> o: (720) 514-8963
>> c:  (571) 262-1873
>>
>>
>>
>> -----Original Message-----
>> From: Jun Rao [mailto:jun...@gmail.com]
>> Sent: Sunday, February 23, 2014 4:19 PM
>> To: users@kafka.apache.org
>> Subject: Re: New Consumer API discussion
>>
>> Robert,
>>
>> For the push-oriented api, you can potentially implement your own
>> MessageHandler with those methods. In the main loop of our new consumer
>> api, you can just call those methods based on the events you get.
>>
>> Also, we already have an api to get the first and the last offset of a
>> partition (getOffsetBefore).
>>
>> Thanks,
>>
>> Jun
>>
>>
>> On Sat, Feb 22, 2014 at 11:29 AM, Withers, Robert <robert.with...@dish.com> wrote:
>>
>> This is a good idea, too.  I would modify it to include stream
>> marking, then you can have:
>>
>> long end = consumer.lastOffset(tp);
>> consumer.setMark(end);
>> while(consumer.beforeMark()) {
>>   process(consumer.pollToMark());
>> }
>>
>> or
>>
>> long end = consumer.lastOffset(tp);
>> consumer.setMark(end);
>> for(Object msg : consumer.iteratorToMark()) {
>>   process(msg);
>> }
>>
>> I actually have 4 suggestions, then:
>>
>> *   pull: stream marking
>> *   pull: finite streams, bound by time range (up-to-now, yesterday) or
>> offset
>> *   pull: async api
>> *   push: KafkaMessageSource, for a push model, with msg and OOB events.
>> Build one in either individual or chunk mode and have a listener for
>> each msg or a listener for a chunk of msgs.  Make it composable and
>> policy driven (chunked, range, commitOffsets policy, retry policy,
>> transactional)
>>
>> Thank you,
>> Robert
>>
>> On Feb 22, 2014, at 11:21 AM, Jay Kreps <jay.kr...@gmail.com> wrote:
>>
>> I think what Robert is saying is that we need to think through the
>> offset API to enable "batch processing" of topic data. Think of a
>> process that periodically kicks off to compute a data summary or do a
>> data load or something like that. I think what we need to support this
>> is an api to fetch the last offset from the server for a partition.
>> Something like
>>  long lastOffset(TopicPartition tp)
>> and for symmetry
>>  long firstOffset(TopicPartition tp)
>>
>> Likely this would have to be batched. Essentially we should add this
>> use case to our set of code examples to write and think through.
>>
>> The usage would be something like
>>
>> long end = consumer.lastOffset(tp);
>> while(consumer.position(tp) < end)
>>   process(consumer.poll());
>>
>> -Jay
>>
>>
>> On Sat, Feb 22, 2014 at 1:52 AM, Withers, Robert <robert.with...@dish.com> wrote:
>>
>> Jun,
>>
>> I was originally thinking a non-blocking read from a distributed
>> stream should distinguish between "no local messages, but a fetch is
>> occurring"
>> versus "you have drained the stream".  The reason this may be valuable
>> to me is so I can write consumers that read all known traffic then
>> terminate.
>> You caused me to reconsider and I think I am conflating 2 things.  One
>> is a sync/async api while the other is whether to have an infinite or
>> finite stream.  Is it possible to build a finite KafkaStream on a
>> range of messages?
>>
>> Perhaps a Simple Consumer would do just fine and then I could start
>> off getting the writeOffset from zookeeper and tell it to read a
>> specified range per partition.  I've done this and forked a simple
>> consumer runnable for each partition, for one of our analyzers.  The
>> great thing about the high-level consumer is the rebalancing, so I can
>> fork however many stream readers I want and it just figures it out for
>> me.  In that way you offer us control over resource consumption within
>> a pull model.  This is best for regulating message pressure, they say.
>>
>> Combining that high-level rebalance ability with a ranged partition
>> drain could be really nice...build the stream with an ending position
>> and it is a finite stream, but retain the high-level rebalance.  With
>> a finite stream, you would know the difference of the 2 async
>> scenarios: fetch-in-progress versus end-of-stream.  With an infinite
>> stream, you never get end-of-stream.
>>
>> Aside from a high-level consumer over a finite range within each
>> partition, the other feature I can think of is more complicated.  A
>> high-level consumer has state machine changes that the client cannot
>> access, to my knowledge.  Our use of kafka has us invoke a message
>> handler with each message we consume from the KafkaStream, so we
>> convert a pull model to a push model.  Including the idea of receiving
>> notifications from state machine changes, what would be really nice is
>> to have a KafkaMessageSource, that is an eventful push model.  If it
>> were thread-safe, then we could register listeners for various events:
>>
>> *   opening-stream
>> *   closing-stream
>> *   message-arrived
>> *   end-of-stream/no-more-messages-in-partition (for finite streams)
>> *   rebalance started
>> *   partition assigned
>> *   partition unassigned
>> *   rebalance finished
>> *   partition-offset-committed
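>>
>> Registration might then look something like this (a rough sketch; every
>> name here is made up, not a proposal of concrete classes):
>>
>> KafkaMessageSource source = new KafkaMessageSource(consumer);
>> source.addListener(new KafkaSourceListener() {
>>   public void onMessageArrived(ConsumerRecord record) { handler.handle(record); }
>>   public void onRebalanceStarted() { pauseHandlers(); }
>>   public void onEndOfStream(TopicPartition tp) { stopReader(tp); }
>> });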
>>
>> Perhaps that is just our use, but instead of a pull-oriented
>> KafkaStream, is there any sense in your providing a push-oriented
>> KafkaMessageSource publishing OOB messages?
>>
>> thank you,
>> Robert
>>
>> On Feb 21, 2014, at 5:59 PM, Jun Rao <jun...@gmail.com> wrote:
>>
>> Robert,
>>
>> Could you explain why you want to distinguish btw
>> FetchingInProgressException and NoMessagePendingException? The
>> nextMsgs() method that you want is exactly what poll() does.
>>
>> Thanks,
>>
>> Jun
>>
>>
>> On Wed, Feb 19, 2014 at 8:45 AM, Withers, Robert <robert.with...@dish.com> wrote:
>>
>> I am not clear on why the consumer stream should be positionable,
>> especially if it is limited to the in-memory fetched messages.  Could
>> someone explain to me, please?  I really like the idea of committing
>> the offset only on those partitions whose read offsets have changed.
>>
>>
>>
>> 2 items I would like to see added to the KafkaStream are:
>>
>> *   a non-blocking next(), which throws exceptions
>> (FetchingInProgressException and NoMessagePendingException, or
>> something similar) to differentiate between fetching and no messages left.
>> *   a nextMsgs() method which returns all locally available messages
>> and kicks off a fetch for the next chunk.
>>
>>
>>
>> If you are trying to add transactional features, then formally define
>> a DTP capability and pull in other server frameworks to share the
>> implementation.  Should it be XA/Open?  How about a new peer2peer DTP
>> protocol?
>>
>>
>>
>> Thank you,
>>
>> Robert
>>
>>
>>
>> Robert Withers
>> Staff Analyst/Developer
>> o: (720) 514-8963
>> c:  (571) 262-1873
>>
>>
>>
>> -----Original Message-----
>> From: Jay Kreps [mailto:jay.kr...@gmail.com]
>> Sent: Sunday, February 16, 2014 10:13 AM
>> To: users@kafka.apache.org
>> Subject: Re: New Consumer API discussion
>>
>>
>>
>> +1 I think those are good. It is a little weird that changing the fetch
>> point is not batched but changing the commit point is, but I suppose
>> there is no helping that.
>>
>>
>>
>> -Jay
>>
>>
>>
>>
>>
>> On Sat, Feb 15, 2014 at 7:52 AM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>>
>>
>>
>> Jay,
>>
>> That makes sense. position/seek deal with changing the consumer's
>> in-memory data, so there is no remote rpc there. For some reason, I
>> got committed and seek mixed up in my head at that time :)
>>
>> So we still end up with
>>
>> long position(TopicPartition tp)
>> void seek(TopicPartitionOffset p)
>> Map<TopicPartition, Long> committed(TopicPartition tp);
>> void commit(TopicPartitionOffset...);
>>
>> Thanks,
>> Neha
>>
>>
>>
>> On Friday, February 14, 2014, Jay Kreps <jay.kr...@gmail.com> wrote:
>>
>>
>>
>> Oh, interesting. So I am assuming the following implementation:
>>
>> 1. We have an in-memory fetch position which controls the next fetch
>> offset.
>> 2. Changing this has no effect until you poll again, at which point
>> your fetch request will be from the newly specified offset.
>> 3. We then have an in-memory but also remotely stored committed offset.
>> 4. Calling commit has the effect of saving the fetch position as
>> both the in-memory committed position and in the remote store.
>> 5. Auto-commit is the same as periodically calling commit on all
>> positions.
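>>
>> Under those assumptions, the calls would interact something like this
>> (a sketch only; the topic and offsets are made up):
>>
>> consumer.seek(new TopicPartitionOffset("events", 0, 500L)); // in-memory only (1-2)
>> consumer.poll(100, TimeUnit.MILLISECONDS);  // this fetch starts at offset 500
>> consumer.commit();  // saves the fetch position in memory and remotely (4)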
>>
>> So batching on commit as well as getting the committed position
>> makes sense, but batching the fetch position wouldn't, right? I
>> think you are actually thinking of a different approach.
>>
>> -Jay
>>
>> On Thu, Feb 13, 2014 at 10:40 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>>
>>
>>
>> I think you are saying both, i.e. if you have committed on a
>> partition it returns you that value but if you haven't it does a
>> remote lookup?
>>
>> Correct.
>>
>> The other argument for making committed batched is that commit()
>> is batched, so there is symmetry.
>>
>> position() and seek() are always in memory changes (I assume) so
>> there is no need to batch them.
>>
>> I'm not as sure as you are about that assumption being true. Basically
>> in my example above, the batching argument for committed() also
>> applies to position(), since one purpose of fetching a partition's
>> offset is to use it to set the position of the consumer to that
>> offset. Since that might lead to a remote OffsetRequest call, I think
>> we probably would be better off batching it.
>>
>> Another option for naming would be position/reposition instead of
>> position/seek.
>>
>> I think position/seek is better since it aligns with Java file APIs.
>>
>> I also think your suggestion about ConsumerPosition makes sense.
>>
>> Thanks,
>> Neha
>>
>> On Feb 13, 2014 9:22 PM, "Jay Kreps" <jay.kr...@gmail.com> wrote:
>>
>>
>>
>> Hey Neha,
>>
>> I actually wasn't proposing the name TopicOffsetPosition, that was
>> just a typo. I meant TopicPartitionOffset, and I was just referencing
>> what was in the javadoc. So to restate my proposal without the typo,
>> using just the existing classes (that naming is a separate question):
>>
>> long position(TopicPartition tp)
>> void seek(TopicPartitionOffset p)
>> long committed(TopicPartition tp)
>> void commit(TopicPartitionOffset...);
>>
>> So I may be unclear on committed() (AKA lastCommittedOffset). Is
>> it returning the in-memory value from the last commit by this
>> consumer, or is it doing a remote fetch, or both? I think you are
>> saying both, i.e. if you have committed on a partition it returns
>> you that value but if you haven't it does a remote lookup?
>>
>> The other argument for making committed batched is that commit()
>> is batched, so there is symmetry.
>>
>> position() and seek() are always in memory changes (I assume) so
>> there is no need to batch them.
>>
>> So taking all that into account what if we revise it to
>>
>> long position(TopicPartition tp)
>> void seek(TopicPartitionOffset p)
>> Map<TopicPartition, Long> committed(TopicPartition tp);
>> void commit(TopicPartitionOffset...);
>>
>> This is not symmetric between position/seek and commit/committed but
>> it is convenient. Another option for naming would be
>> position/reposition instead of position/seek.
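>>
>> As a usage sketch of that revision (illustrative only; the
>> TopicPartitionOffset constructor shape and the topic are made up):
>>
>> TopicPartition tp = new TopicPartition("events", 0);
>> long pos = consumer.position(tp);                 // in-memory
>> consumer.seek(new TopicPartitionOffset("events", 0, pos + 100));
>> consumer.commit(new TopicPartitionOffset("events", 0, pos + 100));
>> Map<TopicPartition, Long> committed = consumer.committed(tp); // may hit the server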
>>
>> With respect to the name TopicPartitionOffset, what I was trying to
>> say is that I recommend we change that to something shorter. I think
>> TopicPosition or ConsumerPosition might be better. Position does not
>> refer to the variables in the object, it refers to the meaning of the
>> object--it represents a position within a topic. The offset field in
>> that object is still called the offset. TopicOffset, PartitionOffset,
>> or ConsumerOffset would all be workable too. Basically I am just
>> objecting to concatenating three nouns together. :-)
>>
>> -Jay
>>
>> On Thu, Feb 13, 2014 at 1:54 PM, Neha Narkhede <neha.narkh...@gmail.com> wrote:
>>
>>
>>
>> 2. It returns a list of results. But how can you use the list? The
>> only way to use the list is to make a map of tp=>offset and then look
>> up results in this map (or do a for loop over the list for the
>> partition you want). I recommend that if this is an in-memory check
>> we just do one at a time. E.g.
>>
>> long committedPosition(TopicPosition).
>>
>> This was discussed in the previous emails. There is a choic
>>
>>
>>
>>
>> --
>> Robert Withers
>> robert.with...@dish.com
>> c: 303.919.5856
>>
>>
>>
>>
>>
>>
>> --
>> -- Guozhang
>>
>>
>>
>> --
>> Robert Withers
>> robert.with...@dish.com
>> c: 303.919.5856
>>
>>
>
