lucasbru commented on code in PR #20755:
URL: https://github.com/apache/kafka/pull/20755#discussion_r2495931755
##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -17371,14 +17377,89 @@ public void
testStreamsGroupHeartbeatPartialResponseWhenNothingChanges() {
.setMemberId(memberId)
.setMemberEpoch(result.response().data().memberEpoch()));
+ assertResponseEquals(
+ new StreamsGroupHeartbeatResponseData()
+ .setMemberId(memberId)
+ .setMemberEpoch(2)
+ .setHeartbeatIntervalMs(5000)
+ .setEndpointInformationEpoch(-1),
+ result.response().data()
+ );
+ }
+
+ @Test
+ public void
testStreamsInitialRebalanceDelay_EmptyDuringDelay_AssignsAfterTimer() {
Review Comment:
I don't think we put underscores in the tests in this files
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3976,8 +4003,11 @@ private TasksTuple updateStreamsTargetAssignment(
.withTopology(configuredTopology)
.withStaticMembers(group.staticMembers())
.withMetadataImage(metadataImage)
- .withTargetAssignment(group.targetAssignment())
- .addOrUpdateMember(updatedMember.memberId(), updatedMember);
+ .withTargetAssignment(group.targetAssignment());
+
+ if (updatedMember.isPresent()) {
Review Comment:
Could this use updatedMember.ifPresent(() => ...)?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3994,7 +4024,7 @@ private TasksTuple updateStreamsTargetAssignment(
records.addAll(assignmentResult.records());
- return
assignmentResult.targetAssignment().get(updatedMember.memberId());
+ return updatedMember.isPresent() ?
assignmentResult.targetAssignment().get(updatedMember.get().memberId()) :
TasksTuple.EMPTY;
Review Comment:
map(..).orElse(...)?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1985,30 +1985,57 @@ private CoordinatorResult<StreamsGroupHeartbeatResult,
CoordinatorRecord> stream
// Actually bump the group epoch
int groupEpoch = group.groupEpoch();
+ int initialGroupEpoch = groupEpoch;
if (bumpGroupEpoch) {
- groupEpoch += 1;
+ if (groupEpoch == 0) {
+ groupEpoch = 2;
+ } else {
+ groupEpoch += 1;
+ }
records.add(newStreamsGroupMetadataRecord(groupId, groupEpoch,
metadataHash, validatedTopologyEpoch, currentAssignmentConfigs));
log.info("[GroupId {}][MemberId {}] Bumped streams group epoch to
{} with metadata hash {} and validated topic epoch {}.", groupId, memberId,
groupEpoch, metadataHash, validatedTopologyEpoch);
metrics.record(STREAMS_GROUP_REBALANCES_SENSOR_NAME);
group.setMetadataRefreshDeadline(currentTimeMs +
METADATA_REFRESH_INTERVAL_MS, groupEpoch);
}
+ // Schedule initial rebalance delay for new streams groups to coalesce
joins.
+ boolean isInitialRebalance = (initialGroupEpoch == 0);
Review Comment:
could we just use `group.groupEpoch()` here?
##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -8570,6 +8645,8 @@ private boolean maybeDeleteEmptyStreamsGroup(String
groupId, List<CoordinatorRec
// Add tombstones for the previous streams group. The tombstones
won't actually be
// replayed because its coordinator result has a non-null
appendFuture.
createGroupTombstoneRecords(group, records);
+ // Cancel initial rebalance timer.
+ timer.cancel(streamsInitialRebalanceKey(groupId));
Review Comment:
I think we have to move this into `createGroupTombstoneRecords`, so that
it's also done when delete group is called
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]