squah-confluent commented on code in PR #19497:
URL: https://github.com/apache/kafka/pull/19497#discussion_r2070405167
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -192,10 +193,165 @@ public OffsetMetadataManager build() {
*/
private final TimelineHashMap<Long, Offsets> pendingTransactionalOffsets;
+ private final OpenTransactions openTransactions;
+
/**
- * The open transactions (producer ids) keyed by group.
+ * Tracks open transactions (producer ids) by group id, topic name and partition id.
+ * It is the responsibility of the caller to update {@link #pendingTransactionalOffsets}.
*/
- private final TimelineHashMap<String, TimelineHashSet<Long>> openTransactionsByGroup;
+ private class OpenTransactions {
+ /**
+ * The open transactions (producer ids) keyed by group id, topic name and partition id.
+ * Tracks whether partitions have any pending transactional offsets that have not been deleted.
+ *
+ * Values in each level of the map will never be empty collections.
+ */
+ private final TimelineHashMap<String, TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>>> openTransactionsByGroup;
Review Comment:
I tried a couple of approaches and settled on inlining
`hasPendingTransactionalOffsets` into the offset fetch path. This avoids
comparing the group id and topic name strings for every partition, and it
also avoids the allocations a compound key would require.
```
Benchmark                              (partitionCount)  (transactionCount)  Mode  Cnt  Score   Error  Units
TransactionalOffsetFetchBenchmark.run              4000                4000  avgt    5  0.129 ± 0.002  ms/op
```
--
This is an automated message from the Apache Git Service.