mjsax commented on code in PR #18732:
URL: https://github.com/apache/kafka/pull/18732#discussion_r1933071908
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -295,6 +294,9 @@ private void closeStartupTasks(final Predicate<Task> predicate) {
// now that we have exclusive ownership of the drained tasks, close them
for (final Task task : drainedTasks) {
+ // main thread locked the task initially on startup, but has moved on and will not unlock
+ // so we need to explicitly swap lock ownership here as this method is called by a StreamThread
+ lockedTasksToOwner.replace(task.id(), Thread.currentThread());
Review Comment:
I understand that `StandbyTask.closeClean()` would block, as it cannot get
the lock. I guess this only happens on the `StandbyTask.closeClean()` path, because
"startup tasks" are always standbys, right? So it can never happen that we call
`StreamTask.closeClean()` to clean up an unused "startup task".
However, I am not sure I understand this fix. Transferring ownership from
the main thread to a `StreamThread` implies that a `StreamThread` would call
`closeStartupTasks(...)`, but I could not find the code that would result in this
call. It seems `closeStartupTasks(...)` is only called from the `KafkaStreams`
instance, i.e., the main thread, with the exception of the background
state-dir-cleaner thread.
Can you help me out with it?
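To make the question concrete, here is a minimal sketch of how I read the ownership swap. It is not the actual `StateDirectory` code: the class `LockOwnershipSketch`, its methods, and the `String` task ids are hypothetical stand-ins, and only `ConcurrentMap#putIfAbsent`, `#remove(key, value)`, and `#replace(key, value)` are real JDK calls.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for the lock bookkeeping; not the real StateDirectory.
class LockOwnershipSketch {
    // Plays the role of lockedTasksToOwner, keyed by a plain String task id.
    private final ConcurrentMap<String, Thread> owners = new ConcurrentHashMap<>();

    // The calling thread takes the lock if it is free or already its own.
    boolean lock(final String taskId) {
        final Thread current = Thread.currentThread();
        final Thread previous = owners.putIfAbsent(taskId, current);
        return previous == null || previous == current;
    }

    // Only the owning thread can release the lock.
    boolean unlock(final String taskId) {
        return owners.remove(taskId, Thread.currentThread());
    }

    // Mirrors the replace(...) call in the diff: the caller becomes the owner.
    void transferToCurrentThread(final String taskId) {
        owners.replace(taskId, Thread.currentThread());
    }

    // Simulates a startup thread that locks the task and then moves on.
    void lockFromNewThread(final String taskId) {
        final Thread startup = new Thread(() -> lock(taskId));
        startup.start();
        try {
            startup.join();
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(final String[] args) {
        final LockOwnershipSketch dir = new LockOwnershipSketch();
        dir.lockFromNewThread("0_0");
        System.out.println(dir.unlock("0_0")); // false: caller is not the owner
        dir.transferToCurrentThread("0_0");
        System.out.println(dir.unlock("0_0")); // true: ownership was swapped
    }
}
```

In this sketch the swap only helps when the thread calling `transferToCurrentThread` differs from the thread that took the lock, which is why I am asking which thread actually calls `closeStartupTasks(...)`.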
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java:
##########
@@ -107,7 +106,7 @@ public StateDirectoryProcessFile() {
private final boolean hasPersistentStores;
private final boolean hasNamedTopologies;
- private final HashMap<TaskId, Thread> lockedTasksToOwner = new HashMap<>();
+ private final ConcurrentMap<TaskId, Thread> lockedTasksToOwner = new ConcurrentHashMap<>();
Review Comment:
I think it was already used across threads before this change: `TaskManager` actually
calls `StateDirectory#removeStartupTask(...)`.
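As a rough illustration of why the map type matters once several threads touch it, here is a small sketch. It assumes the map is accessed without any external synchronization, which may not match how `StateDirectory` guards it; `ConcurrentOwnerMapSketch` and `fill` are hypothetical names.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical demo class; not part of Kafka Streams.
class ConcurrentOwnerMapSketch {
    // Several threads insert distinct keys concurrently; returns the final map size.
    static int fill(final int threads, final int perThread) {
        final ConcurrentMap<String, Thread> owners = new ConcurrentHashMap<>();
        final Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            final int id = t;
            workers[t] = new Thread(() -> {
                for (int i = 0; i < perThread; i++) {
                    // Keys are unique per thread and index, so no entry overwrites another.
                    owners.put(id + "_" + i, Thread.currentThread());
                }
            });
            workers[t].start();
        }
        for (final Thread worker : workers) {
            try {
                worker.join();
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return owners.size();
    }

    public static void main(final String[] args) {
        System.out.println(fill(4, 1000)); // prints 4000
    }
}
```

With a plain `HashMap` and no surrounding lock, the same unsynchronized writes could lose entries or corrupt the table, so the result would not be guaranteed.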