Copilot commented on code in PR #21022:
URL: https://github.com/apache/kafka/pull/21022#discussion_r2576990126


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -2548,26 +2311,18 @@ public Set<TopicPartition> partitions() {
             "K2".getBytes(),
             "V2".getBytes()));
 
-        if (stateUpdaterEnabled) {
-            TestUtils.waitForCondition(
-                () -> mockRestoreConsumer.assignment().isEmpty(),
-                "Never get the assignment");
-        } else {
-            TestUtils.waitForCondition(
-                () -> {
-                    mockRestoreConsumer.assign(changelogPartitionSet);
-                    return mockRestoreConsumer.position(changelogPartition) == 
2L;
-                },
-                "Never finished restore");
-        }
+        TestUtils.waitForCondition(
+            () -> mockRestoreConsumer.assignment().isEmpty(),
+            "Never get the assignment");
+        

Review Comment:
   Trailing whitespace detected: this added line consists solely of whitespace.
   Please strip it for consistency with the project's code style.
   ```suggestion
   
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -1768,15 +1724,14 @@ public void 
shouldReinitializeRevivedTasksInAnyState(final boolean stateUpdaterE
         runOnce(processingThreadsEnabled);
 
         // the third actually polls, processes the record, and throws the 
corruption exception
-        if (stateUpdaterEnabled) {
-            TestUtils.waitForCondition(
-                () -> thread.taskManager().checkStateUpdater(
-                    mockTime.milliseconds(),
-                    topicPartitions -> 
mockConsumer.seekToBeginning(singleton(t1p1))
-                ),
-                10 * 1000,
-                "State updater never returned tasks.");
-        }
+        TestUtils.waitForCondition(
+            () -> thread.taskManager().checkStateUpdater(
+                mockTime.milliseconds(),
+                topicPartitions -> 
mockConsumer.seekToBeginning(singleton(t1p1))
+            ),
+            10 * 1000,
+            "State updater never returned tasks.");
+        

Review Comment:
   Trailing whitespace detected: this added line consists solely of whitespace.
   Please strip it for consistency with the project's code style.
   ```suggestion
   
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -1449,30 +1416,19 @@ public void shouldNotReturnDataAfterTaskMigrated(final 
boolean stateUpdaterEnabl
 
         final TaskMigratedException taskMigratedException = new 
TaskMigratedException(
             "Changelog restore found task migrated", new 
RuntimeException("restore task migrated"));
-        ChangelogReader changelogReader = this.changelogReader;
-        if (stateUpdaterEnabled) {
-            when(taskManager.checkStateUpdater(anyLong(), 
any())).thenAnswer(answer -> {
-                consumer.addRecord(new ConsumerRecord<>(topic1, 1, 11, new 
byte[0], new byte[0]));
-                consumer.addRecord(new ConsumerRecord<>(topic1, 1, 12, new 
byte[1], new byte[0]));
+        final ChangelogReader changelogReader = this.changelogReader;
+        when(taskManager.checkStateUpdater(anyLong(), 
any())).thenAnswer(answer -> {
+            consumer.addRecord(new ConsumerRecord<>(topic1, 1, 11, new 
byte[0], new byte[0]));
+            consumer.addRecord(new ConsumerRecord<>(topic1, 1, 12, new 
byte[1], new byte[0]));
 
-                throw taskMigratedException;
-            });
-        } else {
-            changelogReader = new MockChangelogReader() {
-                @Override
-                public long restore(final Map<TaskId, Task> tasks) {
-                    consumer.addRecord(new ConsumerRecord<>(topic1, 1, 11, new 
byte[0], new byte[0]));
-                    consumer.addRecord(new ConsumerRecord<>(topic1, 1, 12, new 
byte[1], new byte[0]));
-
-                    throw taskMigratedException;
-                }
-            };
-        }
+            throw taskMigratedException;
+        });

Review Comment:
   Trailing whitespace detected. Please remove the trailing whitespace on this 
line for consistency with the project's code style.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -1788,15 +1743,14 @@ public void 
shouldReinitializeRevivedTasksInAnyState(final boolean stateUpdaterE
 
         // Now, we can handle the corruption
         
thread.taskManager().handleCorruption(taskCorruptedException.corruptedTasks());
-        if (stateUpdaterEnabled) {
-            TestUtils.waitForCondition(
-                () -> thread.taskManager().checkStateUpdater(
-                    mockTime.milliseconds(),
-                    topicPartitions -> 
mockConsumer.seekToBeginning(singleton(t1p1))
-                ),
-                10 * 1000,
-                "State updater never returned tasks.");
-        }
+        TestUtils.waitForCondition(
+            () -> thread.taskManager().checkStateUpdater(
+                mockTime.milliseconds(),
+                topicPartitions -> 
mockConsumer.seekToBeginning(singleton(t1p1))
+            ),
+            10 * 1000,
+            "State updater never returned tasks.");
+        
 

Review Comment:
   Trailing whitespace detected: this added line consists solely of whitespace.
   Please strip it for consistency with the project's code style.
   ```suggestion
   
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to