Copilot commented on code in PR #21059:
URL: https://github.com/apache/kafka/pull/21059#discussion_r2588631852


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -1469,15 +1412,11 @@ private long pollPhase() {
         final ConsumerRecords<byte[], byte[]> records;
         log.debug("Invoking poll on main Consumer");
 
-        if (state == State.PARTITIONS_ASSIGNED && !stateUpdaterEnabled) {
-            // try to fetch some records with zero poll millis
-            // to unblock the restoration as soon as possible
-            records = pollRequests(Duration.ZERO);
-        } else if (state == State.PARTITIONS_REVOKED) {
+        if (state == State.PARTITIONS_REVOKED) {
             // try to fetch some records with zero poll millis to unblock
             // other useful work while waiting for the join response
             records = pollRequests(Duration.ZERO);
-        } else if (state == State.RUNNING || state == State.STARTING || (state == State.PARTITIONS_ASSIGNED && stateUpdaterEnabled)) {
+        } else if (state == State.RUNNING || state == State.STARTING || (state == State.PARTITIONS_ASSIGNED)) {

Review Comment:
   The parentheses around `(state == State.PARTITIONS_ASSIGNED)` are 
unnecessary. The condition can be simplified to `state == State.RUNNING || 
state == State.STARTING || state == State.PARTITIONS_ASSIGNED` for consistency 
with the rest of the codebase.
   ```suggestion
           } else if (state == State.RUNNING || state == State.STARTING || state == State.PARTITIONS_ASSIGNED) {
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##########
@@ -493,13 +488,12 @@ private ConsumerRecords<byte[], byte[]> pollRecordsFromRestoreConsumer(final Map
                                                                            final Set<TopicPartition> restoringChangelogs) {
         // If we are updating only standby tasks, and are not using a separate thread, we should
         // use a non-blocking poll to unblock the processing as soon as possible.

Review Comment:
   The comment is now outdated. Since the state updater is always enabled 
(running in a separate thread), the comment about "not using a separate thread" 
and "non-blocking poll" is no longer relevant. This comment should be updated 
or removed.
   ```suggestion
           
           
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -802,7 +734,7 @@ private Map<TaskId, Set<TopicPartition>> pendingTasksToCreate(final Map<TaskId,
             final Map.Entry<TaskId, Set<TopicPartition>> entry = iter.next();
             final TaskId taskId = entry.getKey();
             final boolean taskIsOwned = tasks.allTaskIds().contains(taskId)
-                || (stateUpdater != null && stateUpdater.tasks().stream().anyMatch(task -> task.id() == taskId));
+                || (stateUpdater.tasks().stream().anyMatch(task -> task.id() == taskId));

Review Comment:
   The comparison `task.id() == taskId` uses reference equality (`==`) instead 
of `.equals()` for comparing `TaskId` objects. This should be 
`task.id().equals(taskId)`, so that two distinct `TaskId` instances with the 
same value compare as equal.
   ```suggestion
                   || (stateUpdater.tasks().stream().anyMatch(task -> task.id().equals(taskId)));
   ```
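
   To illustrate why this matters, here is a minimal, self-contained sketch. 
`SimpleTaskId`, `ownedByReference`, and `ownedByEquals` are hypothetical names 
made up for this example. `SimpleTaskId` is only a stand-in for a value class 
that overrides `equals()`, not the real `TaskId` implementation.

```java
import java.util.List;
import java.util.Objects;

class TaskIdEqualitySketch {

    // Hypothetical stand-in for a value-like id class; not Kafka's TaskId.
    static final class SimpleTaskId {
        final int subtopology;
        final int partition;

        SimpleTaskId(final int subtopology, final int partition) {
            this.subtopology = subtopology;
            this.partition = partition;
        }

        @Override
        public boolean equals(final Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof SimpleTaskId)) {
                return false;
            }
            final SimpleTaskId other = (SimpleTaskId) o;
            return subtopology == other.subtopology && partition == other.partition;
        }

        @Override
        public int hashCode() {
            return Objects.hash(subtopology, partition);
        }
    }

    // Matches only when the very same object instance is in the list.
    static boolean ownedByReference(final List<SimpleTaskId> owned, final SimpleTaskId id) {
        return owned.stream().anyMatch(t -> t == id);
    }

    // Matches any instance that is equal by value.
    static boolean ownedByEquals(final List<SimpleTaskId> owned, final SimpleTaskId id) {
        return owned.stream().anyMatch(t -> t.equals(id));
    }

    public static void main(final String[] args) {
        final List<SimpleTaskId> owned = List.of(new SimpleTaskId(0, 1));
        // Same value, but a different object than the one stored in the list.
        final SimpleTaskId lookup = new SimpleTaskId(0, 1);

        System.out.println("reference match: " + ownedByReference(owned, lookup));
        System.out.println("equals match:    " + ownedByEquals(owned, lookup));
    }
}
```

   With `==`, the lookup succeeds only if both sides happen to be the same 
instance, so whether the existing check works depends on how the ids are 
created and shared. `.equals()` removes that dependency.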



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1793,24 +1713,18 @@ Set<TaskId> standbyTaskIds() {
     Map<TaskId, Task> allTasks() {
         // not bothering with an unmodifiable map, since the tasks themselves are mutable, but
         // if any outside code modifies the map or the tasks, it would be a severe transgression.
-        if (stateUpdater != null) {
-            final Map<TaskId, Task> ret = stateUpdater.tasks().stream().collect(Collectors.toMap(Task::id, x -> x));
-            ret.putAll(tasks.allTasksPerId());
-            ret.putAll(tasks.pendingTasksToInit().stream().collect(Collectors.toMap(Task::id, x -> x)));
-            return ret;
-        } else {
-            return tasks.allTasksPerId();
-        }
+        final Map<TaskId, Task> ret = stateUpdater.tasks().stream().collect(Collectors.toMap(Task::id, x -> x));
+        ret.putAll(tasks.allTasksPerId());
+        ret.putAll(tasks.pendingTasksToInit().stream().collect(Collectors.toMap(Task::id, x -> x)));
+        return ret;
     }
 
     /**
      * Returns tasks owned by the stream thread. With state updater disabled, these are all tasks. With
      * state updater enabled, this does not return any tasks currently owned by the state updater.

Review Comment:
   The documentation comment is now outdated. Since the state updater is always 
enabled, the comment should be updated to reflect the current behavior: 
"Returns tasks owned by the stream thread. This does not return any tasks 
currently owned by the state updater."
   ```suggestion
        * Returns tasks owned by the stream thread.
        * This does not return any tasks currently owned by the state updater.
   ```



##########
tests/kafkatest/tests/streams/streams_upgrade_test.py:
##########
@@ -174,26 +174,31 @@ def test_version_probing_upgrade(self, metadata_quorum):
         self.stop_and_await()
 
     @cluster(num_nodes=6)
-    @matrix(from_version=[str(LATEST_3_2), str(DEV_VERSION)],  upgrade=[True, False], metadata_quorum=[quorum.combined_kraft])
-    def test_upgrade_downgrade_state_updater(self, from_version, upgrade, metadata_quorum):
+    @matrix(from_version=[str(LATEST_3_7), str(LATEST_3_8), str(LATEST_3_9), str(LATEST_4_0), str(LATEST_4_1), str(LATEST_4_2)],
+            upgrade=[True, False],
+            metadata_quorum=[quorum.combined_kraft])
+    def test_upgrade_downgrade_state_updater(self, from_version, metadata_quorum):

Review Comment:
   The method signature is missing the `upgrade` parameter that is declared in 
the `@matrix` decorator (line 178) and used in the method body (line 191). The 
signature should be `def test_upgrade_downgrade_state_updater(self, 
from_version, upgrade, metadata_quorum):`.
   ```suggestion
       def test_upgrade_downgrade_state_updater(self, from_version, upgrade, metadata_quorum):
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1871,14 +1779,11 @@ private List<Task> standbyTaskIterable() {
 
     private Stream<Task> standbyTaskStream() {
         final Stream<Task> standbyTasksInTaskRegistry = tasks.allTasks().stream().filter(t -> !t.isActive());
-        if (stateUpdater != null) {
-            return Stream.concat(
-                stateUpdater.standbyTasks().stream(),
-                standbyTasksInTaskRegistry
-            );
-        } else {
-            return standbyTasksInTaskRegistry;
-        }
+        return Stream.concat(
+            stateUpdater.standbyTasks().stream(),
+            standbyTasksInTaskRegistry
+        );
+

Review Comment:
   There is an unnecessary blank line before the closing brace. Consider 
removing it for consistent code formatting.
   ```suggestion
   
   ```



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -1848,13 +1758,11 @@ List<Task> activeRunningTaskIterable() {
     }
 
     private Stream<Task> activeTaskStream() {
-        if (stateUpdater != null) {
-            return Stream.concat(
-                activeRunningTaskStream(),
-                stateUpdater.tasks().stream().filter(Task::isActive)
-            );
-        }
-        return activeRunningTaskStream();
+        return Stream.concat(
+            activeRunningTaskStream(),
+            stateUpdater.tasks().stream().filter(Task::isActive)
+        );
+

Review Comment:
   There is an unnecessary blank line before the closing brace. Consider 
removing it for consistent code formatting.
   ```suggestion
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.