Re: Review Request 27391: Fix KAFKA-1634

2015-02-17 Thread Joel Koshy


 On Feb. 4, 2015, 2:15 a.m., Joel Koshy wrote:
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala, line 48
  https://reviews.apache.org/r/27391/diff/11/?file=832423#file832423line48
 
  I think our convention is to include the if in the previous line.
 
 Guozhang Wang wrote:
 I checked the code base and it seems we do not have a consensus here, 
 and personally I would prefer this as it actually makes the logic clearer.

We don't have a formal convention here, but I think we should adopt one and 
incorporate it into our coding guidelines. The problem with a separate line is 
that at first glance (especially with just two-character indentation) it does 
not seem to be associated with the assignment. Also, most current occurrences 
put the if on the same line:
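```
# Occurrences with the `if` on the same line as the `=`:
find . -name '*.scala' -exec pcregrep -c '=(\s)*if' {} \; | grep -v 0 | paste -s -d+ | bc
61
# Occurrences with the `if` on the line after the `=`:
find . -name '*.scala' -exec pcregrep -Mc '=(\s)*\n(\s)*if' {} \; | grep -v 0 | paste -s -d+ | bc
36
```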
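
For illustration, the two placements being counted look roughly like the 
sketch below. It is not taken from the patch: the object name, method names 
and parameters are hypothetical and only show the layout difference.

```scala
object IfPlacement {
  // Style A: the `if` sits on the same line as the assignment.
  def sameLine(versionId: Int, requestedMs: Long, defaultMs: Long): Long = {
    val retentionMs = if (versionId <= 1) defaultMs else requestedMs
    retentionMs
  }

  // Style B: the `if` starts on the following line. With two-space
  // indentation it is easier to miss that it belongs to the assignment.
  def nextLine(versionId: Int, requestedMs: Long, defaultMs: Long): Long = {
    val retentionMs =
      if (versionId <= 1) defaultMs
      else requestedMs
    retentionMs
  }
}
```

Both methods compute the same value; only the layout differs.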


 On Feb. 4, 2015, 2:15 a.m., Joel Koshy wrote:
  core/src/main/scala/kafka/common/OffsetMetadataAndError.scala, line 36
  https://reviews.apache.org/r/27391/diff/11/?file=832425#file832425line36
 
  (This is also a public API change - although you did add an Object 
  wrapper further down that comes close to the original API.)
 
 Guozhang Wang wrote:
 I think the wrapper MessageAndMetadata preserves the existing public API 
 right?

You mean the wrapper object? It comes close, but not quite, since you can 
instantiate a case class with or without the `new` keyword, and `new` is 
required to call the secondary constructors of a case class. With the object 
wrapper we assume that the objects were being constructed without `new`. I 
don't know how many people actually used it, but it was part of the public API 
since you would need to create those objects to form an OffsetCommitRequest.
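
A minimal sketch of the distinction, assuming a hypothetical class whose 
primary constructor gained a field. `OffsetInfo` and its fields are made up 
for illustration and are not the classes in the patch.

```scala
// Hypothetical shape of the class after the change: four fields instead of three.
case class OffsetInfo(offset: Long,
                      metadata: String,
                      commitTimestamp: Long,
                      expireTimestamp: Long) {
  // Secondary constructor with the hypothetical old three-argument shape.
  // Callers that wrote `new OffsetInfo(offset, metadata, timestamp)` need this.
  def this(offset: Long, metadata: String, timestamp: Long) =
    this(offset, metadata, timestamp, timestamp)
}

object OffsetInfo {
  // Object wrapper: covers callers that wrote
  // `OffsetInfo(offset, metadata, timestamp)` without `new`.
  def apply(offset: Long, metadata: String, timestamp: Long): OffsetInfo =
    new OffsetInfo(offset, metadata, timestamp, timestamp)
}
```

With only the `apply` overload in the companion object, the call without 
`new` keeps compiling, while the call with `new` and three arguments would 
not unless the secondary constructor is also present.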


 On Feb. 4, 2015, 2:15 a.m., Joel Koshy wrote:
  core/src/main/scala/kafka/server/KafkaApis.scala, line 163
  https://reviews.apache.org/r/27391/diff/11/?file=832427#file832427line163
 
  Shouldn't the commit timestamp _always_ be set to the current time?
  
  What I was thinking is this:
  If v0:
  - An explicit timestamp is provided only to override the v0 default 
  retention, which is to add the server-side retention to the current timestamp. 
  The (true) commit timestamp - i.e., receive time is useful for debugging 
  purposes. So if an explicit timestamp is provided in v0 then use that to 
  compute the absolute expire timestamp which will be the given commit 
  timestamp; so you would store (commitTimestamp = now; expireTimestamp = 
  given commitTimeStamp); if v0 and commit timestamp is default, then you 
  would store (commitTimestamp = now, expireTimestamp = now + offsetRetention)
  - if v1: (commitTimestamp = now, expireTimestamp = now + 
  offsetRetention)
  
  This way, you should have correct expiration behavior for v0, v1 and v2 
  and at the same time have the true commit timestamp - i.e., the receive 
  time at the broker which is useful for debugging. (also see comment in 
  OffsetManager)
 
 Guozhang Wang wrote:
 In v0/v1, the commit timestamp can be specified as a future timestamp so 
 the expiration timestamp = commit timestamp + retention (in v0/v1 it is 
 always the default value).
 
 This behavior should not be respected, i.e. offsets already stored in v0 
 and v1 format should be expired correctly using 0.8.2 code. Details can be 
 found in Jun's comments and my replies.

I don't think we are on the same page here. Let's discuss offline to follow up.


 On Feb. 4, 2015, 2:15 a.m., Joel Koshy wrote:
  core/src/main/scala/kafka/server/OffsetManager.scala, line 65
  https://reviews.apache.org/r/27391/diff/11/?file=832430#file832430line65
 
  Should we call this maxOffsetRetentionMs instead?
 
 Guozhang Wang wrote:
 Not exactly, as it is just the default offset retention, not the upper 
 limit: users can specify a value larger than this default and it will still 
 be accepted.

Yes you are right.


- Joel


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review70836
---



Re: Review Request 27391: Fix KAFKA-1634

2015-02-06 Thread Guozhang Wang


 On Feb. 4, 2015, 2:15 a.m., Joel Koshy wrote:
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala, line 48
  https://reviews.apache.org/r/27391/diff/11/?file=832423#file832423line48
 
  I think our convention is to include the if in the previous line.

I checked the code base and it seems we do not have a consensus here, and 
personally I would prefer this as it actually makes the logic clearer.


 On Feb. 4, 2015, 2:15 a.m., Joel Koshy wrote:
  core/src/main/scala/kafka/common/OffsetMetadataAndError.scala, line 36
  https://reviews.apache.org/r/27391/diff/11/?file=832425#file832425line36
 
  (This is also a public API change - although you did add an Object 
  wrapper further down that comes close to the original API.)

I think the wrapper MessageAndMetadata preserves the existing public API right?


 On Feb. 4, 2015, 2:15 a.m., Joel Koshy wrote:
  core/src/main/scala/kafka/server/OffsetManager.scala, line 65
  https://reviews.apache.org/r/27391/diff/11/?file=832430#file832430line65
 
  Should we call this maxOffsetRetentionMs instead?

Not exactly, as it is just the default offset retention, not the upper limit: 
users can specify a value larger than this default and it will still be 
accepted.


 On Feb. 4, 2015, 2:15 a.m., Joel Koshy wrote:
  core/src/main/scala/kafka/server/KafkaApis.scala, line 163
  https://reviews.apache.org/r/27391/diff/11/?file=832427#file832427line163
 
  Shouldn't the commit timestamp _always_ be set to the current time?
  
  What I was thinking is this:
  If v0:
  - An explicit timestamp is provided only to override the v0 default 
  retention, which is to add the server-side retention to the current timestamp. 
  The (true) commit timestamp - i.e., receive time is useful for debugging 
  purposes. So if an explicit timestamp is provided in v0 then use that to 
  compute the absolute expire timestamp which will be the given commit 
  timestamp; so you would store (commitTimestamp = now; expireTimestamp = 
  given commitTimeStamp); if v0 and commit timestamp is default, then you 
  would store (commitTimestamp = now, expireTimestamp = now + offsetRetention)
  - if v1: (commitTimestamp = now, expireTimestamp = now + 
  offsetRetention)
  
  This way, you should have correct expiration behavior for v0, v1 and v2 
  and at the same time have the true commit timestamp - i.e., the receive 
  time at the broker which is useful for debugging. (also see comment in 
  OffsetManager)

In v0/v1, the commit timestamp can be specified as a future timestamp so the 
expiration timestamp = commit timestamp + retention (in v0/v1 it is always the 
default value).

This behavior should not be respected, i.e. offsets already stored in v0 and v1 
format should be expired correctly using 0.8.2 code. Details can be found in 
Jun's comments and my replies.


 On Feb. 4, 2015, 2:15 a.m., Joel Koshy wrote:
  core/src/main/scala/kafka/server/OffsetManager.scala, line 557
  https://reviews.apache.org/r/27391/diff/11/?file=832430#file832430line557
 
  follow-up from above comment...
  and here you would set:
  commitTimestamp = timestamp
  expireTimestamp = timestamp
  
  So do you think this would work overall?
  
  I could be wrong - this patch has proven to be much trickier than we 
  originally thought.

See the comments above.


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review70836
---



Re: Review Request 27391: Fix KAFKA-1634

2015-02-06 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/
---

(Updated Feb. 6, 2015, 7:01 p.m.)


Review request for kafka.


Bugs: KAFKA-1634
https://issues.apache.org/jira/browse/KAFKA-1634


Repository: kafka


Description (updated)
---

Incorporated Joel's comments


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
09a6f11163ecb1e733c604ade04646e83bbc0c85 
  clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
101f382170ad6740b3f8ff2d27b93a64874a857f 
  clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 
ff89f0e37d5fa787b0218eff86d169aaeae2107b 
  
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 
94e9d376235b3288836807d8e8d2547b3743aad5 
  
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java 
13237fd72da5448a3d596b882fef141f336f827d 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
050615c72efe7dbaa4634f53943bd73273d20ffb 
  core/src/main/scala/kafka/api/OffsetFetchRequest.scala 
c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 
  core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 
4cabffeacea09a49913505db19a96a55d58c0909 
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
5487259751ebe19f137948249aa1fd2637d2deb4 
  core/src/main/scala/kafka/server/KafkaApis.scala 
f2b027bf944e735fd52cc282690ec1b8395f9290 
  core/src/main/scala/kafka/server/KafkaConfig.scala 
6d74983472249eac808d361344c58cc2858ec971 
  core/src/main/scala/kafka/server/KafkaServer.scala 
89200da30a04943f0b9befe84ab17e62b747c8c4 
  core/src/main/scala/kafka/server/OffsetManager.scala 
0bdd42fea931cddd072c0fff765b10526db6840a 
  core/src/main/scala/kafka/server/ReplicaManager.scala 
fb948b9ab28c516e81dab14dcbe211dcd99842b6 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
a1f72f8c2042ff2a43af503b2e06f84706dad9db 
  core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 
5b93239cdc26b5be7696f4e7863adb9fbe5f0ed5 
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 
ba1e48e4300c9fb32e36e7266cb05294f2a481e5 

Diff: https://reviews.apache.org/r/27391/diff/


Testing
---


Thanks,

Guozhang Wang



Re: Review Request 27391: Fix KAFKA-1634

2015-02-03 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review70836
---



clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
https://reviews.apache.org/r/27391/#comment116277

remove both



clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
https://reviews.apache.org/r/27391/#comment116310

Should we revert this rename since this is part of the public API? I would 
be surprised if people are using it though - but still.



core/src/main/scala/kafka/api/OffsetCommitRequest.scala
https://reviews.apache.org/r/27391/#comment116314

I think our convention is to include the if in the previous line.



core/src/main/scala/kafka/api/OffsetCommitRequest.scala
https://reviews.apache.org/r/27391/#comment116315

same here



core/src/main/scala/kafka/api/OffsetCommitRequest.scala
https://reviews.apache.org/r/27391/#comment116316

and here



core/src/main/scala/kafka/api/OffsetCommitRequest.scala
https://reviews.apache.org/r/27391/#comment116317

and here



core/src/main/scala/kafka/api/OffsetFetchRequest.scala
https://reviews.apache.org/r/27391/#comment116318

Can you run organize imports? Some of these seem redundant/unnecessary.



core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
https://reviews.apache.org/r/27391/#comment116320

(This is also a public API change - although you did add an Object wrapper 
further down that comes close to the original API.)



core/src/main/scala/kafka/server/KafkaApis.scala
https://reviews.apache.org/r/27391/#comment116322

Shouldn't the commit timestamp _always_ be set to the current time?

What I was thinking is this:
If v0:
- An explicit timestamp is provided only to override the v0 default 
retention, which is to add the server-side retention to the current timestamp. The 
(true) commit timestamp - i.e., receive time is useful for debugging purposes. 
So if an explicit timestamp is provided in v0 then use that to compute the 
absolute expire timestamp which will be the given commit timestamp; so you 
would store (commitTimestamp = now; expireTimestamp = given commitTimeStamp); 
if v0 and commit timestamp is default, then you would store (commitTimestamp = 
now, expireTimestamp = now + offsetRetention)
- if v1: (commitTimestamp = now, expireTimestamp = now + offsetRetention)

This way, you should have correct expiration behavior for v0, v1 and v2 and 
at the same time have the true commit timestamp - i.e., the receive time at the 
broker which is useful for debugging. (also see comment in OffsetManager)
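
As a rough sketch of the v0/v1 rules proposed above (not the patch itself): 
the object, case class and parameter names are hypothetical, and -1 as the 
"no timestamp given" sentinel is an assumption. The v2 case is left out 
because it is not spelled out in this comment.

```scala
object ProposedTimestamps {
  // Sentinel for "no explicit timestamp"; -1 is an assumption of this sketch.
  val DefaultTimestamp: Long = -1L

  final case class Stored(commitTimestamp: Long, expireTimestamp: Long)

  def resolve(version: Int,
              givenTimestamp: Long,
              now: Long,
              offsetRetentionMs: Long): Stored = {
    require(version == 0 || version == 1, "only v0 and v1 are sketched here")
    if (version == 0 && givenTimestamp != DefaultTimestamp)
      // v0 with an explicit timestamp: it acts as the absolute expiry time.
      Stored(commitTimestamp = now, expireTimestamp = givenTimestamp)
    else
      // v0 with the default timestamp, or v1: expire after the server-side retention.
      Stored(commitTimestamp = now, expireTimestamp = now + offsetRetentionMs)
  }
}
```

In every branch the stored commit timestamp is the broker receive time, 
which is the property the proposal is after.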



core/src/main/scala/kafka/server/OffsetManager.scala
https://reviews.apache.org/r/27391/#comment116374

Should we call this maxOffsetRetentionMs instead?



core/src/main/scala/kafka/server/OffsetManager.scala
https://reviews.apache.org/r/27391/#comment116372

follow-up from above comment...
and here you would set:
commitTimestamp = timestamp
expireTimestamp = timestamp

So do you think this would work overall?

I could be wrong - this patch has proven to be much trickier than we 
originally thought.


- Joel Koshy



Re: Review Request 27391: Fix KAFKA-1634

2015-01-23 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/
---

(Updated Jan. 24, 2015, 12:06 a.m.)


Review request for kafka.


Bugs: KAFKA-1634
https://issues.apache.org/jira/browse/KAFKA-1634


Repository: kafka


Description
---

Incorporated Jun's comments


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
7517b879866fc5dad5f8d8ad30636da8bbe7784a 
  clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 
121e880a941fcd3e6392859edba11a94236494cc 
  
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 
3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f 
  
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java 
df37fc6d8f0db0b8192a948426af603be3444da4 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
050615c72efe7dbaa4634f53943bd73273d20ffb 
  core/src/main/scala/kafka/api/OffsetFetchRequest.scala 
c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 
  core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 
4cabffeacea09a49913505db19a96a55d58c0909 
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
5487259751ebe19f137948249aa1fd2637d2deb4 
  core/src/main/scala/kafka/server/KafkaApis.scala 
ec8d9f7ba44741db40875458bd524c4062ad6a26 
  core/src/main/scala/kafka/server/KafkaConfig.scala 
6d74983472249eac808d361344c58cc2858ec971 
  core/src/main/scala/kafka/server/KafkaServer.scala 
89200da30a04943f0b9befe84ab17e62b747c8c4 
  core/src/main/scala/kafka/server/OffsetManager.scala 
0bdd42fea931cddd072c0fff765b10526db6840a 
  core/src/main/scala/kafka/server/ReplicaManager.scala 
e58fbb922e93b0c31dff04f187fcadb4ece986d7 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
cd16ced5465d098be7a60498326b2a98c248f343 
  core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 
5b93239cdc26b5be7696f4e7863adb9fbe5f0ed5 
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 
ba1e48e4300c9fb32e36e7266cb05294f2a481e5 

Diff: https://reviews.apache.org/r/27391/diff/


Testing
---


Thanks,

Guozhang Wang



Re: Review Request 27391: Fix KAFKA-1634

2015-01-22 Thread Guozhang Wang


 On Jan. 22, 2015, 1:56 a.m., Jun Rao wrote:
  core/src/main/scala/kafka/server/KafkaApis.scala, lines 145-166
  https://reviews.apache.org/r/27391/diff/9/?file=829147#file829147line145
 
  I am not sure that we should change the timestamp for offsets produced 
  in V0 and V1. There could be data in the offset topic already written by 
  0.8.2 code. See the other comment in OffsetManager on expiring.

I think if it (the commit timestamp) is set to default value -1, we should 
override it according to the wiki:

https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommitRequest

Otherwise it should not be overridden.


 On Jan. 22, 2015, 1:56 a.m., Jun Rao wrote:
  core/src/main/scala/kafka/server/OffsetManager.scala, lines 121-123
  https://reviews.apache.org/r/27391/diff/9/?file=829150#file829150line121
 
  Does that change work correctly with offsets already stored in v0 and 
  v1 format using 0.8.2 code? Would those offsets still be expired at the 
  right time?

Changed the logic of overriding commit / expire timestamps as the following:

1. If version <= 1 or retention time is default (-1), override retention time to 
server default value.
2. If the original time stamp (i.e. the commit timestamp) is set to default 
(-1), override to the current time.
3. After 2) is done, compute the expire time to be commit timestamp + retention 
time.
4. Hence the above logic of checking expiration will be compatible (i.e. 
expiration time < now).
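
The four steps above could be sketched roughly as follows. This is an 
illustration rather than the code in the patch: all names are hypothetical, 
and treating every version below 2 as having no client-supplied retention is 
an assumption of the sketch.

```scala
object OffsetExpiry {
  // -1 marks both "no retention given" and "no commit timestamp given".
  val Default: Long = -1L

  final case class Stamps(commitTimestamp: Long, expireTimestamp: Long)

  def resolve(version: Int,
              commitTimestamp: Long,
              retentionMs: Long,
              now: Long,
              serverDefaultRetentionMs: Long): Stamps = {
    // Step 1: fall back to the server default retention.
    val useDefault = version <= 1 || retentionMs == Default
    val retention = if (useDefault) serverDefaultRetentionMs else retentionMs
    // Step 2: a default commit timestamp becomes the current time.
    val commit = if (commitTimestamp == Default) now else commitTimestamp
    // Step 3: expiry is the commit timestamp plus the retention.
    Stamps(commit, commit + retention)
  }

  // Step 4: an offset is expired once its expiry time is in the past.
  def isExpired(stamps: Stamps, now: Long): Boolean =
    stamps.expireTimestamp < now
}
```

The sketch does not guard against overflow when a very large retention is 
added to the commit timestamp.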


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review69106
---




Re: Review Request 27391: Fix KAFKA-1634

2015-01-22 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/
---

(Updated Jan. 23, 2015, 2:47 a.m.)


Review request for kafka.


Bugs: KAFKA-1634
https://issues.apache.org/jira/browse/KAFKA-1634


Repository: kafka


Description (updated)
---

Incorporated Jun's comments


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
7517b879866fc5dad5f8d8ad30636da8bbe7784a 
  clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 
121e880a941fcd3e6392859edba11a94236494cc 
  
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 
3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f 
  
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java 
df37fc6d8f0db0b8192a948426af603be3444da4 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
050615c72efe7dbaa4634f53943bd73273d20ffb 
  core/src/main/scala/kafka/api/OffsetFetchRequest.scala 
c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 
  core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 
4cabffeacea09a49913505db19a96a55d58c0909 
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
5487259751ebe19f137948249aa1fd2637d2deb4 
  core/src/main/scala/kafka/server/KafkaApis.scala 
ec8d9f7ba44741db40875458bd524c4062ad6a26 
  core/src/main/scala/kafka/server/KafkaConfig.scala 
6d74983472249eac808d361344c58cc2858ec971 
  core/src/main/scala/kafka/server/KafkaServer.scala 
89200da30a04943f0b9befe84ab17e62b747c8c4 
  core/src/main/scala/kafka/server/OffsetManager.scala 
0bdd42fea931cddd072c0fff765b10526db6840a 
  core/src/main/scala/kafka/server/ReplicaManager.scala 
e58fbb922e93b0c31dff04f187fcadb4ece986d7 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
cd16ced5465d098be7a60498326b2a98c248f343 
  core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 
5b93239cdc26b5be7696f4e7863adb9fbe5f0ed5 
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 
ba1e48e4300c9fb32e36e7266cb05294f2a481e5 

Diff: https://reviews.apache.org/r/27391/diff/


Testing
---


Thanks,

Guozhang Wang



Re: Review Request 27391: Fix KAFKA-1634

2015-01-21 Thread Guozhang Wang


 On Jan. 20, 2015, 4:35 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/server/KafkaApis.scala, line 147
  https://reviews.apache.org/r/27391/diff/8/?file=822017#file822017line147
 
  I just realized that if we have a v0 or v1 request then we use the 
  offset manager default retention which is one day.
  
  However, if it is v2 and the user does not override it in the offset 
  commit request, then the retention defaults to Long.MaxValue. I think that 
  default makes sense for OffsetCommitRequest. However, I think the broker 
  needs to protect itself and have an upper threshold for retention. i.e., 
  maybe we should have a maxRetentionMs config in the broker.
  
  What do you think?

Agreed. I changed the behavior to use the default value if the version is < v2 or if 
the retention period is the default value (meaning the user did not specify it).
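
For reference, the upper threshold suggested in the quoted comment would 
amount to a clamp like the one below. This is only a sketch of that 
suggestion, not of the behavior described in this reply; `maxRetentionMs` is 
the config name floated in the comment and the rest is hypothetical.

```scala
object RetentionCap {
  // Clamp the client-requested retention to a broker-side maximum.
  def effectiveRetentionMs(requestedMs: Long, maxRetentionMs: Long): Long =
    math.min(requestedMs, maxRetentionMs)
}
```

With a one-day cap, a request for Long.MaxValue would be reduced to one day, 
while shorter requests pass through unchanged.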


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review68729
---


On Jan. 14, 2015, 11:50 p.m., Guozhang Wang wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/27391/
 ---
 
 (Updated Jan. 14, 2015, 11:50 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1634
 https://issues.apache.org/jira/browse/KAFKA-1634
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Incorporated Joel and Jun's comments
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
 7517b879866fc5dad5f8d8ad30636da8bbe7784a 
   clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 
 121e880a941fcd3e6392859edba11a94236494cc 
   
 clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
  3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f 
   
 clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
  df37fc6d8f0db0b8192a948426af603be3444da4 
   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
 050615c72efe7dbaa4634f53943bd73273d20ffb 
   core/src/main/scala/kafka/api/OffsetFetchRequest.scala 
 c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 
   core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 
 4cabffeacea09a49913505db19a96a55d58c0909 
   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
 191a8677444e53b043e9ad6e94c5a9191c32599e 
   core/src/main/scala/kafka/server/KafkaApis.scala 
 c011a1b79bd6c4e832fe7d097daacb0d647d1cd4 
   core/src/main/scala/kafka/server/KafkaServer.scala 
 a069eb9272c92ef62387304b60de1fe473d7ff49 
   core/src/main/scala/kafka/server/OffsetManager.scala 
 3c79428962604800983415f6f705e04f52acb8fb 
   core/src/main/scala/kafka/server/ReplicaManager.scala 
 e58fbb922e93b0c31dff04f187fcadb4ece986d7 
   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
 cd16ced5465d098be7a60498326b2a98c248f343 
   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 
 4a3a5b264a021e55c39f4d7424ce04ee591503ef 
   core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 
 ba1e48e4300c9fb32e36e7266cb05294f2a481e5 
 
 Diff: https://reviews.apache.org/r/27391/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Guozhang Wang
 




Re: Review Request 27391: Fix KAFKA-1634

2015-01-21 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/
---

(Updated Jan. 22, 2015, 12:43 a.m.)


Review request for kafka.


Bugs: KAFKA-1634
https://issues.apache.org/jira/browse/KAFKA-1634


Repository: kafka


Description (updated)
---

Incorporated Joel's comments


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
7517b879866fc5dad5f8d8ad30636da8bbe7784a 
  clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 
121e880a941fcd3e6392859edba11a94236494cc 
  
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 
3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f 
  
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java 
df37fc6d8f0db0b8192a948426af603be3444da4 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
050615c72efe7dbaa4634f53943bd73273d20ffb 
  core/src/main/scala/kafka/api/OffsetFetchRequest.scala 
c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 
  core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 
4cabffeacea09a49913505db19a96a55d58c0909 
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
191a8677444e53b043e9ad6e94c5a9191c32599e 
  core/src/main/scala/kafka/server/KafkaApis.scala 
ec8d9f7ba44741db40875458bd524c4062ad6a26 
  core/src/main/scala/kafka/server/KafkaConfig.scala 
6d74983472249eac808d361344c58cc2858ec971 
  core/src/main/scala/kafka/server/KafkaServer.scala 
89200da30a04943f0b9befe84ab17e62b747c8c4 
  core/src/main/scala/kafka/server/OffsetManager.scala 
0bdd42fea931cddd072c0fff765b10526db6840a 
  core/src/main/scala/kafka/server/ReplicaManager.scala 
e58fbb922e93b0c31dff04f187fcadb4ece986d7 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
cd16ced5465d098be7a60498326b2a98c248f343 
  core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 
5b93239cdc26b5be7696f4e7863adb9fbe5f0ed5 
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 
ba1e48e4300c9fb32e36e7266cb05294f2a481e5 

Diff: https://reviews.apache.org/r/27391/diff/


Testing
---


Thanks,

Guozhang Wang



Re: Review Request 27391: Fix KAFKA-1634

2015-01-21 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review69106
---


Thanks for the patch. A few more comments.


clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
https://reviews.apache.org/r/27391/#comment113727

Would it be better to use -1L as the default retention time? MAX_VALUE 
could be useful for the case when a client wants the offset never to be expired.



core/src/main/scala/kafka/api/OffsetCommitRequest.scala
https://reviews.apache.org/r/27391/#comment113724

It seems that our coding convention has been not to use {} on a single line 
in the body. So, we use
if ()
  do sth
instead of 
if () {
  do sth
}



core/src/main/scala/kafka/server/KafkaApis.scala
https://reviews.apache.org/r/27391/#comment113730

I am not sure that we should change the timestamp for offsets produced in 
V0 and V1. There could be data in the offset topic already written by 0.8.2 
code. See the other comment in OffsetManager on expiring.



core/src/main/scala/kafka/server/OffsetManager.scala
https://reviews.apache.org/r/27391/#comment113729

Does that change work correctly with offsets already stored in v0 and v1 
format using 0.8.2 code? Would those offsets still be expired at the right time?


- Jun Rao



Re: Review Request 27391: Fix KAFKA-1634

2015-01-20 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review68729
---



core/src/main/scala/kafka/server/KafkaApis.scala
https://reviews.apache.org/r/27391/#comment113113

I just realized that if we have a v0 or v1 request then we use the offset 
manager default retention which is one day.

However, if it is v2 and the user does not override it in the offset commit 
request, then the retention defaults to Long.MaxValue. I think that default 
makes sense for OffsetCommitRequest. However, I think the broker needs to 
protect itself and have an upper threshold for retention. i.e., maybe we should 
have a maxRetentionMs config in the broker.

What do you think?
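
A rough sketch of the kind of broker-side cap I have in mind. maxRetentionMs is only the proposed config name, and the helper names are made up for illustration:

```scala
object RetentionCap {
  // Clamp the retention requested by the client to the broker's upper threshold.
  // maxRetentionMs is the proposed (not yet existing) broker config.
  def cappedRetentionMs(requestedMs: Long, maxRetentionMs: Long): Long =
    math.min(requestedMs, maxRetentionMs)

  // Compute the expiration timestamp, saturating at Long.MaxValue instead of
  // overflowing when the capped retention is still very large.
  // Assumes nowMs is non-negative.
  def expirationTimestamp(nowMs: Long, requestedMs: Long, maxRetentionMs: Long): Long = {
    val retention = cappedRetentionMs(requestedMs, maxRetentionMs)
    if (retention > Long.MaxValue - nowMs) Long.MaxValue else nowMs + retention
  }
}
```

The client default can then stay at Long.MaxValue while the broker still bounds how long an entry lives.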



core/src/main/scala/kafka/server/KafkaApis.scala
https://reviews.apache.org/r/27391/#comment113111

if it is _after_ v2



core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
https://reviews.apache.org/r/27391/#comment113114

This file needs to be rebased.


- Joel Koshy


On Jan. 14, 2015, 11:50 p.m., Guozhang Wang wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/27391/
 ---
 
 (Updated Jan. 14, 2015, 11:50 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1634
 https://issues.apache.org/jira/browse/KAFKA-1634
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Incorporated Joel and Jun's comments
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
 7517b879866fc5dad5f8d8ad30636da8bbe7784a 
   clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 
 121e880a941fcd3e6392859edba11a94236494cc 
   
 clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
  3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f 
   
 clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
  df37fc6d8f0db0b8192a948426af603be3444da4 
   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
 050615c72efe7dbaa4634f53943bd73273d20ffb 
   core/src/main/scala/kafka/api/OffsetFetchRequest.scala 
 c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 
   core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 
 4cabffeacea09a49913505db19a96a55d58c0909 
   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
 191a8677444e53b043e9ad6e94c5a9191c32599e 
   core/src/main/scala/kafka/server/KafkaApis.scala 
 c011a1b79bd6c4e832fe7d097daacb0d647d1cd4 
   core/src/main/scala/kafka/server/KafkaServer.scala 
 a069eb9272c92ef62387304b60de1fe473d7ff49 
   core/src/main/scala/kafka/server/OffsetManager.scala 
 3c79428962604800983415f6f705e04f52acb8fb 
   core/src/main/scala/kafka/server/ReplicaManager.scala 
 e58fbb922e93b0c31dff04f187fcadb4ece986d7 
   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
 cd16ced5465d098be7a60498326b2a98c248f343 
   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 
 4a3a5b264a021e55c39f4d7424ce04ee591503ef 
   core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 
 ba1e48e4300c9fb32e36e7266cb05294f2a481e5 
 
 Diff: https://reviews.apache.org/r/27391/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Guozhang Wang
 




Re: Review Request 27391: Fix KAFKA-1634

2015-01-14 Thread Guozhang Wang


 On Jan. 8, 2015, 2:24 a.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java,
   lines 79-85
  https://reviews.apache.org/r/27391/diff/1/?file=743505#file743505line79
 
  Perhaps this code can just be changed to 
  
  this(groupId, DEFAULT_GENERATION_ID, DEFAULT_CONSUMER_ID, offsetData);

This cannot be forwarded, since the super() call needs to specify the version id.


 On Jan. 8, 2015, 2:24 a.m., Jun Rao wrote:
  clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java,
   lines 97-106
  https://reviews.apache.org/r/27391/diff/1/?file=743505#file743505line97
 
  Same here. This code can just be replaced by forwarding the request to 
  the next constructor.

Ditto above.


 On Jan. 8, 2015, 2:24 a.m., Jun Rao wrote:
  core/src/main/scala/kafka/server/OffsetManager.scala, lines 524-525
  https://reviews.apache.org/r/27391/diff/7/?file=779891#file779891line524
 
  Shouldn't we just set the expiration time field to expirationTimestamp, 
  instead of taking it from offsetAndMetadata?

For v0/1, we should just take the value of offsetAndMetadata.timestamp; for 
v2 we will take the value of expirationTimestamp. This has been changed in the 
latest patch, where offsetAndMetadata.timestamp is updated accordingly before 
calling offsetCommitValue().
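
Roughly, the rule looks like the sketch below. This is a simplified illustration, not the code in the patch: the case class is a stand-in with assumed fields, and withStoredTimestamp is a hypothetical helper name.

```scala
object CommitTimestamps {
  // Simplified stand-in for the real class; the field list is an assumption.
  case class OffsetAndMetadata(offset: Long, metadata: String, timestamp: Long)

  // For v0/v1, keep the timestamp carried by the request.
  // For v2 and later, replace it with the expiration timestamp derived from retention.
  def withStoredTimestamp(entry: OffsetAndMetadata,
                          versionId: Int,
                          expirationTimestamp: Long): OffsetAndMetadata =
    if (versionId <= 1) entry
    else entry.copy(timestamp = expirationTimestamp)
}
```

The value returned here is what would then be handed to offsetCommitValue(), so that method only has to serialize it.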


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review67114
---


On Dec. 2, 2014, 2:03 a.m., Guozhang Wang wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/27391/
 ---
 
 (Updated Dec. 2, 2014, 2:03 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1634
 https://issues.apache.org/jira/browse/KAFKA-1634
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Add another api in offset manager to return the struct, and the cache layer 
 will only read its expiration timestamp while the offset formatter will read 
 the struct as a whole
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
 7517b879866fc5dad5f8d8ad30636da8bbe7784a 
   clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 
 121e880a941fcd3e6392859edba11a94236494cc 
   
 clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
  3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f 
   
 clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
  df37fc6d8f0db0b8192a948426af603be3444da4 
   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
 050615c72efe7dbaa4634f53943bd73273d20ffb 
   core/src/main/scala/kafka/api/OffsetFetchRequest.scala 
 c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 
   core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 
 4cabffeacea09a49913505db19a96a55d58c0909 
   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
 da29a8cb461099eb675161db2f11a9937424a5c6 
   core/src/main/scala/kafka/server/KafkaApis.scala 
 2a1c0326b6e6966d8b8254bd6a1cb83ad98a3b80 
   core/src/main/scala/kafka/server/KafkaServer.scala 
 1bf7d10cef23a77e71eb16bf6d0e68bc4ebe 
   core/src/main/scala/kafka/server/OffsetManager.scala 
 3c79428962604800983415f6f705e04f52acb8fb 
   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
 cd16ced5465d098be7a60498326b2a98c248f343 
   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 
 8c5364fa97da1be09973c176d1baeb339455d319 
 
 Diff: https://reviews.apache.org/r/27391/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Guozhang Wang
 




Re: Review Request 27391: Fix KAFKA-1634

2015-01-14 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/
---

(Updated Jan. 14, 2015, 11:50 p.m.)


Review request for kafka.


Bugs: KAFKA-1634
https://issues.apache.org/jira/browse/KAFKA-1634


Repository: kafka


Description (updated)
---

Incorporated Joel and Jun's comments


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
7517b879866fc5dad5f8d8ad30636da8bbe7784a 
  clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 
121e880a941fcd3e6392859edba11a94236494cc 
  
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 
3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f 
  
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java 
df37fc6d8f0db0b8192a948426af603be3444da4 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
050615c72efe7dbaa4634f53943bd73273d20ffb 
  core/src/main/scala/kafka/api/OffsetFetchRequest.scala 
c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 
  core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 
4cabffeacea09a49913505db19a96a55d58c0909 
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
191a8677444e53b043e9ad6e94c5a9191c32599e 
  core/src/main/scala/kafka/server/KafkaApis.scala 
c011a1b79bd6c4e832fe7d097daacb0d647d1cd4 
  core/src/main/scala/kafka/server/KafkaServer.scala 
a069eb9272c92ef62387304b60de1fe473d7ff49 
  core/src/main/scala/kafka/server/OffsetManager.scala 
3c79428962604800983415f6f705e04f52acb8fb 
  core/src/main/scala/kafka/server/ReplicaManager.scala 
e58fbb922e93b0c31dff04f187fcadb4ece986d7 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
cd16ced5465d098be7a60498326b2a98c248f343 
  core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 
4a3a5b264a021e55c39f4d7424ce04ee591503ef 
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 
ba1e48e4300c9fb32e36e7266cb05294f2a481e5 

Diff: https://reviews.apache.org/r/27391/diff/


Testing
---


Thanks,

Guozhang Wang



Re: Review Request 27391: Fix KAFKA-1634

2015-01-13 Thread Guozhang Wang


 On Dec. 18, 2014, 8:42 a.m., Joel Koshy wrote:
  core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala, line 224
  https://reviews.apache.org/r/27391/diff/7/?file=779893#file779893line224
 
  If the offset in fact did expire, the assertion itself won't fail - 
  i.e., you will get a NoSuchElementException
  
  Same comments apply to checks below.

Not sure I understand this: when the offset expires, the server will return an 
error code with the offset set to -1 upon receiving an offset fetch request.


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review65462
---






Re: Review Request 27391: Fix KAFKA-1634

2015-01-07 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review67114
---



clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
https://reviews.apache.org/r/27391/#comment110938

Perhaps this code can just be changed to 

this(groupId, DEFAULT_GENERATION_ID, DEFAULT_CONSUMER_ID, offsetData);



clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
https://reviews.apache.org/r/27391/#comment110939

Same here. This code can just be replaced by forwarding the request to the 
next constructor.



core/src/main/scala/kafka/server/OffsetManager.scala
https://reviews.apache.org/r/27391/#comment111015

Shouldn't we just set the expiration time field to expirationTimestamp, 
instead of taking it from offsetAndMetadata?



core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
https://reviews.apache.org/r/27391/#comment111016

Should we remove the commented out code?


- Jun Rao






Re: Review Request 27391: Fix KAFKA-1634

2014-12-18 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review65462
---



clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
https://reviews.apache.org/r/27391/#comment108626

Can you add a comment: // only v0, v1 of offsetcommitrequest



clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
https://reviews.apache.org/r/27391/#comment108637

Not introduced by your patch, but it is odd that these are named 
topicResponseObj and partitionResponse below - probably an artifact of 
copy/paste. Can you do a rename here before checking in?



core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
https://reviews.apache.org/r/27391/#comment108638

I think we discussed before that timestamp does not need to be a var. We 
can use the case class copy method to make a copy + edit.
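
For example, with a simplified stand-in for the class (the field list and defaults here are assumptions, not the actual definition):

```scala
object CopyInsteadOfVar {
  // All fields are immutable vals; no var is needed for timestamp.
  case class OffsetAndMetadata(offset: Long, metadata: String = "", timestamp: Long = -1L)

  val original = OffsetAndMetadata(offset = 42L, metadata = "m")

  // copy returns a new instance with only the named field changed;
  // the original instance is left untouched.
  val stamped = original.copy(timestamp = 1000L)
}
```

Callers that need a different timestamp get a new value, and nothing that still holds the original sees it change underneath.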



core/src/main/scala/kafka/server/KafkaServer.scala
https://reviews.apache.org/r/27391/#comment108642

Thanks for fixing this



core/src/main/scala/kafka/server/OffsetManager.scala
https://reviews.apache.org/r/27391/#comment108643

I think it would be better to move this to just before the call to 
offsetCommitValue in the loop in line 228. This method should only be 
responsible for taking the offsetAndMetadata and converting that into the 
on-disk bytes and should not concern itself with setting a critical field like 
the expiration timestamp. I was actually looking for where this happens (i.e., 
setting the expiration time) and it took me a while to realize it was hidden 
in here.



core/src/main/scala/kafka/server/OffsetManager.scala
https://reviews.apache.org/r/27391/#comment108644

I think we can make this and some other methods here private.



core/src/main/scala/kafka/server/OffsetManager.scala
https://reviews.apache.org/r/27391/#comment108645

private



core/src/main/scala/kafka/server/OffsetManager.scala
https://reviews.apache.org/r/27391/#comment108646

Also, let us use a case class instead of a tuple



core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
https://reviews.apache.org/r/27391/#comment108655

Rather than sleep, we should improve OffsetManager to take in a 
MockScheduler instance - we can pass through the time instance from KafkaServer 
to offsetManager as we do for LogManager and replicaManager. That way we can 
advance time with MockTime. This test will need to change from OffsetCommitTest 
to OffsetManagerTest and we will just test the OffsetManager. Can you file a 
jira for that? Although that would make sense only after you check this in.
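
To illustrate the idea only: the sketch below uses toy stand-ins, not the real kafka.utils classes or the real OffsetManager. Time, MockTime, Entry, and ExpiringOffsets are all simplified, hypothetical versions.

```scala
object MockTimeSketch {
  // Minimal clock abstraction that the component under test depends on.
  trait Time { def milliseconds: Long }

  // Test clock: sleep advances the clock immediately and never blocks.
  class MockTime(private var nowMs: Long = 0L) extends Time {
    def milliseconds: Long = nowMs
    def sleep(ms: Long): Unit = nowMs += ms
  }

  case class Entry(offset: Long, expiresAtMs: Long)

  // Toy cache standing in for the offset manager; it reads time only via Time.
  class ExpiringOffsets(time: Time) {
    private val entries = scala.collection.mutable.Map.empty[String, Entry]

    def commit(key: String, offset: Long, retentionMs: Long): Unit =
      entries(key) = Entry(offset, time.milliseconds + retentionMs)

    // Drop every entry whose expiration time has been reached.
    def removeExpired(): Unit = {
      val now = time.milliseconds
      val expiredKeys = entries.filter { case (_, e) => e.expiresAtMs <= now }.keys.toList
      expiredKeys.foreach(k => entries.remove(k))
    }

    def fetch(key: String): Option[Long] = entries.get(key).map(_.offset)
  }
}
```

A test can then commit an offset, call sleep on the mock clock, trigger the cleanup directly, and check the result without waiting on wall-clock time.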



core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
https://reviews.apache.org/r/27391/#comment108647

If the offset in fact did expire, the assertion itself won't fail - i.e., 
you will get a NoSuchElementException

Same comments apply to checks below.
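
A small illustration of the difference, assuming the test reads the fetched offset through a plain map lookup. The map contents and method names below are made up:

```scala
object ExpiredLookup {
  // Hypothetical fetch result: only topic-0 still has a committed offset.
  val fetched: Map[String, Long] = Map("topic-0" -> 42L)

  // Map.apply throws NoSuchElementException for a missing key,
  // so the test errors out before any assertion is evaluated.
  def unsafeOffset(key: String): Long = fetched(key)

  // Map.get returns None for a missing key, so the test can assert on it.
  def safeOffset(key: String): Option[Long] = fetched.get(key)
}
```

Asserting on the Option makes an expired offset show up as a failed assertion with a readable message rather than as an unrelated exception.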


- Joel Koshy


On Dec. 2, 2014, 2:03 a.m., Guozhang Wang wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/27391/
 ---
 
 (Updated Dec. 2, 2014, 2:03 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1634
 https://issues.apache.org/jira/browse/KAFKA-1634
 
 
 Repository: kafka
 
 
 Description
 ---
 
 Add another api in offset manager to return the struct, and the cache layer 
 will only read its expiration timestamp while the offset formatter will read 
 the struct as a whole
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
 7517b879866fc5dad5f8d8ad30636da8bbe7784a 
   clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 
 121e880a941fcd3e6392859edba11a94236494cc 
   
 clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
  3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f 
   
 clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
  df37fc6d8f0db0b8192a948426af603be3444da4 
   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
 050615c72efe7dbaa4634f53943bd73273d20ffb 
   core/src/main/scala/kafka/api/OffsetFetchRequest.scala 
 c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 
   core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 
 4cabffeacea09a49913505db19a96a55d58c0909 
   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
 da29a8cb461099eb675161db2f11a9937424a5c6 
   core/src/main/scala/kafka/server/KafkaApis.scala 
 2a1c0326b6e6966d8b8254bd6a1cb83ad98a3b80 
   core/src/main/scala/kafka/server/KafkaServer.scala 
 1bf7d10cef23a77e71eb16bf6d0e68bc4ebe 
   core/src/main/scala/kafka/server/OffsetManager.scala 
 3c79428962604800983415f6f705e04f52acb8fb 
   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 

Re: Review Request 27391: Fix KAFKA-1634

2014-12-01 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/
---

(Updated Dec. 1, 2014, 7:44 p.m.)


Review request for kafka.


Bugs: KAFKA-1634
https://issues.apache.org/jira/browse/KAFKA-1634


Repository: kafka


Description (updated)
---

Add another api in offset manager to return the struct, and the cache layer 
will only read its expiration timestamp while the offset formatter will read 
the struct as a whole


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
7517b879866fc5dad5f8d8ad30636da8bbe7784a 
  
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 
3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f 
  
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java 
df37fc6d8f0db0b8192a948426af603be3444da4 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
050615c72efe7dbaa4634f53943bd73273d20ffb 
  core/src/main/scala/kafka/api/OffsetFetchRequest.scala 
c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 
  core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 
4cabffeacea09a49913505db19a96a55d58c0909 
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
da29a8cb461099eb675161db2f11a9937424a5c6 
  core/src/main/scala/kafka/server/KafkaApis.scala 
2a1c0326b6e6966d8b8254bd6a1cb83ad98a3b80 
  core/src/main/scala/kafka/server/KafkaServer.scala 
1bf7d10cef23a77e71eb16bf6d0e68bc4ebe 
  core/src/main/scala/kafka/server/OffsetManager.scala 
3c79428962604800983415f6f705e04f52acb8fb 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
cd16ced5465d098be7a60498326b2a98c248f343 
  core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 
8c5364fa97da1be09973c176d1baeb339455d319 

Diff: https://reviews.apache.org/r/27391/diff/


Testing
---


Thanks,

Guozhang Wang



Re: Review Request 27391: Fix KAFKA-1634

2014-12-01 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/
---

(Updated Dec. 2, 2014, 2:03 a.m.)


Review request for kafka.


Bugs: KAFKA-1634
https://issues.apache.org/jira/browse/KAFKA-1634


Repository: kafka


Description
---

Add another api in offset manager to return the struct, and the cache layer 
will only read its expiration timestamp while the offset formatter will read 
the struct as a whole


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
7517b879866fc5dad5f8d8ad30636da8bbe7784a 
  clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 
121e880a941fcd3e6392859edba11a94236494cc 
  
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 
3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f 
  
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java 
df37fc6d8f0db0b8192a948426af603be3444da4 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
050615c72efe7dbaa4634f53943bd73273d20ffb 
  core/src/main/scala/kafka/api/OffsetFetchRequest.scala 
c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 
  core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 
4cabffeacea09a49913505db19a96a55d58c0909 
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
da29a8cb461099eb675161db2f11a9937424a5c6 
  core/src/main/scala/kafka/server/KafkaApis.scala 
2a1c0326b6e6966d8b8254bd6a1cb83ad98a3b80 
  core/src/main/scala/kafka/server/KafkaServer.scala 
1bf7d10cef23a77e71eb16bf6d0e68bc4ebe 
  core/src/main/scala/kafka/server/OffsetManager.scala 
3c79428962604800983415f6f705e04f52acb8fb 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
cd16ced5465d098be7a60498326b2a98c248f343 
  core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 
8c5364fa97da1be09973c176d1baeb339455d319 

Diff: https://reviews.apache.org/r/27391/diff/


Testing
---


Thanks,

Guozhang Wang



Re: Review Request 27391: Fix KAFKA-1634

2014-11-24 Thread Joel Koshy


 On Nov. 21, 2014, 2:41 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/common/OffsetMetadataAndError.scala, line 17
  https://reviews.apache.org/r/27391/diff/4/?file=766684#file766684line17
 
  I thought we would be going with separate format for on-disk storage?
  
  E.g., one thing that is extremely useful (until we have timestamp as a 
  first-class field of messages) is to have the receive time of the 
  offsetcommit in the stored offset entries. This is very useful for 
  debugging.
 
 Guozhang Wang wrote:
 Yes, they are separated: for on-disk storage the timestamp will always 
 be stored, and for the wire protocol only v0/1 will contain that value, but for 
 v2 this value will be computed via retention. So the on-disk format is 
 specified as OffsetAndMetadata, and when we deprecate v0/1 and add the 
 timestamp to the message header we will replace this with OffsetMetadata.

What I meant is that for the on-disk format it is useful to have the receive 
time (apart from the expiration time). Right now it seems only one timestamp 
(which is the expiration timestamp) is stored.
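
Something along these lines, as a sketch only. StoredOffset and its field names are hypothetical and do not describe the current on-disk schema:

```scala
object StoredOffsetSketch {
  // Hypothetical on-disk value that keeps both timestamps side by side.
  case class StoredOffset(offset: Long,
                          metadata: String,
                          receiveTimestamp: Long, // when the broker received the commit
                          expireTimestamp: Long)  // when the entry becomes eligible for removal

  // Build the stored value from a commit received at receivedAtMs.
  def fromCommit(offset: Long, metadata: String, receivedAtMs: Long, retentionMs: Long): StoredOffset =
    StoredOffset(offset, metadata, receivedAtMs, receivedAtMs + retentionMs)

  // Expiration only looks at expireTimestamp; receiveTimestamp is kept for debugging.
  def isExpired(entry: StoredOffset, nowMs: Long): Boolean =
    entry.expireTimestamp <= nowMs
}
```

The receive time plays no role in expiration; it is there purely so that stored entries can be inspected when debugging.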


- Joel


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review62553
---


On Nov. 21, 2014, 10 p.m., Guozhang Wang wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/27391/
 ---
 
 (Updated Nov. 21, 2014, 10 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1634
 https://issues.apache.org/jira/browse/KAFKA-1634
 
 
 Repository: kafka
 
 
 Description
 ---
 
 The timestamp field of OffsetAndMetadata is preserved since we need to be 
 backward compatible with older versions
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
 7517b879866fc5dad5f8d8ad30636da8bbe7784a 
   
 clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
  3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f 
   
 clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
  df37fc6d8f0db0b8192a948426af603be3444da4 
   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
 050615c72efe7dbaa4634f53943bd73273d20ffb 
   core/src/main/scala/kafka/api/OffsetFetchRequest.scala 
 c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 
   core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 
 4cabffeacea09a49913505db19a96a55d58c0909 
   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
 3e1718bc7ca6c835a59ca7c6879f558ec06866ee 
   core/src/main/scala/kafka/server/KafkaApis.scala 
 968b0c4f809ea9f9c3f322aef9db105f9d2e0e23 
   core/src/main/scala/kafka/server/KafkaServer.scala 
 1bf7d10cef23a77e71eb16bf6d0e68bc4ebe 
   core/src/main/scala/kafka/server/OffsetManager.scala 
 2957bc435102bc4004d8f100dbcdd56287c8ffae 
   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
 cd16ced5465d098be7a60498326b2a98c248f343 
   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 
 8c5364fa97da1be09973c176d1baeb339455d319 
 
 Diff: https://reviews.apache.org/r/27391/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Guozhang Wang
 




Re: Review Request 27391: Fix KAFKA-1634

2014-11-21 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review62553
---



core/src/main/scala/kafka/api/OffsetCommitRequest.scala
https://reviews.apache.org/r/27391/#comment104608

`=`



core/src/main/scala/kafka/api/OffsetCommitRequest.scala
https://reviews.apache.org/r/27391/#comment104609

`=`



core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
https://reviews.apache.org/r/27391/#comment104615

I thought we would be going with separate format for on-disk storage?

E.g., one thing that is extremely useful (until we have timestamp as a 
first-class field of messages) is to have the receive time of the offsetcommit 
in the stored offset entries. This is very useful for debugging.



core/src/main/scala/kafka/server/KafkaApis.scala
https://reviews.apache.org/r/27391/#comment104612

I think what you meant earlier was with < v2 you could have different 
timestamps for each partition so a global retention won't work with < v2



core/src/main/scala/kafka/server/OffsetManager.scala
https://reviews.apache.org/r/27391/#comment104613

use



core/src/main/scala/kafka/server/OffsetManager.scala
https://reviews.apache.org/r/27391/#comment104614

This and the above use give compilation warnings. I think it is reasonable 
to copy those constants here to get rid of the warnings since we are anyway 
in an undesirable state right now of maintaining a mirror wire-protocol 
implementation in scala


- Joel Koshy


On Nov. 18, 2014, 1:42 a.m., Guozhang Wang wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/27391/
 ---
 
 (Updated Nov. 18, 2014, 1:42 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1634
 https://issues.apache.org/jira/browse/KAFKA-1634
 
 
 Repository: kafka
 
 
 Description
 ---
 
 The timestamp field of OffsetAndMetadata is preserved since we need to be 
 backward compatible with older versions
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
 7517b879866fc5dad5f8d8ad30636da8bbe7784a 
   
 clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
  3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f 
   
 clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
  df37fc6d8f0db0b8192a948426af603be3444da4 
   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
 050615c72efe7dbaa4634f53943bd73273d20ffb 
   core/src/main/scala/kafka/api/OffsetFetchRequest.scala 
 c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 
   core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 
 4cabffeacea09a49913505db19a96a55d58c0909 
   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
 f476973eeff653473a60c3ecf36e870e386536bc 
   core/src/main/scala/kafka/server/KafkaApis.scala 
 968b0c4f809ea9f9c3f322aef9db105f9d2e0e23 
   core/src/main/scala/kafka/server/KafkaServer.scala 
 4de812374e8fb1fed834d2be3f9655f55b511a74 
   core/src/main/scala/kafka/server/OffsetManager.scala 
 2957bc435102bc4004d8f100dbcdd56287c8ffae 
   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
 cd16ced5465d098be7a60498326b2a98c248f343 
   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 
 8c5364fa97da1be09973c176d1baeb339455d319 
 
 Diff: https://reviews.apache.org/r/27391/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Guozhang Wang
 




Re: Review Request 27391: Fix KAFKA-1634

2014-11-21 Thread Guozhang Wang


 On Nov. 21, 2014, 2:41 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/server/KafkaApis.scala, line 147
  https://reviews.apache.org/r/27391/diff/4/?file=766686#file766686line147
 
  I think what you meant earlier was that with < v2 you could have different
  timestamps for each partition, so a global retention won't work with < v2.

In the OffsetManager, the timestamp is overridden by the global one only if it
is set to the default value, which is -1. So for v0/v1, although the retention
is used, it will not override a timestamp that was explicitly set.
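
A minimal sketch of the rule described in this reply, under stated assumptions: the class, the method `resolveTimestamp`, and the parameter `retentionMs` are hypothetical names for illustration and are not taken from the patch, and the "global" value is assumed to be the receive time plus the retention.

```java
// Hypothetical sketch; not the actual OffsetManager code.
final class TimestampOverrideSketch {
    // Sentinel meaning "no timestamp supplied" (-1, per the discussion).
    static final long DEFAULT_TIMESTAMP = -1L;

    // Returns the timestamp to store for a committed offset.
    static long resolveTimestamp(long commitTimestamp, long nowMs, long retentionMs) {
        if (commitTimestamp == DEFAULT_TIMESTAMP) {
            // Assumption: the global value is receive time plus retention.
            return nowMs + retentionMs;
        }
        // An explicitly specified v0/v1 timestamp is kept as-is.
        return commitTimestamp;
    }

    public static void main(String[] args) {
        long now = 1_000L;
        long retention = 500L;
        // Default timestamp: replaced by the retention-based value.
        System.out.println(resolveTimestamp(DEFAULT_TIMESTAMP, now, retention));
        // Explicit timestamp: left untouched.
        System.out.println(resolveTimestamp(42L, now, retention));
    }
}
```

With this shape, the retention only matters for commits that did not carry their own timestamp.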


 On Nov. 21, 2014, 2:41 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/common/OffsetMetadataAndError.scala, line 17
  https://reviews.apache.org/r/27391/diff/4/?file=766684#file766684line17
 
  I thought we would be going with separate format for on-disk storage?
  
  E.g., one thing that is extremely useful (until we have timestamp as a 
  first-class field of messages) is to have the receive time of the 
  offsetcommit in the stored offset entries. This is very useful for 
  debugging.

Yes, they are separate: for on-disk storage the timestamp will always be
stored; for the wire protocol only v0/v1 will contain that value, and for v2
it will be computed from the retention. So the on-disk format is specified as
OffsetAndMetadata, and once we deprecate v0/v1 and add the timestamp to the
message header we will replace it with OffsetMetadata.
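
The split between a wire-level value and a stored value could be pictured roughly as follows. This is only a sketch: the two nested classes are simplified stand-ins that borrow the names used in this thread, not the real Kafka classes, and `toStored` is a hypothetical helper that assumes the stored timestamp for v2 is derived from the retention.

```java
// Hypothetical, simplified stand-ins; not the real Kafka classes.
final class StorageFormatSketch {
    // Wire-level view for v2: offset and metadata only, no timestamp.
    static final class OffsetMetadata {
        final long offset;
        final String metadata;

        OffsetMetadata(long offset, String metadata) {
            this.offset = offset;
            this.metadata = metadata;
        }
    }

    // Stored view: the timestamp is always present.
    static final class OffsetAndMetadata {
        final long offset;
        final String metadata;
        final long timestamp;

        OffsetAndMetadata(long offset, String metadata, long timestamp) {
            this.offset = offset;
            this.metadata = metadata;
            this.timestamp = timestamp;
        }
    }

    // Assumption: for v2 the stored timestamp is computed from the retention.
    static OffsetAndMetadata toStored(OffsetMetadata wire, long nowMs, long retentionMs) {
        return new OffsetAndMetadata(wire.offset, wire.metadata, nowMs + retentionMs);
    }
}
```

Keeping the two types apart means the wire format can drop the field without touching what is written to the log.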


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review62553
---






Re: Review Request 27391: Fix KAFKA-1634

2014-11-21 Thread Guozhang Wang


 On Nov. 21, 2014, 2:41 p.m., Joel Koshy wrote:
  core/src/main/scala/kafka/server/OffsetManager.scala, line 500
  https://reviews.apache.org/r/27391/diff/4/?file=766688#file766688line500
 
  This and the above use give compilation warnings. I think it is 
  reasonable to copy those constants here to get rid of the warnings since 
  we are anyway in an undesirable state right now of maintaining a mirror 
  wire-protocol implementation in scala

I am not sure what you mean by copying those constants. I did the copy inside
the if block, and the compilation error comes from the condition.


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review62553
---






Re: Review Request 27391: Fix KAFKA-1634

2014-11-21 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/
---

(Updated Nov. 21, 2014, 10 p.m.)


Review request for kafka.


Bugs: KAFKA-1634
https://issues.apache.org/jira/browse/KAFKA-1634


Repository: kafka


Description
---

The timestamp field of OffsetAndMetadata is preserved since we need to be 
backward compatible with older versions


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
7517b879866fc5dad5f8d8ad30636da8bbe7784a 
  
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 
3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f 
  
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java 
df37fc6d8f0db0b8192a948426af603be3444da4 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
050615c72efe7dbaa4634f53943bd73273d20ffb 
  core/src/main/scala/kafka/api/OffsetFetchRequest.scala 
c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 
  core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 
4cabffeacea09a49913505db19a96a55d58c0909 
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
3e1718bc7ca6c835a59ca7c6879f558ec06866ee 
  core/src/main/scala/kafka/server/KafkaApis.scala 
968b0c4f809ea9f9c3f322aef9db105f9d2e0e23 
  core/src/main/scala/kafka/server/KafkaServer.scala 
1bf7d10cef23a77e71eb16bf6d0e68bc4ebe 
  core/src/main/scala/kafka/server/OffsetManager.scala 
2957bc435102bc4004d8f100dbcdd56287c8ffae 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
cd16ced5465d098be7a60498326b2a98c248f343 
  core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 
8c5364fa97da1be09973c176d1baeb339455d319 

Diff: https://reviews.apache.org/r/27391/diff/


Testing
---


Thanks,

Guozhang Wang



Re: Review Request 27391: Fix KAFKA-1634: Add the global retentionTime (in milis) and disable the per-offset timestamp

2014-11-17 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review61761
---



clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
https://reviews.apache.org/r/27391/#comment103640

Not sure if this will make it worse; would it be clearer to call this 
DEFAULT_TIMESTAMP_V0_V1?



clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
https://reviews.apache.org/r/27391/#comment103719

Made a follow-up comment in the earlier RB. But pasting here as well:

Agree that it is still common in the object but it is completely removed 
from the wire protocol - i.e., OFFSET_COMMIT_REQUEST_PARTITION_V1 which is in 
the V2 OffsetCommitRequest does not have a timestamp.

This method should probably be read as initCommonFieldsInStruct - i.e., 
effectively the wire protocol. That said, I'm loath to add another init method
which reads initCommonFieldsInV0AndV1. So I think rather than checking 
fetchPartitionData.timestamp it would be better to explicitly check the 
(already set) request version in the struct. If v0 or v1 then set the timestamp 
key name.



core/src/main/scala/kafka/api/OffsetCommitRequest.scala
https://reviews.apache.org/r/27391/#comment103727

>= 2 (or we may forget when we go to version 3)



core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
https://reviews.apache.org/r/27391/#comment103734

I think our coding convention is to use the parameterless form in the 
absence of side-effects



core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
https://reviews.apache.org/r/27391/#comment103735

Similar comment as above.



core/src/main/scala/kafka/server/KafkaApis.scala
https://reviews.apache.org/r/27391/#comment103737

See reply to earlier comment.


- Joel Koshy


On Nov. 8, 2014, 12:54 a.m., Guozhang Wang wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/27391/
 ---
 
 (Updated Nov. 8, 2014, 12:54 a.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1634
 https://issues.apache.org/jira/browse/KAFKA-1634
 
 
 Repository: kafka
 
 
 Description
 ---
 
 The timestamp field of OffsetAndMetadata is preserved since we need to be 
 backward compatible with older versions
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
 7517b879866fc5dad5f8d8ad30636da8bbe7784a 
   
 clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
  3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f 
   
 clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
  df37fc6d8f0db0b8192a948426af603be3444da4 
   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
 050615c72efe7dbaa4634f53943bd73273d20ffb 
   core/src/main/scala/kafka/api/OffsetFetchRequest.scala 
 c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 
   core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 
 4cabffeacea09a49913505db19a96a55d58c0909 
   core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
 fbc680fde21b02f11285a4f4b442987356abd17b 
   core/src/main/scala/kafka/server/KafkaApis.scala 
 968b0c4f809ea9f9c3f322aef9db105f9d2e0e23 
   core/src/main/scala/kafka/server/KafkaServer.scala 
 4de812374e8fb1fed834d2be3f9655f55b511a74 
   core/src/main/scala/kafka/server/OffsetManager.scala 
 2957bc435102bc4004d8f100dbcdd56287c8ffae 
   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
 cd16ced5465d098be7a60498326b2a98c248f343 
   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 
 8c5364fa97da1be09973c176d1baeb339455d319 
 
 Diff: https://reviews.apache.org/r/27391/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Guozhang Wang
 




Re: Review Request 27391: Fix KAFKA-1634: Add the global retentionTime (in milis) and disable the per-offset timestamp

2014-11-17 Thread Joel Koshy


 On Nov. 7, 2014, 3 a.m., Joel Koshy wrote:
  clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java,
   line 152
  https://reviews.apache.org/r/27391/diff/2/?file=753752#file753752line152
 
  This confused me a bit, and I think it is because initCommonFields is 
  intended to initialize fields common to all versions of the request. It is 
  a useful helper method but it becomes somewhat clunky when removing fields. 
  The partition-level timestamp is no longer a common field.
  
  If this is v2 then we should _never_ set anything in the timestamp 
  field of the struct; and if it is < v2 then we should _always_ set the
  timestamp field (even if it is the default). However, since the timestamp 
  field in the Field declaration for OFFSET_COMMIT_REQUEST_PARTITION_V0 does 
  not have a default explicitly specified, I think this will break with a 
  SchemaException(missing value...) for offset commit request v0, v1 if we 
  choose to write to a bytebuffer under those versions with this code.
  
  One option is to explicitly pass in the constructor version (0, 1, 2) 
  to initCommonFields and use that to decide whether to include/exclude this 
  field, but that is weird. Another alternative is a separate helper method 
  for v0v1. That is even weirder.
 
 Guozhang Wang wrote:
 Actually, the partition-level timestamp is still a common field (we are
 just deprecating it, and chose not to serialize / deserialize it in v2). I
 agree this is a bit weird as written; I thought about this when I started the
 first version but did not come up with a better approach.

Agree that it is still common in the object but it is completely removed from 
the wire protocol - i.e., OFFSET_COMMIT_REQUEST_PARTITION_V1 which is in the V2 
OffsetCommitRequest does not have a timestamp.

This method should probably be read as initCommonFieldsInStruct - i.e., 
effectively the wire protocol. That said, I'm loath to add another init method
which reads initCommonFieldsInV0AndV1. So I think rather than checking 
fetchPartitionData.timestamp it would be better to explicitly check the 
(already set) request version in the struct. If v0 or v1 then set the timestamp 
key name.


 On Nov. 7, 2014, 3 a.m., Joel Koshy wrote:
  core/src/main/scala/kafka/common/OffsetMetadataAndError.scala, line 22
  https://reviews.apache.org/r/27391/diff/2/?file=753755#file753755line22
 
  Can we mark this @Deprecated as well?
  
  We should probably make the primary constructor without timestamp and 
  add a secondary constructor with timestamp and mark deprecated there.
  
  Also, can we use case class.copy if timestamp needs to be modified? 
  However, per comment further down I don't think it needs to be touched.
 
 Guozhang Wang wrote:
 Actually we cannot make it deprecated as it will be preserved even in the 
 new version, right? Note this is not used for the wire protocol but for the 
 cache / disk format.
 
 Guozhang Wang wrote:
 I should say it is used not only for the wire protocol but also for the
 cache / disk storage format. Having thought about this again, I will change
 it to two separate classes, one for the wire protocol and one for the server
 storage format.

Yes that is what I was thinking - we should ideally have a separate wire 
protocol and storage format.


 On Nov. 7, 2014, 3 a.m., Joel Koshy wrote:
  core/src/main/scala/kafka/server/OffsetManager.scala, line 498
  https://reviews.apache.org/r/27391/diff/2/?file=753758#file753758line498
 
  Actually, since we always set the retention period (for v0, v1) in 
  KafkaApis do we need to even touch this timestamp? i.e., we should 
  basically ignore it right? So we only need to do:
  value.set(VALUE_TIMESTAMP_FIELD, expirationTimestamp).
 
 Guozhang Wang wrote:
 I think not. In v0/v1, if the timestamp is explicitly specified (i.e.
 not -1) we need to use it as the expiration timestamp, or at least that was 
 how I understood the semantics.
 
 Guozhang Wang wrote:
 I think we cannot not

Right - what I meant was in KafkaApis we can just compute the retentionPeriod 
if v0 or v1.

So if v0/v1 and timestamp = (now + 7 days), then set retention to 7 days.
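
The arithmetic behind this suggestion can be sketched as below. The class and method names are hypothetical, and the fallback to a configured default when no timestamp was supplied is an assumption added for completeness rather than something stated in the review.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of deriving a retention period from a v0/v1 timestamp.
final class RetentionFromTimestampSketch {
    // Sentinel meaning "no timestamp supplied" (-1, per the discussion).
    static final long DEFAULT_TIMESTAMP = -1L;

    static long retentionMs(long commitTimestamp, long nowMs, long defaultRetentionMs) {
        if (commitTimestamp == DEFAULT_TIMESTAMP) {
            // Assumption: fall back to the configured retention.
            return defaultRetentionMs;
        }
        // Explicit timestamp: the retention is the distance from "now".
        return commitTimestamp - nowMs;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        long sevenDays = TimeUnit.DAYS.toMillis(7);
        // A timestamp of (now + 7 days) yields a retention of 7 days.
        System.out.println(retentionMs(now + sevenDays, now, 60_000L) == sevenDays);
    }
}
```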


- Joel


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review60285
---



Re: Review Request 27391: Fix KAFKA-1634: Add the global retentionTime (in milis) and disable the per-offset timestamp

2014-11-17 Thread Guozhang Wang


 On Nov. 17, 2014, 10:31 p.m., Joel Koshy wrote:
  clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java,
   line 151
  https://reviews.apache.org/r/27391/diff/3/?file=755182#file755182line151
 
  Made a follow-up comment in the earlier RB. But pasting here as well:
  
  Agree that it is still common in the object but it is completely 
  removed from the wire protocol - i.e., OFFSET_COMMIT_REQUEST_PARTITION_V1 
  which is in the V2 OffsetCommitRequest does not have a timestamp.
  
  This method should probably be read as initCommonFieldsInStruct - 
  i.e., effectively the wire protocol. That said, I'm loath to add another
  init method which reads initCommonFieldsInV0AndV1. So I think rather than 
  checking fetchPartitionData.timestamp it would be better to explicitly 
  check the (already set) request version in the struct. If v0 or v1 then set 
  the timestamp key name.

The version number info is not in the struct, so we cannot get its value unless 
we are going to add that from the request header into the constructor. But we 
can check if the struct has that field or not. Changed to this accordingly.


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review61761
---






Re: Review Request 27391: Fix KAFKA-1634: Add the global retentionTime (in milis) and disable the per-offset timestamp

2014-11-17 Thread Guozhang Wang


 On Nov. 7, 2014, 3 a.m., Joel Koshy wrote:
  core/src/main/scala/kafka/server/OffsetManager.scala, line 498
  https://reviews.apache.org/r/27391/diff/2/?file=753758#file753758line498
 
  Actually, since we always set the retention period (for v0, v1) in 
  KafkaApis do we need to even touch this timestamp? i.e., we should 
  basically ignore it right? So we only need to do:
  value.set(VALUE_TIMESTAMP_FIELD, expirationTimestamp).
 
 Guozhang Wang wrote:
 I think not. In v0/v1, if the timestamp is explicitly specified (i.e.
 not -1) we need to use it as the expiration timestamp, or at least that was 
 how I understood the semantics.
 
 Guozhang Wang wrote:
 I think we cannot not
 
 Joel Koshy wrote:
 Right - what I meant was in KafkaApis we can just compute the 
 retentionPeriod if v0 or v1.
 
 So if v0/v1 and timestamp = (now + 7 days), then set retention to 7 days.

Retention is a global parameter that we use later to set the per-message
timestamp; but if the timestamps in a v0/v1 request differ from one another,
then we cannot just use a single retention, right?
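
The objection can be made concrete with a small check. This is a hypothetical helper, not code from the patch: it reports whether the explicit per-partition timestamps of one request all imply the same retention.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: can one retention value represent all partitions?
final class SingleRetentionSketch {
    // True only when every partition's timestamp implies the same retention.
    static boolean fitsSingleRetention(Map<Integer, Long> timestampsByPartition, long nowMs) {
        Set<Long> retentions = new HashSet<>();
        for (long timestamp : timestampsByPartition.values()) {
            retentions.add(timestamp - nowMs);
        }
        return retentions.size() <= 1;
    }
}
```

A request whose partitions carry timestamps of, say, now + 100 and now + 200 fails this check, which is the case where a single global retention loses information.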


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review60285
---






Re: Review Request 27391: Fix KAFKA-1634

2014-11-17 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/
---

(Updated Nov. 18, 2014, 1:42 a.m.)


Review request for kafka.


Summary (updated)
-

Fix KAFKA-1634


Bugs: KAFKA-1634
https://issues.apache.org/jira/browse/KAFKA-1634


Repository: kafka


Description
---

The timestamp field of OffsetAndMetadata is preserved since we need to be 
backward compatible with older versions


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
7517b879866fc5dad5f8d8ad30636da8bbe7784a 
  
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 
3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f 
  
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java 
df37fc6d8f0db0b8192a948426af603be3444da4 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
050615c72efe7dbaa4634f53943bd73273d20ffb 
  core/src/main/scala/kafka/api/OffsetFetchRequest.scala 
c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 
  core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 
4cabffeacea09a49913505db19a96a55d58c0909 
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
f476973eeff653473a60c3ecf36e870e386536bc 
  core/src/main/scala/kafka/server/KafkaApis.scala 
968b0c4f809ea9f9c3f322aef9db105f9d2e0e23 
  core/src/main/scala/kafka/server/KafkaServer.scala 
4de812374e8fb1fed834d2be3f9655f55b511a74 
  core/src/main/scala/kafka/server/OffsetManager.scala 
2957bc435102bc4004d8f100dbcdd56287c8ffae 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
cd16ced5465d098be7a60498326b2a98c248f343 
  core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 
8c5364fa97da1be09973c176d1baeb339455d319 

Diff: https://reviews.apache.org/r/27391/diff/


Testing
---


Thanks,

Guozhang Wang



Re: Review Request 27391: Fix KAFKA-1634: Add the global retentionTime (in milis) and disable the per-offset timestamp

2014-11-07 Thread Guozhang Wang


 On Nov. 7, 2014, 3 a.m., Joel Koshy wrote:
  clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java,
   line 152
  https://reviews.apache.org/r/27391/diff/2/?file=753752#file753752line152
 
  This confused me a bit, and I think it is because initCommonFields is 
  intended to initialize fields common to all versions of the request. It is 
  a useful helper method but it becomes somewhat clunky when removing fields. 
  The partition-level timestamp is no longer a common field.
  
  If this is v2 then we should _never_ set anything in the timestamp 
  field of the struct; and if it is < v2 then we should _always_ set the
  timestamp field (even if it is the default). However, since the timestamp 
  field in the Field declaration for OFFSET_COMMIT_REQUEST_PARTITION_V0 does 
  not have a default explicitly specified, I think this will break with a 
  SchemaException(missing value...) for offset commit request v0, v1 if we 
  choose to write to a bytebuffer under those versions with this code.
  
  One option is to explicitly pass in the constructor version (0, 1, 2) 
  to initCommonFields and use that to decide whether to include/exclude this 
  field, but that is weird. Another alternative is a separate helper method 
  for v0v1. That is even weirder.

Actually, the partition-level timestamp is still a common field (we are just
deprecating it, and chose not to serialize / deserialize it in v2). I agree
this is a bit weird as written; I thought about this when I started the first
version but did not come up with a better approach.


 On Nov. 7, 2014, 3 a.m., Joel Koshy wrote:
  core/src/main/scala/kafka/common/OffsetMetadataAndError.scala, line 22
  https://reviews.apache.org/r/27391/diff/2/?file=753755#file753755line22
 
  Can we mark this @Deprecated as well?
  
  We should probably make the primary constructor without timestamp and 
  add a secondary constructor with timestamp and mark deprecated there.
  
  Also, can we use case class.copy if timestamp needs to be modified? 
  However, per comment further down I don't think it needs to be touched.

Actually we cannot make it deprecated as it will be preserved even in the new 
version, right? Note this is not used for the wire protocol but for the cache / 
disk format.


 On Nov. 7, 2014, 3 a.m., Joel Koshy wrote:
  core/src/main/scala/kafka/server/OffsetManager.scala, line 498
  https://reviews.apache.org/r/27391/diff/2/?file=753758#file753758line498
 
  Actually, since we always set the retention period (for v0, v1) in 
  KafkaApis do we need to even touch this timestamp? i.e., we should 
  basically ignore it right? So we only need to do:
  value.set(VALUE_TIMESTAMP_FIELD, expirationTimestamp).

I think not. In v0/v1, if the timestamp is explicitly specified (i.e. not -1)
we need to use it as the expiration timestamp, or at least that was how I 
understood the semantics.


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review60285
---


On Nov. 6, 2014, 11:35 p.m., Guozhang Wang wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/27391/
 ---
 
 (Updated Nov. 6, 2014, 11:35 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1634
 https://issues.apache.org/jira/browse/KAFKA-1634
 
 
 Repository: kafka
 
 
 Description
 ---
 
 The timestamp field of OffsetAndMetadata is preserved since we need to be 
 backward compatible with older versions
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
 7517b879866fc5dad5f8d8ad30636da8bbe7784a 
   
 clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
  3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f 
   
 clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
  df37fc6d8f0db0b8192a948426af603be3444da4 
   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
 050615c72efe7dbaa4634f53943bd73273d20ffb 
   core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 
 4cabffeacea09a49913505db19a96a55d58c0909 
   core/src/main/scala/kafka/server/KafkaApis.scala 
 968b0c4f809ea9f9c3f322aef9db105f9d2e0e23 
   core/src/main/scala/kafka/server/KafkaServer.scala 
 4de812374e8fb1fed834d2be3f9655f55b511a74 
   core/src/main/scala/kafka/server/OffsetManager.scala 
 2957bc435102bc4004d8f100dbcdd56287c8ffae 
   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
 cd16ced5465d098be7a60498326b2a98c248f343 
   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 
 8c5364fa97da1be09973c176d1baeb339455d319 
 
 Diff: https://reviews.apache.org/r/27391/diff/
 
 
 Testing
 ---
 

Re: Review Request 27391: Fix KAFKA-1634: Add the global retentionTime (in milis) and disable the per-offset timestamp

2014-11-07 Thread Guozhang Wang


 On Nov. 7, 2014, 3 a.m., Joel Koshy wrote:
  core/src/main/scala/kafka/common/OffsetMetadataAndError.scala, line 22
  https://reviews.apache.org/r/27391/diff/2/?file=753755#file753755line22
 
  Can we mark this @Deprecated as well?
  
  We should probably make the primary constructor without timestamp and 
  add a secondary constructor with timestamp and mark deprecated there.
  
  Also, can we use case class.copy if timestamp needs to be modified? 
  However, per comment further down I don't think it needs to be touched.
 
 Guozhang Wang wrote:
 Actually we cannot make it deprecated as it will be preserved even in the 
 new version, right? Note this is not used for the wire protocol but for the 
 cache / disk format.

I should say it is used not only for the wire protocol but also for the
cache / disk storage format. Having thought about this again, I will change it
to two separate classes, one for the wire protocol and one for the server
storage format.


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review60285
---






Re: Review Request 27391: Fix KAFKA-1634: Add the global retentionTime (in milis) and disable the per-offset timestamp

2014-11-07 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/
---

(Updated Nov. 8, 2014, 12:54 a.m.)


Review request for kafka.


Bugs: KAFKA-1634
https://issues.apache.org/jira/browse/KAFKA-1634


Repository: kafka


Description
---

The timestamp field of OffsetAndMetadata is preserved since we need to be 
backward compatible with older versions


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
7517b879866fc5dad5f8d8ad30636da8bbe7784a 
  
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 
3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f 
  
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java 
df37fc6d8f0db0b8192a948426af603be3444da4 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
050615c72efe7dbaa4634f53943bd73273d20ffb 
  core/src/main/scala/kafka/api/OffsetFetchRequest.scala 
c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 
  core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 
4cabffeacea09a49913505db19a96a55d58c0909 
  core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
fbc680fde21b02f11285a4f4b442987356abd17b 
  core/src/main/scala/kafka/server/KafkaApis.scala 
968b0c4f809ea9f9c3f322aef9db105f9d2e0e23 
  core/src/main/scala/kafka/server/KafkaServer.scala 
4de812374e8fb1fed834d2be3f9655f55b511a74 
  core/src/main/scala/kafka/server/OffsetManager.scala 
2957bc435102bc4004d8f100dbcdd56287c8ffae 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
cd16ced5465d098be7a60498326b2a98c248f343 
  core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 
8c5364fa97da1be09973c176d1baeb339455d319 

Diff: https://reviews.apache.org/r/27391/diff/


Testing
---


Thanks,

Guozhang Wang



Re: Review Request 27391: Fix KAFKA-1634: Add the global retentionTime (in milis) and disable the per-offset timestamp

2014-11-06 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/
---

(Updated Nov. 6, 2014, 11:35 p.m.)


Review request for kafka.


Bugs: KAFKA-1634
https://issues.apache.org/jira/browse/KAFKA-1634


Repository: kafka


Description
---

The timestamp field of OffsetAndMetadata is preserved since we need to be 
backward compatible with older versions


Diffs (updated)
-

  clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
7517b879866fc5dad5f8d8ad30636da8bbe7784a 
  
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 
3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f 
  
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java 
df37fc6d8f0db0b8192a948426af603be3444da4 
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
050615c72efe7dbaa4634f53943bd73273d20ffb 
  core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 
4cabffeacea09a49913505db19a96a55d58c0909 
  core/src/main/scala/kafka/server/KafkaApis.scala 
968b0c4f809ea9f9c3f322aef9db105f9d2e0e23 
  core/src/main/scala/kafka/server/KafkaServer.scala 
4de812374e8fb1fed834d2be3f9655f55b511a74 
  core/src/main/scala/kafka/server/OffsetManager.scala 
2957bc435102bc4004d8f100dbcdd56287c8ffae 
  core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
cd16ced5465d098be7a60498326b2a98c248f343 
  core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 
8c5364fa97da1be09973c176d1baeb339455d319 

Diff: https://reviews.apache.org/r/27391/diff/


Testing
---


Thanks,

Guozhang Wang



Re: Review Request 27391: Fix KAFKA-1634: Add the global retentionTime (in milis) and disable the per-offset timestamp

2014-11-06 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review60285
---



clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
https://reviews.apache.org/r/27391/#comment101649

@Deprecated



clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
https://reviews.apache.org/r/27391/#comment101651

This confused me a bit, and I think it is because initCommonFields is 
intended to initialize fields common to all versions of the request. It is a 
useful helper method but it becomes somewhat clunky when removing fields. The 
partition-level timestamp is no longer a common field.

If this is v2 then we should _never_ set anything in the timestamp field of 
the struct; and if it is < v2 then we should _always_ set the timestamp field 
(even if it is the default). However, since the timestamp field in the Field 
declaration for OFFSET_COMMIT_REQUEST_PARTITION_V0 does not have a default 
explicitly specified, I think this will break with a SchemaException(missing 
value...) for offset commit request v0, v1 if we choose to write to a 
bytebuffer under those versions with this code.

One option is to explicitly pass in the constructor version (0, 1, 2) to 
initCommonFields and use that to decide whether to include/exclude this field, 
but that is weird. Another alternative is a separate helper method for v0/v1. 
That is even weirder.
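
To make the first option concrete, here is a minimal sketch in Scala rather 
than the Java of the file under review. It assumes a struct can be modelled as 
a plain Map, and the field names and the -1 default are hypothetical 
placeholders, not the actual schema in Protocol.java.

```scala
object VersionedFieldsSketch {
  // Hypothetical field names; the real schema lives in Protocol.java.
  val OffsetKey = "offset"
  val TimestampKey = "timestamp"
  val MetadataKey = "metadata"

  // Assumed sentinel meaning "no timestamp supplied".
  val DefaultTimestamp = -1L

  // Builds the partition-level fields for one request version.
  // For versions below 2 the timestamp is always written (falling back to the
  // default); for version 2 and above it is never written.
  def partitionFields(version: Int,
                      offset: Long,
                      timestamp: Option[Long],
                      metadata: String): Map[String, Any] = {
    val common = Map[String, Any](OffsetKey -> offset, MetadataKey -> metadata)
    if (version < 2) common + (TimestampKey -> timestamp.getOrElse(DefaultTimestamp))
    else common
  }
}
```

Passing the version explicitly keeps the decision in one place, at the cost of 
a helper that is no longer purely about common fields.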



clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
https://reviews.apache.org/r/27391/#comment101650

Would help to add a comment: "This field only exists in v0 and v1".



core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
https://reviews.apache.org/r/27391/#comment101657

Can we mark this @Deprecated as well?

We should probably make the primary constructor without timestamp and add a 
secondary constructor with timestamp and mark deprecated there.

Also, can we use case class.copy if timestamp needs to be modified? 
However, per comment further down I don't think it needs to be touched.
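
A rough sketch of the constructor arrangement described above, assuming a 
simplified class shape. The field names, the default metadata value and the 
deprecation text are illustrative assumptions, not the code in the diff.

```scala
// Hypothetical stand-in for the class under review.
case class OffsetAndMetadata(offset: Long, metadata: String = "") {

  // Secondary constructor kept only for callers that still pass a timestamp.
  // In this sketch the timestamp is accepted and then dropped.
  @deprecated("the per-offset timestamp is no longer used", "hypothetical")
  def this(offset: Long, metadata: String, timestamp: Long) = this(offset, metadata)
}

object ConstructorSketch {
  // Preferred form: the companion apply generated from the primary constructor.
  val current: OffsetAndMetadata = OffsetAndMetadata(42L, "checkpoint")

  // Legacy form: written with `new`; compiles with a deprecation warning.
  val legacy: OffsetAndMetadata = new OffsetAndMetadata(42L, "checkpoint", 1415318100000L)
}
```

Because case class equality only looks at the primary constructor parameters, 
the two values above compare equal.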



core/src/main/scala/kafka/server/OffsetManager.scala
https://reviews.apache.org/r/27391/#comment101658

This was already there, but it would be clearer to use:

filter { case (..., ...) => 
}
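
For comparison, a small self-contained sketch of the two styles. The cache 
shape (a group/partition key mapped to an expiration time in milliseconds) and 
all values are made up for illustration.

```scala
object FilterSketch {
  // Hypothetical cache: (group, partition) -> expiration timestamp in ms.
  val cache: Map[(String, Int), Long] = Map(
    ("group-a", 0) -> 1000L,
    ("group-a", 1) -> 5000L,
    ("group-b", 0) -> 2000L
  )

  val now = 3000L

  // Tuple-accessor style: the reader has to remember what _2 holds.
  val expiredByAccessor: Map[(String, Int), Long] =
    cache.filter(entry => entry._2 < now)

  // Pattern-matching style: the component that matters gets a name.
  val expiredByPattern: Map[(String, Int), Long] =
    cache.filter { case (_, expiresAt) => expiresAt < now }
}
```

Both expressions select the same entries; the second one documents itself.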



core/src/main/scala/kafka/server/OffsetManager.scala
https://reviews.apache.org/r/27391/#comment101659

Found %d expired offsets.



core/src/main/scala/kafka/server/OffsetManager.scala
https://reviews.apache.org/r/27391/#comment101660

Actually, since we always set the retention period (for v0, v1) in 
KafkaApis do we need to even touch this timestamp? i.e., we should basically 
ignore it right? So we only need to do:
value.set(VALUE_TIMESTAMP_FIELD, expirationTimestamp).


- Joel Koshy


On Nov. 6, 2014, 11:35 p.m., Guozhang Wang wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/27391/
 ---
 
 (Updated Nov. 6, 2014, 11:35 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1634
 https://issues.apache.org/jira/browse/KAFKA-1634
 
 
 Repository: kafka
 
 
 Description
 ---
 
 The timestamp field of OffsetAndMetadata is preserved since we need to be 
 backward compatible with older versions
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
 7517b879866fc5dad5f8d8ad30636da8bbe7784a 
   
 clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
  3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f 
   
 clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
  df37fc6d8f0db0b8192a948426af603be3444da4 
   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
 050615c72efe7dbaa4634f53943bd73273d20ffb 
   core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 
 4cabffeacea09a49913505db19a96a55d58c0909 
   core/src/main/scala/kafka/server/KafkaApis.scala 
 968b0c4f809ea9f9c3f322aef9db105f9d2e0e23 
   core/src/main/scala/kafka/server/KafkaServer.scala 
 4de812374e8fb1fed834d2be3f9655f55b511a74 
   core/src/main/scala/kafka/server/OffsetManager.scala 
 2957bc435102bc4004d8f100dbcdd56287c8ffae 
   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
 cd16ced5465d098be7a60498326b2a98c248f343 
   core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 
 8c5364fa97da1be09973c176d1baeb339455d319 
 
 Diff: https://reviews.apache.org/r/27391/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Guozhang Wang
 




Re: Review Request 27391: Fix KAFKA-1634: Add the global retentionTime (in milis) and disable the per-offset timestamp

2014-11-04 Thread Joel Koshy

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review59839
---



clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
https://reviews.apache.org/r/27391/#comment101209

Would Long.MaxValue be a better default?



clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
https://reviews.apache.org/r/27391/#comment101137

I think we should mark this with @deprecated



clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
https://reviews.apache.org/r/27391/#comment101138

This can also be marked @deprecated and we can add a new constructor 
without timestamp and initialize to -1 there



clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
https://reviews.apache.org/r/27391/#comment101121

Can you add @param for the new parameter? This is for consistency only; the 
existing javadoc is pretty pointless anyway.



clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
https://reviews.apache.org/r/27391/#comment101124

If uninitialized it would be zero.

However, see comment on deprecating the constructor above.



core/src/main/scala/kafka/api/OffsetCommitRequest.scala
https://reviews.apache.org/r/27391/#comment101198

val retentionMs = if (versionId == 2) {
...
} else {
...
}



core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
https://reviews.apache.org/r/27391/#comment101127

Rather than making this a var it would be preferable to use 
offsetAndMetadataInstance.copy(timestamp = newValue)

However, I don't think we need to modify the timestamp - see comment in 
KafkaApis
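
A minimal sketch of the copy approach, assuming the timestamp stays an 
immutable field. The class name, field names and the -1 default here are 
hypothetical.

```scala
// Hypothetical shape: every field is a val, including the timestamp.
case class TimestampedOffsetAndMetadata(offset: Long,
                                        metadata: String,
                                        timestamp: Long = -1L)

object CopySketch {
  val original: TimestampedOffsetAndMetadata =
    TimestampedOffsetAndMetadata(offset = 42L, metadata = "checkpoint")

  // copy returns a new instance with one field replaced; `original` is unchanged.
  val stamped: TimestampedOffsetAndMetadata =
    original.copy(timestamp = 1415318100000L)
}
```

Nothing is mutated, so instances can be shared between threads without the 
var.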



core/src/main/scala/kafka/server/KafkaApis.scala
https://reviews.apache.org/r/27391/#comment101166

As long as we do support v0 and v1, since we are changing the semantics of 
the _stored_ timestamp in the log I think it is reasonable to use the 
OffsetManager.retentionPeriod config here (which btw should be deprecated).

i.e., if a v0 or v1 request is received, pass in the 
OffsetManager.retentionPeriod config. If it is v2 format, then just pass in 
retentionMs untouched.
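
The rule above could be sketched roughly as follows. The function and 
parameter names are hypothetical, and computing the expiration as commit time 
plus retention is an assumption about how the stored timestamp would be 
derived.

```scala
object RetentionSketch {
  // brokerRetentionMs stands in for the broker-side retention config (assumed name).
  // Requests older than v2 carry no retention time, so the broker config applies;
  // v2 requests supply their own value, which is passed through untouched.
  def effectiveRetentionMs(versionId: Int,
                           requestRetentionMs: Long,
                           brokerRetentionMs: Long): Long =
    if (versionId < 2) brokerRetentionMs else requestRetentionMs

  // Assumed derivation of the absolute expiration time stored with the offset.
  def expirationTimestamp(commitTimeMs: Long, retentionMs: Long): Long =
    commitTimeMs + retentionMs
}
```

With this split, the expiry check only needs to compare the stored expiration 
timestamp against the current time.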



core/src/main/scala/kafka/server/OffsetManager.scala
https://reviews.apache.org/r/27391/#comment101164

Can be marked @deprecated and we can get rid of it when we get rid of v0,v1



core/src/main/scala/kafka/server/OffsetManager.scala
https://reviews.apache.org/r/27391/#comment101163

This will break - we now need to check that the absolute timestamp in the 
offset metadata has passed.



core/src/main/scala/kafka/server/OffsetManager.scala
https://reviews.apache.org/r/27391/#comment101150

val expirationTimestamp



core/src/main/scala/kafka/server/OffsetManager.scala
https://reviews.apache.org/r/27391/#comment101149

Expiration time of the offset



core/src/main/scala/kafka/server/OffsetManager.scala
https://reviews.apache.org/r/27391/#comment101204

expirationTimestamp would be clearer



core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
https://reviews.apache.org/r/27391/#comment101140

Can we also enhance the OffsetCommitTest to do offset commits with v0, v1 
and v2? i.e., test that the latest broker can handle all three versions 
correctly.


- Joel Koshy


On Oct. 30, 2014, 6:43 p.m., Guozhang Wang wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/27391/
 ---
 
 (Updated Oct. 30, 2014, 6:43 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-1634
 https://issues.apache.org/jira/browse/KAFKA-1634
 
 
 Repository: kafka
 
 
 Description
 ---
 
 The timestamp field of OffsetAndMetadata is preserved since we need to be 
 backward compatible with older versions
 
 
 Diffs
 -
 
   clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 
 7517b879866fc5dad5f8d8ad30636da8bbe7784a 
   
 clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
  3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f 
   core/src/main/scala/kafka/api/OffsetCommitRequest.scala 
 050615c72efe7dbaa4634f53943bd73273d20ffb 
   core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 
 4cabffeacea09a49913505db19a96a55d58c0909 
   core/src/main/scala/kafka/server/KafkaApis.scala 
 968b0c4f809ea9f9c3f322aef9db105f9d2e0e23 
   core/src/main/scala/kafka/server/OffsetManager.scala 
 2957bc435102bc4004d8f100dbcdd56287c8ffae 
   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
 cd16ced5465d098be7a60498326b2a98c248f343 
 
 Diff: https://reviews.apache.org/r/27391/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Guozhang Wang