Repository: kafka Updated Branches: refs/heads/trunk 5afa16601 -> c4282371d
KAFKA-3343; Use NoTimestamp in GroupMetadataManager when message v0 i⦠â¦s used. Author: Jiangjie Qin <[email protected]> Reviewers: Ismael Juma <[email protected]>, Jun Rao <[email protected]> Closes #1023 from becketqin/KAFKA-3343 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c4282371 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c4282371 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c4282371 Branch: refs/heads/trunk Commit: c4282371d954d7ae6decd32252d85f0d2a254e8c Parents: 5afa166 Author: Jiangjie Qin <[email protected]> Authored: Tue Mar 8 18:34:07 2016 -0600 Committer: Jun Rao <[email protected]> Committed: Tue Mar 8 18:34:07 2016 -0600 ---------------------------------------------------------------------- .../coordinator/GroupMetadataManager.scala | 24 ++++++++++++-------- 1 file changed, 14 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/c4282371/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala index cbdb854..2c0236e 100644 --- a/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/GroupMetadataManager.scala @@ -143,8 +143,9 @@ class GroupMetadataManager(val brokerId: Int, // if we crash or leaders move) since the new leaders will still expire the consumers with heartbeat and // retry removing this group. val groupPartition = partitionFor(group.groupId) + val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(groupPartition) val tombstone = new Message(bytes = null, key = GroupMetadataManager.groupMetadataKey(group.groupId), - timestamp = time.milliseconds(), magicValue = getMessageFormatVersion(groupPartition)) + timestamp = timestamp, magicValue = magicValue) val partitionOpt = replicaManager.getPartition(GroupCoordinator.GroupMetadataTopicName, groupPartition) partitionOpt.foreach { partition => @@ -169,12 +170,12 @@ class GroupMetadataManager(val brokerId: Int, def prepareStoreGroup(group: GroupMetadata, groupAssignment: Map[String, Array[Byte]], responseCallback: Short => Unit): DelayedStore = { + val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(partitionFor(group.groupId)) val message = new Message( key = GroupMetadataManager.groupMetadataKey(group.groupId), bytes = GroupMetadataManager.groupMetadataValue(group, groupAssignment), - timestamp = time.milliseconds(), - magicValue = getMessageFormatVersion(partitionFor(group.groupId)) - ) + timestamp = timestamp, + magicValue = magicValue) val groupMetadataPartition = new TopicPartition(GroupCoordinator.GroupMetadataTopicName, partitionFor(group.groupId)) @@ -253,11 +254,12 @@ class GroupMetadataManager(val brokerId: Int, // construct the message set to append val messages = filteredOffsetMetadata.map { case (topicAndPartition, offsetAndMetadata) => + val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(partitionFor(groupId)) new Message( key = GroupMetadataManager.offsetCommitKey(groupId, topicAndPartition.topic, topicAndPartition.partition), bytes = GroupMetadataManager.offsetCommitValue(offsetAndMetadata), - timestamp = time.milliseconds(), - magicValue = getMessageFormatVersion(partitionFor(groupId)) + timestamp = timestamp, + magicValue = magicValue ) }.toSeq @@ -557,8 +559,8 @@ class GroupMetadataManager(val brokerId: Int, val commitKey = GroupMetadataManager.offsetCommitKey(groupTopicAndPartition.group, groupTopicAndPartition.topicPartition.topic, groupTopicAndPartition.topicPartition.partition) - (offsetsPartition, new Message(bytes = null, key = commitKey, timestamp = time.milliseconds(), - magicValue = getMessageFormatVersion(offsetsPartition))) + val (magicValue, timestamp) = getMessageFormatVersionAndTimestamp(offsetsPartition) + (offsetsPartition, new Message(bytes = null, key = commitKey, timestamp = timestamp, magicValue = magicValue)) }.groupBy { case (partition, tombstone) => partition } // Append the tombstone messages to the offset partitions. It is okay if the replicas don't receive these (say, @@ -627,11 +629,13 @@ class GroupMetadataManager(val brokerId: Int, config.offsetsTopicNumPartitions } - private def getMessageFormatVersion(partition: Int): Byte = { + private def getMessageFormatVersionAndTimestamp(partition: Int): (Byte, Long) = { val groupMetadataTopicAndPartition = new TopicAndPartition(GroupCoordinator.GroupMetadataTopicName, partition) - replicaManager.getMessageFormatVersion(groupMetadataTopicAndPartition).getOrElse { + val messageFormatVersion = replicaManager.getMessageFormatVersion(groupMetadataTopicAndPartition).getOrElse { throw new IllegalArgumentException(s"Message format version for partition $groupMetadataTopicPartitionCount not found") } + val timestamp = if (messageFormatVersion == Message.MagicValue_V0) Message.NoTimestamp else time.milliseconds() + (messageFormatVersion, timestamp) } /**
