Re: New Consumer API discussion

2014-03-27 Thread Neha Narkhede
If people don't have any more thoughts on this, I will go ahead and submit
a reviewboard to https://issues.apache.org/jira/browse/KAFKA-1328.

Thanks,
Neha


On Mon, Mar 24, 2014 at 5:39 PM, Neha Narkhede neha.narkh...@gmail.com wrote:

 I took some time to write some example code using the new consumer APIs to
 cover a range of use cases. This exercise was very useful (thanks for the
 suggestion, Jay!) since I found several improvements to the APIs to make
 them more usable. Here are some of the changes I made
 (http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/) -

 1. Added usage examples to the KafkaConsumer javadoc
 (http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html).
 I find it useful for the examples to be in the javadoc vs some wiki. Please
 go through these examples and suggest improvements. The goal would be to
 document a limited set of examples that cover every major use case.
 2. All APIs that either accept or return offsets are changed to
 Map<TopicPartition, Long> instead of TopicPartitionOffset... In all the
 examples that I wrote, it was much easier to deal with offsets and pass
 them around in the consumer APIs if they were maps instead of lists.
 3. Due to the above change, I had to introduce no-argument commit()
 (http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html#commit%28%29)
 and commitAsync() APIs explicitly, in addition to
 commit(Map<TopicPartition, Long> offsets) and
 commitAsync(Map<TopicPartition, Long> offsets), since the no-argument case
 would not be covered automatically with Map as the input parameter to the
 commit APIs (see the sketch after this list).
 4. Offset rewind logic is funky with group management. I took a stab at
 it and wrote examples to cover the various offset rewind use cases I could
 think of. I'm not so sure I like it, so I encourage people to take a look
 at the examples and provide feedback. This feedback is very critical in
 finalizing the consumer APIs as we might have to add/change APIs to make
 offset rewind intuitive and easy to use. (Please see the 3rd and 4th
 examples here:
 http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html)
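 As a concrete illustration of the map-based commit flavors in points 2 and
 3, here is a minimal sketch against the draft javadoc (the topic name and
 offsets are made up, and the signatures may still change):

   // assume a KafkaConsumer has already been constructed as "consumer"
   Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
   offsets.put(new TopicPartition("events", 0), 1234L);
   offsets.put(new TopicPartition("events", 1), 5678L);
   consumer.commit(offsets);      // commit explicit positions, blocking
   consumer.commitAsync(offsets); // same positions, non-blocking
   consumer.commit();             // no-arg: commit offsets from the last poll()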

 Once I have feedback on the above, I will go ahead and submit a review
 board for the new APIs and javadoc.

 Thanks
 Neha


 On Mon, Mar 24, 2014 at 5:29 PM, Neha Narkhede neha.narkh...@gmail.com wrote:

 Hey Chris,

 Really sorry for the late reply, wonder how this fell through the cracks.
 Anyhow, thanks for the great feedback! Here are my comments -


 1. Why is the config String->Object instead of String->String?

 This is probably more feedback about the new config management that
 we adopted in the new clients. I think it is more convenient to write
 configs.put("a", 42);
 instead of
 configs.put("a", Integer.toString(42));
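 For instance, a minimal sketch of building the consumer with typed config
 values (the config key names here are just illustrations, not a definitive
 list):

   Map<String, Object> configs = new HashMap<String, Object>();
   configs.put("group.id", "test-group");    // String value
   configs.put("session.timeout.ms", 30000); // int value, no conversion
   KafkaConsumer consumer = new KafkaConsumer(configs);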

 2. Are these Java docs correct?

   KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)
   A consumer is instantiated by providing a set of key-value pairs as
 configuration and a ConsumerRebalanceCallback implementation

 There is no ConsumerRebalanceCallback parameter.

 Fixed.


 3. Would like to have a method:

   poll(long timeout, java.util.concurrent.TimeUnit timeUnit,
 TopicPartition... topicAndPartitionsToPoll)

 I see I can effectively do this by just fiddling with subscribe and
 unsubscribe before each poll. Is this a low-overhead operation? Can I just
 unsubscribe from everything after each poll, then re-subscribe to a topic
 the next iteration? I would probably be doing this in a fairly tight loop.

 The subscribe and unsubscribe will be very lightweight in-memory
 operations,
 so it shouldn't be a problem to just use those APIs directly.
 Let me know if you think otherwise.
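 In other words, a loop along these lines should be fine (a sketch only;
 nextPartitionToPoll() stands in for whatever application-side scheduling
 you do, and the signatures follow the draft javadoc):

   while (running) {
     TopicPartition tp = nextPartitionToPoll(); // hypothetical helper
     consumer.subscribe(tp.topic(), tp.partition());
     process(consumer.poll(timeout)); // fetches only the subscribed partition
     consumer.unsubscribe(tp.topic(), tp.partition());
   }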

 4. The behavior of AUTO_OFFSET_RESET_CONFIG is overloaded. I think there
 are use cases for decoupling what to do when no offset exists from what
 to do when I'm out of range. I might want to start from smallest the
 first time I run, but fail if I ever get offset out of range.

 How about adding a third option, "disable", to auto.offset.reset?
 What this says is: never automatically reset the offset, either if
 one is not found or if the offset
 falls out of range. Presumably, you would want to turn this off when you
 want to control the offsets
 yourself and use custom rewind/replay logic to reset the consumer's
 offset. In this case, you would
 want to turn this feature off so Kafka does not accidentally reset the
 offset to something else.

 I'm not so sure when you would want to make the distinction regarding
 startup and offset falling out
 of range. Presumably, if you don't trust Kafka to reset the offset, then
 you can always turn this off
 and use commit/commitAsync and seek() to set the consumer to the right
 offset on startup and every
 time your consumer falls out of range.

 Does that make sense?
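 As a hedged sketch of what that would look like (the "disable" option is
 only proposed above, and the exception name and seek() signature are
 assumptions for illustration):

   configs.put("auto.offset.reset", "disable");
   ...
   try {
     process(consumer.poll(timeout));
   } catch (OffsetOutOfRangeException e) {
     // custom rewind/replay logic: reposition from your own offset store
     consumer.seek(tp, loadOffsetFromOwnStore(tp));
   }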

 5. ENABLE_JMX could use Java docs, even though it's fairly self-explanatory.

Re: New Consumer API discussion

2014-03-24 Thread Neha Narkhede
I took some time to write some example code using the new consumer APIs to
cover a range of use cases. This exercise was very useful (thanks for the
suggestion, Jay!) since I found several improvements to the APIs to make
them more usable. Here are some of the changes I made
(http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/) -

1. Added usage examples to the KafkaConsumer javadoc
(http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html).
I find it useful for the examples to be in the javadoc vs some wiki. Please
go through these examples and suggest improvements. The goal would be to
document a limited set of examples that cover every major use case.
2. All APIs that either accept or return offsets are changed to
Map<TopicPartition, Long> instead of TopicPartitionOffset... In all the
examples that I wrote, it was much easier to deal with offsets and pass
them around in the consumer APIs if they were maps instead of lists.
3. Due to the above change, I had to introduce no-argument commit()
(http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html#commit%28%29)
and commitAsync() APIs explicitly, in addition to
commit(Map<TopicPartition, Long> offsets) and
commitAsync(Map<TopicPartition, Long> offsets), since the no-argument case
would not be covered automatically with Map as the input parameter to the
commit APIs.
4. Offset rewind logic is funky with group management. I took a stab at it
and wrote examples to cover the various offset rewind use cases I could
think of. I'm not so sure I like it, so I encourage people to take a look
at the examples and provide feedback. This feedback is very critical in
finalizing the consumer APIs as we might have to add/change APIs to make
offset rewind intuitive and easy to use. (Please see the 3rd and 4th
examples here:
http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html)

Once I have feedback on the above, I will go ahead and submit a review
board for the new APIs and javadoc.

Thanks
Neha


On Mon, Mar 24, 2014 at 5:29 PM, Neha Narkhede neha.narkh...@gmail.com wrote:

 Hey Chris,

 Really sorry for the late reply, wonder how this fell through the cracks.
 Anyhow, thanks for the great feedback! Here are my comments -


 1. Why is the config String->Object instead of String->String?

 This is probably more feedback about the new config management that
 we adopted in the new clients. I think it is more convenient to write
 configs.put("a", 42);
 instead of
 configs.put("a", Integer.toString(42));

 2. Are these Java docs correct?

   KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)
   A consumer is instantiated by providing a set of key-value pairs as
 configuration and a ConsumerRebalanceCallback implementation

 There is no ConsumerRebalanceCallback parameter.

 Fixed.


 3. Would like to have a method:

   poll(long timeout, java.util.concurrent.TimeUnit timeUnit,
 TopicPartition... topicAndPartitionsToPoll)

 I see I can effectively do this by just fiddling with subscribe and
 unsubscribe before each poll. Is this a low-overhead operation? Can I just
 unsubscribe from everything after each poll, then re-subscribe to a topic
 the next iteration? I would probably be doing this in a fairly tight loop.

 The subscribe and unsubscribe will be very lightweight in-memory
 operations,
 so it shouldn't be a problem to just use those APIs directly.
 Let me know if you think otherwise.

 4. The behavior of AUTO_OFFSET_RESET_CONFIG is overloaded. I think there
 are use cases for decoupling what to do when no offset exists from what
 to do when I'm out of range. I might want to start from smallest the
 first time I run, but fail if I ever get offset out of range.

 How about adding a third option, "disable", to auto.offset.reset?
 What this says is: never automatically reset the offset, either if one
 is not found or if the offset
 falls out of range. Presumably, you would want to turn this off when you
 want to control the offsets
 yourself and use custom rewind/replay logic to reset the consumer's
 offset. In this case, you would
 want to turn this feature off so Kafka does not accidentally reset the
 offset to something else.

 I'm not so sure when you would want to make the distinction regarding
 startup and offset falling out
 of range. Presumably, if you don't trust Kafka to reset the offset, then
 you can always turn this off
 and use commit/commitAsync and seek() to set the consumer to the right
 offset on startup and every
 time your consumer falls out of range.

 Does that make sense?

 5. ENABLE_JMX could use Java docs, even though it's fairly
 self-explanatory.

 Fixed.

 6. It would be useful to clarify whether FETCH_BUFFER_CONFIG is
 per-topic/partition or across all topic/partitions. I believe it's
 per-topic/partition, right? That is, setting it to 2 megs with two
 TopicAndPartitions would result

Re: New Consumer API discussion

2014-03-17 Thread Neha Narkhede
I'm not quite sure if I fully understood your question. The consumer API
exposes a close() method that will shut down the consumer's connections to
all brokers and free up the resources that the consumer uses.
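For example, one way to drive that shutdown from a different thread than the
one calling poll() is a simple flag hand-off (a sketch only; the flag is
application code, not part of the consumer API):

  final AtomicBoolean shutdown = new AtomicBoolean(false);

  // consuming thread
  while (!shutdown.get()) {
    process(consumer.poll(timeout));
  }
  consumer.close(); // closes broker connections and frees resources

  // any other thread, e.g. a shutdown hook
  shutdown.set(true);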

I've updated the javadoc for the new consumer API to include a few examples
of different ways of using the consumer. You might find it useful -
http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html

Thanks,
Neha


On Sun, Mar 16, 2014 at 7:55 PM, Shanmugam, Srividhya 
srividhyashanmu...@fico.com wrote:

 Can the consumer API provide a way to shut down the connector by doing a
 lookup by the consumer group Id? For example, the application may be
 consuming the messages in one thread whereas the shutdown call can be
 initiated in a different thread.




Re: New Consumer API discussion

2014-03-16 Thread Shanmugam, Srividhya
Can the consumer API provide a way to shut down the connector by doing a
lookup by the consumer group Id? For example, the application may be
consuming the messages in one thread whereas the shutdown call can be
initiated in a different thread.



Re: New Consumer API discussion

2014-03-03 Thread Chris Riccomini
Hey Guys,

Sorry for the late follow up. Here are my questions/thoughts on the API:

1. Why is the config String->Object instead of String->String?

2. Are these Java docs correct?

  KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)
  A consumer is instantiated by providing a set of key-value pairs as
configuration and a ConsumerRebalanceCallback implementation

There is no ConsumerRebalanceCallback parameter.

3. Would like to have a method:

  poll(long timeout, java.util.concurrent.TimeUnit timeUnit,
TopicPartition... topicAndPartitionsToPoll)

I see I can effectively do this by just fiddling with subscribe and
unsubscribe before each poll. Is this a low-overhead operation? Can I just
unsubscribe from everything after each poll, then re-subscribe to a topic
the next iteration? I would probably be doing this in a fairly tight loop.

4. The behavior of AUTO_OFFSET_RESET_CONFIG is overloaded. I think there
are use cases for decoupling what to do when no offset exists from what
to do when I'm out of range. I might want to start from smallest the
first time I run, but fail if I ever get offset out of range.

5. ENABLE_JMX could use Java docs, even though it's fairly
self-explanatory.

6. It would be useful to clarify whether FETCH_BUFFER_CONFIG is
per-topic/partition or across all topic/partitions. I believe it's
per-topic/partition, right? That is, setting it to 2 megs with two
TopicAndPartitions would result
in 4 megs worth of data coming in per fetch, right?

7. What does the consumer do if METADATA_FETCH_TIMEOUT_CONFIG times out?
Retry, or throw exception?

8. Does RECONNECT_BACKOFF_MS_CONFIG apply to both metadata requests and
fetch requests?

9. What does SESSION_TIMEOUT_MS default to?

10. Is this consumer thread-safe?

11. How do you use a different offset management strategy? Your email
implies that it's pluggable, but I don't see how: "The offset management
strategy defaults to Kafka based offset management and the API provides a
way for the user to use a customized offset store to manage the consumer's
offsets."

12. If I wish to decouple the consumer from the offset checkpointing, is
it OK to use Joel's offset management stuff directly, rather than through
the consumer's commit API?


Cheers,
Chris

On 2/10/14 10:54 AM, Neha Narkhede neha.narkh...@gmail.com wrote:

As mentioned in previous emails, we are also working on a
re-implementation
of the consumer. I would like to use this email thread to discuss the
details of the public API. I would also like us to be picky about this
public api now so it is as good as possible and we don't need to break it
in the future.

The best way to get a feel for the API is actually to take a look at the
javadoc
(http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html),
the hope is to get the api docs good enough so that it is
self-explanatory.
You can also take a look at the configs here
(http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html)

Some background info on implementation:

At a high level the primary difference in this consumer is that it removes
the distinction between the high-level and low-level consumer. The new
consumer API is non-blocking and instead of returning a blocking iterator,
the consumer provides a poll() API that returns a list of records. We think
this is better compared to the blocking iterators since it effectively
decouples the threading strategy used for processing messages from the
consumer. It is worth noting that the consumer is entirely single threaded
and runs in the user thread. The advantage is that it can be easily
rewritten in less multi-threading-friendly languages. The consumer batches
data and multiplexes I/O over TCP connections to each of the brokers it
communicates with, for high throughput. The consumer also allows long poll
to reduce the end-to-end message latency for low throughput data.
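For reference, the basic shape of such a poll() loop would be roughly the
following (a sketch against the draft API; the topic name and the return
type handling are illustrative):

  consumer.subscribe("events");
  while (running) {
    process(consumer.poll(1000)); // long poll, timeout in milliseconds
  }
  consumer.close();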

The consumer provides a group management facility that supports the
concept
of a group with multiple consumer instances (just like the current
consumer). This is done through a custom heartbeat and group management
protocol transparent to the user. At the same time, it allows users the
option to subscribe to a fixed set of partitions and not use group
management at all. The offset management strategy defaults to Kafka based
offset management and the API provides a way for the user to use a
customized offset store to manage the consumer's offsets.
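Concretely, the two modes of use would look roughly like this (signatures
taken from the draft javadoc; the topic name is made up):

  // group-managed: partitions are assigned and rebalanced automatically
  consumer.subscribe("events");

  // fixed assignment: specific partitions, no group management involved
  consumer.subscribe("events", 0);
  consumer.subscribe("events", 1);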

A key difference in this consumer also is the fact that it does not depend
on zookeeper at all.

More details about the new consumer design are here:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design

Please take a look at the new API
(http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html)
and give us any thoughts you may have.

Thanks,
Neha



Re: New Consumer API discussion

2014-03-03 Thread Chris Riccomini
Hey Guys,

Also, for reference, we'll be looking to implement new Samza consumers
which have these APIs:

http://samza.incubator.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/system/SystemConsumer.html

http://samza.incubator.apache.org/learn/documentation/0.7.0/api/javadocs/org/apache/samza/checkpoint/CheckpointManager.html


Question (3) below is a result of Samza's SystemConsumer poll() allowing
specific topic/partitions to be specified.

The split between consumer and checkpoint manager is the reason for
question (12) below.

Cheers,
Chris

On 3/3/14 10:19 AM, Chris Riccomini criccom...@linkedin.com wrote:

Hey Guys,

Sorry for the late follow up. Here are my questions/thoughts on the API:

1. Why is the config String->Object instead of String->String?

2. Are these Java docs correct?

  KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)
  A consumer is instantiated by providing a set of key-value pairs as
configuration and a ConsumerRebalanceCallback implementation

There is no ConsumerRebalanceCallback parameter.

3. Would like to have a method:

  poll(long timeout, java.util.concurrent.TimeUnit timeUnit,
TopicPartition... topicAndPartitionsToPoll)

I see I can effectively do this by just fiddling with subscribe and
unsubscribe before each poll. Is this a low-overhead operation? Can I just
unsubscribe from everything after each poll, then re-subscribe to a topic
the next iteration? I would probably be doing this in a fairly tight loop.

4. The behavior of AUTO_OFFSET_RESET_CONFIG is overloaded. I think there
are use cases for decoupling what to do when no offset exists from what
to do when I'm out of range. I might want to start from smallest the
first time I run, but fail if I ever get offset out of range.

5. ENABLE_JMX could use Java docs, even though it's fairly
self-explanatory.

6. It would be useful to clarify whether FETCH_BUFFER_CONFIG is
per-topic/partition or across all topic/partitions. I believe it's
per-topic/partition, right? That is, setting it to 2 megs with two
TopicAndPartitions would result
in 4 megs worth of data coming in per fetch, right?

7. What does the consumer do if METADATA_FETCH_TIMEOUT_CONFIG times out?
Retry, or throw exception?

8. Does RECONNECT_BACKOFF_MS_CONFIG apply to both metadata requests and
fetch requests?

9. What does SESSION_TIMEOUT_MS default to?

10. Is this consumer thread-safe?

11. How do you use a different offset management strategy? Your email
implies that it's pluggable, but I don't see how: "The offset management
strategy defaults to Kafka based offset management and the API provides a
way for the user to use a customized offset store to manage the consumer's
offsets."

12. If I wish to decouple the consumer from the offset checkpointing, is
it OK to use Joel's offset management stuff directly, rather than through
the consumer's commit API?


Cheers,
Chris

On 2/10/14 10:54 AM, Neha Narkhede neha.narkh...@gmail.com wrote:

As mentioned in previous emails, we are also working on a
re-implementation
of the consumer. I would like to use this email thread to discuss the
details of the public API. I would also like us to be picky about this
public api now so it is as good as possible and we don't need to break it
in the future.

The best way to get a feel for the API is actually to take a look at the
javadoc
(http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html),
the hope is to get the api docs good enough so that it is
self-explanatory.
You can also take a look at the configs here
(http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html)

Some background info on implementation:

At a high level the primary difference in this consumer is that it removes
the distinction between the high-level and low-level consumer. The new
consumer API is non-blocking and instead of returning a blocking iterator,
the consumer provides a poll() API that returns a list of records. We think
this is better compared to the blocking iterators since it effectively
decouples the threading strategy used for processing messages from the
consumer. It is worth noting that the consumer is entirely single threaded
and runs in the user thread. The advantage is that it can be easily
rewritten in less multi-threading-friendly languages. The consumer batches
data and multiplexes I/O over TCP connections to each of the brokers it
communicates with, for high throughput. The consumer also allows long poll
to reduce the end-to-end message latency for low throughput data.

The consumer provides a group management facility that supports the
concept
of a group with multiple consumer instances (just like the current
consumer). This is done through a custom heartbeat and group management
protocol transparent to the user. At the same time, it allows users the
option to subscribe to a fixed set of partitions and not use group
management at all. The 

Re: New Consumer API discussion

2014-02-28 Thread Neha Narkhede
 improvements to the docs
  
  
  
  
 
 http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/Consumer.html#subscribe%28java.lang.String...%29
  
  
  
   3.commit(): The following comment in the doc should probably
 say
   commit
   offsets for partitions assigned to this consumer.
  
   If no partitions are specified, commits offsets for the
  subscribed
   list
   of
   topics and partitions to Kafka.
  
   Could you give more context on this suggestion? Here is the
  entire
   doc
   -
  
   Synchronously commits the specified offsets for the specified
  list
   of
   topics and partitions to *Kafka*. If no partitions are
  specified,
   commits offsets for the subscribed list of topics and
  partitions.
  
   The hope is to convey that if no partitions are specified,
  offsets
   will
   be committed for the subscribed list of partitions. One
   improvement
   could
   be to
   explicitly state that the offsets returned on the last poll
 will
   be
   committed. I updated this to -
  
   Synchronously commits the specified offsets for the specified
  list
   of
   topics and partitions to *Kafka*. If no offsets are specified,
   commits
   offsets returned on the last {@link #poll(long, TimeUnit)
  poll()}
   for
   the subscribed list of topics and partitions.
  
   4. There is inconsistency in specifying partitions. Sometimes
 we
   use
   TopicPartition and some other times we use String and int (see
   examples below).
  
   void onPartitionsAssigned(Consumer consumer,
   TopicPartition...partitions)
  
   public void *subscribe*(java.lang.String topic, int...
  partitions)
  
   Yes, this was discussed previously. I think generally the
   consensus
   seems to be to use the higher level
   classes everywhere. Made those changes.
  
   What's the use case of position()? Isn't that just the
   nextOffset()
   on
   the
   last message returned from poll()?
  
   Yes, except in the case where a rebalance is triggered and
  poll()
   is
   not
   yet invoked. Here, you would use position() to get the new
 fetch
   position
   for the specific partition. Even if this is not a common use
  case,
   IMO
   it
   is much easier to use position() to get the fetch offset than
   invoking
   nextOffset() on the last message. This also keeps the APIs
   symmetric,
   which
   is nice.
  
  
  
  
   On Mon, Feb 24, 2014 at 7:06 PM, Withers, Robert 
   robert.with...@dish.com wrote:
  
   That's wonderful.  Thanks for kafka.
  
   Rob
  
    On Feb 24, 2014, at 9:58 AM, Guozhang Wang wangg...@gmail.com wrote:
  
   Hi Robert,
  
   Yes, you can check out the callback functions in the new API
  
   onPartitionDesigned
   onPartitionAssigned
  
   and see if they meet your needs.
  
   Guozhang
  
  
    On Mon, Feb 24, 2014 at 8:18 AM, Withers, Robert robert.with...@dish.com wrote:
  
   Jun,
  
   Are you saying it is possible to get events from the
 high-level
   consumer
   regarding various state machine changes?  For instance, can
 we
   get
   a
   notification when a rebalance starts and ends, when a
 partition
   is
   assigned/unassigned, when an offset is committed on a
  partition,
   when
   a
   leader changes and so on?  I call this OOB traffic, since
 they
   are
   not
   the
   core messages streaming, but side-band events, yet they are
  still
   potentially useful to consumers.
  
   Thank you,
   Robert
  
  
   Robert Withers
   Staff Analyst/Developer
   o: (720) 514-8963
   c:  (571) 262-1873
  
  
  
   -Original Message-
   From: Jun Rao [mailto:jun...@gmail.com]
   Sent: Sunday, February 23, 2014 4:19 PM
    To: users@kafka.apache.org
   Subject: Re: New Consumer API discussion
  
   Robert,
  
   For the push orient api, you can potentially implement your
 own
   MessageHandler with those methods. In the main loop of our
 new
   consumer
   api, you can just call those methods based on the events you
  get.
  
   Also, we already have an api to get the first and the last
  offset
   of a
   partition (getOffsetBefore).
  
   Thanks,
  
   Jun
  
  
   On Sat, Feb 22, 2014 at 11:29 AM, Withers, Robert
    robert.with...@dish.com wrote:
  
   This is a good idea, too.  I would modify it to include
 stream
   marking, then you can have:
  
   long end = consumer.lastOffset(tp);
   consumer.setMark(end);
   while(consumer.beforeMark()) {
   process(consumer.pollToMark());
   }
  
   or
  
   long end = consumer.lastOffset(tp);
   consumer.setMark(end);
   for(Object msg : consumer.iteratorToMark()) {
   process(msg);
   }
  
   I actually have 4 suggestions, then:
  
   *   pull: stream marking
   *   pull: finite streams, bound by time range (up-to-now,
   yesterday)
   or
   offset
   *   pull: async api
   *   push: KafkaMessageSource, for a push model, with msg and
  OOB
   events.
   Build one in either individual or chunk

Re: New Consumer API discussion

2014-02-27 Thread Neha Narkhede
 be made for arguably the more common use case
  of
  subscribing to a single topic as well. In these cases, user is
  required
  to write more
  code to create a single item collection and pass it in. Since
  subscription is extremely lightweight
  invoking it multiple times also seems like a workable solution, no?
 
  2. It would be good to document that the following apis are mutually
  exclusive. Also, if the partition level subscription is specified,
  there
  is
  no group management. Finally, unsubscribe() can only be used to
  cancel
  subscriptions with the same pattern. For example, you can't
  unsubscribe
  at
  the partition level if the subscription is done at the topic level.
 
  *subscribe*(java.lang.String... topics)
  *subscribe*(java.lang.String topic, int... partitions)
 
  Makes sense. Made the suggested improvements to the docs
 
 
 http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/Consumer.html#subscribe%28java.lang.String...%29
 
 
 
  3.commit(): The following comment in the doc should probably say
  commit
  offsets for partitions assigned to this consumer.
 
  If no partitions are specified, commits offsets for the subscribed
  list
  of
  topics and partitions to Kafka.
 
  Could you give more context on this suggestion? Here is the entire
  doc
  -
 
  Synchronously commits the specified offsets for the specified list
 of
  topics and partitions to *Kafka*. If no partitions are specified,
  commits offsets for the subscribed list of topics and partitions.
 
  The hope is to convey that if no partitions are specified, offsets
  will
  be committed for the subscribed list of partitions. One improvement
  could
  be to
  explicitly state that the offsets returned on the last poll will be
  committed. I updated this to -
 
  Synchronously commits the specified offsets for the specified list
 of
  topics and partitions to *Kafka*. If no offsets are specified,
  commits
  offsets returned on the last {@link #poll(long, TimeUnit) poll()}
 for
  the subscribed list of topics and partitions.
 
  4. There is inconsistency in specifying partitions. Sometimes we use
  TopicPartition and some other times we use String and int (see
  examples below).
 
  void onPartitionsAssigned(Consumer consumer,
  TopicPartition...partitions)
 
  public void *subscribe*(java.lang.String topic, int... partitions)
 
  Yes, this was discussed previously. I think generally the consensus
  seems to be to use the higher level
  classes everywhere. Made those changes.
 
  What's the use case of position()? Isn't that just the nextOffset()
  on
  the
  last message returned from poll()?
 
  Yes, except in the case where a rebalance is triggered and poll() is
  not
  yet invoked. Here, you would use position() to get the new fetch
  position
  for the specific partition. Even if this is not a common use case,
  IMO
  it
  is much easier to use position() to get the fetch offset than
  invoking
  nextOffset() on the last message. This also keeps the APIs
 symmetric,
  which
  is nice.
 
 
 
 
  On Mon, Feb 24, 2014 at 7:06 PM, Withers, Robert 
  robert.with...@dish.com wrote:
 
  That's wonderful.  Thanks for kafka.
 
  Rob
 
   On Feb 24, 2014, at 9:58 AM, Guozhang Wang wangg...@gmail.com wrote:
 
  Hi Robert,
 
  Yes, you can check out the callback functions in the new API
 
  onPartitionDesigned
  onPartitionAssigned
 
  and see if they meet your needs.
 
  Guozhang
 
 
  On Mon, Feb 24, 2014 at 8:18 AM, Withers, Robert 
   robert.with...@dish.com wrote:
 
  Jun,
 
  Are you saying it is possible to get events from the high-level
  consumer
  regarding various state machine changes?  For instance, can we get
 a
  notification when a rebalance starts and ends, when a partition is
  assigned/unassigned, when an offset is committed on a partition,
  when
  a
  leader changes and so on?  I call this OOB traffic, since they are
  not
  the
  core messages streaming, but side-band events, yet they are still
  potentially useful to consumers.
 
  Thank you,
  Robert
 
 
  Robert Withers
  Staff Analyst/Developer
  o: (720) 514-8963
  c:  (571) 262-1873
 
 
 
  -Original Message-
  From: Jun Rao [mailto:jun...@gmail.com]
  Sent: Sunday, February 23, 2014 4:19 PM
   To: users@kafka.apache.org
  Subject: Re: New Consumer API discussion
 
  Robert,
 
  For the push orient api, you can potentially implement your own
  MessageHandler with those methods. In the main loop of our new
  consumer
  api, you can just call those methods based on the events you get.
 
  Also, we already have an api to get the first and the last offset
  of a
  partition (getOffsetBefore).
 
  Thanks,
 
  Jun
 
 
  On Sat, Feb 22, 2014 at 11:29 AM, Withers, Robert
   robert.with...@dish.com wrote:
 
  This is a good idea, too.  I would modify it to include stream
  marking, then you can have

Re: New Consumer API discussion

2014-02-27 Thread Robert Withers
 a bit harder. Suppose that you have a list of
 topics
 stored in
 
  ArrayList<String> topics;
 
  If you want to subscribe to all topics in one call, you will have to
 do:
 
 String[] topicArray = new String[topics.size()];
  consumer.subscribe(topics.toArray(topicArray));
 
 A similar argument can be made for arguably the more common use case
 of
 subscribing to a single topic as well. In these cases, user is
 required
 to write more
 code to create a single item collection and pass it in. Since
 subscription is extremely lightweight
 invoking it multiple times also seems like a workable solution, no?
 
 2. It would be good to document that the following apis are mutually
 exclusive. Also, if the partition level subscription is specified,
 there
 is
 no group management. Finally, unsubscribe() can only be used to
 cancel
 subscriptions with the same pattern. For example, you can't
 unsubscribe
 at
 the partition level if the subscription is done at the topic level.
 
 *subscribe*(java.lang.String... topics)
 *subscribe*(java.lang.String topic, int... partitions)
 
 Makes sense. Made the suggested improvements to the docs
 
 
 http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/Consumer.html#subscribe%28java.lang.String...%29
 
 
 
 3.commit(): The following comment in the doc should probably say
 commit
 offsets for partitions assigned to this consumer.
 
 If no partitions are specified, commits offsets for the subscribed
 list
 of
 topics and partitions to Kafka.
 
 Could you give more context on this suggestion? Here is the entire
 doc
 -
 
 Synchronously commits the specified offsets for the specified list
 of
 topics and partitions to *Kafka*. If no partitions are specified,
 commits offsets for the subscribed list of topics and partitions.
 
 The hope is to convey that if no partitions are specified, offsets
 will
 be committed for the subscribed list of partitions. One improvement
 could
 be to
 explicitly state that the offsets returned on the last poll will be
 committed. I updated this to -
 
 Synchronously commits the specified offsets for the specified list
 of
 topics and partitions to *Kafka*. If no offsets are specified,
 commits
 offsets returned on the last {@link #poll(long, TimeUnit) poll()}
 for
 the subscribed list of topics and partitions.
 
 4. There is inconsistency in specifying partitions. Sometimes we use
 TopicPartition and some other times we use String and int (see
 examples below).
 
 void onPartitionsAssigned(Consumer consumer,
 TopicPartition...partitions)
 
 public void *subscribe*(java.lang.String topic, int... partitions)
 
 Yes, this was discussed previously. I think generally the consensus
 seems to be to use the higher level
 classes everywhere. Made those changes.
 
 What's the use case of position()? Isn't that just the nextOffset()
 on
 the
 last message returned from poll()?
 
 Yes, except in the case where a rebalance is triggered and poll() is
 not
 yet invoked. Here, you would use position() to get the new fetch
 position
 for the specific partition. Even if this is not a common use case,
 IMO
 it
 is much easier to use position() to get the fetch offset than
 invoking
 nextOffset() on the last message. This also keeps the APIs
 symmetric,
 which
 is nice.
 
 
 
 
 On Mon, Feb 24, 2014 at 7:06 PM, Withers, Robert 
 robert.with...@dish.com wrote:
 
 That's wonderful.  Thanks for kafka.
 
 Rob
 
  On Feb 24, 2014, at 9:58 AM, Guozhang Wang wangg...@gmail.com wrote:
 
 Hi Robert,
 
 Yes, you can check out the callback functions in the new API
 
 onPartitionDesigned
 onPartitionAssigned
 
 and see if they meet your needs.
 
 Guozhang
 
 
 On Mon, Feb 24, 2014 at 8:18 AM, Withers, Robert 
  robert.with...@dish.com wrote:
 
 Jun,
 
 Are you saying it is possible to get events from the high-level
 consumer
 regarding various state machine changes?  For instance, can we get
 a
 notification when a rebalance starts and ends, when a partition is
 assigned/unassigned, when an offset is committed on a partition,
 when
 a
 leader changes and so on?  I call this OOB traffic, since they are
 not
 the
 core messages streaming, but side-band events, yet they are still
 potentially useful to consumers.
 
 Thank you,
 Robert
 
 
 Robert Withers
 Staff Analyst/Developer
 o: (720) 514-8963
 c:  (571) 262-1873
 
 
 
 -Original Message-
 From: Jun Rao [mailto:jun...@gmail.com]
 Sent: Sunday, February 23, 2014 4:19 PM
  To: users@kafka.apache.org
 Subject: Re: New Consumer API discussion
 
 Robert,
 
 For the push orient api, you can potentially implement your own
 MessageHandler with those methods. In the main loop of our new
 consumer
 api, you can just call those methods based on the events you get.
 
 Also, we already have an api to get the first and the last offset
 of a
 partition (getOffsetBefore).
 
 Thanks,
 
 Jun
 
 
 On Sat, Feb 22, 2014 at 11:29 AM

Re: New Consumer API discussion

2014-02-27 Thread Neha Narkhede
, since they
 are
  not
  the
  core messages streaming, but side-band events, yet they are still
  potentially useful to consumers.
 
  Thank you,
  Robert
 
 
  Robert Withers
  Staff Analyst/Developer
  o: (720) 514-8963
  c:  (571) 262-1873
 
 
 
  -Original Message-
  From: Jun Rao [mailto:jun...@gmail.com]
  Sent: Sunday, February 23, 2014 4:19 PM
   To: users@kafka.apache.org
  Subject: Re: New Consumer API discussion
 
  Robert,
 
  For the push orient api, you can potentially implement your own
  MessageHandler with those methods. In the main loop of our new
  consumer
  api, you can just call those methods based on the events you get.
 
  Also, we already have an api to get the first and the last offset
  of a
  partition (getOffsetBefore).
 
  Thanks,
 
  Jun
 
 
  On Sat, Feb 22, 2014 at 11:29 AM, Withers, Robert
   robert.with...@dish.com wrote:
 
  This is a good idea, too.  I would modify it to include stream
  marking, then you can have:
 
  long end = consumer.lastOffset(tp);
  consumer.setMark(end);
  while(consumer.beforeMark()) {
  process(consumer.pollToMark());
  }
 
  or
 
  long end = consumer.lastOffset(tp);
  consumer.setMark(end);
  for(Object msg : consumer.iteratorToMark()) {
  process(msg);
  }
 
  I actually have 4 suggestions, then:
 
  *   pull: stream marking
  *   pull: finite streams, bound by time range (up-to-now,
  yesterday)
  or
  offset
  *   pull: async api
  *   push: KafkaMessageSource, for a push model, with msg and OOB
  events.
  Build one in either individual or chunk mode and have a listener
  for
  each msg or a listener for a chunk of msgs.  Make it composable
 and
  policy driven (chunked, range, commitOffsets policy, retry
 policy,
  transactional)
 
  Thank you,
  Robert
 
   On Feb 22, 2014, at 11:21 AM, Jay Kreps jay.kr...@gmail.com wrote:
 
  I think what Robert is saying is that we need to think through
 the
  offset API to enable batch processing of topic data. Think of a
  process that periodically kicks off to compute a data summary or
 do
  a
  data load or something like that. I think what we need to support
  this
  is an api to fetch the last offset from the server for a
 partition.
  Something like
  long lastOffset(TopicPartition tp)
  and for symmetry
  long firstOffset(TopicPartition tp)
 
  Likely this would have to be batched. Essentially we should add
  this
  use case to our set of code examples to write and think through.
 
  The usage would be something like
 
  long end = consumer.lastOffset(tp);
   while (consumer.position < end)
     process(consumer.poll());
 
  -Jay
 
 
  On Sat, Feb 22, 2014 at 1:52 AM, Withers, Robert
   robert.with...@dish.com wrote:
 
  Jun,
 
  I was originally thinking a non-blocking read from a distributed
  stream should distinguish between no local messages, but a fetch
  is
  occurring
  versus you have drained the stream.  The reason this may be
  valuable
  to me is so I can write consumers that read all known traffic
 then
  terminate.
  You caused me to reconsider and I think I am conflating 2 things.
  One
  is a sync/async api while the other is whether to have an
 infinite
  or
  finite stream.  Is it possible to build a finite KafkaStream on a
  range of messages?
 
  Perhaps a Simple Consumer would do just fine and then I could
 start
  off getting the writeOffset from zookeeper and tell it to read a
  specified range per partition.  I've done this and forked a
 simple
  consumer runnable for each partition, for one of our analyzers.
  The
  great thing about the high-level consumer is that rebalance, so I
  can
  fork however many stream readers I want and you just figure it
 out
  for
  me.  In that way you offer us the control over the resource
  consumption within a pull model.  This is best to regulate
 message
  pressure, they say.
 
  Combining that high-level rebalance ability with a ranged
 partition
  drain could be really nice...build the stream with an ending
  position
  and it is a finite stream, but retain the high-level rebalance.
  With
  a finite stream, you would know the difference of the 2 async
  scenarios: fetch-in-progress versus end-of-stream.  With an
  infinite
  stream, you never get end-of-stream.
 
  Aside from a high-level consumer over a finite range within each
  partition, the other feature I can think of is more complicated.
  A
  high-level consumer has state machine changes that the client
  cannot
  access, to my knowledge.  Our use of kafka has us invoke a
 message
  handler with each message we consumer from the KafkaStream, so we
  convert a pull-model to a push-model.  Including the idea of
  receiving
  notifications from state machine changes, what would be really
 nice
  is
  to have a KafkaMessageSource, that is an eventful push model.  If
  it
  were

Re: New Consumer API discussion

2014-02-27 Thread Robert Withers
 starts and ends, when a partition
 is
 assigned/unassigned, when an offset is committed on a partition,
 when
 a
 leader changes and so on?  I call this OOB traffic, since they
 are
 not
 the
 core messages streaming, but side-band events, yet they are still
 potentially useful to consumers.
 
 Thank you,
 Robert
 
 
 Robert Withers
 Staff Analyst/Developer
 o: (720) 514-8963
 c:  (571) 262-1873
 
 
 
 -Original Message-
 From: Jun Rao [mailto:jun...@gmail.com]
 Sent: Sunday, February 23, 2014 4:19 PM
  To: users@kafka.apache.org
 Subject: Re: New Consumer API discussion
 
 Robert,
 
 For the push orient api, you can potentially implement your own
 MessageHandler with those methods. In the main loop of our new
 consumer
 api, you can just call those methods based on the events you get.
 
 Also, we already have an api to get the first and the last offset
 of a
 partition (getOffsetBefore).
 
 Thanks,
 
 Jun
 
 
 On Sat, Feb 22, 2014 at 11:29 AM, Withers, Robert
  robert.with...@dish.com wrote:
 
 This is a good idea, too.  I would modify it to include stream
 marking, then you can have:
 
 long end = consumer.lastOffset(tp);
 consumer.setMark(end);
 while(consumer.beforeMark()) {
 process(consumer.pollToMark());
 }
 
 or
 
 long end = consumer.lastOffset(tp);
 consumer.setMark(end);
 for(Object msg : consumer.iteratorToMark()) {
 process(msg);
 }
 
 I actually have 4 suggestions, then:
 
 *   pull: stream marking
 *   pull: finite streams, bound by time range (up-to-now,
 yesterday)
 or
 offset
 *   pull: async api
 *   push: KafkaMessageSource, for a push model, with msg and OOB
 events.
 Build one in either individual or chunk mode and have a listener
 for
 each msg or a listener for a chunk of msgs.  Make it composable
 and
 policy driven (chunked, range, commitOffsets policy, retry
 policy,
 transactional)
 
 Thank you,
 Robert
 
  On Feb 22, 2014, at 11:21 AM, Jay Kreps jay.kr...@gmail.com wrote:
 
 I think what Robert is saying is that we need to think through
 the
 offset API to enable batch processing of topic data. Think of a
 process that periodically kicks off to compute a data summary or
 do
 a
 data load or something like that. I think what we need to support
 this
 is an api to fetch the last offset from the server for a
 partition.
 Something like
 long lastOffset(TopicPartition tp)
 and for symmetry
 long firstOffset(TopicPartition tp)
 
 Likely this would have to be batched. Essentially we should add
 this
 use case to our set of code examples to write and think through.
 
 The usage would be something like
 
 long end = consumer.lastOffset(tp);
  while (consumer.position < end)
    process(consumer.poll());
 
 -Jay
 
 
 On Sat, Feb 22, 2014 at 1:52 AM, Withers, Robert
  robert.with...@dish.com wrote:
 
 Jun,
 
 I was originally thinking a non-blocking read from a distributed
 stream should distinguish between no local messages, but a fetch
 is
 occurring
 versus you have drained the stream.  The reason this may be
 valuable
 to me is so I can write consumers that read all known traffic
 then
 terminate.
 You caused me to reconsider and I think I am conflating 2 things.
 One
 is a sync/async api while the other is whether to have an
 infinite
 or
 finite stream.  Is it possible to build a finite KafkaStream on a
 range of messages?
 
 Perhaps a Simple Consumer would do just fine and then I could
 start
 off getting the writeOffset from zookeeper and tell it to read a
 specified range per partition.  I've done this and forked a
 simple
 consumer runnable for each partition, for one of our analyzers.
 The
 great thing about the high-level consumer is that rebalance, so I
 can
 fork however many stream readers I want and you just figure it
 out
 for
 me.  In that way you offer us the control over the resource
 consumption within a pull model.  This is best to regulate
 message
 pressure, they say.
 
 Combining that high-level rebalance ability with a ranged
 partition
 drain could be really nice...build the stream with an ending
 position
 and it is a finite stream, but retain the high-level rebalance.
 With
 a finite stream, you would know the difference of the 2 async
 scenarios: fetch-in-progress versus end-of-stream.  With an
 infinite
 stream, you never get end-of-stream.
 
 Aside from a high-level consumer over a finite range within each
 partition, the other feature I can think of is more complicated.
 A
 high-level consumer has state machine changes that the client
 cannot
 access, to my knowledge.  Our use of kafka has us invoke a
 message
 handler with each message we consumer from the KafkaStream, so we
 convert a pull-model to a push-model.  Including the idea of
 receiving
 notifications from state machine changes, what would be really
 nice
 is
 to have a KafkaMessageSource

Re: New Consumer API discussion

2014-02-26 Thread Robert Withers
 only be used to
 cancel
 subscriptions with the same pattern. For example, you can't
 unsubscribe
 at
 the partition level if the subscription is done at the topic level.
 
 *subscribe*(java.lang.String... topics)
 *subscribe*(java.lang.String topic, int... partitions)
 
 Makes sense. Made the suggested improvements to the docs
 
 http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/Consumer.html#subscribe%28java.lang.String...%29
 
 
 
 3.commit(): The following comment in the doc should probably say
 commit
 offsets for partitions assigned to this consumer.
 
 If no partitions are specified, commits offsets for the subscribed
 list
 of
 topics and partitions to Kafka.
 
 Could you give more context on this suggestion? Here is the entire
 doc
 -
 
 Synchronously commits the specified offsets for the specified list of
 topics and partitions to *Kafka*. If no partitions are specified,
 commits offsets for the subscribed list of topics and partitions.
 
 The hope is to convey that if no partitions are specified, offsets
 will
 be committed for the subscribed list of partitions. One improvement
 could
 be to
 explicitly state that the offsets returned on the last poll will be
 committed. I updated this to -
 
 Synchronously commits the specified offsets for the specified list of
 topics and partitions to *Kafka*. If no offsets are specified,
 commits
 offsets returned on the last {@link #poll(long, TimeUnit) poll()} for
 the subscribed list of topics and partitions.
 
 4. There is inconsistency in specifying partitions. Sometimes we use
 TopicPartition and some other times we use String and int (see
 examples below).
 
 void onPartitionsAssigned(Consumer consumer,
 TopicPartition...partitions)
 
 public void *subscribe*(java.lang.String topic, int... partitions)
 
 Yes, this was discussed previously. I think generally the consensus
 seems to be to use the higher level
 classes everywhere. Made those changes.
 
 What's the use case of position()? Isn't that just the nextOffset()
 on
 the
 last message returned from poll()?
 
 Yes, except in the case where a rebalance is triggered and poll() is
 not
 yet invoked. Here, you would use position() to get the new fetch
 position
 for the specific partition. Even if this is not a common use case,
 IMO
 it
 is much easier to use position() to get the fetch offset than
 invoking
 nextOffset() on the last message. This also keeps the APIs symmetric,
 which
 is nice.
 
 
 
 
 On Mon, Feb 24, 2014 at 7:06 PM, Withers, Robert 
 robert.with...@dish.com wrote:
 
 That's wonderful.  Thanks for kafka.
 
 Rob
 
  On Feb 24, 2014, at 9:58 AM, Guozhang Wang wangg...@gmail.com wrote:
 
 Hi Robert,
 
 Yes, you can check out the callback functions in the new API
 
 onPartitionDesigned
 onPartitionAssigned
 
 and see if they meet your needs.
 
 Guozhang
 
 
 On Mon, Feb 24, 2014 at 8:18 AM, Withers, Robert 
  robert.with...@dish.com wrote:
 
 Jun,
 
 Are you saying it is possible to get events from the high-level
 consumer
 regarding various state machine changes?  For instance, can we get a
 notification when a rebalance starts and ends, when a partition is
 assigned/unassigned, when an offset is committed on a partition,
 when
 a
 leader changes and so on?  I call this OOB traffic, since they are
 not
 the
 core messages streaming, but side-band events, yet they are still
 potentially useful to consumers.
 
 Thank you,
 Robert
 
 
 Robert Withers
 Staff Analyst/Developer
 o: (720) 514-8963
 c:  (571) 262-1873
 
 
 
 -Original Message-
 From: Jun Rao [mailto:jun...@gmail.com]
 Sent: Sunday, February 23, 2014 4:19 PM
  To: users@kafka.apache.org
 Subject: Re: New Consumer API discussion
 
 Robert,
 
 For the push orient api, you can potentially implement your own
 MessageHandler with those methods. In the main loop of our new
 consumer
 api, you can just call those methods based on the events you get.
 
 Also, we already have an api to get the first and the last offset
 of a
 partition (getOffsetBefore).
 
 Thanks,
 
 Jun
 
 
 On Sat, Feb 22, 2014 at 11:29 AM, Withers, Robert
  robert.with...@dish.com wrote:
 
 This is a good idea, too.  I would modify it to include stream
 marking, then you can have:
 
 long end = consumer.lastOffset(tp);
 consumer.setMark(end);
 while(consumer.beforeMark()) {
  process(consumer.pollToMark());
 }
 
 or
 
 long end = consumer.lastOffset(tp);
 consumer.setMark(end);
 for(Object msg : consumer.iteratorToMark()) {
  process(msg);
 }
 
 I actually have 4 suggestions, then:
 
 *   pull: stream marking
 *   pull: finite streams, bound by time range (up-to-now, yesterday)
 or
 offset
 *   pull: async api
 *   push: KafkaMessageSource, for a push model, with msg and OOB
 events.
 Build one in either individual or chunk mode and have a listener for
 each msg or a listener for a chunk of msgs.  Make

Re: New Consumer API discussion

2014-02-25 Thread Neha Narkhede
Thanks for the review, Jun. Here are some comments -

1. The use of ellipsis: This may make passing a list of items from a
collection to the api a bit harder. Suppose that you have a list of topics
stored in

ArrayList<String> topics;

If you want to subscribe to all topics in one call, you will have to do:

String[] topicArray = new String[topics.size()];
consumer.subscribe(topics.toArray(topicArray));

A similar argument can be made for arguably the more common use case of
subscribing to a single topic as well. In these cases, user is required to
write more
code to create a single item collection and pass it in. Since subscription
is extremely lightweight
invoking it multiple times also seems like a workable solution, no?

2. It would be good to document that the following apis are mutually
exclusive. Also, if the partition level subscription is specified, there is
no group management. Finally, unsubscribe() can only be used to cancel
subscriptions with the same pattern. For example, you can't unsubscribe at
the partition level if the subscription is done at the topic level.

*subscribe*(java.lang.String... topics)
*subscribe*(java.lang.String topic, int... partitions)

Makes sense. Made the suggested improvements to the docs:
http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/Consumer.html#subscribe%28java.lang.String...%29
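A short sketch of the rule being documented, using the two subscribe forms
above (illustrative only):

  consumer.subscribe("events");        // topic-level subscription
  consumer.unsubscribe("events");      // allowed: same pattern

  // not allowed: partition-level cancel of a topic-level subscription
  // consumer.unsubscribe("events", 0);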

3. commit(): The following comment in the doc should probably say "commit
offsets for partitions assigned to this consumer."

 If no partitions are specified, commits offsets for the subscribed list of
topics and partitions to Kafka.

Could you give more context on this suggestion? Here is the entire doc -

Synchronously commits the specified offsets for the specified list of
topics and partitions to *Kafka*. If no partitions are specified, commits
offsets for the subscribed list of topics and partitions.

The hope is to convey that if no partitions are specified, offsets will be
committed for the subscribed list of partitions. One improvement could be to
explicitly state that the offsets returned on the last poll will be
committed. I updated this to -

Synchronously commits the specified offsets for the specified list of
topics and partitions to *Kafka*. If no offsets are specified, commits
offsets returned on the last {@link #poll(long, TimeUnit) poll()} for the
subscribed list of topics and partitions.

4. There is inconsistency in specifying partitions. Sometimes we use
TopicPartition and some other times we use String and int (see
examples below).

void onPartitionsAssigned(Consumer consumer, TopicPartition...partitions)

public void *subscribe*(java.lang.String topic, int... partitions)

Yes, this was discussed previously. I think generally the consensus seems
to be to use the higher level
classes everywhere. Made those changes.

What's the use case of position()? Isn't that just the nextOffset() on the
last message returned from poll()?

Yes, except in the case where a rebalance is triggered and poll() is not
yet invoked. Here, you would use position() to get the new fetch position
for the specific partition. Even if this is not a common use case, IMO it
is much easier to use position() to get the fetch offset than invoking
nextOffset() on the last message. This also keeps the APIs symmetric, which
is nice.
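In other words, something like this (a hedged sketch; the position()
signature is assumed from the discussion above):

  // after a rebalance assigns tp, and before the next poll():
  long next = consumer.position(tp);
  System.out.println("partition " + tp + " resumes at offset " + next);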




On Mon, Feb 24, 2014 at 7:06 PM, Withers, Robert robert.with...@dish.com wrote:

 That's wonderful.  Thanks for kafka.

 Rob

 On Feb 24, 2014, at 9:58 AM, Guozhang Wang wangg...@gmail.com wrote:

 Hi Robert,

 Yes, you can check out the callback functions in the new API

 onPartitionDesigned
 onPartitionAssigned

 and see if they meet your needs.

 Guozhang


 On Mon, Feb 24, 2014 at 8:18 AM, Withers, Robert robert.with...@dish.com wrote:

 Jun,

 Are you saying it is possible to get events from the high-level consumer
 regarding various state machine changes?  For instance, can we get a
 notification when a rebalance starts and ends, when a partition is
 assigned/unassigned, when an offset is committed on a partition, when a
 leader changes and so on?  I call this OOB traffic, since they are not the
 core messages streaming, but side-band events, yet they are still
 potentially useful to consumers.

 Thank you,
 Robert


 Robert Withers
 Staff Analyst/Developer
 o: (720) 514-8963
 c:  (571) 262-1873



 -Original Message-
 From: Jun Rao [mailto:jun...@gmail.com]
 Sent: Sunday, February 23, 2014 4:19 PM
 To: users@kafka.apache.org
 Subject: Re: New Consumer API discussion

 Robert,

 For the push-oriented API, you can potentially implement your own
 MessageHandler with those methods. In the main loop of our new consumer
 api, you can just call those methods based on the events you get.

 Also, we already have an api to get the first and the last offset of a
 partition (getOffsetBefore).

 Thanks,

 Jun


 On Sat, Feb 22, 2014 at 11:29 AM

RE: New Consumer API discussion

2014-02-24 Thread Withers, Robert
Jun,

Are you saying it is possible to get events from the high-level consumer 
regarding various state machine changes?  For instance, can we get a 
notification when a rebalance starts and ends, when a partition is 
assigned/unassigned, when an offset is committed on a partition, when a leader 
changes and so on?  I call this OOB traffic, since they are not the core 
messages streaming, but side-band events, yet they are still potentially useful 
to consumers.

Thank you,
Robert


Robert Withers
Staff Analyst/Developer
o: (720) 514-8963
c:  (571) 262-1873



-Original Message-
From: Jun Rao [mailto:jun...@gmail.com]
Sent: Sunday, February 23, 2014 4:19 PM
To: users@kafka.apache.org
Subject: Re: New Consumer API discussion

Robert,

For the push-oriented API, you can potentially implement your own MessageHandler
with those methods. In the main loop of our new consumer api, you can just call 
those methods based on the events you get.

Also, we already have an api to get the first and the last offset of a 
partition (getOffsetBefore).

Thanks,

Jun


On Sat, Feb 22, 2014 at 11:29 AM, Withers, Robert robert.with...@dish.com wrote:

 This is a good idea, too.  I would modify it to include stream
 marking, then you can have:

 long end = consumer.lastOffset(tp);
 consumer.setMark(end);
 while(consumer.beforeMark()) {
process(consumer.pollToMark());
 }

 or

 long end = consumer.lastOffset(tp);
 consumer.setMark(end);
 for(Object msg : consumer.iteratorToMark()) {
process(msg);
 }

 I actually have 4 suggestions, then:

  *   pull: stream marking
  *   pull: finite streams, bound by time range (up-to-now, yesterday) or
 offset
  *   pull: async api
  *   push: KafkaMessageSource, for a push model, with msg and OOB events.
  Build one in either individual or chunk mode and have a listener for
 each msg or a listener for a chunk of msgs.  Make it composable and
 policy driven (chunked, range, commitOffsets policy, retry policy,
 transactional)

 Thank you,
 Robert

 On Feb 22, 2014, at 11:21 AM, Jay Kreps jay.kr...@gmail.com wrote:

 I think what Robert is saying is that we need to think through the
 offset API to enable batch processing of topic data. Think of a
 process that periodically kicks off to compute a data summary or do a
 data load or something like that. I think what we need to support this
 is an api to fetch the last offset from the server for a partition. Something 
 like
   long lastOffset(TopicPartition tp)
 and for symmetry
   long firstOffset(TopicPartition tp)

 Likely this would have to be batched. Essentially we should add this
 use case to our set of code examples to write and think through.

 The usage would be something like

 long end = consumer.lastOffset(tp);
 while(consumer.position < end)
process(consumer.poll());

 -Jay



Re: New Consumer API discussion

2014-02-24 Thread Guozhang Wang
Hi Robert,

Yes, you can check out the callback functions in the new API

onPartitionDesigned
onPartitionAssigned

and see if they meet your needs.

Guozhang
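
[Editor's note: a hedged sketch of a rebalance callback. The javadoc quoted
earlier in this thread gives onPartitionsAssigned(Consumer, TopicPartition...);
the class name, the revoke-side method, and the logging are assumptions, and
they differ from the names Guozhang lists above:]

// Sketch only: observing assignment events via the draft callback interface.
public class LoggingRebalanceCallback implements ConsumerRebalanceCallback {
    public void onPartitionsAssigned(Consumer consumer, TopicPartition... partitions) {
        for (TopicPartition tp : partitions)
            System.out.println("assigned " + tp + " at " + consumer.position(tp));
    }
    public void onPartitionsRevoked(Consumer consumer, TopicPartition... partitions) {
        for (TopicPartition tp : partitions)
            System.out.println("revoked " + tp);   // name assumed symmetric
    }
}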



Re: New Consumer API discussion

2014-02-24 Thread Withers, Robert
That’s wonderful.  Thanks for kafka.

Rob


Re: New Consumer API discussion

2014-02-23 Thread Withers, Robert
We use kafka as a durable buffer for 3rd party event traffic.  It acts as the 
event source in a lambda architecture.  We want it to be exactly once and we 
are close, though we can lose messages aggregating for Hadoop.  To really tie 
this all together, I think there should be an Apache project to implement a 
proper 3-phase distributed transaction capability, which the Kafka and Hadoop 
communities could implement together.  This paper looks promising.  It is a 3 
RTT protocol, but it is non-blocking.  This could be a part of a new consumer 
api, at some point.

http://ieeexplore.ieee.org/xpl/articleDetails.jsp?arnumber=1703048

regards,
Rob

Re: New Consumer API discussion

2014-02-23 Thread Jun Rao
 Could you explain why you want to distinguish btw FetchingInProgressException
 and NoMessagePendingException? The nextMsgs() method that you want is
 exactly what poll() does.

 Thanks,

 Jun


 On Wed, Feb 19, 2014 at 8:45 AM, Withers, Robert robert.with...@dish.com wrote:

 I am not clear on why the consumer stream should be positionable,
 especially if it is limited to the in-memory fetched messages.  Could
 someone explain to me, please?  I really like the idea of committing the
 offset specifically on those partitions with changed read offsets, only.

 2 items I would like to see added to the KafkaStream are:

 * a non-blocking next(), throws several exceptions
 (FetchingInProgressException and a NoMessagePendingException or something)
 to differentiate between fetching or no messages left.

 * A nextMsgs() method which returns all locally available messages
 and kicks off a fetch for the next chunk.

 If you are trying to add transactional features, then formally define a
 DTP capability and pull in other server frameworks to share the
 implementation.  Should it be XA/Open?  How about a new peer2peer DTP
 protocol?

 Thank you,
 Robert

 Robert Withers
 Staff Analyst/Developer
 o: (720) 514-8963
 c:  (571) 262-1873
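
 [Editor's note: a sketch of the shape Robert is asking for; every name below
 is hypothetical and exists in no Kafka API, invented only to illustrate the
 two requested additions:]

 // Hypothetical interface -- not part of any Kafka API.
 interface RequestedKafkaStream<T> {
     // Non-blocking: returns a message if one is buffered locally, else throws
     // FetchingInProgressException (a fetch is in flight) or
     // NoMessagePendingException (the stream is drained).
     T next() throws FetchingInProgressException, NoMessagePendingException;

     // Returns all locally buffered messages and kicks off the next fetch.
     List<T> nextMsgs();
 }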



 -Original Message-
 From: Jay Kreps [mailto:jay.kr...@gmail.com]
 Sent: Sunday, February 16, 2014 10:13 AM
 To: users@kafka.apache.org
 Subject: Re: New Consumer API discussion

 +1 I think those are good. It is a little weird that changing the fetch
 point is not batched but changing the commit point is, but I suppose there
 is no helping that.

 -Jay


 On Sat, Feb 15, 2014 at 7:52 AM, Neha Narkhede neha.narkh...@gmail.com wrote:

 Jay,

 That makes sense. position/seek deal with changing the consumer's
 in-memory data, so there is no remote rpc there. For some reason, I
 got committed and seek mixed up in my head at that time :)

 So we still end up with

  long position(TopicPartition tp)
  void seek(TopicPartitionOffset p)
  Map<TopicPartition, Long> committed(TopicPartition tp);
  void commit(TopicPartitionOffset...);

 Thanks,
 Neha
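
 [Editor's note: a sketch wiring these four calls together, given a consumer
 built as elsewhere in this thread; TopicPartitionOffset is assumed to pair a
 TopicPartition with a long offset, and its constructor is hypothetical:]

 // Sketch only: names from the list above; TopicPartitionOffset ctor assumed.
 TopicPartition tp = new TopicPartition("foo", 0);
 long current = consumer.position(tp);                     // in-memory, no RPC
 Map<TopicPartition, Long> saved = consumer.committed(tp); // may do a remote lookup
 consumer.seek(new TopicPartitionOffset(tp, saved.get(tp))); // rewind to last commit
 consumer.commit(new TopicPartitionOffset(tp, current));     // persist the position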



 On Friday, February 14, 2014, Jay Kreps jay.kr...@gmail.com wrote:

 Oh, interesting. So I am assuming the following implementation:

 1. We have an in-memory fetch position which controls the next fetch
 offset.
 2. Changing this has no effect until you poll again, at which point your
 fetch request will be from the newly specified offset.
 3. We then have an in-memory but also remotely stored committed offset.
 4. Calling commit has the effect of saving the fetch position as both the
 in-memory committed position and in the remote store.
 5. Auto-commit is the same as periodically calling commit on all positions.

 So batching on commit as well as getting the committed position makes
 sense, but batching the fetch position wouldn't, right? I think you are
 actually thinking of a different approach.

 -Jay
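
 [Editor's note: to restate Jay's five steps in code, a toy model -- plain
 Java, not real consumer internals; the map stands in for the remote
 offset-commit store, and TopicPartition is the only Kafka type referenced:]

 // Toy model of steps 1-5 above.
 class PositionModel {
     private long fetchPosition;                 // 1: drives the next fetch
     private long committedInMemory;             // 3: in-memory committed copy
     private final Map<TopicPartition, Long> remoteStore = new HashMap<>(); // 3: remote copy

     void seek(long offset) {                    // 2: takes effect on the next poll
         fetchPosition = offset;
     }

     void commit(TopicPartition tp) {            // 4: fetch position becomes...
         committedInMemory = fetchPosition;      //    ...the in-memory commit
         remoteStore.put(tp, fetchPosition);     //    ...and the remote commit
     }
     // 5: auto-commit == calling commit(tp) periodically for every position
 }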





 On Thu, Feb 13, 2014 at 10:40 PM, Neha Narkhede neha.narkh...@gmail.com wrote:

 I think you are saying both, i.e. if you have committed on a
 partition it returns you that value but if you haven't
 it does a remote lookup?

 Correct.

 The other argument for making committed batched is that commit()
 is batched, so there is symmetry.

 position() and seek() are always in memory changes (I assume) so
 there is no need to batch them.

 I'm not as sure as you are about that assumption being true. Basically in
 my example above, the batching argument for committed() also applies to
 position() since one purpose of fetching a partition's offset is to use it
 to set the position of the consumer to that offset. Since that might lead
 to a remote OffsetRequest call, I think we probably would be better off
 batching it.

 Another option for naming would be position/reposition instead of
 position/seek.

 I think position/seek is better since it aligns with Java file APIs.

 I also think your suggestion about ConsumerPosition makes sense.

 Thanks,
 Neha

 On Feb 13, 2014 9:22 PM, Jay Kreps jay.kr...@gmail.com wrote:

 Hey Neha,

 I actually wasn't proposing the name TopicOffsetPosition, that was just a
 typo. I meant TopicPartitionOffset, and I was just referencing what was in
 the javadoc. So to restate my proposal without the typo, using just the
 existing classes

Re: New Consumer API discussion

2014-02-22 Thread Withers, Robert
Jun,

I was originally thinking a non-blocking read from a distributed stream should 
distinguish between “no local messages, but a fetch is occurring” versus “you 
have drained the stream”.  The reason this may be valuable to me is so I can 
write consumers that read all known traffic then terminate.  You caused me to 
reconsider and I think I am conflating 2 things.  One is a sync/async api while 
the other is whether to have an infinite or finite stream.  Is it possible to 
build a finite KafkaStream on a range of messages?

Perhaps a Simple Consumer would do just fine and then I could start off getting 
the writeOffset from zookeeper and tell it to read a specified range per 
partition.  I've done this and forked a simple consumer runnable for each 
partition, for one of our analyzers.  The great thing about the high-level 
consumer is the rebalance, so I can fork however many stream readers I want 
and you just figure it out for me.  In that way you offer us the control over 
the resource consumption within a pull model.  This is best to regulate message 
pressure, they say.

Combining that high-level rebalance ability with a ranged partition drain could 
be really nice... build the stream with an ending position and it is a finite 
stream, but retain the high-level rebalance.  With a finite stream, you would 
know the difference between the 2 async scenarios: fetch-in-progress versus 
end-of-stream.  With an infinite stream, you never get end-of-stream.

Aside from a high-level consumer over a finite range within each partition, the 
other feature I can think of is more complicated.  A high-level consumer has 
state machine changes that the client cannot access, to my knowledge.  Our use 
of kafka has us invoke a message handler with each message we consume from the 
KafkaStream, so we convert a pull-model to a push-model.  Including the idea of 
receiving notifications from state machine changes, what would be really nice 
is to have a KafkaMessageSource that is an eventful push model.  If it were 
thread-safe, then we could register listeners for various events:

 *   opening-stream
 *   closing-stream
 *   message-arrived
 *   end-of-stream/no-more-messages-in-partition (for finite streams)
 *   rebalance started
 *   partition assigned
 *   partition unassigned
 *   rebalance finished
 *   partition-offset-committed

Perhaps that is just our use, but instead of a pull-oriented KafkaStream, is 
there any sense in your providing a push-oriented KafkaMessageSource publishing 
OOB messages?
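
As a sketch, a listener registration for such a source might look like the 
following (every name here is hypothetical; nothing like this exists in the 
current API):

// Hypothetical listener for the proposed push-oriented KafkaMessageSource.
// None of these types exist in Kafka today; this only illustrates the idea.
public interface KafkaStreamListener {
    void onStreamOpened(String topic, int partition);
    void onMessageArrived(String topic, int partition, long offset, byte[] message);
    void onEndOfStream(String topic, int partition);  // finite streams only
    void onRebalanceStarted();
    void onPartitionAssigned(String topic, int partition);
    void onPartitionUnassigned(String topic, int partition);
    void onRebalanceFinished();
    void onOffsetCommitted(String topic, int partition, long offset);
    void onStreamClosed(String topic, int partition);
}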

thank you,
Robert

On Feb 21, 2014, at 5:59 PM, Jun Rao jun...@gmail.com wrote:

Robert,

Could you explain why you want to distinguish btw FetchingInProgressException
and NoMessagePendingException? The nextMsgs() method that you want is
exactly what poll() does.

Thanks,

Jun


Re: New Consumer API discussion

2014-02-22 Thread Jay Kreps
I think what Robert is saying is that we need to think through the offset
API to enable batch processing of topic data. Think of a process that
periodically kicks off to compute a data summary or do a data load or
something like that. I think what we need to support this is an api to
fetch the last offset from the server for a partition. Something like
   long lastOffset(TopicPartition tp)
and for symmetry
   long firstOffset(TopicPartition tp)

Likely this would have to be batched. Essentially we should add this use
case to our set of code examples to write and think through.

The usage would be something like

long end = consumer.lastOffset(tp);
while (consumer.position(tp) < end)
    process(consumer.poll());
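
Fleshed out, a bounded consume-and-terminate pass over one partition might look 
like this (a sketch only: lastOffset() is the proposed addition, and Consumer, 
Record and TopicPartition below are stand-ins for the API still under 
discussion):

import java.util.List;

// Sketch of a bounded batch pass over a single partition, assuming the
// proposed lastOffset() addition. All types are stand-ins, not the real API.
public class BoundedConsumeExample {
    interface Record {}
    interface TopicPartition {}
    interface Consumer {
        long lastOffset(TopicPartition tp);  // proposed: last offset on the broker
        long position(TopicPartition tp);    // current in-memory fetch position
        List<Record> poll(long timeoutMs);
        void commit();
    }

    static void consumeRange(Consumer consumer, TopicPartition tp) {
        long end = consumer.lastOffset(tp);
        while (consumer.position(tp) < end) {
            for (Record r : consumer.poll(1000))
                process(r);
        }
        consumer.commit();  // checkpoint the final position before terminating
    }

    static void process(Record r) { /* application logic */ }
}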

-Jay



Re: New Consumer API discussion

2014-02-21 Thread Jun Rao
What's the use case of position()? Isn't that just the nextOffset() on the
last message returned from poll()?

Thanks,

Jun


On Sun, Feb 16, 2014 at 9:12 AM, Jay Kreps jay.kr...@gmail.com wrote:

 +1 I think those are good. It is a little weird that changing the fetch
 point is not batched but changing the commit point is, but I suppose there
 is no helping that.

 -Jay



Re: New Consumer API discussion

2014-02-21 Thread Jun Rao
Robert,

Could you explain why you want to distinguish btw FetchingInProgressException
and NoMessagePendingException? The nextMsgs() method that you want is
exactly what poll() does.
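
Put differently, something like the following would behave like the proposed 
nextMsgs() (a sketch; Consumer and Record are stand-ins and the poll() 
signature is illustrative):

import java.util.List;

// Sketch: poll() with a zero timeout returns whatever records are locally
// available and kicks off the next fetch, which is essentially nextMsgs().
class PollAsNextMsgs {
    interface Record {}
    interface Consumer { List<Record> poll(long timeoutMs); }

    static List<Record> nextMsgs(Consumer consumer) {
        return consumer.poll(0);  // non-blocking: empty list if nothing is buffered
    }
}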

Thanks,

Jun


On Wed, Feb 19, 2014 at 8:45 AM, Withers, Robert robert.with...@dish.com wrote:
 I am not clear on why the consumer stream should be positionable,
 especially if it is limited to the in-memory fetched messages.  Could
 someone explain to me, please?  I really like the idea of committing the
 offset specifically on those partitions with changed read offsets, only.



 2 items I would like to see added to the KafkaStream are:

 * a non-blocking next(), throws several exceptions
 (FetchingInProgressException and a NoMessagePendingException or something)
 to differentiate between fetching or no messages left.

 * A nextMsgs() method which returns all locally available messages
 and kicks off a fetch for the next chunk.



 If you are trying to add transactional features, then formally define a
 DTP capability and pull in other server frameworks to share the
 implementation.  Should it be XA/Open?  How about a new peer2peer DTP
 protocol?



 Thank you,

 Robert



 Robert Withers

 Staff Analyst/Developer

 o: (720) 514-8963

 c:  (571) 262-1873




Re: New Consumer API discussion

2014-02-15 Thread Neha Narkhede
Jay,

That makes sense. position/seek deal with changing the consumer's in-memory
data, so there is no remote rpc there. For some reason, I got committed and
seek mixed up in my head at that time :)

So we still end up with

   long position(TopicPartition tp)
   void seek(TopicPartitionOffset p)
   Map<TopicPartition, Long> committed(TopicPartition tp);
   void commit(TopicPartitionOffset...);

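To make the shape of these four calls concrete, a rewind-to-last-commit could 
look roughly like this (a sketch against the proposed signatures; all types 
here are stand-ins for the API under discussion):

import java.util.Map;

// Sketch: rewind one partition to its last committed offset using the four
// proposed calls. Types are stand-ins, not a shipped API.
class RewindExample {
    interface TopicPartition {}
    static class TopicPartitionOffset {
        final TopicPartition tp;
        final long offset;
        TopicPartitionOffset(TopicPartition tp, long offset) {
            this.tp = tp;
            this.offset = offset;
        }
    }
    interface Consumer {
        long position(TopicPartition tp);
        void seek(TopicPartitionOffset p);
        Map<TopicPartition, Long> committed(TopicPartition tp);
        void commit(TopicPartitionOffset... offsets);
    }

    static void rewindToCommitted(Consumer consumer, TopicPartition tp) {
        long lastCommitted = consumer.committed(tp).get(tp);  // may do a remote lookup
        consumer.seek(new TopicPartitionOffset(tp, lastCommitted));  // in-memory only
        // the next poll() will fetch starting from lastCommitted
    }
}
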
Thanks,
Neha



Re: New Consumer API discussion

2014-02-14 Thread Jay Kreps
Oh, interesting. So I am assuming the following implementation:
1. We have an in-memory fetch position which controls the next fetch
offset.
2. Changing this has no effect until you poll again at which point your
fetch request will be from the newly specified offset
3. We then have an in-memory but also remotely stored committed offset.
4. Calling commit has the effect of saving the fetch position as both the
in memory committed position and in the remote store
5. Auto-commit is the same as periodically calling commit on all positions.

So batching on commit as well as getting the committed position makes
sense, but batching the fetch position wouldn't, right? I think you are
actually thinking of a different approach.
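
In code form, the model in points 1-5 reads roughly like this (purely 
illustrative; the field and method names are invented and this is not the 
actual implementation):

// Sketch of the consumer state implied by points 1-5 above.
class ConsumerPositionModel {
    long fetchPosition;       // (1) in-memory: controls the next fetch offset
    long committedInMemory;   // (3) in-memory copy of the remotely stored commit

    void seek(long offset) {
        fetchPosition = offset;            // (2) takes effect on the next poll()
    }
    void poll() {
        // fetch request is issued starting at fetchPosition ...
    }
    void commit() {
        committedInMemory = fetchPosition; // (4) save the fetch position locally
        // ... and write it to the remote offset store as well
    }
    void autoCommitTick() {
        commit();                          // (5) auto-commit = periodic commit()
    }
}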

-Jay



Re: New Consumer API discussion

2014-02-13 Thread Neha Narkhede
Pradeep -

Thanks for your detailed comments.

1.

   subscribe(String topic, int... partitions) and unsubscribe(String topic,
   int... partitions) should be subscribe(TopicPartition... topicPartitions)
   and unsubscribe(TopicPartition... topicPartitions)

I think that is reasonable. Overall, I'm in favor of exposing
TopicPartition and TopicPartitionOffset as public APIs. They make the APIs
more readable especially given that the consumer aims to provide a small
set of APIs to support a wide range of functionalities. I will make that
change if there are no objections.

2.

   Does it make sense to provide a convenience method to subscribe to
   topics at a particular offset directly? E.g.
   subscribe(TopicPartitionOffset... offsets)

 I view subscriptions a little differently. One subscribes to resources. In
this case, either topics (when you use group management) or specific
partitions. Offsets are specific to the consumption protocol and unrelated
to subscription which just expresses the user's interest in certain
resources. Also, if we have one way to specify fetch offsets (positions()),
I'd like to avoid creating *n* APIs to do the same thing, since that just
makes the consumer APIs more bulky and eventually confusing.

3.

   The javadoc makes no mention of what would happen if positions() is
   called with a TopicPartitionOffset to which the Consumer is not
   subscribed.

Good point. Fixed the javadoc:
http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html#positions%28kafka.common.TopicPartitionOffset...%29

4.

   The javadoc makes no mention of what would happen if positions() is
   called with two different offsets for a single TopicPartition

positions() can be called multiple times and hence with different offsets.
I think I mentioned in the latest javadoc that positions() will change the
offset on the next fetch request (poll()). Improved the javadoc to
explicitly mention this case.

5. The javadoc shows lastCommittedOffsets() return type as
   List<TopicPartitionOffset>. This should either be Map<TopicPartition,
   Long> or Map<TopicPartition, TopicPartitionOffset>

 This depends on how the user would use the committed offsets. One example
I could think off and is mentioned in the javadoc for
lastCommittedOffsets() is to rewind consumption. In this case, you may or
may not require random access to a particular partition's offset, depending
on whether you want to selectively rewind consumption or not. So it may be
fine to return a map. I'm not sure if people can think of other uses of
this API though. In any case, if we
wanted to change this to a map, I'd prefer Map<TopicPartition, Long>.
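
For instance, with a map return type, selectively rewinding only some 
partitions is a straightforward lookup (a sketch; the method names mirror the 
javadoc under discussion and the types are stand-ins):

import java.util.Map;

// Sketch: selective rewind using the Map<TopicPartition, Long> of committed
// offsets. Not a shipped API; signatures are illustrative.
class SelectiveRewind {
    interface TopicPartition {}
    interface Consumer {
        Map<TopicPartition, Long> lastCommittedOffsets();
        void seek(TopicPartition tp, long offset);  // illustrative signature
    }

    static void rewindSome(Consumer consumer, Iterable<TopicPartition> toRewind) {
        Map<TopicPartition, Long> committed = consumer.lastCommittedOffsets();
        for (TopicPartition tp : toRewind) {
            Long offset = committed.get(tp);  // random access is the point of the map
            if (offset != null)
                consumer.seek(tp, offset);
        }
    }
}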

   6. It seems like #4 can be avoided by using Map<TopicPartition, Long>
   or Map<TopicPartition, TopicPartitionOffset> as the argument type.

How? lastCommittedOffsets() is independent of positions(). I'm not sure I
understood your suggestion.

   7. To address #3, maybe we can return the List<TopicPartitionOffset> that
   are invalid.

I don't particularly see the advantage of returning a list of invalid
partitions from position(). It seems a bit awkward to return a list to
indicate what is obviously a bug. Prefer throwing an error since the user
should just fix that logic.

Thanks,
Neha




Re: New Consumer API discussion

2014-02-13 Thread Jay Kreps
Hey guys,

One thing that bugs me is the lack of symmetry in the different position
calls. The way I see it there are two positions we maintain: the fetch
position and the last commit position. There are two things you can do to
these positions: get the current value or change the current value. But the
names somewhat obscure this:
  Fetch position:
- No get
- set by positions(TopicOffsetPosition...)
  Committed position:
- get by List<TopicOffsetPosition> lastCommittedPosition(TopicPartition...)
- set by commit or commitAsync

The lastCommittedPosition is particularly bothersome because:
1. The name is weird and long
2. It returns a list of results. But how can you use the list? The only way
to use the list is to make a map of tp => offset and then look up results in
this map (or do a for loop over the list for the partition you want). I
recommend that if this is an in-memory check we just do one at a time. E.g.
long committedPosition(TopicPosition).

What if we made it:
   long position(TopicPartition tp)
   void seek(TopicOffsetPosition p)
   long committed(TopicPartition tp)
   void commit(TopicOffsetPosition...);

This still isn't terribly consistent, but I think it is better.
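
Written out as an interface, the symmetry (and the remaining asymmetry) is 
easier to see (a sketch; generic brackets restored and both types are 
stand-ins for the classes under discussion):

// The two positions and their accessors, per the proposal above (sketch only).
interface ConsumerPositions {
    interface TopicPartition {}
    interface TopicOffsetPosition {}

    // fetch position
    long position(TopicPartition tp);       // get (in-memory)
    void seek(TopicOffsetPosition p);       // set (in-memory)

    // committed position
    long committed(TopicPartition tp);      // get (possibly a remote fetch)
    void commit(TopicOffsetPosition... p);  // set (remote + in-memory)
}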

I would also like to shorten the name TopicOffsetPosition. Offset and
Position are duplicative of each other. So perhaps we could call it a
PartitionOffset or a TopicPosition or something like that. In general class
names that are just a concatenation of the fields (e.g.
TopicAndPartitionAndOffset) seem kind of lazy to me since the name doesn't
really describe, it just enumerates. But that is more of a nit pick.

-Jay


On Mon, Feb 10, 2014 at 10:54 AM, Neha Narkhede neha.narkh...@gmail.com wrote:

 As mentioned in previous emails, we are also working on a re-implementation
 of the consumer. I would like to use this email thread to discuss the
 details of the public API. I would also like us to be picky about this
 public api now so it is as good as possible and we don't need to break it
 in the future.

 The best way to get a feel for the API is actually to take a look at the
 javadoc (http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html),
 the hope is to get the api docs good enough so that it is self-explanatory.
 You can also take a look at the configs here
 (http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html).

 Some background info on implementation:

 At a high level the primary difference in this consumer is that it removes
 the distinction between the high-level and low-level consumer. The new
 consumer API is non blocking and instead of returning a blocking iterator,
 the consumer provides a poll() API that returns a list of records. We think
 this is better compared to the blocking iterators since it effectively
 decouples the threading strategy used for processing messages from the
 consumer. It is worth noting that the consumer is entirely single threaded
 and runs in the user thread. The advantage is that it can be easily
 rewritten in less multi-threading-friendly languages. The consumer batches
 data and multiplexes I/O over TCP connections to each of the brokers it
 communicates with, for high throughput. The consumer also allows long poll
 to reduce the end-to-end message latency for low throughput data.

 The consumer provides a group management facility that supports the concept
 of a group with multiple consumer instances (just like the current
 consumer). This is done through a custom heartbeat and group management
 protocol transparent to the user. At the same time, it allows users the
 option to subscribe to a fixed set of partitions and not use group
 management at all. The offset management strategy defaults to Kafka based
 offset management and the API provides a way for the user to use a
 customized offset store to manage the consumer's offsets.

 A key difference in this consumer also is the fact that it does not depend
 on zookeeper at all.

 More details about the new consumer design are here
 (https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design).

 Please take a look at the new API
 (http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html)
 and give us any thoughts you may have.

 Thanks,
 Neha



Re: New Consumer API discussion

2014-02-13 Thread Pradeep Gollakota
Hi Neha,

   6. It seems like #4 can be avoided by using Map<TopicPartition,
 Long> or Map<TopicPartition, TopicPartitionOffset> as the argument type.

 How? lastCommittedOffsets() is independent of positions(). I'm not sure I
 understood your suggestion.

I think of subscription as you're subscribing to a Set of TopicPartitions.
Because the argument to positions() is TopicPartitionOffset... it's
conceivable that the method can be called with two offsets for the same
TopicPartition. One way to handle this is to accept either the first or
the last offset for a TopicPartition. However, if the argument type is
changed to Map<TopicPartition, Long>, it precludes the possibility of
getting duplicate offsets for the same TopicPartition.

   7. To address #3, maybe we can return the List<TopicPartitionOffset> that are
 invalid.

 I don't particularly see the advantage of returning a list of invalid

partitions from position(). It seems a bit awkward to return a list to

indicate what is obviously a bug. Prefer throwing an error since the user
 should just fix that logic.

I'm not sure if an Exception is needed or desirable here. I don't see this
as a catastrophic failure or a non-recoverable failure. Even if we just
write the bad offsets to a log file and call it a day, I'm ok with that.
But my main goal is to communicate to the API users somehow that they've
provided bad offsets which are simply being ignored.

Hi Jay,

I would also like to shorten the name TopicOffsetPosition. Offset and
 Position are duplicative of each other. So perhaps we could call it a
 PartitionOffset or a TopicPosition or something like that. In general class
 names that are just a concatenation of the fields (e.g.
 TopicAndPartitionAndOffset) seem kind of lazy to me since the name doesn't
 really describe it just enumerates. But that is more of a nit pick.


   1. Did you mean to say TopicPartitionOffset instead of
   TopicOffsetPosition?
   2. +1 on PartitionOffset

The lastCommittedPosition is particular bothersome because:
 1. The name is weird and long
 2. It returns a list of results. But how can you use the list? The only way
 to use the list is to make a map of tp=offset and then look up results in
 this map (or do a for loop over the list for the partition you want).

This is sort of what I was talking about in my previous email. My
suggestion was to change the return type to Map<TopicPartition, Long>.

What if we made it:
long position(TopicPartition tp)
void seek(TopicOffsetPosition p)
long committed(TopicPartition tp)
void commit(TopicOffsetPosition...);


   1. Absolutely love the idea of position(TopicPartition tp).
   2. I think we also need to provide a method for accessing all positions,
   positions(), which maybe returns a Map<TopicPartition, Long>?
   3. What is the difference between position(TopicPartition tp) and
   committed(TopicPartition tp)?
   4. +1 on commit(PartitionOffset...)
   5. +1 on seek(PartitionOffset p)
   6. We should also provide a seek(PartitionOffset... offsets)

Finally, in all the methods where we're using varargs, we should use an
appropriate Collection data structure. For example, for the
subscribe(TopicPartition... partitions) method, I think a more accurate API
would be subscribe(Set<TopicPartition> partitions). This allows for the code
to be self-documenting.
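
Concretely, the collection-based signatures suggested here would read 
something like this (a sketch, not the current javadoc; the types are 
stand-ins for the proposed API):

import java.util.Map;
import java.util.Set;

// Collection-based signatures as suggested above (sketch only).
interface SubscriptionApi {
    interface TopicPartition {}

    void subscribe(Set<TopicPartition> partitions);    // a set rules out duplicates
    void unsubscribe(Set<TopicPartition> partitions);

    // A map argument precludes two different offsets for the same partition:
    void positions(Map<TopicPartition, Long> offsets);
}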


Re: New Consumer API discussion

2014-02-13 Thread Neha Narkhede
I think you are saying both, i.e. if you
have committed on a partition it returns you that value but if you haven't
it does a remote lookup?

Correct.

The other argument for making committed batched is that commit() is
batched, so there is symmetry.

position() and seek() are always in memory changes (I assume) so there is
no need to batch them.

I'm not as sure as you are about that assumption being true. Basically in
my example above, the batching argument for committed() also applies to
position() since one purpose of fetching a partition's offset is to use it
to set the position of the consumer to that offset. Since that might lead
to a remote OffsetRequest call, I think we probably would be better off
batching it.

Another option for naming would be position/reposition instead
of position/seek.

I think position/seek is better since it aligns with Java file APIs.

I also think your suggestion about ConsumerPosition makes sense.

Thanks,
Neha
On Feb 13, 2014 9:22 PM, Jay Kreps jay.kr...@gmail.com wrote:

 Hey Neha,

 I actually wasn't proposing the name TopicOffsetPosition, that was just a
 typo. I meant TopicPartitionOffset, and I was just referencing what was in
 the javadoc. So to restate my proposal without the typo, using just the
 existing classes (that naming is a separate question):
long position(TopicPartition tp)
void seek(TopicPartitionOffset p)
long committed(TopicPartition tp)
void commit(TopicPartitionOffset...);

 So I may be unclear on committed() (AKA lastCommittedOffset). Is it
 returning the in-memory value from the last commit by this consumer, or is
 it doing a remote fetch, or both? I think you are saying both, i.e. if you
 have committed on a partition it returns you that value but if you haven't
 it does a remote lookup?

 The other argument for making committed batched is that commit() is
 batched, so there is symmetry.

 position() and seek() are always in memory changes (I assume) so there is
 no need to batch them.

 So taking all that into account what if we revise it to
long position(TopicPartition tp)
void seek(TopicPartitionOffset p)
 Map<TopicPartition, Long> committed(TopicPartition tp);
void commit(TopicPartitionOffset...);

 This is not symmetric between position/seek and commit/committed but it is
 convenient. Another option for naming would be position/reposition instead
 of position/seek.

 With respect to the name TopicPartitionOffset, what I was trying to say is
 that I recommend we change that to something shorter. I think TopicPosition
 or ConsumerPosition might be better. Position does not refer to the
 variables in the object, it refers to the meaning of the object--it
 represents a position within a topic. The offset field in that object is
 still called the offset. TopicOffset, PartitionOffset, or ConsumerOffset
 would all be workable too. Basically I am just objecting to concatenating
 three nouns together. :-)

 -Jay





 On Thu, Feb 13, 2014 at 1:54 PM, Neha Narkhede neha.narkh...@gmail.com
 wrote:

  2. It returns a list of results. But how can you use the list? The only
 way
  to use the list is to make a map of tp=offset and then look up results
 in
  this map (or do a for loop over the list for the partition you want). I
  recommend that if this is an in-memory check we just do one at a time.
 E.g.
  long committedPosition(
  TopicPosition).
 
  This was discussed in the previous emails. There is a choice between
  returning a map or a list. Some people found the map to be more usable.
 
  What if we made it:
 long position(TopicPartition tp)
 void seek(TopicOffsetPosition p)
 long committed(TopicPartition tp)
 void commit(TopicOffsetPosition...);
 
  This is fine, but TopicOffsetPosition doesn't make sense. Offset and
  Position is confusing. Also both fetch and commit positions are related
 to
  partitions, not topics. Some more options are TopicPartitionPosition or
  TopicPartitionOffset. And we should use either position everywhere in
 Kafka
  or offset but having both is confusing.
 
 void seek(TopicOffsetPosition p)
 long committed(TopicPartition tp)
 
  Whether these are batched or not really depends on how flexible we want
  these APIs to be. The question is whether we allow a consumer to fetch or
  set the offsets for partitions that it doesn't own or consume. For
 example,
  if I choose to skip group management and do my own partition assignment
 but
  choose Kafka based offset management. I could imagine a use case where I
  want to change the partition assignment on the fly, and to do that, I
 would
  need to fetch the last committed offsets of partitions that I currently
  don't consume.
 
  If we want to allow this, these APIs would be more performant if batched.
  And would probably look like -
  Map<TopicPartition, Long> positions(TopicPartition... tp)
  void seek(TopicOffsetPosition... p)
  Map<TopicPartition, Long> committed(TopicPartition... tp)
 void 

Re: New Consumer API discussion

2014-02-12 Thread Neha Narkhede
Imran,

Sorry I am probably missing
something basic, but I'm not sure how a multi-threaded consumer would
work.  I can imagine it's either:

a) I just have one thread poll kafka.  If I want to process msgs in
multiple threads, than I deal w/ that after polling, eg. stick them into a
blocking queue or something, and have more threads that read from the queue.

b) each thread creates its own KafkaConsumer.  They are all registered the
same way, and I leave it to kafka to figure out what data to give to each
one.

We designed the new consumer API to not require multi threading on purpose.
The reason this is better than the existing ZookeeperConsumerConnector is that
it effectively allows the user to use whatever threading and load balance
message processing amongst those threads. For example, you might want more
threads dedicated to a certain high throughput partition compared to other
partitions. In option a) above, you can create your own thread pool and hand
over the messages returned by poll using a blocking queue or any other
approach. Option b) would work as well and the user has to figure out which
topics each KafkaConsumer subscribes to.
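
A sketch of option a), with one polling thread handing records to a worker 
pool through a blocking queue (Consumer and Record are stand-ins; the poll() 
signature is illustrative):

import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Option a): a single thread polls the consumer; workers drain a queue.
class SingleThreadPollExample {
    interface Record {}
    interface Consumer { List<Record> poll(long timeoutMs); }

    static void run(final Consumer consumer, int numWorkers) throws InterruptedException {
        final BlockingQueue<Record> queue = new ArrayBlockingQueue<Record>(1000);
        ExecutorService workers = Executors.newFixedThreadPool(numWorkers);
        for (int i = 0; i < numWorkers; i++) {
            workers.submit(new Runnable() {
                public void run() {
                    try {
                        while (true)
                            process(queue.take());  // blocks until a record arrives
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
        }
        // the polling loop stays on the caller's thread
        while (true) {
            for (Record r : consumer.poll(100))
                queue.put(r);  // back-pressure: blocks when workers fall behind
        }
    }

    static void process(Record r) { /* application logic */ }
}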

(a) certainly makes things simple, but I worry about throughput -- is that
just as good as having one thread trying to consume each partition?

(b) makes it a bit of a pain to figure out how many threads to use.  I
assume there is no point in using more threads than there are partitions,
so first you've got to figure out how many partitions there are in each
topic.  Might be nice if there were some util functions to simplify this.

The user can pick the number of threads. That is still better as only the
user knows how
slow/fast the message processing of her application is.

Also, since the initial call to subscribe doesn't give the partition
assignment, does that mean the first call to poll() will always call the
ConsumerRebalanceCallback?

Assuming you choose to use group management (by using subscribe(topics)),
poll() will invoke the ConsumerRebalanceCallback on every single rebalance
attempt. Improved the javadoc
(http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerRebalanceCallback.html)
to explain that. Could you give that another look?

If I'm on the right track, I'd like to expand this example, showing how
each MyConsumer can keep track of its partitions  offsets, even in the
face of rebalances.  As Jay said, I think a minimal code example could
really help us see the utility & faults of the api.

Sure, please look at the javadoc
(http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html).
I've tried to include code examples there. Please help in
improving those or adding more. Looks like we should add some multi
threading examples. I avoided
adding those since there are many ways of handling the message processing
and it will not be feasible
to list all of those. If we list one, people might think that is the only
recommended approach.

With that said, here is an example of using Option b) above -

List<MyConsumer> consumers = new ArrayList<MyConsumer>();
List<String> topics = new ArrayList<String>();
// populate topics, one topic per consumer thread
assert(topics.size() == numThreads);
for (int i = 0; i < numThreads; i++) {
  MyConsumer c = new MyConsumer();
  c.subscribe(topics.get(i));
  consumers.add(c);
}
// poll each consumer in a separate thread.
for (int i = 0; i < numThreads; i++) {
  final int index = i;
  executorService.submit(new Runnable() {
    @Override
    public void run() {
      // process messages from this thread's consumer
      new ProcessMessagesTask(consumers.get(index)).run();
    }
  });
}

Let me know what you think.

Thanks,
Neha

On Tue, Feb 11, 2014 at 3:54 PM, Jay Kreps jay.kr...@gmail.com wrote:

 Comments inline:


 On Mon, Feb 10, 2014 at 2:31 PM, Guozhang Wang wangg...@gmail.com wrote:

  Hello Jay,
 
  Thanks for the detailed comments.
 
  1. Yeah we could discuss a bit more on that.
 
  2. Since subscribe() is incremental, adding one topic-partition is OK,
 and
  personally I think it is cleaner than subscribe(String topic,
  int...partition)?
 
 I am not too particular. Have you actually tried this? I think writing
 actual sample code is important.


  3. Originally I was thinking about two interfaces:
 
  getOffsets() // offsets for all partitions that I am consuming now
 
  getOffset(topc-partition) // offset of the specified topic-partition,
 will
  throw exception if it is not currently consumed.
 
  What do you think about these?
 

 The naming needs to distinguish committed offset position versus fetch
 offset position. Also we aren't using the getX convention.


  4. Yes, that remains a config.
 

 Does that make sense given that you change your position via an api now?


  5. Agree.
 
  6. If the timeout value is null then it will logically return
  immediately with whatever data is available. I think an indefinite
 poll()
  function could be replaced with just
 
  while (true) poll(some-time)?
 

 That is fine but we should provide a no arg poll for that, poll(null) isn't
 clear. We should add the timeunit as per the post java 5 convention as that
 makes the call more readable. E.g.
    poll(5) vs poll(5, TimeUnit.MILLISECONDS)

Re: New Consumer API discussion

2014-02-12 Thread Neha Narkhede
Jay

Well, none of those address the common case, which is to commit all
partitions. For these I was thinking just
   commit();
The advantage of this simpler method is that you don't need to bother about
partitions; you just consume the messages given to you and then commit them

This is already what the commit() API is supposed to do. Here is the
javadoc -

* Synchronously commits the specified offsets for the specified list of
topics and partitions to Kafka. If no partitions are specified,
 * commits offsets for the subscribed list of topics and partitions to
Kafka.

public void commit(TopicPartitionOffset... offsets);
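
Concretely, a hedged sketch of the two flavors (the TopicPartitionOffset
constructor arguments are an assumption based on its name) -

// commit the current position for everything this consumer consumes
consumer.commit();
// or commit an explicit position for one topic-partition
consumer.commit(new TopicPartitionOffset("my-topic", 0, 1234L));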

Could you take another look at the javadoc
(http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html)?
I've uploaded changes from the previous discussions and included some of
your review suggestions.



On Wed, Feb 12, 2014 at 8:32 AM, Neha Narkhede neha.narkh...@gmail.com wrote:

 Imran,


 Sorry I am probably missing
 something basic, but I'm not sure how a multi-threaded consumer would
 work.  I can imagine it's either:

 a) I just have one thread poll kafka.  If I want to process msgs in
 multiple threads, then I deal w/ that after polling, e.g. stick them into a
 blocking queue or something, and have more threads that read from the
 queue.

 b) each thread creates its own KafkaConsumer.  They are all registered the
 same way, and I leave it to kafka to figure out what data to give to each
 one.

 We designed the new consumer API to not require multi threading on
 purpose.
 The reason this is better than the existing ZookeeperConsumerConnector is
 that
 it effectively allows the user to choose a threading model and load balance
 message
 processing amongst those threads. For example, you might want more threads
 dedicated
 to a certain high throughput partition compared to other partitions. In
 option a) above, you can
 create your own thread pool and hand over the messages returned by poll
 using a blocking
 queue or any other approach. Option b) would work as well and the user
 has to figure out which
 topics each KafkaConsumer subscribes to.


 (a) certainly makes things simple, but I worry about throughput -- is that
 just as good as having one thread trying to consume each partition?

 (b) makes it a bit of a pain to figure out how many threads to use.  I
 assume there is no point in using more threads than there are partitions,
 so first you've got to figure out how many partitions there are in each
 topic.  Might be nice if there were some util functions to simplify this.

 The user can pick the number of threads. That is still better as only the
 user knows how
 slow/fast the message processing of her application is.

 Also, since the initial call to subscribe doesn't give the partition
 assignment, does that mean the first call to poll() will always call the
 ConsumerRebalanceCallback?

 Assuming you choose to use group management (by using subscribe(topics)),
 poll() will invoke
 the ConsumerRebalanceCallback on every single rebalance attempt. Improved
 the javadoc
 (http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerRebalanceCallback.html)
 to explain that. Could you give that another look?

 If I'm on the right track, I'd like to expand this example, showing how
 each MyConsumer can keep track of its partitions & offsets, even in the
 face of rebalances.  As Jay said, I think a minimal code example could
 really help us see the utility & faults of the api.

 Sure, please look at the javadoc
 (http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html).
 I've tried to include code examples there. Please help in
 improving those or adding more. Looks like we should add some multi
 threading examples. I avoided
 adding those since there are many ways of handling the message processing
 and it will not be feasible
 to list all of those. If we list one, people might think that is the only
 recommended approach.

 With that said, here is an example of using Option b) above -


 final List<MyConsumer> consumers = new ArrayList<MyConsumer>();
 List<String> topics = new ArrayList<String>();
 // populate topics, one topic per consumer thread
 assert(topics.size() == numThreads);

 for (int i = 0; i < numThreads; i++) {
   MyConsumer c = new MyConsumer();
   c.subscribe(topics.get(i));
   consumers.add(c);
 }
 // poll each consumer in a separate thread.
 for (int i = 0; i < numThreads; i++) {
   final int index = i; // the anonymous Runnable needs a final copy of i
   executorService.submit(new Runnable() {
     @Override
     public void run() {
       // assumes ProcessMessagesTask implements Runnable
       new ProcessMessagesTask(consumers.get(index)).run();
     }
   });
 }

 Let me know what you think.

 Thanks,
 Neha

 On Tue, Feb 11, 2014 at 3:54 PM, Jay Kreps jay.kr...@gmail.com wrote:

 Comments inline:


 On Mon, Feb 10, 2014 at 2:31 PM, Guozhang Wang wangg...@gmail.com
 wrote:

  Hello Jay,
 
  Thanks for the detailed comments.
 
  1. Yeah we could discuss a bit more on that.
 
  2. Since 

Re: New Consumer API discussion

2014-02-12 Thread Jay Kreps
Ah, gotcha.

-Jay


On Wed, Feb 12, 2014 at 8:40 AM, Neha Narkhede neha.narkh...@gmail.com wrote:

 Jay

 Well, none of those address the common case, which is to commit all
 partitions. For these I was thinking just
    commit();
 The advantage of this simpler method is that you don't need to bother about
 partitions; you just consume the messages given to you and then commit them

 This is already what the commit() API is supposed to do. Here is the
 javadoc -

 * Synchronously commits the specified offsets for the specified list of
 topics and partitions to Kafka. If no partitions are specified,
  * commits offsets for the subscribed list of topics and partitions to
 Kafka.

 public void commit(TopicPartitionOffset... offsets);

 Could you take another look at the
 javadoc
 http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
 ?
 I've uploaded changes from the previous discussions and included some of
 your review suggestions.



 On Wed, Feb 12, 2014 at 8:32 AM, Neha Narkhede neha.narkh...@gmail.com
 wrote:

  Imran,
 
 
  Sorry I am probably missing
  something basic, but I'm not sure how a multi-threaded consumer would
  work.  I can imagine it's either:
 
  a) I just have one thread poll kafka.  If I want to process msgs in
  multiple threads, then I deal w/ that after polling, e.g. stick them into
 a
  blocking queue or something, and have more threads that read from the
  queue.
 
  b) each thread creates its own KafkaConsumer.  They are all registered
 the
  same way, and I leave it to kafka to figure out what data to give to each
  one.
 
  We designed the new consumer API to not require multi threading on
  purpose.
  The reason this is better than the existing ZookeeperConsumerConnector is
  that
  it effectively allows the user to choose a threading model and load balance
  message
  processing amongst those threads. For example, you might want more
 threads
  dedicated
  to a certain high throughput partition compared to other partitions. In
  option a) above, you can
  create your own thread pool and hand over the messages returned by poll
  using a blocking
  queue or any other approach. Option b) would work as well and the user
  has to figure out which
  topics each KafkaConsumer subscribes to.
 
 
  (a) certainly makes things simple, but I worry about throughput -- is
 that
  just as good as having one thread trying to consume each partition?
 
  (b) makes it a bit of a pain to figure out how many threads to use.  I
  assume there is no point in using more threads than there are partitions,
  so first you've got to figure out how many partitions there are in each
  topic.  Might be nice if there were some util functions to simplify this.
 
  The user can pick the number of threads. That is still better as only the
  user knows how
  slow/fast the message processing of her application is.
 
  Also, since the initial call to subscribe doesn't give the partition
  assignment, does that mean the first call to poll() will always call the
  ConsumerRebalanceCallback?
 
  Assuming you choose to use group management (by using subscribe(topics)),
  poll() will invoke
  the ConsumerRebalanceCallback on every single rebalance attempt. Improved
  the javadoc
 http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerRebalanceCallback.html
 to
  explain that. Could you give that another look?
 
  If I'm on the right track, I'd like to expand this example, showing how
  each MyConsumer can keep track of its partitions & offsets, even in the
  face of rebalances.  As Jay said, I think a minimal code example could
  really help us see the utility & faults of the api.
 
  Sure, please look at the javadoc
 http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
 .
  I've tried to include code examples there. Please help in
  improving those or adding more. Looks like we should add some multi
  threading examples. I avoided
  adding those since there are many ways of handling the message processing
  and it will not be feasible
  to list all of those. If we list one, people might think that is the only
  recommended approach.
 
  With that said, here is an example of using Option b) above -
 
 
   final List<MyConsumer> consumers = new ArrayList<MyConsumer>();
   List<String> topics = new ArrayList<String>();
   // populate topics, one topic per consumer thread
   assert(topics.size() == numThreads);

   for (int i = 0; i < numThreads; i++) {
     MyConsumer c = new MyConsumer();
     c.subscribe(topics.get(i));
     consumers.add(c);
   }
   // poll each consumer in a separate thread.
   for (int i = 0; i < numThreads; i++) {
     final int index = i; // the anonymous Runnable needs a final copy of i
     executorService.submit(new Runnable() {
       @Override
       public void run() {
         // assumes ProcessMessagesTask implements Runnable
         new ProcessMessagesTask(consumers.get(index)).run();
       }
     });
   }
 
  Let me know what you think.
 
  Thanks,
  Neha
 
  On Tue, Feb 11, 2014 at 3:54 PM, Jay Kreps 

Re: New Consumer API discussion

2014-02-11 Thread Jay Kreps
Hey Pradeep,

That wiki is fairly old and it predated more flexible subscription
mechanisms. In the high-level consumer you currently have wildcard
subscription and in the new proposed interface you can actually subscribe
based on any logic you want to create a union of streams. Personally I
think this gives you everything you would want with a hierarchy and more
actual flexibility (since you can define groupings however you want). What
do you think?

-Jay


On Mon, Feb 10, 2014 at 3:37 PM, Pradeep Gollakota pradeep...@gmail.com wrote:

 WRT to hierarchical topics, I'm referring to
 KAFKA-1175 (https://issues.apache.org/jira/browse/KAFKA-1175).
 I would just like to think through the implications for the Consumer API if
 and when we do implement hierarchical topics. For example, in the
 proposal (https://cwiki.apache.org/confluence/display/KAFKA/Hierarchical+Topics)
 written
 by Jay, he says that initially wildcard subscriptions are not going
 to be supported. But does that mean that they will be supported in v2? If
 that's the case, that would change the semantics of the Consumer API.

 As to having classes for Topic, PartitionId, etc. it looks like I was
 referring to the TopicPartition and TopicPartitionOffset classes (I didn't
 realize these were already there). I was only looking at the confluence
 page which shows List[(String, Int, Long)] instead of
 List[TopicPartitionOffset] (as is shown in the javadoc). However, I did
 notice that we're not being consistent in the Java version. E.g. we have
 commit(TopicPartitionOffset... offsets) and
 lastCommittedOffsets(TopicPartition... partitions) on the one hand. On the
 other hand we have subscribe(String topic, int... partitions). I agree that
 creating a class for TopicId today would probably not make too much sense.
 But with hierarchical topics, I may change my mind. This is exactly
 what was done in the HBase API in 0.96 when namespaces were added. 0.96
 HBase API introduced a class called 'TableName' to represent the namespace
 and table name.


 On Mon, Feb 10, 2014 at 3:08 PM, Neha Narkhede neha.narkh...@gmail.com
 wrote:

  Thanks for the feedback.
 
  Mattijs -
 
  - Constructors link to
  http://kafka.apache.org/documentation.html#consumerconfigs for valid
  configurations, which lists zookeeper.connect rather than
  metadata.broker.list, the value for BROKER_LIST_CONFIG in ConsumerConfig.
  Fixed it to just point to ConsumerConfig for now until we finalize the
 new
  configs
  - Docs for poll(long) mention consumer.commit(true), which I can't find
 in
  the Consumer docs. For a simple consumer setup, that call is something
 that
  would make a lot of sense.
  Missed changing the examples to use consumer.commit(true, offsets). The
  suggestions by Jay would change it to commit(offsets) and
  commitAsync(offsets), which will hopefully make it easier to understand
  those commit APIs.
  - Love the addition of MockConsumer, awesome for unittesting :)
  I'm not quite satisfied with what it does as of right now, but we will
  surely improve it as we start writing the consumer.
 
  Jay -
 
  1. ConsumerRebalanceCallback
  a. Makes sense. Renamed to onPartitionsRevoked
  b. Ya, it will be good to make it forward compatible with Java 8
  capabilities. We can change it to PartitionsAssignedCallback and
   PartitionsRevokedCallback or RebalanceBeginCallback and
  RebalanceEndCallback?
  c. Ya, I thought about that but then didn't name it just
  RebalanceCallback since there could be a conflict with a controller side
  rebalance callback if/when we have one. However, you can argue that at
 that
  time we can name it ControllerRebalanceCallback instead of polluting a
 user
  facing API. So agree with you here.
  2. Ya, that is a good idea. Changed to subscribe(String topic,
  int...partitions).
  3. lastCommittedOffset() is not necessarily a local access since the
  consumer can potentially ask for the last committed offsets of partitions
  that the consumer does not consume and maintain the offsets for. That's
 the
  reason it is batched right now.
  4. Yes, look at
 
 
 http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html#AUTO_OFFSET_RESET_CONFIG
  5. Sure, but that is not part of the consumer API right? I think you're
  suggesting looking at OffsetRequest to see if it would do that properly?
  6. Good point. Changed to poll(long timeout, TimeUnit) and poll with a
  negative timeout will poll indefinitely?
  7. Good point. Changed to commit(...) and commitAsync(...)
  8. To commit the current position for all partitions owned by the
 consumer,
  you can use commit(). If you don't use group management, then
  commit(customListOfPartitions)
  9. Forgot to include unsubscribe. Done now.
  10. positions() can be called at any time and affects the next fetch on
 the
  next poll(). Fixed the places that said starting fetch offsets
  11. Can we not look that up by going through the messages returned and
  getting the offset from the ConsumerRecord?

Re: New Consumer API discussion

2014-02-11 Thread Pradeep Gollakota
Hi Jay,

I apologize for derailing the conversation about the consumer API. We
should start a new discussion about hierarchical topics, if we want to keep
talking about it. My final thought on the matter is that hierarchical
topics are still an important feature to have in Kafka, because they give us
the flexibility to do namespace level access controls.

Getting back to the topic of the Consumer API:

   1. Any thoughts on consistency for method arguments and return types?
   2. The lastCommittedOffsets() method returns a List<TopicPartitionOffset>
   whereas the confluence page suggested a Map<TopicPartition, Long>. I would
   think that a Map is the more appropriate return type.



On Tue, Feb 11, 2014 at 8:04 AM, Jay Kreps jay.kr...@gmail.com wrote:

 Hey Pradeep,

 That wiki is fairly old and it predated more flexible subscription
 mechanisms. In the high-level consumer you currently have wildcard
 subscription and in the new proposed interface you can actually subscribe
 based on any logic you want to create a union of streams. Personally I
 think this gives you everything you would want with a hierarchy and more
 actual flexibility (since you can define groupings however you want). What
 do you think?

 -Jay


 On Mon, Feb 10, 2014 at 3:37 PM, Pradeep Gollakota pradeep...@gmail.com
 wrote:

  WRT to hierarchical topics, I'm referring to
  KAFKA-1175 (https://issues.apache.org/jira/browse/KAFKA-1175).
  I would just like to think through the implications for the Consumer API
 if
  and when we do implement hierarchical topics. For example, in the
  proposal
  https://cwiki.apache.org/confluence/display/KAFKA/Hierarchical+Topics
  written
  by Jay, he says that initially wildcard subscriptions are not going
  to be supported. But does that mean that they will be supported in v2? If
  that's the case, that would change the semantics of the Consumer API.
 
  As to having classes for Topic, PartitionId, etc. it looks like I was
  referring to the TopicPartition and TopicPartitionOffset classes (I
 didn't
  realize these were already there). I was only looking at the confluence
  page which shows List[(String, Int, Long)] instead of
  List[TopicPartitionOffset] (as is shown in the javadoc). However, I did
  notice that we're not being consistent in the Java version. E.g. we have
  commit(TopicPartitionOffset... offsets) and
  lastCommittedOffsets(TopicPartition... partitions) on the one hand. On
 the
  other hand we have subscribe(String topic, int... partitions). I agree
 that
   creating a class for TopicId today would probably not make too much sense.
   But with hierarchical topics, I may change my mind. This is
 exactly
  what was done in the HBase API in 0.96 when namespaces were added. 0.96
  HBase API introduced a class called 'TableName' to represent the
 namespace
  and table name.
 
 
  On Mon, Feb 10, 2014 at 3:08 PM, Neha Narkhede neha.narkh...@gmail.com
  wrote:
 
   Thanks for the feedback.
  
   Mattijs -
  
   - Constructors link to
   http://kafka.apache.org/documentation.html#consumerconfigs for valid
   configurations, which lists zookeeper.connect rather than
   metadata.broker.list, the value for BROKER_LIST_CONFIG in
 ConsumerConfig.
   Fixed it to just point to ConsumerConfig for now until we finalize the
  new
   configs
   - Docs for poll(long) mention consumer.commit(true), which I can't find
  in
   the Consumer docs. For a simple consumer setup, that call is something
  that
   would make a lot of sense.
   Missed changing the examples to use consumer.commit(true, offsets). The
   suggestions by Jay would change it to commit(offsets) and
   commitAsync(offsets), which will hopefully make it easier to understand
   those commit APIs.
   - Love the addition of MockConsumer, awesome for unittesting :)
   I'm not quite satisfied with what it does as of right now, but we will
   surely improve it as we start writing the consumer.
  
   Jay -
  
   1. ConsumerRebalanceCallback
   a. Makes sense. Renamed to onPartitionsRevoked
   b. Ya, it will be good to make it forward compatible with Java 8
   capabilities. We can change it to PartitionsAssignedCallback and
PartitionsRevokedCallback or RebalanceBeginCallback and
   RebalanceEndCallback?
   c. Ya, I thought about that but then didn't name it just
   RebalanceCallback since there could be a conflict with a controller
 side
   rebalance callback if/when we have one. However, you can argue that at
  that
   time we can name it ControllerRebalanceCallback instead of polluting a
  user
   facing API. So agree with you here.
   2. Ya, that is a good idea. Changed to subscribe(String topic,
   int...partitions).
   3. lastCommittedOffset() is not necessarily a local access since the
   consumer can potentially ask for the last committed offsets of
 partitions
   that the consumer does not consume and maintain the offsets for. That's
  the
   reason it is batched right now.
    4. Yes, look at
  http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html#AUTO_OFFSET_RESET_CONFIG

Re: New Consumer API discussion

2014-02-11 Thread Jay Kreps
Comments inline:


On Mon, Feb 10, 2014 at 2:31 PM, Guozhang Wang wangg...@gmail.com wrote:

 Hello Jay,

 Thanks for the detailed comments.

 1. Yeah we could discuss a bit more on that.

 2. Since subscribe() is incremental, adding one topic-partition is OK, and
 personally I think it is cleaner than subscribe(String topic,
 int...partition)?

I am not too particular. Have you actually tried this? I think writing
actual sample code is important.


 3. Originally I was thinking about two interfaces:

 getOffsets() // offsets for all partitions that I am consuming now

  getOffset(topic-partition) // offset of the specified topic-partition, will
 throw exception if it is not currently consumed.

 What do you think about these?


The naming needs to distinguish committed offset position versus fetch
offset position. Also we aren't using the getX convention.


 4. Yes, that remains a config.


Does that make sense given that you change your position via an api now?


 5. Agree.

 6. If the timeout value is null then it will logically return
 immediately with whatever data is available. I think an indefinite poll()
 function could be replaced with just

 while (true) poll(some-time)?


That is fine but we should provide a no arg poll for that, poll(null) isn't
clear. We should add the timeunit as per the post java 5 convention as that
makes the call more readable. E.g.
   poll(5) vs poll(5, TimeUnit.MILLISECONDS)


 7. I am open with either approach.


Cool.

8. I was thinking about two interfaces for the commit functionality:


 https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design

 Do those sound better?


Well, none of those address the common case, which is to commit all
partitions. For these I was thinking just
   commit();
The advantage of this simpler method is that you don't need to bother about
partitions; you just consume the messages given to you and then commit them.

9. Currently I think about un-subscribe as close and re-subscribe, and
 would like to hear people's opinion about it.


Hmm, I think it is a little weird if there is a subscribe which can be
called at any time but no unsubscribe. Would this be hard to do?


  10. Yes. Position() is an API function, and as an API it can be called
  at any time and will change the next fetch starting offset.


Cool.


 11. The ConsumerRecord would have the offset info of the message. Is that
 what you want?


But that is only after I have gotten a message. I'm not sure if that covers
all cases or not.


 About use cases: great point. I will add some more examples of using the
 API functions in the wiki pages.

 Guozhang




 On Mon, Feb 10, 2014 at 12:20 PM, Jay Kreps jay.kr...@gmail.com wrote:

  A few items:
  1. ConsumerRebalanceCallback
 a. onPartitionsRevoked would be a better name.
 b. We should discuss the possibility of splitting this into two
  interfaces. The motivation would be that in Java 8 single method
 interfaces
  can directly take methods which might be more intuitive.
 c. If we stick with a single interface I would prefer the name
   RebalanceCallback as it's more concise
   2. Should subscribe(String topic, int partition) be
 subscribe(String
  topic, int...partition)?
  3. Is lastCommittedOffset call just a local access? If so it would be
 more
  convenient not to batch it.
  4. How are we going to handle the earliest/latest starting position
  functionality we currently have. Does that remain a config?
  5. Do we need to expose the general ability to get known positions from
 the
  log? E.g. the functionality in the OffsetRequest...? That would make the
  ability to change position a little easier.
  6. Should poll(java.lang.Long timeout) be poll(long timeout, TimeUnit
  unit)? Is it Long because it allows null? If so should we just add a
 poll()
  that polls indefinitely?
  7. I recommend we remove the boolean parameter from commit as it is
 really
  hard to read code that has boolean parameters without named arguments.
 Can
  we make it something like commit(...) and commitAsync(...)?
  8. What about the common case where you just want to commit the current
  position for all partitions?
  9. How do you unsubscribe?
  10. You say in a few places that positions() only impacts the starting
  position, but surely that isn't the case, right? Surely it controls the
  fetch position for that partition and can be called at any time?
 Otherwise
  it is a pretty weird api, right?
  11. How do I get my current position? Not the committed position but the
  offset of the next message that will be given to me?
 
  One thing that I really found helpful for the API design was writing out
  actual code for different scenarios against the API. I think it might be
  good to do that for this too--i.e. enumerate the various use cases and
 code
  that use case up to see how it looks. I'm not sure if it would be useful
 to
  collect these kinds of scenarios from people. I know they have
  sporadically popped up on the mailing list.
 

New Consumer API discussion

2014-02-10 Thread Neha Narkhede
As mentioned in previous emails, we are also working on a re-implementation
of the consumer. I would like to use this email thread to discuss the
details of the public API. I would also like us to be picky about this
public api now so it is as good as possible and we don't need to break it
in the future.

The best way to get a feel for the API is actually to take a look at the
javadoc (http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html),
the hope is to get the api docs good enough so that it is self-explanatory.
You can also take a look at the configs
here (http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html)

Some background info on implementation:

At a high level the primary difference in this consumer is that it removes
the distinction between the high-level and low-level consumer. The new
consumer API is non blocking and instead of returning a blocking iterator,
the consumer provides a poll() API that returns a list of records. We think
this is better compared to the blocking iterators since it effectively
decouples the threading strategy used for processing messages from the
consumer. It is worth noting that the consumer is entirely single threaded
and runs in the user thread. The advantage is that it can be easily
rewritten in less multi-threading-friendly languages. The consumer batches
data and multiplexes I/O over TCP connections to each of the brokers it
communicates with, for high throughput. The consumer also allows long poll
to reduce the end-to-end message latency for low throughput data.
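
As a minimal sketch of that shape (the config map and constructor follow the
javadoc linked above; the topic name, poll timeout and handle() helper are
illustrative assumptions, not final API) -

Map<String, Object> configs = new HashMap<String, Object>();
// populate configs: broker list, group id, etc.
KafkaConsumer consumer = new KafkaConsumer(configs);
consumer.subscribe("my-topic");
while (true) {
  // poll() returns whatever records arrived within the timeout
  for (ConsumerRecord record : consumer.poll(100)) {
    handle(record); // hypothetical user-supplied processing
  }
  consumer.commit(); // commit positions for all subscribed partitions
}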

The consumer provides a group management facility that supports the concept
of a group with multiple consumer instances (just like the current
consumer). This is done through a custom heartbeat and group management
protocol transparent to the user. At the same time, it allows users the
option to subscribe to a fixed set of partitions and not use group
management at all. The offset management strategy defaults to Kafka based
offset management and the API provides a way for the user to use a
customized offset store to manage the consumer's offsets.

A key difference in this consumer also is the fact that it does not depend
on zookeeper at all.

More details about the new consumer design are
here (https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design)

Please take a look at the new
API (http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html) and
give us any thoughts you may have.

Thanks,
Neha


Re: New Consumer API discussion

2014-02-10 Thread Mattijs Ugen

Hey Neha,

This looks really promising, I particularly like the ability to commit 
offsets for topic/partition tuples over just commit(). Some remarks:


- Constructors link to 
http://kafka.apache.org/documentation.html#consumerconfigs for valid 
configurations, which lists zookeeper.connect rather than 
metadata.broker.list, the value for BROKER_LIST_CONFIG in ConsumerConfig.
- Docs for poll(long) mention consumer.commit(true), which I can't find 
in the Consumer docs. For a simple consumer setup, that call is 
something that would make a lot of sense.

- Love the addition of MockConsumer, awesome for unittesting :)

Digging these open discussions on API changes on the mailing list btw, 
keep up the good work :)


Kind regards,

Mattijs


Re: New Consumer API discussion

2014-02-10 Thread Jay Kreps
A few items:
1. ConsumerRebalanceCallback
   a. onPartitionsRevoked would be a better name.
   b. We should discuss the possibility of splitting this into two
interfaces. The motivation would be that in Java 8 single method interfaces
can directly take methods which might be more intuitive.
   c. If we stick with a single interface I would prefer the name
RebalanceCallback as it's more concise
2. Should subscribe(String topic, int partition) be subscribe(String
topic, int...partition)?
3. Is lastCommittedOffset call just a local access? If so it would be more
convenient not to batch it.
4. How are we going to handle the earliest/latest starting position
functionality we currently have. Does that remain a config?
5. Do we need to expose the general ability to get known positions from the
log? E.g. the functionality in the OffsetRequest...? That would make the
ability to change position a little easier.
6. Should poll(java.lang.Long timeout) be poll(long timeout, TimeUnit
unit)? Is it Long because it allows null? If so should we just add a poll()
that polls indefinitely?
7. I recommend we remove the boolean parameter from commit as it is really
hard to read code that has boolean parameters without named arguments. Can
we make it something like commit(...) and commitAsync(...)?
8. What about the common case where you just want to commit the current
position for all partitions?
9. How do you unsubscribe?
10. You say in a few places that positions() only impacts the starting
position, but surely that isn't the case, right? Surely it controls the
fetch position for that partition and can be called at any time? Otherwise
it is a pretty weird api, right?
11. How do I get my current position? Not the committed position but the
offset of the next message that will be given to me?

One thing that I really found helpful for the API design was writing out
actual code for different scenarios against the API. I think it might be
good to do that for this too--i.e. enumerate the various use cases and code
that use case up to see how it looks. I'm not sure if it would be useful to
collect these kinds of scenarios from people. I know they have sporadically
popped up on the mailing list.

-Jay


On Mon, Feb 10, 2014 at 10:54 AM, Neha Narkhede neha.narkh...@gmail.com wrote:

 As mentioned in previous emails, we are also working on a re-implementation
 of the consumer. I would like to use this email thread to discuss the
 details of the public API. I would also like us to be picky about this
 public api now so it is as good as possible and we don't need to break it
 in the future.

 The best way to get a feel for the API is actually to take a look at the
 javadoc
 http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
 ,
 the hope is to get the api docs good enough so that it is self-explanatory.
 You can also take a look at the configs
 here
 http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html
 

 Some background info on implementation:

 At a high level the primary difference in this consumer is that it removes
 the distinction between the high-level and low-level consumer. The new
 consumer API is non blocking and instead of returning a blocking iterator,
 the consumer provides a poll() API that returns a list of records. We think
 this is better compared to the blocking iterators since it effectively
 decouples the threading strategy used for processing messages from the
 consumer. It is worth noting that the consumer is entirely single threaded
 and runs in the user thread. The advantage is that it can be easily
 rewritten in less multi-threading-friendly languages. The consumer batches
 data and multiplexes I/O over TCP connections to each of the brokers it
 communicates with, for high throughput. The consumer also allows long poll
 to reduce the end-to-end message latency for low throughput data.

 The consumer provides a group management facility that supports the concept
 of a group with multiple consumer instances (just like the current
 consumer). This is done through a custom heartbeat and group management
 protocol transparent to the user. At the same time, it allows users the
 option to subscribe to a fixed set of partitions and not use group
 management at all. The offset management strategy defaults to Kafka based
 offset management and the API provides a way for the user to use a
 customized offset store to manage the consumer's offsets.

 A key difference in this consumer also is the fact that it does not depend
 on zookeeper at all.

 More details about the new consumer design are
 here
 https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design
 

 Please take a look at the new
 API
 http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
 and
 give us any thoughts you may have.

 Thanks,
 Neha



Re: New Consumer API discussion

2014-02-10 Thread Pradeep Gollakota
Couple of very quick thoughts.

1. +1 about renaming commit(...) and commitAsync(...)
2. I'd also like to extend the above for the poll()  method as well. poll()
and pollWithTimeout(long, TimeUnit)?
3. Have you guys given any thought around how this API would be used with
hierarchical topics?
4. Would it make sense to add classes such as TopicId, PartitionId, etc?
Seems like it would be easier to read code with these classes as opposed to
string and longs.

- Pradeep


On Mon, Feb 10, 2014 at 12:20 PM, Jay Kreps jay.kr...@gmail.com wrote:

 A few items:
 1. ConsumerRebalanceCallback
a. onPartitionsRevoked would be a better name.
b. We should discuss the possibility of splitting this into two
 interfaces. The motivation would be that in Java 8 single method interfaces
 can directly take methods which might be more intuitive.
c. If we stick with a single interface I would prefer the name
 RebalanceCallback as it's more concise
 2. Should subscribe(String topic, int partition) be subscribe(String
 topic, int...partition)?
 3. Is lastCommittedOffset call just a local access? If so it would be more
 convenient not to batch it.
 4. How are we going to handle the earliest/latest starting position
 functionality we currently have. Does that remain a config?
 5. Do we need to expose the general ability to get known positions from the
 log? E.g. the functionality in the OffsetRequest...? That would make the
 ability to change position a little easier.
 6. Should poll(java.lang.Long timeout) be poll(long timeout, TimeUnit
 unit)? Is it Long because it allows null? If so should we just add a poll()
 that polls indefinitely?
 7. I recommend we remove the boolean parameter from commit as it is really
 hard to read code that has boolean parameters without named arguments. Can
 we make it something like commit(...) and commitAsync(...)?
 8. What about the common case where you just want to commit the current
 position for all partitions?
 9. How do you unsubscribe?
 10. You say in a few places that positions() only impacts the starting
 position, but surely that isn't the case, right? Surely it controls the
 fetch position for that partition and can be called at any time? Otherwise
 it is a pretty weird api, right?
 11. How do I get my current position? Not the committed position but the
 offset of the next message that will be given to me?

 One thing that I really found helpful for the API design was writing out
 actual code for different scenarios against the API. I think it might be
 good to do that for this too--i.e. enumerate the various use cases and code
 that use case up to see how it looks. I'm not sure if it would be useful to
 collect these kinds of scenarios from people. I know they have sporadically
 popped up on the mailing list.

 -Jay


 On Mon, Feb 10, 2014 at 10:54 AM, Neha Narkhede neha.narkh...@gmail.com
 wrote:

  As mentioned in previous emails, we are also working on a
 re-implementation
  of the consumer. I would like to use this email thread to discuss the
  details of the public API. I would also like us to be picky about this
  public api now so it is as good as possible and we don't need to break it
  in the future.
 
  The best way to get a feel for the API is actually to take a look at the
  javadoc
 
 http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
  ,
  the hope is to get the api docs good enough so that it is
 self-explanatory.
  You can also take a look at the configs
  here
 
 http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html
  
 
  Some background info on implementation:
 
  At a high level the primary difference in this consumer is that it
 removes
  the distinction between the high-level and low-level consumer. The
 new
  consumer API is non blocking and instead of returning a blocking
 iterator,
  the consumer provides a poll() API that returns a list of records. We
 think
  this is better compared to the blocking iterators since it effectively
  decouples the threading strategy used for processing messages from the
  consumer. It is worth noting that the consumer is entirely single
 threaded
  and runs in the user thread. The advantage is that it can be easily
  rewritten in less multi-threading-friendly languages. The consumer
 batches
  data and multiplexes I/O over TCP connections to each of the brokers it
  communicates with, for high throughput. The consumer also allows long
 poll
  to reduce the end-to-end message latency for low throughput data.
 
  The consumer provides a group management facility that supports the
 concept
  of a group with multiple consumer instances (just like the current
  consumer). This is done through a custom heartbeat and group management
  protocol transparent to the user. At the same time, it allows users the
  option to subscribe to a fixed set of partitions and not use group
  management at all. The 

Re: New Consumer API discussion

2014-02-10 Thread Guozhang Wang
Hi Mattijs:

We have not updated the wiki pages for config yet, and it will not be
updated until we release 0.9 with these changes.

Currently consumers do have a commitOffsets function that can be called by
the users, but for most use cases auto.commit is turned on and this
function gets called by the consumer client itself.

Guozhang



On Mon, Feb 10, 2014 at 11:18 AM, Mattijs Ugen akaid...@almost3.net wrote:

 Hey Neha,

 This looks really promising, I particularly like the ability to commit
 offsets for topic/partition tuples over just commit(). Some remarks:

 - Constructors link to
 http://kafka.apache.org/documentation.html#consumerconfigs
 for valid configurations, which lists zookeeper.connect
 rather than metadata.broker.list, the value for BROKER_LIST_CONFIG in
 ConsumerConfig.
 - Docs for poll(long) mention consumer.commit(true), which I can't find in
 the Consumer docs. For a simple consumer setup, that call is something that
 would make a lot of sense.
 - Love the addition of MockConsumer, awesome for unittesting :)

 Digging these open discussions on API changes on the mailing list btw,
 keep up the good work :)

 Kind regards,

 Mattijs




-- 
-- Guozhang


Re: New Consumer API discussion

2014-02-10 Thread Guozhang Wang
Hello Jay,

Thanks for the detailed comments.

1. Yeah we could discuss a bit more on that.

2. Since subscribe() is incremental, adding one topic-partition is OK, and
personally I think it is cleaner than subscribe(String topic,
int...partition)?

3. Originally I was thinking about two interfaces:

getOffsets() // offsets for all partitions that I am consuming now

getOffset(topic-partition) // offset of the specified topic-partition, will
throw exception if it is not currently consumed.

What do you think about these?

4. Yes, that remains a config.

5. Agree.

6. If the timeout value is null then it will logically return
immediately with whatever data is available. I think an indefinite poll()
function could be replaced with just

while (true) poll(some-time)?

7. I am open with either approach.

8. I was thinking about two interfaces for the commit functionality:

https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design

Do those sound better?

9. Currently I think about un-subscribe as close and re-subscribe, and
would like to hear people's opinion about it.

10. Yes. Position() is an API function, and as an API it can be called
at any time and will change the next fetch starting offset.

11. The ConsumerRecord would have the offset info of the message. Is that
what you want?

About use cases: great point. I will add some more examples of using the
API functions in the wiki pages.

Guozhang




On Mon, Feb 10, 2014 at 12:20 PM, Jay Kreps jay.kr...@gmail.com wrote:

 A few items:
 1. ConsumerRebalanceCallback
a. onPartitionsRevoked would be a better name.
b. We should discuss the possibility of splitting this into two
 interfaces. The motivation would be that in Java 8 single method interfaces
 can directly take methods which might be more intuitive.
c. If we stick with a single interface I would prefer the name
 RebalanceCallback as it's more concise
 2. Should subscribe(String topic, int partition) be subscribe(String
 topic, int...partition)?
 3. Is lastCommittedOffset call just a local access? If so it would be more
 convenient not to batch it.
 4. How are we going to handle the earliest/latest starting position
 functionality we currently have. Does that remain a config?
 5. Do we need to expose the general ability to get known positions from the
 log? E.g. the functionality in the OffsetRequest...? That would make the
 ability to change position a little easier.
 6. Should poll(java.lang.Long timeout) be poll(long timeout, TimeUnit
 unit)? Is it Long because it allows null? If so should we just add a poll()
 that polls indefinitely?
 7. I recommend we remove the boolean parameter from commit as it is really
 hard to read code that has boolean parameters without named arguments. Can
 we make it something like commit(...) and commitAsync(...)?
 8. What about the common case where you just want to commit the current
 position for all partitions?
 9. How do you unsubscribe?
 10. You say in a few places that positions() only impacts the starting
 position, but surely that isn't the case, right? Surely it controls the
 fetch position for that partition and can be called at any time? Otherwise
 it is a pretty weird api, right?
 11. How do I get my current position? Not the committed position but the
 offset of the next message that will be given to me?

 One thing that I really found helpful for the API design was writing out
 actual code for different scenarios against the API. I think it might be
 good to do that for this too--i.e. enumerate the various use cases and code
 that use case up to see how it looks. I'm not sure if it would be useful to
 collect these kinds of scenarios from people. I know they have sporadically
 popped up on the mailing list.

 -Jay


 On Mon, Feb 10, 2014 at 10:54 AM, Neha Narkhede neha.narkh...@gmail.com
 wrote:

  As mentioned in previous emails, we are also working on a
 re-implementation
  of the consumer. I would like to use this email thread to discuss the
  details of the public API. I would also like us to be picky about this
  public api now so it is as good as possible and we don't need to break it
  in the future.
 
  The best way to get a feel for the API is actually to take a look at the
  javadoc
 
 http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html
  ,
  the hope is to get the api docs good enough so that it is
 self-explanatory.
  You can also take a look at the configs
  here
 
 http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html
  
 
  Some background info on implementation:
 
  At a high level the primary difference in this consumer is that it
 removes
  the distinction between the high-level and low-level consumer. The
 new
  consumer API is non blocking and instead of returning a blocking
 iterator,
  the consumer provides a poll() API that returns a list of records. We
 think
  

Re: New Consumer API discussion

2014-02-10 Thread Guozhang Wang
Hi Mattijs:

2. As Neha said, one design of the new consumer is to have non-blocking
consuming API instead of blocking API. Do you have a strong reason in mind
to still keep the blocking API instead of just using while(no-data)
poll(timeout)?

3. No we have not thought about hierarchical topics. Could you elaborate on
some use cases?

4. Consumer will share some of the common code with the Producer, in which the
ProduceRecord has

private final String topic;
private final Integer partition;
private final byte[] key;
private final byte[] value;
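
By analogy (an assumption drawn from these ProduceRecord fields plus the
offset info mentioned in the reply to point 11 above, not code from this
thread), the consumer-side record could look like:

public class ConsumerRecord {
  private final String topic;
  private final int partition;
  private final long offset; // answers "what is my current position"
  private final byte[] key;
  private final byte[] value;

  public ConsumerRecord(String topic, int partition, long offset,
                        byte[] key, byte[] value) {
    this.topic = topic;
    this.partition = partition;
    this.offset = offset;
    this.key = key;
    this.value = value;
  }

  public long offset() { return offset; }
}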

Thanks,

Guozhang


On Mon, Feb 10, 2014 at 2:31 PM, Guozhang Wang wangg...@gmail.com wrote:

 Hello Jay,

 Thanks for the detailed comments.

 1. Yeah we could discuss a bit more on that.

 2. Since subscribe() is incremental, adding one topic-partition is OK, and
 personally I think it is cleaner than subscribe(String topic,
 int...partition)?

 3. Originally I was thinking about two interfaces:

 getOffsets() // offsets for all partitions that I am consuming now

 getOffset(topic-partition) // offset of the specified topic-partition, will
 throw exception if it is not currently consumed.

 What do you think about these?

 4. Yes, that remains a config.

 5. Agree.

 6. If the timeout value is null then it will logically return
 immediately with whatever data is available. I think an indefinite poll()
 function could be replaced with just

 while (true) poll(some-time)?

 7. I am open with either approach.

 8. I was thinking about two interfaces for the commit functionality:


 https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design

 Do those sound better?

 9. Currently I think about un-subscribe as close and re-subscribe, and
 would like to hear people's opinion about it.

 10. Yes. Position() is an API function, and as an API it can be called
 at any time and will change the next fetch starting offset.

 11. The ConsumerRecord would have the offset info of the message. Is that
 what you want?

 About use cases: great point. I will add some more examples of using the
 API functions in the wiki pages.

 Guozhang




 On Mon, Feb 10, 2014 at 12:20 PM, Jay Kreps jay.kr...@gmail.com wrote:

 A few items:
 1. ConsumerRebalanceCallback
a. onPartitionsRevoked would be a better name.
b. We should discuss the possibility of splitting this into two
 interfaces. The motivation would be that in Java 8 single method
 interfaces
 can directly take methods which might be more intuitive.
c. If we stick with a single interface I would prefer the name
 RebalanceCallback as it's more concise
 2. Should subscribe(String topic, int partition) be
 subscribe(String
 topic, int...partition)?
 3. Is lastCommittedOffset call just a local access? If so it would be more
 convenient not to batch it.
 4. How are we going to handle the earliest/latest starting position
 functionality we currently have. Does that remain a config?
 5. Do we need to expose the general ability to get known positions from
 the
 log? E.g. the functionality in the OffsetRequest...? That would make the
 ability to change position a little easier.
 6. Should poll(java.lang.Long timeout) be poll(long timeout, TimeUnit
 unit)? Is it Long because it allows null? If so should we just add a
 poll()
 that polls indefinitely?
 7. I recommend we remove the boolean parameter from commit as it is really
 hard to read code that has boolean parameters without named arguments. Can
 we make it something like commit(...) and commitAsync(...)?
 8. What about the common case where you just want to commit the current
 position for all partitions?
 9. How do you unsubscribe?
 10. You say in a few places that positions() only impacts the starting
 position, but surely that isn't the case, right? Surely it controls the
 fetch position for that partition and can be called at any time? Otherwise
 it is a pretty weird api, right?
 11. How do I get my current position? Not the committed position but the
 offset of the next message that will be given to me?

 One thing that I really found helpful for the API design was writing out
 actual code for different scenarios against the API. I think it might be
 good to do that for this too--i.e. enumerate the various use cases and
 code
 that use case up to see how it looks. I'm not sure if it would be useful
 to
 collect these kinds of scenarios from people. I know they have
 sporadically
 popped up on the mailing list.

 -Jay


 On Mon, Feb 10, 2014 at 10:54 AM, Neha Narkhede neha.narkh...@gmail.com
 wrote:

  As mentioned in previous emails, we are also working on a
 re-implementation
  of the consumer. I would like to use this email thread to discuss the
  details of the public API. I would also like us to be picky about this
  public api now so it is as good as possible and we don't need to break
 it
  in the future.
 
  The best way to get a feel for the API is actually to take a look at the
   javadoc
  http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/KafkaConsumer.html

Re: New Consumer API discussion

2014-02-10 Thread Neha Narkhede
Thanks for the feedback.

Mattijs -

- Constructors link to
http://kafka.apache.org/documentation.html#consumerconfigs for valid
configurations, which lists zookeeper.connect rather than
metadata.broker.list, the value for BROKER_LIST_CONFIG in ConsumerConfig.
Fixed it to just point to ConsumerConfig for now until we finalize the new
configs
- Docs for poll(long) mention consumer.commit(true), which I can't find in
the Consumer docs. For a simple consumer setup, that call is something that
would make a lot of sense.
Missed changing the examples to use consumer.commit(true, offsets). The
suggestions by Jay would change it to commit(offsets) and
commitAsync(offsets), which will hopefully make it easier to understand
those commit APIs.
- Love the addition of MockConsumer, awesome for unittesting :)
I'm not quite satisfied with what it does as of right now, but we will
surely improve it as we start writing the consumer.

Jay -

1. ConsumerRebalanceCallback
a. Makes sense. Renamed to onPartitionsRevoked
b. Ya, it will be good to make it forward compatible with Java 8
capabilities. We can change it to PartitionsAssignedCallback and
 PartitionsRevokedCallback or RebalanceBeginCallback and
RebalanceEndCallback?
c. Ya, I thought about that but then didn't name it just
RebalanceCallback since there could be a conflict with a controller side
rebalance callback if/when we have one. However, you can argue that at that
time we can name it ControllerRebalanceCallback instead of polluting a user
facing API. So agree with you here.
2. Ya, that is a good idea. Changed to subscribe(String topic,
int...partitions).
3. lastCommittedOffset() is not necessarily a local access since the
consumer can potentially ask for the last committed offsets of partitions
that the consumer does not consume and maintain the offsets for. That's the
reason it is batched right now.
4. Yes, look at
http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html#AUTO_OFFSET_RESET_CONFIG
5. Sure, but that is not part of the consumer API right? I think you're
suggesting looking at OffsetRequest to see if it would do that properly?
6. Good point. Changed to poll(long timeout, TimeUnit) and poll with a
negative timeout will poll indefinitely?
7. Good point. Changed to commit(...) and commitAsync(...)
8. To commit the current position for all partitions owned by the consumer,
you can use commit(). If you don't use group management, then
commit(customListOfPartitions)
9. Forgot to include unsubscribe. Done now.
10. positions() can be called at any time and affects the next fetch on the
next poll(). Fixed the places that said starting fetch offsets
11. Can we not look that up by going through the messages returned and
getting the offset from the ConsumerRecord? (A sketch of that pattern
follows this list.)
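
A hedged sketch of that item 11 pattern: track the position from each
ConsumerRecord and commit it back explicitly. The record and TopicPartition
accessors and the TopicPartitionOffset constructor are assumptions based on
the names used in this thread, not final API -

Map<TopicPartition, Long> lastConsumed = new HashMap<TopicPartition, Long>();
for (ConsumerRecord record : consumer.poll(100)) {
  process(record); // hypothetical user-supplied handler
  // remember the next offset to consume for this record's partition
  lastConsumed.put(new TopicPartition(record.topic(), record.partition()),
                   record.offset() + 1);
}
// commit the tracked positions, one TopicPartitionOffset per partition
for (Map.Entry<TopicPartition, Long> entry : lastConsumed.entrySet())
  consumer.commit(new TopicPartitionOffset(
      entry.getKey().topic(), entry.getKey().partition(), entry.getValue()));

Committing once per poll() batch, as here, keeps the commit traffic bounded
regardless of how many records each poll returns.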

One thing that I really found helpful for the API design was writing out
actual code for different scenarios against the API. I think it might be
good to do that for this too--i.e. enumerate the various use cases and code
that use case up to see how it looks
The javadocs include examples for almost all possible scenarios of consumer
usage, that I could come up with. Will add more to the javadocs as I get
more feedback from our users. The advantage of having the examples in the
javadoc itself is that the usage is self-explanatory to new users.

Pradeep -

2. Changed to poll(long, TimeUnit) and a negative value for the timeout
would block in the poll forever until there is new data
3. We don't have hierarchical topics support. Would you mind explaining
what you meant?
4. I'm not so sure that we need a class to express a topic which is a
string and a separate class for just partition id. We do have a class for
TopicPartition which uniquely identifies a partition of a topic

Thanks,
Neha


On Mon, Feb 10, 2014 at 12:36 PM, Pradeep Gollakota pradeep...@gmail.com wrote:

 Couple of very quick thoughts.

 1. +1 about renaming commit(...) and commitAsync(...)
 2. I'd also like to extend the above for the poll()  method as well. poll()
 and pollWithTimeout(long, TimeUnit)?
 3. Have you guys given any thought around how this API would be used with
 hierarchical topics?
 4. Would it make sense to add classes such as TopicId, PartitionId, etc?
 Seems like it would be easier to read code with these classes as opposed to
 string and longs.

 - Pradeep


 On Mon, Feb 10, 2014 at 12:20 PM, Jay Kreps jay.kr...@gmail.com wrote:

  A few items:
  1. ConsumerRebalanceCallback
 a. onPartitionsRevoked would be a better name.
 b. We should discuss the possibility of splitting this into two
  interfaces. The motivation would be that in Java 8 single method
 interfaces
  can directly take methods which might be more intuitive.
 c. If we stick with a single interface I would prefer the name
  RebalanceCallback as it's more concise
  2. Should subscribe(String topic, int partition) be
 subscribe(String
  topic, int...partition)?
  3. 

Re: New Consumer API discussion

2014-02-10 Thread Pradeep Gollakota
WRT to hierarchical topics, I'm referring to
KAFKA-1175 (https://issues.apache.org/jira/browse/KAFKA-1175).
I would just like to think through the implications for the Consumer API if
and when we do implement hierarchical topics. For example, in the
proposal (https://cwiki.apache.org/confluence/display/KAFKA/Hierarchical+Topics) written
by Jay, he says that initially wildcard subscriptions are not going
to be supported. But does that mean that they will be supported in v2? If
that's the case, that would change the semantics of the Consumer API.

As to having classes for Topic, PartitionId, etc. it looks like I was
referring to the TopicPartition and TopicPartitionOffset classes (I didn't
realize these were already there). I was only looking at the confluence
page which shows List[(String, Int, Long)] instead of
List[TopicPartitionOffset] (as is shown in the javadoc). However, I did
notice that we're not being consistent in the Java version. E.g. we have
commit(TopicPartitionOffset... offsets) and
lastCommittedOffsets(TopicPartition... partitions) on the one hand. On the
other hand we have subscribe(String topic, int... partitions). I agree that
creating a class for TopicId today would probably not make too much sense.
But with hierarchical topics, I may change my mind. This is exactly
what was done in the HBase API in 0.96 when namespaces were added. 0.96
HBase API introduced a class called 'TableName' to represent the namespace
and table name.
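To illustrate the readability argument, here is a hypothetical sketch of an
HBase-style TableName analogue for Kafka, should hierarchical topics land.
Nothing below is part of any proposed Kafka API.

    // Hypothetical value class, modeled on HBase 0.96's TableName.
    public final class TopicName {
        private final String namespace;
        private final String name;

        private TopicName(String namespace, String name) {
            this.namespace = namespace;
            this.name = name;
        }

        // Parse "namespace/name"; a bare name falls back to a default namespace.
        public static TopicName valueOf(String fullName) {
            int slash = fullName.indexOf('/');
            return slash < 0
                    ? new TopicName("default", fullName)
                    : new TopicName(fullName.substring(0, slash), fullName.substring(slash + 1));
        }

        public String namespace() { return namespace; }
        public String name() { return name; }

        @Override
        public String toString() { return namespace + "/" + name; }
    }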


On Mon, Feb 10, 2014 at 3:08 PM, Neha Narkhede neha.narkh...@gmail.com wrote:

 Thanks for the feedback.

 Mattijs -

 - Constructors link to
 http://kafka.apache.org/documentation.html#consumerconfigs for valid
 configurations, which lists zookeeper.connect rather than
 metadata.broker.list, the value for BROKER_LIST_CONFIG in ConsumerConfig.
 Fixed it to just point to ConsumerConfig for now, until we finalize the new
 configs.
 - Docs for poll(long) mention consumer.commit(true), which I can't find in
 the Consumer docs. For a simple consumer setup, that call is something that
 would make a lot of sense.
 I missed changing the examples to use consumer.commit(true, offsets). The
 suggestions by Jay would change it to commit(offsets) and
 commitAsync(offsets), which will hopefully make those commit APIs easier to
 understand (see the sketch after this list).
 - Love the addition of MockConsumer, awesome for unit testing :)
 I'm not quite satisfied with what it does as of right now, but we will
 surely improve it as we start writing the consumer.
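 As a concrete illustration of the commit(offsets)/commitAsync(offsets)
 shape, here is a short sketch. The map-based signatures are the ones
 proposed in this thread; the topic names and offsets are made up.

     import java.util.HashMap;
     import java.util.Map;

     // Sketch of the proposed commit APIs; nothing here is final.
     Map<TopicPartition, Long> offsets = new HashMap<TopicPartition, Long>();
     offsets.put(new TopicPartition("my-topic", 0), 1024L);
     offsets.put(new TopicPartition("my-topic", 1), 2048L);

     consumer.commit(offsets);       // synchronous commit of explicit offsets
     consumer.commitAsync(offsets);  // asynchronous variant
     consumer.commit();              // no-arg: commits the current position
                                     // for all owned partitions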

 Jay -

 1. ConsumerRebalanceCallback
 a. Makes sense. Renamed to onPartitionsRevoked.
 b. Ya, it will be good to make it forward compatible with Java 8
 capabilities. We can change it to PartitionsAssignedCallback and
 PartitionsRevokedCallback, or RebalanceBeginCallback and
 RebalanceEndCallback? (A sketch of the two-interface split follows after
 this list.)
 c. Ya, I thought about that but didn't name it just RebalanceCallback,
 since there could be a conflict with a controller-side rebalance callback
 if/when we have one. However, you can argue that at that time we can name
 that one ControllerRebalanceCallback instead of polluting a user-facing
 API. So I agree with you here.
 2. Ya, that is a good idea. Changed to subscribe(String topic,
 int... partitions).
 3. lastCommittedOffset() is not necessarily a local access, since the
 consumer can potentially ask for the last committed offsets of partitions
 that it does not consume or maintain offsets for. That's the reason it is
 batched right now.
 4. Yes, look at

 http://people.apache.org/~nehanarkhede/kafka-0.9-consumer-javadoc/doc/kafka/clients/consumer/ConsumerConfig.html#AUTO_OFFSET_RESET_CONFIG
 5. Sure, but that is not part of the consumer API, right? I think you're
 suggesting looking at OffsetRequest to see if it would do that properly?
 6. Good point. Changed to poll(long timeout, TimeUnit unit); poll with a
 negative timeout will block indefinitely.
 7. Good point. Changed to commit(...) and commitAsync(...).
 8. To commit the current position for all partitions owned by the consumer,
 you can use commit(). If you don't use group management, then use
 commit(customListOfPartitions).
 9. Forgot to include unsubscribe. Done now.
 10. positions() can be called at any time and affects the next fetch on the
 next poll(). Fixed the places that said "starting fetch offsets". (See the
 rewind sketch at the end of this message.)
 11. Can we not look that up by going through the messages returned and
 getting the offset from the ConsumerRecord?
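 As referenced in 1b above, here is a hypothetical sketch of the
 two-interface split, so that each callback is a single-method interface
 that a Java 8 lambda can implement. All names are tentative, and Consumer
 and TopicPartition are the types under discussion in this thread.

     import java.util.Collection;

     // Tentative shapes only; none of this is final API.
     interface PartitionsAssignedCallback {
         void onPartitionsAssigned(Consumer consumer, Collection<TopicPartition> partitions);
     }

     interface PartitionsRevokedCallback {
         void onPartitionsRevoked(Consumer consumer, Collection<TopicPartition> partitions);
     }

     // Wherever the callbacks are accepted (constructor or subscribe), a
     // Java 8 caller could then pass lambdas directly, e.g.:
     //   (c, parts) -> c.commit()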

 One thing that I really found helpful for the API design was writing out
 actual code for different scenarios against the API. I think it might be
 good to do that for this too--i.e. enumerate the various use cases and code
 each use case up to see how it looks.
 The javadocs include examples for almost all the consumer usage scenarios I
 could come up with. I will add more to the javadocs as I get more feedback
 from our users. The advantage of having the examples in the javadoc itself
 is that the usage is self-explanatory to new users.
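 Finally, the rewind sketch referenced in point 10 above: positions() takes
 effect on the next poll(). The Map-based signature is an assumption, in
 line with the other offset APIs in this thread, and may differ in the
 final API.

     import java.util.Collections;
     import java.util.concurrent.TimeUnit;

     // Assumed shape: positions(Map<TopicPartition, Long>) sets the fetch
     // offset used by the next poll() for each given partition.
     TopicPartition tp = new TopicPartition("my-topic", 0);
     consumer.positions(Collections.singletonMap(tp, 0L)); // rewind to the beginning
     consumer.poll(100, TimeUnit.MILLISECONDS);            // next fetch starts at offset 0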