Re: [PR] KAFKA-10199: Shutdown with new remove operation in state updater [kafka]
cadonna merged PR #15894:
URL: https://github.com/apache/kafka/pull/15894

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-10199: Shutdown with new remove operation in state updater [kafka]
lucasbru commented on code in PR #15894:
URL: https://github.com/apache/kafka/pull/15894#discussion_r1594047844

## streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java: ##
@@ -787,9 +796,12 @@ public DefaultStateUpdater(final String name,
 this.log = logContext.logger(DefaultStateUpdater.class);
 }
-@Override
 public void start() {
 if (stateUpdaterThread == null) {
+if (!restoredActiveTasks.isEmpty() || !exceptionsAndFailedTasks.isEmpty()) {

Review Comment:
I wonder why we'd even allow restarting the state updater, but we don't need to fix it now.
Re: [PR] KAFKA-10199: Shutdown with new remove operation in state updater [kafka]
cadonna commented on code in PR #15894:
URL: https://github.com/apache/kafka/pull/15894#discussion_r1593991574

## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ##
@@ -954,8 +954,7 @@ private void recycleTaskFromStateUpdater(final Task task,
 }
 }
-/** Returns true if the task closed clean */
-private boolean closeTaskClean(final Task task,
+private void closeTaskClean(final Task task,
 final Set tasksToCloseDirty,

Review Comment:
Done!
Re: [PR] KAFKA-10199: Shutdown with new remove operation in state updater [kafka]
cadonna commented on code in PR #15894:
URL: https://github.com/apache/kafka/pull/15894#discussion_r1593990443

## streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java: ##
@@ -787,9 +796,12 @@ public DefaultStateUpdater(final String name,
 this.log = logContext.logger(DefaultStateUpdater.class);
 }
-@Override
 public void start() {
 if (stateUpdaterThread == null) {
+if (!restoredActiveTasks.isEmpty() || !exceptionsAndFailedTasks.isEmpty()) {

Review Comment:
I thought that since we allow restarting the state updater, we should verify that it starts with a clean queue to avoid invalid states left over from a past run. At the moment, we never restart the state updater, but I thought having clear invariants might be good.
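The invariant discussed in this comment can be illustrated with a minimal, self-contained sketch. It is not the `DefaultStateUpdater` code: the class `RestartableUpdater`, its queue names, and the choice to throw `IllegalStateException` are assumptions made for illustration, since the hunk above is cut off before the body of the new `if`.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for a restartable updater; not the Kafka class.
final class RestartableUpdater {
    // Output queues that an earlier run may have left populated.
    private final Queue<String> restoredTasks = new ConcurrentLinkedQueue<>();
    private final Queue<String> failedTasks = new ConcurrentLinkedQueue<>();
    private Thread worker;

    synchronized void start() {
        if (worker == null) {
            // Invariant: a (re)start must not observe results of a past run.
            // Throwing here is an assumption of this sketch.
            if (!restoredTasks.isEmpty() || !failedTasks.isEmpty()) {
                throw new IllegalStateException("Updater started with non-empty output queues");
            }
            worker = new Thread(() -> { }, "updater-sketch");
            worker.start();
        }
    }

    synchronized void shutdown() {
        if (worker != null) {
            try {
                worker.join();
            } catch (final InterruptedException e) {
                // Preserve the interrupt status for the caller.
                Thread.currentThread().interrupt();
            }
            worker = null;
        }
    }

    // Helpers that simulate results produced and consumed between runs.
    void publishRestored(final String taskId) {
        restoredTasks.add(taskId);
    }

    void drainRestored() {
        restoredTasks.clear();
    }
}
```

With this shape, a restart after leftover results fails fast instead of silently handing stale tasks to the caller; once the queues are drained, `start()` succeeds again.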
Re: [PR] KAFKA-10199: Shutdown with new remove operation in state updater [kafka]
cadonna commented on code in PR #15894:
URL: https://github.com/apache/kafka/pull/15894#discussion_r1593985778

## streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java: ##
@@ -149,14 +149,25 @@ public void run() {
 } catch (final RuntimeException anyOtherException) {
 handleRuntimeException(anyOtherException);
 } finally {
-removeAddedTasksFromInputQueue();
-removeUpdatingAndPausedTasks();
+clearInputQueue();
+updatingTasks.clear();
+pausedTasks.clear();
+changelogReader.clear();

Review Comment:
I refactored this code a bit.
Re: [PR] KAFKA-10199: Shutdown with new remove operation in state updater [kafka]
cadonna commented on code in PR #15894:
URL: https://github.com/apache/kafka/pull/15894#discussion_r1593985438

## streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java: ##
@@ -149,14 +149,25 @@ public void run() {
 } catch (final RuntimeException anyOtherException) {
 handleRuntimeException(anyOtherException);
 } finally {
-removeAddedTasksFromInputQueue();
-removeUpdatingAndPausedTasks();
+clearInputQueue();
+updatingTasks.clear();
+pausedTasks.clear();
+changelogReader.clear();

Review Comment:
Before, it was executed in `removeUpdatingAndPausedTasks()`.
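As a rough illustration of the cleanup pattern in this hunk, the sketch below clears all per-run state in a `finally` block so that it runs whether the loop exits normally or through an exception. The class `UpdaterLoopSketch`, its fields, and the use of a plain map in place of the changelog reader are hypothetical simplifications, not the Kafka implementation.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical stand-in for an updater thread's run loop; not the Kafka class.
final class UpdaterLoopSketch {
    final Queue<String> inputQueue = new ArrayDeque<>();
    final Map<String, String> updatingTasks = new HashMap<>();
    final Map<String, String> pausedTasks = new HashMap<>();
    // Stand-in for the changelog reader's bookkeeping (assumption of this sketch).
    final Map<String, Long> changelogReaderState = new HashMap<>();
    RuntimeException lastFailure;

    void run(final Runnable loopBody) {
        try {
            loopBody.run();
        } catch (final RuntimeException anyOtherException) {
            // The sketch only records the failure; real handling is out of scope.
            lastFailure = anyOtherException;
        } finally {
            // Executed on every exit path, so no state survives into a later run.
            inputQueue.clear();
            updatingTasks.clear();
            pausedTasks.clear();
            changelogReaderState.clear();
        }
    }
}
```

Spelling out each `clear()` call in the `finally` block makes it visible at a glance which pieces of state are reset on exit, which is the question the review raised about `changelogReader.clear()`.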
Re: [PR] KAFKA-10199: Shutdown with new remove operation in state updater [kafka]
lucasbru commented on code in PR #15894:
URL: https://github.com/apache/kafka/pull/15894#discussion_r1593911025

## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ##
@@ -954,8 +954,7 @@ private void recycleTaskFromStateUpdater(final Task task,
 }
 }
-/** Returns true if the task closed clean */
-private boolean closeTaskClean(final Task task,
+private void closeTaskClean(final Task task,
 final Set tasksToCloseDirty,

Review Comment:
indentation

## streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java: ##
@@ -149,14 +149,25 @@ public void run() {
 } catch (final RuntimeException anyOtherException) {
 handleRuntimeException(anyOtherException);
 } finally {
-removeAddedTasksFromInputQueue();
-removeUpdatingAndPausedTasks();
+clearInputQueue();
+updatingTasks.clear();
+pausedTasks.clear();
+changelogReader.clear();

Review Comment:
why was this added? Where was it executed before?

## streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java: ##
@@ -787,9 +796,12 @@ public DefaultStateUpdater(final String name,
 this.log = logContext.logger(DefaultStateUpdater.class);
 }
-@Override
 public void start() {
 if (stateUpdaterThread == null) {
+if (!restoredActiveTasks.isEmpty() || !exceptionsAndFailedTasks.isEmpty()) {

Review Comment:
why did you add this?
[PR] KAFKA-10199: Shutdown with new remove operation in state updater [kafka]
cadonna opened a new pull request, #15894:
URL: https://github.com/apache/kafka/pull/15894

Uses the new remove operation of the state updater, which returns a future, to shut down the task manager.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
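To make the idea in the description concrete, here is a minimal sketch of a shutdown path built on a remove operation that returns a future. All names (`FutureRemoveSketch`, `Updater`, `shutdown`) and the use of plain strings for tasks are hypothetical, and the futures complete synchronously for simplicity; in a real updater they would presumably be completed by the updater thread. This is not the `TaskManager` code from the PR.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical sketch of shutting down via a future-returning remove operation.
final class FutureRemoveSketch {

    // Hypothetical updater that owns tasks and hands them back through a future.
    static final class Updater {
        private final Map<String, String> tasks = new LinkedHashMap<>();

        void add(final String taskId, final String task) {
            tasks.put(taskId, task);
        }

        CompletableFuture<String> remove(final String taskId) {
            final CompletableFuture<String> future = new CompletableFuture<>();
            final String task = tasks.remove(taskId);
            if (task == null) {
                future.completeExceptionally(new IllegalStateException("Unknown task " + taskId));
            } else {
                // Completed synchronously here; an assumption made to keep the sketch short.
                future.complete(task);
            }
            return future;
        }
    }

    // Requests removal of all tasks first, then waits for each future.
    // Returns the tasks handed back; ids whose removal failed go to failedIds.
    static List<String> shutdown(final Updater updater,
                                 final List<String> taskIds,
                                 final List<String> failedIds) {
        final Map<String, CompletableFuture<String>> futures = new LinkedHashMap<>();
        for (final String taskId : taskIds) {
            futures.put(taskId, updater.remove(taskId));
        }
        final List<String> removed = new ArrayList<>();
        for (final Map.Entry<String, CompletableFuture<String>> entry : futures.entrySet()) {
            try {
                removed.add(entry.getValue().join());
            } catch (final CompletionException e) {
                failedIds.add(entry.getKey());
            }
        }
        return removed;
    }
}
```

Issuing every removal before waiting on any future lets the updater process the requests in one pass, and the caller learns per task whether it came back cleanly or needs separate handling.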