squah-confluent commented on code in PR #21508:
URL: https://github.com/apache/kafka/pull/21508#discussion_r2838826022
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -672,9 +673,23 @@ public CommitPartitionValidator validateOffsetCommit(
throw new UnsupportedVersionException("OffsetCommit version 9 or
above must be used " +
"by members using the modern group protocol");
}
+ // For members using the classic protocol, use strict epoch validation.
+ if (member.useClassicProtocol()) {
+ validateMemberEpoch(memberEpoch, member.memberEpoch(), true);
+ return CommitPartitionValidator.NO_OP;
+ }
- validateMemberEpoch(memberEpoch, member.memberEpoch(),
member.useClassicProtocol());
- return CommitPartitionValidator.NO_OP;
+ // For member using the consumer protocol
Review Comment:
nit:
```suggestion
// For members using the consumer protocol, the epoch must either
match the last epoch sent
// in a heartbeat or be greater than or equal to the partition's
assignment epoch.
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -672,9 +673,23 @@ public CommitPartitionValidator validateOffsetCommit(
throw new UnsupportedVersionException("OffsetCommit version 9 or
above must be used " +
"by members using the modern group protocol");
}
+ // For members using the classic protocol, use strict epoch validation.
Review Comment:
nit:
```suggestion
// For members using the classic protocol, use strict epoch
validation.
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java:
##########
@@ -63,9 +65,9 @@ public static class Builder {
private Set<String> subscribedTopicNames = Set.of();
private String subscribedTopicRegex = "";
private String serverAssignorName = null;
- private Map<Uuid, Set<Integer>> assignedPartitions = Map.of();
- private Map<Uuid, Set<Integer>> partitionsPendingRevocation = Map.of();
private ConsumerGroupMemberMetadataValue.ClassicMemberMetadata
classicMemberMetadata = null;
+ private Map<Uuid, Map<Integer, Integer>> assignedPartitionsWithEpochs
= Map.of();
Review Comment:
nit: Could you minimize the diff churn in this file by making the
replacement methods/fields occupy the same position as the previous ones?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java:
##########
@@ -299,47 +300,56 @@ private ConsumerGroupMember updateCurrentAssignment(
Map<Uuid, Set<Integer>> memberAssignedPartitions
) {
Set<Uuid> subscribedTopicIds = subscribedTopicIds();
-
// Reuse the original map if no topics need to be removed.
- Map<Uuid, Set<Integer>> newAssignedPartitions;
- Map<Uuid, Set<Integer>> newPartitionsPendingRevocation;
+ Map<Uuid, Map<Integer, Integer>> newAssignedPartitionsWithEpochs;
+ Map<Uuid, Map<Integer, Integer>>
newPartitionsPendingRevocationWithEpochs;
+ boolean changed = false;
Review Comment:
Is this refactor necessary?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java:
##########
@@ -370,10 +380,13 @@ private ConsumerGroupMember computeNextAssignment(
Set<Uuid> subscribedTopicIds = subscribedTopicIds();
boolean hasUnreleasedPartitions = false;
- Map<Uuid, Set<Integer>> newAssignedPartitions = new HashMap<>();
- Map<Uuid, Set<Integer>> newPartitionsPendingRevocation = new
HashMap<>();
+ Map<Uuid, Map<Integer, Integer>> newAssignedPartitionsWithEpochs = new
HashMap<>();
+ Map<Uuid, Map<Integer, Integer>>
newPartitionsPendingRevocationWithEpochs = new HashMap<>();
Map<Uuid, Set<Integer>> newPartitionsPendingAssignment = new
HashMap<>();
+ // Get existing epochs from member
+ Map<Uuid, Map<Integer, Integer>> existingAssignedEpochs =
member.assignedPartitionsWithEpochs();
Review Comment:
Can we change the type of `memberAssignedPartitions` to `Map<topic id,
Map<partition, assignment epoch>>` and work with epoch maps instead?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -672,9 +673,23 @@ public CommitPartitionValidator validateOffsetCommit(
throw new UnsupportedVersionException("OffsetCommit version 9 or
above must be used " +
"by members using the modern group protocol");
}
+ // For members using the classic protocol, use strict epoch validation.
+ if (member.useClassicProtocol()) {
+ validateMemberEpoch(memberEpoch, member.memberEpoch(), true);
+ return CommitPartitionValidator.NO_OP;
+ }
- validateMemberEpoch(memberEpoch, member.memberEpoch(),
member.useClassicProtocol());
- return CommitPartitionValidator.NO_OP;
+ // For member using the consumer protocol
+ // Case 1: Strict epoch match
Review Comment:
nit: Do these cases correspond to the KIP? I would drop the comments
otherwise.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java:
##########
@@ -190,21 +192,61 @@ public Builder setState(MemberState state) {
return this;
}
- public Builder setAssignedPartitions(Map<Uuid, Set<Integer>>
assignedPartitions) {
- this.assignedPartitions = assignedPartitions;
+ public Builder
setClassicMemberMetadata(ConsumerGroupMemberMetadataValue.ClassicMemberMetadata
classicMemberMetadata) {
+ this.classicMemberMetadata = classicMemberMetadata;
return this;
}
- public Builder setPartitionsPendingRevocation(Map<Uuid, Set<Integer>>
partitionsPendingRevocation) {
- this.partitionsPendingRevocation = partitionsPendingRevocation;
+ public Builder setAssignedPartitionsWithEpochs(Map<Uuid, Map<Integer,
Integer>> assignedPartitionsWithEpochs) {
+ this.assignedPartitionsWithEpochs = assignedPartitionsWithEpochs;
return this;
}
- public Builder
setClassicMemberMetadata(ConsumerGroupMemberMetadataValue.ClassicMemberMetadata
classicMemberMetadata) {
- this.classicMemberMetadata = classicMemberMetadata;
+ public Builder setPartitionsPendingRevocationWithEpochs(Map<Uuid,
Map<Integer, Integer>> partitionsPendingRevocationWithEpochs) {
+ this.partitionsPendingRevocationWithEpochs =
partitionsPendingRevocationWithEpochs;
+ return this;
+ }
+
+ /**
+ * Resets the assignment epochs to 0 for all assigned partitions.
+ * Used when a static member leaves, so that the rejoining member
Review Comment:
```suggestion
* Used when a static member leaves, so that the rejoining member's
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -672,9 +673,23 @@ public CommitPartitionValidator validateOffsetCommit(
throw new UnsupportedVersionException("OffsetCommit version 9 or
above must be used " +
"by members using the modern group protocol");
}
+ // For members using the classic protocol, use strict epoch validation.
+ if (member.useClassicProtocol()) {
+ validateMemberEpoch(memberEpoch, member.memberEpoch(), true);
+ return CommitPartitionValidator.NO_OP;
+ }
- validateMemberEpoch(memberEpoch, member.memberEpoch(),
member.useClassicProtocol());
- return CommitPartitionValidator.NO_OP;
+ // For member using the consumer protocol
+ // Case 1: Strict epoch match
+ if (memberEpoch == member.memberEpoch()) {
+ return CommitPartitionValidator.NO_OP;
+ }
+ // Case 2: Client epoch > broker epoch, which is an invalid request
+ if (memberEpoch > member.memberEpoch()) {
+ throw new StaleMemberEpochException(String.format("The received
member epoch %d is larger than "
+ + "the expected member epoch %d.", memberEpoch,
member.memberEpoch()));
+ }
+ return createAssignmentEpochValidator(member, memberEpoch);
Review Comment:
nit: Could we follow the streams implementation?
```suggestion
// Member epoch is older; validate against per-partition assignment
epochs.
return createAssignmentEpochValidator(member, memberEpoch);
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -672,9 +673,23 @@ public CommitPartitionValidator validateOffsetCommit(
throw new UnsupportedVersionException("OffsetCommit version 9 or
above must be used " +
"by members using the modern group protocol");
}
+ // For members using the classic protocol, use strict epoch validation.
+ if (member.useClassicProtocol()) {
+ validateMemberEpoch(memberEpoch, member.memberEpoch(), true);
+ return CommitPartitionValidator.NO_OP;
+ }
- validateMemberEpoch(memberEpoch, member.memberEpoch(),
member.useClassicProtocol());
- return CommitPartitionValidator.NO_OP;
+ // For member using the consumer protocol
+ // Case 1: Strict epoch match
+ if (memberEpoch == member.memberEpoch()) {
+ return CommitPartitionValidator.NO_OP;
+ }
+ // Case 2: Client epoch > broker epoch, which is an invalid request
+ if (memberEpoch > member.memberEpoch()) {
+ throw new StaleMemberEpochException(String.format("The received
member epoch %d is larger than "
+ + "the expected member epoch %d.", memberEpoch,
member.memberEpoch()));
Review Comment:
nit: Could we follow the streams implementation?
```suggestion
throw new StaleMemberEpochException(String.format("Received
member epoch %d is newer than "
+ "current member epoch %d.", memberEpoch,
member.memberEpoch()));
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -1184,17 +1240,24 @@ public static ConsumerGroup fromClassicGroup(
// assignment of the classic group. All the members are put in the
Stable state. If the classic
// group was in Preparing Rebalance or Completing Rebalance
states, the classic members are
// asked to rejoin the group to re-trigger a rebalance or collect
their assignments.
+ int memberEpoch = classicGroup.generationId();
+ // Convert assigned partitions to epochs map
+ Map<Uuid, Map<Integer, Integer>> assignedPartitionsWithEpochs =
assignedPartitions.entrySet().stream()
+ .collect(Collectors.toMap(
+ Map.Entry::getKey,
+ e -> e.getValue().stream().collect(Collectors.toMap(p ->
p, p -> memberEpoch))
+ ));
Review Comment:
nit: We could move this into a
`Builder.setAssignedPartitions(assignedPartitions, assignmentEpoch)`
convenience method.
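A minimal sketch of what such a convenience helper might compute. It is written as a standalone static method rather than the suggested builder overload, and `java.util.UUID` stands in for Kafka's `Uuid`; the class and method names are hypothetical.

```java
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.stream.Collectors;

public class AssignmentEpochs {
    // Hypothetical helper: stamps every partition of every topic with one assignment epoch.
    // java.util.UUID is an assumption standing in for org.apache.kafka.common.Uuid.
    public static Map<UUID, Map<Integer, Integer>> withEpoch(
        Map<UUID, Set<Integer>> assignedPartitions,
        int assignmentEpoch
    ) {
        return assignedPartitions.entrySet().stream()
            .collect(Collectors.toUnmodifiableMap(
                Map.Entry::getKey,
                // Each partition id maps to the same epoch.
                e -> e.getValue().stream()
                    .collect(Collectors.toUnmodifiableMap(p -> p, p -> assignmentEpoch))
            ));
    }
}
```

With a helper like this, `fromClassicGroup` would only need to pass the partition sets and `classicGroup.generationId()`.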
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java:
##########
@@ -337,6 +406,50 @@ public Map<Uuid, Set<Integer>>
partitionsPendingRevocation() {
return partitionsPendingRevocation;
}
+ /**
+ * @return The epoch-annotated assigned partitions map.
+ */
+ public Map<Uuid, Map<Integer, Integer>> assignedPartitionsWithEpochs() {
+ return assignedPartitionsWithEpochs;
+ }
+
+ /**
+ * @return The epoch-annotated pending revocation partitions map.
+ */
+ public Map<Uuid, Map<Integer, Integer>>
partitionsPendingRevocationWithEpochs() {
+ return partitionsPendingRevocationWithEpochs;
+ }
+
+ /**
+ * Gets the assignment epoch for a specific partition.
+ *
+ * @param topicId The topic UUID.
+ * @param partitionId The partition index.
+ * @return The epoch at which the partition was assigned, or null if not
assigned.
+ */
+ public Integer getAssignmentEpoch(Uuid topicId, int partitionId) {
+ Map<Integer, Integer> partitionEpochs =
assignedPartitionsWithEpochs.get(topicId);
+ if (partitionEpochs != null) {
+ return partitionEpochs.get(partitionId);
+ }
+ return null;
+ }
+
+ /**
+ * Gets the assignment epoch for a partition pending revocation.
+ *
+ * @param topicId The topic UUID.
+ * @param partitionId The partition index.
+ * @return The epoch at which the partition was assigned, or null if not
pending revocation.
+ */
+ public Integer getPendingRevocationEpoch(Uuid topicId, int partitionId) {
Review Comment:
nit: The convention in this class is to omit `get` from method names.
```suggestion
public Integer pendingRevocationEpoch(Uuid topicId, int partitionId) {
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java:
##########
@@ -190,21 +192,61 @@ public Builder setState(MemberState state) {
return this;
}
- public Builder setAssignedPartitions(Map<Uuid, Set<Integer>>
assignedPartitions) {
- this.assignedPartitions = assignedPartitions;
+ public Builder
setClassicMemberMetadata(ConsumerGroupMemberMetadataValue.ClassicMemberMetadata
classicMemberMetadata) {
+ this.classicMemberMetadata = classicMemberMetadata;
return this;
}
- public Builder setPartitionsPendingRevocation(Map<Uuid, Set<Integer>>
partitionsPendingRevocation) {
- this.partitionsPendingRevocation = partitionsPendingRevocation;
+ public Builder setAssignedPartitionsWithEpochs(Map<Uuid, Map<Integer,
Integer>> assignedPartitionsWithEpochs) {
+ this.assignedPartitionsWithEpochs = assignedPartitionsWithEpochs;
return this;
}
- public Builder
setClassicMemberMetadata(ConsumerGroupMemberMetadataValue.ClassicMemberMetadata
classicMemberMetadata) {
- this.classicMemberMetadata = classicMemberMetadata;
+ public Builder setPartitionsPendingRevocationWithEpochs(Map<Uuid,
Map<Integer, Integer>> partitionsPendingRevocationWithEpochs) {
+ this.partitionsPendingRevocationWithEpochs =
partitionsPendingRevocationWithEpochs;
+ return this;
+ }
+
+ /**
+ * Resets the assignment epochs to 0 for all assigned partitions.
+ * Used when a static member leaves, so that the rejoining member
+ * partitions will be assigned from epoch 0 to the new member ID.
+ * All commits using the old member ID will be fenced.
+ */
+ public Builder resetAssignedPartitionsEpochsToZero() {
+ if (this.assignedPartitionsWithEpochs.isEmpty()) {
+ return this;
+ }
+ Map<Uuid, Map<Integer, Integer>> resetEpochs = new HashMap<>();
+ for (Map.Entry<Uuid, Map<Integer, Integer>> entry :
this.assignedPartitionsWithEpochs.entrySet()) {
+ Map<Integer, Integer> partitionEpochs = new HashMap<>();
+ for (Integer partitionId : entry.getValue().keySet()) {
+ partitionEpochs.put(partitionId, 0);
+ }
+ resetEpochs.put(entry.getKey(),
Collections.unmodifiableMap(partitionEpochs));
+ }
+ this.assignedPartitionsWithEpochs =
Collections.unmodifiableMap(resetEpochs);
return this;
}
+ private static Map<Uuid, Map<Integer, Integer>>
assignmentWithEpochsFromTopicPartitions(
+ List<ConsumerGroupCurrentMemberAssignmentValue.TopicPartitions>
topicPartitions,
+ int defaultEpoch
+ ) {
Review Comment:
nit: This method should replace `Utils.assignmentFromTopicPartitions` and
live in `Utils`.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java:
##########
@@ -299,47 +300,56 @@ private ConsumerGroupMember updateCurrentAssignment(
Map<Uuid, Set<Integer>> memberAssignedPartitions
) {
Set<Uuid> subscribedTopicIds = subscribedTopicIds();
-
Review Comment:
nit: stray newline change
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java:
##########
@@ -337,6 +406,50 @@ public Map<Uuid, Set<Integer>>
partitionsPendingRevocation() {
return partitionsPendingRevocation;
}
+ /**
+ * @return The epoch-annotated assigned partitions map.
+ */
+ public Map<Uuid, Map<Integer, Integer>> assignedPartitionsWithEpochs() {
+ return assignedPartitionsWithEpochs;
+ }
+
+ /**
+ * @return The epoch-annotated pending revocation partitions map.
+ */
+ public Map<Uuid, Map<Integer, Integer>>
partitionsPendingRevocationWithEpochs() {
+ return partitionsPendingRevocationWithEpochs;
+ }
+
+ /**
+ * Gets the assignment epoch for a specific partition.
+ *
+ * @param topicId The topic UUID.
+ * @param partitionId The partition index.
+ * @return The epoch at which the partition was assigned, or null if not
assigned.
+ */
+ public Integer getAssignmentEpoch(Uuid topicId, int partitionId) {
Review Comment:
nit: The convention in this class is to omit `get` from method names.
```suggestion
public Integer assignmentEpoch(Uuid topicId, int partitionId) {
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroup.java:
##########
@@ -837,6 +852,47 @@ private void validateMemberEpoch(
}
}
+ /**
+ * Creates a validator that checks per-partition assignment epochs.
+ * A commit is rejected if the partition is not assigned to the member
+ * or if the received client-side epoch is older than the partition's
assignment epoch(KIP-1251).
+ *
+ * @param member The consumer group member.
+ * @param receivedMemberEpoch The member epoch from the offset commit
request.
+ * @return A validator that checks each partition's assignment epoch.
+ */
+ private CommitPartitionValidator createAssignmentEpochValidator(
+ ConsumerGroupMember member,
+ int receivedMemberEpoch
+ ) {
+ return (topicName, topicId, partitionId) -> {
+ // Check if the partition is in the assigned partitions.
+ // If not found in assigned, check partitions pending revocation.
+ Integer assignmentEpoch = member.getAssignmentEpoch(topicId,
partitionId);
+ if (assignmentEpoch == null) {
+ assignmentEpoch = member.getPendingRevocationEpoch(topicId,
partitionId);
+ }
+
+ // If client-side epoch != broker-side epoch, and the partition is
not assigned to this member, reject.
+ if (assignmentEpoch == null) {
+ throw new StaleMemberEpochException(String.format(
+ "Partition %s-%d is not assigned or pending revocation for
member %s. " +
+ "Committing unassigned partitions is only allowed when
member epoch matches exactly " +
+ "(received: %d, current: %d).",
+ topicName, partitionId, member.memberId(),
receivedMemberEpoch, member.memberEpoch()));
+ }
+
+ // If the received epoch is older than when this partition was
assigned,
+ // It is a zombie commit and should be rejected.
+ if (receivedMemberEpoch < assignmentEpoch) {
+ throw new StaleMemberEpochException(
+ String.format("The received member epoch %d is older than
the assignment epoch %d for partition %s-%d.",
+ receivedMemberEpoch, assignmentEpoch, topicName,
partitionId)
+ );
+ }
+ };
+ }
Review Comment:
nit: Could we follow the streams implementation?
```suggestion
/**
* Creates a validator that checks if the received member epoch is valid
for each partition's assignment epoch.
* A commit is rejected if the partition is not assigned to the member
* or if the received client-side epoch is older than the partition's
assignment epoch (KIP-1251).
*
* @param member The member whose assignments are being validated.
* @param receivedMemberEpoch The received member epoch.
* @return A validator for per-partition validation.
*/
private CommitPartitionValidator createAssignmentEpochValidator(
ConsumerGroupMember member,
int receivedMemberEpoch
) {
return (topicName, topicId, partitionId) -> {
// Search for the partition in assigned partitions, then in
partitions pending revocation
Integer assignmentEpoch = member.getAssignmentEpoch(topicId,
partitionId);
if (assignmentEpoch == null) {
assignmentEpoch = member.getPendingRevocationEpoch(topicId,
partitionId);
}
if (assignmentEpoch == null) {
throw new StaleMemberEpochException(String.format(
"Partition %s-%d is not assigned or pending revocation
for member.",
topicName, partitionId));
}
if (receivedMemberEpoch < assignmentEpoch) {
throw new StaleMemberEpochException(
String.format("Received member epoch %d is older than
assignment epoch %d for partition %s-%d.",
receivedMemberEpoch, assignmentEpoch, topicName,
partitionId)
);
}
};
}
```
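For readers outside the Kafka codebase, the per-partition check can be exercised in isolation. The following is a sketch only: `StaleEpochException` and `PartitionValidator` are hypothetical stand-ins for `StaleMemberEpochException` and `CommitPartitionValidator`, `java.util.UUID` replaces Kafka's `Uuid`, and the two epoch maps are passed in directly instead of being read from a `ConsumerGroupMember`.

```java
import java.util.Map;
import java.util.UUID;

public class AssignmentEpochValidatorSketch {
    /** Hypothetical stand-in for Kafka's StaleMemberEpochException. */
    public static class StaleEpochException extends RuntimeException {
        public StaleEpochException(String message) {
            super(message);
        }
    }

    /** Hypothetical stand-in for CommitPartitionValidator; mirrors the lambda shape in the diff. */
    @FunctionalInterface
    public interface PartitionValidator {
        void validate(String topicName, UUID topicId, int partitionId);
    }

    public static PartitionValidator create(
        Map<UUID, Map<Integer, Integer>> assigned,
        Map<UUID, Map<Integer, Integer>> pendingRevocation,
        int receivedMemberEpoch
    ) {
        return (topicName, topicId, partitionId) -> {
            // Search assigned partitions first, then partitions pending revocation.
            Integer assignmentEpoch = assigned.getOrDefault(topicId, Map.of()).get(partitionId);
            if (assignmentEpoch == null) {
                assignmentEpoch = pendingRevocation.getOrDefault(topicId, Map.of()).get(partitionId);
            }
            // The member does not own the partition in either map: reject.
            if (assignmentEpoch == null) {
                throw new StaleEpochException(String.format(
                    "Partition %s-%d is not assigned or pending revocation for member.",
                    topicName, partitionId));
            }
            // The commit predates the assignment of this partition: reject.
            if (receivedMemberEpoch < assignmentEpoch) {
                throw new StaleEpochException(String.format(
                    "Received member epoch %d is older than assignment epoch %d for partition %s-%d.",
                    receivedMemberEpoch, assignmentEpoch, topicName, partitionId));
            }
        };
    }
}
```

A commit at received epoch 4 would pass for a partition assigned at epoch 3 and fail for one assigned at epoch 5, which is the fencing behavior the Javadoc describes.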
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java:
##########
@@ -407,19 +422,35 @@ private ConsumerGroupMember computeNextAssignment(
!member.partitionsPendingRevocation().getOrDefault(topicId,
Set.of()).contains(partitionId)
) || hasUnreleasedPartitions;
+ // Build epochs map for assigned partitions (preserve existing
epochs)
if (!assignedPartitions.isEmpty()) {
- newAssignedPartitions.put(topicId, assignedPartitions);
+ Map<Integer, Integer> partitionEpochs = new HashMap<>();
+ for (Integer partitionId : assignedPartitions) {
+ partitionEpochs.put(partitionId,
existingTopicEpochs.getOrDefault(partitionId, memberEpoch));
Review Comment:
The default is never used because `assignedPartitions` comes from
`member.assignedPartitions()`.
```suggestion
partitionEpochs.put(partitionId,
existingTopicEpochs.get(partitionId));
```
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/ConsumerGroupMember.java:
##########
@@ -66,6 +69,8 @@ public static class Builder {
private Map<Uuid, Set<Integer>> assignedPartitions = Map.of();
private Map<Uuid, Set<Integer>> partitionsPendingRevocation = Map.of();
private ConsumerGroupMemberMetadataValue.ClassicMemberMetadata
classicMemberMetadata = null;
+ private Map<Uuid, Map<Integer, Integer>> assignedPartitionsWithEpochs
= Map.of();
Review Comment:
> I would leave `assignedPartitions` in place for now but would like to see
it removed eventually.
Actually if we remove it now, we can call the new methods
`assignedPartitions` and `partitionsPendingRevocation` without conflict and we
don't need to invent a view to satisfy the base class.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/modern/consumer/CurrentAssignmentBuilder.java:
##########
@@ -299,47 +300,56 @@ private ConsumerGroupMember updateCurrentAssignment(
Map<Uuid, Set<Integer>> memberAssignedPartitions
) {
Set<Uuid> subscribedTopicIds = subscribedTopicIds();
-
// Reuse the original map if no topics need to be removed.
- Map<Uuid, Set<Integer>> newAssignedPartitions;
- Map<Uuid, Set<Integer>> newPartitionsPendingRevocation;
+ Map<Uuid, Map<Integer, Integer>> newAssignedPartitionsWithEpochs;
+ Map<Uuid, Map<Integer, Integer>>
newPartitionsPendingRevocationWithEpochs;
+ boolean changed = false;
+
if (subscribedTopicIds.isEmpty() &&
member.partitionsPendingRevocation().isEmpty()) {
- newAssignedPartitions = Map.of();
- newPartitionsPendingRevocation = memberAssignedPartitions;
+ newAssignedPartitionsWithEpochs = Map.of();
+ // Move all assigned to pending revocation with their epochs
+ newPartitionsPendingRevocationWithEpochs = new
HashMap<>(member.assignedPartitionsWithEpochs());
+ changed = true;
} else {
- newAssignedPartitions = memberAssignedPartitions;
- newPartitionsPendingRevocation = new
HashMap<>(member.partitionsPendingRevocation());
+ newAssignedPartitionsWithEpochs = new
HashMap<>(member.assignedPartitionsWithEpochs());
+ newPartitionsPendingRevocationWithEpochs = new
HashMap<>(member.partitionsPendingRevocationWithEpochs());
for (Map.Entry<Uuid, Set<Integer>> entry :
memberAssignedPartitions.entrySet()) {
if (!subscribedTopicIds.contains(entry.getKey())) {
- if (newAssignedPartitions == memberAssignedPartitions) {
- newAssignedPartitions = new
HashMap<>(memberAssignedPartitions);
- newPartitionsPendingRevocation = new
HashMap<>(member.partitionsPendingRevocation());
+ changed = true;
+ Uuid topicId = entry.getKey();
+ Map<Integer, Integer> removedEpochs =
newAssignedPartitionsWithEpochs.remove(topicId);
+ if (removedEpochs != null) {
+ newPartitionsPendingRevocationWithEpochs.merge(
+ topicId,
+ removedEpochs,
+ (existing, additional) -> {
+ Map<Integer, Integer> merged = new
HashMap<>(existing);
+ merged.putAll(additional);
+ return merged;
+ }
+ );
}
- newAssignedPartitions.remove(entry.getKey());
- newPartitionsPendingRevocation.merge(
- entry.getKey(),
- entry.getValue(),
- (existing, additional) -> {
- existing = new HashSet<>(existing);
- existing.addAll(additional);
- return existing;
- }
- );
}
}
}
- if (newAssignedPartitions == memberAssignedPartitions) {
+ if (!changed) {
// If no partitions were removed, we can return the member as is.
return member;
}
+ Map<Uuid, Set<Integer>> newPartitionsPendingRevocation =
newPartitionsPendingRevocationWithEpochs.entrySet().stream()
+ .collect(Collectors.toMap(
+ Map.Entry::getKey,
+ e -> e.getValue().keySet()
+ ));
Review Comment:
Can we try updating `ownsRevokedPartitions` to accept a `Map<topic id,
Map<partition, assignment epoch>>` instead?
##########
group-coordinator/src/main/resources/common/message/ConsumerGroupCurrentMemberAssignmentValue.json:
##########
@@ -36,7 +36,10 @@
{ "name": "TopicId", "type": "uuid", "versions": "0+",
"about": "The topic Id." },
{ "name": "Partitions", "type": "[]int32", "versions": "0+",
- "about": "The partition Ids." }
+ "about": "The partition Ids." },
+ { "name": "AssignmentEpochs", "versions": "0+", "nullableVersions": "0+",
+ "taggedVersions": "0+", "tag": 0, "type": "[]int32", "default": null,
+ "about": "The epoch at which each partition was assigned to this
member. Aligned with Partitions array. Used to validate offset commits
(KIP-1251)." }
Review Comment:
nit: Could we follow the streams implementation?
```suggestion
"about": "The epoch at which the partition was assigned to the
member. Used to fence zombie commit requests. Of the same length as
Partitions." }
```
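Because the epochs are stored as an array parallel to `Partitions`, a reader of the record has to zip the two back together. A hedged sketch of that step follows; the class and method names are hypothetical, and falling back to a default epoch when the tagged field is absent is an assumption inferred from the `defaultEpoch` parameter in the PR and the `null` default in the schema.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ParallelEpochs {
    // Hypothetical helper: pairs partitions[i] with epochs[i].
    // Assumption: a null epochs list (field not written) means every partition gets defaultEpoch.
    public static Map<Integer, Integer> toEpochMap(
        List<Integer> partitions,
        List<Integer> epochs,
        int defaultEpoch
    ) {
        if (epochs != null && epochs.size() != partitions.size()) {
            throw new IllegalArgumentException(
                "AssignmentEpochs must be of the same length as Partitions");
        }
        Map<Integer, Integer> result = new HashMap<>();
        for (int i = 0; i < partitions.size(); i++) {
            result.put(partitions.get(i), epochs == null ? defaultEpoch : epochs.get(i));
        }
        return result;
    }
}
```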