Updated Branches:
  refs/heads/flume-1.3.0 4dea8489e -> df969d394

FLUME-1607. FileChannel should use a regex as opposed to simple filename filter 
when finding logs

(Brock Noland via Hari Shreedharan)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/df969d39
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/df969d39
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/df969d39

Branch: refs/heads/flume-1.3.0
Commit: df969d3946e3800474d272a022a8897aa7ea1c06
Parents: 4dea848
Author: Hari Shreedharan <[email protected]>
Authored: Tue Sep 25 16:52:36 2012 -0700
Committer: Hari Shreedharan <[email protected]>
Committed: Tue Sep 25 16:53:30 2012 -0700

----------------------------------------------------------------------
 .../org/apache/flume/channel/file/LogFileV3.java   |    2 +-
 .../org/apache/flume/channel/file/LogUtils.java    |    6 ++--
 .../apache/flume/channel/file/Serialization.java   |    5 +++
 .../flume/channel/file/TestFileChannelRestart.java |   24 +++++++++++++++
 .../org/apache/flume/channel/file/TestLog.java     |   23 ++++++++++++++
 5 files changed, 56 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/df969d39/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
index 32ebac7..f768d23 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogFileV3.java
@@ -129,7 +129,7 @@ class LogFileV3 extends LogFile {
    */
   public static void writeDelimitedTo(GeneratedMessage msg, File file)
   throws IOException {
-    File tmp = new File(file.getParentFile(), file.getName() + ".tmp");
+    File tmp = Serialization.getMetaDataTempFile(file);
     FileOutputStream outputStream = new FileOutputStream(tmp);
     boolean closed = false;
     try {

http://git-wip-us.apache.org/repos/asf/flume/blob/df969d39/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogUtils.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogUtils.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogUtils.java
index c14a6f0..c2b9564 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogUtils.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/LogUtils.java
@@ -22,6 +22,7 @@ import java.io.File;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
+import java.util.regex.Pattern;
 
 import com.google.common.collect.Lists;
 
@@ -58,15 +59,14 @@ public class LogUtils {
    * @return List of data files within logDir
    */
   static List<File> getLogs(File logDir) {
+    Pattern pattern = Pattern.compile("^" + Log.PREFIX + "\\d+$");
     List<File> result = Lists.newArrayList();
     for (File file : logDir.listFiles()) {
       String name = file.getName();
-      if (name.startsWith(Log.PREFIX) &&
-          !name.endsWith(Serialization.METADATA_FILENAME)) {
+      if (pattern.matcher(name).matches()) {
         result.add(file);
       }
     }
     return result;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/df969d39/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
index ad2a645..6b0eeb3 100644
--- a/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
+++ b/flume-ng-channels/flume-file-channel/src/main/java/org/apache/flume/channel/file/Serialization.java
@@ -31,8 +31,13 @@ class Serialization {
   static final int VERSION_3 = 3;
 
   static final String METADATA_FILENAME = ".meta";
+  static final String METADATA_TMP_FILENAME = ".tmp";
 
+  static File getMetaDataTempFile(File metaDataFile) {
+    String metaDataFileName = metaDataFile.getName() + METADATA_TMP_FILENAME;
+    return new File(metaDataFile.getParentFile(), metaDataFileName);
 
+  }
   static File getMetaDataFile(File file) {
     String metaDataFileName = file.getName() + METADATA_FILENAME;
     return new File(file.getParentFile(), metaDataFileName);

http://git-wip-us.apache.org/repos/asf/flume/blob/df969d39/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
index 68285cc..3f90805 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestFileChannelRestart.java
@@ -150,4 +150,28 @@ public class TestFileChannelRestart extends TestFileChannelBase {
     channel.start();
     Assert.assertFalse(channel.isOpen());
   }
+  @Test
+  public void testWithExtraLogs()
+      throws Exception {
+    Map<String, String> overrides = Maps.newHashMap();
+    overrides.put(FileChannelConfiguration.CAPACITY, "10");
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Set<String> in = fillChannel(channel, "extralogs");
+    for (int i = 0; i < dataDirs.length; i++) {
+      File file = new File(dataDirs[i], Log.PREFIX + (1000 + i));
+      Assert.assertTrue(file.createNewFile());
+      Assert.assertTrue(file.length() == 0);
+      File metaDataFile = Serialization.getMetaDataFile(file);
+      File metaDataTempFile = Serialization.getMetaDataTempFile(metaDataFile);
+      Assert.assertTrue(metaDataTempFile.createNewFile());
+    }
+    channel.stop();
+    channel = createFileChannel(overrides);
+    channel.start();
+    Assert.assertTrue(channel.isOpen());
+    Set<String> out = consumeChannel(channel);
+    compareInputAndOut(in, out);
+  }
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/df969d39/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
index e3eb184..a165d6a 100644
--- a/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
+++ b/flume-ng-channels/flume-file-channel/src/test/java/org/apache/flume/channel/file/TestLog.java
@@ -20,6 +20,7 @@ package org.apache.flume.channel.file;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.commons.io.FileUtils;
 import org.junit.After;
@@ -27,6 +28,7 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import com.google.common.collect.Lists;
 import com.google.common.io.Files;
 
 public class TestLog {
@@ -248,6 +250,27 @@ public class TestLog {
     Assert.assertNull(eventPointerOut);
   }
 
+  @Test
+  public void testGetLogs() throws IOException {
+    File logDir = dataDirs[0];
+    List<File> expected = Lists.newArrayList();
+    for (int i = 0; i < 10; i++) {
+      File log = new File(logDir, Log.PREFIX + i);
+      expected.add(log);
+      Assert.assertTrue(log.isFile() || log.createNewFile());
+      File metaDataFile = Serialization.getMetaDataFile(log);
+      File metaDataTempFile = Serialization.getMetaDataTempFile(metaDataFile);
+      File logGzip = new File(logDir, Log.PREFIX + i + ".gz");
+      Assert.assertTrue(metaDataFile.isFile() || metaDataFile.createNewFile());
+      Assert.assertTrue(metaDataTempFile.isFile() ||
+          metaDataTempFile.createNewFile());
+      Assert.assertTrue(log.isFile() || logGzip.createNewFile());
+    }
+    List<File> actual = LogUtils.getLogs(logDir);
+    LogUtils.sort(actual);
+    LogUtils.sort(expected);
+    Assert.assertEquals(expected, actual);
+  }
   private void takeAndVerify(FlumeEventPointer eventPointerIn,
       FlumeEvent eventIn) throws IOException, InterruptedException {
     FlumeEventQueue queue = log.getFlumeEventQueue();

Reply via email to