dajac commented on code in PR #15183:
URL: https://github.com/apache/kafka/pull/15183#discussion_r1457474626
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ##########
@@ -979,15 +981,30 @@ public void replayEndTransactionMarker(
             pendingOffsets.offsetsByGroup.forEach((groupId, topicOffsets) -> {
                 topicOffsets.forEach((topicName, partitionOffsets) -> {
                     partitionOffsets.forEach((partitionId, offsetAndMetadata) -> {
-                        log.debug("Committed transaction offset commit for producer id {} in group {} " +
-                            "with topic {}, partition {}, and offset {}.",
-                            producerId, groupId, topicName, partitionId, offsetAndMetadata);
-                        offsets.put(
+                        OffsetAndMetadata existingOffsetAndMetadata = offsets.get(
                             groupId,
                             topicName,
-                            partitionId,
-                            offsetAndMetadata
+                            partitionId
                         );
+
+                        // We always keep the most recent committed offset when we have a mix of transactional and regular
+                        // offset commits. Without preserving information of the commit record offset, compaction of the
+                        // __consumer_offsets topic itself may result in the wrong offset commit being materialized.
+                        if (existingOffsetAndMetadata == null || offsetAndMetadata.recordOffset > existingOffsetAndMetadata.recordOffset) {
+                            log.debug("Committed transactional offset commit {} for producer id {} in group {} " +
+                                "with topic {} and partition {}.",
+                                offsetAndMetadata, producerId, groupId, topicName, partitionId);
+                            offsets.put(

Review Comment:
Yeah, that's right. The overall idea is to ensure that the last offset committed in the log is the one materialized in memory. Imagine the following sequence:
1. Offset 100 is committed transactionally as part of transaction X. It goes to the pending structure.
2. Offset 101 is committed as a regular offset commit. It goes to the main structure.
3. Transaction X is committed. Putting 100 in the main structure would be wrong here because the last commit in the log is 101.
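A minimal, hypothetical sketch of the rule described in the review comment, assuming (as the diff's `offsetAndMetadata.recordOffset` suggests) that every materialized offset remembers the log offset of the `__consumer_offsets` record that wrote it. This is not Kafka's `OffsetMetadataManager`: the class and method names (`OffsetReplaySketch`, `CommittedOffset`, `replayCommitMarker`, etc.), the `group/topic/partition` string key, and the record offsets 10 and 11 are all illustrative. It replays the three-step sequence above and shows that offset 101 stays materialized.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the "keep the latest record in the log" rule.
// Not Kafka's implementation; all names here are illustrative only.
public class OffsetReplaySketch {

    // A committed consumer offset plus the log offset of the __consumer_offsets
    // record that carried it (the "record offset").
    record CommittedOffset(long offset, long recordOffset) {}

    // Main (materialized) structure, keyed by a hypothetical "group/topic/partition" string.
    final Map<String, CommittedOffset> offsets = new HashMap<>();

    // Pending transactional commits, grouped by producer id.
    final Map<Long, Map<String, CommittedOffset>> pendingByProducer = new HashMap<>();

    // Regular commits go straight to the main structure.
    void replayRegularCommit(String key, long offset, long recordOffset) {
        offsets.put(key, new CommittedOffset(offset, recordOffset));
    }

    // Transactional commits wait in the pending structure until the marker arrives.
    void replayTransactionalCommit(long producerId, String key, long offset, long recordOffset) {
        pendingByProducer
            .computeIfAbsent(producerId, id -> new HashMap<>())
            .put(key, new CommittedOffset(offset, recordOffset));
    }

    // On a commit marker, move pending offsets to the main structure, but only
    // when the pending record is later in the log than the materialized one.
    void replayCommitMarker(long producerId) {
        Map<String, CommittedOffset> pending = pendingByProducer.remove(producerId);
        if (pending == null) {
            return;
        }
        pending.forEach((key, candidate) -> {
            CommittedOffset existing = offsets.get(key);
            if (existing == null || candidate.recordOffset() > existing.recordOffset()) {
                offsets.put(key, candidate);
            }
        });
    }

    public static void main(String[] args) {
        OffsetReplaySketch state = new OffsetReplaySketch();
        String key = "group-a/topic-t/0";
        long producerId = 42L;

        // 1. Transactional commit of offset 100 (record at log offset 10) -> pending.
        state.replayTransactionalCommit(producerId, key, 100L, 10L);
        // 2. Regular commit of offset 101 (record at log offset 11) -> main.
        state.replayRegularCommit(key, 101L, 11L);
        // 3. Transaction X commits; 10 < 11, so offset 100 is not materialized.
        state.replayCommitMarker(producerId);

        System.out.println(state.offsets.get(key).offset()); // prints 101
    }
}
```

In this sketch, regular commits are put unconditionally because, when replaying in log order, a regular commit's record is always the latest one seen for its partition; only the transactional path needs the comparison, since its record can sit earlier in the log than an entry that was materialized in the meantime.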