Copilot commented on code in PR #21110:
URL: https://github.com/apache/kafka/pull/21110#discussion_r2600135402


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java:
##########
@@ -699,17 +710,42 @@ public void onHeartbeatSuccess(StreamsGroupHeartbeatResponse response) {
             processAssignmentReceived(
                 toTasksAssignment(activeTasks),
                 toTasksAssignment(standbyTasks),
-                toTasksAssignment(warmupTasks)
+                toTasksAssignment(warmupTasks),
+                isGroupReady
             );
-        } else {
-            if (responseData.activeTasks() != null ||
-                responseData.standbyTasks() != null ||
-                responseData.warmupTasks() != null) {
+        } else if (responseData.activeTasks() != null ||
+            responseData.standbyTasks() != null ||
+            responseData.warmupTasks() != null) {
+
+            throw new IllegalStateException("Invalid response data, task collections must be all null or all non-null: "
+                + responseData);
+        } else if (isGroupReady != targetAssignment.isGroupReady) {
+            // If the client did not provide a new assignment, but the group is now ready, update the target
+            // assignment and reconcile it.
+            processAssignmentReceived(
+                targetAssignment.activeTasks,
+                targetAssignment.standbyTasks,
+                targetAssignment.warmupTasks,
+                isGroupReady
+            );
+        }
+    }
 
-                throw new IllegalStateException("Invalid response data, task collections must be all null or all non-null: "
-                    + responseData);
+    private boolean isGroupReady(List<StreamsGroupHeartbeatResponseData.Status> statuses) {
+        if (statuses != null) {
+            for (final StreamsGroupHeartbeatResponseData.Status status : statuses) {
+                switch (StreamsGroupHeartbeatResponse.Status.fromCode(status.statusCode())) {
+                    case MISSING_SOURCE_TOPICS:
+                    case MISSING_INTERNAL_TOPICS:
+                    case INCORRECTLY_PARTITIONED_TOPICS:
+                    case ASSIGNMENT_DELAYED:
+                        return false;

Review Comment:
   The switch statement should handle the `UNKNOWN_STATUS` case explicitly. If
the coordinator sends an unrecognized status code, the current implementation
takes the default branch and continues checking the remaining statuses, which
may not be the intended behavior. Decide whether an unknown status should mark
the group as not ready, or be ignored with a log warning.
   ```suggestion
                           return false;
                       case UNKNOWN_STATUS:
                           log.warn("Received UNKNOWN_STATUS from coordinator for status code {}. Treating group as not ready.", status.statusCode());
                           return false;
   ```
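
As a rough illustration of the conservative option (treat unknown codes as not ready), here is a self-contained sketch. The `Status` enum, its numeric codes, the `INFORMATIONAL` constant, and the `List<Integer>` input are simplified stand-ins invented for this example, not the actual Kafka classes. Returning `true` for a null list is also an assumption, since the rest of the method is not visible in this hunk.

```java
import java.util.List;

class GroupReadinessSketch {

    // Simplified stand-in for the real status enum; the numeric codes are made up.
    enum Status {
        MISSING_SOURCE_TOPICS(1),
        MISSING_INTERNAL_TOPICS(2),
        INCORRECTLY_PARTITIONED_TOPICS(3),
        ASSIGNMENT_DELAYED(4),
        INFORMATIONAL(5),   // hypothetical status that does not affect readiness
        UNKNOWN_STATUS(-1);

        final int code;

        Status(int code) {
            this.code = code;
        }

        // Maps any unrecognized code to UNKNOWN_STATUS.
        static Status fromCode(int code) {
            for (Status s : values()) {
                if (s.code == code) {
                    return s;
                }
            }
            return UNKNOWN_STATUS;
        }
    }

    // Returns false as soon as one status blocks readiness.
    // Unknown codes are treated as blocking (the conservative choice).
    static boolean isGroupReady(List<Integer> statusCodes) {
        if (statusCodes == null) {
            return true; // assumption: no statuses means nothing blocks the group
        }
        for (int code : statusCodes) {
            switch (Status.fromCode(code)) {
                case MISSING_SOURCE_TOPICS:
                case MISSING_INTERNAL_TOPICS:
                case INCORRECTLY_PARTITIONED_TOPICS:
                case ASSIGNMENT_DELAYED:
                case UNKNOWN_STATUS:
                    return false;
                default:
                    break; // other statuses do not affect readiness
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(isGroupReady(List.of(5)));      // only a non-blocking status
        System.out.println(isGroupReady(List.of(5, 99)));  // 99 is not a known code
    }
}
```

If ignoring unknown codes is preferred instead, the `UNKNOWN_STATUS` label would move to its own branch that logs a warning and then breaks.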



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/StreamsMembershipManager.java:
##########
@@ -1025,8 +1065,9 @@ private void maybeReconcile() {
         SortedSet<StreamsRebalanceData.TaskId> ownedStandbyTasks = toTaskIdSet(currentAssignment.standbyTasks);
         SortedSet<StreamsRebalanceData.TaskId> assignedWarmupTasks = toTaskIdSet(targetAssignment.warmupTasks);
         SortedSet<StreamsRebalanceData.TaskId> ownedWarmupTasks = toTaskIdSet(currentAssignment.warmupTasks);
+        boolean isGroupReady = targetAssignment.isGroupReady;
 
-        log.info("Assigned tasks with local epoch {}\n" +
+        log.info("Assigned tasks with local epoch {} and group {}\n" +

Review Comment:
   The log message reads awkwardly. The template is "Assigned tasks with local
epoch {} and group {}", and the group status text supplied at line 1080 is
"is ready" or "is not ready", so the rendered message runs the epoch and the
group status together as "... and group is ready". Consider "Assigned tasks
with local epoch {} where group {}" or "Assigned tasks with local epoch {};
group {}" for better readability.
   ```suggestion
           log.info("Assigned tasks with local epoch {}; group {}\n" +
   ```
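
To compare the candidate templates concretely, the sketch below renders each one with a tiny `render` helper that substitutes `{}` placeholders in order, loosely mimicking SLF4J-style formatting. The helper, the class name, and the epoch value 5 are made up for this example; the real call also logs the task sets, which are omitted here.

```java
class LogTemplateSketch {

    // Replaces each "{}" in the template with the next argument, left to right.
    // Illustrative only; it does not reproduce every SLF4J formatting rule.
    static String render(String template, Object... args) {
        StringBuilder sb = new StringBuilder();
        int from = 0;
        for (Object arg : args) {
            int idx = template.indexOf("{}", from);
            if (idx < 0) {
                break; // more arguments than placeholders
            }
            sb.append(template, from, idx).append(arg);
            from = idx + 2;
        }
        return sb.append(template.substring(from)).toString();
    }

    public static void main(String[] args) {
        String status = "is ready"; // the other possible value is "is not ready"

        // Template from the PR.
        System.out.println(render("Assigned tasks with local epoch {} and group {}", 5, status));
        // Semicolon variant without a colon.
        System.out.println(render("Assigned tasks with local epoch {}; group {}", 5, status));
        // Semicolon variant with a colon, which renders as "group: is ready".
        System.out.println(render("Assigned tasks with local epoch {}; group: {}", 5, status));
    }
}
```

Rendering the variants side by side makes it easier to see that a colon before the placeholder only reads well if the substituted text is a bare state such as "ready" rather than "is ready".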



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -1544,6 +1561,10 @@ private ConsumerRecords<byte[], byte[]> pollRequests(final Duration pollTime) {
         return records;
     }
 

Review Comment:
   The `setStreamsGroupReady` method lacks documentation. Consider adding a
Javadoc comment explaining its purpose, what the `ready` parameter indicates,
and how it affects the thread's state transitions.
   ```suggestion
   
       /**
        * Sets the readiness state of the Streams group for this thread.
        *
        * @param ready {@code true} if the Streams group is ready to process records; {@code false} otherwise.
        *              When set to {@code true}, this thread may transition to an active processing state.
        *              When set to {@code false}, the thread will not process records until the group is ready.
        */
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
