Repository: kafka Updated Branches: refs/heads/0.11.0 973f9d2b2 -> 3a515505e
KAFKA-5600; Fix group loading regression causing stale metadata/offset cache. The while loop was too big and needed to be closed earlier. To see the fix, ignore whitespace, since most of the change is indentation. This bug was introduced by commit 5bd06f1d542e6b588a1d402d059bc24690017d32. Author: Jan Burkhardt <jan.burkhardt@just.social> Reviewers: Ismael Juma <ism...@juma.me.uk>, Jason Gustafson <ja...@confluent.io> Closes #3538 from bjrke/trunk (cherry picked from commit e2fe19d22a42525001c5a66f21f02f49c051ecb5) Signed-off-by: Jason Gustafson <ja...@confluent.io> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3a515505 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3a515505 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3a515505 Branch: refs/heads/0.11.0 Commit: 3a515505e07fb4ccc961afcf52b5bad5fbf57e0f Parents: 973f9d2 Author: Jan Burkhardt <jan.burkhardt@just.social> Authored: Mon Jul 17 11:49:50 2017 -0700 Committer: Jason Gustafson <ja...@confluent.io> Committed: Mon Jul 17 11:53:45 2017 -0700 ---------------------------------------------------------------------- .../group/GroupMetadataManager.scala | 86 ++++++++++---------- .../group/GroupMetadataManagerTest.scala | 66 ++++++++++++++- 2 files changed, 105 insertions(+), 47 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/3a515505/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala index a8419fd..9322ff2 100644 --- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala +++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala @@ -550,57 +550,57 @@ class 
GroupMetadataManager(brokerId: Int, } currOffset = batch.nextOffset } + } + val (groupOffsets, emptyGroupOffsets) = loadedOffsets + .groupBy(_._1.group) + .mapValues(_.map { case (groupTopicPartition, offset) => (groupTopicPartition.topicPartition, offset) }) + .partition { case (group, _) => loadedGroups.contains(group) } - val (groupOffsets, emptyGroupOffsets) = loadedOffsets + val pendingOffsetsByGroup = mutable.Map[String, mutable.Map[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]]() + pendingOffsets.foreach { case (producerId, producerOffsets) => + producerOffsets.keySet.map(_.group).foreach(addProducerGroup(producerId, _)) + producerOffsets .groupBy(_._1.group) - .mapValues(_.map { case (groupTopicPartition, offset) => (groupTopicPartition.topicPartition, offset) }) - .partition { case (group, _) => loadedGroups.contains(group) } - - val pendingOffsetsByGroup = mutable.Map[String, mutable.Map[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]]() - pendingOffsets.foreach { case (producerId, producerOffsets) => - producerOffsets.keySet.map(_.group).foreach(addProducerGroup(producerId, _)) - producerOffsets - .groupBy(_._1.group) - .mapValues(_.map { case (groupTopicPartition, offset) => (groupTopicPartition.topicPartition, offset)}) - .foreach { case (group, offsets) => - val groupPendingOffsets = pendingOffsetsByGroup.getOrElseUpdate(group, mutable.Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]) - val groupProducerOffsets = groupPendingOffsets.getOrElseUpdate(producerId, mutable.Map.empty[TopicPartition, CommitRecordMetadataAndOffset]) - groupProducerOffsets ++= offsets - } - } + .mapValues(_.map { case (groupTopicPartition, offset) => (groupTopicPartition.topicPartition, offset)}) + .foreach { case (group, offsets) => + val groupPendingOffsets = pendingOffsetsByGroup.getOrElseUpdate(group, mutable.Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]) + val 
groupProducerOffsets = groupPendingOffsets.getOrElseUpdate(producerId, mutable.Map.empty[TopicPartition, CommitRecordMetadataAndOffset]) + groupProducerOffsets ++= offsets + } + } - val (pendingGroupOffsets, pendingEmptyGroupOffsets) = pendingOffsetsByGroup - .partition { case (group, _) => loadedGroups.contains(group)} + val (pendingGroupOffsets, pendingEmptyGroupOffsets) = pendingOffsetsByGroup + .partition { case (group, _) => loadedGroups.contains(group)} - loadedGroups.values.foreach { group => - val offsets = groupOffsets.getOrElse(group.groupId, Map.empty[TopicPartition, CommitRecordMetadataAndOffset]) - val pendingOffsets = pendingGroupOffsets.getOrElse(group.groupId, Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]) - debug(s"Loaded group metadata $group with offsets $offsets and pending offsets $pendingOffsets") - loadGroup(group, offsets, pendingOffsets) - onGroupLoaded(group) - } + loadedGroups.values.foreach { group => + val offsets = groupOffsets.getOrElse(group.groupId, Map.empty[TopicPartition, CommitRecordMetadataAndOffset]) + val pendingOffsets = pendingGroupOffsets.getOrElse(group.groupId, Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]) + debug(s"Loaded group metadata $group with offsets $offsets and pending offsets $pendingOffsets") + loadGroup(group, offsets, pendingOffsets) + onGroupLoaded(group) + } - // load groups which store offsets in kafka, but which have no active members and thus no group - // metadata stored in the log - (emptyGroupOffsets.keySet ++ pendingEmptyGroupOffsets.keySet).foreach { case(groupId) => - val group = new GroupMetadata(groupId) - val offsets = emptyGroupOffsets.getOrElse(groupId, Map.empty[TopicPartition, CommitRecordMetadataAndOffset]) - val pendingOffsets = pendingEmptyGroupOffsets.getOrElse(groupId, Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]) - debug(s"Loaded group metadata $group with offsets $offsets and pending offsets 
$pendingOffsets") - loadGroup(group, offsets, pendingOffsets) - onGroupLoaded(group) - } + // load groups which store offsets in kafka, but which have no active members and thus no group + // metadata stored in the log + (emptyGroupOffsets.keySet ++ pendingEmptyGroupOffsets.keySet).foreach { case(groupId) => + val group = new GroupMetadata(groupId) + val offsets = emptyGroupOffsets.getOrElse(groupId, Map.empty[TopicPartition, CommitRecordMetadataAndOffset]) + val pendingOffsets = pendingEmptyGroupOffsets.getOrElse(groupId, Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]) + debug(s"Loaded group metadata $group with offsets $offsets and pending offsets $pendingOffsets") + loadGroup(group, offsets, pendingOffsets) + onGroupLoaded(group) + } - removedGroups.foreach { groupId => - // if the cache already contains a group which should be removed, raise an error. Note that it - // is possible (however unlikely) for a consumer group to be removed, and then to be used only for - // offset storage (i.e. by "simple" consumers) - if (groupMetadataCache.contains(groupId) && !emptyGroupOffsets.contains(groupId)) - throw new IllegalStateException(s"Unexpected unload of active group $groupId while " + - s"loading partition $topicPartition") - } + removedGroups.foreach { groupId => + // if the cache already contains a group which should be removed, raise an error. Note that it + // is possible (however unlikely) for a consumer group to be removed, and then to be used only for + // offset storage (i.e. 
by "simple" consumers) + if (groupMetadataCache.contains(groupId) && !emptyGroupOffsets.contains(groupId)) + throw new IllegalStateException(s"Unexpected unload of active group $groupId while " + + s"loading partition $topicPartition") } + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/3a515505/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala index 6245e85..a8ce17e 100644 --- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala +++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala @@ -610,6 +610,51 @@ class GroupMetadataManagerTest { } @Test + def testLoadGroupAndOffsetsFromDifferentSegments(): Unit = { + val startOffset = 15L + val tp0 = new TopicPartition("foo", 0) + val tp1 = new TopicPartition("foo", 1) + val tp2 = new TopicPartition("bar", 0) + val tp3 = new TopicPartition("xxx", 0) + + val logMock = EasyMock.mock(classOf[Log]) + EasyMock.expect(replicaManager.getLog(groupTopicPartition)).andStubReturn(Some(logMock)) + + val segment1MemberId = "a" + val segment1Offsets = Map(tp0 -> 23L, tp1 -> 455L, tp3 -> 42L) + val segment1Records = MemoryRecords.withRecords(startOffset, CompressionType.NONE, + createCommittedOffsetRecords(segment1Offsets) ++ Seq(buildStableGroupRecordWithMember(segment1MemberId)): _*) + val segment1End = expectGroupMetadataLoad(logMock, startOffset, segment1Records) + + val segment2MemberId = "b" + val segment2Offsets = Map(tp0 -> 33L, tp2 -> 8992L, tp3 -> 10L) + val segment2Records = MemoryRecords.withRecords(segment1End, CompressionType.NONE, + createCommittedOffsetRecords(segment2Offsets) ++ Seq(buildStableGroupRecordWithMember(segment2MemberId)): _*) + val segment2End = 
expectGroupMetadataLoad(logMock, segment1End, segment2Records) + + EasyMock.expect(replicaManager.getLogEndOffset(groupTopicPartition)).andStubReturn(Some(segment2End)) + + EasyMock.replay(logMock, replicaManager) + + groupMetadataManager.loadGroupsAndOffsets(groupTopicPartition, _ => ()) + + val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache")) + assertEquals(groupId, group.groupId) + assertEquals(Stable, group.currentState) + + assertEquals("segment2 group record member should be elected", segment2MemberId, group.leaderId) + assertEquals("segment2 group record member should be only member", Set(segment2MemberId), group.allMembers) + + // offsets of segment1 should be overridden by segment2 offsets of the same topic partitions + val committedOffsets = segment1Offsets ++ segment2Offsets + assertEquals(committedOffsets.size, group.allOffsets.size) + committedOffsets.foreach { case (topicPartition, offset) => + assertEquals(Some(offset), group.offset(topicPartition).map(_.offset)) + } + } + + + @Test def testAddGroup() { val group = new GroupMetadata("foo") assertEquals(group, groupMetadataManager.addGroup(group)) @@ -1303,20 +1348,33 @@ class GroupMetadataManagerTest { private def expectGroupMetadataLoad(groupMetadataTopicPartition: TopicPartition, startOffset: Long, records: MemoryRecords): Unit = { - val endOffset = startOffset + records.records.asScala.size val logMock = EasyMock.mock(classOf[Log]) + EasyMock.expect(replicaManager.getLog(groupMetadataTopicPartition)).andStubReturn(Some(logMock)) + val endOffset = expectGroupMetadataLoad(logMock, startOffset, records) + EasyMock.expect(replicaManager.getLogEndOffset(groupMetadataTopicPartition)).andStubReturn(Some(endOffset)) + EasyMock.replay(logMock) + } + + /** + * mock records into a mocked log + * + * @return the calculated end offset to be mocked into [[ReplicaManager.getLogEndOffset]] + */ + private def expectGroupMetadataLoad(logMock: Log, + startOffset: 
Long, + records: MemoryRecords): Long = { + val endOffset = startOffset + records.records.asScala.size val fileRecordsMock = EasyMock.mock(classOf[FileRecords]) - EasyMock.expect(replicaManager.getLog(groupMetadataTopicPartition)).andStubReturn(Some(logMock)) EasyMock.expect(logMock.logStartOffset).andStubReturn(startOffset) - EasyMock.expect(replicaManager.getLogEndOffset(groupMetadataTopicPartition)).andStubReturn(Some(endOffset)) EasyMock.expect(logMock.read(EasyMock.eq(startOffset), EasyMock.anyInt(), EasyMock.eq(None), EasyMock.eq(true), EasyMock.eq(IsolationLevel.READ_UNCOMMITTED))) .andReturn(FetchDataInfo(LogOffsetMetadata(startOffset), fileRecordsMock)) EasyMock.expect(fileRecordsMock.readInto(EasyMock.anyObject(classOf[ByteBuffer]), EasyMock.anyInt())) .andReturn(records.buffer) + EasyMock.replay(fileRecordsMock) - EasyMock.replay(logMock, fileRecordsMock) + endOffset } private def createCommittedOffsetRecords(committedOffsets: Map[TopicPartition, Long],