Repository: kafka

Updated Branches:
  refs/heads/0.10.1 9ffadbfdc -> 0e20e5fbf
KAFKA-4399; Fix deadlock between cleanupGroupMetadata and offset commit

Author: Alexey Ozeritsky <[email protected]>

Reviewers: Ismael Juma <[email protected]>, Jason Gustafson <[email protected]>

Closes #2125 from resetius/KAFKA-4399

(cherry picked from commit ea370be518a783f3a5d8d834f78c82e36bf968b3)
Signed-off-by: Jason Gustafson <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/0e20e5fb
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/0e20e5fb
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/0e20e5fb

Branch: refs/heads/0.10.1
Commit: 0e20e5fbfe683257882f0426357b6c2e75bcacca
Parents: 9ffadbf
Author: Alexey Ozeritsky <[email protected]>
Authored: Thu Dec 1 19:09:06 2016 -0800
Committer: Jason Gustafson <[email protected]>
Committed: Thu Dec 1 19:57:12 2016 -0800

----------------------------------------------------------------------
 .../coordinator/GroupMetadataManager.scala | 106 +++++++++----------
 1 file changed, 48 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/0e20e5fb/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
index feedc45..fafc39c 100644
--- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala
@@ -136,43 +136,6 @@ class GroupMetadataManager(val brokerId: Int,
     }
   }
 
-  /**
-   * Remove the group from the cache and delete all metadata associated with it. This should be
-   * called only after all offsets for the group have expired and no members are remaining (i.e.
-   * it is in the Empty state).
-   */
-  private def evictGroupAndDeleteMetadata(group: GroupMetadata) {
-    // guard this removal in case of concurrent access (e.g. if a delayed join completes with no members
-    // while the group is being removed due to coordinator emigration). We also avoid writing the tombstone
-    // when the generationId is 0, since this group is only using Kafka for offset storage.
-    if (groupMetadataCache.remove(group.groupId, group) && group.generationId > 0) {
-      // Append the tombstone messages to the partition. It is okay if the replicas don't receive these (say,
-      // if we crash or leaders move) since the new leaders will still expire the consumers with heartbeat and
-      // retry removing this group.
-      val groupPartition = partitionFor(group.groupId)
-      getMessageFormatVersionAndTimestamp(groupPartition).foreach { case (magicValue, timestamp) =>
-        val tombstone = new Message(bytes = null, key = GroupMetadataManager.groupMetadataKey(group.groupId),
-          timestamp = timestamp, magicValue = magicValue)
-
-        val partitionOpt = replicaManager.getPartition(Topic.GroupMetadataTopicName, groupPartition)
-        partitionOpt.foreach { partition =>
-          val appendPartition = TopicAndPartition(Topic.GroupMetadataTopicName, groupPartition)
-
-          trace("Marking group %s as deleted.".format(group.groupId))
-
-          try {
-            // do not need to require acks since even if the tombstone is lost,
-            // it will be appended again by the new leader
-            partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, tombstone))
-          } catch {
-            case t: Throwable =>
-              error("Failed to mark group %s as deleted in %s.".format(group.groupId, appendPartition), t)
-            // ignore and continue
-          }
-        }
-      }
-    }
-  }
-
   def prepareStoreGroup(group: GroupMetadata,
                         groupAssignment: Map[String, Array[Byte]],
@@ -598,7 +561,7 @@ class GroupMetadataManager(val brokerId: Int,
     val startMs = time.milliseconds()
     var offsetsRemoved = 0
 
-    groupMetadataCache.foreach { case (groupId, group) =>
+    val result = groupMetadataCache.flatMap { case (groupId, group) =>
       group synchronized {
         if (!group.is(Dead)) {
           val offsetsPartition = partitionFor(groupId)
@@ -612,32 +575,59 @@ class GroupMetadataManager(val brokerId: Int,
                 new Message(bytes = null, key = commitKey, timestamp = timestamp, magicValue = magicValue)
               }.toBuffer
 
-              val partitionOpt = replicaManager.getPartition(Topic.GroupMetadataTopicName, offsetsPartition)
-              partitionOpt.foreach { partition =>
-                val appendPartition = TopicAndPartition(Topic.GroupMetadataTopicName, offsetsPartition)
-                trace("Marked %d offsets in %s for deletion.".format(tombstones.size, appendPartition))
-
-                try {
-                  // do not need to require acks since even if the tombstone is lost,
-                  // it will be appended again in the next purge cycle
-                  partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, tombstones: _*))
-                  offsetsRemoved += tombstones.size
-                }
-                catch {
-                  case t: Throwable =>
-                    error("Failed to mark %d expired offsets for deletion in %s.".format(tombstones.size, appendPartition), t)
-                  // ignore and continue
-                }
-              }
+              val numOffsetsExpired = tombstones.size
 
               if (group.is(Empty) && !group.hasOffsets) {
                 group.transitionTo(Dead)
-                evictGroupAndDeleteMetadata(group)
-                info("Group %s generation %s is dead and removed".format(group.groupId, group.generationId))
+
+                // We avoid writing the tombstone
+                // when the generationId is 0, since this group is only using Kafka for offset storage.
+                if (groupMetadataCache.get(groupId) == group && group.generationId > 0) {
+                  // Append the tombstone messages to the partition. It is okay if the replicas don't receive these (say,
+                  // if we crash or leaders move) since the new leaders will still expire the consumers with heartbeat and
+                  // retry removing this group.
+
+                  trace("Marking group %s as deleted.".format(groupId))
+
+                  tombstones += new Message(bytes = null, key = GroupMetadataManager.groupMetadataKey(groupId),
+                    timestamp = timestamp, magicValue = magicValue)
+                }
               }
+              Some((group, offsetsPartition, tombstones, numOffsetsExpired))
 
             case None =>
-              info("BrokerId %d is no longer a coordinator for the group %s. Proceeding cleanup for other alive groups".format(brokerId, group.groupId))
+              info("BrokerId %d is no longer a coordinator for the group %s. Proceeding cleanup for other alive groups".format(brokerId, groupId))
+              None
+          }
+        } else {
+          None
+        }
+      }
+    }
+
+    for ((group, offsetsPartition, tombstones, numOffsetsExpired) <- result) {
+      val partitionOpt = replicaManager.getPartition(Topic.GroupMetadataTopicName, offsetsPartition)
+
+      partitionOpt.foreach { partition =>
+        val appendPartition = TopicAndPartition(Topic.GroupMetadataTopicName, offsetsPartition)
+        trace("Marked %d offsets in %s for deletion.".format(numOffsetsExpired, appendPartition))
+
+        try {
+          // do not need to require acks since even if the tombstone is lost,
+          // it will be appended again in the next purge cycle
+          partition.appendMessagesToLeader(new ByteBufferMessageSet(config.offsetsTopicCompressionCodec, tombstones: _*))
+          offsetsRemoved += numOffsetsExpired
+        }
+        catch {
+          case t: Throwable =>
+            error(s"Failed to write ${tombstones.size} tombstones for group ${group.groupId} to $appendPartition.", t)
+          // ignore and continue
+        }
+
+        group synchronized {
+          if (group.is(Dead)) {
+            groupMetadataCache.remove(group.groupId, group)
+            info("Group %s generation %s is dead and removed".format(group.groupId, group.generationId))
           }
         }
       }
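
For readers following the change: the patch restructures cleanupGroupMetadata so that tombstones are collected while the per-group lock is held and are appended to the offsets topic only after the lock is released, which removes the lock-ordering conflict with the offset commit path. Below is a minimal, self-contained Scala sketch of that two-phase pattern; SimpleGroup, cleanupGroups and appendTombstones are hypothetical names used for illustration only and are not the real GroupMetadataManager API.

    // Minimal sketch of the locking pattern the patch moves to, assuming a simplified
    // model of the coordinator. All names here are illustrative, not the broker's API.
    import java.util.concurrent.ConcurrentHashMap
    import scala.collection.JavaConverters._

    object CleanupSketch {

      // Stand-in for GroupMetadata: per-group state guarded by synchronizing on the instance.
      final class SimpleGroup(val groupId: String) {
        var dead: Boolean = false
      }

      // Stand-in for groupMetadataCache.
      private val cache = new ConcurrentHashMap[String, SimpleGroup]()

      // Stand-in for the partition append, which internally takes its own lock. In the broker,
      // the offset-commit path can hold locks on this side while waiting for the group lock,
      // so calling it while holding a group lock risks the deadlock described in KAFKA-4399.
      private val logLock = new Object
      private def appendTombstones(keys: Seq[String]): Unit = logLock.synchronized {
        // ... append tombstone messages for `keys` to the offsets topic partition ...
      }

      // Two-phase cleanup, mirroring the shape of the patched cleanupGroupMetadata:
      // phase 1 decides what to delete while holding only the per-group lock;
      // phase 2 performs the log append with no group lock held.
      def cleanupGroups(): Unit = {
        val toDelete = cache.asScala.values.flatMap { group =>
          group.synchronized {
            if (!group.dead && cache.get(group.groupId) == group) {
              group.dead = true
              Some(group.groupId)
            } else None
          }
        }.toSeq

        if (toDelete.nonEmpty)
          appendTombstones(toDelete) // no group lock held here

        // Eviction happens after the append, re-checking state under the group lock,
        // as the patch does in its final `group synchronized` block.
        toDelete.foreach { id =>
          Option(cache.get(id)).foreach { group =>
            group.synchronized {
              if (group.dead) cache.remove(id, group)
            }
          }
        }
      }
    }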
