[GitHub] [kafka] ableegoldman commented on a change in pull request #10342: KAFKA-12288: remove task-level filesystem locks
ableegoldman commented on a change in pull request #10342: URL: https://github.com/apache/kafka/pull/10342#discussion_r604366210 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ## @@ -444,22 +408,30 @@ public synchronized void clean() { * Remove the directories for any {@link TaskId}s that are no-longer * owned by this {@link StreamThread} and aren't locked by either * another process or another {@link StreamThread} - * @param cleanupDelayMs only remove directories if they haven't been modified for at least - * this amount of time (milliseconds) + * @param cleanupDelayMs only remove directories if they haven't been modified for at least + * this amount of time (milliseconds) + * @param currentThreadNames the names of all non-DEAD stream threads so we can clean up any + * orphaned task directories */ -public synchronized void cleanRemovedTasks(final long cleanupDelayMs) { +public synchronized void cleanRemovedTasks(final long cleanupDelayMs, final Set currentThreadNames) { try { -cleanRemovedTasksCalledByCleanerThread(cleanupDelayMs); +cleanRemovedTasksCalledByCleanerThread(cleanupDelayMs, currentThreadNames); } catch (final Exception cannotHappen) { throw new IllegalStateException("Should have swallowed exception.", cannotHappen); } } -private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) { +private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs, final Set currentThreadNames) { for (final File taskDir : listNonEmptyTaskDirectories()) { final String dirName = taskDir.getName(); final TaskId id = TaskId.parse(dirName); -if (!locks.containsKey(id)) { + +final String owningThread = lockedTasksToStreamThreadOwner.get(id); +if (owningThread != null && !currentThreadNames.contains(owningThread)) { +log.warn("Deleting lock for task directory {} since the thread owning the lock is gone: {}", id, owningThread); Review comment: Ok actually I decided to do this in a separate PR, 
to keep this from creeping scope and to unblock this while I work on adding tests for KAFKA-10563. Reverted these changes from this PR for now -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] ableegoldman commented on a change in pull request #10342: KAFKA-12288: remove task-level filesystem locks
ableegoldman commented on a change in pull request #10342: URL: https://github.com/apache/kafka/pull/10342#discussion_r599077785 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ## @@ -410,8 +374,8 @@ public void close() { } // all threads should be stopped and cleaned up by now, so none should remain holding a lock -if (locks.isEmpty()) { -log.error("Some task directories still locked while closing state, this indicates unclean shutdown: {}", locks); +if (lockedTasksToStreamThreadOwner.isEmpty()) { Review comment: Ahhh yeah it should. I kept wondering why I was seeing this sometimes in the soak or various unit tests, and just assumed it was because of an unclean shutdown 🤦‍♀️. Thanks for catching this
[GitHub] [kafka] ableegoldman commented on a change in pull request #10342: KAFKA-12288: remove task-level filesystem locks
ableegoldman commented on a change in pull request #10342: URL: https://github.com/apache/kafka/pull/10342#discussion_r599076729 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ## @@ -444,22 +408,30 @@ public synchronized void clean() { * Remove the directories for any {@link TaskId}s that are no-longer * owned by this {@link StreamThread} and aren't locked by either * another process or another {@link StreamThread} - * @param cleanupDelayMs only remove directories if they haven't been modified for at least - * this amount of time (milliseconds) + * @param cleanupDelayMs only remove directories if they haven't been modified for at least + * this amount of time (milliseconds) + * @param currentThreadNames the names of all non-DEAD stream threads so we can clean up any + * orphaned task directories */ -public synchronized void cleanRemovedTasks(final long cleanupDelayMs) { +public synchronized void cleanRemovedTasks(final long cleanupDelayMs, final Set currentThreadNames) { try { -cleanRemovedTasksCalledByCleanerThread(cleanupDelayMs); +cleanRemovedTasksCalledByCleanerThread(cleanupDelayMs, currentThreadNames); } catch (final Exception cannotHappen) { throw new IllegalStateException("Should have swallowed exception.", cannotHappen); } } -private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) { +private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs, final Set currentThreadNames) { for (final File taskDir : listNonEmptyTaskDirectories()) { final String dirName = taskDir.getName(); final TaskId id = TaskId.parse(dirName); -if (!locks.containsKey(id)) { + +final String owningThread = lockedTasksToStreamThreadOwner.get(id); +if (owningThread != null && !currentThreadNames.contains(owningThread)) { +log.warn("Deleting lock for task directory {} since the thread owning the lock is gone: {}", id, owningThread); Review comment: See 
[KAFKA-10563](https://issues.apache.org/jira/browse/KAFKA-10563) -- the short answer is that yes, it should never happen. But the long answer is that it may be possible during some exceptional case, or if we introduce a bug somewhere, and since users may choose to replace a thread which died and failed during the cleanup we must make sure this task is not permanently blocked from any other thread ever picking it up. There are probably better ways to guard against this possibility, hence I'm not considering this PR to fully address [KAFKA-10563](https://issues.apache.org/jira/browse/KAFKA-10563), but I did want to put in at least some way to escape the situation of an orphaned task directory
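The escape hatch described in this comment can be sketched roughly as follows. This is only an illustration of the idea, assuming task ids and thread names are plain strings; `OrphanedLockSweeper` and `releaseOrphanedLocks` are hypothetical names, not Kafka APIs, and the thread itself says the real change was moved to a separate PR.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of Kafka: task ids and thread names are plain strings here.
final class OrphanedLockSweeper {

    private OrphanedLockSweeper() { }

    /**
     * Removes every lock whose owning thread is not in currentThreadNames
     * and returns the number of locks that were released.
     */
    static int releaseOrphanedLocks(final Map<String, String> lockedTasksToOwner,
                                    final Set<String> currentThreadNames) {
        int released = 0;
        final Iterator<Map.Entry<String, String>> it = lockedTasksToOwner.entrySet().iterator();
        while (it.hasNext()) {
            final Map.Entry<String, String> entry = it.next();
            if (!currentThreadNames.contains(entry.getValue())) {
                // the owner is gone, so nobody would ever unlock this task
                it.remove();
                released++;
            }
        }
        return released;
    }
}
```

Passing the set of live thread names in from the caller keeps the sweep free of any dependency on thread state, which mirrors the `currentThreadNames` parameter in the diff above.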
[GitHub] [kafka] ableegoldman commented on a change in pull request #10342: KAFKA-12288: remove task-level filesystem locks
ableegoldman commented on a change in pull request #10342: URL: https://github.com/apache/kafka/pull/10342#discussion_r597427365 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ## @@ -288,50 +277,31 @@ private String logPrefix() { * Get the lock for the {@link TaskId}s directory if it is available * @param taskId task id * @return true if successful - * @throws IOException if the file cannot be created or file handle cannot be grabbed, should be considered as fatal */ -synchronized boolean lock(final TaskId taskId) throws IOException { +synchronized boolean lock(final TaskId taskId) { if (!hasPersistentStores) { return true; } -final File lockFile; -// we already have the lock so bail out here -final LockAndOwner lockAndOwner = locks.get(taskId); -if (lockAndOwner != null && lockAndOwner.owningThread.equals(Thread.currentThread().getName())) { -log.trace("{} Found cached state dir lock for task {}", logPrefix(), taskId); +final String lockOwner = lockedTasksToStreamThreadOwner.get(taskId); +if (lockOwner != null) { +if (lockOwner.equals(Thread.currentThread().getName())) { Review comment: Hm I can't seem to find that ticket, maybe I just thought about filing one. Anyways here's https://issues.apache.org/jira/browse/KAFKA-12509
[GitHub] [kafka] ableegoldman commented on a change in pull request #10342: KAFKA-12288: remove task-level filesystem locks
ableegoldman commented on a change in pull request #10342: URL: https://github.com/apache/kafka/pull/10342#discussion_r597424785 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ## @@ -288,50 +277,31 @@ private String logPrefix() { * Get the lock for the {@link TaskId}s directory if it is available * @param taskId task id * @return true if successful - * @throws IOException if the file cannot be created or file handle cannot be grabbed, should be considered as fatal */ -synchronized boolean lock(final TaskId taskId) throws IOException { +synchronized boolean lock(final TaskId taskId) { if (!hasPersistentStores) { return true; } -final File lockFile; -// we already have the lock so bail out here -final LockAndOwner lockAndOwner = locks.get(taskId); -if (lockAndOwner != null && lockAndOwner.owningThread.equals(Thread.currentThread().getName())) { -log.trace("{} Found cached state dir lock for task {}", logPrefix(), taskId); +final String lockOwner = lockedTasksToStreamThreadOwner.get(taskId); +if (lockOwner != null) { +if (lockOwner.equals(Thread.currentThread().getName())) { Review comment: Yeah I agree, I filed a ticket to improve the locking mechanism a little while back. Just wanted to keep this PR focused
[GitHub] [kafka] ableegoldman commented on a change in pull request #10342: KAFKA-12288: remove task-level filesystem locks
ableegoldman commented on a change in pull request #10342: URL: https://github.com/apache/kafka/pull/10342#discussion_r596443720 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ## @@ -186,7 +186,7 @@ public ProcessorStateManager(final TaskId taskId, this.changelogReader = changelogReader; this.sourcePartitions = sourcePartitions; -this.baseDir = stateDirectory.directoryForTask(taskId); +this.baseDir = stateDirectory.getOrCreateDirectoryForTask(taskId); Review comment: Note: the only logical changes are in `StateDirectory` and `KafkaStreams`, the rest of the files were just touched by renaming this method to include the `getOrCreate` prefix. Sorry for the expanded surface area, I felt the renaming was merited since otherwise this critical functionality is easy to miss. There are also some non-renaming changes in StateManagerUtil and TaskManager that just involve removing the try-catch logic for the no longer throwable IOException. For tests, you should focus on `StateDirectoryTest` and `StateManagerUtilTest`
[GitHub] [kafka] ableegoldman commented on a change in pull request #10342: KAFKA-12288: remove task-level filesystem locks
ableegoldman commented on a change in pull request #10342: URL: https://github.com/apache/kafka/pull/10342#discussion_r597281724 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ## @@ -477,15 +441,7 @@ private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) { exception ); } finally { -try { -unlock(id); -} catch (final IOException exception) { -log.warn( -String.format("%s Swallowed the following exception during unlocking after deletion of obsolete " + -"state directory %s for task %s:", logPrefix(), dirName, id), -exception -); -} +unlock(id); Review comment: I think we need to keep that one because `Utils.delete` can throw IOException too
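The shape of the cleanup path after this change might look roughly like the sketch below: the outer `catch` stays because the delete can still fail with an `IOException`, while the `finally` block shrinks to a plain `unlock` call. This is an assumption-laden illustration, not the Kafka code: `CleanupWithUnlockSketch` is a hypothetical class, `Files.delete` stands in for `Utils.delete`, and the lock here is a simple non-reentrant set.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch: Files.delete stands in for Utils.delete, task ids are plain strings.
final class CleanupWithUnlockSketch {
    private final Set<String> lockedTasks = new HashSet<>();

    synchronized boolean lock(final String taskId) {
        return lockedTasks.add(taskId);
    }

    // In-memory unlock: nothing here can throw a checked exception.
    synchronized void unlock(final String taskId) {
        lockedTasks.remove(taskId);
    }

    synchronized boolean isLocked(final String taskId) {
        return lockedTasks.contains(taskId);
    }

    /** Tries to delete the task directory; returns true only if the delete succeeded. */
    boolean deleteTaskDirectory(final String taskId, final Path taskDir) {
        if (!lock(taskId)) {
            return false; // somebody else holds the lock, leave the directory alone
        }
        try {
            Files.delete(taskDir); // can still throw IOException, so the catch stays
            return true;
        } catch (final IOException exception) {
            System.err.println("Swallowed exception while deleting " + taskDir + ": " + exception);
            return false;
        } finally {
            unlock(taskId); // no nested try-catch needed any more
        }
    }
}
```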
[GitHub] [kafka] ableegoldman commented on a change in pull request #10342: KAFKA-12288: remove task-level filesystem locks
ableegoldman commented on a change in pull request #10342: URL: https://github.com/apache/kafka/pull/10342#discussion_r597278900 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ## @@ -288,50 +277,31 @@ private String logPrefix() { * Get the lock for the {@link TaskId}s directory if it is available * @param taskId task id * @return true if successful - * @throws IOException if the file cannot be created or file handle cannot be grabbed, should be considered as fatal */ -synchronized boolean lock(final TaskId taskId) throws IOException { +synchronized boolean lock(final TaskId taskId) { if (!hasPersistentStores) { return true; } -final File lockFile; -// we already have the lock so bail out here -final LockAndOwner lockAndOwner = locks.get(taskId); -if (lockAndOwner != null && lockAndOwner.owningThread.equals(Thread.currentThread().getName())) { -log.trace("{} Found cached state dir lock for task {}", logPrefix(), taskId); -return true; -} else if (lockAndOwner != null) { -// another thread owns the lock -return false; -} - -try { -lockFile = new File(directoryForTask(taskId), LOCK_FILE_NAME); -} catch (final ProcessorStateException e) { -// directoryForTask could be throwing an exception if another thread -// has concurrently deleted the directory -return false; -} - -final FileChannel channel; - -try { -channel = getOrCreateFileChannel(taskId, lockFile.toPath()); -} catch (final NoSuchFileException e) { -// FileChannel.open(..) could throw NoSuchFileException when there is another thread -// concurrently deleting the parent directory (i.e. the directory of the taskId) of the lock -// file, in this case we will return immediately indicating locking failed. 
+final String lockOwner = lockedTasksToStreamThreadOwner.get(taskId); +if (lockOwner != null) { +if (lockOwner.equals(Thread.currentThread().getName())) { +log.trace("{} Found cached state dir lock for task {}", logPrefix(), taskId); +// we already own the lock +return true; +} else { +// another thread owns the lock +return false; +} +} else if (!stateDir.exists()) { Review comment: Good point -- I put this in there because there was a test for this behavior, but actually that makes no sense. Will do
[GitHub] [kafka] ableegoldman commented on a change in pull request #10342: KAFKA-12288: remove task-level filesystem locks
ableegoldman commented on a change in pull request #10342: URL: https://github.com/apache/kafka/pull/10342#discussion_r596468362 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java ## @@ -190,31 +167,7 @@ public void testCloseStateManagerClean() throws IOException { } @Test -public void testCloseStateManagerThrowsExceptionWhenClean() throws IOException { Review comment: Removed `testCloseStateManagerThrowsExceptionWhenClean` since there's now only one possible source of exceptions in the clean path, and the below test `testCloseStateManagerOnlyThrowsFirstExceptionWhenClean` covers this case
[GitHub] [kafka] ableegoldman commented on a change in pull request #10342: KAFKA-12288: remove task-level filesystem locks
ableegoldman commented on a change in pull request #10342: URL: https://github.com/apache/kafka/pull/10342#discussion_r596466082 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java ## @@ -117,30 +117,7 @@ public void testRegisterStateStoreFailToLockStateDirectory() throws IOException } @Test -public void testRegisterStateStoreLockThrowIOExceptionWrappedAsStreamException() throws IOException { Review comment: Removed `testRegisterStateStoreLockThrowIOExceptionWrappedAsStreamException` since `lock` no longer throws IOException
[GitHub] [kafka] ableegoldman commented on a change in pull request #10342: KAFKA-12288: remove task-level filesystem locks
ableegoldman commented on a change in pull request #10342: URL: https://github.com/apache/kafka/pull/10342#discussion_r596447157 ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java ## @@ -152,33 +152,15 @@ private void assertPermissions(final File file) { @Test public void shouldCreateTaskStateDirectory() { final TaskId taskId = new TaskId(0, 0); -final File taskDirectory = directory.directoryForTask(taskId); +final File taskDirectory = directory.getOrCreateDirectoryForTask(taskId); assertTrue(taskDirectory.exists()); assertTrue(taskDirectory.isDirectory()); } @Test -public void shouldLockTaskStateDirectory() throws IOException { +public void shouldBeTrueIfAlreadyHoldsLock() { final TaskId taskId = new TaskId(0, 0); -final File taskDirectory = directory.directoryForTask(taskId); - -directory.lock(taskId); - -try ( -final FileChannel channel = FileChannel.open( -new File(taskDirectory, LOCK_FILE_NAME).toPath(), -StandardOpenOption.CREATE, StandardOpenOption.WRITE) -) { -assertThrows(OverlappingFileLockException.class, channel::tryLock); -} finally { -directory.unlock(taskId); -} -} - -@Test -public void shouldBeTrueIfAlreadyHoldsLock() throws IOException { -final TaskId taskId = new TaskId(0, 0); -directory.directoryForTask(taskId); Review comment: The diff here is a little weird, but essentially I just removed the `shouldLockTaskStateDirectory` test since it would do nothing except call `lock()` and `unlock()` once you take out the FileChannel stuff, and that's already covered by other tests
[GitHub] [kafka] ableegoldman commented on a change in pull request #10342: KAFKA-12288: remove task-level filesystem locks
ableegoldman commented on a change in pull request #10342: URL: https://github.com/apache/kafka/pull/10342#discussion_r596446057 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java ## @@ -86,15 +86,8 @@ static void registerStateStores(final Logger log, } final TaskId id = stateMgr.taskId(); -try { -if (!stateDirectory.lock(id)) { -throw new LockException(String.format("%sFailed to lock the state directory for task %s", logPrefix, id)); -} -} catch (final IOException e) { -throw new StreamsException( -String.format("%sFatal error while trying to lock the state directory for task %s", logPrefix, id), -e -); +if (!stateDirectory.lock(id)) { +throw new LockException(String.format("%sFailed to lock the state directory for task %s", logPrefix, id)); Review comment: No logical changes here (or below in TaskManager), just cleaning up the try-catch block now that we no longer throw IOException
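For readers skimming the flattened diff, the simplified call site boils down to checking a boolean and throwing on failure. The sketch below is a stand-alone approximation: `LockFailureSketch`, its nested `LockException`, and the `TaskLocker` interface are hypothetical stand-ins for Kafka's own types, and only the message format is taken from the diff.

```java
// Hypothetical stand-ins: Kafka has its own LockException and StateDirectory types.
final class LockFailureSketch {

    private LockFailureSketch() { }

    static final class LockException extends RuntimeException {
        LockException(final String message) {
            super(message);
        }
    }

    /** Minimal view of a lock that reports failure through its return value. */
    interface TaskLocker {
        boolean lock(String taskId);
    }

    static void registerStateStores(final TaskLocker locker, final String taskId, final String logPrefix) {
        // lock() no longer declares IOException, so a plain boolean check is enough
        if (!locker.lock(taskId)) {
            throw new LockException(
                String.format("%sFailed to lock the state directory for task %s", logPrefix, taskId));
        }
        // store registration would follow here
    }
}
```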
[GitHub] [kafka] ableegoldman commented on a change in pull request #10342: KAFKA-12288: remove task-level filesystem locks
ableegoldman commented on a change in pull request #10342: URL: https://github.com/apache/kafka/pull/10342#discussion_r596443720 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ## @@ -186,7 +186,7 @@ public ProcessorStateManager(final TaskId taskId, this.changelogReader = changelogReader; this.sourcePartitions = sourcePartitions; -this.baseDir = stateDirectory.directoryForTask(taskId); +this.baseDir = stateDirectory.getOrCreateDirectoryForTask(taskId); Review comment: Note: the only logical changes are in `StateDirectory` and `StateDirectoryTest`, the rest of the files were just touched by renaming this method to include the `getOrCreate` prefix, since this is otherwise pretty opaque. There are also some non-renaming changes in StateManagerUtil and TaskManager that just involve removing the try-catch logic for the no longer throwable IOException
[GitHub] [kafka] ableegoldman commented on a change in pull request #10342: KAFKA-12288: remove task-level filesystem locks
ableegoldman commented on a change in pull request #10342: URL: https://github.com/apache/kafka/pull/10342#discussion_r596445392 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ## @@ -288,50 +277,28 @@ private String logPrefix() { * Get the lock for the {@link TaskId}s directory if it is available * @param taskId task id * @return true if successful - * @throws IOException if the file cannot be created or file handle cannot be grabbed, should be considered as fatal */ -synchronized boolean lock(final TaskId taskId) throws IOException { +synchronized boolean lock(final TaskId taskId) { if (!hasPersistentStores) { return true; } -final File lockFile; -// we already have the lock so bail out here -final LockAndOwner lockAndOwner = locks.get(taskId); -if (lockAndOwner != null && lockAndOwner.owningThread.equals(Thread.currentThread().getName())) { -log.trace("{} Found cached state dir lock for task {}", logPrefix(), taskId); +final String lockOwner = lockedTasksToStreamThreadOwner.get(taskId); +if (lockOwner != null) { +if (lockOwner.equals(Thread.currentThread().getName())) { +log.trace("{} Found cached state dir lock for task {}", logPrefix(), taskId); +// we already own the lock +return true; +} else { +// another thread owns the lock +return false; +} +} else { +lockedTasksToStreamThreadOwner.put(taskId, Thread.currentThread().getName()); +// make sure the directory actually exists, and create it if not +getOrCreateDirectoryForTask(taskId); return true; -} else if (lockAndOwner != null) { -// another thread owns the lock -return false; -} - -try { -lockFile = new File(directoryForTask(taskId), LOCK_FILE_NAME); -} catch (final ProcessorStateException e) { -// directoryForTask could be throwing an exception if another thread -// has concurrently deleted the directory Review comment: We don't need to worry about this condition now -- another thread can only delete the task directory if it owns the lock. 
In the new `lock()` implementation, this means when we call `getOrCreateDirectoryForTask` (the new name for this method) it's not possible for some other thread to concurrently wipe out the task dir from beneath us, since we've already grabbed the lock by then.
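The ordering argument in this comment can be illustrated with a small in-memory model. This is a sketch under stated assumptions rather than Kafka's `StateDirectory`: `InMemoryTaskLocks` is a hypothetical class, directories are tracked in a set instead of on disk, and the owner name is passed in explicitly where the diff uses `Thread.currentThread().getName()`.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model of the idea, not Kafka's StateDirectory: directories are tracked in a set
// and the owner name is passed in explicitly instead of being read from the current thread.
final class InMemoryTaskLocks {
    private final Map<String, String> lockedTasksToOwner = new HashMap<>();
    private final Set<String> taskDirectories = new HashSet<>();

    /** Returns true if owner holds the lock for taskId after this call. */
    synchronized boolean lock(final String taskId, final String owner) {
        final String currentOwner = lockedTasksToOwner.get(taskId);
        if (currentOwner != null) {
            // re-entrant for the owner, refused for every other thread
            return currentOwner.equals(owner);
        }
        lockedTasksToOwner.put(taskId, owner);
        // the lock is already held here, so nobody else may delete the directory
        taskDirectories.add(taskId);
        return true;
    }

    /** Deletes the directory only if owner holds the lock; returns true if it was deleted. */
    synchronized boolean deleteDirectory(final String taskId, final String owner) {
        if (!owner.equals(lockedTasksToOwner.get(taskId))) {
            return false;
        }
        return taskDirectories.remove(taskId);
    }

    /** Releases the lock only if owner currently holds it. */
    synchronized void unlock(final String taskId, final String owner) {
        if (owner.equals(lockedTasksToOwner.get(taskId))) {
            lockedTasksToOwner.remove(taskId);
        }
    }

    synchronized boolean directoryExists(final String taskId) {
        return taskDirectories.contains(taskId);
    }
}
```

Because deletion is gated on lock ownership and the directory is created only after the owner entry is written, a second thread cannot remove the directory between the two steps in this model.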
[GitHub] [kafka] ableegoldman commented on a change in pull request #10342: KAFKA-12288: remove task-level filesystem locks
ableegoldman commented on a change in pull request #10342: URL: https://github.com/apache/kafka/pull/10342#discussion_r596443720 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ## @@ -186,7 +186,7 @@ public ProcessorStateManager(final TaskId taskId, this.changelogReader = changelogReader; this.sourcePartitions = sourcePartitions; -this.baseDir = stateDirectory.directoryForTask(taskId); +this.baseDir = stateDirectory.getOrCreateDirectoryForTask(taskId); Review comment: Note: the only logical changes are in `StateDirectory` and `StateDirectoryTest`, the rest of the files were just touched by renaming this method to include the `getOrCreate` prefix, since this is otherwise pretty opaque.