Ah, gotcha.

-Jay


On Wed, Feb 12, 2014 at 8:40 AM, Neha Narkhede <neha.narkh...@gmail.com>wrote:

> Jay
>
> Well none kind of address the common case which is to commit all
> partitions. For these I was thinking just
>    commit();
> The advantage of this simpler method is that you don't need to bother about
> partitions you just consume the messages given to you and then commit them
>
> This is already what the commit() API is supposed to do. Here is the
> javadoc -
>
>     * Synchronously commits the specified offsets for the specified list of
> topics and partitions to Kafka. If no partitions are specified,
>      * commits offsets for the subscribed list of topics and partitions to
> Kafka.
>
>     public void commit(TopicPartitionOffset... offsets);
>
> Could you take another look at the
> javadoc<
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
> >?
> I've uploaded changes from the previous discussions and included some of
> your review suggestions.
>
>
>
> On Wed, Feb 12, 2014 at 8:32 AM, Neha Narkhede <neha.narkh...@gmail.com
> >wrote:
>
> > Imran,
> >
> >
> > Sorry I am probably missing
> > something basic, but I'm not sure how a multi-threaded consumer would
> > work.  I can imagine its either:
> >
> > a) I just have one thread poll kafka.  If I want to process msgs in
> > multiple threads, than I deal w/ that after polling, eg. stick them into
> a
> > blocking queue or something, and have more threads that read from the
> > queue.
> >
> > b) each thread creates its own KafkaConsumer.  They are all registered
> the
> > same way, and I leave it to kafka to figure out what data to give to each
> > one.
> >
> > We designed the new consumer API to not require multi threading on
> > purpose.
> > The reason this is better than the existing ZookeeperConsumerConnector is
> > that
> > it effectively allows the user to use whatever threading and load balance
> > message
> > processing amongst those threads. For example, you might want more
> threads
> > dedicated
> > to a certain high throughput partition compared to other partitions. In
> > option a) above, you can
> > create your own thread pool and hand over the messages returned by poll
> > using a blocking
> > queue or any other approach. Option b) would work as well and the user
> > has to figure out which
> > topics each KafkaConsumer subscribes to.
> >
> >
> > (a) certainly makes things simple, but I worry about throughput -- is
> that
> > just as good as having one thread trying to consumer each partition?
> >
> > (b) makes it a bit of a pain to figure out how many threads to use.  I
> > assume there is no point in using more threads than there are partitions,
> > so first you've got to figure out how many partitions there are in each
> > topic.  Might be nice if there were some util functions to simplify this.
> >
> > The user can pick the number of threads. That is still better as only the
> > user knows how
> > slow/fast the message processing of her application is.
> >
> > Also, since the initial call to subscribe doesn't give the partition
> > assignment, does that mean the first call to poll() will always call the
> > ConsumerRebalanceCallback?
> >
> > Assuming you choose to use group management (by using subscribe(topics)),
> > poll() will invoke
> > the ConsumerRebalanceCallback on every single rebalance attempt. Improved
> > the javadoc<
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerRebalanceCallback.html
> >to
> > explain that. Could you give that another look?
> >
> > If I'm on the right track, I'd like to expand this example, showing how
> > each "MyConsumer" can keep track of its partitions & offsets, even in the
> > face of rebalances.  As Jay said, I think a minimal code example could
> > really help us see the utility & faults of the api.
> >
> > Sure, please look at the javadoc<
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
> >.
> > I've tried to include code examples there. Please help in
> > improving those or adding more. Looks like we should add some multi
> > threading examples. I avoided
> > adding those since there are many ways to handling the message processing
> > and it will not be feasible
> > to list all of those. If we list one, people might think that is the only
> > recommended approach.
> >
> > With that said, here is an example of using Option b) above -
> >
> >
> > List<MyConsumer> consumers = new ArrayList<MyConsumer>();
> > List<String> topics = new ArrayList<String();
> > // populate topics
> > assert(consumers.size == topics.size);
> >
> > for (int i = 0; i < numThreads; i++) {
> >   MyConsumer c = new MyConsumer();
> >   c.subscribe(topics(i));
> >   consumers.add(c);
> > }
> > // poll each consumer in a separate thread.
> > for (int i = 0; i < numThreads; i++) {
> >    executorService.submit(new Runnable() {
> >         @Override
> >          public void run() {
> >              new ProcessMessagesTask(consumers(i));
> >          }
> >    });
> > }
> >
> > Let me know what you think.
> >
> > Thanks,
> > Neha
> >
> > On Tue, Feb 11, 2014 at 3:54 PM, Jay Kreps <jay.kr...@gmail.com> wrote:
> >
> >> Comments inline:
> >>
> >>
> >> On Mon, Feb 10, 2014 at 2:31 PM, Guozhang Wang <wangg...@gmail.com>
> >> wrote:
> >>
> >> > Hello Jay,
> >> >
> >> > Thanks for the detailed comments.
> >> >
> >> > 1. Yeah we could discuss a bit more on that.
> >> >
> >> > 2. Since subscribe() is incremental, adding one topic-partition is OK,
> >> and
> >> > personally I think it is cleaner than subscribe(String topic,
> >> > int...partition)?
> >> >
> >> I am not too particular. Have you actually tried this? I think writing
> >> actual sample code is important.
> >>
> >>
> >> > 3. Originally I was thinking about two interfaces:
> >> >
> >> > getOffsets() // offsets for all partitions that I am consuming now
> >> >
> >> > getOffset(topc-partition) // offset of the specified topic-partition,
> >> will
> >> > throw exception if it is not currently consumed.
> >> >
> >> > What do you think about these?
> >> >
> >>
> >> The naming needs to distinguish committed offset position versus fetch
> >> offset position. Also we aren't using the getX convention.
> >>
> >>
> >> > 4. Yes, that remains a config.
> >> >
> >>
> >> Does that make sense given that you change your position via an api now?
> >>
> >>
> >> > 5. Agree.
> >> >
> >> > 6. If the time out value is null then it will "logically" return
> >> > immediately with whatever data is available. I think an indefinitely
> >> poll()
> >> > function could be replaced with just
> >> >
> >> > while (true) poll(some-time)?
> >> >
> >>
> >> That is fine but we should provide a no arg poll for that, poll(null)
> >> isn't
> >> clear. We should add the timeunit as per the post java 5 convention as
> >> that
> >> makes the call more readable. E.g.
> >>    poll(5) vs poll(5, TimeUnit.MILLISECONDS)
> >>
> >>
> >> > 7. I am open with either approach.
> >> >
> >>
> >> Cool.
> >>
> >> 8. I was thinking about two interfaces for the commit functionality:
> >> >
> >> >
> >> >
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
> >> >
> >> > Do those sound better?
> >> >
> >>
> >> Well none kind of address the common case which is to commit all
> >> partitions. For these I was thinking just
> >>    commit();
> >> The advantage of this simpler method is that you don't need to bother
> >> about
> >> partitions you just consume the messages given to you and then commit
> >> them.
> >>
> >> 9. Currently I think about un-subscribe as "close and re-subscribe", and
> >> > would like to hear people's opinion about it.
> >> >
> >>
> >> Hmm, I think it is a little weird if there is a subscribe which can be
> >> called at any time but no unsubscribe. Would this be hard to do.
> >>
> >>
> >> > 10. Yes. Position() is an API function, and as and API it means "be
> >> called
> >> > at any time" and will change the next fetching starting offset.
> >> >
> >>
> >> Cool.
> >>
> >>
> >> > 11. The ConsumerRecord would have the offset info of the message. Is
> >> that
> >> > what you want?
> >> >
> >>
> >> But that is only after I have gotten a message. I'm not sure if that
> >> covers
> >> all cases or not.
> >>
> >>
> >> > About use cases: great point. I will add some more examples of using
> the
> >> > API functions in the wiki pages.
> >> >
> >> > Guozhang
> >> >
> >> >
> >> >
> >> >
> >> > On Mon, Feb 10, 2014 at 12:20 PM, Jay Kreps <jay.kr...@gmail.com>
> >> wrote:
> >> >
> >> > > A few items:
> >> > > 1. ConsumerRebalanceCallback
> >> > >    a. onPartitionsRevoked would be a better name.
> >> > >    b. We should discuss the possibility of splitting this into two
> >> > > interfaces. The motivation would be that in Java 8 single method
> >> > interfaces
> >> > > can directly take methods which might be more intuitive.
> >> > >    c. If we stick with a single interface I would prefer the name
> >> > > RebalanceCallback as its more concise
> >> > > 2. Should subscribe(String topic, int partition) should be
> >> > subscribe(String
> >> > > topic, int...partition)?
> >> > > 3. Is lastCommittedOffset call just a local access? If so it would
> be
> >> > more
> >> > > convenient not to batch it.
> >> > > 4. How are we going to handle the earliest/latest starting position
> >> > > functionality we currently have. Does that remain a config?
> >> > > 5. Do we need to expose the general ability to get known positions
> >> from
> >> > the
> >> > > log? E.g. the functionality in the OffsetRequest...? That would make
> >> the
> >> > > ability to change position a little easier.
> >> > > 6. Should poll(java.lang.Long timeout) be poll(long timeout,
> TimeUnit
> >> > > unit)? Is it Long because it allows null? If so should we just add a
> >> > poll()
> >> > > that polls indefinitely?
> >> > > 7. I recommend we remove the boolean parameter from commit as it is
> >> > really
> >> > > hard to read code that has boolean parameters without named
> arguments.
> >> > Can
> >> > > we make it something like commit(...) and commitAsync(...)?
> >> > > 8. What about the common case where you just want to commit the
> >> current
> >> > > position for all partitions?
> >> > > 9. How do you unsubscribe?
> >> > > 10. You say in a few places that positions() only impacts the
> starting
> >> > > position, but surely that isn't the case, right? Surely it controls
> >> the
> >> > > fetch position for that partition and can be called at any time?
> >> > Otherwise
> >> > > it is a pretty weird api, right?
> >> > > 11. How do I get my current position? Not the committed position but
> >> the
> >> > > offset of the next message that will be given to me?
> >> > >
> >> > > One thing that I really found helpful for the API design was writing
> >> out
> >> > > actual code for different scenarios against the API. I think it
> might
> >> be
> >> > > good to do that for this too--i.e. enumerate the various use cases
> and
> >> > code
> >> > > that use case up to see how it looks. I'm not sure if it would be
> >> useful
> >> > to
> >> > > collect these kinds of scenarios from people. I know they have
> >> > sporadically
> >> > > popped up on the mailing list.
> >> > >
> >> > > -Jay
> >> > >
> >> > >
> >> > > On Mon, Feb 10, 2014 at 10:54 AM, Neha Narkhede <
> >> neha.narkh...@gmail.com
> >> > > >wrote:
> >> > >
> >> > > > As mentioned in previous emails, we are also working on a
> >> > > re-implementation
> >> > > > of the consumer. I would like to use this email thread to discuss
> >> the
> >> > > > details of the public API. I would also like us to be picky about
> >> this
> >> > > > public api now so it is as good as possible and we don't need to
> >> break
> >> > it
> >> > > > in the future.
> >> > > >
> >> > > > The best way to get a feel for the API is actually to take a look
> at
> >> > the
> >> > > > javadoc<
> >> > > >
> >> > >
> >> >
> >>
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
> >> > > > >,
> >> > > > the hope is to get the api docs good enough so that it is
> >> > > self-explanatory.
> >> > > > You can also take a look at the configs
> >> > > > here<
> >> > > >
> >> > >
> >> >
> >>
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html
> >> > > > >
> >> > > >
> >> > > > Some background info on implementation:
> >> > > >
> >> > > > At a high level the primary difference in this consumer is that it
> >> > > removes
> >> > > > the distinction between the "high-level" and "low-level" consumer.
> >> The
> >> > > new
> >> > > > consumer API is non blocking and instead of returning a blocking
> >> > > iterator,
> >> > > > the consumer provides a poll() API that returns a list of records.
> >> We
> >> > > think
> >> > > > this is better compared to the blocking iterators since it
> >> effectively
> >> > > > decouples the threading strategy used for processing messages from
> >> the
> >> > > > consumer. It is worth noting that the consumer is entirely single
> >> > > threaded
> >> > > > and runs in the user thread. The advantage is that it can be
> easily
> >> > > > rewritten in less multi-threading-friendly languages. The consumer
> >> > > batches
> >> > > > data and multiplexes I/O over TCP connections to each of the
> >> brokers it
> >> > > > communicates with, for high throughput. The consumer also allows
> >> long
> >> > > poll
> >> > > > to reduce the end-to-end message latency for low throughput data.
> >> > > >
> >> > > > The consumer provides a group management facility that supports
> the
> >> > > concept
> >> > > > of a group with multiple consumer instances (just like the current
> >> > > > consumer). This is done through a custom heartbeat and group
> >> management
> >> > > > protocol transparent to the user. At the same time, it allows
> users
> >> the
> >> > > > option to subscribe to a fixed set of partitions and not use group
> >> > > > management at all. The offset management strategy defaults to
> Kafka
> >> > based
> >> > > > offset management and the API provides a way for the user to use a
> >> > > > customized offset store to manage the consumer's offsets.
> >> > > >
> >> > > > A key difference in this consumer also is the fact that it does
> not
> >> > > depend
> >> > > > on zookeeper at all.
> >> > > >
> >> > > > More details about the new consumer design are
> >> > > > here<
> >> > > >
> >> > >
> >> >
> >>
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
> >> > > > >
> >> > > >
> >> > > > Please take a look at the new
> >> > > > API<
> >> > > >
> >> > >
> >> >
> >>
> http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
> >> > > > >and
> >> > > > give us any thoughts you may have.
> >> > > >
> >> > > > Thanks,
> >> > > > Neha
> >> > > >
> >> > >
> >> >
> >> >
> >> >
> >> > --
> >> > -- Guozhang
> >> >
> >>
> >
> >
>

Reply via email to