lucasbru commented on code in PR #20760:
URL: https://github.com/apache/kafka/pull/20760#discussion_r2455501231
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroup.java:
##########
@@ -1120,4 +1130,54 @@ public void setLastAssignmentConfigs(Map<String, String>
lastAssignmentConfigs)
this.lastAssignmentConfigs.putAll(lastAssignmentConfigs);
}
}
+
+ /**
+ * Creates a validator that checks if the received member epoch is valid
for each partition's assignment epoch.
+ *
+ * @param member The member whose assignments are being validated.
+ * @param receivedMemberEpoch The received member epoch.
+ * @return A validator for per-partition validation.
+ */
+ private CommitPartitionValidator createAssignmentEpochValidator(
+ final StreamsGroupMember member,
+ int receivedMemberEpoch
+ ) {
+ // Retrieve topology once for all partitions - not per partition!
+ final StreamsTopology streamsTopology = topology.get().orElseThrow(()
->
+ new StaleMemberEpochException("Topology is not available for
offset commit validation."));
+
+ final TasksTupleWithEpochs assignedTasks = member.assignedTasks();
+ final TasksTupleWithEpochs tasksPendingRevocation =
member.tasksPendingRevocation();
+
+ return (topicName, topicId, partitionId) -> {
+ final StreamsGroupTopologyValue.Subtopology subtopology =
streamsTopology.sourceTopicMap().get(topicName);
+ if (subtopology == null) {
+ throw new StaleMemberEpochException("Topic " + topicName + "
is not in the topology.");
Review Comment:
This case is actually impossible right now because we do not allow updating
the topology yet. But I think this would be the correct behavior once we allow
changing the topology: We are trying to commit for a subtopology that does not
exist anymore, so we should fence the member.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]