jolshan commented on code in PR #15183: URL: https://github.com/apache/kafka/pull/15183#discussion_r1454285529
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ########## @@ -979,15 +981,30 @@ public void replayEndTransactionMarker( pendingOffsets.offsetsByGroup.forEach((groupId, topicOffsets) -> { topicOffsets.forEach((topicName, partitionOffsets) -> { partitionOffsets.forEach((partitionId, offsetAndMetadata) -> { - log.debug("Committed transaction offset commit for producer id {} in group {} " + - "with topic {}, partition {}, and offset {}.", - producerId, groupId, topicName, partitionId, offsetAndMetadata); - offsets.put( + OffsetAndMetadata existingOffsetAndMetadata = offsets.get( groupId, topicName, - partitionId, - offsetAndMetadata + partitionId ); + + // We always keep the most recent committed offset when we have a mix of transactional and regular + // offset commits. Without preserving information of the commit record offset, compaction of the + // __consumer_offsets topic itself may result in the wrong offset commit being materialized. + if (existingOffsetAndMetadata == null || offsetAndMetadata.recordOffset > existingOffsetAndMetadata.recordOffset) { + log.debug("Committed transactional offset commit {} for producer id {} in group {} " + + "with topic {} and partition {}.", + offsetAndMetadata, producerId, groupId, topicName, partitionId); + offsets.put( Review Comment: If the offset is not the latest offset, were we incorrectly saying the last committed offset was the latest one? 
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ########## @@ -979,15 +981,30 @@ public void replayEndTransactionMarker( pendingOffsets.offsetsByGroup.forEach((groupId, topicOffsets) -> { topicOffsets.forEach((topicName, partitionOffsets) -> { partitionOffsets.forEach((partitionId, offsetAndMetadata) -> { - log.debug("Committed transaction offset commit for producer id {} in group {} " + - "with topic {}, partition {}, and offset {}.", - producerId, groupId, topicName, partitionId, offsetAndMetadata); - offsets.put( + OffsetAndMetadata existingOffsetAndMetadata = offsets.get( groupId, topicName, - partitionId, - offsetAndMetadata + partitionId ); + + // We always keep the most recent committed offset when we have a mix of transactional and regular + // offset commits. Without preserving information of the commit record offset, compaction of the + // __consumer_offsets topic itself may result in the wrong offset commit being materialized. + if (existingOffsetAndMetadata == null || offsetAndMetadata.recordOffset > existingOffsetAndMetadata.recordOffset) { + log.debug("Committed transactional offset commit {} for producer id {} in group {} " + + "with topic {} and partition {}.", + offsetAndMetadata, producerId, groupId, topicName, partitionId); + offsets.put( Review Comment: If the offset is not the latest offset, were we incorrectly saying the last transactionally committed offset was the latest one? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org