Re: Consumer and offset management support in 0.8.2 and 0.9

2015-01-12 Thread Joel Koshy
Catching up on email - yes the wire protocol change was a big mistake
and +1 on this approach. I'm reviewing KAFKA-1841 right now.

On Thu, Jan 08, 2015 at 10:07:44PM -0800, Dana Powers wrote:
 This approach makes sense to me -- thanks, Jun.
 
 -Dana
 
 On Thu, Jan 8, 2015 at 3:57 PM, Jun Rao j...@confluent.io wrote:
 
  I took a stab at fixing this issue in KAFKA-1841.
 
  This is actually a bit tricky to fix. In addition to the incompatible wire
  protocol change of version 0 of OffsetCommitRequest, there is also a
  functionality change. Specifically, in 0.8.1, version 0 of
  OffsetCommitRequest writes to ZK and version 0 of OffsetFetchRequest reads
  from ZK. In 0.8.2/trunk, this changed so that both version 0 and version 1
  of OffsetCommitRequest write to Kafka and version 0 of OffsetFetchRequest
  reads from Kafka. To make this really backward compatible with 0.8.1, we
  have to
  (1) revert the wire protocol change of version 0 of OffsetCommitRequest,
  (2) change the behavior of version 0 of OffsetCommitRequest and
  OffsetFetchRequest back to using ZK, and
  (3) create version 1 of OffsetFetchRequest that reads from Kafka (so that
  it behaves consistently with version 1 of OffsetCommitRequest)
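
  The three steps above amount to dispatching on the request version inside
  the broker's handlers. A rough sketch of that routing (Python stand-ins;
  the storage functions are hypothetical stubs, not the actual KAFKA-1841
  patch):

```python
# Hypothetical broker-side dispatch sketch; the four storage functions are
# stubs standing in for the real ZooKeeper / Kafka-log code paths.
def commit_to_zk(request):
    return "zk"

def commit_to_kafka(request):
    return "kafka"

def fetch_from_zk(request):
    return "zk"

def fetch_from_kafka(request):
    return "kafka"

def handle_offset_commit(version, request):
    # v0 keeps the 0.8.1 behavior (ZooKeeper); v1 uses Kafka-based storage
    if version == 0:
        return commit_to_zk(request)
    if version == 1:
        return commit_to_kafka(request)
    raise ValueError("unsupported OffsetCommitRequest version: %d" % version)

def handle_offset_fetch(version, request):
    # v1 of OffsetFetchRequest is added so fetch stays consistent with commit
    if version == 0:
        return fetch_from_zk(request)
    if version == 1:
        return fetch_from_kafka(request)
    raise ValueError("unsupported OffsetFetchRequest version: %d" % version)
```

  With this shape, an old v0 client and a new v1 client can talk to the same
  broker at the same time, which is the feature-flag behavior discussed later
  in the thread.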
 
  That's what the patch in KAFKA-1841 does. This works as long as people are
  only using released final versions. However, since this introduces an
  incompatible (functional) change of OffsetFetchRequest in 0.8.2-beta and
  trunk, this will create problems for people (assuming that they are using
  this api) who have a deployment of 0.8.2-beta and want to upgrade to 0.8.2
  final, or a deployment from trunk and want to upgrade to a later version of
  trunk in the future. In either case, the upgrade of the broker will cause
  the old client to behave differently and incorrectly. The only choice there
  is to stop the client and the broker and upgrade them together. Most people
  will probably only deploy final released versions in production. However, I
  want to bring this up to see if anyone has concerns about this.
 
  Thanks,
 
  Jun
 
  On Wed, Jan 7, 2015 at 10:32 AM, Jay Kreps jay.kr...@gmail.com wrote:
 
   Yes, I think we are saying the same thing. Basically I am saying version
  0
   should be considered frozen as of the format and behavior in the 0.8.1
   release and we can do whatever we want as a version 1+.
  
   -Jay
  
   On Wed, Jan 7, 2015 at 10:10 AM, Joe Stein joe.st...@stealth.ly wrote:
  
 We need to take the versioning of the protocol seriously
   
amen
   
 People are definitely using the offset commit functionality in 0.8.1
   
agreed
   
 I really think we should treat this as a bug and revert the change
  to
version 0.
   
What do you mean exactly by revert? Why can't we use version as a
  feature
flag and support 0 and 1 at the same time? in the handleOffsetFetch and
handleOffsetCommit functions that process the request messages just do
  if
version == 0 old functionality else if version == 1 new functionality.
This way everyone works and nothing breaks  =8^)
   
/***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
/
   
On Wed, Jan 7, 2015 at 1:04 PM, Jay Kreps j...@confluent.io wrote:
   
 Hey guys,

 We need to take the versioning of the protocol seriously. People are
 definitely using the offset commit functionality in 0.8.1 and I
  really
 think we should treat this as a bug and revert the change to version
  0.

 -Jay

 On Wed, Jan 7, 2015 at 9:24 AM, Jun Rao j...@confluent.io wrote:

  Yes, we did make an incompatible change in OffsetCommitRequest in
0.8.2,
  which is a mistake. The incompatible change was introduced in
KAFKA-1012
 in
  Mar, 2014 when we added the kafka-based offset management support.
 However,
  we didn't realize that this breaks the wire protocol until much
   later.
 Now,
  the wire protocol has evolved again and it's a bit hard to fix the
format
  in version 0. I can see a couple of options.
 
  Option 1: Just accept the incompatible change as it is.
  The argument is that even though we introduced OffsetCommitRequest
  in
  0.8.1, it's not used in the high level consumer. It's possible that
some
  users of SimpleConsumer started using it. However, that number is
likely
  small. Also, the functionality of OffsetCommitRequest has changed
   since
  it's writing the offset to a Kafka log, instead of ZK (for good
reasons).
  So, we can document this as a wire protocol and functionality
 incompatible
  change. For users who don't mind the functionality change, they
  will
need
  to upgrade the client to the new protocol before they can use the
  new

Re: Consumer and offset management support in 0.8.2 and 0.9

2015-01-07 Thread Jay Kreps
Hey guys,

We need to take the versioning of the protocol seriously. People are
definitely using the offset commit functionality in 0.8.1 and I really
think we should treat this as a bug and revert the change to version 0.

-Jay

On Wed, Jan 7, 2015 at 9:24 AM, Jun Rao j...@confluent.io wrote:

 Yes, we did make an incompatible change in OffsetCommitRequest in 0.8.2,
 which is a mistake. The incompatible change was introduced in KAFKA-1012 in
 Mar, 2014 when we added the kafka-based offset management support. However,
 we didn't realize that this breaks the wire protocol until much later. Now,
 the wire protocol has evolved again and it's a bit hard to fix the format
 in version 0. I can see a couple of options.

 Option 1: Just accept the incompatible change as it is.
 The argument is that even though we introduced OffsetCommitRequest in
 0.8.1, it's not used in the high level consumer. It's possible that some
 users of SimpleConsumer started using it. However, that number is likely
 small. Also, the functionality of OffsetCommitRequest has changed since
 it's writing the offset to a Kafka log, instead of ZK (for good reasons).
 So, we can document this as a wire protocol and functionality incompatible
 change. For users who don't mind the functionality change, they will need
 to upgrade the client to the new protocol before they can use the new
 broker. For users who want to preserve the old functionality, they will
 have to write the offsets directly to ZK. In either case, hopefully the
 number of people being affected is small.

 Option 2: Revert version 0 format to what's in 0.8.1.
 There will be a few issues here. First, it's not clear how this affects
 other people who have been deploying from trunk. Second, I am not sure that
 we want to continue supporting writing the offset to ZK in
 OffsetCommitRequest
 since that can cause ZK to be overloaded.

 Joel Koshy,

 Any thoughts on this?

 Thanks,

 Jun

 On Mon, Jan 5, 2015 at 11:39 PM, Joe Stein joe.st...@stealth.ly wrote:

  In addition to the issue you bring up, the functionality as a whole has
  changed.. when you call OffsetFetchRequest the version = 0 needs to
  preserve the old functionality
 
 
 https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/server/KafkaApis.scala#L678-L700
  and version = 1 the new
 
 
 https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/server/KafkaApis.scala#L153-L223
  .
   Also, even though the wire protocol of OffsetFetchRequest is the same
   after the 0.8.2 upgrade, if you were using the 0.8.1.1 OffsetFetchRequest
  https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/server/KafkaApis.scala#L705-L728
   it will stop going to zookeeper and start going to Kafka storage
  https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/server/KafkaApis.scala#L504-L519
   so more errors will happen and things break too.
 
  I think we should treat the version field not just to stop from breaking
  the wire protocol calls but also as a feature flag preserving upgrades
  and multiple pathways.
 
  I updated the JIRA for the feature flag needs for OffsetFetch and
  OffsetCommit too.
 
  /***
   Joe Stein
   Founder, Principal Consultant
   Big Data Open Source Security LLC
   http://www.stealth.ly
   Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
  /
 
  On Mon, Jan 5, 2015 at 3:21 PM, Dana Powers dana.pow...@rd.io wrote:
 
   ok, opened KAFKA-1841 .  KAFKA-1634 also related.
  
   -Dana
  
   On Mon, Jan 5, 2015 at 10:55 AM, Gwen Shapira gshap...@cloudera.com
   wrote:
  
Ooh, I see what you mean - the OffsetAndMetadata (or PartitionData)
part of the Map changed, which will modify the wire protocol.
   
This is actually not handled in the Java client either. It will send
the timestamp no matter which version is used.
   
This looks like a bug and I'd even mark it as blocker for 0.8.2 since
it may prevent rolling upgrades.
   
Are you opening the JIRA?
   
Gwen
   
On Mon, Jan 5, 2015 at 10:28 AM, Dana Powers dana.pow...@rd.io
  wrote:
 specifically comparing 0.8.1 --


   
  
 
 https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/api/OffsetCommitRequest.scala#L37-L50
 ```
 (1 to partitionCount).map(_ => {
   val partitionId = buffer.getInt
   val offset = buffer.getLong
   val metadata = readShortString(buffer)
   (TopicAndPartition(topic, partitionId),
    OffsetMetadataAndError(offset, metadata))
 })
 ```

 to trunk --


   
  
 
 https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/api/OffsetCommitRequest.scala#L44-L69
 ```
 (1 to partitionCount).map(_ => {
   val partitionId = buffer.getInt
   val offset = buffer.getLong
   val timestamp = {
     val given = buffer.getLong
     if (given == -1L) now else given
   }
   val metadata = readShortString(buffer)
   (TopicAndPartition(topic, partitionId),
    OffsetAndMetadata(offset, metadata, timestamp))
 })
 ```

Re: Consumer and offset management support in 0.8.2 and 0.9

2015-01-07 Thread Joe Stein
 We need to take the versioning of the protocol seriously

amen

 People are definitely using the offset commit functionality in 0.8.1

agreed

 I really think we should treat this as a bug and revert the change to
version 0.

What do you mean exactly by revert? Why can't we use version as a feature
flag and support 0 and 1 at the same time? in the handleOffsetFetch and
handleOffsetCommit functions that process the request messages just do if
version == 0 old functionality else if version == 1 new functionality.
This way everyone works and nothing breaks  =8^)

/***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
/

On Wed, Jan 7, 2015 at 1:04 PM, Jay Kreps j...@confluent.io wrote:

 Hey guys,

 We need to take the versioning of the protocol seriously. People are
 definitely using the offset commit functionality in 0.8.1 and I really
 think we should treat this as a bug and revert the change to version 0.

 -Jay

 On Wed, Jan 7, 2015 at 9:24 AM, Jun Rao j...@confluent.io wrote:

  Yes, we did make an incompatible change in OffsetCommitRequest in 0.8.2,
  which is a mistake. The incompatible change was introduced in KAFKA-1012
 in
  Mar, 2014 when we added the kafka-based offset management support.
 However,
  we didn't realize that this breaks the wire protocol until much later.
 Now,
  the wire protocol has evolved again and it's a bit hard to fix the format
  in version 0. I can see a couple of options.
 
  Option 1: Just accept the incompatible change as it is.
  The argument is that even though we introduced OffsetCommitRequest in
  0.8.1, it's not used in the high level consumer. It's possible that some
  users of SimpleConsumer started using it. However, that number is likely
  small. Also, the functionality of OffsetCommitRequest has changed since
  it's writing the offset to a Kafka log, instead of ZK (for good reasons).
  So, we can document this as a wire protocol and functionality
 incompatible
  change. For users who don't mind the functionality change, they will need
  to upgrade the client to the new protocol before they can use the new
  broker. For users who want to preserve the old functionality, they will
  have to write the offsets directly to ZK. In either case, hopefully the
  number of people being affected is small.
 
  Option 2: Revert version 0 format to what's in 0.8.1.
  There will be a few issues here. First, it's not clear how this affects
  other people who have been deploying from trunk. Second, I am not sure
 that
  we want to continue supporting writing the offset to ZK in
  OffsetCommitRequest
  since that can cause ZK to be overloaded.
 
  Joel Koshy,
 
  Any thoughts on this?
 
  Thanks,
 
  Jun
 
  On Mon, Jan 5, 2015 at 11:39 PM, Joe Stein joe.st...@stealth.ly wrote:
 
   In addition to the issue you bring up, the functionality as a whole has
   changed.. when you call OffsetFetchRequest the version = 0 needs to
   preserve the old functionality
  
  
 
 https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/server/KafkaApis.scala#L678-L700
   and version = 1 the new
  
  
 
 https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/server/KafkaApis.scala#L153-L223
   .
   Also the OffsetFetchRequest functionality even though the wire protocol
  is
   the same after the 0.8.2 upgrade for OffsetFetchRequest if you were
 using
   0.8.1.1 OffsetFetchRequest
  
  
 
 https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/server/KafkaApis.scala#L705-L728
   will stop going to zookeeper and start going to Kafka storage
  
  
 
 https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/server/KafkaApis.scala#L504-L519
   so more errors will happen and things break too.
  
   I think we should treat the version field not just to stop from
 breaking
   the wire protocol calls but also as a feature flag preserving
 upgrades
   and multiple pathways.
  
   I updated the JIRA for the feature flag needs for OffsetFetch and
   OffsetCommit too.
  
   /***
Joe Stein
Founder, Principal Consultant
Big Data Open Source Security LLC
http://www.stealth.ly
Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
   /
  
   On Mon, Jan 5, 2015 at 3:21 PM, Dana Powers dana.pow...@rd.io wrote:
  
ok, opened KAFKA-1841 .  KAFKA-1634 also related.
   
-Dana
   
On Mon, Jan 5, 2015 at 10:55 AM, Gwen Shapira gshap...@cloudera.com
 
wrote:
   
 Ooh, I see what you mean - the OffsetAndMetadata (or PartitionData)
 part of the Map changed, which will modify the wire protocol.

 This is actually not handled in the Java client either. It will
 send
 the timestamp no matter which version is used.

 

Re: Consumer and offset management support in 0.8.2 and 0.9

2015-01-07 Thread Jay Kreps
Yes, I think we are saying the same thing. Basically I am saying version 0
should be considered frozen as of the format and behavior in the 0.8.1
release and we can do whatever we want as a version 1+.

-Jay

On Wed, Jan 7, 2015 at 10:10 AM, Joe Stein joe.st...@stealth.ly wrote:

  We need to take the versioning of the protocol seriously

 amen

  People are definitely using the offset commit functionality in 0.8.1

 agreed

  I really think we should treat this as a bug and revert the change to
 version 0.

 What do you mean exactly by revert? Why can't we use version as a feature
 flag and support 0 and 1 at the same time? in the handleOffsetFetch and
 handleOffsetCommit functions that process the request messages just do if
 version == 0 old functionality else if version == 1 new functionality.
 This way everyone works and nothing breaks  =8^)

 /***
  Joe Stein
  Founder, Principal Consultant
  Big Data Open Source Security LLC
  http://www.stealth.ly
  Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
 /

 On Wed, Jan 7, 2015 at 1:04 PM, Jay Kreps j...@confluent.io wrote:

  Hey guys,
 
  We need to take the versioning of the protocol seriously. People are
  definitely using the offset commit functionality in 0.8.1 and I really
  think we should treat this as a bug and revert the change to version 0.
 
  -Jay
 
  On Wed, Jan 7, 2015 at 9:24 AM, Jun Rao j...@confluent.io wrote:
 
   Yes, we did make an incompatible change in OffsetCommitRequest in
 0.8.2,
   which is a mistake. The incompatible change was introduced in
 KAFKA-1012
  in
   Mar, 2014 when we added the kafka-based offset management support.
  However,
   we didn't realize that this breaks the wire protocol until much later.
  Now,
   the wire protocol has evolved again and it's a bit hard to fix the
 format
   in version 0. I can see a couple of options.
  
   Option 1: Just accept the incompatible change as it is.
   The argument is that even though we introduced OffsetCommitRequest in
   0.8.1, it's not used in the high level consumer. It's possible that
 some
   users of SimpleConsumer started using it. However, that number is
 likely
   small. Also, the functionality of OffsetCommitRequest has changed since
   it's writing the offset to a Kafka log, instead of ZK (for good
 reasons).
   So, we can document this as a wire protocol and functionality
  incompatible
   change. For users who don't mind the functionality change, they will
 need
   to upgrade the client to the new protocol before they can use the new
   broker. For users who want to preserve the old functionality, they will
   have to write the offsets directly to ZK. In either case, hopefully the
   number of people being affected is small.
  
   Option 2: Revert version 0 format to what's in 0.8.1.
   There will be a few issues here. First, it's not clear how this affects
   other people who have been deploying from trunk. Second, I am not sure
  that
   we want to continue supporting writing the offset to ZK in
   OffsetCommitRequest
   since that can cause ZK to be overloaded.
  
   Joel Koshy,
  
   Any thoughts on this?
  
   Thanks,
  
   Jun
  
   On Mon, Jan 5, 2015 at 11:39 PM, Joe Stein joe.st...@stealth.ly
 wrote:
  
In addition to the issue you bring up, the functionality as a whole
 has
changed.. when you call OffsetFetchRequest the version = 0 needs to
preserve the old functionality
   
   
  
 
 https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/server/KafkaApis.scala#L678-L700
and version = 1 the new
   
   
  
 
 https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/server/KafkaApis.scala#L153-L223
.
Also the OffsetFetchRequest functionality even though the wire
 protocol
   is
the same after the 0.8.2 upgrade for OffsetFetchRequest if you were
  using
0.8.1.1 OffsetFetchRequest
   
   
  
 
 https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/server/KafkaApis.scala#L705-L728
will stop going to zookeeper and start going to Kafka storage
   
   
  
 
 https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/server/KafkaApis.scala#L504-L519
so more errors will happen and things break too.
   
I think we should treat the version field not just to stop from
  breaking
the wire protocol calls but also as a feature flag preserving
  upgrades
and multiple pathways.
   
I updated the JIRA for the feature flag needs for OffsetFetch and
OffsetCommit too.
   
/***
 Joe Stein
 Founder, Principal Consultant
 Big Data Open Source Security LLC
 http://www.stealth.ly
 Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
/
   
On Mon, Jan 5, 2015 at 3:21 PM, Dana Powers dana.pow...@rd.io
 wrote:
   
 ok, opened KAFKA-1841 .  

Re: Consumer and offset management support in 0.8.2 and 0.9

2015-01-07 Thread Jun Rao
Yes, we did make an incompatible change in OffsetCommitRequest in 0.8.2,
which is a mistake. The incompatible change was introduced in KAFKA-1012 in
Mar, 2014 when we added the kafka-based offset management support. However,
we didn't realize that this breaks the wire protocol until much later. Now,
the wire protocol has evolved again and it's a bit hard to fix the format
in version 0. I can see a couple of options.

Option 1: Just accept the incompatible change as it is.
The argument is that even though we introduced OffsetCommitRequest in
0.8.1, it's not used in the high level consumer. It's possible that some
users of SimpleConsumer started using it. However, that number is likely
small. Also, the functionality of OffsetCommitRequest has changed since
it's writing the offset to a Kafka log, instead of ZK (for good reasons).
So, we can document this as a wire protocol and functionality incompatible
change. For users who don't mind the functionality change, they will need
to upgrade the client to the new protocol before they can use the new
broker. For users who want to preserve the old functionality, they will
have to write the offsets directly to ZK. In either case, hopefully the
number of people being affected is small.

Option 2: Revert version 0 format to what's in 0.8.1.
There will be a few issues here. First, it's not clear how this affects
other people who have been deploying from trunk. Second, I am not sure that
we want to continue supporting writing the offset to ZK in OffsetCommitRequest
since that can cause ZK to be overloaded.

Joel Koshy,

Any thoughts on this?

Thanks,

Jun

On Mon, Jan 5, 2015 at 11:39 PM, Joe Stein joe.st...@stealth.ly wrote:

 In addition to the issue you bring up, the functionality as a whole has
 changed.. when you call OffsetFetchRequest the version = 0 needs to
 preserve the old functionality

 https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/server/KafkaApis.scala#L678-L700
 and version = 1 the new

 https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/server/KafkaApis.scala#L153-L223
 .
 Also the OffsetFetchRequest functionality even though the wire protocol is
 the same after the 0.8.2 upgrade for OffsetFetchRequest if you were using
 0.8.1.1 OffsetFetchRequest

 https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/server/KafkaApis.scala#L705-L728
 will stop going to zookeeper and start going to Kafka storage

 https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/server/KafkaApis.scala#L504-L519
 so more errors will happen and things break too.

 I think we should treat the version field not just to stop from breaking
 the wire protocol calls but also as a feature flag preserving upgrades
 and multiple pathways.

 I updated the JIRA for the feature flag needs for OffsetFetch and
 OffsetCommit too.

 /***
  Joe Stein
  Founder, Principal Consultant
  Big Data Open Source Security LLC
  http://www.stealth.ly
  Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop
 /

 On Mon, Jan 5, 2015 at 3:21 PM, Dana Powers dana.pow...@rd.io wrote:

  ok, opened KAFKA-1841 .  KAFKA-1634 also related.
 
  -Dana
 
  On Mon, Jan 5, 2015 at 10:55 AM, Gwen Shapira gshap...@cloudera.com
  wrote:
 
   Ooh, I see what you mean - the OffsetAndMetadata (or PartitionData)
   part of the Map changed, which will modify the wire protocol.
  
   This is actually not handled in the Java client either. It will send
   the timestamp no matter which version is used.
  
   This looks like a bug and I'd even mark it as blocker for 0.8.2 since
   it may prevent rolling upgrades.
  
   Are you opening the JIRA?
  
   Gwen
  
   On Mon, Jan 5, 2015 at 10:28 AM, Dana Powers dana.pow...@rd.io
 wrote:
specifically comparing 0.8.1 --
   
   
  
 
 https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/api/OffsetCommitRequest.scala#L37-L50
```
(1 to partitionCount).map(_ => {
  val partitionId = buffer.getInt
  val offset = buffer.getLong
  val metadata = readShortString(buffer)
  (TopicAndPartition(topic, partitionId),
   OffsetMetadataAndError(offset, metadata))
})
```
   
to trunk --
   
   
  
 
 https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/api/OffsetCommitRequest.scala#L44-L69
```
(1 to partitionCount).map(_ => {
  val partitionId = buffer.getInt
  val offset = buffer.getLong
  val timestamp = {
    val given = buffer.getLong
    if (given == -1L) now else given
  }
  val metadata = readShortString(buffer)
  (TopicAndPartition(topic, partitionId),
   OffsetAndMetadata(offset, metadata, timestamp))
})
```
   
should the `timestamp` buffer read be wrapped in an api version
 check?
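
The two readFrom snippets above differ only in the extra int64 timestamp. On
the encoding side the difference looks roughly like this (a Python `struct`
sketch with the field layout inferred from the quoted Scala; not
kafka-python's actual encoder):

```python
import struct

def encode_partition_block(partition_id, offset, metadata, version, timestamp=-1):
    """Encode the per-partition block of an OffsetCommitRequest (sketch)."""
    meta = metadata.encode("utf-8")
    if version == 0:
        # 0.8.1 layout: partition (int32), offset (int64),
        # metadata as a short string (int16 length + bytes)
        return struct.pack(">iqh", partition_id, offset, len(meta)) + meta
    # trunk layout: an int64 timestamp sits between offset and metadata;
    # -1 asks the broker to substitute its own clock
    return struct.pack(">iqqh", partition_id, offset, timestamp, len(meta)) + meta
```

The v1 block is exactly eight bytes longer, which is why a broker reading one
layout while the client writes the other misparses the request.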
   
   
Dana Powers
Rdio, Inc.
dana.pow...@rd.io
rdio.com/people/dpkp/
   
On Mon, Jan 5, 2015 at 9:49 AM, Gwen 

Re: Consumer and offset management support in 0.8.2 and 0.9

2015-01-05 Thread Gwen Shapira
OffsetCommitRequest has two constructors now:

For version 0:
 OffsetCommitRequest(String groupId, Map<TopicPartition, PartitionData> offsetData)

And version 1:
OffsetCommitRequest(String groupId, int generationId, String consumerId, Map<TopicPartition, PartitionData> offsetData)

None of them seem to require timestamps... so I'm not sure where you
see that this is required. Can you share an example?

Gwen

On Sun, Jan 4, 2015 at 11:15 PM, Dana Powers dana.pow...@rd.io wrote:
 Hi Joel,

 I'm looking more closely at the OffsetCommitRequest wire protocol change
 you mentioned below, and I cannot figure out how to explicitly construct a
 request with the earlier version.  Should the api version be different for
 requests that do not include it and/or servers that do not support the
 timestamp field?  It looks like 0.8.1.1 did not include the timestamp field
 and used api version 0.  But 0.8.2-beta seems to now require timestamps
 even when I explicitly encode OffsetCommitRequest api version 0 (server
 logs a BufferUnderflowException).

 Is this the expected server behavior?  Can you provide any tips on how
 third-party clients should manage the wire-protocol change for this api
 method (I'm working on kafka-python)?
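
 The BufferUnderflowException is consistent with a decoder that always reads
 the newer layout: a v0 payload is eight bytes short of what the reader
 expects. A small Python illustration of the failure mode (a stand-in for
 the Scala readFrom, not the actual broker code):

```python
import struct

def read_v1_partition_block(buf):
    # Always expects the trunk/v1 layout:
    # partition (int32), offset (int64), timestamp (int64), metadata length (int16)
    pid, offset, timestamp, mlen = struct.unpack_from(">iqqh", buf, 0)
    metadata = buf[22:22 + mlen].decode("utf-8")
    return pid, offset, timestamp, metadata

# A v0-encoded block carries no timestamp, so it is too short for this reader
v0_block = struct.pack(">iqh", 0, 100, 2) + b"ok"
try:
    read_v1_partition_block(v0_block)
except struct.error:
    # the Python analogue of the broker's BufferUnderflowException
    print("short buffer: v0 payload decoded with the v1 reader")
```

 A version check around the timestamp read (as Dana suggests) is what makes
 the same decoder accept both layouts.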

 Thanks,

 -Dana

 On Tue, Nov 18, 2014 at 1:27 PM, Joel Koshy jjkosh...@gmail.com wrote:

 Yes it should be backwards compatible. So for e.g., you should be able
 to use an 0.8.1 client with an 0.8.2 broker. In general, you should
 not upgrade your clients until after the brokers have been upgraded.
 However, you can point an 0.8.2 client at an 0.8.1 broker. One wire
 protocol change I'm aware of is the OffsetCommitRequest.  There is a
 change in the OffsetCommitRequest format (KAFKA-1634) although you can
 explicitly construct an OffsetCommitRequest with the earlier version.

 Thanks,

 Joel

 On Tue, Nov 18, 2014 at 03:39:41PM -0500, Marius Bogoevici wrote:
  Hi Joel,
 
  Thanks for all the clarifications!  Just another question on this: will
  0.8.2 be backwards compatible with 0.8.1, just as 0.8.1 was with 0.8?
  Generally speaking, would there be any concerns with using the 0.8.2
  consumer with a 0.8.1 broker, for instance?
 
  Marius
 
  On Tue, Nov 18, 2014 at 2:55 PM, Joel Koshy jjkosh...@gmail.com wrote:
 
   Inline..
  
   On Tue, Nov 18, 2014 at 04:26:04AM -0500, Marius Bogoevici wrote:
Hello everyone,
   
I have a few questions about the current status and future of the
 Kafka
consumers.
   
 We have been working on adding Kafka support in Spring XD [1],
 currently
using the high level consumer via Spring Integration Kafka [2]. We
 are
working on adding features such as:
- the ability to control offsets/replay topics;
- the ability to control partition allocation across multiple
 consumers;
   
We are currently at version 0.8.1.1, so using the simple consumer is
 a
pretty straightforward choice right now. However, in the light of the
upcoming consumer changes for 0.8.2 and 0.9, I have a few questions:
   
1) With respect to the consumer redesign for 0.9, what is the future
 of
   the
Simple Consumer and High Level Consumer? To my best understanding,
 the
existing high level consumer API will be deprecated in favour of the
 new
consumer API. What is the future of the Simple Consumer, in this
 case? it
will continue to exist as a low-level API implementing the Kafka
 protocol
[3] and providing the building blocks for the new consumer, or will
 it be
deprecated as well?
  
   The new consumer will subsume both use-cases (simple and high-level).
   You can still use the old SimpleConsumer if you wish - i.e., the wire
   protocol for fetch and other requests will still be supported.
  
   
2) Regarding the new consumer: the v0.8.2 codebase contains an early
implementation of it, but since this a feature scheduled only for
 0.9,
   what
is its status as well? Is it included only as a future reference and
 for
stabilizing the API?
  
   It is a WIP so you cannot really use it.
  
3) Obviously, offset management is a concern if using the simple
   consumer,
so - wondering about the Offset Management API as well. The Kafka
   protocol
document specifically indicates that it will be fully functional in
 0.8.2
[4] - however, a functional implementation is already available in
   0.8.1.1
(accessible via the SimpleConsumer API but not documented in [5]).
 Again,
trying to understand the extent of what 0.8.1.1 already supports
(ostensibly, the offset manager support seems to have been added
 only in
 0.8.2 - please correct me if I am wrong), and whether it is
   recommended
for use in production in any form (with the caveats that accompany
 the
   use
of ZooKeeper).
  
   In 0.8.2 the OffsetCommitRequest and OffsetFetchRequest will use Kafka
   as the offsets storage mechanism (not zookeeper). High-level Java
   consumers can choose to store offsets in ZooKeeper instead 

Re: Consumer and offset management support in 0.8.2 and 0.9

2015-01-05 Thread Dana Powers
Hi Gwen, I am using/writing kafka-python to construct api requests and have
not dug too deeply into the server source code.  But I believe it is
kafka/api/OffsetCommitRequest.scala and specifically the readFrom method
used to decode the wire protocol.

-Dana
OffsetCommitRequest has two constructors now:

For version 0:
 OffsetCommitRequest(String groupId, Map<TopicPartition, PartitionData> offsetData)

And version 1:
OffsetCommitRequest(String groupId, int generationId, String consumerId, Map<TopicPartition, PartitionData> offsetData)

None of them seem to require timestamps... so I'm not sure where you
see that this is required. Can you share an example?

Gwen

On Sun, Jan 4, 2015 at 11:15 PM, Dana Powers dana.pow...@rd.io wrote:
 Hi Joel,

 I'm looking more closely at the OffsetCommitRequest wire protocol change
 you mentioned below, and I cannot figure out how to explicitly construct a
 request with the earlier version.  Should the api version be different for
 requests that do not include it and/or servers that do not support the
 timestamp field?  It looks like 0.8.1.1 did not include the timestamp
field
 and used api version 0.  But 0.8.2-beta seems to now require timestamps
 even when I explicitly encode OffsetCommitRequest api version 0 (server
 logs a BufferUnderflowException).

 Is this the expected server behavior?  Can you provide any tips on how
 third-party clients should manage the wire-protocol change for this api
 method (I'm working on kafka-python)?

 Thanks,

 -Dana

 On Tue, Nov 18, 2014 at 1:27 PM, Joel Koshy jjkosh...@gmail.com wrote:

 Yes it should be backwards compatible. So for e.g., you should be able
 to use an 0.8.1 client with an 0.8.2 broker. In general, you should
 not upgrade your clients until after the brokers have been upgraded.
 However, you can point an 0.8.2 client at an 0.8.1 broker. One wire
 protocol change I'm aware of is the OffsetCommitRequest.  There is a
 change in the OffsetCommitRequest format (KAFKA-1634) although you can
 explicitly construct an OffsetCommitRequest with the earlier version.

 Thanks,

 Joel

 On Tue, Nov 18, 2014 at 03:39:41PM -0500, Marius Bogoevici wrote:
  Hi Joel,
 
  Thanks for all the clarifications!  Just another question on this: will
  0.8.2 be backwards compatible with 0.8.1, just as 0.8.1 was with 0.8?
  Generally speaking, would there be any concerns with using the 0.8.2
  consumer with a 0.8.1 broker, for instance?
 
  Marius
 
  On Tue, Nov 18, 2014 at 2:55 PM, Joel Koshy jjkosh...@gmail.com
wrote:
 
   Inline..
  
   On Tue, Nov 18, 2014 at 04:26:04AM -0500, Marius Bogoevici wrote:
Hello everyone,
   
I have a few questions about the current status and future of the
 Kafka
consumers.
   
 We have been working on adding Kafka support in Spring XD [1],
 currently
using the high level consumer via Spring Integration Kafka [2]. We
 are
working on adding features such as:
- the ability to control offsets/replay topics;
- the ability to control partition allocation across multiple
 consumers;
   
We are currently at version 0.8.1.1, so using the simple consumer
is
 a
pretty straightforward choice right now. However, in the light of
the
upcoming consumer changes for 0.8.2 and 0.9, I have a few
questions:
   
1) With respect to the consumer redesign for 0.9, what is the
future
 of
   the
Simple Consumer and High Level Consumer? To my best understanding,
 the
existing high level consumer API will be deprecated in favour of
the
 new
consumer API. What is the future of the Simple Consumer, in this
 case? Will it continue to exist as a low-level API implementing the Kafka
 protocol
[3] and providing the building blocks for the new consumer, or will
 it be
deprecated as well?
  
   The new consumer will subsume both use-cases (simple and high-level).
   You can still use the old SimpleConsumer if you wish - i.e., the wire
   protocol for fetch and other requests will still be supported.
  
   
2) Regarding the new consumer: the v0.8.2 codebase contains an
early
 implementation of it, but since this is a feature scheduled only for
 0.9,
   what
is its status as well? Is it included only as a future reference
and
 for
stabilizing the API?
  
   It is a WIP so you cannot really use it.
  
3) Obviously, offset management is a concern if using the simple
   consumer,
so - wondering about the Offset Management API as well. The Kafka
   protocol
document specifically indicates that it will be fully functional in
 0.8.2
[4] - however, a functional implementation is already available in
   0.8.1.1
(accessible via the SimpleConsumer API but not documented in [5]).
 Again,
trying to understand the extent of what 0.8.1.1 already supports
(ostensibly, the offset manager support seems to have been added
 only in
 0.8.2 - please correct me if I am wrong), and whether it is
   recommended
for use in production in any form (with the 

Re: Consumer and offset management support in 0.8.2 and 0.9

2015-01-05 Thread Gwen Shapira
Ah, I see :)

The readFrom function basically tries to read two extra fields if you
are on version 1:

if (versionId == 1) {
  groupGenerationId = buffer.getInt
  consumerId = readShortString(buffer)
}

The rest looks identical in version 0 and 1, and still no timestamp in sight...
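In Python terms, that conditional amounts to something like the sketch below (my own illustration, not kafka-python code; it assumes big-endian fields and Kafka's int16-length-prefixed "short string" encoding, and `read_group_header` is a name I made up):

```python
import struct

def read_short_string(buf, pos):
    # Kafka "short string": int16 length prefix, then that many UTF-8 bytes.
    (n,) = struct.unpack_from('>h', buf, pos)
    pos += 2
    return buf[pos:pos + n].decode('utf-8'), pos + n

def read_group_header(buf, pos, version_id):
    # Mirrors the Scala readFrom above: version 1 carries two extra fields
    # (groupGenerationId, consumerId) that version 0 does not.
    group_generation_id, consumer_id = -1, ''
    if version_id == 1:
        (group_generation_id,) = struct.unpack_from('>i', buf, pos)
        pos += 4
        consumer_id, pos = read_short_string(buf, pos)
    return group_generation_id, consumer_id, pos
```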

Gwen

On Mon, Jan 5, 2015 at 9:33 AM, Dana Powers dana.pow...@rd.io wrote:
 Hi Gwen, I am using/writing kafka-python to construct api requests and have
 not dug too deeply into the server source code.  But I believe it is
 kafka/api/OffsetCommitRequest.scala and specifically the readFrom method
 used to decode the wire protocol.

 -Dana
 OffsetCommitRequest has two constructors now:

 For version 0:
  OffsetCommitRequest(String groupId, Map<TopicPartition, PartitionData> offsetData)

 And version 1:
 OffsetCommitRequest(String groupId, int generationId, String
 consumerId, Map<TopicPartition, PartitionData> offsetData)

 None of them seem to require timestamps... so I'm not sure where you
 see that this is required. Can you share an example?

 Gwen

 On Sun, Jan 4, 2015 at 11:15 PM, Dana Powers dana.pow...@rd.io wrote:
 Hi Joel,

 I'm looking more closely at the OffsetCommitRequest wire protocol change
 you mentioned below, and I cannot figure out how to explicitly construct a
 request with the earlier version.  Should the api version be different for
 requests that do not include it and/or servers that do not support the
 timestamp field?  It looks like 0.8.1.1 did not include the timestamp
 field
 and used api version 0.  But 0.8.2-beta seems to now require timestamps
 even when I explicitly encode OffsetCommitRequest api version 0 (server
 logs a BufferUnderflowException).

 Is this the expected server behavior?  Can you provide any tips on how
 third-party clients should manage the wire-protocol change for this api
 method (I'm working on kafka-python)?

 Thanks,

 -Dana

 On Tue, Nov 18, 2014 at 1:27 PM, Joel Koshy jjkosh...@gmail.com wrote:

 Yes it should be backwards compatible. So for e.g., you should be able
 to use an 0.8.1 client with an 0.8.2 broker. In general, you should
 not upgrade your clients until after the brokers have been upgraded.
 However, you can point an 0.8.2 client at an 0.8.1 broker. One wire
 protocol change I'm aware of is the OffsetCommitRequest.  There is a
 change in the OffsetCommitRequest format (KAFKA-1634) although you can
 explicitly construct an OffsetCommitRequest with the earlier version.

 Thanks,

 Joel

 On Tue, Nov 18, 2014 at 03:39:41PM -0500, Marius Bogoevici wrote:
  Hi Joel,
 
  Thanks for all the clarifications!  Just another question on this: will
  0.8.2 be backwards compatible with 0.8.1, just as 0.8.1 was with 0.8?
  Generally speaking, would there be any concerns with using the 0.8.2
  consumer with a 0.8.1 broker, for instance?
 
  Marius
 
  On Tue, Nov 18, 2014 at 2:55 PM, Joel Koshy jjkosh...@gmail.com
 wrote:
 
   Inline..
  
   On Tue, Nov 18, 2014 at 04:26:04AM -0500, Marius Bogoevici wrote:
Hello everyone,
   
I have a few questions about the current status and future of the
 Kafka
consumers.
   
 We have been working on adding Kafka support in Spring XD [1],
 currently
using the high level consumer via Spring Integration Kafka [2]. We
 are
working on adding features such as:
- the ability to control offsets/replay topics;
- the ability to control partition allocation across multiple
 consumers;
   
We are currently at version 0.8.1.1, so using the simple consumer
 is
 a
pretty straightforward choice right now. However, in the light of
 the
upcoming consumer changes for 0.8.2 and 0.9, I have a few
 questions:
   
1) With respect to the consumer redesign for 0.9, what is the
 future
 of
   the
Simple Consumer and High Level Consumer? To my best understanding,
 the
existing high level consumer API will be deprecated in favour of
 the
 new
consumer API. What is the future of the Simple Consumer, in this
 case? Will it continue to exist as a low-level API implementing the Kafka
 protocol
[3] and providing the building blocks for the new consumer, or will
 it be
deprecated as well?
  
   The new consumer will subsume both use-cases (simple and high-level).
   You can still use the old SimpleConsumer if you wish - i.e., the wire
   protocol for fetch and other requests will still be supported.
  
   
2) Regarding the new consumer: the v0.8.2 codebase contains an
 early
 implementation of it, but since this is a feature scheduled only for
 0.9,
   what
is its status as well? Is it included only as a future reference
 and
 for
stabilizing the API?
  
   It is a WIP so you cannot really use it.
  
3) Obviously, offset management is a concern if using the simple
   consumer,
so - wondering about the Offset Management API as well. The Kafka
   protocol
document specifically indicates that it will be fully functional in
 0.8.2
[4] - however, a functional implementation 

Re: Consumer and offset management support in 0.8.2 and 0.9

2015-01-05 Thread Dana Powers
ok, opened KAFKA-1841 .  KAFKA-1634 also related.

-Dana

On Mon, Jan 5, 2015 at 10:55 AM, Gwen Shapira gshap...@cloudera.com wrote:

 Ooh, I see what you mean - the OffsetAndMetadata (or PartitionData)
 part of the Map changed, which will modify the wire protocol.

 This is actually not handled in the Java client either. It will send
 the timestamp no matter which version is used.

 This looks like a bug and I'd even mark it as blocker for 0.8.2 since
 it may prevent rolling upgrades.

 Are you opening the JIRA?

 Gwen

 On Mon, Jan 5, 2015 at 10:28 AM, Dana Powers dana.pow...@rd.io wrote:
  specifically comparing 0.8.1 --
 
 
 https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/api/OffsetCommitRequest.scala#L37-L50
  ```
  (1 to partitionCount).map(_ => {
val partitionId = buffer.getInt
val offset = buffer.getLong
val metadata = readShortString(buffer)
(TopicAndPartition(topic, partitionId), OffsetMetadataAndError(offset,
  metadata))
  })
  ```
 
  to trunk --
 
 
 https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/api/OffsetCommitRequest.scala#L44-L69
  ```
  (1 to partitionCount).map(_ => {
val partitionId = buffer.getInt
val offset = buffer.getLong
val timestamp = {
  val given = buffer.getLong
  if (given == -1L) now else given
}
val metadata = readShortString(buffer)
(TopicAndPartition(topic, partitionId), OffsetAndMetadata(offset,
  metadata, timestamp))
  })
  ```
 
  should the `timestamp` buffer read be wrapped in an api version check?
 
 
  Dana Powers
  Rdio, Inc.
  dana.pow...@rd.io
  rdio.com/people/dpkp/
 
  On Mon, Jan 5, 2015 at 9:49 AM, Gwen Shapira gshap...@cloudera.com
 wrote:
 
  Ah, I see :)
 
  The readFrom function basically tries to read two extra fields if you
  are on version 1:
 
  if (versionId == 1) {
groupGenerationId = buffer.getInt
consumerId = readShortString(buffer)
  }
 
  The rest looks identical in version 0 and 1, and still no timestamp in
  sight...
 
  Gwen
 
  On Mon, Jan 5, 2015 at 9:33 AM, Dana Powers dana.pow...@rd.io wrote:
   Hi Gwen, I am using/writing kafka-python to construct api requests and
  have
   not dug too deeply into the server source code.  But I believe it is
   kafka/api/OffsetCommitRequest.scala and specifically the readFrom
 method
   used to decode the wire protocol.
  
   -Dana
   OffsetCommitRequest has two constructors now:
  
   For version 0:
 OffsetCommitRequest(String groupId, Map<TopicPartition, PartitionData> offsetData)
  
   And version 1:
   OffsetCommitRequest(String groupId, int generationId, String
    consumerId, Map<TopicPartition, PartitionData> offsetData)
  
   None of them seem to require timestamps... so I'm not sure where you
   see that this is required. Can you share an example?
  
   Gwen
  
   On Sun, Jan 4, 2015 at 11:15 PM, Dana Powers dana.pow...@rd.io
 wrote:
   Hi Joel,
  
   I'm looking more closely at the OffsetCommitRequest wire protocol
 change
   you mentioned below, and I cannot figure out how to explicitly
  construct a
   request with the earlier version.  Should the api version be
 different
  for
   requests that do not include it and/or servers that do not support
 the
   timestamp field?  It looks like 0.8.1.1 did not include the timestamp
   field
   and used api version 0.  But 0.8.2-beta seems to now require
 timestamps
   even when I explicitly encode OffsetCommitRequest api version 0
 (server
   logs a BufferUnderflowException).
  
   Is this the expected server behavior?  Can you provide any tips on
 how
   third-party clients should manage the wire-protocol change for this
 api
   method (I'm working on kafka-python)?
  
   Thanks,
  
   -Dana
  
   On Tue, Nov 18, 2014 at 1:27 PM, Joel Koshy jjkosh...@gmail.com
  wrote:
  
   Yes it should be backwards compatible. So for e.g., you should be
 able
   to use an 0.8.1 client with an 0.8.2 broker. In general, you should
   not upgrade your clients until after the brokers have been upgraded.
   However, you can point an 0.8.2 client at an 0.8.1 broker. One wire
   protocol change I'm aware of is the OffsetCommitRequest.  There is a
   change in the OffsetCommitRequest format (KAFKA-1634) although you
 can
   explicitly construct an OffsetCommitRequest with the earlier
 version.
  
   Thanks,
  
   Joel
  
   On Tue, Nov 18, 2014 at 03:39:41PM -0500, Marius Bogoevici wrote:
Hi Joel,
   
Thanks for all the clarifications!  Just another question on this:
  will
0.8.2 be backwards compatible with 0.8.1, just as 0.8.1 was with
 0.8?
Generally speaking, would there be any concerns with using the
 0.8.2
consumer with a 0.8.1 broker, for instance?
   
Marius
   
On Tue, Nov 18, 2014 at 2:55 PM, Joel Koshy jjkosh...@gmail.com
   wrote:
   
 Inline..

 On Tue, Nov 18, 2014 at 04:26:04AM -0500, Marius Bogoevici
 wrote:
  Hello everyone,
 
  I have a few questions about the current status and future of
 

Re: Consumer and offset management support in 0.8.2 and 0.9

2015-01-05 Thread Dana Powers
specifically comparing 0.8.1 --

https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/api/OffsetCommitRequest.scala#L37-L50
```
(1 to partitionCount).map(_ => {
  val partitionId = buffer.getInt
  val offset = buffer.getLong
  val metadata = readShortString(buffer)
  (TopicAndPartition(topic, partitionId), OffsetMetadataAndError(offset,
metadata))
})
```

to trunk --

https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/api/OffsetCommitRequest.scala#L44-L69
```
(1 to partitionCount).map(_ => {
  val partitionId = buffer.getInt
  val offset = buffer.getLong
  val timestamp = {
val given = buffer.getLong
if (given == -1L) now else given
  }
  val metadata = readShortString(buffer)
  (TopicAndPartition(topic, partitionId), OffsetAndMetadata(offset,
metadata, timestamp))
})
```

should the `timestamp` buffer read be wrapped in an api version check?
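
For what it's worth, a version check along those lines would look roughly like this in Python (a sketch of what a decoder could do, assuming the v0 body simply omits the 8-byte timestamp; `decode_partition_entry` is a hypothetical helper, not actual kafka-python or broker code):

```python
import struct

def decode_partition_entry(buf, pos, api_version):
    # One partition entry of OffsetCommitRequest: partition (int32),
    # offset (int64), then -- only for versions that define it -- a
    # timestamp (int64), and finally a short-string metadata field.
    partition, offset = struct.unpack_from('>iq', buf, pos)
    pos += 12
    if api_version >= 1:
        # hypothetical gate: read the timestamp only when the request
        # version actually carries one
        (timestamp,) = struct.unpack_from('>q', buf, pos)
        pos += 8
    else:
        timestamp = -1  # sentinel meaning "use broker receive time"
    (mlen,) = struct.unpack_from('>h', buf, pos)
    pos += 2
    metadata = buf[pos:pos + mlen].decode('utf-8')
    pos += mlen
    return (partition, offset, timestamp, metadata), pos
```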


Dana Powers
Rdio, Inc.
dana.pow...@rd.io
rdio.com/people/dpkp/

On Mon, Jan 5, 2015 at 9:49 AM, Gwen Shapira gshap...@cloudera.com wrote:

 Ah, I see :)

 The readFrom function basically tries to read two extra fields if you
 are on version 1:

 if (versionId == 1) {
   groupGenerationId = buffer.getInt
   consumerId = readShortString(buffer)
 }

 The rest looks identical in version 0 and 1, and still no timestamp in
 sight...

 Gwen

 On Mon, Jan 5, 2015 at 9:33 AM, Dana Powers dana.pow...@rd.io wrote:
  Hi Gwen, I am using/writing kafka-python to construct api requests and
 have
  not dug too deeply into the server source code.  But I believe it is
  kafka/api/OffsetCommitRequest.scala and specifically the readFrom method
  used to decode the wire protocol.
 
  -Dana
  OffsetCommitRequest has two constructors now:
 
  For version 0:
   OffsetCommitRequest(String groupId, Map<TopicPartition, PartitionData> offsetData)
 
  And version 1:
  OffsetCommitRequest(String groupId, int generationId, String
  consumerId, Map<TopicPartition, PartitionData> offsetData)
 
  None of them seem to require timestamps... so I'm not sure where you
  see that this is required. Can you share an example?
 
  Gwen
 
  On Sun, Jan 4, 2015 at 11:15 PM, Dana Powers dana.pow...@rd.io wrote:
  Hi Joel,
 
  I'm looking more closely at the OffsetCommitRequest wire protocol change
  you mentioned below, and I cannot figure out how to explicitly
 construct a
  request with the earlier version.  Should the api version be different
 for
  requests that do not include it and/or servers that do not support the
  timestamp field?  It looks like 0.8.1.1 did not include the timestamp
  field
  and used api version 0.  But 0.8.2-beta seems to now require timestamps
  even when I explicitly encode OffsetCommitRequest api version 0 (server
  logs a BufferUnderflowException).
 
  Is this the expected server behavior?  Can you provide any tips on how
  third-party clients should manage the wire-protocol change for this api
  method (I'm working on kafka-python)?
 
  Thanks,
 
  -Dana
 
  On Tue, Nov 18, 2014 at 1:27 PM, Joel Koshy jjkosh...@gmail.com
 wrote:
 
  Yes it should be backwards compatible. So for e.g., you should be able
  to use an 0.8.1 client with an 0.8.2 broker. In general, you should
  not upgrade your clients until after the brokers have been upgraded.
  However, you can point an 0.8.2 client at an 0.8.1 broker. One wire
  protocol change I'm aware of is the OffsetCommitRequest.  There is a
  change in the OffsetCommitRequest format (KAFKA-1634) although you can
  explicitly construct an OffsetCommitRequest with the earlier version.
 
  Thanks,
 
  Joel
 
  On Tue, Nov 18, 2014 at 03:39:41PM -0500, Marius Bogoevici wrote:
   Hi Joel,
  
   Thanks for all the clarifications!  Just another question on this:
 will
   0.8.2 be backwards compatible with 0.8.1, just as 0.8.1 was with 0.8?
   Generally speaking, would there be any concerns with using the 0.8.2
   consumer with a 0.8.1 broker, for instance?
  
   Marius
  
   On Tue, Nov 18, 2014 at 2:55 PM, Joel Koshy jjkosh...@gmail.com
  wrote:
  
Inline..
   
On Tue, Nov 18, 2014 at 04:26:04AM -0500, Marius Bogoevici wrote:
 Hello everyone,

 I have a few questions about the current status and future of the
  Kafka
 consumers.

  We have been working on adding Kafka support in Spring XD [1],
  currently
 using the high level consumer via Spring Integration Kafka [2].
 We
  are
 working on adding features such as:
 - the ability to control offsets/replay topics;
 - the ability to control partition allocation across multiple
  consumers;

 We are currently at version 0.8.1.1, so using the simple consumer
  is
  a
 pretty straightforward choice right now. However, in the light of
  the
 upcoming consumer changes for 0.8.2 and 0.9, I have a few
  questions:

 1) With respect to the consumer redesign for 0.9, what is the
  future
  of
the
 Simple Consumer and High Level Consumer? To my best
 understanding,
  the
 

Re: Consumer and offset management support in 0.8.2 and 0.9

2015-01-05 Thread Gwen Shapira
Ooh, I see what you mean - the OffsetAndMetadata (or PartitionData)
part of the Map changed, which will modify the wire protocol.

This is actually not handled in the Java client either. It will send
the timestamp no matter which version is used.

This looks like a bug and I'd even mark it as blocker for 0.8.2 since
it may prevent rolling upgrades.

Are you opening the JIRA?

Gwen

On Mon, Jan 5, 2015 at 10:28 AM, Dana Powers dana.pow...@rd.io wrote:
 specifically comparing 0.8.1 --

 https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/api/OffsetCommitRequest.scala#L37-L50
 ```
 (1 to partitionCount).map(_ => {
   val partitionId = buffer.getInt
   val offset = buffer.getLong
   val metadata = readShortString(buffer)
   (TopicAndPartition(topic, partitionId), OffsetMetadataAndError(offset,
 metadata))
 })
 ```

 to trunk --

 https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/api/OffsetCommitRequest.scala#L44-L69
 ```
 (1 to partitionCount).map(_ => {
   val partitionId = buffer.getInt
   val offset = buffer.getLong
   val timestamp = {
 val given = buffer.getLong
 if (given == -1L) now else given
   }
   val metadata = readShortString(buffer)
   (TopicAndPartition(topic, partitionId), OffsetAndMetadata(offset,
 metadata, timestamp))
 })
 ```

 should the `timestamp` buffer read be wrapped in an api version check?


 Dana Powers
 Rdio, Inc.
 dana.pow...@rd.io
 rdio.com/people/dpkp/

 On Mon, Jan 5, 2015 at 9:49 AM, Gwen Shapira gshap...@cloudera.com wrote:

 Ah, I see :)

 The readFrom function basically tries to read two extra fields if you
 are on version 1:

 if (versionId == 1) {
   groupGenerationId = buffer.getInt
   consumerId = readShortString(buffer)
 }

 The rest looks identical in version 0 and 1, and still no timestamp in
 sight...

 Gwen

 On Mon, Jan 5, 2015 at 9:33 AM, Dana Powers dana.pow...@rd.io wrote:
  Hi Gwen, I am using/writing kafka-python to construct api requests and
 have
  not dug too deeply into the server source code.  But I believe it is
  kafka/api/OffsetCommitRequest.scala and specifically the readFrom method
  used to decode the wire protocol.
 
  -Dana
  OffsetCommitRequest has two constructors now:
 
  For version 0:
   OffsetCommitRequest(String groupId, Map<TopicPartition, PartitionData> offsetData)
 
  And version 1:
  OffsetCommitRequest(String groupId, int generationId, String
  consumerId, Map<TopicPartition, PartitionData> offsetData)
 
  None of them seem to require timestamps... so I'm not sure where you
  see that this is required. Can you share an example?
 
  Gwen
 
  On Sun, Jan 4, 2015 at 11:15 PM, Dana Powers dana.pow...@rd.io wrote:
  Hi Joel,
 
  I'm looking more closely at the OffsetCommitRequest wire protocol change
  you mentioned below, and I cannot figure out how to explicitly
 construct a
  request with the earlier version.  Should the api version be different
 for
  requests that do not include it and/or servers that do not support the
  timestamp field?  It looks like 0.8.1.1 did not include the timestamp
  field
  and used api version 0.  But 0.8.2-beta seems to now require timestamps
  even when I explicitly encode OffsetCommitRequest api version 0 (server
  logs a BufferUnderflowException).
 
  Is this the expected server behavior?  Can you provide any tips on how
  third-party clients should manage the wire-protocol change for this api
  method (I'm working on kafka-python)?
 
  Thanks,
 
  -Dana
 
  On Tue, Nov 18, 2014 at 1:27 PM, Joel Koshy jjkosh...@gmail.com
 wrote:
 
  Yes it should be backwards compatible. So for e.g., you should be able
  to use an 0.8.1 client with an 0.8.2 broker. In general, you should
  not upgrade your clients until after the brokers have been upgraded.
  However, you can point an 0.8.2 client at an 0.8.1 broker. One wire
  protocol change I'm aware of is the OffsetCommitRequest.  There is a
  change in the OffsetCommitRequest format (KAFKA-1634) although you can
  explicitly construct an OffsetCommitRequest with the earlier version.
 
  Thanks,
 
  Joel
 
  On Tue, Nov 18, 2014 at 03:39:41PM -0500, Marius Bogoevici wrote:
   Hi Joel,
  
   Thanks for all the clarifications!  Just another question on this:
 will
   0.8.2 be backwards compatible with 0.8.1, just as 0.8.1 was with 0.8?
   Generally speaking, would there be any concerns with using the 0.8.2
   consumer with a 0.8.1 broker, for instance?
  
   Marius
  
   On Tue, Nov 18, 2014 at 2:55 PM, Joel Koshy jjkosh...@gmail.com
  wrote:
  
Inline..
   
On Tue, Nov 18, 2014 at 04:26:04AM -0500, Marius Bogoevici wrote:
 Hello everyone,

 I have a few questions about the current status and future of the
  Kafka
 consumers.

  We have been working on adding Kafka support in Spring XD [1],
  currently
 using the high level consumer via Spring Integration Kafka [2].
 We
  are
 working on adding features such as:
 - the ability to control offsets/replay topics;
 - the 

Re: Consumer and offset management support in 0.8.2 and 0.9

2015-01-04 Thread Dana Powers
Hi Joel,

I'm looking more closely at the OffsetCommitRequest wire protocol change
you mentioned below, and I cannot figure out how to explicitly construct a
request with the earlier version.  Should the api version be different for
requests that do not include the timestamp field, and/or for servers that do
not support it?  It looks like 0.8.1.1 did not include the timestamp field
and used api version 0.  But 0.8.2-beta seems to now require timestamps
even when I explicitly encode OffsetCommitRequest api version 0 (server
logs a BufferUnderflowException).

Is this the expected server behavior?  Can you provide any tips on how
third-party clients should manage the wire-protocol change for this api
method (I'm working on kafka-python)?
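
For concreteness, here is roughly what an api-version-0 OffsetCommitRequest body looked like on the wire in 0.8.1, per my reading of the protocol guide (a sketch only: single topic and partition, request header with api_key/api_version/correlation_id/client_id omitted, and note the absence of any timestamp field):

```python
import struct

def short_string(s):
    # Kafka "short string": int16 length prefix + UTF-8 bytes
    b = s.encode('utf-8')
    return struct.pack('>h', len(b)) + b

def encode_offset_commit_v0(group, topic, partition, offset, metadata=''):
    # Body only, per the 0.8.1 layout: group, topic array, partition array,
    # and per partition just partition/offset/metadata -- no timestamp.
    body = short_string(group)
    body += struct.pack('>i', 1)   # one topic in the array
    body += short_string(topic)
    body += struct.pack('>i', 1)   # one partition in the array
    body += struct.pack('>iq', partition, offset)
    body += short_string(metadata)
    return body
```

A broker that unconditionally reads a timestamp long after the offset would run off the end of such a buffer, which would be consistent with the BufferUnderflowException above.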

Thanks,

-Dana

On Tue, Nov 18, 2014 at 1:27 PM, Joel Koshy jjkosh...@gmail.com wrote:

 Yes it should be backwards compatible. So for e.g., you should be able
 to use an 0.8.1 client with an 0.8.2 broker. In general, you should
 not upgrade your clients until after the brokers have been upgraded.
 However, you can point an 0.8.2 client at an 0.8.1 broker. One wire
 protocol change I'm aware of is the OffsetCommitRequest.  There is a
 change in the OffsetCommitRequest format (KAFKA-1634) although you can
 explicitly construct an OffsetCommitRequest with the earlier version.

 Thanks,

 Joel

 On Tue, Nov 18, 2014 at 03:39:41PM -0500, Marius Bogoevici wrote:
  Hi Joel,
 
  Thanks for all the clarifications!  Just another question on this: will
  0.8.2 be backwards compatible with 0.8.1, just as 0.8.1 was with 0.8?
  Generally speaking, would there be any concerns with using the 0.8.2
  consumer with a 0.8.1 broker, for instance?
 
  Marius
 
  On Tue, Nov 18, 2014 at 2:55 PM, Joel Koshy jjkosh...@gmail.com wrote:
 
   Inline..
  
   On Tue, Nov 18, 2014 at 04:26:04AM -0500, Marius Bogoevici wrote:
Hello everyone,
   
I have a few questions about the current status and future of the
 Kafka
consumers.
   
 We have been working on adding Kafka support in Spring XD [1],
 currently
using the high level consumer via Spring Integration Kafka [2]. We
 are
working on adding features such as:
- the ability to control offsets/replay topics;
- the ability to control partition allocation across multiple
 consumers;
   
We are currently at version 0.8.1.1, so using the simple consumer is
 a
pretty straightforward choice right now. However, in the light of the
upcoming consumer changes for 0.8.2 and 0.9, I have a few questions:
   
1) With respect to the consumer redesign for 0.9, what is the future
 of
   the
Simple Consumer and High Level Consumer? To my best understanding,
 the
existing high level consumer API will be deprecated in favour of the
 new
consumer API. What is the future of the Simple Consumer, in this
 case? Will it continue to exist as a low-level API implementing the Kafka
 protocol
[3] and providing the building blocks for the new consumer, or will
 it be
deprecated as well?
  
   The new consumer will subsume both use-cases (simple and high-level).
   You can still use the old SimpleConsumer if you wish - i.e., the wire
   protocol for fetch and other requests will still be supported.
  
   
2) Regarding the new consumer: the v0.8.2 codebase contains an early
 implementation of it, but since this is a feature scheduled only for
 0.9,
   what
is its status as well? Is it included only as a future reference and
 for
stabilizing the API?
  
   It is a WIP so you cannot really use it.
  
3) Obviously, offset management is a concern if using the simple
   consumer,
so - wondering about the Offset Management API as well. The Kafka
   protocol
document specifically indicates that it will be fully functional in
 0.8.2
[4] - however, a functional implementation is already available in
   0.8.1.1
(accessible via the SimpleConsumer API but not documented in [5]).
 Again,
trying to understand the extent of what 0.8.1.1 already supports
(ostensibly, the offset manager support seems to have been added
 only in
 0.8.2 - please correct me if I am wrong), and whether it is
   recommended
for use in production in any form (with the caveats that accompany
 the
   use
of ZooKeeper).
  
   In 0.8.2 the OffsetCommitRequest and OffsetFetchRequest will use Kafka
   as the offsets storage mechanism (not zookeeper). High-level Java
   consumers can choose to store offsets in ZooKeeper instead by setting
   offsets.storage=zookeeper
  
   However, if you are using the simple consumer and wish to store
   offsets in ZooKeeper you will need to commit to ZooKeeper directly.
   You can use ZkUtils in the kafka.utils package for this.
  
   If you wish to move to Kafka-based offsets we will be adding a new
   OffsetsClient that can be used to commit/fetch offsets to/from Kafka.
   This is currently not listed as a blocker for 0.8.2 but I think we
   should include it. I will update 

Re: Consumer and offset management support in 0.8.2 and 0.9

2014-11-18 Thread Joel Koshy
Inline..

On Tue, Nov 18, 2014 at 04:26:04AM -0500, Marius Bogoevici wrote:
 Hello everyone,
 
 I have a few questions about the current status and future of the Kafka
 consumers.
 
 We have been working on adding Kafka support in Spring XD [1], currently
 using the high level consumer via Spring Integration Kafka [2]. We are
 working on adding features such as:
 - the ability to control offsets/replay topics;
 - the ability to control partition allocation across multiple consumers;
 
 We are currently at version 0.8.1.1, so using the simple consumer is a
 pretty straightforward choice right now. However, in the light of the
 upcoming consumer changes for 0.8.2 and 0.9, I have a few questions:
 
 1) With respect to the consumer redesign for 0.9, what is the future of the
 Simple Consumer and High Level Consumer? To my best understanding, the
 existing high level consumer API will be deprecated in favour of the new
 consumer API. What is the future of the Simple Consumer, in this case? Will
 it continue to exist as a low-level API implementing the Kafka protocol
 [3] and providing the building blocks for the new consumer, or will it be
 deprecated as well?

The new consumer will subsume both use-cases (simple and high-level).
You can still use the old SimpleConsumer if you wish - i.e., the wire
protocol for fetch and other requests will still be supported.

 
 2) Regarding the new consumer: the v0.8.2 codebase contains an early
 implementation of it, but since this is a feature scheduled only for 0.9, what
 is its status as well? Is it included only as a future reference and for
 stabilizing the API?

It is a WIP so you cannot really use it.

 3) Obviously, offset management is a concern if using the simple consumer,
 so - wondering about the Offset Management API as well. The Kafka protocol
 document specifically indicates that it will be fully functional in 0.8.2
 [4] - however, a functional implementation is already available in 0.8.1.1
 (accessible via the SimpleConsumer API but not documented in [5]). Again,
 trying to understand the extent of what 0.8.1.1 already supports
 (ostensibly, the offset manager support seems to have been added only in
  0.8.2 - please correct me if I am wrong), and whether it is recommended
 for use in production in any form (with the caveats that accompany the use
 of ZooKeeper).

In 0.8.2 the OffsetCommitRequest and OffsetFetchRequest will use Kafka
as the offsets storage mechanism (not zookeeper). High-level Java
consumers can choose to store offsets in ZooKeeper instead by setting
offsets.storage=zookeeper

However, if you are using the simple consumer and wish to store
offsets in ZooKeeper you will need to commit to ZooKeeper directly.
You can use ZkUtils in the kafka.utils package for this.
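
For simple-consumer users going the ZooKeeper route, committed offsets live under the standard consumer znode layout (a sketch; the exact paths are defined by ZkUtils, so verify against your broker version, and the write itself would go through whatever ZooKeeper client you use, e.g. kazoo in Python):

```python
def zk_offset_path(group, topic, partition):
    # Standard Kafka consumer znode layout for committed offsets:
    #   /consumers/<group>/offsets/<topic>/<partition>
    return '/consumers/%s/offsets/%s/%d' % (group, topic, partition)

# With a ZooKeeper client such as kazoo (connection setup not shown),
# committing an offset would be an ensure-path plus a set of the
# stringified offset value:
#   zk.ensure_path(zk_offset_path('my-group', 'my-topic', 0))
#   zk.set(zk_offset_path('my-group', 'my-topic', 0), b'42')
```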

If you wish to move to Kafka-based offsets we will be adding a new
OffsetsClient that can be used to commit/fetch offsets to/from Kafka.
This is currently not listed as a blocker for 0.8.2 but I think we
should include it. I will update that ticket.

 4) Trying to interpret the existing examples in [6] and the comments on [7]
 - the version of the Offset Management API that exists in 0.8.1.1 is using
 ZooKeeper - whereas ZooKeeper will be optional in 0.8.2 - to be replaced by
 Kafka, and phased out if possible. To my understanding, the switch between
 the two will be controlled by the broker configuration (along with other
 parameters that control the performance of offset queues). Is that correct?

The switch is a client-side configuration. That wiki is not
up-to-date. The most current documentation is available as a patch in
https://issues.apache.org/jira/browse/KAFKA-1729

 5) Also, wondering about the timeline of 0.8.2 - according to the roadmaps
 it should be released relatively shortly. Is that correct?

Yes - once the blockers are ironed out.

 
 Thanks,
 Marius
 
 [1] http://projects.spring.io/spring-xd/
 [2] https://github.com/spring-projects/spring-integration-kafka
 [3]
 https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
 [4]
 https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
 [5] http://kafka.apache.org/082/documentation.html#simpleconsumerapi
 [6]
 https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
 [7] https://issues.apache.org/jira/browse/KAFKA-1729



Re: Consumer and offset management support in 0.8.2 and 0.9

2014-11-18 Thread Marius Bogoevici
Hi Joel,

Thanks for all the clarifications!  Just another question on this: will
0.8.2 be backwards compatible with 0.8.1, just as 0.8.1 was with 0.8?
Generally speaking, would there be any concerns with using the 0.8.2
consumer with a 0.8.1 broker, for instance?

Marius

On Tue, Nov 18, 2014 at 2:55 PM, Joel Koshy jjkosh...@gmail.com wrote:

 Inline..

 On Tue, Nov 18, 2014 at 04:26:04AM -0500, Marius Bogoevici wrote:
  Hello everyone,
 
  I have a few questions about the current status and future of the Kafka
  consumers.
 
  We have been working on adding Kafka support in Spring XD [1], currently
  using the high level consumer via Spring Integration Kafka [2]. We are
  working on adding features such as:
  - the ability to control offsets/replay topics;
  - the ability to control partition allocation across multiple consumers;
 
  We are currently at version 0.8.1.1, so using the simple consumer is a
  pretty straightforward choice right now. However, in the light of the
  upcoming consumer changes for 0.8.2 and 0.9, I have a few questions:
 
  1) With respect to the consumer redesign for 0.9, what is the future of
  the Simple Consumer and the High Level Consumer? To my best understanding,
  the existing high-level consumer API will be deprecated in favour of the
  new consumer API. What is the future of the Simple Consumer in this case?
  Will it continue to exist as a low-level API implementing the Kafka
  protocol [3] and providing the building blocks for the new consumer, or
  will it be deprecated as well?

 The new consumer will subsume both use-cases (simple and high-level).
 You can still use the old SimpleConsumer if you wish - i.e., the wire
 protocol for fetch and other requests will still be supported.

 
  2) Regarding the new consumer: the v0.8.2 codebase contains an early
  implementation of it, but since this is a feature scheduled only for 0.9,
  what is its status? Is it included only as a future reference and for
  stabilizing the API?

 It is a WIP so you cannot really use it.

  3) Obviously, offset management is a concern if using the simple
  consumer, so - wondering about the Offset Management API as well. The
  Kafka protocol document specifically indicates that it will be fully
  functional in 0.8.2 [4] - however, a functional implementation is already
  available in 0.8.1.1 (accessible via the SimpleConsumer API but not
  documented in [5]). Again, trying to understand the extent of what
  0.8.1.1 already supports (ostensibly, the offset manager support seems to
  have been added only in 0.8.2 - please correct me if I am wrong), and
  whether it is recommended for use in production in any form (with the
  caveats that accompany the use of ZooKeeper).

 In 0.8.2 the OffsetCommitRequest and OffsetFetchRequest will use Kafka
 as the offsets storage mechanism (not zookeeper). High-level Java
 consumers can choose to store offsets in ZooKeeper instead by setting
 offsets.storage=zookeeper

 However, if you are using the simple consumer and wish to store
 offsets in ZooKeeper you will need to commit to ZooKeeper directly.
 You can use ZkUtils in the kafka.utils package for this.
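For simple-consumer users committing to ZooKeeper directly, the offsets live under Kafka's well-known 0.8.x ZooKeeper layout. A minimal sketch of that convention (Python; `commit_offset` and the `zk` client are illustrative - any ZooKeeper client exposing `ensure_path`/`set`, such as kazoo's KazooClient, would fit):

```python
def zk_offset_path(group, topic, partition):
    """Kafka 0.8.x ZooKeeper node holding a consumer group's offset:
    /consumers/<group>/offsets/<topic>/<partition>"""
    return "/consumers/%s/offsets/%s/%d" % (group, topic, partition)

def commit_offset(zk, group, topic, partition, offset):
    """Commit an offset by writing it to the group's ZK node.

    `zk` is any client exposing ensure_path/set (e.g. kazoo's
    KazooClient); the node's value is the offset as a decimal string.
    """
    path = zk_offset_path(group, topic, partition)
    zk.ensure_path(path)            # create intermediate nodes if absent
    zk.set(path, str(offset).encode("utf-8"))
```

This mirrors what the high-level consumer does with offsets.storage=zookeeper, and what ZkUtils in kafka.utils wraps on the Scala side.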

 If you wish to move to Kafka-based offsets we will be adding a new
 OffsetsClient that can be used to commit/fetch offsets to/from Kafka.
 This is currently not listed as a blocker for 0.8.2 but I think we
 should include it. I will update that ticket.

  4) Trying to interpret the existing examples in [6] and the comments on
  [7] - the version of the Offset Management API that exists in 0.8.1.1 is
  using ZooKeeper - whereas ZooKeeper will be optional in 0.8.2 - to be
  replaced by Kafka, and phased out if possible. To my understanding, the
  switch between the two will be controlled by the broker configuration
  (along with other parameters that control the performance of offset
  queues). Is that correct?

 The switch is a client-side configuration. That wiki is not
 up-to-date. The most current documentation is available as a patch in
 https://issues.apache.org/jira/browse/KAFKA-1729

  5) Also, wondering about the timeline of 0.8.2 - according to the
  roadmaps it should be released relatively shortly. Is that correct?

 Yes - once the blockers are ironed out.

 
  Thanks,
  Marius
 
  [1] http://projects.spring.io/spring-xd/
  [2] https://github.com/spring-projects/spring-integration-kafka
  [3]
 
 https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol
  [4]
 
 https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommit/FetchAPI
  [5] http://kafka.apache.org/082/documentation.html#simpleconsumerapi
  [6]
 
 https://cwiki.apache.org/confluence/display/KAFKA/Committing+and+fetching+consumer+offsets+in+Kafka
  [7] https://issues.apache.org/jira/browse/KAFKA-1729




Re: Consumer and offset management support in 0.8.2 and 0.9

2014-11-18 Thread Joel Koshy
Yes, it should be backwards compatible. For example, you should be able
to use an 0.8.1 client with an 0.8.2 broker. In general, you should
not upgrade your clients until after the brokers have been upgraded.
However, you can point an 0.8.2 client at an 0.8.1 broker. One wire
protocol change I'm aware of is the OffsetCommitRequest: its format
changed in KAFKA-1634, although you can explicitly construct an
OffsetCommitRequest with the earlier version.
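The "earlier version" escape hatch works because every Kafka request carries its API version in a common header, so a client can pin api_version=0 for OffsetCommitRequest when talking to an 0.8.1 broker. A sketch of that header encoding (Python; field layout and OffsetCommit's api_key of 8 are per the protocol guide [3] - the request body itself is omitted):

```python
import struct

OFFSET_COMMIT_API_KEY = 8  # per the Kafka protocol guide

def encode_request_header(api_key, api_version, correlation_id, client_id):
    """Encode the common Kafka request header (big-endian):
    api_key: int16, api_version: int16, correlation_id: int32,
    client_id: int16 length prefix followed by UTF-8 bytes."""
    cid = client_id.encode("utf-8")
    return struct.pack(">hhih", api_key, api_version,
                       correlation_id, len(cid)) + cid

# Pinning version 0 keeps an 0.8.2 client's OffsetCommitRequest readable
# by an 0.8.1 broker; version 1 selects the new (KAFKA-1634) body format.
header_v0 = encode_request_header(OFFSET_COMMIT_API_KEY, 0, 1, "my-client")
```

An 0.8.1 broker only understands version 0 of this API, which is why an 0.8.2 client must construct the request with the earlier version explicitly.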

Thanks,

Joel

On Tue, Nov 18, 2014 at 03:39:41PM -0500, Marius Bogoevici wrote:
 Hi Joel,
 
 Thanks for all the clarifications!  Just another question on this: will
 0.8.2 be backwards compatible with 0.8.1, just as 0.8.1 was with 0.8?
 Generally speaking, would there be any concerns with using the 0.8.2
 consumer with a 0.8.1 broker, for instance?
 
 Marius
 
 On Tue, Nov 18, 2014 at 2:55 PM, Joel Koshy jjkosh...@gmail.com wrote:
 
  Inline..
 
  On Tue, Nov 18, 2014 at 04:26:04AM -0500, Marius Bogoevici wrote:
   Hello everyone,
  
   I have a few questions about the current status and future of the Kafka
   consumers.
  
   We have been working on adding Kafka support in Spring XD [1], currently
   using the high level consumer via Spring Integration Kafka [2]. We are
   working on adding features such as:
   - the ability to control offsets/replay topics;
   - the ability to control partition allocation across multiple consumers;
  
   We are currently at version 0.8.1.1, so using the simple consumer is a
   pretty straightforward choice right now. However, in the light of the
   upcoming consumer changes for 0.8.2 and 0.9, I have a few questions:
  
   1) With respect to the consumer redesign for 0.9, what is the future of
   the Simple Consumer and the High Level Consumer? To my best understanding,
   the existing high-level consumer API will be deprecated in favour of the
   new consumer API. What is the future of the Simple Consumer in this case?
   Will it continue to exist as a low-level API implementing the Kafka
   protocol [3] and providing the building blocks for the new consumer, or
   will it be deprecated as well?
 
  The new consumer will subsume both use-cases (simple and high-level).
  You can still use the old SimpleConsumer if you wish - i.e., the wire
  protocol for fetch and other requests will still be supported.
 
  
   2) Regarding the new consumer: the v0.8.2 codebase contains an early
   implementation of it, but since this is a feature scheduled only for 0.9,
   what is its status? Is it included only as a future reference and for
   stabilizing the API?
 
  It is a WIP so you cannot really use it.
 
   3) Obviously, offset management is a concern if using the simple
   consumer, so - wondering about the Offset Management API as well. The
   Kafka protocol document specifically indicates that it will be fully
   functional in 0.8.2 [4] - however, a functional implementation is already
   available in 0.8.1.1 (accessible via the SimpleConsumer API but not
   documented in [5]). Again, trying to understand the extent of what
   0.8.1.1 already supports (ostensibly, the offset manager support seems to
   have been added only in 0.8.2 - please correct me if I am wrong), and
   whether it is recommended for use in production in any form (with the
   caveats that accompany the use of ZooKeeper).
 
  In 0.8.2 the OffsetCommitRequest and OffsetFetchRequest will use Kafka
  as the offsets storage mechanism (not zookeeper). High-level Java
  consumers can choose to store offsets in ZooKeeper instead by setting
  offsets.storage=zookeeper
 
  However, if you are using the simple consumer and wish to store
  offsets in ZooKeeper you will need to commit to ZooKeeper directly.
  You can use ZkUtils in the kafka.utils package for this.
 
  If you wish to move to Kafka-based offsets we will be adding a new
  OffsetsClient that can be used to commit/fetch offsets to/from Kafka.
  This is currently not listed as a blocker for 0.8.2 but I think we
  should include it. I will update that ticket.
 
   4) Trying to interpret the existing examples in [6] and the comments on
   [7] - the version of the Offset Management API that exists in 0.8.1.1 is
   using ZooKeeper - whereas ZooKeeper will be optional in 0.8.2 - to be
   replaced by Kafka, and phased out if possible. To my understanding, the
   switch between the two will be controlled by the broker configuration
   (along with other parameters that control the performance of offset
   queues). Is that correct?
 
  The switch is a client-side configuration. That wiki is not
  up-to-date. The most current documentation is available as a patch in
  https://issues.apache.org/jira/browse/KAFKA-1729
 
   5) Also, wondering about the timeline of 0.8.2 - according to the
   roadmaps it should be released relatively shortly. Is that correct?
 
  Yes - once the blockers are ironed out.
 
  
   Thanks,
   Marius
  
   [1]