dongnuo123 commented on code in PR #19497:
URL: https://github.com/apache/kafka/pull/19497#discussion_r2067408510
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -192,10 +193,165 @@ public OffsetMetadataManager build() {
*/
private final TimelineHashMap<Long, Offsets> pendingTransactionalOffsets;
+ private final OpenTransactions openTransactions;
+
/**
- * The open transactions (producer ids) keyed by group.
+ * Tracks open transactions (producer ids) by group id, topic name and
partition id.
+ * It is the responsiblity of the caller to update {@link
#pendingTransactionalOffsets}.
*/
- private final TimelineHashMap<String, TimelineHashSet<Long>>
openTransactionsByGroup;
+ private class OpenTransactions {
+ /**
+ * The open transactions (producer ids) keyed by group id, topic name
and partition id.
+ * Tracks whether partitions have any pending transactional offsets
that have not been deleted.
+ *
+ * Values in each level of the map will never be empty collections.
+ */
+ private final TimelineHashMap<String, TimelineHashMap<String,
TimelineHashMap<Integer, TimelineHashSet<Long>>>> openTransactionsByGroup;
+
+ private OpenTransactions() {
+ this.openTransactionsByGroup = new
TimelineHashMap<>(snapshotRegistry, 0);
+ }
+
+ /**
+ * Adds a producer id to the open transactions for the given group and
topic partition.
+ *
+ * @param groupId The group id.
+ * @param topic The topic name.
+ * @param partition The partition.
+ * @param producerId The producer id.
+ * @return {@code true} if the partition did not already have a
pending offset from the producer id.
+ */
+ private boolean add(String groupId, String topic, int partition, long
producerId) {
+ return openTransactionsByGroup
+ .computeIfAbsent(groupId, __ -> new
TimelineHashMap<>(snapshotRegistry, 1))
+ .computeIfAbsent(topic, __ -> new
TimelineHashMap<>(snapshotRegistry, 1))
+ .computeIfAbsent(partition, __ -> new
TimelineHashSet<>(snapshotRegistry, 1))
+ .add(producerId);
+ }
+
+ /**
+ * Clears all open transactions for the given group and topic
partition.
+ *
+ * @param groupId The group id.
+ * @param topic The topic name.
+ * @param partition The partition.
+ */
+ private void clear(String groupId, String topic, int partition) {
+ TimelineHashMap<String, TimelineHashMap<Integer,
TimelineHashSet<Long>>> openTransactionsByTopic =
+ openTransactionsByGroup.get(groupId);
+ if (openTransactionsByTopic == null) return;
+
+ TimelineHashMap<Integer, TimelineHashSet<Long>>
openTransactionsByPartition = openTransactionsByTopic.get(topic);
+ if (openTransactionsByPartition == null) return;
+
+ openTransactionsByPartition.remove(partition);
+
+ if (openTransactionsByPartition.isEmpty()) {
+ openTransactionsByTopic.remove(topic);
+ if (openTransactionsByTopic.isEmpty()) {
+ openTransactionsByGroup.remove(groupId);
+ }
+ }
+ }
+
+ /**
+ * Returns {@code true} if the given group has any pending
transactional offsets.
+ *
+ * @param groupId The group id.
+ * @return {@code true} if the given group has any pending
transactional offsets.
+ */
+ private boolean contains(String groupId) {
+ return openTransactionsByGroup.containsKey(groupId);
+ }
+
+ /**
+ * Returns {@code true} if the given group has any pending
transactional offsets for the given topic and partition.
+ *
+ * @param groupId The group id.
+ * @param topic The topic name.
+ * @param partition The partition.
+ * @return {@code true} if the given group has any pending
transactional offsets for the given topic and partition.
+ */
+ private boolean contains(String groupId, String topic, int partition) {
+ TimelineHashSet<Long> openTransactions = get(groupId, topic,
partition);
+ return openTransactions != null;
+ }
Review Comment:
Could we do something like
```
TimelineHashMap<String, TimelineHashMap<Integer,
TimelineHashSet<Long>>> topicMap =
openTransactionsByGroup.get(groupId);
if (topicMap == null) return false;
TimelineHashMap<Integer, TimelineHashSet<Long>> partitionMap =
topicMap.get(topic);
return partitionMap != null && partitionMap.containsKey(partition);
```
to avoid extra lookup?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]