> On Jan. 20, 2015, 8:03 a.m., Guozhang Wang wrote: > > clients/src/main/java/org/apache/kafka/clients/KafkaClient.java, lines 30-33 > > <https://reviews.apache.org/r/27799/diff/3/?file=824651#file824651line30> > > > > Wondering why we make newline for @param but keep the same line for > > @return?
Yeah that is annoying auto-formatting from my IDE, will fix. > On Jan. 20, 2015, 8:03 a.m., Guozhang Wang wrote: > > clients/src/main/java/org/apache/kafka/clients/NetworkClient.java, line 167 > > <https://reviews.apache.org/r/27799/diff/3/?file=824652#file824652line167> > > > > Is this function really private? If yes we do not need keep the javadoc > > for it. Well we actually have javadoc'd everythin in that class. > On Jan. 20, 2015, 8:03 a.m., Guozhang Wang wrote: > > clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java, > > lines 37-44 > > <https://reviews.apache.org/r/27799/diff/3/?file=824655#file824655line37> > > > > The javadocs here are a little confusing: users looking for its > > function APIs need to look into KafkaConsumer, an implementation of the > > interface Consumer. The motivation here was that the description of behavior is actually very specific to the KafkaConsumer implementation (e.g. details of communication to the server and so on). I didn't want to duplicate the docs either. Actually the interface is not really meant for use, we don't want a bunch of implementations, people should either use our mock or the real class. Mocks for our class will likely break as we add methods. > On Jan. 20, 2015, 8:03 a.m., Guozhang Wang wrote: > > clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java, > > line 114 > > <https://reviews.apache.org/r/27799/diff/3/?file=824656#file824656line114> > > > > Should this inherit from CommonClientConfig? Well, actually ConsumerConfig and ProducerConfig are public and CommonClientConfig is private...I just wanted a way to share those variables. > On Jan. 20, 2015, 8:03 a.m., Guozhang Wang wrote: > > clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java, > > lines 185-191 > > <https://reviews.apache.org/r/27799/diff/3/?file=824656#file824656line185> > > > > Could these two be moved to CommonClientConfig? Actually it's subtly different since it is key.serializer in the producer and key.deserializer in the consumer. > On Jan. 20, 2015, 8:03 a.m., Guozhang Wang wrote: > > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, > > line 845 > > <https://reviews.apache.org/r/27799/diff/3/?file=824660#file824660line845> > > > > Maybe add some comments on why consumerId / generationId are > > initialized as such and when they will be updated and used. This is actually not handled yet as we need the consumer co-ordinator to maintain and update these variables first. > On Jan. 20, 2015, 8:03 a.m., Guozhang Wang wrote: > > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, > > line 917 > > <https://reviews.apache.org/r/27799/diff/3/?file=824660#file824660line917> > > > > There is a potential risk that a topic is deleted and the consumer > > unscribes to it, but not removing it from its metadata topic list, causing > > the underlying network client to keep refreshing metadata. I think you are flagging that we never remove topics from our metadata fetch list. This is true. But I don't think this will cause metadata refreshes to occur will it? Another question here is just what the behavior should be for a topic that is deleted. There are obviously a bunch of states the consumer could be in when the topic gets deleted so we should ideally be sure we handle them all. I haven't thought a lot about this. > On Jan. 20, 2015, 8:03 a.m., Guozhang Wang wrote: > > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, > > lines 968-973 > > <https://reviews.apache.org/r/27799/diff/3/?file=824660#file824660line968> > > > > Should the ordering of these two "else if" be swapped? I don't think so. I think the idea is that if the coordinator has hit the failure detection criteria you want to mark it dead and continue. You don't to keep trying to heartbeat to it (since you have already failed multiple times). > On Jan. 20, 2015, 8:03 a.m., Guozhang Wang wrote: > > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, > > line 1075 > > <https://reviews.apache.org/r/27799/diff/3/?file=824660#file824660line1075> > > > > Do we need to back off here? Good call. > On Jan. 20, 2015, 8:03 a.m., Guozhang Wang wrote: > > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, > > line 1343 > > <https://reviews.apache.org/r/27799/diff/3/?file=824660#file824660line1343> > > > > Should we change the "this.client.inFlightRequestCount .. " condition > > to just "node.ready()"? Actually the criteria here is stricter. We need to issue successive fetch requests at offset X, X', X'', etc. So we need to prevent ourselves from issuing fetch request for offset X' until X has completed. So if we have any outstanding requests on the connection we don't want to issue another one irrespective of what the in-flight-request limit is (which actually we probably want to be > 1 so that the coordinator can have offset commits, heartbeats, and etc all happening without blocking. > On Jan. 20, 2015, 8:03 a.m., Guozhang Wang wrote: > > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, > > line 1625 > > <https://reviews.apache.org/r/27799/diff/3/?file=824660#file824660line1625> > > > > Rename the function name? Hmm, to what? > On Jan. 20, 2015, 8:03 a.m., Guozhang Wang wrote: > > clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java, > > lines 1-29 > > <https://reviews.apache.org/r/27799/diff/3/?file=824662#file824662line1> > > > > This kafka exception could be thrown other places besides committed() > > and position(), it could also be thrown in: > > > > private resetOffset() -> > > > > private fetchMissingPositionsOrResetThem() -> > > > > public position() > > public pool() > > > > private handleFetchResponse() -> > > > > public pool() > > > > > > Hence in pool() we need to hanle this exception specifically. > > > > In general, I would suggest we add the @throws label to private > > functions also for easy maintainning the throwable exceptions. Yeah the behavior is actually correct, I think. The general philosophy is that if the error is recoverable we should log it and keep trying but if it is fatal we should throw it. This is a fatal case (you are out of range and have no reset policy) so we should just blow up whatever the calling method is. I added the appropriate javadoc on poll() and position(). - Jay ----------------------------------------------------------- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/27799/#review68693 ----------------------------------------------------------- On Jan. 23, 2015, 9:15 p.m., Jay Kreps wrote: > > ----------------------------------------------------------- > This is an automatically generated e-mail. To reply, visit: > https://reviews.apache.org/r/27799/ > ----------------------------------------------------------- > > (Updated Jan. 23, 2015, 9:15 p.m.) > > > Review request for kafka. > > > Bugs: KAFKA-1760 > https://issues.apache.org/jira/browse/KAFKA-1760 > > > Repository: kafka > > > Description > ------- > > New consumer. > > > Diffs > ----- > > build.gradle 1cbab29ce83e20dae0561b51eed6fdb86d522f28 > clients/src/main/java/org/apache/kafka/clients/ClientRequest.java > d32c319d8ee4c46dad309ea54b136ea9798e2fd7 > clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java > 8aece7e81a804b177a6f2c12e2dc6c89c1613262 > clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java > PRE-CREATION > clients/src/main/java/org/apache/kafka/clients/ConnectionState.java > ab7e3220f9b76b92ef981d695299656f041ad5ed > clients/src/main/java/org/apache/kafka/clients/KafkaClient.java > 397695568d3fd8e835d8f923a89b3b00c96d0ead > clients/src/main/java/org/apache/kafka/clients/NetworkClient.java > 6746275d0b2596cd6ff7ce464a3a8225ad75ef00 > clients/src/main/java/org/apache/kafka/clients/NodeConnectionState.java > 752a979ea0b8bde7ff6d2e1a23bf54052305d841 > clients/src/main/java/org/apache/kafka/clients/consumer/CommitType.java > PRE-CREATION > clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java > c0c636b3e1ba213033db6d23655032c9bbd5e378 > clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java > 57c1807ccba9f264186f83e91f37c34b959c8060 > > clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java > e4cf7d1cfa01c2844b53213a7b539cdcbcbeaa3a > clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java > 16af70a5de52cca786fdea147a6a639b7dc4a311 > > clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java > bdf4b26942d5a8c8a9503e05908e9a9eff6228a7 > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java > 76efc216c9e6c3ab084461d792877092a189ad0f > clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java > fa88ac1a8b19b4294f211c4467fe68c7707ddbae > > clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java > PRE-CREATION > clients/src/main/java/org/apache/kafka/clients/consumer/OffsetMetadata.java > ea423ad15eebd262d20d5ec05d592cc115229177 > > clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceCallback.java > PRE-CREATION > > clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java > PRE-CREATION > clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java > fc71710dd5997576d3841a1c3b0f7e19a8c9698e > clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java > 904976fadf0610982958628eaee810b60a98d725 > clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java > 8b3e565edd1ae04d8d34bd9f1a41e9fa8c880a75 > > clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java > dcf46581b912cfb1b5c8d4cbc293d2d1444b7740 > > clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java > 483899d2e69b33655d0e08949f5f64af2519660a > > clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java > ccc03d8447ebba40131a70e16969686ac4aab58a > clients/src/main/java/org/apache/kafka/common/Cluster.java > d3299b944062d96852452de455902659ad8af757 > clients/src/main/java/org/apache/kafka/common/PartitionInfo.java > b15aa2c3ef2d7c4b24618ff42fd4da324237a813 > clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java > 98cb79b701918eca3f6ca9823b6c7b7c97b3ecec > clients/src/main/java/org/apache/kafka/common/errors/ApiException.java > 7c948b166a8ac07616809f260754116ae7764973 > clients/src/main/java/org/apache/kafka/common/network/Selectable.java > b68bbf00ab8eba6c5867d346c91188142593ca6e > clients/src/main/java/org/apache/kafka/common/network/Selector.java > 74d695ba39de44b6a3d15340ec0114bc4fce2ba2 > clients/src/main/java/org/apache/kafka/common/protocol/Errors.java > 3316b6a1098311b8603a4a5893bf57b75d2e43cb > clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java > 121e880a941fcd3e6392859edba11a94236494cc > clients/src/main/java/org/apache/kafka/common/record/LogEntry.java > e4d688cbe0c61b74ea15fc8dd3b634f9e5ee9b83 > clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java > 040e5b91005edb8f015afdfa76fd94e0bf3cb4ca > > clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java > 99b52c23d639df010bf2affc0f79d1c6e16ed67c > > clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java > 8b8f591c4b2802a9cbbe34746c0b3ca4a64a8681 > clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java > 2fc471f64f4352eeb128bbd3941779780076fb8c > clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java > f719010119951e55795a583796a93a251e1ba404 > > clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java > 9512db2365d8d1f7a9a9e93b2bebabd877881143 > > clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java > 8997ffc44c18b7ae14365161c04eb423793cb8c9 > > clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java > d6e91f393179809f72d52028d6a8a2b2bb43b820 > > clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java > efe89796a7bd0d1cabc138f695b0eb28007663a7 > > clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java > 99364c1ca464f7b81be7d3da15b40ab66717a659 > > clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java > ac239712f11848755cc01399b4027ea0a8d7d8ac > clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java > b22ca1dce65f665d84c2a980fd82f816e93d9960 > > clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java > d97962d3840179b1abf01459522c58e59102ac8d > > clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java > 3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f > > clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java > 711232ac613786392f1389f2f883818bf4ac2a33 > > clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java > 90d5135b97a44d1181bd595c8eab2f2aef1a728a > > clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java > 6b7c269ad7679df57c6bd505516075add39b7534 > clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java > 3dbba8a360f11b2b1db259ecce11e15d8eeaccfb > clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java > 5220464913e6e82734e7f9ff886be7fdfa80361f > > clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java > 3c001d33091c0f04ac3bf49a6731ab9e9f2bb0c4 > clients/src/main/java/org/apache/kafka/common/utils/Utils.java > 527dd0f9c47fce7310b7a37a9b95bf87f1b9c292 > clients/src/test/java/org/apache/kafka/clients/MockClient.java > 47b5d4ac1f2a5d162541cab658add9cc40b3aa4e > clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java > 1a55242e9399fa4669630b55110d530f954e1279 > > clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java > 29ad25e90606f065b409bc194ef9200189933aa6 > > clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java > PRE-CREATION > > clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java > PRE-CREATION > clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java > 12368038e1381f89352a5a1c98b2e0f8cbc04cbd > > clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java > 3676b05eb8b83bbd8a38754fc3a979124908281b > > clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java > 1d077fd4c56bf28557a0ca1e15c82cc427f36e6c > clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java > 66cbdf5babed3ac27c5254d945372644a053df87 > clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java > 3c442a27a7ba3eaa908becc802aab6242594df85 > clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java > 74c19573a29b0d5ac4264e1430904dca52571404 > clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java > a39fab532f73148316a56c0f8e9197f38ea66f79 > clients/src/test/java/org/apache/kafka/test/MockSelector.java > d61de529173314c044261ad9662bec735d67e97f > core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala > 6d00ed090d76cd7925621a9c6db8fb00fb9d48f4 > core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala > 84f60178f6ebae735c8aa3e79ed93fe21ac4aea7 > core/src/main/scala/kafka/cluster/Partition.scala > b230e9a1fb1a3161f4c9d164e4890a16eceb2ad4 > core/src/main/scala/kafka/controller/ControllerChannelManager.scala > eb492f00449744bc8d63f55b393e2a1659d38454 > core/src/main/scala/kafka/log/LogConfig.scala > 4631bc78106e6167ab6c0a781f40dcd8cd265598 > core/src/main/scala/kafka/server/KafkaApis.scala > ec8d9f7ba44741db40875458bd524c4062ad6a26 > core/src/main/scala/kafka/server/ReplicaManager.scala > e58fbb922e93b0c31dff04f187fcadb4ece986d7 > core/src/main/scala/kafka/tools/ConsoleConsumer.scala > e455cb9a1de221a6d080e3abd909ea8f24ff7fe9 > core/src/main/scala/kafka/tools/ConsumerPerformance.scala > 093c800ea7f8a9c972bb66e99ac4e4d431cf11cc > core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala > 7602b8d705970a5dab49ed36d117346a960701ac > core/src/main/scala/kafka/utils/KafkaScheduler.scala > 9a16343d2ff7192b741f0b23a6bdf58d8f2bbd3e > core/src/test/scala/integration/kafka/api/ConsumerTest.scala PRE-CREATION > core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala > PRE-CREATION > core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala > cd16ced5465d098be7a60498326b2a98c248f343 > core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala > 3cf7c9bcd64492d05590067a8ad11d31096a8e5e > core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala > a5386a03b62956bc440b40783247c8cdf7432315 > core/src/test/scala/unit/kafka/utils/MockScheduler.scala > d5896ed4d3b73aecb652436b5dfc80c2835af595 > core/src/test/scala/unit/kafka/utils/SchedulerTest.scala > b364ac2d6d623f6e86805710ca68dc32d92558c1 > core/src/test/scala/unit/kafka/utils/TestUtils.scala > ac15d34425795d5be20c51b01fa1108bdcd66583 > > Diff: https://reviews.apache.org/r/27799/diff/ > > > Testing > ------- > > > Thanks, > > Jay Kreps > >