bbejeck commented on code in PR #19114:
URL: https://github.com/apache/kafka/pull/19114#discussion_r1996158053
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3338,9 +3342,9 @@ private boolean hasMemberSubscriptionChanged(
}
/**
- * Creates the member subscription record if the updatedMember is
different from
- * the old member. Returns true if the topologyEpoch of the member has
changed,
- * which is always true when a member is first created.
+ * Creates the member metadatarecord record if the updatedMember is
different from
+ * the old member. Returns true if the metadata has changed, which is
always true
Review Comment:
nit: `which is always true` -> `which is always the case` or something
similar.
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -15555,6 +15575,100 @@ public void
testStreamsUpdatingMetadataTriggersNewTargetAssignment() {
List<CoordinatorRecord> expectedRecords = List.of(
StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId,
expectedMember),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupEpochRecord(groupId, 11),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId,
memberId,
+ TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE,
+ TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3,
4, 5),
+ TaskAssignmentTestUtil.mkTasks(subtopology2, 0, 1, 2)
+ )),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId,
11),
+
StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId,
expectedMember)
+ );
+
+ assertRecordsEquals(expectedRecords, result.records());
+ }
+
+ @Test
+ public void
testStreamsUpdatingPartitonMetadataTriggersNewTargetAssignment() {
+ String groupId = "fooup";
+ String memberId = Uuid.randomUuid().toString();
+
Review Comment:
nit: remove empty line
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3358,12 +3362,10 @@ private boolean hasStreamsMemberMetadataChanged(
String memberId = updatedMember.memberId();
if (!updatedMember.equals(member)) {
records.add(newStreamsGroupMemberRecord(groupId, updatedMember));
+ log.info("[GroupId {}] Member {} updated its member metdata to
{}.",
+ groupId, memberId, updatedMember);
- if (!Objects.equals(updatedMember.topologyEpoch(),
member.topologyEpoch())) {
Review Comment:
why remove this?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3338,9 +3342,9 @@ private boolean hasMemberSubscriptionChanged(
}
/**
- * Creates the member subscription record if the updatedMember is
different from
- * the old member. Returns true if the topologyEpoch of the member has
changed,
- * which is always true when a member is first created.
+ * Creates the member metadatarecord record if the updatedMember is
different from
Review Comment:
Did something structurally in the method change, or is the javadoc change
prompted by a better description?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]