This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1293658ccaa KAFKA-19163: Avoid deleting groups with pending
transactional offsets (#19496)
1293658ccaa is described below
commit 1293658ccaa5ada0f1e12ae78cb395bf81f43be0
Author: Sean Quah <[email protected]>
AuthorDate: Tue May 13 13:10:26 2025 +0100
KAFKA-19163: Avoid deleting groups with pending transactional offsets
(#19496)
When a group has pending transactional offsets but no committed offsets,
we can accidentally delete it while cleaning up expired offsets. Add a
check to avoid this case.
Reviewers: David Jacot <[email protected]>
---
.../coordinator/group/OffsetMetadataManager.java | 5 +--
.../group/OffsetMetadataManagerTest.java | 36 ++++++++++++++++++++++
2 files changed, 39 insertions(+), 2 deletions(-)
diff --git
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
index 54de475d4bf..f946af2ff62 100644
---
a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
+++
b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java
@@ -1000,13 +1000,14 @@ public class OffsetMetadataManager {
* @param groupId The group id.
* @param records The list of records to populate with offset commit
tombstone records.
*
- * @return True if no offsets exist or if all offsets expired, false
otherwise.
+ * @return True if no offsets exist after expiry and no pending
transactional offsets exist,
+ * false otherwise.
*/
public boolean cleanupExpiredOffsets(String groupId,
List<CoordinatorRecord> records) {
TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>>
offsetsByTopic =
offsets.offsetsByGroup.get(groupId);
if (offsetsByTopic == null) {
- return true;
+ return !openTransactions.contains(groupId);
}
// We expect the group to exist.
diff --git
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
index 382b2a9b0e5..d0925bb4c21 100644
---
a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
+++
b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/OffsetMetadataManagerTest.java
@@ -2662,6 +2662,42 @@ public class OffsetMetadataManagerTest {
assertEquals(List.of(), records);
}
+ @Test
+ public void testCleanupExpiredOffsetsWithPendingTransactionalOffsetsOnly()
{
+ GroupMetadataManager groupMetadataManager =
mock(GroupMetadataManager.class);
+ Group group = mock(Group.class);
+
+ OffsetMetadataManagerTestContext context = new
OffsetMetadataManagerTestContext.Builder()
+ .withGroupMetadataManager(groupMetadataManager)
+ .withOffsetsRetentionMinutes(1)
+ .build();
+
+ long commitTimestamp = context.time.milliseconds();
+
+ context.commitOffset("group-id", "foo", 0, 100L, 0, commitTimestamp);
+ context.commitOffset(10L, "group-id", "foo", 1, 101L, 0,
commitTimestamp + 500);
+
+ context.time.sleep(Duration.ofMinutes(1).toMillis());
+
+ when(groupMetadataManager.group("group-id")).thenReturn(group);
+ when(group.offsetExpirationCondition()).thenReturn(Optional.of(
+ new OffsetExpirationConditionImpl(offsetAndMetadata ->
offsetAndMetadata.commitTimestampMs)));
+ when(group.isSubscribedToTopic("foo")).thenReturn(false);
+
+ // foo-0 is expired, but the group is not deleted because it has
pending transactional offset commits.
+ List<CoordinatorRecord> expectedRecords = List.of(
+
GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord("group-id", "foo",
0)
+ );
+ List<CoordinatorRecord> records = new ArrayList<>();
+ assertFalse(context.cleanupExpiredOffsets("group-id", records));
+ assertEquals(expectedRecords, records);
+
+ // No offsets are expired, and the group is still not deleted because
it has pending transactional offset commits.
+ records = new ArrayList<>();
+ assertFalse(context.cleanupExpiredOffsets("group-id", records));
+ assertEquals(List.of(), records);
+ }
+
private static OffsetFetchResponseData.OffsetFetchResponsePartitions
mkOffsetPartitionResponse(
int partition,
long offset,