lucasbru commented on code in PR #20755:
URL: https://github.com/apache/kafka/pull/20755#discussion_r2495931755


##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java:
##########
@@ -17371,14 +17377,89 @@ public void 
testStreamsGroupHeartbeatPartialResponseWhenNothingChanges() {
                 .setMemberId(memberId)
                 .setMemberEpoch(result.response().data().memberEpoch()));
 
+        assertResponseEquals(
+            new StreamsGroupHeartbeatResponseData()
+                .setMemberId(memberId)
+                .setMemberEpoch(2)
+                .setHeartbeatIntervalMs(5000)
+                .setEndpointInformationEpoch(-1),
+            result.response().data()
+        );
+    }
+
+    @Test
+    public void 
testStreamsInitialRebalanceDelay_EmptyDuringDelay_AssignsAfterTimer() {

Review Comment:
   I don't think we put underscores in the tests in this files



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3976,8 +4003,11 @@ private TasksTuple updateStreamsTargetAssignment(
                 .withTopology(configuredTopology)
                 .withStaticMembers(group.staticMembers())
                 .withMetadataImage(metadataImage)
-                .withTargetAssignment(group.targetAssignment())
-                .addOrUpdateMember(updatedMember.memberId(), updatedMember);
+                .withTargetAssignment(group.targetAssignment());
+
+            if (updatedMember.isPresent()) {

Review Comment:
   Could this use updatedMember.ifPresent(() => ...)?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3994,7 +4024,7 @@ private TasksTuple updateStreamsTargetAssignment(
 
             records.addAll(assignmentResult.records());
 
-            return 
assignmentResult.targetAssignment().get(updatedMember.memberId());
+            return updatedMember.isPresent() ? 
assignmentResult.targetAssignment().get(updatedMember.get().memberId()) : 
TasksTuple.EMPTY;

Review Comment:
   map(..).orElse(...)?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1985,30 +1985,57 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, 
CoordinatorRecord> stream
 
         // Actually bump the group epoch
         int groupEpoch = group.groupEpoch();
+        int initialGroupEpoch = groupEpoch;
         if (bumpGroupEpoch) {
-            groupEpoch += 1;
+            if (groupEpoch == 0) {
+                groupEpoch = 2;
+            } else {
+                groupEpoch += 1;
+            }
             records.add(newStreamsGroupMetadataRecord(groupId, groupEpoch, 
metadataHash, validatedTopologyEpoch, currentAssignmentConfigs));
             log.info("[GroupId {}][MemberId {}] Bumped streams group epoch to 
{} with metadata hash {} and validated topic epoch {}.", groupId, memberId, 
groupEpoch, metadataHash, validatedTopologyEpoch);
             metrics.record(STREAMS_GROUP_REBALANCES_SENSOR_NAME);
             group.setMetadataRefreshDeadline(currentTimeMs + 
METADATA_REFRESH_INTERVAL_MS, groupEpoch);
         }
 
+        // Schedule initial rebalance delay for new streams groups to coalesce 
joins.
+        boolean isInitialRebalance = (initialGroupEpoch == 0);

Review Comment:
   could we just use `group.groupEpoch()` here?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -8570,6 +8645,8 @@ private boolean maybeDeleteEmptyStreamsGroup(String 
groupId, List<CoordinatorRec
             // Add tombstones for the previous streams group. The tombstones 
won't actually be
             // replayed because its coordinator result has a non-null 
appendFuture.
             createGroupTombstoneRecords(group, records);
+            // Cancel initial rebalance timer.
+            timer.cancel(streamsInitialRebalanceKey(groupId));

Review Comment:
   I think we have to move this into `createGroupTombstoneRecords`, so that 
it's also done when delete group is called



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to