Repository: kafka Updated Branches: refs/heads/0.10.2 5f728532a -> 863867582
KAFKA-5038; Throw correct exception if locking of state directory fails Author: Eno Thereska <[email protected]> Reviewers: Damian Guy <[email protected]>, Matthias J. Sax <[email protected]>, Ismael Juma <[email protected]> Closes #2841 from enothereska/KAFKA-5038 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/86386758 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/86386758 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/86386758 Branch: refs/heads/0.10.2 Commit: 8638675828b63c4bae6fb85de56543d7b3088546 Parents: 5f72853 Author: Eno Thereska <[email protected]> Authored: Wed Apr 12 15:26:37 2017 +0100 Committer: Ismael Juma <[email protected]> Committed: Wed Apr 12 15:26:37 2017 +0100 ---------------------------------------------------------------------- .../internals/ProcessorStateManager.java | 16 ++++++++++++---- .../processor/internals/StateDirectory.java | 9 ++++++++- .../processor/internals/StateDirectoryTest.java | 18 ++++++++++++++++++ 3 files changed, 38 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/86386758/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java index 65831a2..2ef9634 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java @@ -82,7 +82,7 @@ public class ProcessorStateManager implements StateManager { final Map<String, String> storeToChangelogTopic) throws LockException, IOException { 
this.taskId = taskId; this.stateDirectory = stateDirectory; - this.baseDir = stateDirectory.directoryForTask(taskId); + this.logPrefix = String.format("task [%s]", taskId); this.partitionForTopic = new HashMap<>(); for (TopicPartition source : sources) { this.partitionForTopic.put(source.topic(), source); @@ -96,10 +96,18 @@ public class ProcessorStateManager implements StateManager { this.restoreCallbacks = isStandby ? new HashMap<String, StateRestoreCallback>() : null; this.storeToChangelogTopic = storeToChangelogTopic; - this.logPrefix = String.format("task [%s]", taskId); - if (!stateDirectory.lock(taskId, 5)) { - throw new LockException(String.format("%s Failed to lock the state directory: %s", logPrefix, baseDir.getCanonicalPath())); + throw new LockException(String.format("%s Failed to lock the state directory for task %s", + logPrefix, taskId)); + } + // get a handle on the parent/base directory of the task directory + // note that the parent directory could have been accidentally deleted here, + // so catch that exception if that is the case + try { + this.baseDir = stateDirectory.directoryForTask(taskId); + } catch (ProcessorStateException e) { + throw new LockException(String.format("%s Failed to get the directory for task %s. 
Exception %s", + logPrefix, taskId, e)); } // load the checkpoint information http://git-wip-us.apache.org/repos/asf/kafka/blob/86386758/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java index d264b26..b081e27 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java @@ -95,11 +95,18 @@ public class StateDirectory { * @throws IOException */ public boolean lock(final TaskId taskId, int retry) throws IOException { + final File lockFile; // we already have the lock so bail out here if (locks.containsKey(taskId)) { return true; } - final File lockFile = new File(directoryForTask(taskId), LOCK_FILE_NAME); + try { + lockFile = new File(directoryForTask(taskId), LOCK_FILE_NAME); + } catch (ProcessorStateException e) { + // directoryForTask could be throwing an exception if another thread + // has concurrently deleted the directory + return false; + } final FileChannel channel; http://git-wip-us.apache.org/repos/asf/kafka/blob/86386758/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java index fb55796..6b1d077 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java @@ -18,6 +18,7 @@ package 
org.apache.kafka.streams.processor.internals; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.test.TestUtils; import org.junit.After; @@ -32,6 +33,7 @@ import java.util.Arrays; import java.util.List; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -97,6 +99,22 @@ public class StateDirectoryTest { assertTrue(directory.lock(taskId, 0)); } + @Test(expected = ProcessorStateException.class) + public void shouldThrowProcessorStateException() throws Exception { + final TaskId taskId = new TaskId(0, 0); + + Utils.delete(stateDir); + directory.directoryForTask(taskId); + } + + @Test + public void shouldNotLockDeletedDirectory() throws Exception { + final TaskId taskId = new TaskId(0, 0); + + Utils.delete(stateDir); + assertFalse(directory.lock(taskId, 0)); + } + @Test public void shouldLockMulitpleTaskDirectories() throws Exception {
