Repository: kafka
Updated Branches:
  refs/heads/0.11.0 973f9d2b2 -> 3a515505e


KAFKA-5600; Fix group loading regression causing stale metadata/offset cache

The while loop was too large and needed to be closed earlier: group loading ran
once per chunk of records read from the log instead of once after the whole
partition had been read, leaving stale group metadata and offsets in the cache.
To see the fix, view the diff ignoring whitespace (git diff -w), since most of
the change is indentation.

This bug was introduced by commit
5bd06f1d542e6b588a1d402d059bc24690017d32.
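
To make the regression concrete, here is a minimal, self-contained sketch of
the scoping failure mode (a simplified stand-in, not the actual
GroupMetadataManager code; batches, cache, and the string keys are
hypothetical names chosen for the demo):

    import scala.collection.mutable

    object LoopScopeSketch {
      def main(args: Array[String]): Unit = {
        // Each element stands for one chunk of the __consumer_offsets log
        // read by one pass of the while loop; "tp0" is committed twice,
        // with the newer offset (33) arriving in the second chunk.
        val batches = Seq(Map("tp0" -> 23L, "tp1" -> 455L), Map("tp0" -> 33L))
        val loadedOffsets = mutable.Map[String, Long]()
        val cache = mutable.Map[String, Map[String, Long]]()

        // Buggy shape: installing into the cache INSIDE the read loop means
        // the first chunk wins and later chunks never refresh the entry.
        for (batch <- batches) {
          loadedOffsets ++= batch
          cache.getOrElseUpdate("group", loadedOffsets.toMap)
        }
        println(cache("group")) // tp0 -> 23: the newer commit (33) was lost

        // Fixed shape: close the loop first, install exactly once afterwards.
        loadedOffsets.clear(); cache.clear()
        for (batch <- batches)
          loadedOffsets ++= batch
        cache.put("group", loadedOffsets.toMap)
        println(cache("group")) // tp0 -> 33, as expected
      }
    }

The patch below applies the same idea: the batch-reading while loop now closes
before the loaded offsets and groups are partitioned and installed into the
cache.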

Author: Jan Burkhardt <jan.burkhardt@just.social>

Reviewers: Ismael Juma <ism...@juma.me.uk>, Jason Gustafson <ja...@confluent.io>

Closes #3538 from bjrke/trunk

(cherry picked from commit e2fe19d22a42525001c5a66f21f02f49c051ecb5)
Signed-off-by: Jason Gustafson <ja...@confluent.io>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/3a515505
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/3a515505
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/3a515505

Branch: refs/heads/0.11.0
Commit: 3a515505e07fb4ccc961afcf52b5bad5fbf57e0f
Parents: 973f9d2
Author: Jan Burkhardt <jan.burkhardt@just.social>
Authored: Mon Jul 17 11:49:50 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Mon Jul 17 11:53:45 2017 -0700

----------------------------------------------------------------------
 .../group/GroupMetadataManager.scala            | 86 ++++++++++----------
 .../group/GroupMetadataManagerTest.scala        | 66 ++++++++++++++-
 2 files changed, 105 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/3a515505/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index a8419fd..9322ff2 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -550,57 +550,57 @@ class GroupMetadataManager(brokerId: Int,
             }
             currOffset = batch.nextOffset
           }
+        }
 
+        val (groupOffsets, emptyGroupOffsets) = loadedOffsets
+          .groupBy(_._1.group)
+          .mapValues(_.map { case (groupTopicPartition, offset) => (groupTopicPartition.topicPartition, offset) })
+          .partition { case (group, _) => loadedGroups.contains(group) }
 
-          val (groupOffsets, emptyGroupOffsets) = loadedOffsets
+        val pendingOffsetsByGroup = mutable.Map[String, mutable.Map[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]]()
+        pendingOffsets.foreach { case (producerId, producerOffsets) =>
+          producerOffsets.keySet.map(_.group).foreach(addProducerGroup(producerId, _))
+          producerOffsets
             .groupBy(_._1.group)
-            .mapValues(_.map { case (groupTopicPartition, offset) => (groupTopicPartition.topicPartition, offset) })
-            .partition { case (group, _) => loadedGroups.contains(group) }
-
-          val pendingOffsetsByGroup = mutable.Map[String, mutable.Map[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]]]()
-          pendingOffsets.foreach { case (producerId, producerOffsets) =>
-            producerOffsets.keySet.map(_.group).foreach(addProducerGroup(producerId, _))
-            producerOffsets
-              .groupBy(_._1.group)
-              .mapValues(_.map { case (groupTopicPartition, offset) => (groupTopicPartition.topicPartition, offset)})
-              .foreach { case (group, offsets) =>
-                val groupPendingOffsets = pendingOffsetsByGroup.getOrElseUpdate(group, mutable.Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]])
-                val groupProducerOffsets = groupPendingOffsets.getOrElseUpdate(producerId, mutable.Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
-                groupProducerOffsets ++= offsets
-              }
-          }
+            .mapValues(_.map { case (groupTopicPartition, offset) => (groupTopicPartition.topicPartition, offset)})
+            .foreach { case (group, offsets) =>
+              val groupPendingOffsets = pendingOffsetsByGroup.getOrElseUpdate(group, mutable.Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]])
+              val groupProducerOffsets = groupPendingOffsets.getOrElseUpdate(producerId, mutable.Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
+              groupProducerOffsets ++= offsets
+            }
+        }
 
-          val (pendingGroupOffsets, pendingEmptyGroupOffsets) = pendingOffsetsByGroup
-            .partition { case (group, _) => loadedGroups.contains(group)}
+        val (pendingGroupOffsets, pendingEmptyGroupOffsets) = pendingOffsetsByGroup
+          .partition { case (group, _) => loadedGroups.contains(group)}
 
-          loadedGroups.values.foreach { group =>
-            val offsets = groupOffsets.getOrElse(group.groupId, Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
-            val pendingOffsets = pendingGroupOffsets.getOrElse(group.groupId, Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]])
-            debug(s"Loaded group metadata $group with offsets $offsets and pending offsets $pendingOffsets")
-            loadGroup(group, offsets, pendingOffsets)
-            onGroupLoaded(group)
-          }
+        loadedGroups.values.foreach { group =>
+          val offsets = groupOffsets.getOrElse(group.groupId, Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
+          val pendingOffsets = pendingGroupOffsets.getOrElse(group.groupId, Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]])
+          debug(s"Loaded group metadata $group with offsets $offsets and pending offsets $pendingOffsets")
+          loadGroup(group, offsets, pendingOffsets)
+          onGroupLoaded(group)
+        }
 
-          // load groups which store offsets in kafka, but which have no active members and thus no group
-          // metadata stored in the log
-          (emptyGroupOffsets.keySet ++ pendingEmptyGroupOffsets.keySet).foreach { case(groupId) =>
-            val group = new GroupMetadata(groupId)
-            val offsets = emptyGroupOffsets.getOrElse(groupId, Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
-            val pendingOffsets = pendingEmptyGroupOffsets.getOrElse(groupId, Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]])
-            debug(s"Loaded group metadata $group with offsets $offsets and pending offsets $pendingOffsets")
-            loadGroup(group, offsets, pendingOffsets)
-            onGroupLoaded(group)
-          }
+        // load groups which store offsets in kafka, but which have no active members and thus no group
+        // metadata stored in the log
+        (emptyGroupOffsets.keySet ++ pendingEmptyGroupOffsets.keySet).foreach { case(groupId) =>
+          val group = new GroupMetadata(groupId)
+          val offsets = emptyGroupOffsets.getOrElse(groupId, Map.empty[TopicPartition, CommitRecordMetadataAndOffset])
+          val pendingOffsets = pendingEmptyGroupOffsets.getOrElse(groupId, Map.empty[Long, mutable.Map[TopicPartition, CommitRecordMetadataAndOffset]])
+          debug(s"Loaded group metadata $group with offsets $offsets and pending offsets $pendingOffsets")
+          loadGroup(group, offsets, pendingOffsets)
+          onGroupLoaded(group)
+        }
 
-          removedGroups.foreach { groupId =>
-            // if the cache already contains a group which should be removed, raise an error. Note that it
-            // is possible (however unlikely) for a consumer group to be removed, and then to be used only for
-            // offset storage (i.e. by "simple" consumers)
-            if (groupMetadataCache.contains(groupId) && !emptyGroupOffsets.contains(groupId))
-              throw new IllegalStateException(s"Unexpected unload of active group $groupId while " +
-                s"loading partition $topicPartition")
-          }
+        removedGroups.foreach { groupId =>
+          // if the cache already contains a group which should be removed, raise an error. Note that it
+          // is possible (however unlikely) for a consumer group to be removed, and then to be used only for
+          // offset storage (i.e. by "simple" consumers)
+          if (groupMetadataCache.contains(groupId) && !emptyGroupOffsets.contains(groupId))
+            throw new IllegalStateException(s"Unexpected unload of active group $groupId while " +
+              s"loading partition $topicPartition")
         }
+
     }
   }
 
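Before the test-file diff, a toy illustration of the groupBy/mapValues/partition
chain used above may help readers map the pieces (GroupTopicPartition and the
sample groups and offsets here are hypothetical, chosen only for the demo;
Scala 2.12 collection semantics, matching the patch):

    object PartitionOffsetsSketch {
      case class GroupTopicPartition(group: String, topicPartition: String)

      def main(args: Array[String]): Unit = {
        // Offsets keyed by (group, partition), as accumulated from the log.
        val loadedOffsets = Map(
          GroupTopicPartition("g1", "foo-0") -> 23L,
          GroupTopicPartition("g1", "foo-1") -> 455L,
          GroupTopicPartition("g2", "bar-0") -> 42L)
        // Groups for which an explicit metadata record was found in the log.
        val loadedGroups = Set("g1")

        // Regroup per group name, strip the group from the key, then split
        // into groups with metadata vs. offset-only ("empty") groups.
        val (groupOffsets, emptyGroupOffsets) = loadedOffsets
          .groupBy(_._1.group)
          .mapValues(_.map { case (gtp, offset) => (gtp.topicPartition, offset) })
          .partition { case (group, _) => loadedGroups.contains(group) }

        println(groupOffsets)      // Map(g1 -> Map(foo-0 -> 23, foo-1 -> 455))
        println(emptyGroupOffsets) // Map(g2 -> Map(bar-0 -> 42))
      }
    }

The "empty" side feeds the branch above that creates a fresh GroupMetadata for
groups that only store offsets in Kafka and have no active members.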

http://git-wip-us.apache.org/repos/asf/kafka/blob/3a515505/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 6245e85..a8ce17e 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -610,6 +610,51 @@ class GroupMetadataManagerTest {
   }
 
   @Test
+  def testLoadGroupAndOffsetsFromDifferentSegments(): Unit = {
+    val startOffset = 15L
+    val tp0 = new TopicPartition("foo", 0)
+    val tp1 = new TopicPartition("foo", 1)
+    val tp2 = new TopicPartition("bar", 0)
+    val tp3 = new TopicPartition("xxx", 0)
+
+    val logMock =  EasyMock.mock(classOf[Log])
+    EasyMock.expect(replicaManager.getLog(groupTopicPartition)).andStubReturn(Some(logMock))
+
+    val segment1MemberId = "a"
+    val segment1Offsets = Map(tp0 -> 23L, tp1 -> 455L, tp3 -> 42L)
+    val segment1Records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
+      createCommittedOffsetRecords(segment1Offsets) ++ Seq(buildStableGroupRecordWithMember(segment1MemberId)): _*)
+    val segment1End = expectGroupMetadataLoad(logMock, startOffset, segment1Records)
+
+    val segment2MemberId = "b"
+    val segment2Offsets = Map(tp0 -> 33L, tp2 -> 8992L, tp3 -> 10L)
+    val segment2Records = MemoryRecords.withRecords(segment1End, CompressionType.NONE,
+      createCommittedOffsetRecords(segment2Offsets) ++ Seq(buildStableGroupRecordWithMember(segment2MemberId)): _*)
+    val segment2End = expectGroupMetadataLoad(logMock, segment1End, segment2Records)
+
+    EasyMock.expect(replicaManager.getLogEndOffset(groupTopicPartition)).andStubReturn(Some(segment2End))
+
+    EasyMock.replay(logMock, replicaManager)
+
+    groupMetadataManager.loadGroupsAndOffsets(groupTopicPartition, _ => ())
+
+    val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache"))
+    assertEquals(groupId, group.groupId)
+    assertEquals(Stable, group.currentState)
+
+    assertEquals("segment2 group record member should be elected", segment2MemberId, group.leaderId)
+    assertEquals("segment2 group record member should be only member", Set(segment2MemberId), group.allMembers)
+
+    // offsets of segment1 should be overridden by segment2 offsets of the same topic partitions
+    val committedOffsets = segment1Offsets ++ segment2Offsets
+    assertEquals(committedOffsets.size, group.allOffsets.size)
+    committedOffsets.foreach { case (topicPartition, offset) =>
+      assertEquals(Some(offset), group.offset(topicPartition).map(_.offset))
+    }
+  }
+
+
+  @Test
   def testAddGroup() {
     val group = new GroupMetadata("foo")
     assertEquals(group, groupMetadataManager.addGroup(group))
@@ -1303,20 +1348,33 @@ class GroupMetadataManagerTest {
   private def expectGroupMetadataLoad(groupMetadataTopicPartition: TopicPartition,
                                       startOffset: Long,
                                       records: MemoryRecords): Unit = {
-    val endOffset = startOffset + records.records.asScala.size
     val logMock =  EasyMock.mock(classOf[Log])
+    EasyMock.expect(replicaManager.getLog(groupMetadataTopicPartition)).andStubReturn(Some(logMock))
+    val endOffset = expectGroupMetadataLoad(logMock, startOffset, records)
+    EasyMock.expect(replicaManager.getLogEndOffset(groupMetadataTopicPartition)).andStubReturn(Some(endOffset))
+    EasyMock.replay(logMock)
+  }
+
+  /**
+   * mock records into a mocked log
+   *
+   * @return the calculated end offset to be mocked into [[ReplicaManager.getLogEndOffset]]
+   */
+  private def expectGroupMetadataLoad(logMock: Log,
+                                      startOffset: Long,
+                                      records: MemoryRecords): Long = {
+    val endOffset = startOffset + records.records.asScala.size
     val fileRecordsMock = EasyMock.mock(classOf[FileRecords])
 
-    EasyMock.expect(replicaManager.getLog(groupMetadataTopicPartition)).andStubReturn(Some(logMock))
     EasyMock.expect(logMock.logStartOffset).andStubReturn(startOffset)
-    EasyMock.expect(replicaManager.getLogEndOffset(groupMetadataTopicPartition)).andStubReturn(Some(endOffset))
     EasyMock.expect(logMock.read(EasyMock.eq(startOffset), EasyMock.anyInt(), EasyMock.eq(None),
       EasyMock.eq(true), EasyMock.eq(IsolationLevel.READ_UNCOMMITTED)))
       .andReturn(FetchDataInfo(LogOffsetMetadata(startOffset), fileRecordsMock))
     EasyMock.expect(fileRecordsMock.readInto(EasyMock.anyObject(classOf[ByteBuffer]), EasyMock.anyInt()))
       .andReturn(records.buffer)
+    EasyMock.replay(fileRecordsMock)
 
-    EasyMock.replay(logMock, fileRecordsMock)
+    endOffset
   }
 
   private def createCommittedOffsetRecords(committedOffsets: Map[TopicPartition, Long],
