jolshan commented on code in PR #14845:
URL: https://github.com/apache/kafka/pull/14845#discussion_r1414621182


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -180,7 +185,78 @@ public OffsetMetadataManager build() {
     /**
      * The offsets keyed by group id, topic name and partition id.
      */
-    private final TimelineHashMap<String, TimelineHashMap<String, 
TimelineHashMap<Integer, OffsetAndMetadata>>> offsetsByGroup;
+    private final Offsets offsets;
+
+    /**
+     * The offsets keyed by producer id, group id, topic name and partition id. This
+     * structure holds all the transactional offsets that are part of ongoing transactions.
+     * When the transaction is committed, they are transferred to the offsetsByGroup; when
+     * the transaction is aborted, they are removed.
+     */
+    private final TimelineHashMap<Long, Offsets> pendingTransactionalOffsets;
+
+    private class Offsets {
+        private final TimelineHashMap<String, TimelineHashMap<String, 
TimelineHashMap<Integer, OffsetAndMetadata>>> offsetsByGroup;
+
+        private Offsets() {
+            this.offsetsByGroup = new TimelineHashMap<>(snapshotRegistry, 0);
+        }
+
+        private OffsetAndMetadata get(
+            String groupId,
+            String topic,
+            int partition
+        ) {
+            TimelineHashMap<String, TimelineHashMap<Integer, 
OffsetAndMetadata>> topicOffsets = offsetsByGroup.get(groupId);
+            if (topicOffsets == null) {
+                return null;
+            } else {
+                TimelineHashMap<Integer, OffsetAndMetadata> partitionOffsets = 
topicOffsets.get(topic);
+                if (partitionOffsets == null) {
+                    return null;
+                } else {
+                    return partitionOffsets.get(partition);
+                }
+            }
+        }
+
+        private OffsetAndMetadata put(
+            String groupId,
+            String topic,
+            int partition,
+            OffsetAndMetadata offsetAndMetadata
+        ) {
+            TimelineHashMap<String, TimelineHashMap<Integer, 
OffsetAndMetadata>> topicOffsets = offsetsByGroup
+                .computeIfAbsent(groupId, __ -> new 
TimelineHashMap<>(snapshotRegistry, 0));
+            TimelineHashMap<Integer, OffsetAndMetadata> partitionOffsets = 
topicOffsets
+                .computeIfAbsent(topic, __ -> new 
TimelineHashMap<>(snapshotRegistry, 0));
+            return partitionOffsets.put(partition, offsetAndMetadata);
+        }
+
+        private OffsetAndMetadata remove(
+            String groupId,
+            String topic,
+            int partition
+        ) {
+            TimelineHashMap<String, TimelineHashMap<Integer, 
OffsetAndMetadata>> topicOffsets = offsetsByGroup.get(groupId);
+            if (topicOffsets == null)
+                return null;
+
+            TimelineHashMap<Integer, OffsetAndMetadata> partitionOffsets = 
topicOffsets.get(topic);
+            if (partitionOffsets == null)
+                return null;
+
+            OffsetAndMetadata removedValue = 
partitionOffsets.remove(partition);
+
+            if (partitionOffsets.isEmpty())

Review Comment:
   I was about to ask whether we remove the nested maps once they become empty, but I see that we do so here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to