lucasbru commented on code in PR #12743:
URL: https://github.com/apache/kafka/pull/12743#discussion_r995065939


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1481,6 +1481,36 @@ public void 
shouldTryToLockValidTaskDirsAtRebalanceStart() throws Exception {
         assertThat(taskManager.lockedTaskDirectories(), 
is(singleton(taskId01)));
     }
 
+    @Test
+    public void shouldPauseAllTopicsWithoutStateUpdaterOnRebalanceComplete() {
+        final Set<TopicPartition> assigned = mkSet(t1p0, t1p1);
+        expect(consumer.assignment()).andReturn(assigned);
+        consumer.pause(assigned);
+        expectLastCall();

Review Comment:
   Done



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1481,6 +1481,36 @@ public void 
shouldTryToLockValidTaskDirsAtRebalanceStart() throws Exception {
         assertThat(taskManager.lockedTaskDirectories(), 
is(singleton(taskId01)));
     }
 
+    @Test
+    public void shouldPauseAllTopicsWithoutStateUpdaterOnRebalanceComplete() {
+        final Set<TopicPartition> assigned = mkSet(t1p0, t1p1);
+        expect(consumer.assignment()).andReturn(assigned);
+        consumer.pause(assigned);
+        expectLastCall();
+        replay(consumer);
+        taskManager.handleRebalanceComplete();
+        verify(consumer);
+    }
+
+    @Test
+    public void shouldNotPauseReadyTasksWithStateUpdaterOnRebalanceComplete() {
+        taskManager = 
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, true);
+        final StreamTask statefulTask0 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+            .inState(State.RESTORING)

Review Comment:
   Good point! That was a careless copy/paste from another test.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1481,6 +1481,36 @@ public void 
shouldTryToLockValidTaskDirsAtRebalanceStart() throws Exception {
         assertThat(taskManager.lockedTaskDirectories(), 
is(singleton(taskId01)));
     }
 
+    @Test
+    public void shouldPauseAllTopicsWithoutStateUpdaterOnRebalanceComplete() {
+        final Set<TopicPartition> assigned = mkSet(t1p0, t1p1);
+        expect(consumer.assignment()).andReturn(assigned);
+        consumer.pause(assigned);
+        expectLastCall();
+        replay(consumer);
+        taskManager.handleRebalanceComplete();
+        verify(consumer);
+    }
+
+    @Test
+    public void shouldNotPauseReadyTasksWithStateUpdaterOnRebalanceComplete() {
+        taskManager = 
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, true);
+        final StreamTask statefulTask0 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+            .inState(State.RESTORING)
+            .withInputPartitions(taskId00Partitions).build();
+        taskManager.addTask(statefulTask0);

Review Comment:
   Done. But out of interest, why not create a `Tasks` instance, as it seems to be
supported by setUpTaskManager?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1481,6 +1481,36 @@ public void 
shouldTryToLockValidTaskDirsAtRebalanceStart() throws Exception {
         assertThat(taskManager.lockedTaskDirectories(), 
is(singleton(taskId01)));
     }
 
+    @Test
+    public void shouldPauseAllTopicsWithoutStateUpdaterOnRebalanceComplete() {
+        final Set<TopicPartition> assigned = mkSet(t1p0, t1p1);
+        expect(consumer.assignment()).andReturn(assigned);
+        consumer.pause(assigned);
+        expectLastCall();
+        replay(consumer);
+        taskManager.handleRebalanceComplete();
+        verify(consumer);
+    }
+
+    @Test
+    public void shouldNotPauseReadyTasksWithStateUpdaterOnRebalanceComplete() {
+        taskManager = 
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, true);
+        final StreamTask statefulTask0 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+            .inState(State.RESTORING)
+            .withInputPartitions(taskId00Partitions).build();
+        taskManager.addTask(statefulTask0);
+        final Set<TopicPartition> assigned = mkSet(t1p0, t1p1);
+
+        expect(consumer.assignment()).andReturn(assigned);
+        consumer.pause(assigned);
+        expectLastCall();

Review Comment:
   Done



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1481,6 +1481,36 @@ public void 
shouldTryToLockValidTaskDirsAtRebalanceStart() throws Exception {
         assertThat(taskManager.lockedTaskDirectories(), 
is(singleton(taskId01)));
     }
 
+    @Test
+    public void shouldPauseAllTopicsWithoutStateUpdaterOnRebalanceComplete() {
+        final Set<TopicPartition> assigned = mkSet(t1p0, t1p1);
+        expect(consumer.assignment()).andReturn(assigned);
+        consumer.pause(assigned);
+        expectLastCall();
+        replay(consumer);
+        taskManager.handleRebalanceComplete();
+        verify(consumer);
+    }
+
+    @Test
+    public void shouldNotPauseReadyTasksWithStateUpdaterOnRebalanceComplete() {
+        taskManager = 
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, true);
+        final StreamTask statefulTask0 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+            .inState(State.RESTORING)
+            .withInputPartitions(taskId00Partitions).build();
+        taskManager.addTask(statefulTask0);
+        final Set<TopicPartition> assigned = mkSet(t1p0, t1p1);
+
+        expect(consumer.assignment()).andReturn(assigned);
+        consumer.pause(assigned);
+        expectLastCall();
+        consumer.resume(mkSet(t1p0));
+        expectLastCall();

Review Comment:
   Done



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1481,6 +1481,36 @@ public void 
shouldTryToLockValidTaskDirsAtRebalanceStart() throws Exception {
         assertThat(taskManager.lockedTaskDirectories(), 
is(singleton(taskId01)));
     }
 
+    @Test
+    public void shouldPauseAllTopicsWithoutStateUpdaterOnRebalanceComplete() {
+        final Set<TopicPartition> assigned = mkSet(t1p0, t1p1);
+        expect(consumer.assignment()).andReturn(assigned);
+        consumer.pause(assigned);
+        expectLastCall();
+        replay(consumer);
+        taskManager.handleRebalanceComplete();
+        verify(consumer);
+    }
+
+    @Test
+    public void shouldNotPauseReadyTasksWithStateUpdaterOnRebalanceComplete() {
+        taskManager = 
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, true);
+        final StreamTask statefulTask0 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+            .inState(State.RESTORING)
+            .withInputPartitions(taskId00Partitions).build();
+        taskManager.addTask(statefulTask0);
+        final Set<TopicPartition> assigned = mkSet(t1p0, t1p1);
+
+        expect(consumer.assignment()).andReturn(assigned);
+        consumer.pause(assigned);
+        expectLastCall();
+        consumer.resume(mkSet(t1p0));
+        expectLastCall();
+        replay(consumer);
+        taskManager.handleRebalanceComplete();

Review Comment:
   Done



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##########
@@ -177,6 +177,14 @@ void handleRebalanceComplete() {
         // before then the assignment has not been updated yet.
         mainConsumer.pause(mainConsumer.assignment());
 
+        if (stateUpdater != null) {
+            // All tasks that need restoration are now owned by the state 
updater.
+            // All tasks that are owned by the task manager are ready and can 
be resumed immediately.
+            for (final Task t : tasks.allTasks()) {
+                mainConsumer.resume(t.inputPartitions());
+            }
+        }

Review Comment:
   Thanks, yes, that's cleaner. I wanted to avoid the extra collections since
resume/pause looked like it was mostly switching a boolean flag, but looking at
it again, there is more going on.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java:
##########
@@ -1481,6 +1481,36 @@ public void 
shouldTryToLockValidTaskDirsAtRebalanceStart() throws Exception {
         assertThat(taskManager.lockedTaskDirectories(), 
is(singleton(taskId01)));
     }
 
+    @Test
+    public void shouldPauseAllTopicsWithoutStateUpdaterOnRebalanceComplete() {
+        final Set<TopicPartition> assigned = mkSet(t1p0, t1p1);
+        expect(consumer.assignment()).andReturn(assigned);
+        consumer.pause(assigned);
+        expectLastCall();
+        replay(consumer);
+        taskManager.handleRebalanceComplete();
+        verify(consumer);
+    }
+
+    @Test
+    public void shouldNotPauseReadyTasksWithStateUpdaterOnRebalanceComplete() {
+        taskManager = 
setUpTaskManager(StreamsConfigUtils.ProcessingMode.AT_LEAST_ONCE, true);
+        final StreamTask statefulTask0 = statefulTask(taskId00, 
taskId00ChangelogPartitions)
+            .inState(State.RESTORING)
+            .withInputPartitions(taskId00Partitions).build();
+        taskManager.addTask(statefulTask0);
+        final Set<TopicPartition> assigned = mkSet(t1p0, t1p1);
+

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to