[DISCUSSION] Error Handling and Logging at Kafka
Hello all,

We want to kick off some discussions about error handling and logging conventions. With a number of great patch contributions to Kafka recently, it is a good time for us to sit down and think a little more about the coding style guidelines we have (http://kafka.apache.org/coding-guide.html). People at LinkedIn have discussed some observed issues with error handling and logging verbosity; here is a summary of the bullet points covered: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Error+Handling+and+Logging

We would like to collect as many comments as possible before we move ahead with the logging cleanup JIRAs mentioned in the wiki, so please feel free to share any thoughts and suggestions on this topic.

-- Guozhang
Re: Review Request 24676: Fix KAFKA-1583
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/24676/ ---

(Updated Aug. 27, 2014, 4:44 p.m.)

Review request for kafka.

Summary (updated): Fix KAFKA-1583

Bugs: KAFKA-1583
    https://issues.apache.org/jira/browse/KAFKA-1583

Repository: kafka

Description (updated)
---
TBD

Diffs (updated)
---
  core/src/main/scala/kafka/api/FetchRequest.scala 51cdccf7f90eb530cc62b094ed822b8469d50b12
  core/src/main/scala/kafka/api/FetchResponse.scala af9308737bf7832eca018c2b3ede703f7d1209f1
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd
  core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6
  core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe
  core/src/main/scala/kafka/cluster/Partition.scala ff106b47e6ee194cea1cf589474fef975b9dd7e2
  core/src/main/scala/kafka/common/ErrorMapping.scala 3fae7910e4ce17bc8325887a046f383e0c151d44
  core/src/main/scala/kafka/log/Log.scala 0ddf97bd30311b6039e19abade41d2fbbad2f59b
  core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b
  core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345
  core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724
  core/src/main/scala/kafka/server/KafkaApis.scala c584b559416b3ee4bcbec5966be4891e0a03eefb
  core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f
  core/src/main/scala/kafka/server/ReplicaManager.scala 68758e35d496a4659819960ae8e809d6e215568e
  core/src/main/scala/kafka/server/RequestPurgatory.scala ce06d2c381348deef8559374869fcaed923da1d1
  core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 03a424d45215e1e7780567d9559dae4d0ae6fc29
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala cd302aa51eb8377d88b752d48274e403926439f2
  core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala a9c4ddc78df0b3695a77a12cf8cf25521a203122
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 168712de241125982d556c188c76514fceb93779
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ab60e9b3a4d063c838bdc7f97b3ac7d2ede87072
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5

Diff: https://reviews.apache.org/r/24676/diff/

Testing
---

Thanks,
Guozhang Wang
Re: Review Request 24676: Fix KAFKA-1583
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/24676/ ---

(Updated Aug. 27, 2014, 5 p.m.)

Review request for kafka.

Bugs: KAFKA-1583
    https://issues.apache.org/jira/browse/KAFKA-1583

Repository: kafka

Description (updated)
---
Incorporated Jun's comments.

1. I left some cases in Log since they are return values for some of its APIs.
2. I kept the fetch info in the delayed fetch metadata since it needs to be used for re-reading the log.
3. I kept the name callbackOnComplete, following the principle that only the caller knows what the callback is used for: callers can name the callback responseCallback (from KafkaApis) or putCacheCallback (from OffsetManager), while all callees take it as callbackOnComplete.

Unit tests passed, with some other notes:

1. Found and fixed a bug in the current delayed fetch satisfaction logic: previously, when we calculated the bytes, we did not take fetchMaxBytes into consideration as an upper limit for a single partition's log, but simply took the diff between the current HW/LEO and the fetch offset.
2. Found and fixed a bug in the unit tests: we used to create a replica manager on the fly but did not shut it down upon completing the test, which leaks its background thread (i.e. the purgatory's reaper thread).
3. Changed the RequestPurgatory API a bit per Jun's comments. It now has two implemented functions, forceComplete() and isCompleted(), and two functions that need to be instantiated in the subclasses, tryComplete() and complete(). Please let me know if people have more comments on the current API.
4. Cleaned up the SimpleFetch test; previously this test was too complicated, but it actually just tests a simple piece of replica manager logic.

One concern I have now is about the online creation of a new callback function (i.e. the def inside the handler functions and the offset manager's storeOffset function): when I run the unit tests with the patch, it seems to cause higher CPU consumption than trunk. Could someone take another pair of eyes to run the unit tests and check the CPU performance?

Diffs
---
  core/src/main/scala/kafka/api/FetchRequest.scala 51cdccf7f90eb530cc62b094ed822b8469d50b12
  core/src/main/scala/kafka/api/FetchResponse.scala af9308737bf7832eca018c2b3ede703f7d1209f1
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd
  core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6
  core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe
  core/src/main/scala/kafka/cluster/Partition.scala ff106b47e6ee194cea1cf589474fef975b9dd7e2
  core/src/main/scala/kafka/common/ErrorMapping.scala 3fae7910e4ce17bc8325887a046f383e0c151d44
  core/src/main/scala/kafka/log/Log.scala 0ddf97bd30311b6039e19abade41d2fbbad2f59b
  core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b
  core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345
  core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724
  core/src/main/scala/kafka/server/KafkaApis.scala c584b559416b3ee4bcbec5966be4891e0a03eefb
  core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f
  core/src/main/scala/kafka/server/ReplicaManager.scala 68758e35d496a4659819960ae8e809d6e215568e
  core/src/main/scala/kafka/server/RequestPurgatory.scala ce06d2c381348deef8559374869fcaed923da1d1
  core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 03a424d45215e1e7780567d9559dae4d0ae6fc29
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala cd302aa51eb8377d88b752d48274e403926439f2
  core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala a9c4ddc78df0b3695a77a12cf8cf25521a203122
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 168712de241125982d556c188c76514fceb93779
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ab60e9b3a4d063c838bdc7f97b3ac7d2ede87072
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5

Diff: https://reviews.apache.org/r/24676/diff/

Testing (updated)
---
Unit tests

Thanks,
Guozhang Wang
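The delayed-fetch fix in note 1 above (capping a partition's available bytes at the fetch limit instead of taking the raw HW/LEO gap) can be sketched roughly as follows. The function and parameter names here are hypothetical stand-ins, not Kafka's actual API:

```scala
// Hypothetical sketch of the delayed-fetch satisfaction fix described above:
// the bytes counted as "available" for one partition must be capped at the
// per-partition fetch limit, not just the raw gap between the log end
// (HW or LEO, depending on the fetcher) and the fetch offset.
def availableBytes(logEndBytes: Long, fetchOffsetBytes: Long, fetchMaxBytes: Int): Long = {
  val gap = logEndBytes - fetchOffsetBytes // what the buggy version used directly
  math.min(math.max(gap, 0L), fetchMaxBytes.toLong) // the fix: clamp to the limit
}
```

Without the clamp, a single large partition could satisfy the whole fetch request long before fetchMaxBytes of data for it could actually be returned.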
Re: [DISCUSSION] Error Handling and Logging at Kafka
Thanks Joe / Gwen for the great input. I have commented on the page accordingly as well. There is a ticket for the background connection issue: KAFKA-1592 https://issues.apache.org/jira/browse/KAFKA-1592.

On Wed, Aug 27, 2014 at 10:25 AM, Gwen Shapira gshap...@cloudera.com wrote:

  The wiki is a great place to record the results or make suggestions that require formatting and graphics to explain, but I think I prefer mailing lists for discussion. So, let me repeat here a question I raised in the wiki, since I don't want it to get lost :) Is there a JIRA for the issues mentioned in the first background example (closing socket and connection unsuccessful)? I'll be more than happy to pick those up, as they've made my life pretty painful.

  Gwen

  On Tue, Aug 26, 2014 at 7:21 PM, Joe Stein joe.st...@stealth.ly wrote:

    Hi Guozhang, thanks for kicking this off. I made some comments in the wiki (and we can continue the discussion there), but I think this type of collaborative mailing list discussion and Confluence writeup is a great way for different discussions about the same thing in different organizations to coalesce together in code. This may also be a good place for folks looking to get their feet wet contributing code to do so.

    /*** Joe Stein, Founder, Principal Consultant, Big Data Open Source Security LLC, http://www.stealth.ly, Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop ***/

    On Tue, Aug 26, 2014 at 6:22 PM, Guozhang Wang wangg...@gmail.com wrote:

      Hello all, We want to kick off some discussions about error handling and logging conventions. With a number of great patch contributions to Kafka recently, it is a good time for us to sit down and think a little more about the coding style guidelines we have (http://kafka.apache.org/coding-guide.html). People at LinkedIn have discussed some observed issues with error handling and logging verbosity; here is a summary of the bullet points covered: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Error+Handling+and+Logging We would like to collect as many comments as possible before we move ahead with the logging cleanup JIRAs mentioned in the wiki, so please feel free to share any thoughts and suggestions on this topic.

      -- Guozhang

-- Guozhang
Review Request 25155: Fix KAFKA-1616
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25155/ --- Review request for kafka. Bugs: KAFKA-1616 https://issues.apache.org/jira/browse/KAFKA-1616 Repository: kafka Description --- Purgatory size to be the sum of watched list sizes; delayed request to be the expiry queue length; remove atomic integers for metrics Diffs - core/src/main/scala/kafka/server/RequestPurgatory.scala ce06d2c381348deef8559374869fcaed923da1d1 Diff: https://reviews.apache.org/r/25155/diff/ Testing --- Thanks, Guozhang Wang
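The metric change this review describes (deriving purgatory size from the watched lists and the delayed-request count from the expiry queue, rather than maintaining AtomicInteger counters) might look roughly like the following sketch. The class and method names are illustrative, not the actual RequestPurgatory code:

```scala
import scala.collection.mutable

// Illustrative sketch of the KAFKA-1616 metric change (not Kafka's actual
// RequestPurgatory): instead of incrementing/decrementing atomic counters on
// every transition, compute both metrics on demand from the data structures.
class PurgatoryMetricsSketch[T] {
  private val watchersForKey = mutable.Map.empty[String, mutable.ListBuffer[T]]
  private val expiryQueue = mutable.Queue.empty[T]

  // Watch an operation under a key (an op may be watched under several keys).
  def watch(key: String, op: T): Unit =
    watchersForKey.getOrElseUpdate(key, mutable.ListBuffer.empty[T]) += op

  // Hand an operation to the expiration reaper.
  def enqueueForExpiry(op: T): Unit = expiryQueue.enqueue(op)

  // "Purgatory size": the sum of the watched list sizes.
  def purgatorySize: Int = watchersForKey.values.map(_.size).sum

  // "Delayed requests": the expiry queue length.
  def delayedRequests: Int = expiryQueue.size
}
```

Deriving the metrics this way trades a little read-time cost for never having the counters drift out of sync with the collections they describe.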
Re: Review Request 25155: Fix KAFKA-1616
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25155/ --- (Updated Aug. 28, 2014, 5:12 p.m.) Review request for kafka. Bugs: KAFKA-1616 https://issues.apache.org/jira/browse/KAFKA-1616 Repository: kafka Description --- Purgatory size to be the sum of watched list sizes; delayed request to be the expiry queue length; remove atomic integers for metrics Diffs (updated) - core/src/main/scala/kafka/server/RequestPurgatory.scala ce06d2c381348deef8559374869fcaed923da1d1 Diff: https://reviews.apache.org/r/25155/diff/ Testing --- Thanks, Guozhang Wang
Re: Review Request 25136: KAFKA-1610-Review Request
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25136/#review51793 ---

Thanks for the patch; some general comments:

1. In general we would like to avoid using ._1 and ._2, simply for clarity of the code; instead we can use { case (key, value) => ... }.
2. After thinking about it twice, I think even if the resulting collection is passed to some function as a parameter, as long as we know that function will only read the value (for example ZkUtils.updatePartitionReassignmentData) and has no intention of modifying it, we can probably still use mapValues, which gives you the benefit of not creating one more Java collection in the JVM. What do you think?
3. For places where we do need to use map instead of mapValues (for example in ReplicaManager when we create the delayed request's response status), add comments explaining why we do so (for the above example, since acksPending and errorCode may be modified after the collection is created).

Some detailed comments below.

core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala
https://reviews.apache.org/r/25136/#comment90400
  Is this necessary? ReassignedPartitionContext seems not used anywhere.

core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
https://reviews.apache.org/r/25136/#comment90401
  Is this intentional?

core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
https://reviews.apache.org/r/25136/#comment90402
  Is this intentional?

core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala
https://reviews.apache.org/r/25136/#comment90404
  Could we use a new line for the nested map?

core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala
https://reviews.apache.org/r/25136/#comment90406
  For resulting maps that are used as constructor parameters, as long as the constructor parameter will not change we can use mapValues.

- Guozhang Wang

On Aug. 28, 2014, 2:28 a.m., Mayuresh Gharat wrote:

  --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25136/ ---

  (Updated Aug. 28, 2014, 2:28 a.m.)

  Review request for kafka.

  Bugs: KAFKA-1610
      https://issues.apache.org/jira/browse/KAFKA-1610

  Repository: kafka

  Description
  ---
  Patch for replacing mapValues with map wherever necessary so that local modifications to collections are not lost.

  Diffs
  ---
    core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala 691d69a49a240f38883d2025afaec26fd61281b5
    core/src/main/scala/kafka/controller/KafkaController.scala 8ab4a1b8072c9dd187a9a6e94138b725d1f1b153
    core/src/main/scala/kafka/log/LogManager.scala 4d2924d04bc4bd62413edb0ee2d4aaf3c0052867
    core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345
    core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da
    core/src/main/scala/kafka/server/KafkaApis.scala c584b559416b3ee4bcbec5966be4891e0a03eefb
    core/src/main/scala/kafka/server/KafkaServer.scala 28711182aaa70eaa623de858bc063cb2613b2a4d
    core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala af4783646803e58714770c21f8c3352370f26854
    core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala c2ba07c5fdbaf0e65ca033b2e4d88f45a8a15b2e

  Diff: https://reviews.apache.org/r/25136/diff/

  Testing
  ---

  Thanks,
  Mayuresh Gharat
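The mapValues-vs-map distinction behind KAFKA-1610 comes from Scala's `mapValues` returning a lazy view that re-applies the mapping function on every access, so in-place modifications to the produced values are silently lost. A minimal illustration, with a hypothetical `Status` class standing in for something like the delayed produce's per-partition response status (it is not Kafka's actual class):

```scala
// Why mapValues can lose local modifications: Map.mapValues is lazy and
// rebuilds each value on every lookup, whereas map materializes one concrete
// collection whose values can be mutated and re-read.
case class Status(var acksPending: Boolean) // hypothetical stand-in

val partitions = Map("t-0" -> 1, "t-1" -> 2)

// Lazy view: a fresh Status is constructed on every lookup, so the write
// below is recomputed away on the next access.
val lazyStatuses = partitions.mapValues(_ => Status(acksPending = true))
lazyStatuses("t-0").acksPending = false // lost

// Strict map: one concrete collection, so the mutation sticks.
val strictStatuses = partitions.map { case (k, _) => k -> Status(acksPending = true) }
strictStatuses("t-0").acksPending = false // kept
```

This also explains point 2 above: when the receiving function only ever reads the values, the laziness is harmless and mapValues avoids allocating an extra collection.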
Re: Review Request 25044: KAFKA-1611 - Improve system test configuration
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25044/#review51803 ---

Ship it! Thanks for the patch. LGTM.

- Guozhang Wang

On Aug. 26, 2014, 12:04 a.m., Gwen Shapira wrote:

  --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25044/ ---

  (Updated Aug. 26, 2014, 12:04 a.m.)

  Review request for kafka.

  Repository: kafka

  Description
  ---
  Make the config a bit more out-of-the-box for the common case of a local cluster. This includes:

  1. Fix cluster_config.json of the migration testsuite; it has a hardcoded path that prevents it from working out of the box at all.
  2. Use the JAVA_HOME environment variable if "default" is specified and JAVA_HOME is defined. The current guessing method is a bit broken, and using JAVA_HOME will let devs configure their default Java dir without editing multiple cluster_config.json files in multiple places.

  Diffs
  ---
    system_test/migration_tool_testsuite/cluster_config.json 8353e56
    system_test/utils/system_test_utils.py 50340f0

  Diff: https://reviews.apache.org/r/25044/diff/

  Testing
  ---
  Running the system tests a bunch of times :)

  Thanks,
  Gwen Shapira
Re: Review Request 25136: KAFKA-1610-Review Request
On Aug. 28, 2014, 5:36 p.m., Guozhang Wang wrote:

  Thanks for the patch; some general comments:

  1. In general we would like to avoid using ._1 and ._2, simply for clarity of the code; instead we can use { case (key, value) => ... }.
  2. After thinking about it twice, I think even if the resulting collection is passed to some function as a parameter, as long as we know that function will only read the value (for example ZkUtils.updatePartitionReassignmentData) and has no intention of modifying it, we can probably still use mapValues, which gives you the benefit of not creating one more Java collection in the JVM. What do you think?
  3. For places where we do need to use map instead of mapValues (for example in ReplicaManager when we create the delayed request's response status), add comments explaining why we do so (for the above example, since acksPending and errorCode may be modified after the collection is created).

  Some detailed comments below.

Mayuresh Gharat wrote:

  I agree with point 1. Regarding point 2: even if the function only reads the collection, what happens when the collection gets changed and that function then reads a different value? Of course, if the collection is created locally and passed to a function, then it is better to use mapValues. Regarding point 3: I will add those comments.

2) Yeah, I agree. If the collection is used outside the current block then we should use map.

- Guozhang

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25136/#review51793 ---

On Aug. 28, 2014, 6:12 p.m., Mayuresh Gharat wrote:

  --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25136/ ---

  (Updated Aug. 28, 2014, 6:12 p.m.)

  Review request for kafka.

  Bugs: KAFKA-1610
      https://issues.apache.org/jira/browse/KAFKA-1610

  Repository: kafka

  Description
  ---
  Patch for replacing mapValues with map wherever necessary so that local modifications to collections are not lost.

  Diffs
  ---
    core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala 691d69a49a240f38883d2025afaec26fd61281b5
    core/src/main/scala/kafka/controller/KafkaController.scala 8ab4a1b8072c9dd187a9a6e94138b725d1f1b153
    core/src/main/scala/kafka/log/LogManager.scala 4d2924d04bc4bd62413edb0ee2d4aaf3c0052867
    core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345
    core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da
    core/src/main/scala/kafka/server/KafkaApis.scala c584b559416b3ee4bcbec5966be4891e0a03eefb
    core/src/main/scala/kafka/server/KafkaServer.scala 28711182aaa70eaa623de858bc063cb2613b2a4d
    core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala af4783646803e58714770c21f8c3352370f26854
    core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala c2ba07c5fdbaf0e65ca033b2e4d88f45a8a15b2e

  Diff: https://reviews.apache.org/r/25136/diff/

  Testing
  ---
  Ran the unit tests; everything passed and the build succeeded.

  Thanks,
  Mayuresh Gharat
Re: Review Request 25136: Patch for KAFKA-1610
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25136/#review51909 ---

LGTM. One minor thing about comments: we do not need to say "Changing mapValues to map", since comments should not describe the code change but the purpose of the code. We can generally say something like "Create a new collection with map since it may be modified outside this block / in another function call", etc.

- Guozhang Wang

On Aug. 29, 2014, 5:04 p.m., Mayuresh Gharat wrote:

  --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25136/ ---

  (Updated Aug. 29, 2014, 5:04 p.m.)

  Review request for kafka.

  Bugs: KAFKA-1610
      https://issues.apache.org/jira/browse/KAFKA-1610

  Repository: kafka

  Description
  ---
  Added comments explaining the changes and reverted some changes per comments on the review board. Removed the unnecessary import.

  Diffs
  ---
    core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala 691d69a49a240f38883d2025afaec26fd61281b5
    core/src/main/scala/kafka/controller/KafkaController.scala 8ab4a1b8072c9dd187a9a6e94138b725d1f1b153
    core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345
    core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da
    core/src/main/scala/kafka/server/KafkaApis.scala c584b559416b3ee4bcbec5966be4891e0a03eefb
    core/src/main/scala/kafka/server/KafkaServer.scala 28711182aaa70eaa623de858bc063cb2613b2a4d
    core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala af4783646803e58714770c21f8c3352370f26854
    core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala c2ba07c5fdbaf0e65ca033b2e4d88f45a8a15b2e

  Diff: https://reviews.apache.org/r/25136/diff/

  Testing
  ---
  Ran the unit tests; everything passed and the build succeeded.

  Thanks,
  Mayuresh Gharat
Re: Review Request 25155: Fix KAFKA-1616
On Aug. 28, 2014, 11:44 p.m., Jun Rao wrote:

  core/src/main/scala/kafka/server/RequestPurgatory.scala, line 258
  https://reviews.apache.org/r/25155/diff/2/?file=671427#file671427line258

  I wonder if we should do two separate tests: (1) if enqueued() >= purgeInterval, we purge the delayed queue; (2) if size >= purgeInterval, we purge the watchers.

Added one test in RequestPurgatoryTest, which checks the number of watched elements and enqueued requests before and after the purging.

- Guozhang

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25155/#review51850 ---

On Aug. 28, 2014, 5:12 p.m., Guozhang Wang wrote:

  --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25155/ ---

  (Updated Aug. 28, 2014, 5:12 p.m.)

  Review request for kafka.

  Bugs: KAFKA-1616
      https://issues.apache.org/jira/browse/KAFKA-1616

  Repository: kafka

  Description
  ---
  Purgatory size to be the sum of watched list sizes; delayed requests to be the expiry queue length; remove atomic integers for metrics.

  Diffs
  ---
    core/src/main/scala/kafka/server/RequestPurgatory.scala ce06d2c381348deef8559374869fcaed923da1d1

  Diff: https://reviews.apache.org/r/25155/diff/

  Testing
  ---

  Thanks,
  Guozhang Wang
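The watcher-purge side of the two independent checks Jun suggests could be sketched as below. The names are hypothetical, not the actual RequestPurgatory or RequestPurgatoryTest code:

```scala
import scala.collection.mutable

// Illustrative sketch of purging completed operations from the watcher
// lists; the expiry queue would be purged by a separate check against its
// own length (the "enqueued >= purgeInterval" trigger).
final class Op(var completed: Boolean = false)

// Drop every completed op from every watcher list; return how many were purged.
def purgeCompletedWatchers(watchers: mutable.Map[String, mutable.ListBuffer[Op]]): Int = {
  var purged = 0
  for (ops <- watchers.values) {
    val before = ops.size
    ops.filterInPlace(op => !op.completed) // keep only the still-pending ops
    purged += before - ops.size
  }
  purged
}
```

Keeping the two triggers separate matters because an operation can sit completed in a watcher list long after it has left the expiry queue, and vice versa, so one shared threshold can starve one of the two purge passes.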
Re: Review Request 24676: Fix KAFKA-1583
On Aug. 29, 2014, 1:42 a.m., Jun Rao wrote:

  core/src/main/scala/kafka/server/ReplicaManager.scala, line 725
  https://reviews.apache.org/r/24676/diff/3-4/?file=666252#file666252line725

  Do we need to add the new parameter? Does it hurt to write the checkpoint file in unit tests?

The reason is that in some unit tests with mocks, the checkpoint function will try to access Log.dir(), which is not easy to mock.

On Aug. 29, 2014, 1:42 a.m., Jun Rao wrote:

  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala, lines 171-172
  https://reviews.apache.org/r/24676/diff/3-4/?file=666256#file666256line171

  Would it be better to return the correct offset and just return an empty MessageSet? The equality test can be on the offset.

The LogOffsetMetadata is only for the fetch start offset, which would be 0 for both cases; we need to make sure that the fetched data's end offset does not exceed the limit.

On Aug. 29, 2014, 1:42 a.m., Jun Rao wrote:

  core/src/main/scala/kafka/server/KafkaApis.scala, line 169
  https://reviews.apache.org/r/24676/diff/4/?file=670284#file670284line169

Yes, I think this should be debug-level logging. I will fix the other request-handling places as well.

- Guozhang

--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/24676/#review51855 ---

On Aug. 27, 2014, 5 p.m., Guozhang Wang wrote:

  --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/24676/ ---

  (Updated Aug. 27, 2014, 5 p.m.)

  Review request for kafka.

  Bugs: KAFKA-1583
      https://issues.apache.org/jira/browse/KAFKA-1583

  Repository: kafka

  Description
  ---
  Incorporated Jun's comments.

  1. I left some cases in Log since they are return values for some of its APIs.
  2. I kept the fetch info in the delayed fetch metadata since it needs to be used for re-reading the log.
  3. I kept the name callbackOnComplete, following the principle that only the caller knows what the callback is used for: callers can name the callback responseCallback (from KafkaApis) or putCacheCallback (from OffsetManager), while all callees take it as callbackOnComplete.

  Unit tests passed, with some other notes:

  1. Found and fixed a bug in the current delayed fetch satisfaction logic: previously, when we calculated the bytes, we did not take fetchMaxBytes into consideration as an upper limit for a single partition's log, but simply took the diff between the current HW/LEO and the fetch offset.
  2. Found and fixed a bug in the unit tests: we used to create a replica manager on the fly but did not shut it down upon completing the test, which leaks its background thread (i.e. the purgatory's reaper thread).
  3. Changed the RequestPurgatory API a bit with Jun's comments. Now it has two implemented functions, forceComplete() and isCompleted(), and two functions that need to be instantiated in the subclasses, tryComplete() and complete(). Please let me know if people have more comments on the current API.
  4. Cleaned up the SimpleFetch test; previously this test was too complicated, but it actually just tests a simple piece of replica manager logic.

  One concern I have now is about the online creation of a new callback function (i.e. the def inside the handler functions and the offset manager's storeOffset function): when I run the unit tests with the patch, it seems to cause higher CPU consumption than trunk. Could someone take another pair of eyes to run the unit tests and check the CPU performance?
Re: Review Request 24676: Fix KAFKA-1583
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/24676/ ---

(Updated Sept. 2, 2014, 1:07 a.m.)

Review request for kafka.

Bugs: KAFKA-1583
    https://issues.apache.org/jira/browse/KAFKA-1583

Repository: kafka

Description (updated)
---
TBD

Diffs (updated)
---
  core/src/main/scala/kafka/api/FetchRequest.scala 51cdccf7f90eb530cc62b094ed822b8469d50b12
  core/src/main/scala/kafka/api/FetchResponse.scala af9308737bf7832eca018c2b3ede703f7d1209f1
  core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd
  core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6
  core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe
  core/src/main/scala/kafka/cluster/Partition.scala ff106b47e6ee194cea1cf589474fef975b9dd7e2
  core/src/main/scala/kafka/common/ErrorMapping.scala 3fae7910e4ce17bc8325887a046f383e0c151d44
  core/src/main/scala/kafka/log/Log.scala 0ddf97bd30311b6039e19abade41d2fbbad2f59b
  core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b
  core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345
  core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da
  core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724
  core/src/main/scala/kafka/server/KafkaApis.scala c584b559416b3ee4bcbec5966be4891e0a03eefb
  core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a
  core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f
  core/src/main/scala/kafka/server/ReplicaManager.scala 68758e35d496a4659819960ae8e809d6e215568e
  core/src/main/scala/kafka/server/RequestPurgatory.scala ce06d2c381348deef8559374869fcaed923da1d1
  core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c
  core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 03a424d45215e1e7780567d9559dae4d0ae6fc29
  core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala cd302aa51eb8377d88b752d48274e403926439f2
  core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala a9c4ddc78df0b3695a77a12cf8cf25521a203122
  core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 168712de241125982d556c188c76514fceb93779
  core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ab60e9b3a4d063c838bdc7f97b3ac7d2ede87072
  core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5

Diff: https://reviews.apache.org/r/24676/diff/

Testing
---
Unit tests

Thanks,
Guozhang Wang
Re: Review Request 24676: Fix KAFKA-1583
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/24676/ --- (Updated Sept. 2, 2014, 1:09 a.m.) Review request for kafka. Bugs: KAFKA-1583 https://issues.apache.org/jira/browse/KAFKA-1583 Repository: kafka Description (updated) --- Incorporated Jun's comments round two. Incorporated Jun's comments. 1. I left some cases in Log since they are return values for some of their APIs. 2. I kept the fetch info in the delayed fetch metadata since it needs to be used for re-reading the log. 3. I kept the name of callbackOnComplete by following the principle that only the caller knows what the callback is used for, and hence they can name the callback as reponseCallback (from KafkaApi) and putCacheCallback (from OffsetManager), all the callee will take the callback as callbackOnComplete. Unit test passed, with some other notes: 1. Found and fix a bug in the current delayed fetch satisifaction logic: previously when we calculate the bytes, we do not take in the fetchMaxBytes into consideration as an upper limit for a single partition's log, but simply get the diff between the current HW/LEO and the fetch offset. 2. Found and fix a bug in the unit tests: we used to create replica manager on the fly but did not shut it down upon completing the test, which will leak the background thread (i.e. reaper thread of purgatory). 3. Changed the RequestPurgatory API a bit with Jun's comments. Now it has two implemented functions: forceComplete() and isCompleted(), and two functions that need to be instantiated in the subclasses: tryComplete() and complete(). Please let me know if people have more comments on the current API. 4. Cleaned the SimpleFetch test, previously this test is too complicate but it actually just test a simple logic of the replica manager. 
Diffs - core/src/main/scala/kafka/api/FetchRequest.scala 51cdccf7f90eb530cc62b094ed822b8469d50b12 core/src/main/scala/kafka/api/FetchResponse.scala af9308737bf7832eca018c2b3ede703f7d1209f1 core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6 core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe core/src/main/scala/kafka/cluster/Partition.scala ff106b47e6ee194cea1cf589474fef975b9dd7e2 core/src/main/scala/kafka/common/ErrorMapping.scala 3fae7910e4ce17bc8325887a046f383e0c151d44 core/src/main/scala/kafka/log/Log.scala 0ddf97bd30311b6039e19abade41d2fbbad2f59b core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345 core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724 core/src/main/scala/kafka/server/KafkaApis.scala c584b559416b3ee4bcbec5966be4891e0a03eefb core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f core/src/main/scala/kafka/server/ReplicaManager.scala 68758e35d496a4659819960ae8e809d6e215568e core/src/main/scala/kafka/server/RequestPurgatory.scala ce06d2c381348deef8559374869fcaed923da1d1 core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 03a424d45215e1e7780567d9559dae4d0ae6fc29 core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala cd302aa51eb8377d88b752d48274e403926439f2 core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala 
a9c4ddc78df0b3695a77a12cf8cf25521a203122 core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 168712de241125982d556c188c76514fceb93779 core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ab60e9b3a4d063c838bdc7f97b3ac7d2ede87072 core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 Diff: https://reviews.apache.org/r/24676/diff/ Testing --- Unit tests Thanks, Guozhang Wang
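The reworked RequestPurgatory contract described in the update above (two implemented functions, forceComplete() and isCompleted(), plus tryComplete() and complete() instantiated by subclasses) can be sketched as follows. This is a simplified Java illustration of the contract, not the actual Scala source; NoopOperation is a made-up subclass used only to exercise it.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Simplified model of the delayed-operation contract: forceComplete() and
// isCompleted() are implemented once, while tryComplete() and complete()
// are supplied by each concrete operation (delayed fetch, delayed produce).
abstract class DelayedOperation {
    private final AtomicBoolean completed = new AtomicBoolean(false);

    // Completes the operation at most once; returns true only for the
    // caller that actually performed the completion.
    public final boolean forceComplete() {
        if (completed.compareAndSet(false, true)) {
            complete();
            return true;
        }
        return false;
    }

    public final boolean isCompleted() {
        return completed.get();
    }

    // Checks whether the completion criterion is now satisfied and, if so,
    // calls forceComplete(); returns whether the operation is completed.
    public abstract boolean tryComplete();

    // The actual response logic, run exactly once.
    protected abstract void complete();
}

// Hypothetical trivial operation used only to exercise the contract.
class NoopOperation extends DelayedOperation {
    boolean responded = false;

    @Override
    public boolean tryComplete() {
        return forceComplete(); // criterion is trivially satisfied
    }

    @Override
    protected void complete() {
        responded = true;
    }
}
```

With this shape, the expiry reaper and the satisfaction check can both race to forceComplete() and the operation still completes exactly once.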
Re: Review Request 25155: Fix KAFKA-1616
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25155/ --- (Updated Sept. 2, 2014, 8:22 p.m.) Review request for kafka. Bugs: KAFKA-1616 https://issues.apache.org/jira/browse/KAFKA-1616 Repository: kafka Description --- Purgatory size to be the sum of watched list sizes; delayed request to be the expiry queue length; remove atomic integers for metrics; add a unit test for watched list sizes and enqueued requests Diffs (updated) - core/src/main/scala/kafka/server/RequestPurgatory.scala ce06d2c381348deef8559374869fcaed923da1d1 core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 168712de241125982d556c188c76514fceb93779 Diff: https://reviews.apache.org/r/25155/diff/ Testing --- Thanks, Guozhang Wang
Re: Review Request 24676: Fix KAFKA-1583
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/24676/ --- (Updated Sept. 2, 2014, 8:37 p.m.) Review request for kafka. Bugs: KAFKA-1583 https://issues.apache.org/jira/browse/KAFKA-1583 Repository: kafka Description (updated) --- Incorporated Jun's comments round three Diffs (updated) - core/src/main/scala/kafka/api/FetchRequest.scala 51cdccf7f90eb530cc62b094ed822b8469d50b12 core/src/main/scala/kafka/api/FetchResponse.scala af9308737bf7832eca018c2b3ede703f7d1209f1 core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6 core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe core/src/main/scala/kafka/cluster/Partition.scala ff106b47e6ee194cea1cf589474fef975b9dd7e2 core/src/main/scala/kafka/common/ErrorMapping.scala 3fae7910e4ce17bc8325887a046f383e0c151d44 core/src/main/scala/kafka/log/Log.scala 0ddf97bd30311b6039e19abade41d2fbbad2f59b core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345 core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724 core/src/main/scala/kafka/server/KafkaApis.scala c584b559416b3ee4bcbec5966be4891e0a03eefb core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f core/src/main/scala/kafka/server/ReplicaManager.scala 68758e35d496a4659819960ae8e809d6e215568e core/src/main/scala/kafka/server/RequestPurgatory.scala ce06d2c381348deef8559374869fcaed923da1d1 core/src/main/scala/kafka/utils/DelayedItem.scala 
d7276494072f14f1cdf7d23f755ac32678c5675c core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 03a424d45215e1e7780567d9559dae4d0ae6fc29 core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala cd302aa51eb8377d88b752d48274e403926439f2 core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala a9c4ddc78df0b3695a77a12cf8cf25521a203122 core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 168712de241125982d556c188c76514fceb93779 core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ab60e9b3a4d063c838bdc7f97b3ac7d2ede87072 core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 Diff: https://reviews.apache.org/r/24676/diff/ Testing --- Unit tests Thanks, Guozhang Wang
Re: Offset Request with timestamp
The semantics of the offset API are to return the latest possible offset of a message appended no later than the given timestamp. The implementation returns the starting offset of the log segment that was created no later than the given timestamp, and hence if your log segment contains data covering a long period of time, the offset API may return just the starting offset of the current log segment. If your traffic is small and you still want a finer-grained offset response, you can try reducing the log segment size (default 1 GB); however, doing so will increase the number of file handles due to more frequent log segment rolling. Guozhang On Tue, Sep 2, 2014 at 10:21 AM, Manjunath Shivakumar manjunath.shivaku...@betfair.com wrote: Hi, My use case is to fetch the offsets for a given topic from X milliseconds ago. If I use the offset API https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetAPI to do this and pass in a timestamp of (now() - X), I get the earliest offset in the current log segment and not the offset from X milliseconds ago. Is this the correct usage or behaviour? Thanks, Manju -- -- Guozhang
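Concretely, the lookup Guozhang describes is segment-granular: the broker returns the base offset of the latest segment created no later than the requested timestamp. A toy model of that behavior (hypothetical class and method names, not the Kafka implementation):

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Toy model: each log segment is keyed by its creation time and stores its
// base (starting) offset. A request with timestamp T returns the base offset
// of the latest segment created at or before T -- not the offset of the last
// message appended before T.
class SegmentOffsetLookup {
    private final NavigableMap<Long, Long> segmentsByCreateTime = new TreeMap<>();

    void addSegment(long createTimeMs, long baseOffset) {
        segmentsByCreateTime.put(createTimeMs, baseOffset);
    }

    // Returns -1 if no segment existed at or before the timestamp.
    long offsetBefore(long timestampMs) {
        Map.Entry<Long, Long> e = segmentsByCreateTime.floorEntry(timestampMs);
        return e == null ? -1L : e.getValue();
    }
}
```

With a single large segment covering hours of data, every timestamp inside that window maps to the same base offset, which matches the behavior Manju observed.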
Re: Review Request 24676: Fix KAFKA-1583
On Sept. 2, 2014, 5:53 p.m., Jun Rao wrote: Are you including the changes in kafka-1616 too? That would be fine. However, the comments in the other jira also need to be addressed in this patch. I was not intending to include the changes of KAFKA-1616. The plan is to first check in KAFKA-1616, then rebase KAFKA-1583 on it. However, some of the changes may reflect review comments from KAFKA-1616 just for ease of rebasing. I can revert these if you want. On Sept. 2, 2014, 5:53 p.m., Jun Rao wrote: core/src/main/scala/kafka/cluster/Partition.scala, line 282 https://reviews.apache.org/r/24676/diff/5/?file=673601#file673601line282 may now in = may not be in I think it should be may now be in? On Sept. 2, 2014, 5:53 p.m., Jun Rao wrote: core/src/main/scala/kafka/server/RequestPurgatory.scala, lines 343-349 https://reviews.apache.org/r/24676/diff/5/?file=673612#file673612line343 Not sure if we need the while loop any more. The comments may also need to be adjusted. Good point. For the case curr != null && curr.forceComplete() == false, we can just return as well. - Guozhang --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/24676/#review52044 --- On Sept. 2, 2014, 8:37 p.m., Guozhang Wang wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/24676/ --- (Updated Sept. 2, 2014, 8:37 p.m.) Review request for kafka. 
Bugs: KAFKA-1583 https://issues.apache.org/jira/browse/KAFKA-1583 Repository: kafka Description --- Incorporated Jun's comments round three Diff: https://reviews.apache.org/r/24676/diff/ Testing --- Unit tests Thanks, Guozhang Wang
Re: Review Request 25155: Fix KAFKA-1616
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25155/ --- (Updated Sept. 3, 2014, 7:52 p.m.) Review request for kafka. Bugs: KAFKA-1616 https://issues.apache.org/jira/browse/KAFKA-1616 Repository: kafka Description (updated) --- Incorporated Jun's comments round four Diffs (updated) - core/src/main/scala/kafka/server/RequestPurgatory.scala ce06d2c381348deef8559374869fcaed923da1d1 core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 168712de241125982d556c188c76514fceb93779 Diff: https://reviews.apache.org/r/25155/diff/ Testing --- Thanks, Guozhang Wang
Re: Review Request 25136: Patch for KAFKA-1610
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25136/#review52234 --- Ship it! Ship It! - Guozhang Wang On Sept. 3, 2014, 6:27 p.m., Mayuresh Gharat wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25136/ --- (Updated Sept. 3, 2014, 6:27 p.m.) Review request for kafka. Bugs: KAFKA-1610 https://issues.apache.org/jira/browse/KAFKA-1610 Repository: kafka Description --- Added comments explaining the changes and reverted back some changes as per comments on the reviewboard Removed the unnecessary import Made changes to comments as per the suggestions on the reviewboard Diffs - core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala 691d69a49a240f38883d2025afaec26fd61281b5 core/src/main/scala/kafka/controller/KafkaController.scala 8ab4a1b8072c9dd187a9a6e94138b725d1f1b153 core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345 core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da core/src/main/scala/kafka/server/KafkaApis.scala c584b559416b3ee4bcbec5966be4891e0a03eefb core/src/main/scala/kafka/server/KafkaServer.scala 28711182aaa70eaa623de858bc063cb2613b2a4d core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala af4783646803e58714770c21f8c3352370f26854 core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala c2ba07c5fdbaf0e65ca033b2e4d88f45a8a15b2e Diff: https://reviews.apache.org/r/25136/diff/ Testing --- Ran the unit tests and everything passed and the build succeeded Thanks, Mayuresh Gharat
Re: Review Request 24676: Fix KAFKA-1583
On Sept. 3, 2014, 6:23 p.m., Jun Rao wrote: core/src/main/scala/kafka/server/DelayedFetch.scala, line 100 https://reviews.apache.org/r/24676/diff/6/?file=674148#file674148line100 Should we log topicAndPartition as well? fetchMetadata includes the fetchPartitionStatus, which includes the mapping from topicAndPartition to fetchPartitionStatus. - Guozhang --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/24676/#review52092 --- On Sept. 2, 2014, 8:37 p.m., Guozhang Wang wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/24676/ --- (Updated Sept. 2, 2014, 8:37 p.m.) Review request for kafka. Bugs: KAFKA-1583 https://issues.apache.org/jira/browse/KAFKA-1583 Repository: kafka Description --- Incorporated Jun's comments round three Diff: https://reviews.apache.org/r/24676/diff/ Testing --- Unit tests Thanks, Guozhang Wang
Re: Review Request 24676: Fix KAFKA-1583
On Sept. 3, 2014, 6:23 p.m., Jun Rao wrote: Looks good. I only have the following minor comments. Thanks Jun. If there are no more comments for now, I will wait for KAFKA-1616 to be checked in first, and then do the rebase and the class / function renaming (which would make the diff file quite hard to review), as well as the corresponding comment changes. - Guozhang --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/24676/#review52092 --- On Sept. 2, 2014, 8:37 p.m., Guozhang Wang wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/24676/ --- (Updated Sept. 2, 2014, 8:37 p.m.) Review request for kafka. Bugs: KAFKA-1583 https://issues.apache.org/jira/browse/KAFKA-1583 Repository: kafka Description --- Incorporated Jun's comments round three Diff: https://reviews.apache.org/r/24676/diff/ Testing --- Unit tests Thanks, Guozhang Wang
Re: [DISCUSS] 0.8.2 release branch, unofficial release candidates(s), 0.8.1.2 release
Just made a pass over the unresolved tickets tagged for 0.8.2, I think many of them can be pushed to 0.8.3 / 0.9. On Wed, Sep 3, 2014 at 8:05 PM, Jonathan Weeks jonathanbwe...@gmail.com wrote: +1 on a 0.8.1.2 release as described. I manually applied patches to cobble together a working gradle build for kafka for scala 2.11, but would really appreciate an official release — i.e. 0.8.1.2, as we also have other dependent libraries we use as well (e.g. akka-kafka) that would be much easier to migrate and support if the build was public and official. There were at least several others on the “users” list that expressed interest in scala 2.11 support, who knows how many more “lurkers” are out there. Best Regards, -Jonathan Hey, I wanted to take a quick pulse to see if we are getting closer to a branch for 0.8.2. 1) There still seems to be a lot of open issues https://issues.apache.org/jira/browse/KAFKA/fixforversion/12326167/?selectedTab=com.atlassian.jira.jira-projects-plugin:version-issues-panel and our 30 day summary is showing issues: 51 created and *34* resolved and not sure how much of that we could really just decide to push off to 0.8.3 or 0.9.0 vs working on 0.8.2 as stable for release. There is already so much goodness on trunk. I appreciate the double commit pain especially as trunk and branch drift (ugh). 2) Also, I wanted to float the idea of after making the 0.8.2 branch that I would do some unofficial release candidates for folks to test prior to release and vote. What I was thinking was I would build, upload and stage like I was preparing artifacts for vote but let the community know to go in and have at it well prior to the vote release. We don't get a lot of community votes during a release but issues after (which is natural because of how things are done). 
I have seen four Apache projects doing this very successfully: not only have they had fewer iterations of RC votes (I am sensitive to that myself), but the community kicked back issues they saw, because the pre-release time let them run the release through their own test and staging environments as it came about. 3) Checking again on whether we should have a 0.8.1.2 release if folks in the community find important features (this might be best asked on the user list, maybe, not sure) they don't want to or can't wait for, and which wouldn't be too painful or dangerous to back port. Two things that spring to the top of my head are 2.11 Scala support and fixing the source jars. Both of these are easy to patch; personally I don't mind, but I want to gauge more from the community on this too. I have heard gripes ad hoc from folks in direct communication but no real complaints in the public forum, and wanted to open the floor if folks had a need. 4) 0.9 work I feel is being held up some (or at least resourcing it is, from my perspective). We decided to hold up including SSL (even though we have a path for it). Jay did a nice update recently to the Security wiki, which I think we should move forward with. I have some more to add/change/update and want to start getting down to more details and getting specific people working on specific tasks, but without knowing what we are doing when, it is hard to manage. 5) I just updated https://issues.apache.org/jira/browse/KAFKA-1555 I think it is a really important feature update; it doesn't have to be in 0.8.2 but we need consensus (no pun intended). It fundamentally allows requiring data in a minimum of two racks, which A LOT of data requires for a save to be considered successful. /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop / -- -- Guozhang
Re: Review Request 25155: Fix KAFKA-1616
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25155/ --- (Updated Sept. 4, 2014, 8:26 p.m.) Review request for kafka. Bugs: KAFKA-1616 https://issues.apache.org/jira/browse/KAFKA-1616 Repository: kafka Description (updated) --- Purgatory size to be the sum of watched list sizes; delayed request to be the expiry queue length; remove atomic integers for metrics; add a unit test for watched list sizes and enqueued requests Diffs (updated) - core/src/main/scala/kafka/server/RequestPurgatory.scala ce06d2c381348deef8559374869fcaed923da1d1 core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala 168712de241125982d556c188c76514fceb93779 Diff: https://reviews.apache.org/r/25155/diff/ Testing --- Thanks, Guozhang Wang
Re: Review Request 24676: Fix KAFKA-1583
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/24676/ --- (Updated Sept. 5, 2014, 9:08 p.m.) Review request for kafka. Bugs: KAFKA-1583 https://issues.apache.org/jira/browse/KAFKA-1583 Repository: kafka Description (updated) --- rebase on KAFKA-1616 for checking diff files, please do not review Diffs (updated) - core/src/main/scala/kafka/api/FetchRequest.scala 51cdccf7f90eb530cc62b094ed822b8469d50b12 core/src/main/scala/kafka/api/FetchResponse.scala af9308737bf7832eca018c2b3ede703f7d1209f1 core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6 core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe core/src/main/scala/kafka/cluster/Partition.scala ff106b47e6ee194cea1cf589474fef975b9dd7e2 core/src/main/scala/kafka/common/ErrorMapping.scala 3fae7910e4ce17bc8325887a046f383e0c151d44 core/src/main/scala/kafka/log/Log.scala 0ddf97bd30311b6039e19abade41d2fbbad2f59b core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345 core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724 core/src/main/scala/kafka/server/KafkaApis.scala c584b559416b3ee4bcbec5966be4891e0a03eefb core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f core/src/main/scala/kafka/server/ReplicaManager.scala 68758e35d496a4659819960ae8e809d6e215568e core/src/main/scala/kafka/server/RequestPurgatory.scala cf3ed4c8f197d1197658645ccb55df0bce86bdd4 
core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 03a424d45215e1e7780567d9559dae4d0ae6fc29 core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala cd302aa51eb8377d88b752d48274e403926439f2 core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala a9c4ddc78df0b3695a77a12cf8cf25521a203122 core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala a577f4a8bf420a5bc1e62fad6d507a240a42bcaa core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala ab60e9b3a4d063c838bdc7f97b3ac7d2ede87072 core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 Diff: https://reviews.apache.org/r/24676/diff/ Testing --- Unit tests Thanks, Guozhang Wang
Re: Review Request 25136: Patch for KAFKA-1610
On Sept. 9, 2014, 1:38 a.m., Joel Koshy wrote: core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala, line 125 https://reviews.apache.org/r/25136/diff/4/?file=675768#file675768line125 I don't think this is required, since we are subsequently doing a map over the view which materializes the correct final collection. So it seems this map creates an unnecessary intermediate collection. Actually, we are passing the view into another function checkIfPartitionReassignmentSucceeded inside the subsequent map, and hence we cannot really control how this function will be using this view. On Sept. 9, 2014, 1:38 a.m., Mayuresh Gharat wrote: I only reviewed up to DelayedFetch, because I think these changes are mostly unnecessary and also add overhead. Can you re-evaluate the above and the rest of the patch to see if there are any _correctness_ issues? I agree with the premise of this jira - i.e., we should fix places where we use mapValues *and* cannot have a non-strict (i.e., lazily materialized) view. However, most of the existing uses of mapValues are as far as I can see legitimate uses of views where we don't need to materialize intermediate collections. Joel, I see your point. I think we should discuss a bit the principle of using map vs. a mapped view; I was originally thinking that any view passed to another function or used outside of the current code block should be converted into an intermediate collection, since it is out of our control. We can discuss this convention. - Guozhang --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25136/#review52660 --- On Sept. 3, 2014, 6:27 p.m., Mayuresh Gharat wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25136/ --- (Updated Sept. 3, 2014, 6:27 p.m.) Review request for kafka. 
Bugs: KAFKA-1610 https://issues.apache.org/jira/browse/KAFKA-1610 Repository: kafka Description --- Added comments explaining the changes and reverted back some changes as per comments on the reviewboard Removed the unnecessary import Made changes to comments as per the suggestions on the reviewboard Diff: https://reviews.apache.org/r/25136/diff/ Testing --- Ran the unit tests and everything passed and the build succeeded Thanks, Mayuresh Gharat
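The map-versus-view question discussed above concerns Scala's mapValues, which returns a lazily evaluated view: values are recomputed on every access, and a view handed to another function escapes the caller's control. A minimal Java analogue of that semantics, for illustration only (MapValuesView is a made-up name, not a Kafka or Scala class):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Minimal analogue of Scala's Map#mapValues view: values are computed on
// every access rather than stored, so repeated lookups repeat work and
// observe any later mutation of the underlying map.
class MapValuesView<K, V, W> {
    private final Map<K, V> underlying;
    private final Function<V, W> f;
    int evaluations = 0; // exposed only to make the laziness visible

    MapValuesView(Map<K, V> underlying, Function<V, W> f) {
        this.underlying = underlying;
        this.f = f;
    }

    W get(K key) {
        evaluations++;
        return f.apply(underlying.get(key));
    }

    // Materializing once avoids repeated evaluation and decouples the
    // result from later changes to the underlying map.
    Map<K, W> materialize() {
        Map<K, W> out = new HashMap<>();
        for (Map.Entry<K, V> e : underlying.entrySet()) {
            out.put(e.getKey(), f.apply(e.getValue()));
        }
        return out;
    }
}
```

Materializing trades one up-front pass for predictable behavior when the result escapes the current code block, which is the convention Guozhang proposes for views passed to other functions.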
Re: Error handling in new Java Producer
Hi Steve: 1. The new producer will be included in the 0.8.2 release for production usage. 2. The error you reported has been changed to a WARN in the latest trunk; we realized this should not really be an error case. In general, any errors will be propagated to the producer in the following two ways: a. producer.send() can throw ApiException and runtime exceptions, indicating the send was not handed off to the underlying sender thread. b. producer.send().get() can throw a KafkaException such as leader-not-found, indicating the broker's response says it failed to receive the message. Guozhang On Mon, Sep 8, 2014 at 8:16 AM, Tarzia star...@signal.co wrote: Hello, I am trying to use the new org.apache.kafka.clients.producer.KafkaProducer to take advantage of error reporting that is lacking in the current stable Scala client (import kafka.javaapi.producer.Producer). Two questions: * I know that 0.8.2 is not yet released, but is the new producer feature-complete and ready for testing? * If so, how should I check for errors in KafkaProducer#send()? In my tests I brought down the Kafka server and hoped to detect errors in the producer so that I could respond by re-queueing failed requests. 
However, I was not getting any exceptions from KafkaProducer#send(); instead I got an exception inside the producer thread: WARN org.apache.kafka.common.network.Selector - Error in I/O with localhost.localdomain/127.0.0.1 java.net.ConnectException: Connection refused at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739) at org.apache.kafka.common.network.Selector.poll(Selector.java:232) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:178) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115) at java.lang.Thread.run(Thread.java:744) Should this be bubbling up to the send() method, or should there be a getError() method in the RecordMetadata that is asynchronously returned? Basically, I don't understand the error-reporting API. Thanks, Steve Tarzia -- -- Guozhang
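The two propagation paths Guozhang lists in his reply can be modeled without the Kafka client at all (ToyProducer and its behavior are hypothetical stand-ins, not the real KafkaProducer API): path (a) throws synchronously from send(), while path (b) only surfaces when get() is called on the returned future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;

// Hypothetical stand-in for producer.send(): path (a) fails before the record
// is handed to the sender thread; path (b) returns a future that completes
// exceptionally when the broker reports a failure.
class ToyProducer {
    Future<Long> send(String record, boolean brokerFails) {
        if (record == null) {
            // Path (a): synchronous failure, thrown from send() itself.
            throw new IllegalArgumentException("record must not be null");
        }
        CompletableFuture<Long> result = new CompletableFuture<>();
        if (brokerFails) {
            // Path (b): asynchronous failure, only visible via get() or a callback.
            result.completeExceptionally(new RuntimeException("leader not found"));
        } else {
            result.complete(42L); // the offset assigned by the broker
        }
        return result;
    }
}
```

A caller that fires send() and never inspects the future misses path (b) entirely, which is why Steve saw the failure only in the sender thread's log rather than at the call site.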
Re: Error handling in new Java Producer
Hi Steve, You can try to reduce the metadata.fetch.timeout.ms config value, which will controll how much the send() call can be blocked upon broker metadata not available (due to broker itself not available). Guozhang On Tue, Sep 9, 2014 at 7:00 PM, Tarzia star...@signal.co wrote: Thanks Guozhang, Another case I am seeing is that producer.send() seems to block when the brokers are unavailable. This is not the behavior I want (I would rather have it throw an exception immediately so I can queue the messages for replay). I will try to confirm this tomorrow. -Steve On Sep 9, 2014, at 8:55 PM, Guozhang Wang wangg...@gmail.com wrote: Hi Steve: 1. the new producer will be included in the 0.8.2 release for production usage. 2. The error you reported has been changed as a WARN in the latest trunk. We realize this should not really be an error case. In general, any errors will be propagated to the producer in the following two ways: a. producer.send() can throw ApiException and runtime exceptions, indicating the sending is not triggered in the underlying sending thread. b. producer.send().get() can throw KafkaException such as leader not find, indicating the sending response from the broker says it has failed receiving the message. Guozhang On Mon, Sep 8, 2014 at 8:16 AM, Tarzia star...@signal.co wrote: Hello, I am trying to use the new org.apache.kafka.clients.producer.KafkaProducer to take advantage of error reporting that is lacking in the current stable Scala client (import kafka.javaapi.producer.Producer). Two questions: * I know that 0.8.2 is not yet released but is the new Producer feature-complete and ready for testing? * If so, how should I check for errors in KafkaProducer#send()? In my tests I brought down the Kafka sever and hoped to detect errors in the producer so that I could respond by re-queueing failed requests. 
However, I was not getting any exceptions on KafkaProducer#send(); instead I got an exception inside the producer thread: WARN org.apache.kafka.common.network.Selector - Error in I/O with localhost.localdomain/127.0.0.1 java.net.ConnectException: Connection refused at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:739) at org.apache.kafka.common.network.Selector.poll(Selector.java:232) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:178) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:175) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:115) at java.lang.Thread.run(Thread.java:744) Should this be bubbling up to the send() method, or should there be a getError() method in the RecordMetadata that is asynchronously returned? Basically, I don't understand the error-reporting API. Thanks, Steve Tarzia -- -- Guozhang -- -- Guozhang
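As discussed in this thread, the blocking behavior of send() is governed by producer configuration rather than by exceptions from the call itself. A sketch of the relevant new-producer settings (values are illustrative, not recommendations):

```properties
# Bound how long send() may block waiting for topic metadata when the
# brokers are unreachable (the default is considerably higher).
metadata.fetch.timeout.ms=5000
# Acknowledgement the leader must provide before a send is considered
# complete; broker-side errors then surface via the returned Future.
acks=1
# Retries attempted before the error is reported through
# Future.get() or the send callback.
retries=3
```

With a short metadata fetch timeout, a send() against a down cluster fails quickly with an exception that can be caught and used to re-queue the message, as Steve wants.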
Re: Review Request 25420: Patch for KAFKA-686
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25420/#review52980 --- Thanks for the patch. Overall it is a great clean-up. Some comments below: core/src/main/scala/kafka/controller/PartitionStateMachine.scala https://reviews.apache.org/r/25420/#comment92277 The controller will only be shut down when the kafka server is shutting down; we should probably just call controller.onControllerResignation(). core/src/main/scala/kafka/utils/ZkUtils.scala https://reviews.apache.org/r/25420/#comment92282 Do we still need this function any more? Shall we just change the usages of getReplicasForPartition to getPartitionAssignmentForTopic? core/src/main/scala/kafka/utils/ZkUtils.scala https://reviews.apache.org/r/25420/#comment92285 Why do we want it to be mutable? core/src/main/scala/kafka/utils/ZkUtils.scala https://reviews.apache.org/r/25420/#comment92286 Shall we also use failAsValue on catching ZkNoNodeException in getChildrenParentMayNotExist, and make its return type an Option? core/src/main/scala/kafka/utils/ZkUtils.scala https://reviews.apache.org/r/25420/#comment92288 Can we rename to consumerOffsetForPartitionDir for naming consistency? - Guozhang Wang On Sept. 7, 2014, 7:22 p.m., Viktor Tarananeko wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25420/ --- (Updated Sept. 7, 2014, 7:22 p.m.) Review request for kafka.
Bugs: KAFKA-686 https://issues.apache.org/jira/browse/KAFKA-686 Repository: kafka Description --- Merge branch 'trunk' into fix-null-pointer-in-zk-utils unify topic partition path constructing; refactoring and better code reuse reuse method to fetch broker data unify fetching topics raise InvalidTopicException if unable to properly parse json data from Zookeeper TopicRegistrationInfo class base controller failure test on invalid topic data Diffs - core/src/main/scala/kafka/common/TopicRegistrationInfo.scala PRE-CREATION core/src/main/scala/kafka/consumer/TopicCount.scala 0954b3c3ff8b3b7a7a4095436bc9e6c494a38c37 core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala fbc680fde21b02f11285a4f4b442987356abd17b core/src/main/scala/kafka/controller/PartitionStateMachine.scala e20b63a6ec1c1a848bc3823701b0f8ceaeb6100d core/src/main/scala/kafka/server/KafkaHealthcheck.scala 4acdd70fe9c1ee78d6510741006c2ece65450671 core/src/main/scala/kafka/tools/ConsumerOffsetChecker.scala d1e7c434e77859d746b8dc68dd5d5a3740425e79 core/src/main/scala/kafka/tools/ExportZkOffsets.scala 4d051bc2db12f0e15aa6a3289abeb9dd25d373d2 core/src/main/scala/kafka/tools/UpdateOffsetsInZK.scala 111c9a8b94ce45d95551482e9fd3f8c1cccbf548 core/src/main/scala/kafka/utils/ZkUtils.scala a7b1fdcb50d5cf930352d37e39cb4fc9a080cb12 core/src/test/scala/unit/kafka/integration/AutoOffsetResetTest.scala 95303e098d40cd790fb370e9b5a47d20860a6da3 core/src/test/scala/unit/kafka/server/ControllerFailureTest.scala PRE-CREATION core/src/test/scala/unit/kafka/utils/TestUtils.scala c4e13c5240c8303853d08cc3b40088f8c7dae460 Diff: https://reviews.apache.org/r/25420/diff/ Testing --- Thanks, Viktor Tarananeko
Re: Review Request 25136: Patch for KAFKA-1610
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25136/#review53391 --- Have talked with Mayuresh and Joel about which mapValues calls should be converted to map. The summary is that 1. For now the only place we do in-place modification of a map is in ProducePartitionStatus, on isAcksPending and ErrorCode; however, there is no guarantee that some future in-place modification will not land on a view created by mapValues. 2. On the other hand, converting every mapValues whose resulting collection may be used outside the current block to map will introduce many new object creations, especially in places where a map is called for each instance inside a map loop (for example the fetch response). So the suggested solution is to keep this risk of in-place modifications in Scala in mind (fortunately no-one does that often) by: 1. Adding comments on where map is already used and MUST be used, since its resulting collection items will be modified (for now the only place I know of is ProducePartitionStatus; Mayuresh, could you double check if there are other cases?). 2. Adding comments on function parameters that are passed in as a view, indicating the given map should not be modified in-place. 3. Whenever we make an in-place modification to a Map in Scala, checking whether that collection could have been created as a view. Mayuresh, could you submit another patch following this suggestion and also incorporating Neha's comments? - Guozhang Wang On Sept. 3, 2014, 6:27 p.m., Mayuresh Gharat wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25136/ --- (Updated Sept. 3, 2014, 6:27 p.m.) Review request for kafka.
Bugs: KAFKA-1610 https://issues.apache.org/jira/browse/KAFKA-1610 Repository: kafka Description --- Added comments explaining the changes and reverted back some changes as per comments on the reviewboard Removed the unnecessary import Made changes to comments as per the suggestions on the reviewboard Diffs - core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala 691d69a49a240f38883d2025afaec26fd61281b5 core/src/main/scala/kafka/controller/KafkaController.scala 8ab4a1b8072c9dd187a9a6e94138b725d1f1b153 core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345 core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da core/src/main/scala/kafka/server/KafkaApis.scala c584b559416b3ee4bcbec5966be4891e0a03eefb core/src/main/scala/kafka/server/KafkaServer.scala 28711182aaa70eaa623de858bc063cb2613b2a4d core/src/main/scala/kafka/tools/ReplicaVerificationTool.scala af4783646803e58714770c21f8c3352370f26854 core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala c2ba07c5fdbaf0e65ca033b2e4d88f45a8a15b2e Diff: https://reviews.apache.org/r/25136/diff/ Testing --- Ran the unit tests; everything passed and the build succeeded Thanks, Mayuresh Gharat
Re: (info)kafka truncate data of specific topic
Hi Jacky, Could you elaborate a bit on your use case, like why you want to manually truncate logs? Kafka provides a set of configs for data retention based on data size and time (for example, maintaining at most 100 GB or up to 7 days of old data); would that be sufficient for you? Guozhang On Mon, Sep 15, 2014 at 8:00 PM, Jacky.J.Wang (mis.cnxa01.Newegg) 43048 jacky.j.w...@newegg.com.invalid wrote: Hello kafka, I truncate data of kafka as follows: 1: stop kafka service 2: delete zookeeper /broker/topics/topic and /consumers/group 3: delete kafka log dir 4: restart kafka service 5: recreate topic info. But this way requires stopping the service, so how can I truncate kafka topic data without stopping the kafka service? Eagerly awaiting your reply, thanks. Best regards, Jacky.J.Wang Eng Software Engineer, NESC-XA Newegg Tech (Xian) Co., Ltd. 15th to 16th floor, 01 Plaza, Xi'an Software Park, No. 72 Keji 2nd Road, Xi'an P.R.China (710075) Once you know, you Newegg. -- -- Guozhang
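The size- and time-based retention Guozhang refers to is configured per broker (and can be overridden per topic). A sketch with illustrative values:

```properties
# server.properties -- illustrative values, not recommendations
# Delete log segments older than 7 days ...
log.retention.hours=168
# ... or once a partition's log exceeds ~100 GB, whichever limit is hit first
log.retention.bytes=107374182400
```

With these in place, old data is pruned continuously by the broker itself, with no need to stop the service or delete files by hand.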
Re: How producer gets the acknowledgement back
Hi, In the new (Java) producer, you can pass a callback into the Future&lt;RecordMetadata&gt; send(ProducerRecord record, Callback callback) call; the callback will be triggered when the ack is received. Alternatively, you can call Future.get() on the returned future metadata, which will block until the ack is received (i.e., synchronous sending). Guozhang On Wed, Sep 24, 2014 at 6:10 AM, Sreenivasulu Nallapati sreenu.nallap...@gmail.com wrote: Hello, Can you please help me to get the acknowledgement in the producer? After setting the property *request.required.acks to 1*, how does the producer get the acknowledgement back? I am trying to get the acknowledgement in the java producer. Thanks Sreeni -- -- Guozhang
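The two ack paths described above (an asynchronous callback vs. a blocking get()) follow the standard Future pattern. Below is a runnable stand-in using java.util.concurrent.CompletableFuture to illustrate that flow; this is not Kafka's API (the real producer callback signature is Callback.onCompletion(RecordMetadata, Exception)), just a sketch of the pattern, with the send() stub and offset value invented for illustration:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

class AckDemo {
    // Stand-in for producer.send(): the future is completed when the "ack"
    // arrives, or completed exceptionally when the broker reports an error.
    // In the real producer the background I/O thread completes it; here we
    // complete it inline to keep the sketch deterministic.
    static CompletableFuture<Long> send(boolean brokerFails) {
        CompletableFuture<Long> ack = new CompletableFuture<>();
        if (brokerFails) {
            ack.completeExceptionally(new RuntimeException("leader not found"));
        } else {
            ack.complete(42L); // pretend the broker assigned offset 42
        }
        return ack;
    }

    public static void main(String[] args) {
        // Path a: asynchronous callback, fired when the ack is received.
        send(false).whenComplete((offset, err) ->
            System.out.println(err == null
                ? "acked at offset " + offset
                : "failed: " + err.getMessage()));

        // Path b: synchronous sending -- block on the future until the ack
        // arrives; a broker-side failure surfaces as ExecutionException.
        try {
            send(true).get();
        } catch (ExecutionException e) {
            System.out.println("send failed: " + e.getCause().getMessage());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The same choice applies to the real client: pass a callback when you want non-blocking acknowledgement handling, or call get() on the returned future when you need to wait for the ack before proceeding.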
Re: Review Request 25944: Patch for KAFKA-1013
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25944/#review54609 --- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala https://reviews.apache.org/r/25944/#comment94796 This comment line is for code line 320; better to move it above that line. core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala https://reviews.apache.org/r/25944/#comment94797 This comment line is for code line 320; better to move it above that line. core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala https://reviews.apache.org/r/25944/#comment94799 Is there a difference between these two: Thread.sleep() and TimeUnit.MILLISECONDS.sleep()? We do the former everywhere in the code base. core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala https://reviews.apache.org/r/25944/#comment94800 Do we still need this variable? core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala https://reviews.apache.org/r/25944/#comment94802 Can we omit collection.Seq() here and just use groupId = config.groupId? core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala https://reviews.apache.org/r/25944/#comment94804 Merge into a single line. core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala https://reviews.apache.org/r/25944/#comment94806 Revert. core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala https://reviews.apache.org/r/25944/#comment94807 Revert. core/src/main/scala/kafka/tools/ExportOffsets.scala https://reviews.apache.org/r/25944/#comment94812 Can we actually use the same format for ZK / offset manager? Two different formats would make the output harder to parse, since the user needs to know whether ZK or the offset manager is used. core/src/main/scala/kafka/tools/ExportOffsets.scala https://reviews.apache.org/r/25944/#comment94810 We can add a parseBrokerList to Utils and use it here, since I have seen similar parsing logic in multiple places.
core/src/main/scala/kafka/tools/ExportOffsets.scala https://reviews.apache.org/r/25944/#comment94811 You can take a look at KAFKA-686's latest patch, which did some cleanup on the util functions; these functions could probably be merged into Utils. core/src/main/scala/kafka/tools/ImportOffsets.scala https://reviews.apache.org/r/25944/#comment94813 Ditto as above: can we unify the input offset format? core/src/main/scala/kafka/tools/ImportOffsets.scala https://reviews.apache.org/r/25944/#comment94814 Ditto as above. core/src/main/scala/kafka/tools/ImportOffsets.scala https://reviews.apache.org/r/25944/#comment94815 Same as Joel suggested: we can just use the config's default values. core/src/main/scala/kafka/tools/OffsetClient.scala https://reviews.apache.org/r/25944/#comment94816 The Apache header is missing. core/src/main/scala/kafka/tools/OffsetClient.scala https://reviews.apache.org/r/25944/#comment94819 This could be logged at error. core/src/main/scala/kafka/tools/OffsetClient.scala https://reviews.apache.org/r/25944/#comment94822 This can be trace, or we can just print the offset manager id at debug if it does not contain an error code. core/src/main/scala/kafka/tools/OffsetClient.scala https://reviews.apache.org/r/25944/#comment94820 Could be error("Error while connecting to %s:%d for fetching consumer metadata"), since this is not a general exception. core/src/main/scala/kafka/tools/OffsetClient.scala https://reviews.apache.org/r/25944/#comment94821 When an exception is thrown and caught here, we should skip the rest of the loop.
core/src/main/scala/kafka/tools/OffsetClient.scala https://reviews.apache.org/r/25944/#comment94823 Could be error("Error while connecting to offset manager %s"). core/src/main/scala/kafka/tools/OffsetClient.scala https://reviews.apache.org/r/25944/#comment94824 Some logging inconsistency: broker [%s:%d] vs. broker %s:%d vs. %s:%d. core/src/main/scala/kafka/tools/OffsetClient.scala https://reviews.apache.org/r/25944/#comment94825 Why does this API need to return an Option while the previous one can directly return the response? core/src/main/scala/kafka/tools/OffsetClient.scala https://reviews.apache.org/r/25944/#comment94826 We do not need "Possible cause" any more; just print the exception's message if necessary. core/src/main/scala/kafka/tools/OffsetClient.scala https://reviews.apache.org/r/25944/#comment94827 Are there any other exceptions that can be caught besides IOExceptions? We need to be careful about always catching Throwable and printing the stack trace. - Guozhang Wang On Sept. 23, 2014, 5:48 p.m., Mayuresh Gharat wrote
Re: Review Request 25995: Patch for KAFKA-1650
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/#review54620 --- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala https://reviews.apache.org/r/25995/#comment94830 Do we need to do this check every time in the loop? core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94831 No need for an empty line here. core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94832 No need for brackets. core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94833 No need for brackets. core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94835 Maximum bytes that can be buffered in the data channels. core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94834 In terms of bytes. core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94836 Inconsistent indentation. core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94838 Capitalize: Offset commit interval in ms. core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94841 Do you need to turn off auto commit on the consumer threads here? core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94840 We can add some more comments here, explaining: 1) why we add the offset commit thread for the new producer but not the old producer; 2) what risks the old producer has (for not having an offset commit thread). core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94842 For clean shutdown, you need to 1) halt consumer threads first, 2) wait for the producer to drain all the messages in the data channel, 3) manually commit offsets on consumer threads, and 4) shut down consumer threads.
Otherwise we will have data duplicates, as we commit offsets based on the minimum. core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94844 queueId core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94846 How about having a histogram for each queue instead of taking the sum? The update call would be a bit less expensive, and we could monitor whether some queues are empty while others get all the data. core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94847 Ditto above. core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94849 Add comments explaining why we force an unclean shutdown with System.exit here. core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94857 Unfortunately this may not be the case, as we can have multiple connectors using different consumer configs with different group ids. We need to either 1) change the config settings to enforce this to be true, or 2) use a separate offset client that remembers which topics belong to which groups. core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94858 Capitalize the first word. core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94859 Capitalize the first word. core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94863 Add a comment explaining the logic of how this works. Also a few questions: 1) is the map() call synchronized with other threads putting new offsets into the map?
2) After the sorting, the logic may be clearer as: var commitableOffsetIndex = 0; while (offsets(commitableOffsetIndex) - offsets.head == commitableOffsetIndex) commitableOffsetIndex += 1; offsetToCommit = offsets(commitableOffsetIndex) + 1 core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment94855 The send().get() call is missing. core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala https://reviews.apache.org/r/25995/#comment94853 Apache header missing. - Guozhang Wang On Sept. 24, 2014, 4:26 p.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/ --- (Updated Sept. 24, 2014, 4:26 p.m.) Review request for kafka. Bugs: KAFKA-1650 https://issues.apache.org/jira/browse/KAFKA-1650 Repository: kafka Description
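The consecutive-offset scan suggested in this review can be sketched as a small standalone routine (names are illustrative; the real MirrorMaker code differs, and the version below commits one past the end of the consecutive run rather than past the first gap, so no unacked offset is ever skipped). Given the set of offsets whose sends have been acked, it finds the longest consecutive run starting at the smallest acked offset and commits the offset just after it:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class CommittableOffset {
    // Given the (unordered) offsets whose sends have been acked, return the
    // offset to commit: one past the longest consecutive run starting at the
    // smallest acked offset. Offsets beyond the first gap stay uncommitted
    // until the in-flight sends filling the gap are acked.
    static long offsetToCommit(List<Long> acked) {
        List<Long> offsets = new ArrayList<>(acked);
        Collections.sort(offsets);
        int i = 0;
        // offsets[i] - offsets[0] == i holds exactly while the run is consecutive
        while (i < offsets.size() && offsets.get(i) - offsets.get(0) == i)
            i++;
        return offsets.get(i - 1) + 1; // first offset not yet safely consumed
    }

    public static void main(String[] args) {
        // Offsets 5,6,7 acked; 8 still in flight; 9,10 acked out of order:
        System.out.println(offsetToCommit(Arrays.asList(7L, 5L, 9L, 6L, 10L))); // prints 8
    }
}
```

The committed offset is the first un-consumed one, which matches the review comment that the committed offset must be incremented by one past the last safely mirrored message.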
Re: Review Request 26306: Patch for KAFKA-1663
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26306/#review55415 --- core/src/main/scala/kafka/controller/TopicDeletionManager.scala https://reviews.apache.org/r/26306/#comment95770 Can we change this comment accordingly, since we have other places calling resumeTopicDeletionThread() besides these three cases? - Guozhang Wang On Oct. 3, 2014, 1:31 a.m., Sriharsha Chintalapani wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26306/ --- (Updated Oct. 3, 2014, 1:31 a.m.) Review request for kafka. Bugs: KAFKA-1663 https://issues.apache.org/jira/browse/KAFKA-1663 Repository: kafka Description --- KAFKA-1663. Controller unable to shutdown after a soft failure. Diffs - core/src/main/scala/kafka/controller/TopicDeletionManager.scala 219c4136e905a59a745c3a596c95d59b550e7383 Diff: https://reviews.apache.org/r/26306/diff/ Testing --- Thanks, Sriharsha Chintalapani
Review Request 26390: Fix KAFKA-1641
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26390/ --- Review request for kafka. Bugs: KAFKA-1641 https://issues.apache.org/jira/browse/KAFKA-1641 Repository: kafka Description --- Reset cleaning start offset upon abnormal log truncation Diffs - core/src/main/scala/kafka/log/LogCleanerManager.scala e8ced6a5922508ea3274905be7c3d6e728f320ac Diff: https://reviews.apache.org/r/26390/diff/ Testing --- Thanks, Guozhang Wang
Review Request 26393: Follow-up KAFKA-1468
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26393/ --- Review request for kafka. Bugs: KAFKA-1468 https://issues.apache.org/jira/browse/KAFKA-1468 Repository: kafka Description --- Change array list back to linked list for watchers Diffs - core/src/main/scala/kafka/server/RequestPurgatory.scala cf3ed4c8f197d1197658645ccb55df0bce86bdd4 Diff: https://reviews.apache.org/r/26393/diff/ Testing --- Thanks, Guozhang Wang
Re: Review Request 26373: Patch for KAFKA-1647
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26373/#review55615 --- Since the first if statement is now only used for logging, could we just merge it into the second check? - Guozhang Wang On Oct. 6, 2014, 5:06 p.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26373/ --- (Updated Oct. 6, 2014, 5:06 p.m.) Review request for kafka. Bugs: KAFKA-1647 https://issues.apache.org/jira/browse/KAFKA-1647 Repository: kafka Description --- Fix for Kafka-1647. Diffs - core/src/main/scala/kafka/server/ReplicaManager.scala 78b7514cc109547c562e635824684fad581af653 Diff: https://reviews.apache.org/r/26373/diff/ Testing --- Thanks, Jiangjie Qin
Re: Review Request 25995: Patch for KAFKA-1650
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/#review55612 --- core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala https://reviews.apache.org/r/25995/#comment95974 Do we need to add = here? core/src/main/scala/kafka/server/ReplicaManager.scala https://reviews.apache.org/r/25995/#comment95975 We should keep the changes of KAFKA-1647 in its own RB and not merge them here. core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment95978 Could we add some introductory comments here on: 1. The architecture of the MM: producer / consumer threads, a data channel per producer thread, the offset commit thread, and how the different modules interact with each other. 2. Why we need a separate offset commit thread, and how it works. 3. The startup / shutdown process, i.e. which modules to start / shut down first (this could also be moved to the head of the corresponding functions). core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment95979 Embedded consumer config for consuming from the source cluster. core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment95980 Embedded producer config for producing to the target cluster. core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment95981 The offset commit thread periodically commits consumed offsets to the source cluster. With the new producer, the offsets are updated upon the returned future metadata of the send() call; with the old producer, the offsets are updated when the consumer's iterator advances. This guarantees no data loss even when the mirror maker is shut down uncleanly with the new producer, while with the old producer, messages inside the data channel could be lost upon an unclean shutdown.
core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment96019 numMessageCapacity and byteCapacity? numGetters and numPutters (since the producer is the consumer of this buffer and vice versa)? core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment96021 How about MirrorMaker-DataChannel-queue%d-NumMessages and MirrorMaker-DataChannel-queue%d-Bytes, with variable names channelNumMessageHists and channelByteHists? core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment96020 Can we define put(record, queueId) and put(record), where the latter includes the logic of determining the queueId and then calls the former? core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment96022 Comment on why we use the hashCode of the source topic / partition here. core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment96026 Instead of letting the consumer check the global shutdown flag, could we just add a shutdown function which sets its own flag, like the producer thread and the commit thread? Then the shutdown process becomes: consumers.shutdown; consumers.awaitShutdown; producers.shutdown; producers.awaitShutdown; committer.shutdown; committer.awaitShutdown; connector.shutdown core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment96023 Maybe just // if it exits accidentally, stop the entire mirror maker, as we did below? core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment96024 // if it exits accidentally, stop the entire mirror maker core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment96025 // the committed offset will be the first offset of the un-consumed message, hence we need to increment by one.
core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala https://reviews.apache.org/r/25995/#comment96027 queueNumItemCapacity and queueByteCapacity? - Guozhang Wang On Oct. 6, 2014, 5:20 p.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/ --- (Updated Oct. 6, 2014, 5:20 p.m.) Review request for kafka. Bugs: KAFKA-1650 https://issues.apache.org/jira/browse/KAFKA-1650 Repository: kafka Description --- Addressed Guozhang's comments. Talked with Joel and decided to remove multi-connector support, as people can always create multiple mirror maker instances if they want to consume from multiple clusters. Diffs - core/src
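For reference, the ByteBoundedBlockingQueue discussed in this review bounds the mirror maker's data channel by both message count and total bytes. A minimal sketch of such a structure (an illustration only, not Kafka's actual implementation; the capacity names echo the naming suggested in the review comments):

```java
import java.util.ArrayDeque;

// A queue bounded by both an item count and a total byte size. put() blocks
// while either bound is exceeded; an element offered to an empty queue is
// always admitted so an oversized message cannot deadlock the channel.
class ByteBoundedQueue<T> {
    private static final class Entry<T> {
        final T item; final long bytes;
        Entry(T item, long bytes) { this.item = item; this.bytes = bytes; }
    }

    private final ArrayDeque<Entry<T>> queue = new ArrayDeque<>();
    private final int queueNumItemCapacity;
    private final long queueByteCapacity;
    private long currentBytes = 0;

    ByteBoundedQueue(int queueNumItemCapacity, long queueByteCapacity) {
        this.queueNumItemCapacity = queueNumItemCapacity;
        this.queueByteCapacity = queueByteCapacity;
    }

    synchronized void put(T item, long bytes) throws InterruptedException {
        while (!queue.isEmpty()
                && (queue.size() >= queueNumItemCapacity
                    || currentBytes + bytes > queueByteCapacity))
            wait();
        queue.add(new Entry<>(item, bytes));
        currentBytes += bytes;
        notifyAll();
    }

    synchronized T take() throws InterruptedException {
        while (queue.isEmpty())
            wait();
        Entry<T> e = queue.poll();
        currentBytes -= e.bytes;
        notifyAll();
        return e.item;
    }

    synchronized long sizeInBytes() { return currentBytes; }
    synchronized int size() { return queue.size(); }
}
```

The byte bound is what keeps the channel's memory footprint predictable when message sizes vary widely, which a count-only BlockingQueue cannot guarantee.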
Re: Review Request 26373: Patch for KAFKA-1647
On Oct. 7, 2014, 12:15 a.m., Guozhang Wang wrote: Since the first if statement is now only used for logging, could we just merge it into the second check? Jiangjie Qin wrote: Guozhang, thanks for the review. I actually thought about it before. I agree that the code looks simpler if we just call partition.makeFollower without checking whether the leader is up or not. The reason I retained the first if statement is that it seems more reasonable to create the remote replicas only when the leader broker is online, which is done in partition.makeFollower. And I'm not 100% sure whether it matters if we just create the remote replica objects without checking the liveness of the leader broker. The check of the leader's liveness is just an assertion (i.e., the leader should always be alive when the leaderAndISR request is received; otherwise something bad has happened and we need to log the error, but still proceed by skipping this request). - Guozhang --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26373/#review55615 --- On Oct. 6, 2014, 5:06 p.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26373/ --- (Updated Oct. 6, 2014, 5:06 p.m.) Review request for kafka. Bugs: KAFKA-1647 https://issues.apache.org/jira/browse/KAFKA-1647 Repository: kafka Description --- Fix for Kafka-1647. Diffs - core/src/main/scala/kafka/server/ReplicaManager.scala 78b7514cc109547c562e635824684fad581af653 Diff: https://reviews.apache.org/r/26373/diff/ Testing --- Thanks, Jiangjie Qin
Re: Review Request 26390: Fix KAFKA-1641
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26390/ --- (Updated Oct. 9, 2014, 8:04 p.m.) Review request for kafka. Bugs: KAFKA-1641 https://issues.apache.org/jira/browse/KAFKA-1641 Repository: kafka Description --- Reset cleaning start offset upon abnormal log truncation Diffs (updated) - core/src/main/scala/kafka/log/LogCleanerManager.scala e8ced6a5922508ea3274905be7c3d6e728f320ac Diff: https://reviews.apache.org/r/26390/diff/ Testing --- Thanks, Guozhang Wang
[DISCUSSION] Message Metadata
Hello all, I put some thoughts on enhancing our current message metadata format to solve a bunch of existing issues: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata This wiki page is for kicking off some discussions about the feasibility of adding more info into the message header, and if possible how we would add them. -- Guozhang
Re: Review Request 24676: Rebase KAFKA-1583
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/24676/ --- (Updated Oct. 14, 2014, 2:42 a.m.) Review request for kafka. Summary (updated) - Rebase KAFKA-1583 Bugs: KAFKA-1583 https://issues.apache.org/jira/browse/KAFKA-1583 Repository: kafka Description (updated) --- Rebase KAFKA-1583 on trunk: pass requiredAcks to Partition for min.isr + minor changes Diffs (updated) - core/src/main/scala/kafka/api/FetchRequest.scala 59c09155dd25fad7bed07d3d00039e3dc66db95c core/src/main/scala/kafka/api/FetchResponse.scala 8d085a1f18f803b3cebae4739ad8f58f95a6c600 core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6 core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe core/src/main/scala/kafka/cluster/Partition.scala e88ecf224a4dab8bbd26ba7b0c3ccfe844c6b7f4 core/src/main/scala/kafka/common/ErrorMapping.scala 880ab4a004f078e5d84446ea6e4454ecc06c95f2 core/src/main/scala/kafka/log/Log.scala a123cdc52f341a802b3e4bfeb29a6154332e5f73 core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345 core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724 core/src/main/scala/kafka/server/KafkaApis.scala 67f2833804cb15976680e42b9dc49e275c89d266 core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f core/src/main/scala/kafka/server/ReplicaManager.scala 78b7514cc109547c562e635824684fad581af653 core/src/main/scala/kafka/server/RequestPurgatory.scala 
9d76234bc2c810ec08621dc92bb4061b8e7cd993 core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 209a409cb47eb24f83cee79f4e064dbc5f5e9d62 core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala fb61d552f2320fedec547400fbbe402a0b2f5d87 core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 03a424d45215e1e7780567d9559dae4d0ae6fc29 core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala cd302aa51eb8377d88b752d48274e403926439f2 core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala a9c4ddc78df0b3695a77a12cf8cf25521a203122 core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala a577f4a8bf420a5bc1e62fad6d507a240a42bcaa core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 3804a114e97c849cae48308997037786614173fc core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 Diff: https://reviews.apache.org/r/24676/diff/ Testing --- Unit tests Thanks, Guozhang Wang
Re: [DISCUSSION] Message Metadata
and give feedback. Also, wrt discussion in the past we have used a mix of wiki comments and the mailing list. Personally, I think it is better to discuss on the mailing list (for more visibility) and just post a bold link to the (archived) mailing list thread on the wiki. Joel On Fri, Oct 10, 2014 at 05:33:52PM -0700, Guozhang Wang wrote: Hello all, I put some thoughts on enhancing our current message metadata format to solve a bunch of existing issues: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata This wiki page is for kicking off some discussions about the feasibility of adding more info into the message header, and if possible how we would add them. -- Guozhang -- -- Guozhang
Re: Review Request 26373: Patch for KAFKA-1647
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26373/#review56736 --- Ship it! Ship It! - Guozhang Wang On Oct. 13, 2014, 11:38 p.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26373/ --- (Updated Oct. 13, 2014, 11:38 p.m.) Review request for kafka. Bugs: KAFKA-1647 https://issues.apache.org/jira/browse/KAFKA-1647 Repository: kafka Description --- Addressed Joel's comments. Diffs - core/src/main/scala/kafka/server/ReplicaManager.scala 78b7514cc109547c562e635824684fad581af653 Diff: https://reviews.apache.org/r/26373/diff/ Testing --- Thanks, Jiangjie Qin
Re: [DISCUSSION] Message Metadata
Thanks Joe, I think we now have a few open questions to discuss around this topic: 1. Shall we make core Kafka properties first-class fields in the message header, or put them in as tags? The pros of the first approach are a more compact format and hence less message header overhead; the cons are that any future message header change needs a protocol bump and possibly multi-versioned handling on the server side. Vice versa for the second approach. 2. Shall we leave app properties in the message content and enforce schema-based topics, or make them extensible tags? The pros of the first approach are again saving message header overhead for app properties; the cons are that it enforces schema usage just so the message content can be partially de-serialized for the app header. At LinkedIn we enforce Avro schemas for auditing purposes, and as a result the Kafka team has to manage the schema registration process / schema repository as well. 3. Which properties should be core Kafka properties and which should be app properties? For example, shall we make properties that only MM cares about app properties or Kafka properties? Guozhang On Tue, Oct 14, 2014 at 5:10 AM, Joe Stein joe.st...@stealth.ly wrote: I think we could add schemaId(binary) to the MessageAndMetaData. With the schemaId you can implement different downstream software patterns on the messages reliably. I wrote up more thoughts on this: https://cwiki.apache.org/confluence/display/KAFKA/Schema+based+topics It should strive to encompass all implementation needs for producer, broker, and consumer hooks. So if the application and tagged fields are important you can package that into a specific Kafka topic plug-in and assign it to topic(s). The Kafka server should be able to validate your expected formats (like encoders/decoders, but in the broker by topic, regardless of producer) for the topics that have it enabled. We should have these maintained in the project under contrib. 
- Joe Stein On Mon, Oct 13, 2014 at 11:02 PM, Guozhang Wang wangg...@gmail.com wrote: Hi Jay, Thanks for the comments. Replied inline. Guozhang On Mon, Oct 13, 2014 at 11:11 AM, Jay Kreps jay.kr...@gmail.com wrote: I need to take more time to think about this. Here are a few off-the-cuff remarks: - To date we have tried really, really hard to keep the data model for messages simple, since after all you can always add whatever you like inside the message body. - For system tags, why not just make these fields first-class fields in the message? Why have a bunch of key-value pairs versus first-class fields? Yes, we can alternatively make system tags first-class fields in the message header to make the format / processing logic simpler. The main reasons I put them in as system tags are: 1) when I think about these possible system tags, some of them apply to all types of messages (e.g. timestamps), but some may apply only to a specific type of message (compressed, control message), and of those not all are necessarily required all the time; hence making them compact tags may save us some space when not all of them are present; 2) with tags we do not need to bump the protocol version every time we make a change, which would include keeping the logic to handle all versions on the broker until the old ones are officially discarded; instead, the broker can just ignore a tag if its id is not recognizable (because the client is on a newer version), or use some default value / throw an exception if a required tag is missing (because the client is on an older version). - You don't necessarily need application-level tags explicitly represented in the message format for efficiency. The application can define its own header (e.g. the message could be a size-delimited header followed by a size-delimited body). But actually if you use Avro you don't even need this, I don't think. 
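To make the tag idea above concrete, here is a minimal sketch of the kind of tagged-header codec being discussed. This is not Kafka's actual wire format — the ids, sizes, and layout are hypothetical — but it shows the key property: a broker can skip tags it does not recognize instead of failing, so adding a new tag does not require a protocol version bump.

```java
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical tagged-header codec: each tag is (int16 id, int32 length, bytes),
// preceded by an int32 tag count.
public class TaggedHeader {
    public static ByteBuffer encode(Map<Short, byte[]> tags) {
        int size = 4; // int32 tag count
        for (byte[] v : tags.values()) size += 2 + 4 + v.length;
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putInt(tags.size());
        for (Map.Entry<Short, byte[]> e : tags.entrySet()) {
            buf.putShort(e.getKey());
            buf.putInt(e.getValue().length);
            buf.put(e.getValue());
        }
        buf.flip();
        return buf;
    }

    // Decode only the tags this broker version knows; skip the rest.
    public static Map<Short, byte[]> decodeKnown(ByteBuffer buf, Set<Short> knownIds) {
        Map<Short, byte[]> out = new HashMap<>();
        int count = buf.getInt();
        for (int i = 0; i < count; i++) {
            short id = buf.getShort();
            int len = buf.getInt();
            if (knownIds.contains(id)) {
                byte[] v = new byte[len];
                buf.get(v);
                out.put(id, v);
            } else {
                buf.position(buf.position() + len); // unknown tag: skip value
            }
        }
        return out;
    }
}
```

An older broker reading a message produced by a newer client simply passes its own (smaller) set of known ids and skips the rest; a "required tag missing" check would be the mirror-image case for older clients.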
Avro has the ability to just deserialize the header fields in your message. Avro has a notion of reader and writer schemas. The writer schema is whatever the message was written with. If the reader schema is just the header, Avro will skip any fields it doesn't need and just deserialize the fields it does need. This is actually a much more usable and flexible way to define a header, since you get all the types Avro allows instead of just bytes. I agree that we can use a reader schema to just read out the header without de-serializing the full message, and probably for compressed messages we can add an Avro / etc. header for the compressed wrapper message as well, but that would force these applications (MM, auditor, clients) to be schema-aware, which would usually require the people who manage this data pipeline to also manage the schemas, whereas ideally Kafka itself should just consider bytes-in and bytes-out
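Jay's size-delimited alternative can be sketched without any schema machinery at all. The layout below — `[int32 headerLen][header][int32 bodyLen][body]` — is hypothetical, not a Kafka or Avro format, but it shows the same effect a header-only Avro reader schema achieves: an auditor reads just the header bytes and never touches (or decompresses) the body.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical size-delimited message: [int32 headerLen][header][int32 bodyLen][body].
public class SizeDelimited {
    public static byte[] wrap(byte[] header, byte[] body) {
        ByteBuffer buf = ByteBuffer.allocate(4 + header.length + 4 + body.length);
        buf.putInt(header.length).put(header).putInt(body.length).put(body);
        return buf.array();
    }

    // Decode only the leading header; the body bytes are never read.
    public static String readHeaderOnly(byte[] message) {
        ByteBuffer buf = ByteBuffer.wrap(message);
        byte[] header = new byte[buf.getInt()];
        buf.get(header);
        return new String(header, StandardCharsets.UTF_8);
    }
}
```

The trade-off discussed in the thread is exactly this: the size-delimited approach keeps the pipeline schema-unaware (bytes-in, bytes-out), while the Avro reader-schema approach buys typed header fields at the cost of making MM / auditors schema-aware.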
Re: [DISCUSS] Release 0.8.2-beta before 0.8.2?
Regarding 1634, I intended to work on that after 1583, since it will change the commit offset request handling logic a lot. If people think 1583 is only a few days away from check-in, we can leave it in 0.8.2-beta; otherwise we can push it to 0.8.3. Guozhang On Thu, Oct 16, 2014 at 2:19 PM, Joe Stein joe.st...@stealth.ly wrote: +1 for a 0.8.2-beta next week and 0.8.2.0 final 4-5 weeks later. I agree with the tickets you brought up to have in 0.8.2-beta, and also https://issues.apache.org/jira/browse/KAFKA-1493 for lz4 compression. /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop / On Oct 16, 2014 12:55 PM, Neha Narkhede neha.narkh...@gmail.com wrote: Another JIRA that would be nice to include as part of 0.8.2-beta is https://issues.apache.org/jira/browse/KAFKA-1481 which fixes the mbean naming. Looking for people's thoughts on 2 things here - 1. How do folks feel about doing a 0.8.2-beta release right now and 0.8.2 final 4-5 weeks later? 2. Do people want to include any JIRAs (other than the ones mentioned above) in 0.8.2-beta? If so, it will be great to know now so it will allow us to move forward with the beta release quickly. Thanks, Neha On Wed, Oct 15, 2014 at 4:46 PM, Neha Narkhede neha.narkh...@gmail.com wrote: Hi, We have accumulated an impressive list of pretty major features in 0.8.2 - Delete topic Automated leader rebalancing Controlled shutdown Offset management Parallel recovery min.isr and clean leader election In the past, what has worked for major feature releases is a beta release prior to a final release. I'm proposing we do the same for 0.8.2. The only blockers for 0.8.2-beta that I know of are - https://issues.apache.org/jira/browse/KAFKA-1493 (Is a major change and requires some thinking about the new dependency. Since it is not fully ready and there are things to think about, I suggest we take it out, think it through end to end, and then include it in 0.8.3.) 
https://issues.apache.org/jira/browse/KAFKA-1634 (This has an owner: Guozhang Wang) https://issues.apache.org/jira/browse/KAFKA-1671 (Has a patch and is waiting on a review by Joe Stein) It seems that 1634 and 1671 can get wrapped up in a week. Do people think we should cut 0.8.2-beta by next week? Thanks, Neha -- -- Guozhang
Re: [DISCUSS] Release 0.8.2-beta before 0.8.2?
Agree. On Oct 16, 2014 3:16 PM, Jun Rao jun...@gmail.com wrote: +1 on doing an 0.8.2 beta. Guozhang, kafka-1583 is relatively large. Given that we are getting close to releasing 0.8.2 beta, my feeling is that we probably shouldn't include it in 0.8.2 beta even if we can commit it in a few days. Thanks, Jun On Thu, Oct 16, 2014 at 3:01 PM, Guozhang Wang wangg...@gmail.com wrote: Regarding 1634, I intended to work on that after 1583, since it will change the commit offset request handling logic a lot. If people think 1583 is only a few days away from check-in, we can leave it in 0.8.2-beta; otherwise we can push it to 0.8.3. Guozhang On Thu, Oct 16, 2014 at 2:19 PM, Joe Stein joe.st...@stealth.ly wrote: +1 for a 0.8.2-beta next week and 0.8.2.0 final 4-5 weeks later. I agree with the tickets you brought up to have in 0.8.2-beta, and also https://issues.apache.org/jira/browse/KAFKA-1493 for lz4 compression. /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop / On Oct 16, 2014 12:55 PM, Neha Narkhede neha.narkh...@gmail.com wrote: Another JIRA that would be nice to include as part of 0.8.2-beta is https://issues.apache.org/jira/browse/KAFKA-1481 which fixes the mbean naming. Looking for people's thoughts on 2 things here - 1. How do folks feel about doing a 0.8.2-beta release right now and 0.8.2 final 4-5 weeks later? 2. Do people want to include any JIRAs (other than the ones mentioned above) in 0.8.2-beta? If so, it will be great to know now so it will allow us to move forward with the beta release quickly. 
Thanks, Neha On Wed, Oct 15, 2014 at 4:46 PM, Neha Narkhede neha.narkh...@gmail.com wrote: Hi, We have accumulated an impressive list of pretty major features in 0.8.2 - Delete topic Automated leader rebalancing Controlled shutdown Offset management Parallel recovery min.isr and clean leader election In the past, what has worked for major feature releases is a beta release prior to a final release. I'm proposing we do the same for 0.8.2. The only blockers for 0.8.2-beta, that I know of are - https://issues.apache.org/jira/browse/KAFKA-1493 (Is a major change and requires some thinking about the new dependency. Since it is not fully ready and there are things to think about, I suggest we take it out, think it end to end and then include it in 0.8.3.) https://issues.apache.org/jira/browse/KAFKA-1634 (This has an owner: Guozhang Wang) https://issues.apache.org/jira/browse/KAFKA-1671 (Has a patch and is waiting on a review by Joe Stein) It seems that 1634 and 1671 can get wrapped up in a week. Do people think we should cut 0.8.2-beta by next week? Thanks, Neha -- -- Guozhang
Re: Review Request 24676: Rebase KAFKA-1583
On Oct. 16, 2014, 1:29 a.m., Jun Rao wrote: core/src/main/scala/kafka/server/KafkaApis.scala, line 167 https://reviews.apache.org/r/24676/diff/9/?file=720184#file720184line167 Should replica manager be offset manager? This is replica manager actually, when it tries to write the commit message to the local log. I have changed the comment a bit to make it more clear. - Guozhang --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/24676/#review56843 --- On Oct. 14, 2014, 2:42 a.m., Guozhang Wang wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/24676/ --- (Updated Oct. 14, 2014, 2:42 a.m.) Review request for kafka. Bugs: KAFKA-1583 https://issues.apache.org/jira/browse/KAFKA-1583 Repository: kafka Description --- Rebase KAFKA-1583 on trunk: pass requiredAcks to Partition for min.isr + minor changes Diffs - core/src/main/scala/kafka/api/FetchRequest.scala 59c09155dd25fad7bed07d3d00039e3dc66db95c core/src/main/scala/kafka/api/FetchResponse.scala 8d085a1f18f803b3cebae4739ad8f58f95a6c600 core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6 core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe core/src/main/scala/kafka/cluster/Partition.scala e88ecf224a4dab8bbd26ba7b0c3ccfe844c6b7f4 core/src/main/scala/kafka/common/ErrorMapping.scala 880ab4a004f078e5d84446ea6e4454ecc06c95f2 core/src/main/scala/kafka/log/Log.scala a123cdc52f341a802b3e4bfeb29a6154332e5f73 core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345 core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da core/src/main/scala/kafka/server/FetchRequestPurgatory.scala 
ed1318891253556cdf4d908033b704495acd5724 core/src/main/scala/kafka/server/KafkaApis.scala 67f2833804cb15976680e42b9dc49e275c89d266 core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f core/src/main/scala/kafka/server/ReplicaManager.scala 78b7514cc109547c562e635824684fad581af653 core/src/main/scala/kafka/server/RequestPurgatory.scala 9d76234bc2c810ec08621dc92bb4061b8e7cd993 core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 209a409cb47eb24f83cee79f4e064dbc5f5e9d62 core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala fb61d552f2320fedec547400fbbe402a0b2f5d87 core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 03a424d45215e1e7780567d9559dae4d0ae6fc29 core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala cd302aa51eb8377d88b752d48274e403926439f2 core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala a9c4ddc78df0b3695a77a12cf8cf25521a203122 core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala a577f4a8bf420a5bc1e62fad6d507a240a42bcaa core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 3804a114e97c849cae48308997037786614173fc core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 Diff: https://reviews.apache.org/r/24676/diff/ Testing --- Unit tests Thanks, Guozhang Wang
Re: Review Request 24676: Fix KAFKA-1583
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/24676/ --- (Updated Oct. 17, 2014, 4:15 a.m.) Review request for kafka. Summary (updated) - Fix KAFKA-1583 Bugs: KAFKA-1583 https://issues.apache.org/jira/browse/KAFKA-1583 Repository: kafka Description (updated) --- Incorporate Jun's comments after rebase Diffs (updated) - core/src/main/scala/kafka/api/FetchRequest.scala 59c09155dd25fad7bed07d3d00039e3dc66db95c core/src/main/scala/kafka/api/FetchResponse.scala 8d085a1f18f803b3cebae4739ad8f58f95a6c600 core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6 core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe core/src/main/scala/kafka/cluster/Partition.scala e88ecf224a4dab8bbd26ba7b0c3ccfe844c6b7f4 core/src/main/scala/kafka/common/ErrorMapping.scala 880ab4a004f078e5d84446ea6e4454ecc06c95f2 core/src/main/scala/kafka/log/Log.scala 157d67369baabd2206a2356b2aa421e848adab17 core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345 core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724 core/src/main/scala/kafka/server/KafkaApis.scala 85498b4a1368d3506f19c4cfc64934e4d0ac4c90 core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f core/src/main/scala/kafka/server/ReplicaManager.scala 78b7514cc109547c562e635824684fad581af653 core/src/main/scala/kafka/server/RequestPurgatory.scala 9d76234bc2c810ec08621dc92bb4061b8e7cd993 
core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 209a409cb47eb24f83cee79f4e064dbc5f5e9d62 core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala fb61d552f2320fedec547400fbbe402a0b2f5d87 core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 03a424d45215e1e7780567d9559dae4d0ae6fc29 core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala cd302aa51eb8377d88b752d48274e403926439f2 core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala a9c4ddc78df0b3695a77a12cf8cf25521a203122 core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala a577f4a8bf420a5bc1e62fad6d507a240a42bcaa core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 3804a114e97c849cae48308997037786614173fc core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 Diff: https://reviews.apache.org/r/24676/diff/ Testing --- Unit tests Thanks, Guozhang Wang
Re: [DISCUSSION] Message Metadata
What I was proposing is an alternative solution given that we have this message header enhancement; with this we do not need to add additional logic for the leadership map and checkpoint file, but just the logic on the replica manager to handle this extra control message, remembering the current leader epoch instead of a map. Thanks, Jun On Fri, Oct 10, 2014 at 5:33 PM, Guozhang Wang wangg...@gmail.com wrote: Hello all, I put some thoughts on enhancing our current message metadata format to solve a bunch of existing issues: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Enriched+Message+Metadata This wiki page is for kicking off some discussions about the feasibility of adding more info into the message header, and if possible how we would add them. -- Guozhang -- -- Guozhang
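The "remember the current leader epoch instead of a map" idea can be sketched very simply. This is a hypothetical illustration (names and semantics are not from the Kafka codebase): the replica manager keeps a single integer and advances it whenever a leader-epoch control message arrives, ignoring stale epochs, instead of maintaining an epoch-to-offset checkpoint map.

```java
// Hypothetical sketch: track only the latest leader epoch seen in control
// messages, rather than a leadership map plus checkpoint file.
public class EpochTracker {
    private int currentEpoch = -1;

    // Returns true if the control message advances the epoch; stale or
    // duplicate epochs are ignored.
    public boolean onLeaderEpochControlMessage(int epoch) {
        if (epoch <= currentEpoch) return false;
        currentEpoch = epoch;
        return true;
    }

    public int currentEpoch() { return currentEpoch; }
}
```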
Re: [DISCUSSION] Message Metadata
I have updated the wiki page incorporating the received comments. We can discuss some more details on: 1. How do we want to do auditing? Whether we want to have in-built auditing on the brokers or even the MMs, or use an audit consumer to fetch all messages from just the brokers. 2. How can we avoid de-/re-compression on brokers and MMs with log compaction turned on? 3. How can we resolve the data inconsistency resulting from unclean leader election with control messages? Guozhang On Sun, Oct 19, 2014 at 11:41 PM, Guozhang Wang wangg...@gmail.com wrote: Thanks for the detailed comments Jun! Some replies inlined. On Sun, Oct 19, 2014 at 7:42 PM, Jun Rao jun...@gmail.com wrote: Hi, Guozhang, Thanks for the writeup. A few high level comments. 1. Associating (versioned) schemas to a topic can be a good thing overall. Yes, this could add a bit more management overhead in Kafka. However, it makes sure that the data format contract between a producer and a consumer is kept and managed in a central place, instead of in the application. The latter is probably easier to start with, but is likely to be brittle in the long run. I am actually not proposing to not support associated versioned schemas for topics, but to not let core Kafka functionality like auditing depend on schemas. I think this alone can separate schema management from Kafka piping management (i.e. making sure every single message is delivered, within some latency, etc.). Adding additional auditing info into an existing schema would force Kafka to be aware of the schema systems (Avro, JSON, etc.). 2. Auditing can be a general feature that's useful for many applications. Such a feature can be implemented by extending the low level message format with a header. However, it can also be added as part of the schema management. For example, you can imagine a type of audited schema that adds additional auditing info to an existing schema automatically. 
Performance wise, it probably doesn't make a big difference whether the auditing info is added in the message header or the schema header. See replies above. 3. We talked about avoiding the overhead of decompressing in both the broker and the mirror maker. We probably need to think through how this works with auditing. In the more general case where you want to audit every message, one has to do the decompression to get the individual messages, independent of how the auditing info is stored. This means that if we want to audit on the broker directly or in the consumer in mirror maker, we have to pay the decompression cost anyway. Similarly, if we want to extend mirror maker to support some customized filtering/transformation logic, we also have to pay the decompression cost. I see your point. For that I would prefer to have a MM implementation that is able to de-compress / re-compress ONLY if required, for example by auditing, etc. I agree that we have not thought through whether we should enable auditing on MM, and if yes how to do that, and we could discuss that in a different thread. Overall, this proposal is not just about tackling de-compression on MM but about the feasibility of extending the Kafka message header for system properties / app properties. Some low level comments. 4. Broker offset reassignment (KAFKA-527): This probably can be done with just a format change on the compressed message set. That is true. As I mentioned in the wiki, each of the problems may be resolvable separately, but I am thinking about a general way to address all of them. 5. MirrorMaker refactoring: We probably can think through how general we want mirror maker to be. If we want it to be more general, we likely need to decompress every message just like in a normal consumer. There will definitely be overhead. However, as long as mirror maker is made scalable, we can overcome the overhead by just running more instances on more hardware resources. 
As for the proposed message format change, we probably need to think through it a bit more. The honor-ship flag seems a bit hacky to me. Replied as part of 3). Sure, we can discuss more about that; I will update the wiki with the collected comments. 6. Adding a timestamp in each message can be a useful thing. It (1) allows log segments to be rolled more accurately; (2) allows finding an offset for a particular timestamp more accurately. I am thinking that the timestamp in the message should probably be the time when the leader receives the message. Followers preserve the timestamp set by the leader. To avoid time going back during leader change, the leader can probably set the timestamp to be the max of the current time and the timestamp of the last message, if present. That timestamp can potentially be added to the index file to answer offsetBeforeTimestamp queries more efficiently. Agreed. 7. Log compaction: It seems that you are suggesting an improvement to compact the active segment as well. This can
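Jun's rule for keeping timestamps from going backwards across a leader change — take the max of the new leader's clock and the last appended timestamp — is small enough to sketch directly (class and method names here are hypothetical, not Kafka code):

```java
// Hypothetical leader-side timestamp assignment: timestamps are monotonically
// non-decreasing even if a newly elected leader's clock is behind the old
// leader's last appended timestamp.
public class LeaderTimestamps {
    private long lastTimestamp = -1L;

    // Called when the leader appends a message; clockMs is the leader's
    // current wall-clock time in milliseconds.
    public long assign(long clockMs) {
        lastTimestamp = Math.max(clockMs, lastTimestamp);
        return lastTimestamp;
    }
}
```

The monotonicity is what makes the follow-on idea work: a timestamp column in the offset index stays sorted, so an offsetBeforeTimestamp query can binary-search it.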
Re: Review Request 26994: Patch for KAFKA-1719
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26994/#review57680 --- core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/26994/#comment98497 Just a style comment: could you group the java imports with the scala / other lib imports, and leave the kafka imports at the top? core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/26994/#comment98503 Can we add a FATAL log entry here: "Consumer thread exited abnormally, stopping the whole mirror maker"? core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/26994/#comment98501 Ditto above. core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/26994/#comment98499 Is this change intended? - Guozhang Wang On Oct. 21, 2014, 8:37 p.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26994/ --- (Updated Oct. 21, 2014, 8:37 p.m.) Review request for kafka. Bugs: KAFKA-1719 https://issues.apache.org/jira/browse/KAFKA-1719 Repository: kafka Description --- Make mirror maker exit when one consumer/producer thread exits. Diffs - core/src/main/scala/kafka/tools/MirrorMaker.scala b8698ee1469c8fbc92ccc176d916eb3e28b87867 Diff: https://reviews.apache.org/r/26994/diff/ Testing --- Thanks, Jiangjie Qin
Re: [DISCUSSION] Message Metadata
Hi Jun, Regarding 4) in your comment, after thinking about it for a while I cannot come up with a way to do it along with log compaction without adding new fields to the current message set format. Do you have a better way that does not require protocol changes? Guozhang On Mon, Oct 20, 2014 at 9:53 AM, Guozhang Wang wangg...@gmail.com wrote: I have updated the wiki page incorporating the received comments. We can discuss some more details on: 1. How do we want to do auditing? Whether we want to have in-built auditing on the brokers or even the MMs, or use an audit consumer to fetch all messages from just the brokers. 2. How can we avoid de-/re-compression on brokers and MMs with log compaction turned on? 3. How can we resolve the data inconsistency resulting from unclean leader election with control messages? Guozhang On Sun, Oct 19, 2014 at 11:41 PM, Guozhang Wang wangg...@gmail.com wrote: Thanks for the detailed comments Jun! Some replies inlined. On Sun, Oct 19, 2014 at 7:42 PM, Jun Rao jun...@gmail.com wrote: Hi, Guozhang, Thanks for the writeup. A few high level comments. 1. Associating (versioned) schemas to a topic can be a good thing overall. Yes, this could add a bit more management overhead in Kafka. However, it makes sure that the data format contract between a producer and a consumer is kept and managed in a central place, instead of in the application. The latter is probably easier to start with, but is likely to be brittle in the long run. I am actually not proposing to not support associated versioned schemas for topics, but to not let core Kafka functionality like auditing depend on schemas. I think this alone can separate schema management from Kafka piping management (i.e. making sure every single message is delivered, within some latency, etc.). Adding additional auditing info into an existing schema would force Kafka to be aware of the schema systems (Avro, JSON, etc.). 2. Auditing can be a general feature that's useful for many applications. 
Such a feature can be implemented by extending the low level message format with a header. However, it can also be added as part of the schema management. For example, you can imagine a type of audited schema that adds additional auditing info to an existing schema automatically. Performance wise, it probably doesn't make a big difference whether the auditing info is added in the message header or the schema header. See replies above. 3. We talked about avoiding the overhead of decompressing in both the broker and the mirror maker. We probably need to think through how this works with auditing. In the more general case where you want to audit every message, one has to do the decompression to get the individual message, independent of how the auditing info is stored. This means that if we want to audit the broker directly or the consumer in mirror maker, we have to pay the decompression cost anyway. Similarly, if we want to extend mirror maker to support some customized filtering/transformation logic, we also have to pay the decompression cost. I see your point. For that I would prefer to have a MM implementation that is able to do de-compress / re-compress ONLY if required, for example by auditing, etc. I agree that we have not thought through whether we should enable auditing on MM, and if yes how to do that, and we could discuss about that in a different thread. Overall, this proposal is not just for tackling de-compression on MM but about the feasibility of extending Kafka message header for system properties / app properties. Some low level comments. 4. Broker offset reassignment (kafka-527): This probably can be done with just a format change on the compressed message set. That is true. As I mentioned in the wiki each of the problems may be resolvable separately but I am thinking about a general way to get all of them. 5. MirrorMaker refactoring: We probably can think through how general we want mirror maker to be. 
If we want it to be more general, we likely need to decompress every message just like in a normal consumer. There will definitely be overhead. However, as long as mirror maker is made scalable, we can overcome the overhead by just running more instances on more hardware resources. As for the proposed message format change, we probably need to think through it a bit more. The honor-ship flag seems a bit hacky to me. Replied as part of 3). Sure, we can discuss more about that; I will update the wiki with the collected comments. 6. Adding a timestamp in each message can be a useful thing. It (1) allows log segments to be rolled more accurately; (2) allows finding an offset for a particular timestamp more accurately. I am thinking that the timestamp in the message should probably be the time when the leader receives the message. Followers preserve the timestamp set by the leader. To avoid time going back during leader change, the leader can
Re: Review Request 24676: Fix KAFKA-1583
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/24676/ --- (Updated Oct. 23, 2014, 1:53 a.m.) Review request for kafka. Summary (updated) - Fix KAFKA-1583 Bugs: KAFKA-1583 https://issues.apache.org/jira/browse/KAFKA-1583 Repository: kafka Description (updated) --- Incorporate Joel's comments after rebase Diffs (updated) - core/src/main/scala/kafka/api/FetchRequest.scala 59c09155dd25fad7bed07d3d00039e3dc66db95c core/src/main/scala/kafka/api/FetchResponse.scala 8d085a1f18f803b3cebae4739ad8f58f95a6c600 core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6 core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe core/src/main/scala/kafka/cluster/Partition.scala e88ecf224a4dab8bbd26ba7b0c3ccfe844c6b7f4 core/src/main/scala/kafka/common/ErrorMapping.scala 880ab4a004f078e5d84446ea6e4454ecc06c95f2 core/src/main/scala/kafka/log/Log.scala 157d67369baabd2206a2356b2aa421e848adab17 core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345 core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724 core/src/main/scala/kafka/server/KafkaApis.scala 85498b4a1368d3506f19c4cfc64934e4d0ac4c90 core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f core/src/main/scala/kafka/server/ReplicaManager.scala 78b7514cc109547c562e635824684fad581af653 core/src/main/scala/kafka/server/RequestPurgatory.scala 9d76234bc2c810ec08621dc92bb4061b8e7cd993 
core/src/main/scala/kafka/utils/DelayedItem.scala d7276494072f14f1cdf7d23f755ac32678c5675c core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 209a409cb47eb24f83cee79f4e064dbc5f5e9d62 core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala fb61d552f2320fedec547400fbbe402a0b2f5d87 core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 03a424d45215e1e7780567d9559dae4d0ae6fc29 core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala cd302aa51eb8377d88b752d48274e403926439f2 core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala a9c4ddc78df0b3695a77a12cf8cf25521a203122 core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala a577f4a8bf420a5bc1e62fad6d507a240a42bcaa core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 3804a114e97c849cae48308997037786614173fc core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 Diff: https://reviews.apache.org/r/24676/diff/ Testing --- Unit tests Thanks, Guozhang Wang
Re: Review Request 24676: Fix KAFKA-1583
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/24676/#review57235 ---
Re: Review Request 26994: Patch for KAFKA-1719
On Oct. 22, 2014, 9:32 p.m., Neha Narkhede wrote: core/src/main/scala/kafka/tools/MirrorMaker.scala, line 271 https://reviews.apache.org/r/26994/diff/1/?file=727975#file727975line271 Is there any value in setting this to true? It seems that just checking if it is false and exiting the process suffices. Setting something called cleanShutdown to true when, in fact, it isn't a clean shutdown is confusing to read. Also good to add a FATAL log entry as suggested by Guozhang as well. The boolean is used when the internal threads try to exit: when it is not set, a thread knows it is closing abnormally and hence goes on to handle it. I agree its name is a bit misleading; we could probably just rename it to isShuttingDown. - Guozhang --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26994/#review57907 --- On Oct. 22, 2014, 10:04 p.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26994/ --- (Updated Oct. 22, 2014, 10:04 p.m.) Review request for kafka. Bugs: KAFKA-1719 https://issues.apache.org/jira/browse/KAFKA-1719 Repository: kafka Description --- Addressed Guozhang's comments. Diffs - core/src/main/scala/kafka/tools/MirrorMaker.scala b8698ee1469c8fbc92ccc176d916eb3e28b87867 Diff: https://reviews.apache.org/r/26994/diff/ Testing --- Thanks, Jiangjie Qin
Review Request 27101: Fix KAFKA-1501 by enabling reuse address
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27101/ --- Review request for kafka. Bugs: KAFKA-1501 https://issues.apache.org/jira/browse/KAFKA-1501 Repository: kafka Description --- needs repeated unit tests to verify Diffs - core/src/main/scala/kafka/network/SocketServer.scala cee76b323e5f3e4c783749ac9e78e1ef02897e3b Diff: https://reviews.apache.org/r/27101/diff/ Testing --- Thanks, Guozhang Wang
Re: Review Request 26994: Patch for KAFKA-1719
On Oct. 22, 2014, 9:32 p.m., Neha Narkhede wrote: core/src/main/scala/kafka/tools/MirrorMaker.scala, line 271 https://reviews.apache.org/r/26994/diff/1/?file=727975#file727975line271 Is there any value in setting this to true? It seems that just checking if it is false and exiting the process suffices. Setting something called cleanShutdown to true when, in fact, it isn't a clean shutdown is confusing to read. Also good to add a FATAL log entry as suggested by Guozhang as well. Guozhang Wang wrote: The boolean is used when the internal threads try to exit: when it is not set, a thread knows it is closing abnormally and hence goes on to handle it. I agree its name is a bit misleading; we could probably just rename it to isShuttingDown. Jiangjie Qin wrote: I'm thinking maybe we can use a separate flag in each thread to indicate whether it exits normally or not. So in the catch clause we set that flag to indicate the thread is exiting abnormally. That might be clearer. I personally think the flag-per-thread is overkill here: when each thread is about to exit (i.e. in the finally block), all it needs to check is whether the whole MM is currently shutting down; if it is, then the thread knows it is exiting normally, otherwise it should log the FATAL and force-kill the MM. - Guozhang --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26994/#review57907 ---
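The shutdown-flag pattern Guozhang sketches above — each thread checks a single tool-wide flag in its finally block and treats an unset flag as an abnormal exit — can be illustrated in plain Java. The class and names below are invented for illustration; this is not the actual MirrorMaker code.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class ShutdownFlagSketch {
    // Tool-wide flag, set only by the clean shutdown path
    // (using Guozhang's suggested name).
    static final AtomicBoolean isShuttingDown = new AtomicBoolean(false);

    static Thread newWorker(String name) {
        return new Thread(() -> {
            try {
                while (!isShuttingDown.get()) {
                    Thread.sleep(5); // stand-in for moving data
                }
            } catch (InterruptedException ignored) {
                // fall through to the flag check below
            } finally {
                // All a thread needs on exit is this one check: if the
                // tool is not shutting down, this exit is abnormal.
                if (!isShuttingDown.get()) {
                    System.err.println("FATAL: " + name + " exiting abnormally");
                    // a real tool would force-kill the process here
                }
            }
        }, name);
    }

    public static void main(String[] args) throws InterruptedException {
        Thread worker = newWorker("mirrormaker-thread-0");
        worker.start();
        isShuttingDown.set(true); // clean shutdown: set the flag first
        worker.join();
        System.out.println("clean shutdown complete");
    }
}
```

With this shape, the clean shutdown path sets the flag before joining the workers, so a worker that reaches its finally block with the flag still unset knows it is dying for some other reason.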
Re: Review Request 26994: Patch for KAFKA-1719
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26994/#review58168 --- Ship it! LGTM; one minor thing upon check-in. core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/26994/#comment99094 fatal("Consumer thread failure due to ", t) - Guozhang Wang On Oct. 23, 2014, 11:20 p.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26994/ --- (Updated Oct. 23, 2014, 11:20 p.m.) Review request for kafka. Bugs: KAFKA-1719 https://issues.apache.org/jira/browse/KAFKA-1719 Repository: kafka Description --- Addressed Guozhang's comments. Addressed Neha and Guozhang's comments. Diffs - core/src/main/scala/kafka/tools/MirrorMaker.scala b8698ee1469c8fbc92ccc176d916eb3e28b87867 Diff: https://reviews.apache.org/r/26994/diff/ Testing --- Thanks, Jiangjie Qin
Re: ConsumerFetcherThread deadlock?
Jack, The fetchers are blocked on the queue since it is full, is your consumer iterator stopped and hence not getting more data from it? Guozhang On Thu, Oct 23, 2014 at 3:53 PM, Jack Foy j...@whitepages.com wrote: Hi all, We run kafka 0.8.1.1. We’re tracking down a problem where consumer groups stop pulling from their respective partitions a few minutes or hours into execution. It looks like all ConsumerFetcherThreads associated with that consumer are blocking while waiting to write data to a LinkedBlockingQueue. They are waiting on ConditionObjects with different object IDs, and those object IDs do not occur elsewhere within our snapshot of thread data. It appears that those threads never make progress once they enter this waiting state. KAFKA-937 looks like a very similar symptom: https://issues.apache.org/jira/browse/KAFKA-937 According to Jun Rao’s comments on that issue, a ConsumerFetcherThread should never be blocked. Is that still the case? Here’s the thread dump for the relevant threads. I can provide more information if needed. 
ConsumerFetcherThread-sumo-relay_kafka0.util.pages-1414089954648-5244fae6-0-0 prio=10 tid=0x7f1954a9c800 nid=0xbf0 waiting on condition [0x7f19339f8000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for 0x8ac24dd8 (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043) at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349) at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60) at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111) at scala.collection.immutable.Map$Map3.foreach(Map.scala:154) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111) at kafka.utils.Utils$.inLock(Utils.scala:538) at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110) at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) ConsumerFetcherThread-sumo-relay_kafka0.util.pages-1414089954648-5244fae6-0-1 prio=10 tid=0x7f1955657000 nid=0xbf3 waiting on condition [0x7f19321e] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method) - parking to wait for 0x8ad280e8 (a 
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject) at java.util.concurrent.locks.LockSupport.park(LockSupport.java:186) at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2043) at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:349) at kafka.consumer.PartitionTopicInfo.enqueue(PartitionTopicInfo.scala:60) at kafka.consumer.ConsumerFetcherThread.processPartitionData(ConsumerFetcherThread.scala:49) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:130) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1$$anonfun$apply$mcV$sp$2.apply(AbstractFetcherThread.scala:111) at scala.collection.immutable.Map$Map3.foreach(Map.scala:154) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply$mcV$sp(AbstractFetcherThread.scala:111) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111) at kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$1.apply(AbstractFetcherThread.scala:111) at kafka.utils.Utils$.inLock(Utils.scala:538) at kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:110) at kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:88) at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:51) ConsumerFetcherThread-sumo-relay_kafka0.util.pages-1414089954648-5244fae6-0-2 prio=10 tid=0x7f1954001000 nid=0xbf1 waiting on condition [0x7f19326e5000] java.lang.Thread.State: WAITING (parking) at sun.misc.Unsafe.park(Native Method)
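Guozhang's diagnosis — fetcher threads parked in LinkedBlockingQueue.put because the bounded chunk queue is full and nothing drains it — is reproducible with just the JDK primitive the stack traces above bottom out in. This toy example is not Kafka code; the capacity of 2 merely stands in for the consumer's bounded chunk queue.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class FullQueueDemo {
    public static void main(String[] args) throws InterruptedException {
        // Bounded queue, like the high-level consumer's chunk queue.
        LinkedBlockingQueue<Integer> chunks = new LinkedBlockingQueue<>(2);

        Thread fetcher = new Thread(() -> {
            try {
                for (int i = 0; ; i++) {
                    chunks.put(i); // parks once the queue is full
                }
            } catch (InterruptedException e) {
                // unblocked for shutdown
            }
        }, "fetcher");
        fetcher.start();

        TimeUnit.MILLISECONDS.sleep(200);
        // With no consumer draining the queue, the fetcher sits in
        // WAITING (parking), matching the thread dump above.
        System.out.println("fetcher state: " + fetcher.getState());

        chunks.take(); // draining one chunk lets the fetcher make progress
        fetcher.interrupt();
        fetcher.join();
    }
}
```

The blocked thread only resumes when something calls take() on the other end, which in the high-level consumer is the consumer iterator; a stalled iterator therefore eventually stalls every fetcher feeding its queues.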
Review Request 27214: Fix KAFKA-1501 by enabling reuse address
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27214/ --- Review request for kafka. Bugs: KAFKA-1501 https://issues.apache.org/jira/browse/KAFKA-1501 Repository: kafka Description --- unit tests 10/10 Diffs - core/src/main/scala/kafka/network/SocketServer.scala cee76b323e5f3e4c783749ac9e78e1ef02897e3b core/src/test/scala/unit/kafka/utils/TestUtils.scala dd3640f47b26a4ff1515c2dc7e964550ac35b7b2 Diff: https://reviews.apache.org/r/27214/diff/ Testing --- Thanks, Guozhang Wang
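The mechanism behind this fix — setting SO_REUSEADDR so a test can rebind a port still held by sockets in TIME_WAIT from a previous run — looks like this in plain Java NIO. This illustrates the socket option only; it is not the actual SocketServer patch.

```java
import java.net.InetSocketAddress;
import java.net.StandardSocketOptions;
import java.nio.channels.ServerSocketChannel;

public class ReuseAddressDemo {
    public static void main(String[] args) throws Exception {
        ServerSocketChannel server = ServerSocketChannel.open();
        // Without this, a quick restart can fail with "Address already
        // in use" while old connections linger in TIME_WAIT.
        server.setOption(StandardSocketOptions.SO_REUSEADDR, true);
        server.bind(new InetSocketAddress(0)); // port 0: any free port
        int port = ((InetSocketAddress) server.getLocalAddress()).getPort();
        System.out.println("bound with SO_REUSEADDR on port " + port);
        server.close();
    }
}
```

Repeatedly re-binding the same fixed port in back-to-back unit tests is exactly the scenario where the option matters, which is why the description above calls for repeated unit-test runs to verify.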
Re: Review Request 26885: Patch for KAFKA-1642
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26885/#review58575 --- Ship it! LGTM, with one minor comment below. clients/src/main/java/org/apache/kafka/clients/NetworkClient.java https://reviews.apache.org/r/26885/#comment99633 The comment "When connecting or connected, this handles slow/stalled connections" here is a bit misleading: after checking the code I realize connectionDelay is only used to determine the delay in millis after which we can re-check connectivity for a node that is not connected; hence, if the node becomes connected again while we are determining its delay, we just set it to MAX. Instead of making it general in the KafkaClient interface, shall we just add this to the code block at line 155? - Guozhang Wang On Oct. 23, 2014, 11:19 p.m., Ewen Cheslack-Postava wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/26885/ --- (Updated Oct. 23, 2014, 11:19 p.m.) Review request for kafka. Bugs: KAFKA-1642 https://issues.apache.org/jira/browse/KAFKA-1642 Repository: kafka Description --- Fixes two issues with the computation of ready nodes and poll timeouts in Sender/RecordAccumulator: 1. The timeout was computed incorrectly because it took into account all nodes, even if they had data to send such that their timeout would be 0. However, nodes were then filtered based on whether it was possible to send (i.e. their connection was still good) which could result in nothing to send and a 0 timeout, resulting in busy looping. Instead, the timeout needs to be computed only using data that cannot be immediately sent, i.e. where the timeout will be greater than 0. This timeout is only used if, after filtering by whether connections are ready for sending, there is no data to be sent. Other events can wake the thread up earlier, e.g. a client reconnects and becomes ready again. 2. 
One of the conditions indicating whether data is sendable is whether a timeout has expired -- either the linger time or the retry backoff. This condition wasn't accounting for both cases properly, always using the linger time. This means the retry backoff was probably not being respected. KAFKA-1642 Compute correct poll timeout when all nodes have sendable data but none can send data because they are in a connection backoff period. Addressing Jun's comments. Diffs - clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java d304660f29246e9600efe3ddb28cfcc2b074bed3 clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 29658d4a15f112dc0af5ce517eaab93e6f00134b clients/src/main/java/org/apache/kafka/clients/NetworkClient.java eea270abb16f40c9f3b47c4ea96be412fb4fdc8b clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java c5d470011d334318d5ee801021aadd0c000974a6 clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 8ebe7ed82c9384b71ce0cc3ddbef2c2325363ab9 clients/src/test/java/org/apache/kafka/clients/MockClient.java aae8d4a1e98279470587d397cc779a9baf6fee6c clients/src/test/java/org/apache/kafka/clients/producer/RecordAccumulatorTest.java 0762b35abba0551f23047348c5893bb8c9acff14 Diff: https://reviews.apache.org/r/26885/diff/ Testing --- Thanks, Ewen Cheslack-Postava
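The first bug Ewen describes reduces to how the poll timeout is aggregated across nodes. The sketch below is a deliberately simplified model — the Node record and the linger constant are invented, not the RecordAccumulator API. The buggy version lets a node whose linger has expired but whose connection is down drag the timeout to zero (busy loop), while the fixed version computes the timeout only from data that cannot be sent yet.

```java
import java.util.List;

public class PollTimeoutSketch {
    static final long LINGER_MS = 100;

    // Hypothetical node model: how long its batch has waited, and
    // whether its connection is currently usable.
    record Node(long waitedMs, boolean connectionReady) {}

    /** Buggy: a ready-to-send node contributes 0 even if its connection
     *  is down, so the sender ends up polling with timeout 0 in a loop. */
    static long buggyTimeout(List<Node> nodes) {
        long timeout = Long.MAX_VALUE;
        for (Node n : nodes)
            timeout = Math.min(timeout, Math.max(0, LINGER_MS - n.waitedMs()));
        return timeout;
    }

    /** Fixed: only nodes whose linger has not yet expired (data that
     *  cannot be sent immediately) determine the timeout. */
    static long fixedTimeout(List<Node> nodes) {
        long timeout = Long.MAX_VALUE;
        for (Node n : nodes) {
            long remaining = LINGER_MS - n.waitedMs();
            if (remaining > 0) // not yet sendable
                timeout = Math.min(timeout, remaining);
        }
        return timeout;
    }

    public static void main(String[] args) {
        // One node has expired linger but a bad connection;
        // another has only waited 40 ms of its 100 ms linger.
        List<Node> nodes = List.of(new Node(150, false), new Node(40, true));
        System.out.println(buggyTimeout(nodes)); // 0 -> would busy-loop
        System.out.println(fixedTimeout(nodes)); // 60 -> sleep to next deadline
    }
}
```

As the description notes, this timeout only matters when connectivity filtering leaves nothing to send; other events can still wake the sender earlier.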
[DISCUSSION] Nested compression in Kafka?
Hello folks, I came across testComplexCompressDecompress in kafka.message.MessageCompressionTest while working on some consumer decompression optimizations. This test checks whether nested compression is supported. I remember vaguely that some time ago we decided not to support nested compression in Kafka, and in the new producer's MemoryRecords I also make this assumption in the iterator implementation. Is that still the case? If so, shall we remove this test case? -- Guozhang
Review Request 27256: Fix KAFKA-1735
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27256/ --- Review request for kafka. Bugs: KAFKA-1735 https://issues.apache.org/jira/browse/KAFKA-1735 Repository: kafka Description --- Handle partial reads from compressed stream Diffs - clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 040e5b91005edb8f015afdfa76fd94e0bf3cb4ca Diff: https://reviews.apache.org/r/27256/diff/ Testing --- Thanks, Guozhang Wang
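The gist of the fix — a single read() on a decompressing stream such as GZIPInputStream may legally return fewer bytes than requested, so size-prefixed records must be read in a loop — can be shown with a generic read-fully helper. This is the general pattern, not the actual MemoryRecords code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class ReadFully {
    /** Loop until 'len' bytes are read or EOF: a plain read() on a
     *  decompressing stream may return any partial chunk. */
    static void readFully(InputStream in, byte[] buf, int off, int len) throws IOException {
        int total = 0;
        while (total < len) {
            int n = in.read(buf, off + total, len - total);
            if (n < 0)
                throw new EOFException("stream ended after " + total + " of " + len + " bytes");
            total += n;
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = new byte[64 * 1024];
        for (int i = 0; i < payload.length; i++) payload[i] = (byte) i;

        // Compress, then decompress through the read-fully loop.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(payload);
        }
        byte[] out = new byte[payload.length];
        try (InputStream in = new GZIPInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
            readFully(in, out, 0, out.length);
        }
        System.out.println(java.util.Arrays.equals(payload, out)); // prints true
    }
}
```

Treating a short read as end-of-record instead of looping is the kind of bug that only surfaces once the compressed payload is large enough for the decompressor to hand data back in more than one chunk.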
Re: Review Request 27256: Fix KAFKA-1735
On Oct. 28, 2014, 1:17 a.m., Neha Narkhede wrote: clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java, line 207 https://reviews.apache.org/r/27256/diff/1/?file=734746#file734746line207 Would it be possible to add a unit test for this? Sure. This scenario is covered for the old consumer in ZookeeperConsumerCompressionTest, and I originally planned to migrate it to the new consumer's unit tests in the future; on second thought, I now think it is better to add it to MemoryRecordsTest. - Guozhang --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27256/#review58744 ---
Re: Review Request 24676: Fix KAFKA-1583
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/24676/ --- (Updated Oct. 28, 2014, 10:09 p.m.) Review request for kafka. Bugs: KAFKA-1583 https://issues.apache.org/jira/browse/KAFKA-1583 Repository: kafka Description (updated) --- Incorporated Joel's comments round two Diffs (updated) - core/src/main/scala/kafka/api/FetchRequest.scala 59c09155dd25fad7bed07d3d00039e3dc66db95c core/src/main/scala/kafka/api/FetchResponse.scala 8d085a1f18f803b3cebae4739ad8f58f95a6c600 core/src/main/scala/kafka/api/OffsetCommitRequest.scala 861a6cf11dc6b6431fcbbe9de00c74a122f204bd core/src/main/scala/kafka/api/ProducerRequest.scala b2366e7eedcac17f657271d5293ff0bef6f3cbe6 core/src/main/scala/kafka/api/ProducerResponse.scala a286272c834b6f40164999ff8b7f8998875f2cfe core/src/main/scala/kafka/cluster/Partition.scala e88ecf224a4dab8bbd26ba7b0c3ccfe844c6b7f4 core/src/main/scala/kafka/common/ErrorMapping.scala 880ab4a004f078e5d84446ea6e4454ecc06c95f2 core/src/main/scala/kafka/log/Log.scala 157d67369baabd2206a2356b2aa421e848adab17 core/src/main/scala/kafka/network/BoundedByteBufferSend.scala a624359fb2059340bb8dc1619c5b5f226e26eb9b core/src/main/scala/kafka/server/DelayedFetch.scala e0f14e25af03e6d4344386dcabc1457ee784d345 core/src/main/scala/kafka/server/DelayedProduce.scala 9481508fc2d6140b36829840c337e557f3d090da core/src/main/scala/kafka/server/FetchRequestPurgatory.scala ed1318891253556cdf4d908033b704495acd5724 core/src/main/scala/kafka/server/KafkaApis.scala 85498b4a1368d3506f19c4cfc64934e4d0ac4c90 core/src/main/scala/kafka/server/OffsetManager.scala 43eb2a35bb54d32c66cdb94772df657b3a104d1a core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala d4a7d4a79b44263a1f7e1a92874dd36aa06e5a3f core/src/main/scala/kafka/server/ReplicaManager.scala 78b7514cc109547c562e635824684fad581af653 core/src/main/scala/kafka/server/RequestPurgatory.scala 9d76234bc2c810ec08621dc92bb4061b8e7cd993 core/src/main/scala/kafka/utils/DelayedItem.scala 
d7276494072f14f1cdf7d23f755ac32678c5675c core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 209a409cb47eb24f83cee79f4e064dbc5f5e9d62 core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala fb61d552f2320fedec547400fbbe402a0b2f5d87 core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala 03a424d45215e1e7780567d9559dae4d0ae6fc29 core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala cd302aa51eb8377d88b752d48274e403926439f2 core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala a9c4ddc78df0b3695a77a12cf8cf25521a203122 core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala a577f4a8bf420a5bc1e62fad6d507a240a42bcaa core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala 3804a114e97c849cae48308997037786614173fc core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala 09ed8f5a7a414ae139803bf82d336c2d80bf4ac5 Diff: https://reviews.apache.org/r/24676/diff/ Testing --- Unit tests Thanks, Guozhang Wang
Review Request 27391: Fix KAFKA-1634: Add the global retentionTime (in millis) and disable the per-offset timestamp
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/ --- Review request for kafka. Bugs: KAFKA-1634 https://issues.apache.org/jira/browse/KAFKA-1634 Repository: kafka Description --- The timestamp field of OffsetAndMetadata is preserved since we need to be backward compatible with older versions Diffs - clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 core/src/main/scala/kafka/server/KafkaApis.scala 968b0c4f809ea9f9c3f322aef9db105f9d2e0e23 core/src/main/scala/kafka/server/OffsetManager.scala 2957bc435102bc4004d8f100dbcdd56287c8ffae core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 Diff: https://reviews.apache.org/r/27391/diff/ Testing --- Thanks, Guozhang Wang
Re: [DISCUSSION] Nested compression in Kafka?
Thanks Jun and Neha. I will go ahead and file the JIRA to remove this test case then.
Review Request 27430: Fix KAFKA-1720
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27430/ --- Review request for kafka. Bugs: KAFKA-1720 https://issues.apache.org/jira/browse/KAFKA-1720 Repository: kafka Description --- Rename delayed requests to delayed operations, change some class names Diffs - core/src/main/scala/kafka/cluster/Partition.scala 1be57008e983fc3a831626ecf9a861f164fcca92 core/src/main/scala/kafka/server/DelayedFetch.scala 1ccbb4b6fdbbd4412ba77ffe7d4cf5adf939e439 core/src/main/scala/kafka/server/DelayedProduce.scala 8049e07e5d6d65913ec2492f1e22e5ed3ecbbea8 core/src/main/scala/kafka/server/DelayedRequestKey.scala 628ef59564b9b9238d7b05d26aef79d3cfec174d core/src/main/scala/kafka/server/ReplicaManager.scala 02fa3821271e97b24fd2ae25163933222797585d core/src/main/scala/kafka/server/RequestPurgatory.scala 323b12e765f981e9bba736a204e4a8159e5c5ada core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala a7720d579ea15b71511c9da0e241bd087de3674e system_test/metrics.json cd3fc142176b8a3638db5230fb74f055c04c59d4 Diff: https://reviews.apache.org/r/27430/diff/ Testing --- Thanks, Guozhang Wang
Re: Review Request 24676: Fix KAFKA-1583
On Oct. 31, 2014, 6:08 p.m., Jun Rao wrote: core/src/main/scala/kafka/cluster/Partition.scala, line 236 https://reviews.apache.org/r/24676/diff/13/?file=735940#file735940line236 Could we get rid of the "=" since this method is not supposed to return any value? Thanks Jun. I will address this comment in KAFKA-1720. - Guozhang --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/24676/#review59370 ---
Re: Review Request 27430: Fix KAFKA-1720
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27430/ --- (Updated Nov. 1, 2014, 12:21 a.m.) Review request for kafka. Bugs: KAFKA-1720 https://issues.apache.org/jira/browse/KAFKA-1720 Repository: kafka Description --- Rename delayed requests to delayed operations, change some class names Diffs (updated) - core/src/main/scala/kafka/cluster/Partition.scala 1be57008e983fc3a831626ecf9a861f164fcca92 core/src/main/scala/kafka/server/DelayedFetch.scala 1ccbb4b6fdbbd4412ba77ffe7d4cf5adf939e439 core/src/main/scala/kafka/server/DelayedProduce.scala 8049e07e5d6d65913ec2492f1e22e5ed3ecbbea8 core/src/main/scala/kafka/server/DelayedRequestKey.scala 628ef59564b9b9238d7b05d26aef79d3cfec174d core/src/main/scala/kafka/server/ReplicaManager.scala 3007a6d89b637b93f71fdb7adab561a93d9c4c62 core/src/main/scala/kafka/server/RequestPurgatory.scala 323b12e765f981e9bba736a204e4a8159e5c5ada core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala a7720d579ea15b71511c9da0e241bd087de3674e system_test/metrics.json cd3fc142176b8a3638db5230fb74f055c04c59d4 Diff: https://reviews.apache.org/r/27430/diff/ Testing --- Thanks, Guozhang Wang
Re: Review Request 27534: Patch for KAFKA-1746
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27534/#review59898 --- Where will testcaseEnv.validationStatusDict["Test completed"] be used? - Guozhang Wang On Nov. 3, 2014, 7:46 p.m., Ewen Cheslack-Postava wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27534/ --- (Updated Nov. 3, 2014, 7:46 p.m.) Review request for kafka. Bugs: KAFKA-1746 https://issues.apache.org/jira/browse/KAFKA-1746 Repository: kafka Description --- KAFKA-1746 Make system tests return a useful exit code. KAFKA-1746 Check the exit code when running DumpLogSegments to verify data. Diffs - system_test/mirror_maker_testsuite/mirror_maker_test.py c0117c64cbb7687ca8fbcec6b5c188eb880300ef system_test/offset_management_testsuite/offset_management_test.py 12b5cd25140e1eb407dd57eef63d9783257688b2 system_test/replication_testsuite/replica_basic_test.py 660006cc253bbae3e7cd9f02601f1c1937dd1714 system_test/system_test_runner.py ee7aa252333553e8eb0bc046edf968ec99dddb70 system_test/utils/kafka_system_test_utils.py 1093b660ebd0cb5ab6d3731d26f151e1bf717f8a Diff: https://reviews.apache.org/r/27534/diff/ Testing --- Thanks, Ewen Cheslack-Postava
Re: Review Request 27535: Patch for KAFKA-1747
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27535/#review59899 --- Ship it! Thanks for the patch. LGTM. - Guozhang Wang On Nov. 3, 2014, 7:46 p.m., Ewen Cheslack-Postava wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27535/ --- (Updated Nov. 3, 2014, 7:46 p.m.) Review request for kafka. Bugs: KAFKA-1747 https://issues.apache.org/jira/browse/KAFKA-1747 Repository: kafka Description --- KAFKA-1747 Fix TestcaseEnv so state isn't shared between instances. Diffs - system_test/utils/testcase_env.py b3c29105c04348f036efbbdc430e14e099ca8c70 Diff: https://reviews.apache.org/r/27535/diff/ Testing --- Thanks, Ewen Cheslack-Postava
Re: Review Request 27534: Patch for KAFKA-1746
On Nov. 5, 2014, 1 a.m., Guozhang Wang wrote: Where will testcaseEnv.validationStatusDict["Test completed"] be used? Ewen Cheslack-Postava wrote: That's where all the validation results (the test's assertions) are stored. It gets scanned through at the end of the test run in system_test_runner.py to generate the report and count the number of failures. The setup is a bit confusing because it's reachable as both testcaseEnv.validationStatusDict and testcaseEnv.testcaseResultsDict["validation_status"] (see TestcaseEnv's constructor). I see. Thanks. - Guozhang --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27534/#review59898 ---
Re: Review Request 27391: Fix KAFKA-1634: Add the global retentionTime (in milis) and disable the per-offset timestamp
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/ --- (Updated Nov. 6, 2014, 11:35 p.m.) Review request for kafka. Bugs: KAFKA-1634 https://issues.apache.org/jira/browse/KAFKA-1634 Repository: kafka Description --- The timestamp field of OffsetAndMetadata is preserved since we need to be backward compatible with older versions Diffs (updated) - clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 core/src/main/scala/kafka/server/KafkaApis.scala 968b0c4f809ea9f9c3f322aef9db105f9d2e0e23 core/src/main/scala/kafka/server/KafkaServer.scala 4de812374e8fb1fed834d2be3f9655f55b511a74 core/src/main/scala/kafka/server/OffsetManager.scala 2957bc435102bc4004d8f100dbcdd56287c8ffae core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 8c5364fa97da1be09973c176d1baeb339455d319 Diff: https://reviews.apache.org/r/27391/diff/ Testing --- Thanks, Guozhang Wang
Re: Review Request 27723: Patch for KAFKA-1739
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27723/#review60359 --- Ship it! - Guozhang Wang On Nov. 7, 2014, 12:26 p.m., Manikumar Reddy O wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27723/ --- (Updated Nov. 7, 2014, 12:26 p.m.) Review request for kafka. Bugs: KAFKA-1739 https://issues.apache.org/jira/browse/KAFKA-1739 Repository: kafka Description --- Removed testComplexCompressDecompress from MessageCompressionTest class Diffs - core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala 0bb275d0dc840e40289e488b1a00aca2e26fe6f9 Diff: https://reviews.apache.org/r/27723/diff/ Testing --- Thanks, Manikumar Reddy O
Re: Review Request 27391: Fix KAFKA-1634: Add the global retentionTime (in milis) and disable the per-offset timestamp
On Nov. 7, 2014, 3 a.m., Joel Koshy wrote: clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java, line 152 https://reviews.apache.org/r/27391/diff/2/?file=753752#file753752line152 This confused me a bit, and I think it is because initCommonFields is intended to initialize fields common to all versions of the request. It is a useful helper method, but it becomes somewhat clunky when removing fields. The partition-level timestamp is no longer a common field. If this is v2 then we should _never_ set anything in the timestamp field of the struct; and if it is v0 or v1 then we should _always_ set the timestamp field (even if it is the default). However, since the timestamp field in the Field declaration for OFFSET_COMMIT_REQUEST_PARTITION_V0 does not have a default explicitly specified, I think this will break with a SchemaException (missing value...) for offset commit request v0/v1 if we choose to write to a bytebuffer under those versions with this code. One option is to explicitly pass the constructor version (0, 1, 2) into initCommonFields and use that to decide whether to include/exclude this field, but that is weird. Another alternative is a separate helper method for v0/v1. That is even weirder. Guozhang Wang wrote: Actually, the partition-level timestamp is still a common field (we are just deprecating it, and chose not to serialize/deserialize it in v2). I agree it is a bit weird as written this way; I thought about this when I started the first version but did not come up with a better approach.
However, per comment further down I don't think it needs to be touched. Guozhang Wang wrote: Actually we cannot make it deprecated as it will be preserved even in the new version, right? Note this is not used for the wire protocol but for the cache / disk format. On Nov. 7, 2014, 3 a.m., Joel Koshy wrote: core/src/main/scala/kafka/server/OffsetManager.scala, line 498 https://reviews.apache.org/r/27391/diff/2/?file=753758#file753758line498 Actually, since we always set the retention period (for v0, v1) in KafkaApis do we even need to touch this timestamp? i.e., we should basically ignore it right? So we only need to do: value.set(VALUE_TIMESTAMP_FIELD, expirationTimestamp). I think not. In v0/v1, if the timestamp is explicitly specified (i.e. not -1) we need to use it as the expiration timestamp, or at least that was how I understood the semantics. - Guozhang --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/#review60285 --- On Nov. 6, 2014, 11:35 p.m., Guozhang Wang wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/ --- (Updated Nov. 6, 2014, 11:35 p.m.) Review request for kafka. 
Bugs: KAFKA-1634 https://issues.apache.org/jira/browse/KAFKA-1634 Repository: kafka Description --- The timestamp field of OffsetAndMetadata is preserved since we need to be backward compatible with older versions Diffs - clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 core/src/main/scala/kafka/server/KafkaApis.scala 968b0c4f809ea9f9c3f322aef9db105f9d2e0e23 core/src/main/scala/kafka/server/KafkaServer.scala 4de812374e8fb1fed834d2be3f9655f55b511a74 core/src/main/scala/kafka/server/OffsetManager.scala 2957bc435102bc4004d8f100dbcdd56287c8ffae core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 8c5364fa97da1be09973c176d1baeb339455d319 Diff: https://reviews.apache.org/r/27391/diff/ Testing
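Joel's first option above (explicitly passing the constructor version into initCommonFields and gating the deprecated field on it) can be sketched as follows. This is an illustrative Python sketch, not the actual Kafka Java code; the dict-based "struct" and the function signature are assumptions for illustration only.

```python
# Hypothetical sketch of version-gated field initialization, as discussed
# in the review thread above. Not the real OffsetCommitRequest code.
DEFAULT_TIMESTAMP = -1

def init_common_fields(struct, version, offset, metadata,
                       timestamp=DEFAULT_TIMESTAMP):
    """Populate fields shared by all versions, then the version-gated one."""
    struct["offset"] = offset
    struct["metadata"] = metadata
    # v0/v1 carry a per-partition timestamp on the wire; v2 removes it
    # from the schema entirely, so writing it there would be invalid.
    if version < 2:
        struct["timestamp"] = timestamp
    return struct
```

Under this sketch, serializing a v0/v1 struct always has a timestamp value (avoiding the missing-value SchemaException Joel mentions), while a v2 struct never does.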
Re: Review Request 27391: Fix KAFKA-1634: Add the global retentionTime (in milis) and disable the per-offset timestamp
On Nov. 7, 2014, 3 a.m., Joel Koshy wrote: core/src/main/scala/kafka/common/OffsetMetadataAndError.scala, line 22 https://reviews.apache.org/r/27391/diff/2/?file=753755#file753755line22 Can we mark this @Deprecated as well? We should probably make the primary constructor without timestamp and add a secondary constructor with timestamp and mark deprecated there. Also, can we use case class.copy if timestamp needs to be modified? However, per comment further down I don't think it needs to be touched. Guozhang Wang wrote: Actually we cannot make it deprecated as it will be preserved even in the new version, right? Note this is not used for the wire protocol but for the cache / disk format. I should say not only for the wire protocol but also for cache disk storage format. And thinking about this twice, I will change to two separate classes, one for wire protocol and one for server storage format. - Guozhang --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/#review60285 --- On Nov. 6, 2014, 11:35 p.m., Guozhang Wang wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/ --- (Updated Nov. 6, 2014, 11:35 p.m.) Review request for kafka. 
Bugs: KAFKA-1634 https://issues.apache.org/jira/browse/KAFKA-1634 Repository: kafka Description --- The timestamp field of OffsetAndMetadata is preserved since we need to be backward compatible with older versions Diffs - clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 core/src/main/scala/kafka/server/KafkaApis.scala 968b0c4f809ea9f9c3f322aef9db105f9d2e0e23 core/src/main/scala/kafka/server/KafkaServer.scala 4de812374e8fb1fed834d2be3f9655f55b511a74 core/src/main/scala/kafka/server/OffsetManager.scala 2957bc435102bc4004d8f100dbcdd56287c8ffae core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 8c5364fa97da1be09973c176d1baeb339455d319 Diff: https://reviews.apache.org/r/27391/diff/ Testing --- Thanks, Guozhang Wang
Re: Review Request 27391: Fix KAFKA-1634: Add the global retentionTime (in milis) and disable the per-offset timestamp
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/ --- (Updated Nov. 8, 2014, 12:54 a.m.) Review request for kafka. Bugs: KAFKA-1634 https://issues.apache.org/jira/browse/KAFKA-1634 Repository: kafka Description --- The timestamp field of OffsetAndMetadata is preserved since we need to be backward compatible with older versions Diffs (updated) - clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala fbc680fde21b02f11285a4f4b442987356abd17b core/src/main/scala/kafka/server/KafkaApis.scala 968b0c4f809ea9f9c3f322aef9db105f9d2e0e23 core/src/main/scala/kafka/server/KafkaServer.scala 4de812374e8fb1fed834d2be3f9655f55b511a74 core/src/main/scala/kafka/server/OffsetManager.scala 2957bc435102bc4004d8f100dbcdd56287c8ffae core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 8c5364fa97da1be09973c176d1baeb339455d319 Diff: https://reviews.apache.org/r/27391/diff/ Testing --- Thanks, Guozhang Wang
Re: Support for topics+streams at class WildcardStreamsHandler
Hi Alan, The reason we do not have a per-topic parallelism spec in wildcard is twofold: 1) we use a per-topic hash-based partition algorithm, and hence having each topic with the same num. of streams may give us better load balance; 2) with the topicFilter we will not know exactly which topics to consume at construction time, hence there is no way to specify per-topic specs. 1) has been lifted since we implemented the new partitioning algorithm, and for 2) we need to think about how to support it if we really want to; perhaps we can also use a regex-ed topic-count map, while ensuring that each regex in the map is precedent of the topic filter, and no overlap with each other, etc. What is your use case that requires a per-topic numStream spec? Guozhang On Sun, Nov 9, 2014 at 6:03 AM, Alan Lavintman alan.lavint...@gmail.com wrote: Hi guys, I have seen that if I create a message stream by using createMessageStreams I can define a map with Topic -> #Streams. Is there a reason why createMessageStreamsByFilter is not giving the same support? I only have a TopicFilter and numStreams interface such as: public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams); But it does not allow me to specify the parallelism per topic. Am I missing something or is my assumption correct? Bests and thanks, Alan. -- Guozhang
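The difference Alan is asking about can be illustrated with a small sketch (Python, illustrative only; the real APIs live on the Scala/Java high-level consumer): the per-topic API takes an explicit topic-to-stream-count map, while the wildcard API cannot know the matched topic set at construction time and so applies one stream count to every topic the filter ends up matching.

```python
import re

def streams_per_topic(topics, topic_count_map):
    # createMessageStreams-style: explicit per-topic stream counts.
    return {t: topic_count_map[t] for t in topics if t in topic_count_map}

def streams_by_filter(topics, topic_filter, num_streams):
    # createMessageStreamsByFilter-style: the matched topic set is only
    # known at runtime, so every matched topic gets the same num_streams.
    pattern = re.compile(topic_filter)
    return {t: num_streams for t in topics if pattern.fullmatch(t)}
```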
Re: New Java producer question
Joe, Are you looking for this? /** <code>retries</code> */ public static final String RETRIES_CONFIG = "retries"; Guozhang On Mon, Nov 10, 2014 at 8:11 AM, Joe Stein joe.st...@stealth.ly wrote: In the new Java producer I see that you can set RETRY_BACKOFF_MS_CONFIG https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java#L112 which is used https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java#L217-L226 but I don't see (maybe I'm just missing it??? I haven't spent a lot of quality time with the code yet) how to set the number of retries that happen before failure as we have in the existing producer https://github.com/apache/kafka/blob/0.8.2/core/src/main/scala/kafka/producer/ProducerConfig.scala#L98 or is this done another way or no longer required or used??? /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop / -- Guozhang
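The interplay between the two configs discussed here (`retries` bounding the number of re-attempts, `retry.backoff.ms` pausing between them) can be sketched with a generic retry loop. This is an illustrative Python sketch of the semantics only, not the producer's actual implementation (which retries asynchronously inside the record accumulator).

```python
import time

def send_with_retries(send_once, retries, retry_backoff_ms):
    """Call send_once(), re-attempting up to `retries` extra times,
    sleeping retry_backoff_ms between attempts. Illustrative only."""
    attempts = 0
    while True:
        try:
            return send_once()
        except Exception:
            if attempts >= retries:
                raise  # retries exhausted: surface the failure
            attempts += 1
            time.sleep(retry_backoff_ms / 1000.0)
```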
Re: Question about ZookeeperConsumerConnector
Thanks Jun! Jiangjie, could you file a JIRA? Thanks. Guozhang On Tue, Nov 11, 2014 at 9:27 AM, Jun Rao jun...@gmail.com wrote: Hi, Jiangjie, Thanks for the investigation. Yes, this seems like a real issue. 1. It doesn't seem that we need to put the shutdownCommand back into the queue. Once an iterator receives a shutdownCommand, it will be in a Done state and will remain in that state forever. 2. Yes, we just need to get the unique set of queues and put in a shutdownCommand per queue. Jun On Mon, Nov 10, 2014 at 7:27 PM, Becket Qin becket@gmail.com wrote: Hi, We encountered a production issue recently where Mirror Maker could not shut down properly because ZookeeperConsumerConnector blocked on shutdown(). After looking into the code, we found 2 issues that caused this problem. 1. After the consumer iterator receives the shutdownCommand, it puts the shutdownCommand back into the data chunk queue. Is there any reason for doing this? 2. In ZookeeperConsumerConnector shutdown(), we could potentially put multiple shutdownCommands into the same data chunk queue, provided the topics are sharing the same data chunk queue in topicThreadIdAndQueues. (KAFKA-1764 is opened) In our case, we only have 1 consumer stream for all the topics, and the data chunk queue capacity is set to 1. The execution sequence causing the problem is as follows: 1. ZookeeperConsumerConnector shutdown() is called, and it tries to put a shutdownCommand for each queue in topicThreadIdAndQueues. Since we only have 1 queue, multiple shutdownCommands will be put into the queue. 2. In sendShutdownToAllQueues(), between queue.clean() and queue.put(shutdownCommand), the consumer iterator receives the shutdownCommand and puts it back into the data chunk queue. After that, ZookeeperConsumerConnector tries to put another shutdownCommand into the data chunk queue but will block forever. 
The thread stack trace is as below:

Thread-23 #58 prio=5 os_prio=0 tid=0x7ff440004800 nid=0x40a waiting on condition [0x7ff4f0124000]
   java.lang.Thread.State: WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for 0x000680b96bf0 (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)
        at java.util.concurrent.LinkedBlockingQueue.put(LinkedBlockingQueue.java:350)
        at kafka.consumer.ZookeeperConsumerConnector$$anonfun$sendShutdownToAllQueues$1.apply(ZookeeperConsumerConnector.scala:262)
        at kafka.consumer.ZookeeperConsumerConnector$$anonfun$sendShutdownToAllQueues$1.apply(ZookeeperConsumerConnector.scala:259)
        at scala.collection.Iterator$class.foreach(Iterator.scala:727)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at kafka.consumer.ZookeeperConsumerConnector.sendShutdownToAllQueues(ZookeeperConsumerConnector.scala:259)
        at kafka.consumer.ZookeeperConsumerConnector.liftedTree1$1(ZookeeperConsumerConnector.scala:199)
        at kafka.consumer.ZookeeperConsumerConnector.shutdown(ZookeeperConsumerConnector.scala:192)
        - locked 0x000680dd5848 (a java.lang.Object)
        at kafka.tools.MirrorMaker$$anonfun$cleanShutdown$1.apply(MirrorMaker.scala:185)
        at kafka.tools.MirrorMaker$$anonfun$cleanShutdown$1.apply(MirrorMaker.scala:185)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at kafka.tools.MirrorMaker$.cleanShutdown(MirrorMaker.scala:185)
        at kafka.tools.MirrorMaker$$anon$1.run(MirrorMaker.scala:169)

Thanks. Jiangjie (Becket) Qin -- Guozhang
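The deadlock described above, and the fix Jun suggests (one shutdown command per unique queue), can be reproduced with a small self-contained sketch. Here `queue.Queue` stands in for the consumer's bounded data chunk queue, and a short timeout stands in for blocking forever; all names are illustrative, not Kafka's actual code.

```python
import queue

SHUTDOWN = object()  # stand-in for the shutdown command chunk

def send_shutdown_naive(topic_thread_queues, timeout=0.1):
    # One put per (topic, threadId) entry: when several entries share a
    # queue of capacity 1, the second put blocks (here: times out).
    for q in topic_thread_queues.values():
        q.put(SHUTDOWN, timeout=timeout)

def send_shutdown_dedup(topic_thread_queues, timeout=0.1):
    # The suggested fix: one shutdown command per *unique* queue.
    for q in set(topic_thread_queues.values()):
        q.put(SHUTDOWN, timeout=timeout)
```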
Re: Review Request 27890: Patch for KAFKA-1764
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27890/#review61001 --- Could you also remove the line in consumer that sends back the shutdown command? - Guozhang Wang On Nov. 11, 2014, 10:59 p.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27890/ --- (Updated Nov. 11, 2014, 10:59 p.m.) Review request for kafka. Bugs: KAFKA-1764 https://issues.apache.org/jira/browse/KAFKA-1764 Repository: kafka Description --- fix for KAFKA-1764 Diffs - core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala fbc680fde21b02f11285a4f4b442987356abd17b Diff: https://reviews.apache.org/r/27890/diff/ Testing --- Thanks, Jiangjie Qin
Re: Kafka Command Line Shell
Thanks Joe. I will read the wiki page. On Tue, Nov 11, 2014 at 11:47 PM, Joe Stein joe.st...@stealth.ly wrote: I started writing this up on the wiki https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Command+Line+and+Related+Improvements Instead of starting a new thread I figure just continue this one I started. I also added another (important) component for centralized management of configuration as global level much like we have topic level. These global configuration would be overridden (perhaps not all) from the server.properties on start (so like in case one broker needs a different port, sure). One concern I have is that using RQ/RP wire protocol to the controller instead of the current way (via ZK admin path) may expose concurrency on the admin requests, which may not be supported yet. Guozhang, take a look at the diagram how I am thinking of this it would be a new handle request that will execute the tools pretty much how they are today. My thinking is maybe to-do one at a time (so TopicCommand first I think) and have what the TopicCommand is doing happen on server and send the RQ/RP to the client but execute on the server. If there is something not supported we will of course have to deal with that and implement it for sure. Once we get one working end to end I think adding the rest will be (more or less) concise iterations to get it done. I added your concern to the wiki under the gotchas section. /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop / On Mon, Oct 20, 2014 at 2:15 AM, Guozhang Wang wangg...@gmail.com wrote: One concern I have is that using RQ/RP wire protocol to the controller instead of the current way (via ZK admin path) may expose concurrency on the admin requests, which may not be supported yet. Some initial discussion about this is on KAFKA-1305. 
Guozhang On Sun, Oct 19, 2014 at 1:55 PM, Joe Stein joe.st...@stealth.ly wrote: Maybe we should add some AdminMessage RQ/RP wire protocol structure(s) and let the controller handle it? We could then build the CLI and Shell in the project both as useful tools and samples for others. Making a http interface should be simple after KAFKA-1494 is done which all client libraries could offer. I will update the design tonight/tomorrow and should be able to have someone starting to work on it this week. /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop / On Oct 19, 2014 1:21 PM, Harsha ka...@harsha.io wrote: +1 for Web Api On Sat, Oct 18, 2014, at 11:48 PM, Glen Mazza wrote: Apache Karaf has been doing this for quite a few years, albeit in Java not Scala. Still, their coding approach to creating a CLI probably captures many lessons learned over that time. Glen On 10/17/2014 08:03 PM, Joe Stein wrote: Hi, I have been thinking about the ease of use for operations with Kafka. We have lots of tools doing a lot of different things and they are all kind of in different places. So, what I was thinking is to have a single interface for our tooling https://issues.apache.org/jira/browse/KAFKA-1694 This would manifest itself in two ways 1) a command line interface 2) a repl We would have one entry point centrally for all Kafka commands. kafka CMD ARGS kafka createTopic --brokerList etc, kafka reassignPartition --brokerList etc, or execute and run the shell kafka --brokerList localhost kafkause topicName; kafkaset acl='label'; I was thinking that all calls would be initialized through --brokerList and the broker can tell the KafkaCommandTool what server to connect to for MetaData. Thoughts? Tomatoes? /*** Joe Stein Founder, Principal Consultant Big Data Open Source Security LLC http://www.stealth.ly Twitter: @allthingshadoop http://www.twitter.com/allthingshadoop / -- -- Guozhang -- -- Guozhang
Re: Review Request 27890: Patch for KAFKA-1764
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27890/#review61099 --- Ship it! Ship It! - Guozhang Wang On Nov. 12, 2014, 10:05 p.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27890/ --- (Updated Nov. 12, 2014, 10:05 p.m.) Review request for kafka. Bugs: KAFKA-1764 https://issues.apache.org/jira/browse/KAFKA-1764 Repository: kafka Description --- Changed Consumer iterator to stop putting the shutdown message back into channel. Diffs - core/src/main/scala/kafka/consumer/ConsumerIterator.scala ac491b4da2583ef7227c67f5b8bc0fd731d705c3 core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala fbc680fde21b02f11285a4f4b442987356abd17b Diff: https://reviews.apache.org/r/27890/diff/ Testing --- Thanks, Jiangjie Qin
Re: [DISCUSSION] Message Metadata
Hi Jun, Sorry for the delay on your comments in the wiki page as well as this thread; quite swamped now. I will get back to you as soon as I find some time. Guozhang On Tue, Nov 11, 2014 at 6:26 PM, Jun Rao jun...@gmail.com wrote: Thinking about this a bit more. For adding the auditing support, I am not sure if we need to change the message format by adding the application tags. An alternative way to do that is to add it in the producer client. For example, for each message payload (doesn't matter what the serialization mechanism is) that a producer receives, the producer can just add a header before the original payload. The header will contain all needed fields (e.g. timestamp, host, etc) for the purpose of auditing. This way, we don't need to change the message format and the auditing info can be added independent of the serialization mechanism of the message. The header can use a different serialization mechanism for better efficiency. For example, if we use Avro to serialize the header, the encoded bytes won't include the field names in the header. This is potentially more efficient than representing those fields as application tags in the message, where the tags have to be explicitly stored in every message. To make it easier for the client to add and make use of this kind of auditing support, I was imagining that we can add a ProducerFactory in the new java client. The ProducerFactory will create an instance of Producer based on a config property. By default, the current KafkaProducer will be returned. However, a user can plug in a different implementation of Producer that does auditing. For example, an implementation of an AuditProducer.send() can take the original ProducerRecord, add the header to the value byte array and then forward the record to an underlying KafkaProducer. We can add a similar ConsumerFactory to the new consumer client. If a user plugs in an implementation of the AuditingConsumer, the consumer will then be audited automatically. 
Thanks, Jun On Tue, Oct 21, 2014 at 4:06 PM, Guozhang Wang wangg...@gmail.com wrote: Hi Jun, Regarding 4) in your comment, after thinking about it for a while I cannot come up with a way to do it along with log compaction without adding new fields into the current format on message set. Do you have a better way that does not require protocol changes? Guozhang On Mon, Oct 20, 2014 at 9:53 AM, Guozhang Wang wangg...@gmail.com wrote: I have updated the wiki page incorporating received comments. We can discuss some more details on: 1. How do we want to do auditing? Whether we want to have built-in auditing on brokers or even MMs, or use an audit consumer to fetch all messages from just brokers. 2. How can we avoid de-/re-compression on brokers and MMs with log compaction turned on. 3. How can we resolve the data inconsistency resulting from unclean leader election with control messages. Guozhang On Sun, Oct 19, 2014 at 11:41 PM, Guozhang Wang wangg...@gmail.com wrote: Thanks for the detailed comments Jun! Some replies inlined. On Sun, Oct 19, 2014 at 7:42 PM, Jun Rao jun...@gmail.com wrote: Hi, Guozhang, Thanks for the writeup. A few high level comments. 1. Associating (versioned) schemas to a topic can be a good thing overall. Yes, this could add a bit more management overhead in Kafka. However, it makes sure that the data format contract between a producer and a consumer is kept and managed in a central place, instead of in the application. The latter is probably easier to start with, but is likely to be brittle in the long run. I am actually not proposing to not support associated versioned schemas for topics, but to not let some core Kafka functionalities like auditing depend on schemas. I think this alone can separate the schema management from Kafka piping management (i.e. making sure every single message is delivered, and within some latency, etc). Adding additional auditing info into an existing schema will force Kafka to be aware of the schema systems (Avro, JSON, etc). 2. 
Auditing can be a general feature that's useful for many applications. Such a feature can be implemented by extending the low level message format with a header. However, it can also be added as part of the schema management. For example, you can imagine a type of audited schema that adds additional auditing info to an existing schema automatically. Performance wise, it probably doesn't make a big difference whether the auditing info is added in the message header or the schema header. See replies above. 3. We talked about avoiding the overhead of decompressing in both the broker and the mirror maker. We probably need to think through how this works with auditing. In the more general case where you want to audit every
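Jun's producer-side wrapping idea above can be sketched as follows: prepend a length-delimited audit header to the payload bytes without touching the payload's own serialization, and strip it back off on the consumer side. This is an illustrative Python sketch (a JSON header instead of the Avro header Jun suggests); the header field names are assumptions.

```python
import json
import socket
import struct
import time

def wrap_with_audit_header(payload: bytes) -> bytes:
    """Prepend a 4-byte length prefix plus a serialized audit header."""
    header = json.dumps({
        "timestamp_ms": int(time.time() * 1000),
        "host": socket.gethostname(),
    }).encode("utf-8")
    return struct.pack(">i", len(header)) + header + payload

def unwrap_audit_header(data: bytes):
    """Parse the header back out and return (header, original payload)."""
    (hlen,) = struct.unpack(">i", data[:4])
    header = json.loads(data[4:4 + hlen].decode("utf-8"))
    return header, data[4 + hlen:]
```

A hypothetical AuditProducer.send() would apply wrap_with_audit_header to the record value before delegating to the underlying KafkaProducer, leaving the message format and the application's serializer unchanged.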
Re: Kafka Simple Consumer API for 0.9
Hi Dibyendu, Yes, we are changing the consumer API in 0.9, which will be different from the current high-level consumer API. We are also trying to figure out a way to preserve the functionality: KAFKA-1655 https://issues.apache.org/jira/browse/KAFKA-1655 Guozhang On Fri, Nov 14, 2014 at 5:29 PM, Dibyendu Bhattacharya dibyendu.bhattach...@gmail.com wrote: Hi, Will the Simple Consumer API change in Kafka 0.9? I can see a Consumer Re-design approach for Kafka 0.9, which I believe will not impact any client written using the Simple Consumer API. Is that correct? Regards, Dibyendu -- Guozhang
Re: Support for topics+streams at class WildcardStreamsHandler
Just added an entry in the FAQ page: https://cwiki.apache.org/confluence/display/KAFKA/FAQ#FAQ-Whycan%27tIspecifythenumberofstreamsparallelismpertopicmapusingwildcardstreamasIusestaticstreamhandler On Mon, Nov 10, 2014 at 7:56 AM, Guozhang Wang wangg...@gmail.com wrote: Hi Alan, The reason we do not have a per-topic parallelism spec in wildcard is twofold: 1) we use a per-topic hash-based partition algorithm, and hence having each topic with the same num. of streams may give us better load balance; 2) with the topicFilter we will not know exactly which topics to consume at construction time, hence there is no way to specify per-topic specs. 1) has been lifted since we implemented the new partitioning algorithm, and for 2) we need to think about how to support it if we really want to; perhaps we can also use a regex-ed topic-count map, while ensuring that each regex in the map is precedent of the topic filter, and no overlap with each other, etc. What is your use case that requires a per-topic numStream spec? Guozhang On Sun, Nov 9, 2014 at 6:03 AM, Alan Lavintman alan.lavint...@gmail.com wrote: Hi guys, I have seen that if I create a message stream by using createMessageStreams I can define a map with Topic -> #Streams. Is there a reason why createMessageStreamsByFilter is not giving the same support? I only have a TopicFilter and numStreams interface such as: public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams); But it does not allow me to specify the parallelism per topic. Am I missing something or is my assumption correct? Bests and thanks, Alan. -- Guozhang
Re: Review Request 28040: Patch for KAFKA-1770
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/28040/#review61751 --- UnknownTopicOrPartitionException can be thrown either at the producer side when the cached metadata does not have the info for this partitionId, or thrown at the server side when the specified partition does not exist on the broker. So how about: Indicates one of the following situations: 1. The producer does not know the partition metadata for this id when sending messages. 2. The broker does not have the specified partition id when receiving messages. core/src/main/scala/kafka/common/UnknownTopicOrPartitionException.scala https://reviews.apache.org/r/28040/#comment103633 remove (and the end on line 23) - Guozhang Wang On Nov. 14, 2014, 4:57 p.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/28040/ --- (Updated Nov. 14, 2014, 4:57 p.m.) Review request for kafka. Bugs: KAFKA-1770 https://issues.apache.org/jira/browse/KAFKA-1770 Repository: kafka Description --- Modified doc for UnknownTopicOrPartitionException Diffs - core/src/main/scala/kafka/common/UnknownTopicOrPartitionException.scala 781e551e5b78b5f436431575c2961fe15acd1414 Diff: https://reviews.apache.org/r/28040/diff/ Testing --- Thanks, Jiangjie Qin
Re: Review Request 25995: Patch for KAFKA-1650
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/#review61528 --- core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment103712 based on the hash value of the source topic-partitionId string core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment103221 Could you make this consistent with others as offset.commit.interval.ms? core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment103713 In this case, for MaxInFlightRequests > 1 we are not only at the risk of message reordering but also at the risk of message loss upon failures right? core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment103714 Actually, can we move this comment into the top comment of the MM class as a NOTE under the producer paragraph? core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment103715 Is it possible that the current chunk has been consumed completely and the fetcher thread has not yet put in a new chunk, and hence hasNext() will return false? In this case shall we stop the consumer or just let it block? core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment103716 Since we already have the logIdent as threadName, I think we do not need this.getName here? core/src/main/scala/kafka/tools/MirrorMaker.scala https://reviews.apache.org/r/25995/#comment103718 With the logIdent it will become: FATAL [mirrormaker-offset-commit-thread] Offset commit thread exits due to ... which is a bit verbose? - Guozhang Wang On Nov. 12, 2014, 5:51 p.m., Jiangjie Qin wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/25995/ --- (Updated Nov. 12, 2014, 5:51 p.m.) Review request for kafka. 
Bugs: KAFKA-1650 https://issues.apache.org/jira/browse/KAFKA-1650 Repository: kafka Description --- Addressed Guozhang's comments. Addressed Guozhang's comments commit before switch to trunk commit before rebase Rebased on trunk, Addressed Guozhang's comments. Addressed Guozhang's comments on MaxInFlightRequests Merge branch 'trunk' of http://git-wip-us.apache.org/repos/asf/kafka into mirrormaker-redesign Diffs - core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala fbc680fde21b02f11285a4f4b442987356abd17b core/src/main/scala/kafka/tools/MirrorMaker.scala f399105087588946987bbc84e3759935d9498b6a Diff: https://reviews.apache.org/r/25995/diff/ Testing --- Thanks, Jiangjie Qin
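The first review comment above asks for routing based on the hash value of the source topic-partitionId string, so that records from the same source partition always go through the same data channel and per-partition ordering is preserved. A minimal sketch of that idea (method and key format are assumptions for illustration, not the actual MirrorMaker code):

```java
final class PartitionHash {
    // Pick a target channel deterministically from the source
    // "topic-partitionId" string, so all records of one source partition
    // always land in the same channel (preserving per-partition ordering).
    static int select(String topic, int partitionId, int numChannels) {
        String key = topic + "-" + partitionId;
        // Mask the sign bit rather than using Math.abs, which is unsafe
        // for Integer.MIN_VALUE.
        return (key.hashCode() & 0x7fffffff) % numChannels;
    }
}
```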
Re: Review Request 27391: Fix KAFKA-1634: Add the global retentionTime (in millis) and disable the per-offset timestamp
On Nov. 17, 2014, 10:31 p.m., Joel Koshy wrote: clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java, line 151 https://reviews.apache.org/r/27391/diff/3/?file=755182#file755182line151 Made a follow-up comment in the earlier RB, but pasting here as well: Agree that it is still common in the object, but it is completely removed from the wire protocol - i.e., OFFSET_COMMIT_REQUEST_PARTITION_V1, which is in the V2 OffsetCommitRequest, does not have a timestamp. This method should probably be read as initCommonFieldsInStruct - i.e., effectively the wire protocol. That said, I'm loath to add another init method which reads initCommonFieldsInV0AndV1. So I think rather than checking fetchPartitionData.timestamp it would be better to explicitly check the (already set) request version in the struct. If v0 or v1, then set the timestamp key name. The version number info is not in the struct, so we cannot get its value unless we pass it from the request header into the constructor. But we can check whether the struct has that field or not. Changed accordingly. - Guozhang --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/#review61761 --- On Nov. 8, 2014, 12:54 a.m., Guozhang Wang wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/ --- (Updated Nov. 8, 2014, 12:54 a.m.) Review request for kafka. 
Bugs: KAFKA-1634 https://issues.apache.org/jira/browse/KAFKA-1634 Repository: kafka Description --- The timestamp field of OffsetAndMetadata is preserved since we need to be backward compatible with older versions Diffs - clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala fbc680fde21b02f11285a4f4b442987356abd17b core/src/main/scala/kafka/server/KafkaApis.scala 968b0c4f809ea9f9c3f322aef9db105f9d2e0e23 core/src/main/scala/kafka/server/KafkaServer.scala 4de812374e8fb1fed834d2be3f9655f55b511a74 core/src/main/scala/kafka/server/OffsetManager.scala 2957bc435102bc4004d8f100dbcdd56287c8ffae core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 8c5364fa97da1be09973c176d1baeb339455d319 Diff: https://reviews.apache.org/r/27391/diff/ Testing --- Thanks, Guozhang Wang
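The approach settled on in the exchange above - write the timestamp key only when the schema actually carries that field, instead of inspecting the timestamp value - can be sketched as follows. The Struct class here is a hypothetical stand-in for the clients' protocol Struct, not its real API:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the protocol Struct: a schema-aware field bag
// where v2 schemas lack the per-partition "timestamp" field.
class Struct {
    private final Map<String, Object> fields = new HashMap<>();
    private final boolean hasTimestampField; // true for v0/v1 schemas only

    Struct(boolean hasTimestampField) { this.hasTimestampField = hasTimestampField; }

    boolean hasField(String name) {
        return !name.equals("timestamp") || hasTimestampField;
    }

    void set(String name, Object value) {
        if (!hasField(name)) throw new IllegalArgumentException("unknown field: " + name);
        fields.put(name, value);
    }

    Object get(String name) { return fields.get(name); }
}

class OffsetCommitFields {
    // Write the timestamp only when the schema defines it (v0/v1),
    // rather than branching on the timestamp value itself.
    static void initCommonFields(Struct struct, long offset, long timestamp) {
        struct.set("offset", offset);
        if (struct.hasField("timestamp")) {
            struct.set("timestamp", timestamp);
        }
    }
}
```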
Re: Review Request 27391: Fix KAFKA-1634: Add the global retentionTime (in millis) and disable the per-offset timestamp
On Nov. 7, 2014, 3 a.m., Joel Koshy wrote: core/src/main/scala/kafka/server/OffsetManager.scala, line 498 https://reviews.apache.org/r/27391/diff/2/?file=753758#file753758line498 Actually, since we always set the retention period (for v0, v1) in KafkaApis, do we need to even touch this timestamp? i.e., we should basically ignore it, right? So we only need to do: value.set(VALUE_TIMESTAMP_FIELD, expirationTimestamp). Guozhang Wang wrote: I think not. In v0/v1, if the timestamp is explicitly specified (i.e. not -1) we need to use it as the expiration timestamp, or at least that was how I understood the semantics. Guozhang Wang wrote: I think we cannot. Joel Koshy wrote: Right - what I meant was in KafkaApis we can just compute the retentionPeriod if v0 or v1. So if v0/v1 and timestamp = (now + 7 days), then set retention to 7 days. Retention is a global parameter which we use later to set the per-message timestamp; but if the timestamps from the v0/v1 requests are different then we cannot just use a single retention, right? - Guozhang --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/#review60285 --- On Nov. 8, 2014, 12:54 a.m., Guozhang Wang wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/ --- (Updated Nov. 8, 2014, 12:54 a.m.) Review request for kafka. 
Bugs: KAFKA-1634 https://issues.apache.org/jira/browse/KAFKA-1634 Repository: kafka Description --- The timestamp field of OffsetAndMetadata is preserved since we need to be backward compatible with older versions Diffs - clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala fbc680fde21b02f11285a4f4b442987356abd17b core/src/main/scala/kafka/server/KafkaApis.scala 968b0c4f809ea9f9c3f322aef9db105f9d2e0e23 core/src/main/scala/kafka/server/KafkaServer.scala 4de812374e8fb1fed834d2be3f9655f55b511a74 core/src/main/scala/kafka/server/OffsetManager.scala 2957bc435102bc4004d8f100dbcdd56287c8ffae core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 8c5364fa97da1be09973c176d1baeb339455d319 Diff: https://reviews.apache.org/r/27391/diff/ Testing --- Thanks, Guozhang Wang
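The semantics debated above - an explicitly specified v0/v1 commit timestamp is used directly as the expiration time, while a default (-1) timestamp falls back to the global retention period - might look like this. Names and the -1 sentinel are taken from the discussion; the rest is a sketch, not the actual OffsetManager code:

```java
class OffsetExpiration {
    static final long DEFAULT_TIMESTAMP = -1L;

    // If the client supplied an explicit commit timestamp (possible in
    // v0/v1 requests), use it as the expiration time directly; otherwise
    // expire after the global retention period from now.
    static long expirationTimestamp(long commitTimestamp, long retentionMs, long nowMs) {
        if (commitTimestamp == DEFAULT_TIMESTAMP) {
            return nowMs + retentionMs;
        }
        return commitTimestamp;
    }
}
```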
Re: Review Request 27391: Fix KAFKA-1634
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/ --- (Updated Nov. 18, 2014, 1:42 a.m.) Review request for kafka. Summary (updated) - Fix KAFKA-1634 Bugs: KAFKA-1634 https://issues.apache.org/jira/browse/KAFKA-1634 Repository: kafka Description --- The timestamp field of OffsetAndMetadata is preserved since we need to be backward compatible with older versions Diffs (updated) - clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala f476973eeff653473a60c3ecf36e870e386536bc core/src/main/scala/kafka/server/KafkaApis.scala 968b0c4f809ea9f9c3f322aef9db105f9d2e0e23 core/src/main/scala/kafka/server/KafkaServer.scala 4de812374e8fb1fed834d2be3f9655f55b511a74 core/src/main/scala/kafka/server/OffsetManager.scala 2957bc435102bc4004d8f100dbcdd56287c8ffae core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 8c5364fa97da1be09973c176d1baeb339455d319 Diff: https://reviews.apache.org/r/27391/diff/ Testing --- Thanks, Guozhang Wang
Review Request 28240: Follow-up KAFKA-1580
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/28240/ --- Review request for kafka. Bugs: KAFKA-1580 https://issues.apache.org/jira/browse/KAFKA-1580 Repository: kafka Description --- Add the logic for checking internal topics in the replica manager layer Diffs - core/src/main/scala/kafka/admin/AdminUtils.scala 94c53320b768b83b0324336b73cc06537c0fe78d core/src/main/scala/kafka/server/KafkaApis.scala 968b0c4f809ea9f9c3f322aef9db105f9d2e0e23 core/src/main/scala/kafka/server/OffsetManager.scala 2957bc435102bc4004d8f100dbcdd56287c8ffae core/src/main/scala/kafka/server/ReplicaManager.scala 3007a6d89b637b93f71fdb7adab561a93d9c4c62 core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 8531f533f3a6431f4f9fc8cb1ad3e9f1f1b110e0 Diff: https://reviews.apache.org/r/28240/diff/ Testing --- Thanks, Guozhang Wang
Review Request 28268: Bump up default scala version from 2.10.1 to 2.10.4
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/28268/ --- Review request for kafka. Bugs: KAFKA-1624 https://issues.apache.org/jira/browse/KAFKA-1624 Repository: kafka Description --- Compiles with Java 1.6/7/8 Diffs - bin/kafka-run-class.sh 36c742b67a7259fa35c3ed862f7ccc4673b69d9f bin/windows/kafka-run-class.bat 8e9780e2eb74a35a726787155c09b151d0ba build.gradle 11eb11355efddacf62d61690ad13b9c82a200230 gradle.properties 5d3155fd4461438d8b2ec4faa9534cc2383d4951 scala.gradle 6adf9af7dbbe71e68a07b387c3854d8c9ad339e0 Diff: https://reviews.apache.org/r/28268/diff/ Testing --- Thanks, Guozhang Wang
Re: Welcome Kafka's newest committer
Thank you folks! Glad to be on board. Guozhang On Wed, Nov 19, 2014 at 4:34 PM, Harsha ka...@harsha.io wrote: Congrats Guozhang On Wed, Nov 19, 2014, at 04:31 PM, Joe Stein wrote: Congrats!!! On Wed, Nov 19, 2014 at 7:05 PM, Neha Narkhede neha.narkh...@gmail.com wrote: Hi everyone, I'm very happy to announce that the Kafka PMC has invited Guozhang Wang to become a committer. Guozhang has made significant contributions to Kafka over the past year, along with being very active on code reviews and the mailing list. Please join me in welcoming him. Thanks, Neha (on behalf of the Kafka PMC) -- -- Guozhang
Re: Review Request 28240: Follow-up KAFKA-1580
On Nov. 21, 2014, 1:48 p.m., Joel Koshy wrote: core/src/main/scala/kafka/server/KafkaApis.scala, line 193 https://reviews.apache.org/r/28240/diff/1/?file=770058#file770058line193 I think it is more suitable to have this access control implemented inside ReplicaManager. One can argue both ways though... I actually went back and forth quite a few times on this. The main reason I chose to do this in the Kafka API layer is that otherwise the offset manager / replica manager would have to handle operations with clientId, which defeats the purpose of the refactoring effort. - Guozhang --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/28240/#review62547 --- On Nov. 19, 2014, 7:01 p.m., Guozhang Wang wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/28240/ --- (Updated Nov. 19, 2014, 7:01 p.m.) Review request for kafka. Bugs: KAFKA-1580 https://issues.apache.org/jira/browse/KAFKA-1580 Repository: kafka Description --- Add the logic for checking internal topics in the replica manager layer Diffs - core/src/main/scala/kafka/admin/AdminUtils.scala 94c53320b768b83b0324336b73cc06537c0fe78d core/src/main/scala/kafka/server/KafkaApis.scala 968b0c4f809ea9f9c3f322aef9db105f9d2e0e23 core/src/main/scala/kafka/server/OffsetManager.scala 2957bc435102bc4004d8f100dbcdd56287c8ffae core/src/main/scala/kafka/server/ReplicaManager.scala 3007a6d89b637b93f71fdb7adab561a93d9c4c62 core/src/test/scala/integration/kafka/api/ProducerFailureHandlingTest.scala 8531f533f3a6431f4f9fc8cb1ad3e9f1f1b110e0 Diff: https://reviews.apache.org/r/28240/diff/ Testing --- Thanks, Guozhang Wang
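The layering choice argued above - keep the clientId-aware internal-topic check in the API layer so that ReplicaManager and OffsetManager never need to see the clientId - can be sketched roughly as below. The method and client-id names are assumptions for illustration; only the internal topic name comes from Kafka itself:

```java
class InternalTopicGuard {
    static final String OFFSETS_TOPIC = "__consumer_offsets";

    // Performed in the KafkaApis layer, before delegating to ReplicaManager:
    // only a privileged client (e.g. the designated admin client id) may
    // produce directly to internal topics.
    static boolean authorize(String topic, String clientId, String adminClientId) {
        if (!topic.equals(OFFSETS_TOPIC)) return true;
        return clientId.equals(adminClientId);
    }
}
```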
Re: Review Request 27391: Fix KAFKA-1634
On Nov. 21, 2014, 2:41 p.m., Joel Koshy wrote: core/src/main/scala/kafka/server/KafkaApis.scala, line 147 https://reviews.apache.org/r/27391/diff/4/?file=766686#file766686line147 I think what you meant earlier was that with v2 you could have different timestamps for each partition, so a global retention won't work with v2 In the OffsetManager, the timestamp will only be overridden by the global one if it is set to the default value, which is -1; so for v0/1, although the retention is used, it will not override the timestamp. On Nov. 21, 2014, 2:41 p.m., Joel Koshy wrote: core/src/main/scala/kafka/common/OffsetMetadataAndError.scala, line 17 https://reviews.apache.org/r/27391/diff/4/?file=766684#file766684line17 I thought we would be going with a separate format for on-disk storage? E.g., one thing that is extremely useful (until we have timestamp as a first-class field of messages) is to have the receive time of the offsetcommit in the stored offset entries. This is very useful for debugging. Yes, they are separated: for on-disk storage the timestamp will always be stored, and for the wire protocol only v0/1 will contain that value; for v2 this value will be computed via retention. So the on-disk format is specified as OffsetAndMetadata, and when we deprecate v0/1 and add the timestamp to the message header, we will replace this with OffsetMetadata. - Guozhang --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/#review62553 --- On Nov. 18, 2014, 1:42 a.m., Guozhang Wang wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/ --- (Updated Nov. 18, 2014, 1:42 a.m.) Review request for kafka. 
Bugs: KAFKA-1634 https://issues.apache.org/jira/browse/KAFKA-1634 Repository: kafka Description --- The timestamp field of OffsetAndMetadata is preserved since we need to be backward compatible with older versions Diffs - clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala f476973eeff653473a60c3ecf36e870e386536bc core/src/main/scala/kafka/server/KafkaApis.scala 968b0c4f809ea9f9c3f322aef9db105f9d2e0e23 core/src/main/scala/kafka/server/KafkaServer.scala 4de812374e8fb1fed834d2be3f9655f55b511a74 core/src/main/scala/kafka/server/OffsetManager.scala 2957bc435102bc4004d8f100dbcdd56287c8ffae core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 8c5364fa97da1be09973c176d1baeb339455d319 Diff: https://reviews.apache.org/r/27391/diff/ Testing --- Thanks, Guozhang Wang
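Per the exchange above, the on-disk offset entry always stores a timestamp even though the v2 wire format drops it, deriving it from the request-level retention and the broker receive time. A rough sketch of that separation (the record shape and names are assumptions, not the actual OffsetManager schema):

```java
class StoredOffset {
    final long offset;
    final String metadata;
    final long expirationTimestamp; // always present on disk, even for v2 commits

    StoredOffset(long offset, String metadata, long expirationTimestamp) {
        this.offset = offset;
        this.metadata = metadata;
        this.expirationTimestamp = expirationTimestamp;
    }

    // v2 commits carry no per-partition timestamp on the wire, so the
    // stored expiration is computed from the global retention and the
    // receive time of the offset commit request.
    static StoredOffset fromV2Commit(long offset, String metadata,
                                     long retentionMs, long receiveTimeMs) {
        return new StoredOffset(offset, metadata, receiveTimeMs + retentionMs);
    }
}
```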
Re: Review Request 27391: Fix KAFKA-1634
On Nov. 21, 2014, 2:41 p.m., Joel Koshy wrote: core/src/main/scala/kafka/server/OffsetManager.scala, line 500 https://reviews.apache.org/r/27391/diff/4/?file=766688#file766688line500 This and the above use give compilation warnings. I think it is reasonable to copy those constants here to get rid of the warnings, since we are anyway in an undesirable state right now of maintaining a mirror wire-protocol implementation in scala Not sure what you mean by copying those constants? I did the copy inside the if block, and the compilation warning comes from the condition. - Guozhang --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/#review62553 --- On Nov. 18, 2014, 1:42 a.m., Guozhang Wang wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27391/ --- (Updated Nov. 18, 2014, 1:42 a.m.) Review request for kafka. Bugs: KAFKA-1634 https://issues.apache.org/jira/browse/KAFKA-1634 Repository: kafka Description --- The timestamp field of OffsetAndMetadata is preserved since we need to be backward compatible with older versions Diffs - clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java 7517b879866fc5dad5f8d8ad30636da8bbe7784a clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java df37fc6d8f0db0b8192a948426af603be3444da4 core/src/main/scala/kafka/api/OffsetCommitRequest.scala 050615c72efe7dbaa4634f53943bd73273d20ffb core/src/main/scala/kafka/api/OffsetFetchRequest.scala c7604b9cdeb8f46507f04ed7d35fcb3d5f150713 core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 4cabffeacea09a49913505db19a96a55d58c0909 core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala f476973eeff653473a60c3ecf36e870e386536bc core/src/main/scala/kafka/server/KafkaApis.scala 968b0c4f809ea9f9c3f322aef9db105f9d2e0e23 
core/src/main/scala/kafka/server/KafkaServer.scala 4de812374e8fb1fed834d2be3f9655f55b511a74 core/src/main/scala/kafka/server/OffsetManager.scala 2957bc435102bc4004d8f100dbcdd56287c8ffae core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343 core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 8c5364fa97da1be09973c176d1baeb339455d319 Diff: https://reviews.apache.org/r/27391/diff/ Testing --- Thanks, Guozhang Wang