NIFI-994: Fixed an issue that could result in data duplication on restart of the
Processor if more than one rollover of the tailed file had occurred


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/7b9c8df6
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/7b9c8df6
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/7b9c8df6

Branch: refs/heads/master
Commit: 7b9c8df6c593059d063770095ab9efcf3c82467e
Parents: bfa9e45
Author: Mark Payne <marka...@hotmail.com>
Authored: Tue Nov 10 16:45:46 2015 -0500
Committer: Mark Payne <marka...@hotmail.com>
Committed: Tue Nov 10 16:53:29 2015 -0500

----------------------------------------------------------------------
 .../nifi/processors/standard/TailFile.java      | 127 +++++++++++++------
 .../nifi/processors/standard/TestTailFile.java  |  85 +++++++++++++
 2 files changed, 174 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/7b9c8df6/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
index dfe489e..4d7e6f5 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/TailFile.java
@@ -18,7 +18,6 @@
 package org.apache.nifi.processors.standard;
 
 import java.io.BufferedOutputStream;
-import java.io.ByteArrayOutputStream;
 import java.io.DataInputStream;
 import java.io.DataOutputStream;
 import java.io.File;
@@ -60,6 +59,7 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.stream.io.ByteArrayOutputStream;
 import org.apache.nifi.stream.io.NullOutputStream;
 import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.LongHolder;
@@ -179,27 +179,38 @@ public class TailFile extends AbstractProcessor {
                     // We have an expected checksum and the currently 
configured filename is the same as the state file.
                     // We need to check if the existing file is the same as 
the one referred to in the state file based on
                     // the checksum.
+                    final Checksum checksum = new CRC32();
                     final File existingTailFile = new File(filename);
                     if (existingTailFile.length() >= position) {
                         try (final InputStream tailFileIs = new 
FileInputStream(existingTailFile);
-                            final CheckedInputStream in = new 
CheckedInputStream(tailFileIs, new CRC32())) {
+                            final CheckedInputStream in = new 
CheckedInputStream(tailFileIs, checksum)) {
                             StreamUtils.copy(in, new NullOutputStream(), 
state.getPosition());
 
                             final long checksumResult = 
in.getChecksum().getValue();
                             if (checksumResult == expectedRecoveryChecksum) {
+                                // Checksums match. This means that we want to 
resume reading from where we left off.
+                                // So we will populate the reader object so 
that it will be used in onTrigger. If the
+                                // checksums do not match, then we will leave 
the reader object null, so that the next
+                                // call to onTrigger will result in a new 
Reader being created and starting at the
+                                // beginning of the file.
+                                getLogger().debug("When recovering state, 
checksum of tailed file matches the stored checksum. Will resume where left 
off.");
                                 tailFile = existingTailFile;
                                 reader = new RandomAccessFile(tailFile, "r");
                                 reader.seek(position);
+                            } else {
+                                getLogger().debug("When recovering state, 
checksum of tailed file does not match the stored checksum. Will begin tailing 
current file from beginning.");
                             }
                         }
                     }
 
-                    state = new TailFileState(tailFilename, tailFile, reader, 
position, timestamp, null, new byte[65536]);
+                    state = new TailFileState(tailFilename, tailFile, reader, 
position, timestamp, checksum, new byte[65536]);
                 } else {
                     expectedRecoveryChecksum = null;
                     // tailing a new file since the state file was written 
out. We will reset state.
                     state = new TailFileState(tailFilename, null, null, 0L, 
0L, null, new byte[65536]);
                 }
+
+                getLogger().debug("Recovered state {}", new Object[] {state});
             } else {
                 // encoding Version == -1... no data in file. Just move on.
             }
@@ -209,6 +220,8 @@ public class TailFile extends AbstractProcessor {
 
 
     public void persistState(final TailFileState state, final String 
stateFilename) throws IOException {
+        getLogger().debug("Persisting state {} to {}", new Object[] {state, 
stateFilename});
+
         final File stateFile = new File(stateFilename);
         File directory = stateFile.getParentFile();
         if (directory != null && !directory.exists() && !directory.mkdirs()) {
@@ -279,6 +292,7 @@ public class TailFile extends AbstractProcessor {
             // a file when we stopped running, then that file that we were 
reading from should be the first file in this list,
             // assuming that the file still exists on the file system.
             final List<File> rolledOffFiles = getRolledOffFiles(context, 
state.getTimestamp());
+            getLogger().debug("Recovering Rolled Off Files; total number of 
files rolled off = {}", new Object[] {rolledOffFiles.size()});
 
             // For first file that we find, it may or may not be the file that 
we were last reading from.
             // As a result, we have to read up to the position we stored, 
while calculating the checksum. If the checksums match,
@@ -295,6 +309,8 @@ public class TailFile extends AbstractProcessor {
 
                     final long checksumResult = in.getChecksum().getValue();
                     if (checksumResult == expectedRecoveryChecksum) {
+                        getLogger().debug("Checksum for {} matched expected 
checksum. Will skip first {} bytes", new Object[] {firstFile, 
state.getPosition()});
+
                         // This is the same file that we were reading when we 
shutdown. Start reading from this point on.
                         rolledOffFiles.remove(0);
                         FlowFile flowFile = session.create();
@@ -317,6 +333,9 @@ public class TailFile extends AbstractProcessor {
                             session.commit();
                             persistState(state, 
context.getProperty(STATE_FILE).getValue());
                         }
+                    } else {
+                        getLogger().debug("Checksum for {} did not match 
expected checksum. Checksum for file was {} but expected {}. Will consume 
entire file",
+                            new Object[] {firstFile, checksumResult, 
expectedRecoveryChecksum});
                     }
                 }
             }
@@ -425,11 +444,15 @@ public class TailFile extends AbstractProcessor {
             // marked position.
             try {
                 final List<File> updatedRolledOverFiles = 
getRolledOffFiles(context, timestamp);
+                getLogger().debug("Tailed file has rotated. Total number of 
rolled off files to check for un-consumed modifications: {}", new Object[] 
{updatedRolledOverFiles.size()});
+
                 if (!updatedRolledOverFiles.isEmpty()) {
                     final File lastRolledOver = 
updatedRolledOverFiles.get(updatedRolledOverFiles.size() - 1);
 
                     // there is more data in the file that has not yet been 
consumed.
                     if (lastRolledOver.length() > state.getPosition()) {
+                        getLogger().debug("Last rolled over file {} is larger 
than our last position; will consume data from it after offset {}", new 
Object[] {lastRolledOver, state.getPosition()});
+
                         try (final FileInputStream fis = new 
FileInputStream(lastRolledOver)) {
                             StreamUtils.skip(fis, state.getPosition());
 
@@ -450,6 +473,8 @@ public class TailFile extends AbstractProcessor {
                                 persistState(state, 
context.getProperty(STATE_FILE).getValue());
                             }
                         }
+                    } else {
+                        getLogger().debug("Last rolled over file {} is not 
larger than our last position; will not consume data from it", new Object[] 
{lastRolledOver});
                     }
                 }
             } catch (final IOException ioe) {
@@ -531,11 +556,11 @@ public class TailFile extends AbstractProcessor {
                     TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - 
startNanos));
                 session.transfer(flowFile, REL_SUCCESS);
                 position = positionHolder.get();
+                timestamp = System.currentTimeMillis();
             }
         }
 
         // Create a new state object to represent our current position, 
timestamp, etc.
-        timestamp = System.currentTimeMillis();
         final TailFileState updatedState = new 
TailFileState(context.getProperty(FILENAME).getValue(), file, reader, position, 
timestamp, checksum, state.getBuffer());
         this.state = updatedState;
 
@@ -571,47 +596,61 @@ public class TailFile extends AbstractProcessor {
      * @throws java.io.IOException if an I/O error occurs.
      */
     private long readLines(final RandomAccessFile reader, final byte[] buffer, 
final OutputStream out, final Checksum checksum) throws IOException {
-        final ByteArrayOutputStream baos = new ByteArrayOutputStream();
-        long pos = reader.getFilePointer();
-        long rePos = pos; // position to re-read
-
-        int num;
-        boolean seenCR = false;
-        while (((num = reader.read(buffer)) != -1)) {
-            for (int i = 0; i < num; i++) {
-                byte ch = buffer[i];
-
-                switch (ch) {
-                    case '\n':
-                        baos.write(ch);
-                        seenCR = false;
-                        baos.writeTo(out);
-                        baos.reset();
-                        rePos = pos + i + 1;
-                        break;
-                    case '\r':
-                        baos.write(ch);
-                        seenCR = true;
-                        break;
-                    default:
-                        if (seenCR) {
+        try (final ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
+            long pos = reader.getFilePointer();
+            long rePos = pos; // position to re-read
+
+            int num;
+            int linesRead = 0;
+            boolean seenCR = false;
+            while (((num = reader.read(buffer)) != -1)) {
+                for (int i = 0; i < num; i++) {
+                    byte ch = buffer[i];
+
+                    switch (ch) {
+                        case '\n':
+                            baos.write(ch);
                             seenCR = false;
                             baos.writeTo(out);
+                            checksum.update(baos.getUnderlyingBuffer(), 0, 
baos.size());
+                            if (getLogger().isTraceEnabled()) {
+                                getLogger().trace("Checksum updated to {}", 
new Object[] {checksum.getValue()});
+                            }
+
                             baos.reset();
+                            rePos = pos + i + 1;
+                            linesRead++;
+                            break;
+                        case '\r':
                             baos.write(ch);
-                            rePos = pos + i;
-                        } else {
-                            baos.write(ch);
-                        }
+                            seenCR = true;
+                            break;
+                        default:
+                            if (seenCR) {
+                                seenCR = false;
+                                baos.writeTo(out);
+                                checksum.update(baos.getUnderlyingBuffer(), 0, 
baos.size());
+                                if (getLogger().isTraceEnabled()) {
+                                    getLogger().trace("Checksum updated to 
{}", new Object[] {checksum.getValue()});
+                                }
+
+                                linesRead++;
+                                baos.reset();
+                                baos.write(ch);
+                                rePos = pos + i;
+                            } else {
+                                baos.write(ch);
+                            }
+                    }
                 }
+
+                pos = reader.getFilePointer();
             }
 
-            checksum.update(buffer, 0, num);
-            pos = reader.getFilePointer();
+            getLogger().debug("Read {} lines; repositioning reader from {} to 
{}", new Object[] {linesRead, pos, rePos});
+            reader.seek(rePos); // Ensure we can re-read if necessary
+            return rePos;
         }
-
-        reader.seek(rePos); // Ensure we can re-read if necessary
-        return rePos;
     }
 
 
@@ -650,7 +689,14 @@ public class TailFile extends AbstractProcessor {
         final DirectoryStream<Path> dirStream = 
Files.newDirectoryStream(directory.toPath(), rollingPattern);
         for (final Path path : dirStream) {
             final File file = path.toFile();
-            if (file.lastModified() < minTimestamp || file.equals(tailFile)) {
+            final long lastMod = file.lastModified();
+
+            if (file.lastModified() < minTimestamp) {
+                getLogger().debug("Found rolled off file {} but its last 
modified timestamp is before the cutoff (Last Mod = {}, Cutoff = {}) so will 
not consume it",
+                    new Object[] {file, lastMod, minTimestamp});
+
+                continue;
+            } else if (file.equals(tailFile)) {
                 continue;
             }
 
@@ -725,5 +771,10 @@ public class TailFile extends AbstractProcessor {
         public byte[] getBuffer() {
             return buffer;
         }
+
+        @Override
+        public String toString() {
+            return "TailFileState[filename=" + filename + ", position=" + 
position + ", timestamp=" + timestamp + ", checksum=" + (checksum == null ? 
"null" : checksum.getValue()) + "]";
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/7b9c8df6/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
index 993f2b1..85638ad 100644
--- 
a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
+++ 
b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestTailFile.java
@@ -42,6 +42,8 @@ public class TestTailFile {
 
     @Before
     public void setup() throws IOException {
+        
System.setProperty("org.slf4j.simpleLogger.log.org.apache.nifi.processors.standard",
 "TRACE");
+
         final File targetDir = new File("target");
         final File[] files = targetDir.listFiles(new FilenameFilter() {
             @Override
@@ -240,6 +242,89 @@ public class TestTailFile {
         
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("1\n");
     }
 
+    @Test
+    public void testRolloverAfterHavingReadAllData() throws IOException, 
InterruptedException {
+        // If we have read all data in a file, and that file does not end with 
a new-line, then the last line
+        // in the file will have been read, added to the checksum, and then we 
would re-seek to "unread" that
+        // last line since it didn't have a new-line. We need to ensure that 
if the data is then rolled over
+        // that our checksum does not take into account those bytes that have 
been "unread."
+
+        // this mimics the case when we are reading a log file that rolls over 
while processor is running.
+        runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.*");
+        runner.run(1, false, false);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
+
+        raf.write("hello\n".getBytes());
+        runner.run(1, false, false);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("hello\n");
+        runner.clearTransferState();
+
+        raf.write("world".getBytes());
+
+        Thread.sleep(1000L);
+
+        runner.run(1);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0); // 
should not pull in data because no \n
+
+        raf.close();
+        file.renameTo(new File("target/log.1"));
+
+        raf = new RandomAccessFile(new File("target/log.txt"), "rw");
+        raf.write("1\n".getBytes());
+        runner.run(1, false, false);
+
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 2);
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("world");
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("1\n");
+    }
+
+
+    @Test
+    public void testMultipleRolloversAfterHavingReadAllData() throws 
IOException, InterruptedException {
+        // this mimics the case when we are reading a log file that rolls over 
while processor is running.
+        runner.setProperty(TailFile.ROLLING_FILENAME_PATTERN, "log.*");
+        runner.run(1, false, false);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 0);
+
+        raf.write("hello\n".getBytes());
+        runner.run(1, false, false);
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 1);
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("hello\n");
+        runner.clearTransferState();
+
+        raf.write("world".getBytes());
+        runner.run(1); // ensure that we've read 'world' but not consumed it 
into a flowfile.
+
+        Thread.sleep(1000L);
+
+        // rename file to log.2
+        raf.close();
+        file.renameTo(new File("target/log.2"));
+
+        // write to a new file.
+        file = new File("target/log.txt");
+        raf = new RandomAccessFile(file, "rw");
+        raf.write("abc\n".getBytes());
+
+        // rename file to log.1
+        raf.close();
+        file.renameTo(new File("target/log.1"));
+
+        // write to a new file.
+        file = new File("target/log.txt");
+        raf = new RandomAccessFile(file, "rw");
+        raf.write("1\n".getBytes());
+        raf.close();
+
+        runner.run(1);
+
+        runner.assertAllFlowFilesTransferred(TailFile.REL_SUCCESS, 3);
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(0).assertContentEquals("world");
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(1).assertContentEquals("abc\n");
+        
runner.getFlowFilesForRelationship(TailFile.REL_SUCCESS).get(2).assertContentEquals("1\n");
+    }
+
 
     @Test
     public void testConsumeWhenNewLineFound() throws IOException, 
InterruptedException {

Reply via email to