Updated Branches: refs/heads/flume-1.3.0 4dea8489e -> df969d394
FLUME-1607. FileChannel should use a regex as opposed to simple filename filter when finding logs (Brock Noland via Hari Shreedharan) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/df969d39 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/df969d39 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/df969d39 Branch: refs/heads/flume-1.3.0 Commit: df969d3946e3800474d272a022a8897aa7ea1c06 Parents: 4dea848 Author: Hari Shreedharan <[email protected]> Authored: Tue Sep 25 16:52:36 2012 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Tue Sep 25 16:53:30 2012 -0700 ---------------------------------------------------------------------- .../org/apache/flume/channel/file/LogFileV3.java | 2 +- .../org/apache/flume/channel/file/LogUtils.java | 6 ++-- .../apache/flume/channel/file/Serialization.java | 5 +++ .../flume/channel/file/TestFileChannelRestart.java | 24 +++++++++++++++ .../org/apache/flume/channel/file/TestLog.java | 23 ++++++++++++++ 5 files changed, 56 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/df969d39/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java index 32ebac7..f768d23 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java @@ -129,7 +129,7 @@ class LogFileV3 extends LogFile { */ public static void writeDelimitedTo(GeneratedMessage msg, File file) throws IOException { - File tmp = new File(file.getParentFile(), file.getName() + ".tmp"); + File tmp = Serialization.getMetaDataTempFile(file); FileOutputStream outputStream = new FileOutputStream(tmp); boolean closed = false; try { http://git-wip-us.apache.org/repos/asf/flume/blob/df969d39/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogUtils.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogUtils.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogUtils.java index c14a6f0..c2b9564 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogUtils.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogUtils.java @@ -22,6 +22,7 @@ import java.io.File; import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.regex.Pattern; import com.google.common.collect.Lists; @@ -58,15 +59,14 @@ public class LogUtils { * @return List of data files within logDir */ static List<File> getLogs(File logDir) { + Pattern pattern = Pattern.compile("^" + Log.PREFIX + "\\d+$"); List<File> result = Lists.newArrayList(); for (File file : logDir.listFiles()) { String name = file.getName(); - if (name.startsWith(Log.PREFIX) && - !name.endsWith(Serialization.METADATA_FILENAME)) { + if (pattern.matcher(name).matches()) { result.add(file); } } return result; } - } http://git-wip-us.apache.org/repos/asf/flume/blob/df969d39/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java index ad2a645..6b0eeb3 100644 --- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java +++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java @@ -31,8 +31,13 @@ class Serialization { static final int VERSION_3 = 3; static final String METADATA_FILENAME = ".meta"; + static final String METADATA_TMP_FILENAME = ".tmp"; + static File getMetaDataTempFile(File metaDataFile) { + String metaDataFileName = metaDataFile.getName() + METADATA_TMP_FILENAME; + return new File(metaDataFile.getParentFile(), metaDataFileName); + } static File getMetaDataFile(File file) { String metaDataFileName = file.getName() + METADATA_FILENAME; return new File(file.getParentFile(), metaDataFileName); http://git-wip-us.apache.org/repos/asf/flume/blob/df969d39/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java index 68285cc..3f90805 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java @@ -150,4 +150,28 @@ public class TestFileChannelRestart extends TestFileChannelBase { channel.start(); Assert.assertFalse(channel.isOpen()); } + @Test + public void testWithExtraLogs() + throws Exception { + Map<String, String> overrides = Maps.newHashMap(); + overrides.put(FileChannelConfiguration.CAPACITY, "10"); + channel = createFileChannel(overrides); + channel.start(); + Assert.assertTrue(channel.isOpen()); + Set<String> in = fillChannel(channel, "extralogs"); + for (int i = 0; i < dataDirs.length; i++) { + File file = new File(dataDirs[i], Log.PREFIX + (1000 + i)); + Assert.assertTrue(file.createNewFile()); + Assert.assertTrue(file.length() == 0); + File metaDataFile = Serialization.getMetaDataFile(file); + File metaDataTempFile = Serialization.getMetaDataTempFile(metaDataFile); + Assert.assertTrue(metaDataTempFile.createNewFile()); + } + channel.stop(); + channel = createFileChannel(overrides); + channel.start(); + Assert.assertTrue(channel.isOpen()); + Set<String> out = consumeChannel(channel); + compareInputAndOut(in, out); + } } http://git-wip-us.apache.org/repos/asf/flume/blob/df969d39/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java ---------------------------------------------------------------------- diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java index e3eb184..a165d6a 100644 --- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java +++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java @@ -20,6 +20,7 @@ package org.apache.flume.channel.file; import java.io.File; import java.io.IOException; +import java.util.List; import org.apache.commons.io.FileUtils; import org.junit.After; @@ -27,6 +28,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import com.google.common.collect.Lists; import com.google.common.io.Files; public class TestLog { @@ -248,6 +250,27 @@ public class TestLog { Assert.assertNull(eventPointerOut); } + @Test + public void testGetLogs() throws IOException { + File logDir = dataDirs[0]; + List<File> expected = Lists.newArrayList(); + for (int i = 0; i < 10; i++) { + File log = new File(logDir, Log.PREFIX + i); + expected.add(log); + Assert.assertTrue(log.isFile() || log.createNewFile()); + File metaDataFile = Serialization.getMetaDataFile(log); + File metaDataTempFile = Serialization.getMetaDataTempFile(metaDataFile); + File logGzip = new File(logDir, Log.PREFIX + i + ".gz"); + Assert.assertTrue(metaDataFile.isFile() || metaDataFile.createNewFile()); + Assert.assertTrue(metaDataTempFile.isFile() || + metaDataTempFile.createNewFile()); + Assert.assertTrue(log.isFile() || logGzip.createNewFile()); + } + List<File> actual = LogUtils.getLogs(logDir); + LogUtils.sort(actual); + LogUtils.sort(expected); + Assert.assertEquals(expected, actual); + } private void takeAndVerify(FlumeEventPointer eventPointerIn, FlumeEvent eventIn) throws IOException, InterruptedException { FlumeEventQueue queue = log.getFlumeEventQueue();
