shaan150 commented on code in PR #19497:
URL: https://github.com/apache/kafka/pull/19497#discussion_r2047739805
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -650,24 +658,18 @@ public int deleteAllOffsets(
         // Delete all the pending transactional offsets too. Here we only write a tombstone
         // if the topic-partition was not in the main storage because we don't need to write
         // two consecutive tombstones.
-        TimelineHashSet<Long> openTransactions = openTransactionsByGroup.get(groupId);
-        if (openTransactions != null) {
-            openTransactions.forEach(producerId -> {
-                Offsets pendingOffsets = pendingTransactionalOffsets.get(producerId);
-                if (pendingOffsets != null) {
-                    TimelineHashMap<String, TimelineHashMap<Integer, OffsetAndMetadata>> pendingGroupOffsets =
-                        pendingOffsets.offsetsByGroup.get(groupId);
-                    if (pendingGroupOffsets != null) {
-                        pendingGroupOffsets.forEach((topic, offsetsByPartition) -> {
-                            offsetsByPartition.keySet().forEach(partition -> {
-                                if (!hasCommittedOffset(groupId, topic, partition)) {
-                                    records.add(GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord(groupId, topic, partition));
-                                    numDeletedOffsets.getAndIncrement();
-                                }
-                            });
-                        });
-                    }
-                }
+        TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> openTransactionsByTopic =
+            openTransactionsByGroupTopicAndPartition.get(groupId);
+        if (openTransactionsByTopic != null) {
Review Comment:
There are three nested loops here, and that nesting can be avoided.
Something like the following might help (written quickly, so treat it as a rough sketch):
```java
var openTransactionsByTopic = openTransactionsByGroupTopicAndPartition.get(groupId);
// Early return assumes this block is extracted into its own helper method.
if (openTransactionsByTopic == null) return;

for (var topicEntry : openTransactionsByTopic.entrySet()) {
    String topic = topicEntry.getKey();
    var openTransactionsByPartition = topicEntry.getValue();
    for (var partitionEntry : openTransactionsByPartition.entrySet()) {
        int partition = partitionEntry.getKey();
        // Single check per partition.
        if (!hasCommittedOffset(groupId, topic, partition)) {
            records.add(GroupCoordinatorRecordHelpers.newOffsetCommitTombstoneRecord(groupId, topic, partition));
            numDeletedOffsets.getAndIncrement();
        }
    }
}
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -1005,21 +1005,41 @@ public void replay(
                 openTransactionsByGroup
                     .computeIfAbsent(groupId, __ -> new TimelineHashSet<>(snapshotRegistry, 1))
                     .add(producerId);
+                openTransactionsByGroupTopicAndPartition
+                    .computeIfAbsent(groupId, __ -> new TimelineHashMap<>(snapshotRegistry, 1))
+                    .computeIfAbsent(topic, __ -> new TimelineHashMap<>(snapshotRegistry, 1))
+                    .computeIfAbsent(partition, __ -> new TimelineHashSet<>(snapshotRegistry, 1))
+                    .add(producerId);
             }
         } else {
            if (offsets.remove(groupId, topic, partition) != null) {
                metrics.decrementNumOffsets();
            }
            // Remove all the pending offset commits related to the tombstone.
-            TimelineHashSet<Long> openTransactions = openTransactionsByGroup.get(groupId);
-            if (openTransactions != null) {
-                openTransactions.forEach(openProducerId -> {
-                    Offsets pendingOffsets = pendingTransactionalOffsets.get(openProducerId);
-                    if (pendingOffsets != null) {
-                        pendingOffsets.remove(groupId, topic, partition);
+            TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> openTransactionsByTopic =
+                openTransactionsByGroupTopicAndPartition.get(groupId);
+            if (openTransactionsByTopic != null) {
+                TimelineHashMap<Integer, TimelineHashSet<Long>> openTransactionsByPartition = openTransactionsByTopic.get(topic);
+                if (openTransactionsByPartition != null) {
+                    TimelineHashSet<Long> openTransactions = openTransactionsByPartition.get(partition);
+                    if (openTransactions != null) {
+                        openTransactions.forEach(openProducerId -> {
+                            Offsets pendingOffsets = pendingTransactionalOffsets.get(openProducerId);
+                            if (pendingOffsets != null) {
+                                pendingOffsets.remove(groupId, topic, partition);
+                            }
+                        });
+
+                        openTransactionsByPartition.remove(partition);
+                        if (openTransactionsByPartition.isEmpty()) {
+                            openTransactionsByTopic.remove(topic);
+                        }
+                        if (openTransactionsByTopic.isEmpty()) {
+                            openTransactionsByGroupTopicAndPartition.remove(groupId);
+                        }
                     }
-                });
+                }
            }
Review Comment:
This whole area is quite badly structured; it will be hard to maintain and is potentially fragile.
I would split it out into methods and tidy some of it up. Here is an example of how I'd put it
together. Bear in mind this code isn't tested and is more of a boilerplate example:
```java
public void clearOpenTransactions(final String groupId, final String topic, final int partition) {
    final TimelineHashMap<Integer, TimelineHashSet<Long>> partitionMap = getPartitionMap(groupId, topic);
    if (partitionMap == null) return;

    final TimelineHashSet<Long> openProducerIds = partitionMap.get(partition);
    if (openProducerIds == null) return;

    removePendingOffsets(openProducerIds, groupId, topic, partition);
    partitionMap.remove(partition);

    cleanupIfEmpty(partitionMap, getTopicMap(groupId), topic);
    cleanupIfEmpty(getTopicMap(groupId), openTransactionsByGroupTopicAndPartition, groupId);
}

private TimelineHashMap<Integer, TimelineHashSet<Long>> getPartitionMap(final String groupId, final String topic) {
    final TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> topicMap =
        openTransactionsByGroupTopicAndPartition.get(groupId);
    if (topicMap == null) return null;
    return topicMap.get(topic);
}

private TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> getTopicMap(final String groupId) {
    return openTransactionsByGroupTopicAndPartition.get(groupId);
}

private void removePendingOffsets(
    final Set<Long> producerIds,
    final String groupId,
    final String topic,
    final int partition
) {
    for (final Long producerId : producerIds) {
        final Offsets offsets = pendingTransactionalOffsets.get(producerId);
        if (offsets != null) {
            offsets.remove(groupId, topic, partition);
        }
    }
}

private <K, V extends Map<?, ?>> void cleanupIfEmpty(final V innerMap, final Map<K, V> outerMap, final K key) {
    if (innerMap != null && innerMap.isEmpty()) {
        outerMap.remove(key);
    }
}
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -194,9 +194,16 @@ public OffsetMetadataManager build() {
     /**
      * The open transactions (producer ids) keyed by group.
+     * Tracks whether groups have any open transactions.
      */
     private final TimelineHashMap<String, TimelineHashSet<Long>> openTransactionsByGroup;

+    /**
+     * The open transactions (producer ids) keyed by group id, topic name and partition id.
+     * Tracks whether partitions have any pending transactional offsets.
+     */
+    private final TimelineHashMap<String, TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>>> openTransactionsByGroupTopicAndPartition;
Review Comment:
I understand your reasoning behind this addition, but it does introduce
significant complexity and carries a high risk of data duplication if not
carefully managed. While I appreciate this may serve as a stop-gap for now, I
do think it’s important that this is revisited going forward. A more
maintainable long-term solution, perhaps wrapping this logic in a dedicated
structure or abstraction, would help reduce coupling and make it easier to
evolve the logic cleanly.
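For illustration only (this is not code from the PR): a rough sketch of the kind of dedicated structure meant above, assuming a hypothetical `OpenTransactionIndex` nested class that owns the group/topic/partition maps and keeps the pruning logic in one place. Every name and signature below is made up and untested:
```java
// Hypothetical wrapper around the nested timeline maps. The class and method
// names are illustrative only; it reuses the existing TimelineHashMap,
// TimelineHashSet and SnapshotRegistry types and java.util.function.Consumer.
private static class OpenTransactionIndex {
    private final SnapshotRegistry snapshotRegistry;
    private final TimelineHashMap<String, TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>>> byGroupTopicPartition;

    OpenTransactionIndex(SnapshotRegistry snapshotRegistry) {
        this.snapshotRegistry = snapshotRegistry;
        this.byGroupTopicPartition = new TimelineHashMap<>(snapshotRegistry, 0);
    }

    /** Records an open transactional offset for the given producer. */
    void add(String groupId, String topic, int partition, long producerId) {
        byGroupTopicPartition
            .computeIfAbsent(groupId, __ -> new TimelineHashMap<>(snapshotRegistry, 1))
            .computeIfAbsent(topic, __ -> new TimelineHashMap<>(snapshotRegistry, 1))
            .computeIfAbsent(partition, __ -> new TimelineHashSet<>(snapshotRegistry, 1))
            .add(producerId);
    }

    /** Removes the producer from a partition, pruning empty levels on the way out. */
    void remove(String groupId, String topic, int partition, long producerId) {
        TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> byTopic = byGroupTopicPartition.get(groupId);
        if (byTopic == null) return;
        TimelineHashMap<Integer, TimelineHashSet<Long>> byPartition = byTopic.get(topic);
        if (byPartition == null) return;
        TimelineHashSet<Long> producerIds = byPartition.get(partition);
        if (producerIds == null) return;

        producerIds.remove(producerId);
        if (producerIds.isEmpty()) byPartition.remove(partition);
        if (byPartition.isEmpty()) byTopic.remove(topic);
        if (byTopic.isEmpty()) byGroupTopicPartition.remove(groupId);
    }

    /** Applies the action to every producer id with an open transaction on the partition. */
    void forEach(String groupId, String topic, int partition, Consumer<Long> action) {
        TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> byTopic = byGroupTopicPartition.get(groupId);
        if (byTopic == null) return;
        TimelineHashMap<Integer, TimelineHashSet<Long>> byPartition = byTopic.get(topic);
        if (byPartition == null) return;
        TimelineHashSet<Long> producerIds = byPartition.get(partition);
        if (producerIds != null) producerIds.forEach(action);
    }
}
```
With something along those lines, replay() and replayEndTransactionMarker() would only call add/remove/forEach and would not need to know about the nested map layout at all.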
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/OffsetMetadataManager.java:
##########
@@ -1043,14 +1064,39 @@ public void replayEndTransactionMarker(
            return;
        }
-        pendingOffsets.offsetsByGroup.keySet().forEach(groupId -> {
-            TimelineHashSet<Long> openTransactions = openTransactionsByGroup.get(groupId);
-            if (openTransactions != null) {
-                openTransactions.remove(producerId);
-                if (openTransactions.isEmpty()) {
+        pendingOffsets.offsetsByGroup.forEach((groupId, topicOffsets) -> {
+            TimelineHashSet<Long> groupTransactions = openTransactionsByGroup.get(groupId);
+            if (groupTransactions != null) {
+                groupTransactions.remove(producerId);
+                if (groupTransactions.isEmpty()) {
                     openTransactionsByGroup.remove(groupId);
                 }
             }
+
+            TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> openTransactionsByTopic =
+                openTransactionsByGroupTopicAndPartition.get(groupId);
+            if (openTransactionsByTopic == null) return;
+
+            topicOffsets.forEach((topic, partitionOffsets) -> {
+                TimelineHashMap<Integer, TimelineHashSet<Long>> openTransactionsByPartition = openTransactionsByTopic.get(topic);
+                if (openTransactionsByPartition == null) return;
+
+                partitionOffsets.keySet().forEach(partitionId -> {
+                    TimelineHashSet<Long> partitionTransactions = openTransactionsByPartition.get(partitionId);
+                    if (partitionTransactions != null) {
+                        partitionTransactions.remove(producerId);
+                        if (partitionTransactions.isEmpty()) {
+                            openTransactionsByPartition.remove(partitionId);
+                        }
+                        if (openTransactionsByPartition.isEmpty()) {
+                            openTransactionsByTopic.remove(topic);
+                        }
+                        if (openTransactionsByTopic.isEmpty()) {
+                            openTransactionsByGroupTopicAndPartition.remove(groupId);
+                        }
+                    }
+                });
+            });
         });
Review Comment:
The previous point stands. I'd do it something like the attempt below; the
cleanupIfEmpty method from earlier could be reused here if implemented:
```java
public void clearOpenTransactionsForProducer(final long producerId, final Offsets pendingOffsets) {
    for (final var groupEntry : pendingOffsets.offsetsByGroup.entrySet()) {
        final String groupId = groupEntry.getKey();
        final var topicOffsets = groupEntry.getValue();

        removeProducerFromGroupTransactions(groupId, producerId);

        final TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> topicMap =
            openTransactionsByGroupTopicAndPartition.get(groupId);
        if (topicMap == null) continue;

        processTopicOffsets(producerId, groupId, topicOffsets, topicMap);
        cleanupIfEmpty(topicMap, openTransactionsByGroupTopicAndPartition, groupId);
    }
}

private void removeProducerFromGroupTransactions(final String groupId, final long producerId) {
    final TimelineHashSet<Long> groupTransactions = openTransactionsByGroup.get(groupId);
    if (groupTransactions == null) return;
    groupTransactions.remove(producerId);
    if (groupTransactions.isEmpty()) {
        openTransactionsByGroup.remove(groupId);
    }
}

private void processTopicOffsets(
    final long producerId,
    final String groupId,
    final Map<String, ? extends Map<Integer, OffsetAndMetadata>> topicOffsets,
    final TimelineHashMap<String, TimelineHashMap<Integer, TimelineHashSet<Long>>> topicMap
) {
    for (final var topicEntry : topicOffsets.entrySet()) {
        final String topic = topicEntry.getKey();
        final var partitionOffsets = topicEntry.getValue();

        final TimelineHashMap<Integer, TimelineHashSet<Long>> partitionMap = topicMap.get(topic);
        if (partitionMap == null) continue;

        for (final Integer partitionId : partitionOffsets.keySet()) {
            removeProducerFromPartitionMap(producerId, partitionId, partitionMap);
        }
        cleanupIfEmpty(partitionMap, topicMap, topic);
    }
}

private void removeProducerFromPartitionMap(
    final long producerId,
    final int partitionId,
    final TimelineHashMap<Integer, TimelineHashSet<Long>> partitionMap
) {
    final TimelineHashSet<Long> partitionTransactions = partitionMap.get(partitionId);
    if (partitionTransactions == null) return;
    partitionTransactions.remove(producerId);
    if (partitionTransactions.isEmpty()) {
        partitionMap.remove(partitionId);
    }
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]