Re: Review Request 35086: add response tests for ConsumerCoordinator
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/35086/#review88247 --- Hey Onur, I got some rebase errors applying the patch after applying KAFKA-1740. Could you double check? - Guozhang Wang On June 4, 2015, 7:57 p.m., Onur Karaman wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/35086/ --- (Updated June 4, 2015, 7:57 p.m.) Review request for kafka. Bugs: KAFKA-2245 https://issues.apache.org/jira/browse/KAFKA-2245 Repository: kafka Description --- add response tests for ConsumerCoordinator Diffs - core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 51e89c87ee2c20fc7f976536f01fa1055fb8e670 core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala c39e6de34ee531c6dfa9107b830752bd7f8fbe59 core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala PRE-CREATION Diff: https://reviews.apache.org/r/35086/diff/ Testing --- Thanks, Onur Karaman
Re: Review Request 35086: add response tests for ConsumerCoordinator
On June 17, 2015, 5:53 p.m., Guozhang Wang wrote: Hey Onur, I got some rebase errors applying the patch after applying KAFKA-1740. Could you double check? When I apply KAFKA-1740 first and then KAFKA-2245, I get: ``` git apply ~/Downloads/rb35231.patch git apply ~/Downloads/rb35086.patch error: patch failed: core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala:46 error: core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala: patch does not apply ``` Is this what you're referring to? - Onur --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/35086/#review88247 --- On June 4, 2015, 7:57 p.m., Onur Karaman wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/35086/ --- (Updated June 4, 2015, 7:57 p.m.) Review request for kafka. Bugs: KAFKA-2245 https://issues.apache.org/jira/browse/KAFKA-2245 Repository: kafka Description --- add response tests for ConsumerCoordinator Diffs - core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 51e89c87ee2c20fc7f976536f01fa1055fb8e670 core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala c39e6de34ee531c6dfa9107b830752bd7f8fbe59 core/src/test/scala/unit/kafka/coordinator/ConsumerCoordinatorResponseTest.scala PRE-CREATION Diff: https://reviews.apache.org/r/35086/diff/ Testing --- Thanks, Onur Karaman
Re: Review Request 35347: Patch for KAFKA-2249
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/35347/ --- (Updated June 18, 2015, 12:35 a.m.) Review request for kafka. Bugs: KAFKA-2249 https://issues.apache.org/jira/browse/KAFKA-2249 Repository: kafka Description (updated) --- Moved LogConfig to implement AbstractConfig too. This means modifying most Log tests, and some changes to defaults Diffs (updated) - clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java c4fa058692f50abb4f47bd344119d805c60123f5 core/src/main/scala/kafka/cluster/Partition.scala 730a232482fdf77be5704cdf5941cfab3828db88 core/src/main/scala/kafka/controller/KafkaController.scala 69bba243a9a511cc5292b43da0cc48e421a428b0 core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala 3b15ab4eef22c6f50a7483e99a6af40fb55aca9f core/src/main/scala/kafka/log/LogConfig.scala f64fd79ee4cdd5ad15cd9b14fe7247464cde1e94 core/src/main/scala/kafka/log/LogManager.scala e781ebac2677ebb22e0c1fef0cf7e5ad57c74ea4 core/src/main/scala/kafka/server/KafkaApis.scala c7debe458ce9d80024b3f8544c92ebe3e14159dc core/src/main/scala/kafka/server/KafkaConfig.scala 2d75186a110075e0c322db4b9f7a8c964a7a3e88 core/src/main/scala/kafka/server/KafkaServer.scala b320ce9f6a12c0ee392e91beb82e8804d167f9f4 core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 181cbc16b3780ffa77966cbc26337d2c39be9a72 core/src/main/scala/kafka/server/TopicConfigManager.scala b675a7e45ea4f4179f8b15fe221fd988aff13aa0 core/src/main/scala/kafka/utils/CoreUtils.scala d0a8fa701564b4c13b3cd6501e1b6218d77e8e06 core/src/test/scala/other/kafka/StressTestLog.scala c0e248d669c7bd653f512af7f72d895c38772f83 core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala 3034c4f9b0d026e25ce045689d9a9f99a59a10ec core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala 37f0684bbd0bfaf64b765ce04a928e257f0a core/src/test/scala/unit/kafka/log/CleanerTest.scala 8b8249a35322a60ca94cb385a6cad25943dd1cc9 core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 471ddff9bff1bdfa277c071e59e5c6b749b9c74f core/src/test/scala/unit/kafka/log/LogConfigTest.scala 3fd5a53f9b0edc0a7a169a185cd3041ea1ae7658 core/src/test/scala/unit/kafka/log/LogManagerTest.scala 01dfbc4f8d21f6905327cd4ed6c61d657adc0143 core/src/test/scala/unit/kafka/log/LogTest.scala 8e095d652851f05365e1d3bbe3e9e1c3345b7a40 core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala 7877f6ca1845c2edbf96d4a9783a07a552db8f07 core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala c487f361949b2ca2b6d1b5e2c7fb9ba83c8e53c1 Diff: https://reviews.apache.org/r/35347/diff/ Testing --- Thanks, Gwen Shapira
Any reason why LogConfig defaults are different from KafkaConfig defaults?
Hi, I'm migrating KafkaConfig and LogConfig over to AbstractConfig interface,and I've ran into something strange: All of LogConfig parameters also exist at KafkaConfig level. However the defaults are completely different. Few examples: Maximum message size is 100 + MessageSet.LogOverhead in KafkaConfig defaults, but MaxInt at LogConfig level. Segment size (bytes) is 1024 * 1024 * 1024 at KafkaConfig, but 1024 * 1024 in LogConfig. This is true for most configs. From what I can see, LogConfigs are NEVER users, because LogManager is initialized with values from KafkaConfig as the default LogConfig when it starts. Anyone knows why LogConfig has different defaults? Any objections to unifying them? Gwen
Re: Review Request 35347: Patch for KAFKA-2249
On June 11, 2015, 11:22 p.m., Jun Rao wrote: core/src/main/scala/kafka/server/KafkaConfig.scala, lines 964-965 https://reviews.apache.org/r/35347/diff/1/?file=982452#file982452line964 Is this castable since props is a map? Also, toProps is really just used by LogConfig which converts Properties back to a map. Perhaps we don't even need this method and just let LogConfig access KafkaConfig.originals(). KafkaConfig.getMetricClasses() uses toProps, but is never used. We can just remove that method. Also, perhaps it's useful and convenient to turn LogConfig into an AbstractConfig too. I agree that we don't need to expose toProps since we can let other classes access originals directly. Just note that the current design allows for users to pass arbitrary properties to reporters by placing them in server.properties (since we pass original properties along when configuring reporters in getMetricClasses) - this is a critical feature and why getMetricClasses used to call toProps and will now call originals. - Gwen --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/35347/#review87643 --- On June 11, 2015, 6:09 a.m., Gwen Shapira wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/35347/ --- (Updated June 11, 2015, 6:09 a.m.) Review request for kafka. Bugs: KAFKA-2249 https://issues.apache.org/jira/browse/KAFKA-2249 Repository: kafka Description --- modified KafkaConfig to implement AbstractConfig. This resulted in somewhat cleaner code, and we preserve the original Properties for use by MetricReporter Diffs - clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java c4fa058692f50abb4f47bd344119d805c60123f5 core/src/main/scala/kafka/controller/KafkaController.scala 69bba243a9a511cc5292b43da0cc48e421a428b0 core/src/main/scala/kafka/server/KafkaApis.scala d63bc18d795a6f0e6994538ca55a9a46f7fb8ffd core/src/main/scala/kafka/server/KafkaConfig.scala 2d75186a110075e0c322db4b9f7a8c964a7a3e88 core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b31b432a226ba79546dd22ef1d2acbb439c2e9a3 core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala ace6321b36d809946554d205bc926c9c76a43bd6 Diff: https://reviews.apache.org/r/35347/diff/ Testing --- Thanks, Gwen Shapira
Re: Review Request 35347: Patch for KAFKA-2249
On June 11, 2015, 11:22 p.m., Jun Rao wrote: core/src/main/scala/kafka/server/KafkaConfig.scala, lines 964-965 https://reviews.apache.org/r/35347/diff/1/?file=982452#file982452line964 Is this castable since props is a map? Also, toProps is really just used by LogConfig which converts Properties back to a map. Perhaps we don't even need this method and just let LogConfig access KafkaConfig.originals(). KafkaConfig.getMetricClasses() uses toProps, but is never used. We can just remove that method. Also, perhaps it's useful and convenient to turn LogConfig into an AbstractConfig too. Gwen Shapira wrote: I agree that we don't need to expose toProps since we can let other classes access originals directly. Just note that the current design allows for users to pass arbitrary properties to reporters by placing them in server.properties (since we pass original properties along when configuring reporters in getMetricClasses) - this is a critical feature and why getMetricClasses used to call toProps and will now call originals. Turning LogConfig into an AbstractConfig should have been its own JIRA. The patch is now quite large since I had to modify creation of LogConfig in a bunch of tests. - Gwen --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/35347/#review87643 --- On June 18, 2015, 12:35 a.m., Gwen Shapira wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/35347/ --- (Updated June 18, 2015, 12:35 a.m.) Review request for kafka. Bugs: KAFKA-2249 https://issues.apache.org/jira/browse/KAFKA-2249 Repository: kafka Description --- Moved LogConfig to implement AbstractConfig too. This means modifying most Log tests, and some changes to defaults Diffs - clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java c4fa058692f50abb4f47bd344119d805c60123f5 core/src/main/scala/kafka/cluster/Partition.scala 730a232482fdf77be5704cdf5941cfab3828db88 core/src/main/scala/kafka/controller/KafkaController.scala 69bba243a9a511cc5292b43da0cc48e421a428b0 core/src/main/scala/kafka/controller/PartitionLeaderSelector.scala 3b15ab4eef22c6f50a7483e99a6af40fb55aca9f core/src/main/scala/kafka/log/LogConfig.scala f64fd79ee4cdd5ad15cd9b14fe7247464cde1e94 core/src/main/scala/kafka/log/LogManager.scala e781ebac2677ebb22e0c1fef0cf7e5ad57c74ea4 core/src/main/scala/kafka/server/KafkaApis.scala c7debe458ce9d80024b3f8544c92ebe3e14159dc core/src/main/scala/kafka/server/KafkaConfig.scala 2d75186a110075e0c322db4b9f7a8c964a7a3e88 core/src/main/scala/kafka/server/KafkaServer.scala b320ce9f6a12c0ee392e91beb82e8804d167f9f4 core/src/main/scala/kafka/server/ReplicaFetcherThread.scala 181cbc16b3780ffa77966cbc26337d2c39be9a72 core/src/main/scala/kafka/server/TopicConfigManager.scala b675a7e45ea4f4179f8b15fe221fd988aff13aa0 core/src/main/scala/kafka/utils/CoreUtils.scala d0a8fa701564b4c13b3cd6501e1b6218d77e8e06 core/src/test/scala/other/kafka/StressTestLog.scala c0e248d669c7bd653f512af7f72d895c38772f83 core/src/test/scala/other/kafka/TestLinearWriteSpeed.scala 3034c4f9b0d026e25ce045689d9a9f99a59a10ec core/src/test/scala/unit/kafka/log/BrokerCompressionTest.scala 37f0684bbd0bfaf64b765ce04a928e257f0a core/src/test/scala/unit/kafka/log/CleanerTest.scala 8b8249a35322a60ca94cb385a6cad25943dd1cc9 core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 471ddff9bff1bdfa277c071e59e5c6b749b9c74f core/src/test/scala/unit/kafka/log/LogConfigTest.scala 3fd5a53f9b0edc0a7a169a185cd3041ea1ae7658 core/src/test/scala/unit/kafka/log/LogManagerTest.scala 01dfbc4f8d21f6905327cd4ed6c61d657adc0143 core/src/test/scala/unit/kafka/log/LogTest.scala 8e095d652851f05365e1d3bbe3e9e1c3345b7a40 core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala 7877f6ca1845c2edbf96d4a9783a07a552db8f07 core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala c487f361949b2ca2b6d1b5e2c7fb9ba83c8e53c1 Diff: https://reviews.apache.org/r/35347/diff/ Testing --- Thanks, Gwen Shapira
[jira] [Updated] (KAFKA-2249) KafkaConfig does not preserve original Properties
[ https://issues.apache.org/jira/browse/KAFKA-2249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gwen Shapira updated KAFKA-2249: Status: Patch Available (was: In Progress) KafkaConfig does not preserve original Properties - Key: KAFKA-2249 URL: https://issues.apache.org/jira/browse/KAFKA-2249 Project: Kafka Issue Type: Bug Reporter: Gwen Shapira Assignee: Gwen Shapira Attachments: KAFKA-2249.patch, KAFKA-2249_2015-06-17_17:35:35.patch We typically generate configuration from properties objects (or maps). The old KafkaConfig, and the new ProducerConfig and ConsumerConfig all retain the original Properties object, which means that if the user specified properties that are not part of ConfigDef definitions, they are still accessible. This is important especially for MetricReporters where we want to allow users to pass arbitrary properties for the reporter. One way to support this is by having KafkaConfig implement AbstractConfig, which will give us other nice functionality too. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2249) KafkaConfig does not preserve original Properties
[ https://issues.apache.org/jira/browse/KAFKA-2249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gwen Shapira updated KAFKA-2249: Attachment: KAFKA-2249_2015-06-17_17:35:35.patch KafkaConfig does not preserve original Properties - Key: KAFKA-2249 URL: https://issues.apache.org/jira/browse/KAFKA-2249 Project: Kafka Issue Type: Bug Reporter: Gwen Shapira Assignee: Gwen Shapira Attachments: KAFKA-2249.patch, KAFKA-2249_2015-06-17_17:35:35.patch We typically generate configuration from properties objects (or maps). The old KafkaConfig, and the new ProducerConfig and ConsumerConfig all retain the original Properties object, which means that if the user specified properties that are not part of ConfigDef definitions, they are still accessible. This is important especially for MetricReporters where we want to allow users to pass arbitrary properties for the reporter. One way to support this is by having KafkaConfig implement AbstractConfig, which will give us other nice functionality too. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2249) KafkaConfig does not preserve original Properties
[ https://issues.apache.org/jira/browse/KAFKA-2249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14590932#comment-14590932 ] Gwen Shapira commented on KAFKA-2249: - Updated reviewboard https://reviews.apache.org/r/35347/diff/ against branch trunk KafkaConfig does not preserve original Properties - Key: KAFKA-2249 URL: https://issues.apache.org/jira/browse/KAFKA-2249 Project: Kafka Issue Type: Bug Reporter: Gwen Shapira Assignee: Gwen Shapira Attachments: KAFKA-2249.patch, KAFKA-2249_2015-06-17_17:35:35.patch We typically generate configuration from properties objects (or maps). The old KafkaConfig, and the new ProducerConfig and ConsumerConfig all retain the original Properties object, which means that if the user specified properties that are not part of ConfigDef definitions, they are still accessible. This is important especially for MetricReporters where we want to allow users to pass arbitrary properties for the reporter. One way to support this is by having KafkaConfig implement AbstractConfig, which will give us other nice functionality too. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Any reason why LogConfig defaults are different from KafkaConfig defaults?
I think this is just a mistake and you should definitely fix it. Nice catch. -Jay On Wed, Jun 17, 2015 at 4:34 PM, Gwen Shapira gshap...@cloudera.com wrote: Hi, I'm migrating KafkaConfig and LogConfig over to AbstractConfig interface,and I've ran into something strange: All of LogConfig parameters also exist at KafkaConfig level. However the defaults are completely different. Few examples: Maximum message size is 100 + MessageSet.LogOverhead in KafkaConfig defaults, but MaxInt at LogConfig level. Segment size (bytes) is 1024 * 1024 * 1024 at KafkaConfig, but 1024 * 1024 in LogConfig. This is true for most configs. From what I can see, LogConfigs are NEVER users, because LogManager is initialized with values from KafkaConfig as the default LogConfig when it starts. Anyone knows why LogConfig has different defaults? Any objections to unifying them? Gwen
Re: Review Request 35231: Fix KAFKA-1740
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/35231/#review88301 --- clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java (line 558) https://reviews.apache.org/r/35231/#comment140747 Did you intend to actually rethrow or just log? core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala (lines 229 - 236) https://reviews.apache.org/r/35231/#comment140772 In manual group management, would we expect consumerId and generationId to be null? core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala (line 280) https://reviews.apache.org/r/35231/#comment140755 This looks like the wrong ticket. core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala (line 285) https://reviews.apache.org/r/35231/#comment140756 Same as above, wrong ticket. - Jason Gustafson On June 8, 2015, 11:12 p.m., Guozhang Wang wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/35231/ --- (Updated June 8, 2015, 11:12 p.m.) Review request for kafka. Bugs: KAFKA-1740 https://issues.apache.org/jira/browse/KAFKA-1740 Repository: kafka Description --- Move offset manager to coordinator, add validation logic for offset commit and fetch Diffs - clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java c1496a0851526f3c7d3905ce4bdff2129c83a6c1 clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 5b898c8f8ad5d0469f469b600c4b2eb13d1fc662 clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java 70844d65369f6ff300cbeb513dbb6650050c7eec clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java deec1fa480d5a5c5884a1c007b076aa64e902472 clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java 512a0ef7e619d54e74122c38119209f5cf9590e3 clients/src/test/java/org/apache/kafka/clients/consumer/internals/CoordinatorTest.java b06c4a73e2b4e9472cd772c8bc32bf4a29f431bb core/src/main/scala/kafka/admin/TopicCommand.scala dacbdd089f96c67b7bbc1c0cd36490d4fe094c1b core/src/main/scala/kafka/cluster/Partition.scala 730a232482fdf77be5704cdf5941cfab3828db88 core/src/main/scala/kafka/common/OffsetMetadataAndError.scala 6b4242c7cd1df9b3465db0fec35a25102c76cd60 core/src/main/scala/kafka/common/Topic.scala ad759786d1c22f67c47808c0b8f227eb2b1a9aa8 core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 51e89c87ee2c20fc7f976536f01fa1055fb8e670 core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala c39e6de34ee531c6dfa9107b830752bd7f8fbe59 core/src/main/scala/kafka/server/KafkaApis.scala d63bc18d795a6f0e6994538ca55a9a46f7fb8ffd core/src/main/scala/kafka/server/KafkaServer.scala b320ce9f6a12c0ee392e91beb82e8804d167f9f4 core/src/main/scala/kafka/server/OffsetManager.scala 5cca85cf727975f6d3acb2223fd186753ad761dc core/src/main/scala/kafka/server/ReplicaManager.scala 59c9bc3ac3a8afc07a6f8c88c5871304db588d17 core/src/test/scala/integration/kafka/api/ConsumerTest.scala 17b17b9b1520c7cc2e2ba96cdb1f9ff06e47bcad core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala 07b1ff47bfc3cd3f948c9533c8dc977fa36d996f core/src/test/scala/unit/kafka/admin/TopicCommandTest.scala c7136f20972614ac47aa57ab13e3c94ef775a4b7 core/src/test/scala/unit/kafka/consumer/TopicFilterTest.scala 4f124af5c3e946045a78ad1519c37372a72c8985 core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala 08854c5e6ec249368206298b2ac2623df18f266a core/src/test/scala/unit/kafka/server/OffsetCommitTest.scala 528525b719ec916e16f8b3ae3715bec4b5dcc47d Diff: https://reviews.apache.org/r/35231/diff/ Testing --- Thanks, Guozhang Wang
[jira] [Updated] (KAFKA-2282) ConsumerConnector enhance
[ https://issues.apache.org/jira/browse/KAFKA-2282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiankang Liu updated KAFKA-2282: Status: Patch Available (was: Open) ConsumerConnector enhance - Key: KAFKA-2282 URL: https://issues.apache.org/jira/browse/KAFKA-2282 Project: Kafka Issue Type: Improvement Components: consumer Reporter: Jiankang Liu Assignee: Neha Narkhede Attachments: KAFKA-2282.patch 1. As a developer, I want to manually handle rebalance situation, such as commit offset to avoid consume duplicate message; thus export the setConsumerRebalanceListener interface in java api. 2. It will throw a ClassCastException to just simply use asInstanceOf to cast java.util.map to scala immutable.map; it'd better use JavaConversions, not use JavaConverters to keep code style syched and compatible for scala under 2.8 version. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2092) New partitioning for better load balancing
[ https://issues.apache.org/jira/browse/KAFKA-2092?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14589634#comment-14589634 ] Gianmarco De Francisci Morales commented on KAFKA-2092: --- Thanks for your comment [~jkreps]. Indeed, this uses the load estimated at the producer to infer the load at the consumer. You might think this does not work but indeed it does in most cases (see [1] for details). I am not sure whether the lifecycle of the producer has any impact here. The goal is simply to send balanced partitions out of the producer. Regarding the key=partition mapping, yes this breaks the 1 key to 1 partition mapping. That's exactly the point, to offer a new primitive for stream partitioning. If you are doing word count you need a final aggregator as you say, but the aggregation is O(1) rather than O(W) [where W is the number of workers, i.e., parallelism of the operator]. Also, if you imagine building views out of these partitions, you can query 2 views rather than 1 to obtain the final answer (again, compared to shuffle grouping where you need p queries). I disagree with your last point (and the results do too). Given that you have 2 options, the imbalance is reduced much more than just by 2 times, because you create options to offload part of the load on a heavy partition to the second choice, thus creating a network of backup/offload options to move to when one key becomes hot. It's as creating interconnected pipes where you pump a fluid into. What is true is that if the single heavy key is larger than (2/W)% of the stream, then this technique cannot help you to achieve perfect load balance. New partitioning for better load balancing -- Key: KAFKA-2092 URL: https://issues.apache.org/jira/browse/KAFKA-2092 Project: Kafka Issue Type: Improvement Components: producer Reporter: Gianmarco De Francisci Morales Assignee: Jun Rao Attachments: KAFKA-2092-v1.patch We have recently studied the problem of load balancing in distributed stream processing systems such as Samza [1]. In particular, we focused on what happens when the key distribution of the stream is skewed when using key grouping. We developed a new stream partitioning scheme (which we call Partial Key Grouping). It achieves better load balancing than hashing while being more scalable than round robin in terms of memory. In the paper we show a number of mining algorithms that are easy to implement with partial key grouping, and whose performance can benefit from it. We think that it might also be useful for a larger class of algorithms. PKG has already been integrated in Storm [2], and I would like to be able to use it in Samza as well. As far as I understand, Kafka producers are the ones that decide how to partition the stream (or Kafka topic). I do not have experience with Kafka, however partial key grouping is very easy to implement: it requires just a few lines of code in Java when implemented as a custom grouping in Storm [3]. I believe it should be very easy to integrate. For all these reasons, I believe it will be a nice addition to Kafka/Samza. If the community thinks it's a good idea, I will be happy to offer support in the porting. References: [1] https://melmeric.files.wordpress.com/2014/11/the-power-of-both-choices-practical-load-balancing-for-distributed-stream-processing-engines.pdf [2] https://issues.apache.org/jira/browse/STORM-632 [3] https://github.com/gdfm/partial-key-grouping -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2159) offsets.topic.segment.bytes and offsets.topic.retention.minutes are ignored
[ https://issues.apache.org/jira/browse/KAFKA-2159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar Reddy updated KAFKA-2159: --- Attachment: KAFKA-2159_2015-06-17_11:44:03.patch offsets.topic.segment.bytes and offsets.topic.retention.minutes are ignored --- Key: KAFKA-2159 URL: https://issues.apache.org/jira/browse/KAFKA-2159 Project: Kafka Issue Type: Bug Components: offset manager Affects Versions: 0.8.2.1 Reporter: Rafał Boniecki Assignee: Manikumar Reddy Labels: newbie Attachments: KAFKA-2159.patch, KAFKA-2159_2015-06-17_11:44:03.patch My broker configuration: {quote}offsets.topic.num.partitions=20 offsets.topic.segment.bytes=10485760 offsets.topic.retention.minutes=10080{quote} Describe of __consumer_offsets topic: {quote}Topic:__consumer_offsets PartitionCount:20 ReplicationFactor:3 Configs:segment.bytes=104857600,cleanup.policy=compact Topic: __consumer_offsets Partition: 0Leader: 112 Replicas: 112,212,312 Isr: 212,312,112 Topic: __consumer_offsets Partition: 1Leader: 212 Replicas: 212,312,412 Isr: 212,312,412 Topic: __consumer_offsets Partition: 2Leader: 312 Replicas: 312,412,512 Isr: 312,412,512 Topic: __consumer_offsets Partition: 3Leader: 412 Replicas: 412,512,112 Isr: 412,512,112 Topic: __consumer_offsets Partition: 4Leader: 512 Replicas: 512,112,212 Isr: 512,212,112 Topic: __consumer_offsets Partition: 5Leader: 112 Replicas: 112,312,412 Isr: 312,412,112 Topic: __consumer_offsets Partition: 6Leader: 212 Replicas: 212,412,512 Isr: 212,412,512 Topic: __consumer_offsets Partition: 7Leader: 312 Replicas: 312,512,112 Isr: 312,512,112 Topic: __consumer_offsets Partition: 8Leader: 412 Replicas: 412,112,212 Isr: 412,212,112 Topic: __consumer_offsets Partition: 9Leader: 512 Replicas: 512,212,312 Isr: 512,212,312 Topic: __consumer_offsets Partition: 10 Leader: 112 Replicas: 112,412,512 Isr: 412,512,112 Topic: __consumer_offsets Partition: 11 Leader: 212 Replicas: 212,512,112 Isr: 212,512,112 Topic: __consumer_offsets Partition: 12 Leader: 312 Replicas: 312,112,212 Isr: 312,212,112 Topic: __consumer_offsets Partition: 13 Leader: 412 Replicas: 412,212,312 Isr: 412,212,312 Topic: __consumer_offsets Partition: 14 Leader: 512 Replicas: 512,312,412 Isr: 512,312,412 Topic: __consumer_offsets Partition: 15 Leader: 112 Replicas: 112,512,212 Isr: 512,212,112 Topic: __consumer_offsets Partition: 16 Leader: 212 Replicas: 212,112,312 Isr: 212,312,112 Topic: __consumer_offsets Partition: 17 Leader: 312 Replicas: 312,212,412 Isr: 312,212,412 Topic: __consumer_offsets Partition: 18 Leader: 412 Replicas: 412,312,512 Isr: 412,312,512 Topic: __consumer_offsets Partition: 19 Leader: 512 Replicas: 512,412,112 Isr: 512,412,112{quote} OffsetManager logs: {quote}2015-04-29 17:58:43:403 CEST DEBUG [kafka-scheduler-3][kafka.server.OffsetManager] Compacting offsets cache. 2015-04-29 17:58:43:403 CEST DEBUG [kafka-scheduler-3][kafka.server.OffsetManager] Found 1 stale offsets (older than 8640 ms). 2015-04-29 17:58:43:404 CEST TRACE [kafka-scheduler-3][kafka.server.OffsetManager] Removing stale offset and metadata for [drafts,tasks,1]: OffsetAndMetadata[824,consumer_id = drafts, time = 1430322433,0] 2015-04-29 17:58:43:404 CEST TRACE [kafka-scheduler-3][kafka.server.OffsetManager] Marked 1 offsets in [__consumer_offsets,2] for deletion. 2015-04-29 17:58:43:404 CEST DEBUG [kafka-scheduler-3][kafka.server.OffsetManager] Removed 1 stale offsets in 1 milliseconds.{quote} Parameters are ignored and default values are used instead. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2159) offsets.topic.segment.bytes and offsets.topic.retention.minutes are ignored
[ https://issues.apache.org/jira/browse/KAFKA-2159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14589362#comment-14589362 ] Manikumar Reddy commented on KAFKA-2159: Updated reviewboard https://reviews.apache.org/r/35454/diff/ against branch origin/trunk offsets.topic.segment.bytes and offsets.topic.retention.minutes are ignored --- Key: KAFKA-2159 URL: https://issues.apache.org/jira/browse/KAFKA-2159 Project: Kafka Issue Type: Bug Components: offset manager Affects Versions: 0.8.2.1 Reporter: Rafał Boniecki Assignee: Manikumar Reddy Labels: newbie Attachments: KAFKA-2159.patch, KAFKA-2159_2015-06-17_11:44:03.patch My broker configuration: {quote}offsets.topic.num.partitions=20 offsets.topic.segment.bytes=10485760 offsets.topic.retention.minutes=10080{quote} Describe of __consumer_offsets topic: {quote}Topic:__consumer_offsets PartitionCount:20 ReplicationFactor:3 Configs:segment.bytes=104857600,cleanup.policy=compact Topic: __consumer_offsets Partition: 0Leader: 112 Replicas: 112,212,312 Isr: 212,312,112 Topic: __consumer_offsets Partition: 1Leader: 212 Replicas: 212,312,412 Isr: 212,312,412 Topic: __consumer_offsets Partition: 2Leader: 312 Replicas: 312,412,512 Isr: 312,412,512 Topic: __consumer_offsets Partition: 3Leader: 412 Replicas: 412,512,112 Isr: 412,512,112 Topic: __consumer_offsets Partition: 4Leader: 512 Replicas: 512,112,212 Isr: 512,212,112 Topic: __consumer_offsets Partition: 5Leader: 112 Replicas: 112,312,412 Isr: 312,412,112 Topic: __consumer_offsets Partition: 6Leader: 212 Replicas: 212,412,512 Isr: 212,412,512 Topic: __consumer_offsets Partition: 7Leader: 312 Replicas: 312,512,112 Isr: 312,512,112 Topic: __consumer_offsets Partition: 8Leader: 412 Replicas: 412,112,212 Isr: 412,212,112 Topic: __consumer_offsets Partition: 9Leader: 512 Replicas: 512,212,312 Isr: 512,212,312 Topic: __consumer_offsets Partition: 10 Leader: 112 Replicas: 112,412,512 Isr: 412,512,112 Topic: __consumer_offsets Partition: 11 Leader: 212 Replicas: 212,512,112 Isr: 212,512,112 Topic: __consumer_offsets Partition: 12 Leader: 312 Replicas: 312,112,212 Isr: 312,212,112 Topic: __consumer_offsets Partition: 13 Leader: 412 Replicas: 412,212,312 Isr: 412,212,312 Topic: __consumer_offsets Partition: 14 Leader: 512 Replicas: 512,312,412 Isr: 512,312,412 Topic: __consumer_offsets Partition: 15 Leader: 112 Replicas: 112,512,212 Isr: 512,212,112 Topic: __consumer_offsets Partition: 16 Leader: 212 Replicas: 212,112,312 Isr: 212,312,112 Topic: __consumer_offsets Partition: 17 Leader: 312 Replicas: 312,212,412 Isr: 312,212,412 Topic: __consumer_offsets Partition: 18 Leader: 412 Replicas: 412,312,512 Isr: 412,312,512 Topic: __consumer_offsets Partition: 19 Leader: 512 Replicas: 512,412,112 Isr: 512,412,112{quote} OffsetManager logs: {quote}2015-04-29 17:58:43:403 CEST DEBUG [kafka-scheduler-3][kafka.server.OffsetManager] Compacting offsets cache. 2015-04-29 17:58:43:403 CEST DEBUG [kafka-scheduler-3][kafka.server.OffsetManager] Found 1 stale offsets (older than 8640 ms). 2015-04-29 17:58:43:404 CEST TRACE [kafka-scheduler-3][kafka.server.OffsetManager] Removing stale offset and metadata for [drafts,tasks,1]: OffsetAndMetadata[824,consumer_id = drafts, time = 1430322433,0] 2015-04-29 17:58:43:404 CEST TRACE [kafka-scheduler-3][kafka.server.OffsetManager] Marked 1 offsets in [__consumer_offsets,2] for deletion. 2015-04-29 17:58:43:404 CEST DEBUG [kafka-scheduler-3][kafka.server.OffsetManager] Removed 1 stale offsets in 1 milliseconds.{quote} Parameters are ignored and default values are used instead. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 35454: Patch for KAFKA-2159
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/35454/ --- (Updated June 17, 2015, 6:19 a.m.) Review request for kafka. Bugs: KAFKA-2159 https://issues.apache.org/jira/browse/KAFKA-2159 Repository: kafka Description (updated) --- Addessing Joel's comments, enabled offsets.topic.compression.codec config usage Diffs (updated) - core/src/main/scala/kafka/server/KafkaServer.scala b320ce9f6a12c0ee392e91beb82e8804d167f9f4 core/src/main/scala/kafka/server/OffsetManager.scala 5cca85cf727975f6d3acb2223fd186753ad761dc Diff: https://reviews.apache.org/r/35454/diff/ Testing --- Thanks, Manikumar Reddy O
Re: Review Request 34805: Patch for KAFKA-2213
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34805/ --- (Updated June 17, 2015, 10:38 a.m.) Review request for kafka. Bugs: KAFKA-2213 https://issues.apache.org/jira/browse/KAFKA-2213 Repository: kafka Description (updated) --- Addessing Joel's comments Diffs (updated) - clients/src/main/java/org/apache/kafka/common/record/Compressor.java e570b29d5ffba5d3754c46670b708f7d511086f3 clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b2db2403868b1e7361b8514cfed2e76ef785edee core/src/main/scala/kafka/log/LogCleaner.scala c9ade7208798fbd92d4ff49e183fe5f8925c82a9 core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala 471ddff9bff1bdfa277c071e59e5c6b749b9c74f Diff: https://reviews.apache.org/r/34805/diff/ Testing --- Thanks, Manikumar Reddy O
[jira] [Updated] (KAFKA-2213) Log cleaner should write compacted messages using configured compression type
[ https://issues.apache.org/jira/browse/KAFKA-2213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar Reddy updated KAFKA-2213: --- Status: Patch Available (was: In Progress) Log cleaner should write compacted messages using configured compression type - Key: KAFKA-2213 URL: https://issues.apache.org/jira/browse/KAFKA-2213 Project: Kafka Issue Type: Bug Reporter: Joel Koshy Assignee: Manikumar Reddy Attachments: KAFKA-2213.patch, KAFKA-2213_2015-05-30_00:23:01.patch, KAFKA-2213_2015-06-17_16:05:53.patch In KAFKA-1374 the log cleaner was improved to handle compressed messages. There were a couple of follow-ups from that: * We write compacted messages using the original compression type in the compressed message-set. We should instead append all retained messages with the configured broker compression type of the topic. * While compressing messages we should ideally do some batching before compression. * Investigate the use of the client compressor. (See the discussion in the RBs for KAFKA-1374) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2213) Log cleaner should write compacted messages using configured compression type
[ https://issues.apache.org/jira/browse/KAFKA-2213?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Manikumar Reddy updated KAFKA-2213: --- Attachment: KAFKA-2213_2015-06-17_16:05:53.patch Log cleaner should write compacted messages using configured compression type - Key: KAFKA-2213 URL: https://issues.apache.org/jira/browse/KAFKA-2213 Project: Kafka Issue Type: Bug Reporter: Joel Koshy Assignee: Manikumar Reddy Attachments: KAFKA-2213.patch, KAFKA-2213_2015-05-30_00:23:01.patch, KAFKA-2213_2015-06-17_16:05:53.patch In KAFKA-1374 the log cleaner was improved to handle compressed messages. There were a couple of follow-ups from that: * We write compacted messages using the original compression type in the compressed message-set. We should instead append all retained messages with the configured broker compression type of the topic. * While compressing messages we should ideally do some batching before compression. * Investigate the use of the client compressor. (See the discussion in the RBs for KAFKA-1374) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2213) Log cleaner should write compacted messages using configured compression type
[ https://issues.apache.org/jira/browse/KAFKA-2213?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14589610#comment-14589610 ] Manikumar Reddy commented on KAFKA-2213: Updated reviewboard https://reviews.apache.org/r/34805/diff/ against branch origin/trunk Log cleaner should write compacted messages using configured compression type - Key: KAFKA-2213 URL: https://issues.apache.org/jira/browse/KAFKA-2213 Project: Kafka Issue Type: Bug Reporter: Joel Koshy Assignee: Manikumar Reddy Attachments: KAFKA-2213.patch, KAFKA-2213_2015-05-30_00:23:01.patch, KAFKA-2213_2015-06-17_16:05:53.patch In KAFKA-1374 the log cleaner was improved to handle compressed messages. There were a couple of follow-ups from that: * We write compacted messages using the original compression type in the compressed message-set. We should instead append all retained messages with the configured broker compression type of the topic. * While compressing messages we should ideally do some batching before compression. * Investigate the use of the client compressor. (See the discussion in the RBs for KAFKA-1374) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-2282) ConsumerConnector enhance
Jiankang Liu created KAFKA-2282: --- Summary: ConsumerConnector enhance Key: KAFKA-2282 URL: https://issues.apache.org/jira/browse/KAFKA-2282 Project: Kafka Issue Type: Improvement Components: consumer Reporter: Jiankang Liu Assignee: Neha Narkhede 1. As a developer, I want to manually handle rebalance situation, such as commit offset to avoid consume duplicate message; thus export the setConsumerRebalanceListener interface in java api. 2. It will throw a ClassCastException to just simply use asInstanceOf to cast java.util.map to scala immutable.map; it'd better use JavaConversions, not use JavaConverters to keep code style syched and compatible for scala under 2.8 version. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Updated] (KAFKA-2282) ConsumerConnector enhance
[ https://issues.apache.org/jira/browse/KAFKA-2282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jiankang Liu updated KAFKA-2282: Attachment: KAFKA-2282.patch Attached patch to fix this issue. ConsumerConnector enhance - Key: KAFKA-2282 URL: https://issues.apache.org/jira/browse/KAFKA-2282 Project: Kafka Issue Type: Improvement Components: consumer Reporter: Jiankang Liu Assignee: Neha Narkhede Attachments: KAFKA-2282.patch 1. As a developer, I want to manually handle rebalance situation, such as commit offset to avoid consume duplicate message; thus export the setConsumerRebalanceListener interface in java api. 2. It will throw a ClassCastException to just simply use asInstanceOf to cast java.util.map to scala immutable.map; it'd better use JavaConversions, not use JavaConverters to keep code style syched and compatible for scala under 2.8 version. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Created] (KAFKA-2283) scheduler exception on non-controller node when shutdown
allenlee created KAFKA-2283: --- Summary: scheduler exception on non-controller node when shutdown Key: KAFKA-2283 URL: https://issues.apache.org/jira/browse/KAFKA-2283 Project: Kafka Issue Type: Bug Components: controller Affects Versions: 0.8.2.1 Environment: linux debian Reporter: allenlee Assignee: Neha Narkhede Priority: Minor When broker shutdown, there is an error log about 'Kafka scheduler has not been started'. It only appears on non-controller node. If this broker is the controller, it shutdown without warning log. IMHO, *autoRebalanceScheduler.shutdown()* should only valid for controller, right? {quote} [2015-06-17 22:32:51,814] INFO Shutdown complete. (kafka.log.LogManager) [2015-06-17 22:32:51,815] WARN Kafka scheduler has not been started (kafka.utils.Utils$) java.lang.IllegalStateException: Kafka scheduler has not been started at kafka.utils.KafkaScheduler.ensureStarted(KafkaScheduler.scala:114) at kafka.utils.KafkaScheduler.shutdown(KafkaScheduler.scala:86) at kafka.controller.KafkaController.onControllerResignation(KafkaController.scala:350) at kafka.controller.KafkaController.shutdown(KafkaController.scala:664) at kafka.server.KafkaServer$$anonfun$shutdown$8.apply$mcV$sp(KafkaServer.scala:285) at kafka.utils.Utils$.swallow(Utils.scala:172) at kafka.utils.Logging$class.swallowWarn(Logging.scala:92) at kafka.utils.Utils$.swallowWarn(Utils.scala:45) at kafka.utils.Logging$class.swallow(Logging.scala:94) at kafka.utils.Utils$.swallow(Utils.scala:45) at kafka.server.KafkaServer.shutdown(KafkaServer.scala:285) at kafka.server.KafkaServerStartable.shutdown(KafkaServerStartable.scala:42) at kafka.Kafka$$anon$1.run(Kafka.scala:42) [2015-06-17 22:32:51,818] INFO Terminate ZkClient event thread. (org.I0Itec.zkclient.ZkEventThread) {quote} -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (KAFKA-2078) Getting Selector [WARN] Error in I/O with host java.io.EOFException
[ https://issues.apache.org/jira/browse/KAFKA-2078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanelfocusedCommentId=14589891#comment-14589891 ] Sayat Satybaldiyev commented on KAFKA-2078: --- Thanks! No I don't see something wrong in broker log. Getting Selector [WARN] Error in I/O with host java.io.EOFException --- Key: KAFKA-2078 URL: https://issues.apache.org/jira/browse/KAFKA-2078 Project: Kafka Issue Type: Bug Components: producer Affects Versions: 0.8.2.0 Environment: OS Version: 2.6.39-400.209.1.el5uek and Hardware: 8 x Intel(R) Xeon(R) CPU X5660 @ 2.80GHz/44GB Reporter: Aravind Assignee: Jun Rao When trying to Produce 1000 (10 MB) messages, getting this below error some where between 997 to 1000th message. There is no pattern but able to reproduce. [PDT] 2015-03-31 13:53:50 Selector [WARN] Error in I/O with our host java.io.EOFException at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:62) at org.apache.kafka.common.network.Selector.poll(Selector.java:248) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) at java.lang.Thread.run(Thread.java:724) This error I am getting some times @ 997th message or 999th message. There is no pattern but able to reproduce. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
Re: Review Request 33049: Patch for KAFKA-2084
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33049/#review88180 --- This is looking very good. clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java (line 38) https://reviews.apache.org/r/33049/#comment140622 We can remove the setter. clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java (line 120) https://reviews.apache.org/r/33049/#comment140623 How about making this a bit more concise and past tense (since you would `getMessage` on the exception after the fact): `%s violated quota. Actual: %f, Threshold: %f` clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java (line 88) https://reviews.apache.org/r/33049/#comment140648 Occurences - Occurrences Also, `(0..%d) = %d` substituted with `count` and `count / elapsedSecs` - similar comment for the asserts below. clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java (line 92) https://reviews.apache.org/r/33049/#comment140649 `long sleepTimeMs = 2000` core/src/main/scala/kafka/server/ClientQuotaMetrics.scala (line 1) https://reviews.apache.org/r/33049/#comment140650 Why was MockTime moved from test to main? core/src/main/scala/kafka/server/ClientQuotaMetrics.scala (line 140) https://reviews.apache.org/r/33049/#comment140654 would prefer to see this on a single line (no braces) core/src/main/scala/kafka/server/KafkaConfig.scala (line 842) https://reviews.apache.org/r/33049/#comment140656 Prefer a: b over a : b core/src/main/scala/kafka/server/ThrottledRequest.scala (line 42) https://reviews.apache.org/r/33049/#comment140657 `if (` core/src/main/scala/kafka/server/ThrottledRequest.scala (line 43) https://reviews.apache.org/r/33049/#comment140658 same core/src/test/scala/integration/kafka/api/QuotasTest.scala (line 142) https://reviews.apache.org/r/33049/#comment140659 This is an important test, but this is a bit non-deterministic no? i.e., the replicas could have been throttled, but caught up soon after that. We would just need to assert (after) this test that the elapsed time is within the expected delay time for an otherwised throttled consumer. - Joel Koshy On June 12, 2015, 5:40 p.m., Aditya Auradkar wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/33049/ --- (Updated June 12, 2015, 5:40 p.m.) Review request for kafka, Joel Koshy and Jun Rao. Bugs: KAFKA-2084 https://issues.apache.org/jira/browse/KAFKA-2084 Repository: kafka Description --- Updated patch for quotas. This patch does the following: 1. Add per-client metrics for both producer and consumers 2. Add configuration for quotas 3. Compute delay times in the metrics package and return the delay times in QuotaViolationException 4. Add a DelayQueue in KafkaApi's that can be used to throttle any type of request. Implemented request throttling for produce and fetch requests. 5. Added unit and integration test cases for both producer and consumer 6. This doesn't include a system test. There is a separate ticket for that 7. Fixed KAFKA-2191 - (Included fix from : https://reviews.apache.org/r/34418/ ) Addressing Joel's comments Diffs - clients/src/main/java/org/apache/kafka/common/metrics/Quota.java d82bb0c055e631425bc1ebbc7d387baac76aeeaa clients/src/main/java/org/apache/kafka/common/metrics/QuotaViolationException.java a451e5385c9eca76b38b425e8ac856b2715fcffe clients/src/main/java/org/apache/kafka/common/metrics/Sensor.java ca823fd4639523018311b814fde69b6177e73b97 clients/src/main/java/org/apache/kafka/common/metrics/stats/Rate.java 98429da34418f7f1deba1b5e44e2e6025212edb3 clients/src/test/java/org/apache/kafka/common/metrics/MetricsTest.java 544e120594de78c43581a980b1e4087b4fb98ccb clients/src/test/java/org/apache/kafka/common/utils/MockTime.java core/src/main/scala/kafka/server/ClientQuotaMetrics.scala PRE-CREATION core/src/main/scala/kafka/server/KafkaApis.scala d63bc18d795a6f0e6994538ca55a9a46f7fb8ffd core/src/main/scala/kafka/server/KafkaConfig.scala 2d75186a110075e0c322db4b9f7a8c964a7a3e88 core/src/main/scala/kafka/server/KafkaServer.scala b320ce9f6a12c0ee392e91beb82e8804d167f9f4 core/src/main/scala/kafka/server/ReplicaManager.scala 59c9bc3ac3a8afc07a6f8c88c5871304db588d17 core/src/main/scala/kafka/server/ThrottledRequest.scala PRE-CREATION core/src/main/scala/kafka/utils/ShutdownableThread.scala fc226c863095b7761290292cd8755cd7ad0f155c core/src/test/scala/integration/kafka/api/QuotasTest.scala PRE-CREATION