cadonna commented on code in PR #12200:
URL: https://github.com/apache/kafka/pull/12200#discussion_r881295817
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/DefaultStateUpdater.java:
##########
@@ -74,30 +76,44 @@ public Collection<Task> getAllUpdatingTasks() {
return updatingTasks.values();
}
+ public Collection<StandbyTask> getUpdatingStandbyTasks() {
+ return updatingTasks.values().stream()
+ .filter(t -> !t.isActive())
+ .map(t -> (StandbyTask) t)
+ .collect(Collectors.toList());
+ }
+
+ public boolean onlyStandbyTasksLeft() {
+ return !updatingTasks.isEmpty() &&
updatingTasks.values().stream().allMatch(t -> !t.isActive());
Review Comment:
The issue is that `transitToUpdateStandby()` is not idempotent. It even
throws an exception if the store changelog reader is not in `ACTIVE_RESTORING`.
So, if in the state updater all active tasks are restored and there are no
standby tasks, the changelog should not transit to update standbys because if
the next task that is added to the state updater is a standby task,
`transitToUpdateStandby()` would be called which would throw an exception since
it would have been called twice in a row.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]