For ellipsis, sometimes you may have to make a single batch call, instead
of multiple individual calls. An example would be commit(). I think either
way is fine. We just need to be aware of the implication.

Thanks,

Jun


On Tue, Feb 25, 2014 at 9:49 AM, Neha Narkhede <neha.narkh...@gmail.com>wrote:

> Thanks for the review, Jun. Here are some comments -
>
> 1. The using of ellipsis: This may make passing a list of items from a
> collection to the api a bit harder. Suppose that you have a list of topics
> stored in
>
> ArrayList<String> topics;
>
> If you want subscribe to all topics in one call, you will have to do:
>
> String[] topicArray = new String[topics.size()];
> consumer.subscribe(topics.
> toArray(topicArray));
>
> A similar argument can be made for arguably the more common use case of
> subscribing to a single topic as well. In these cases, user is required to
> write more
> code to create a single item collection and pass it in. Since subscription
> is extremely lightweight
> invoking it multiple times also seems like a workable solution, no?
>
> 2. It would be good to document that the following apis are mutually
> exclusive. Also, if the partition level subscription is specified, there is
> no group management. Finally, unsubscribe() can only be used to cancel
> subscriptions with the same pattern. For example, you can't unsubscribe at
> the partition level if the subscription is done at the topic level.
>
> *subscribe*(java.lang.String... topics)
> *subscribe*(java.lang.String topic, int... partitions)
>
> Makes sense. Made the suggested improvements to the
> docs<
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/Consumer.html#subscribe%28java.lang.String...%29
> >
>
> 3.commit(): The following comment in the doc should probably say "commit
> offsets for partitions assigned to this consumer".
>
>  If no partitions are specified, commits offsets for the subscribed list of
> topics and partitions to Kafka.
>
> Could you give more context on this suggestion? Here is the entire doc -
>
> Synchronously commits the specified offsets for the specified list of
> topics and partitions to *Kafka*. If no partitions are specified, commits
> offsets for the subscribed list of topics and partitions.
>
> The hope is to convey that if no partitions are specified, offsets will be
> committed for the subscribed list of partitions. One improvement could be
> to
> explicitly state that the offsets returned on the last poll will be
> committed. I updated this to -
>
> Synchronously commits the specified offsets for the specified list of
> topics and partitions to *Kafka*. If no offsets are specified, commits
> offsets returned on the last {@link #poll(long, TimeUnit) poll()} for the
> subscribed list of topics and partitions.
>
> 4. There is inconsistency in specifying partitions. Sometimes we use
> TopicPartition and some other times we use String and int (see
> examples below).
>
> void onPartitionsAssigned(Consumer consumer, TopicPartition...partitions)
>
> public void *subscribe*(java.lang.String topic, int... partitions)
>
> Yes, this was discussed previously. I think generally the consensus seems
> to be to use the higher level
> classes everywhere. Made those changes.
>
> What's the use case of position()? Isn't that just the nextOffset() on the
> last message returned from poll()?
>
> Yes, except in the case where a rebalance is triggered and poll() is not
> yet invoked. Here, you would use position() to get the new fetch position
> for the specific partition. Even if this is not a common use case, IMO it
> is much easier to use position() to get the fetch offset than invoking
> nextOffset() on the last message. This also keeps the APIs symmetric, which
> is nice.
>
>
>
>
> On Mon, Feb 24, 2014 at 7:06 PM, Withers, Robert <robert.with...@dish.com
> >wrote:
>
> > That's wonderful.  Thanks for kafka.
> >
> > Rob
> >
> > On Feb 24, 2014, at 9:58 AM, Guozhang Wang <wangg...@gmail.com<mailto:
> > wangg...@gmail.com>> wrote:
> >
> > Hi Robert,
> >
> > Yes, you can check out the callback functions in the new API
> >
> > onPartitionDesigned
> > onPartitionAssigned
> >
> > and see if they meet your needs.
> >
> > Guozhang
> >
> >
> > On Mon, Feb 24, 2014 at 8:18 AM, Withers, Robert <
> robert.with...@dish.com
> > <mailto:robert.with...@dish.com>>wrote:
> >
> > Jun,
> >
> > Are you saying it is possible to get events from the high-level consumer
> > regarding various state machine changes?  For instance, can we get a
> > notification when a rebalance starts and ends, when a partition is
> > assigned/unassigned, when an offset is committed on a partition, when a
> > leader changes and so on?  I call this OOB traffic, since they are not
> the
> > core messages streaming, but side-band events, yet they are still
> > potentially useful to consumers.
> >
> > Thank you,
> > Robert
> >
> >
> > Robert Withers
> > Staff Analyst/Developer
> > o: (720) 514-8963
> > c:  (571) 262-1873
> >
> >
> >
> > -----Original Message-----
> > From: Jun Rao [mailto:jun...@gmail.com]
> > Sent: Sunday, February 23, 2014 4:19 PM
> > To: users@kafka.apache.org<mailto:users@kafka.apache.org>
> > Subject: Re: New Consumer API discussion
> >
> > Robert,
> >
> > For the push orient api, you can potentially implement your own
> > MessageHandler with those methods. In the main loop of our new consumer
> > api, you can just call those methods based on the events you get.
> >
> > Also, we already have an api to get the first and the last offset of a
> > partition (getOffsetBefore).
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Sat, Feb 22, 2014 at 11:29 AM, Withers, Robert
> > <robert.with...@dish.com<mailto:robert.with...@dish.com>>wrote:
> >
> > This is a good idea, too.  I would modify it to include stream
> > marking, then you can have:
> >
> > long end = consumer.lastOffset(tp);
> > consumer.setMark(end);
> > while(consumer.beforeMark()) {
> >   process(consumer.pollToMark());
> > }
> >
> > or
> >
> > long end = consumer.lastOffset(tp);
> > consumer.setMark(end);
> > for(Object msg : consumer.iteratorToMark()) {
> >   process(msg);
> > }
> >
> > I actually have 4 suggestions, then:
> >
> > *   pull: stream marking
> > *   pull: finite streams, bound by time range (up-to-now, yesterday) or
> > offset
> > *   pull: async api
> > *   push: KafkaMessageSource, for a push model, with msg and OOB events.
> > Build one in either individual or chunk mode and have a listener for
> > each msg or a listener for a chunk of msgs.  Make it composable and
> > policy driven (chunked, range, commitOffsets policy, retry policy,
> > transactional)
> >
> > Thank you,
> > Robert
> >
> > On Feb 22, 2014, at 11:21 AM, Jay Kreps <jay.kr...@gmail.com<mailto:
> > jay.kr...@gmail.com><mailto:
> > jay.kr...@gmail.com<mailto:jay.kr...@gmail.com>>> wrote:
> >
> > I think what Robert is saying is that we need to think through the
> > offset API to enable "batch processing" of topic data. Think of a
> > process that periodically kicks off to compute a data summary or do a
> > data load or something like that. I think what we need to support this
> > is an api to fetch the last offset from the server for a partition.
> > Something like
> >  long lastOffset(TopicPartition tp)
> > and for symmetry
> >  long firstOffset(TopicPartition tp)
> >
> > Likely this would have to be batched. Essentially we should add this
> > use case to our set of code examples to write and think through.
> >
> > The usage would be something like
> >
> > long end = consumer.lastOffset(tp);
> > while(consumer.position < end)
> >   process(consumer.poll());
> >
> > -Jay
> >
> >
> > On Sat, Feb 22, 2014 at 1:52 AM, Withers, Robert
> > <robert.with...@dish.com<mailto:robert.with...@dish.com>
> > <mailto:robert.with...@dish.com>>wrote:
> >
> > Jun,
> >
> > I was originally thinking a non-blocking read from a distributed
> > stream should distinguish between "no local messages, but a fetch is
> > occurring"
> > versus "you have drained the stream".  The reason this may be valuable
> > to me is so I can write consumers that read all known traffic then
> > terminate.
> > You caused me to reconsider and I think I am conflating 2 things.  One
> > is a sync/async api while the other is whether to have an infinite or
> > finite stream.  Is it possible to build a finite KafkaStream on a
> > range of messages?
> >
> > Perhaps a Simple Consumer would do just fine and then I could start
> > off getting the writeOffset from zookeeper and tell it to read a
> > specified range per partition.  I've done this and forked a simple
> > consumer runnable for each partition, for one of our analyzers.  The
> > great thing about the high-level consumer is that rebalance, so I can
> > fork however many stream readers I want and you just figure it out for
> > me.  In that way you offer us the control over the resource
> > consumption within a pull model.  This is best to regulate message
> > pressure, they say.
> >
> > Combining that high-level rebalance ability with a ranged partition
> > drain could be really nice...build the stream with an ending position
> > and it is a finite stream, but retain the high-level rebalance.  With
> > a finite stream, you would know the difference of the 2 async
> > scenarios: fetch-in-progress versus end-of-stream.  With an infinite
> > stream, you never get end-of-stream.
> >
> > Aside from a high-level consumer over a finite range within each
> > partition, the other feature I can think of is more complicated.  A
> > high-level consumer has state machine changes that the client cannot
> > access, to my knowledge.  Our use of kafka has us invoke a message
> > handler with each message we consumer from the KafkaStream, so we
> > convert a pull-model to a push-model.  Including the idea of receiving
> > notifications from state machine changes, what would be really nice is
> > to have a KafkaMessageSource, that is an eventful push model.  If it
> > were thread-safe, then we could register listeners for various events:
> >
> > *   opening-stream
> > *   closing-stream
> > *   message-arrived
> > *   end-of-stream/no-more-messages-in-partition (for finite streams)
> > *   rebalance started
> > *   partition assigned
> > *   partition unassigned
> > *   rebalance finished
> > *   partition-offset-committed
> >
> > Perhaps that is just our use, but instead of a pull-oriented
> > KafkaStream, is there any sense in your providing a push-oriented
> > KafkaMessageSource publishing OOB messages?
> >
> > thank you,
> > Robert
> >
> > On Feb 21, 2014, at 5:59 PM, Jun Rao <jun...@gmail.com<mailto:
> > jun...@gmail.com><mailto:
> > jun...@gmail.com<mailto:jun...@gmail.com>><mailto:
> > jun...@gmail.com<mailto:jun...@gmail.com><mailto:jun...@gmail.com>>>
> > wrote:
> >
> > Robert,
> >
> > Could you explain why you want to distinguish btw
> > FetchingInProgressException and NoMessagePendingException? The
> > nextMsgs() method that you want is exactly what poll() does.
> >
> > Thanks,
> >
> > Jun
> >
> >
> > On Wed, Feb 19, 2014 at 8:45 AM, Withers, Robert
> > <robert.with...@dish.com<mailto:robert.with...@dish.com> <mailto:
> > robert.with...@dish.com>
> > <mailto:robert.with...@dish.com>>wrote:
> >
> > I am not clear on why the consumer stream should be positionable,
> > especially if it is limited to the in-memory fetched messages.  Could
> > someone explain to me, please?  I really like the idea of committing
> > the offset specifically on those partitions with changed read offsets,
> > only.
> >
> >
> >
> > 2 items I would like to see added to the KafkaStream are:
> >
> > *         a non-blocking next(), throws several exceptions
> > (FetchingInProgressException and a NoMessagePendingException or
> > something) to differentiate between fetching or no messages left.
> >
> > *         A nextMsgs() method which returns all locally available
> > messages
> > and kicks off a fetch for the next chunk.
> >
> >
> >
> > If you are trying to add transactional features, then formally define
> > a DTP capability and pull in other server frameworks to share the
> > implementation.  Should it be XA/Open?  How about a new peer2peer DTP
> > protocol?
> >
> >
> >
> > Thank you,
> >
> > Robert
> >
> >
> >
> > Robert Withers
> >
> > Staff Analyst/Developer
> >
> > o: (720) 514-8963
> >
> > c:  (571) 262-1873
> >
> >
> >
> > -----Original Message-----
> > From: Jay Kreps [mailto:jay.kr...@gmail.com]
> > Sent: Sunday, February 16, 2014 10:13 AM
> > To: users@kafka.apache.org<mailto:users@kafka.apache.org><mailto:
> > users@kafka.apache.org><mailto:
> > users@kafka.apache.org<mailto:users@kafka.apache.org>>
> > Subject: Re: New Consumer API discussion
> >
> >
> >
> > +1 I think those are good. It is a little weird that changing the
> > +fetch
> >
> > point is not batched but changing the commit point is, but I suppose
> > there is no helping that.
> >
> >
> >
> > -Jay
> >
> >
> >
> >
> >
> > On Sat, Feb 15, 2014 at 7:52 AM, Neha Narkhede
> > <neha.narkh...@gmail.com<mailto:neha.narkh...@gmail.com> <mailto:
> > neha.narkh...@gmail.com>
> > <mailto:neha.narkh...@gmail.com>
> > <mailto:neha.narkh...@gmail.com>>wrote:
> >
> >
> >
> > Jay,
> >
> >
> >
> > That makes sense. position/seek deal with changing the consumers
> >
> > in-memory data, so there is no remote rpc there. For some reason, I
> >
> > got committed and seek mixed up in my head at that time :)
> >
> >
> >
> > So we still end up with
> >
> >
> >
> > long position(TopicPartition tp)
> >
> > void seek(TopicPartitionOffset p)
> >
> > Map<TopicPartition, Long> committed(TopicPartition tp);
> >
> > void commit(TopicPartitionOffset...);
> >
> >
> >
> > Thanks,
> >
> > Neha
> >
> >
> >
> > On Friday, February 14, 2014, Jay Kreps <jay.kr...@gmail.com<mailto:
> > jay.kr...@gmail.com><mailto:
> > jay.kr...@gmail.com<mailto:jay.kr...@gmail.com>><mailto:
> > jay.kr...@gmail.com<mailto:jay.kr...@gmail.com><mailto:
> jay.kr...@gmail.com
> > >><mailto:
> > jay.kr...@gmail.com<mailto:jay.kr...@gmail.com><mailto:
> jay.kr...@gmail.com
> > ><mailto:jay.kreps@gmail
> > .com>>>
> > wrote:
> >
> >
> >
> > Oh, interesting. So I am assuming the following implementation:
> >
> > 1. We have an in-memory fetch position which controls the next fetch
> >
> > offset.
> >
> > 2. Changing this has no effect until you poll again at which point
> >
> > your fetch request will be from the newly specified offset 3. We
> >
> > then have an in-memory but also remotely stored committed offset.
> >
> > 4. Calling commit has the effect of saving the fetch position as
> >
> > both the in memory committed position and in the remote store 5.
> >
> > Auto-commit is the same as periodically calling commit on all
> >
> > positions.
> >
> >
> >
> > So batching on commit as well as getting the committed position
> >
> > makes sense, but batching the fetch position wouldn't, right? I
> >
> > think you are actually thinking of a different approach.
> >
> >
> >
> > -Jay
> >
> >
> >
> >
> >
> > On Thu, Feb 13, 2014 at 10:40 PM, Neha Narkhede
> >
> > <neha.narkh...@gmail.com<mailto:neha.narkh...@gmail.com><mailto:
> > neha.narkh...@gmail.com><mailto:
> > neha.narkh...@gmail.com<mailto:neha.narkh...@gmail.com>>
> >
> > <javascript:;>
> >
> > wrote:
> >
> >
> >
> > I think you are saying both, i.e. if you have committed on a
> >
> > partition it returns you that value but if you
> >
> > haven't
> >
> > it does a remote lookup?
> >
> >
> >
> > Correct.
> >
> >
> >
> > The other argument for making committed batched is that commit()
> >
> > is batched, so there is symmetry.
> >
> >
> >
> > position() and seek() are always in memory changes (I assume) so
> >
> > there
> >
> > is
> >
> > no need to batch them.
> >
> >
> >
> > I'm not as sure as you are about that assumption being true.
> >
> > Basically
> >
> > in
> >
> > my example above, the batching argument for committed() also
> >
> > applies to
> >
> > position() since one purpose of fetching a partition's offset is
> >
> > to use
> >
> > it
> >
> > to set the position of the consumer to that offset. Since that
> >
> > might
> >
> > lead
> >
> > to a remote OffsetRequest call, I think we probably would be
> >
> > better off batching it.
> >
> >
> >
> > Another option for naming would be position/reposition instead of
> >
> > position/seek.
> >
> >
> >
> > I think position/seek is better since it aligns with Java file APIs.
> >
> >
> >
> > I also think your suggestion about ConsumerPosition makes sense.
> >
> >
> >
> > Thanks,
> >
> > Neha
> >
> > On Feb 13, 2014 9:22 PM, "Jay Kreps" <jay.kr...@gmail.com<mailto:
> > jay.kr...@gmail.com><mailto:
> > jay.kr...@gmail.com<mailto:jay.kr...@gmail.com>><mailto:
> > jay.kr...@gmail.com<mailto:jay.kr...@gmail.com><mailto:
> jay.kr...@gmail.com
> > >><mailto:
> > jay.kr...@gmail.com<mailto:jay.kr...@gmail.com><mailto:
> jay.kr...@gmail.com
> > ><mailto:jay.kreps@gmail
> > .com>>>
> > wrote:
> >
> >
> >
> > Hey Neha,
> >
> >
> >
> > I actually wasn't proposing the name TopicOffsetPosition, that
> >
> > was
> >
> > just a
> >
> > typo. I meant TopicPartitionOffset, and I was just referencing
> >
> > what
> >
> > was
> >
> > in
> >
> > the javadoc. So to restate my proposal without the typo, using
> >
> > just
> >
> > the
> >
> > existing classes (that naming is a separate question):
> >
> > long position(TopicPartition tp)
> >
> > void seek(TopicPartitionOffset p)
> >
> > long committed(TopicPartition tp)
> >
> > void commit(TopicPartitionOffset...);
> >
> >
> >
> > So I may be unclear on committed() (AKA lastCommittedOffset). Is
> >
> > it returning the in-memory value from the last commit by this
> >
> > consumer,
> >
> > or
> >
> > is
> >
> > it doing a remote fetch, or both? I think you are saying both, i.e.
> >
> > if
> >
> > you
> >
> > have committed on a partition it returns you that value but if
> >
> > you
> >
> > haven't
> >
> > it does a remote lookup?
> >
> >
> >
> > The other argument for making committed batched is that commit()
> >
> > is batched, so there is symmetry.
> >
> >
> >
> > position() and seek() are always in memory changes (I assume) so
> >
> > there
> >
> > is
> >
> > no need to batch them.
> >
> >
> >
> > So taking all that into account what if we revise it to
> >
> > long position(TopicPartition tp)
> >
> > void seek(TopicPartitionOffset p)
> >
> > Map<TopicPartition, Long> committed(TopicPartition tp);
> >
> > void commit(TopicPartitionOffset...);
> >
> >
> >
> > This is not symmetric between position/seek and commit/committed
> >
> > but
> >
> > it
> >
> > is
> >
> > convenient. Another option for naming would be
> >
> > position/reposition
> >
> > instead
> >
> > of position/seek.
> >
> >
> >
> > With respect to the name TopicPartitionOffset, what I was trying
> >
> > to
> >
> > say
> >
> > is
> >
> > that I recommend we change that to something shorter. I think
> >
> > TopicPosition
> >
> > or ConsumerPosition might be better. Position does not refer to
> >
> > the variables in the object, it refers to the meaning of the
> >
> > object--it represents a position within a topic. The offset
> >
> > field in that object
> >
> > is
> >
> > still called the offset. TopicOffset, PartitionOffset, or
> >
> > ConsumerOffset
> >
> > would all be workable too. Basically I am just objecting to
> >
> > concatenating
> >
> > three nouns together. :-)
> >
> >
> >
> > -Jay
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> >
> > On Thu, Feb 13, 2014 at 1:54 PM, Neha Narkhede <
> >
> > neha.narkh...@gmail.com<mailto:neha.narkh...@gmail.com><mailto:
> > neha.narkh...@gmail.com><mailto:
> > neha.narkh...@gmail.com<mailto:neha.narkh...@gmail.com>><mailto:
> > neha.narkh...@gmail.com<mailto:neha.narkh...@gmail.com><mailto:
> > neha.narkh...@gmail.com>>
> >
> > wrote:
> >
> >
> >
> > 2. It returns a list of results. But how can you use the list?
> >
> > The
> >
> > only
> >
> > way
> >
> > to use the list is to make a map of tp=>offset and then look
> >
> > up
> >
> > results
> >
> > in
> >
> > this map (or do a for loop over the list for the partition you
> >
> > want). I
> >
> > recommend that if this is an in-memory check we just do one at
> >
> > a
> >
> > time.
> >
> > E.g.
> >
> > long committedPosition(
> >
> > TopicPosition).
> >
> >
> >
> > This was discussed in the previous emails. There is a choic
> >
> >
> >
> >
> > --
> > Robert Withers
> > robert.with...@dish.com<mailto:robert.with...@dish.com><mailto:
> > robert.with...@dish.com><mailto:
> > robert.with...@dish.com<mailto:robert.with...@dish.com>>
> > c: 303.919.5856
> >
> >
> >
> > --
> > Robert Withers
> > robert.with...@dish.com<mailto:robert.with...@dish.com><mailto:
> > robert.with...@dish.com>
> > c: 303.919.5856
> >
> >
> >
> >
> >
> >
> > --
> > -- Guozhang
> >
> >
> >
> > --
> > Robert Withers
> > robert.with...@dish.com<mailto:robert.with...@dish.com>
> > c: 303.919.5856
> >
> >
>

Reply via email to