lucasbru commented on code in PR #14281:
URL: https://github.com/apache/kafka/pull/14281#discussion_r1331866929


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java:
##########
@@ -194,12 +204,18 @@ public KafkaFuture<Void> lockTasks(final Set<TaskId> taskIds) {
                 }
 
                 if (assignedTasks.containsKey(taskId)) {
-                    final KafkaFuture<StreamTask> future = assignedTasks.get(taskId).unassign();
+                    final TaskExecutor executor = assignedTasks.get(taskId);
+                    log.debug("Requesting release of task {} from {}", taskId, executor.name());
+                    final KafkaFuture<StreamTask> future = executor.unassign();
                     future.whenComplete((streamTask, throwable) -> {
                         if (throwable != null) {
                             result.completeExceptionally(throwable);
                         } else {
-                            remainingTaskIds.remove(streamTask.id());
+                            assert !assignedTasks.containsKey(taskId);
+                            // It can happen that the executor handed back the task before we asked it to
+                            // in which case `streamTask` will be null here.
+                            assert streamTask == null || streamTask.id() == taskId;

Review Comment:
   Assertions add extra checks that fail early during development, 
refactoring, and testing, which simplifies debugging; they also serve as 
executable documentation. However, assertions are disabled in production 
runs, both to avoid slowing down the code and to avoid crashing in production 
just because the assertion itself is wrong.
   
   Using `IllegalStateException` in these places can backfire. One example is 
this bug, which caused several incidents: 
https://github.com/apache/kafka/pull/13534 where a "this should not happen 
here" kind of check regressed and no longer held.
   
   `IllegalStateException` may still make sense if you want to prevent 
something worse than a crash from happening, or if you want to catch it.
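
   To illustrate the trade-off described above, here is a minimal, 
self-contained sketch. It is not code from this PR: the class 
`AssertVsException`, its `assignedTasks` map of strings, and both check 
methods are hypothetical stand-ins. It only shows that an `assert` is skipped 
unless the JVM is started with `-ea`, while an `IllegalStateException` check 
always runs.

```java
import java.util.HashMap;
import java.util.Map;

public class AssertVsException {
    // Hypothetical stand-in for the manager's task-to-executor map.
    private final Map<String, String> assignedTasks = new HashMap<>();

    void assign(final String taskId, final String executorName) {
        assignedTasks.put(taskId, executorName);
    }

    // Development-time check: only evaluated when the JVM runs with -ea.
    void checkReleasedWithAssert(final String taskId) {
        assert !assignedTasks.containsKey(taskId) : "Task " + taskId + " is still assigned";
    }

    // Always-on check: also throws in production, so a wrong check crashes there.
    void checkReleasedWithException(final String taskId) {
        if (assignedTasks.containsKey(taskId)) {
            throw new IllegalStateException("Task " + taskId + " is still assigned");
        }
    }

    // Common idiom: the assignment inside the assert only executes under -ea.
    static boolean assertionsEnabled() {
        boolean enabled = false;
        assert enabled = true;
        return enabled;
    }

    public static void main(final String[] args) {
        final AssertVsException manager = new AssertVsException();
        manager.assign("0_0", "executor-1");

        System.out.println("Assertions enabled: " + assertionsEnabled());
        try {
            manager.checkReleasedWithAssert("0_0");
            // Only reached when the assert was not evaluated.
            System.out.println("assert: no failure (assertions are off)");
        } catch (final AssertionError e) {
            System.out.println("assert: " + e.getMessage());
        }
        try {
            manager.checkReleasedWithException("0_0");
        } catch (final IllegalStateException e) {
            System.out.println("exception: " + e.getMessage());
        }
    }
}
```

   Running this with and without `-ea` shows the difference: the exception 
path reports the still-assigned task in both modes, while the assert path only 
does so when assertions are enabled.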



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java:
##########
@@ -60,29 +64,52 @@ public void setUp() {
         when(taskManager.assignNextTask(taskExecutor)).thenReturn(task).thenReturn(null);
         when(taskExecutionMetadata.canProcessTask(eq(task), anyLong())).thenReturn(true);
         when(task.isProcessable(anyLong())).thenReturn(true);
+        when(task.id()).thenReturn(new TaskId(0, 0, "A"));
         when(task.process(anyLong())).thenReturn(true);
         when(task.prepareCommit()).thenReturn(Collections.emptyMap());
     }
 
     @AfterEach
     public void tearDown() {
-        taskExecutor.shutdown(Duration.ofMinutes(1));
+        taskExecutor.requestShutdown();
+        taskExecutor.awaitShutdown(Duration.ofMinutes(1));
     }
 
     @Test
     public void shouldShutdownTaskExecutor() {
         assertNull(taskExecutor.currentTask(), "Have task assigned before startup");
+        assertFalse(taskExecutor.isRunning());
 
         taskExecutor.start();
 
+        assertTrue(taskExecutor.isRunning());
         verify(taskManager, timeout(VERIFICATION_TIMEOUT)).assignNextTask(taskExecutor);
 
-        taskExecutor.shutdown(Duration.ofMinutes(1));
+        taskExecutor.requestShutdown();
+        taskExecutor.awaitShutdown(Duration.ofMinutes(1));
 
-        verify(task).prepareCommit();
+        verify(task).flush();
         verify(taskManager).unassignTask(task, taskExecutor);
 
         assertNull(taskExecutor.currentTask(), "Have task assigned after shutdown");
+        assertFalse(taskExecutor.isRunning());
+    }
+
+    @Test
+    public void shouldClearTaskReleaseFutureOnShutdown() throws InterruptedException {
+        assertNull(taskExecutor.currentTask(), "Have task assigned before startup");
+
+        taskExecutor.start();
+
+        verify(taskManager, timeout(VERIFICATION_TIMEOUT)).assignNextTask(taskExecutor);
+
+        final KafkaFuture<StreamTask> future = taskExecutor.unassign();
+        taskExecutor.requestShutdown();
+        taskExecutor.awaitShutdown(Duration.ofMinutes(1));
+
+        waitForCondition(future::isDone, "Await for unassign future to complete");
+        assertTrue(future.isDone());

Review Comment:
   Done



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java:
##########
@@ -233,6 +236,19 @@ public void shouldAssignTasksThatCanBeStreamTimePunctuated() {
         assertNull(taskManager.assignNextTask(taskExecutor));
     }
 
+    @Test
+    public void shouldNotAssignTasksIfUncaughtExceptionPresent() {
+        taskManager.add(Collections.singleton(task));
+        when(tasks.activeTasks()).thenReturn(Collections.singleton(task));
+        when(taskExecutionMetadata.canPunctuateTask(eq(task))).thenReturn(true);
+        when(task.canPunctuateStreamTime()).thenReturn(true);

Review Comment:
   Done



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java:
##########
@@ -333,5 +360,21 @@ private boolean canProgress(final StreamTask task, final long nowMs) {
             taskExecutionMetadata.canProcessTask(task, nowMs) && task.isProcessable(nowMs) ||
                 taskExecutionMetadata.canPunctuateTask(task) && (task.canPunctuateStreamTime() || task.canPunctuateSystemTime());
     }
+
+    public void start() {

Review Comment:
   Done


