> On April 28, 2015, 12:13 a.m., Guozhang Wang wrote:
> > core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala, line 261
> > <https://reviews.apache.org/r/33088/diff/9/?file=941787#file941787line261>
> >
> > Function name "scheduleHeartbeatExpiration" is a bit misleading, it is
> > really "mark the current heartbeat as completed before expiration if there
> > is any and schedule the next heartbeat". So how about spliting it into two
> > functions, "finishCurrentHeartbeat" and "expectNextHeartbeat", and upon
> > consumer creation we only need to call the latter one.
I'm assuming here that you want to the split to be something like this:
```java
private def finishCurrentHeartbeat(group: Group, consumer: Consumer) {
consumer.latestHeartbeat = SystemTime.milliseconds
val consumerKey = ConsumerKey(consumer.groupId, consumer.consumerId)
// TODO: can we fix DelayedOperationPurgatory to remove keys in
watchersForKey with empty watchers list?
heartbeatPurgatory.checkAndComplete(consumerKey)
}
private def scheduleNextHeartbeatExpiration(group: Group, consumer: Consumer) {
val consumerKey = ConsumerKey(consumer.groupId, consumer.consumerId)
val heartbeatDeadline = consumer.latestHeartbeat + consumer.sessionTimeoutMs
val delayedHeartbeat = new DelayedHeartbeat(this, group, consumer,
heartbeatDeadline, consumer.sessionTimeoutMs)
heartbeatPurgatory.tryCompleteElseWatch(delayedHeartbeat, Seq(consumerKey))
}
```
I'd rather not do the split. It adds some confusion because now you have to
think about which of the two you'd need to call in a given scenario, and in
what order. Both the finishCurrentHeartbeat and the
scheduleNextHeartbeatExpiration rely on latestHeartbeat being correctly
updated, so I'd rather just put all the related logic in one place.
- Onur
-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/33088/#review81743
-----------------------------------------------------------
On May 8, 2015, 5:55 p.m., Onur Karaman wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/33088/
> -----------------------------------------------------------
>
> (Updated May 8, 2015, 5:55 p.m.)
>
>
> Review request for kafka.
>
>
> Bugs: KAFKA-1334
> https://issues.apache.org/jira/browse/KAFKA-1334
>
>
> Repository: kafka
>
>
> Description
> -------
>
> add heartbeat to coordinator
>
> todo:
> - see how it performs under real load
> - add error code handling on the consumer side
>
>
> Diffs
> -----
>
>
> clients/src/main/java/org/apache/kafka/clients/consumer/internals/Coordinator.java
> e55ab11df4db0b0084f841a74cbcf819caf780d5
> clients/src/main/java/org/apache/kafka/common/protocol/Errors.java
> 36aa412404ff1458c7bef0feecaaa8bc45bed9c7
> core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
> 456b602245e111880e1b8b361319cabff38ee0e9
> core/src/main/scala/kafka/coordinator/ConsumerRegistry.scala
> 2f5797064d4131ecfc9d2750d9345a9fa3972a9a
> core/src/main/scala/kafka/coordinator/CoordinatorMetadata.scala
> PRE-CREATION
> core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
> 6a6bc7bc4ceb648b67332e789c2c33de88e4cd86
> core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala
> f5bd5dc802bc9fb8d175a0813308948e88c2a8b1
> core/src/main/scala/kafka/coordinator/DelayedRebalance.scala
> 60fbdae164fda74eab859f4ff25b7beff7fce757
> core/src/main/scala/kafka/coordinator/Group.scala PRE-CREATION
> core/src/main/scala/kafka/coordinator/GroupRegistry.scala
> 94ef5829b3a616c90018af1db7627bfe42e259e5
> core/src/main/scala/kafka/coordinator/HeartbeatBucket.scala
> 821e26e97eaa97b5f4520474fff0fedbf406c82a
> core/src/main/scala/kafka/coordinator/PartitionAssignor.scala PRE-CREATION
> core/src/main/scala/kafka/server/DelayedOperationKey.scala
> b673e43b0ba401b2e22f27aef550e3ab0ef4323c
> core/src/main/scala/kafka/server/KafkaApis.scala
> 417960dd1ab407ebebad8fdb0e97415db3e91a2f
> core/src/main/scala/kafka/server/KafkaServer.scala
> b7d2a2842e17411a823b93bdedc84657cbd62be1
> core/src/main/scala/kafka/server/OffsetManager.scala
> 18680ce100f10035175cc0263ba7787ab0f6a17a
> core/src/test/scala/integration/kafka/api/ConsumerTest.scala
> ffbdf5dc106e2a59563768280074696c76491337
> core/src/test/scala/integration/kafka/api/IntegrationTestHarness.scala
> 2bbd4c96f8c70f22d12e593941e2c26c39352900
> core/src/test/scala/unit/kafka/coordinator/CoordinatorMetadataTest.scala
> PRE-CREATION
> core/src/test/scala/unit/kafka/coordinator/GroupTest.scala PRE-CREATION
> core/src/test/scala/unit/kafka/coordinator/PartitionAssignorTest.scala
> PRE-CREATION
>
> Diff: https://reviews.apache.org/r/33088/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Onur Karaman
>
>