Re: Review Request 34450: Fix KAFKA-2017; rebased
> On May 21, 2015, 12:16 a.m., Jun Rao wrote:
> > core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, lines 102-106
> > <https://reviews.apache.org/r/34450/diff/2/?file=965426#file965426line102>
> >
> >     Another way to do this is to only load from ZK on the becoming-leader event for an offset topic partition. Then we don't have to read from ZK during join-group, which would introduce unnecessary overhead when joining a new group.

I thought about this while working on the patch. The reason I feel it may not be worth loading upon become-leader is:

1. While we are loading from ZK, we would probably still need to reject any join-group request for a group that has not been loaded yet, as we do in the offset manager. That introduces two more round trips (one to rediscover the coordinator and one for another join-group; if we introduce a separate loading-in-progress error code, we can reduce it to one) compared with loading from ZK on the fly, which costs just one ZK read.

2. We likely need to load from ZK only once per group, upon the first join-group request received (if two join requests arrive at the same time we may read twice unnecessarily), so the latency overhead is small compared with loading everything at once. The only concern is that it slows all handler threads down a little when coordinator migration happens, instead of occupying a single thread to read all the ZK paths, which I feel is acceptable.

> On May 21, 2015, 12:16 a.m., Jun Rao wrote:
> > core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, lines 369-381
> > <https://reviews.apache.org/r/34450/diff/2/?file=965426#file965426line369>
> >
> >     I was thinking whether it's worth including the leader epoch (of the corresponding offset topic partition) in the ZK value, as we did for leaderAndIsr, to prevent a zombie consumer coordinator from overwriting the value during a soft failure. I am not sure it's worth doing this immediately because: 1. When this happens, consumers can still recover after the heartbeat fails. 2. It seems that doing this right is a bit more complicated: we need to keep the leader epoch in the ZK value, but during a leader change we would probably also need to update the values in ZK with the new leader epoch, in order to truly prevent the zombie coordinator from overwriting the value. So I think for now we can just use the simple approach in this patch.

I think this is handled by the generation id, which is ever-increasing: a coordinator writing to ZK must have its generation id equal to the ZK value plus one. One caveat, though, is that when a group becomes empty we remove it from ZK, and when it appears again we treat it as a new group with the generation id reset to 1. A zombie coordinator that happens to hold the matching generation id after the reset may then be able to overwrite the value; we can file another JIRA for that case. A sketch of the fencing rule follows.
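To make the fencing rule above concrete, here is a minimal, self-contained sketch. The ZK read and write are stubbed with an in-memory map, and every name (GenerationFenceSketch, readGroupGeneration, writeGroupRegistry, maybeWriteGroupRegistry) is a hypothetical stand-in rather than a helper from this patch:

```scala
// Hypothetical sketch of generation-id fencing for group registry writes.
// ZK is stubbed with an in-memory map; none of these names are from the patch.
object GenerationFenceSketch {
  // Stand-in for the persisted registry in ZK: groupId -> generation id.
  private val zkRegistry = scala.collection.mutable.Map.empty[String, Int]

  private def readGroupGeneration(groupId: String): Option[Int] =
    zkRegistry.get(groupId)

  private def writeGroupRegistry(groupId: String, generationId: Int): Unit =
    zkRegistry(groupId) = generationId

  // Persist only when we hold the expected next generation; anything else means
  // a newer coordinator has already written, so treat ourselves as a zombie.
  def maybeWriteGroupRegistry(groupId: String, generationId: Int): Boolean =
    readGroupGeneration(groupId) match {
      // No registry in ZK: a brand-new group, or the caveat case above where an
      // empty group was deleted and the generation id was reset to 1.
      case None if generationId == 1 =>
        writeGroupRegistry(groupId, generationId); true
      case Some(gen) if generationId == gen + 1 =>
        writeGroupRegistry(groupId, generationId); true
      case _ => false // stale generation id: reject the overwrite
    }
}
```

The `case None` branch is exactly the caveat above: after an empty group's registry is deleted, a zombie re-registering with generation id 1 would pass the check, which is why that hole would need its own JIRA.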
- Guozhang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34450/#review84604
---

On May 20, 2015, 4:13 p.m., Guozhang Wang wrote:
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/34450/
> ---
>
> (Updated May 20, 2015, 4:13 p.m.)
>
> Review request for kafka.
>
> Bugs: KAFKA-2017
>     https://issues.apache.org/jira/browse/KAFKA-2017
>
> Repository: kafka
>
> Description
> ---
>
> 1. Upon receiving a join-group request, if the group metadata cannot be found in the local cache, try to read it from ZK;
> 2. Upon completing a rebalance, update ZK with the new group registry, or delete the registry if the group becomes empty.
>
> Diffs
> ---
>
>   core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala af06ad45cdc46ac3bc27898ebc1a5bd5b1c7b19e
>   core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala 47bdfa7cc86fd4e841e2b1d6bfd40f1508e643bd
>   core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala c39e6de34ee531c6dfa9107b830752bd7f8fbe59
>   core/src/main/scala/kafka/utils/ZkUtils.scala 2618dd39b925b979ad6e4c0abd5c6eaafb3db5d5
>
> Diff: https://reviews.apache.org/r/34450/diff/
>
> Testing
> ---
>
> Thanks,
>
> Guozhang Wang
Re: Review Request 34450: Fix KAFKA-2017; rebased
> On May 20, 2015, 5:15 p.m., Onur Karaman wrote:
> > I only did a brief skim. This optimization tries to switch consumers over to a new coordinator without a rebalance. From my understanding, the consumers would detect a coordinator failure, discover the new coordinator to work with, and try heartbeating that new coordinator without a rebalance. So it seems to me that putting the logic in handleJoinGroup isn't right, as the rebalance is exactly what we're trying to avoid. The code should be in handleHeartbeat: it should look up the group info in ZK, add it to CoordinatorMetadata, and start a DelayedHeartbeat for every consumer of that group (a sketch of this flow follows at the end of this message).
> >
> > **More importantly: given that this is just an optimization, and we haven't even seen the performance hit without it, I think KAFKA-2017 should be very low priority.** The following are higher priority:
> >
> > 1. Getting the consumer to properly handle the error codes of the join-group and heartbeat responses.
> > 2. Getting the consumer to detect coordinator failures and switch over to another coordinator (my KAFKA-1334 patch only had the coordinator detect consumer failures). A nice benefit of completing this first is that if we decide the rebalances on coordinator failover are an actual issue, it would greatly facilitate testing any coordinator failover logic. Right now, it's unclear how this rb's logic can be tested.

I added a ticket for 2: [KAFKA-2208](https://issues.apache.org/jira/browse/KAFKA-2208)

- Onur

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34450/#review84539
---

On May 20, 2015, 4:13 p.m., Guozhang Wang wrote:
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/34450/
> ---
>
> (Updated May 20, 2015, 4:13 p.m.)
>
> Review request for kafka.
>
> Bugs: KAFKA-2017
>     https://issues.apache.org/jira/browse/KAFKA-2017
>
> Repository: kafka
>
> Description
> ---
>
> 1. Upon receiving a join-group request, if the group metadata cannot be found in the local cache, try to read it from ZK;
> 2. Upon completing a rebalance, update ZK with the new group registry, or delete the registry if the group becomes empty.
>
> Diffs
> ---
>
>   core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala af06ad45cdc46ac3bc27898ebc1a5bd5b1c7b19e
>   core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala 47bdfa7cc86fd4e841e2b1d6bfd40f1508e643bd
>   core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala c39e6de34ee531c6dfa9107b830752bd7f8fbe59
>   core/src/main/scala/kafka/utils/ZkUtils.scala 2618dd39b925b979ad6e4c0abd5c6eaafb3db5d5
>
> Diff: https://reviews.apache.org/r/34450/diff/
>
> Testing
> ---
>
> Thanks,
>
> Guozhang Wang
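Here is the rough, self-contained sketch of the recovery flow suggested in Onur's comment above: on a heartbeat for a locally unknown group, read the registry from ZK, cache it in the coordinator metadata, and resume failure detection for the group's consumers. All types and helpers (GroupRegistry, loadGroupFromZk, scheduleDelayedHeartbeat) are hypothetical stand-ins for the real coordinator classes; only the control flow is the point.

```scala
// Rough sketch of lazily recovering a group on heartbeat rather than join-group.
// Every name below is a hypothetical stand-in for the real coordinator code.
object HeartbeatRecoverySketch {
  case class GroupRegistry(groupId: String, consumers: List[String])

  // Stand-in for CoordinatorMetadata's local group cache.
  private val localGroups = scala.collection.mutable.Map.empty[String, GroupRegistry]

  // Stand-in for reading the persisted group registry from ZK.
  private def loadGroupFromZk(groupId: String): Option[GroupRegistry] = None

  // Stand-in for starting a DelayedHeartbeat for one consumer.
  private def scheduleDelayedHeartbeat(consumerId: String): Unit = ()

  // Returns true if the heartbeat is accepted for a known consumer of the group.
  def handleHeartbeat(groupId: String, consumerId: String): Boolean = {
    val group = localGroups.get(groupId).orElse {
      // Group unknown locally: this coordinator may have just taken over, so
      // recover the registry from ZK instead of forcing a rebalance.
      loadGroupFromZk(groupId).map { loaded =>
        localGroups(groupId) = loaded
        loaded.consumers.foreach(scheduleDelayedHeartbeat) // resume failure detection
        loaded
      }
    }
    group.exists(_.consumers.contains(consumerId)) // unknown group/consumer: reject
  }
}
```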
Re: Review Request 34450: Fix KAFKA-2017; rebased
---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34450/
---

(Updated May 20, 2015, 4:13 p.m.)

Review request for kafka.

Summary (updated)
---

Fix KAFKA-2017; rebased

Bugs: KAFKA-2017
    https://issues.apache.org/jira/browse/KAFKA-2017

Repository: kafka

Description
---

1. Upon receiving a join-group request, if the group metadata cannot be found in the local cache, try to read it from ZK;
2. Upon completing a rebalance, update ZK with the new group registry, or delete the registry if the group becomes empty.

Diffs (updated)
---

  core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala af06ad45cdc46ac3bc27898ebc1a5bd5b1c7b19e
  core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala 47bdfa7cc86fd4e841e2b1d6bfd40f1508e643bd
  core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala c39e6de34ee531c6dfa9107b830752bd7f8fbe59
  core/src/main/scala/kafka/utils/ZkUtils.scala 2618dd39b925b979ad6e4c0abd5c6eaafb3db5d5

Diff: https://reviews.apache.org/r/34450/diff/

Testing
---

Thanks,

Guozhang Wang
Re: Review Request 34450: Fix KAFKA-2017; rebased
> On May 20, 2015, 5:15 p.m., Onur Karaman wrote:
> > I only did a brief skim. This optimization tries to switch consumers over to a new coordinator without a rebalance. From my understanding, the consumers would detect a coordinator failure, discover the new coordinator to work with, and try heartbeating that new coordinator without a rebalance. So it seems to me that putting the logic in handleJoinGroup isn't right, as the rebalance is exactly what we're trying to avoid. The code should be in handleHeartbeat: it should look up the group info in ZK, add it to CoordinatorMetadata, and start a DelayedHeartbeat for every consumer of that group.
> >
> > **More importantly: given that this is just an optimization, and we haven't even seen the performance hit without it, I think KAFKA-2017 should be very low priority.** The following are higher priority:
> >
> > 1. Getting the consumer to properly handle the error codes of the join-group and heartbeat responses.
> > 2. Getting the consumer to detect coordinator failures and switch over to another coordinator (my KAFKA-1334 patch only had the coordinator detect consumer failures). A nice benefit of completing this first is that if we decide the rebalances on coordinator failover are an actual issue, it would greatly facilitate testing any coordinator failover logic. Right now, it's unclear how this rb's logic can be tested.
>
> Onur Karaman wrote:
> > I added a ticket for 2: [KAFKA-2208](https://issues.apache.org/jira/browse/KAFKA-2208)

Thanks for the prompt response, Onur!

1. I agree about the priority, and also that the group-loading logic should not be in join-group but rather in heartbeat.
2. About consumer-side failure detection: the consumer would not use heartbeat expiration to detect coordinator failure; it would only mark the current coordinator as dead upon a disconnection or upon receiving an error code. A sketch of that behavior follows at the end of this message.

- Guozhang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34450/#review84539
---

On May 20, 2015, 4:13 p.m., Guozhang Wang wrote:
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/34450/
> ---
>
> (Updated May 20, 2015, 4:13 p.m.)
>
> Review request for kafka.
>
> Bugs: KAFKA-2017
>     https://issues.apache.org/jira/browse/KAFKA-2017
>
> Repository: kafka
>
> Description
> ---
>
> 1. Upon receiving a join-group request, if the group metadata cannot be found in the local cache, try to read it from ZK;
> 2. Upon completing a rebalance, update ZK with the new group registry, or delete the registry if the group becomes empty.
>
> Diffs
> ---
>
>   core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala af06ad45cdc46ac3bc27898ebc1a5bd5b1c7b19e
>   core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala 47bdfa7cc86fd4e841e2b1d6bfd40f1508e643bd
>   core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala c39e6de34ee531c6dfa9107b830752bd7f8fbe59
>   core/src/main/scala/kafka/utils/ZkUtils.scala 2618dd39b925b979ad6e4c0abd5c6eaafb3db5d5
>
> Diff: https://reviews.apache.org/r/34450/diff/
>
> Testing
> ---
>
> Thanks,
>
> Guozhang Wang
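The sketch for point 2 above, consumer-side coordinator failure detection: the coordinator is marked dead only on a disconnect or an error code in a response, never by a local heartbeat-expiration timer. This is an illustrative stand-alone sketch; none of the names are from the consumer code.

```scala
// Hypothetical sketch: a consumer marks its coordinator dead only on
// disconnection or an error code, not on heartbeat expiration.
object CoordinatorFailureSketch {
  sealed trait HeartbeatResult
  case object Ok extends HeartbeatResult
  case object Disconnected extends HeartbeatResult
  final case class ErrorCode(code: Short) extends HeartbeatResult

  @volatile private var coordinatorDead = false

  def onHeartbeatResponse(result: HeartbeatResult): Unit = result match {
    case Ok => () // coordinator healthy; nothing to do
    case Disconnected | ErrorCode(_) =>
      // Mark dead; the next poll would rediscover the coordinator via a
      // coordinator-discovery (ConsumerMetadata) request.
      coordinatorDead = true
  }

  def isCoordinatorDead: Boolean = coordinatorDead
}
```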
Re: Review Request 34450: Fix KAFKA-2017; rebased
---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34450/#review84604
---

Thanks for the patch. A few comments below.

core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/34450/#comment135934>

    Another way to do this is to only load from ZK on the becoming-leader event for an offset topic partition. Then we don't have to read from ZK during join-group, which would introduce unnecessary overhead when joining a new group.

core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/34450/#comment135970>

    I was thinking whether it's worth including the leader epoch (of the corresponding offset topic partition) in the ZK value, as we did for leaderAndIsr, to prevent a zombie consumer coordinator from overwriting the value during a soft failure. I am not sure it's worth doing this immediately because: 1. When this happens, consumers can still recover after the heartbeat fails. 2. It seems that doing this right is a bit more complicated: we need to keep the leader epoch in the ZK value, but during a leader change we would probably also need to update the values in ZK with the new leader epoch, in order to truly prevent the zombie coordinator from overwriting the value. So I think for now we can just use the simple approach in this patch.

core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/34450/#comment135961>

    Should we encode topics as an array of String? (A sketch follows after these comments.)

core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/34450/#comment135956>

    updatePersistentPath already handles the case when the node doesn't exist.

core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/34450/#comment135959>

    Not sure we need to wrap unexpected exceptions. The callers already handle unexpected exceptions.

core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
<https://reviews.apache.org/r/34450/#comment135963>

    Should we auto-fix this or throw a KafkaException here? If we do auto-fix it, we should probably log a warning, since this is not expected. Ditto for the handling of partitionStrategy.

core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala
<https://reviews.apache.org/r/34450/#comment135965>

    Could we just do groups(group.groupId)?

core/src/main/scala/kafka/utils/ZkUtils.scala
<https://reviews.apache.org/r/34450/#comment135967>

    This doesn't seem to be used.
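On the topics-as-array comment above, here is a hypothetical sketch of the registry value with topics encoded as a JSON array of strings; the field names and overall layout are illustrative only, not the format used in the patch.

```scala
// Hypothetical registry-value encoding with topics as a JSON array of strings.
object GroupRegistryFormatSketch {
  def encode(generationId: Int, partitionStrategy: String, topics: Seq[String]): String = {
    // Build the array form, e.g. ["t1","t2"], instead of one delimited string.
    val topicArray = topics.map(t => "\"" + t + "\"").mkString("[", ",", "]")
    s"""{"version":1,"generation_id":$generationId,""" +
      s""""partition_strategy":"$partitionStrategy","topics":$topicArray}"""
  }
}

// encode(3, "range", Seq("t1", "t2")) yields:
//   {"version":1,"generation_id":3,"partition_strategy":"range","topics":["t1","t2"]}
```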
- Jun Rao

On May 20, 2015, 4:13 p.m., Guozhang Wang wrote:
> ---
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/34450/
> ---
>
> (Updated May 20, 2015, 4:13 p.m.)
>
> Review request for kafka.
>
> Bugs: KAFKA-2017
>     https://issues.apache.org/jira/browse/KAFKA-2017
>
> Repository: kafka
>
> Description
> ---
>
> 1. Upon receiving a join-group request, if the group metadata cannot be found in the local cache, try to read it from ZK;
> 2. Upon completing a rebalance, update ZK with the new group registry, or delete the registry if the group becomes empty.
>
> Diffs
> ---
>
>   core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala af06ad45cdc46ac3bc27898ebc1a5bd5b1c7b19e
>   core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala 47bdfa7cc86fd4e841e2b1d6bfd40f1508e643bd
>   core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala c39e6de34ee531c6dfa9107b830752bd7f8fbe59
>   core/src/main/scala/kafka/utils/ZkUtils.scala 2618dd39b925b979ad6e4c0abd5c6eaafb3db5d5
>
> Diff: https://reviews.apache.org/r/34450/diff/
>
> Testing
> ---
>
> Thanks,
>
> Guozhang Wang