Re: Review Request 34450: Fix KAFKA-2017; rebased

2015-05-21 Thread Guozhang Wang


 On May 21, 2015, 12:16 a.m., Jun Rao wrote:
  core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, lines 
  102-106
  https://reviews.apache.org/r/34450/diff/2/?file=965426#file965426line102
 
  Another way to do this is to only load from ZK on the become-leader 
  event for an offsetTopic partition. Then we don't have to read from ZK 
  during join-group, which would introduce unnecessary overhead when joining a 
  new group.

I thought about this while working on the patch. The reason I feel it may not 
be worth loading everything upon becoming leader is:

1. While we are loading from ZK, we would probably still need to reject any 
join-group request for a group that is not yet loaded, as we did in the offset 
manager; this introduces two more round trips (one to rediscover the 
coordinator and one for another join-group; unless we introduce a separate 
loading-in-progress error code, in which case we can reduce it to one) 
compared with loading from ZK on the fly, which costs just one ZK read.

2. It is likely that we only need to load from ZK once per group, upon the 
first join-group request received (when two join requests arrive at the same 
time we may unnecessarily read twice). Hence the latency overhead is small 
compared with loading all at once. The only concern is that it slows down all 
handler threads a little when coordinator migration happens, instead of 
dedicating one thread to reading all the ZK paths, which I feel is OK.
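The lazy, load-on-first-join approach described above can be sketched as follows. This is a minimal illustration, not the actual ConsumerCoordinator code: `GroupMetadata`, `loadFromZk`, and the cache shape are hypothetical stand-ins.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of lazy per-group loading on the first join-group request.
class LazyGroupCache {
    static final class GroupMetadata {
        final String groupId;
        final int generationId;
        GroupMetadata(String groupId, int generationId) {
            this.groupId = groupId;
            this.generationId = generationId;
        }
    }

    private final ConcurrentHashMap<String, GroupMetadata> cache = new ConcurrentHashMap<>();
    final AtomicInteger zkReads = new AtomicInteger(); // counts simulated ZK reads

    // Stand-in for a single ZK read of the group registry path.
    private GroupMetadata loadFromZk(String groupId) {
        zkReads.incrementAndGet();
        return new GroupMetadata(groupId, 1);
    }

    // On join-group: consult the local cache first; fall back to one ZK read.
    // computeIfAbsent runs the loader at most once per key, so two concurrent
    // join requests for the same group trigger only a single read.
    GroupMetadata getOrLoad(String groupId) {
        return cache.computeIfAbsent(groupId, this::loadFromZk);
    }
}
```

Note that `computeIfAbsent` serializes the load per key, so the double-read mentioned above could even be avoided, at the cost of briefly blocking concurrent joiners of the same group.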


 On May 21, 2015, 12:16 a.m., Jun Rao wrote:
  core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, lines 
  369-381
  https://reviews.apache.org/r/34450/diff/2/?file=965426#file965426line369
 
  I was thinking whether it's worth including the leader epoch (of the 
  corresponding offset topic partition) in the ZK value as we did for 
  leaderAndIsr to prevent a zombie consumer coordinator from overwriting the 
  value, during a soft failure. I am not sure if it's worth doing this 
  immediately because
  
  1. When this happens, consumers can still recover after the heartbeat 
  fails.
  2. It seems that doing this right is a bit more complicated. We need to 
  keep the leader epoch in the ZK value. However, during a leader change, we 
  probably need to update the values in ZK with the new leader epoch as well, 
  in order to truly prevent the zombie coordinator from overwriting the 
  value.
  
  So, I think for now, we can just use the simple approach in this patch.

I think this is handled by the generation id, which is ever-increasing; a 
coordinator writing to ZK must have its generation id equal to the ZK value + 1.

One caveat, though: when a group becomes empty we remove it from ZK, and when 
it appears again we treat it as a new group with the generation id reset to 1. 
A zombie coordinator that happens to hold the matching generation id after the 
reset may be able to overwrite the value. We can create another JIRA for this 
case.
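The generation-id fencing rule described above (a write succeeds only if the writer's generation is exactly one above the stored value) can be modeled as a small sketch. `GenerationFencedRegistry` is a hypothetical illustration; a real implementation would use a conditional ZK write rather than an in-memory map.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of generation-id fencing for group registry writes.
// A map stands in for the ZK path so the check-then-write rule is easy to see.
class GenerationFencedRegistry {
    private final Map<String, Integer> zkValue = new HashMap<>();

    // A coordinator may only write generation N if the stored value is N - 1
    // (or the path is absent and N == 1). A zombie holding a stale generation
    // fails the check and its write is rejected.
    synchronized boolean tryWrite(String groupId, int newGeneration) {
        Integer current = zkValue.get(groupId);
        int expected = (current == null) ? 1 : current + 1;
        if (newGeneration != expected) {
            return false; // stale (zombie) coordinator: reject
        }
        zkValue.put(groupId, newGeneration);
        return true;
    }

    // Deleting an empty group resets the generation, which reopens the caveat
    // above: a zombie holding the "right" low generation could overwrite the
    // value after the group reappears.
    synchronized void deleteGroup(String groupId) {
        zkValue.remove(groupId);
    }
}
```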


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34450/#review84604
---


On May 20, 2015, 4:13 p.m., Guozhang Wang wrote:
 
 ---
 This is an automatically generated e-mail. To reply, visit:
 https://reviews.apache.org/r/34450/
 ---
 
 (Updated May 20, 2015, 4:13 p.m.)
 
 
 Review request for kafka.
 
 
 Bugs: KAFKA-2017
 https://issues.apache.org/jira/browse/KAFKA-2017
 
 
 Repository: kafka
 
 
 Description
 ---
 
 1. Upon receiving a join-group request, if the group metadata cannot be found 
 in the local cache, try to read it from ZK; 2. Upon completing a rebalance, 
 update ZK with the new group registry, or delete the registry if the group 
 becomes empty
 
 
 Diffs
 -
 
   core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 
 af06ad45cdc46ac3bc27898ebc1a5bd5b1c7b19e 
   core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala 
 47bdfa7cc86fd4e841e2b1d6bfd40f1508e643bd 
   core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala 
 c39e6de34ee531c6dfa9107b830752bd7f8fbe59 
   core/src/main/scala/kafka/utils/ZkUtils.scala 
 2618dd39b925b979ad6e4c0abd5c6eaafb3db5d5 
 
 Diff: https://reviews.apache.org/r/34450/diff/
 
 
 Testing
 ---
 
 
 Thanks,
 
 Guozhang Wang
 




Re: Review Request 34450: Fix KAFKA-2017; rebased

2015-05-20 Thread Onur Karaman


 On May 20, 2015, 5:15 p.m., Onur Karaman wrote:
  I only did a brief skim. This optimization tries to switch consumers over 
  to a new coordinator without a rebalance. From my understanding, the 
  consumers would detect a coordinator failure, discover the new coordinator 
  to work with, and try heartbeating that new coordinator without a 
  rebalance.
  
  So it seems to me that putting the logic in handleJoinGroup isn't right, as 
  the rebalance is what we're trying to avoid. The code should be in 
  handleHeartbeat. It should look up ZK for the group info, add it to 
  CoordinatorMetadata, and start up a DelayedHeartbeat for every consumer of 
  that group.
  
  **More importantly: given that this is just an optimization, and we haven't 
  even seen the performance hit without this, I think KAFKA-2017 should be 
  very low priority.**
  
  The following are higher priority:
  1. Getting the consumer to properly handle error codes of the join group 
  and heartbeat responses.
  2. Getting the consumer to detect coordinator failures and switch over to 
  another coordinator (my KAFKA-1334 patch just had the coordinator detect 
  consumer failures). A nice benefit of completing this first is that if we 
  decide that the rebalances on coordinator failover are an actual issue, 
  this would greatly facilitate testing any coordinator failover logic. Right 
  now, it's unclear how this rb's logic can be tested.

I added a ticket for 2: 
[KAFKA-2208](https://issues.apache.org/jira/browse/KAFKA-2208)


- Onur


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34450/#review84539
---





Re: Review Request 34450: Fix KAFKA-2017; rebased

2015-05-20 Thread Guozhang Wang

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34450/
---

(Updated May 20, 2015, 4:13 p.m.)


Review request for kafka.


Summary (updated)
-

Fix KAFKA-2017; rebased


Bugs: KAFKA-2017
https://issues.apache.org/jira/browse/KAFKA-2017


Repository: kafka


Description
---

1. Upon receiving a join-group request, if the group metadata cannot be found 
in the local cache, try to read it from ZK; 2. Upon completing a rebalance, 
update ZK with the new group registry, or delete the registry if the group 
becomes empty


Diffs (updated)
-

  core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 
af06ad45cdc46ac3bc27898ebc1a5bd5b1c7b19e 
  core/src/main/scala/kafka/coordinator/ConsumerGroupMetadata.scala 
47bdfa7cc86fd4e841e2b1d6bfd40f1508e643bd 
  core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala 
c39e6de34ee531c6dfa9107b830752bd7f8fbe59 
  core/src/main/scala/kafka/utils/ZkUtils.scala 
2618dd39b925b979ad6e4c0abd5c6eaafb3db5d5 

Diff: https://reviews.apache.org/r/34450/diff/


Testing
---


Thanks,

Guozhang Wang



Re: Review Request 34450: Fix KAFKA-2017; rebased

2015-05-20 Thread Guozhang Wang


 On May 20, 2015, 5:15 p.m., Onur Karaman wrote:
  I only did a brief skim. This optimization tries to switch consumers over 
  to a new coordinator without a rebalance. From my understanding, the 
  consumers would detect a coordinator failure, discover the new coordinator 
  to work with, and try heartbeating that new coordinator without a 
  rebalance.
  
  So it seems to me that putting the logic in handleJoinGroup isn't right, as 
  the rebalance is what we're trying to avoid. The code should be in 
  handleHeartbeat. It should look up ZK for the group info, add it to 
  CoordinatorMetadata, and start up a DelayedHeartbeat for every consumer of 
  that group.
  
  **More importantly: given that this is just an optimization, and we haven't 
  even seen the performance hit without this, I think KAFKA-2017 should be 
  very low priority.**
  
  The following are higher priority:
  1. Getting the consumer to properly handle error codes of the join group 
  and heartbeat responses.
  2. Getting the consumer to detect coordinator failures and switch over to 
  another coordinator (my KAFKA-1334 patch just had the coordinator detect 
  consumer failures). A nice benefit of completing this first is that if we 
  decide that the rebalances on coordinator failover are an actual issue, 
  this would greatly facilitate testing any coordinator failover logic. Right 
  now, it's unclear how this rb's logic can be tested.
 
 Onur Karaman wrote:
 I added a ticket for 2: 
 [KAFKA-2208](https://issues.apache.org/jira/browse/KAFKA-2208)

Thanks for the prompt response, Onur!

1. I agree about the priority, and I also agree that the group-loading logic 
should not be in join-group but rather in heartbeat.

2. Regarding consumer-side failure detection: the consumer would not use 
heartbeat expiration to detect coordinator failure, but would only mark the 
current coordinator as dead upon a disconnection or upon receiving an error 
code.
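The consumer-side behavior described in point 2 can be sketched as follows. This is a hedged illustration, not the actual client code: `CoordinatorLiveness` and its methods are hypothetical, and the `Iterator` stands in for the consumer-metadata request a real client would send to any broker.

```java
import java.util.Iterator;
import java.util.List;

// Sketch: the consumer marks its coordinator dead only on a disconnect or an
// error code in a response, then rediscovers a new coordinator. It never runs
// a client-side heartbeat-expiration timer of its own.
class CoordinatorLiveness {
    private final Iterator<String> discovery;
    private String coordinator; // null while no live coordinator is known

    CoordinatorLiveness(List<String> brokers) {
        this.discovery = brokers.iterator();
        this.coordinator = discovery.hasNext() ? discovery.next() : null;
    }

    String coordinator() {
        return coordinator;
    }

    // Invoked on disconnection or on receiving a coordinator-related error
    // code in a heartbeat or join-group response.
    void onDisconnectOrError() {
        coordinator = null; // mark the current coordinator dead
        if (discovery.hasNext()) {
            coordinator = discovery.next(); // rediscover
        }
    }
}
```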


- Guozhang


---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34450/#review84539
---





Re: Review Request 34450: Fix KAFKA-2017; rebased

2015-05-20 Thread Jun Rao

---
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34450/#review84604
---


Thanks for the patch. A few comments below.


core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
https://reviews.apache.org/r/34450/#comment135934

Another way to do this is to only load from ZK on the become-leader event 
for an offsetTopic partition. Then we don't have to read from ZK during 
join-group, which would introduce unnecessary overhead when joining a new group.



core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
https://reviews.apache.org/r/34450/#comment135970

I was thinking whether it's worth including the leader epoch (of the 
corresponding offset topic partition) in the ZK value as we did for 
leaderAndIsr to prevent a zombie consumer coordinator from overwriting the 
value, during a soft failure. I am not sure if it's worth doing this 
immediately because

1. When this happens, consumers can still recover after the heartbeat fails.
2. It seems that doing this right is a bit more complicated. We need to 
keep the leader epoch in the ZK value. However, during a leader change, we 
probably need to update the values in ZK with the new leader epoch as well, in 
order to truly prevent the zombie coordinator from overwriting the value.

So, I think for now, we can just use the simple approach in this patch.
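The leader-epoch fencing idea floated above can be modeled as a tiny sketch. `EpochFencedValue` is a hypothetical illustration of the check, not Kafka code; a real implementation would carry the epoch inside the ZK value and reject conditionally on write.

```java
// Hypothetical model of leader-epoch fencing: the stored value carries the
// leader epoch of the offset topic partition, and a write from a coordinator
// with a lower epoch (a zombie after a soft failure) is rejected.
class EpochFencedValue {
    private int storedEpoch = -1;
    private String storedValue;

    synchronized boolean tryWrite(int writerEpoch, String value) {
        if (writerEpoch < storedEpoch) {
            return false; // zombie coordinator: stale leader epoch
        }
        storedEpoch = writerEpoch;
        storedValue = value;
        return true;
    }

    synchronized String value() {
        return storedValue;
    }
}
```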



core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
https://reviews.apache.org/r/34450/#comment135961

Should we encode topics as an array of String?



core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
https://reviews.apache.org/r/34450/#comment135956

updatePersistentPath already handles the case when the node doesn't exist.



core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
https://reviews.apache.org/r/34450/#comment135959

Not sure if we need to wrap unexpected exceptions. The callers already 
handle unexpected exceptions.



core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
https://reviews.apache.org/r/34450/#comment135963

Should we auto-fix this or throw a KafkaException here? If we auto-fix it, 
we should probably log a warning since this is not expected. Ditto for the 
handling of partitionStrategy.



core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala
https://reviews.apache.org/r/34450/#comment135965

Could we just do groups(group.groupId)?



core/src/main/scala/kafka/utils/ZkUtils.scala
https://reviews.apache.org/r/34450/#comment135967

This doesn't seem to be used.


- Jun Rao

