Repository: nifi
Updated Branches:
  refs/heads/master 68291636c -> 930e95aa0


NIFI-1170 - Improved TailFile processor to support multiple files tailing

This closes #980


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/930e95aa
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/930e95aa
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/930e95aa

Branch: refs/heads/master
Commit: 930e95aa0023b12e5618068ea144808e5627cea7
Parents: 6829163
Author: Pierre Villard <pierre.villard...@gmail.com>
Authored: Wed Aug 31 18:53:26 2016 +0200
Committer: Oleg Zhurakousky <o...@suitcase.io>
Committed: Wed Sep 21 10:05:39 2016 -0400

----------------------------------------------------------------------
 .../nifi/processors/standard/TailFile.java      | 577 +++++++++++++++----
 .../additionalDetails.html                      | 134 +++++
 .../nifi/processors/standard/TestTailFile.java  | 316 +++++++++-
 3 files changed, 916 insertions(+), 111 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/930e95aa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
index 16bde4c..ba2c9b8 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
@@ -29,29 +29,38 @@ import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.regex.Pattern;
 import java.util.zip.CRC32;
 import java.util.zip.CheckedInputStream;
 import java.util.zip.Checksum;
 
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.Stateful;
 import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.components.state.StateMap;
 import org.apache.nifi.flowfile.FlowFile;
@@ -71,7 +80,7 @@ import org.apache.nifi.stream.io.StreamUtils;
 @TriggerSerially
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
 @Tags({"tail", "file", "log", "text", "source"})
-@CapabilityDescription("\"Tails\" a file, ingesting data from the file as it 
is written to the file. The file is expected to be textual. Data is ingested 
only when a "
+@CapabilityDescription("\"Tails\" a file, or a list of files, ingesting data 
from the file as it is written to the file. The file is expected to be textual. 
Data is ingested only when a "
         + "new line is encountered (carriage return or new-line character or 
combination). If the file to tail is periodically \"rolled over\", as is 
generally the case "
         + "with log files, an optional Rolling Filename Pattern can be used to 
retrieve data from files that have rolled over, even if the rollover occurred 
while NiFi "
         + "was not running (provided that the data still exists upon restart 
of NiFi). It is generally advisable to set the Run Schedule to a few seconds, 
rather than running "
@@ -79,35 +88,83 @@ import org.apache.nifi.stream.io.StreamUtils;
         + "ingesting files that have been compressed when 'rolled over'.")
 @Stateful(scopes = {Scope.LOCAL, Scope.CLUSTER}, description = "Stores state 
about where in the Tailed File it left off so that on restart it does not have 
to duplicate data. "
         + "State is stored either local or clustered depend on the <File 
Location> property.")
+@WritesAttributes({
+    @WritesAttribute(attribute = "tailfile.original.path", description = "Path 
of the original file the flow file comes from.")
+    })
 public class TailFile extends AbstractProcessor {
 
-    static final AllowableValue LOCATION_LOCAL = new AllowableValue("Local", 
"Local", "State is stored locally. Each node in a cluster will tail a different 
file.");
-    static final AllowableValue LOCATION_REMOTE = new AllowableValue("Remote", 
"Remote", "State is located on a remote resource. This Processor will store 
state across the cluster so that "
+    static final String MAP_PREFIX = "file.";
+
+    static final AllowableValue LOCATION_LOCAL = new AllowableValue("Local", 
"Local",
+            "State is stored locally. Each node in a cluster will tail a 
different file.");
+    static final AllowableValue LOCATION_REMOTE = new AllowableValue("Remote", 
"Remote",
+            "State is located on a remote resource. This Processor will store 
state across the cluster so that "
             + "it can be run on Primary Node Only and a new Primary Node can 
pick up where the last one left off.");
 
+    static final AllowableValue MODE_SINGLEFILE = new AllowableValue("Single 
file", "Single file",
+            "In this mode, only the one file indicated in the 'Files to tail' 
property will be watched by the processor."
+            + " In this mode, the file may not exist when starting the 
processor.");
+    static final AllowableValue MODE_MULTIFILE = new AllowableValue("Multiple 
files", "Multiple files",
+            "In this mode, the 'Files to tail' property accepts a regular 
expression and the processor will look"
+            + " for files in 'Base directory' to list the files to tail by the 
processor. In this mode, only the files existing"
+            + " when starting the processor will be used.");
+
+    static final AllowableValue FIXED_NAME = new AllowableValue("Fixed name", 
"Fixed name", "With this rolling strategy, the files "
+            + "where the log messages are appended have always the same 
name.");
+    static final AllowableValue CHANGING_NAME = new AllowableValue("Changing 
name", "Changing name", "With this rolling strategy, "
+            + "the files where the log messages are appended have not a fixed 
name (for example: filename contaning the current day.");
+
     static final AllowableValue START_BEGINNING_OF_TIME = new 
AllowableValue("Beginning of Time", "Beginning of Time",
             "Start with the oldest data that matches the Rolling Filename 
Pattern and then begin reading from the File to Tail");
     static final AllowableValue START_CURRENT_FILE = new 
AllowableValue("Beginning of File", "Beginning of File",
             "Start with the beginning of the File to Tail. Do not ingest any 
data that has already been rolled over");
     static final AllowableValue START_CURRENT_TIME = new 
AllowableValue("Current Time", "Current Time",
-            "Start with the data at the end of the File to Tail. Do not ingest 
any data thas has already been rolled over or any data in the File to Tail that 
has already been written.");
+            "Start with the data at the end of the File to Tail. Do not ingest 
any data thas has already been rolled over or any "
+            + "data in the File to Tail that has already been written.");
+
+    static final PropertyDescriptor BASE_DIRECTORY = new 
PropertyDescriptor.Builder()
+            .name("tail-base-directory")
+            .displayName("Base directory")
+            .description("Base directory used to look for files to tail. This 
property is required when using Multifile mode.")
+            .expressionLanguageSupported(false)
+            .addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
+            .required(false)
+            .build();
+
+    static final PropertyDescriptor MODE = new PropertyDescriptor.Builder()
+            .name("tail-mode")
+            .displayName("Tailing mode")
+            .description("Mode to use: single file will tail only one gile, 
multiple file will look for a list of file. In Multiple mode"
+                    + " the Base directory is required.")
+            .expressionLanguageSupported(false)
+            .required(true)
+            .allowableValues(MODE_SINGLEFILE, MODE_MULTIFILE)
+            .defaultValue(MODE_SINGLEFILE.getValue())
+            .build();
 
     static final PropertyDescriptor FILENAME = new PropertyDescriptor.Builder()
+            .displayName("File(s) to Tail")
             .name("File to Tail")
-            .description("Fully-qualified filename of the file that should be 
tailed")
+            .description("Path of the file to tail in case of single file 
mode. If using multifile mode, regular expression to find files "
+                    + "to tail in the base directory. In case recursivity is 
set to true, the regular expression will be used to match the "
+                    + "path starting from the base directory (see additional 
details for examples).")
             .expressionLanguageSupported(false)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
             .required(true)
             .build();
+
     static final PropertyDescriptor ROLLING_FILENAME_PATTERN = new 
PropertyDescriptor.Builder()
             .name("Rolling Filename Pattern")
             .description("If the file to tail \"rolls over\" as would be the 
case with log files, this filename pattern will be used to "
                     + "identify files that have rolled over so that if NiFi is 
restarted, and the file has rolled over, it will be able to pick up where it 
left off. "
-                    + "This pattern supports wildcard characters * and ? and 
will assume that the files that have rolled over live in the same directory as 
the file being tailed.")
+                    + "This pattern supports wildcard characters * and ?, it 
also supports the notation ${filename} to specify a pattern based on the name 
of the file "
+                    + "(without extension), and will assume that the files 
that have rolled over live in the same directory as the file being tailed. "
+                    + "The same glob pattern will be used for all files.")
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .expressionLanguageSupported(false)
             .required(false)
             .build();
+
     static final PropertyDescriptor STATE_LOCATION = new 
PropertyDescriptor.Builder()
             .displayName("State Location")
             .name("File Location") //retained name of property for backward 
compatibility of configs
@@ -117,31 +174,79 @@ public class TailFile extends AbstractProcessor {
             .allowableValues(LOCATION_LOCAL, LOCATION_REMOTE)
             .defaultValue(LOCATION_LOCAL.getValue())
             .build();
+
     static final PropertyDescriptor START_POSITION = new 
PropertyDescriptor.Builder()
             .name("Initial Start Position")
-            .description("When the Processor first begins to tail data, this 
property specifies where the Processor should begin reading data. Once data has 
been ingested from the file, "
+            .description("When the Processor first begins to tail data, this 
property specifies where the Processor should begin reading data. Once data has 
been ingested from a file, "
                     + "the Processor will continue from the last point from 
which it has received data.")
             .allowableValues(START_BEGINNING_OF_TIME, START_CURRENT_FILE, 
START_CURRENT_TIME)
             .defaultValue(START_CURRENT_FILE.getValue())
             .required(true)
             .build();
 
+    static final PropertyDescriptor RECURSIVE = new 
PropertyDescriptor.Builder()
+            .name("tailfile-recursive-lookup")
+            .displayName("Recursive lookup")
+            .description("When using Multiple files mode, this property 
defines if files must be listed recursively or not"
+                    + " in the base directory.")
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .required(true)
+            .build();
+
+    static final PropertyDescriptor ROLLING_STRATEGY = new 
PropertyDescriptor.Builder()
+            .name("tailfile-rolling-strategy")
+            .displayName("Rolling Strategy")
+            .description("Specifies if the files to tail have a fixed name or 
not.")
+            .required(true)
+            .allowableValues(FIXED_NAME, CHANGING_NAME)
+            .defaultValue(FIXED_NAME.getValue())
+            .build();
+
+    static final PropertyDescriptor LOOKUP_FREQUENCY = new 
PropertyDescriptor.Builder()
+            .name("tailfile-lookup-frequency")
+            .displayName("Lookup frequency")
+            .description("Only used in Multiple files mode and Changing name 
rolling strategy. It specifies the minimum "
+                    + "duration the processor will wait before listing again 
the files to tail.")
+            .required(false)
+            .defaultValue("10 minutes")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
+    static final PropertyDescriptor MAXIMUM_AGE = new 
PropertyDescriptor.Builder()
+            .name("tailfile-maximum-age")
+            .displayName("Maximum age")
+            .description("Only used in Multiple files mode and Changing name 
rolling strategy. It specifies the necessary "
+                    + "minimum duration to consider that no new messages will 
be appended in a file regarding its last "
+                    + "modification date. This should not be set too low to 
avoid duplication of data in case new messages "
+                    + "are appended at a lower frequency.")
+            .required(false)
+            .defaultValue("24 hours")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .build();
+
     static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
             .description("All FlowFiles are routed to this Relationship.")
             .build();
 
-    private volatile TailFileState state = new TailFileState(null, null, null, 
0L, 0L, null, ByteBuffer.allocate(65536));
-    private volatile Long expectedRecoveryChecksum;
-    private volatile boolean tailFileChanged = false;
+    private volatile Map<String, TailFileObject> states = new HashMap<String, 
TailFileObject>();
+    private volatile AtomicLong lastLookup = new AtomicLong(0L);
+    private volatile AtomicBoolean isMultiChanging = new AtomicBoolean(false);
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
         final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(MODE);
         properties.add(FILENAME);
         properties.add(ROLLING_FILENAME_PATTERN);
+        properties.add(BASE_DIRECTORY);
         properties.add(START_POSITION);
         properties.add(STATE_LOCATION);
+        properties.add(RECURSIVE);
+        properties.add(ROLLING_STRATEGY);
+        properties.add(LOOKUP_FREQUENCY);
+        properties.add(MAXIMUM_AGE);
         return properties;
     }
 
@@ -153,22 +258,178 @@ public class TailFile extends AbstractProcessor {
     @Override
     public void onPropertyModified(final PropertyDescriptor descriptor, final 
String oldValue, final String newValue) {
         if (isConfigurationRestored() && FILENAME.equals(descriptor)) {
-            state = new TailFileState(newValue, null, null, 0L, 0L, null, 
ByteBuffer.allocate(65536));
-            tailFileChanged = true;
+            states = new HashMap<String, TailFileObject>();
+        }
+    }
+
+    @Override
+    protected Collection<ValidationResult> customValidate(ValidationContext 
context) {
+        final List<ValidationResult> results = new 
ArrayList<>(super.customValidate(context));
+
+        
if(context.getProperty(MODE).getValue().equals(MODE_MULTIFILE.getValue())) {
+            String path = context.getProperty(BASE_DIRECTORY).getValue();
+            if(path == null) {
+                results.add(new 
ValidationResult.Builder().subject(BASE_DIRECTORY.getName()).valid(false)
+                        .explanation("Base directory property cannot be empty 
in Multifile mode.").build());
+            } else if (!new File(path).isDirectory()) {
+                results.add(new 
ValidationResult.Builder().subject(BASE_DIRECTORY.getName()).valid(false)
+                            .explanation(path + " is not a 
directory.").build());
+            }
+
+            
if(context.getProperty(ROLLING_STRATEGY).getValue().equals(CHANGING_NAME.getValue()))
 {
+                String freq = context.getProperty(LOOKUP_FREQUENCY).getValue();
+                if(freq == null) {
+                    results.add(new 
ValidationResult.Builder().subject(LOOKUP_FREQUENCY.getName()).valid(false)
+                            .explanation("In Multiple files mode and Changing 
name rolling strategy, lookup frequency "
+                                    + "property must be specified.").build());
+                }
+                String maxAge = context.getProperty(MAXIMUM_AGE).getValue();
+                if(maxAge == null) {
+                    results.add(new 
ValidationResult.Builder().subject(MAXIMUM_AGE.getName()).valid(false)
+                            .explanation("In Multiple files mode and Changing 
name rolling strategy, maximum age "
+                                    + "property must be specified.").build());
+                }
+            } else {
+                long max = context.getProperty(MAXIMUM_AGE).getValue() == null 
? Long.MAX_VALUE : 
context.getProperty(MAXIMUM_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+                List<String> filesToTail = 
getFilesToTail(context.getProperty(BASE_DIRECTORY).getValue(),
+                        context.getProperty(FILENAME).getValue(),
+                        context.getProperty(RECURSIVE).asBoolean(),
+                        max);
+
+                if(filesToTail.isEmpty()) {
+                    results.add(new 
ValidationResult.Builder().subject(FILENAME.getName()).valid(false)
+                                .explanation("There is no file to tail. Files 
must exist when starting this processor.").build());
+                }
+            }
         }
+
+        return results;
     }
 
     @OnScheduled
     public void recoverState(final ProcessContext context) throws IOException {
+        // set isMultiChanging
+        
isMultiChanging.set(context.getProperty(MODE).getValue().equals(MODE_MULTIFILE.getValue())
+                && 
context.getProperty(ROLLING_STRATEGY).getValue().equals(CHANGING_NAME.getValue()));
+
+        // set last lookup to now
+        lastLookup.set(new Date().getTime());
+
+        // maxAge
+        long maxAge = context.getProperty(MAXIMUM_AGE).getValue() == null ? 
Long.MAX_VALUE : 
context.getProperty(MAXIMUM_AGE).asTimePeriod(TimeUnit.MILLISECONDS);
+
+        // get list of files to tail
+        List<String> filesToTail = new ArrayList<String>();
+
+        
if(context.getProperty(MODE).getValue().equals(MODE_MULTIFILE.getValue())) {
+            
filesToTail.addAll(getFilesToTail(context.getProperty(BASE_DIRECTORY).getValue(),
+                    context.getProperty(FILENAME).getValue(),
+                    context.getProperty(RECURSIVE).asBoolean(),
+                    maxAge));
+        } else {
+            filesToTail.add(context.getProperty(FILENAME).getValue());
+        }
+
+
         final Scope scope = getStateScope(context);
         final StateMap stateMap = context.getStateManager().getState(scope);
+
         if (stateMap.getVersion() == -1L) {
             //state has been cleared or never stored so recover as 'empty 
state'
-            recoverState(context, Collections.EMPTY_MAP);
+            initStates(filesToTail, Collections.emptyMap(), true);
+            recoverState(context, filesToTail, Collections.emptyMap());
             return;
         }
 
-        recoverState(context, stateMap.toMap());
+        Map<String, String> statesMap = stateMap.toMap();
+
+        initStates(filesToTail, statesMap, false);
+        recoverState(context, filesToTail, statesMap);
+    }
+
+    private void initStates(List<String> filesToTail, Map<String, String> 
statesMap, boolean isCleared) {
+        int i = 0;
+
+        if(isCleared) {
+            states.clear();
+        } else {
+            // we have to deal with the case where NiFi has been restarted. In 
this
+            // case 'states' object is empty but the statesMap is not. So we 
have to
+            // put back the files we already know about in 'states' object 
before
+            // doing the recovery
+            if(states.isEmpty() && !statesMap.isEmpty()) {
+                for(String key : statesMap.keySet()) {
+                    if(key.endsWith(TailFileState.StateKeys.FILENAME)) {
+                        int index = Integer.valueOf(key.split("\\.")[1]);
+                        states.put(statesMap.get(key), new 
TailFileObject(index, statesMap));
+                    }
+                }
+            }
+
+            // first, we remove the files that are no longer present
+            List<String> toBeRemoved = new ArrayList<String>();
+            for(String file : states.keySet()) {
+                if(!filesToTail.contains(file)) {
+                    toBeRemoved.add(file);
+                    cleanReader(states.get(file));
+                }
+            }
+            states.keySet().removeAll(toBeRemoved);
+
+            // then we need to get the highest ID used so far to be sure
+            // we don't mix different files in case we add new files to tail
+            for(String file : states.keySet()) {
+                if(i <= states.get(file).getFilenameIndex()) {
+                    i = states.get(file).getFilenameIndex() + 1;
+                }
+            }
+
+        }
+
+        for (String file : filesToTail) {
+            if(isCleared || !states.containsKey(file)) {
+                states.put(file, new TailFileObject(i));
+                i++;
+            }
+        }
+
+    }
+
+    private void recoverState(final ProcessContext context, final List<String> 
filesToTail, final Map<String, String> map) throws IOException {
+        for (String file : filesToTail) {
+            recoverState(context, map, file);
+        }
+    }
+
+    /**
+     * Method to list the files to tail according to the given base directory
+     * and using the user-provided regular expression
+     * @param baseDir base directory to recursively look into
+     * @param fileRegex expression regular used to match files to tail
+     * @param isRecursive true if looking for file recursively, false otherwise
+     * @return List of files to tail
+     */
+    private List<String> getFilesToTail(final String baseDir, String 
fileRegex, boolean isRecursive, long maxAge) {
+        Collection<File> files = FileUtils.listFiles(new File(baseDir), null, 
isRecursive);
+        List<String> result = new ArrayList<String>();
+
+        String fullRegex = baseDir.endsWith(File.separator) ? baseDir + 
fileRegex : baseDir + File.separator + fileRegex;
+        Pattern p = Pattern.compile(fullRegex);
+
+        for(File file : files) {
+            String path = file.getPath();
+            if(p.matcher(path).matches()) {
+                if(isMultiChanging.get()) {
+                    if((new Date().getTime() - file.lastModified()) < maxAge) {
+                        result.add(path);
+                    }
+                } else {
+                    result.add(path);
+                }
+            }
+        }
+
+        return result;
     }
 
     /**
@@ -181,35 +442,43 @@ public class TailFile extends AbstractProcessor {
      * @param stateValues the values that were recovered from state that was
      * previously stored. This Map should be populated with the keys defined in
      * {@link TailFileState.StateKeys}.
+     * @param filePath the file of the file for which state must be recovered
      * @throws IOException if unable to seek to the appropriate location in the
      * tailed file.
      */
-    private void recoverState(final ProcessContext context, final Map<String, 
String> stateValues) throws IOException {
-        final String currentFilename = 
context.getProperty(FILENAME).getValue();
-        if (!stateValues.containsKey(TailFileState.StateKeys.FILENAME)) {
-            resetState(currentFilename);
+    private void recoverState(final ProcessContext context, final Map<String, 
String> stateValues, final String filePath) throws IOException {
+
+        final String prefix = MAP_PREFIX + 
states.get(filePath).getFilenameIndex() + '.';
+
+        if (!stateValues.containsKey(prefix + 
TailFileState.StateKeys.FILENAME)) {
+            resetState(filePath);
+            return;
+        }
+        if (!stateValues.containsKey(prefix + 
TailFileState.StateKeys.POSITION)) {
+            resetState(filePath);
             return;
         }
-        if (!stateValues.containsKey(TailFileState.StateKeys.POSITION)) {
-            resetState(currentFilename);
+        if (!stateValues.containsKey(prefix + 
TailFileState.StateKeys.TIMESTAMP)) {
+            resetState(filePath);
             return;
         }
-        if (!stateValues.containsKey(TailFileState.StateKeys.TIMESTAMP)) {
-            resetState(currentFilename);
+        if (!stateValues.containsKey(prefix + TailFileState.StateKeys.LENGTH)) 
{
+            resetState(filePath);
             return;
         }
 
-        final String checksumValue = 
stateValues.get(TailFileState.StateKeys.CHECKSUM);
+        final String checksumValue = stateValues.get(prefix + 
TailFileState.StateKeys.CHECKSUM);
         final boolean checksumPresent = (checksumValue != null);
-        final String storedStateFilename = 
stateValues.get(TailFileState.StateKeys.FILENAME);
-        final long position = 
Long.parseLong(stateValues.get(TailFileState.StateKeys.POSITION));
-        final long timestamp = 
Long.parseLong(stateValues.get(TailFileState.StateKeys.TIMESTAMP));
+        final String storedStateFilename = stateValues.get(prefix + 
TailFileState.StateKeys.FILENAME);
+        final long position = Long.parseLong(stateValues.get(prefix + 
TailFileState.StateKeys.POSITION));
+        final long timestamp = Long.parseLong(stateValues.get(prefix + 
TailFileState.StateKeys.TIMESTAMP));
+        final long length = Long.parseLong(stateValues.get(prefix + 
TailFileState.StateKeys.LENGTH));
 
         FileChannel reader = null;
         File tailFile = null;
 
-        if (checksumPresent && currentFilename.equals(storedStateFilename)) {
-            expectedRecoveryChecksum = Long.parseLong(checksumValue);
+        if (checksumPresent && filePath.equals(storedStateFilename)) {
+            
states.get(filePath).setExpectedRecoveryChecksum(Long.parseLong(checksumValue));
 
             // We have an expected checksum and the currently configured 
filename is the same as the state file.
             // We need to check if the existing file is the same as the one 
referred to in the state file based on
@@ -219,10 +488,10 @@ public class TailFile extends AbstractProcessor {
             if (existingTailFile.length() >= position) {
                 try (final InputStream tailFileIs = new 
FileInputStream(existingTailFile);
                         final CheckedInputStream in = new 
CheckedInputStream(tailFileIs, checksum)) {
-                    StreamUtils.copy(in, new NullOutputStream(), 
state.getPosition());
+                    StreamUtils.copy(in, new NullOutputStream(), 
states.get(filePath).getState().getPosition());
 
                     final long checksumResult = in.getChecksum().getValue();
-                    if (checksumResult == expectedRecoveryChecksum) {
+                    if (checksumResult == 
states.get(filePath).getExpectedRecoveryChecksum()) {
                         // Checksums match. This means that we want to resume 
reading from where we left off.
                         // So we will populate the reader object so that it 
will be used in onTrigger. If the
                         // checksums do not match, then we will leave the 
reader object null, so that the next
@@ -245,58 +514,86 @@ public class TailFile extends AbstractProcessor {
                         + "this indicates that the file has rotated. Will 
begin tailing current file from beginning.", new 
Object[]{existingTailFile.length(), position});
             }
 
-            state = new TailFileState(currentFilename, tailFile, reader, 
position, timestamp, checksum, ByteBuffer.allocate(65536));
+            states.get(filePath).setState(new TailFileState(filePath, 
tailFile, reader, position, timestamp, length, checksum, 
ByteBuffer.allocate(65536)));
         } else {
-            resetState(currentFilename);
+            resetState(filePath);
         }
 
-        getLogger().debug("Recovered state {}", new Object[]{state});
+        getLogger().debug("Recovered state {}", new 
Object[]{states.get(filePath).getState()});
     }
 
-    private void resetState(final String currentFilename) {
-        expectedRecoveryChecksum = null;
-        state = new TailFileState(currentFilename, null, null, 0L, 0L, null, 
ByteBuffer.allocate(65536));
+    private void resetState(final String filePath) {
+        states.get(filePath).setExpectedRecoveryChecksum(null);
+        states.get(filePath).setState(new TailFileState(filePath, null, null, 
0L, 0L, 0L, null, ByteBuffer.allocate(65536)));
     }
 
     @OnStopped
     public void cleanup() {
-        final TailFileState state = this.state;
-        if (state == null) {
+        for (TailFileObject tfo : states.values()) {
+            cleanReader(tfo);
+            final TailFileState state = tfo.getState();
+            tfo.setState(new TailFileState(state.getFilename(), 
state.getFile(), null, state.getPosition(), state.getTimestamp(), 
state.getLength(), state.getChecksum(), state.getBuffer()));
+        }
+    }
+
+    private void cleanReader(TailFileObject tfo) {
+        if (tfo.getState() == null) {
             return;
         }
 
-        final FileChannel reader = state.getReader();
+        final FileChannel reader = tfo.getState().getReader();
         if (reader == null) {
             return;
         }
 
         try {
             reader.close();
+            getLogger().debug("Closed FileChannel {}", new Object[]{reader});
         } catch (final IOException ioe) {
             getLogger().warn("Failed to close file handle during cleanup");
         }
-
-        getLogger().debug("Closed FileChannel {}", new Object[]{reader});
-
-        this.state = new TailFileState(state.getFilename(), state.getFile(), 
null, state.getPosition(), state.getTimestamp(), state.getChecksum(), 
state.getBuffer());
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
+        if(isMultiChanging.get()) {
+            long timeSinceLastLookup = new Date().getTime() - lastLookup.get();
+            if(timeSinceLastLookup > 
context.getProperty(LOOKUP_FREQUENCY).asTimePeriod(TimeUnit.MILLISECONDS)) {
+                try {
+                    recoverState(context);
+                } catch (IOException e) {
+                    getLogger().error("Exception raised while looking up for 
new files", e);
+                    context.yield();
+                    return;
+                }
+            }
+        }
+        if(states.isEmpty()) {
+            context.yield();
+            return;
+        }
+        for (String tailFile : states.keySet()) {
+            processTailFile(context, session, tailFile);
+        }
+    }
+
+    private void processTailFile(final ProcessContext context, final 
ProcessSession session, final String tailFile) {
         // If user changes the file that is being tailed, we need to consume 
the already-rolled-over data according
         // to the Initial Start Position property
         boolean rolloverOccurred;
-        if (tailFileChanged) {
+        TailFileObject tfo = states.get(tailFile);
+
+        if (tfo.isTailFileChanged()) {
             rolloverOccurred = false;
             final String recoverPosition = 
context.getProperty(START_POSITION).getValue();
 
             if (START_BEGINNING_OF_TIME.getValue().equals(recoverPosition)) {
-                recoverRolledFiles(context, session, 
this.expectedRecoveryChecksum, state.getTimestamp(), state.getPosition());
+                recoverRolledFiles(context, session, tailFile, 
tfo.getExpectedRecoveryChecksum(), tfo.getState().getTimestamp(), 
tfo.getState().getPosition());
             } else if (START_CURRENT_FILE.getValue().equals(recoverPosition)) {
                 cleanup();
-                state = new 
TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, 0L, 
null, state.getBuffer());
+                tfo.setState(new TailFileState(tailFile, null, null, 0L, 0L, 
0L, null, tfo.getState().getBuffer()));
             } else {
-                final String filename = 
context.getProperty(FILENAME).getValue();
+                final String filename = tailFile;
                 final File file = new File(filename);
 
                 try {
@@ -314,7 +611,7 @@ public class TailFile extends AbstractProcessor {
 
                     fileChannel.position(position);
                     cleanup();
-                    state = new TailFileState(filename, file, fileChannel, 
position, timestamp, checksum, state.getBuffer());
+                    tfo.setState(new TailFileState(filename, file, 
fileChannel, position, timestamp, file.length(), checksum, 
tfo.getState().getBuffer()));
                 } catch (final IOException ioe) {
                     getLogger().error("Attempted to position Reader at current 
position in file {} but failed to do so due to {}", new Object[]{file, 
ioe.toString()}, ioe);
                     context.yield();
@@ -322,25 +619,25 @@ public class TailFile extends AbstractProcessor {
                 }
             }
 
-            tailFileChanged = false;
+            tfo.setTailFileChanged(false);
         } else {
             // Recover any data that may have rolled over since the last time 
that this processor ran.
             // If expectedRecoveryChecksum != null, that indicates that this 
is the first iteration since processor was started, so use whatever checksum 
value
             // was present when the state was last persisted. In this case, we 
must then null out the value so that the next iteration won't keep using the 
"recovered"
             // value. If the value is null, then we know that either the 
processor has already recovered that data, or there was no state persisted. In 
either case,
             // use whatever checksum value is currently in the state.
-            Long expectedChecksumValue = expectedRecoveryChecksum;
+            Long expectedChecksumValue = tfo.getExpectedRecoveryChecksum();
             if (expectedChecksumValue == null) {
-                expectedChecksumValue = state.getChecksum() == null ? null : 
state.getChecksum().getValue();
+                expectedChecksumValue = tfo.getState().getChecksum() == null ? 
null : tfo.getState().getChecksum().getValue();
             }
 
-            rolloverOccurred = recoverRolledFiles(context, session, 
expectedChecksumValue, state.getTimestamp(), state.getPosition());
-            expectedRecoveryChecksum = null;
+            rolloverOccurred = recoverRolledFiles(context, session, tailFile, 
expectedChecksumValue, tfo.getState().getTimestamp(), 
tfo.getState().getPosition());
+            tfo.setExpectedRecoveryChecksum(null);
         }
 
         // initialize local variables from state object; this is done so that 
we can easily change the values throughout
         // the onTrigger method and then create a new state object after we 
finish processing the files.
-        TailFileState state = this.state;
+        TailFileState state = tfo.getState();
         File file = state.getFile();
         FileChannel reader = state.getReader();
         Checksum checksum = state.getChecksum();
@@ -349,10 +646,11 @@ public class TailFile extends AbstractProcessor {
         }
         long position = state.getPosition();
         long timestamp = state.getTimestamp();
+        long length = state.getLength();
 
         // Create a reader if necessary.
         if (file == null || reader == null) {
-            file = new File(context.getProperty(FILENAME).getValue());
+            file = new File(tailFile);
             reader = createReader(file, position);
             if (reader == null) {
                 context.yield();
@@ -363,7 +661,10 @@ public class TailFile extends AbstractProcessor {
         final long startNanos = System.nanoTime();
 
         // Check if file has rotated
-        if (rolloverOccurred) {
+        if (rolloverOccurred
+                || (timestamp <= file.lastModified() && length > file.length())
+                || (timestamp < file.lastModified() && length >= 
file.length())) {
+
             // Since file has rotated, we close the reader, create a new one, 
and then reset our state.
             try {
                 reader.close();
@@ -377,13 +678,11 @@ public class TailFile extends AbstractProcessor {
             checksum.reset();
         }
 
-        if (file.length() == position) {
+        if (file.length() == position || !file.exists()) {
             // no data to consume so rather than continually running, yield to 
allow other processors to use the thread.
-            // In this case, the state should not have changed, and we will 
have created no FlowFiles, so we don't have to
-            // persist the state or commit the session; instead, just return 
here.
             getLogger().debug("No data to consume; created no FlowFiles");
-            state = this.state = new 
TailFileState(context.getProperty(FILENAME).getValue(), file, reader, position, 
timestamp, checksum, state.getBuffer());
-            persistState(state, context);
+            tfo.setState(new TailFileState(tailFile, file, reader, position, 
timestamp, length, checksum, state.getBuffer()));
+            persistState(tfo, context);
             context.yield();
             return;
         }
@@ -420,9 +719,10 @@ public class TailFile extends AbstractProcessor {
                 flowFileName = baseName + "." + position + "-" + 
positionHolder.get();
             }
 
-            final Map<String, String> attributes = new HashMap<>(2);
+            final Map<String, String> attributes = new HashMap<>(3);
             attributes.put(CoreAttributes.FILENAME.key(), flowFileName);
             attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
+            attributes.put("tailfile.original.path", tailFile);
             flowFile = session.putAllAttributes(flowFile, attributes);
 
             session.getProvenanceReporter().receive(flowFile, 
file.toURI().toString(), "FlowFile contains bytes " + position + " through " + 
positionHolder.get() + " of source file",
@@ -436,16 +736,16 @@ public class TailFile extends AbstractProcessor {
             // operating system file last mod precision), then we could set 
the timestamp to a smaller value, which could result in reading in the
             // rotated file a second time.
             timestamp = Math.max(state.getTimestamp(), file.lastModified());
+            length = file.length();
             getLogger().debug("Created {} and routed to success", new 
Object[]{flowFile});
         }
 
         // Create a new state object to represent our current position, 
timestamp, etc.
-        final TailFileState updatedState = new 
TailFileState(context.getProperty(FILENAME).getValue(), file, reader, position, 
timestamp, checksum, state.getBuffer());
-        this.state = updatedState;
+        tfo.setState(new TailFileState(tailFile, file, reader, position, 
timestamp, length, checksum, state.getBuffer()));
 
         // We must commit session before persisting state in order to avoid 
data loss on restart
         session.commit();
-        persistState(updatedState, context);
+        persistState(tfo, context);
     }
 
     /**
@@ -551,17 +851,18 @@ public class TailFile extends AbstractProcessor {
      * @return a list of all Files that have rolled over
      * @throws IOException if unable to perform the listing of files
      */
-    private List<File> getRolledOffFiles(final ProcessContext context, final 
long minTimestamp) throws IOException {
-        final String tailFilename = context.getProperty(FILENAME).getValue();
-        final File tailFile = new File(tailFilename);
+    private List<File> getRolledOffFiles(final ProcessContext context, final 
long minTimestamp, final String tailFilePath) throws IOException {
+        final File tailFile = new File(tailFilePath);
         File directory = tailFile.getParentFile();
         if (directory == null) {
             directory = new File(".");
         }
 
-        final String rollingPattern = 
context.getProperty(ROLLING_FILENAME_PATTERN).getValue();
+        String rollingPattern = 
context.getProperty(ROLLING_FILENAME_PATTERN).getValue();
         if (rollingPattern == null) {
             return Collections.emptyList();
+        } else {
+            rollingPattern = rollingPattern.replace("${filename}", 
StringUtils.substringBeforeLast(tailFile.getName(), "."));
         }
 
         final List<File> rolledOffFiles = new ArrayList<>();
@@ -610,13 +911,21 @@ public class TailFile extends AbstractProcessor {
         return Scope.LOCAL;
     }
 
-    private void persistState(final TailFileState state, final ProcessContext 
context) {
-        persistState(state.toStateMap(), context);
+    private void persistState(final TailFileObject tfo, final ProcessContext 
context) {
+        persistState(tfo.getState().toStateMap(tfo.getFilenameIndex()), 
context);
     }
 
     private void persistState(final Map<String, String> state, final 
ProcessContext context) {
         try {
-            context.getStateManager().setState(state, getStateScope(context));
+            StateMap oldState = 
context.getStateManager().getState(getStateScope(context));
+            Map<String, String> updatedState = new HashMap<String, String>();
+
+            for(String key : oldState.toMap().keySet()) {
+                updatedState.put(key, oldState.get(key));
+            }
+
+            updatedState.putAll(state);
+            context.getStateManager().setState(updatedState, 
getStateScope(context));
         } catch (final IOException e) {
             getLogger().warn("Failed to store state due to {}; some data may 
be duplicated on restart of NiFi", new Object[]{e});
         }
@@ -652,8 +961,8 @@ public class TailFile extends AbstractProcessor {
     }
 
     // for testing purposes
-    TailFileState getState() {
-        return state;
+    Map<String, TailFileObject> getState() {
+        return states;
     }
 
     /**
@@ -678,14 +987,14 @@ public class TailFile extends AbstractProcessor {
      * @return <code>true</code> if the file being tailed has rolled over,
      * <code>false</code> otherwise
      */
-    private boolean recoverRolledFiles(final ProcessContext context, final 
ProcessSession session, final Long expectedChecksum, final long timestamp, 
final long position) {
+    private boolean recoverRolledFiles(final ProcessContext context, final 
ProcessSession session, final String tailFile, final Long expectedChecksum, 
final long timestamp, final long position) {
         try {
             // Find all files that match our rollover pattern, if any, and 
order them based on their timestamp and filename.
             // Ignore any file that has a timestamp earlier than the state 
that we have persisted. If we were reading from
             // a file when we stopped running, then that file that we were 
reading from should be the first file in this list,
             // assuming that the file still exists on the file system.
-            final List<File> rolledOffFiles = getRolledOffFiles(context, 
timestamp);
-            return recoverRolledFiles(context, session, rolledOffFiles, 
expectedChecksum, timestamp, position);
+            final List<File> rolledOffFiles = getRolledOffFiles(context, 
timestamp, tailFile);
+            return recoverRolledFiles(context, session, tailFile, 
rolledOffFiles, expectedChecksum, timestamp, position);
         } catch (final IOException e) {
             getLogger().error("Failed to recover files that have rolled over 
due to {}", new Object[]{e});
             return false;
@@ -714,10 +1023,11 @@ public class TailFile extends AbstractProcessor {
      * @return <code>true</code> if the file being tailed has rolled over, 
false
      * otherwise
      */
-    private boolean recoverRolledFiles(final ProcessContext context, final 
ProcessSession session, final List<File> rolledOffFiles, final Long 
expectedChecksum,
+    private boolean recoverRolledFiles(final ProcessContext context, final 
ProcessSession session, final String tailFile, final List<File> rolledOffFiles, 
final Long expectedChecksum,
             final long timestamp, final long position) {
         try {
             getLogger().debug("Recovering Rolled Off Files; total number of 
files rolled off = {}", new Object[]{rolledOffFiles.size()});
+            TailFileObject tfo = states.get(tailFile);
 
             // For first file that we find, it may or may not be the file that 
we were last reading from.
             // As a result, we have to read up to the position we stored, 
while calculating the checksum. If the checksums match,
@@ -746,9 +1056,13 @@ public class TailFile extends AbstractProcessor {
                                 session.remove(flowFile);
                                 // use a timestamp of lastModified() + 1 so 
that we do not ingest this file again.
                                 cleanup();
-                                state = new 
TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, 
firstFile.lastModified() + 1L, null, state.getBuffer());
+                                tfo.setState(new TailFileState(tailFile, null, 
null, 0L, firstFile.lastModified() + 1L, firstFile.length(), null, 
tfo.getState().getBuffer()));
                             } else {
-                                flowFile = session.putAttribute(flowFile, 
"filename", firstFile.getName());
+                                final Map<String, String> attributes = new 
HashMap<>(3);
+                                attributes.put(CoreAttributes.FILENAME.key(), 
firstFile.getName());
+                                attributes.put(CoreAttributes.MIME_TYPE.key(), 
"text/plain");
+                                attributes.put("tailfile.original.path", 
tailFile);
+                                flowFile = session.putAllAttributes(flowFile, 
attributes);
 
                                 
session.getProvenanceReporter().receive(flowFile, firstFile.toURI().toString(), 
"FlowFile contains bytes 0 through " + position + " of source file",
                                         
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
@@ -757,11 +1071,11 @@ public class TailFile extends AbstractProcessor {
 
                                 // use a timestamp of lastModified() + 1 so 
that we do not ingest this file again.
                                 cleanup();
-                                state = new 
TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, 
firstFile.lastModified() + 1L, null, state.getBuffer());
+                                tfo.setState(new TailFileState(tailFile, null, 
null, 0L, firstFile.lastModified() + 1L, firstFile.length(), null, 
tfo.getState().getBuffer()));
 
                                 // must ensure that we do session.commit() 
before persisting state in order to avoid data loss.
                                 session.commit();
-                                persistState(state, context);
+                                persistState(tfo, context);
                             }
                         } else {
                             getLogger().debug("Checksum for {} did not match 
expected checksum. Checksum for file was {} but expected {}. Will consume 
entire file",
@@ -776,7 +1090,7 @@ public class TailFile extends AbstractProcessor {
             // we were reading when we last stopped, as it may already have 
been partially consumed. That is taken care of in the
             // above block of code.
             for (final File file : rolledOffFiles) {
-                state = consumeFileFully(file, context, session, state);
+                tfo.setState(consumeFileFully(file, context, session, tfo));
             }
 
             return rolloverOccurred;
@@ -794,32 +1108,92 @@ public class TailFile extends AbstractProcessor {
      * @param file the file to ingest
      * @param context the ProcessContext
      * @param session the ProcessSession
-     * @param state the current state
+     * @param tfo the current state
      *
      * @return the new, updated state that reflects that the given file has 
been
      * ingested.
      */
-    private TailFileState consumeFileFully(final File file, final 
ProcessContext context, final ProcessSession session, TailFileState state) {
+    private TailFileState consumeFileFully(final File file, final 
ProcessContext context, final ProcessSession session, TailFileObject tfo) {
         FlowFile flowFile = session.create();
         flowFile = session.importFrom(file.toPath(), true, flowFile);
         if (flowFile.getSize() == 0L) {
             session.remove(flowFile);
         } else {
-            flowFile = session.putAttribute(flowFile, "filename", 
file.getName());
+            final Map<String, String> attributes = new HashMap<>(3);
+            attributes.put(CoreAttributes.FILENAME.key(), file.getName());
+            attributes.put(CoreAttributes.MIME_TYPE.key(), "text/plain");
+            attributes.put("tailfile.original.path", 
tfo.getState().getFilename());
+            flowFile = session.putAllAttributes(flowFile, attributes);
             session.getProvenanceReporter().receive(flowFile, 
file.toURI().toString());
             session.transfer(flowFile, REL_SUCCESS);
             getLogger().debug("Created {} from {} and routed to success", new 
Object[]{flowFile, file});
 
             // use a timestamp of lastModified() + 1 so that we do not ingest 
this file again.
             cleanup();
-            state = new 
TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, 
file.lastModified() + 1L, null, state.getBuffer());
+            tfo.setState(new 
TailFileState(context.getProperty(FILENAME).getValue(), null, null, 0L, 
file.lastModified() + 1L, file.length(), null, tfo.getState().getBuffer()));
 
             // must ensure that we do session.commit() before persisting state 
in order to avoid data loss.
             session.commit();
-            persistState(state, context);
+            persistState(tfo, context);
+        }
+
+        return tfo.getState();
+    }
+
+    static class TailFileObject {
+
+        private TailFileState state = new TailFileState(null, null, null, 0L, 
0L, 0L, null, ByteBuffer.allocate(65536));
+        private Long expectedRecoveryChecksum;
+        private int filenameIndex;
+        private boolean tailFileChanged = true;
+
+        public TailFileObject(int i) {
+            this.filenameIndex = i;
+        }
+
+        public TailFileObject(int index, Map<String, String> statesMap) {
+            this.filenameIndex = index;
+            this.tailFileChanged = false;
+            final String prefix = MAP_PREFIX + index + '.';
+            final String filename = statesMap.get(prefix + 
TailFileState.StateKeys.FILENAME);
+            final long position = Long.valueOf(statesMap.get(prefix + 
TailFileState.StateKeys.POSITION));
+            final long timestamp = Long.valueOf(statesMap.get(prefix + 
TailFileState.StateKeys.TIMESTAMP));
+            final long length = Long.valueOf(statesMap.get(prefix + 
TailFileState.StateKeys.LENGTH));
+            this.state = new TailFileState(filename, new File(filename), null, 
position, timestamp, length, null, ByteBuffer.allocate(65536));
+        }
+
+        public int getFilenameIndex() {
+            return filenameIndex;
+        }
+
+        public void setFilenameIndex(int filenameIndex) {
+            this.filenameIndex = filenameIndex;
+        }
+
+        public TailFileState getState() {
+            return state;
+        }
+
+        public void setState(TailFileState state) {
+            this.state = state;
+        }
+
+        public Long getExpectedRecoveryChecksum() {
+            return expectedRecoveryChecksum;
+        }
+
+        public void setExpectedRecoveryChecksum(Long expectedRecoveryChecksum) 
{
+            this.expectedRecoveryChecksum = expectedRecoveryChecksum;
+        }
+
+        public boolean isTailFileChanged() {
+            return tailFileChanged;
+        }
+
+        public void setTailFileChanged(boolean tailFileChanged) {
+            this.tailFileChanged = tailFileChanged;
         }
 
-        return state;
     }
 
     /**
@@ -833,22 +1207,25 @@ public class TailFile extends AbstractProcessor {
         private final FileChannel reader;
         private final long position;
         private final long timestamp;
+        private final long length;
         private final Checksum checksum;
         private final ByteBuffer buffer;
 
         private static class StateKeys {
-
             public static final String FILENAME = "filename";
             public static final String POSITION = "position";
             public static final String TIMESTAMP = "timestamp";
             public static final String CHECKSUM = "checksum";
+            public static final String LENGTH = "length";
         }
 
-        public TailFileState(final String filename, final File file, final 
FileChannel reader, final long position, final long timestamp, final Checksum 
checksum, final ByteBuffer buffer) {
+        public TailFileState(final String filename, final File file, final 
FileChannel reader,
+                final long position, final long timestamp, final long length, 
final Checksum checksum, final ByteBuffer buffer) {
             this.filename = filename;
             this.file = file;
             this.reader = reader;
             this.position = position;
+            this.length = length;
             this.timestamp = timestamp; // many operating systems will use 
only second-level precision for last-modified times so cut off milliseconds
             this.checksum = checksum;
             this.buffer = buffer;
@@ -874,6 +1251,10 @@ public class TailFile extends AbstractProcessor {
             return timestamp;
         }
 
+        public long getLength() {
+            return length;
+        }
+
         public Checksum getChecksum() {
             return checksum;
         }
@@ -887,12 +1268,14 @@ public class TailFile extends AbstractProcessor {
             return "TailFileState[filename=" + filename + ", position=" + 
position + ", timestamp=" + timestamp + ", checksum=" + (checksum == null ? 
"null" : checksum.getValue()) + "]";
         }
 
-        public Map<String, String> toStateMap() {
+        public Map<String, String> toStateMap(int index) {
+            final String prefix = MAP_PREFIX + index + '.';
             final Map<String, String> map = new HashMap<>(4);
-            map.put(StateKeys.FILENAME, filename);
-            map.put(StateKeys.POSITION, String.valueOf(position));
-            map.put(StateKeys.TIMESTAMP, String.valueOf(timestamp));
-            map.put(StateKeys.CHECKSUM, checksum == null ? null : 
String.valueOf(checksum.getValue()));
+            map.put(prefix + StateKeys.FILENAME, filename);
+            map.put(prefix + StateKeys.POSITION, String.valueOf(position));
+            map.put(prefix + StateKeys.LENGTH, String.valueOf(length));
+            map.put(prefix + StateKeys.TIMESTAMP, String.valueOf(timestamp));
+            map.put(prefix + StateKeys.CHECKSUM, checksum == null ? null : 
String.valueOf(checksum.getValue()));
             return map;
         }
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/930e95aa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.TailFile/additionalDetails.html
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.TailFile/additionalDetails.html
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.TailFile/additionalDetails.html
new file mode 100644
index 0000000..8e9eaae
--- /dev/null
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/docs/org.apache.nifi.processors.standard.TailFile/additionalDetails.html
@@ -0,0 +1,134 @@
+<!DOCTYPE html>
+<html lang="en">
+    <!--
+      Licensed to the Apache Software Foundation (ASF) under one or more
+      contributor license agreements.  See the NOTICE file distributed with
+      this work for additional information regarding copyright ownership.
+      The ASF licenses this file to You under the Apache License, Version 2.0
+      (the "License"); you may not use this file except in compliance with
+      the License.  You may obtain a copy of the License at
+          http://www.apache.org/licenses/LICENSE-2.0
+      Unless required by applicable law or agreed to in writing, software
+      distributed under the License is distributed on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+      See the License for the specific language governing permissions and
+      limitations under the License.
+    -->
+    <head>
+        <meta charset="utf-8" />
+        <title>TailFile</title>
+
+        <link rel="stylesheet" href="../../css/component-usage.css" 
type="text/css" />
+    </head>
+
+    <body>
+        <h3>Modes</h3>
+        <p>
+            This processor is used to tail a file or multiple files according 
to multiple modes. The 
+            mode to choose depends of the logging pattern followed by the 
file(s) to tail. In any case, if there
+            is a rolling pattern, the rolling files must be plain text files 
(compression is not supported at 
+            the moment).
+        </p>
+        <ul>
+               <li><b>Single file</b>: the processor will tail the file with 
the path given in 'File(s) to tail' property.</li>
+               <li><b>Multiple files</b>: the processor will look for files 
into the 'Base directory'. It will look for file recursively
+               according to the 'Recursive lookup' property and will tail all 
the files matching the regular expression
+               provided in the 'File(s) to tail' property.</li>
+        </ul>
+        <h3>Rolling filename pattern</h3>
+        <p>
+               In case the 'Rolling filename pattern' property is used, when 
the processor detects that the file to tail has rolled over, the
+               processor will look for possible missing messages in the rolled 
file. To do so, the processor will use the pattern to find the 
+               rolling files in the same directory as the file to tail.
+        </p>
+        <p>
+               In order to keep this property available in the 'Multiple 
files' mode when multiples files to tail are in the same directory, 
+               it is possible to use the ${filename} tag to reference the name 
(without extension) of the file to tail. For example, if we have:
+        </p>
+               <p>
+               <code>
+                       /my/path/directory/my-app.log.1<br />
+                       /my/path/directory/my-app.log<br />
+                       /my/path/directory/application.log.1<br />
+                       /my/path/directory/application.log
+               </code>
+               </p>
+               <p>
+                       the 'rolling filename pattern' would be 
<i>${filename}.log.*</i>.
+               </p>
+        <h3>Descriptions for different modes and strategies</h3>
+        <p>
+               The '<b>Single file</b>' mode assumes that the file to tail has 
always the same name even if there is a rolling pattern.
+               Example:
+        </p>
+               <p>
+               <code>
+                       /my/path/directory/my-app.log.2<br />
+                       /my/path/directory/my-app.log.1<br />
+                       /my/path/directory/my-app.log
+               </code>
+               </p>
+               <p>
+               and new log messages are always appended in my-app.log file.
+        </p>
+        <p>
+               In case recursivity is set to 'true'. The regular expression 
for the files to tail must embrace the possible intermediate directories 
+               between the base directory and the files to tail. Example:
+        </p>
+        <p>
+               <code>
+                       /my/path/directory1/my-app1.log<br />
+                       /my/path/directory2/my-app2.log<br />
+                       /my/path/directory3/my-app3.log
+               </code>
+               </p>
+        <p>
+               <code>
+                       Base directory: /my/path<br />
+                       Files to tail: directory[1-3]/my-app[1-3].log<br />
+                       Recursivity: true
+               </code>
+               </p>
+        <p>
+               In the '<b>Multiple files</b>' mode, it is possible to specify 
if the file to tail has always the same name or not. It is done through
+               the property 'Rolling strategy'. The strategy can be 'Fixed 
name' in case the files to tail have always the same name (see example above) 
+               or can be 'Changing name' in case the files to tail do not 
always have the same name. Example:
+        </p>
+               <p>
+               <code>
+                       /my/path/directory/my-app-2016-09-06.log<br />
+                       /my/path/directory/my-app-2016-09-07.log<br />
+                       /my/path/directory/my-app-2016-09-08.log
+               </code>
+               </p>
+               <p>
+               and new log messages are always appended in log file of the 
current day.
+        </p>
+        <p>
+               If the processor is configured with '<b>Multiple files</b>' 
mode and '<b>Fixed name</b>' rolling strategy, the processor will list the 
+               files to tail in the 'Base directory' (recursively or not) and 
matching the regular expression. This listing will only occur once when 
+               the processor is started. In this configuration, the processor 
will act as in 'Single file' mode for all files listed by the processor 
+               when started.
+        </p>
+        <p>
+               If the processor is configured with '<b>Multiple files</b>' 
mode and '<b>Changing name</b>' rolling strategy, two new properties are 
+               mandatory:
+        </p>
+        <ul>
+               <li><b>Lookup frequency</b>: specifies the minimum duration the 
processor will wait before listing again the files to tail.</li>
+               <li><b>Maximum age</b>: specifies the necessary minimum 
duration to consider that no new messages will be appended in a file 
+               regarding its last modification date.</li>
+        </ul>
+        <p>
+               It is necessary to pay attention to 'Lookup frequency' and 
'Maximum age' properties as well as the frequency at which the processor is 
+               triggered in order to keep good performances. It is recommended 
to keep 'Maximum age' > 'Lookup frequency' > processor scheduling 
+               frequency to avoid loosing messages. It also recommended not to 
set 'Maximum Age' too low because if messages are appended in a file 
+               after this file has been considered "too old", all the messages 
in the file may be read again leading to data duplication.
+        </p>
+        <p>
+               Besides, if the processor is configured with '<b>Multiple 
files</b>' mode and '<b>Changing name</b>' rolling strategy, the 'Rolling 
+               filename pattern' property must be specific enough to ensure 
that only the rolling files will be listed and not other currently tailed
+               files in the same directory (this can be achieved using 
${filename} tag).
+        </p>
+    </body>
+</html>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/930e95aa/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
index 4091dbf..73b8544 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
@@ -37,36 +37,38 @@ import org.junit.Test;
 public class TestTailFile {
 
     private File file;
-    private TailFile processor;
+    private File otherFile;
+
     private RandomAccessFile raf;
+    private RandomAccessFile otherRaf;
+
+    private TailFile processor;
     private TestRunner runner;
 
     @Before
     public void setup() throws IOException {
         
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.standard",
 "TRACE");
-
-        final File targetDir = new File("target");
-        final File[] files = targetDir.listFiles(new FilenameFilter() {
-            @Override
-            public boolean accept(final File dir, final String name) {
-                return name.startsWith("log");
-            }
-        });
-
-        for (final File file : files) {
-            file.delete();
-        }
+        clean();
 
         file = new File("target/log.txt");
         file.delete();
         assertTrue(file.createNewFile());
 
+        File directory = new File("target/testDir");
+        if(!directory.exists()) {
+            assertTrue(directory.mkdirs());
+        }
+        otherFile = new File("target/testDir/log.txt");
+        otherFile.delete();
+        assertTrue(otherFile.createNewFile());
+
         processor = new TailFile();
         runner = TestRunners.newTestRunner(processor);
         runner.setProperty(TailFile.FILENAME, "target/log.txt");
         runner.assertValid();
 
         raf = new RandomAccessFile(file, "rw");
+        otherRaf = new RandomAccessFile(otherFile, "rw");
     }
 
     @After
@@ -75,6 +77,10 @@ public class TestTailFile {
             raf.close();
         }
 
+        if (otherRaf != null) {
+            otherRaf.close();
+        }
+
         processor.cleanup();
     }
 
@@ -436,7 +442,7 @@ public class TestTailFile {
         runner.run();
         runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
 
-        final TailFileState state = ((TailFile) 
runner.getProcessor()).getState();
+        final TailFileState state = ((TailFile) 
runner.getProcessor()).getState().get("target/log.txt").getState();
         assertNotNull(state);
         assertEquals("target/log.txt", state.getFilename());
         assertTrue(state.getTimestamp() <= System.currentTimeMillis());
@@ -500,4 +506,286 @@ public class TestTailFile {
         runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
     }
 
+    @Test
+    public void testRolloverWhenNoRollingPattern() throws IOException {
+        // write out some data and ingest it.
+        raf.write("hello there\n".getBytes());
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        runner.clearTransferState();
+
+        // move the file and write data to the new log.txt file.
+        raf.write("another".getBytes());
+        raf.close();
+        file.renameTo(new File("target/log.1"));
+        raf = new RandomAccessFile(file, "rw");
+        raf.write("new file\n".getBytes());
+
+        // because the roll over pattern has not been set we are not able to 
get
+        // data before the file has been moved, but we still want to ingest 
data
+        // from the tailed file
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("new
 file\n");
+        runner.clearTransferState();
+
+        // in the unlikely case where more data is written after the file is 
moved
+        // we are not able to detect it is a completely new file, then we 
continue
+        // on the tailed file as it never changed
+        raf.close();
+        file.renameTo(new File("target/log.2"));
+        raf = new RandomAccessFile(file, "rw");
+        raf.write("new file with longer data in the new file\n".getBytes());
+
+        runner.run();
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("with
 longer data in the new file\n");
+        runner.clearTransferState();
+    }
+
+    @Test
+    public void testMultipleFiles() throws IOException, InterruptedException {
+        runner.setProperty(TailFile.BASE_DIRECTORY, "target");
+        runner.setProperty(TailFile.MODE, TailFile.MODE_MULTIFILE);
+        runner.setProperty(TailFile.FILENAME, "(testDir/)?log(ging)?.txt");
+        runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "${filename}.?");
+        runner.setProperty(TailFile.START_POSITION, 
TailFile.START_CURRENT_FILE);
+        runner.setProperty(TailFile.RECURSIVE, "true");
+
+        runner.run(1);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
+
+        // I manually add a third file to tail here
+        // I'll remove it later in the test
+        File thirdFile = new File("target/logging.txt");
+        if(thirdFile.exists()) {
+            thirdFile.delete();
+        }
+        assertTrue(thirdFile.createNewFile());
+        RandomAccessFile thirdFileRaf = new RandomAccessFile(thirdFile, "rw");
+        thirdFileRaf.write("hey\n".getBytes());
+
+        otherRaf.write("hi\n".getBytes());
+        raf.write("hello\n".getBytes());
+
+        runner.run(1);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 3);
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("hey\n");
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertAttributeEquals("tailfile.original.path",
 thirdFile.getPath());
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("hi\n");
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertAttributeEquals("tailfile.original.path",
 otherFile.getPath());
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(2).assertContentEquals("hello\n");
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(2).assertAttributeEquals("tailfile.original.path",
 file.getPath());
+        runner.clearTransferState();
+
+        otherRaf.write("world!".getBytes());
+        raf.write("world".getBytes());
+
+        Thread.sleep(100L);
+
+        runner.run(1);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0); // 
should not pull in data because no \n
+
+        raf.close();
+        otherRaf.close();
+        thirdFileRaf.close();
+        thirdFile.delete();
+
+        file.renameTo(new File("target/log.1"));
+        otherFile.renameTo(new File("target/testDir/log.1"));
+
+        raf = new RandomAccessFile(new File("target/log.txt"), "rw");
+        raf.write("1\n".getBytes());
+
+        otherRaf = new RandomAccessFile(new File("target/testDir/log.txt"), 
"rw");
+        otherRaf.write("2\n".getBytes());
+
+        // I also add a new file here
+        File fourthFile = new File("target/testDir/logging.txt");
+        if(fourthFile.exists()) {
+            fourthFile.delete();
+        }
+        assertTrue(fourthFile.createNewFile());
+        RandomAccessFile fourthFileRaf = new RandomAccessFile(fourthFile, 
"rw");
+        fourthFileRaf.write("3\n".getBytes());
+        fourthFileRaf.close();
+
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 5);
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("3\n");
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("world!");
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(2).assertContentEquals("2\n");
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(3).assertContentEquals("world");
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(4).assertContentEquals("1\n");
+    }
+
+    /**
+     * This test is used to check the case where we have multiple files in the 
same directory
+     * and where it is not possible to specify a single rolling pattern for 
all files.
+     */
+    @Test
+    public void testMultipleFilesInSameDirectory() throws IOException, 
InterruptedException {
+        runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "${filename}.?");
+        runner.setProperty(TailFile.START_POSITION, 
TailFile.START_CURRENT_FILE);
+        runner.setProperty(TailFile.BASE_DIRECTORY, "target");
+        runner.setProperty(TailFile.FILENAME, "log(ging)?.txt");
+        runner.setProperty(TailFile.MODE, TailFile.MODE_MULTIFILE);
+
+        runner.run(1);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
+
+        File myOtherFile = new File("target/logging.txt");
+        if(myOtherFile.exists()) {
+            myOtherFile.delete();
+        }
+        assertTrue(myOtherFile.createNewFile());
+
+        RandomAccessFile myOtherRaf = new RandomAccessFile(myOtherFile, "rw");
+        myOtherRaf.write("hey\n".getBytes());
+
+        raf.write("hello\n".getBytes());
+
+        runner.run(1);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 2);
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("hey\n");
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertAttributeEquals("tailfile.original.path",
 myOtherFile.getPath());
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("hello\n");
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertAttributeEquals("tailfile.original.path",
 file.getPath());
+        runner.clearTransferState();
+
+        myOtherRaf.write("guys".getBytes());
+        raf.write("world".getBytes());
+
+        Thread.sleep(100L);
+
+        runner.run(1);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0); // 
should not pull in data because no \n
+
+        raf.close();
+        myOtherRaf.close();
+
+        // roll over
+        myOtherFile.renameTo(new File("target/logging.1"));
+        file.renameTo(new File("target/log.1"));
+
+        raf = new RandomAccessFile(new File("target/log.txt"), "rw");
+        raf.write("1\n".getBytes());
+
+        myOtherRaf = new RandomAccessFile(new File("target/logging.txt"), 
"rw");
+        myOtherRaf.write("2\n".getBytes());
+        myOtherRaf.close();
+
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 4);
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("guys");
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("2\n");
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(2).assertContentEquals("world");
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(3).assertContentEquals("1\n");
+    }
+
+    @Test
+    public void testMultipleFilesChangingNameStrategy() throws IOException, 
InterruptedException {
+        runner.setProperty(TailFile.START_POSITION, 
TailFile.START_CURRENT_FILE);
+        runner.setProperty(TailFile.MODE, TailFile.MODE_MULTIFILE);
+        runner.setProperty(TailFile.ROLLING_STRATEGY, TailFile.CHANGING_NAME);
+        runner.setProperty(TailFile.BASE_DIRECTORY, "target");
+        runner.setProperty(TailFile.FILENAME, ".*app-.*.log");
+        runner.setProperty(TailFile.LOOKUP_FREQUENCY, "2s");
+        runner.setProperty(TailFile.MAXIMUM_AGE, "5s");
+
+        File multiChangeFirstFile = new File("target/app-2016-09-07.log");
+        if(multiChangeFirstFile.exists()) {
+            multiChangeFirstFile.delete();
+        }
+        assertTrue(multiChangeFirstFile.createNewFile());
+
+        RandomAccessFile multiChangeFirstRaf = new 
RandomAccessFile(multiChangeFirstFile, "rw");
+        multiChangeFirstRaf.write("hey\n".getBytes());
+
+        File multiChangeSndFile = new File("target/my-app-2016-09-07.log");
+        if(multiChangeSndFile.exists()) {
+            multiChangeSndFile.delete();
+        }
+        assertTrue(multiChangeSndFile.createNewFile());
+
+        RandomAccessFile multiChangeSndRaf = new 
RandomAccessFile(multiChangeSndFile, "rw");
+        multiChangeSndRaf.write("hello\n".getBytes());
+
+        runner.run(1, false);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 2);
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("hello\n");
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("hey\n");
+        runner.clearTransferState();
+
+        multiChangeFirstRaf.write("hey2\n".getBytes());
+        multiChangeSndRaf.write("hello2\n".getBytes());
+
+        Thread.sleep(2000);
+        runner.run(1, false);
+
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 2);
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("hello2\n");
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("hey2\n");
+        runner.clearTransferState();
+
+        multiChangeFirstRaf.write("hey3\n".getBytes());
+        multiChangeSndRaf.write("hello3\n".getBytes());
+
+        multiChangeFirstRaf.close();
+        multiChangeSndRaf.close();
+
+        multiChangeFirstFile = new File("target/app-2016-09-08.log");
+        if(multiChangeFirstFile.exists()) {
+            multiChangeFirstFile.delete();
+        }
+        assertTrue(multiChangeFirstFile.createNewFile());
+
+        multiChangeFirstRaf = new RandomAccessFile(multiChangeFirstFile, "rw");
+        multiChangeFirstRaf.write("hey\n".getBytes());
+
+        multiChangeSndFile = new File("target/my-app-2016-09-08.log");
+        if(multiChangeSndFile.exists()) {
+            multiChangeSndFile.delete();
+        }
+        assertTrue(multiChangeSndFile.createNewFile());
+
+        multiChangeSndRaf = new RandomAccessFile(multiChangeSndFile, "rw");
+        multiChangeSndRaf.write("hello\n".getBytes());
+
+        Thread.sleep(2000);
+        runner.run(1);
+        multiChangeFirstRaf.close();
+        multiChangeSndRaf.close();
+
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 4);
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("hello3\n");
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("hello\n");
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(2).assertContentEquals("hey3\n");
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(3).assertContentEquals("hey\n");
+        runner.clearTransferState();
+    }
+
+    private void cleanFiles(String directory) {
+        final File targetDir = new File(directory);
+        if(targetDir.exists()) {
+            final File[] files = targetDir.listFiles(new FilenameFilter() {
+                @Override
+                public boolean accept(final File dir, final String name) {
+                    return name.startsWith("log") || name.endsWith("log");
+                }
+            });
+
+            for (final File file : files) {
+                file.delete();
+            }
+        }
+    }
+
+    private void clean() {
+        cleanFiles("target");
+        cleanFiles("target/testDir");
+    }
+
 }

Reply via email to