Repository: kafka Updated Branches: refs/heads/trunk 43fb2df7a -> d345d53e4
KAFKA-4902; Utils#delete should correctly handle I/O errors and symlinks Author: Colin P. Mccabe <[email protected]> Reviewers: Jun Rao <[email protected]>, Apurva Mehta <[email protected]>, Ismael Juma <[email protected]> Closes #2691 from cmccabe/KAFKA-4902 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/d345d53e Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/d345d53e Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/d345d53e Branch: refs/heads/trunk Commit: d345d53e4e5e4f74707e2521aa635b93ba3f1e7b Parents: 43fb2df Author: Colin P. Mccabe <[email protected]> Authored: Thu Mar 30 13:34:52 2017 +0100 Committer: Ismael Juma <[email protected]> Committed: Thu Mar 30 13:38:09 2017 +0100 ---------------------------------------------------------------------- checkstyle/suppressions.xml | 2 +- .../org/apache/kafka/common/utils/Utils.java | 37 ++++++++++++++------ .../apache/kafka/common/utils/UtilsTest.java | 24 +++++++++++++ .../java/org/apache/kafka/test/TestUtils.java | 9 ++++- .../examples/pageview/JsonPOJOSerializer.java | 3 +- .../kafka/streams/state/internals/Segment.java | 9 ++++- .../kafka/streams/state/internals/Segments.java | 11 +++++- .../internals/ProcessorStateManagerTest.java | 2 +- .../processor/internals/StandbyTaskTest.java | 3 +- .../processor/internals/StateDirectoryTest.java | 7 ++-- .../processor/internals/StreamTaskTest.java | 3 +- .../StreamThreadStateStoreProviderTest.java | 2 +- 12 files changed, 87 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/d345d53e/checkstyle/suppressions.xml ---------------------------------------------------------------------- diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml index f722aba..ea1619e 100644 --- a/checkstyle/suppressions.xml +++ b/checkstyle/suppressions.xml @@ -8,7 +8,7 @@ <!-- Clients --> <suppress checks="ClassFanOutComplexity" - files="(Fetcher|ConsumerCoordinator|KafkaConsumer|KafkaProducer|SaslServerAuthenticator).java"/> + files="(Fetcher|ConsumerCoordinator|KafkaConsumer|KafkaProducer|SaslServerAuthenticator|Utils).java"/> <suppress checks="ClassFanOutComplexity" files=".*/protocol/Errors.java"/> <suppress checks="ClassFanOutComplexity" http://git-wip-us.apache.org/repos/asf/kafka/blob/d345d53e/clients/src/main/java/org/apache/kafka/common/utils/Utils.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index 796b019..a7d2a1b 100755 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -35,9 +35,13 @@ import java.nio.MappedByteBuffer; import java.nio.channels.FileChannel; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import java.nio.file.FileVisitResult; import java.nio.file.Files; +import java.nio.file.NoSuchFileException; import java.nio.file.Path; +import java.nio.file.SimpleFileVisitor; import java.nio.file.StandardCopyOption; +import java.nio.file.attribute.BasicFileAttributes; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -545,19 +549,30 @@ public class Utils { * * @param file The root file at which to begin deleting */ - public static void delete(File file) { - if (file == null) { + public static void delete(final File file) throws IOException { + if (file == null) return; - } else if (file.isDirectory()) { - File[] files = file.listFiles(); - if (files != null) { - for (File f : files) - delete(f); + Files.walkFileTree(file.toPath(), new SimpleFileVisitor<Path>() { + @Override + public FileVisitResult visitFileFailed(Path path, IOException exc) throws IOException { + // If the root path did not exist, ignore the error; otherwise throw it. + if (exc instanceof NoSuchFileException && path.toFile().equals(file)) + return FileVisitResult.TERMINATE; + throw exc; } - file.delete(); - } else { - file.delete(); - } + + @Override + public FileVisitResult visitFile(Path path, BasicFileAttributes attrs) throws IOException { + Files.delete(path); + return FileVisitResult.CONTINUE; + } + + @Override + public FileVisitResult postVisitDirectory(Path path, IOException exc) throws IOException { + Files.delete(path); + return FileVisitResult.CONTINUE; + } + }); } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/d345d53e/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java index 512c29c..2d6d05c 100755 --- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java @@ -24,10 +24,12 @@ import org.junit.Test; import java.io.Closeable; import java.io.DataOutputStream; import java.io.EOFException; +import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.channels.FileChannel; import java.nio.charset.StandardCharsets; +import java.nio.file.Files; import java.nio.file.StandardOpenOption; import java.util.Arrays; import java.util.Collections; @@ -425,4 +427,26 @@ public class UtilsTest { } } + @Test(timeout = 120000) + public void testRecursiveDelete() throws IOException { + Utils.delete(null); // delete of null does nothing. + + // Test that deleting a temporary file works. + File tempFile = TestUtils.tempFile(); + Utils.delete(tempFile); + assertFalse(Files.exists(tempFile.toPath())); + + // Test recursive deletes + File tempDir = TestUtils.tempDirectory(); + File tempDir2 = TestUtils.tempDirectory(tempDir.toPath(), "a"); + TestUtils.tempDirectory(tempDir.toPath(), "b"); + TestUtils.tempDirectory(tempDir2.toPath(), "c"); + Utils.delete(tempDir); + assertFalse(Files.exists(tempDir.toPath())); + assertFalse(Files.exists(tempDir2.toPath())); + + // Test that deleting a non-existent directory hierarchy works. + Utils.delete(tempDir); + assertFalse(Files.exists(tempDir.toPath())); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/d345d53e/clients/src/test/java/org/apache/kafka/test/TestUtils.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java index b5fada4..ea857a0 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java @@ -22,6 +22,8 @@ import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.utils.Utils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.xml.bind.DatatypeConverter; import java.io.File; @@ -50,6 +52,7 @@ import static org.junit.Assert.fail; * Helper functions for writing unit tests */ public class TestUtils { + private static final Logger log = LoggerFactory.getLogger(TestUtils.class); public static final File IO_TMP_DIR = new File(System.getProperty("java.io.tmpdir")); @@ -165,7 +168,11 @@ public class TestUtils { Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { - Utils.delete(file); + try { + Utils.delete(file); + } catch (IOException e) { + log.error("Error deleting {}", file.getAbsolutePath(), e); + } } }); http://git-wip-us.apache.org/repos/asf/kafka/blob/d345d53e/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java ---------------------------------------------------------------------- diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java index 6a5abea..625bda9 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java @@ -31,8 +31,7 @@ public class JsonPOJOSerializer<T> implements Serializer<T> { */ public JsonPOJOSerializer() { } - - @SuppressWarnings("unchecked") + @Override public void configure(Map<String, ?> props, boolean isKey) { } http://git-wip-us.apache.org/repos/asf/kafka/blob/d345d53e/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java index f3f42a2..1311a27 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segment.java @@ -20,6 +20,8 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.processor.ProcessorContext; +import java.io.IOException; + // Use the Bytes wrapper for underlying rocksDB keys since they are used for hashing data structures class Segment extends RocksDBStore<Bytes, byte[]> { public final long id; @@ -29,7 +31,7 @@ class Segment extends RocksDBStore<Bytes, byte[]> { this.id = id; } - void destroy() { + void destroy() throws IOException { Utils.delete(dbDir); } @@ -41,4 +43,9 @@ class Segment extends RocksDBStore<Bytes, byte[]> { open = true; } + + @Override + public String toString() { + return "Segment(id=" + id + ", name=" + name() + ")"; + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/d345d53e/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java index 5dedb40..a02f87e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/Segments.java @@ -19,8 +19,11 @@ package org.apache.kafka.streams.state.internals; import org.apache.kafka.streams.errors.InvalidStateStoreException; import org.apache.kafka.streams.errors.ProcessorStateException; import org.apache.kafka.streams.processor.ProcessorContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.File; +import java.io.IOException; import java.text.SimpleDateFormat; import java.util.ArrayList; import java.util.Arrays; @@ -34,6 +37,8 @@ import java.util.concurrent.ConcurrentHashMap; * Manages the {@link Segment}s that are used by the {@link RocksDBSegmentedBytesStore} */ class Segments { + private static final Logger log = LoggerFactory.getLogger(Segments.class); + static final long MIN_SEGMENT_INTERVAL = 60 * 1000L; private final ConcurrentHashMap<Long, Segment> segments = new ConcurrentHashMap<>(); @@ -167,7 +172,11 @@ class Segments { if (segment != null && segment.id <= oldestSegmentId) { segments.remove(segmentEntry.getKey()); segment.close(); - segment.destroy(); + try { + segment.destroy(); + } catch (IOException e) { + log.error("Error destroying {}", segment, e); + } } } if (oldestSegmentId > minSegmentId) { http://git-wip-us.apache.org/repos/asf/kafka/blob/d345d53e/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java index a1e1c01..fe8c186 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorStateManagerTest.java @@ -85,7 +85,7 @@ public class ProcessorStateManagerTest { } @After - public void cleanup() { + public void cleanup() throws IOException { Utils.delete(baseDir); } http://git-wip-us.apache.org/repos/asf/kafka/blob/d345d53e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index b3a8dab..1442b9e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -45,6 +45,7 @@ import org.junit.Before; import org.junit.Test; import java.io.File; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -149,7 +150,7 @@ public class StandbyTaskTest { } @After - public void cleanup() { + public void cleanup() throws IOException { Utils.delete(baseDir); } http://git-wip-us.apache.org/repos/asf/kafka/blob/d345d53e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java index 23fed21..e8d2763 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java @@ -25,6 +25,7 @@ import org.junit.Before; import org.junit.Test; import java.io.File; +import java.io.IOException; import java.nio.channels.FileChannel; import java.nio.channels.OverlappingFileLockException; import java.nio.file.StandardOpenOption; @@ -52,10 +53,8 @@ public class StateDirectoryTest { } @After - public void cleanup() { - if (stateDir.exists()) { - Utils.delete(stateDir); - } + public void cleanup() throws IOException { + Utils.delete(stateDir); } @Test http://git-wip-us.apache.org/repos/asf/kafka/blob/d345d53e/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 7c9f46b..6256434 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -55,6 +55,7 @@ import org.junit.Test; import org.junit.Before; import java.io.File; +import java.io.IOException; import java.util.Arrays; import java.util.Collections; import java.util.HashMap; @@ -146,7 +147,7 @@ public class StreamTaskTest { } @After - public void cleanup() { + public void cleanup() throws IOException { if (task != null) { try { task.close(); http://git-wip-us.apache.org/repos/asf/kafka/blob/d345d53e/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java index 3886be8..3102685 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java @@ -132,7 +132,7 @@ public class StreamThreadStateStoreProviderTest { } @After - public void cleanUp() { + public void cleanUp() throws IOException { Utils.delete(stateDir); }
