Re: Review Request 34524: Fix KAFKA-2208
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34524/ --- (Updated June 2, 2015, 9:07 p.m.) Review request for kafka. Bugs: KAFKA-2208 https://issues.apache.org/jira/browse/KAFKA-2208 Repository: kafka Description --- Incorporate Onur's comments Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b2764df11afa7a99fce46d1ff48960d889032d14 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ef9dd5238fbc771496029866ece1d85db6d7b7a5 clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java f548cd0ef70929b35ac887f8fccb7b24c3e2c11a clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java fd9c545c99058ad3fbe3b2c55ea8b6ea002f5a51 core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala af06ad45cdc46ac3bc27898ebc1a5bd5b1c7b19e core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala 47bdfa7cc86fd4e841e2b1d6bfd40f1508e643bd core/src/main/scala/kafka/network/RequestChannel.scala 1d0024c8f0c2ab0efa6d8cfca6455877a6ed8026 core/src/main/scala/kafka/server/KafkaConfig.scala 9efa15ca5567b295ab412ee9eea7c03eb4cdc18b core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala 5c4cca653b3801df3494003cc40a56ae60a789a6 core/src/test/scala/integration/kafka/api/ConsumerTest.scala a1eed965a148eb19d9a6cefbfce131f58aaffc24 core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 8014a5a6c362785539f24eb03d77278434614fe6 Diff: https://reviews.apache.org/r/34524/diff/ Testing --- Thanks, Guozhang Wang
Re: Review Request 34524: Fix KAFKA-2208
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34524/#review86304 --- Ship it! I brought up a local vagrant cluster of 2 brokers, 1 zk node, and one worker node. The worker node had a producer and consumer. Both brokers were in the broker-list / bootstrap.servers lists for the producer / consumer, respectively. I kill -9'd the broker doing the coordination for the consumer. The consumer was able to switch over its coordination to the other broker. - Onur Karaman On June 2, 2015, 9:07 p.m., Guozhang Wang wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34524/ --- (Updated June 2, 2015, 9:07 p.m.) Review request for kafka. Bugs: KAFKA-2208 https://issues.apache.org/jira/browse/KAFKA-2208 Repository: kafka Description --- Incorporate Onur's comments Diffs - clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b2764df11afa7a99fce46d1ff48960d889032d14 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ef9dd5238fbc771496029866ece1d85db6d7b7a5 clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java f548cd0ef70929b35ac887f8fccb7b24c3e2c11a clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java fd9c545c99058ad3fbe3b2c55ea8b6ea002f5a51 core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala af06ad45cdc46ac3bc27898ebc1a5bd5b1c7b19e core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala 47bdfa7cc86fd4e841e2b1d6bfd40f1508e643bd core/src/main/scala/kafka/network/RequestChannel.scala 1d0024c8f0c2ab0efa6d8cfca6455877a6ed8026 core/src/main/scala/kafka/server/KafkaConfig.scala 9efa15ca5567b295ab412ee9eea7c03eb4cdc18b core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala 5c4cca653b3801df3494003cc40a56ae60a789a6 core/src/test/scala/integration/kafka/api/ConsumerTest.scala a1eed965a148eb19d9a6cefbfce131f58aaffc24 core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 8014a5a6c362785539f24eb03d77278434614fe6 Diff: https://reviews.apache.org/r/34524/diff/ Testing --- Thanks, Guozhang Wang
Re: Review Request 34524: Fix KAFKA-2208
On June 2, 2015, 12:20 a.m., Onur Karaman wrote: It'd rather avoid mixing coordinator failover optimization logic with this rb. Can you undo the changes in ConsumerCoordinator.scala from line 214 down to the bottom of ConsumerCoordinator.scala? OK agreed. Revert the remove-group logic in maybeRebalance. I kept the function and used it for the other place where we did originally remove the group upon empty member list. - Guozhang --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34524/#review86101 --- On June 1, 2015, 12:05 a.m., Guozhang Wang wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34524/ --- (Updated June 1, 2015, 12:05 a.m.) Review request for kafka. Bugs: KAFKA-2208 https://issues.apache.org/jira/browse/KAFKA-2208 Repository: kafka Description --- Incorporate Onur's comments; add logic for removing the whole group from consumer. Diffs - clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b2764df11afa7a99fce46d1ff48960d889032d14 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ef9dd5238fbc771496029866ece1d85db6d7b7a5 clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java f548cd0ef70929b35ac887f8fccb7b24c3e2c11a clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java fd9c545c99058ad3fbe3b2c55ea8b6ea002f5a51 core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala af06ad45cdc46ac3bc27898ebc1a5bd5b1c7b19e core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala 47bdfa7cc86fd4e841e2b1d6bfd40f1508e643bd core/src/main/scala/kafka/network/RequestChannel.scala 1d0024c8f0c2ab0efa6d8cfca6455877a6ed8026 core/src/main/scala/kafka/server/KafkaConfig.scala 9efa15ca5567b295ab412ee9eea7c03eb4cdc18b core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala 5c4cca653b3801df3494003cc40a56ae60a789a6 core/src/test/scala/integration/kafka/api/ConsumerTest.scala a1eed965a148eb19d9a6cefbfce131f58aaffc24 core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 8014a5a6c362785539f24eb03d77278434614fe6 Diff: https://reviews.apache.org/r/34524/diff/ Testing --- Thanks, Guozhang Wang
Re: Review Request 34524: Fix KAFKA-2208
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34524/ --- (Updated June 2, 2015, 5:45 p.m.) Review request for kafka. Bugs: KAFKA-2208 https://issues.apache.org/jira/browse/KAFKA-2208 Repository: kafka Description (updated) --- Incorporate Onur's comments Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b2764df11afa7a99fce46d1ff48960d889032d14 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ef9dd5238fbc771496029866ece1d85db6d7b7a5 clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java f548cd0ef70929b35ac887f8fccb7b24c3e2c11a clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java fd9c545c99058ad3fbe3b2c55ea8b6ea002f5a51 core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala af06ad45cdc46ac3bc27898ebc1a5bd5b1c7b19e core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala 47bdfa7cc86fd4e841e2b1d6bfd40f1508e643bd core/src/main/scala/kafka/network/RequestChannel.scala 1d0024c8f0c2ab0efa6d8cfca6455877a6ed8026 core/src/main/scala/kafka/server/KafkaConfig.scala 9efa15ca5567b295ab412ee9eea7c03eb4cdc18b core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala 5c4cca653b3801df3494003cc40a56ae60a789a6 core/src/test/scala/integration/kafka/api/ConsumerTest.scala a1eed965a148eb19d9a6cefbfce131f58aaffc24 core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 8014a5a6c362785539f24eb03d77278434614fe6 Diff: https://reviews.apache.org/r/34524/diff/ Testing --- Thanks, Guozhang Wang
Re: Review Request 34524: Fix KAFKA-2208
On May 28, 2015, 11:42 p.m., Onur Karaman wrote: core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, lines 98-133 https://reviews.apache.org/r/34524/diff/2/?file=966038#file966038line98 Let's say a consumer sends a JoinGroupRequest for a new group g and provides his own non-unknown consumer id (it could be a faulty implementation of the new consumer). The Coordinator would notice group == null, make the group, notice that the group doesn't contain the non-unknown consumer id, and then reply with UNKNOWN_CONSUMER_ID. So basically an empty, stable group has been made with no consumers. Good point! Fixed. - Guozhang --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34524/#review85641 --- On June 1, 2015, 12:05 a.m., Guozhang Wang wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34524/ --- (Updated June 1, 2015, 12:05 a.m.) Review request for kafka. Bugs: KAFKA-2208 https://issues.apache.org/jira/browse/KAFKA-2208 Repository: kafka Description --- Incorporate Onur's comments; add logic for removing the whole group from consumer. Diffs - clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b2764df11afa7a99fce46d1ff48960d889032d14 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ef9dd5238fbc771496029866ece1d85db6d7b7a5 clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java f548cd0ef70929b35ac887f8fccb7b24c3e2c11a clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java fd9c545c99058ad3fbe3b2c55ea8b6ea002f5a51 core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala af06ad45cdc46ac3bc27898ebc1a5bd5b1c7b19e core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala 47bdfa7cc86fd4e841e2b1d6bfd40f1508e643bd core/src/main/scala/kafka/network/RequestChannel.scala 1d0024c8f0c2ab0efa6d8cfca6455877a6ed8026 core/src/main/scala/kafka/server/KafkaConfig.scala 9efa15ca5567b295ab412ee9eea7c03eb4cdc18b core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala 5c4cca653b3801df3494003cc40a56ae60a789a6 core/src/test/scala/integration/kafka/api/ConsumerTest.scala a1eed965a148eb19d9a6cefbfce131f58aaffc24 core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 8014a5a6c362785539f24eb03d77278434614fe6 Diff: https://reviews.apache.org/r/34524/diff/ Testing --- Thanks, Guozhang Wang
Re: Review Request 34524: Fix KAFKA-2208
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34524/ --- (Updated June 1, 2015, 12:05 a.m.) Review request for kafka. Bugs: KAFKA-2208 https://issues.apache.org/jira/browse/KAFKA-2208 Repository: kafka Description (updated) --- Incorporate Onur's comments; add logic for removing the whole group from consumer. Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b2764df11afa7a99fce46d1ff48960d889032d14 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ef9dd5238fbc771496029866ece1d85db6d7b7a5 clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java f548cd0ef70929b35ac887f8fccb7b24c3e2c11a clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java fd9c545c99058ad3fbe3b2c55ea8b6ea002f5a51 core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala af06ad45cdc46ac3bc27898ebc1a5bd5b1c7b19e core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala 47bdfa7cc86fd4e841e2b1d6bfd40f1508e643bd core/src/main/scala/kafka/network/RequestChannel.scala 1d0024c8f0c2ab0efa6d8cfca6455877a6ed8026 core/src/main/scala/kafka/server/KafkaConfig.scala 9efa15ca5567b295ab412ee9eea7c03eb4cdc18b core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala 5c4cca653b3801df3494003cc40a56ae60a789a6 core/src/test/scala/integration/kafka/api/ConsumerTest.scala a1eed965a148eb19d9a6cefbfce131f58aaffc24 core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 8014a5a6c362785539f24eb03d77278434614fe6 Diff: https://reviews.apache.org/r/34524/diff/ Testing --- Thanks, Guozhang Wang
Re: Review Request 34524: Fix KAFKA-2208
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34524/#review85641 --- clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java https://reviews.apache.org/r/34524/#comment137304 Should we split this up into two checks? If the response is UNKNOWN_CONSUMER_ID, you might want to additionally reset the consumer id here. I was thinking something like this: ``` } else if (response.errorCode() == Errors.ILLEGAL_GENERATION.code()) { subscriptions.needReassignment(); } else if (response.errorCode() == Errors.UNKNOWN_CONSUMER_ID.code()) { this.consumerId = JoinGroupRequest.UNKNOWN_CONSUMER_ID; subscriptions.needReassignment(); } ``` core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala https://reviews.apache.org/r/34524/#comment137311 Let's say a consumer sends a JoinGroupRequest for a new group g and provides his own non-unknown consumer id (it could be a faulty implementation of the new consumer). The Coordinator would notice group == null, make the group, notice that the group doesn't contain the non-unknown consumer id, and then reply with UNKNOWN_CONSUMER_ID. So basically an empty, stable group has been made with no consumers. core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala https://reviews.apache.org/r/34524/#comment137318 Is returning NOT_COORDINATOR_FOR_CONSUMER right? By this point in handleJoinGroup, we've already verified that we are the coordinator for the group. core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala https://reviews.apache.org/r/34524/#comment137317 Is returning NOT_COORDINATOR_FOR_CONSUMER right? By this point in handleHeartbeat, we've already verified that we are the coordinator for the group. - Onur Karaman On May 21, 2015, 2:15 a.m., Guozhang Wang wrote: --- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34524/ --- (Updated May 21, 2015, 2:15 a.m.) Review request for kafka. Bugs: KAFKA-2208 https://issues.apache.org/jira/browse/KAFKA-2208 Repository: kafka Description --- 1. Add error handling on consumer; 2. Add the max / min consumer session timeout to kafka server configs; 3. Fixed some consumer bouncing tests Diffs - clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b2764df11afa7a99fce46d1ff48960d889032d14 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ef9dd5238fbc771496029866ece1d85db6d7b7a5 clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java f548cd0ef70929b35ac887f8fccb7b24c3e2c11a clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java fd9c545c99058ad3fbe3b2c55ea8b6ea002f5a51 core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala af06ad45cdc46ac3bc27898ebc1a5bd5b1c7b19e core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala 47bdfa7cc86fd4e841e2b1d6bfd40f1508e643bd core/src/main/scala/kafka/network/RequestChannel.scala 1d0024c8f0c2ab0efa6d8cfca6455877a6ed8026 core/src/main/scala/kafka/server/KafkaConfig.scala 9efa15ca5567b295ab412ee9eea7c03eb4cdc18b core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala 5c4cca653b3801df3494003cc40a56ae60a789a6 core/src/test/scala/integration/kafka/api/ConsumerTest.scala a1eed965a148eb19d9a6cefbfce131f58aaffc24 core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 8014a5a6c362785539f24eb03d77278434614fe6 Diff: https://reviews.apache.org/r/34524/diff/ Testing --- Thanks, Guozhang Wang
Review Request 34524: Fix KAFKA-2208
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34524/ --- Review request for kafka. Bugs: KAFKA-2208 https://issues.apache.org/jira/browse/KAFKA-2208 Repository: kafka Description --- 1. Add error handling on consumer; 2. Add the max / min consumer session timeout to kafka server configs; 3. Fixed some consumer bouncing tests Diffs - clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b2764df11afa7a99fce46d1ff48960d889032d14 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ef9dd5238fbc771496029866ece1d85db6d7b7a5 clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java f548cd0ef70929b35ac887f8fccb7b24c3e2c11a clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java fd9c545c99058ad3fbe3b2c55ea8b6ea002f5a51 core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala af06ad45cdc46ac3bc27898ebc1a5bd5b1c7b19e core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala 47bdfa7cc86fd4e841e2b1d6bfd40f1508e643bd core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala c39e6de34ee531c6dfa9107b830752bd7f8fbe59 core/src/main/scala/kafka/network/RequestChannel.scala 1d0024c8f0c2ab0efa6d8cfca6455877a6ed8026 core/src/main/scala/kafka/server/KafkaConfig.scala 9efa15ca5567b295ab412ee9eea7c03eb4cdc18b core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala 5c4cca653b3801df3494003cc40a56ae60a789a6 core/src/test/scala/integration/kafka/api/ConsumerTest.scala a1eed965a148eb19d9a6cefbfce131f58aaffc24 core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 8014a5a6c362785539f24eb03d77278434614fe6 Diff: https://reviews.apache.org/r/34524/diff/ Testing --- Thanks, Guozhang Wang
Re: Review Request 34524: Fix KAFKA-2208
--- This is an automatically generated e-mail. To reply, visit: https://reviews.apache.org/r/34524/ --- (Updated May 21, 2015, 2:15 a.m.) Review request for kafka. Bugs: KAFKA-2208 https://issues.apache.org/jira/browse/KAFKA-2208 Repository: kafka Description --- 1. Add error handling on consumer; 2. Add the max / min consumer session timeout to kafka server configs; 3. Fixed some consumer bouncing tests Diffs (updated) - clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java b2764df11afa7a99fce46d1ff48960d889032d14 clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java ef9dd5238fbc771496029866ece1d85db6d7b7a5 clients/src/main/java/org/apache/kafka/common/requests/HeartbeatResponse.java f548cd0ef70929b35ac887f8fccb7b24c3e2c11a clients/src/main/java/org/apache/kafka/common/requests/JoinGroupResponse.java fd9c545c99058ad3fbe3b2c55ea8b6ea002f5a51 core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala af06ad45cdc46ac3bc27898ebc1a5bd5b1c7b19e core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala 47bdfa7cc86fd4e841e2b1d6bfd40f1508e643bd core/src/main/scala/kafka/network/RequestChannel.scala 1d0024c8f0c2ab0efa6d8cfca6455877a6ed8026 core/src/main/scala/kafka/server/KafkaConfig.scala 9efa15ca5567b295ab412ee9eea7c03eb4cdc18b core/src/test/scala/integration/kafka/api/ConsumerBounceTest.scala 5c4cca653b3801df3494003cc40a56ae60a789a6 core/src/test/scala/integration/kafka/api/ConsumerTest.scala a1eed965a148eb19d9a6cefbfce131f58aaffc24 core/src/test/scala/unit/kafka/server/KafkaConfigConfigDefTest.scala 8014a5a6c362785539f24eb03d77278434614fe6 Diff: https://reviews.apache.org/r/34524/diff/ Testing --- Thanks, Guozhang Wang