lucasbru commented on code in PR #12743: URL: https://github.com/apache/kafka/pull/12743#discussion_r995065939
########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -1481,6 +1481,36 @@ public void shouldTryToLockValidTaskDirsAtRebalanceStart() throws Exception { assertThat(taskManager.lockedTaskDirectories(), is(singleton(taskId01))); } + @Test + public void shouldPauseAllTopicsWithoutStateUpdaterOnRebalanceComplete() { + final Set<TopicPartition> assigned = mkSet(t1p0, t1p1); + expect(consumer.assignment()).andReturn(assigned); + consumer.pause(assigned); + expectLastCall(); Review Comment: Done ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -1481,6 +1481,36 @@ public void shouldTryToLockValidTaskDirsAtRebalanceStart() throws Exception { assertThat(taskManager.lockedTaskDirectories(), is(singleton(taskId01))); } + @Test + public void shouldPauseAllTopicsWithoutStateUpdaterOnRebalanceComplete() { + final Set<TopicPartition> assigned = mkSet(t1p0, t1p1); + expect(consumer.assignment()).andReturn(assigned); + consumer.pause(assigned); + expectLastCall(); + replay(consumer); + taskManager.handleRebalanceComplete(); + verify(consumer); + } + + @Test + public void shouldNotPauseReadyTasksWithStateUpdaterOnRebalanceComplete() { + taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, true); + final StreamTask statefulTask0 = statefulTask(taskId00, taskId00ChangelogPartitions) + .inState(State.RESTORING) Review Comment: Good point! That was a careless copy/paste from another test. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -1481,6 +1481,36 @@ public void shouldTryToLockValidTaskDirsAtRebalanceStart() throws Exception { assertThat(taskManager.lockedTaskDirectories(), is(singleton(taskId01))); } + @Test + public void shouldPauseAllTopicsWithoutStateUpdaterOnRebalanceComplete() { + final Set<TopicPartition> assigned = mkSet(t1p0, t1p1); + expect(consumer.assignment()).andReturn(assigned); + consumer.pause(assigned); + expectLastCall(); + replay(consumer); + taskManager.handleRebalanceComplete(); + verify(consumer); + } + + @Test + public void shouldNotPauseReadyTasksWithStateUpdaterOnRebalanceComplete() { + taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, true); + final StreamTask statefulTask0 = statefulTask(taskId00, taskId00ChangelogPartitions) + .inState(State.RESTORING) + .withInputPartitions(taskId00Partitions).build(); + taskManager.addTask(statefulTask0); Review Comment: Done. But out of interest, why not create a Tasks as it seems to be suppoorted by setUpTaskManager? ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -1481,6 +1481,36 @@ public void shouldTryToLockValidTaskDirsAtRebalanceStart() throws Exception { assertThat(taskManager.lockedTaskDirectories(), is(singleton(taskId01))); } + @Test + public void shouldPauseAllTopicsWithoutStateUpdaterOnRebalanceComplete() { + final Set<TopicPartition> assigned = mkSet(t1p0, t1p1); + expect(consumer.assignment()).andReturn(assigned); + consumer.pause(assigned); + expectLastCall(); + replay(consumer); + taskManager.handleRebalanceComplete(); + verify(consumer); + } + + @Test + public void shouldNotPauseReadyTasksWithStateUpdaterOnRebalanceComplete() { + taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, true); + final StreamTask statefulTask0 = statefulTask(taskId00, taskId00ChangelogPartitions) + .inState(State.RESTORING) + .withInputPartitions(taskId00Partitions).build(); + taskManager.addTask(statefulTask0); + final Set<TopicPartition> assigned = mkSet(t1p0, t1p1); + + expect(consumer.assignment()).andReturn(assigned); + consumer.pause(assigned); + expectLastCall(); Review Comment: Done ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -1481,6 +1481,36 @@ public void shouldTryToLockValidTaskDirsAtRebalanceStart() throws Exception { assertThat(taskManager.lockedTaskDirectories(), is(singleton(taskId01))); } + @Test + public void shouldPauseAllTopicsWithoutStateUpdaterOnRebalanceComplete() { + final Set<TopicPartition> assigned = mkSet(t1p0, t1p1); + expect(consumer.assignment()).andReturn(assigned); + consumer.pause(assigned); + expectLastCall(); + replay(consumer); + taskManager.handleRebalanceComplete(); + verify(consumer); + } + + @Test + public void shouldNotPauseReadyTasksWithStateUpdaterOnRebalanceComplete() { + taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, true); + final StreamTask statefulTask0 = statefulTask(taskId00, taskId00ChangelogPartitions) + .inState(State.RESTORING) + .withInputPartitions(taskId00Partitions).build(); + taskManager.addTask(statefulTask0); + final Set<TopicPartition> assigned = mkSet(t1p0, t1p1); + + expect(consumer.assignment()).andReturn(assigned); + consumer.pause(assigned); + expectLastCall(); + consumer.resume(mkSet(t1p0)); + expectLastCall(); Review Comment: Done ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -1481,6 +1481,36 @@ public void shouldTryToLockValidTaskDirsAtRebalanceStart() throws Exception { assertThat(taskManager.lockedTaskDirectories(), is(singleton(taskId01))); } + @Test + public void shouldPauseAllTopicsWithoutStateUpdaterOnRebalanceComplete() { + final Set<TopicPartition> assigned = mkSet(t1p0, t1p1); + expect(consumer.assignment()).andReturn(assigned); + consumer.pause(assigned); + expectLastCall(); + replay(consumer); + taskManager.handleRebalanceComplete(); + verify(consumer); + } + + @Test + public void shouldNotPauseReadyTasksWithStateUpdaterOnRebalanceComplete() { + taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, true); + final StreamTask statefulTask0 = statefulTask(taskId00, taskId00ChangelogPartitions) + .inState(State.RESTORING) + .withInputPartitions(taskId00Partitions).build(); + taskManager.addTask(statefulTask0); + final Set<TopicPartition> assigned = mkSet(t1p0, t1p1); + + expect(consumer.assignment()).andReturn(assigned); + consumer.pause(assigned); + expectLastCall(); + consumer.resume(mkSet(t1p0)); + expectLastCall(); + replay(consumer); + taskManager.handleRebalanceComplete(); Review Comment: Done ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ########## @@ -177,6 +177,14 @@ void handleRebalanceComplete() { // before then the assignment has not been updated yet. mainConsumer.pause(mainConsumer.assignment()); + if (stateUpdater != null) { + // All tasks that need restoration are now owned by the state updater. + // All tasks that are owned by the task manager are ready and can be resumed immediately. + for (final Task t : tasks.allTasks()) { + mainConsumer.resume(t.inputPartitions()); + } + } Review Comment: Thanks, yes that's cleaner. I wanted to avoid the extra collections since resume/pause looked mostly switching a boolean flag, but looking at it again, there is more going on. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java: ########## @@ -1481,6 +1481,36 @@ public void shouldTryToLockValidTaskDirsAtRebalanceStart() throws Exception { assertThat(taskManager.lockedTaskDirectories(), is(singleton(taskId01))); } + @Test + public void shouldPauseAllTopicsWithoutStateUpdaterOnRebalanceComplete() { + final Set<TopicPartition> assigned = mkSet(t1p0, t1p1); + expect(consumer.assignment()).andReturn(assigned); + consumer.pause(assigned); + expectLastCall(); + replay(consumer); + taskManager.handleRebalanceComplete(); + verify(consumer); + } + + @Test + public void shouldNotPauseReadyTasksWithStateUpdaterOnRebalanceComplete() { + taskManager = setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, true); + final StreamTask statefulTask0 = statefulTask(taskId00, taskId00ChangelogPartitions) + .inState(State.RESTORING) + .withInputPartitions(taskId00Partitions).build(); + taskManager.addTask(statefulTask0); + final Set<TopicPartition> assigned = mkSet(t1p0, t1p1); + Review Comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org