cadonna commented on a change in pull request #10342:
URL: https://github.com/apache/kafka/pull/10342#discussion_r596720028



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -288,50 +277,31 @@ private String logPrefix() {
      * Get the lock for the {@link TaskId}s directory if it is available
      * @param taskId task id
      * @return true if successful
-     * @throws IOException if the file cannot be created or file handle cannot be grabbed, should be considered as fatal
      */
-    synchronized boolean lock(final TaskId taskId) throws IOException {
+    synchronized boolean lock(final TaskId taskId) {
         if (!hasPersistentStores) {
             return true;
         }
 
-        final File lockFile;
-        // we already have the lock so bail out here
-        final LockAndOwner lockAndOwner = locks.get(taskId);
-        if (lockAndOwner != null && lockAndOwner.owningThread.equals(Thread.currentThread().getName())) {
-            log.trace("{} Found cached state dir lock for task {}", logPrefix(), taskId);
-            return true;
-        } else if (lockAndOwner != null) {
-            // another thread owns the lock
-            return false;
-        }
-
-        try {
-            lockFile = new File(directoryForTask(taskId), LOCK_FILE_NAME);
-        } catch (final ProcessorStateException e) {
-            // directoryForTask could be throwing an exception if another thread
-            // has concurrently deleted the directory
-            return false;
-        }
-
-        final FileChannel channel;
-
-        try {
-            channel = getOrCreateFileChannel(taskId, lockFile.toPath());
-        } catch (final NoSuchFileException e) {
-            // FileChannel.open(..) could throw NoSuchFileException when there is another thread
-            // concurrently deleting the parent directory (i.e. the directory of the taskId) of the lock
-            // file, in this case we will return immediately indicating locking failed.
+        final String lockOwner = lockedTasksToStreamThreadOwner.get(taskId);
+        if (lockOwner != null) {
+            if (lockOwner.equals(Thread.currentThread().getName())) {
+                log.trace("{} Found cached state dir lock for task {}", logPrefix(), taskId);
+                // we already own the lock
+                return true;
+            } else {
+                // another thread owns the lock
+                return false;
+            }
+        } else if (!stateDir.exists()) {

Review comment:
       I am wondering if we should throw an `IllegalStateException` here,
because it seems illegal to me to request a lock on a task directory inside a
state directory that does not exist.
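
A minimal sketch of what that suggestion could look like, not the actual `StateDirectory` code. `StateDirLockSketch`, the `String` task ids, and the bookkeeping after the existence check are assumptions made for illustration; only the owner lookup mirrors the hunk above.

```java
import java.io.File;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for StateDirectory; only the lock bookkeeping is modelled.
class StateDirLockSketch {
    private final File stateDir;
    // task id -> name of the thread that currently owns the lock
    private final Map<String, String> lockedTasksToOwner = new HashMap<>();

    StateDirLockSketch(final File stateDir) {
        this.stateDir = stateDir;
    }

    synchronized boolean lock(final String taskId) {
        final String lockOwner = lockedTasksToOwner.get(taskId);
        if (lockOwner != null) {
            // true if this thread already owns the lock, false if another thread does
            return lockOwner.equals(Thread.currentThread().getName());
        }
        if (!stateDir.exists()) {
            // Suggested behaviour: treat this as a programming error
            // instead of reporting an ordinary failed lock attempt.
            throw new IllegalStateException(
                "State directory " + stateDir + " does not exist; cannot lock task " + taskId);
        }
        // Assumption: taking the lock is just recording the owning thread.
        lockedTasksToOwner.put(taskId, Thread.currentThread().getName());
        return true;
    }
}
```

With this shape, callers that poll `lock()` in a retry loop would fail fast on a missing state directory rather than retrying indefinitely, which is the point of the question.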

##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -477,15 +441,7 @@ private void cleanRemovedTasksCalledByCleanerThread(final long cleanupDelayMs) {
                         exception
                     );
                 } finally {
-                    try {
-                        unlock(id);
-                    } catch (final IOException exception) {
-                        log.warn(
-                            String.format("%s Swallowed the following exception during unlocking after deletion of obsolete " +
-                                "state directory %s for task %s:", logPrefix(), dirName, id),
-                            exception
-                        );
-                    }
+                    unlock(id);

Review comment:
       I guess you could make the same change on line 474 in 
`cleanRemovedTasksCalledByUser()`, couldn't you?
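
Roughly what I have in mind, as a sketch only. `CleanupSketch`, `cleanTask`, and the `Runnable` standing in for the directory deletion are hypothetical names, and it assumes `unlock` no longer declares `IOException`, as in this PR.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in; not the real StateDirectory.
class CleanupSketch {
    // records which task ids were unlocked, so the behaviour is observable
    final List<String> unlocked = new ArrayList<>();

    // Assumption: unlock has no checked exception in its signature.
    void unlock(final String taskId) {
        unlocked.add(taskId);
    }

    void cleanTask(final String taskId, final Runnable deleteDir) {
        try {
            deleteDir.run();
        } finally {
            // no nested try/catch around unlock is needed any more
            unlock(taskId);
        }
    }
}
```

The lock is still released when the deletion throws, and the deletion exception propagates unchanged.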

##########
File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java
##########
@@ -240,32 +192,30 @@ public void 
testCloseStateManagerOnlyThrowsFirstExceptionWhenClean() throws IOEx
     }
 
     @Test
-    public void testCloseStateManagerThrowsExceptionWhenDirty() throws IOException {
+    public void testCloseStateManagerThrowsExceptionWhenDirty() {
         expect(stateManager.taskId()).andReturn(taskId);
 
         expect(stateDirectory.lock(taskId)).andReturn(true);
 
         stateManager.close();
-        expectLastCall();
+        expectLastCall().andThrow(new ProcessorStateException("state manager failed to close"));
 
         stateDirectory.unlock(taskId);
-        expectLastCall().andThrow(new IOException("Timeout"));
+        expectLastCall();

Review comment:
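       You can probably remove this `expectLastCall()`. The preceding void call
on the mock is already recorded as an expectation without it.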



