cadonna commented on code in PR #13621: URL: https://github.com/apache/kafka/pull/13621#discussion_r1175663484
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -2602,23 +2608,21 @@ public void shouldUpdateInputPartitionsAfterRebalance() { assertThat(taskManager.tryToCompleteRestoration(time.milliseconds(), null), is(true)); assertThat(task00.state(), is(Task.State.RUNNING)); assertEquals(newPartitionsSet, task00.inputPartitions()); - verify(activeTaskCreator, consumer, changeLogReader); + verify(activeTaskCreator, consumer); } @Test public void shouldAddNewActiveTasks() { final Map<TaskId, Set<TopicPartition>> assignment = taskId00Assignment; final Task task00 = new StateMachineTask(taskId00, taskId00Partitions, true); - expect(changeLogReader.completedChangelogs()).andReturn(emptySet()); expect(consumer.assignment()).andReturn(emptySet()); consumer.resume(eq(emptySet())); expectLastCall(); - changeLogReader.enforceRestoreActive(); expectLastCall(); Review Comment: This line can also be removed. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -2653,15 +2658,13 @@ public void initializeIfNeeded() { consumer.commitSync(Collections.emptyMap()); expectLastCall(); - expect(changeLogReader.completedChangelogs()).andReturn(emptySet()); expect(consumer.assignment()).andReturn(emptySet()); consumer.resume(eq(emptySet())); expectLastCall(); - changeLogReader.enforceRestoreActive(); expectLastCall(); Review Comment: This line can also be removed. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -3045,17 +3048,14 @@ public void closeDirty() { } }; - resetToStrict(changeLogReader); - changeLogReader.enforceRestoreActive(); - expect(changeLogReader.completedChangelogs()).andReturn(emptySet()); Review Comment: Since the mock was reset to strict, I think you need to verify or explicitly stub the call to `changeLogReader.completedChangelogs()`. Otherwise, the test would not fail, if `completedChangelogs()` were not called. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -3168,16 +3167,13 @@ public Set<TopicPartition> changelogPartitions() { } }; - resetToStrict(changeLogReader); - changeLogReader.enforceRestoreActive(); - expect(changeLogReader.completedChangelogs()).andReturn(emptySet()); Review Comment: Same here ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -3117,16 +3118,13 @@ public Set<TopicPartition> changelogPartitions() { final Map<TopicPartition, OffsetAndMetadata> offsets = singletonMap(t1p0, new OffsetAndMetadata(0L, null)); task00.setCommittableOffsetsAndMetadata(offsets); - resetToStrict(changeLogReader); - changeLogReader.enforceRestoreActive(); - expect(changeLogReader.completedChangelogs()).andReturn(emptySet()); Review Comment: Same here. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -3297,16 +3294,13 @@ public void suspend() { } }; - resetToStrict(changeLogReader); - changeLogReader.enforceRestoreActive(); - expect(changeLogReader.completedChangelogs()).andReturn(emptySet()); Review Comment: Same here -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org