Repository: kafka Updated Branches: refs/heads/trunk 0cc32abc1 -> b512cd474
KAFKA-6259: Make KafkaStreams.cleanUp() clean the global state directory Author: Matthias J. Sax <matth...@confluent.io> Reviewers: Bill Bejeck <b...@confluent.io>, Guozhang Wang <wangg...@gmail.com> Closes #4255 from mjsax/kafka-6259-clean-global-state-dir add log4j entry Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b512cd47 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b512cd47 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b512cd47 Branch: refs/heads/trunk Commit: b512cd474c6ebfbbbe0192764cec3413a94baba0 Parents: 0cc32ab Author: Matthias J. Sax <matth...@confluent.io> Authored: Wed Nov 29 11:18:31 2017 -0800 Committer: Guozhang Wang <wangg...@gmail.com> Committed: Wed Nov 29 11:20:29 2017 -0800 ---------------------------------------------------------------------- .../org/apache/kafka/streams/KafkaStreams.java | 3 +- .../processor/internals/StateDirectory.java | 73 ++++++++++++++++---- .../processor/internals/StateDirectoryTest.java | 19 ++++- 3 files changed, 81 insertions(+), 14 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/b512cd47/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index c7dfe71..1844cde 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -892,12 +892,13 @@ public class KafkaStreams { * Calling this method triggers a restore of local {@link StateStore}s on the next {@link #start() application start}. 
* * @throws IllegalStateException if this {@code KafkaStreams} instance is currently {@link State#RUNNING running} + * @throws StreamsException if cleanup failed */ public void cleanUp() { if (isRunning()) { throw new IllegalStateException("Cannot clean up while running."); } - stateDirectory.cleanRemovedTasks(0); + stateDirectory.clean(); } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/b512cd47/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java index 1bfe98c..c33ade6 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java @@ -20,6 +20,7 @@ import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.errors.ProcessorStateException; +import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.TaskId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -222,6 +223,21 @@ public class StateDirectory { } } + public synchronized void clean() { + try { + cleanRemovedTasks(0, true); + } catch (final Exception e) { + // this is already logged within cleanRemovedTasks + throw new StreamsException(e); + } + try { + Utils.delete(globalStateDir().getAbsoluteFile()); + } catch (final IOException e) { + log.error("{} Failed to delete global state directory due to an unexpected exception", logPrefix(), e); + throw new StreamsException(e); + } + } + /** * Remove the directories for any {@link TaskId}s that are no-longer * owned by this {@link StreamThread} and aren't locked by either @@ -230,37 
+246,70 @@ public class StateDirectory { * this amount of time (milliseconds) */ public synchronized void cleanRemovedTasks(final long cleanupDelayMs) { + try { + cleanRemovedTasks(cleanupDelayMs, false); + } catch (final Exception cannotHappen) { + throw new IllegalStateException("Should have swallowed exception.", cannotHappen); + } + } + + private synchronized void cleanRemovedTasks(final long cleanupDelayMs, + final boolean manualUserCall) throws Exception { final File[] taskDirs = listTaskDirectories(); if (taskDirs == null || taskDirs.length == 0) { return; // nothing to do } - for (File taskDir : taskDirs) { + + for (final File taskDir : taskDirs) { final String dirName = taskDir.getName(); - TaskId id = TaskId.parse(dirName); + final TaskId id = TaskId.parse(dirName); if (!locks.containsKey(id)) { try { if (lock(id)) { - long now = time.milliseconds(); - long lastModifiedMs = taskDir.lastModified(); - if (now > lastModifiedMs + cleanupDelayMs) { - log.info("{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms)", logPrefix(), dirName, id, now - lastModifiedMs, cleanupDelayMs); + final long now = time.milliseconds(); + final long lastModifiedMs = taskDir.lastModified(); + if (now > lastModifiedMs + cleanupDelayMs || manualUserCall) { + if (!manualUserCall) { + log.info( + "{} Deleting obsolete state directory {} for task {} as {}ms has elapsed (cleanup delay is {}ms).", + logPrefix(), + dirName, + id, + now - lastModifiedMs, + cleanupDelayMs); + } else { + log.info( + "{} Deleting state directory {} for task {} as user calling cleanup.", + logPrefix(), + dirName, + id); + } Utils.delete(taskDir); } } - } catch (OverlappingFileLockException e) { + } catch (final OverlappingFileLockException e) { // locked by another thread - } catch (IOException e) { - log.error("{} Failed to lock the state directory due to an unexpected exception", logPrefix(), e); + if (manualUserCall) { + log.error("{} Failed to get the state 
directory lock.", logPrefix(), e); + throw e; + } + } catch (final IOException e) { + log.error("{} Failed to delete the state directory.", logPrefix(), e); + if (manualUserCall) { + throw e; + } } finally { try { unlock(id); - } catch (IOException e) { - log.error("{} Failed to release the state directory lock", logPrefix()); + } catch (final IOException e) { + log.error("{} Failed to release the state directory lock.", logPrefix()); + if (manualUserCall) { + throw e; + } } } } } - } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/b512cd47/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java index 1a5d46d..e14d010 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java @@ -195,9 +195,13 @@ public class StateDirectoryTest { directory.lock(task1); directory.directoryForTask(new TaskId(2, 0)); + List<File> files = Arrays.asList(appDir.listFiles()); + assertEquals(3, files.size()); + time.sleep(1000); directory.cleanRemovedTasks(0); - final List<File> files = Arrays.asList(appDir.listFiles()); + + files = Arrays.asList(appDir.listFiles()); assertEquals(2, files.size()); assertTrue(files.contains(new File(appDir, task0.toString()))); assertTrue(files.contains(new File(appDir, task1.toString()))); @@ -341,4 +345,17 @@ public class StateDirectoryTest { assertTrue(directory.lock(taskId)); } + @Test + public void shouldCleanupAllTaskDirectoriesIncludingGlobalOne() { + directory.directoryForTask(new TaskId(1, 0)); + directory.globalStateDir(); + + List<File> files = Arrays.asList(appDir.listFiles()); + assertEquals(2, 
files.size()); + + directory.clean(); + + files = Arrays.asList(appDir.listFiles()); + assertEquals(0, files.size()); + } } \ No newline at end of file