lucasbru commented on code in PR #19114:
URL: https://github.com/apache/kafka/pull/19114#discussion_r1998207274
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3338,9 +3342,9 @@ private boolean hasMemberSubscriptionChanged(
}
/**
- * Creates the member subscription record if the updatedMember is
different from
- * the old member. Returns true if the topologyEpoch of the member has
changed,
- * which is always true when a member is first created.
+ * Creates the member metadatarecord record if the updatedMember is
different from
+ * the old member. Returns true if the metadata has changed, which is
always true
Review Comment:
Done
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3338,9 +3342,9 @@ private boolean hasMemberSubscriptionChanged(
}
/**
- * Creates the member subscription record if the updatedMember is
different from
- * the old member. Returns true if the topologyEpoch of the member has
changed,
- * which is always true when a member is first created.
+ * Creates the member metadatarecord record if the updatedMember is
different from
Review Comment:
As mentioned below, we somewhat changed the behavior of this method in a
corner case.
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3358,12 +3362,10 @@ private boolean hasStreamsMemberMetadataChanged(
String memberId = updatedMember.memberId();
if (!updatedMember.equals(member)) {
records.add(newStreamsGroupMemberRecord(groupId, updatedMember));
+ log.info("[GroupId {}] Member {} updated its member metdata to
{}.",
+ groupId, memberId, updatedMember);
- if (!Objects.equals(updatedMember.topologyEpoch(),
member.topologyEpoch())) {
Review Comment:
I moved the "return true" outside the if. There was a discussion with Bruno
about this. There is a bunch of member information (rackId, clientTags, ...)
that in theory may influence the assignment, so a change to any of it requires
bumping the group epoch and rerunning the assignor. In practice, a member will
not change these during its lifetime, so they only change when a member joins,
which causes a group epoch bump anyway. However, since we do not force the
member to never change those configs, we need to be conservative here and bump
the group epoch any time any of those metadata fields changes (not just when
the topology epoch changes, as it was implemented before).
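
To illustrate the difference, here is a minimal, self-contained sketch. It is
not the actual GroupMetadataManager code: the `Member` record, the
`changedOld`/`changedNew` methods and the use of a plain list in place of
coordinator records are hypothetical stand-ins, and both members are assumed
to be non-null.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

public class MetadataChangeSketch {
    // Hypothetical stand-in for a streams group member (not the real class).
    public record Member(String memberId, int topologyEpoch, String rackId,
                         Map<String, String> clientTags) {}

    // Behavior before the change, as described above: the record is written
    // on any difference, but only a topology epoch change returns true.
    public static boolean changedOld(Member member, Member updated, List<Member> records) {
        if (!updated.equals(member)) {
            records.add(updated);
            if (updated.topologyEpoch() != member.topologyEpoch()) {
                return true;
            }
        }
        return false;
    }

    // Behavior after the change: any metadata difference returns true, so the
    // caller would bump the group epoch and rerun the assignor.
    public static boolean changedNew(Member member, Member updated, List<Member> records) {
        if (!updated.equals(member)) {
            records.add(updated);
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        Member before = new Member("m1", 1, "rack-a", Map.of());
        // Same topology epoch, but the rackId changed.
        Member after = new Member("m1", 1, "rack-b", Map.of());

        List<Member> records = new ArrayList<>();
        System.out.println("old: " + changedOld(before, after, records)); // old: false
        System.out.println("new: " + changedNew(before, after, records)); // new: true
    }
}
```

In both variants the member record is written; only the return value, and
with it the decision to bump the group epoch, differs in this corner case.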
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -15555,6 +15575,100 @@ public void
testStreamsUpdatingMetadataTriggersNewTargetAssignment() {
List<CoordinatorRecord> expectedRecords = List.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
expectedMember),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
memberId,
+ TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3,
4, 5),
+ TaskAssignmentTestUtil.mkTasks(subtopology2, 0, 1, 2)
+ )),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
11),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
expectedMember)
+ );
+
+ assertRecordsEquals(expectedRecords, result.records());
+ }
+
+ @Test
+ public void
testStreamsUpdatingPartitonMetadataTriggersNewTargetAssignment() {
+ String groupId = "fooup";
+ String memberId = Uuid.randomUuid().toString();
+
Review Comment:
Done