> On Jan. 22, 2015, 7:10 p.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/common/network/Selector.java, line 
> > 198
> > <https://reviews.apache.org/r/27799/diff/4/?file=828402#file828402line198>
> >
> >     Just trying to understand the rationale why we want to special-care for 
> > failures in send() call and leave them as disconnected state at the 
> > beginning of the next poll() call?

The rationale is that the disconnection can happen either before or after the 
send call, but you handle both cases in pretty much the same way. So it is 
easier to treat them the same; otherwise you need the same handling logic in 
two places, once as a try/catch on send and once to handle the disconnection 
response.
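
A minimal sketch of that idea, using a toy selector rather than the actual 
Selector code in the patch. Every name here (DeferredDisconnectSketch, 
breakConnection, failedSends) is hypothetical and only illustrates how a failed 
send can be surfaced through the same disconnect list that poll() reports:

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy selector (hypothetical): a failed send shows up as a disconnect on the next poll(). */
public class DeferredDisconnectSketch {
    private final Set<Integer> broken = new HashSet<>();          // simulated dead connections
    private final List<Integer> failedSends = new ArrayList<>();  // failures observed in send()
    private final List<Integer> disconnected = new ArrayList<>(); // result of the last poll()

    /** Simulate the remote end of a connection going away. */
    public void breakConnection(int node) {
        broken.add(node);
    }

    /** Never throws to the caller: a write failure is remembered for the next poll(). */
    public void send(int node, String payload) {
        try {
            write(node, payload);
        } catch (IOException e) {
            failedSends.add(node);
        }
    }

    /** Reports failed sends through the same list used for any other disconnect. */
    public void poll() {
        disconnected.clear();
        disconnected.addAll(failedSends);
        failedSends.clear();
    }

    /** Nodes that the most recent poll() reported as disconnected. */
    public List<Integer> disconnected() {
        return disconnected;
    }

    // Stand-in for a real socket write; fails if the connection was marked broken.
    private void write(int node, String payload) throws IOException {
        if (broken.contains(node))
            throw new IOException("connection to node " + node + " is closed");
    }

    public static void main(String[] args) {
        DeferredDisconnectSketch selector = new DeferredDisconnectSketch();
        selector.breakConnection(1);
        selector.send(1, "request"); // no exception surfaces here
        selector.poll();
        System.out.println(selector.disconnected());
    }
}
```

With this shape the caller has a single place to handle disconnects, the list 
it reads after each poll(), and no try/catch around send.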


> On Jan. 22, 2015, 7:10 p.m., Guozhang Wang wrote:
> > clients/src/main/java/org/apache/kafka/common/utils/Utils.java, lines 
> > 334-336
> > <https://reviews.apache.org/r/27799/diff/4/?file=828426#file828426line334>
> >
> >     Newline here?

Hmm, I don't think we've been very consistent about that to date. If we want to 
get consistent let's do a clean sweep and fix them all. If I change just these 
two methods they will be inconsistent with everything else in Utils.java, and 
if I change that it will still be inconsistent with much of the other code.


> On Jan. 22, 2015, 7:10 p.m., Guozhang Wang wrote:
> > clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java,
> >  lines 285-297
> > <https://reviews.apache.org/r/27799/diff/4/?file=828429#file828429line285>
> >
> >     Is this intentional?

No, I totally forgot about that. I think these examples should just be deleted.


> On Jan. 22, 2015, 7:10 p.m., Guozhang Wang wrote:
> > clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java,
> >  lines 45-53
> > <https://reviews.apache.org/r/27799/diff/4/?file=828431#file828431line45>
> >
> >     "expected" usage may be fragile with junit, see KAFKA-1782 for more 
> > details.

Yeah, but that is just due to Scala stupidity. This is Java, so it should work 
fine, right?


> On Jan. 22, 2015, 7:10 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/server/KafkaApis.scala, lines 460-461
> > <https://reviews.apache.org/r/27799/diff/4/?file=828445#file828445line460>
> >
> >     Just realized we might have a mis-match in our design while reading 
> > this code:
> >     
> >     When consumer subscribe (topic-partition), does it still needs to send 
> > a join-group request to the coordinator? If not, then we will not be able 
> > to detect / exclude the case where consumers within the same group 
> > subscribe to both topic-partition and topic and the coordinator will only 
> > try to balance with those only subscribing to topics; if yes then the 
> > join-group request needs to be modified as it only contain topics field 
> > today.

Well the goal of this code was just to give back something to be able to test 
against.

But to answer your question, here is my belief about how it is supposed to work:
1. If you subscribe to topics we check validity on the server side and do all 
the fancy assignment.
2. If you subscribe to particular topic-partitions we don't check anything at 
all. It is up to you. You may have consumers for all partitions, you may not; 
we don't help in any way and there is no join group request. Basically, if you 
subscribe to a partition you are subscribed to that partition and there is 
nothing to check. I think this is actually the right thing, as there are many 
possible patterns for partition-level subscription and nothing we can check 
that will be correct or helpful across them.
3. We disallow mixing of partition and topic subscriptions in the same client, 
just for implementation simplicity.

So say you have some code that subscribes to partitions and other code that 
subscribes to topics. The "right" behavior is that the topic subscribers all 
join the group and divide up ALL the partitions amongst themselves. The code 
subscribing to partitions gets whatever it subscribes to, duplicating what the 
topic subscribers consume. In practice this is a mistake that is a bit hard to 
make, I'd imagine.
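
A minimal sketch of rules 1-3 above. The class and method names 
(SubscriptionSketch, needsJoinGroup) are hypothetical; this is not the 
SubscriptionState class from the patch, just an illustration of rejecting 
mixed subscriptions in one client and only involving the coordinator for 
topic-level subscriptions:

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical per-client subscription bookkeeping; not the patch's SubscriptionState. */
public class SubscriptionSketch {
    private final Set<String> topics = new HashSet<>();
    private final Set<String> partitions = new HashSet<>(); // "topic-partition" keys, e.g. "events-0"

    /** Topic-level subscription: the server side validates and assigns partitions. */
    public void subscribe(String topic) {
        if (!partitions.isEmpty())
            throw new IllegalStateException("cannot mix topic and partition subscriptions");
        topics.add(topic);
    }

    /** Partition-level subscription: nothing is checked, the caller gets what it asked for. */
    public void subscribe(String topic, int partition) {
        if (!topics.isEmpty())
            throw new IllegalStateException("cannot mix topic and partition subscriptions");
        partitions.add(topic + "-" + partition);
    }

    /** Only topic-level subscriptions go through the group (join group request). */
    public boolean needsJoinGroup() {
        return !topics.isEmpty();
    }

    public static void main(String[] args) {
        SubscriptionSketch byTopic = new SubscriptionSketch();
        byTopic.subscribe("events");
        System.out.println(byTopic.needsJoinGroup());

        SubscriptionSketch byPartition = new SubscriptionSketch();
        byPartition.subscribe("events", 0);
        System.out.println(byPartition.needsJoinGroup());
    }
}
```

Note the check is per client only. Two separate clients in the same group, one 
subscribing by topic and one by partition, are not detected, which is the 
duplication case described above.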


> On Jan. 22, 2015, 7:10 p.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/tools/ConsumerPerformance.scala, lines 60-64
> > <https://reviews.apache.org/r/27799/diff/5/?file=830061#file830061line60>
> >
> >     The usage of the new consumer here assumes there is only one broker and 
> > hence according to the stub logic it will get all the partitions, which is 
> > a bit risky.

That is why you need to implement the co-ordinator :-)


> On Jan. 22, 2015, 7:10 p.m., Guozhang Wang wrote:
> > core/src/test/scala/integration/kafka/api/ConsumerTest.scala, line 64
> > <https://reviews.apache.org/r/27799/diff/5/?file=830064#file830064line64>
> >
> >     Shall we add the @Test label just in case?

No, I don't think so. I think the point you made in the other JIRA was that 
@Test only works for JUnit 4; JUnit 3 requires the method name to start with 
"test". So adding JUnit 4 annotations that have no effect to JUnit 3 tests will 
be super confusing, right?


> On Jan. 22, 2015, 7:10 p.m., Guozhang Wang wrote:
> > core/src/test/scala/integration/kafka/api/ConsumerTest.scala, lines 158-180
> > <https://reviews.apache.org/r/27799/diff/5/?file=830064#file830064line158>
> >
> >     I think a more comprehensive test will be running the producer / 
> > consumer in background threads while the main thread will just iterate over 
> > killing / restarting brokers, as with this we are assured at least enough 
> > iterations will be executed before all produced records get consumed.

Hmm, not sure I agree. Those threaded tests are insane to read and debug. I 
really think we need to focus on the maintainability/debuggability of tests.

I actually think this catches all the cases, right? You don't actually care 
whether the disconnect and the execution happen in parallel or not.

Also, I'm not sure that helps with the number of iterations, which is really 
determined by the number of messages. That is parameterized already.


> On Jan. 22, 2015, 7:10 p.m., Guozhang Wang wrote:
> > core/src/test/scala/integration/kafka/api/ConsumerTest.scala, lines 268-270
> > <https://reviews.apache.org/r/27799/diff/5/?file=830064#file830064line268>
> >
> >     If there is not enough records then this will be blocked forever, so 
> > shall we add a timeout config and fail the test if timeout is reached?

Yeah, but I mean we have this all over, where the failure case is to hang, 
right? It isn't ideal, but the fix tends to be pretty convoluted, and the 
problem is always clear since the test that hangs is the failing test.


> On Jan. 22, 2015, 7:10 p.m., Guozhang Wang wrote:
> > core/src/test/scala/unit/kafka/utils/TestUtils.scala, lines 733-743
> > <https://reviews.apache.org/r/27799/diff/5/?file=830071#file830071line733>
> >
> >     Is this the same as createTopic in line 172?

Yeah, weird, I swear I actually didn't add that method. The code style isn't 
even mine (e.g. I would never never put spaces after the paren in a for loop). 
Yet there it is. Maybe it came with the patch that added the request that I 
built off of...dunno. Anyhow it isn't used so I'll delete it.


- Jay


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/27799/#review68947
-----------------------------------------------------------


On Jan. 22, 2015, 6:06 p.m., Jay Kreps wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/27799/
> -----------------------------------------------------------
> 
> (Updated Jan. 22, 2015, 6:06 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1760
>     https://issues.apache.org/jira/browse/KAFKA-1760
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> New consumer.
> 
> Addressed second round of comments, rebased again.
> 
> 
> Diffs
> -----
> 
>   build.gradle 1cbab29ce83e20dae0561b51eed6fdb86d522f28 
>   clients/src/main/java/org/apache/kafka/clients/ClientRequest.java 
> d32c319d8ee4c46dad309ea54b136ea9798e2fd7 
>   clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java 
> 8aece7e81a804b177a6f2c12e2dc6c89c1613262 
>   clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/ConnectionState.java 
> ab7e3220f9b76b92ef981d695299656f041ad5ed 
>   clients/src/main/java/org/apache/kafka/clients/KafkaClient.java 
> 397695568d3fd8e835d8f923a89b3b00c96d0ead 
>   clients/src/main/java/org/apache/kafka/clients/NetworkClient.java 
> 6746275d0b2596cd6ff7ce464a3a8225ad75ef00 
>   
> clients/src/main/java/org/apache/kafka/clients/RequestCompletionHandler.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/consumer/CommitType.java 
> PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/consumer/Consumer.java 
> c0c636b3e1ba213033db6d23655032c9bbd5e378 
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java 
> 57c1807ccba9f264186f83e91f37c34b959c8060 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRebalanceCallback.java
>  e4cf7d1cfa01c2844b53213a7b539cdcbcbeaa3a 
>   clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecord.java 
> 16af70a5de52cca786fdea147a6a639b7dc4a311 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerRecords.java 
> bdf4b26942d5a8c8a9503e05908e9a9eff6228a7 
>   clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java 
> 76efc216c9e6c3ab084461d792877092a189ad0f 
>   clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java 
> fa88ac1a8b19b4294f211c4467fe68c7707ddbae 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/NoOffsetForPartitionException.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/consumer/OffsetMetadata.java 
> ea423ad15eebd262d20d5ec05d592cc115229177 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Heartbeat.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/NoOpConsumerRebalanceCallback.java
>  PRE-CREATION 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java
>  PRE-CREATION 
>   clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
> fc71710dd5997576d3841a1c3b0f7e19a8c9698e 
>   clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java 
> 904976fadf0610982958628eaee810b60a98d725 
>   clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java 
> 8b3e565edd1ae04d8d34bd9f1a41e9fa8c880a75 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Metadata.java
>  dcf46581b912cfb1b5c8d4cbc293d2d1444b7740 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java
>  483899d2e69b33655d0e08949f5f64af2519660a 
>   
> clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
> ccc03d8447ebba40131a70e16969686ac4aab58a 
>   clients/src/main/java/org/apache/kafka/common/Cluster.java 
> d3299b944062d96852452de455902659ad8af757 
>   clients/src/main/java/org/apache/kafka/common/PartitionInfo.java 
> b15aa2c3ef2d7c4b24618ff42fd4da324237a813 
>   clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java 
> 98cb79b701918eca3f6ca9823b6c7b7c97b3ecec 
>   clients/src/main/java/org/apache/kafka/common/errors/ApiException.java 
> 7c948b166a8ac07616809f260754116ae7764973 
>   clients/src/main/java/org/apache/kafka/common/network/Selectable.java 
> b68bbf00ab8eba6c5867d346c91188142593ca6e 
>   clients/src/main/java/org/apache/kafka/common/network/Selector.java 
> 74d695ba39de44b6a3d15340ec0114bc4fce2ba2 
>   clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
> 3316b6a1098311b8603a4a5893bf57b75d2e43cb 
>   clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java 
> 121e880a941fcd3e6392859edba11a94236494cc 
>   clients/src/main/java/org/apache/kafka/common/record/LogEntry.java 
> e4d688cbe0c61b74ea15fc8dd3b634f9e5ee9b83 
>   clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 
> 040e5b91005edb8f015afdfa76fd94e0bf3cb4ca 
>   
> clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataRequest.java
>  99b52c23d639df010bf2affc0f79d1c6e16ed67c 
>   
> clients/src/main/java/org/apache/kafka/common/requests/ConsumerMetadataResponse.java
>  8b8f591c4b2802a9cbbe34746c0b3ca4a64a8681 
>   clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java 
> 2fc471f64f4352eeb128bbd3941779780076fb8c 
>   clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java 
> f719010119951e55795a583796a93a251e1ba404 
>   
> clients/src/main/java/org/apache/kafka/common/requests/HeartbeatRequest.java 
> 9512db2365d8d1f7a9a9e93b2bebabd877881143 
>   
> clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java 
> 8997ffc44c18b7ae14365161c04eb423793cb8c9 
>   
> clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java 
> d6e91f393179809f72d52028d6a8a2b2bb43b820 
>   
> clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java 
> efe89796a7bd0d1cabc138f695b0eb28007663a7 
>   
> clients/src/main/java/org/apache/kafka/common/requests/ListOffsetRequest.java 
> 99364c1ca464f7b81be7d3da15b40ab66717a659 
>   
> clients/src/main/java/org/apache/kafka/common/requests/ListOffsetResponse.java
>  ac239712f11848755cc01399b4027ea0a8d7d8ac 
>   clients/src/main/java/org/apache/kafka/common/requests/MetadataRequest.java 
> b22ca1dce65f665d84c2a980fd82f816e93d9960 
>   
> clients/src/main/java/org/apache/kafka/common/requests/MetadataResponse.java 
> d97962d3840179b1abf01459522c58e59102ac8d 
>   
> clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitRequest.java
>  3ee5cbad55ce836fd04bb954dcf6ef2f9bc3288f 
>   
> clients/src/main/java/org/apache/kafka/common/requests/OffsetCommitResponse.java
>  711232ac613786392f1389f2f883818bf4ac2a33 
>   
> clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchRequest.java
>  90d5135b97a44d1181bd595c8eab2f2aef1a728a 
>   
> clients/src/main/java/org/apache/kafka/common/requests/OffsetFetchResponse.java
>  6b7c269ad7679df57c6bd505516075add39b7534 
>   clients/src/main/java/org/apache/kafka/common/requests/ProduceRequest.java 
> 3dbba8a360f11b2b1db259ecce11e15d8eeaccfb 
>   clients/src/main/java/org/apache/kafka/common/requests/ProduceResponse.java 
> 5220464913e6e82734e7f9ff886be7fdfa80361f 
>   
> clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java 
> 3c001d33091c0f04ac3bf49a6731ab9e9f2bb0c4 
>   clients/src/main/java/org/apache/kafka/common/utils/Utils.java 
> 527dd0f9c47fce7310b7a37a9b95bf87f1b9c292 
>   clients/src/test/java/org/apache/kafka/clients/MockClient.java 
> 47b5d4ac1f2a5d162541cab658add9cc40b3aa4e 
>   clients/src/test/java/org/apache/kafka/clients/NetworkClientTest.java 
> 1a55242e9399fa4669630b55110d530f954e1279 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/ConsumerExampleTest.java
>  29ad25e90606f065b409bc194ef9200189933aa6 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/MockConsumerTest.java 
> PRE-CREATION 
>   
> clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java
>  PRE-CREATION 
>   clients/src/test/java/org/apache/kafka/clients/producer/BufferPoolTest.java 
> 12368038e1381f89352a5a1c98b2e0f8cbc04cbd 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java 
> 3676b05eb8b83bbd8a38754fc3a979124908281b 
>   
> clients/src/test/java/org/apache/kafka/clients/producer/PartitionerTest.java 
> 1d077fd4c56bf28557a0ca1e15c82cc427f36e6c 
>   clients/src/test/java/org/apache/kafka/clients/producer/SenderTest.java 
> 66cbdf5babed3ac27c5254d945372644a053df87 
>   clients/src/test/java/org/apache/kafka/common/config/ConfigDefTest.java 
> 3c442a27a7ba3eaa908becc802aab6242594df85 
>   clients/src/test/java/org/apache/kafka/common/network/SelectorTest.java 
> 74c19573a29b0d5ac4264e1430904dca52571404 
>   clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java 
> a39fab532f73148316a56c0f8e9197f38ea66f79 
>   clients/src/test/java/org/apache/kafka/test/MockSelector.java 
> d61de529173314c044261ad9662bec735d67e97f 
>   core/src/main/scala/kafka/api/ConsumerMetadataRequest.scala 
> 6d00ed090d76cd7925621a9c6db8fb00fb9d48f4 
>   core/src/main/scala/kafka/api/ConsumerMetadataResponse.scala 
> 84f60178f6ebae735c8aa3e79ed93fe21ac4aea7 
>   core/src/main/scala/kafka/cluster/Partition.scala 
> b230e9a1fb1a3161f4c9d164e4890a16eceb2ad4 
>   core/src/main/scala/kafka/controller/ControllerChannelManager.scala 
> eb492f00449744bc8d63f55b393e2a1659d38454 
>   core/src/main/scala/kafka/log/LogConfig.scala 
> 4631bc78106e6167ab6c0a781f40dcd8cd265598 
>   core/src/main/scala/kafka/server/KafkaApis.scala 
> ec8d9f7ba44741db40875458bd524c4062ad6a26 
>   core/src/main/scala/kafka/server/ReplicaManager.scala 
> e58fbb922e93b0c31dff04f187fcadb4ece986d7 
>   core/src/main/scala/kafka/tools/ConsoleConsumer.scala 
> e455cb9a1de221a6d080e3abd909ea8f24ff7fe9 
>   core/src/main/scala/kafka/tools/ConsumerPerformance.scala 
> 093c800ea7f8a9c972bb66e99ac4e4d431cf11cc 
>   core/src/main/scala/kafka/tools/SimpleConsumerPerformance.scala 
> 7602b8d705970a5dab49ed36d117346a960701ac 
>   core/src/main/scala/kafka/utils/KafkaScheduler.scala 
> 9a16343d2ff7192b741f0b23a6bdf58d8f2bbd3e 
>   core/src/test/scala/integration/kafka/api/ConsumerTest.scala PRE-CREATION 
>   core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala 
> PRE-CREATION 
>   core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala 
> cd16ced5465d098be7a60498326b2a98c248f343 
>   core/src/test/scala/unit/kafka/integration/KafkaServerTestHarness.scala 
> 3cf7c9bcd64492d05590067a8ad11d31096a8e5e 
>   core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala 
> a5386a03b62956bc440b40783247c8cdf7432315 
>   core/src/test/scala/unit/kafka/utils/MockScheduler.scala 
> d5896ed4d3b73aecb652436b5dfc80c2835af595 
>   core/src/test/scala/unit/kafka/utils/SchedulerTest.scala 
> b364ac2d6d623f6e86805710ca68dc32d92558c1 
>   core/src/test/scala/unit/kafka/utils/TestUtils.scala 
> ac15d34425795d5be20c51b01fa1108bdcd66583 
> 
> Diff: https://reviews.apache.org/r/27799/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Jay Kreps
> 
>
