Updated Branches:
  refs/heads/flume-1.4 ddfa991e6 -> 919353dbd

FLUME-1732: SpoolableDirectorySource should have configurable support for 
deleting files it has already completed instead of renaming

(Mike Percy via Brock Noland)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/919353db
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/919353db
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/919353db

Branch: refs/heads/flume-1.4
Commit: 919353dbdf6f82fce6d67d096f4b43c9bedc3a71
Parents: ddfa991
Author: Brock Noland <[email protected]>
Authored: Tue Jan 15 17:24:17 2013 -0800
Committer: Brock Noland <[email protected]>
Committed: Tue Jan 15 17:24:32 2013 -0800

----------------------------------------------------------------------
 .../apache/flume/client/avro/AvroCLIClient.java    |    8 +-
 .../avro/ReliableSpoolingFileEventReader.java      |  195 ++++++++++++---
 .../apache/flume/source/SpoolDirectorySource.java  |   31 ++--
 ...SpoolDirectorySourceConfigurationConstants.java |    5 +-
 .../avro/TestReliableSpoolingFileEventReader.java  |  179 +++++++++++++
 .../client/avro/TestSpoolingFileLineReader.java    |   12 +-
 .../TestReliableSpoolingFileEventReader.java       |  157 ------------
 7 files changed, 373 insertions(+), 214 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/919353db/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java b/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java
index da23a75..3c8c267 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/AvroCLIClient.java
@@ -36,7 +36,6 @@ import org.apache.commons.cli.GnuParser;
 import org.apache.commons.cli.HelpFormatter;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
-import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.EventDeliveryException;
 import org.apache.flume.FlumeException;
@@ -204,11 +203,8 @@ public class AvroCLIClient {
       if (fileName != null) {
         reader = new SimpleTextLineEventReader(new FileReader(new 
File(fileName)));
       } else if (dirName != null) {
-        reader = new ReliableSpoolingFileEventReader(
-            new File(dirName), ".COMPLETED",
-            "^$", new File(new File(dirName), ".flumespool"),
-            false, "",
-            "LINE", new Context());
+        reader = new ReliableSpoolingFileEventReader.Builder()
+            .spoolDirectory(new File(dirName)).build();
       } else {
         reader = new SimpleTextLineEventReader(new 
InputStreamReader(System.in));
       }

http://git-wip-us.apache.org/repos/asf/flume/blob/919353db/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
index f2d587f..b19d0ea 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/client/avro/ReliableSpoolingFileEventReader.java
@@ -32,6 +32,7 @@ import org.apache.flume.FlumeException;
 import org.apache.flume.annotations.InterfaceAudience;
 import org.apache.flume.annotations.InterfaceStability;
 import org.apache.flume.serialization.*;
+import org.apache.flume.source.SpoolDirectorySourceConfigurationConstants;
 import org.apache.flume.tools.PlatformDetect;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -74,7 +75,7 @@ public class ReliableSpoolingFileEventReader implements 
ReliableEventReader {
 
   static final String metaFileName = ".flumespool-main.meta";
 
-  private final File directory;
+  private final File spoolDirectory;
   private final String completedSuffix;
   private final String deserializerType;
   private final Context deserializerContext;
@@ -82,6 +83,7 @@ public class ReliableSpoolingFileEventReader implements 
ReliableEventReader {
   private final File metaFile;
   private final boolean annotateFileName;
   private final String fileNameHeader;
+  private final String deletePolicy;
 
   private Optional<FileInfo> currentFile = Optional.absent();
   /** Always contains the last file from which lines have been read. **/
@@ -91,35 +93,44 @@ public class ReliableSpoolingFileEventReader implements 
ReliableEventReader {
   /**
    * Create a ReliableSpoolingFileEventReader to watch the given directory.
    */
-  public ReliableSpoolingFileEventReader(File directory, String 
completedSuffix,
-      String ignorePattern, File trackerDirectory,
+  private ReliableSpoolingFileEventReader(File spoolDirectory,
+      String completedSuffix, String ignorePattern, String trackerDirPath,
       boolean annotateFileName, String fileNameHeader,
-      String deserializerType, Context deserializerContext) throws IOException 
{
+      String deserializerType, Context deserializerContext,
+      String deletePolicy) throws IOException {
 
     // Sanity checks
-    Preconditions.checkNotNull(directory);
+    Preconditions.checkNotNull(spoolDirectory);
     Preconditions.checkNotNull(completedSuffix);
     Preconditions.checkNotNull(ignorePattern);
-    Preconditions.checkNotNull(trackerDirectory);
+    Preconditions.checkNotNull(trackerDirPath);
     Preconditions.checkNotNull(deserializerType);
     Preconditions.checkNotNull(deserializerContext);
+    Preconditions.checkNotNull(deletePolicy);
+
+    // validate delete policy
+    if (!deletePolicy.equalsIgnoreCase(DeletePolicy.NEVER.name()) &&
+        !deletePolicy.equalsIgnoreCase(DeletePolicy.IMMEDIATE.name())) {
+      throw new IllegalArgumentException("Delete policies other than " +
+          "NEVER and IMMEDIATE are not yet supported");
+    }
 
     if (logger.isDebugEnabled()) {
       logger.debug("Initializing {} with directory={}, metaDir={}, " +
           "deserializer={}",
           new Object[] { ReliableSpoolingFileEventReader.class.getSimpleName(),
-          directory, trackerDirectory, deserializerType });
+          spoolDirectory, trackerDirPath, deserializerType });
     }
 
     // Verify directory exists and is readable/writable
-    Preconditions.checkState(directory.exists(),
-        "Directory does not exist: " + directory.getAbsolutePath());
-    Preconditions.checkState(directory.isDirectory(),
-        "Path is not a directory: " + directory.getAbsolutePath());
+    Preconditions.checkState(spoolDirectory.exists(),
+        "Directory does not exist: " + spoolDirectory.getAbsolutePath());
+    Preconditions.checkState(spoolDirectory.isDirectory(),
+        "Path is not a directory: " + spoolDirectory.getAbsolutePath());
 
     // Do a canary test to make sure we have access to spooling directory
     try {
-      File f1 = File.createTempFile("flume", "test", directory);
+      File f1 = File.createTempFile("flume", "test", spoolDirectory);
       Files.write("testing flume file permissions\n", f1, Charsets.UTF_8);
       Files.readLines(f1, Charsets.UTF_8);
       if (!f1.delete()) {
@@ -127,15 +138,24 @@ public class ReliableSpoolingFileEventReader implements 
ReliableEventReader {
       }
     } catch (IOException e) {
       throw new FlumeException("Unable to read and modify files" +
-          " in the spooling directory: " + directory, e);
+          " in the spooling directory: " + spoolDirectory, e);
     }
-    this.directory = directory;
+
+    this.spoolDirectory = spoolDirectory;
     this.completedSuffix = completedSuffix;
     this.deserializerType = deserializerType;
     this.deserializerContext = deserializerContext;
     this.annotateFileName = annotateFileName;
     this.fileNameHeader = fileNameHeader;
     this.ignorePattern = Pattern.compile(ignorePattern);
+    this.deletePolicy = deletePolicy;
+
+    File trackerDirectory = new File(trackerDirPath);
+
+    // if relative path, treat as relative to spool directory
+    if (!trackerDirectory.isAbsolute()) {
+      trackerDirectory = new File(spoolDirectory, trackerDirPath);
+    }
 
     // ensure that meta directory exists
     if (!trackerDirectory.exists()) {
@@ -248,27 +268,43 @@ public class ReliableSpoolingFileEventReader implements 
ReliableEventReader {
   private void retireCurrentFile() throws IOException {
     Preconditions.checkState(currentFile.isPresent());
 
-    String currPath = currentFile.get().getFile().getAbsolutePath();
-    String newPath = currPath + completedSuffix;
-    logger.info("Preparing to move file {} to {}", currPath, newPath);
+    File fileToRoll = new File(currentFile.get().getFile().getAbsolutePath());
 
     currentFile.get().getDeserializer().close();
-    File fileToRoll = new File(currPath);
 
     // Verify that spooling assumptions hold
     if (fileToRoll.lastModified() != currentFile.get().getLastModified()) {
-      String message = "File has been modified since being read: " + currPath;
+      String message = "File has been modified since being read: " + 
fileToRoll;
       throw new IllegalStateException(message);
     }
     if (fileToRoll.length() != currentFile.get().getLength()) {
-      String message = "File has changed size since being read: " + currPath;
+      String message = "File has changed size since being read: " + fileToRoll;
       throw new IllegalStateException(message);
     }
 
-    File destination = new File(newPath);
+    if (deletePolicy.equalsIgnoreCase(DeletePolicy.NEVER.name())) {
+      rollCurrentFile(fileToRoll);
+    } else if (deletePolicy.equalsIgnoreCase(DeletePolicy.IMMEDIATE.name())) {
+      deleteCurrentFile(fileToRoll);
+    } else {
+      // TODO: implement delay in the future
+      throw new IllegalArgumentException("Unsupported delete policy: " +
+          deletePolicy);
+    }
+  }
+
+  /**
+   * Rename the given spooled file
+   * @param fileToRoll
+   * @throws IOException
+   */
+  private void rollCurrentFile(File fileToRoll) throws IOException {
+
+    File dest = new File(fileToRoll.getPath() + completedSuffix);
+    logger.info("Preparing to move file {} to {}", fileToRoll, dest);
 
     // Before renaming, check whether destination file name exists
-    if (destination.exists() && PlatformDetect.isWindows()) {
+    if (dest.exists() && PlatformDetect.isWindows()) {
       /*
        * If we are here, it means the completed file already exists. In almost
        * every case this means the user is violating an assumption of Flume
@@ -277,8 +313,8 @@ public class ReliableSpoolingFileEventReader implements 
ReliableEventReader {
        * file was already rolled but the rename was not atomic. If that seems
        * likely, we let it pass with only a warning.
        */
-      if (Files.equal(currentFile.get().getFile(), destination)) {
-        logger.warn("Completed file " + newPath +
+      if (Files.equal(currentFile.get().getFile(), dest)) {
+        logger.warn("Completed file " + dest +
             " already exists, but files match, so continuing.");
         boolean deleted = fileToRoll.delete();
         if (!deleted) {
@@ -287,21 +323,21 @@ public class ReliableSpoolingFileEventReader implements 
ReliableEventReader {
         }
       } else {
         String message = "File name has been re-used with different" +
-            " files. Spooling assumptions violated for " + newPath;
+            " files. Spooling assumptions violated for " + dest;
         throw new IllegalStateException(message);
       }
 
     // Dest file exists and not on windows
-    } else if (destination.exists()) {
+    } else if (dest.exists()) {
       String message = "File name has been re-used with different" +
-          " files. Spooling assumptions violated for " + newPath;
+          " files. Spooling assumptions violated for " + dest;
       throw new IllegalStateException(message);
 
     // Destination file does not already exist. We are good to go!
     } else {
-      boolean renamed = fileToRoll.renameTo(new File(newPath));
+      boolean renamed = fileToRoll.renameTo(dest);
       if (renamed) {
-        logger.debug("Successfully rolled file {} to {}", fileToRoll, newPath);
+        logger.debug("Successfully rolled file {} to {}", fileToRoll, dest);
 
         // now we no longer need the meta file
         deleteMetaFile();
@@ -309,7 +345,7 @@ public class ReliableSpoolingFileEventReader implements 
ReliableEventReader {
         /* If we are here then the file cannot be renamed for a reason other
          * than that the destination file exists (actually, that remains
          * possible w/ small probability due to TOC-TOU conditions).*/
-        String message = "Unable to move " + currPath + " to " + newPath +
+        String message = "Unable to move " + fileToRoll + " to " + dest +
             ". This will likely cause duplicate events. Please verify that " +
             "flume has sufficient permissions to perform these operations.";
         throw new FlumeException(message);
@@ -318,6 +354,22 @@ public class ReliableSpoolingFileEventReader implements 
ReliableEventReader {
   }
 
   /**
+   * Delete the given spooled file
+   * @param fileToDelete
+   * @throws IOException
+   */
+  private void deleteCurrentFile(File fileToDelete) throws IOException {
+    logger.info("Preparing to delete file {}", fileToDelete);
+    if (!fileToDelete.exists()) {
+      logger.warn("Unable to delete nonexistent file: {}", fileToDelete);
+      return;
+    }
+    if (!fileToDelete.delete()) {
+      throw new IOException("Unable to delete spool file: " + fileToDelete);
+    }
+  }
+
+  /**
    * Find and open the oldest file in the chosen directory. If two or more
    * files are equally old, the file name with lower lexicographical value is
    * returned. If the directory is empty, this will return an absent option.
@@ -336,7 +388,7 @@ public class ReliableSpoolingFileEventReader implements 
ReliableEventReader {
         return true;
       }
     };
-    List<File> candidateFiles = Arrays.asList(directory.listFiles(filter));
+    List<File> candidateFiles = 
Arrays.asList(spoolDirectory.listFiles(filter));
     if (candidateFiles.isEmpty()) {
       return Optional.absent();
     } else {
@@ -412,4 +464,85 @@ public class ReliableSpoolingFileEventReader implements 
ReliableEventReader {
     public File getFile() { return file; }
   }
 
+  @InterfaceAudience.Private
+  @InterfaceStability.Unstable
+  static enum DeletePolicy {
+    NEVER,
+    IMMEDIATE,
+    DELAY
+  }
+
+  /**
+   * Special builder class for ReliableSpoolingFileEventReader
+   */
+  public static class Builder {
+    private File spoolDirectory;
+    private String completedSuffix =
+        SpoolDirectorySourceConfigurationConstants.SPOOLED_FILE_SUFFIX;
+    private String ignorePattern =
+        SpoolDirectorySourceConfigurationConstants.DEFAULT_IGNORE_PAT;
+    private String trackerDirPath =
+        SpoolDirectorySourceConfigurationConstants.DEFAULT_META_DIR;
+    private Boolean annotateFileName =
+        SpoolDirectorySourceConfigurationConstants.DEFAULT_FILE_HEADER;
+    private String fileNameHeader =
+        SpoolDirectorySourceConfigurationConstants.DEFAULT_FILENAME_HEADER_KEY;
+    private String deserializerType =
+        SpoolDirectorySourceConfigurationConstants.DEFAULT_DESERIALIZER;
+    private Context deserializerContext = new Context();
+    private String deletePolicy =
+        SpoolDirectorySourceConfigurationConstants.DEFAULT_DELETE_POLICY;
+
+    public Builder spoolDirectory(File directory) {
+      this.spoolDirectory = directory;
+      return this;
+    }
+
+    public Builder completedSuffix(String completedSuffix) {
+      this.completedSuffix = completedSuffix;
+      return this;
+    }
+
+    public Builder ignorePattern(String ignorePattern) {
+      this.ignorePattern = ignorePattern;
+      return this;
+    }
+
+    public Builder trackerDirPath(String trackerDirPath) {
+      this.trackerDirPath = trackerDirPath;
+      return this;
+    }
+
+    public Builder annotateFileName(Boolean annotateFileName) {
+      this.annotateFileName = annotateFileName;
+      return this;
+    }
+
+    public Builder fileNameHeader(String fileNameHeader) {
+      this.fileNameHeader = fileNameHeader;
+      return this;
+    }
+
+    public Builder deserializerType(String deserializerType) {
+      this.deserializerType = deserializerType;
+      return this;
+    }
+
+    public Builder deserializerContext(Context deserializerContext) {
+      this.deserializerContext = deserializerContext;
+      return this;
+    }
+
+    public Builder deletePolicy(String deletePolicy) {
+      this.deletePolicy = deletePolicy;
+      return this;
+    }
+
+    public ReliableSpoolingFileEventReader build() throws IOException {
+      return new ReliableSpoolingFileEventReader(spoolDirectory, 
completedSuffix,
+          ignorePattern, trackerDirPath, annotateFileName, fileNameHeader,
+          deserializerType, deserializerContext, deletePolicy);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/919353db/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
index 167193c..552bd48 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySource.java
@@ -52,9 +52,10 @@ Configurable, EventDrivenSource {
   private String fileHeaderKey;
   private int batchSize;
   private String ignorePattern;
-  private File metaDirectory;
+  private String trackerDirPath;
   private String deserializerType;
   private Context deserializerContext;
+  private String deletePolicy;
 
   private CounterGroup counterGroup;
   ReliableSpoolingFileEventReader reader;
@@ -70,9 +71,17 @@ Configurable, EventDrivenSource {
 
     File directory = new File(spoolDirectory);
     try {
-    reader = new ReliableSpoolingFileEventReader(directory, completedSuffix,
-        ignorePattern, metaDirectory, fileHeader, fileHeaderKey,
-        deserializerType, deserializerContext);
+      reader = new ReliableSpoolingFileEventReader.Builder()
+          .spoolDirectory(directory)
+          .completedSuffix(completedSuffix)
+          .ignorePattern(ignorePattern)
+          .trackerDirPath(trackerDirPath)
+          .annotateFileName(fileHeader)
+          .fileNameHeader(fileHeaderKey)
+          .deserializerType(deserializerType)
+          .deserializerContext(deserializerContext)
+          .deletePolicy(deletePolicy)
+          .build();
     } catch (IOException ioe) {
       throw new FlumeException("Error instantiating spooling event parser",
           ioe);
@@ -99,6 +108,7 @@ Configurable, EventDrivenSource {
 
     completedSuffix = context.getString(SPOOLED_FILE_SUFFIX,
         DEFAULT_SPOOLED_FILE_SUFFIX);
+    deletePolicy = context.getString(DELETE_POLICY, DEFAULT_DELETE_POLICY);
     fileHeader = context.getBoolean(FILENAME_HEADER,
         DEFAULT_FILE_HEADER);
     fileHeaderKey = context.getString(FILENAME_HEADER_KEY,
@@ -106,17 +116,8 @@ Configurable, EventDrivenSource {
     batchSize = context.getInteger(BATCH_SIZE,
         DEFAULT_BATCH_SIZE);
 
-    ignorePattern = context.getString(IGNORE_PAT, DFLT_IGNORE_PAT);
-    String metaDirLoc = context.getString(META_DIR, DEFAULT_META_DIR);
-
-    // if absolute path, treat as absolute
-    if (metaDirLoc.charAt(0) == '/') {
-      metaDirectory = new File(metaDirLoc);
-
-    // if relative path, treat as relative to spool directory
-    } else {
-      metaDirectory = new File(spoolDirectory, DEFAULT_META_DIR);
-    }
+    ignorePattern = context.getString(IGNORE_PAT, DEFAULT_IGNORE_PAT);
+    trackerDirPath = context.getString(META_DIR, DEFAULT_META_DIR);
 
     deserializerType = context.getString(DESERIALIZER, DEFAULT_DESERIALIZER);
     deserializerContext = new Context(context.getSubProperties(DESERIALIZER +

http://git-wip-us.apache.org/repos/asf/flume/blob/919353db/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
index 67549e0..afc7288 100644
--- a/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
+++ b/flume-ng-core/src/main/java/org/apache/flume/source/SpoolDirectorySourceConfigurationConstants.java
@@ -47,7 +47,7 @@ public class SpoolDirectorySourceConfigurationConstants {
 
   /** Pattern of files to ignore */
   public static final String IGNORE_PAT = "ignorePattern";
-  public static final String DFLT_IGNORE_PAT = "^$"; // no effect
+  public static final String DEFAULT_IGNORE_PAT = "^$"; // no effect
 
   /** Directory to store metadata about files being processed */
   public static final String META_DIR = "metaDir";
@@ -56,4 +56,7 @@ public class SpoolDirectorySourceConfigurationConstants {
   /** Deserializer to use to parse the file data into Flume Events */
   public static final String DESERIALIZER = "deserializer";
   public static final String DEFAULT_DESERIALIZER = "LINE";
+
+  public static final String DELETE_POLICY = "deletePolicy";
+  public static final String DEFAULT_DELETE_POLICY = "never";
 }

http://git-wip-us.apache.org/repos/asf/flume/blob/919353db/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
new file mode 100644
index 0000000..a29606e
--- /dev/null
+++ b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestReliableSpoolingFileEventReader.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flume.client.avro;
+
+import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
+import com.google.common.io.Files;
+import junit.framework.Assert;
+import org.apache.flume.Event;
+import org.apache.flume.source.SpoolDirectorySourceConfigurationConstants;
+import 
org.apache.flume.client.avro.ReliableSpoolingFileEventReader.DeletePolicy;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.util.List;
+
+public class TestReliableSpoolingFileEventReader {
+
+  private static final Logger logger = LoggerFactory.getLogger
+      (TestReliableSpoolingFileEventReader.class);
+
+  private static final File WORK_DIR = new File("target/test/work/" +
+      TestReliableSpoolingFileEventReader.class.getSimpleName());
+
+  @Before
+  public void setup() throws IOException, InterruptedException {
+    if (!WORK_DIR.isDirectory()) {
+      Files.createParentDirs(new File(WORK_DIR, "dummy"));
+    }
+
+    // write out a few files
+    for (int i = 0; i < 4; i++) {
+      File fileName = new File(WORK_DIR, "file"+i);
+      StringBuilder sb = new StringBuilder();
+
+      // write as many lines as the index of the file
+      for (int j = 0; j < i; j++) {
+        sb.append("file" + i + "line" + j + "\n");
+      }
+      Files.write(sb.toString(), fileName, Charsets.UTF_8);
+    }
+    Thread.sleep(1500L); // make sure timestamp is later
+    Files.write("\n", new File(WORK_DIR, "emptylineFile"), Charsets.UTF_8);
+  }
+
+  @After
+  public void tearDown() {
+
+    // delete all the files & dirs we created
+    File[] files = WORK_DIR.listFiles();
+    for (File f : files) {
+      if (f.isDirectory()) {
+        File[] subDirFiles = f.listFiles();
+        for (File sdf : subDirFiles) {
+          if (!sdf.delete()) {
+            logger.warn("Cannot delete file {}", sdf.getAbsolutePath());
+          }
+        }
+        if (!f.delete()) {
+          logger.warn("Cannot delete directory {}", f.getAbsolutePath());
+        }
+      } else {
+        if (!f.delete()) {
+          logger.warn("Cannot delete file {}", f.getAbsolutePath());
+        }
+      }
+    }
+    if (!WORK_DIR.delete()) {
+      logger.warn("Cannot delete work directory {}", 
WORK_DIR.getAbsolutePath());
+    }
+
+  }
+
+  // FIXME: implement ignore pattern test
+  @Ignore
+  @Test
+  public void testIgnorePattern() {
+    ReliableSpoolingFileEventReader parser;
+  }
+
+  @Test
+  public void testRepeatedCallsWithCommitAlways() throws IOException {
+    ReliableEventReader reader = new ReliableSpoolingFileEventReader.Builder()
+        .spoolDirectory(WORK_DIR).build();
+
+    final int expectedLines = 0 + 1 + 2 + 3 + 1;
+    int seenLines = 0;
+    for (int i = 0; i < 10; i++) {
+      List<Event> events = reader.readEvents(10);
+      seenLines += events.size();
+      reader.commit();
+    }
+
+    Assert.assertEquals(expectedLines, seenLines);
+  }
+
+  @Test
+  public void testRepeatedCallsWithCommitOnSuccess() throws IOException {
+    String trackerDirPath =
+        SpoolDirectorySourceConfigurationConstants.DEFAULT_META_DIR;
+    File trackerDir = new File(WORK_DIR, trackerDirPath);
+
+    ReliableEventReader reader = new ReliableSpoolingFileEventReader.Builder()
+        .spoolDirectory(WORK_DIR).trackerDirPath(trackerDirPath).build();
+
+    final int expectedLines = 0 + 1 + 2 + 3 + 1;
+    int seenLines = 0;
+    for (int i = 0; i < 10; i++) {
+      List<Event> events = reader.readEvents(10);
+      int numEvents = events.size();
+      if (numEvents > 0) {
+        seenLines += numEvents;
+        reader.commit();
+
+        // ensure that there are files in the trackerDir
+        File[] files = trackerDir.listFiles();
+        Assert.assertNotNull(files);
+        Assert.assertTrue("Expected tracker files in tracker dir " + trackerDir
+            .getAbsolutePath(), files.length > 0);
+      }
+    }
+
+    Assert.assertEquals(expectedLines, seenLines);
+  }
+
+  @Test
+  public void testFileDeletion() throws IOException {
+    ReliableEventReader reader = new ReliableSpoolingFileEventReader.Builder()
+        .spoolDirectory(WORK_DIR)
+        .deletePolicy(DeletePolicy.IMMEDIATE.name())
+        .build();
+
+    List<File> before = listFiles(WORK_DIR);
+    Assert.assertEquals("Expected 5, not: " + before, 5, before.size());
+
+    List<Event> events;
+    do {
+      events = reader.readEvents(10);
+      reader.commit();
+    } while (!events.isEmpty());
+
+    List<File> after = listFiles(WORK_DIR);
+    Assert.assertEquals("Expected 0, not: " + after, 0, after.size());
+  }
+
+  private static List<File> listFiles(File dir) {
+    List<File> files = Lists.newArrayList(dir.listFiles(new FileFilter
+        () {
+      @Override
+      public boolean accept(File pathname) {
+        return !pathname.isDirectory();
+      }
+    }));
+    return files;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/919353db/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestSpoolingFileLineReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestSpoolingFileLineReader.java b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestSpoolingFileLineReader.java
index bc10243..ac046a9 100644
--- a/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestSpoolingFileLineReader.java
+++ b/flume-ng-core/src/test/java/org/apache/flume/client/avro/TestSpoolingFileLineReader.java
@@ -26,6 +26,7 @@ import com.google.common.io.Files;
 import org.apache.flume.Context;
 import org.apache.flume.Event;
 import org.apache.flume.serialization.LineDeserializer;
+import org.apache.flume.source.SpoolDirectorySourceConfigurationConstants;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -43,9 +44,9 @@ public class TestSpoolingFileLineReader {
 
   Logger logger = LoggerFactory.getLogger(TestSpoolingFileLineReader.class);
 
-  private static String completedSuffix = ".COMPLETE";
+  private static String completedSuffix =
+      SpoolDirectorySourceConfigurationConstants.DEFAULT_SPOOLED_FILE_SUFFIX;
   private static int bufferMaxLineLength = 500;
-  private static int bufferMaxLines = 30;
 
   private File tmpDir;
 
@@ -66,8 +67,11 @@ public class TestSpoolingFileLineReader {
     ctx.put(LineDeserializer.MAXLINE_KEY, Integer.toString(maxLineLength));
     ReliableSpoolingFileEventReader parser;
     try {
-      parser = new ReliableSpoolingFileEventReader(tmpDir, completedSuffix, 
"^$",
-          new File(tmpDir, ".flumespool"), false, "^$", "LINE", ctx);
+      parser = new ReliableSpoolingFileEventReader.Builder()
+          .spoolDirectory(tmpDir)
+          .completedSuffix(completedSuffix)
+          .deserializerContext(ctx)
+          .build();
     } catch (IOException ioe) {
       throw Throwables.propagate(ioe);
     }

http://git-wip-us.apache.org/repos/asf/flume/blob/919353db/flume-ng-core/src/test/java/org/apache/flume/serialization/TestReliableSpoolingFileEventReader.java
----------------------------------------------------------------------
diff --git a/flume-ng-core/src/test/java/org/apache/flume/serialization/TestReliableSpoolingFileEventReader.java b/flume-ng-core/src/test/java/org/apache/flume/serialization/TestReliableSpoolingFileEventReader.java
deleted file mode 100644
index abc1827..0000000
--- a/flume-ng-core/src/test/java/org/apache/flume/serialization/TestReliableSpoolingFileEventReader.java
+++ /dev/null
@@ -1,157 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.flume.serialization;
-
-import com.google.common.base.Charsets;
-import com.google.common.collect.Lists;
-import com.google.common.io.Files;
-import junit.framework.Assert;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.client.avro.ReliableEventReader;
-import org.apache.flume.client.avro.ReliableSpoolingFileEventReader;
-import org.apache.flume.source.SpoolDirectorySourceConfigurationConstants;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.List;
-
-public class TestReliableSpoolingFileEventReader {
-
-  private static final Logger logger = LoggerFactory.getLogger
-      (TestReliableSpoolingFileEventReader.class);
-
-  private static final File WORK_DIR = new File("target/test/work/" +
-      TestReliableSpoolingFileEventReader.class.getSimpleName());
-
-  @Before
-  public void setup() throws IOException, InterruptedException {
-    if (!WORK_DIR.isDirectory()) {
-      Files.createParentDirs(new File(WORK_DIR, "dummy"));
-    }
-
-    // write out a few files
-    for (int i = 0; i < 4; i++) {
-      File fileName = new File(WORK_DIR, "file"+i);
-      StringBuilder sb = new StringBuilder();
-
-      // write as many lines as the index of the file
-      for (int j = 0; j < i; j++) {
-        sb.append("file" + i + "line" + j + "\n");
-      }
-      Files.write(sb.toString(), fileName, Charsets.UTF_8);
-    }
-    Thread.sleep(1500L); // make sure timestamp is later
-    Files.write("\n", new File(WORK_DIR, "emptylineFile"), Charsets.UTF_8);
-  }
-
-  @After
-  public void tearDown() {
-
-    // delete all the files & dirs we created
-    File[] files = WORK_DIR.listFiles();
-    for (File f : files) {
-      if (f.isDirectory()) {
-        File[] subDirFiles = f.listFiles();
-        for (File sdf : subDirFiles) {
-          if (!sdf.delete()) {
-            logger.warn("Cannot delete file {}", sdf.getAbsolutePath());
-          }
-        }
-        if (!f.delete()) {
-          logger.warn("Cannot delete directory {}", f.getAbsolutePath());
-        }
-      } else {
-        if (!f.delete()) {
-          logger.warn("Cannot delete file {}", f.getAbsolutePath());
-        }
-      }
-    }
-    if (!WORK_DIR.delete()) {
-      logger.warn("Cannot delete work directory {}", WORK_DIR.getAbsolutePath());
-    }
-
-  }
-
-  // FIXME: implement ignore pattern test
-  @Ignore
-  @Test
-  public void testIgnorePattern() {
-    ReliableSpoolingFileEventReader parser;
-  }
-
-  @Test
-  public void testRepeatedCallsWithCommitAlways() throws IOException {
-    File trackerDir = new File(WORK_DIR,
-        SpoolDirectorySourceConfigurationConstants.DEFAULT_META_DIR);
-    ReliableEventReader reader = new ReliableSpoolingFileEventReader(WORK_DIR,
-        SpoolDirectorySourceConfigurationConstants.DEFAULT_SPOOLED_FILE_SUFFIX,
-        SpoolDirectorySourceConfigurationConstants.DFLT_IGNORE_PAT,
-        trackerDir, false, "file",
-        SpoolDirectorySourceConfigurationConstants.DEFAULT_DESERIALIZER,
-        new Context());
-
-    final int expectedLines = 0 + 1 + 2 + 3 + 1;
-    int seenLines = 0;
-    for (int i = 0; i < 10; i++) {
-      List<Event> events = reader.readEvents(10);
-      seenLines += events.size();
-      reader.commit();
-    }
-
-    Assert.assertEquals(expectedLines, seenLines);
-  }
-
-  @Test
-  public void testRepeatedCallsWithCommitOnSuccess() throws IOException {
-    File trackerDir = new File(WORK_DIR,
-        SpoolDirectorySourceConfigurationConstants.DEFAULT_META_DIR);
-    ReliableEventReader reader = new ReliableSpoolingFileEventReader(WORK_DIR,
-        SpoolDirectorySourceConfigurationConstants.DEFAULT_SPOOLED_FILE_SUFFIX,
-        SpoolDirectorySourceConfigurationConstants.DFLT_IGNORE_PAT,
-        trackerDir, false, "file",
-        SpoolDirectorySourceConfigurationConstants.DEFAULT_DESERIALIZER,
-        new Context());
-
-    final int expectedLines = 0 + 1 + 2 + 3 + 1;
-    int seenLines = 0;
-    for (int i = 0; i < 10; i++) {
-      List<Event> events = reader.readEvents(10);
-      int numEvents = events.size();
-      if (numEvents > 0) {
-        seenLines += numEvents;
-        reader.commit();
-
-        // ensure that there are files in the trackerDir
-        File[] files = trackerDir.listFiles();
-        Assert.assertNotNull(files);
-        Assert.assertTrue("Expected tracker files in tracker dir " + trackerDir
-            .getAbsolutePath(), files.length > 0);
-      }
-    }
-
-    Assert.assertEquals(expectedLines, seenLines);
-  }
-
-}

Reply via email to