> On April 28, 2015, 12:13 a.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, lines 
> > 281-355
> > <https://reviews.apache.org/r/33088/diff/9/?file=941787#file941787line281>
> >
> >     If we add the CoordinatorMetadata class, these functions can all be 
> > moved to that class.

I made a CoordinatorMetadata that only manages groups and keeps track of which 
topics to listen to changes for. It delegates as much group and consumer logic 
as possible to the ConsumerCoordinator (stuff like group synchronization, 
adding/removing/updating consumers and the corresponding topic bind/unbind 
operations), as I wanted to keep all the group logic in one place 
(ConsumerCoordinator).


> On April 28, 2015, 12:13 a.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, line 463
> > <https://reviews.apache.org/r/33088/diff/9/?file=941787#file941787line463>
> >
> >     Let's call "finishCurrentHeartbeat" upon receiving the join group 
> > request (line 155 above), and only "expectNextHeartbeat" here. Because 
> > otherwise if the rebalance is waiting long then this consumer will be 
> > unnecessarily marked as dead right?

Just to make sure I understand your concern, is your concern that:
1. we scheduleHeartbeatExpiration for a consumer c in group g
2. g for some reason transitions to PreparingRebalance
3. c rejoins and is waiting for the rebalance to complete.
4. PreparingRebalance is taking a long time. The DelayedHeartbeat expires. c is 
marked dead.
5. g rebalances and stabilizes without c?

If this is your concern, it won't happen. Expired DelayedHeartbeats won't mark 
rejoined consumers that are awaiting rebalance as dead due to the following 
concept from the 
[wiki](https://cwiki.apache.org/confluence/display/KAFKA/Kafka+0.9+Consumer+Rewrite+Design#Kafka0.9ConsumerRewriteDesign-Heartbeatsduringrebalance):
 The co-ordinator pauses failure detection for a consumer that has sent a 
JoinGroupRequest until a JoinGroupResponse is sent out. It restarts the 
hearbeat timer once the JoinGroupResponse is sent out and marks a consumer dead 
if it does not receive a HeartbeatRequest from that time for another 
session.timeout.ms milliseconds.

I implemented this pause by making sure an expired DelayedHeartbeat on c won't 
mark c as dead if c is awaitingRebalance (line 353).


> On April 28, 2015, 12:13 a.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, line 499
> > <https://reviews.apache.org/r/33088/diff/9/?file=941787#file941787line499>
> >
> >     I think once we split the scheduleHeartbeat Expiration function, we do 
> > not need to check here as after the join-group is received but before 
> > rebalance is finished, we will not expect any heartbeat anymore.

It's true that failure detection will pause during rebalance. But as stated 
above, some sort of check is still needed in onExpirationHeartbeat to prevent 
DelayedHeartbeats that were scheduled before rebalance and expired during 
rebalance from marking rejoined consumers as dead regardless of how that 
scheduleHeartbeatExpiration method is split up.


On April 28, 2015, 12:13 a.m., Onur Karaman wrote:
> > Some general comments:
> > 
> > 1. Could you add some step-by-step and if-else comments in coordinator's 
> > functions to make it more self-explanary?
> > 
> > 2. Note that onCompleteXXX functions will be called from purgatory repear 
> > threads, which will in turn call other internal functions like rebalance(), 
> > while other times these functions will be called by the request handler 
> > threads. Are there any possible deadlock issues? I ask for another 
> > thought-through just because we have been bite-on before due to this fact.

For 2: I see two types of resources here: group locks and the metadataLock. We 
either only acquire a group lock, only acquire a metadataLock, or nest the 
metadataLock inside a group lock. We never nest group locks, so we don't get 
circular waits between group locks. We also never nest a group lock inside the 
metadataLock, so we don't get circular waits between group locks and the 
metadataLock.

ZkClient doesn't cause any circular waits because internally, it just puts all 
ZkEvents into a queue and has a dedicated thread processing each event's 
corresponding TopicPartitionChangeListener handleDataChange or 
handleDataDeleted callbacks one by one, and these callbacks adhere to the 
locking patterns above. Since there doesn't seem to be any circular waits, it 
looks like there are no deadlocks.


- Onur


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33088/#review81743
-----------------------------------------------------------


On May 5, 2015, 5:50 p.m., Onur Karaman wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/33088/
> -----------------------------------------------------------
> 
> (Updated May 5, 2015, 5:50 p.m.)
> 
> 
> Review request for kafka.
> 
> 
> Bugs: KAFKA-1334
>     https://issues.apache.org/jira/browse/KAFKA-1334
> 
> 
> Repository: kafka
> 
> 
> Description
> -------
> 
> add heartbeat to coordinator
> 
> todo:
> - see how it performs under real load
> - add error code handling on the consumer side
> 
> 
> Diffs
> -----
> 
>   
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
>  e55ab11df4db0b0084f841a74cbcf819caf780d5 
>   clients/src/main/java/org/apache/kafka/common/protocol/Errors.java 
> 36aa412404ff1458c7bef0feecaaa8bc45bed9c7 
>   core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala 
> 456b602245e111880e1b8b361319cabff38ee0e9 
>   core/src/main/scala/kafka/coordinator/ConsumerRegistry.scala 
> 2f5797064d4131ecfc9d2750d9345a9fa3972a9a 
>   core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala 
> PRE-CREATION 
>   core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala 
> 6a6bc7bc4ceb648b67332e789c2c33de88e4cd86 
>   core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala 
> df60cbc35d09937b4e9c737c67229889c69d8698 
>   core/src/main/scala/kafka/coordinator/DelayedRebalance.scala 
> 8defa2e41c92f1ebe255177679d275c70dae5b3e 
>   core/src/main/scala/kafka/coordinator/Group.scala PRE-CREATION 
>   core/src/main/scala/kafka/coordinator/GroupRegistry.scala 
> 94ef5829b3a616c90018af1db7627bfe42e259e5 
>   core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala 
> 821e26e97eaa97b5f4520474fff0fedbf406c82a 
>   core/src/main/scala/kafka/coordinator/PartitionAssignor.scala PRE-CREATION 
>   core/src/main/scala/kafka/server/DelayedOperationKey.scala 
> b673e43b0ba401b2e22f27aef550e3ab0ef4323c 
>   core/src/main/scala/kafka/server/KafkaApis.scala 
> b4004aa3a1456d337199aa1245fb0ae61f6add46 
>   core/src/main/scala/kafka/server/KafkaServer.scala 
> c63f4ba9d622817ea8636d4e6135fba917ce085a 
>   core/src/main/scala/kafka/server/OffsetManager.scala 
> 18680ce100f10035175cc0263ba7787ab0f6a17a 
>   core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala 
> PRE-CREATION 
>   core/src/test/scala/unit/kafka/coordinator/GroupTest.scala PRE-CREATION 
>   core/src/test/scala/unit/kafka/coordinator/PartitionAssignorTest.scala 
> PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/33088/diff/
> 
> 
> Testing
> -------
> 
> 
> Thanks,
> 
> Onur Karaman
> 
>

Reply via email to