jolshan commented on code in PR #14845: URL: https://github.com/apache/kafka/pull/14845#discussion_r1414621182
########## group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java: ########## @@ -180,7 +185,78 @@ public OffsetMetadataManager build() { /** * The offsets keyed by group id, topic name and partition id. */ - private final TimelineHashMap<String, TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>>> offsetsByGroup; + private final Offsets offsets; + + /** + * The offsets keyed by producer id, group id, topic name and partition id. This + * structure holds all the transactional offsets that are part of ongoing transactions. + * When the transaction is committed, they are transferred to the offsetsByGroup; when + * the transaction is aborted, they are removed. + */ + private final TimelineHashMap<Long, Offsets> pendingTransactionalOffsets; + + private class Offsets { + private final TimelineHashMap<String, TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>>> offsetsByGroup; + + private Offsets() { + this.offsetsByGroup = new TimelineHashMap<>(snapshotRegistry, 0); + } + + private OffsetAndMetadata get( + String groupId, + String topic, + int partition + ) { + TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> topicOffsets = offsetsByGroup.get(groupId); + if (topicOffsets == null) { + return null; + } else { + TimelineHashMap<Integer, OffsetAndMetadata> partitionOffsets = topicOffsets.get(topic); + if (partitionOffsets == null) { + return null; + } else { + return partitionOffsets.get(partition); + } + } + } + + private OffsetAndMetadata put( + String groupId, + String topic, + int partition, + OffsetAndMetadata offsetAndMetadata + ) { + TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> topicOffsets = offsetsByGroup + .computeIfAbsent(groupId, __ -> new TimelineHashMap<>(snapshotRegistry, 0)); + TimelineHashMap<Integer, OffsetAndMetadata> partitionOffsets = topicOffsets + .computeIfAbsent(topic, __ -> new TimelineHashMap<>(snapshotRegistry, 0)); + return partitionOffsets.put(partition, offsetAndMetadata); + } + + private OffsetAndMetadata remove( + String groupId, + String topic, + int partition + ) { + TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> topicOffsets = offsetsByGroup.get(groupId); + if (topicOffsets == null) + return null; + + TimelineHashMap<Integer, OffsetAndMetadata> partitionOffsets = topicOffsets.get(topic); + if (partitionOffsets == null) + return null; + + OffsetAndMetadata removedValue = partitionOffsets.remove(partition); + + if (partitionOffsets.isEmpty()) Review Comment: Was about to ask if we remove the nested maps, but now I see we do so here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org