[GitHub] [kafka] ableegoldman commented on a change in pull request #10342: KAFKA-12288: remove task-level filesystem locks

2021-03-30 Thread GitBox


ableegoldman commented on a change in pull request #10342:
URL: https://github.com/apache/kafka/pull/10342#discussion_r604366210



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -444,22 +408,30 @@ public synchronized void clean() {
  * Remove the directories for any {@link TaskId}s that are no-longer
  * owned by this {@link StreamThread} and aren't locked by either
  * another process or another {@link StreamThread}
- * @param cleanupDelayMs only remove directories if they haven't been 
modified for at least
- *   this amount of time (milliseconds)
+ * @param cleanupDelayMs only remove directories if they haven't been modified for at least
+ *  this amount of time (milliseconds)
+ * @param currentThreadNames the names of all non-DEAD stream threads so we can clean up any
+ *  orphaned task directories
  */
-public synchronized void cleanRemovedTasks(final long cleanupDelayMs) {
+public synchronized void cleanRemovedTasks(final long cleanupDelayMs, final Set<String> currentThreadNames) {
 try {
-cleanRemovedTasksCalledByCleanerThread(cleanupDelayMs);
+cleanRemovedTasksCalledByCleanerThread(cleanupDelayMs, 
currentThreadNames);
 } catch (final Exception cannotHappen) {
 throw new IllegalStateException("Should have swallowed 
exception.", cannotHappen);
 }
 }
 
-private void cleanRemovedTasksCalledByCleanerThread(final long 
cleanupDelayMs) {
+private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs, final Set<String> currentThreadNames) {
 for (final File taskDir : listNonEmptyTaskDirectories()) {
 final String dirName = taskDir.getName();
 final TaskId id = TaskId.parse(dirName);
-if (!locks.containsKey(id)) {
+
+final String owningThread = lockedTasksToStreamThreadOwner.get(id);
+if (owningThread != null && 
!currentThreadNames.contains(owningThread)) {
+log.warn("Deleting lock for task directory {} since the thread 
owning the lock is gone: {}", id, owningThread);

Review comment:
   Ok, actually I decided to do this in a separate PR, to keep the scope of 
this one from creeping and to unblock it while I work on adding tests for 
KAFKA-10563. I've reverted these changes from this PR for now.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10342: KAFKA-12288: remove task-level filesystem locks

2021-03-22 Thread GitBox


ableegoldman commented on a change in pull request #10342:
URL: https://github.com/apache/kafka/pull/10342#discussion_r599077785



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -410,8 +374,8 @@ public void close() {
 }
 
 // all threads should be stopped and cleaned up by now, so none 
should remain holding a lock
-if (locks.isEmpty()) {
-log.error("Some task directories still locked while closing 
state, this indicates unclean shutdown: {}", locks);
+if (lockedTasksToStreamThreadOwner.isEmpty()) {

Review comment:
   Ahhh yeah it should. I kept wondering why I was seeing this sometimes in 
the soak or various unit tests, and just assumed it was because of an unclean 
shutdown 🤦‍♀️. Thanks for catching this








[GitHub] [kafka] ableegoldman commented on a change in pull request #10342: KAFKA-12288: remove task-level filesystem locks

2021-03-22 Thread GitBox


ableegoldman commented on a change in pull request #10342:
URL: https://github.com/apache/kafka/pull/10342#discussion_r599076729



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -444,22 +408,30 @@ public synchronized void clean() {
  * Remove the directories for any {@link TaskId}s that are no-longer
  * owned by this {@link StreamThread} and aren't locked by either
  * another process or another {@link StreamThread}
- * @param cleanupDelayMs only remove directories if they haven't been 
modified for at least
- *   this amount of time (milliseconds)
+ * @param cleanupDelayMs only remove directories if they haven't been modified for at least
+ *  this amount of time (milliseconds)
+ * @param currentThreadNames the names of all non-DEAD stream threads so we can clean up any
+ *  orphaned task directories
  */
-public synchronized void cleanRemovedTasks(final long cleanupDelayMs) {
+public synchronized void cleanRemovedTasks(final long cleanupDelayMs, final Set<String> currentThreadNames) {
 try {
-cleanRemovedTasksCalledByCleanerThread(cleanupDelayMs);
+cleanRemovedTasksCalledByCleanerThread(cleanupDelayMs, 
currentThreadNames);
 } catch (final Exception cannotHappen) {
 throw new IllegalStateException("Should have swallowed 
exception.", cannotHappen);
 }
 }
 
-private void cleanRemovedTasksCalledByCleanerThread(final long 
cleanupDelayMs) {
+private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs, final Set<String> currentThreadNames) {
 for (final File taskDir : listNonEmptyTaskDirectories()) {
 final String dirName = taskDir.getName();
 final TaskId id = TaskId.parse(dirName);
-if (!locks.containsKey(id)) {
+
+final String owningThread = lockedTasksToStreamThreadOwner.get(id);
+if (owningThread != null && 
!currentThreadNames.contains(owningThread)) {
+log.warn("Deleting lock for task directory {} since the thread 
owning the lock is gone: {}", id, owningThread);

Review comment:
   See [KAFKA-10563](https://issues.apache.org/jira/browse/KAFKA-10563) -- 
the short answer is that yes, it should never happen. But the long answer is 
that it may be possible during some exceptional case, or if we introduce a bug 
somewhere, and since users may choose to replace a thread which died and failed 
during the cleanup we must make sure this task is not permanently blocked from 
any other thread ever picking it up.
   There are probably better ways to guard against this possibility, hence I'm 
not considering this PR to fully address 
[KAFKA-10563](https://issues.apache.org/jira/browse/KAFKA-10563), but I did 
want to put in at least some way to escape the situation of an orphaned task 
directory
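
   The escape hatch described above could look roughly like the following. 
This is only a sketch under stated assumptions: `OrphanedLockCleaner` and 
`releaseOrphanedLocks` are hypothetical names, task ids are plain strings 
rather than `TaskId`, and the actual change in `StateDirectory` walks the task 
directories instead of operating on a bare map.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.Set;

// Hypothetical helper, not part of the PR: releases locks whose owning
// thread is no longer among the live (non-DEAD) stream threads.
class OrphanedLockCleaner {

    // Removes every entry whose owner name is absent from liveThreadNames
    // and returns the number of locks that were released.
    static int releaseOrphanedLocks(final Map<String, String> lockedTasksToOwner,
                                    final Set<String> liveThreadNames) {
        int released = 0;
        final Iterator<Map.Entry<String, String>> it = lockedTasksToOwner.entrySet().iterator();
        while (it.hasNext()) {
            final Map.Entry<String, String> entry = it.next();
            if (!liveThreadNames.contains(entry.getValue())) {
                // The owner is gone, so no thread could ever unlock this task.
                it.remove();
                released++;
            }
        }
        return released;
    }
}
```

   Locks held by threads that are still alive are left untouched, so a task 
only becomes available again once its owner has really disappeared.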








[GitHub] [kafka] ableegoldman commented on a change in pull request #10342: KAFKA-12288: remove task-level filesystem locks

2021-03-19 Thread GitBox


ableegoldman commented on a change in pull request #10342:
URL: https://github.com/apache/kafka/pull/10342#discussion_r597427365



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -288,50 +277,31 @@ private String logPrefix() {
  * Get the lock for the {@link TaskId}s directory if it is available
  * @param taskId task id
  * @return true if successful
- * @throws IOException if the file cannot be created or file handle cannot 
be grabbed, should be considered as fatal
  */
-synchronized boolean lock(final TaskId taskId) throws IOException {
+synchronized boolean lock(final TaskId taskId) {
 if (!hasPersistentStores) {
 return true;
 }
 
-final File lockFile;
-// we already have the lock so bail out here
-final LockAndOwner lockAndOwner = locks.get(taskId);
-if (lockAndOwner != null && 
lockAndOwner.owningThread.equals(Thread.currentThread().getName())) {
-log.trace("{} Found cached state dir lock for task {}", 
logPrefix(), taskId);
+final String lockOwner = lockedTasksToStreamThreadOwner.get(taskId);
+if (lockOwner != null) {
+if (lockOwner.equals(Thread.currentThread().getName())) {

Review comment:
   Hm I can't seem to find that ticket, maybe I just thought about filing 
one. Anyways here's https://issues.apache.org/jira/browse/KAFKA-12509








[GitHub] [kafka] ableegoldman commented on a change in pull request #10342: KAFKA-12288: remove task-level filesystem locks

2021-03-18 Thread GitBox


ableegoldman commented on a change in pull request #10342:
URL: https://github.com/apache/kafka/pull/10342#discussion_r597424785



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -288,50 +277,31 @@ private String logPrefix() {
  * Get the lock for the {@link TaskId}s directory if it is available
  * @param taskId task id
  * @return true if successful
- * @throws IOException if the file cannot be created or file handle cannot 
be grabbed, should be considered as fatal
  */
-synchronized boolean lock(final TaskId taskId) throws IOException {
+synchronized boolean lock(final TaskId taskId) {
 if (!hasPersistentStores) {
 return true;
 }
 
-final File lockFile;
-// we already have the lock so bail out here
-final LockAndOwner lockAndOwner = locks.get(taskId);
-if (lockAndOwner != null && 
lockAndOwner.owningThread.equals(Thread.currentThread().getName())) {
-log.trace("{} Found cached state dir lock for task {}", 
logPrefix(), taskId);
+final String lockOwner = lockedTasksToStreamThreadOwner.get(taskId);
+if (lockOwner != null) {
+if (lockOwner.equals(Thread.currentThread().getName())) {

Review comment:
   Yeah I agree, I filed a ticket to improve the locking mechanism a little 
while back. Just wanted to keep this PR focused








[GitHub] [kafka] ableegoldman commented on a change in pull request #10342: KAFKA-12288: remove task-level filesystem locks

2021-03-18 Thread GitBox


ableegoldman commented on a change in pull request #10342:
URL: https://github.com/apache/kafka/pull/10342#discussion_r596443720



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##
@@ -186,7 +186,7 @@ public ProcessorStateManager(final TaskId taskId,
 this.changelogReader = changelogReader;
 this.sourcePartitions = sourcePartitions;
 
-this.baseDir = stateDirectory.directoryForTask(taskId);
+this.baseDir = stateDirectory.getOrCreateDirectoryForTask(taskId);

Review comment:
   Note: the only logical changes are in `StateDirectory` and 
`KafkaStreams`, the rest of the files were just touched by renaming this method 
to include the `getOrCreate` prefix. Sorry for the expanded surface area, I 
felt the renaming was merited since otherwise this critical functionality is 
easy to miss. 
   
   There are also some non-renaming changes in StateManagerUtil and TaskManager 
that just involve removing the try-catch logic for the no longer throwable 
IOException. For tests, you should focus on `StateDirectoryTest` and 
`StateManagerUtilTest`
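
   As a rough illustration of what the `getOrCreate` prefix is meant to 
signal, a method of this shape returns the task directory and creates it first 
if it is missing. The class name, the `String` task id, and the explicit 
`stateDir` parameter below are assumptions made for a self-contained sketch, 
not the actual `StateDirectory` signature.

```java
import java.io.File;

// Hypothetical stand-in used only to illustrate the "get or create" semantics.
class TaskDirectories {

    // Returns the directory for taskId under stateDir, creating it if needed.
    static File getOrCreateDirectoryForTask(final File stateDir, final String taskId) {
        final File taskDir = new File(stateDir, taskId);
        // mkdirs() returns false if the directory already exists, so re-check
        // isDirectory() to tolerate a concurrent creation by another caller.
        if (!taskDir.isDirectory() && !taskDir.mkdirs() && !taskDir.isDirectory()) {
            throw new IllegalStateException("Could not create task directory " + taskDir);
        }
        return taskDir;
    }
}
```

   A caller that only wants to look up the path gets the side effect of 
directory creation as well, which is the behavior the rename makes visible.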








[GitHub] [kafka] ableegoldman commented on a change in pull request #10342: KAFKA-12288: remove task-level filesystem locks

2021-03-18 Thread GitBox


ableegoldman commented on a change in pull request #10342:
URL: https://github.com/apache/kafka/pull/10342#discussion_r597281724



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -477,15 +441,7 @@ private void cleanRemovedTasksCalledByCleanerThread(final 
long cleanupDelayMs) {
 exception
 );
 } finally {
-try {
-unlock(id);
-} catch (final IOException exception) {
-log.warn(
-String.format("%s Swallowed the following 
exception during unlocking after deletion of obsolete " +
-"state directory %s for task %s:", 
logPrefix(), dirName, id),
-exception
-);
-}
+unlock(id);

Review comment:
   I think we need to keep that one because `Utils.delete` can throw 
IOException too








[GitHub] [kafka] ableegoldman commented on a change in pull request #10342: KAFKA-12288: remove task-level filesystem locks

2021-03-18 Thread GitBox


ableegoldman commented on a change in pull request #10342:
URL: https://github.com/apache/kafka/pull/10342#discussion_r597278900



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -288,50 +277,31 @@ private String logPrefix() {
  * Get the lock for the {@link TaskId}s directory if it is available
  * @param taskId task id
  * @return true if successful
- * @throws IOException if the file cannot be created or file handle cannot 
be grabbed, should be considered as fatal
  */
-synchronized boolean lock(final TaskId taskId) throws IOException {
+synchronized boolean lock(final TaskId taskId) {
 if (!hasPersistentStores) {
 return true;
 }
 
-final File lockFile;
-// we already have the lock so bail out here
-final LockAndOwner lockAndOwner = locks.get(taskId);
-if (lockAndOwner != null && 
lockAndOwner.owningThread.equals(Thread.currentThread().getName())) {
-log.trace("{} Found cached state dir lock for task {}", 
logPrefix(), taskId);
-return true;
-} else if (lockAndOwner != null) {
-// another thread owns the lock
-return false;
-}
-
-try {
-lockFile = new File(directoryForTask(taskId), LOCK_FILE_NAME);
-} catch (final ProcessorStateException e) {
-// directoryForTask could be throwing an exception if another 
thread
-// has concurrently deleted the directory
-return false;
-}
-
-final FileChannel channel;
-
-try {
-channel = getOrCreateFileChannel(taskId, lockFile.toPath());
-} catch (final NoSuchFileException e) {
-// FileChannel.open(..) could throw NoSuchFileException when there 
is another thread
-// concurrently deleting the parent directory (i.e. the directory 
of the taskId) of the lock
-// file, in this case we will return immediately indicating 
locking failed.
+final String lockOwner = lockedTasksToStreamThreadOwner.get(taskId);
+if (lockOwner != null) {
+if (lockOwner.equals(Thread.currentThread().getName())) {
+log.trace("{} Found cached state dir lock for task {}", 
logPrefix(), taskId);
+// we already own the lock
+return true;
+} else {
+// another thread owns the lock
+return false;
+}
+} else if (!stateDir.exists()) {

Review comment:
   Good point -- I put this in there because there was a test for this 
behavior, but actually that makes no sense. Will do








[GitHub] [kafka] ableegoldman commented on a change in pull request #10342: KAFKA-12288: remove task-level filesystem locks

2021-03-17 Thread GitBox


ableegoldman commented on a change in pull request #10342:
URL: https://github.com/apache/kafka/pull/10342#discussion_r596468362



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java
##
@@ -190,31 +167,7 @@ public void testCloseStateManagerClean() throws 
IOException {
 }
 
 @Test
-public void testCloseStateManagerThrowsExceptionWhenClean() throws 
IOException {

Review comment:
   Removed `testCloseStateManagerThrowsExceptionWhenClean` since there's now 
only one possible source of exceptions in the clean path, and the test below, 
`testCloseStateManagerOnlyThrowsFirstExceptionWhenClean`, covers this case









[GitHub] [kafka] ableegoldman commented on a change in pull request #10342: KAFKA-12288: remove task-level filesystem locks

2021-03-17 Thread GitBox


ableegoldman commented on a change in pull request #10342:
URL: https://github.com/apache/kafka/pull/10342#discussion_r596466082



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java
##
@@ -117,30 +117,7 @@ public void 
testRegisterStateStoreFailToLockStateDirectory() throws IOException
 }
 
 @Test
-public void 
testRegisterStateStoreLockThrowIOExceptionWrappedAsStreamException() throws 
IOException {

Review comment:
   Removed 
`testRegisterStateStoreLockThrowIOExceptionWrappedAsStreamException` since 
`lock` no longer throws IOException









[GitHub] [kafka] ableegoldman commented on a change in pull request #10342: KAFKA-12288: remove task-level filesystem locks

2021-03-17 Thread GitBox


ableegoldman commented on a change in pull request #10342:
URL: https://github.com/apache/kafka/pull/10342#discussion_r596447157



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
##
@@ -152,33 +152,15 @@ private void assertPermissions(final File file) {
 @Test
 public void shouldCreateTaskStateDirectory() {
 final TaskId taskId = new TaskId(0, 0);
-final File taskDirectory = directory.directoryForTask(taskId);
+final File taskDirectory = 
directory.getOrCreateDirectoryForTask(taskId);
 assertTrue(taskDirectory.exists());
 assertTrue(taskDirectory.isDirectory());
 }
 
 @Test
-public void shouldLockTaskStateDirectory() throws IOException {
+public void shouldBeTrueIfAlreadyHoldsLock() {
 final TaskId taskId = new TaskId(0, 0);
-final File taskDirectory = directory.directoryForTask(taskId);
-
-directory.lock(taskId);
-
-try (
-final FileChannel channel = FileChannel.open(
-new File(taskDirectory, LOCK_FILE_NAME).toPath(),
-StandardOpenOption.CREATE, StandardOpenOption.WRITE)
-) {
-assertThrows(OverlappingFileLockException.class, channel::tryLock);
-} finally {
-directory.unlock(taskId);
-}
-}
-
-@Test
-public void shouldBeTrueIfAlreadyHoldsLock() throws IOException {
-final TaskId taskId = new TaskId(0, 0);
-directory.directoryForTask(taskId);

Review comment:
   The diff here is a little weird, but essentially I just removed the 
`shouldLockTaskStateDirectory` test since it would do nothing except call 
`lock()` and `unlock()` once you take out the FileChannel stuff, and that's 
already covered by other tests









[GitHub] [kafka] ableegoldman commented on a change in pull request #10342: KAFKA-12288: remove task-level filesystem locks

2021-03-17 Thread GitBox


ableegoldman commented on a change in pull request #10342:
URL: https://github.com/apache/kafka/pull/10342#discussion_r596446057



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
##
@@ -86,15 +86,8 @@ static void registerStateStores(final Logger log,
 }
 
 final TaskId id = stateMgr.taskId();
-try {
-if (!stateDirectory.lock(id)) {
-throw new LockException(String.format("%sFailed to lock the 
state directory for task %s", logPrefix, id));
-}
-} catch (final IOException e) {
-throw new StreamsException(
-String.format("%sFatal error while trying to lock the state 
directory for task %s", logPrefix, id),
-e
-);
+if (!stateDirectory.lock(id)) {
+throw new LockException(String.format("%sFailed to lock the state 
directory for task %s", logPrefix, id));

Review comment:
   No logical changes here (or below in TaskManager), just cleaning up the 
try-catch block now that we no longer throw IOException









[GitHub] [kafka] ableegoldman commented on a change in pull request #10342: KAFKA-12288: remove task-level filesystem locks

2021-03-17 Thread GitBox


ableegoldman commented on a change in pull request #10342:
URL: https://github.com/apache/kafka/pull/10342#discussion_r596443720



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
##
@@ -186,7 +186,7 @@ public ProcessorStateManager(final TaskId taskId,
 this.changelogReader = changelogReader;
 this.sourcePartitions = sourcePartitions;
 
-this.baseDir = stateDirectory.directoryForTask(taskId);
+this.baseDir = stateDirectory.getOrCreateDirectoryForTask(taskId);

Review comment:
   Note: the only logical changes are in `StateDirectory` and 
`StateDirectoryTest`, the rest of the files were just touched by renaming this 
method to include the `getOrCreate` prefix, since this is otherwise pretty 
opaque.
   There are also some non-renaming changes in StateManagerUtil and TaskManager 
that just involve removing the try-catch logic for the no longer throwable 
IOException









[GitHub] [kafka] ableegoldman commented on a change in pull request #10342: KAFKA-12288: remove task-level filesystem locks

2021-03-17 Thread GitBox


ableegoldman commented on a change in pull request #10342:
URL: https://github.com/apache/kafka/pull/10342#discussion_r596445392



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##
@@ -288,50 +277,28 @@ private String logPrefix() {
  * Get the lock for the {@link TaskId}s directory if it is available
  * @param taskId task id
  * @return true if successful
- * @throws IOException if the file cannot be created or file handle cannot 
be grabbed, should be considered as fatal
  */
-synchronized boolean lock(final TaskId taskId) throws IOException {
+synchronized boolean lock(final TaskId taskId) {
 if (!hasPersistentStores) {
 return true;
 }
 
-final File lockFile;
-// we already have the lock so bail out here
-final LockAndOwner lockAndOwner = locks.get(taskId);
-if (lockAndOwner != null && 
lockAndOwner.owningThread.equals(Thread.currentThread().getName())) {
-log.trace("{} Found cached state dir lock for task {}", 
logPrefix(), taskId);
+final String lockOwner = lockedTasksToStreamThreadOwner.get(taskId);
+if (lockOwner != null) {
+if (lockOwner.equals(Thread.currentThread().getName())) {
+log.trace("{} Found cached state dir lock for task {}", 
logPrefix(), taskId);
+// we already own the lock
+return true;
+} else {
+// another thread owns the lock
+return false;
+}
+} else {
+lockedTasksToStreamThreadOwner.put(taskId, 
Thread.currentThread().getName());
+// make sure the directory actually exists, and create it if not
+getOrCreateDirectoryForTask(taskId);
 return true;
-} else if (lockAndOwner != null) {
-// another thread owns the lock
-return false;
-}
-
-try {
-lockFile = new File(directoryForTask(taskId), LOCK_FILE_NAME);
-} catch (final ProcessorStateException e) {
-// directoryForTask could be throwing an exception if another 
thread
-// has concurrently deleted the directory

Review comment:
   We don't need to worry about this condition now -- another thread can 
only delete the task directory if it owns the lock. In the new `lock()` 
implementation, this means that when we call `getOrCreateDirectoryForTask` 
(the new name for this method) it's not possible for some other thread to 
concurrently wipe out the task dir from beneath us, since we've already 
grabbed the lock by then.
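
   For readers following along, the in-memory scheme can be sketched roughly 
as below. This is a simplified illustration rather than the actual 
`StateDirectory` code: the class `TaskLockRegistry`, the `String` task ids, 
the explicit `threadName` parameter, and the boolean return value of `unlock` 
are all assumptions made to keep the sketch self-contained.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for the in-memory task locks.
class TaskLockRegistry {
    // Maps a task id to the name of the thread currently holding its lock.
    private final Map<String, String> lockedTasksToOwner = new HashMap<>();

    // Returns true if the caller now holds, or already held, the lock.
    synchronized boolean lock(final String taskId, final String threadName) {
        final String owner = lockedTasksToOwner.get(taskId);
        if (owner != null) {
            // Re-entrant for the owner, refused for everyone else.
            return owner.equals(threadName);
        }
        // Record ownership first; any directory work happens under the lock.
        lockedTasksToOwner.put(taskId, threadName);
        return true;
    }

    // Only the owning thread may release the lock.
    synchronized boolean unlock(final String taskId, final String threadName) {
        final String owner = lockedTasksToOwner.get(taskId);
        if (owner != null && owner.equals(threadName)) {
            lockedTasksToOwner.remove(taskId);
            return true;
        }
        return false;
    }
}
```

   Because ownership is recorded before the directory is touched, any thread 
that wants to delete the directory has to win the same lock first, which is 
why the concurrent-deletion case no longer needs special handling.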








