-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27799/#review69355
-----------------------------------------------------------

clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
<https://reviews.apache.org/r/27799/#comment114034>

    This should probably just return the ConnectionState, since that's all it's used for. In fact, NodeConnectionState could be made a private nested class, since ClusterConnectionStates is the only class that uses it.


clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java
<https://reviews.apache.org/r/27799/#comment114036>

    METADATA_MAX_AGE_CONFIG/DOC are still duplicated in ProducerConfig and ConsumerConfig.


clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
<https://reviews.apache.org/r/27799/#comment114037>

    The "now" parameter is not used and should be removed.


clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
<https://reviews.apache.org/r/27799/#comment114038>

    I think these methods need to have timeouts on them. They get called via synchronized methods in KafkaConsumer, and KafkaConsumer.close() is also synchronized, so an attempt to shut down the consumer could be blocked indefinitely by a call to completeAll().


clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
<https://reviews.apache.org/r/27799/#comment114039>

    These are just for debugging, but there's a return statement earlier, and on that path the selected node won't be logged.


clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java
<https://reviews.apache.org/r/27799/#comment114040>

    Using ConcatenatedIterable here too would require less copying: just build up a list of iterators rather than copying all the entries into the ArrayList.


clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
<https://reviews.apache.org/r/27799/#comment114043>

    Does this give the right metric names? It looks different from the one in KafkaProducer (which doesn't have a trailing dot).
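
A rough sketch of the lazy concatenation suggested in the ConsumerRecords.java comment above might look like the following. The class name ConcatIterable and everything in it are hypothetical and written only for illustration; this is not the ConcatenatedIterable from the patch, whose exact shape may differ. The point is that the per-partition lists are walked in place rather than copied into one ArrayList.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

/** Hypothetical helper: iterates several iterables back to back without copying their elements. */
final class ConcatIterable<T> implements Iterable<T> {
    private final List<? extends Iterable<T>> parts;

    ConcatIterable(List<? extends Iterable<T>> parts) {
        this.parts = parts;
    }

    @Override
    public Iterator<T> iterator() {
        final Iterator<? extends Iterable<T>> outer = parts.iterator();
        return new Iterator<T>() {
            private Iterator<T> current = null;

            @Override
            public boolean hasNext() {
                // Skip over exhausted (or empty) parts until one has an element.
                while ((current == null || !current.hasNext()) && outer.hasNext())
                    current = outer.next().iterator();
                return current != null && current.hasNext();
            }

            @Override
            public T next() {
                if (!hasNext())
                    throw new NoSuchElementException();
                return current.next();
            }

            @Override
            public void remove() {
                throw new UnsupportedOperationException();
            }
        };
    }

    public static void main(String[] args) {
        // Stand-ins for per-partition record lists; the middle one is empty on purpose.
        List<List<String>> perPartition = new ArrayList<List<String>>();
        perPartition.add(Arrays.asList("a", "b"));
        perPartition.add(new ArrayList<String>());
        perPartition.add(Arrays.asList("c"));
        for (String s : new ConcatIterable<String>(perPartition))
            System.out.println(s);
    }
}
```

The trade-off is that the result is a view: it stays cheap to build, but it reflects later changes to the underlying lists, so it only fits if those lists are not mutated while the caller iterates.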


clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
<https://reviews.apache.org/r/27799/#comment114047>

    No need to call time.milliseconds() twice here.


clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
<https://reviews.apache.org/r/27799/#comment114050>

    Won't this cause busy looping during network failures (and maybe due to other errors returned in the OffsetCommitResponse)?


clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
<https://reviews.apache.org/r/27799/#comment114055>

    Timeout? We need to eventually allow things like close() requests to process.


clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
<https://reviews.apache.org/r/27799/#comment114063>

    There are a bunch of redundant time.milliseconds() calls in this method. A couple are necessary because of the loops, but a lot could be removed.


clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
<https://reviews.apache.org/r/27799/#comment114062>

    I'm confused about what's going on here: why Integer.MIN_VALUE? And how does this end up working with the rest of the code, since this result node is used for consumerCoordinator and other code compares node IDs?


core/src/main/scala/kafka/tools/ConsumerPerformance.scala
<https://reviews.apache.org/r/27799/#comment114069>

    Lots of unused import cleanup here.


- Ewen Cheslack-Postava


On Jan. 23, 2015, 4:22 a.m., Jay Kreps wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/27799/
> -----------------------------------------------------------
>
> (Updated Jan. 23, 2015, 4:22 a.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-1760
>     https://issues.apache.org/jira/browse/KAFKA-1760
>
>
> Repository: kafka
>
>
> Description
> -------
>
> New consumer.
>
>
> Diffs
> -----
>
>   build.gradle 1cbab29ce83e20dae0561b51eed6fdb86d522f28
>   clients/src/main/java/org/apache/kafka/clients/ClientRequest.java d32c319d8ee4c46dad309ea54b136ea9798e2fd7
>   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 8aece7e81a804b177a6f2c12e2dc6c89c1613262
>   clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/ConnectionState.java ab7e3220f9b76b92ef981d695299656f041ad5ed
>   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 397695568d3fd8e835d8f923a89b3b00c96d0ead
>   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 6746275d0b2596cd6ff7ce464a3a8225ad75ef00
>   clients/src/main/java/org/apache/kafka/clients/RequestCompletionHandler.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/CommitType.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java c0c636b3e1ba213033db6d23655032c9bbd5e378
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 57c1807ccba9f264186f83e91f37c34b959c8060
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java e4cf7d1cfa01c2844b53213a7b539cdcbcbeaa3a
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java 16af70a5de52cca786fdea147a6a639b7dc4a311
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java bdf4b26942d5a8c8a9503e05908e9a9eff6228a7
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 76efc216c9e6c3ab084461d792877092a189ad0f
>   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java fa88ac1a8b19b4294f211c4467fe68c7707ddbae
>   clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/OffsetMetadata.java ea423ad15eebd262d20d5ec05d592cc115229177
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceCallback.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java PRE-CREATION
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java fc71710dd5997576d3841a1c3b0f7e19a8c9698e
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 904976fadf0610982958628eaee810b60a98d725
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 8b3e565edd1ae04d8d34bd9f1a41e9fa8c880a75
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java dcf46581b912cfb1b5c8d4cbc293d2d1444b7740
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java 483899d2e69b33655d0e08949f5f64af2519660a
>   clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java ccc03d8447ebba40131a70e16969686ac4aab58a
>   clients/src/main/java/org/apache/kafka/common/Cluster.java d3299b944062d96852452de455902659ad8af757
>   clients/src/main/java/org/apache/kafka/common/PartitionInfo.java b15aa2c3ef2d7c4b24618ff42fd4da324237a813
>   clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java 98cb79b701918eca3f6ca9823b6c7b7c97b3ecec
>   clients/src/main/java/org/apache/kafka/common/errors/ApiException.java 7c948b166a8ac07616809f260754116ae7764973
>   clients/src/main/java/org/apache/kafka/common/network/Selectable.java b68bbf00ab8eba6c5867d346c91188142593ca6e
>   clients/src/main/java/org/apache/kafka/common/network/Selector.java 74d695ba39de44b6a3d15340ec0114bc4fce2ba2
>   clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 3316b6a1098311b8603a4a5893bf57b75d2e43cb
>   clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 121e880a941fcd3e6392859edba11a94236494cc
>   clients/src/main/java/org/apache/kafka/common/record/LogEntry.java e4d688cbe0c61b74ea15fc8dd3b634f9e5ee9b83
>   clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 040e5b91005edb8f015afdfa76fd94e0bf3cb4ca
>   clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java 99b52c23d639df010bf2affc0f79d1c6e16ed67c
>   clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java 8b8f591c4b2802a9cbbe34746c0b3ca4a64a8681
>   clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java 2fc471f64f4352eeb128bbd3941779780076fb8c
>   clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java f719010119951e55795a583796a93a251e1ba404
>   clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java 9512db2365d8d1f7a9a9e93b2bebabd877881143
>   clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java 8997ffc44c18b7ae14365161c04eb423793cb8c9
>   clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java d6e91f393179809f72d52028d6a8a2b2bb43b820
>   clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java efe89796a7bd0d1cabc138f695b0eb28007663a7
>   clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java 99364c1ca464f7b81be7d3da15b40ab66717a659
>   clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java ac239712f11848755cc01399b4027ea0a8d7d8ac
>   clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java b22ca1dce65f665d84c2a980fd82f816e93d9960
>   clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java d97962d3840179b1abf01459522c58e59102ac8d
>   clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f
>   clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java 711232ac613786392f1389f2f883818bf4ac2a33
>   clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java 90d5135b97a44d1181bd595c8eab2f2aef1a728a
>   clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java 6b7c269ad7679df57c6bd505516075add39b7534
>   clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java 3dbba8a360f11b2b1db259ecce11e15d8eeaccfb
>   clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java 5220464913e6e82734e7f9ff886be7fdfa80361f
>   clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java 3c001d33091c0f04ac3bf49a6731ab9e9f2bb0c4
>   clients/src/main/java/org/apache/kafka/common/utils/Utils.java 527dd0f9c47fce7310b7a37a9b95bf87f1b9c292
>   clients/src/test/java/org/apache/kafka/clients/MockClient.java 47b5d4ac1f2a5d162541cab658add9cc40b3aa4e
>   clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 1a55242e9399fa4669630b55110d530f954e1279
>   clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java 29ad25e90606f065b409bc194ef9200189933aa6
>   clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java PRE-CREATION
>   clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java PRE-CREATION
>   clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java 12368038e1381f89352a5a1c98b2e0f8cbc04cbd
>   clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 3676b05eb8b83bbd8a38754fc3a979124908281b
>   clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java 1d077fd4c56bf28557a0ca1e15c82cc427f36e6c
>   clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java 66cbdf5babed3ac27c5254d945372644a053df87
>   clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java 3c442a27a7ba3eaa908becc802aab6242594df85
>   clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java 74c19573a29b0d5ac4264e1430904dca52571404
>   clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java a39fab532f73148316a56c0f8e9197f38ea66f79
>   clients/src/test/java/org/apache/kafka/test/MockSelector.java d61de529173314c044261ad9662bec735d67e97f
>   core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala 6d00ed090d76cd7925621a9c6db8fb00fb9d48f4
>   core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala 84f60178f6ebae735c8aa3e79ed93fe21ac4aea7
>   core/src/main/scala/kafka/cluster/Partition.scala b230e9a1fb1a3161f4c9d164e4890a16eceb2ad4
>   core/src/main/scala/kafka/controller/ControllerChannelManager.scala eb492f00449744bc8d63f55b393e2a1659d38454
>   core/src/main/scala/kafka/log/LogConfig.scala 4631bc78106e6167ab6c0a781f40dcd8cd265598
>   core/src/main/scala/kafka/server/KafkaApis.scala ec8d9f7ba44741db40875458bd524c4062ad6a26
>   core/src/main/scala/kafka/server/ReplicaManager.scala e58fbb922e93b0c31dff04f187fcadb4ece986d7
>   core/src/main/scala/kafka/tools/ConsoleConsumer.scala e455cb9a1de221a6d080e3abd909ea8f24ff7fe9
>   core/src/main/scala/kafka/tools/ConsumerPerformance.scala 093c800ea7f8a9c972bb66e99ac4e4d431cf11cc
>   core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala 7602b8d705970a5dab49ed36d117346a960701ac
>   core/src/main/scala/kafka/utils/KafkaScheduler.scala 9a16343d2ff7192b741f0b23a6bdf58d8f2bbd3e
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala PRE-CREATION
>   core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala PRE-CREATION
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala cd16ced5465d098be7a60498326b2a98c248f343
>   core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala 3cf7c9bcd64492d05590067a8ad11d31096a8e5e
>   core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala a5386a03b62956bc440b40783247c8cdf7432315
>   core/src/test/scala/unit/kafka/utils/MockScheduler.scala d5896ed4d3b73aecb652436b5dfc80c2835af595
>   core/src/test/scala/unit/kafka/utils/SchedulerTest.scala b364ac2d6d623f6e86805710ca68dc32d92558c1
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala ac15d34425795d5be20c51b01fa1108bdcd66583
>
> Diff: https://reviews.apache.org/r/27799/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Jay Kreps
>
>
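
On the busy-looping and timeout comments for KafkaConsumer.java (#comment114050 and #comment114055), one possible shape for a bounded retry loop is sketched below. This is only an illustration under stated assumptions: BoundedRetry, Attempt, retryUntil and the backoff parameters are hypothetical names that do not appear in the patch, and the sketch uses System.currentTimeMillis() instead of the client's own time abstraction. It reads the clock once per iteration, sleeps between failed attempts with a capped doubling backoff, and gives up at a deadline so that other callers such as close() eventually get a turn.

```java
/** Hypothetical sketch, not code from the patch: retry with backoff until a deadline. */
final class BoundedRetry {

    /** Hypothetical callback: returns true once the operation has succeeded. */
    interface Attempt {
        boolean tryOnce();
    }

    /**
     * Calls attempt until it returns true or timeoutMs has elapsed.
     * Returns true on success, false on timeout or if the thread is interrupted.
     */
    static boolean retryUntil(Attempt attempt, long timeoutMs, long initialBackoffMs, long maxBackoffMs) {
        final long deadline = System.currentTimeMillis() + timeoutMs;
        long backoff = initialBackoffMs;
        while (true) {
            if (attempt.tryOnce())
                return true;
            // One clock read per iteration is enough for both the check and the sleep.
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0)
                return false;
            try {
                // Sleeping here is what prevents a tight loop while the failure persists.
                Thread.sleep(Math.min(backoff, remaining));
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
            backoff = Math.min(backoff * 2, maxBackoffMs);
        }
    }

    public static void main(String[] args) {
        final int[] calls = {0};
        // Simulated operation that fails twice and then succeeds.
        boolean ok = retryUntil(new Attempt() {
            @Override
            public boolean tryOnce() {
                return ++calls[0] >= 3;
            }
        }, 1000L, 10L, 100L);
        System.out.println("succeeded=" + ok + " after " + calls[0] + " attempts");
    }
}
```

Whether a timeout should surface as a return value or as an exception is a design choice for the consumer API; the sketch returns a boolean only to keep the example small.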