Re: Review Request 24676: Rebase KAFKA-1583
---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/#review57235
---

Very nicely done. These are all minor comments, except one about emptying the producer request, which should be easily fixable if it is an issue. (It is the top comment.)

core/src/main/scala/kafka/api/ProducerRequest.scala
https://reviews.apache.org/r/24676/#comment98557

    I have a concern that this may actually still be needed. See the comment under handleProducerRequest.sendResponseCallback.

core/src/main/scala/kafka/cluster/Partition.scala
https://reviews.apache.org/r/24676/#comment97784

    Maybe use this:
    Recorded replica %d log end offset (LEO)...

    Also, instead of an explicit [%s,%d] format specifier, I think we should start doing the following:
    "%s".format(TopicAndPartition(topic, partition))

    That way we ensure a canonical toString for topic/partition pairs and can change it in one place in the future. There are some places where we don't log with this agreed-upon format and it is a bit annoying, so going forward I think we should use the above. Can we add it to the logging improvements wiki?

core/src/main/scala/kafka/cluster/Partition.scala
https://reviews.apache.org/r/24676/#comment97788

    Since we may still update the HW, shall we rename this to maybeUpdateHWAndExpandIsr?

core/src/main/scala/kafka/log/Log.scala
https://reviews.apache.org/r/24676/#comment97797

    Since this contains hw (which is a replication detail), should it really be in the replica manager?

core/src/main/scala/kafka/server/DelayedFetch.scala
https://reviews.apache.org/r/24676/#comment97809

    How about just calling this responseCallback? It is slightly confusing to see references to callbackOnComplete and onComplete in the same class.

core/src/main/scala/kafka/server/DelayedFetch.scala
https://reviews.apache.org/r/24676/#comment97801

    The earlier comment was useful, i.e., (in which case we return whatever data is available for the partitions that are currently led by this broker)

core/src/main/scala/kafka/server/DelayedFetch.scala
https://reviews.apache.org/r/24676/#comment97805

    I'm a bit confused by case C. It can also happen if the delayed fetch happens to straddle a segment roll event; the comment seems a bit misleading/incomplete without that. In fact, if it is lagging, shouldn't it have been satisfied immediately without having to create a DelayedFetch in the first place?

core/src/main/scala/kafka/server/DelayedProduce.scala
https://reviews.apache.org/r/24676/#comment98139

    Similar comment as in DelayedFetch on naming this.

core/src/main/scala/kafka/server/KafkaApis.scala
https://reviews.apache.org/r/24676/#comment98165

    Why is this additional logging necessary? KafkaApis currently has a catch-all for unhandled exceptions. Error codes can be inspected via public access logs if required, right?

core/src/main/scala/kafka/server/KafkaApis.scala
https://reviews.apache.org/r/24676/#comment98166

    Same here.

core/src/main/scala/kafka/server/KafkaApis.scala
https://reviews.apache.org/r/24676/#comment98558

    I'm not sure how Scala treats this under the hood, but it _has_ to hold a reference to request until the callback is executed, i.e., we probably still want to empty the request data.

core/src/main/scala/kafka/server/KafkaApis.scala
https://reviews.apache.org/r/24676/#comment98180

    to fetch messages

core/src/main/scala/kafka/server/KafkaApis.scala
https://reviews.apache.org/r/24676/#comment98182

    Are these changes intentional?

core/src/main/scala/kafka/server/OffsetManager.scala
https://reviews.apache.org/r/24676/#comment98184

    commitStatusView

    Also, can we just compute the final status once at the end, as opposed to preparing an initial response status and modifying it later?

core/src/main/scala/kafka/server/OffsetManager.scala
https://reviews.apache.org/r/24676/#comment98194

    Do you think it would be clearer to name this onAppend or something similar?

core/src/main/scala/kafka/server/ReplicaManager.scala
https://reviews.apache.org/r/24676/#comment97799

    Should we rename ReplicaManager to ReplicatedLogManager?

core/src/main/scala/kafka/server/ReplicaManager.scala
https://reviews.apache.org/r/24676/#comment98372

    (for regular consumer fetch)

core/src/main/scala/kafka/server/ReplicaManager.scala
https://reviews.apache.org/r/24676/#comment98380

    This is old code and we don't need to address it in this patch, but I was wondering if it makes sense to respond sooner if there is at least one error in the local append. What do you think? i.e., I don't remember a good reason for holding on to the request if there are < numPartitions errors in local append.

core/src/main/scala/kafka/utils/DelayedItem.scala
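
For illustration, the Partition.scala suggestion above (format topic/partition pairs through TopicAndPartition rather than an explicit [%s,%d] specifier) might look roughly like the sketch below. The case class here is a hypothetical stand-in for kafka.common.TopicAndPartition, its toString is an assumption, and the message wording after "(LEO)" is made up for the example; LogFormatSketch and leoMessage are illustrative names only.

```scala
// Hypothetical stand-in for kafka.common.TopicAndPartition; the real class
// lives in the Kafka code base and its toString may differ from this one.
case class TopicAndPartition(topic: String, partition: Int) {
  // The single place that decides how a topic/partition pair is rendered.
  override def toString: String = "[%s,%d]".format(topic, partition)
}

object LogFormatSketch {
  // Builds a log message using %s on the pair instead of spelling out
  // [%s,%d] at every call site. The text after "(LEO)" is illustrative.
  def leoMessage(replicaId: Int, topic: String, partition: Int, leo: Long): String =
    "Recorded replica %d log end offset (LEO) %d for partition %s"
      .format(replicaId, leo, TopicAndPartition(topic, partition))
}
```

With this shape, changing the rendering of topic/partition pairs only requires touching toString, which is the point of the suggestion.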
Re: Review Request 24676: Rebase KAFKA-1583
---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/#review57178
---

Thanks for the patch. +1 after addressing a couple of more minor comments below. Also, do you plan to have a followup jira to rename request to operation globally?

core/src/main/scala/kafka/server/DelayedProduce.scala
https://reviews.apache.org/r/24676/#comment97691

    set the response

core/src/main/scala/kafka/server/RequestPurgatory.scala
https://reviews.apache.org/r/24676/#comment97709

    Reworded the explanation as follows. Does it look ok?

     *
     * The logic upon completing a delayed operation is defined in onComplete() and will be called exactly once.
     * Once an operation is completed, isCompleted() will return true. onComplete() can be triggered by either
     * forceComplete(), which forces calling onComplete() after delayMs if the operation is not yet completed,
     * or tryComplete(), which first checks if the operation can be completed or not now, and if yes calls
     * forceComplete(). A subclass of DelayedRequest needs to provide an implementation of both onComplete() and
     * tryComplete().

- Jun Rao

On Oct. 17, 2014, 4:56 p.m., Guozhang Wang wrote:

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/
---

(Updated Oct. 17, 2014, 4:56 p.m.)

Review request for kafka.

Bugs: KAFKA-1583
    https://issues.apache.org/jira/browse/KAFKA-1583

Repository: kafka

Description
---

Incorporate Jun's comments round two after rebase

Diffs
---

  core/src/main/scala/kafka/api/FetchRequest.scala 59c09155dd25fad7bed07d3d00039e3dc66db95c
  core/src/main/scala/kafka/api/FetchResponse.scala 8d085a1f18f803b3cebae4739ad8f58f95a6c600
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd
  core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6
  core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe
  core/src/main/scala/kafka/cluster/Partition.scala e88ecf224a4dab8bbd26ba7b0c3ccfe844c6b7f4
  core/src/main/scala/kafka/common/ErrorMapping.scala 880ab4a004f078e5d84446ea6e4454ecc06c95f2
  core/src/main/scala/kafka/log/Log.scala 157d67369baabd2206a2356b2aa421e848adab17
  core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b
  core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345
  core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724
  core/src/main/scala/kafka/server/KafkaApis.scala 85498b4a1368d3506f19c4cfc64934e4d0ac4c90
  core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f
  core/src/main/scala/kafka/server/ReplicaManager.scala 78b7514cc109547c562e635824684fad581af653
  core/src/main/scala/kafka/server/RequestPurgatory.scala 9d76234bc2c810ec08621dc92bb4061b8e7cd993
  core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c
  core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 209a409cb47eb24f83cee79f4e064dbc5f5e9d62
  core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala fb61d552f2320fedec547400fbbe402a0b2f5d87
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 03a424d45215e1e7780567d9559dae4d0ae6fc29
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala cd302aa51eb8377d88b752d48274e403926439f2
  core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala a9c4ddc78df0b3695a77a12cf8cf25521a203122
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala a577f4a8bf420a5bc1e62fad6d507a240a42bcaa
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 3804a114e97c849cae48308997037786614173fc
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5

Diff: https://reviews.apache.org/r/24676/diff/

Testing
---

Unit tests

Thanks,

Guozhang Wang
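
As a reading aid for the reworded RequestPurgatory explanation in the review above, here is a minimal sketch of the described contract: onComplete() runs once, isCompleted() reports completion, forceComplete() completes unconditionally, and tryComplete() completes only if the operation's condition holds. This is not the code from the patch. The class names DelayedOperationSketch and CountingOperation, the AtomicBoolean guard, and the ack-counting condition are all assumptions made for the example, and the timer that would call forceComplete() after delayMs is left out.

```scala
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical sketch of the contract in the reworded comment; it is not the
// actual RequestPurgatory/DelayedRequest code under review.
abstract class DelayedOperationSketch(val delayMs: Long) {
  private val completed = new AtomicBoolean(false)

  // Completion logic supplied by the subclass. It is only ever invoked
  // through forceComplete(), which guards against a second invocation.
  def onComplete(): Unit

  // Subclass checks whether the operation can complete now and, if so,
  // calls forceComplete(). Returns true if this call completed it.
  def tryComplete(): Boolean

  def isCompleted(): Boolean = completed.get()

  // Only the caller that wins the compare-and-set runs onComplete(), so it
  // cannot run twice even if a timeout and tryComplete() race each other.
  def forceComplete(): Boolean =
    if (completed.compareAndSet(false, true)) { onComplete(); true } else false
}

// Toy subclass (an assumption for the example): completes once `acks`
// has reached `required`.
class CountingOperation(required: Int) extends DelayedOperationSketch(delayMs = 1000L) {
  @volatile var acks = 0
  var callbacks = 0 // counts how many times onComplete() actually ran

  def onComplete(): Unit = callbacks += 1

  def tryComplete(): Boolean = if (acks >= required) forceComplete() else false
}
```

In this sketch a later forceComplete() (for example from an expiry timer) returns false and does nothing once tryComplete() has already succeeded, which is one way to get the "called exactly once" behaviour the comment describes.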
Re: Review Request 24676: Rebase KAFKA-1583
On Oct. 16, 2014, 1:29 a.m., Jun Rao wrote:

core/src/main/scala/kafka/server/KafkaApis.scala, line 167
https://reviews.apache.org/r/24676/diff/9/?file=720184#file720184line167

    Should replica manager be offset manager?

This is actually the replica manager, when it tries to write the commit message to the local log. I have changed the comment a bit to make it clearer.

- Guozhang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/#review56843
---

On Oct. 14, 2014, 2:42 a.m., Guozhang Wang wrote:

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/
---

(Updated Oct. 14, 2014, 2:42 a.m.)

Review request for kafka.

Bugs: KAFKA-1583
    https://issues.apache.org/jira/browse/KAFKA-1583

Repository: kafka

Description
---

Rebase KAFKA-1583 on trunk: pass requiredAcks to Partition for min.isr + minor changes

Diffs
---

  core/src/main/scala/kafka/api/FetchRequest.scala 59c09155dd25fad7bed07d3d00039e3dc66db95c
  core/src/main/scala/kafka/api/FetchResponse.scala 8d085a1f18f803b3cebae4739ad8f58f95a6c600
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd
  core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6
  core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe
  core/src/main/scala/kafka/cluster/Partition.scala e88ecf224a4dab8bbd26ba7b0c3ccfe844c6b7f4
  core/src/main/scala/kafka/common/ErrorMapping.scala 880ab4a004f078e5d84446ea6e4454ecc06c95f2
  core/src/main/scala/kafka/log/Log.scala a123cdc52f341a802b3e4bfeb29a6154332e5f73
  core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b
  core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345
  core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724
  core/src/main/scala/kafka/server/KafkaApis.scala 67f2833804cb15976680e42b9dc49e275c89d266
  core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f
  core/src/main/scala/kafka/server/ReplicaManager.scala 78b7514cc109547c562e635824684fad581af653
  core/src/main/scala/kafka/server/RequestPurgatory.scala 9d76234bc2c810ec08621dc92bb4061b8e7cd993
  core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c
  core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 209a409cb47eb24f83cee79f4e064dbc5f5e9d62
  core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala fb61d552f2320fedec547400fbbe402a0b2f5d87
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 03a424d45215e1e7780567d9559dae4d0ae6fc29
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala cd302aa51eb8377d88b752d48274e403926439f2
  core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala a9c4ddc78df0b3695a77a12cf8cf25521a203122
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala a577f4a8bf420a5bc1e62fad6d507a240a42bcaa
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 3804a114e97c849cae48308997037786614173fc
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5

Diff: https://reviews.apache.org/r/24676/diff/

Testing
---

Unit tests

Thanks,

Guozhang Wang
Re: Review Request 24676: Rebase KAFKA-1583
---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/#review56843
---

Thanks for the patch. Looks good to me. I only have some minor comments below.

core/src/main/scala/kafka/server/DelayedFetch.scala
https://reviews.apache.org/r/24676/#comment97277

    typo: is does not

core/src/main/scala/kafka/server/DelayedFetch.scala
https://reviews.apache.org/r/24676/#comment97280

    Typo: the new the fetch operation

core/src/main/scala/kafka/server/DelayedFetch.scala
https://reviews.apache.org/r/24676/#comment97278

    This seems to be case B.

core/src/main/scala/kafka/server/DelayedFetch.scala
https://reviews.apache.org/r/24676/#comment97279

    This seems to be case A.

core/src/main/scala/kafka/server/DelayedProduce.scala
https://reviews.apache.org/r/24676/#comment97282

    This doesn't match the comment. The error code is returned from checkEnoughReplicasReachOffset, not from writing to local log.

core/src/main/scala/kafka/server/KafkaApis.scala
https://reviews.apache.org/r/24676/#comment97283

    Should replica manager be offset manager?

core/src/main/scala/kafka/server/ReplicaManager.scala
https://reviews.apache.org/r/24676/#comment97287

    Perhaps add a comment that flag is for testing purpose only.

core/src/main/scala/kafka/server/RequestPurgatory.scala
https://reviews.apache.org/r/24676/#comment97288

    Could we comment on the return value?

- Jun Rao

On Oct. 14, 2014, 2:42 a.m., Guozhang Wang wrote:

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/
---

(Updated Oct. 14, 2014, 2:42 a.m.)

Review request for kafka.

Bugs: KAFKA-1583
    https://issues.apache.org/jira/browse/KAFKA-1583

Repository: kafka

Description
---

Rebase KAFKA-1583 on trunk: pass requiredAcks to Partition for min.isr + minor changes

Diffs
---

  core/src/main/scala/kafka/api/FetchRequest.scala 59c09155dd25fad7bed07d3d00039e3dc66db95c
  core/src/main/scala/kafka/api/FetchResponse.scala 8d085a1f18f803b3cebae4739ad8f58f95a6c600
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd
  core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6
  core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe
  core/src/main/scala/kafka/cluster/Partition.scala e88ecf224a4dab8bbd26ba7b0c3ccfe844c6b7f4
  core/src/main/scala/kafka/common/ErrorMapping.scala 880ab4a004f078e5d84446ea6e4454ecc06c95f2
  core/src/main/scala/kafka/log/Log.scala a123cdc52f341a802b3e4bfeb29a6154332e5f73
  core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b
  core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345
  core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724
  core/src/main/scala/kafka/server/KafkaApis.scala 67f2833804cb15976680e42b9dc49e275c89d266
  core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f
  core/src/main/scala/kafka/server/ReplicaManager.scala 78b7514cc109547c562e635824684fad581af653
  core/src/main/scala/kafka/server/RequestPurgatory.scala 9d76234bc2c810ec08621dc92bb4061b8e7cd993
  core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c
  core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 209a409cb47eb24f83cee79f4e064dbc5f5e9d62
  core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala fb61d552f2320fedec547400fbbe402a0b2f5d87
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 03a424d45215e1e7780567d9559dae4d0ae6fc29
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala cd302aa51eb8377d88b752d48274e403926439f2
  core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala a9c4ddc78df0b3695a77a12cf8cf25521a203122
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala a577f4a8bf420a5bc1e62fad6d507a240a42bcaa
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 3804a114e97c849cae48308997037786614173fc
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5

Diff: https://reviews.apache.org/r/24676/diff/

Testing
---

Unit tests

Thanks,

Guozhang Wang
Re: Review Request 24676: Rebase KAFKA-1583
---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/24676/
---

(Updated Oct. 14, 2014, 2:42 a.m.)

Review request for kafka.

Summary (updated)
---

Rebase KAFKA-1583

Bugs: KAFKA-1583
    https://issues.apache.org/jira/browse/KAFKA-1583

Repository: kafka

Description (updated)
---

Rebase KAFKA-1583 on trunk: pass requiredAcks to Partition for min.isr + minor changes

Diffs (updated)
---

  core/src/main/scala/kafka/api/FetchRequest.scala 59c09155dd25fad7bed07d3d00039e3dc66db95c
  core/src/main/scala/kafka/api/FetchResponse.scala 8d085a1f18f803b3cebae4739ad8f58f95a6c600
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd
  core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6
  core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe
  core/src/main/scala/kafka/cluster/Partition.scala e88ecf224a4dab8bbd26ba7b0c3ccfe844c6b7f4
  core/src/main/scala/kafka/common/ErrorMapping.scala 880ab4a004f078e5d84446ea6e4454ecc06c95f2
  core/src/main/scala/kafka/log/Log.scala a123cdc52f341a802b3e4bfeb29a6154332e5f73
  core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b
  core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345
  core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724
  core/src/main/scala/kafka/server/KafkaApis.scala 67f2833804cb15976680e42b9dc49e275c89d266
  core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f
  core/src/main/scala/kafka/server/ReplicaManager.scala 78b7514cc109547c562e635824684fad581af653
  core/src/main/scala/kafka/server/RequestPurgatory.scala 9d76234bc2c810ec08621dc92bb4061b8e7cd993
  core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c
  core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 209a409cb47eb24f83cee79f4e064dbc5f5e9d62
  core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala fb61d552f2320fedec547400fbbe402a0b2f5d87
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 03a424d45215e1e7780567d9559dae4d0ae6fc29
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala cd302aa51eb8377d88b752d48274e403926439f2
  core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala a9c4ddc78df0b3695a77a12cf8cf25521a203122
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala a577f4a8bf420a5bc1e62fad6d507a240a42bcaa
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 3804a114e97c849cae48308997037786614173fc
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5

Diff: https://reviews.apache.org/r/24676/diff/

Testing
---

Unit tests

Thanks,

Guozhang Wang