> On Jan. 20, 2015, 8:03 a.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/KafkaClient.java, lines 30-33
> > <https://reviews.apache.org/r/27799/diff/3/?file=824651#file824651line30>
> >
> >     Wondering why we make newline for @param but keep the same line for 
> > @return?

Yeah that is annoying auto-formatting from my IDE, will fix.


> On Jan. 20, 2015, 8:03 a.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/NetworkClient.java, line 167
> > <https://reviews.apache.org/r/27799/diff/3/?file=824652#file824652line167>
> >
> >     Is this function really private? If yes, we do not need to keep the 
> > javadoc for it.

Well, we have actually javadoc'd everything in that class.


> On Jan. 20, 2015, 8:03 a.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java, 
> > lines 37-44
> > <https://reviews.apache.org/r/27799/diff/3/?file=824655#file824655line37>
> >
> >     The javadocs here are a little confusing: users looking for its 
> > function APIs need to look into KafkaConsumer, an implementation of the 
> > interface Consumer.

The motivation here was that the description of behavior is actually very 
specific to the KafkaConsumer implementation (e.g. details of communication to 
the server and so on), and I didn't want to duplicate the docs. The interface 
is not really meant to be implemented by users: we don't want a bunch of 
implementations, and people should use either our mock or the real class. Mocks 
for our class will likely break as we add methods.


> On Jan. 20, 2015, 8:03 a.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java,
> >  line 114
> > <https://reviews.apache.org/r/27799/diff/3/?file=824656#file824656line114>
> >
> >     Should this inherit from CommonClientConfigs?

Well, actually ConsumerConfig and ProducerConfig are public and 
CommonClientConfigs is private... I just wanted a way to share those variables.


> On Jan. 20, 2015, 8:03 a.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java,
> >  lines 185-191
> > <https://reviews.apache.org/r/27799/diff/3/?file=824656#file824656line185>
> >
> >     Could these two be moved to CommonClientConfigs?

Actually it's subtly different since it is key.serializer in the producer and 
key.deserializer in the consumer.
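
To illustrate the difference, here is a minimal sketch using plain 
java.util.Properties. Only the key.serializer and key.deserializer config names 
come from the discussion above; the class SerdeConfigSketch and the choice of 
the byte-array serializer classes are assumptions made for the example.

```java
import java.util.Properties;

// Hypothetical illustration; not code from the patch.
public class SerdeConfigSketch {
    // Producer side: the key config names a *serializer* class.
    public static Properties producerProps() {
        Properties props = new Properties();
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.ByteArraySerializer");
        return props;
    }

    // Consumer side: the analogous config names a *deserializer* class.
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        return props;
    }
}
```

Since the key names and the expected class types both differ, a single shared 
constant would not fit both clients.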


> On Jan. 20, 2015, 8:03 a.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
> > line 845
> > <https://reviews.apache.org/r/27799/diff/3/?file=824660#file824660line845>
> >
> >     Maybe add some comments on why consumerId / generationId are 
> > initialized as such and when they will be updated and used.

This is actually not handled yet, as we need the consumer coordinator to 
maintain and update these variables first.


> On Jan. 20, 2015, 8:03 a.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
> > line 917
> > <https://reviews.apache.org/r/27799/diff/3/?file=824660#file824660line917>
> >
> >     There is a potential risk that a topic is deleted and the consumer 
> > unsubscribes from it but does not remove it from its metadata topic list, 
> > causing the underlying network client to keep refreshing metadata.

I think you are flagging that we never remove topics from our metadata fetch 
list. This is true. But I don't think this will cause metadata refreshes to 
occur, will it?

Another question here is just what the behavior should be for a topic that is 
deleted. There are obviously a bunch of states the consumer could be in when 
the topic gets deleted so we should ideally be sure we handle them all. I 
haven't thought a lot about this.


> On Jan. 20, 2015, 8:03 a.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
> > lines 968-973
> > <https://reviews.apache.org/r/27799/diff/3/?file=824660#file824660line968>
> >
> >     Should the ordering of these two "else if" be swapped?

I don't think so. I think the idea is that if the coordinator has hit the 
failure detection criteria you want to mark it dead and continue. You don't 
want to keep trying to heartbeat to it (since you have already failed multiple 
times).
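
A minimal sketch of that ordering, under stated assumptions: the class, the 
enum, and the timeout parameters below are all hypothetical and are not taken 
from KafkaConsumer. It only shows why the failure check comes before the 
heartbeat check.

```java
// Hypothetical illustration of the branch ordering; not the patch itself.
public class CoordinatorCheckSketch {
    public enum Action { MARK_DEAD, SEND_HEARTBEAT, NOTHING }

    public static Action next(long nowMs,
                              long lastResponseMs,
                              long lastHeartbeatSentMs,
                              long sessionTimeoutMs,
                              long heartbeatIntervalMs) {
        if (nowMs - lastResponseMs > sessionTimeoutMs) {
            // Failure detection criteria hit: give up on this coordinator.
            return Action.MARK_DEAD;
        } else if (nowMs - lastHeartbeatSentMs >= heartbeatIntervalMs) {
            // Coordinator still considered alive and a heartbeat is due.
            return Action.SEND_HEARTBEAT;
        } else {
            return Action.NOTHING;
        }
    }
}
```

With the two branches swapped, a coordinator that has already timed out would 
keep receiving heartbeats whenever one happened to be due.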


> On Jan. 20, 2015, 8:03 a.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
> > line 1075
> > <https://reviews.apache.org/r/27799/diff/3/?file=824660#file824660line1075>
> >
> >     Do we need to back off here?

Good call.


> On Jan. 20, 2015, 8:03 a.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
> > line 1343
> > <https://reviews.apache.org/r/27799/diff/3/?file=824660#file824660line1343>
> >
> >     Should we change the "this.client.inFlightRequestCount .. " condition 
> > to just "node.ready()"?

Actually the criterion here is stricter. We need to issue successive fetch 
requests at offset X, X', X'', etc., so we must not issue the fetch request for 
offset X' until the one for X has completed. So if we have any outstanding 
requests on the connection we don't want to issue another one, irrespective of 
what the in-flight-request limit is (which we probably want to be > 1 so that 
the coordinator can have offset commits, heartbeats, etc. all happening without 
blocking).
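
A minimal sketch of that stricter check, assuming a hypothetical FetchGate 
helper that tracks outstanding requests per node id. The names are invented 
for illustration and are not the NetworkClient API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical illustration; not code from the patch.
public class FetchGate {
    // Outstanding request count per node id (assumed bookkeeping).
    private final Map<Integer, Integer> inFlight = new HashMap<Integer, Integer>();

    public void requestSent(int nodeId) {
        Integer current = inFlight.get(nodeId);
        inFlight.put(nodeId, current == null ? 1 : current + 1);
    }

    public void responseReceived(int nodeId) {
        Integer current = inFlight.get(nodeId);
        if (current == null)
            return; // nothing outstanding, ignore
        if (current <= 1)
            inFlight.remove(nodeId);
        else
            inFlight.put(nodeId, current - 1);
    }

    // A fetch may go out only when nothing at all is outstanding on the
    // connection, regardless of how large the in-flight limit is.
    public boolean canSendFetch(int nodeId) {
        return !inFlight.containsKey(nodeId);
    }
}
```

A readiness check that only compares against the in-flight limit would allow 
the fetch for X' to go out while the fetch for X is still pending.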


> On Jan. 20, 2015, 8:03 a.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java, 
> > line 1625
> > <https://reviews.apache.org/r/27799/diff/3/?file=824660#file824660line1625>
> >
> >     Rename the function name?

Hmm, to what?


> On Jan. 20, 2015, 8:03 a.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java,
> >  lines 1-29
> > <https://reviews.apache.org/r/27799/diff/3/?file=824662#file824662line1>
> >
> >     This Kafka exception could be thrown in other places besides 
> > committed() and position(); it could also be thrown in:
> >     
> >     private resetOffset() -> 
> >     
> >       private fetchMissingPositionsOrResetThem() ->
> >     
> >         public position()
> >         public poll()
> >             
> >       private handleFetchResponse() ->
> >     
> >         public poll()
> >         
> >         
> >     Hence in poll() we need to handle this exception specifically.
> >     
> >     In general, I would suggest we add the @throws label to private 
> > functions also, to make the throwable exceptions easier to maintain.

Yeah the behavior is actually correct, I think. The general philosophy is that 
if the error is recoverable we should log it and keep trying but if it is fatal 
we should throw it. This is a fatal case (you are out of range and have no 
reset policy) so we should just blow up whatever the calling method is. I added 
the appropriate javadoc on poll() and position().
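
A minimal sketch of that policy, assuming made-up error categories and a 
stand-in exception type. Nothing here is the actual KafkaConsumer code, and 
NoOffsetSketchException is only a placeholder for 
NoOffsetForPartitionException.

```java
// Hypothetical illustration of "log and retry vs. throw"; not the patch itself.
public class ErrorPolicySketch {
    /** Placeholder for NoOffsetForPartitionException (assumed unchecked). */
    public static class NoOffsetSketchException extends RuntimeException {
        public NoOffsetSketchException(String message) {
            super(message);
        }
    }

    /** Made-up error categories for the example. */
    public enum FetchError { NONE, TRANSIENT, OFFSET_OUT_OF_RANGE }

    /**
     * Returns true if the caller should retry the fetch.
     *
     * @throws NoOffsetSketchException if the position is out of range and
     *         no reset policy is configured (the fatal case)
     */
    public static boolean shouldRetry(FetchError error, boolean hasResetPolicy) {
        switch (error) {
            case NONE:
                return false;
            case OFFSET_OUT_OF_RANGE:
                if (!hasResetPolicy)
                    throw new NoOffsetSketchException(
                            "offset out of range and no reset policy configured");
                return true; // recoverable: reset the position, then retry
            default:
                // Recoverable: log it and keep trying.
                System.err.println("Recoverable fetch error, will retry: " + error);
                return true;
        }
    }
}
```

Because the exception is unchecked in this sketch, it propagates through any 
private helpers up to whichever public method was called.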


- Jay


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27799/#review68693
-----------------------------------------------------------


On Jan. 23, 2015, 9:15 p.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/27799/
> -----------------------------------------------------------
> 
> (Updated Jan. 23, 2015, 9:15 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1760
>     https://issues.apache.org/jira/browse/KAFKA-1760
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> New consumer.
> 
> 
> Diffs
> -----
> 
>   build.gradle 1cbab29ce83e20dae0561b51eed6fdb86d522f28 
>   clients/src/main/java/org/apache/kafka/clients/ClientRequest.java 
> d32c319d8ee4c46dad309ea54b136ea9798e2fd7 
>   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
> 8aece7e81a804b177a6f2c12e2dc6c89c1613262 
>   clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/ConnectionState.java 
> ab7e3220f9b76b92ef981d695299656f041ad5ed 
>   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
> 397695568d3fd8e835d8f923a89b3b00c96d0ead 
>   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
> 6746275d0b2596cd6ff7ce464a3a8225ad75ef00 
>   clients/src/main/java/org/apache/kafka/clients/NodeConnectionState.java 
> 752a979ea0b8bde7ff6d2e1a23bf54052305d841 
>   clients/src/main/java/org/apache/kafka/clients/consumer/CommitType.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
> c0c636b3e1ba213033db6d23655032c9bbd5e378 
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
> 57c1807ccba9f264186f83e91f37c34b959c8060 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
>  e4cf7d1cfa01c2844b53213a7b539cdcbcbeaa3a 
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java 
> 16af70a5de52cca786fdea147a6a639b7dc4a311 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
> bdf4b26942d5a8c8a9503e05908e9a9eff6228a7 
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
> 76efc216c9e6c3ab084461d792877092a189ad0f 
>   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
> fa88ac1a8b19b4294f211c4467fe68c7707ddbae 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/consumer/OffsetMetadata.java 
> ea423ad15eebd262d20d5ec05d592cc115229177 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceCallback.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> fc71710dd5997576d3841a1c3b0f7e19a8c9698e 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 
> 904976fadf0610982958628eaee810b60a98d725 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
> 8b3e565edd1ae04d8d34bd9f1a41e9fa8c880a75 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
>  dcf46581b912cfb1b5c8d4cbc293d2d1444b7740 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
>  483899d2e69b33655d0e08949f5f64af2519660a 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
> ccc03d8447ebba40131a70e16969686ac4aab58a 
>   clients/src/main/java/org/apache/kafka/common/Cluster.java 
> d3299b944062d96852452de455902659ad8af757 
>   clients/src/main/java/org/apache/kafka/common/PartitionInfo.java 
> b15aa2c3ef2d7c4b24618ff42fd4da324237a813 
>   clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java 
> 98cb79b701918eca3f6ca9823b6c7b7c97b3ecec 
>   clients/src/main/java/org/apache/kafka/common/errors/ApiException.java 
> 7c948b166a8ac07616809f260754116ae7764973 
>   clients/src/main/java/org/apache/kafka/common/network/Selectable.java 
> b68bbf00ab8eba6c5867d346c91188142593ca6e 
>   clients/src/main/java/org/apache/kafka/common/network/Selector.java 
> 74d695ba39de44b6a3d15340ec0114bc4fce2ba2 
>   clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
> 3316b6a1098311b8603a4a5893bf57b75d2e43cb 
>   clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 
> 121e880a941fcd3e6392859edba11a94236494cc 
>   clients/src/main/java/org/apache/kafka/common/record/LogEntry.java 
> e4d688cbe0c61b74ea15fc8dd3b634f9e5ee9b83 
>   clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 
> 040e5b91005edb8f015afdfa76fd94e0bf3cb4ca 
>   
> clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
>  99b52c23d639df010bf2affc0f79d1c6e16ed67c 
>   
> clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java
>  8b8f591c4b2802a9cbbe34746c0b3ca4a64a8681 
>   clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java 
> 2fc471f64f4352eeb128bbd3941779780076fb8c 
>   clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java 
> f719010119951e55795a583796a93a251e1ba404 
>   
> clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java 
> 9512db2365d8d1f7a9a9e93b2bebabd877881143 
>   
> clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java 
> 8997ffc44c18b7ae14365161c04eb423793cb8c9 
>   
> clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java 
> d6e91f393179809f72d52028d6a8a2b2bb43b820 
>   
> clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java 
> efe89796a7bd0d1cabc138f695b0eb28007663a7 
>   
> clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java 
> 99364c1ca464f7b81be7d3da15b40ab66717a659 
>   
> clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
>  ac239712f11848755cc01399b4027ea0a8d7d8ac 
>   clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java 
> b22ca1dce65f665d84c2a980fd82f816e93d9960 
>   
> clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java 
> d97962d3840179b1abf01459522c58e59102ac8d 
>   
> clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
>  3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f 
>   
> clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
>  711232ac613786392f1389f2f883818bf4ac2a33 
>   
> clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
>  90d5135b97a44d1181bd595c8eab2f2aef1a728a 
>   
> clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
>  6b7c269ad7679df57c6bd505516075add39b7534 
>   clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java 
> 3dbba8a360f11b2b1db259ecce11e15d8eeaccfb 
>   clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java 
> 5220464913e6e82734e7f9ff886be7fdfa80361f 
>   
> clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java 
> 3c001d33091c0f04ac3bf49a6731ab9e9f2bb0c4 
>   clients/src/main/java/org/apache/kafka/common/utils/Utils.java 
> 527dd0f9c47fce7310b7a37a9b95bf87f1b9c292 
>   clients/src/test/java/org/apache/kafka/clients/MockClient.java 
> 47b5d4ac1f2a5d162541cab658add9cc40b3aa4e 
>   clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 
> 1a55242e9399fa4669630b55110d530f954e1279 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java
>  29ad25e90606f065b409bc194ef9200189933aa6 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 
> PRE-CREATION 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
>  PRE-CREATION 
>   clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java 
> 12368038e1381f89352a5a1c98b2e0f8cbc04cbd 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 
> 3676b05eb8b83bbd8a38754fc3a979124908281b 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java 
> 1d077fd4c56bf28557a0ca1e15c82cc427f36e6c 
>   clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java 
> 66cbdf5babed3ac27c5254d945372644a053df87 
>   clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java 
> 3c442a27a7ba3eaa908becc802aab6242594df85 
>   clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java 
> 74c19573a29b0d5ac4264e1430904dca52571404 
>   clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java 
> a39fab532f73148316a56c0f8e9197f38ea66f79 
>   clients/src/test/java/org/apache/kafka/test/MockSelector.java 
> d61de529173314c044261ad9662bec735d67e97f 
>   core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala 
> 6d00ed090d76cd7925621a9c6db8fb00fb9d48f4 
>   core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala 
> 84f60178f6ebae735c8aa3e79ed93fe21ac4aea7 
>   core/src/main/scala/kafka/cluster/Partition.scala 
> b230e9a1fb1a3161f4c9d164e4890a16eceb2ad4 
>   core/src/main/scala/kafka/controller/ControllerChannelManager.scala 
> eb492f00449744bc8d63f55b393e2a1659d38454 
>   core/src/main/scala/kafka/log/LogConfig.scala 
> 4631bc78106e6167ab6c0a781f40dcd8cd265598 
>   core/src/main/scala/kafka/server/KafkaApis.scala 
> ec8d9f7ba44741db40875458bd524c4062ad6a26 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 
> e58fbb922e93b0c31dff04f187fcadb4ece986d7 
>   core/src/main/scala/kafka/tools/ConsoleConsumer.scala 
> e455cb9a1de221a6d080e3abd909ea8f24ff7fe9 
>   core/src/main/scala/kafka/tools/ConsumerPerformance.scala 
> 093c800ea7f8a9c972bb66e99ac4e4d431cf11cc 
>   core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala 
> 7602b8d705970a5dab49ed36d117346a960701ac 
>   core/src/main/scala/kafka/utils/KafkaScheduler.scala 
> 9a16343d2ff7192b741f0b23a6bdf58d8f2bbd3e 
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala PRE-CREATION 
>   core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala 
> PRE-CREATION 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
> cd16ced5465d098be7a60498326b2a98c248f343 
>   core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala 
> 3cf7c9bcd64492d05590067a8ad11d31096a8e5e 
>   core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala 
> a5386a03b62956bc440b40783247c8cdf7432315 
>   core/src/test/scala/unit/kafka/utils/MockScheduler.scala 
> d5896ed4d3b73aecb652436b5dfc80c2835af595 
>   core/src/test/scala/unit/kafka/utils/SchedulerTest.scala 
> b364ac2d6d623f6e86805710ca68dc32d92558c1 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 
> ac15d34425795d5be20c51b01fa1108bdcd66583 
> 
> Diff: https://reviews.apache.org/r/27799/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>
