lucasbru commented on code in PR #14281: URL: https://github.com/apache/kafka/pull/14281#discussion_r1331866929
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java: ########## @@ -194,12 +204,18 @@ public KafkaFuture<Void> lockTasks(final Set<TaskId> taskIds) { } if (assignedTasks.containsKey(taskId)) { - final KafkaFuture<StreamTask> future = assignedTasks.get(taskId).unassign(); + final TaskExecutor executor = assignedTasks.get(taskId); + log.debug("Requesting release of task {} from {}", taskId, executor.name()); + final KafkaFuture<StreamTask> future = executor.unassign(); future.whenComplete((streamTask, throwable) -> { if (throwable != null) { result.completeExceptionally(throwable); } else { - remainingTaskIds.remove(streamTask.id()); + assert !assignedTasks.containsKey(taskId); + // It can happen that the executor handed back the task before we asked it to + // in which case `streamTask` will be null here. + assert streamTask == null || streamTask.id() == taskId; Review Comment: You use assertions to add extra checks to fail early during development, refactoring and testing which simplifies debugging and also serves as executable documentation. However, assertions will be disabled in production runs, both to not slow down the code and to avoid crashing in production just because the assertion is wrong. Using `IllegalStateException` in these places can backfire. An example is this bug, which caused several incidents: https://github.com/apache/kafka/pull/13534 where a "this should not happen here" kind of check regressed into not being true anymore. `IllegalStateException` may still make sense if you want to avoid something from happening that is worse than crashing, or if you want to catch it. ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java: ########## @@ -60,29 +64,52 @@ public void setUp() { when(taskManager.assignNextTask(taskExecutor)).thenReturn(task).thenReturn(null); when(taskExecutionMetadata.canProcessTask(eq(task), anyLong())).thenReturn(true); when(task.isProcessable(anyLong())).thenReturn(true); + when(task.id()).thenReturn(new TaskId(0, 0, "A")); when(task.process(anyLong())).thenReturn(true); when(task.prepareCommit()).thenReturn(Collections.emptyMap()); } @AfterEach public void tearDown() { - taskExecutor.shutdown(Duration.ofMinutes(1)); + taskExecutor.requestShutdown(); + taskExecutor.awaitShutdown(Duration.ofMinutes(1)); } @Test public void shouldShutdownTaskExecutor() { assertNull(taskExecutor.currentTask(), "Have task assigned before startup"); + assertFalse(taskExecutor.isRunning()); taskExecutor.start(); + assertTrue(taskExecutor.isRunning()); verify(taskManager, timeout(VERIFICATION_TIMEOUT)).assignNextTask(taskExecutor); - taskExecutor.shutdown(Duration.ofMinutes(1)); + taskExecutor.requestShutdown(); + taskExecutor.awaitShutdown(Duration.ofMinutes(1)); - verify(task).prepareCommit(); + verify(task).flush(); verify(taskManager).unassignTask(task, taskExecutor); assertNull(taskExecutor.currentTask(), "Have task assigned after shutdown"); + assertFalse(taskExecutor.isRunning()); + } + + @Test + public void shouldClearTaskReleaseFutureOnShutdown() throws InterruptedException { + assertNull(taskExecutor.currentTask(), "Have task assigned before startup"); + + taskExecutor.start(); + + verify(taskManager, timeout(VERIFICATION_TIMEOUT)).assignNextTask(taskExecutor); + + final KafkaFuture<StreamTask> future = taskExecutor.unassign(); + taskExecutor.requestShutdown(); + taskExecutor.awaitShutdown(Duration.ofMinutes(1)); + + waitForCondition(future::isDone, "Await for unassign future to complete"); + assertTrue(future.isDone()); Review Comment: Done ########## streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManagerTest.java: ########## @@ -233,6 +236,19 @@ public void shouldAssignTasksThatCanBeStreamTimePunctuated() { assertNull(taskManager.assignNextTask(taskExecutor)); } + @Test + public void shouldNotAssignTasksIfUncaughtExceptionPresent() { + taskManager.add(Collections.singleton(task)); + when(tasks.activeTasks()).thenReturn(Collections.singleton(task)); + when(taskExecutionMetadata.canPunctuateTask(eq(task))).thenReturn(true); + when(task.canPunctuateStreamTime()).thenReturn(true); Review Comment: Done ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskManager.java: ########## @@ -333,5 +360,21 @@ private boolean canProgress(final StreamTask task, final long nowMs) { taskExecutionMetadata.canProcessTask(task, nowMs) && task.isProcessable(nowMs) || taskExecutionMetadata.canPunctuateTask(task) && (task.canPunctuateStreamTime() || task.canPunctuateSystemTime()); } + + public void start() { Review Comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org