ableegoldman commented on a change in pull request #10342:
URL: https://github.com/apache/kafka/pull/10342#discussion_r597427365



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
##########
@@ -288,50 +277,31 @@ private String logPrefix() {
      * Get the lock for the {@link TaskId}s directory if it is available
      * @param taskId task id
      * @return true if successful
-     * @throws IOException if the file cannot be created or file handle cannot be grabbed, should be considered as fatal
      */
-    synchronized boolean lock(final TaskId taskId) throws IOException {
+    synchronized boolean lock(final TaskId taskId) {
         if (!hasPersistentStores) {
             return true;
         }
 
-        final File lockFile;
-        // we already have the lock so bail out here
-        final LockAndOwner lockAndOwner = locks.get(taskId);
-        if (lockAndOwner != null && lockAndOwner.owningThread.equals(Thread.currentThread().getName())) {
-            log.trace("{} Found cached state dir lock for task {}", logPrefix(), taskId);
+        final String lockOwner = lockedTasksToStreamThreadOwner.get(taskId);
+        if (lockOwner != null) {
+            if (lockOwner.equals(Thread.currentThread().getName())) {

Review comment:
       Hm, I can't seem to find that ticket; maybe I only thought about filing
one. Anyway, here's https://issues.apache.org/jira/browse/KAFKA-12509




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

