Repository: nifi
Updated Branches:
  refs/heads/master e23b23561 -> 6bcc415eb


NIFI-2457 removed old state file mechanism and fixed state reset logic

This closes #768.

Signed-off-by: Bryan Bende <bbe...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/6bcc415e
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/6bcc415e
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/6bcc415e

Branch: refs/heads/master
Commit: 6bcc415eb80ea094e1245927cc2bbaa4641e6b12
Parents: e23b235
Author: joewitt <joew...@apache.org>
Authored: Mon Aug 1 23:57:56 2016 -0400
Committer: Bryan Bende <bbe...@apache.org>
Committed: Tue Aug 2 12:11:43 2016 -0400

----------------------------------------------------------------------
 .../nifi/processors/standard/TailFile.java      | 357 ++++++++-----------
 .../nifi/processors/standard/TestTailFile.java  |  15 +-
 2 files changed, 157 insertions(+), 215 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/6bcc415e/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
index 1ebf736..16bde4c 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
@@ -14,14 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.nifi.processors.standard;
 
 import java.io.BufferedOutputStream;
-import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -75,69 +72,64 @@ import org.apache.nifi.stream.io.StreamUtils;
 @InputRequirement(Requirement.INPUT_FORBIDDEN)
 @Tags({"tail", "file", "log", "text", "source"})
 @CapabilityDescription("\"Tails\" a file, ingesting data from the file as it 
is written to the file. The file is expected to be textual. Data is ingested 
only when a "
-    + "new line is encountered (carriage return or new-line character or 
combination). If the file to tail is periodically \"rolled over\", as is 
generally the case "
-    + "with log files, an optional Rolling Filename Pattern can be used to 
retrieve data from files that have rolled over, even if the rollover occurred 
while NiFi "
-    + "was not running (provided that the data still exists upon restart of 
NiFi). It is generally advisable to set the Run Schedule to a few seconds, 
rather than running "
-    + "with the default value of 0 secs, as this Processor will consume a lot 
of resources if scheduled very aggressively. At this time, this Processor does 
not support "
-    + "ingesting files that have been compressed when 'rolled over'.")
+        + "new line is encountered (carriage return or new-line character or 
combination). If the file to tail is periodically \"rolled over\", as is 
generally the case "
+        + "with log files, an optional Rolling Filename Pattern can be used to 
retrieve data from files that have rolled over, even if the rollover occurred 
while NiFi "
+        + "was not running (provided that the data still exists upon restart 
of NiFi). It is generally advisable to set the Run Schedule to a few seconds, 
rather than running "
+        + "with the default value of 0 secs, as this Processor will consume a 
lot of resources if scheduled very aggressively. At this time, this Processor 
does not support "
+        + "ingesting files that have been compressed when 'rolled over'.")
 @Stateful(scopes = {Scope.LOCAL, Scope.CLUSTER}, description = "Stores state 
about where in the Tailed File it left off so that on restart it does not have 
to duplicate data. "
-    + "State is stored either local or clustered depend on the <File Location> 
property.")
+        + "State is stored either local or clustered depend on the <File 
Location> property.")
 public class TailFile extends AbstractProcessor {
 
-    static final AllowableValue LOCATION_LOCAL = new AllowableValue("Local", 
"Local", "File is located on a local disk drive. Each node in a cluster will 
tail a different file.");
-    static final AllowableValue LOCATION_REMOTE = new AllowableValue("Remote", 
"Remote", "File is located on a remote resource. This Processor will store 
state across the cluster so that "
-        + "it can be run on Primary Node Only and a new Primary Node can pick 
up where the last one left off.");
+    static final AllowableValue LOCATION_LOCAL = new AllowableValue("Local", 
"Local", "State is stored locally. Each node in a cluster will tail a different 
file.");
+    static final AllowableValue LOCATION_REMOTE = new AllowableValue("Remote", 
"Remote", "State is located on a remote resource. This Processor will store 
state across the cluster so that "
+            + "it can be run on Primary Node Only and a new Primary Node can 
pick up where the last one left off.");
 
     static final AllowableValue START_BEGINNING_OF_TIME = new 
AllowableValue("Beginning of Time", "Beginning of Time",
-        "Start with the oldest data that matches the Rolling Filename Pattern 
and then begin reading from the File to Tail");
+            "Start with the oldest data that matches the Rolling Filename 
Pattern and then begin reading from the File to Tail");
     static final AllowableValue START_CURRENT_FILE = new 
AllowableValue("Beginning of File", "Beginning of File",
-        "Start with the beginning of the File to Tail. Do not ingest any data 
that has already been rolled over");
+            "Start with the beginning of the File to Tail. Do not ingest any 
data that has already been rolled over");
     static final AllowableValue START_CURRENT_TIME = new 
AllowableValue("Current Time", "Current Time",
-        "Start with the data at the end of the File to Tail. Do not ingest any 
data thas has already been rolled over or any data in the File to Tail that has 
already been written.");
+            "Start with the data at the end of the File to Tail. Do not ingest 
any data thas has already been rolled over or any data in the File to Tail that 
has already been written.");
 
     static final PropertyDescriptor FILENAME = new PropertyDescriptor.Builder()
-        .name("File to Tail")
-        .description("Fully-qualified filename of the file that should be 
tailed")
-        .expressionLanguageSupported(false)
-        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-        .required(true)
-        .build();
+            .name("File to Tail")
+            .description("Fully-qualified filename of the file that should be 
tailed")
+            .expressionLanguageSupported(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .required(true)
+            .build();
     static final PropertyDescriptor ROLLING_FILENAME_PATTERN = new 
PropertyDescriptor.Builder()
-        .name("Rolling Filename Pattern")
-        .description("If the file to tail \"rolls over\" as would be the case 
with log files, this filename pattern will be used to "
-            + "identify files that have rolled over so that if NiFi is 
restarted, and the file has rolled over, it will be able to pick up where it 
left off. "
-            + "This pattern supports wildcard characters * and ? and will 
assume that the files that have rolled over live in the same directory as the 
file being tailed.")
-        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-        .expressionLanguageSupported(false)
-        .required(false)
-        .build();
-    static final PropertyDescriptor FILE_LOCATION = new 
PropertyDescriptor.Builder()
-        .name("File Location")
-        .description("Specifies where the file is located, so that state can 
be stored appropriately in order to ensure that all data is consumed without 
duplicating data upon restart of NiFi")
-        .required(true)
-        .allowableValues(LOCATION_LOCAL, LOCATION_REMOTE)
-        .defaultValue(LOCATION_LOCAL.getValue())
-        .build();
-    static final PropertyDescriptor STATE_FILE = new 
PropertyDescriptor.Builder()
-        .name("State File")
-        .description("Specifies the file that should be used for storing state 
about what data has been ingested so that upon restart NiFi can resume from 
where it left off")
-        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-        .expressionLanguageSupported(false)
-        .required(false)
-        .build();
+            .name("Rolling Filename Pattern")
+            .description("If the file to tail \"rolls over\" as would be the 
case with log files, this filename pattern will be used to "
+                    + "identify files that have rolled over so that if NiFi is 
restarted, and the file has rolled over, it will be able to pick up where it 
left off. "
+                    + "This pattern supports wildcard characters * and ? and 
will assume that the files that have rolled over live in the same directory as 
the file being tailed.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(false)
+            .required(false)
+            .build();
+    static final PropertyDescriptor STATE_LOCATION = new 
PropertyDescriptor.Builder()
+            .displayName("State Location")
+            .name("File Location") //retained name of property for backward 
compatibility of configs
+            .description("Specifies where the state is located either local or 
cluster so that state can be stored "
+                    + "appropriately in order to ensure that all data is 
consumed without duplicating data upon restart of NiFi")
+            .required(true)
+            .allowableValues(LOCATION_LOCAL, LOCATION_REMOTE)
+            .defaultValue(LOCATION_LOCAL.getValue())
+            .build();
     static final PropertyDescriptor START_POSITION = new 
PropertyDescriptor.Builder()
-        .name("Initial Start Position")
-        .description("When the Processor first begins to tail data, this 
property specifies where the Processor should begin reading data. Once data has 
been ingested from the file, "
-            + "the Processor will continue from the last point from which it 
has received data.")
-        .allowableValues(START_BEGINNING_OF_TIME, START_CURRENT_FILE, 
START_CURRENT_TIME)
-        .defaultValue(START_CURRENT_FILE.getValue())
-        .required(true)
-        .build();
+            .name("Initial Start Position")
+            .description("When the Processor first begins to tail data, this 
property specifies where the Processor should begin reading data. Once data has 
been ingested from the file, "
+                    + "the Processor will continue from the last point from 
which it has received data.")
+            .allowableValues(START_BEGINNING_OF_TIME, START_CURRENT_FILE, 
START_CURRENT_TIME)
+            .defaultValue(START_CURRENT_FILE.getValue())
+            .required(true)
+            .build();
 
     static final Relationship REL_SUCCESS = new Relationship.Builder()
-        .name("success")
-        .description("All FlowFiles are routed to this Relationship.")
-        .build();
+            .name("success")
+            .description("All FlowFiles are routed to this Relationship.")
+            .build();
 
     private volatile TailFileState state = new TailFileState(null, null, null, 
0L, 0L, null, ByteBuffer.allocate(65536));
     private volatile Long expectedRecoveryChecksum;
@@ -148,9 +140,8 @@ public class TailFile extends AbstractProcessor {
         final List<PropertyDescriptor> properties = new ArrayList<>();
         properties.add(FILENAME);
         properties.add(ROLLING_FILENAME_PATTERN);
-        properties.add(STATE_FILE);
         properties.add(START_POSITION);
-        properties.add(FILE_LOCATION);
+        properties.add(STATE_LOCATION);
         return properties;
     }
 
@@ -167,22 +158,13 @@ public class TailFile extends AbstractProcessor {
         }
     }
 
-
     @OnScheduled
     public void recoverState(final ProcessContext context) throws IOException {
-        // Before the State Manager existed, we had to store state in a local 
file. Now, we want to use the State Manager
-        // instead. So we will need to recover the state that is stored in the 
file (if any), and then store that in our
-        // State Manager. But we do this only if nothing has ever been stored 
in the State Manager.
         final Scope scope = getStateScope(context);
         final StateMap stateMap = context.getStateManager().getState(scope);
         if (stateMap.getVersion() == -1L) {
-            // State has never been stored in the State Manager. Try to 
recover state from a file, if one exists.
-            final Map<String, String> stateFromFile = 
recoverStateValuesFromFile(context);
-            if (!stateFromFile.isEmpty()) {
-                persistState(stateFromFile, context);
-                recoverState(context, stateFromFile);
-            }
-
+            //state has been cleared or never stored so recover as 'empty 
state'
+            recoverState(context, Collections.EMPTY_MAP);
             return;
         }
 
@@ -190,81 +172,33 @@ public class TailFile extends AbstractProcessor {
     }
 
     /**
-     * Recovers values for the State that was stored in a local file.
-     *
-     * @param context the ProcessContext that indicates where the state is 
stored
-     * @return a Map that contains the keys defined in {@link 
TailFileState.StateKeys}
-     * @throws IOException if the state file exists but was unable to be read
-     */
-    private Map<String, String> recoverStateValuesFromFile(final 
ProcessContext context) throws IOException {
-        final String stateFilename = 
context.getProperty(STATE_FILE).getValue();
-        if (stateFilename == null) {
-            return Collections.emptyMap();
-        }
-
-        final Map<String, String> stateValues = new HashMap<>(4);
-        final File stateFile = new File(stateFilename);
-
-        try (final FileInputStream fis = new FileInputStream(stateFile);
-            final DataInputStream dis = new DataInputStream(fis)) {
-
-            final int encodingVersion = dis.readInt();
-            if (encodingVersion > 0) {
-                throw new IOException("Unable to recover state because State 
File was encoded in a more recent version than Version 1");
-            }
-
-            if (encodingVersion == 0) {
-                final String filename = dis.readUTF();
-                long position = dis.readLong();
-                final long timestamp = dis.readLong();
-                final boolean checksumPresent = dis.readBoolean();
-                final Long checksumValue;
-
-                if (checksumPresent) {
-                    checksumValue = dis.readLong();
-                } else {
-                    checksumValue = null;
-                }
-
-                stateValues.put(TailFileState.StateKeys.FILENAME, filename);
-                stateValues.put(TailFileState.StateKeys.POSITION, 
String.valueOf(position));
-                stateValues.put(TailFileState.StateKeys.TIMESTAMP, 
String.valueOf(timestamp));
-                stateValues.put(TailFileState.StateKeys.CHECKSUM, 
checksumValue == null ? null : String.valueOf(checksumValue));
-            } else {
-                // encoding Version == -1... no data in file. Just move on.
-            }
-        } catch (final FileNotFoundException fnfe) {
-        }
-
-        return stateValues;
-    }
-
-
-    /**
-     * Updates member variables to reflect the "expected recovery checksum" 
and seek to the appropriate location in the
-     * tailed file, updating our checksum, so that we are ready to proceed 
with the {@link #onTrigger(ProcessContext, ProcessSession)} call.
+     * Updates member variables to reflect the "expected recovery checksum" and
+     * seek to the appropriate location in the tailed file, updating our
+     * checksum, so that we are ready to proceed with the
+     * {@link #onTrigger(ProcessContext, ProcessSession)} call.
      *
      * @param context the ProcessContext
-     * @param stateValues the values that were recovered from state that was 
previously stored. This Map should be populated with the keys defined
-     *            in {@link TailFileState.StateKeys}.
-     * @throws IOException if unable to seek to the appropriate location in 
the tailed file.
+     * @param stateValues the values that were recovered from state that was
+     * previously stored. This Map should be populated with the keys defined in
+     * {@link TailFileState.StateKeys}.
+     * @throws IOException if unable to seek to the appropriate location in the
+     * tailed file.
      */
     private void recoverState(final ProcessContext context, final Map<String, 
String> stateValues) throws IOException {
-        if (stateValues == null) {
-            return;
-        }
-
+        final String currentFilename = 
context.getProperty(FILENAME).getValue();
         if (!stateValues.containsKey(TailFileState.StateKeys.FILENAME)) {
+            resetState(currentFilename);
             return;
         }
         if (!stateValues.containsKey(TailFileState.StateKeys.POSITION)) {
+            resetState(currentFilename);
             return;
         }
         if (!stateValues.containsKey(TailFileState.StateKeys.TIMESTAMP)) {
+            resetState(currentFilename);
             return;
         }
 
-        final String currentFilename = 
context.getProperty(FILENAME).getValue();
         final String checksumValue = 
stateValues.get(TailFileState.StateKeys.CHECKSUM);
         final boolean checksumPresent = (checksumValue != null);
         final String storedStateFilename = 
stateValues.get(TailFileState.StateKeys.FILENAME);
@@ -284,7 +218,7 @@ public class TailFile extends AbstractProcessor {
             final File existingTailFile = new File(storedStateFilename);
             if (existingTailFile.length() >= position) {
                 try (final InputStream tailFileIs = new 
FileInputStream(existingTailFile);
-                    final CheckedInputStream in = new 
CheckedInputStream(tailFileIs, checksum)) {
+                        final CheckedInputStream in = new 
CheckedInputStream(tailFileIs, checksum)) {
                     StreamUtils.copy(in, new NullOutputStream(), 
state.getPosition());
 
                     final long checksumResult = in.getChecksum().getValue();
@@ -297,7 +231,7 @@ public class TailFile extends AbstractProcessor {
                         getLogger().debug("When recovering state, checksum of 
tailed file matches the stored checksum. Will resume where left off.");
                         tailFile = existingTailFile;
                         reader = FileChannel.open(tailFile.toPath(), 
StandardOpenOption.READ);
-                        getLogger().debug("Created FileChannel {} for {} in 
recoverState", new Object[] {reader, tailFile});
+                        getLogger().debug("Created FileChannel {} for {} in 
recoverState", new Object[]{reader, tailFile});
 
                         reader.position(position);
                     } else {
@@ -308,21 +242,21 @@ public class TailFile extends AbstractProcessor {
             } else {
                 // fewer bytes than our position, so we know we weren't 
already reading from this file. Keep reader at a position of 0.
                 getLogger().debug("When recovering state, existing file to 
tail is only {} bytes but position flag is {}; "
-                    + "this indicates that the file has rotated. Will begin 
tailing current file from beginning.", new Object[] {existingTailFile.length(), 
position});
+                        + "this indicates that the file has rotated. Will 
begin tailing current file from beginning.", new 
Object[]{existingTailFile.length(), position});
             }
 
             state = new TailFileState(currentFilename, tailFile, reader, 
position, timestamp, checksum, ByteBuffer.allocate(65536));
         } else {
-            // If filename changed or there is no checksum present, then we 
have no expected checksum to use for recovery.
-            expectedRecoveryChecksum = null;
-
-            // tailing a new file since the state file was written out. We 
will reset state.
-            state = new TailFileState(currentFilename, null, null, 0L, 0L, 
null, ByteBuffer.allocate(65536));
+            resetState(currentFilename);
         }
 
-        getLogger().debug("Recovered state {}", new Object[] {state});
+        getLogger().debug("Recovered state {}", new Object[]{state});
     }
 
+    private void resetState(final String currentFilename) {
+        expectedRecoveryChecksum = null;
+        state = new TailFileState(currentFilename, null, null, 0L, 0L, null, 
ByteBuffer.allocate(65536));
+    }
 
     @OnStopped
     public void cleanup() {
@@ -342,13 +276,11 @@ public class TailFile extends AbstractProcessor {
             getLogger().warn("Failed to close file handle during cleanup");
         }
 
-        getLogger().debug("Closed FileChannel {}", new Object[] {reader});
+        getLogger().debug("Closed FileChannel {}", new Object[]{reader});
 
         this.state = new TailFileState(state.getFilename(), state.getFile(), 
null, state.getPosition(), state.getTimestamp(), state.getChecksum(), 
state.getBuffer());
     }
 
-
-
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession 
session) throws ProcessException {
         // If user changes the file that is being tailed, we need to consume 
the already-rolled-over data according
@@ -369,14 +301,14 @@ public class TailFile extends AbstractProcessor {
 
                 try {
                     final FileChannel fileChannel = 
FileChannel.open(file.toPath(), StandardOpenOption.READ);
-                    getLogger().debug("Created FileChannel {} for {}", new 
Object[] {fileChannel, file});
+                    getLogger().debug("Created FileChannel {} for {}", new 
Object[]{fileChannel, file});
 
                     final Checksum checksum = new CRC32();
                     final long position = file.length();
                     final long timestamp = file.lastModified();
 
                     try (final InputStream fis = new FileInputStream(file);
-                        final CheckedInputStream in = new 
CheckedInputStream(fis, checksum)) {
+                            final CheckedInputStream in = new 
CheckedInputStream(fis, checksum)) {
                         StreamUtils.copy(in, new NullOutputStream(), position);
                     }
 
@@ -384,7 +316,7 @@ public class TailFile extends AbstractProcessor {
                     cleanup();
                     state = new TailFileState(filename, file, fileChannel, 
position, timestamp, checksum, state.getBuffer());
                 } catch (final IOException ioe) {
-                    getLogger().error("Attempted to position Reader at current 
position in file {} but failed to do so due to {}", new Object[] {file, 
ioe.toString()}, ioe);
+                    getLogger().error("Attempted to position Reader at current 
position in file {} but failed to do so due to {}", new Object[]{file, 
ioe.toString()}, ioe);
                     context.yield();
                     return;
                 }
@@ -435,9 +367,9 @@ public class TailFile extends AbstractProcessor {
             // Since file has rotated, we close the reader, create a new one, 
and then reset our state.
             try {
                 reader.close();
-                getLogger().debug("Closed FileChannel {}", new Object[] 
{reader, reader});
+                getLogger().debug("Closed FileChannel {}", new 
Object[]{reader, reader});
             } catch (final IOException ioe) {
-                getLogger().warn("Failed to close reader for {} due to {}", 
new Object[] {file, ioe});
+                getLogger().warn("Failed to close reader for {} due to {}", 
new Object[]{file, ioe});
             }
 
             reader = createReader(file, 0L);
@@ -494,7 +426,7 @@ public class TailFile extends AbstractProcessor {
             flowFile = session.putAllAttributes(flowFile, attributes);
 
             session.getProvenanceReporter().receive(flowFile, 
file.toURI().toString(), "FlowFile contains bytes " + position + " through " + 
positionHolder.get() + " of source file",
-                TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
+                    TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
startNanos));
             session.transfer(flowFile, REL_SUCCESS);
             position = positionHolder.get();
 
@@ -504,7 +436,7 @@ public class TailFile extends AbstractProcessor {
             // operating system file last mod precision), then we could set 
the timestamp to a smaller value, which could result in reading in the
             // rotated file a second time.
             timestamp = Math.max(state.getTimestamp(), file.lastModified());
-            getLogger().debug("Created {} and routed to success", new Object[] 
{flowFile});
+            getLogger().debug("Created {} and routed to success", new 
Object[]{flowFile});
         }
 
         // Create a new state object to represent our current position, 
timestamp, etc.
@@ -516,21 +448,22 @@ public class TailFile extends AbstractProcessor {
         persistState(updatedState, context);
     }
 
-
     /**
-     * Read new lines from the given FileChannel, copying it to the given 
Output Stream. The Checksum is used in order to later determine whether or not
+     * Read new lines from the given FileChannel, copying it to the given 
Output
+     * Stream. The Checksum is used in order to later determine whether or not
      * data has been consumed.
      *
      * @param reader The FileChannel to read data from
      * @param buffer the buffer to use for copying data
      * @param out the OutputStream to copy the data to
-     * @param checksum the Checksum object to use in order to calculate 
checksum for recovery purposes
+     * @param checksum the Checksum object to use in order to calculate 
checksum
+     * for recovery purposes
      *
      * @return The new position after the lines have been read
      * @throws java.io.IOException if an I/O error occurs.
      */
     private long readLines(final FileChannel reader, final ByteBuffer buffer, 
final OutputStream out, final Checksum checksum) throws IOException {
-        getLogger().debug("Reading lines starting at position {}", new 
Object[] {reader.position()});
+        getLogger().debug("Reading lines starting at position {}", new 
Object[]{reader.position()});
 
         try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
             long pos = reader.position();
@@ -554,7 +487,7 @@ public class TailFile extends AbstractProcessor {
                             baos.writeTo(out);
                             checksum.update(baos.getUnderlyingBuffer(), 0, 
baos.size());
                             if (getLogger().isTraceEnabled()) {
-                                getLogger().trace("Checksum updated to {}", 
new Object[] {checksum.getValue()});
+                                getLogger().trace("Checksum updated to {}", 
new Object[]{checksum.getValue()});
                             }
 
                             baos.reset();
@@ -571,7 +504,7 @@ public class TailFile extends AbstractProcessor {
                                 baos.writeTo(out);
                                 checksum.update(baos.getUnderlyingBuffer(), 0, 
baos.size());
                                 if (getLogger().isTraceEnabled()) {
-                                    getLogger().trace("Checksum updated to 
{}", new Object[] {checksum.getValue()});
+                                    getLogger().trace("Checksum updated to 
{}", new Object[]{checksum.getValue()});
                                 }
 
                                 linesRead++;
@@ -588,7 +521,7 @@ public class TailFile extends AbstractProcessor {
             }
 
             if (rePos < reader.position()) {
-                getLogger().debug("Read {} lines; repositioning reader from {} 
to {}", new Object[] {linesRead, pos, rePos});
+                getLogger().debug("Read {} lines; repositioning reader from {} 
to {}", new Object[]{linesRead, pos, rePos});
                 reader.position(rePos); // Ensure we can re-read if necessary
             }
 
@@ -596,22 +529,25 @@ public class TailFile extends AbstractProcessor {
         }
     }
 
-
     /**
      * Returns a list of all Files that match the following criteria:
      *
      * <ul>
      * <li>Filename matches the Rolling Filename Pattern</li>
      * <li>Filename does not match the actual file being tailed</li>
-     * <li>The Last Modified Time on the file is equal to or later than the 
given minimum timestamp</li>
+     * <li>The Last Modified Time on the file is equal to or later than the
+     * given minimum timestamp</li>
      * </ul>
      *
      * <p>
-     * The List that is returned will be ordered by file timestamp, providing 
the oldest file first.
+     * The List that is returned will be ordered by file timestamp, providing
+     * the oldest file first.
      * </p>
      *
-     * @param context the ProcessContext to use in order to determine 
Processor configuration
-     * @param minTimestamp any file with a Last Modified Time before this 
timestamp will not be returned
+     * @param context the ProcessContext to use in order to determine Processor
+     * configuration
+     * @param minTimestamp any file with a Last Modified Time before this
+     * timestamp will not be returned
      * @return a list of all Files that have rolled over
      * @throws IOException if unable to perform the listing of files
      */
@@ -636,7 +572,7 @@ public class TailFile extends AbstractProcessor {
 
                 if (file.lastModified() < minTimestamp) {
                     getLogger().debug("Found rolled off file {} but its last 
modified timestamp is before the cutoff (Last Mod = {}, Cutoff = {}) so will 
not consume it",
-                        new Object[] {file, lastMod, minTimestamp});
+                            new Object[]{file, lastMod, minTimestamp});
 
                     continue;
                 } else if (file.equals(tailFile)) {
@@ -665,9 +601,8 @@ public class TailFile extends AbstractProcessor {
         return rolledOffFiles;
     }
 
-
     private Scope getStateScope(final ProcessContext context) {
-        final String location = context.getProperty(FILE_LOCATION).getValue();
+        final String location = context.getProperty(STATE_LOCATION).getValue();
         if (LOCATION_REMOTE.getValue().equalsIgnoreCase(location)) {
             return Scope.CLUSTER;
         }
@@ -683,31 +618,30 @@ public class TailFile extends AbstractProcessor {
         try {
             context.getStateManager().setState(state, getStateScope(context));
         } catch (final IOException e) {
-            getLogger().warn("Failed to store state due to {}; some data may 
be duplicated on restart of NiFi", new Object[] {e});
+            getLogger().warn("Failed to store state due to {}; some data may 
be duplicated on restart of NiFi", new Object[]{e});
         }
     }
 
-
     private FileChannel createReader(final File file, final long position) {
         final FileChannel reader;
 
         try {
             reader = FileChannel.open(file.toPath(), StandardOpenOption.READ);
         } catch (final IOException ioe) {
-            getLogger().warn("Unable to open file {}; will attempt to access 
file again after the configured Yield Duration has elapsed: {}", new Object[] 
{file, ioe});
+            getLogger().warn("Unable to open file {}; will attempt to access 
file again after the configured Yield Duration has elapsed: {}", new 
Object[]{file, ioe});
             return null;
         }
 
-        getLogger().debug("Created FileChannel {} for {}", new Object[] 
{reader, file});
+        getLogger().debug("Created FileChannel {} for {}", new 
Object[]{reader, file});
 
         try {
             reader.position(position);
         } catch (final IOException ioe) {
-            getLogger().error("Failed to read from {} due to {}", new Object[] 
{file, ioe});
+            getLogger().error("Failed to read from {} due to {}", new 
Object[]{file, ioe});
 
             try {
                 reader.close();
-                getLogger().debug("Closed FileChannel {}", new Object[] 
{reader});
+                getLogger().debug("Closed FileChannel {}", new 
Object[]{reader});
             } catch (final IOException ioe2) {
             }
 
@@ -722,19 +656,27 @@ public class TailFile extends AbstractProcessor {
         return state;
     }
 
-
     /**
-     * Finds any files that have rolled over and have not yet been ingested by 
this Processor. Each of these files that is found will be
-     * ingested as its own FlowFile. If a file is found that has been 
partially ingested, the rest of the file will be ingested as a
-     * single FlowFile but the data that already has been ingested will not be 
ingested again.
+     * Finds any files that have rolled over and have not yet been ingested by
+     * this Processor. Each of these files that is found will be ingested as 
its
+     * own FlowFile. If a file is found that has been partially ingested, the
+     * rest of the file will be ingested as a single FlowFile but the data that
+     * already has been ingested will not be ingested again.
      *
-     * @param context the ProcessContext to use in order to obtain Processor 
configuration.
-     * @param session the ProcessSession to use in order to interact with 
FlowFile creation and content.
-     * @param expectedChecksum the checksum value that is expected for the 
oldest file from offset 0 through &lt;position&gt;.
-     * @param timestamp the latest Last Modified Timestamp that has been 
consumed. Any data that was written before this data will not be ingested.
-     * @param position the byte offset in the file being tailed, where tailing 
last left off.
+     * @param context the ProcessContext to use in order to obtain Processor
+     * configuration.
+     * @param session the ProcessSession to use in order to interact with
+     * FlowFile creation and content.
+     * @param expectedChecksum the checksum value that is expected for the
+     * oldest file from offset 0 through &lt;position&gt;.
+     * @param timestamp the latest Last Modified Timestamp that has been
+     * consumed. Any data that was written before this data will not be
+     * ingested.
+     * @param position the byte offset in the file being tailed, where tailing
+     * last left off.
      *
-     * @return <code>true</code> if the file being tailed has rolled over, 
<code>false</code> otherwise
+     * @return <code>true</code> if the file being tailed has rolled over,
+     * <code>false</code> otherwise
      */
     private boolean recoverRolledFiles(final ProcessContext context, final 
ProcessSession session, final Long expectedChecksum, final long timestamp, 
final long position) {
         try {
@@ -745,28 +687,37 @@ public class TailFile extends AbstractProcessor {
             final List<File> rolledOffFiles = getRolledOffFiles(context, 
timestamp);
             return recoverRolledFiles(context, session, rolledOffFiles, 
expectedChecksum, timestamp, position);
         } catch (final IOException e) {
-            getLogger().error("Failed to recover files that have rolled over 
due to {}", new Object[] {e});
+            getLogger().error("Failed to recover files that have rolled over 
due to {}", new Object[]{e});
             return false;
         }
     }
 
     /**
-     * Finds any files that have rolled over and have not yet been ingested by 
this Processor. Each of these files that is found will be
-     * ingested as its own FlowFile. If a file is found that has been 
partially ingested, the rest of the file will be ingested as a
-     * single FlowFile but the data that already has been ingested will not be 
ingested again.
+     * Finds any files that have rolled over and have not yet been ingested by
+     * this Processor. Each of these files that is found will be ingested as 
its
+     * own FlowFile. If a file is found that has been partially ingested, the
+     * rest of the file will be ingested as a single FlowFile but the data that
+     * already has been ingested will not be ingested again.
      *
-     * @param context the ProcessContext to use in order to obtain Processor 
configuration.
-     * @param session the ProcessSession to use in order to interact with 
FlowFile creation and content.
-     * @param expectedChecksum the checksum value that is expected for the 
oldest file from offset 0 through &lt;position&gt;.
-     * @param timestamp the latest Last Modfiied Timestamp that has been 
consumed. Any data that was written before this data will not be ingested.
-     * @param position the byte offset in the file being tailed, where tailing 
last left off.
+     * @param context the ProcessContext to use in order to obtain Processor
+     * configuration.
+     * @param session the ProcessSession to use in order to interact with
+     * FlowFile creation and content.
+     * @param expectedChecksum the checksum value that is expected for the
+     * oldest file from offset 0 through &lt;position&gt;.
+     * @param timestamp the latest Last Modified Timestamp that has been
+     * consumed. Any data that was written before this data will not be
+     * ingested.
+     * @param position the byte offset in the file being tailed, where tailing
+     * last left off.
      *
-     * @return <code>true</code> if the file being tailed has rolled over, 
false otherwise
+     * @return <code>true</code> if the file being tailed has rolled over, 
false
+     * otherwise
      */
     private boolean recoverRolledFiles(final ProcessContext context, final 
ProcessSession session, final List<File> rolledOffFiles, final Long 
expectedChecksum,
-        final long timestamp, final long position) {
+            final long timestamp, final long position) {
         try {
-            getLogger().debug("Recovering Rolled Off Files; total number of 
files rolled off = {}", new Object[] {rolledOffFiles.size()});
+            getLogger().debug("Recovering Rolled Off Files; total number of 
files rolled off = {}", new Object[]{rolledOffFiles.size()});
 
             // For first file that we find, it may or may not be the file that 
we were last reading from.
             // As a result, we have to read up to the position we stored, 
while calculating the checksum. If the checksums match,
@@ -780,12 +731,12 @@ public class TailFile extends AbstractProcessor {
                 final long startNanos = System.nanoTime();
                 if (position > 0) {
                     try (final InputStream fis = new 
FileInputStream(firstFile);
-                        final CheckedInputStream in = new 
CheckedInputStream(fis, new CRC32())) {
+                            final CheckedInputStream in = new 
CheckedInputStream(fis, new CRC32())) {
                         StreamUtils.copy(in, new NullOutputStream(), position);
 
                         final long checksumResult = 
in.getChecksum().getValue();
                         if (checksumResult == expectedChecksum) {
-                            getLogger().debug("Checksum for {} matched 
expected checksum. Will skip first {} bytes", new Object[] {firstFile, 
position});
+                            getLogger().debug("Checksum for {} matched 
expected checksum. Will skip first {} bytes", new Object[]{firstFile, 
position});
 
                             // This is the same file that we were reading when 
we shutdown. Start reading from this point on.
                             rolledOffFiles.remove(0);
@@ -800,9 +751,9 @@ public class TailFile extends AbstractProcessor {
                                 flowFile = session.putAttribute(flowFile, 
"filename", firstFile.getName());
 
                                 
session.getProvenanceReporter().receive(flowFile, firstFile.toURI().toString(), 
"FlowFile contains bytes 0 through " + position + " of source file",
-                                    
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
+                                        
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNanos));
                                 session.transfer(flowFile, REL_SUCCESS);
-                                getLogger().debug("Created {} from rolled over 
file {} and routed to success", new Object[] {flowFile, firstFile});
+                                getLogger().debug("Created {} from rolled over 
file {} and routed to success", new Object[]{flowFile, firstFile});
 
                                 // use a timestamp of lastModified() + 1 so 
that we do not ingest this file again.
                                 cleanup();
@@ -814,7 +765,7 @@ public class TailFile extends AbstractProcessor {
                             }
                         } else {
                             getLogger().debug("Checksum for {} did not match 
expected checksum. Checksum for file was {} but expected {}. Will consume 
entire file",
-                                new Object[] {firstFile, checksumResult, 
expectedChecksum});
+                                    new Object[]{firstFile, checksumResult, 
expectedChecksum});
                         }
                     }
                 }
@@ -830,22 +781,23 @@ public class TailFile extends AbstractProcessor {
 
             return rolloverOccurred;
         } catch (final IOException e) {
-            getLogger().error("Failed to recover files that have rolled over 
due to {}", new Object[] {e});
+            getLogger().error("Failed to recover files that have rolled over 
due to {}", new Object[]{e});
             return false;
         }
     }
 
-
     /**
-     * Creates a new FlowFile that contains the entire contents of the given 
file and transfers that FlowFile to success. This method
-     * will commit the given session and emit an appropriate Provenance Event.
+     * Creates a new FlowFile that contains the entire contents of the given
+     * file and transfers that FlowFile to success. This method will commit the
+     * given session and emit an appropriate Provenance Event.
      *
      * @param file the file to ingest
      * @param context the ProcessContext
      * @param session the ProcessSession
      * @param state the current state
      *
-     * @return the new, updated state that reflects that the given file has 
been ingested.
+     * @return the new, updated state that reflects that the given file has 
been
+     * ingested.
      */
     private TailFileState consumeFileFully(final File file, final 
ProcessContext context, final ProcessSession session, TailFileState state) {
         FlowFile flowFile = session.create();
@@ -856,7 +808,7 @@ public class TailFile extends AbstractProcessor {
             flowFile = session.putAttribute(flowFile, "filename", 
file.getName());
             session.getProvenanceReporter().receive(flowFile, 
file.toURI().toString());
             session.transfer(flowFile, REL_SUCCESS);
-            getLogger().debug("Created {} from {} and routed to success", new 
Object[] {flowFile, file});
+            getLogger().debug("Created {} from {} and routed to success", new 
Object[]{flowFile, file});
 
             // use a timestamp of lastModified() + 1 so that we do not ingest 
this file again.
             cleanup();
@@ -871,9 +823,11 @@ public class TailFile extends AbstractProcessor {
     }
 
     /**
-     * A simple Java class to hold information about our state so that we can 
maintain this state across multiple invocations of the Processor
+     * A simple Java class to hold information about our state so that we can
+     * maintain this state across multiple invocations of the Processor
      */
     static class TailFileState {
+
         private final String filename; // hold onto filename and not just File 
because we want to match that against the user-defined filename to recover from
         private final File file;
         private final FileChannel reader;
@@ -883,6 +837,7 @@ public class TailFile extends AbstractProcessor {
         private final ByteBuffer buffer;
 
         private static class StateKeys {
+
             public static final String FILENAME = "filename";
             public static final String POSITION = "position";
             public static final String TIMESTAMP = "timestamp";

http://git-wip-us.apache.org/repos/asf/nifi/blob/6bcc415e/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
index f681049..4091dbf 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
@@ -14,7 +14,6 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.nifi.processors.standard;
 
 import static org.junit.Assert.assertEquals;
@@ -36,6 +35,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 public class TestTailFile {
+
     private File file;
     private TailFile processor;
     private RandomAccessFile raf;
@@ -61,14 +61,9 @@ public class TestTailFile {
         file.delete();
         assertTrue(file.createNewFile());
 
-        final File stateFile = new File("target/tail-file.state");
-        stateFile.delete();
-        Assert.assertFalse(stateFile.exists());
-
         processor = new TailFile();
         runner = TestRunners.newTestRunner(processor);
         runner.setProperty(TailFile.FILENAME, "target/log.txt");
-        runner.setProperty(TailFile.STATE_FILE, "target/tail-file.state");
         runner.assertValid();
 
         raf = new RandomAccessFile(file, "rw");
@@ -83,7 +78,6 @@ public class TestTailFile {
         processor.cleanup();
     }
 
-
     @Test
     public void testConsumeAfterTruncationStartAtBeginningOfFile() throws 
IOException, InterruptedException {
         runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.txt*");
@@ -148,7 +142,6 @@ public class TestTailFile {
         
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("HELLO\n");
     }
 
-
     @Test
     public void testStartAtBeginningOfFile() throws IOException, 
InterruptedException {
         runner.setProperty(TailFile.START_POSITION, 
TailFile.START_CURRENT_FILE.getValue());
@@ -201,7 +194,6 @@ public class TestTailFile {
         assertTrue(world);
     }
 
-
     @Test
     public void testRemainderOfFileRecoveredAfterRestart() throws IOException {
         runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log*.txt");
@@ -227,7 +219,6 @@ public class TestTailFile {
         
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("new
 file\n");
     }
 
-
     @Test
     public void testRemainderOfFileRecoveredIfRolledOverWhileRunning() throws 
IOException {
         // this mimics the case when we are reading a log file that rolls over 
while processor is running.
@@ -291,7 +282,6 @@ public class TestTailFile {
         
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("1\n");
     }
 
-
     @Test
     public void testMultipleRolloversAfterHavingReadAllData() throws 
IOException, InterruptedException {
         // this mimics the case when we are reading a log file that rolls over 
while processor is running.
@@ -339,7 +329,6 @@ public class TestTailFile {
         
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(2).assertContentEquals("1\n");
     }
 
-
     @Test
     public void testMultipleRolloversAfterHavingReadAllDataWhileStillRunning() 
throws IOException, InterruptedException {
         // this mimics the case when we are reading a log file that rolls over 
while processor is running.
@@ -387,7 +376,6 @@ public class TestTailFile {
         
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(2).assertContentEquals("1\n");
     }
 
-
     @Test
     public void testMultipleRolloversWithLongerFileLength() throws 
IOException, InterruptedException {
         // this mimics the case when we are reading a log file that rolls over 
while processor is running.
@@ -433,7 +421,6 @@ public class TestTailFile {
         
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(2).assertContentEquals("This
 is a longer line than the other files had.\n");
     }
 
-
     @Test
     public void testConsumeWhenNewLineFound() throws IOException, 
InterruptedException {
         runner.run();

Reply via email to