Re: Consumer and offset management support in 0.8.2 and 0.9

2015-01-12 Thread Joel Koshy
Catching up on email - yes, the wire protocol change was a big mistake, and
+1 on this approach. I'm reviewing KAFKA-1841 right now.


Re: Consumer and offset management support in 0.8.2 and 0.9

2015-01-08 Thread Dana Powers
This approach makes sense to me -- thanks, Jun.

-Dana


Re: Consumer and offset management support in 0.8.2 and 0.9

2015-01-08 Thread Jun Rao
I took a stab at fixing this issue in KAFKA-1841.

This is actually a bit tricky to fix. In addition to the incompatible wire
protocol change in version 0 of OffsetCommitRequest, there is also a
functionality change. Specifically, in 0.8.1, version 0 of
OffsetCommitRequest writes to ZK and version 0 of OffsetFetchRequest reads
from ZK. In 0.8.2/trunk, this changed so that versions 0 and 1 of
OffsetCommitRequest write to Kafka and version 0 of OffsetFetchRequest
reads from Kafka. To make this truly backward compatible with 0.8.1, we
have to:
(1) revert the wire protocol change to version 0 of OffsetCommitRequest;
(2) change the behavior of version 0 of OffsetCommitRequest and
OffsetFetchRequest back to writing to and reading from ZK;
(3) create version 1 of OffsetFetchRequest that reads from Kafka (so that
it behaves consistently with version 1 of OffsetCommitRequest).
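A toy model of the resulting version-to-storage mapping, before and after the patch (the enum, method names, and labels below are mine for illustration, not anything in the broker code):

```java
// Toy model of which offset store each request version targets, before
// (0.8.2-beta/trunk) and after the KAFKA-1841 patch. Illustrative only;
// these names do not exist in the broker code.
public class OffsetCompatModel {
    enum Store { ZOOKEEPER, KAFKA }

    // Before the patch, commit v0/v1 and fetch v0 all target Kafka
    // (fetch v1 does not exist yet).
    static Store beforePatch(short version) {
        return Store.KAFKA;
    }

    // After the patch, v0 of both requests reverts to 0.8.1 ZK semantics,
    // while v1 of both targets Kafka -- the consistency point (3) is after.
    static Store afterPatch(short version) {
        return version == 0 ? Store.ZOOKEEPER : Store.KAFKA;
    }

    public static void main(String[] args) {
        // The functional incompatibility flagged below: a v0 OffsetFetchRequest
        // from an 0.8.2-beta client switches stores when the broker upgrades.
        System.out.println("v0 fetch before patch: " + beforePatch((short) 0)); // KAFKA
        System.out.println("v0 fetch after patch:  " + afterPatch((short) 0));  // ZOOKEEPER
    }
}
```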

That's what the patch in KAFKA-1841 does. This works as long as people are
only using released final versions. However, since it introduces an
incompatible (functional) change to OffsetFetchRequest relative to
0.8.2-beta and trunk, it will create problems for people (assuming they use
this API) who have a deployment of 0.8.2-beta and want to upgrade to 0.8.2
final, or a deployment from trunk and want to upgrade to a later version of
trunk. In either case, upgrading the broker will cause the old client to
behave differently and incorrectly. The only choice there is to stop the
client and the broker and upgrade them together. Most people will probably
only deploy final released versions in production, but I want to bring this
up to see if anyone has concerns.

Thanks,

Jun

Re: Consumer and offset management support in 0.8.2 and 0.9

2015-01-07 Thread Jay Kreps
Yes, I think we are saying the same thing. Basically I am saying version 0
should be considered frozen, with the format and behavior of the 0.8.1
release, and we can do whatever we want as version 1+.

-Jay


Re: Consumer and offset management support in 0.8.2 and 0.9

2015-01-07 Thread Joe Stein
<< We need to take the versioning of the protocol seriously

amen

<< People are definitely using the offset commit functionality in 0.8.1

agreed

<< I really think we should treat this as a bug and revert the change to
version 0.

What do you mean exactly by revert? Why can't we use the version as a
feature flag and support 0 and 1 at the same time? In the handleOffsetFetch
and handleOffsetCommit functions that process the request messages, just do:
if version == 0, old functionality; else if version == 1, new functionality.
This way everyone works and nothing breaks  =8^)
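That if/else dispatch can be sketched as follows (a minimal model; the method names mirror the handlers mentioned above, but the bodies and return strings are purely illustrative, not the actual broker code):

```java
// Sketch of the "version as a feature flag" idea: the request handler
// branches on the request's version field and routes to the old or the
// new code path. Names and return values are illustrative only.
public class OffsetApiDispatch {

    // v0 keeps the 0.8.1 behavior (ZK-backed offsets);
    // v1 uses the new Kafka-based offset storage.
    static String handleOffsetCommit(short version) {
        if (version == 0) {
            return "write offsets to ZooKeeper";  // old functionality
        } else if (version == 1) {
            return "write offsets to Kafka log";  // new functionality
        }
        throw new IllegalArgumentException("unsupported version: " + version);
    }

    static String handleOffsetFetch(short version) {
        if (version == 0) {
            return "read offsets from ZooKeeper"; // old functionality
        } else if (version == 1) {
            return "read offsets from Kafka log"; // new functionality
        }
        throw new IllegalArgumentException("unsupported version: " + version);
    }

    public static void main(String[] args) {
        System.out.println(handleOffsetCommit((short) 0)); // 0.8.1 client keeps working
        System.out.println(handleOffsetFetch((short) 1));  // new client gets Kafka storage
    }
}
```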

/***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop 
/


Re: Consumer and offset management support in 0.8.2 and 0.9

2015-01-07 Thread Jay Kreps
Hey guys,

We need to take the versioning of the protocol seriously. People are
definitely using the offset commit functionality in 0.8.1 and I really
think we should treat this as a bug and revert the change to version 0.

-Jay


Re: Consumer and offset management support in 0.8.2 and 0.9

2015-01-07 Thread Jun Rao
Yes, we did make an incompatible change to OffsetCommitRequest in 0.8.2,
which was a mistake. The incompatible change was introduced in KAFKA-1012 in
March 2014, when we added the Kafka-based offset management support. However,
we didn't realize that it breaks the wire protocol until much later. Now the
wire protocol has evolved again and it's a bit hard to fix the format
in version 0. I can see a couple of options.

Option 1: Just accept the incompatible change as it is.
The argument is that even though we introduced OffsetCommitRequest in
0.8.1, it's not used in the high-level consumer. It's possible that some
users of SimpleConsumer started using it, but that number is likely small.
Also, the functionality of OffsetCommitRequest has changed, since it now
writes the offset to a Kafka log instead of ZK (for good reasons).
So we can document this as a wire protocol and functionality incompatible
change. Users who don't mind the functionality change will need to upgrade
the client to the new protocol before they can use the new broker. Users
who want to preserve the old functionality will have to write the offsets
directly to ZK. In either case, hopefully the number of people affected
is small.

Option 2: Revert the version 0 format to what's in 0.8.1.
There will be a few issues here. First, it's not clear how this affects
other people who have been deploying from trunk. Second, I am not sure that
we want to continue supporting writing the offset to ZK in
OffsetCommitRequest, since that can cause ZK to be overloaded.

Joel Koshy,

Any thoughts on this?

Thanks,

Jun

On Mon, Jan 5, 2015 at 11:39 PM, Joe Stein  wrote:

> In addition to the issue you bring up, the functionality as a whole has
> changed: when a client sends OffsetFetchRequest, version = 0 needs to
> preserve the old functionality
>
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/server/KafkaApis.scala#L678-L700
> and version = 1 the new one
>
> https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/server/KafkaApis.scala#L153-L223
> .
> Also, even though the wire protocol of OffsetFetchRequest is unchanged,
> after the 0.8.2 upgrade a client that was issuing the 0.8.1.1
> OffsetFetchRequest
>
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/server/KafkaApis.scala#L705-L728
> will stop going to ZooKeeper and start going to Kafka storage
>
> https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/server/KafkaApis.scala#L504-L519
> so more errors will happen and things will break.
>
> I think we should treat the version field not just as a guard against
> breaking wire protocol calls, but also as a "feature flag" that preserves
> upgrade paths and multiple behaviors.
>
> I updated the JIRA for the feature flag needs for OffsetFetch and
> OffsetCommit too.
>
> /***
>  Joe Stein
>  Founder, Principal Consultant
>  Big Data Open Source Security LLC
>  http://www.stealth.ly
>  Twitter: @allthingshadoop 
> /
>
> On Mon, Jan 5, 2015 at 3:21 PM, Dana Powers  wrote:
>
> > ok, opened KAFKA-1841. KAFKA-1634 also related.
> >
> > -Dana
> >
> > On Mon, Jan 5, 2015 at 10:55 AM, Gwen Shapira 
> > wrote:
> >
> > > Ooh, I see what you mean - the OffsetAndMetadata (or PartitionData)
> > > part of the Map changed, which will modify the wire protocol.
> > >
> > > This is actually not handled in the Java client either. It will send
> > > the timestamp no matter which version is used.
> > >
> > > This looks like a bug and I'd even mark it as blocker for 0.8.2 since
> > > it may prevent rolling upgrades.
> > >
> > > Are you opening the JIRA?
> > >
> > > Gwen
> > >
> > > On Mon, Jan 5, 2015 at 10:28 AM, Dana Powers 
> wrote:
> > > > specifically comparing 0.8.1 --
> > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/api/OffsetCommitRequest.scala#L37-L50
> > > > ```
> > > > (1 to partitionCount).map(_ => {
> > > >   val partitionId = buffer.getInt
> > > >   val offset = buffer.getLong
> > > >   val metadata = readShortString(buffer)
> > > >   (TopicAndPartition(topic, partitionId),
> > OffsetMetadataAndError(offset,
> > > > metadata))
> > > > })
> > > > ```
> > > >
> > > > to trunk --
> > > >
> > > >
> > >
> >
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/api/OffsetCommitRequest.scala#L44-L69
> > > > ```
> > > > (1 to partitionCount).map(_ => {
> > > >   val partitionId = buffer.getInt
> > > >   val offset = buffer.getLong
> > > >   val timestamp = {
> > > > val given = buffer.getLong
> > > > if (given == -1L) now else given
> > > >   }
> > > >   val metadata = readShortString(buffer)
> > > >   (TopicAndPartition(topic, partitionId), OffsetAndMetadata(offset,
> > > > metadata, timestamp))
> > > > })
> > > > ```
> > > >
> > > > should the `timestamp` buffer read be wrapped in an api version check?

Re: Consumer and offset management support in 0.8.2 and 0.9

2015-01-05 Thread Joe Stein
In addition to the issue you bring up, the functionality as a whole has
changed: when a client sends OffsetFetchRequest, version = 0 needs to
preserve the old functionality
https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/server/KafkaApis.scala#L678-L700
and version = 1 the new one
https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/server/KafkaApis.scala#L153-L223.
Also, even though the wire protocol of OffsetFetchRequest is unchanged,
after the 0.8.2 upgrade a client that was issuing the 0.8.1.1
OffsetFetchRequest
https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/server/KafkaApis.scala#L705-L728
will stop going to ZooKeeper and start going to Kafka storage
https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/server/KafkaApis.scala#L504-L519
so more errors will happen and things will break.

I think we should treat the version field not just as a guard against
breaking wire protocol calls, but also as a "feature flag" that preserves
upgrade paths and multiple behaviors.

I updated the JIRA for the feature flag needs for OffsetFetch and
OffsetCommit too.

/***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop 
/

On Mon, Jan 5, 2015 at 3:21 PM, Dana Powers  wrote:

> ok, opened KAFKA-1841. KAFKA-1634 also related.
>
> -Dana
>
> On Mon, Jan 5, 2015 at 10:55 AM, Gwen Shapira 
> wrote:
>
> > Ooh, I see what you mean - the OffsetAndMetadata (or PartitionData)
> > part of the Map changed, which will modify the wire protocol.
> >
> > This is actually not handled in the Java client either. It will send
> > the timestamp no matter which version is used.
> >
> > This looks like a bug and I'd even mark it as blocker for 0.8.2 since
> > it may prevent rolling upgrades.
> >
> > Are you opening the JIRA?
> >
> > Gwen
> >
> > On Mon, Jan 5, 2015 at 10:28 AM, Dana Powers  wrote:
> > > specifically comparing 0.8.1 --
> > >
> > >
> >
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/api/OffsetCommitRequest.scala#L37-L50
> > > ```
> > > (1 to partitionCount).map(_ => {
> > >   val partitionId = buffer.getInt
> > >   val offset = buffer.getLong
> > >   val metadata = readShortString(buffer)
> > >   (TopicAndPartition(topic, partitionId),
> OffsetMetadataAndError(offset,
> > > metadata))
> > > })
> > > ```
> > >
> > > to trunk --
> > >
> > >
> >
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/api/OffsetCommitRequest.scala#L44-L69
> > > ```
> > > (1 to partitionCount).map(_ => {
> > >   val partitionId = buffer.getInt
> > >   val offset = buffer.getLong
> > >   val timestamp = {
> > > val given = buffer.getLong
> > > if (given == -1L) now else given
> > >   }
> > >   val metadata = readShortString(buffer)
> > >   (TopicAndPartition(topic, partitionId), OffsetAndMetadata(offset,
> > > metadata, timestamp))
> > > })
> > > ```
> > >
> > > should the `timestamp` buffer read be wrapped in an api version check?
> > >
> > >
> > > Dana Powers
> > > Rdio, Inc.
> > > dana.pow...@rd.io
> > > rdio.com/people/dpkp/
> > >
> > > On Mon, Jan 5, 2015 at 9:49 AM, Gwen Shapira 
> > wrote:
> > >
> > >> Ah, I see :)
> > >>
> > >> The readFrom function basically tries to read two extra fields if you
> > >> are on version 1:
> > >>
> > >> if (versionId == 1) {
> > >>   groupGenerationId = buffer.getInt
> > >>   consumerId = readShortString(buffer)
> > >> }
> > >>
> > >> The rest looks identical in version 0 and 1, and still no timestamp in
> > >> sight...
> > >>
> > >> Gwen
> > >>
> > >> On Mon, Jan 5, 2015 at 9:33 AM, Dana Powers 
> wrote:
> > >> > Hi Gwen, I am using/writing kafka-python to construct api requests
> and
> > >> have
> > >> > not dug too deeply into the server source code.  But I believe it is
> > >> > kafka/api/OffsetCommitRequest.scala and specifically the readFrom
> > method
> > >> > used to decode the wire protocol.
> > >> >
> > >> > -Dana
> > >> > OffsetCommitRequest has two constructors now:
> > >> >
> > >> > For version 0:
> > >> >  OffsetCommitRequest(String groupId, Map<TopicPartition,
> > >> > PartitionData> offsetData)
> > >> >
> > >> > And version 1:
> > >> > OffsetCommitRequest(String groupId, int generationId, String
> > >> > consumerId, Map<TopicPartition, PartitionData> offsetData)
> > >> >
> > >> > None of them seem to require timestamps... so I'm not sure where you
> > >> > see that this is required. Can you share an example?
> > >> >
> > >> > Gwen
> > >> >
> > >> > On Sun, Jan 4, 2015 at 11:15 PM, Dana Powers 
> > wrote:
> > >> >> Hi Joel,
> > >> >>
> > >> >> I'm looking more closely at the OffsetCommitRequest wire protocol
> > change
> > >> >> you mentioned below, and I cannot figure out how to explicitly
> > >> construct a
> > >> >> request with the earlier version.  Should the api version be
> > different
> > >> for
> > >> >>

Re: Consumer and offset management support in 0.8.2 and 0.9

2015-01-05 Thread Dana Powers
ok, opened KAFKA-1841. KAFKA-1634 also related.

-Dana

On Mon, Jan 5, 2015 at 10:55 AM, Gwen Shapira  wrote:

> Ooh, I see what you mean - the OffsetAndMetadata (or PartitionData)
> part of the Map changed, which will modify the wire protocol.
>
> This is actually not handled in the Java client either. It will send
> the timestamp no matter which version is used.
>
> This looks like a bug and I'd even mark it as blocker for 0.8.2 since
> it may prevent rolling upgrades.
>
> Are you opening the JIRA?
>
> Gwen
>
> On Mon, Jan 5, 2015 at 10:28 AM, Dana Powers  wrote:
> > specifically comparing 0.8.1 --
> >
> >
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/api/OffsetCommitRequest.scala#L37-L50
> > ```
> > (1 to partitionCount).map(_ => {
> >   val partitionId = buffer.getInt
> >   val offset = buffer.getLong
> >   val metadata = readShortString(buffer)
> >   (TopicAndPartition(topic, partitionId), OffsetMetadataAndError(offset,
> > metadata))
> > })
> > ```
> >
> > to trunk --
> >
> >
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/api/OffsetCommitRequest.scala#L44-L69
> > ```
> > (1 to partitionCount).map(_ => {
> >   val partitionId = buffer.getInt
> >   val offset = buffer.getLong
> >   val timestamp = {
> > val given = buffer.getLong
> > if (given == -1L) now else given
> >   }
> >   val metadata = readShortString(buffer)
> >   (TopicAndPartition(topic, partitionId), OffsetAndMetadata(offset,
> > metadata, timestamp))
> > })
> > ```
> >
> > should the `timestamp` buffer read be wrapped in an api version check?
> >
> >
> > Dana Powers
> > Rdio, Inc.
> > dana.pow...@rd.io
> > rdio.com/people/dpkp/
> >
> > On Mon, Jan 5, 2015 at 9:49 AM, Gwen Shapira 
> wrote:
> >
> >> Ah, I see :)
> >>
> >> The readFrom function basically tries to read two extra fields if you
> >> are on version 1:
> >>
> >> if (versionId == 1) {
> >>   groupGenerationId = buffer.getInt
> >>   consumerId = readShortString(buffer)
> >> }
> >>
> >> The rest looks identical in version 0 and 1, and still no timestamp in
> >> sight...
> >>
> >> Gwen
> >>
> >> On Mon, Jan 5, 2015 at 9:33 AM, Dana Powers  wrote:
> >> > Hi Gwen, I am using/writing kafka-python to construct api requests and
> >> have
> >> > not dug too deeply into the server source code.  But I believe it is
> >> > kafka/api/OffsetCommitRequest.scala and specifically the readFrom
> method
> >> > used to decode the wire protocol.
> >> >
> >> > -Dana
> >> > OffsetCommitRequest has two constructors now:
> >> >
> >> > For version 0:
> >> >  OffsetCommitRequest(String groupId, Map<TopicPartition,
> >> > PartitionData> offsetData)
> >> >
> >> > And version 1:
> >> > OffsetCommitRequest(String groupId, int generationId, String
> >> > consumerId, Map<TopicPartition, PartitionData> offsetData)
> >> >
> >> > None of them seem to require timestamps... so I'm not sure where you
> >> > see that this is required. Can you share an example?
> >> >
> >> > Gwen
> >> >
> >> > On Sun, Jan 4, 2015 at 11:15 PM, Dana Powers 
> wrote:
> >> >> Hi Joel,
> >> >>
> >> >> I'm looking more closely at the OffsetCommitRequest wire protocol
> change
> >> >> you mentioned below, and I cannot figure out how to explicitly
> >> construct a
> >> >> request with the earlier version.  Should the api version be
> different
> >> for
> >> >> requests that do not include it and/or servers that do not support
> the
> >> >> timestamp field?  It looks like 0.8.1.1 did not include the timestamp
> >> > field
> >> >> and used api version 0.  But 0.8.2-beta seems to now require
> timestamps
> >> >> even when I explicitly encode OffsetCommitRequest api version 0
> (server
> >> >> logs a BufferUnderflowException).
> >> >>
> >> >> Is this the expected server behavior?  Can you provide any tips on
> how
> >> >> third-party clients should manage the wire-protocol change for this
> api
> >> >> method (I'm working on kafka-python)?
> >> >>
> >> >> Thanks,
> >> >>
> >> >> -Dana
> >> >>
> >> >> On Tue, Nov 18, 2014 at 1:27 PM, Joel Koshy 
> >> wrote:
> >> >>
> >> >>> Yes it should be backwards compatible. So for e.g., you should be
> able
> >> >>> to use an 0.8.1 client with an 0.8.2 broker. In general, you should
> >> >>> not upgrade your clients until after the brokers have been upgraded.
> >> >>> However, you can point an 0.8.2 client at an 0.8.1 broker. One wire
> >> >>> protocol change I'm aware of is the OffsetCommitRequest.  There is a
> >> >>> change in the OffsetCommitRequest format (KAFKA-1634) although you
> can
> >> >>> explicitly construct an OffsetCommitRequest with the earlier
> version.
> >> >>>
> >> >>> Thanks,
> >> >>>
> >> >>> Joel
> >> >>>
> >> >>> On Tue, Nov 18, 2014 at 03:39:41PM -0500, Marius Bogoevici wrote:
> >> >>> > Hi Joel,
> >> >>> >
> >> >>> > Thanks for all the clarifications!  Just another question on this:
> >> will
> >> >>> > 0.8.2 be backwards compatible with 0.8.1, just as 0.8.1 was with
> 0.8?
> >> >>> > Generally speaking, would there be any concerns with using the
> 0.8.2
> >> >>> >

Re: Consumer and offset management support in 0.8.2 and 0.9

2015-01-05 Thread Gwen Shapira
Ooh, I see what you mean - the OffsetAndMetadata (or PartitionData)
part of the Map changed, which will modify the wire protocol.

This is actually not handled in the Java client either. It will send
the timestamp no matter which version is used.

This looks like a bug and I'd even mark it as blocker for 0.8.2 since
it may prevent rolling upgrades.
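The failure mode is easy to reproduce in miniature: a decoder that unconditionally reads the timestamp runs out of bytes on a payload encoded without one. A sketch, with Python's struct.error standing in for the broker's BufferUnderflowException:

```python
import struct

def decode_entry_assuming_timestamp(buf):
    """Mimics the 0.8.2 readFrom: int32 partition, int64 offset, then an
    int64 timestamp read unconditionally (this is the bug under discussion)."""
    return struct.unpack_from(">iqq", buf, 0)

# An 0.8.1-style version-0 entry carries no timestamp: int32 + int64 = 12 bytes.
v0_entry = struct.pack(">iq", 3, 1234)

try:
    decode_entry_assuming_timestamp(v0_entry)
    outcome = "decoded"
except struct.error:
    outcome = "underflow"  # 12 bytes cannot satisfy a 20-byte read
```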

Are you opening the JIRA?

Gwen

On Mon, Jan 5, 2015 at 10:28 AM, Dana Powers  wrote:
> specifically comparing 0.8.1 --
>
> https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/api/OffsetCommitRequest.scala#L37-L50
> ```
> (1 to partitionCount).map(_ => {
>   val partitionId = buffer.getInt
>   val offset = buffer.getLong
>   val metadata = readShortString(buffer)
>   (TopicAndPartition(topic, partitionId), OffsetMetadataAndError(offset,
> metadata))
> })
> ```
>
> to trunk --
>
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/api/OffsetCommitRequest.scala#L44-L69
> ```
> (1 to partitionCount).map(_ => {
>   val partitionId = buffer.getInt
>   val offset = buffer.getLong
>   val timestamp = {
> val given = buffer.getLong
> if (given == -1L) now else given
>   }
>   val metadata = readShortString(buffer)
>   (TopicAndPartition(topic, partitionId), OffsetAndMetadata(offset,
> metadata, timestamp))
> })
> ```
>
> should the `timestamp` buffer read be wrapped in an api version check?
>
>
> Dana Powers
> Rdio, Inc.
> dana.pow...@rd.io
> rdio.com/people/dpkp/
>
> On Mon, Jan 5, 2015 at 9:49 AM, Gwen Shapira  wrote:
>
>> Ah, I see :)
>>
>> The readFrom function basically tries to read two extra fields if you
>> are on version 1:
>>
>> if (versionId == 1) {
>>   groupGenerationId = buffer.getInt
>>   consumerId = readShortString(buffer)
>> }
>>
>> The rest looks identical in version 0 and 1, and still no timestamp in
>> sight...
>>
>> Gwen
>>
>> On Mon, Jan 5, 2015 at 9:33 AM, Dana Powers  wrote:
>> > Hi Gwen, I am using/writing kafka-python to construct api requests and
>> have
>> > not dug too deeply into the server source code.  But I believe it is
>> > kafka/api/OffsetCommitRequest.scala and specifically the readFrom method
>> > used to decode the wire protocol.
>> >
>> > -Dana
>> > OffsetCommitRequest has two constructors now:
>> >
>> > For version 0:
>> >  OffsetCommitRequest(String groupId, Map<TopicPartition,
>> > PartitionData> offsetData)
>> >
>> > And version 1:
>> > OffsetCommitRequest(String groupId, int generationId, String
>> > consumerId, Map<TopicPartition, PartitionData> offsetData)
>> >
>> > None of them seem to require timestamps... so I'm not sure where you
>> > see that this is required. Can you share an example?
>> >
>> > Gwen
>> >
>> > On Sun, Jan 4, 2015 at 11:15 PM, Dana Powers  wrote:
>> >> Hi Joel,
>> >>
>> >> I'm looking more closely at the OffsetCommitRequest wire protocol change
>> >> you mentioned below, and I cannot figure out how to explicitly
>> construct a
>> >> request with the earlier version.  Should the api version be different
>> for
>> >> requests that do not include it and/or servers that do not support the
>> >> timestamp field?  It looks like 0.8.1.1 did not include the timestamp
>> > field
>> >> and used api version 0.  But 0.8.2-beta seems to now require timestamps
>> >> even when I explicitly encode OffsetCommitRequest api version 0 (server
>> >> logs a BufferUnderflowException).
>> >>
>> >> Is this the expected server behavior?  Can you provide any tips on how
>> >> third-party clients should manage the wire-protocol change for this api
>> >> method (I'm working on kafka-python)?
>> >>
>> >> Thanks,
>> >>
>> >> -Dana
>> >>
>> >> On Tue, Nov 18, 2014 at 1:27 PM, Joel Koshy 
>> wrote:
>> >>
>> >>> Yes it should be backwards compatible. So for e.g., you should be able
>> >>> to use an 0.8.1 client with an 0.8.2 broker. In general, you should
>> >>> not upgrade your clients until after the brokers have been upgraded.
>> >>> However, you can point an 0.8.2 client at an 0.8.1 broker. One wire
>> >>> protocol change I'm aware of is the OffsetCommitRequest.  There is a
>> >>> change in the OffsetCommitRequest format (KAFKA-1634) although you can
>> >>> explicitly construct an OffsetCommitRequest with the earlier version.
>> >>>
>> >>> Thanks,
>> >>>
>> >>> Joel
>> >>>
>> >>> On Tue, Nov 18, 2014 at 03:39:41PM -0500, Marius Bogoevici wrote:
>> >>> > Hi Joel,
>> >>> >
>> >>> > Thanks for all the clarifications!  Just another question on this:
>> will
>> >>> > 0.8.2 be backwards compatible with 0.8.1, just as 0.8.1 was with 0.8?
>> >>> > Generally speaking, would there be any concerns with using the 0.8.2
>> >>> > consumer with a 0.8.1 broker, for instance?
>> >>> >
>> >>> > Marius
>> >>> >
>> >>> > On Tue, Nov 18, 2014 at 2:55 PM, Joel Koshy 
>> > wrote:
>> >>> >
>> >>> > > Inline..
>> >>> > >
>> >>> > > On Tue, Nov 18, 2014 at 04:26:04AM -0500, Marius Bogoevici wrote:
>> >>> > > > Hello everyone,
>> >>> > > >
>> >>> > > > I have a few questions about the current status and future of the
>> >>> Kafka
>> >>> > > > consumers

Re: Consumer and offset management support in 0.8.2 and 0.9

2015-01-05 Thread Dana Powers
specifically comparing 0.8.1 --

https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/api/OffsetCommitRequest.scala#L37-L50
```
(1 to partitionCount).map(_ => {
  val partitionId = buffer.getInt
  val offset = buffer.getLong
  val metadata = readShortString(buffer)
  (TopicAndPartition(topic, partitionId), OffsetMetadataAndError(offset,
metadata))
})
```

to trunk --

https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/api/OffsetCommitRequest.scala#L44-L69
```
(1 to partitionCount).map(_ => {
  val partitionId = buffer.getInt
  val offset = buffer.getLong
  val timestamp = {
val given = buffer.getLong
if (given == -1L) now else given
  }
  val metadata = readShortString(buffer)
  (TopicAndPartition(topic, partitionId), OffsetAndMetadata(offset,
metadata, timestamp))
})
```

should the `timestamp` buffer read be wrapped in an api version check?
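For third-party client authors, the delta between the two layouts is exactly eight bytes per partition entry (the int64 timestamp). A sketch of encoding an entry both ways with Python's struct module, with the field layout inferred from the readFrom code above:

```python
import struct

def encode_short_string(s):
    """Kafka 'short string': big-endian int16 length, then the bytes."""
    b = s.encode("utf-8")
    return struct.pack(">h", len(b)) + b

def encode_partition_entry_v0(partition, offset, metadata):
    """0.8.1 layout: int32 partition, int64 offset, short-string metadata."""
    return struct.pack(">iq", partition, offset) + encode_short_string(metadata)

def encode_partition_entry_v1(partition, offset, timestamp, metadata):
    """trunk layout: same fields plus an int64 timestamp before the
    metadata; -1 asks the broker to substitute its current time."""
    return (struct.pack(">iqq", partition, offset, timestamp)
            + encode_short_string(metadata))
```

A v1 entry for the same data is always 8 bytes longer than its v0 counterpart, which is why a broker that decodes every commit with the new layout runs short of bytes on a genuine v0 payload.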


Dana Powers
Rdio, Inc.
dana.pow...@rd.io
rdio.com/people/dpkp/

On Mon, Jan 5, 2015 at 9:49 AM, Gwen Shapira  wrote:

> Ah, I see :)
>
> The readFrom function basically tries to read two extra fields if you
> are on version 1:
>
> if (versionId == 1) {
>   groupGenerationId = buffer.getInt
>   consumerId = readShortString(buffer)
> }
>
> The rest looks identical in version 0 and 1, and still no timestamp in
> sight...
>
> Gwen
>
> On Mon, Jan 5, 2015 at 9:33 AM, Dana Powers  wrote:
> > Hi Gwen, I am using/writing kafka-python to construct api requests and
> have
> > not dug too deeply into the server source code.  But I believe it is
> > kafka/api/OffsetCommitRequest.scala and specifically the readFrom method
> > used to decode the wire protocol.
> >
> > -Dana
> > OffsetCommitRequest has two constructors now:
> >
> > For version 0:
> >  OffsetCommitRequest(String groupId, Map<TopicPartition,
> > PartitionData> offsetData)
> >
> > And version 1:
> > OffsetCommitRequest(String groupId, int generationId, String
> > consumerId, Map<TopicPartition, PartitionData> offsetData)
> >
> > None of them seem to require timestamps... so I'm not sure where you
> > see that this is required. Can you share an example?
> >
> > Gwen
> >
> > On Sun, Jan 4, 2015 at 11:15 PM, Dana Powers  wrote:
> >> Hi Joel,
> >>
> >> I'm looking more closely at the OffsetCommitRequest wire protocol change
> >> you mentioned below, and I cannot figure out how to explicitly
> construct a
> >> request with the earlier version.  Should the api version be different
> for
> >> requests that do not include it and/or servers that do not support the
> >> timestamp field?  It looks like 0.8.1.1 did not include the timestamp
> > field
> >> and used api version 0.  But 0.8.2-beta seems to now require timestamps
> >> even when I explicitly encode OffsetCommitRequest api version 0 (server
> >> logs a BufferUnderflowException).
> >>
> >> Is this the expected server behavior?  Can you provide any tips on how
> >> third-party clients should manage the wire-protocol change for this api
> >> method (I'm working on kafka-python)?
> >>
> >> Thanks,
> >>
> >> -Dana
> >>
> >> On Tue, Nov 18, 2014 at 1:27 PM, Joel Koshy 
> wrote:
> >>
> >>> Yes it should be backwards compatible. So for e.g., you should be able
> >>> to use an 0.8.1 client with an 0.8.2 broker. In general, you should
> >>> not upgrade your clients until after the brokers have been upgraded.
> >>> However, you can point an 0.8.2 client at an 0.8.1 broker. One wire
> >>> protocol change I'm aware of is the OffsetCommitRequest.  There is a
> >>> change in the OffsetCommitRequest format (KAFKA-1634) although you can
> >>> explicitly construct an OffsetCommitRequest with the earlier version.
> >>>
> >>> Thanks,
> >>>
> >>> Joel
> >>>
> >>> On Tue, Nov 18, 2014 at 03:39:41PM -0500, Marius Bogoevici wrote:
> >>> > Hi Joel,
> >>> >
> >>> > Thanks for all the clarifications!  Just another question on this:
> will
> >>> > 0.8.2 be backwards compatible with 0.8.1, just as 0.8.1 was with 0.8?
> >>> > Generally speaking, would there be any concerns with using the 0.8.2
> >>> > consumer with a 0.8.1 broker, for instance?
> >>> >
> >>> > Marius
> >>> >
> >>> > On Tue, Nov 18, 2014 at 2:55 PM, Joel Koshy 
> > wrote:
> >>> >
> >>> > > Inline..
> >>> > >
> >>> > > On Tue, Nov 18, 2014 at 04:26:04AM -0500, Marius Bogoevici wrote:
> >>> > > > Hello everyone,
> >>> > > >
> >>> > > > I have a few questions about the current status and future of the
> >>> Kafka
> >>> > > > consumers.
> >>> > > >
> >>> > > > We have been working to adding Kafka support in Spring XD [1],
> >>> currently
> >>> > > > using the high level consumer via Spring Integration Kafka [2].
> We
> >>> are
> >>> > > > working on adding features such as:
> >>> > > > - the ability to control offsets/replay topics;
> >>> > > > - the ability to control partition allocation across multiple
> >>> consumers;
> >>> > > >
> >>> > > > We are currently at version 0.8.1.1, so using the simple consumer
> > is
> >>> a
> >>> > > > pretty straightforward choice right now. However, in the light of
> > the
> >>> > > > upcoming consumer changes for 0.8.2 and 0.9, I have a few questions:

Re: Consumer and offset management support in 0.8.2 and 0.9

2015-01-05 Thread Gwen Shapira
Ah, I see :)

The readFrom function basically tries to read two extra fields if you
are on version 1:

if (versionId == 1) {
  groupGenerationId = buffer.getInt
  consumerId = readShortString(buffer)
}

The rest looks identical in version 0 and 1, and still no timestamp in sight...

Gwen

On Mon, Jan 5, 2015 at 9:33 AM, Dana Powers  wrote:
> Hi Gwen, I am using/writing kafka-python to construct api requests and have
> not dug too deeply into the server source code.  But I believe it is
> kafka/api/OffsetCommitRequest.scala and specifically the readFrom method
> used to decode the wire protocol.
>
> -Dana
> OffsetCommitRequest has two constructors now:
>
> For version 0:
>  OffsetCommitRequest(String groupId, Map<TopicPartition,
> PartitionData> offsetData)
>
> And version 1:
> OffsetCommitRequest(String groupId, int generationId, String
> consumerId, Map<TopicPartition, PartitionData> offsetData)
>
> None of them seem to require timestamps... so I'm not sure where you
> see that this is required. Can you share an example?
>
> Gwen
>
> On Sun, Jan 4, 2015 at 11:15 PM, Dana Powers  wrote:
>> Hi Joel,
>>
>> I'm looking more closely at the OffsetCommitRequest wire protocol change
>> you mentioned below, and I cannot figure out how to explicitly construct a
>> request with the earlier version.  Should the api version be different for
>> requests that do not include it and/or servers that do not support the
>> timestamp field?  It looks like 0.8.1.1 did not include the timestamp
> field
>> and used api version 0.  But 0.8.2-beta seems to now require timestamps
>> even when I explicitly encode OffsetCommitRequest api version 0 (server
>> logs a BufferUnderflowException).
>>
>> Is this the expected server behavior?  Can you provide any tips on how
>> third-party clients should manage the wire-protocol change for this api
>> method (I'm working on kafka-python)?
>>
>> Thanks,
>>
>> -Dana
>>
>> On Tue, Nov 18, 2014 at 1:27 PM, Joel Koshy  wrote:
>>
>>> Yes it should be backwards compatible. So for e.g., you should be able
>>> to use an 0.8.1 client with an 0.8.2 broker. In general, you should
>>> not upgrade your clients until after the brokers have been upgraded.
>>> However, you can point an 0.8.2 client at an 0.8.1 broker. One wire
>>> protocol change I'm aware of is the OffsetCommitRequest.  There is a
>>> change in the OffsetCommitRequest format (KAFKA-1634) although you can
>>> explicitly construct an OffsetCommitRequest with the earlier version.
>>>
>>> Thanks,
>>>
>>> Joel
>>>
>>> On Tue, Nov 18, 2014 at 03:39:41PM -0500, Marius Bogoevici wrote:
>>> > Hi Joel,
>>> >
>>> > Thanks for all the clarifications!  Just another question on this: will
>>> > 0.8.2 be backwards compatible with 0.8.1, just as 0.8.1 was with 0.8?
>>> > Generally speaking, would there be any concerns with using the 0.8.2
>>> > consumer with a 0.8.1 broker, for instance?
>>> >
>>> > Marius
>>> >
>>> > On Tue, Nov 18, 2014 at 2:55 PM, Joel Koshy 
> wrote:
>>> >
>>> > > Inline..
>>> > >
>>> > > On Tue, Nov 18, 2014 at 04:26:04AM -0500, Marius Bogoevici wrote:
>>> > > > Hello everyone,
>>> > > >
>>> > > > I have a few questions about the current status and future of the
>>> Kafka
>>> > > > consumers.
>>> > > >
>>> > > > We have been working to adding Kafka support in Spring XD [1],
>>> currently
>>> > > > using the high level consumer via Spring Integration Kafka [2]. We
>>> are
>>> > > > working on adding features such as:
>>> > > > - the ability to control offsets/replay topics;
>>> > > > - the ability to control partition allocation across multiple
>>> consumers;
>>> > > >
>>> > > > We are currently at version 0.8.1.1, so using the simple consumer
> is
>>> a
>>> > > > pretty straightforward choice right now. However, in the light of
> the
>>> > > > upcoming consumer changes for 0.8.2 and 0.9, I have a few
> questions:
>>> > > >
>>> > > > 1) With respect to the consumer redesign for 0.9, what is the
> future
>>> of
>>> > > the
>>> > > > Simple Consumer and High Level Consumer? To my best understanding,
>>> the
>>> > > > existing high level consumer API will be deprecated in favour of
> the
>>> new
>>> > > > consumer API. What is the future of the Simple Consumer, in this
>>> case? it
>>> > > > will continue to exist as a low-level API implementing the Kafka
>>> protocol
>>> > > > [3] and providing the building blocks for the new consumer, or will
>>> it be
>>> > > > deprecated as well?
>>> > >
>>> > > The new consumer will subsume both use-cases (simple and high-level).
>>> > > You can still use the old SimpleConsumer if you wish - i.e., the wire
>>> > > protocol for fetch and other requests will still be supported.
>>> > >
>>> > > >
>>> > > > 2) Regarding the new consumer: the v0.8.2 codebase contains an
> early
>>> > > > implementation of it, but since this a feature scheduled only for
>>> 0.9,
>>> > > what
>>> > > > is its status as well? Is it included only as a future reference
> and
>>> for
>>> > > > stabilizing the API?
>>> > >
> >>> > > It is a WIP so you cannot really use it.

Re: Consumer and offset management support in 0.8.2 and 0.9

2015-01-05 Thread Dana Powers
Hi Gwen, I am using/writing kafka-python to construct api requests and have
not dug too deeply into the server source code.  But I believe it is
kafka/api/OffsetCommitRequest.scala and specifically the readFrom method
used to decode the wire protocol.

-Dana
OffsetCommitRequest has two constructors now:

For version 0:
 OffsetCommitRequest(String groupId, Map<TopicPartition, PartitionData> offsetData)

And version 1:
OffsetCommitRequest(String groupId, int generationId, String
consumerId, Map<TopicPartition, PartitionData> offsetData)

None of them seem to require timestamps... so I'm not sure where you
see that this is required. Can you share an example?

Gwen

On Sun, Jan 4, 2015 at 11:15 PM, Dana Powers  wrote:
> Hi Joel,
>
> I'm looking more closely at the OffsetCommitRequest wire protocol change
> you mentioned below, and I cannot figure out how to explicitly construct a
> request with the earlier version.  Should the api version be different for
> requests that do not include it and/or servers that do not support the
> timestamp field?  It looks like 0.8.1.1 did not include the timestamp
field
> and used api version 0.  But 0.8.2-beta seems to now require timestamps
> even when I explicitly encode OffsetCommitRequest api version 0 (server
> logs a BufferUnderflowException).
>
> Is this the expected server behavior?  Can you provide any tips on how
> third-party clients should manage the wire-protocol change for this api
> method (I'm working on kafka-python)?
>
> Thanks,
>
> -Dana
>
> On Tue, Nov 18, 2014 at 1:27 PM, Joel Koshy  wrote:
>
>> Yes it should be backwards compatible. So for e.g., you should be able
>> to use an 0.8.1 client with an 0.8.2 broker. In general, you should
>> not upgrade your clients until after the brokers have been upgraded.
>> However, you can point an 0.8.2 client at an 0.8.1 broker. One wire
>> protocol change I'm aware of is the OffsetCommitRequest.  There is a
>> change in the OffsetCommitRequest format (KAFKA-1634) although you can
>> explicitly construct an OffsetCommitRequest with the earlier version.
>>
>> Thanks,
>>
>> Joel
>>
>> On Tue, Nov 18, 2014 at 03:39:41PM -0500, Marius Bogoevici wrote:
>> > Hi Joel,
>> >
>> > Thanks for all the clarifications!  Just another question on this: will
>> > 0.8.2 be backwards compatible with 0.8.1, just as 0.8.1 was with 0.8?
>> > Generally speaking, would there be any concerns with using the 0.8.2
>> > consumer with a 0.8.1 broker, for instance?
>> >
>> > Marius
>> >
>> > On Tue, Nov 18, 2014 at 2:55 PM, Joel Koshy 
wrote:
>> >
>> > > Inline..
>> > >
>> > > On Tue, Nov 18, 2014 at 04:26:04AM -0500, Marius Bogoevici wrote:
>> > > > Hello everyone,
>> > > >
>> > > > I have a few questions about the current status and future of the
>> Kafka
>> > > > consumers.
>> > > >
>> > > > We have been working to adding Kafka support in Spring XD [1],
>> currently
>> > > > using the high level consumer via Spring Integration Kafka [2]. We
>> are
>> > > > working on adding features such as:
>> > > > - the ability to control offsets/replay topics;
>> > > > - the ability to control partition allocation across multiple
>> consumers;
>> > > >
>> > > > We are currently at version 0.8.1.1, so using the simple consumer
is
>> a
>> > > > pretty straightforward choice right now. However, in the light of
the
>> > > > upcoming consumer changes for 0.8.2 and 0.9, I have a few
questions:
>> > > >
>> > > > 1) With respect to the consumer redesign for 0.9, what is the
future
>> of
>> > > the
>> > > > Simple Consumer and High Level Consumer? To my best understanding,
>> the
>> > > > existing high level consumer API will be deprecated in favour of
the
>> new
>> > > > consumer API. What is the future of the Simple Consumer, in this
>> case? it
>> > > > will continue to exist as a low-level API implementing the Kafka
>> protocol
>> > > > [3] and providing the building blocks for the new consumer, or will
>> it be
>> > > > deprecated as well?
>> > >
>> > > The new consumer will subsume both use-cases (simple and high-level).
>> > > You can still use the old SimpleConsumer if you wish - i.e., the wire
>> > > protocol for fetch and other requests will still be supported.
>> > >
>> > > >
>> > > > 2) Regarding the new consumer: the v0.8.2 codebase contains an
>> > > > early implementation of it, but since this is a feature scheduled
>> > > > only for 0.9, what is its status? Is it included only as a future
>> > > > reference and for stabilizing the API?
>> > >
>> > > It is a WIP so you cannot really use it.
>> > >
>> > > > 3) Obviously, offset management is a concern if using the simple
>> > > > consumer, so - wondering about the Offset Management API as well.
>> > > > The Kafka protocol document specifically indicates that it will be
>> > > > fully functional in 0.8.2 [4] - however, a functional
>> > > > implementation is already available in 0.8.1.1 (accessible via the
>> > > > SimpleConsumer API but not documented in [5]). Again, trying to
>> > > > understand the extent of what 0.8.1.1 alre

Re: Consumer and offset management support in 0.8.2 and 0.9

2015-01-05 Thread Gwen Shapira
OffsetCommitRequest has two constructors now:

For version 0:
 OffsetCommitRequest(String groupId, Map<TopicAndPartition, OffsetAndMetadata> offsetData)

And version 1:
OffsetCommitRequest(String groupId, int generationId, String
consumerId, Map<TopicAndPartition, OffsetAndMetadata> offsetData)

None of them seem to require timestamps... so I'm not sure where you
see that this is required. Can you share an example?

Gwen
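
[Editorial aside: one possible source of the confusion, a guess based on
KAFKA-1634 - the timestamp is not a constructor argument at all but a field
of each per-partition OffsetAndMetadata value, so it is serialized to the
wire regardless of which constructor is used. A hedged Python sketch of that
per-partition layout as 0.8.2-beta serializes it; the names are illustrative,
not kafka-python or Kafka APIs:]

```python
import struct

def _kstring(s: bytes) -> bytes:
    # Kafka protocol "string": int16 big-endian length prefix + bytes
    return struct.pack(">h", len(s)) + s

def partition_entry(partition: int, offset: int,
                    timestamp: int = -1, metadata: bytes = b"") -> bytes:
    # Per-partition payload of OffsetCommitRequest as serialized by
    # 0.8.2-beta: the timestamp sits between offset and metadata, carried
    # by OffsetAndMetadata rather than by either request constructor.
    return (struct.pack(">i", partition)
            + struct.pack(">q", offset)
            + struct.pack(">q", timestamp)
            + _kstring(metadata))
```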

On Sun, Jan 4, 2015 at 11:15 PM, Dana Powers  wrote:
> Hi Joel,
>
> I'm looking more closely at the OffsetCommitRequest wire protocol change
> you mentioned below, and I cannot figure out how to explicitly construct a
> request with the earlier version.  Should the api version be different for
> requests that do not include it and/or servers that do not support the
> timestamp field?  It looks like 0.8.1.1 did not include the timestamp field
> and used api version 0.  But 0.8.2-beta seems to now require timestamps
> even when I explicitly encode OffsetCommitRequest api version 0 (server
> logs a BufferUnderflowException).
>
> Is this the expected server behavior?  Can you provide any tips on how
> third-party clients should manage the wire-protocol change for this api
> method (I'm working on kafka-python)?
>
> Thanks,
>
> -Dana
>
> On Tue, Nov 18, 2014 at 1:27 PM, Joel Koshy  wrote:
>
>> Yes it should be backwards compatible. So for e.g., you should be able
>> to use an 0.8.1 client with an 0.8.2 broker. In general, you should
>> not upgrade your clients until after the brokers have been upgraded.
>> However, you can point an 0.8.2 client at an 0.8.1 broker. One wire
>> protocol change I'm aware of is the OffsetCommitRequest.  There is a
>> change in the OffsetCommitRequest format (KAFKA-1634) although you can
>> explicitly construct an OffsetCommitRequest with the earlier version.
>>
>> Thanks,
>>
>> Joel
>>
>> On Tue, Nov 18, 2014 at 03:39:41PM -0500, Marius Bogoevici wrote:
>> > Hi Joel,
>> >
>> > Thanks for all the clarifications!  Just another question on this: will
>> > 0.8.2 be backwards compatible with 0.8.1, just as 0.8.1 was with 0.8?
>> > Generally speaking, would there be any concerns with using the 0.8.2
>> > consumer with a 0.8.1 broker, for instance?
>> >
>> > Marius
>> >
>> > On Tue, Nov 18, 2014 at 2:55 PM, Joel Koshy  wrote:
>> >
>> > > Inline..
>> > >
>> > > On Tue, Nov 18, 2014 at 04:26:04AM -0500, Marius Bogoevici wrote:
>> > > > Hello everyone,
>> > > >
>> > > > I have a few questions about the current status and future of the
>> Kafka
>> > > > consumers.
>> > > >
>> > > > We have been working on adding Kafka support in Spring XD [1],
>> currently
>> > > > using the high level consumer via Spring Integration Kafka [2]. We
>> are
>> > > > working on adding features such as:
>> > > > - the ability to control offsets/replay topics;
>> > > > - the ability to control partition allocation across multiple
>> consumers;
>> > > >
>> > > > We are currently at version 0.8.1.1, so using the simple consumer is
>> a
>> > > > pretty straightforward choice right now. However, in the light of the
>> > > > upcoming consumer changes for 0.8.2 and 0.9, I have a few questions:
>> > > >
>> > > > 1) With respect to the consumer redesign for 0.9, what is the future
>> of
>> > > the
>> > > > Simple Consumer and High Level Consumer? To my best understanding,
>> the
>> > > > existing high level consumer API will be deprecated in favour of the
>> new
>> > > > consumer API. What is the future of the Simple Consumer, in this
>> case? Will it
>> > > > continue to exist as a low-level API implementing the Kafka
>> protocol
>> > > > [3] and providing the building blocks for the new consumer, or will
>> it be
>> > > > deprecated as well?
>> > >
>> > > The new consumer will subsume both use-cases (simple and high-level).
>> > > You can still use the old SimpleConsumer if you wish - i.e., the wire
>> > > protocol for fetch and other requests will still be supported.
>> > >
>> > > >
>> > > > 2) Regarding the new consumer: the v0.8.2 codebase contains an early
>> > > > implementation of it, but since this is a feature scheduled only for
>> 0.9,
>> > > what
>> > > > is its status as well? Is it included only as a future reference and
>> for
>> > > > stabilizing the API?
>> > >
>> > > It is a WIP so you cannot really use it.
>> > >
>> > > > 3) Obviously, offset management is a concern if using the simple
>> > > consumer,
>> > > > so - wondering about the Offset Management API as well. The Kafka
>> > > protocol
>> > > > document specifically indicates that it will be fully functional in
>> 0.8.2
>> > > > [4] - however, a functional implementation is already available in
>> > > 0.8.1.1
>> > > > (accessible via the SimpleConsumer API but not documented in [5]).
>> Again,
>> > > > trying to understand the extent of what 0.8.1.1 already supports
>> > > > (ostensibly, the offset manager support seems to have been added
>> only in
>> > > > 0.8.2 - please correct me if I am wrong), and whether it is
>> > > recommended
>> > > > for use in production in any form (with the caveats that acco

Re: Consumer and offset management support in 0.8.2 and 0.9

2015-01-04 Thread Dana Powers
Hi Joel,

I'm looking more closely at the OffsetCommitRequest wire protocol change
you mentioned below, and I cannot figure out how to explicitly construct a
request with the earlier version.  Should the api version be different for
requests that do not include it and/or servers that do not support the
timestamp field?  It looks like 0.8.1.1 did not include the timestamp field
and used api version 0.  But 0.8.2-beta seems to now require timestamps
even when I explicitly encode OffsetCommitRequest api version 0 (server
logs a BufferUnderflowException).

Is this the expected server behavior?  Can you provide any tips on how
third-party clients should manage the wire-protocol change for this api
method (I'm working on kafka-python)?

Thanks,

-Dana
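
[Editorial aside: the version-0 body as 0.8.1.1 documented it can be sketched
in a few lines of Python - a minimal illustration (single topic and partition
for brevity), not kafka-python code. Per KAFKA-1634, the 0.8.2-beta broker
appears to expect an extra int64 timestamp after each offset even when the
request header says version 0, which would explain why the layout below
underflows its parser:]

```python
import struct

def _kstring(s: bytes) -> bytes:
    # Kafka protocol "string": int16 big-endian length prefix + bytes
    return struct.pack(">h", len(s)) + s

def offset_commit_v0_body(group: bytes, topic: bytes,
                          partition: int, offset: int,
                          metadata: bytes = b"") -> bytes:
    # OffsetCommitRequest v0 body as documented for 0.8.1.1:
    # group, [topic, [partition, offset, metadata]] -- no timestamp field.
    return (_kstring(group)
            + struct.pack(">i", 1)            # topic array: one entry
            + _kstring(topic)
            + struct.pack(">i", 1)            # partition array: one entry
            + struct.pack(">i", partition)
            + struct.pack(">q", offset)
            + _kstring(metadata))
```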

On Tue, Nov 18, 2014 at 1:27 PM, Joel Koshy  wrote:

> Yes it should be backwards compatible. So for e.g., you should be able
> to use an 0.8.1 client with an 0.8.2 broker. In general, you should
> not upgrade your clients until after the brokers have been upgraded.
> However, you can point an 0.8.2 client at an 0.8.1 broker. One wire
> protocol change I'm aware of is the OffsetCommitRequest.  There is a
> change in the OffsetCommitRequest format (KAFKA-1634) although you can
> explicitly construct an OffsetCommitRequest with the earlier version.
>
> Thanks,
>
> Joel
>
> On Tue, Nov 18, 2014 at 03:39:41PM -0500, Marius Bogoevici wrote:
> > Hi Joel,
> >
> > Thanks for all the clarifications!  Just another question on this: will
> > 0.8.2 be backwards compatible with 0.8.1, just as 0.8.1 was with 0.8?
> > Generally speaking, would there be any concerns with using the 0.8.2
> > consumer with a 0.8.1 broker, for instance?
> >
> > Marius
> >
> > On Tue, Nov 18, 2014 at 2:55 PM, Joel Koshy  wrote:
> >
> > > Inline..
> > >
> > > On Tue, Nov 18, 2014 at 04:26:04AM -0500, Marius Bogoevici wrote:
> > > > Hello everyone,
> > > >
> > > > I have a few questions about the current status and future of the
> Kafka
> > > > consumers.
> > > >
> > > > We have been working on adding Kafka support in Spring XD [1],
> currently
> > > > using the high level consumer via Spring Integration Kafka [2]. We
> are
> > > > working on adding features such as:
> > > > - the ability to control offsets/replay topics;
> > > > - the ability to control partition allocation across multiple
> consumers;
> > > >
> > > > We are currently at version 0.8.1.1, so using the simple consumer is
> a
> > > > pretty straightforward choice right now. However, in the light of the
> > > > upcoming consumer changes for 0.8.2 and 0.9, I have a few questions:
> > > >
> > > > 1) With respect to the consumer redesign for 0.9, what is the future
> of
> > > the
> > > > Simple Consumer and High Level Consumer? To my best understanding,
> the
> > > > existing high level consumer API will be deprecated in favour of the
> new
> > > > consumer API. What is the future of the Simple Consumer, in this
> case? Will it
> > > > continue to exist as a low-level API implementing the Kafka
> protocol
> > > > [3] and providing the building blocks for the new consumer, or will
> it be
> > > > deprecated as well?
> > >
> > > The new consumer will subsume both use-cases (simple and high-level).
> > > You can still use the old SimpleConsumer if you wish - i.e., the wire
> > > protocol for fetch and other requests will still be supported.
> > >
> > > >
> > > > 2) Regarding the new consumer: the v0.8.2 codebase contains an early
> > > > implementation of it, but since this is a feature scheduled only for
> 0.9,
> > > what
> > > > is its status as well? Is it included only as a future reference and
> for
> > > > stabilizing the API?
> > >
> > > It is a WIP so you cannot really use it.
> > >
> > > > 3) Obviously, offset management is a concern if using the simple
> > > consumer,
> > > > so - wondering about the Offset Management API as well. The Kafka
> > > protocol
> > > > document specifically indicates that it will be fully functional in
> 0.8.2
> > > > [4] - however, a functional implementation is already available in
> > > 0.8.1.1
> > > > (accessible via the SimpleConsumer API but not documented in [5]).
> Again,
> > > > trying to understand the extent of what 0.8.1.1 already supports
> > > > (ostensibly, the offset manager support seems to have been added
> only in
> > > > > 0.8.2 - please correct me if I am wrong), and whether it is
> > > recommended
> > > > for use in production in any form (with the caveats that accompany
> the
> > > use
> > > > of ZooKeeper).
> > >
> > > In 0.8.2 the OffsetCommitRequest and OffsetFetchRequest will use Kafka
> > > as the offsets storage mechanism (not zookeeper). High-level Java
> > > consumers can choose to store offsets in ZooKeeper instead by setting
> > > offsets.storage=zookeeper
> > >
> > > However, if you are using the simple consumer and wish to store
> > > offsets in ZooKeeper you will need to commit to ZooKeeper directly.
> > > You can use ZkUtils in the kafka.utils package for this.
> > >
> > >

Re: Consumer and offset management support in 0.8.2 and 0.9

2014-11-18 Thread Joel Koshy
Yes it should be backwards compatible. So, for example, you should be able
to use an 0.8.1 client with an 0.8.2 broker. In general, you should
not upgrade your clients until after the brokers have been upgraded.
However, you can point an 0.8.2 client at an 0.8.1 broker. One wire
protocol change I'm aware of is the OffsetCommitRequest.  There is a
change in the OffsetCommitRequest format (KAFKA-1634) although you can
explicitly construct an OffsetCommitRequest with the earlier version.

Thanks,

Joel
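
[Editorial aside: for third-party client authors, the practical upshot is to
make the api version an explicit, configuration-driven choice, since brokers
of this era do not advertise their version on the wire. The helper below is
hypothetical - the function name and version tuples are illustrative, not an
existing kafka-python API:]

```python
def offset_commit_api_version(broker_version: tuple) -> int:
    # Hypothetical policy: the client must be *told* the broker release,
    # because pre-0.10 brokers offer no version-discovery request.
    if broker_version < (0, 8, 2):
        return 0  # 0.8.1 wire format: no per-partition timestamp
    return 1      # 0.8.2 format: generation id, consumer id, timestamps
```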

On Tue, Nov 18, 2014 at 03:39:41PM -0500, Marius Bogoevici wrote:
> Hi Joel,
> 
> Thanks for all the clarifications!  Just another question on this: will
> 0.8.2 be backwards compatible with 0.8.1, just as 0.8.1 was with 0.8?
> Generally speaking, would there be any concerns with using the 0.8.2
> consumer with a 0.8.1 broker, for instance?
> 
> Marius
> 
> On Tue, Nov 18, 2014 at 2:55 PM, Joel Koshy  wrote:
> 
> > Inline..
> >
> > On Tue, Nov 18, 2014 at 04:26:04AM -0500, Marius Bogoevici wrote:
> > > Hello everyone,
> > >
> > > I have a few questions about the current status and future of the Kafka
> > > consumers.
> > >
> > > We have been working on adding Kafka support in Spring XD [1], currently
> > > using the high level consumer via Spring Integration Kafka [2]. We are
> > > working on adding features such as:
> > > - the ability to control offsets/replay topics;
> > > - the ability to control partition allocation across multiple consumers;
> > >
> > > We are currently at version 0.8.1.1, so using the simple consumer is a
> > > pretty straightforward choice right now. However, in the light of the
> > > upcoming consumer changes for 0.8.2 and 0.9, I have a few questions:
> > >
> > > 1) With respect to the consumer redesign for 0.9, what is the future of
> > the
> > > Simple Consumer and High Level Consumer? To my best understanding, the
> > > existing high level consumer API will be deprecated in favour of the new
> > > consumer API. What is the future of the Simple Consumer, in this case? Will
> > > it continue to exist as a low-level API implementing the Kafka protocol
> > > [3] and providing the building blocks for the new consumer, or will it be
> > > deprecated as well?
> >
> > The new consumer will subsume both use-cases (simple and high-level).
> > You can still use the old SimpleConsumer if you wish - i.e., the wire
> > protocol for fetch and other requests will still be supported.
> >
> > >
> > > 2) Regarding the new consumer: the v0.8.2 codebase contains an early
> > > implementation of it, but since this is a feature scheduled only for 0.9,
> > what
> > > is its status as well? Is it included only as a future reference and for
> > > stabilizing the API?
> >
> > It is a WIP so you cannot really use it.
> >
> > > 3) Obviously, offset management is a concern if using the simple
> > consumer,
> > > so - wondering about the Offset Management API as well. The Kafka
> > protocol
> > > document specifically indicates that it will be fully functional in 0.8.2
> > > [4] - however, a functional implementation is already available in
> > 0.8.1.1
> > > (accessible via the SimpleConsumer API but not documented in [5]). Again,
> > > trying to understand the extent of what 0.8.1.1 already supports
> > > (ostensibly, the offset manager support seems to have been added only in
> > > 0.8.2 - please correct me if I am wrong), and whether it is
> > recommended
> > > for use in production in any form (with the caveats that accompany the
> > use
> > > of ZooKeeper).
> >
> > In 0.8.2 the OffsetCommitRequest and OffsetFetchRequest will use Kafka
> > as the offsets storage mechanism (not zookeeper). High-level Java
> > consumers can choose to store offsets in ZooKeeper instead by setting
> > offsets.storage=zookeeper
> >
> > However, if you are using the simple consumer and wish to store
> > offsets in ZooKeeper you will need to commit to ZooKeeper directly.
> > You can use ZkUtils in the kafka.utils package for this.
> >
> > If you wish to move to Kafka-based offsets we will be adding a new
> > OffsetsClient that can be used to commit/fetch offsets to/from Kafka.
> > This is currently not listed as a blocker for 0.8.2 but I think we
> > should include it. I will update that ticket.
> >
> > > 4) Trying to interpret the existing examples in [6] and the comments on
> > [7]
> > > - the version of the Offset Management API that exists in 0.8.1.1 is
> > using
> > > ZooKeeper - whereas ZooKeeper will be optional in 0.8.2 - to be replaced
> > by
> > > Kafka, and phased out if possible. To my understanding, the switch
> > between
> > > the two will be controlled by the broker configuration (along with other
> > > parameters that control the performance of offset queues). Is that
> > correct?
> >
> > The switch is a client-side configuration. That wiki is not
> > up-to-date. The most current documentation is available as a patch in
> > https://issues.apache.org/jira/browse/KAFKA-1729
> >
> > > 5) Also, wondering about the timeline o

Re: Consumer and offset management support in 0.8.2 and 0.9

2014-11-18 Thread Marius Bogoevici
Hi Joel,

Thanks for all the clarifications!  Just another question on this: will
0.8.2 be backwards compatible with 0.8.1, just as 0.8.1 was with 0.8?
Generally speaking, would there be any concerns with using the 0.8.2
consumer with a 0.8.1 broker, for instance?

Marius

On Tue, Nov 18, 2014 at 2:55 PM, Joel Koshy  wrote:

> Inline..
>
> On Tue, Nov 18, 2014 at 04:26:04AM -0500, Marius Bogoevici wrote:
> > Hello everyone,
> >
> > I have a few questions about the current status and future of the Kafka
> > consumers.
> >
> > We have been working on adding Kafka support in Spring XD [1], currently
> > using the high level consumer via Spring Integration Kafka [2]. We are
> > working on adding features such as:
> > - the ability to control offsets/replay topics;
> > - the ability to control partition allocation across multiple consumers;
> >
> > We are currently at version 0.8.1.1, so using the simple consumer is a
> > pretty straightforward choice right now. However, in the light of the
> > upcoming consumer changes for 0.8.2 and 0.9, I have a few questions:
> >
> > 1) With respect to the consumer redesign for 0.9, what is the future of
> the
> > Simple Consumer and High Level Consumer? To my best understanding, the
> > existing high level consumer API will be deprecated in favour of the new
> > consumer API. What is the future of the Simple Consumer, in this case? Will
> > it continue to exist as a low-level API implementing the Kafka protocol
> > [3] and providing the building blocks for the new consumer, or will it be
> > deprecated as well?
>
> The new consumer will subsume both use-cases (simple and high-level).
> You can still use the old SimpleConsumer if you wish - i.e., the wire
> protocol for fetch and other requests will still be supported.
>
> >
> > 2) Regarding the new consumer: the v0.8.2 codebase contains an early
> > implementation of it, but since this is a feature scheduled only for 0.9,
> what
> > is its status as well? Is it included only as a future reference and for
> > stabilizing the API?
>
> It is a WIP so you cannot really use it.
>
> > 3) Obviously, offset management is a concern if using the simple
> consumer,
> > so - wondering about the Offset Management API as well. The Kafka
> protocol
> > document specifically indicates that it will be fully functional in 0.8.2
> > [4] - however, a functional implementation is already available in
> 0.8.1.1
> > (accessible via the SimpleConsumer API but not documented in [5]). Again,
> > trying to understand the extent of what 0.8.1.1 already supports
> > (ostensibly, the offset manager support seems to have been added only in
> > 0.8.2 - please correct me if I am wrong), and whether it is
> recommended
> > for use in production in any form (with the caveats that accompany the
> use
> > of ZooKeeper).
>
> In 0.8.2 the OffsetCommitRequest and OffsetFetchRequest will use Kafka
> as the offsets storage mechanism (not zookeeper). High-level Java
> consumers can choose to store offsets in ZooKeeper instead by setting
> offsets.storage=zookeeper
>
> However, if you are using the simple consumer and wish to store
> offsets in ZooKeeper you will need to commit to ZooKeeper directly.
> You can use ZkUtils in the kafka.utils package for this.
>
> If you wish to move to Kafka-based offsets we will be adding a new
> OffsetsClient that can be used to commit/fetch offsets to/from Kafka.
> This is currently not listed as a blocker for 0.8.2 but I think we
> should include it. I will update that ticket.
>
> > 4) Trying to interpret the existing examples in [6] and the comments on
> [7]
> > - the version of the Offset Management API that exists in 0.8.1.1 is
> using
> > ZooKeeper - whereas ZooKeeper will be optional in 0.8.2 - to be replaced
> by
> > Kafka, and phased out if possible. To my understanding, the switch
> between
> > the two will be controlled by the broker configuration (along with other
> > parameters that control the performance of offset queues). Is that
> correct?
>
> The switch is a client-side configuration. That wiki is not
> up-to-date. The most current documentation is available as a patch in
> https://issues.apache.org/jira/browse/KAFKA-1729
>
> > 5) Also, wondering about the timeline of 0.8.2 - according to the
> roadmaps
> > it should be released relatively shortly. Is that correct?
>
> Yes - once the blockers are ironed out.
>
> >
> > Thanks,
> > Marius
> >
> > [1] http://projects.spring.io/spring-xd/
> > [2] https://github.com/spring-projects/spring-integration-kafka
> > [3]
> >
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> > [4]
> >
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
> > [5] http://kafka.apache.org/082/documentation.html#simpleconsumerapi
> > [6]
> >
> https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
> > [7] https://issues.apache.org/jira/browse/KAFKA

Re: Consumer and offset management support in 0.8.2 and 0.9

2014-11-18 Thread Joel Koshy
Inline..

On Tue, Nov 18, 2014 at 04:26:04AM -0500, Marius Bogoevici wrote:
> Hello everyone,
> 
> I have a few questions about the current status and future of the Kafka
> consumers.
> 
> We have been working on adding Kafka support in Spring XD [1], currently
> using the high level consumer via Spring Integration Kafka [2]. We are
> working on adding features such as:
> - the ability to control offsets/replay topics;
> - the ability to control partition allocation across multiple consumers;
> 
> We are currently at version 0.8.1.1, so using the simple consumer is a
> pretty straightforward choice right now. However, in the light of the
> upcoming consumer changes for 0.8.2 and 0.9, I have a few questions:
> 
> 1) With respect to the consumer redesign for 0.9, what is the future of the
> Simple Consumer and High Level Consumer? To my best understanding, the
> existing high level consumer API will be deprecated in favour of the new
> consumer API. What is the future of the Simple Consumer, in this case? Will
> it continue to exist as a low-level API implementing the Kafka protocol
> [3] and providing the building blocks for the new consumer, or will it be
> deprecated as well?

The new consumer will subsume both use-cases (simple and high-level).
You can still use the old SimpleConsumer if you wish - i.e., the wire
protocol for fetch and other requests will still be supported.

> 
> 2) Regarding the new consumer: the v0.8.2 codebase contains an early
> implementation of it, but since this is a feature scheduled only for 0.9, what
> is its status as well? Is it included only as a future reference and for
> stabilizing the API?

It is a WIP so you cannot really use it.

> 3) Obviously, offset management is a concern if using the simple consumer,
> so - wondering about the Offset Management API as well. The Kafka protocol
> document specifically indicates that it will be fully functional in 0.8.2
> [4] - however, a functional implementation is already available in 0.8.1.1
> (accessible via the SimpleConsumer API but not documented in [5]). Again,
> trying to understand the extent of what 0.8.1.1 already supports
> (ostensibly, the offset manager support seems to have been added only in
> 0.8.2 - please correct me if I am wrong), and whether it is recommended
> for use in production in any form (with the caveats that accompany the use
> of ZooKeeper).

In 0.8.2 the OffsetCommitRequest and OffsetFetchRequest will use Kafka
as the offsets storage mechanism (not zookeeper). High-level Java
consumers can choose to store offsets in ZooKeeper instead by setting
offsets.storage=zookeeper

However, if you are using the simple consumer and wish to store
offsets in ZooKeeper you will need to commit to ZooKeeper directly.
You can use ZkUtils in the kafka.utils package for this.
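
[Editorial aside: for simple-consumer clients outside the JVM, the same
commit can be made against ZooKeeper's well-known znode layout directly. A
hedged sketch - mine, not a Kafka API; `zk` stands for any ZooKeeper client
exposing `ensure_path` and `set`, e.g. kazoo's KazooClient:]

```python
def zk_offset_path(group: str, topic: str, partition: int) -> str:
    # znode layout used by the 0.8.x high-level consumer for offsets
    return "/consumers/%s/offsets/%s/%d" % (group, topic, partition)

def commit_offset(zk, group: str, topic: str,
                  partition: int, offset: int) -> None:
    # zk: any ZooKeeper client with ensure_path/set; this mirrors what
    # ZkUtils in the kafka.utils package does on the Scala side.
    path = zk_offset_path(group, topic, partition)
    zk.ensure_path(path)
    zk.set(path, str(offset).encode("utf-8"))
```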

If you wish to move to Kafka-based offsets we will be adding a new
OffsetsClient that can be used to commit/fetch offsets to/from Kafka.
This is currently not listed as a blocker for 0.8.2 but I think we
should include it. I will update that ticket.

> 4) Trying to interpret the existing examples in [6] and the comments on [7]
> - the version of the Offset Management API that exists in 0.8.1.1 is using
> ZooKeeper - whereas ZooKeeper will be optional in 0.8.2 - to be replaced by
> Kafka, and phased out if possible. To my understanding, the switch between
> the two will be controlled by the broker configuration (along with other
> parameters that control the performance of offset queues). Is that correct?

The switch is a client-side configuration. That wiki is not
up-to-date. The most current documentation is available as a patch in
https://issues.apache.org/jira/browse/KAFKA-1729

> 5) Also, wondering about the timeline of 0.8.2 - according to the roadmaps
> it should be released relatively shortly. Is that correct?

Yes - once the blockers are ironed out.

> 
> Thanks,
> Marius
> 
> [1] http://projects.spring.io/spring-xd/
> [2] https://github.com/spring-projects/spring-integration-kafka
> [3]
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
> [4]
> https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
> [5] http://kafka.apache.org/082/documentation.html#simpleconsumerapi
> [6]
> https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
> [7] https://issues.apache.org/jira/browse/KAFKA-1729



Consumer and offset management support in 0.8.2 and 0.9

2014-11-18 Thread Marius Bogoevici
Hello everyone,

I have a few questions about the current status and future of the Kafka
consumers.

We have been working on adding Kafka support in Spring XD [1], currently
using the high level consumer via Spring Integration Kafka [2]. We are
working on adding features such as:
- the ability to control offsets/replay topics;
- the ability to control partition allocation across multiple consumers;

We are currently at version 0.8.1.1, so using the simple consumer is a
pretty straightforward choice right now. However, in the light of the
upcoming consumer changes for 0.8.2 and 0.9, I have a few questions:

1) With respect to the consumer redesign for 0.9, what is the future of the
Simple Consumer and High Level Consumer? To my best understanding, the
existing high level consumer API will be deprecated in favour of the new
consumer API. What is the future of the Simple Consumer, in this case? Will
it continue to exist as a low-level API implementing the Kafka protocol
[3] and providing the building blocks for the new consumer, or will it be
deprecated as well?

2) Regarding the new consumer: the v0.8.2 codebase contains an early
implementation of it, but since this is a feature scheduled only for 0.9, what
is its status? Is it included only as a future reference and for
stabilizing the API?

3) Obviously, offset management is a concern if using the simple consumer,
so - wondering about the Offset Management API as well. The Kafka protocol
document specifically indicates that it will be fully functional in 0.8.2
[4] - however, a functional implementation is already available in 0.8.1.1
(accessible via the SimpleConsumer API but not documented in [5]). Again,
trying to understand the extent of what 0.8.1.1 already supports
(ostensibly, the offset manager support seems to have been added only in
0.8.2 - please correct me if I am wrong), and whether it is recommended
for use in production in any form (with the caveats that accompany the use
of ZooKeeper).

4) Trying to interpret the existing examples in [6] and the comments on [7]
- the version of the Offset Management API that exists in 0.8.1.1 is using
ZooKeeper - whereas ZooKeeper will be optional in 0.8.2 - to be replaced by
Kafka, and phased out if possible. To my understanding, the switch between
the two will be controlled by the broker configuration (along with other
parameters that control the performance of offset queues). Is that correct?

5) Also, wondering about the timeline of 0.8.2 - according to the roadmaps
it should be released relatively shortly. Is that correct?

Thanks,
Marius

[1] http://projects.spring.io/spring-xd/
[2] https://github.com/spring-projects/spring-integration-kafka
[3]
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
[4]
https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
[5] http://kafka.apache.org/082/documentation.html#simpleconsumerapi
[6]
https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
[7] https://issues.apache.org/jira/browse/KAFKA-1729