Copilot commented on code in PR #20755:
URL: https://github.com/apache/kafka/pull/20755#discussion_r2479077772


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1985,30 +1985,54 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, 
CoordinatorRecord> stream
 
         // Actually bump the group epoch
         int groupEpoch = group.groupEpoch();
+        boolean isInitialRebalance = (bumpGroupEpoch && groupEpoch == 0);
         if (bumpGroupEpoch) {
-            groupEpoch += 1;
+            if (groupEpoch == 0) {
+                groupEpoch += 2;
+            } else {
+                groupEpoch += 1;
+            }
             records.add(newStreamsGroupMetadataRecord(groupId, groupEpoch, 
metadataHash, validatedTopologyEpoch, currentAssignmentConfigs));
             log.info("[GroupId {}][MemberId {}] Bumped streams group epoch to 
{} with metadata hash {} and validated topic epoch {}.", groupId, memberId, 
groupEpoch, metadataHash, validatedTopologyEpoch);
             metrics.record(STREAMS_GROUP_REBALANCES_SENSOR_NAME);
             group.setMetadataRefreshDeadline(currentTimeMs + 
METADATA_REFRESH_INTERVAL_MS, groupEpoch);
         }
 
+        // Schedule initial rebalance delay for new streams groups to coalesce 
joins.
+        int initialDelayMs = streamsGroupInitialRebalanceDelayMs(groupId);
+        if (isInitialRebalance && initialDelayMs > 0) {
+            timer.scheduleIfAbsent(
+                    streamsInitialRebalanceKey(groupId),
+                    initialDelayMs,
+                    TimeUnit.MILLISECONDS,
+                    false,
+                    () -> fireStreamsInitialRebalance(groupId)
+            );
+        }
+
         // 4. Update the target assignment if the group epoch is larger than 
the target assignment epoch or a static member
         // replaces an existing static member.
         // The delta between the existing and the new target assignment is 
persisted to the partition.
         int targetAssignmentEpoch;
         TasksTuple targetAssignment;
         if (groupEpoch > group.assignmentEpoch()) {
-            targetAssignment = updateStreamsTargetAssignment(
-                group,
-                groupEpoch,
-                updatedMember,
-                updatedConfiguredTopology,
-                metadataImage,
-                records,
-                currentAssignmentConfigs
-            );
-            targetAssignmentEpoch = groupEpoch;
+            boolean initialDelayActive = 
timer.isScheduled(streamsInitialRebalanceKey(groupId));

Review Comment:
   The condition `group.assignmentEpoch() == 0` is checked here, but the timer 
is scheduled when `isInitialRebalance` is true (line 2003), which depends on 
`groupEpoch == 0` before bumping. Consider adding a comment explaining why we 
check `assignmentEpoch()` here instead of checking if the timer is scheduled 
for a more explicit relationship between these conditions.
   ```suggestion
               boolean initialDelayActive = 
timer.isScheduled(streamsInitialRebalanceKey(groupId));
               // Note: We check group.assignmentEpoch() == 0 here instead of 
whether the timer is scheduled,
               // because the timer for the initial rebalance is scheduled when 
isInitialRebalance is true,
               // which depends on groupEpoch == 0 before bumping. 
assignmentEpoch() == 0 indicates that the
               // group has not yet performed an assignment, which is the 
relevant state for returning an
               // empty assignment during the initial rebalance delay.
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -8716,6 +8816,20 @@ static String classicGroupSyncKey(String groupId) {
         return "sync-" + groupId;
     }
 
+
+    /**
+     * Generate a streams group initial rebalance key for the timer.
+     *
+     * Package private for testing.
+     *
+     * @param groupId   The group id.
+     *
+     * @return the initial rebalance key.
+     */
+    static String streamsInitialRebalanceKey(String groupId) {
+        return "initial-rebalance-timeout-" + groupId;
+    }

Review Comment:
   [nitpick] There's an extra blank line (8819) before the Javadoc comment. 
This is inconsistent with other methods in the class and should be removed for 
consistency.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1985,30 +1985,54 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, 
CoordinatorRecord> stream
 
         // Actually bump the group epoch
         int groupEpoch = group.groupEpoch();
+        boolean isInitialRebalance = (bumpGroupEpoch && groupEpoch == 0);

Review Comment:
   The `isInitialRebalance` flag is computed before `groupEpoch` is potentially 
modified (lines 1990-1994). However, the variable name and its usage suggest it 
should represent whether this is the initial rebalance. Consider computing this 
flag after the epoch bump logic, or rename it to `wasInitialRebalance` to 
clarify that it represents the state before the bump.
   ```suggestion
           boolean wasInitialRebalance = (bumpGroupEpoch && groupEpoch == 0);
   ```



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java:
##########
@@ -161,8 +161,11 @@ public StreamsGroupMember build() {
             case STABLE:
                 // When the member is in the STABLE state, we verify if a newer
                 // epoch (or target assignment) is available. If it is, we can
-                // reconcile the member towards it. Otherwise, we return.
-                if (member.memberEpoch() != targetAssignmentEpoch) {
+                // reconcile the member towards it. If the epoch is the same 
but
+                // the target assignment has changed (e.g., after initial 
rebalance
+                // delay fires), we must still reconcile to propagate the new 
tasks.

Review Comment:
   The comment mentions 'after initial rebalance delay fires' as an example, 
but this condition also handles other scenarios where the target assignment 
changes without an epoch bump. Consider clarifying that this handles any case 
where the target assignment differs from current assignment at the same epoch, 
not just the initial rebalance delay scenario.
   ```suggestion
                   // the target assignment has changed (for example, after 
initial rebalance
                   // delay fires, or any other scenario where the target 
assignment differs
                   // from the current assignment at the same epoch), we must 
still reconcile
                   // to propagate the new tasks.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to