cadonna commented on code in PR #12743: URL: https://github.com/apache/kafka/pull/12743#discussion_r995452727
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -1481,6 +1481,36 @@ public void shouldTryToLockValidTaskDirsAtRebalanceStart() throws Exception { assertThat(taskManager.lockedTaskDirectories(), is(singleton(taskId01))); } + @Test + public void shouldPauseAllTopicsWithoutStateUpdaterOnRebalanceComplete() { + final Set<TopicPartition> assigned = mkSet(t1p0, t1p1); + expect(consumer.assignment()).andReturn(assigned); + consumer.pause(assigned); + expectLastCall(); + replay(consumer); + taskManager.handleRebalanceComplete(); + verify(consumer); + } + + @Test + public void shouldNotPauseReadyTasksWithStateUpdaterOnRebalanceComplete() { + taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, true); + final StreamTask statefulTask0 = statefulTask(taskId00, taskId00ChangelogPartitions) + .inState(State.RESTORING) + .withInputPartitions(taskId00Partitions).build(); + taskManager.addTask(statefulTask0); Review Comment: The reason is that `TaskManager#addTask()` was added just for testing (we have a few other in the code base). IMO, it is not a clean way to design classes, because people might just use those methods in production code despite the comment on those methods which might lead to errors. Much cleaner and safer is to set the state of the task manager with the `TasksRegistry` mock. Actually, we should remove `TaskManager#addTask()` and refactor `TaskManagerTest`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org