dajac commented on code in PR #15183:
URL: https://github.com/apache/kafka/pull/15183#discussion_r1457474626


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -979,15 +981,30 @@ public void replayEndTransactionMarker(
             pendingOffsets.offsetsByGroup.forEach((groupId, topicOffsets) -> {
                 topicOffsets.forEach((topicName, partitionOffsets) -> {
                     partitionOffsets.forEach((partitionId, offsetAndMetadata) -> {
-                        log.debug("Committed transaction offset commit for producer id {} in group {} " +
-                            "with topic {}, partition {}, and offset {}.",
-                            producerId, groupId, topicName, partitionId, offsetAndMetadata);
-                        offsets.put(
+                        OffsetAndMetadata existingOffsetAndMetadata = offsets.get(
                             groupId,
                             topicName,
-                            partitionId,
-                            offsetAndMetadata
+                            partitionId
                         );
+
+                        // We always keep the most recent committed offset when we have a mix of transactional and regular
+                        // offset commits. Without preserving information of the commit record offset, compaction of the
+                        // __consumer_offsets topic itself may result in the wrong offset commit being materialized.
+                        if (existingOffsetAndMetadata == null || offsetAndMetadata.recordOffset > existingOffsetAndMetadata.recordOffset) {
+                            log.debug("Committed transactional offset commit {} for producer id {} in group {} " +
+                                "with topic {} and partition {}.",
+                                offsetAndMetadata, producerId, groupId, topicName, partitionId);
+                            offsets.put(

Review Comment:
   Yeah, that's right. The overall idea is to ensure that the last committed 
offset in the log is the one materialized in memory. Imagine the following 
sequence:
   1. Transactional offset 100 is committed as part of transaction X. It goes 
to the pending structure.
   2. Regular offset 101 is committed. It goes to the main structure.
   3. Transaction X is committed. Putting 100 in the main structure would be 
wrong here because the last commit in the log is 101.
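
   The sequence above can be sketched in isolation. This is a minimal, 
simplified illustration and not the actual OffsetMetadataManager code: the 
class OffsetReplaySketch, the nested Commit type, the method names, the string 
partition key, and the record offsets 10 and 11 are all hypothetical. Only the 
comparison on recordOffset mirrors the diff.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch; names and structure are assumptions, not Kafka's API.
class OffsetReplaySketch {
    // Stand-in for OffsetAndMetadata: the committed offset plus the position
    // of the commit record in the __consumer_offsets log.
    static final class Commit {
        final long committedOffset;
        final long recordOffset;

        Commit(long committedOffset, long recordOffset) {
            this.committedOffset = committedOffset;
            this.recordOffset = recordOffset;
        }
    }

    // Main structure: the materialized offset per partition key.
    private final Map<String, Commit> offsets = new HashMap<>();
    // Pending structure: transactional commits per producer id.
    private final Map<Long, Map<String, Commit>> pending = new HashMap<>();

    // A regular commit goes straight to the main structure.
    void replayRegularCommit(String partitionKey, Commit commit) {
        offsets.put(partitionKey, commit);
    }

    // A transactional commit waits in the pending structure.
    void replayTransactionalCommit(long producerId, String partitionKey, Commit commit) {
        pending.computeIfAbsent(producerId, id -> new HashMap<>()).put(partitionKey, commit);
    }

    // On a commit marker, a pending offset is materialized only if its
    // record is newer in the log than whatever is already materialized.
    void replayCommitMarker(long producerId) {
        Map<String, Commit> committed = pending.remove(producerId);
        if (committed == null) {
            return;
        }
        committed.forEach((partitionKey, commit) -> {
            Commit existing = offsets.get(partitionKey);
            if (existing == null || commit.recordOffset > existing.recordOffset) {
                offsets.put(partitionKey, commit);
            }
        });
    }

    // Returns the materialized committed offset, or -1 if there is none.
    long committedOffset(String partitionKey) {
        Commit commit = offsets.get(partitionKey);
        return commit == null ? -1L : commit.committedOffset;
    }

    public static void main(String[] args) {
        OffsetReplaySketch sketch = new OffsetReplaySketch();
        sketch.replayTransactionalCommit(1L, "foo-0", new Commit(100L, 10L)); // step 1
        sketch.replayRegularCommit("foo-0", new Commit(101L, 11L));           // step 2
        sketch.replayCommitMarker(1L);                                        // step 3
        System.out.println(sketch.committedOffset("foo-0"));
    }
}
```

   With the check in place, step 3 leaves 101 in the main structure, because 
the pending commit's record sits earlier in the log than the regular one.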


