Re: Review Request 27391: Fix KAFKA-1634
On Feb. 4, 2015, 2:15 a.m., Joel Koshy wrote:
core/src/main/scala/kafka/api/OffsetCommitRequest.scala, line 48
https://reviews.apache.org/r/27391/diff/11/?file=832423#file832423line48

I think our convention is to include the if in the previous line.

Guozhang Wang wrote:
I checked the code base and it seems we do not have a consensus here, and personally I would prefer this as it actually makes the logic clearer.

We don't have a formal convention here, but I think we should have one and incorporate it into our coding guidelines. The problem with a separate line is that at first glance (especially with just two-character indentation) the if does not seem to be associated with the assignment. Also, most current occurrences put the if on the same line.

```
find . -name *.scala -exec pcregrep -c '=(\s)*if' {} \; | grep -v 0 | paste -s -d+ | bc
61
find . -name *.scala -exec pcregrep -Mc '=(\s)*\n(\s)*if' {} \; | grep -v 0 | paste -s -d+ | bc
36
```

On Feb. 4, 2015, 2:15 a.m., Joel Koshy wrote:
core/src/main/scala/kafka/common/OffsetMetadataAndError.scala, line 36
https://reviews.apache.org/r/27391/diff/11/?file=832425#file832425line36

(This is also a public API change - although you did add an Object wrapper further down that comes close to the original API.)

Guozhang Wang wrote:
I think the wrapper MessageAndMetadata preserves the existing public API right?

You mean the wrapper object? It comes close, but not quite - since you can instantiate a case class with a `new` keyword or without. You need it for the secondary constructors of the case class. With the object wrapper we assume that the objects were being constructed without the new. I don't know how many people actually used it, but it was part of the public API since you would need to create those objects to form an OffsetCommitRequest.

On Feb. 4, 2015, 2:15 a.m., Joel Koshy wrote:
core/src/main/scala/kafka/server/KafkaApis.scala, line 163
https://reviews.apache.org/r/27391/diff/11/?file=832427#file832427line163

Shouldn't the commit timestamp _always_ be set to the current time? What I was thinking is this:

- If v0: an explicit timestamp is provided only to override the v0 default retention, which is to add the server-side retention to the current timestamp. The (true) commit timestamp - i.e., receive time - is useful for debugging purposes. So if an explicit timestamp is provided in v0, then use that to compute the absolute expire timestamp, which will be the given commit timestamp; so you would store (commitTimestamp = now, expireTimestamp = given commitTimeStamp). If v0 and the commit timestamp is default, then you would store (commitTimestamp = now, expireTimestamp = now + offsetRetention).
- If v1: (commitTimestamp = now, expireTimestamp = now + offsetRetention)

This way, you should have correct expiration behavior for v0, v1 and v2 and at the same time have the true commit timestamp - i.e., the receive time at the broker, which is useful for debugging. (Also see comment in OffsetManager.)

Guozhang Wang wrote:
In v0/v1, the commit timestamp can be specified as a future timestamp so the expiration timestamp = commit timestamp + retention (in v0/v1 it is always the default value). This behavior should not be respected, i.e. offsets already stored in v0 and v1 format should be expired correctly using 0.8.2 code. Details can be found in Jun's comments and my replies.

I don't think we are on the same page here. Let's discuss offline to follow up.

On Feb. 4, 2015, 2:15 a.m., Joel Koshy wrote:
core/src/main/scala/kafka/server/OffsetManager.scala, line 65
https://reviews.apache.org/r/27391/diff/11/?file=832430#file832430line65

Should we call this maxOffsetRetentionMs instead?

Guozhang Wang wrote:
Not exactly, as it is just the default offset retention, not the upper limit: users can specify a value larger than this default and it will still be accepted.

Yes, you are right.

- Joel

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review70836
---

On Feb. 6, 2015, 7:01 p.m., Guozhang Wang wrote:

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/
---

(Updated Feb. 6, 2015, 7:01 p.m.)

Review request for kafka.

Bugs: KAFKA-1634
https://issues.apache.org/jira/browse/KAFKA-1634

Repository: kafka

Description
---
Incorporated Joel's comments

Diffs
-
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 09a6f11163ecb1e733c604ade04646e83bbc0c85
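
For readers following the KafkaApis.scala discussion in the message above, Joel's proposed handling of v0 and v1 commits can be sketched roughly as below. This is only an illustration of the proposal as worded in the review, not code from the patch: the class name, method name, and the `-1` "no explicit timestamp" marker are assumptions, and v2 is left out because the comment does not spell it out.

```java
// Hypothetical sketch of the v0/v1 proposal from the review comment.
// None of these names come from the Kafka code base.
final class ProposedOffsetTimestamps {
    // Assumed marker meaning "the request carried no explicit timestamp".
    static final long DEFAULT_TIMESTAMP = -1L;

    /** Returns {commitTimestamp, expireTimestamp} for a v0 or v1 commit. */
    static long[] resolve(int version, long requestTimestamp,
                          long offsetRetentionMs, long nowMs) {
        if (version != 0 && version != 1)
            throw new IllegalArgumentException("only v0 and v1 are sketched here");

        // The stored commit timestamp is always the broker receive time.
        long commitTimestamp = nowMs;

        long expireTimestamp;
        if (version == 0 && requestTimestamp != DEFAULT_TIMESTAMP)
            // v0 with an explicit timestamp: it is used as the absolute expiry.
            expireTimestamp = requestTimestamp;
        else
            // v0 with the default timestamp, or any v1 request.
            expireTimestamp = nowMs + offsetRetentionMs;

        return new long[] {commitTimestamp, expireTimestamp};
    }

    public static void main(String[] args) {
        long now = 1_000L, retention = 500L;
        long[] explicitV0 = resolve(0, 9_000L, retention, now);
        long[] defaultV0 = resolve(0, DEFAULT_TIMESTAMP, retention, now);
        System.out.println(explicitV0[0] + " " + explicitV0[1]); // 1000 9000
        System.out.println(defaultV0[0] + " " + defaultV0[1]);   // 1000 1500
    }
}
```

Under this reading the receive time is always what gets stored as the commit timestamp, which is the debugging benefit the comment argues for; the thread itself notes that the two reviewers had not yet agreed on this approach.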
Re: Review Request 27391: Fix KAFKA-1634
On Feb. 4, 2015, 2:15 a.m., Joel Koshy wrote:
core/src/main/scala/kafka/api/OffsetCommitRequest.scala, line 48
https://reviews.apache.org/r/27391/diff/11/?file=832423#file832423line48

I think our convention is to include the if in the previous line.

I checked the code base and it seems we do not have a consensus here, and personally I would prefer this as it actually makes the logic clearer.

On Feb. 4, 2015, 2:15 a.m., Joel Koshy wrote:
core/src/main/scala/kafka/common/OffsetMetadataAndError.scala, line 36
https://reviews.apache.org/r/27391/diff/11/?file=832425#file832425line36

(This is also a public API change - although you did add an Object wrapper further down that comes close to the original API.)

I think the wrapper MessageAndMetadata preserves the existing public API right?

On Feb. 4, 2015, 2:15 a.m., Joel Koshy wrote:
core/src/main/scala/kafka/server/OffsetManager.scala, line 65
https://reviews.apache.org/r/27391/diff/11/?file=832430#file832430line65

Should we call this maxOffsetRetentionMs instead?

Not exactly, as it is just the default offset retention, not the upper limit: users can specify a value larger than this default and it will still be accepted.

On Feb. 4, 2015, 2:15 a.m., Joel Koshy wrote:
core/src/main/scala/kafka/server/KafkaApis.scala, line 163
https://reviews.apache.org/r/27391/diff/11/?file=832427#file832427line163

Shouldn't the commit timestamp _always_ be set to the current time? What I was thinking is this:

- If v0: an explicit timestamp is provided only to override the v0 default retention, which is to add the server-side retention to the current timestamp. The (true) commit timestamp - i.e., receive time - is useful for debugging purposes. So if an explicit timestamp is provided in v0, then use that to compute the absolute expire timestamp, which will be the given commit timestamp; so you would store (commitTimestamp = now, expireTimestamp = given commitTimeStamp). If v0 and the commit timestamp is default, then you would store (commitTimestamp = now, expireTimestamp = now + offsetRetention).
- If v1: (commitTimestamp = now, expireTimestamp = now + offsetRetention)

This way, you should have correct expiration behavior for v0, v1 and v2 and at the same time have the true commit timestamp - i.e., the receive time at the broker, which is useful for debugging. (Also see comment in OffsetManager.)

In v0/v1, the commit timestamp can be specified as a future timestamp so the expiration timestamp = commit timestamp + retention (in v0/v1 it is always the default value). This behavior should not be respected, i.e. offsets already stored in v0 and v1 format should be expired correctly using 0.8.2 code. Details can be found in Jun's comments and my replies.

On Feb. 4, 2015, 2:15 a.m., Joel Koshy wrote:
core/src/main/scala/kafka/server/OffsetManager.scala, line 557
https://reviews.apache.org/r/27391/diff/11/?file=832430#file832430line557

Follow-up from above comment... and here you would set:

commitTimestamp = timestamp
expireTimestamp = timestamp

So do you think this would work overall? I could be wrong - this patch has proven to be much trickier than we originally thought.

See the comments above.

- Guozhang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review70836
---

On Jan. 24, 2015, 12:06 a.m., Guozhang Wang wrote:

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/
---

(Updated Jan. 24, 2015, 12:06 a.m.)

Review request for kafka.
Bugs: KAFKA-1634 https://issues.apache.org/jira/browse/KAFKA-1634 Repository: kafka Description --- Incorporated Jun's comments Diffs - clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 121e880a941fcd3e6392859edba11a94236494cc clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909
Re: Review Request 27391: Fix KAFKA-1634
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/ --- (Updated Feb. 6, 2015, 7:01 p.m.) Review request for kafka. Bugs: KAFKA-1634 https://issues.apache.org/jira/browse/KAFKA-1634 Repository: kafka Description (updated) --- Incorporated Joel's comments Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 09a6f11163ecb1e733c604ade04646e83bbc0c85 clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 101f382170ad6740b3f8ff2d27b93a64874a857f clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java ff89f0e37d5fa787b0218eff86d169aaeae2107b clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 94e9d376235b3288836807d8e8d2547b3743aad5 clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java 13237fd72da5448a3d596b882fef141f336f827d core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 5487259751ebe19f137948249aa1fd2637d2deb4 core/src/main/scala/kafka/server/KafkaApis.scala f2b027bf944e735fd52cc282690ec1b8395f9290 core/src/main/scala/kafka/server/KafkaConfig.scala 6d74983472249eac808d361344c58cc2858ec971 core/src/main/scala/kafka/server/KafkaServer.scala 89200da30a04943f0b9befe84ab17e62b747c8c4 core/src/main/scala/kafka/server/OffsetManager.scala 0bdd42fea931cddd072c0fff765b10526db6840a core/src/main/scala/kafka/server/ReplicaManager.scala fb948b9ab28c516e81dab14dcbe211dcd99842b6 core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala a1f72f8c2042ff2a43af503b2e06f84706dad9db core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 5b93239cdc26b5be7696f4e7863adb9fbe5f0ed5 
core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ba1e48e4300c9fb32e36e7266cb05294f2a481e5 Diff: https://reviews.apache.org/r/27391/diff/ Testing --- Thanks, Guozhang Wang
Re: Review Request 27391: Fix KAFKA-1634
---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review70836
---

clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
https://reviews.apache.org/r/27391/#comment116277

remove both

clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
https://reviews.apache.org/r/27391/#comment116310

Should we revert this rename since this is part of the public API? I would be surprised if people are using it though - but still.

core/src/main/scala/kafka/api/OffsetCommitRequest.scala
https://reviews.apache.org/r/27391/#comment116314

I think our convention is to include the if in the previous line.

core/src/main/scala/kafka/api/OffsetCommitRequest.scala
https://reviews.apache.org/r/27391/#comment116315

same here

core/src/main/scala/kafka/api/OffsetCommitRequest.scala
https://reviews.apache.org/r/27391/#comment116316

and here

core/src/main/scala/kafka/api/OffsetCommitRequest.scala
https://reviews.apache.org/r/27391/#comment116317

and here

core/src/main/scala/kafka/api/OffsetFetchRequest.scala
https://reviews.apache.org/r/27391/#comment116318

Can you run organize imports? Some of these seem redundant/unnecessary.

core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
https://reviews.apache.org/r/27391/#comment116320

(This is also a public API change - although you did add an Object wrapper further down that comes close to the original API.)

core/src/main/scala/kafka/server/KafkaApis.scala
https://reviews.apache.org/r/27391/#comment116322

Shouldn't the commit timestamp _always_ be set to the current time? What I was thinking is this:

- If v0: an explicit timestamp is provided only to override the v0 default retention, which is to add the server-side retention to the current timestamp. The (true) commit timestamp - i.e., receive time - is useful for debugging purposes. So if an explicit timestamp is provided in v0, then use that to compute the absolute expire timestamp, which will be the given commit timestamp; so you would store (commitTimestamp = now, expireTimestamp = given commitTimeStamp). If v0 and the commit timestamp is default, then you would store (commitTimestamp = now, expireTimestamp = now + offsetRetention).
- If v1: (commitTimestamp = now, expireTimestamp = now + offsetRetention)

This way, you should have correct expiration behavior for v0, v1 and v2 and at the same time have the true commit timestamp - i.e., the receive time at the broker, which is useful for debugging. (Also see comment in OffsetManager.)

core/src/main/scala/kafka/server/OffsetManager.scala
https://reviews.apache.org/r/27391/#comment116374

Should we call this maxOffsetRetentionMs instead?

core/src/main/scala/kafka/server/OffsetManager.scala
https://reviews.apache.org/r/27391/#comment116372

Follow-up from above comment... and here you would set:

commitTimestamp = timestamp
expireTimestamp = timestamp

So do you think this would work overall? I could be wrong - this patch has proven to be much trickier than we originally thought.

- Joel Koshy

On Jan. 24, 2015, 12:06 a.m., Guozhang Wang wrote:

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/
---

(Updated Jan. 24, 2015, 12:06 a.m.)

Review request for kafka.
Bugs: KAFKA-1634 https://issues.apache.org/jira/browse/KAFKA-1634 Repository: kafka Description --- Incorporated Jun's comments Diffs - clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 121e880a941fcd3e6392859edba11a94236494cc clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 5487259751ebe19f137948249aa1fd2637d2deb4 core/src/main/scala/kafka/server/KafkaApis.scala ec8d9f7ba44741db40875458bd524c4062ad6a26 core/src/main/scala/kafka/server/KafkaConfig.scala 6d74983472249eac808d361344c58cc2858ec971
Re: Review Request 27391: Fix KAFKA-1634
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/ --- (Updated Jan. 24, 2015, 12:06 a.m.) Review request for kafka. Bugs: KAFKA-1634 https://issues.apache.org/jira/browse/KAFKA-1634 Repository: kafka Description --- Incorporated Jun's comments Diffs (updated) - clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 121e880a941fcd3e6392859edba11a94236494cc clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 5487259751ebe19f137948249aa1fd2637d2deb4 core/src/main/scala/kafka/server/KafkaApis.scala ec8d9f7ba44741db40875458bd524c4062ad6a26 core/src/main/scala/kafka/server/KafkaConfig.scala 6d74983472249eac808d361344c58cc2858ec971 core/src/main/scala/kafka/server/KafkaServer.scala 89200da30a04943f0b9befe84ab17e62b747c8c4 core/src/main/scala/kafka/server/OffsetManager.scala 0bdd42fea931cddd072c0fff765b10526db6840a core/src/main/scala/kafka/server/ReplicaManager.scala e58fbb922e93b0c31dff04f187fcadb4ece986d7 core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 5b93239cdc26b5be7696f4e7863adb9fbe5f0ed5 core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ba1e48e4300c9fb32e36e7266cb05294f2a481e5 Diff: 
https://reviews.apache.org/r/27391/diff/ Testing --- Thanks, Guozhang Wang
Re: Review Request 27391: Fix KAFKA-1634
On Jan. 22, 2015, 1:56 a.m., Jun Rao wrote:
core/src/main/scala/kafka/server/KafkaApis.scala, lines 145-166
https://reviews.apache.org/r/27391/diff/9/?file=829147#file829147line145

I am not sure that we should change the timestamp for offsets produced in V0 and V1. There could be data in the offset topic already written by 0.8.2 code. See the other comment in OffsetManager on expiring.

I think if it (the commit timestamp) is set to the default value -1, we should override it according to the wiki:

https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetCommitRequest

Otherwise it should not be overridden.

On Jan. 22, 2015, 1:56 a.m., Jun Rao wrote:
core/src/main/scala/kafka/server/OffsetManager.scala, lines 121-123
https://reviews.apache.org/r/27391/diff/9/?file=829150#file829150line121

Does that change work correctly with offsets already stored in v0 and v1 format using 0.8.2 code? Would those offsets still be expired at the right time?

Changed the logic of overriding commit / expire timestamps as follows:

1. If version = 1 or retention time is default (-1), override retention time to the server default value.
2. If the original timestamp (i.e. the commit timestamp) is set to default (-1), override it to the current time.
3. After 2) is done, compute the expire time to be commit timestamp + retention time.
4. Hence the above logic of checking expiration will be compatible (i.e. expiration time now).

- Guozhang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review69106
---

On Jan. 22, 2015, 12:43 a.m., Guozhang Wang wrote:

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/
---

(Updated Jan. 22, 2015, 12:43 a.m.)

Review request for kafka.
Bugs: KAFKA-1634 https://issues.apache.org/jira/browse/KAFKA-1634 Repository: kafka Description --- Incorporated Joel's comments Diffs - clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 121e880a941fcd3e6392859edba11a94236494cc clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 191a8677444e53b043e9ad6e94c5a9191c32599e core/src/main/scala/kafka/server/KafkaApis.scala ec8d9f7ba44741db40875458bd524c4062ad6a26 core/src/main/scala/kafka/server/KafkaConfig.scala 6d74983472249eac808d361344c58cc2858ec971 core/src/main/scala/kafka/server/KafkaServer.scala 89200da30a04943f0b9befe84ab17e62b747c8c4 core/src/main/scala/kafka/server/OffsetManager.scala 0bdd42fea931cddd072c0fff765b10526db6840a core/src/main/scala/kafka/server/ReplicaManager.scala e58fbb922e93b0c31dff04f187fcadb4ece986d7 core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 5b93239cdc26b5be7696f4e7863adb9fbe5f0ed5 core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ba1e48e4300c9fb32e36e7266cb05294f2a481e5 Diff: https://reviews.apache.org/r/27391/diff/ Testing --- Thanks, Guozhang Wang
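
The four-step override logic Guozhang describes in the reply above can be sketched as follows. This is a minimal illustration under stated assumptions, not the code in the patch: the class and method names are made up, step 1 is read as covering v0 as well as v1 (neither carries a retention field), and the expiration check is assumed to be `expireTimestamp < now`, since the comparison in step 4 is not spelled out in the message.

```java
// Hypothetical sketch of the four steps from the reply; names are illustrative.
final class OffsetExpirySketch {
    // Both defaults are -1 per the reply ("default (-1)").
    static final long DEFAULT_TIMESTAMP = -1L;
    static final long DEFAULT_RETENTION = -1L;

    /** Returns {commitTimestamp, expireTimestamp}. */
    static long[] resolve(int version, long commitTimestamp, long retentionMs,
                          long serverDefaultRetentionMs, long nowMs) {
        // Step 1: old versions or an unspecified retention use the server default.
        // (Assumption: v0 is treated like v1 here.)
        long retention = (version <= 1 || retentionMs == DEFAULT_RETENTION)
                ? serverDefaultRetentionMs
                : retentionMs;

        // Step 2: an unspecified commit timestamp becomes the current time.
        long commit = (commitTimestamp == DEFAULT_TIMESTAMP) ? nowMs : commitTimestamp;

        // Step 3: expiry is derived from the (possibly overridden) commit timestamp.
        // No overflow guard: a retention of Long.MAX_VALUE would wrap around.
        long expire = commit + retention;

        return new long[] {commit, expire};
    }

    // Step 4: assumed form of the expiration check.
    static boolean isExpired(long expireTimestamp, long nowMs) {
        return expireTimestamp < nowMs;
    }

    public static void main(String[] args) {
        long now = 5_000L, serverDefault = 1_000L;
        long[] unspecified = resolve(2, DEFAULT_TIMESTAMP, DEFAULT_RETENTION, serverDefault, now);
        System.out.println(unspecified[0] + " " + unspecified[1]); // 5000 6000
        System.out.println(isExpired(unspecified[1], now));        // false
    }
}
```

Because an explicit commit timestamp is kept as given, a v0/v1 client that sends a future timestamp still gets an expiry of that timestamp plus the default retention, which is the compatibility point raised in the review.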
Re: Review Request 27391: Fix KAFKA-1634
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/ --- (Updated Jan. 23, 2015, 2:47 a.m.) Review request for kafka. Bugs: KAFKA-1634 https://issues.apache.org/jira/browse/KAFKA-1634 Repository: kafka Description (updated) --- Incorporated Jun's comments Diffs (updated) - clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 121e880a941fcd3e6392859edba11a94236494cc clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 5487259751ebe19f137948249aa1fd2637d2deb4 core/src/main/scala/kafka/server/KafkaApis.scala ec8d9f7ba44741db40875458bd524c4062ad6a26 core/src/main/scala/kafka/server/KafkaConfig.scala 6d74983472249eac808d361344c58cc2858ec971 core/src/main/scala/kafka/server/KafkaServer.scala 89200da30a04943f0b9befe84ab17e62b747c8c4 core/src/main/scala/kafka/server/OffsetManager.scala 0bdd42fea931cddd072c0fff765b10526db6840a core/src/main/scala/kafka/server/ReplicaManager.scala e58fbb922e93b0c31dff04f187fcadb4ece986d7 core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 5b93239cdc26b5be7696f4e7863adb9fbe5f0ed5 core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ba1e48e4300c9fb32e36e7266cb05294f2a481e5 Diff: 
https://reviews.apache.org/r/27391/diff/ Testing --- Thanks, Guozhang Wang
Re: Review Request 27391: Fix KAFKA-1634
On Jan. 20, 2015, 4:35 p.m., Joel Koshy wrote:
core/src/main/scala/kafka/server/KafkaApis.scala, line 147
https://reviews.apache.org/r/27391/diff/8/?file=822017#file822017line147

I just realized that if we have a v0 or v1 request then we use the offset manager default retention, which is one day. However, if it is v2 and the user does not override it in the offset commit request, then the retention defaults to Long.MaxValue. I think that default makes sense for OffsetCommitRequest. However, I think the broker needs to protect itself and have an upper threshold for retention, i.e., maybe we should have a maxRetentionMs config in the broker. What do you think?

Agreed, I changed the behavior to use the default value if it is v2 or if the retention period is the default value (meaning the user did not specify it).

- Guozhang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review68729
---

On Jan. 14, 2015, 11:50 p.m., Guozhang Wang wrote:

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/
---

(Updated Jan. 14, 2015, 11:50 p.m.)

Review request for kafka.
Bugs: KAFKA-1634 https://issues.apache.org/jira/browse/KAFKA-1634 Repository: kafka Description --- Incorporated Joel and Jun's comments Diffs - clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 121e880a941fcd3e6392859edba11a94236494cc clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 191a8677444e53b043e9ad6e94c5a9191c32599e core/src/main/scala/kafka/server/KafkaApis.scala c011a1b79bd6c4e832fe7d097daacb0d647d1cd4 core/src/main/scala/kafka/server/KafkaServer.scala a069eb9272c92ef62387304b60de1fe473d7ff49 core/src/main/scala/kafka/server/OffsetManager.scala 3c79428962604800983415f6f705e04f52acb8fb core/src/main/scala/kafka/server/ReplicaManager.scala e58fbb922e93b0c31dff04f187fcadb4ece986d7 core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 4a3a5b264a021e55c39f4d7424ce04ee591503ef core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ba1e48e4300c9fb32e36e7266cb05294f2a481e5 Diff: https://reviews.apache.org/r/27391/diff/ Testing --- Thanks, Guozhang Wang
Re: Review Request 27391: Fix KAFKA-1634
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/ --- (Updated Jan. 22, 2015, 12:43 a.m.) Review request for kafka. Bugs: KAFKA-1634 https://issues.apache.org/jira/browse/KAFKA-1634 Repository: kafka Description (updated) --- Incorporated Joel's comments Diffs (updated) - clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 121e880a941fcd3e6392859edba11a94236494cc clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 191a8677444e53b043e9ad6e94c5a9191c32599e core/src/main/scala/kafka/server/KafkaApis.scala ec8d9f7ba44741db40875458bd524c4062ad6a26 core/src/main/scala/kafka/server/KafkaConfig.scala 6d74983472249eac808d361344c58cc2858ec971 core/src/main/scala/kafka/server/KafkaServer.scala 89200da30a04943f0b9befe84ab17e62b747c8c4 core/src/main/scala/kafka/server/OffsetManager.scala 0bdd42fea931cddd072c0fff765b10526db6840a core/src/main/scala/kafka/server/ReplicaManager.scala e58fbb922e93b0c31dff04f187fcadb4ece986d7 core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 5b93239cdc26b5be7696f4e7863adb9fbe5f0ed5 core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ba1e48e4300c9fb32e36e7266cb05294f2a481e5 Diff: 
https://reviews.apache.org/r/27391/diff/ Testing --- Thanks, Guozhang Wang
Re: Review Request 27391: Fix KAFKA-1634
---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review69106
---

Thanks for the patch. A few more comments.

clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
https://reviews.apache.org/r/27391/#comment113727

Would it be better to use -1L as the default retention time? MAX_VALUE could be useful for the case when a client wants the offset never to be expired.

core/src/main/scala/kafka/api/OffsetCommitRequest.scala
https://reviews.apache.org/r/27391/#comment113724

It seems that our coding convention has been not to use {} on a single line in the body. So, we use

if () do sth

instead of

if () { do sth }

core/src/main/scala/kafka/server/KafkaApis.scala
https://reviews.apache.org/r/27391/#comment113730

I am not sure that we should change the timestamp for offsets produced in V0 and V1. There could be data in the offset topic already written by 0.8.2 code. See the other comment in OffsetManager on expiring.

core/src/main/scala/kafka/server/OffsetManager.scala
https://reviews.apache.org/r/27391/#comment113729

Does that change work correctly with offsets already stored in v0 and v1 format using 0.8.2 code? Would those offsets still be expired at the right time?

- Jun Rao

On Jan. 22, 2015, 12:43 a.m., Guozhang Wang wrote:

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/
---

(Updated Jan. 22, 2015, 12:43 a.m.)

Review request for kafka.
Bugs: KAFKA-1634 https://issues.apache.org/jira/browse/KAFKA-1634 Repository: kafka Description --- Incorporated Joel's comments Diffs - clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 121e880a941fcd3e6392859edba11a94236494cc clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 191a8677444e53b043e9ad6e94c5a9191c32599e core/src/main/scala/kafka/server/KafkaApis.scala ec8d9f7ba44741db40875458bd524c4062ad6a26 core/src/main/scala/kafka/server/KafkaConfig.scala 6d74983472249eac808d361344c58cc2858ec971 core/src/main/scala/kafka/server/KafkaServer.scala 89200da30a04943f0b9befe84ab17e62b747c8c4 core/src/main/scala/kafka/server/OffsetManager.scala 0bdd42fea931cddd072c0fff765b10526db6840a core/src/main/scala/kafka/server/ReplicaManager.scala e58fbb922e93b0c31dff04f187fcadb4ece986d7 core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 5b93239cdc26b5be7696f4e7863adb9fbe5f0ed5 core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ba1e48e4300c9fb32e36e7266cb05294f2a481e5 Diff: https://reviews.apache.org/r/27391/diff/ Testing --- Thanks, Guozhang Wang
Re: Review Request 27391: Fix KAFKA-1634
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/#review68729
---

core/src/main/scala/kafka/server/KafkaApis.scala
https://reviews.apache.org/r/27391/#comment113113

I just realized that if we have a v0 or v1 request then we use the offset manager default retention which is one day. However, if it is v2 and the user does not override it in the offset commit request, then the retention defaults to Long.MaxValue. I think that default makes sense for OffsetCommitRequest. However, I think the broker needs to protect itself and have an upper threshold for retention, i.e., maybe we should have a maxRetentionMs config in the broker. What do you think?

core/src/main/scala/kafka/server/KafkaApis.scala
https://reviews.apache.org/r/27391/#comment113111

if it is _after_ v2

core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
https://reviews.apache.org/r/27391/#comment113114

This file needs to be rebased.

- Joel Koshy

On Jan. 14, 2015, 11:50 p.m., Guozhang Wang wrote:
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/
---
(Updated Jan. 14, 2015, 11:50 p.m.)
Review request for kafka.
Bugs: KAFKA-1634 https://issues.apache.org/jira/browse/KAFKA-1634 Repository: kafka Description --- Incorporated Joel and Jun's comments Diffs - clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 121e880a941fcd3e6392859edba11a94236494cc clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 191a8677444e53b043e9ad6e94c5a9191c32599e core/src/main/scala/kafka/server/KafkaApis.scala c011a1b79bd6c4e832fe7d097daacb0d647d1cd4 core/src/main/scala/kafka/server/KafkaServer.scala a069eb9272c92ef62387304b60de1fe473d7ff49 core/src/main/scala/kafka/server/OffsetManager.scala 3c79428962604800983415f6f705e04f52acb8fb core/src/main/scala/kafka/server/ReplicaManager.scala e58fbb922e93b0c31dff04f187fcadb4ece986d7 core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 4a3a5b264a021e55c39f4d7424ce04ee591503ef core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ba1e48e4300c9fb32e36e7266cb05294f2a481e5 Diff: https://reviews.apache.org/r/27391/diff/ Testing --- Thanks, Guozhang Wang
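The upper threshold Joel proposes in the review above could be applied when the broker turns a requested retention into an absolute expiration time. The following Java sketch is one possible shape, not the patch's code: the class and method names are hypothetical, maxRetentionMs is the config name Joel floats, and all arguments are assumed to be non-negative. The addition saturates so that a Long.MaxValue retention does not overflow.

```java
final class RetentionCap {
    // Compute an absolute expiration timestamp from a requested retention,
    // clamped to a broker-side maximum. Assumes all arguments are >= 0.
    static long expireTimestamp(long nowMs, long requestedRetentionMs, long maxRetentionMs) {
        long retentionMs = Math.min(requestedRetentionMs, maxRetentionMs);
        // Saturating add: nowMs + retentionMs would overflow for very large retentions.
        if (retentionMs > Long.MAX_VALUE - nowMs)
            return Long.MAX_VALUE;
        return nowMs + retentionMs;
    }
}
```

A v2 request that leaves retention at Long.MaxValue would then expire after at most maxRetentionMs, while smaller explicit values are unaffected.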
Re: Review Request 27391: Fix KAFKA-1634
On Jan. 8, 2015, 2:24 a.m., Jun Rao wrote:
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java, lines 79-85
https://reviews.apache.org/r/27391/diff/1/?file=743505#file743505line79

Perhaps this code can just be changed to this(groupId, DEFAULT_GENERATION_ID, DEFAULT_CONSUMER_ID, offsetData);

This cannot be forwarded, because in the super() call we need to specify the version id.

On Jan. 8, 2015, 2:24 a.m., Jun Rao wrote:
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java, lines 97-106
https://reviews.apache.org/r/27391/diff/1/?file=743505#file743505line97

Same here. This code can just be replaced by forwarding the request to the next constructor.

Ditto above.

On Jan. 8, 2015, 2:24 a.m., Jun Rao wrote:
core/src/main/scala/kafka/server/OffsetManager.scala, lines 524-525
https://reviews.apache.org/r/27391/diff/7/?file=779891#file779891line524

Shouldn't we just set the expiration time field to expirationTimestamp, instead of taking it from offsetAndMetadata?

For v0/1, we should just take the value of offsetAndMetadata.timestamp; for v2 we will take the value of expirationTimestamp. This has been changed in the latest patch, where offsetAndMetadata.timestamp is updated accordingly before calling offsetCommitValue().

- Guozhang

---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/#review67114
---

On Dec. 2, 2014, 2:03 a.m., Guozhang Wang wrote:
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/
---
(Updated Dec. 2, 2014, 2:03 a.m.)
Review request for kafka.
Bugs: KAFKA-1634 https://issues.apache.org/jira/browse/KAFKA-1634 Repository: kafka Description --- Add another api in offset manager to return the struct, and the cache layer will only read its expiration timestamp while the offset formatter will read the struct as a whole Diffs - clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 121e880a941fcd3e6392859edba11a94236494cc clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala da29a8cb461099eb675161db2f11a9937424a5c6 core/src/main/scala/kafka/server/KafkaApis.scala 2a1c0326b6e6966d8b8254bd6a1cb83ad98a3b80 core/src/main/scala/kafka/server/KafkaServer.scala 1bf7d10cef23a77e71eb16bf6d0e68bc4ebe core/src/main/scala/kafka/server/OffsetManager.scala 3c79428962604800983415f6f705e04f52acb8fb core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 8c5364fa97da1be09973c176d1baeb339455d319 Diff: https://reviews.apache.org/r/27391/diff/ Testing --- Thanks, Guozhang Wang
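The constructor exchange above turns on a Java rule: a constructor that starts with this(...) cannot also call super(...), so it inherits whatever version the target constructor passes up. The sketch below illustrates the constraint with hypothetical classes (VersionedRequest, CommitRequestSketch) and placeholder default values. It also shows one possible workaround, a private constructor that takes the version explicitly; that workaround is an assumption for illustration and is not something the thread proposes.

```java
// Hypothetical stand-in for a base request type that records its wire version.
class VersionedRequest {
    final int version;

    VersionedRequest(int version) {
        this.version = version;
    }
}

class CommitRequestSketch extends VersionedRequest {
    // Placeholder defaults, named after the constants mentioned in the review.
    static final int DEFAULT_GENERATION_ID = -1;
    static final String DEFAULT_CONSUMER_ID = "";

    final String groupId;
    final int generationId;
    final String consumerId;

    // Shared constructor: the only place that calls super(), so each public
    // constructor can forward here and still choose its own version id.
    private CommitRequestSketch(int version, String groupId, int generationId, String consumerId) {
        super(version);
        this.groupId = groupId;
        this.generationId = generationId;
        this.consumerId = consumerId;
    }

    // v0-style constructor. Forwarding to the v1 constructor below instead
    // would silently produce version 1, which is the problem raised above.
    CommitRequestSketch(String groupId) {
        this(0, groupId, DEFAULT_GENERATION_ID, DEFAULT_CONSUMER_ID);
    }

    // v1-style constructor.
    CommitRequestSketch(String groupId, int generationId, String consumerId) {
        this(1, groupId, generationId, consumerId);
    }
}
```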
Re: Review Request 27391: Fix KAFKA-1634
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/ --- (Updated Jan. 14, 2015, 11:50 p.m.) Review request for kafka. Bugs: KAFKA-1634 https://issues.apache.org/jira/browse/KAFKA-1634 Repository: kafka Description (updated) --- Incorporated Joel and Jun's comments Diffs (updated) - clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 121e880a941fcd3e6392859edba11a94236494cc clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 191a8677444e53b043e9ad6e94c5a9191c32599e core/src/main/scala/kafka/server/KafkaApis.scala c011a1b79bd6c4e832fe7d097daacb0d647d1cd4 core/src/main/scala/kafka/server/KafkaServer.scala a069eb9272c92ef62387304b60de1fe473d7ff49 core/src/main/scala/kafka/server/OffsetManager.scala 3c79428962604800983415f6f705e04f52acb8fb core/src/main/scala/kafka/server/ReplicaManager.scala e58fbb922e93b0c31dff04f187fcadb4ece986d7 core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 4a3a5b264a021e55c39f4d7424ce04ee591503ef core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ba1e48e4300c9fb32e36e7266cb05294f2a481e5 Diff: https://reviews.apache.org/r/27391/diff/ Testing --- Thanks, Guozhang Wang
Re: Review Request 27391: Fix KAFKA-1634
On Dec. 18, 2014, 8:42 a.m., Joel Koshy wrote:
core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala, line 224
https://reviews.apache.org/r/27391/diff/7/?file=779893#file779893line224

If the offset in fact did expire, the assertion itself won't fail - i.e., you will get a NoSuchElementException. Same comments apply to checks below.

Not sure I understand this: when the offset expires, the server will return an error code with the offset set to -1 upon receiving an offset fetch request.

- Guozhang

---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/#review65462
---

On Dec. 2, 2014, 2:03 a.m., Guozhang Wang wrote:
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/
---
(Updated Dec. 2, 2014, 2:03 a.m.)
Review request for kafka.
Bugs: KAFKA-1634 https://issues.apache.org/jira/browse/KAFKA-1634 Repository: kafka Description --- Add another api in offset manager to return the struct, and the cache layer will only read its expiration timestamp while the offset formatter will read the struct as a whole Diffs - clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 121e880a941fcd3e6392859edba11a94236494cc clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala da29a8cb461099eb675161db2f11a9937424a5c6 
core/src/main/scala/kafka/server/KafkaApis.scala 2a1c0326b6e6966d8b8254bd6a1cb83ad98a3b80 core/src/main/scala/kafka/server/KafkaServer.scala 1bf7d10cef23a77e71eb16bf6d0e68bc4ebe core/src/main/scala/kafka/server/OffsetManager.scala 3c79428962604800983415f6f705e04f52acb8fb core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 8c5364fa97da1be09973c176d1baeb339455d319 Diff: https://reviews.apache.org/r/27391/diff/ Testing --- Thanks, Guozhang Wang
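The two positions in the exchange above differ on what a test observes once an offset has expired: a missing map entry (so the lookup throws before the assertion is evaluated) or an entry whose offset is -1. The Java sketch below models both outcomes so a test can tell them apart explicitly. Everything in it is hypothetical: the response is reduced to a plain map from partition name to offset, and INVALID_OFFSET is an assumed sentinel. Optional.get() stands in for the unguarded lookup that raises NoSuchElementException.

```java
import java.util.Map;
import java.util.Optional;

final class ExpiredOffsetCheck {
    // Assumed sentinel for "no valid offset", matching the -1 mentioned in the reply.
    static final long INVALID_OFFSET = -1L;

    // Look up a partition's offset; empty if the response has no entry for it.
    static Optional<Long> fetch(Map<String, Long> response, String partition) {
        return Optional.ofNullable(response.get(partition));
    }

    // Classify what a test would see, without ever calling get() on an empty Optional.
    static String describe(Map<String, Long> response, String partition) {
        Optional<Long> offset = fetch(response, partition);
        if (!offset.isPresent())
            return "missing entry";
        if (offset.get() == INVALID_OFFSET)
            return "expired";
        return "offset " + offset.get();
    }
}
```

A test that asserts on describe(...) fails with a readable message in either case, whereas calling fetch(...).get() directly on a missing entry aborts with NoSuchElementException.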
Re: Review Request 27391: Fix KAFKA-1634
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/#review67114
---

clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
https://reviews.apache.org/r/27391/#comment110938

Perhaps this code can just be changed to this(groupId, DEFAULT_GENERATION_ID, DEFAULT_CONSUMER_ID, offsetData);

clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
https://reviews.apache.org/r/27391/#comment110939

Same here. This code can just be replaced by forwarding the request to the next constructor.

core/src/main/scala/kafka/server/OffsetManager.scala
https://reviews.apache.org/r/27391/#comment111015

Shouldn't we just set the expiration time field to expirationTimestamp, instead of taking it from offsetAndMetadata?

core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
https://reviews.apache.org/r/27391/#comment111016

Should we remove the commented out code?

- Jun Rao

On Dec. 2, 2014, 2:03 a.m., Guozhang Wang wrote:
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/
---
(Updated Dec. 2, 2014, 2:03 a.m.)
Review request for kafka.
Bugs: KAFKA-1634 https://issues.apache.org/jira/browse/KAFKA-1634 Repository: kafka Description --- Add another api in offset manager to return the struct, and the cache layer will only read its expiration timestamp while the offset formatter will read the struct as a whole Diffs - clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 121e880a941fcd3e6392859edba11a94236494cc clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala da29a8cb461099eb675161db2f11a9937424a5c6 core/src/main/scala/kafka/server/KafkaApis.scala 2a1c0326b6e6966d8b8254bd6a1cb83ad98a3b80 core/src/main/scala/kafka/server/KafkaServer.scala 1bf7d10cef23a77e71eb16bf6d0e68bc4ebe core/src/main/scala/kafka/server/OffsetManager.scala 3c79428962604800983415f6f705e04f52acb8fb core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 8c5364fa97da1be09973c176d1baeb339455d319 Diff: https://reviews.apache.org/r/27391/diff/ Testing --- Thanks, Guozhang Wang
Re: Review Request 27391: Fix KAFKA-1634
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/#review65462
---

clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
https://reviews.apache.org/r/27391/#comment108626

can you add a comment: // only v0, v1 of offsetcommitrequest

clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
https://reviews.apache.org/r/27391/#comment108637

Not introduced by your patch, but it is odd that these are named topicResponseObj and partitionResponse below - probably an artifact of copy/paste. Can you do a rename here before checking in?

core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
https://reviews.apache.org/r/27391/#comment108638

I think we discussed before that timestamp does not need to be a var. We can use the case class copy method to make a copy + edit.

core/src/main/scala/kafka/server/KafkaServer.scala
https://reviews.apache.org/r/27391/#comment108642

Thanks for fixing this

core/src/main/scala/kafka/server/OffsetManager.scala
https://reviews.apache.org/r/27391/#comment108643

I think it would be better to move this to just before the call to offsetCommitValue in the loop in line 228. This method should only be responsible for taking the offsetAndMetadata and converting that into the on-disk bytes and should not concern itself with setting a critical field like the expiration timestamp. I was actually looking for where this happens (i.e., setting the expiration time) and it took me a while to realize it was hidden in here.

core/src/main/scala/kafka/server/OffsetManager.scala
https://reviews.apache.org/r/27391/#comment108644

I think we can make this and some other methods here private.

core/src/main/scala/kafka/server/OffsetManager.scala
https://reviews.apache.org/r/27391/#comment108645

private

core/src/main/scala/kafka/server/OffsetManager.scala
https://reviews.apache.org/r/27391/#comment108646

Also, let us use a case class instead of a tuple

core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
https://reviews.apache.org/r/27391/#comment108655

Rather than sleep, we should improve OffsetManager to take in a MockScheduler instance - we can pass through the time instance from KafkaServer to offsetManager as we do for LogManager and replicaManager. That way we can advance time with MockTime. This test will need to change from OffsetCommitTest to OffsetManagerTest and we will just test the OffsetManager. Can you file a jira for that? Although that would make sense only after you check this in.

core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala
https://reviews.apache.org/r/27391/#comment108647

If the offset in fact did expire, the assertion itself won't fail - i.e., you will get a NoSuchElementException. Same comments apply to checks below.

- Joel Koshy

On Dec. 2, 2014, 2:03 a.m., Guozhang Wang wrote:
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/
---
(Updated Dec. 2, 2014, 2:03 a.m.)
Review request for kafka.
Bugs: KAFKA-1634 https://issues.apache.org/jira/browse/KAFKA-1634 Repository: kafka Description --- Add another api in offset manager to return the struct, and the cache layer will only read its expiration timestamp while the offset formatter will read the struct as a whole Diffs - clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 121e880a941fcd3e6392859edba11a94236494cc clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala da29a8cb461099eb675161db2f11a9937424a5c6 core/src/main/scala/kafka/server/KafkaApis.scala 2a1c0326b6e6966d8b8254bd6a1cb83ad98a3b80 core/src/main/scala/kafka/server/KafkaServer.scala 1bf7d10cef23a77e71eb16bf6d0e68bc4ebe core/src/main/scala/kafka/server/OffsetManager.scala 3c79428962604800983415f6f705e04f52acb8fb core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
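Joel's suggestion in the review above, to inject a time source rather than sleep in the test, can be sketched in a few lines of Java. This is only an illustration under stated assumptions: ManualClock and ExpiringOffsetCache are hypothetical stand-ins for MockTime and the offset manager's cache, the clock is modelled as a java.util.function.LongSupplier, and the small Entry class plays the role of the case class suggested in place of a tuple.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongSupplier;

// Hypothetical test clock: time only moves when the test says so.
final class ManualClock implements LongSupplier {
    private long nowMs;

    ManualClock(long startMs) {
        this.nowMs = startMs;
    }

    @Override
    public long getAsLong() {
        return nowMs;
    }

    void advance(long deltaMs) {
        nowMs += deltaMs;
    }
}

// Hypothetical cache that expires entries against an injected clock.
final class ExpiringOffsetCache {
    // Named fields instead of an anonymous (offset, timestamp) pair.
    private static final class Entry {
        final long offset;
        final long expireTimestamp;

        Entry(long offset, long expireTimestamp) {
            this.offset = offset;
            this.expireTimestamp = expireTimestamp;
        }
    }

    private final Map<String, Entry> entries = new HashMap<>();
    private final LongSupplier clock;

    ExpiringOffsetCache(LongSupplier clock) {
        this.clock = clock;
    }

    void commit(String key, long offset, long retentionMs) {
        entries.put(key, new Entry(offset, clock.getAsLong() + retentionMs));
    }

    // Remove every entry whose expiration time has passed; return how many were removed.
    int expireStale() {
        long now = clock.getAsLong();
        int before = entries.size();
        entries.values().removeIf(e -> e.expireTimestamp <= now);
        return before - entries.size();
    }

    boolean contains(String key) {
        return entries.containsKey(key);
    }
}
```

A test can then commit an offset, call advance(...) past the retention, and check expireStale() deterministically, with no dependence on wall-clock sleeps.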
Re: Review Request 27391: Fix KAFKA-1634
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/ --- (Updated Dec. 1, 2014, 7:44 p.m.) Review request for kafka. Bugs: KAFKA-1634 https://issues.apache.org/jira/browse/KAFKA-1634 Repository: kafka Description (updated) --- Add another api in offset manager to return the struct, and the cache layer will only read its expiration timestamp while the offset formatter will read the struct as a whole Diffs (updated) - clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala da29a8cb461099eb675161db2f11a9937424a5c6 core/src/main/scala/kafka/server/KafkaApis.scala 2a1c0326b6e6966d8b8254bd6a1cb83ad98a3b80 core/src/main/scala/kafka/server/KafkaServer.scala 1bf7d10cef23a77e71eb16bf6d0e68bc4ebe core/src/main/scala/kafka/server/OffsetManager.scala 3c79428962604800983415f6f705e04f52acb8fb core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 8c5364fa97da1be09973c176d1baeb339455d319 Diff: https://reviews.apache.org/r/27391/diff/ Testing --- Thanks, Guozhang Wang
Re: Review Request 27391: Fix KAFKA-1634
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/ --- (Updated Dec. 2, 2014, 2:03 a.m.) Review request for kafka. Bugs: KAFKA-1634 https://issues.apache.org/jira/browse/KAFKA-1634 Repository: kafka Description --- Add another api in offset manager to return the struct, and the cache layer will only read its expiration timestamp while the offset formatter will read the struct as a whole Diffs (updated) - clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 121e880a941fcd3e6392859edba11a94236494cc clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala da29a8cb461099eb675161db2f11a9937424a5c6 core/src/main/scala/kafka/server/KafkaApis.scala 2a1c0326b6e6966d8b8254bd6a1cb83ad98a3b80 core/src/main/scala/kafka/server/KafkaServer.scala 1bf7d10cef23a77e71eb16bf6d0e68bc4ebe core/src/main/scala/kafka/server/OffsetManager.scala 3c79428962604800983415f6f705e04f52acb8fb core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 8c5364fa97da1be09973c176d1baeb339455d319 Diff: https://reviews.apache.org/r/27391/diff/ Testing --- Thanks, Guozhang Wang
Re: Review Request 27391: Fix KAFKA-1634
On Nov. 21, 2014, 2:41 p.m., Joel Koshy wrote:
core/src/main/scala/kafka/common/OffsetMetadataAndError.scala, line 17
https://reviews.apache.org/r/27391/diff/4/?file=766684#file766684line17

I thought we would be going with separate format for on-disk storage? E.g., one thing that is extremely useful (until we have timestamp as a first-class field of messages) is to have the receive time of the offsetcommit in the stored offset entries. This is very useful for debugging.

Guozhang Wang wrote:
Yes, they are separated: for on-disk storage the timestamp will always be stored; for the wire protocol only v0/1 will contain that value, while for v2 this value will be computed via retention. So the on-disk format is specified as OffsetAndMetadata, and when we deprecate v0/1 and add the timestamp to the message header we will replace this with OffsetMetadata.

What I meant is that for the on-disk format it is useful to have the receive time (apart from the expiration time). Right now it seems only one timestamp (the expiration timestamp) is stored.

- Joel

---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/#review62553
---

On Nov. 21, 2014, 10 p.m., Guozhang Wang wrote:
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/
---
(Updated Nov. 21, 2014, 10 p.m.)
Review request for kafka.
Bugs: KAFKA-1634 https://issues.apache.org/jira/browse/KAFKA-1634 Repository: kafka Description --- The timestamp field of OffsetAndMetadata is preserved since we need to be backward compatible with older versions Diffs - clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 3e1718bc7ca6c835a59ca7c6879f558ec06866ee core/src/main/scala/kafka/server/KafkaApis.scala 968b0c4f809ea9f9c3f322aef9db105f9d2e0e23 core/src/main/scala/kafka/server/KafkaServer.scala 1bf7d10cef23a77e71eb16bf6d0e68bc4ebe core/src/main/scala/kafka/server/OffsetManager.scala 2957bc435102bc4004d8f100dbcdd56287c8ffae core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 8c5364fa97da1be09973c176d1baeb339455d319 Diff: https://reviews.apache.org/r/27391/diff/ Testing --- Thanks, Guozhang Wang
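Joel's point above is that the stored entry would carry two timestamps: the receive (commit) time for debugging and the expiration time for cleanup. The Java sketch below shows what such a record could look like. The class name and the fixed 24-byte layout of three longs are assumptions made for illustration only; they are not the on-disk schema used by the patch.

```java
import java.nio.ByteBuffer;

// Hypothetical stored value holding both the receive time and the expiration time.
final class StoredOffsetSketch {
    final long offset;
    final long commitTimestamp;  // when the broker received the commit
    final long expireTimestamp;  // when the entry becomes eligible for removal

    StoredOffsetSketch(long offset, long commitTimestamp, long expireTimestamp) {
        this.offset = offset;
        this.commitTimestamp = commitTimestamp;
        this.expireTimestamp = expireTimestamp;
    }

    // Serialize as three big-endian longs (illustrative layout only).
    ByteBuffer toBytes() {
        ByteBuffer buffer = ByteBuffer.allocate(3 * Long.BYTES);
        buffer.putLong(offset);
        buffer.putLong(commitTimestamp);
        buffer.putLong(expireTimestamp);
        buffer.flip();
        return buffer;
    }

    // Read the same layout back without disturbing the caller's buffer position.
    static StoredOffsetSketch fromBytes(ByteBuffer buffer) {
        ByteBuffer view = buffer.duplicate();
        long offset = view.getLong();
        long commitTimestamp = view.getLong();
        long expireTimestamp = view.getLong();
        return new StoredOffsetSketch(offset, commitTimestamp, expireTimestamp);
    }
}
```

Keeping the two fields separate means the expiration logic never has to overwrite the receive time, which is the property the review asks for.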
Re: Review Request 27391: Fix KAFKA-1634
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/#review62553
---

core/src/main/scala/kafka/api/OffsetCommitRequest.scala
https://reviews.apache.org/r/27391/#comment104608

`=`

core/src/main/scala/kafka/api/OffsetCommitRequest.scala
https://reviews.apache.org/r/27391/#comment104609

`=`

core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
https://reviews.apache.org/r/27391/#comment104615

I thought we would be going with separate format for on-disk storage? E.g., one thing that is extremely useful (until we have timestamp as a first-class field of messages) is to have the receive time of the offsetcommit in the stored offset entries. This is very useful for debugging.

core/src/main/scala/kafka/server/KafkaApis.scala
https://reviews.apache.org/r/27391/#comment104612

I think what you meant earlier was with v2 you could have different timestamps for each partition so a global retention won't work with v2

core/src/main/scala/kafka/server/OffsetManager.scala
https://reviews.apache.org/r/27391/#comment104613

use

core/src/main/scala/kafka/server/OffsetManager.scala
https://reviews.apache.org/r/27391/#comment104614

This and the above use give compilation warnings. I think it is reasonable to copy those constants here to get rid of the warnings since we are anyway in an undesirable state right now of maintaining a mirror wire-protocol implementation in scala

- Joel Koshy

On Nov. 18, 2014, 1:42 a.m., Guozhang Wang wrote:
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/
---
(Updated Nov. 18, 2014, 1:42 a.m.)
Review request for kafka.
Bugs: KAFKA-1634 https://issues.apache.org/jira/browse/KAFKA-1634 Repository: kafka Description --- The timestamp field of OffsetAndMetadata is preserved since we need to be backward compatible with older versions Diffs - clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala f476973eeff653473a60c3ecf36e870e386536bc core/src/main/scala/kafka/server/KafkaApis.scala 968b0c4f809ea9f9c3f322aef9db105f9d2e0e23 core/src/main/scala/kafka/server/KafkaServer.scala 4de812374e8fb1fed834d2be3f9655f55b511a74 core/src/main/scala/kafka/server/OffsetManager.scala 2957bc435102bc4004d8f100dbcdd56287c8ffae core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 8c5364fa97da1be09973c176d1baeb339455d319 Diff: https://reviews.apache.org/r/27391/diff/ Testing --- Thanks, Guozhang Wang
Re: Review Request 27391: Fix KAFKA-1634
On Nov. 21, 2014, 2:41 p.m., Joel Koshy wrote:
core/src/main/scala/kafka/server/KafkaApis.scala, line 147
https://reviews.apache.org/r/27391/diff/4/?file=766686#file766686line147

I think what you meant earlier was with v2 you could have different timestamps for each partition so a global retention won't work with v2

In the OffsetManager, the timestamp will only be overridden by the global one if it is set to Default, which is -1, so for v0/1, although the retention is used, it will not override the timestamp.

On Nov. 21, 2014, 2:41 p.m., Joel Koshy wrote:
core/src/main/scala/kafka/common/OffsetMetadataAndError.scala, line 17
https://reviews.apache.org/r/27391/diff/4/?file=766684#file766684line17

I thought we would be going with separate format for on-disk storage? E.g., one thing that is extremely useful (until we have timestamp as a first-class field of messages) is to have the receive time of the offsetcommit in the stored offset entries. This is very useful for debugging.

Yes, they are separated: for on-disk storage the timestamp will always be stored; for the wire protocol only v0/1 will contain that value, while for v2 this value will be computed via retention. So the on-disk format is specified as OffsetAndMetadata, and when we deprecate v0/1 and add the timestamp to the message header we will replace this with OffsetMetadata.

- Guozhang

---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/#review62553
---

On Nov. 18, 2014, 1:42 a.m., Guozhang Wang wrote:
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/
---
(Updated Nov. 18, 2014, 1:42 a.m.)
Review request for kafka.
Bugs: KAFKA-1634 https://issues.apache.org/jira/browse/KAFKA-1634 Repository: kafka Description --- The timestamp field of OffsetAndMetadata is preserved since we need to be backward compatible with older versions Diffs - clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala f476973eeff653473a60c3ecf36e870e386536bc core/src/main/scala/kafka/server/KafkaApis.scala 968b0c4f809ea9f9c3f322aef9db105f9d2e0e23 core/src/main/scala/kafka/server/KafkaServer.scala 4de812374e8fb1fed834d2be3f9655f55b511a74 core/src/main/scala/kafka/server/OffsetManager.scala 2957bc435102bc4004d8f100dbcdd56287c8ffae core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 8c5364fa97da1be09973c176d1baeb339455d319 Diff: https://reviews.apache.org/r/27391/diff/ Testing --- Thanks, Guozhang Wang
Re: Review Request 27391: Fix KAFKA-1634
On Nov. 21, 2014, 2:41 p.m., Joel Koshy wrote:
core/src/main/scala/kafka/server/OffsetManager.scala, line 500
https://reviews.apache.org/r/27391/diff/4/?file=766688#file766688line500

This and the above use give compilation warnings. I think it is reasonable to copy those constants here to get rid of the warnings since we are anyway in an undesirable state right now of maintaining a mirror wire-protocol implementation in scala

Not sure what you mean by copying those constants? I did the copy inside the if block, and the compilation error comes from the condition.

- Guozhang

---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/#review62553
---

On Nov. 18, 2014, 1:42 a.m., Guozhang Wang wrote:
---
This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/
---
(Updated Nov. 18, 2014, 1:42 a.m.)
Review request for kafka.
Bugs: KAFKA-1634 https://issues.apache.org/jira/browse/KAFKA-1634 Repository: kafka Description --- The timestamp field of OffsetAndMetadata is preserved since we need to be backward compatible with older versions Diffs - clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala f476973eeff653473a60c3ecf36e870e386536bc core/src/main/scala/kafka/server/KafkaApis.scala 968b0c4f809ea9f9c3f322aef9db105f9d2e0e23 
core/src/main/scala/kafka/server/KafkaServer.scala 4de812374e8fb1fed834d2be3f9655f55b511a74 core/src/main/scala/kafka/server/OffsetManager.scala 2957bc435102bc4004d8f100dbcdd56287c8ffae core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 8c5364fa97da1be09973c176d1baeb339455d319 Diff: https://reviews.apache.org/r/27391/diff/ Testing --- Thanks, Guozhang Wang
Re: Review Request 27391: Fix KAFKA-1634
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/ --- (Updated Nov. 21, 2014, 10 p.m.) Review request for kafka. Bugs: KAFKA-1634 https://issues.apache.org/jira/browse/KAFKA-1634 Repository: kafka Description --- The timestamp field of OffsetAndMetadata is preserved since we need to be backward compatible with older versions Diffs (updated) - clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 3e1718bc7ca6c835a59ca7c6879f558ec06866ee core/src/main/scala/kafka/server/KafkaApis.scala 968b0c4f809ea9f9c3f322aef9db105f9d2e0e23 core/src/main/scala/kafka/server/KafkaServer.scala 1bf7d10cef23a77e71eb16bf6d0e68bc4ebe core/src/main/scala/kafka/server/OffsetManager.scala 2957bc435102bc4004d8f100dbcdd56287c8ffae core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 8c5364fa97da1be09973c176d1baeb339455d319 Diff: https://reviews.apache.org/r/27391/diff/ Testing --- Thanks, Guozhang Wang
Re: Review Request 27391: Fix KAFKA-1634: Add the global retentionTime (in milis) and disable the per-offset timestamp
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/#review61761 --- clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java https://reviews.apache.org/r/27391/#comment103640 Not sure if this will make it worse; would it be clearer to call this DEFAULT_TIMESTAMP_V0_V1? clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java https://reviews.apache.org/r/27391/#comment103719 Made a follow-up comment in the earlier RB. But pasting here as well: Agree that it is still common in the object but it is completely removed from the wire protocol - i.e., OFFSET_COMMIT_REQUEST_PARTITION_V1 which is in the V2 OffsetCommitRequest does not have a timestamp. This method should probably be read as initCommonFieldsInStruct - i.e., effectively the wire protocol. That said, I'm loath to add another init method which reads initCommonFieldsInV0AndV1. So I think rather than checking fetchPartitionData.timestamp it would be better to explicitly check the (already set) request version in the struct. If v0 or v1 then set the timestamp key name. core/src/main/scala/kafka/api/OffsetCommitRequest.scala https://reviews.apache.org/r/27391/#comment103727 >= 2 (or we may forget when we go to version 3). core/src/main/scala/kafka/common/OffsetMetadataAndError.scala https://reviews.apache.org/r/27391/#comment103734 I think our coding convention is to use the parameterless form in the absence of side effects. core/src/main/scala/kafka/common/OffsetMetadataAndError.scala https://reviews.apache.org/r/27391/#comment103735 Similar comment as above. core/src/main/scala/kafka/server/KafkaApis.scala https://reviews.apache.org/r/27391/#comment103737 See reply to earlier comment. - Joel Koshy On Nov. 8, 2014, 12:54 a.m., Guozhang Wang wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/ --- (Updated Nov. 8, 2014, 12:54 a.m.) Review request for kafka.
Bugs: KAFKA-1634 https://issues.apache.org/jira/browse/KAFKA-1634 Repository: kafka Description --- The timestamp field of OffsetAndMetadata is preserved since we need to be backward compatible with older versions Diffs - clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala fbc680fde21b02f11285a4f4b442987356abd17b core/src/main/scala/kafka/server/KafkaApis.scala 968b0c4f809ea9f9c3f322aef9db105f9d2e0e23 core/src/main/scala/kafka/server/KafkaServer.scala 4de812374e8fb1fed834d2be3f9655f55b511a74 core/src/main/scala/kafka/server/OffsetManager.scala 2957bc435102bc4004d8f100dbcdd56287c8ffae core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 8c5364fa97da1be09973c176d1baeb339455d319 Diff: https://reviews.apache.org/r/27391/diff/ Testing --- Thanks, Guozhang Wang
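The remark about forgetting the check "when we go to version 3" argues for a range check on the request version rather than an equality check. A minimal sketch of that idea follows. `VersionGate` and its method names are hypothetical and not part of Kafka; the version boundaries are the ones described in this thread (per-partition timestamp on the wire in v0 and v1, request-level retention time from v2).

```java
/** Hypothetical helper, not Kafka code. Version boundaries follow this review thread. */
final class VersionGate {
    private VersionGate() {}

    /** Retention time is a request-level field from v2 onward; ">=" keeps v3 and later covered. */
    static boolean hasRetentionTime(int version) {
        return version >= 2;
    }

    /** The per-partition timestamp is only serialized in v0 and v1. */
    static boolean hasPartitionTimestamp(int version) {
        return version < 2;
    }
}
```

With an equality check (`version == 2`), a future v3 request would silently fall into the v0/v1 branch, which is the failure mode the comment warns about.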
Re: Review Request 27391: Fix KAFKA-1634: Add the global retentionTime (in milis) and disable the per-offset timestamp
On Nov. 7, 2014, 3 a.m., Joel Koshy wrote: clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java, line 152 https://reviews.apache.org/r/27391/diff/2/?file=753752#file753752line152 This confused me a bit, and I think it is because initCommonFields is intended to initialize fields common to all versions of the request. It is a useful helper method but it becomes somewhat clunky when removing fields. The partition-level timestamp is no longer a common field. If this is v2 then we should _never_ set anything in the timestamp field of the struct; and if it is v0 or v1 then we should _always_ set the timestamp field (even if it is the default). However, since the timestamp field in the Field declaration for OFFSET_COMMIT_REQUEST_PARTITION_V0 does not have a default explicitly specified, I think this will break with a SchemaException(missing value...) for offset commit request v0, v1 if we choose to write to a bytebuffer under those versions with this code. One option is to explicitly pass in the constructor version (0, 1, 2) to initCommonFields and use that to decide whether to include/exclude this field, but that is weird. Another alternative is a separate helper method for v0v1. That is even weirder. Guozhang Wang wrote: Actually, the partition-level timestamp is still a common field (we are just deprecating it, and chose not to serialize / deserialize it in v2). I agree this is a bit weird as written; I thought about this when I started the first version but did not come up with a better approach. Agree that it is still common in the object but it is completely removed from the wire protocol - i.e., OFFSET_COMMIT_REQUEST_PARTITION_V1 which is in the V2 OffsetCommitRequest does not have a timestamp. This method should probably be read as initCommonFieldsInStruct - i.e., effectively the wire protocol. That said, I'm loath to add another init method which reads initCommonFieldsInV0AndV1.
So I think rather than checking fetchPartitionData.timestamp it would be better to explicitly check the (already set) request version in the struct. If v0 or v1 then set the timestamp key name. On Nov. 7, 2014, 3 a.m., Joel Koshy wrote: core/src/main/scala/kafka/common/OffsetMetadataAndError.scala, line 22 https://reviews.apache.org/r/27391/diff/2/?file=753755#file753755line22 Can we mark this @Deprecated as well? We should probably make the primary constructor without timestamp and add a secondary constructor with timestamp and mark deprecated there. Also, can we use case class.copy if timestamp needs to be modified? However, per comment further down I don't think it needs to be touched. Guozhang Wang wrote: Actually we cannot make it deprecated as it will be preserved even in the new version, right? Note this is not used for the wire protocol but for the cache / disk format. Guozhang Wang wrote: I should say not only for the wire protocol but also for the cache / disk storage format. And thinking about this twice, I will change to two separate classes, one for wire protocol and one for server storage format. Yes, that is what I was thinking - we should ideally have a separate wire protocol and storage format. On Nov. 7, 2014, 3 a.m., Joel Koshy wrote: core/src/main/scala/kafka/server/OffsetManager.scala, line 498 https://reviews.apache.org/r/27391/diff/2/?file=753758#file753758line498 Actually, since we always set the retention period (for v0, v1) in KafkaApis do we need to even touch this timestamp? i.e., we should basically ignore it, right? So we only need to do: value.set(VALUE_TIMESTAMP_FIELD, expirationTimestamp). Guozhang Wang wrote: I think not. In v0/v1, if the timestamp is explicitly specified (i.e. not -1) we need to use it as the expiration timestamp, or at least that was how I understood the semantics. Guozhang Wang wrote: I think we cannot not Right - what I meant was in KafkaApis we can just compute the retentionPeriod if v0 or v1.
So if v0/v1 and timestamp = (now + 7 days), then set retention to 7 days. - Joel --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/#review60285 --- On Nov. 8, 2014, 12:54 a.m., Guozhang Wang wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/ --- (Updated Nov. 8, 2014, 12:54 a.m.) Review request for kafka. Bugs: KAFKA-1634 https://issues.apache.org/jira/browse/KAFKA-1634 Repository: kafka Description --- The timestamp field of OffsetAndMetadata is preserved since we
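Joel's suggestion of deriving a retention period from an explicit v0/v1 timestamp can be sketched as below. This is only an illustration under stated assumptions: `RetentionSketch`, `retentionMs`, and the parameter names are hypothetical, and -1 is taken as the "not specified" marker because the thread describes an explicit timestamp as one that is "not -1".

```java
/** Hypothetical sketch, not Kafka code: derive a retention period from a v0/v1 commit timestamp. */
final class RetentionSketch {
    /** Marker for "no explicit timestamp", per the "not -1" wording in the thread. */
    static final long DEFAULT_TIMESTAMP = -1L;

    private RetentionSketch() {}

    /**
     * If the request carried an explicit timestamp, treat it as the desired expiration time
     * and return its distance from now; otherwise fall back to the server-side default.
     * A timestamp in the past yields a non-positive retention, which this sketch does not handle.
     */
    static long retentionMs(long requestTimestampMs, long nowMs, long defaultRetentionMs) {
        if (requestTimestampMs == DEFAULT_TIMESTAMP) {
            return defaultRetentionMs;
        }
        return requestTimestampMs - nowMs;
    }
}
```

For example, a timestamp of now + 7 days gives a retention of 7 days, matching the case described in the message.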
Re: Review Request 27391: Fix KAFKA-1634: Add the global retentionTime (in milis) and disable the per-offset timestamp
On Nov. 17, 2014, 10:31 p.m., Joel Koshy wrote: clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java, line 151 https://reviews.apache.org/r/27391/diff/3/?file=755182#file755182line151 Made a follow-up comment in the earlier RB. But pasting here as well: Agree that it is still common in the object but it is completely removed from the wire protocol - i.e., OFFSET_COMMIT_REQUEST_PARTITION_V1 which is in the V2 OffsetCommitRequest does not have a timestamp. This method should probably be read as initCommonFieldsInStruct - i.e., effectively the wire protocol. That said, I'm loath to add another init method which reads initCommonFieldsInV0AndV1. So I think rather than checking fetchPartitionData.timestamp it would be better to explicitly check the (already set) request version in the struct. If v0 or v1 then set the timestamp key name. The version number is not in the struct, so we cannot get its value unless we pass it from the request header into the constructor. But we can check whether the struct has that field. Changed to this accordingly. - Guozhang --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/#review61761 --- On Nov. 8, 2014, 12:54 a.m., Guozhang Wang wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/ --- (Updated Nov. 8, 2014, 12:54 a.m.) Review request for kafka.
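The approach Guozhang settles on, checking whether the struct's schema has the timestamp field instead of looking up a version number, might look roughly like the sketch below. It deliberately models the struct as a plain `Map` and the schema as a `Set` of field names; `PartitionStructSketch`, `build`, and the key names are hypothetical stand-ins, not the Kafka `Struct` or `Schema` classes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical stand-in for per-partition struct population; not the Kafka Struct API. */
final class PartitionStructSketch {
    static final String TIMESTAMP_KEY = "timestamp";

    private PartitionStructSketch() {}

    /**
     * Writes the timestamp only when the schema for this request version declares it,
     * so a v2-style schema (no timestamp field) never receives one.
     */
    static Map<String, Object> build(Set<String> schemaFields, long offset, long timestamp, String metadata) {
        Map<String, Object> struct = new HashMap<>();
        struct.put("offset", offset);
        if (schemaFields.contains(TIMESTAMP_KEY)) {
            struct.put(TIMESTAMP_KEY, timestamp);
        }
        struct.put("metadata", metadata);
        return struct;
    }
}
```

The design point is that the schema itself acts as the version indicator, so no version number has to be threaded through the constructor.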
Re: Review Request 27391: Fix KAFKA-1634: Add the global retentionTime (in milis) and disable the per-offset timestamp
On Nov. 7, 2014, 3 a.m., Joel Koshy wrote: core/src/main/scala/kafka/server/OffsetManager.scala, line 498 https://reviews.apache.org/r/27391/diff/2/?file=753758#file753758line498 Actually, since we always set the retention period (for v0, v1) in KafkaApis do we need to even touch this timestamp? i.e., we should basically ignore it, right? So we only need to do: value.set(VALUE_TIMESTAMP_FIELD, expirationTimestamp). Guozhang Wang wrote: I think not. In v0/v1, if the timestamp is explicitly specified (i.e. not -1) we need to use it as the expiration timestamp, or at least that was how I understood the semantics. Guozhang Wang wrote: I think we cannot not Joel Koshy wrote: Right - what I meant was in KafkaApis we can just compute the retentionPeriod if v0 or v1. So if v0/v1 and timestamp = (now + 7 days), then set retention to 7 days. Retention is a global parameter which we use later to set the per-message timestamp; but if the timestamps from the v0/v1 requests differ, then we cannot just use a single retention, right? - Guozhang --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/#review60285 --- On Nov. 8, 2014, 12:54 a.m., Guozhang Wang wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/ --- (Updated Nov. 8, 2014, 12:54 a.m.) Review request for kafka.
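Guozhang's objection is that one v0/v1 request may carry different timestamps for different partitions, so a single request-wide retention cannot represent them all. A small sketch of the per-partition alternative follows; `PerPartitionExpiry`, `expireTimestamps`, and the string partition keys are hypothetical, and -1 is again assumed to mean "not specified".

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch, not Kafka code: one expiration timestamp per partition. */
final class PerPartitionExpiry {
    static final long DEFAULT_TIMESTAMP = -1L;

    private PerPartitionExpiry() {}

    /**
     * For each partition, an explicit timestamp is used as the expiration time as-is;
     * an unspecified one falls back to now plus the default retention.
     */
    static Map<String, Long> expireTimestamps(Map<String, Long> partitionTimestamps,
                                              long nowMs,
                                              long defaultRetentionMs) {
        Map<String, Long> result = new LinkedHashMap<>();
        for (Map.Entry<String, Long> entry : partitionTimestamps.entrySet()) {
            long ts = entry.getValue();
            long expire = (ts == DEFAULT_TIMESTAMP) ? nowMs + defaultRetentionMs : ts;
            result.put(entry.getKey(), expire);
        }
        return result;
    }
}
```

Two partitions with timestamps 5000 and -1 end up with different expiration times here, which is exactly the case a single retention value could not express.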
Re: Review Request 27391: Fix KAFKA-1634
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/ --- (Updated Nov. 18, 2014, 1:42 a.m.) Review request for kafka. Summary (updated) - Fix KAFKA-1634 Bugs: KAFKA-1634 https://issues.apache.org/jira/browse/KAFKA-1634 Repository: kafka Description --- The timestamp field of OffsetAndMetadata is preserved since we need to be backward compatible with older versions Diffs (updated) - clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala f476973eeff653473a60c3ecf36e870e386536bc core/src/main/scala/kafka/server/KafkaApis.scala 968b0c4f809ea9f9c3f322aef9db105f9d2e0e23 core/src/main/scala/kafka/server/KafkaServer.scala 4de812374e8fb1fed834d2be3f9655f55b511a74 core/src/main/scala/kafka/server/OffsetManager.scala 2957bc435102bc4004d8f100dbcdd56287c8ffae core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 8c5364fa97da1be09973c176d1baeb339455d319 Diff: https://reviews.apache.org/r/27391/diff/ Testing --- Thanks, Guozhang Wang
Re: Review Request 27391: Fix KAFKA-1634: Add the global retentionTime (in milis) and disable the per-offset timestamp
On Nov. 7, 2014, 3 a.m., Joel Koshy wrote: clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java, line 152 https://reviews.apache.org/r/27391/diff/2/?file=753752#file753752line152 This confused me a bit, and I think it is because initCommonFields is intended to initialize fields common to all versions of the request. It is a useful helper method but it becomes somewhat clunky when removing fields. The partition-level timestamp is no longer a common field. If this is v2 then we should _never_ set anything in the timestamp field of the struct; and if it is v0 or v1 then we should _always_ set the timestamp field (even if it is the default). However, since the timestamp field in the Field declaration for OFFSET_COMMIT_REQUEST_PARTITION_V0 does not have a default explicitly specified, I think this will break with a SchemaException(missing value...) for offset commit request v0, v1 if we choose to write to a bytebuffer under those versions with this code. One option is to explicitly pass in the constructor version (0, 1, 2) to initCommonFields and use that to decide whether to include/exclude this field, but that is weird. Another alternative is a separate helper method for v0v1. That is even weirder. Actually, the partition-level timestamp is still a common field (we are just deprecating it, and chose not to serialize / deserialize it in v2). I agree this is a bit weird as written; I thought about this when I started the first version but did not come up with a better approach. On Nov. 7, 2014, 3 a.m., Joel Koshy wrote: core/src/main/scala/kafka/common/OffsetMetadataAndError.scala, line 22 https://reviews.apache.org/r/27391/diff/2/?file=753755#file753755line22 Can we mark this @Deprecated as well? We should probably make the primary constructor without timestamp and add a secondary constructor with timestamp and mark deprecated there. Also, can we use case class.copy if timestamp needs to be modified?
However, per comment further down I don't think it needs to be touched. Actually we cannot make it deprecated as it will be preserved even in the new version, right? Note this is not used for the wire protocol but for the cache / disk format. On Nov. 7, 2014, 3 a.m., Joel Koshy wrote: core/src/main/scala/kafka/server/OffsetManager.scala, line 498 https://reviews.apache.org/r/27391/diff/2/?file=753758#file753758line498 Actually, since we always set the retention period (for v0, v1) in KafkaApis do we need to even touch this timestamp? i.e., we should basically ignore it, right? So we only need to do: value.set(VALUE_TIMESTAMP_FIELD, expirationTimestamp). I think not. In v0/v1, if the timestamp is explicitly specified (i.e. not -1) we need to use it as the expiration timestamp, or at least that was how I understood the semantics. - Guozhang --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/#review60285 --- On Nov. 6, 2014, 11:35 p.m., Guozhang Wang wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/ --- (Updated Nov. 6, 2014, 11:35 p.m.) Review request for kafka.
Bugs: KAFKA-1634 https://issues.apache.org/jira/browse/KAFKA-1634 Repository: kafka Description --- The timestamp field of OffsetAndMetadata is preserved since we need to be backward compatible with older versions Diffs - clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 core/src/main/scala/kafka/server/KafkaApis.scala 968b0c4f809ea9f9c3f322aef9db105f9d2e0e23 core/src/main/scala/kafka/server/KafkaServer.scala 4de812374e8fb1fed834d2be3f9655f55b511a74 core/src/main/scala/kafka/server/OffsetManager.scala 2957bc435102bc4004d8f100dbcdd56287c8ffae core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 8c5364fa97da1be09973c176d1baeb339455d319 Diff: https://reviews.apache.org/r/27391/diff/ Testing ---
Re: Review Request 27391: Fix KAFKA-1634: Add the global retentionTime (in milis) and disable the per-offset timestamp
On Nov. 7, 2014, 3 a.m., Joel Koshy wrote: core/src/main/scala/kafka/common/OffsetMetadataAndError.scala, line 22 https://reviews.apache.org/r/27391/diff/2/?file=753755#file753755line22 Can we mark this @Deprecated as well? We should probably make the primary constructor without timestamp and add a secondary constructor with timestamp and mark deprecated there. Also, can we use case class.copy if timestamp needs to be modified? However, per comment further down I don't think it needs to be touched. Guozhang Wang wrote: Actually we cannot make it deprecated as it will be preserved even in the new version, right? Note this is not used for the wire protocol but for the cache / disk format. I should say not only for the wire protocol but also for cache disk storage format. And thinking about this twice, I will change to two separate classes, one for wire protocol and one for server storage format. - Guozhang --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/#review60285 --- On Nov. 6, 2014, 11:35 p.m., Guozhang Wang wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/ --- (Updated Nov. 6, 2014, 11:35 p.m.) Review request for kafka. 
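The plan to split the type into one class for the wire protocol and one for the server storage format could be sketched as follows. All names here (`WireOffset`, `StoredOffset`, `fromWire`) are hypothetical and chosen for illustration only; the stored timestamp is modeled as an expiration time, following the `value.set(VALUE_TIMESTAMP_FIELD, expirationTimestamp)` remark in this thread.

```java
/** Hypothetical wire-side value: what a client puts in a commit request. No timestamp. */
final class WireOffset {
    final long offset;
    final String metadata;

    WireOffset(long offset, String metadata) {
        this.offset = offset;
        this.metadata = metadata;
    }
}

/** Hypothetical storage-side value: what the broker keeps in its cache and on disk. */
final class StoredOffset {
    final long offset;
    final String metadata;
    final long expireTimestampMs;

    StoredOffset(long offset, String metadata, long expireTimestampMs) {
        this.offset = offset;
        this.metadata = metadata;
        this.expireTimestampMs = expireTimestampMs;
    }

    /** Converts a wire value by stamping it with an expiration computed on the broker. */
    static StoredOffset fromWire(WireOffset wire, long nowMs, long retentionMs) {
        return new StoredOffset(wire.offset, wire.metadata, nowMs + retentionMs);
    }
}
```

Keeping the two types separate means the client-facing class can drop the timestamp without touching what is persisted, which is the motivation both reviewers agree on.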
Re: Review Request 27391: Fix KAFKA-1634: Add the global retentionTime (in milis) and disable the per-offset timestamp
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/ --- (Updated Nov. 8, 2014, 12:54 a.m.) Review request for kafka. Bugs: KAFKA-1634 https://issues.apache.org/jira/browse/KAFKA-1634 Repository: kafka Description --- The timestamp field of OffsetAndMetadata is preserved since we need to be backward compatible with older versions Diffs (updated) - clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala fbc680fde21b02f11285a4f4b442987356abd17b core/src/main/scala/kafka/server/KafkaApis.scala 968b0c4f809ea9f9c3f322aef9db105f9d2e0e23 core/src/main/scala/kafka/server/KafkaServer.scala 4de812374e8fb1fed834d2be3f9655f55b511a74 core/src/main/scala/kafka/server/OffsetManager.scala 2957bc435102bc4004d8f100dbcdd56287c8ffae core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 8c5364fa97da1be09973c176d1baeb339455d319 Diff: https://reviews.apache.org/r/27391/diff/ Testing --- Thanks, Guozhang Wang
Re: Review Request 27391: Fix KAFKA-1634: Add the global retentionTime (in milis) and disable the per-offset timestamp
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/ --- (Updated Nov. 6, 2014, 11:35 p.m.) Review request for kafka. Bugs: KAFKA-1634 https://issues.apache.org/jira/browse/KAFKA-1634 Repository: kafka Description --- The timestamp field of OffsetAndMetadata is preserved since we need to be backward compatible with older versions Diffs (updated) - clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 core/src/main/scala/kafka/server/KafkaApis.scala 968b0c4f809ea9f9c3f322aef9db105f9d2e0e23 core/src/main/scala/kafka/server/KafkaServer.scala 4de812374e8fb1fed834d2be3f9655f55b511a74 core/src/main/scala/kafka/server/OffsetManager.scala 2957bc435102bc4004d8f100dbcdd56287c8ffae core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 8c5364fa97da1be09973c176d1baeb339455d319 Diff: https://reviews.apache.org/r/27391/diff/ Testing --- Thanks, Guozhang Wang
Re: Review Request 27391: Fix KAFKA-1634: Add the global retentionTime (in milis) and disable the per-offset timestamp
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/#review60285 --- clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java https://reviews.apache.org/r/27391/#comment101649 @Deprecated clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java https://reviews.apache.org/r/27391/#comment101651 This confused me a bit, and I think it is because initCommonFields is intended to initialize fields common to all versions of the request. It is a useful helper method but it becomes somewhat clunky when removing fields. The partition-level timestamp is no longer a common field. If this is v2 then we should _never_ set anything in the timestamp field of the struct; and if it is v0 or v1 then we should _always_ set the timestamp field (even if it is the default). However, since the timestamp field in the Field declaration for OFFSET_COMMIT_REQUEST_PARTITION_V0 does not have a default explicitly specified, I think this will break with a SchemaException(missing value...) for offset commit request v0, v1 if we choose to write to a bytebuffer under those versions with this code. One option is to explicitly pass in the constructor version (0, 1, 2) to initCommonFields and use that to decide whether to include/exclude this field, but that is weird. Another alternative is a separate helper method for v0v1. That is even weirder. clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java https://reviews.apache.org/r/27391/#comment101650 Would help to add a comment: "This field only exists in v0 and v1". core/src/main/scala/kafka/common/OffsetMetadataAndError.scala https://reviews.apache.org/r/27391/#comment101657 Can we mark this @Deprecated as well? We should probably make the primary constructor without timestamp and add a secondary constructor with timestamp and mark deprecated there. Also, can we use case class.copy if timestamp needs to be modified?
However, per comment further down I don't think it needs to be touched. core/src/main/scala/kafka/server/OffsetManager.scala https://reviews.apache.org/r/27391/#comment101658 This was already there, but it would be clearer to use: filter { case (..., ...) => ... } core/src/main/scala/kafka/server/OffsetManager.scala https://reviews.apache.org/r/27391/#comment101659 Found %d expired offsets. core/src/main/scala/kafka/server/OffsetManager.scala https://reviews.apache.org/r/27391/#comment101660 Actually, since we always set the retention period (for v0, v1) in KafkaApis do we need to even touch this timestamp? i.e., we should basically ignore it, right? So we only need to do: value.set(VALUE_TIMESTAMP_FIELD, expirationTimestamp). - Joel Koshy On Nov. 6, 2014, 11:35 p.m., Guozhang Wang wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/ --- (Updated Nov. 6, 2014, 11:35 p.m.) Review request for kafka. Bugs: KAFKA-1634 https://issues.apache.org/jira/browse/KAFKA-1634 Repository: kafka Description --- The timestamp field of OffsetAndMetadata is preserved since we need to be backward compatible with older versions Diffs - clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 core/src/main/scala/kafka/server/KafkaApis.scala 968b0c4f809ea9f9c3f322aef9db105f9d2e0e23 core/src/main/scala/kafka/server/KafkaServer.scala 4de812374e8fb1fed834d2be3f9655f55b511a74 core/src/main/scala/kafka/server/OffsetManager.scala
2957bc435102bc4004d8f100dbcdd56287c8ffae core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 8c5364fa97da1be09973c176d1baeb339455d319 Diff: https://reviews.apache.org/r/27391/diff/ Testing --- Thanks, Guozhang Wang
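
For reference, the rule in the initCommonFields comment (never write the partition-level timestamp for v2, always write it for v0 and v1, even when it is only the default) could be sketched roughly as below. This is only an illustration under assumptions: PartitionFieldsSketch, partitionFields and the map keys are hypothetical names, a plain Map stands in for the real protocol Struct, and the -1 default is borrowed from the earlier review comment rather than from the patch.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the per-partition struct of an offset commit request.
// It does not use the real Kafka Struct/Schema classes.
final class PartitionFieldsSketch {
    // Assumed default for "no explicit timestamp" (an assumption, not taken from the patch).
    static final long DEFAULT_TIMESTAMP = -1L;

    // Builds the per-partition fields for the given request version (0, 1 or 2).
    static Map<String, Object> partitionFields(int version, int partition, long offset,
                                               long timestamp, String metadata) {
        if (version < 0 || version > 2) {
            throw new IllegalArgumentException("Unsupported version: " + version);
        }
        Map<String, Object> fields = new LinkedHashMap<>();
        fields.put("partition", partition);
        fields.put("offset", offset);
        if (version < 2) {
            // v0 and v1: the timestamp is always written, even if it is the default,
            // so that a schema without a declared default never sees a missing value.
            fields.put("timestamp", timestamp);
        }
        // v2: the timestamp field is never written.
        fields.put("metadata", metadata);
        return fields;
    }
}
```

Passing the version explicitly is the first of the two options mentioned in the comment; the sketch does not argue that it is the less weird one.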
Re: Review Request 27391: Fix KAFKA-1634: Add the global retentionTime (in millis) and disable the per-offset timestamp
---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/#review59839
---

clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
https://reviews.apache.org/r/27391/#comment101209

Would Long.MaxValue be a better default?

clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
https://reviews.apache.org/r/27391/#comment101137

I think we should mark this with @deprecated

clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
https://reviews.apache.org/r/27391/#comment101138

This can also be marked @deprecated and we can add a new constructor without timestamp and initialize to -1 there

clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
https://reviews.apache.org/r/27391/#comment101121

Can you add @param for the new parameter? For consistency only - the existing javadoc is pretty pointless anyway

clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
https://reviews.apache.org/r/27391/#comment101124

If uninitialized it would be zero. However, see comment on deprecating the constructor above.

core/src/main/scala/kafka/api/OffsetCommitRequest.scala
https://reviews.apache.org/r/27391/#comment101198

val retentionMs = if (versionId == 2) { ... } else { ... }

core/src/main/scala/kafka/common/OffsetMetadataAndError.scala
https://reviews.apache.org/r/27391/#comment101127

Rather than making this a var it would be preferable to use offsetAndMetadataInstance.copy(timestamp = newVal). However, I don't think we need to modify the timestamp - see comment in KafkaApis

core/src/main/scala/kafka/server/KafkaApis.scala
https://reviews.apache.org/r/27391/#comment101166

As long as we do support v0 and v1, since we are changing the semantics of the _stored_ timestamp in the log I think it is reasonable to use the OffsetManager.retentionPeriod config here (which btw should be deprecated). i.e., if v0 or v1 request is received, pass in OffsetManager.retentionPeriod config. If it is v2 format, then just pass in retentionMs untouched.

core/src/main/scala/kafka/server/OffsetManager.scala
https://reviews.apache.org/r/27391/#comment101164

Can be marked @deprecated and we can get rid of it when we get rid of v0, v1

core/src/main/scala/kafka/server/OffsetManager.scala
https://reviews.apache.org/r/27391/#comment101163

This will break - we now need to check that the absolute timestamp in the offset metadata has passed.

core/src/main/scala/kafka/server/OffsetManager.scala
https://reviews.apache.org/r/27391/#comment101150

val expirationTimestamp

core/src/main/scala/kafka/server/OffsetManager.scala
https://reviews.apache.org/r/27391/#comment101149

Expiration time of the offset

core/src/main/scala/kafka/server/OffsetManager.scala
https://reviews.apache.org/r/27391/#comment101204

expirationTimestamp would be clearer

core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
https://reviews.apache.org/r/27391/#comment101140

Can we also enhance the OffsetCommitTest to do offset commits with v0, v1 and v2? i.e., test that the latest broker can handle all three versions correctly.

- Joel Koshy

On Oct. 30, 2014, 6:43 p.m., Guozhang Wang wrote:
---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27391/
---

(Updated Oct. 30, 2014, 6:43 p.m.)

Review request for kafka.

Bugs: KAFKA-1634
https://issues.apache.org/jira/browse/KAFKA-1634

Repository: kafka

Description
---
The timestamp field of OffsetAndMetadata is preserved since we need to be backward compatible with older versions

Diffs
---
clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a
clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f
core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb
core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909
core/src/main/scala/kafka/server/KafkaApis.scala 968b0c4f809ea9f9c3f322aef9db105f9d2e0e23
core/src/main/scala/kafka/server/OffsetManager.scala 2957bc435102bc4004d8f100dbcdd56287c8ffae
core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343

Diff: https://reviews.apache.org/r/27391/diff/

Testing
---

Thanks,
Guozhang Wang
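
For reference, the KafkaApis and OffsetManager comments in this review (use the broker retention config for v0 and v1, pass retentionMs through untouched for v2, and expire on the stored absolute timestamp) might look roughly like the sketch below. Everything here is an assumption made for illustration: OffsetExpirationSketch and its method names are hypothetical, the saturating addition is only one way to cope with a Long.MaxValue retention, and none of it is taken from the actual patch.

```java
// Hypothetical helper illustrating the review comments; not code from the Kafka patch.
final class OffsetExpirationSketch {

    // v0 and v1 requests carry no retention, so the broker-side config is used;
    // v2 requests pass their own retentionMs through untouched.
    static long effectiveRetentionMs(int version, long requestRetentionMs, long brokerRetentionMs) {
        return version >= 2 ? requestRetentionMs : brokerRetentionMs;
    }

    // Absolute expiration time that would be stored with the offset.
    // Assumes nowMs and retentionMs are non-negative; the sum saturates at
    // Long.MAX_VALUE so a very large retention does not overflow.
    static long expirationTimestamp(long nowMs, long retentionMs) {
        return retentionMs > Long.MAX_VALUE - nowMs ? Long.MAX_VALUE : nowMs + retentionMs;
    }

    // An offset is expired once its stored absolute expiration time has passed.
    static boolean isExpired(long expirationTimestampMs, long nowMs) {
        return expirationTimestampMs < nowMs;
    }
}
```

With this shape the expiration check no longer needs the retention period at all, which is the point of the "this will break" comment: the comparison is against the stored absolute timestamp, not commit time plus a retention.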