Re: New Consumer API + Reactive Kafka

2015-12-02 Thread Guozhang Wang
In the new API commitSync() handles retries and reconnecting internally, and
will only throw an exception if it encounters a non-retriable error (e.g. it
has been told that the partitions it wants to commit no longer belong to it)
or a timeout has elapsed. You can find the possible exceptions thrown from
commitSync here:

http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
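
A minimal sketch of acting on that contract (the exception types are from
the 0.9 javadoc linked above; the recreate-the-consumer recovery policy is
just an assumption, not a recommendation):

    import java.util.Map;
    import org.apache.kafka.clients.consumer.CommitFailedException;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.TopicPartition;

    public class CommitHelper {
        // Returns true if the offsets were committed; false signals the
        // caller to tear this consumer down and build a fresh one.
        public static boolean tryCommit(Consumer<?, ?> consumer,
                Map<TopicPartition, OffsetAndMetadata> offsets) {
            try {
                consumer.commitSync(offsets); // retries happen inside
                return true;
            } catch (CommitFailedException e) {
                // non-retriable: the group rebalanced and the partitions
                // were reassigned, so retrying the same commit cannot succeed
                return false;
            } catch (KafkaException e) {
                // any other non-retriable error, surfaced after internal retries
                return false;
            }
        }
    }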

Guozhang


-- 
-- Guozhang


Re: New Consumer API + Reactive Kafka

2015-12-02 Thread Krzysztof Ciesielski
I see, that’s actually a very important point, thanks Jay.
We are very optimistic about updating Reactive Kafka now after getting all
these details :)
I have one more question: in the new client we only have to call
commitSync(offsets). This is a ‘void’ method, so I suspect that it commits
atomically?
In our current native committer, we have quite a lot of additional code for
retries, reconnecting, or finding the new coordinator. I suspect that the
new API handles all of this internally, and that if commitSync() fails the
only thing we can do is kill the consumer and create a new one?

— 
Bests,
Chris
SoftwareMill


Re: New Consumer API + Reactive Kafka

2015-12-02 Thread Jay Kreps
It's worth noting that both the old and new consumer are identical in the
number of records fetched at once: this is bounded by the fetch size and
the number of partitions you subscribe to. The old consumer held these in
memory internally and waited for you to ask for them; the new consumer
immediately gives you what it has. Overall, though, the new consumer gives
much better control over what is being fetched, since it only uses memory
when you call poll(). The old consumer had a background thread doing this
which would only stop when it filled up a queue of unprocessed chunks,
and that is a lot harder to predict.
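
A minimal sketch of that model (broker address, group id, and topic name
are placeholders; the consumer only fetches inside poll(), so memory use
is bounded by what each call returns):

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;

    public class PollLoop {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "demo");
            props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(Collections.singletonList("demo-topic"));
            while (true) {
                // no background fetching: data arrives only here
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records)
                    System.out.println(record.offset() + ": " + record.value());
            }
        }
    }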

-Jay



Re: New Consumer API + Reactive Kafka

2015-12-02 Thread Gwen Shapira
On Wed, Dec 2, 2015 at 10:44 PM, Krzysztof Ciesielski <
krzysztof.ciesiel...@softwaremill.pl> wrote:

> Hello,
>
> I’m the main maintainer of Reactive Kafka - a wrapper library that
> provides the Kafka API as Reactive Streams
> (https://github.com/softwaremill/reactive-kafka).
> I’m a bit concerned about switching to Kafka 0.9 because of the new
> Consumer API, which doesn’t seem to fit well into this paradigm compared
> to the old one. My main concerns are:
>
> 1. Our current code uses the KafkaIterator and reads messages
> sequentially, then sends them further upstream. In the new API, you cannot
> control how many messages are returned with poll(), so we would need to
> introduce some kind of in-memory buffering.
> 2. You cannot specify which offsets to commit. Our current native
> committer (
> https://github.com/softwaremill/reactive-kafka/blob/4055e88c09b8e08aefe8dbbd4748605df5779b07/core/src/main/scala/com/softwaremill/react/kafka/commit/native/NativeCommitter.scala)
> uses the OffsetCommitRequest/Response API and
> kafka.api.ConsumerMetadataRequest/Response for resolving brokers. Switching
> to Kafka 0.9 brings some compilation errors that raise questions.
>
> My questions are:
>
> 1. Do I understand the capabilities and limitations of the new API
> correctly? :)
>

The first limitation is correct - poll() may return any number of records
and you need to handle this.
The second is not correct - commitSync() can take a map of TopicPartition
to OffsetAndMetadata, so you can commit specific offsets for specific
partitions.
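
For illustration, a minimal sketch of committing one explicit offset (the
helper and its arguments are made up; the +1 follows the convention that
the committed offset is the position of the next record to read):

    import java.util.Collections;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;

    public class ExplicitCommit {
        // Commit everything up to and including lastProcessed on one
        // partition; other partitions are left untouched.
        static void commitUpTo(Consumer<?, ?> consumer,
                TopicPartition tp, long lastProcessed) {
            consumer.commitSync(Collections.singletonMap(
                tp, new OffsetAndMetadata(lastProcessed + 1)));
        }
    }

From the Reactive Streams side, per-element (or per-batch) acknowledgement
can then map directly onto such a call.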



> 2. Can we stay with the old iterator-based client, or is it going to get
> abandoned in future Kafka versions, or discouraged for some reason?
>

It is already a bit behind - only the new client includes support for
secured clusters (authentication and encryption). It will get deprecated in
the future.


> 3. Can we still use the OffsetCommitRequest/Response API to commit
> messages manually? If yes, could someone update this example:
> https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
> or give me a few hints on how to do this with 0.9?
>

AFAIK, the wire protocol and the API are not going anywhere. Hopefully you
can use the new objects we provide in the clients jar
(org.apache.kafka.common.requests).


>
> By the way, we’d like our library to appear on the Ecosystem Wiki, but I’m
> not sure how to request that officially :)
>

Let us know what to write there and where to link :)


>
> —
> Bests,
> Chris
> SoftwareMill