lucasbru commented on code in PR #20755:
URL: https://github.com/apache/kafka/pull/20755#discussion_r2489901598


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java:
##########
@@ -161,7 +161,9 @@ public StreamsGroupMember build() {
             case STABLE:
                 // When the member is in the STABLE state, we verify if a newer
                 // epoch (or target assignment) is available. If it is, we can
-                // reconcile the member towards it. Otherwise, we return.
+                // reconcile the member towards it. If the epoch is the same but
+                // the target assignment has changed (e.g., after initial rebalance
+                // delay fires), we must still reconcile to propagate the new tasks.

Review Comment:
   Is this comment update accurate? I don't understand it. If the epoch is the same, the target assignment should be the same.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1985,30 +1985,54 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
 
         // Actually bump the group epoch
         int groupEpoch = group.groupEpoch();
+        boolean isInitialRebalance = (bumpGroupEpoch && groupEpoch == 0);
         if (bumpGroupEpoch) {
-            groupEpoch += 1;
+            if (groupEpoch == 0) {
+                groupEpoch += 2;
+            } else {
+                groupEpoch += 1;
+            }
             records.add(newStreamsGroupMetadataRecord(groupId, groupEpoch, metadataHash, validatedTopologyEpoch, currentAssignmentConfigs));
             log.info("[GroupId {}][MemberId {}] Bumped streams group epoch to {} with metadata hash {} and validated topic epoch {}.", groupId, memberId, groupEpoch, metadataHash, validatedTopologyEpoch);
             metrics.record(STREAMS_GROUP_REBALANCES_SENSOR_NAME);
             group.setMetadataRefreshDeadline(currentTimeMs + METADATA_REFRESH_INTERVAL_MS, groupEpoch);
         }
 
+        // Schedule initial rebalance delay for new streams groups to coalesce joins.
+        int initialDelayMs = streamsGroupInitialRebalanceDelayMs(groupId);

Review Comment:
   Maybe we can only read the configuration if `isInitialRebalance == true`
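
   Something like (rough sketch, reusing the names from this PR):

   ```java
   if (isInitialRebalance) {
       // Only read the delay config when this is actually the first rebalance of the group.
       int initialDelayMs = streamsGroupInitialRebalanceDelayMs(groupId);
       if (initialDelayMs > 0) {
           timer.scheduleIfAbsent(
               streamsInitialRebalanceKey(groupId),
               initialDelayMs,
               TimeUnit.MILLISECONDS,
               false,
               () -> fireStreamsInitialRebalance(groupId)
           );
       }
   }
   ```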



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4003,6 +4027,69 @@ private TasksTuple updateStreamsTargetAssignment(
         }
     }
 
+    /**
+     * Fires the initial rebalance for a streams group when the delay timer expires.
+     * Computes and persists target assignment for all members if conditions are met.
+     *
+     * @param groupId The group id.
+     * @return A CoordinatorResult with records to persist the target assignment, or EMPTY_RESULT.
+     */
+    private CoordinatorResult<Void, CoordinatorRecord> fireStreamsInitialRebalance(
+        String groupId
+    ) {
+        try {
+            StreamsGroup group = streamsGroup(groupId);
+
+            if (group.groupEpoch() <= group.assignmentEpoch()) {

Review Comment:
   This case should not be possible, so I'd throw an IllegalStateException here.
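
   For example (rough sketch; exact wording is just a suggestion):

   ```java
   if (group.groupEpoch() <= group.assignmentEpoch()) {
       throw new IllegalStateException("Initial rebalance fired for group " + groupId
           + " but the assignment epoch " + group.assignmentEpoch()
           + " is already at or above the group epoch " + group.groupEpoch() + ".");
   }
   ```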



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1985,30 +1985,54 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
 
         // Actually bump the group epoch
         int groupEpoch = group.groupEpoch();
+        boolean isInitialRebalance = (bumpGroupEpoch && groupEpoch == 0);
         if (bumpGroupEpoch) {
-            groupEpoch += 1;
+            if (groupEpoch == 0) {
+                groupEpoch += 2;

Review Comment:
   This could be `groupEpoch = 2`



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1985,30 +1985,54 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
 
         // Actually bump the group epoch
         int groupEpoch = group.groupEpoch();
+        boolean isInitialRebalance = (bumpGroupEpoch && groupEpoch == 0);

Review Comment:
   Do we really need to check `bumpGroupEpoch` here? We should only get here 
with `groupEpoch == 0` once, and the `bumpGroupEpoch` should be implied.
   
   I would also move this variable further down to where it is used.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -8570,6 +8657,10 @@ private boolean maybeDeleteEmptyStreamsGroup(String groupId, List<CoordinatorRec
             // Add tombstones for the previous streams group. The tombstones won't actually be
             // replayed because its coordinator result has a non-null appendFuture.
             createGroupTombstoneRecords(group, records);
+            // Cancel any pending initial rebalance timer.
+            if (timer.isScheduled(streamsInitialRebalanceKey(groupId))) {

Review Comment:
   I think we don't need to wrap this in an if
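
   i.e. cancel unconditionally (assuming the timer's `cancel` is a no-op for keys that are not scheduled):

   ```java
   // Cancel any pending initial rebalance timer.
   timer.cancel(streamsInitialRebalanceKey(groupId));
   ```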



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4003,6 +4027,69 @@ private TasksTuple updateStreamsTargetAssignment(
         }
     }
 
+    /**
+     * Fires the initial rebalance for a streams group when the delay timer expires.
+     * Computes and persists target assignment for all members if conditions are met.
+     *
+     * @param groupId The group id.
+     * @return A CoordinatorResult with records to persist the target assignment, or EMPTY_RESULT.
+     */
+    private CoordinatorResult<Void, CoordinatorRecord> fireStreamsInitialRebalance(

Review Comment:
   `computeDelayedTargetAssignment`?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java:
##########
@@ -288,6 +288,7 @@ public static Builder withDefaults(String memberId) {
                 .setClientTags(Collections.emptyMap())
                 .setState(MemberState.STABLE)
                 .setMemberEpoch(0)
+                .setPreviousMemberEpoch(0)

Review Comment:
   Do we need to update the test for this?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4003,6 +4027,69 @@ private TasksTuple updateStreamsTargetAssignment(
         }
     }
 
+    /**
+     * Fires the initial rebalance for a streams group when the delay timer expires.
+     * Computes and persists target assignment for all members if conditions are met.
+     *
+     * @param groupId The group id.
+     * @return A CoordinatorResult with records to persist the target assignment, or EMPTY_RESULT.
+     */
+    private CoordinatorResult<Void, CoordinatorRecord> fireStreamsInitialRebalance(
+        String groupId
+    ) {
+        try {
+            StreamsGroup group = streamsGroup(groupId);
+
+            if (group.groupEpoch() <= group.assignmentEpoch()) {
+                return EMPTY_RESULT;
+            }
+
+            if (!group.configuredTopology().isPresent()) {
+                return EMPTY_RESULT;

Review Comment:
   This could be possible. We should probably log a warning here.
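
   Rough sketch, reusing the logger this class already has:

   ```java
   if (!group.configuredTopology().isPresent()) {
       log.warn("[GroupId {}] Initial rebalance fired but the topology is not configured yet; " +
           "skipping target assignment computation.", groupId);
       return EMPTY_RESULT;
   }
   ```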



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -3693,7 +3717,7 @@ private StreamsGroupMember maybeReconcile(
         List<StreamsGroupHeartbeatRequestData.TaskIds> ownedWarmupTasks,
         List<CoordinatorRecord> records
     ) {
-        if (member.isReconciledTo(targetAssignmentEpoch)) {
+        if (member.isReconciledTo(targetAssignmentEpoch) && member.assignedTasks().equals(targetAssignment)) {

Review Comment:
   Why are we doing this change?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -1985,30 +1985,54 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
 
         // Actually bump the group epoch
         int groupEpoch = group.groupEpoch();
+        boolean isInitialRebalance = (bumpGroupEpoch && groupEpoch == 0);
         if (bumpGroupEpoch) {
-            groupEpoch += 1;
+            if (groupEpoch == 0) {
+                groupEpoch += 2;
+            } else {
+                groupEpoch += 1;
+            }
             records.add(newStreamsGroupMetadataRecord(groupId, groupEpoch, metadataHash, validatedTopologyEpoch, currentAssignmentConfigs));
             log.info("[GroupId {}][MemberId {}] Bumped streams group epoch to {} with metadata hash {} and validated topic epoch {}.", groupId, memberId, groupEpoch, metadataHash, validatedTopologyEpoch);
             metrics.record(STREAMS_GROUP_REBALANCES_SENSOR_NAME);
             group.setMetadataRefreshDeadline(currentTimeMs + METADATA_REFRESH_INTERVAL_MS, groupEpoch);
         }
 
+        // Schedule initial rebalance delay for new streams groups to coalesce joins.
+        int initialDelayMs = streamsGroupInitialRebalanceDelayMs(groupId);
+        if (isInitialRebalance && initialDelayMs > 0) {
+            timer.scheduleIfAbsent(
+                    streamsInitialRebalanceKey(groupId),
+                    initialDelayMs,
+                    TimeUnit.MILLISECONDS,
+                    false,
+                    () -> fireStreamsInitialRebalance(groupId)
+            );
+        }
+
         // 4. Update the target assignment if the group epoch is larger than the target assignment epoch or a static member
         // replaces an existing static member.
         // The delta between the existing and the new target assignment is persisted to the partition.
         int targetAssignmentEpoch;
         TasksTuple targetAssignment;
         if (groupEpoch > group.assignmentEpoch()) {
-            targetAssignment = updateStreamsTargetAssignment(
-                group,
-                groupEpoch,
-                updatedMember,
-                updatedConfiguredTopology,
-                metadataImage,
-                records,
-                currentAssignmentConfigs
-            );
-            targetAssignmentEpoch = groupEpoch;
+            boolean initialDelayActive = timer.isScheduled(streamsInitialRebalanceKey(groupId));
+            if (initialDelayActive && group.assignmentEpoch() == 0) {
+                // During initial rebalance delay, return empty assignment to first joining members.
+                targetAssignmentEpoch = groupEpoch - 1;

Review Comment:
   Shouldn't this always be 1? Why `groupEpoch - 1`?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4003,6 +4027,69 @@ private TasksTuple updateStreamsTargetAssignment(
         }
     }
 
+    /**
+     * Fires the initial rebalance for a streams group when the delay timer expires.
+     * Computes and persists target assignment for all members if conditions are met.
+     *
+     * @param groupId The group id.
+     * @return A CoordinatorResult with records to persist the target assignment, or EMPTY_RESULT.
+     */
+    private CoordinatorResult<Void, CoordinatorRecord> fireStreamsInitialRebalance(
+        String groupId
+    ) {
+        try {
+            StreamsGroup group = streamsGroup(groupId);
+
+            if (group.groupEpoch() <= group.assignmentEpoch()) {
+                return EMPTY_RESULT;
+            }
+
+            if (!group.configuredTopology().isPresent()) {
+                return EMPTY_RESULT;
+            }
+
+            TaskAssignor assignor = streamsGroupAssignor(group.groupId());

Review Comment:
   I think in the interest of not copying to much code, We could call 
`updateStreamsTargetAssignment` here and make updatedMember optional
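
   Rough sketch of what I mean (parameter list copied from the existing call in the heartbeat path; `Optional.empty()` and the config/metadata arguments are placeholders that would need to be derived in this path):

   ```java
   updateStreamsTargetAssignment(
       group,
       group.groupEpoch(),
       Optional.empty(),                  // no freshly updated member in the delayed path
       group.configuredTopology().get(),  // presence is checked above
       metadataImage,
       records,
       currentAssignmentConfigs
   );
   ```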



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java:
##########
@@ -4003,6 +4027,69 @@ private TasksTuple updateStreamsTargetAssignment(
         }
     }
 
+    /**
+     * Fires the initial rebalance for a streams group when the delay timer expires.
+     * Computes and persists target assignment for all members if conditions are met.
+     *
+     * @param groupId The group id.
+     * @return A CoordinatorResult with records to persist the target assignment, or EMPTY_RESULT.
+     */
+    private CoordinatorResult<Void, CoordinatorRecord> fireStreamsInitialRebalance(
+        String groupId
+    ) {
+        try {
+            StreamsGroup group = streamsGroup(groupId);
+
+            if (group.groupEpoch() <= group.assignmentEpoch()) {
+                return EMPTY_RESULT;
+            }
+
+            if (!group.configuredTopology().isPresent()) {
+                return EMPTY_RESULT;
+            }
+
+            TaskAssignor assignor = streamsGroupAssignor(group.groupId());
+
+            try {

Review Comment:
   I wonder if there are any other corner cases here. For example, I think we 
should return early if the group is empty (it could become empty if the last 
member leaves before we compute the target assignment).
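
   Something like (sketch; assuming `members()` exposes the current member map as for the other group types):

   ```java
   if (group.members().isEmpty()) {
       return EMPTY_RESULT;
   }
   ```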



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
